AsyncFileTest.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:17k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. //#define TESTDEBUG 1
  14. #include <ndb_global.h>
  15. #include <kernel_types.h>
  16. #include <Pool.hpp>
  17. #include "AsyncFile.hpp"
  18. #include "NdbOut.hpp"
  19. #include "NdbTick.h"
  20. #include "NdbThread.h"
  21. #include "NdbMain.h"
  22. // Test and benchmark functionality of AsyncFile
  23. // -n Number of files
  24. // -r Number of simultaneous requests
  25. // -s Filesize, number of pages
  26. // -l Number of iterations
  27. // -remove, remove files after close
  28. // -reverse, write files in reverse order, start with the last page
  29. #define MAXFILES 255
  30. #define DEFAULT_NUM_FILES 1
  31. #define MAXREQUESTS 256
  32. #define DEFAULT_NUM_REQUESTS 1
  33. #define MAXFILESIZE 4096
  34. #define DEFAULT_FILESIZE 2048
  35. #define FVERSION 0x01000000
  36. #define PAGESIZE 8192
  37. #define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond()
  38. #define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();
  39.             Uint64 totaltime = (stoptick-starttick);  
  40.             ndbout << ops << " " << str << 
  41.     " total time " << (int)totaltime << "ms" << endl;
  42.             char buf[255];
  43.             sprintf(buf,  "%d %s/secn",(int)((ops*1000)/totaltime), str);
  44.             ndbout <<buf << endl;}
  45. static int numberOfFiles = DEFAULT_NUM_FILES;
  46. static int numberOfRequests = DEFAULT_NUM_REQUESTS;
  47. static int fileSize = DEFAULT_FILESIZE;
  48. static int removeFiles = 0;
  49. static int writeFilesReverse = 0;
  50. static int numberOfIterations = 1;
  51. Uint32 FileNameArray[4];
  52. Pool<AsyncFile>* files;
  53. AsyncFile* openFiles[MAXFILES];
  54. Pool<Request>* theRequestPool;
  55. MemoryChannelMultipleWriter<Request>* theReportChannel;
  56. char WritePages[MAXFILES][PAGESIZE];
  57. char ReadPages[MAXFILES][PAGESIZE];
  58. int readArguments(int argc, const char** argv);
  59. int openFile(int fileNum);
  60. int openFileWait();
  61. int closeFile(int fileNum);
  62. int closeFileWait();
  63. int writeFile( int fileNum, int pagenum);
  64. int writeFileWait();
  65. int writeSyncFile( int fileNum, int pagenum);
  66. int writeSyncFileWait();
  67. int readFile( int fileNum, int pagenum);
  68. int readFileWait();
  69. NDB_COMMAND(aftest, "aftest", "aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]", "Test the AsyncFile class of Ndb", 8192)
  70. {
  71.   int s, numReq, numOps;
  72.   readArguments(argc, argv);
  73.    
  74.   files = new Pool<AsyncFile>(numberOfFiles, 2);
  75.   theRequestPool = new Pool<Request>;
  76.   theReportChannel = new MemoryChannelMultipleWriter<Request>;
  77.   ndbout << "AsyncFileTest starting" << endl;
  78.   ndbout << "  " << numberOfFiles << " files" << endl;
  79.   ndbout << "  " << numberOfRequests << " requests" << endl;
  80.   ndbout << "  " << fileSize << " * 8k files" << endl << endl;
  81.   ndbout << "  " << numberOfIterations << " iterations" << endl << endl;
  82.   NdbThread_SetConcurrencyLevel(numberOfFiles+2);
  83.   // initialize data to write to files
  84.   for (int i = 0; i < MAXFILES; i++) {
  85.     for (int j = 0; j < PAGESIZE; j++){
  86.       WritePages[i][j] = (64+i+j)%256;
  87.     }
  88.       //      memset(&WritePages[i][0], i+64, PAGESIZE);
  89.   }
  90.   // Set file directory and name
  91.   // /T27/F27/NDBFS/S27Pnn.data
  92.   FileNameArray[0] = 27; // T27
  93.   FileNameArray[1] = 27; // F27
  94.   FileNameArray[2] = 27; // S27 
  95.   FileNameArray[3] = FVERSION; // Version
  96.   
  97.   for (int l = 0; l < numberOfIterations; l++)
  98.     {
  99.   ndbout << "Opening files" << endl;
  100.   // Open files
  101.   for (int f = 0; f < numberOfFiles; f++)
  102.   {
  103.       openFile(f);
  104.  
  105.   }
  106.   // Wait for answer
  107.   openFileWait();
  108.   ndbout << "Files opened!" << endl<< endl;
  109.   // Write to files
  110.   ndbout << "Started writing" << endl;
  111.   TIMER_START;
  112.   s = 0;
  113.   numReq = 0;
  114.   numOps = 0;
  115.   while ( s < fileSize)
  116.   {
  117.     for (int r = 0; r < numberOfRequests; r++)
  118.     {
  119.       for (int f = 0; f < numberOfFiles; f++)
  120.       {
  121. writeFile(f, s);
  122. numReq++;
  123. numOps++;
  124.       }
  125.       
  126.       s++;
  127.     }
  128.     
  129.     while (numReq > 0)
  130.       {
  131. writeFileWait();
  132. numReq--;
  133.       }
  134.   }
  135.   TIMER_PRINT("writes", numOps);
  136.   ndbout << "Started reading" << endl;
  137.   TIMER_START;
  138.   // Read from files
  139.   s = 0;
  140.   numReq = 0;
  141.   numOps = 0;
  142.   while ( s < fileSize)
  143.   {
  144.     for (int r = 0; r < numberOfRequests; r++)
  145.     {
  146.       for (int f = 0; f < numberOfFiles; f++)
  147.       {
  148. readFile(f, s);
  149. numReq++;
  150. numOps++;
  151.       }
  152.       
  153.       s++;
  154.       
  155.     }
  156.     
  157.     while (numReq > 0)
  158.       {
  159. readFileWait();
  160. numReq--;
  161.       }
  162.   }
  163.   TIMER_PRINT("reads", numOps);
  164.   ndbout << "Started writing with sync" << endl;
  165.   TIMER_START;
  166.   // Write to files
  167.   s = 0;
  168.   numReq = 0;
  169.   numOps = 0;
  170.   while ( s < fileSize)
  171.   {
  172.     for (int r = 0; r < numberOfRequests; r++)
  173.     {
  174.       for (int f = 0; f < numberOfFiles; f++)
  175.       {
  176. writeSyncFile(f, s);
  177. numReq++;
  178. numOps++;
  179.       }
  180.       
  181.       s++;
  182.     }
  183.     
  184.     while (numReq > 0)
  185.       {
  186. writeSyncFileWait();
  187. numReq--;
  188.       }
  189.   }
  190.   TIMER_PRINT("writeSync", numOps);
  191.   
  192.   // Close files
  193.   ndbout << "Closing files" << endl;
  194.   for (int f = 0; f < numberOfFiles; f++)
  195.   {
  196.       closeFile(f);
  197.  
  198.   }
  199.   // Wait for answer
  200.   closeFileWait();
  201.   ndbout << "Files closed!" << endl<< endl;
  202.     }
  203.   // Deallocate memory
  204.   delete files;
  205.   delete theReportChannel;
  206.   delete theRequestPool;
  207.   return 0;
  208. }
  209. int forward( AsyncFile * file, Request* request )
  210. {
  211.    file->execute(request);
  212.    ERROR_CHECK 0;
  213.    return 1;
  214. }
  215. int openFile( int fileNum)
  216. {
  217.   AsyncFile* file = (AsyncFile *)files->get();
  218.   FileNameArray[3] = fileNum | FVERSION;
  219.   file->fileName().set( NDBFS_REF, &FileNameArray[0] );
  220.   ndbout << "openFile: " << file->fileName().c_str() << endl;
  221.   
  222.   if( ERROR_STATE ) {
  223.      ERROR_RESET;
  224.      files->put( file );
  225.      ndbout <<  "Failed to set filename" << endl;
  226.      return 1;
  227.   }
  228.   file->reportTo(theReportChannel);
  229.           
  230.   Request* request = theRequestPool->get();
  231.   request->action= Request::open;
  232.   request->error= 0;
  233.   request->par.open.flags = 0x302; //O_RDWR | O_CREAT | O_TRUNC ; // 770
  234.   request->set(NDBFS_REF, 0x23456789, fileNum );
  235.   request->file = file;
  236.   
  237.   if (!forward(file,request)) {
  238.      // Something went wrong
  239.      ndbout << "Could not forward open request" << endl;
  240.      theRequestPool->put(request);
  241.      return 1;
  242.   }
  243.   return 0;
  244. }
  245. int closeFile( int fileNum)
  246. {
  247.   AsyncFile* file = openFiles[fileNum];
  248.           
  249.   Request* request = theRequestPool->get();
  250.   if (removeFiles == 1)
  251.     request->action = Request::closeRemove;
  252.   else
  253.     request->action= Request::close;
  254.   request->error= 0;
  255.   request->set(NDBFS_REF, 0x23456789, fileNum );
  256.   request->file = file;
  257.   
  258.   if (!forward(file,request)) {
  259.      // Something went wrong
  260.      ndbout << "Could not forward close request" << endl;
  261.      theRequestPool->put(request);
  262.      return 1;
  263.   }
  264.   return 0;
  265. }
  266. int writeFile( int fileNum, int pagenum)
  267. {
  268.   AsyncFile* file = openFiles[fileNum];
  269. #ifdef TESTDEBUG
  270.   ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str()<< endl;
  271. #endif
  272.   Request *request = theRequestPool->get();
  273.   request->action = Request::write;
  274.   request->error = 0;
  275.   request->set(NDBFS_REF, pagenum, fileNum);
  276.   request->file = openFiles[fileNum];
  277.   // Write only one page, choose the correct page for each file using fileNum
  278.   request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
  279.   request->par.readWrite.pages[0].size = PAGESIZE;
  280.   if (writeFilesReverse == 1)
  281.   {
  282.     // write the last page in the files first
  283.     // This is a normal way for the Blocks in Ndb to write to a file
  284.      request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE;
  285.   }
  286.   else
  287.   {
  288.      request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
  289.   }
  290.   request->par.readWrite.numberOfPages = 1;
  291.   
  292.   if (!forward(file,request)) {
  293.      // Something went wrong
  294.      ndbout << "Could not forward write request" << endl;
  295.      theRequestPool->put(request);
  296.      return 1;
  297.   }
  298.   return 0;
  299. }
  300. int writeSyncFile( int fileNum, int pagenum)
  301. {
  302.   AsyncFile* file = openFiles[fileNum];
  303. #ifdef TESTDEBUG
  304.   ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
  305. #endif
  306.   Request *request = theRequestPool->get();
  307.   request->action = Request::writeSync;
  308.   request->error = 0;
  309.   request->set(NDBFS_REF, pagenum, fileNum);
  310.   request->file = openFiles[fileNum];
  311.   // Write only one page, choose the correct page for each file using fileNum
  312.   request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
  313.   request->par.readWrite.pages[0].size = PAGESIZE;
  314.   request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
  315.   request->par.readWrite.numberOfPages = 1;
  316.   
  317.   if (!forward(file,request)) {
  318.      // Something went wrong
  319.      ndbout << "Could not forward write request" << endl;
  320.      theRequestPool->put(request);
  321.      return 1;
  322.   }
  323.   return 0;
  324. }
  325. int readFile( int fileNum, int pagenum)
  326. {
  327.   AsyncFile* file = openFiles[fileNum];
  328. #ifdef TESTDEBUG
  329.   ndbout << "readFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
  330. #endif
  331.   Request *request = theRequestPool->get();
  332.   request->action = Request::read;
  333.   request->error = 0;
  334.   request->set(NDBFS_REF, pagenum, fileNum);
  335.   request->file = openFiles[fileNum];
  336.   // Read only one page, choose the correct page for each file using fileNum
  337.   request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0];
  338.   request->par.readWrite.pages[0].size = PAGESIZE;
  339.   request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
  340.   request->par.readWrite.numberOfPages = 1;
  341.   
  342.   if (!forward(file,request)) {
  343.      // Something went wrong
  344.      ndbout << "Could not forward read request" << endl;
  345.      theRequestPool->put(request);
  346.      return 1;
  347.   }
  348.   return 0;
  349. }
  350. int openFileWait()
  351. {
  352.   int openedFiles = 0;
  353.   while (openedFiles < numberOfFiles)
  354.     {
  355.       Request* request = theReportChannel->readChannel();
  356.       if (request) 
  357. {
  358.   if (request->action == Request::open)
  359.     {       
  360.       if (request->error ==0)
  361. {
  362. #ifdef TESTDEBUG
  363.              ndbout << "Opened file " << request->file->fileName().c_str() << endl;
  364. #endif
  365.   openFiles[request->theFilePointer] = request->file;
  366. }
  367.       else
  368. {
  369.   ndbout << "error while opening file" << endl;
  370.   exit(1);
  371. }
  372.       theRequestPool->put(request);
  373.       openedFiles++;
  374.     }
  375.   else
  376.     {
  377.       ndbout << "Unexpected request received" << endl;
  378.     }
  379. }
  380.       else
  381. {
  382.   ndbout << "Nothing read from theReportChannel" << endl;
  383. }
  384.     }
  385.   return 0;
  386. }
  387. int closeFileWait()
  388. {
  389.   int closedFiles = 0;
  390.   while (closedFiles < numberOfFiles)
  391.     {
  392.       Request* request = theReportChannel->readChannel();
  393.       if (request) 
  394. {
  395.   if (request->action == Request::close || request->action == Request::closeRemove)
  396.     {       
  397.       if (request->error ==0)
  398. {
  399. #ifdef TESTDEBUG
  400.              ndbout << "Closed file " << request->file->fileName().c_str() << endl;
  401. #endif
  402.   openFiles[request->theFilePointer] = NULL;
  403.   files->put(request->file);
  404. }
  405.       else
  406. {
  407.   ndbout << "error while closing file" << endl;
  408.   exit(1);
  409. }
  410.       theRequestPool->put(request);
  411.       closedFiles++;
  412.     }
  413.   else
  414.     {
  415.       ndbout << "Unexpected request received" << endl;
  416.     }
  417. }
  418.       else
  419. {
  420.   ndbout << "Nothing read from theReportChannel" << endl;
  421. }
  422.     }
  423.   return 0;
  424. }
  425. int writeFileWait()
  426. {
  427.   Request* request = theReportChannel->readChannel();
  428.   if (request) 
  429.     {
  430.       if (request->action == Request::write)
  431. {       
  432.   if (request->error == 0)
  433.     {
  434. #ifdef TESTDEBUG
  435.       ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
  436. #endif
  437.     }
  438.   else
  439.     {
  440.       ndbout << "error while writing file, error=" << request->error << endl;
  441.       exit(1);
  442.     }
  443.   theRequestPool->put(request);
  444. }
  445.       else
  446. {
  447.   ndbout << "Unexpected request received" << endl;
  448. }
  449.     }
  450.   else
  451.     {
  452.       ndbout << "Nothing read from theReportChannel" << endl;
  453.     }
  454.   return 0;
  455. }
  456. int writeSyncFileWait()
  457. {
  458.   Request* request = theReportChannel->readChannel();
  459.   if (request) 
  460.     {
  461.       if (request->action == Request::writeSync)
  462. {       
  463.   if (request->error == 0)
  464.     {
  465. #ifdef TESTDEBUG
  466.       ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
  467. #endif
  468.     }
  469.   else
  470.     {
  471.       ndbout << "error while writing file" << endl;
  472.       exit(1);
  473.     }
  474.   theRequestPool->put(request);
  475. }
  476.       else
  477. {
  478.   ndbout << "Unexpected request received" << endl;
  479. }
  480.     }
  481.   else
  482.     {
  483.       ndbout << "Nothing read from theReportChannel" << endl;
  484.     }
  485.   return 0;
  486. }
  487. int readFileWait()
  488. {
  489.   Request* request = theReportChannel->readChannel();
  490.   if (request) 
  491.     {
  492.       if (request->action == Request::read)
  493. {       
  494.   if (request->error == 0)
  495.     {
  496. #ifdef TESTDEBUG
  497.       ndbout << "readFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
  498. #endif
  499.       if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0) 
  500. {
  501.   ndbout <<"Verification error!" << endl;
  502.   for (int i = 0; i < PAGESIZE; i++ ){
  503.     ndbout <<" Compare Page " <<  i << " : " << ReadPages[request->theFilePointer][i] <<", " <<WritePages[request->theFilePointer][i] << endl;;
  504.     if( ReadPages[request->theFilePointer][i] !=WritePages[request->theFilePointer][i])
  505.       exit(1);
  506.        }
  507. }
  508.     }
  509.   else
  510.     {
  511.       ndbout << "error while reading file" << endl;
  512.       exit(1);
  513.     }
  514.   theRequestPool->put(request);
  515. }
  516.       else
  517. {
  518.   ndbout << "Unexpected request received" << endl;
  519. }
  520.     }
  521.   else
  522.     {
  523.       ndbout << "Nothing read from theReportChannel" << endl;
  524.     }
  525.   return 0;
  526. }
  527. int readArguments(int argc, const char** argv)
  528. {
  529.   int i = 1;
  530.   while (argc > 1)
  531.   {
  532.     if (strcmp(argv[i], "-n") == 0)
  533.     {
  534.       numberOfFiles = atoi(argv[i+1]);
  535.       if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES))
  536.       {
  537. ndbout << "Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl;
  538.   numberOfFiles = DEFAULT_NUM_FILES;
  539.       }
  540.     }
  541.     else if (strcmp(argv[i], "-r") == 0)
  542.     {
  543.       numberOfRequests = atoi(argv[i+1]);
  544.       if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS))
  545.       {
  546. ndbout << "Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl;
  547.   numberOfRequests = DEFAULT_NUM_REQUESTS;
  548.       }
  549.     }
  550.     else if (strcmp(argv[i], "-s") == 0)
  551.     {
  552.       fileSize = atoi(argv[i+1]);
  553.       if ((fileSize < 1) || (fileSize > MAXFILESIZE))
  554.       {
  555. ndbout << "Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl;
  556.   fileSize = DEFAULT_FILESIZE;
  557.       }
  558.     }
  559.     else if (strcmp(argv[i], "-l") == 0)
  560.     {
  561.       numberOfIterations = atoi(argv[i+1]);
  562.       if ((numberOfIterations < 1))
  563.       {
  564. ndbout << "Wrong number of iterations, default = 1" << endl;
  565. numberOfIterations = 1;
  566.       }
  567.     }
  568.     else if (strcmp(argv[i], "-remove") == 0)
  569.     {
  570.       removeFiles = 1;
  571.       argc++;
  572.       i--;
  573.     }
  574.     else if (strcmp(argv[i], "-reverse") == 0)
  575.     {
  576.       ndbout << "Writing files reversed" << endl;
  577.       writeFilesReverse = 1;
  578.       argc++;
  579.       i--;
  580.     }
  581.     argc -= 2;
  582.     i = i + 2;
  583.   }
  584.   
  585.   if ((fileSize % numberOfRequests)!= 0)
  586.     {
  587.     numberOfRequests = numberOfRequests - (fileSize % numberOfRequests);
  588.     ndbout <<"numberOfRequest must be modulo of filesize" << endl;
  589.     ndbout << "New numberOfRequest="<<numberOfRequests<<endl;
  590.     }
  591.   return 0;
  592. }
  593. // Needed for linking...
  594. void ErrorReporter::handleError(ErrorCategory type, int messageID,
  595.                                 const char* problemData, const char* objRef, NdbShutdownType stype)
  596. {
  597.   ndbout << "ErrorReporter::handleError activated" << endl;
  598.   ndbout << "type= " << type << endl; 
  599.   ndbout << "messageID= " << messageID << endl;
  600.   ndbout << "problemData= " << problemData << endl;
  601.   ndbout << "objRef= " << objRef << endl;
  602.   exit(1);
  603. }
  604. void ErrorReporter::handleAssert(const char* message, const char* file, int line)
  605. {
  606.   ndbout << "ErrorReporter::handleAssert activated" << endl;
  607.   ndbout << "message= " << message << endl;
  608.   ndbout << "file= " << file << endl;
  609.   ndbout << "line= " << line << endl;
  610.   exit(1);
  611. }
  612. GlobalData globalData;
  613. Signal::Signal()
  614. {
  615. }