bulk_copy.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:6k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <NdbOut.hpp>
  14. #include <NdbApi.hpp>
  15. #include <NdbSleep.h>
  16. #include <NDBT.hpp>
  17. #include <getarg.h>
  18. int setValuesFromLine(NdbOperation* pOp,
  19.       const NdbDictionary::Table* pTab, 
  20.       char* line){
  21.   int check = 0;
  22.   char* p = line;
  23.   char* pn;
  24.   char buf[8000];
  25.   // Loop through each attribute in this table  
  26.   for (int a = 0; a<pTab->getNoOfColumns(); a++){
  27.     pn = p;
  28.     while (*pn != ';')
  29.       pn++;
  30.     
  31.     memset(buf, 0, sizeof(buf));
  32.     strncpy(buf, p, pn-p);
  33.     //    ndbout << a << ": " << buf << endl;
  34.     const NdbDictionary::Column* attr = pTab->getColumn(a);            
  35.     switch (attr->getType()){
  36.     case NdbDictionary::Column::Unsigned:
  37.       Int32 sval;
  38.       if (sscanf(buf, "%d", &sval) == 0)
  39. return -2;
  40.       check = pOp->setValue(a, sval);
  41.       break;
  42.     case NdbDictionary::Column::Int:
  43.       Uint32 uval;
  44.       if (sscanf(buf, "%u", &uval) == 0)
  45. return -2;
  46.       check = pOp->setValue(a, uval);
  47.       break;
  48.     case NdbDictionary::Column::Char:
  49.       char buf2[8000];
  50.       char* p2;
  51.       memset(buf2, 0, sizeof(buf));
  52.       p2 = &buf2[0];
  53.       while(*p != ';'){
  54. *p2 = *p;
  55. p++;p2++;
  56.       };
  57.       *p2 = 0;
  58.       check = pOp->setValue(a, buf2);
  59.       break;
  60.     default:
  61.       check = -2;
  62.       break;
  63.     }
  64.     // Move pointer to after next ";"
  65.     while (*p != ';')
  66.       p++;
  67.     p++;
  68.   }
  69.   return check;
  70. }
  71. int insertLine(Ndb* pNdb, 
  72.        const NdbDictionary::Table* pTab,
  73.        char* line){
  74.   int             check;
  75.   int             retryAttempt = 0;
  76.   int             retryMax = 5;
  77.   NdbConnection   *pTrans;
  78.   NdbOperation   *pOp;
  79.   while (retryAttempt < retryMax){
  80.     pTrans = pNdb->startTransaction();
  81.     if (pTrans == NULL) {
  82.       ERR(pNdb->getNdbError());
  83.       NdbSleep_MilliSleep(50);
  84.       retryAttempt++;
  85.       continue;
  86.     }
  87.     pOp = pTrans->getNdbOperation(pTab->getName());
  88.     if (pOp == NULL) {
  89.       ERR(pTrans->getNdbError());
  90.       pNdb->closeTransaction(pTrans);
  91.       return -1;
  92.     }
  93.     check = pOp->insertTuple();
  94.     if( check == -1 ) {
  95.       ERR(pTrans->getNdbError());
  96.       pNdb->closeTransaction(pTrans);
  97.       return -1;
  98.     }
  99.     check = setValuesFromLine(pOp,
  100.       pTab,
  101.       line);
  102.     if (check == -2){
  103.       pNdb->closeTransaction(pTrans);
  104.       return -2;
  105.     }
  106.     if( check == -1 ) {
  107.       ERR(pTrans->getNdbError());
  108.       pNdb->closeTransaction(pTrans);
  109.       return -1;
  110.     }
  111.     // Execute the transaction and insert the record
  112.     check = pTrans->execute( Commit ); 
  113.     if(check == -1 ) {
  114.       const NdbError err = pTrans->getNdbError();
  115.       pNdb->closeTransaction(pTrans);
  116.       switch(err.status){
  117.       case NdbError::Success:
  118. ERR(err);
  119. ndbout << "ERROR: NdbError reports success when transcaction failed" << endl;
  120. return -1;
  121. break;
  122.       case NdbError::TemporaryError:      
  123. ERR(err);
  124. NdbSleep_MilliSleep(50);
  125. retryAttempt++;
  126. continue;
  127. break;
  128.       case NdbError::UnknownResult:
  129. ERR(err);
  130. return -1;
  131. break;
  132.       case NdbError::PermanentError:
  133. switch (err.classification){
  134. case NdbError::ConstraintViolation:
  135.   // Tuple already existed, OK in this application, but should be reported
  136.   ndbout << err.code << " " << err.message << endl;
  137.   break;
  138. default:
  139.   ERR(err);
  140.   return -1;
  141.   break;
  142. }
  143. break;
  144.       }
  145.     }
  146.     else{
  147.       pNdb->closeTransaction(pTrans);
  148.     }
  149.     return 0;
  150.   }
  151.   return check;
  152. }
  153. int insertFile(Ndb* pNdb, 
  154.        const NdbDictionary::Table* pTab,
  155.        const char* fileName){
  156.   const int MAX_LINE_LEN = 8000;
  157.   char line[MAX_LINE_LEN];
  158.   int lineNo = 0;
  159.   FILE* instr = fopen(fileName, "r");
  160.   if (instr == NULL){
  161.     ndbout << "Coul'd not open " << fileName << endl;
  162.     return -1;
  163.   }
  164.   while(fgets(line, MAX_LINE_LEN, instr)){
  165.     lineNo++;
  166.     if (line[strlen(line)-1] == 'n') {
  167.       line[strlen(line)-1] = '';
  168.     }
  169.     int check = insertLine(pNdb, pTab, line);
  170.     if (check == -2){
  171.       ndbout << "Wrong format in input data file, line: " << lineNo << endl;
  172.       fclose(instr);
  173.       return -1;
  174.     }
  175.     if (check == -1){
  176.       fclose(instr);
  177.       return -1;
  178.     }
  179.   }
  180.   fclose(instr);
  181.   return 0;
  182. }
  183. int main(int argc, const char** argv){
  184.   ndb_init();
  185.   const char* _tabname = NULL;
  186.   int _help = 0;
  187.   
  188.   struct getargs args[] = {
  189.     { "usage", '?', arg_flag, &_help, "Print help", "" }
  190.   };
  191.   int num_args = sizeof(args) / sizeof(args[0]);
  192.   int optind = 0;
  193.   char desc[] = 
  194.     "tabnamen"
  195.     "This program will bulk copy data from a file to a table in Ndb.n";
  196.   
  197.   if(getarg(args, num_args, argc, argv, &optind) ||
  198.      argv[optind] == NULL || _help) {
  199.     arg_printusage(args, num_args, argv[0], desc);
  200.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  201.   }
  202.   _tabname = argv[optind];
  203.   ndbout << "Tablename: " << _tabname << endl;
  204.   // Connect to Ndb
  205.   Ndb MyNdb( "TEST_DB" );
  206.   if(MyNdb.init() != 0){
  207.     ERR(MyNdb.getNdbError());
  208.     return NDBT_ProgramExit(NDBT_FAILED);
  209.   }
  210.   // Connect to Ndb and wait for it to become ready
  211.   while(MyNdb.waitUntilReady() != 0)
  212.     ndbout << "Waiting for ndb to become ready..." << endl;
  213.    
  214.   // Check if table exists in db
  215.   const NdbDictionary::Table* pTab = MyNdb.getDictionary()->getTable(_tabname);
  216.   if(pTab == NULL){
  217.     ndbout << " Table " << _tabname << " does not exist!" << endl;
  218.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  219.   }
  220.   
  221.   char buf[255];
  222.   BaseString::snprintf(buf, sizeof(buf), "%s.data", (const char*)_tabname);
  223.   if (insertFile(&MyNdb, pTab, buf) != 0){
  224.     return NDBT_ProgramExit(NDBT_FAILED);
  225.   }
  226.   return NDBT_ProgramExit(NDBT_OK);
  227. }