MemoryChannelTest.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:4k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "MemoryChannel.hpp"
  14. #include "NdbThread.h"
  15. #include "NdbSleep.h"
  16. #include "NdbOut.hpp"
  17. #include "NdbMain.h"
  18. MemoryChannel<int>* theMemoryChannel;
  19. extern "C" void* runProducer(void*arg)
  20. {
  21.   // The producer will items into the MemoryChannel
  22.   int count = *(int*)arg;
  23.   int* p;
  24.   int i = 0;
  25.   while (i <= count)
  26.     {
  27.       p = new int(i);
  28.       ndbout << "P: " << *p << endl;
  29.       theMemoryChannel->writeChannel(p);
  30.       if (i%5==0)
  31.         NdbSleep_MilliSleep(i);
  32.       i++;
  33.     }
  34.   return NULL;
  35. }
  36. extern "C" void* runConsumer(void* arg)
  37. {
  38.   // The producer will read items from MemoryChannel and print on screen
  39.   int count = *(int*)arg;
  40.   int* p;
  41.   int i = 0;
  42.   while (i < count)
  43.     {
  44.       p = theMemoryChannel->readChannel();
  45.       ndbout << "C: " << *p << endl;
  46.       i = *p;
  47.       delete p;
  48.       
  49.     }
  50.   return NULL;
  51. }
  52. class ArgStruct 
  53. {
  54. public:
  55.   ArgStruct(int _items, int _no){
  56.     items=_items; 
  57.     no=_no;
  58.   };
  59.   int items;
  60.   int no;
  61. };
  62. MemoryChannelMultipleWriter<ArgStruct>* theMemoryChannel2;
  63. extern "C" void* runProducer2(void*arg)
  64. {
  65.   // The producer will items into the MemoryChannel
  66.   ArgStruct* pArg = (ArgStruct*)arg;
  67.   int count = pArg->items;
  68.   ArgStruct* p;
  69.   int i = 0;
  70.   while (i < count)
  71.     {
  72.       p = new ArgStruct(i, pArg->no);
  73.       ndbout << "P"<<pArg->no<<": " << i << endl;
  74.       theMemoryChannel2->writeChannel(p);
  75.       NdbSleep_MilliSleep(i);
  76.       i++;
  77.     }
  78.   return NULL;
  79. }
  80. extern "C" void* runConsumer2(void* arg)
  81. {
  82.   // The producer will read items from MemoryChannel and print on screen
  83.   ArgStruct* pArg = (ArgStruct*)arg;
  84.   int count =  pArg->items * pArg->no;
  85.   ArgStruct* p;
  86.   int i = 0;
  87.   while (i < count)
  88.     {
  89.       p = theMemoryChannel2->readChannel();
  90.       ndbout << "C: "<< p->no << ", " << p->items << endl;
  91.       i++;
  92.       delete p;
  93.     }
  94.   ndbout << "Consumer2: " << count << " received" << endl;
  95.   return NULL;
  96. }
  97. //#if defined MEMORYCHANNELTEST
  98. //int main(int argc, char **argv)
  99. NDB_COMMAND(mctest, "mctest", "mctest", "Test the memory channel used in Ndb", 32768)
  100. {
  101.   ndbout << "==== testing MemoryChannel ====" << endl;
  102.   theMemoryChannel = new MemoryChannel<int>;
  103.   theMemoryChannel2 = new MemoryChannelMultipleWriter<ArgStruct>;
  104.   NdbThread* consumerThread;
  105.   NdbThread* producerThread;
  106.   NdbThread_SetConcurrencyLevel(2);
  107.   int numItems = 100;
  108.   producerThread = NdbThread_Create(runProducer, 
  109.     (void**)&numItems,
  110.     4096,
  111.     (char*)"producer");
  112.   consumerThread = NdbThread_Create(runConsumer, 
  113.     (void**)&numItems,
  114.     4096,
  115.     (char*)"consumer");
  116.   void *status;
  117.   NdbThread_WaitFor(consumerThread, &status);
  118.   NdbThread_WaitFor(producerThread, &status);
  119.   ndbout << "==== testing MemoryChannelMultipleWriter ====" << endl;
  120. #define NUM_THREADS2 5
  121.   NdbThread_SetConcurrencyLevel(NUM_THREADS2+2);
  122.   NdbThread* producerThreads[NUM_THREADS2];
  123.   ArgStruct *pArg;
  124.   for (int j = 0; j < NUM_THREADS2; j++)
  125.     {
  126.       char buf[25];
  127.       sprintf((char*)&buf, "producer%d", j);
  128.       pArg = new ArgStruct(numItems, j);
  129.       producerThreads[j] = NdbThread_Create(runProducer2, 
  130.     (void**)pArg,
  131.     4096,
  132.     (char*)&buf);
  133.     }
  134.   pArg = new ArgStruct(numItems, NUM_THREADS2);
  135.   consumerThread = NdbThread_Create(runConsumer2, 
  136.     (void**)pArg,
  137.     4096,
  138.     (char*)"consumer");
  139.   NdbThread_WaitFor(consumerThread, &status);
  140.   for (int j = 0; j < NUM_THREADS2; j++)
  141.   {
  142.     NdbThread_WaitFor(producerThreads[j], &status);
  143.   }
  144.     
  145.   return 0;
  146. }
  147. void ErrorReporter::handleError(ErrorCategory type, int messageID,
  148.                                 const char* problemData, const char* objRef,
  149. NdbShutdownType nst)
  150. {
  151.   ndbout << "ErrorReporter::handleError activated"  << endl;
  152.   exit(1);
  153. }
  154. //#endif