file_messaging.cpp
上传用户:yhdzpy8989
上传日期:2007-06-13
资源大小:13604k
文件大小:18k
源码类别:

生物技术

开发平台:

C/C++

  1. /*
  2.  * ===========================================================================
  3.  * PRODUCTION $Log: file_messaging.cpp,v $
  4.  * PRODUCTION Revision 1000.2  2004/06/01 18:28:37  gouriano
  5.  * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.11
  6.  * PRODUCTION
  7.  * ===========================================================================
  8.  */
  9. /*  $Id: file_messaging.cpp,v 1000.2 2004/06/01 18:28:37 gouriano Exp $
  10. * ===========================================================================
  11. *
  12. *                            PUBLIC DOMAIN NOTICE
  13. *               National Center for Biotechnology Information
  14. *
  15. *  This software/database is a "United States Government Work" under the
  16. *  terms of the United States Copyright Act.  It was written as part of
  17. *  the author's official duties as a United States Government employee and
  18. *  thus cannot be copyrighted.  This software/database is freely available
  19. *  to the public for use. The National Library of Medicine and the U.S.
  20. *  Government have not placed any restriction on its use or reproduction.
  21. *
  22. *  Although all reasonable efforts have been taken to ensure the accuracy
  23. *  and reliability of the software and data, the NLM and the U.S.
  24. *  Government do not and cannot warrant the performance or results that
  25. *  may be obtained by using this software or data. The NLM and the U.S.
  26. *  Government disclaim all warranties, express or implied, including
  27. *  warranties of performance, merchantability or fitness for any particular
  28. *  purpose.
  29. *
  30. *  Please cite the author in any work or product based on this material.
  31. *
  32. * ===========================================================================
  33. *
  34. * Authors:  Paul Thiessen
  35. *
  36. * File Description:
  37. *       file-based messaging system
  38. *
  39. * ===========================================================================
  40. */
  41. #ifdef _MSC_VER
  42. #pragma warning(disable:4018)   // disable signed/unsigned mismatch warning in MSVC
  43. #endif
  44. #include <ncbi_pch.hpp>
  45. #include <corelib/ncbistd.hpp>
  46. #include <corelib/ncbidiag.hpp>
  47. #include <corelib/ncbi_system.hpp>
  48. #include <util/stream_utils.hpp>
  49. #include <memory>
  50. #include "file_messaging.hpp"
  51. BEGIN_NCBI_SCOPE
  52. // diagnostic streams
  53. #define TRACEMSG(stream) ERR_POST(Trace << stream)
  54. #define INFOMSG(stream) ERR_POST(Info << stream)
  55. #define WARNINGMSG(stream) ERR_POST(Warning << stream)
  56. #define ERRORMSG(stream) ERR_POST(Error << stream)
  57. #define FATALMSG(stream) ERR_POST(Fatal << stream)
  58. FileMessenger::FileMessenger(FileMessagingManager *parentManager,
  59.     const std::string& messageFilename, MessageResponder *responderObject, bool isReadOnly) :
  60.         manager(parentManager), responder(responderObject),
  61.         messageFile(messageFilename), lockFile(string(messageFilename) + ".lock"),
  62.         lastKnownSize(0), readOnly(isReadOnly)
  63. {
  64.     TRACEMSG("monitoring message file " << messageFilename);
  65. }
  66. static fstream * CreateLock(const CDirEntry& lockFile)
  67. {
  68.     if (lockFile.Exists()) {
  69.         TRACEMSG("unable to establish a lock - lock file exists already");
  70.         return NULL;
  71.     }
  72.     auto_ptr<fstream> lockStream(CFile::CreateTmpFile(lockFile.GetPath(), CFile::eText, CFile::eAllowRead));
  73.     if (lockStream.get() == NULL || !(*lockStream)) {
  74.         TRACEMSG("unable to establish a lock - cannot create lock file");
  75.         return NULL;
  76.     }
  77.     char lockWord[4];
  78.     lockStream->seekg(0);
  79.     if (CStreamUtils::Readsome(*lockStream, lockWord, 4) == 4 &&
  80.             lockWord[0] == 'L' && lockWord[1] == 'O' &&
  81.             lockWord[2] == 'C' && lockWord[3] == 'K') {
  82.         ERRORMSG("lock file opened for writing but apparently already LOCKed!");
  83.         return NULL;
  84.     }
  85.     lockStream->seekg(0);
  86.     lockStream->write("LOCK", 4);
  87.     lockStream->flush();
  88.     TRACEMSG("lock file established: " << lockFile.GetPath());
  89.     return lockStream.release();
  90. }
  91. FileMessenger::~FileMessenger(void)
  92. {
  93.     // sanity check to make sure each command issued received a reply
  94.     bool okay = false;
  95.     CommandOriginators::const_iterator c, ce = commandsSent.end();
  96.     TargetApp2Command::const_iterator a, ae;
  97.     if (commandsSent.size() == repliesReceived.size()) {
  98.         for (c=commandsSent.begin(); c!=ce; ++c) {
  99.             CommandReplies::const_iterator r = repliesReceived.find(c->first);
  100.             if (r == repliesReceived.end())
  101.                 break;
  102.             for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) {
  103.                 if (r->second.find(a->first) == r->second.end())
  104.                     break;
  105.             }
  106.             if (a != ae) break;
  107.         }
  108.         if (c == ce) okay = true;
  109.     }
  110.     if (!okay) WARNINGMSG("FileMessenger: did not receive a reply to all commands sent!");
  111.     // last-minute attempt to write any pending commands to the file
  112.     if (pendingCommands.size() > 0) {
  113.         auto_ptr<fstream> lockStream(CreateLock(lockFile.GetPath()));
  114.         if (lockStream.get() == NULL) {
  115.             int nTries = 1;
  116.             do {
  117.                 SleepSec(1);
  118.                 lockStream.reset(CreateLock(lockFile));
  119.                 ++nTries;
  120.             } while (lockStream.get() == NULL && nTries <= 30);
  121.         }
  122.         if (lockStream.get() != NULL)
  123.             SendPendingCommands();
  124.         else
  125.             ERRORMSG("Timeout occurred when attempting to flush pending commands to file");
  126.     }
  127.     // sanity check to make sure each command received was sent a reply
  128.     okay = false;
  129.     ce = commandsReceived.end();
  130.     if (commandsReceived.size() == repliesSent.size()) {
  131.         for (c=commandsReceived.begin(); c!=ce; ++c) {
  132.             CommandReplies::const_iterator r = repliesSent.find(c->first);
  133.             if (r == repliesSent.end())
  134.                 break;
  135.             for (a=c->second.begin(), ae=c->second.end(); a!=ae; ++a) {
  136.                 if (r->second.find(a->first) == r->second.end())
  137.                     break;
  138.             }
  139.             if (a != ae) break;
  140.         }
  141.         if (c == ce) okay = true;
  142.     }
  143.     if (!okay) ERRORMSG("FileMessenger: did not send a reply to all commands received!");
  144. }
  145. void FileMessenger::SendCommand(const std::string& targetApp, unsigned long id,
  146.         const std::string& command, const std::string& data)
  147. {
  148.     if (readOnly) {
  149.         WARNINGMSG("command '" << command << "' to " << targetApp
  150.             << " received but not written to read-only message file");
  151.         return;
  152.     }
  153.     // check against record of commands already sent
  154.     CommandOriginators::iterator c = commandsSent.find(id);
  155.     if (c != commandsSent.end() && c->second.find(targetApp) != c->second.end()) {
  156.         ERRORMSG("Already sent command " << id << " to " << targetApp << '!');
  157.         return;
  158.     }
  159.     commandsSent[id][targetApp] = command;
  160.     // create a new CommandInfo on the queue - will actually be sent later
  161.     pendingCommands.resize(pendingCommands.size() + 1);
  162.     pendingCommands.back().to = targetApp;
  163.     pendingCommands.back().id = id;
  164.     pendingCommands.back().command = command;
  165.     pendingCommands.back().data = data;
  166. }
  167. void FileMessenger::SendReply(const std::string& targetApp, unsigned long id,
  168.         MessageResponder::ReplyStatus status, const std::string& data)
  169. {
  170.     // check against record of commands already received and replies already sent
  171.     CommandOriginators::iterator c = commandsReceived.find(id);
  172.     if (c == commandsReceived.end() || c->second.find(targetApp) == c->second.end()) {
  173.         ERRORMSG("Can't reply; have not received command " << id << " from " << targetApp << '!');
  174.         return;
  175.     }
  176.     CommandReplies::iterator r = repliesSent.find(id);
  177.     if (r != repliesSent.end() && r->second.find(targetApp) != r->second.end()) {
  178.         ERRORMSG("Already sent reply " << id << " to " << targetApp << '!');
  179.         return;
  180.     }
  181.     repliesSent[id][targetApp] = status;
  182.     if (readOnly) {
  183.         TRACEMSG("reply " << id << " to " << targetApp
  184.             << " logged but not written to read-only message file");
  185.     } else {
  186.         // create a new CommandInfo on the queue - will actually be sent later
  187.         pendingCommands.resize(pendingCommands.size() + 1);
  188.         pendingCommands.back().to = targetApp;
  189.         pendingCommands.back().id = id;
  190.         switch (status) {
  191.             case MessageResponder::REPLY_OKAY:
  192.                 pendingCommands.back().command = "OKAY"; break;
  193.             case MessageResponder::REPLY_ERROR:
  194.                 pendingCommands.back().command = "ERROR"; break;
  195.             default:
  196.                 ERRORMSG("Unknown reply status " << status << '!');
  197.                 pendingCommands.back().command = "ERROR"; break;
  198.         }
  199.         pendingCommands.back().data = data;
  200.     }
  201. }
  202. void FileMessenger::PollMessageFile(void)
  203. {
  204.     // skip all checking if message file doesn't exist or lock file is already present
  205.     if (!messageFile.Exists() || !messageFile.IsFile() || lockFile.Exists())
  206.         return;
  207.     // check to see if we need to read file's contents
  208.     CFile mf(messageFile.GetPath());
  209.     Int8 messageFileSize = mf.GetLength();
  210.     if (messageFileSize < 0) {
  211.         ERRORMSG("Couldn't get message file size!");
  212.         return;
  213.     }
  214.     bool needToRead = (messageFileSize > lastKnownSize);
  215.     // only continue if have new commands to receive, or have pending commands to send
  216.     if (!needToRead && pendingCommands.size() == 0)
  217.         return;
  218.     TRACEMSG("message file: " << messageFile.GetPath());
  219. //    TRACEMSG("current size: " << (long) messageFileSize);
  220. //    TRACEMSG("last known size: " << (long) lastKnownSize);
  221.     if (needToRead) TRACEMSG("message file has grown since last read");
  222.     if (pendingCommands.size() > 0) TRACEMSG("has pending commands to send");
  223.     // since we're going to read or write the file, establish a lock now
  224.     auto_ptr<fstream> lockStream(CreateLock(lockFile));
  225.     if (lockStream.get() == NULL)
  226.         return; // try again later, so program isn't locked during wait
  227.     // first read any new commands from the file
  228.     if (needToRead) ReceiveCommands();
  229.     // then send any pending commands
  230.     if (pendingCommands.size() > 0)
  231.         SendPendingCommands();
  232.     // now update the size stamp to current size so we don't unnecessarily read in any commands just sent
  233.     lastKnownSize = mf.GetLength();
  234.     if (lastKnownSize < 0) {
  235.         ERRORMSG("Couldn't get message file size!");
  236.         lastKnownSize = 0;
  237.     }
  238.     // lock file automatically deleted upon return...
  239. }
  240. static const string COMMAND_END = "### END COMMAND ###";
  241. // returns true if some data was read (up to eol) before eof
  242. static bool ReadSingleLine(CNcbiIfstream& inStream, string *str)
  243. {
  244.     str->erase();
  245.     CT_CHAR_TYPE ch;
  246.     do {
  247.         ch = inStream.get();
  248.         if (inStream.bad() || inStream.fail() || CT_EQ_INT_TYPE(CT_TO_INT_TYPE(ch), CT_EOF))
  249.             return false;
  250.         else if (CT_EQ_INT_TYPE(CT_TO_INT_TYPE(ch), CT_TO_INT_TYPE('n')))
  251.             break;
  252.         else
  253.             *str += CT_TO_CHAR_TYPE(ch);
  254.     } while (1);
  255.     return true;
  256. }
  257. // must be called only after lock is established!
  258. void FileMessenger::ReceiveCommands(void)
  259. {
  260.     TRACEMSG("receiving commands...");
  261.     auto_ptr<CNcbiIfstream> inStream(new ncbi::CNcbiIfstream(
  262.         messageFile.GetPath().c_str(), IOS_BASE::in));
  263.     if (!(*inStream)) {
  264.         ERRORMSG("cannot open message file for reading!");
  265.         return;
  266.     }
  267. #define GET_EXPECTED_LINE 
  268.     if (!ReadSingleLine(*inStream, &line)) { 
  269.         ERRORMSG("unexpected EOF!"); 
  270.         return; 
  271.     }
  272. #define SKIP_THROUGH_END_OF_COMMAND 
  273.     do { 
  274.         if (!ReadSingleLine(*inStream, &line)) { 
  275.             ERRORMSG("no end-of-command marker found before EOF!"); 
  276.             return; 
  277.         } 
  278.         if (line == COMMAND_END) break; 
  279.     } while (1)
  280. #define GET_ITEM(ident) 
  281.     item = string(ident); 
  282.     if (line.substr(0, item.size()) != item) { 
  283.         ERRORMSG("Line does not begin with expected '" << item << "'!"); 
  284.         return; 
  285.     } 
  286.     item = line.substr(item.size());
  287.     string line, item, from;
  288.     CommandInfo command;
  289.     do {
  290.         // get To: (ignore if not this app)
  291.         if (!ReadSingleLine(*inStream, &line) || line.size() == 0) {
  292.             return;
  293.         }
  294.         GET_ITEM("To: ")
  295.         if (item != manager->applicationName) {
  296.             SKIP_THROUGH_END_OF_COMMAND;
  297.             continue;
  298.         }
  299.         // get From:
  300.         GET_EXPECTED_LINE
  301.         GET_ITEM("From: ")
  302.         from = item;
  303.         // get ID:
  304.         GET_EXPECTED_LINE
  305.         GET_ITEM("ID: ")
  306.         char *endptr;
  307.         command.id = strtoul(item.c_str(), &endptr, 10);
  308.         if (endptr == item.c_str()) {
  309.             ERRORMSG("Bad " << line << '!');
  310.             return;
  311.         }
  312.         // get command or reply
  313.         GET_EXPECTED_LINE
  314.         if (line.substr(0, 9) != "Command: " && line.substr(0, 7) != "Reply: ") {
  315.             ERRORMSG("Line does not begin with expected 'Command: ' or 'Reply: '!");
  316.             return;
  317.         }
  318.         bool isCommand = (line.substr(0, 9) == "Command: ");
  319.         command.command = line.substr(isCommand ? 9 : 7);
  320.         // skip commands/replies already read
  321.         if (isCommand) {
  322.             CommandOriginators::iterator c = commandsReceived.find(command.id);
  323.             if (c != commandsReceived.end() && c->second.find(from) != c->second.end()) {
  324.                 SKIP_THROUGH_END_OF_COMMAND;
  325.                 continue;
  326.             }
  327.         } else {    // reply
  328.             CommandReplies::iterator r = repliesReceived.find(command.id);
  329.             if (r != repliesReceived.end() && r->second.find(from) != r->second.end()) {
  330.                 SKIP_THROUGH_END_OF_COMMAND;
  331.                 continue;
  332.             }
  333.         }
  334.         // get data (all lines up to end marker)
  335.         command.data.erase();
  336.         do {
  337.             GET_EXPECTED_LINE
  338.             if (line == COMMAND_END) break;
  339.             command.data += line;
  340.             command.data += 'n';
  341.         } while (1);
  342.         // process new commands/replies
  343.         if (isCommand) {
  344.             commandsReceived[command.id][from] = command.command;
  345.             TRACEMSG("processing command " << command.id << " from " << from << ": " << command.command);
  346.             // command received callback
  347.             responder->ReceivedCommand(from, command.id, command.command, command.data);
  348.         } else {    // reply
  349.             MessageResponder::ReplyStatus status = MessageResponder::REPLY_ERROR;
  350.             if (command.command == "OKAY")
  351.                 status = MessageResponder::REPLY_OKAY;
  352.             else if (command.command != "ERROR")
  353.                 ERRORMSG("Unknown reply status " << command.command << '!');
  354.             repliesReceived[command.id][from] = status;
  355.             TRACEMSG("processing reply " << command.id << " from " << from);
  356.             // reply received callback
  357.             responder->ReceivedReply(from, command.id, status, command.data);
  358.         }
  359.     } while (1);
  360. }
  361. // must be called only after lock is established!
  362. void FileMessenger::SendPendingCommands(void)
  363. {
  364.     TRACEMSG("sending commands...");
  365.     if (pendingCommands.size() == 0)
  366.         return;
  367.     auto_ptr<CNcbiOfstream> outStream(new ncbi::CNcbiOfstream(
  368.         messageFile.GetPath().c_str(), IOS_BASE::out | IOS_BASE::app));
  369.     if (!(*outStream)) {
  370.         ERRORMSG("cannot open message file for writing!");
  371.         return;
  372.     }
  373.     CommandList::iterator c, ce = pendingCommands.end();
  374.     for (c=pendingCommands.begin(); c!=ce; ++c) {
  375.         // dump the command to the file, noting different syntax for replies
  376.         bool isReply = (c->command == "OKAY" || c->command == "ERROR");
  377.         *outStream
  378.             << "To: " << c->to << 'n'
  379.             << "From: " << manager->applicationName << 'n'
  380.             << "ID: " << c->id << 'n'
  381.             << (isReply ? "Reply: " : "Command: ") << c->command << 'n';
  382.         if (c->data.size() > 0) {
  383.             *outStream << c->data;
  384.             if (c->data[c->data.size() - 1] != 'n')    // append n if data doesn't end with one
  385.                 *outStream << 'n';
  386.         }
  387.         *outStream << COMMAND_END << 'n';
  388.         outStream->flush();
  389.         TRACEMSG("sent " << (isReply ? "reply " : "command ") << c->id << " to " << c->to);
  390.     }
  391.     pendingCommands.clear();
  392. }
  393. FileMessagingManager::FileMessagingManager(const std::string& appName):
  394.     applicationName(appName)
  395. {
  396. }
  397. FileMessagingManager::~FileMessagingManager(void)
  398. {
  399.     FileMessengerList::iterator m, me = messengers.end();
  400.     for (m=messengers.begin(); m!=me; ++m)
  401.         delete *m;
  402. }
  403. FileMessenger * FileMessagingManager::CreateNewFileMessenger(
  404.     const std::string& messageFilename, MessageResponder *responderObject, bool readOnly)
  405. {
  406.     if (!responderObject) {
  407.         ERRORMSG("CreateNewFileMessenger() - got NULL responderObject!");
  408.         return NULL;
  409.     }
  410.     FileMessenger *newMessenger = new FileMessenger(this, messageFilename, responderObject, readOnly);
  411.     messengers.push_back(newMessenger);
  412.     return newMessenger;
  413. }
  414. void FileMessagingManager::DeleteFileMessenger(FileMessenger *messenger)
  415. {
  416.     FileMessengerList::iterator f, fe = messengers.end();
  417.     for (f=messengers.begin(); f!=fe; ++f) {
  418.         if (*f == messenger) {
  419.             delete *f;
  420.             messengers.erase(f);
  421.             return;
  422.         }
  423.     }
  424.     ERRORMSG("DeleteFileMessenger() - given FileMessenger* not created by this FileMessagingManager!");
  425. }
  426. void FileMessagingManager::PollMessageFiles(void)
  427. {
  428.     FileMessengerList::iterator f, fe = messengers.end();
  429.     for (f=messengers.begin(); f!=fe; ++f)
  430.         (*f)->PollMessageFile();
  431. }
  432. END_NCBI_SCOPE
  433. /*
  434. * ---------------------------------------------------------------------------
  435. * $Log: file_messaging.cpp,v $
  436. * Revision 1000.2  2004/06/01 18:28:37  gouriano
  437. * PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.11
  438. *
  439. * Revision 1.11  2004/05/21 21:41:39  gorelenk
  440. * Added PCH ncbi_pch.hpp
  441. *
  442. * Revision 1.10  2004/03/15 18:25:36  thiessen
  443. * prefer prefix vs. postfix ++/-- operators
  444. *
  445. * Revision 1.9  2004/02/19 17:04:56  thiessen
  446. * remove cn3d/ from include paths; add pragma to disable annoying msvc warning
  447. *
  448. * Revision 1.8  2003/10/20 23:03:33  thiessen
  449. * send pending commands before messenger is destroyed
  450. *
  451. * Revision 1.7  2003/10/02 18:45:22  thiessen
  452. * make non-reply message warning only
  453. *
  454. * Revision 1.6  2003/09/22 17:33:12  thiessen
  455. * add AlignmentChanged flag; flush message file; check row order of repeats
  456. *
  457. * Revision 1.5  2003/07/10 18:47:29  thiessen
  458. * add CDTree->Select command
  459. *
  460. * Revision 1.4  2003/03/19 14:44:36  thiessen
  461. * fix char/traits problem
  462. *
  463. * Revision 1.3  2003/03/14 19:22:59  thiessen
  464. * add CommandProcessor to handle file-message commands; fixes for GCC 2.9
  465. *
  466. * Revision 1.2  2003/03/13 18:55:04  thiessen
  467. * add messenger destroy function
  468. *
  469. * Revision 1.1  2003/03/13 14:26:18  thiessen
  470. * add file_messaging module; split cn3d_main_wxwin into cn3d_app, cn3d_glcanvas, structure_window, cn3d_tools
  471. *
  472. */