AsyncFile.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:24k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <my_sys.h>
  15. #include <my_pthread.h>
  16. #include <Error.hpp>
  17. #include "AsyncFile.hpp"
  18. #include <ErrorHandlingMacros.hpp>
  19. #include <kernel_types.h>
  20. #include <NdbMem.h>
  21. #include <NdbThread.h>
  22. #include <signaldata/FsOpenReq.hpp>
  23. // use this to test broken pread code
  24. //#define HAVE_BROKEN_PREAD 
  25. #ifdef HAVE_BROKEN_PREAD
  26. #undef HAVE_PWRITE
  27. #undef HAVE_PREAD
  28. #endif
  29. #if defined NDB_WIN32 || defined NDB_OSE || defined NDB_SOFTOSE
  30. #else
  31. // For readv and writev
  32. #include <sys/uio.h> 
  33. #endif
  34. #ifndef NDB_WIN32
  35. #include <dirent.h>
  36. #endif
  37. // Use this define if you want printouts from AsyncFile class
  38. //#define DEBUG_ASYNCFILE
  39. #ifdef DEBUG_ASYNCFILE
  40. #include <NdbOut.hpp>
  41. #define DEBUG(x) x
  42. #define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
  43. void printErrorAndFlags(Uint32 used_flags);
  44. #else
  45. #define DEBUG(x) 
  46. #define PRINT_ERRORANDFLAGS(f)
  47. #endif
  48. // Define the size of the write buffer (for each thread)
  49. #if defined NDB_SOFTOSE || defined NDB_OSE
  50. #define WRITEBUFFERSIZE 65536
  51. #else 
  52. #define WRITEBUFFERSIZE 262144
  53. #endif
  54. const char *actionName[] = {
  55.   "open",
  56.   "close",
  57.   "closeRemove",
  58.   "read",
  59.   "readv",
  60.   "write",
  61.   "writev",
  62.   "writeSync",
  63.   "writevSync",
  64.   "sync",
  65.   "end" };
  66. static int numAsyncFiles = 0;
  67. extern "C" void * runAsyncFile(void* arg)
  68. {
  69.   ((AsyncFile*)arg)->run();
  70.   return (NULL);
  71. }
  72. AsyncFile::AsyncFile() :
  73.   theFileName(),
  74. #ifdef NDB_WIN32
  75.   hFile(INVALID_HANDLE_VALUE),
  76. #else
  77.   theFd(-1),
  78. #endif
  79.   theReportTo(0),
  80.   theMemoryChannelPtr(NULL)
  81. {
  82. }
  83. void
  84. AsyncFile::doStart(Uint32 nodeId,
  85.    const char * filesystemPath,
  86.    const char * backup_path) {
  87.   theFileName.init(nodeId, filesystemPath, backup_path);
  88.   // Stacksize for filesystem threads
  89.   // An 8k stack should be enough
  90.   const NDB_THREAD_STACKSIZE stackSize = 8192;
  91.   char buf[16];
  92.   numAsyncFiles++;
  93.   BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
  94.   theStartMutexPtr = NdbMutex_Create();
  95.   theStartConditionPtr = NdbCondition_Create();
  96.   NdbMutex_Lock(theStartMutexPtr);
  97.   theStartFlag = false;
  98.   theThreadPtr = NdbThread_Create(runAsyncFile,
  99.                                   (void**)this,
  100.                                   stackSize,
  101.                                   (char*)&buf,
  102.                                   NDB_THREAD_PRIO_MEAN);
  103.   NdbCondition_Wait(theStartConditionPtr,
  104.                     theStartMutexPtr);    
  105.   NdbMutex_Unlock(theStartMutexPtr);
  106.   NdbMutex_Destroy(theStartMutexPtr);
  107.   NdbCondition_Destroy(theStartConditionPtr);
  108. }
  109. AsyncFile::~AsyncFile() 
  110. {
  111.   void *status;
  112.   Request request;
  113.   request.action = Request::end;
  114.   theMemoryChannelPtr->writeChannel( &request );
  115.   NdbThread_WaitFor(theThreadPtr, &status);
  116.   NdbThread_Destroy(&theThreadPtr);
  117.   delete theMemoryChannelPtr;
  118. }
  119. void
  120. AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
  121. {
  122.   theReportTo = reportTo;
  123. }
  124. void AsyncFile::execute(Request* request) 
  125. {
  126.   theMemoryChannelPtr->writeChannel( request );
  127. }
  128. void
  129. AsyncFile::run()
  130. {
  131.   Request *request;
  132.   // Create theMemoryChannel in the thread that will wait for it
  133.   NdbMutex_Lock(theStartMutexPtr);
  134.   theMemoryChannelPtr = new MemoryChannel<Request>();
  135.   theStartFlag = true;
  136.   // Create write buffer for bigger writes
  137.   theWriteBufferSize = WRITEBUFFERSIZE;
  138.   theWriteBuffer = (char *) NdbMem_Allocate(theWriteBufferSize); 
  139.   NdbMutex_Unlock(theStartMutexPtr);
  140.   NdbCondition_Signal(theStartConditionPtr);
  141.   
  142.   if (!theWriteBuffer) {
  143.     DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
  144.     return;
  145.   }//if
  146.   while (1) {
  147.     request = theMemoryChannelPtr->readChannel();
  148.     if (!request) {
  149.       DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
  150.       endReq();
  151.       return;
  152.     }//if
  153.     switch (request->action) {
  154.     case Request:: open:
  155.       openReq(request);
  156.       break;
  157.     case Request:: close:
  158.       closeReq(request);
  159.       break;
  160.     case Request:: closeRemove:
  161.       closeReq(request);
  162.       removeReq(request);
  163.       break;
  164.     case Request:: read:
  165.       readReq(request);
  166.       break;
  167.     case Request:: readv:
  168.       readvReq(request);
  169.       break;
  170.     case Request:: write:
  171.       writeReq(request);
  172.       break;
  173.     case Request:: writev:
  174.       writevReq(request);
  175.       break;
  176.     case Request:: writeSync:
  177.       writeReq(request);
  178.       syncReq(request);
  179.       break;
  180.     case Request:: writevSync:
  181.       writevReq(request);
  182.       syncReq(request);
  183.       break;
  184.     case Request:: sync:
  185.       syncReq(request);
  186.       break;
  187.     case Request:: append:
  188.       appendReq(request);
  189.       break;
  190.     case Request::rmrf:
  191.       rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
  192.       break;
  193.     case Request:: end:
  194.       if (theFd > 0)
  195.         closeReq(request);
  196.       endReq();
  197.       return;
  198.     default:
  199.       abort();
  200.       break;
  201.     }//switch
  202.     
  203.     // No need to signal as ndbfs only uses tryRead
  204.     theReportTo->writeChannelNoSignal(request);
  205.   }//while
  206. }//AsyncFile::run()
  207. extern bool Global_useO_SYNC;
  208. extern bool Global_useO_DIRECT;
  209. extern bool Global_unlinkO_CREAT;
  210. extern Uint32 Global_syncFreq;
  211. void AsyncFile::openReq(Request* request)
  212. {  
  213.   m_openedWithSync = false;
  214.   m_syncFrequency = 0;
  215.   m_syncCount= 0;
  216.   // for open.flags, see signal FSOPENREQ
  217. #ifdef NDB_WIN32
  218.   DWORD dwCreationDisposition;
  219.   DWORD dwDesiredAccess = 0;
  220.   DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
  221.   DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING;
  222.   const Uint32 flags = request->par.open.flags;
  223.     
  224.     // Convert file open flags from Solaris to Windows
  225.   if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){
  226.     dwCreationDisposition = CREATE_ALWAYS;
  227.   } else if (flags & FsOpenReq::OM_TRUNCATE){
  228.     dwCreationDisposition = TRUNCATE_EXISTING;
  229.   } else if (flags & FsOpenReq::OM_CREATE){
  230.     dwCreationDisposition = CREATE_NEW;
  231.   } else {
  232.     dwCreationDisposition = OPEN_EXISTING;
  233.   }
  234.   
  235.   switch(flags & 3){
  236.   case FsOpenReq::OM_READONLY:
  237.     dwDesiredAccess = GENERIC_READ;
  238.     break;
  239.   case FsOpenReq::OM_WRITEONLY:
  240.     dwDesiredAccess = GENERIC_WRITE;
  241.     break;
  242.   case FsOpenReq::OM_READWRITE:
  243.     dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
  244.     break;
  245.   default:
  246.     request->error = 1000;
  247.     break;
  248.     return;
  249.   }
  250.   hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, 
  251.                      0, dwCreationDisposition, dwFlagsAndAttributes, 0);
  252.     
  253.   if(INVALID_HANDLE_VALUE == hFile) {
  254.     request->error = GetLastError();
  255.     if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error))
  256.        && (flags & FsOpenReq::OM_CREATE)) {
  257.       createDirectories();
  258.       hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, 
  259.                          0, dwCreationDisposition, dwFlagsAndAttributes, 0);
  260.             
  261.       if(INVALID_HANDLE_VALUE == hFile)
  262.         request->error = GetLastError();
  263.       else
  264.         request->error = 0;
  265.             
  266.       return;
  267.     }
  268.   } 
  269.   else {
  270.     request->error = 0;
  271.     return;
  272.   }
  273. #else
  274.   const Uint32 flags = request->par.open.flags;
  275.   Uint32 new_flags = 0;
  276.   // Convert file open flags from Solaris to Liux
  277.   if(flags & FsOpenReq::OM_CREATE){
  278.     new_flags |= O_CREAT;
  279.   }
  280.   if(flags & FsOpenReq::OM_TRUNCATE){
  281. #if 0
  282.     if(Global_unlinkO_CREAT){
  283.       unlink(theFileName.c_str());
  284.     } else 
  285. #endif
  286.       new_flags |= O_TRUNC;
  287.   }  
  288.   if(flags & FsOpenReq::OM_APPEND){
  289.     new_flags |= O_APPEND;
  290.   }
  291.   if(flags & FsOpenReq::OM_SYNC){
  292. #if 0
  293.     if(Global_useO_SYNC){
  294.       new_flags |= O_SYNC;
  295.       m_openedWithSync = true;
  296.       m_syncFrequency = 0;
  297.     } else {
  298. #endif
  299.       m_openedWithSync = false;
  300.       m_syncFrequency = Global_syncFreq;
  301. #if 0
  302.     }
  303. #endif
  304.   } else {
  305.     m_openedWithSync = false;
  306.     m_syncFrequency = 0;
  307.   }
  308. #if 0
  309.   //#if NDB_LINUX
  310.   if(Global_useO_DIRECT){
  311.     new_flags |= O_DIRECT;
  312.   }
  313. #endif
  314.   switch(flags & 0x3){
  315.   case FsOpenReq::OM_READONLY:
  316.     new_flags |= O_RDONLY;
  317.     break;
  318.   case FsOpenReq::OM_WRITEONLY:
  319.     new_flags |= O_WRONLY;
  320.     break;
  321.   case FsOpenReq::OM_READWRITE:
  322.     new_flags |= O_RDWR;
  323.     break;
  324.   default:
  325.     request->error = 1000;
  326.     break;
  327.     return;
  328.   }
  329.   const int mode = S_IRUSR | S_IWUSR | S_IRGRP;
  330.   
  331.   if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
  332.     PRINT_ERRORANDFLAGS(new_flags);
  333.     if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) {
  334.       createDirectories();
  335.       if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
  336.         PRINT_ERRORANDFLAGS(new_flags);
  337.         request->error = errno;
  338.       }
  339.     } else {
  340.       request->error = errno;
  341.     }
  342.   }
  343. #endif
  344. }
  345. int
  346. AsyncFile::readBuffer(char * buf, size_t size, off_t offset){
  347.   int return_value;
  348.   
  349. #ifdef NDB_WIN32
  350.   DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
  351.   if(dwSFP != offset) {
  352.     return GetLastError();
  353.   }
  354. #elif ! defined(HAVE_PREAD)
  355.   off_t seek_val;
  356.   while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1 
  357. && errno == EINTR);
  358.   if(seek_val == (off_t)-1)
  359.   {
  360.     return errno;
  361.   }
  362. #endif
  363.     
  364.   while (size > 0) {
  365.     size_t bytes_read = 0;
  366.     
  367. #ifdef NDB_WIN32
  368.     DWORD dwBytesRead;
  369.     BOOL bRead = ReadFile(hFile, 
  370.                           buf,
  371.                           size,
  372.                           &dwBytesRead,
  373.                           0);
  374.     if(!bRead){
  375.       return GetLastError();
  376.     } 
  377.     bytes_read = dwBytesRead;
  378. #elif  ! defined(HAVE_PREAD)
  379.     return_value = ::read(theFd, buf, size);
  380. #else // UNIX
  381.     return_value = ::pread(theFd, buf, size, offset);
  382. #endif
  383. #ifndef NDB_WIN32
  384.     if (return_value == -1 && errno == EINTR) {
  385.       DEBUG(ndbout_c("EINTR in read"));
  386.       continue;
  387.     } else if (return_value == -1){
  388.       return errno;
  389.     } else {
  390.       bytes_read = return_value;
  391.     }
  392. #endif
  393.     if(bytes_read == 0){
  394.       DEBUG(ndbout_c("Read underflow %d %dn %xn%d %d", 
  395.             size, offset, buf, bytes_read, return_value));
  396.       return ERR_ReadUnderflow;
  397.     }
  398.     
  399.     if(bytes_read != size){
  400.       DEBUG(ndbout_c("Warning partial read %d != %d", 
  401.             bytes_read, size));
  402.     }
  403.     buf += bytes_read;
  404.     size -= bytes_read;
  405.     offset += bytes_read;
  406.   }
  407.   return 0;
  408. }
  409. void
  410. AsyncFile::readReq( Request * request)
  411. {
  412.   for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
  413.     off_t offset = request->par.readWrite.pages[i].offset;
  414.     size_t size  = request->par.readWrite.pages[i].size;
  415.     char * buf   = request->par.readWrite.pages[i].buf;
  416.     
  417.     int err = readBuffer(buf, size, offset);
  418.     if(err != 0){
  419.       request->error = err;
  420.       return;
  421.     }
  422.   }
  423. }
  424. void
  425. AsyncFile::readvReq( Request * request)
  426. {
  427. #if ! defined(HAVE_PREAD)
  428.   readReq(request);
  429.   return;
  430. #elif defined NDB_WIN32
  431.   // ReadFileScatter?
  432.   readReq(request);
  433.   return;
  434. #else
  435.   int return_value;
  436.   int length = 0;
  437.   struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep
  438.   for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
  439.     iov[i].iov_base= request->par.readWrite.pages[i].buf;
  440.     iov[i].iov_len= request->par.readWrite.pages[i].size;
  441.     length = length + iov[i].iov_len;
  442.   }
  443.   lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );
  444.   return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
  445.   if (return_value == -1) {
  446.     request->error = errno;
  447.     return;
  448.   } else if (return_value != length) {
  449.     request->error = 1011;
  450.     return;
  451.   }
  452. #endif
  453. }
  454. int 
  455. AsyncFile::extendfile(Request* request) {
  456. #if ! defined(HAVE_PWRITE)
  457.   // Find max size of this file in this request
  458.   int maxOffset = 0;
  459.   int maxSize = 0;
  460.   for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
  461.     if (request->par.readWrite.pages[i].offset > maxOffset) {
  462.       maxOffset = request->par.readWrite.pages[i].offset;
  463.       maxSize = request->par.readWrite.pages[i].size;
  464.     }
  465.   }
  466.   DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize));
  467.   // Allocate a buffer and fill it with zeros
  468.   void* pbuf = NdbMem_Allocate(maxSize);
  469.   memset(pbuf, 0, maxSize);
  470.   for (int p = 0; p <= maxOffset; p = p + maxSize) {
  471.     int return_value;
  472.     return_value = lseek(theFd, 
  473.                          p,
  474.                          SEEK_SET);
  475.     if((return_value == -1 ) || (return_value != p)) {
  476.       return -1;
  477.     }
  478.     return_value = ::write(theFd, 
  479.                            pbuf,
  480.                            maxSize);
  481.     if ((return_value == -1) || (return_value != maxSize)) {
  482.       return -1;
  483.     }
  484.   }
  485.   free(pbuf);
  486.   
  487.   DEBUG(ndbout_c("extendfile: "%s" OK!", theFileName.c_str()));
  488.   return 0;
  489. #else
  490.   request = request;
  491.   abort();
  492.   return -1;
  493. #endif
  494. }
  495. void
  496. AsyncFile::writeReq( Request * request)
  497. {
  498.   int page_num = 0;
  499.   bool write_not_complete = true;
  500.   while(write_not_complete) {
  501.     int totsize = 0;
  502.     off_t offset = request->par.readWrite.pages[page_num].offset;
  503.     char* bufptr = theWriteBuffer;
  504.       
  505.     write_not_complete = false;
  506.     if (request->par.readWrite.numberOfPages > 1) {
  507.       off_t page_offset = offset;
  508.       // Multiple page write, copy to buffer for one write
  509.       for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
  510.         memcpy(bufptr, 
  511.                request->par.readWrite.pages[i].buf, 
  512.                request->par.readWrite.pages[i].size);
  513.         bufptr += request->par.readWrite.pages[i].size;
  514.         totsize += request->par.readWrite.pages[i].size;
  515.         if (((i + 1) < request->par.readWrite.numberOfPages)) {
  516.           // There are more pages to write
  517.           // Check that offsets are consequtive
  518.   off_t tmp = page_offset + request->par.readWrite.pages[i].size;
  519.           if (tmp != request->par.readWrite.pages[i+1].offset) {
  520.             // Next page is not aligned with previous, not allowed
  521.             DEBUG(ndbout_c("Page offsets are not aligned"));
  522.             request->error = EINVAL;
  523.             return;
  524.           }
  525.           if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
  526.             // We are not finished and the buffer is full
  527.             write_not_complete = true;
  528.             // Start again with next page
  529.             page_num = i + 1;
  530.             break;
  531.           }
  532.         }
  533.         page_offset += request->par.readWrite.pages[i].size;
  534.       }
  535.       bufptr = theWriteBuffer;
  536.     } else { 
  537.       // One page write, write page directly
  538.       bufptr = request->par.readWrite.pages[0].buf;
  539.       totsize = request->par.readWrite.pages[0].size;
  540.     }
  541.     int err = writeBuffer(bufptr, totsize, offset);
  542.     if(err != 0){
  543.       request->error = err;
  544.       return;
  545.     }
  546.   } // while(write_not_complete)
  547. }
  548. int
  549. AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset, 
  550.        size_t chunk_size)
  551. {
  552.   size_t bytes_to_write = chunk_size;
  553.   int return_value;
  554. #ifdef NDB_WIN32
  555.   DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
  556.   if(dwSFP != offset) {
  557.     return GetLastError();
  558.   }
  559. #elif ! defined(HAVE_PWRITE)
  560.   off_t seek_val;
  561.   while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1 
  562. && errno == EINTR);
  563.   if(seek_val == (off_t)-1)
  564.   {
  565.     return errno;
  566.   }
  567. #endif
  568.     
  569.   while (size > 0) {
  570.     if (size < bytes_to_write){
  571.       // We are at the last chunk
  572.       bytes_to_write = size;
  573.     }
  574.     size_t bytes_written = 0;
  575.     
  576. #ifdef NDB_WIN32
  577.     DWORD dwWritten;
  578.     BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0);
  579.     if(!bWrite) {
  580.       return GetLastError();
  581.     }
  582.     bytes_written = dwWritten;
  583.     if (bytes_written != bytes_to_write) {
  584.       DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write));
  585.     }
  586.     
  587. #elif ! defined(HAVE_PWRITE)
  588.     return_value = ::write(theFd, buf, bytes_to_write);
  589. #else // UNIX
  590.     return_value = ::pwrite(theFd, buf, bytes_to_write, offset);
  591. #endif
  592. #ifndef NDB_WIN32
  593.     if (return_value == -1 && errno == EINTR) {
  594.       bytes_written = 0;
  595.       DEBUG(ndbout_c("EINTR in write"));
  596.     } else if (return_value == -1){
  597.       return errno;
  598.     } else {
  599.       bytes_written = return_value;
  600.       if(bytes_written == 0){
  601. abort();
  602.       }
  603.       
  604.       if(bytes_written != bytes_to_write){
  605. DEBUG(ndbout_c("Warning partial write %d != %d", 
  606.  bytes_written, bytes_to_write));
  607.       }
  608.     }
  609. #endif
  610.     
  611.     m_syncCount+= bytes_written;
  612.     buf += bytes_written;
  613.     size -= bytes_written;
  614.     offset += bytes_written;
  615.   }
  616.   return 0;
  617. }
  618. void
  619. AsyncFile::writevReq( Request * request)
  620. {
  621.   // WriteFileGather on WIN32?
  622.   writeReq(request);
  623. }
  624. void
  625. AsyncFile::closeReq(Request * request)
  626. {
  627.   syncReq(request);
  628. #ifdef NDB_WIN32
  629.   if(!CloseHandle(hFile)) {
  630.     request->error = GetLastError();
  631.   }
  632.   hFile = INVALID_HANDLE_VALUE;
  633. #else
  634.   if (-1 == ::close(theFd)) {
  635. #ifndef DBUG_OFF
  636.     if (theFd == -1)
  637.       abort();
  638. #endif
  639.     request->error = errno;
  640.   }
  641.   theFd = -1;
  642. #endif
  643. }
  644. bool AsyncFile::isOpen(){
  645. #ifdef NDB_WIN32
  646.   return (hFile != INVALID_HANDLE_VALUE);
  647. #else
  648.   return (theFd != -1);
  649. #endif
  650. }
  651. void
  652. AsyncFile::syncReq(Request * request)
  653. {
  654.   if(m_openedWithSync ||
  655.      m_syncCount == 0){
  656.     return;
  657.   }
  658. #ifdef NDB_WIN32
  659.   if(!FlushFileBuffers(hFile)) {
  660.     request->error = GetLastError();
  661.     return;
  662.   }
  663. #else
  664.   if (-1 == ::fsync(theFd)){
  665.     request->error = errno;
  666.     return;
  667.   }
  668. #endif
  669.   m_syncCount = 0;
  670. }
  671. void
  672. AsyncFile::appendReq(Request * request){
  673.   
  674.   const char * buf = request->par.append.buf;
  675.   Uint32 size = request->par.append.size;
  676.   m_syncCount += size;
  677. #ifdef NDB_WIN32
  678.   DWORD dwWritten = 0;  
  679.   while(size > 0){
  680.     if(!WriteFile(hFile, buf, size, &dwWritten, 0)){
  681.       request->error = GetLastError();
  682.       return ;
  683.     }
  684.     
  685.     buf += dwWritten;
  686.     size -= dwWritten;
  687.   }
  688. #else
  689.   while(size > 0){
  690.     const int n = write(theFd, buf, size);
  691.     if(n == -1 && errno == EINTR){
  692.       continue;
  693.     }
  694.     if(n == -1){
  695.       request->error = errno;
  696.       return;
  697.     }
  698.     if(n == 0){
  699.       abort();
  700.     }
  701.     size -= n;
  702.     buf += n;
  703.   }
  704. #endif
  705.   if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){
  706.     syncReq(request);
  707.   }
  708. }
  709. void
  710. AsyncFile::removeReq(Request * request)
  711. {
  712. #ifdef NDB_WIN32
  713.   if(!DeleteFile(theFileName.c_str())) {
  714.     request->error = GetLastError();
  715.   }
  716. #else
  717.   if (-1 == ::remove(theFileName.c_str())) {
  718.     request->error = errno;
  719.   }
  720. #endif
  721. }
  722. void
  723. AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
  724.   Uint32 path_len = strlen(path);
  725.   Uint32 path_max_copy = PATH_MAX - path_len;
  726.   char* path_add = &path[path_len];
  727. #ifndef NDB_WIN32
  728.   if(!request->par.rmrf.directory){
  729.     // Remove file
  730.     if(unlink((const char *)path) != 0 && errno != ENOENT)
  731.       request->error = errno;
  732.     return;
  733.   }
  734.   // Remove directory 
  735.   DIR* dirp = opendir((const char *)path);
  736.   if(dirp == 0){
  737.     if(errno != ENOENT) 
  738.       request->error = errno;
  739.     return;
  740.   }
  741.   struct dirent * dp;
  742.   while ((dp = readdir(dirp)) != NULL){
  743.     if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) {
  744.       BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s",
  745.        DIR_SEPARATOR, dp->d_name);
  746.       if(remove((const char*)path) == 0){
  747.         path[path_len] = 0;
  748. continue;
  749.       }
  750.       
  751.       rmrfReq(request, path, true);
  752.       path[path_len] = 0;
  753.       if(request->error != 0){
  754. closedir(dirp);
  755. return;
  756.       }
  757.     }
  758.   }
  759.   closedir(dirp);
  760.   if(removePath && rmdir((const char *)path) != 0){
  761.     request->error = errno;
  762.   }
  763.   return;
  764. #else
  765.   if(!request->par.rmrf.directory){
  766.     // Remove file
  767.     if(!DeleteFile(path)){
  768.       DWORD dwError = GetLastError();
  769.       if(dwError!=ERROR_FILE_NOT_FOUND)
  770. request->error = dwError;
  771.     }
  772.     return;
  773.   }
  774.   
  775.   strcat(path, "\*");
  776.   WIN32_FIND_DATA ffd;
  777.   HANDLE hFindFile = FindFirstFile(path, &ffd);
  778.   path[path_len] = 0;
  779.   if(INVALID_HANDLE_VALUE==hFindFile){
  780.     DWORD dwError = GetLastError();
  781.     if(dwError!=ERROR_PATH_NOT_FOUND)
  782.       request->error = dwError;
  783.     return;
  784.   }
  785.   
  786.   do {
  787.     if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
  788.       strcat(path, "\");
  789.       strcat(path, ffd.cFileName);
  790.       if(DeleteFile(path)) {
  791.         path[path_len] = 0;
  792. continue;
  793.       }//if
  794.       
  795.       rmrfReq(request, path, true);
  796.       path[path_len] = 0;
  797.       if(request->error != 0){
  798. FindClose(hFindFile);
  799. return;
  800.       }
  801.     }
  802.   } while(FindNextFile(hFindFile, &ffd));
  803.   
  804.   FindClose(hFindFile);
  805.   
  806.   if(removePath && !RemoveDirectory(path))
  807.     request->error = GetLastError();
  808.   
  809. #endif
  810. }
  811. void AsyncFile::endReq()
  812. {
  813.   // Thread is ended with return
  814.   if (theWriteBuffer) NdbMem_Free(theWriteBuffer);
  815. }
  816. void AsyncFile::createDirectories()
  817. {
  818.   for (int i = 0; i < theFileName.levels(); i++) {
  819. #ifdef NDB_WIN32
  820.     CreateDirectory(theFileName.directory(i), 0);
  821. #else
  822.     //printf("AsyncFile::createDirectories : "%s"n", theFileName.directory(i)); 
  823.     mkdir(theFileName.directory(i), S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
  824. #endif
  825.   }
  826. }
  827. #ifdef DEBUG_ASYNCFILE
  828. void printErrorAndFlags(Uint32 used_flags) {
  829.   char buf[255];
  830.   sprintf(buf, "PEAF: errno=%d "", errno);
  831.   switch(errno) {
  832.   case     EACCES:
  833.     strcat(buf, "EACCES");
  834.     break;
  835.   case     EDQUOT: 
  836.     strcat(buf, "EDQUOT");
  837.     break;
  838.   case     EEXIST    :
  839.     strcat(buf, "EEXIST");
  840.     break;
  841.   case     EINTR     :
  842.     strcat(buf, "EINTR");
  843.     break;
  844.   case     EFAULT    :
  845.     strcat(buf, "EFAULT");
  846.     break;
  847.   case     EIO       :
  848.     strcat(buf, "EIO");
  849.     break;
  850.   case     EISDIR    :
  851.     strcat(buf, "EISDIR");
  852.     break;
  853.   case     ELOOP       :
  854.     strcat(buf, "ELOOP");
  855.     break;
  856.   case     EMFILE   :
  857.     strcat(buf, "EMFILE");
  858.     break;
  859.   case     ENFILE    :
  860.     strcat(buf, "ENFILE");
  861.     break;
  862.   case     ENOENT    :
  863.     strcat(buf, "ENOENT ");
  864.     break;
  865.   case     ENOSPC    :
  866.     strcat(buf, "ENOSPC");
  867.     break;
  868.   case     ENOTDIR  :
  869.     strcat(buf, "ENOTDIR");
  870.     break;
  871.   case     ENXIO     : 
  872.     strcat(buf, "ENXIO");
  873.     break;  
  874.   case     EOPNOTSUPP:
  875.     strcat(buf, "EOPNOTSUPP");
  876.     break;
  877. #if !defined NDB_OSE && !defined NDB_SOFTOSE
  878.   case     EMULTIHOP :
  879.     strcat(buf, "EMULTIHOP");
  880.     break;
  881.   case     ENOLINK    :
  882.     strcat(buf, "ENOLINK");
  883.     break;
  884.   case     ENOSR      :
  885.     strcat(buf, "ENOSR");
  886.     break;      
  887.   case     EOVERFLOW :
  888.     strcat(buf,  "EOVERFLOW");
  889.     break;
  890. #endif
  891.   case     EROFS    :
  892.     strcat(buf,  "EROFS");
  893.     break;
  894.   case     EAGAIN :   
  895.     strcat(buf,  "EAGAIN");
  896.     break;
  897.   case     EINVAL    :
  898.     strcat(buf,  "EINVAL");
  899.     break;
  900.   case     ENOMEM    :
  901.     strcat(buf, "ENOMEM");
  902.     break;
  903.   case     ETXTBSY   :
  904.     strcat(buf, "ETXTBSY");
  905.     break;
  906.   case ENAMETOOLONG:
  907.     strcat(buf, "ENAMETOOLONG");
  908.     break;
  909.   case EBADF:
  910.     strcat(buf, "EBADF");
  911.     break;
  912.   case ESPIPE:
  913.     strcat(buf, "ESPIPE");
  914.     break;
  915.   case ESTALE:
  916.     strcat(buf, "ESTALE");
  917.     break;
  918.   default:  
  919.     strcat(buf, "EOTHER");
  920.     break;
  921.   }
  922.   strcat(buf, "" ");
  923. #if defined NDB_OSE
  924.   strcat(buf, strerror(errno) << " ");
  925. #endif
  926.   strcat(buf, " flags: ");
  927.   switch(used_flags & 3){
  928.   case O_RDONLY:
  929.     strcat(buf, "O_RDONLY, ");
  930.     break;
  931.   case O_WRONLY:
  932.     strcat(buf, "O_WRONLY, ");
  933.     break;
  934.   case O_RDWR:
  935.     strcat(buf, "O_RDWR, ");
  936.     break;
  937.   default:
  938.     strcat(buf, "Unknown!!, ");
  939.   }
  940.   if((used_flags & O_APPEND)==O_APPEND)
  941.     strcat(buf, "O_APPEND, ");
  942.   if((used_flags & O_CREAT)==O_CREAT)
  943.     strcat(buf, "O_CREAT, ");
  944.   if((used_flags & O_EXCL)==O_EXCL)
  945.     strcat(buf, "O_EXCL, ");
  946.   if((used_flags & O_NOCTTY) == O_NOCTTY)
  947.     strcat(buf, "O_NOCTTY, ");
  948.   if((used_flags & O_NONBLOCK)==O_NONBLOCK)
  949.     strcat(buf, "O_NONBLOCK, ");
  950.   if((used_flags & O_TRUNC)==O_TRUNC)
  951.     strcat(buf, "O_TRUNC, ");
  952. #if !defined NDB_OSE && !defined NDB_SOFTOSE
  953.   if((used_flags & O_DSYNC)==O_DSYNC)
  954.     strcat(buf, "O_DSYNC, ");
  955.   if((used_flags & O_NDELAY)==O_NDELAY)
  956.     strcat(buf, "O_NDELAY, ");
  957.   if((used_flags & O_RSYNC)==O_RSYNC)
  958.     strcat(buf, "O_RSYNC, ");
  959.   if((used_flags & O_SYNC)==O_SYNC)
  960.     strcat(buf, "O_SYNC, ");
  961.   DEBUG(ndbout_c(buf));
  962. #endif
  963. }
  964. #endif