queue_poll.cpp
上传用户:yhdzpy8989
上传日期:2007-06-13
资源大小:13604k
文件大小:17k
源码类别:

生物技术

开发平台:

C/C++

  1. /*
  2.  * ===========================================================================
  3.  * PRODUCTION $Log: queue_poll.cpp,v $
  4.  * PRODUCTION Revision 1000.3  2004/06/01 18:27:26  gouriano
  5.  * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.9
  6.  * PRODUCTION
  7.  * ===========================================================================
  8.  */
  9. /*  $Id: queue_poll.cpp,v 1000.3 2004/06/01 18:27:26 gouriano Exp $
  10.  * ===========================================================================
  11.  *
  12.  *                            PUBLIC DOMAIN NOTICE
  13.  *               National Center for Biotechnology Information
  14.  *
  15.  *  This software/database is a "United States Government Work" under the
  16.  *  terms of the United States Copyright Act.  It was written as part of
  17.  *  the author's official duties as a United States Government employee and
  18.  *  thus cannot be copyrighted.  This software/database is freely available
  19.  *  to the public for use. The National Library of Medicine and the U.S.
  20.  *  Government have not placed any restriction on its use or reproduction.
  21.  *
  22.  *  Although all reasonable efforts have been taken to ensure the accuracy
  23.  *  and reliability of the software and data, the NLM and the U.S.
  24.  *  Government do not and cannot warrant the performance or results that
  25.  *  may be obtained by using this software or data. The NLM and the U.S.
  26.  *  Government disclaim all warranties, express or implied, including
  27.  *  warranties of performance, merchantability or fitness for any particular
  28.  *  purpose.
  29.  *
  30.  *  Please cite the author in any work or product based on this material.
  31.  *
  32.  * ===========================================================================
  33.  *
  34.  * Author:  Kevin Bealer
  35.  *
  36.  * File Description:
  37.  *   Queueing and Polling code for blast_client.
  38.  *
  39.  */
  40. // Local
  41. #include <ncbi_pch.hpp>
  42. #include "queue_poll.hpp"
  43. // Corelib
  44. #include <corelib/ncbi_system.hpp>
  45. // Objects
  46. #include <objects/blast/blast__.hpp>
  47. #include <objects/blast/blastclient.hpp>
  48. #include <objects/seqalign/Seq_align_set.hpp>
  49. // Object Manager
  50. #include <objmgr/object_manager.hpp>
  51. #include <objmgr/scope.hpp>
  52. #include <objtools/data_loaders/genbank/gbloader.hpp>
  53. // Objtools
  54. #include <objtools/readers/fasta.hpp>
  55. // Use _exit() if available.
  56. #if defined(NCBI_OS_UNIX)
  57. #include <unistd.h>
  58. #endif
  59. USING_NCBI_SCOPE;
  60. using namespace ncbi::objects;
  61. typedef list< CRef<CBlast4_error> > TErrorList;
  62. /////////////////////////////////////////////////////////////////////////////
  63. //
  64. //  Helper Functions
  65. //
  66. #define BLAST4_POLL_DELAY_SEC 15
  67. #define BLAST4_IGNORE_ERRS    5
  68. static inline bool
  69. s_IsAmino(const string & program)
  70. {
  71.     // Should the FASTA be NUC or PROT data?
  72.         
  73.     return (program == "blastp")  ||  (program == "tblastn");
  74. }
  75. void
  76. s_Setp(list<CRef<CBlast4_parameter> >& l, string n, CRef<CBlast4_cutoff> x)
  77. {
  78.     CRef<CBlast4_value> v(new CBlast4_value);
  79.     v->SetCutoff(*x);
  80.     CRef<CBlast4_parameter> p(new CBlast4_parameter);
  81.     p->SetName(n);
  82.     p->SetValue(*v);
  83.     l.push_back(p);
  84. }
  85. void
  86. s_Setp(list<CRef<CBlast4_parameter> >& l, string n, const string x)
  87. {
  88.     CRef<CBlast4_value> v(new CBlast4_value);
  89.     v->SetString(x);
  90.     CRef<CBlast4_parameter> p(new CBlast4_parameter);
  91.     p->SetName(n);
  92.     p->SetValue(*v);
  93.     l.push_back(p);
  94. }
  95. void
  96. s_Setp(list<CRef<CBlast4_parameter> >& l, string n, const int & x)
  97. {
  98.     CRef<CBlast4_value> v(new CBlast4_value);
  99.     v->SetInteger(x);
  100.     
  101.     CRef<CBlast4_parameter> p(new CBlast4_parameter);
  102.     p->SetName(n);
  103.     p->SetValue(*v);
  104.     
  105.     l.push_back(p);
  106. }
  107. void
  108. s_Setp(list<CRef<CBlast4_parameter> >& l, string n, const bool & x)
  109. {
  110.     CRef<CBlast4_value> v(new CBlast4_value);
  111.     v->SetBoolean(x);
  112.     
  113.     CRef<CBlast4_parameter> p(new CBlast4_parameter);
  114.     p->SetName(n);
  115.     p->SetValue(*v);
  116.     
  117.     l.push_back(p);
  118. }
  119. void
  120. s_Setp(list<CRef<CBlast4_parameter> >& l, string n, const double & x)
  121. {
  122.     CRef<CBlast4_value> v(new CBlast4_value);
  123.     v->SetReal(x);
  124.     
  125.     CRef<CBlast4_parameter> p(new CBlast4_parameter);
  126.     p->SetName(n);
  127.     p->SetValue(*v);
  128.     
  129.     l.push_back(p);
  130. }
  131. template <class T1, class T2, class T3>
  132. void
  133. s_SetpOpt(T1 & params, T2 & name, T3 & object)
  134. {
  135.     if (object.Exists()) {
  136.         s_Setp(params, name, object.GetValue());
  137.     }
  138. }
  139. template <class T>
  140. void
  141. s_Output(CNcbiOstream & os, CRef<T> t)
  142. {
  143.     auto_ptr<CObjectOStream> x(CObjectOStream::Open(eSerial_AsnText, os));
  144.     *x << *t;
  145.     os.flush();
  146. }
  147. /////////////////////////////////////////////////////////////////////////////
  148. //
  149. //  Queueing and Polling
  150. //
  151. static CRef<CBioseq_set>
  152. s_SetupQuery(CNcbiIstream    & query_in,
  153.              CRef<CScope>      scope,
  154.              TReadFastaFlags   fasta_flags)
  155. {
  156.     CRef<CSeq_entry> seqentry = ReadFasta(query_in, fasta_flags, 0, 0);
  157.     
  158.     scope->AddTopLevelSeqEntry(*seqentry);
  159.     
  160.     CRef<CBioseq_set> seqset(new CBioseq_set);
  161.     seqset->SetSeq_set().push_back(seqentry);
  162.     
  163.     return seqset;
  164. }
  165. //#include <unistd.h>
  166. class some_kind_of_nothing : public CEofException {
  167. };
  168. static CRef<CBlast4_reply>
  169. s_Submit(CRef<CBlast4_request_body> body, bool echo = true)
  170. {
  171.     static int errors_ignored = 0;
  172.     
  173.     // Create the request; optionally echo it
  174.     CRef<CBlast4_request> request(new CBlast4_request);
  175.     request->SetBody(*body);
  176.     
  177.     if (echo) {
  178.         s_Output(NcbiCout, request);
  179.     }
  180.     
  181.     // submit to server, get reply; optionally echo it
  182.     
  183.     CRef<CBlast4_reply> reply(new CBlast4_reply);
  184.     
  185.     try {
  186.         //throw some_kind_of_nothing();
  187.         CBlast4Client().Ask(*request, *reply);
  188.     }
  189.     catch(CEofException & e) {
  190.         if (errors_ignored >= BLAST4_IGNORE_ERRS) {
  191.             ERR_POST(Error << "Unexpected EOF when contacting netblast server"
  192.                      " - unable to complete request.");
  193. #if defined(NCBI_OS_UNIX)
  194.             // Use _exit() avoid coredump.
  195.             _exit(-1);
  196. #else
  197.             exit(-1);
  198. #endif
  199.         } else {
  200.             errors_ignored ++;
  201. //             ERR_POST(Error << "Unexpected EOF when contacting netblast server"
  202. //                      " ::: ignoring (" << errors_ignored << "/" << BLAST4_IGNORE_ERRS << ").");
  203.             
  204.             CRef<CBlast4_reply> empty_result;
  205.             return empty_result;
  206.         }
  207.     }
  208.     
  209.     if (echo) {
  210.         s_Output(NcbiCout, reply);
  211.     }
  212.     
  213.     return reply;
  214. }
  215. class CSearchParamBuilder : public COptionWalker
  216. {
  217. public:
  218.     CSearchParamBuilder(list< CRef<CBlast4_parameter> > & algo,
  219.                         list< CRef<CBlast4_parameter> > & prog)
  220.         : m_Algo(algo),
  221.           m_Prog(prog)
  222.     {}
  223.     
  224.     template <class T>
  225.     void Same(T          & valobj,
  226.               CUserOpt,
  227.               CNetName     nb_name,
  228.               CArgKey,
  229.               COptDesc,
  230.               EListPick    lp)
  231.     {
  232.         if (EListAlgo == lp) {
  233.             s_SetpOpt(m_Algo, nb_name, valobj);
  234.         } else if (EListProg == lp) {
  235.             s_SetpOpt(m_Prog, nb_name, valobj);
  236.         }
  237.     }
  238.     
  239.     template <class T>
  240.     void Local(T &,
  241.                CUserOpt,
  242.                CArgKey,
  243.                COptDesc)
  244.     { }
  245.     
  246.     template <class T> void Remote(T & valobj, CNetName net_name, EListPick lp)
  247.     {
  248.         if (EListAlgo == lp) {
  249.             s_SetpOpt(m_Algo, net_name, valobj);
  250.         } else if (EListProg == lp) {
  251.             s_SetpOpt(m_Prog, net_name, valobj);
  252.         }
  253.     }
  254.     
  255.     bool NeedRemote(void) { return true; }
  256.     
  257. private:
  258.     list< CRef<CBlast4_parameter> > & m_Algo;
  259.     list< CRef<CBlast4_parameter> > & m_Prog;
  260. };
  261. static void
  262. s_SetSearchParams(CNetblastSearchOpts             & opts,
  263.                   list< CRef<CBlast4_parameter> > & algo,
  264.                   list< CRef<CBlast4_parameter> > & prog)
  265. {
  266.     CSearchParamBuilder spb(algo, prog);
  267.     
  268.     opts.Apply(spb);
  269. }
  270. // Stolen from: CRemoteBlast::SetQueries(CRef<objects::CBioseq_set> bioseqs)
  271. bool s_SetQueries(CRef<CBlast4_queue_search_request> qsr,
  272.                   CRef<CBioseq_set>                  bioseqs)
  273. {
  274.     if (bioseqs.Empty()) {
  275.         return false;
  276.     }
  277.     
  278.     CRef<CBlast4_queries> queries_p(new CBlast4_queries);
  279.     queries_p->SetBioseq_set(*bioseqs);
  280.     
  281.     qsr->SetQueries(*queries_p);
  282.     
  283.     return true;
  284. }
  285. static string
  286. s_QueueSearch(string              & program,
  287.               string              & database,
  288.               string              & service,
  289.               CNetblastSearchOpts & opts,
  290.               CRef<CBioseq_set>     query,
  291.               bool                  verbose,
  292.               string              & err)
  293. {
  294.     string returned_rid;
  295.     
  296.     CRef<CBlast4_subject> subject(new CBlast4_subject);
  297.     subject->SetDatabase(database);
  298.     
  299.     CRef<CBlast4_queue_search_request> qsr(new CBlast4_queue_search_request);
  300.     qsr->SetProgram(program);
  301.     qsr->SetService(service);
  302.     
  303.     qsr->SetSubject(*subject);
  304.     
  305.     if (query->GetSeq_set().front()->IsSeq()) {
  306.         //qsr->SetQueries(*query);
  307.         s_SetQueries(qsr, query);
  308.     } else {
  309.         CRef<CBioseq_set> myset(& query->SetSeq_set().front()->SetSet());
  310.         s_SetQueries(qsr, myset);
  311.     }
  312.     
  313.     list< CRef<CBlast4_parameter> > & algo = qsr->SetAlgorithm_options().Set();
  314.     list< CRef<CBlast4_parameter> > & prog = qsr->SetProgram_options().Set();
  315.     
  316.     s_SetSearchParams(opts, algo, prog);
  317.     
  318.     CRef<CBlast4_request_body> body(new CBlast4_request_body);
  319.     body->SetQueue_search(*qsr);
  320.     
  321.     CRef<CBlast4_reply> reply = s_Submit(body, verbose);
  322.     
  323.     if (reply.NotEmpty() &&
  324.         reply->CanGetBody() &&
  325.         reply->GetBody().GetQueue_search().CanGetRequest_id()) {
  326.         
  327.         returned_rid = reply->GetBody().GetQueue_search().GetRequest_id();
  328.     }
  329.     
  330.     if (reply.NotEmpty() &&
  331.         reply->CanGetErrors()) {
  332.         const CBlast4_reply::TErrors & errs = reply->GetErrors();
  333.         
  334.         CBlast4_reply::TErrors::const_iterator i;
  335.         
  336.         for (i = errs.begin(); i != errs.end(); i++) {
  337.             if ((*i)->CanGetMessage() &&
  338.                 (*i)->GetMessage().size()) {
  339.                 if (err.size()) {
  340.                     err += "n";
  341.                 }
  342.                 err += (*i)->GetMessage();
  343.             }
  344.         }
  345.     }
  346.     
  347.     return returned_rid;
  348. }
  349. static CRef<CBlast4_reply>
  350. s_GetSearchResults(const string & RID, bool echo = true)
  351. {
  352.     CRef<CBlast4_get_search_results_request>
  353.         gsrr(new CBlast4_get_search_results_request);
  354.     
  355.     gsrr->SetRequest_id(RID);
  356.         
  357.     CRef<CBlast4_request_body> body(new CBlast4_request_body);
  358.     body->SetGet_search_results(*gsrr);
  359.         
  360.     return s_Submit(body, echo);
  361. }
  362. static bool
  363. s_SearchPending(CRef<CBlast4_reply> reply)
  364. {
  365.     // The reply can be empty on certain types of errors.
  366.     if (reply.Empty())
  367.         return true;
  368.     
  369.     const list< CRef<CBlast4_error> > & errors = reply->GetErrors();
  370.    
  371.     TErrorList::const_iterator i;
  372.     for(i = errors.begin(); i != errors.end(); i++) {
  373.         if ((*i)->GetCode() == eBlast4_error_code_search_pending) {
  374.             return true;
  375.         }
  376.     }
  377.     return false;
  378. }
  379. static void
  380. s_ShowAlign(CNcbiOstream       & os,
  381.             CBlast4_reply_body & reply,
  382.             CRef<CScope>         scope,
  383.             CAlignParms        & alparms,
  384.             bool                 gapped)
  385. {
  386.     CBlast4_get_search_results_reply & cgsrr(reply.SetGet_search_results());
  387.     
  388.     if (! cgsrr.CanGetAlignments()) {
  389.         os << "This search did not find any matches.n";
  390.         return;
  391.     }
  392.     
  393.     CSeq_align_set & alignment(cgsrr.SetAlignments());
  394.     
  395.     list <CDisplaySeqalign::SeqlocInfo*>  none1;
  396.     list <CDisplaySeqalign::FeatureInfo*> none2;
  397.     
  398.     AutoPtr<CDisplaySeqalign> dsa_ptr;
  399.     
  400.     if (! gapped) {
  401.         CRef<CSeq_align_set> newalign =
  402.             CDisplaySeqalign::PrepareBlastUngappedSeqalign(alignment);
  403.         
  404.         dsa_ptr = new CDisplaySeqalign(*newalign, none1, none2, 0, * scope);
  405.     } else {
  406.         dsa_ptr = new CDisplaySeqalign(alignment, none1, none2, 0, * scope);
  407.     }
  408.     
  409.     alparms.AdjustDisplay(*dsa_ptr);
  410.     
  411.     dsa_ptr->DisplaySeqalign(os);
  412. }
  413. static Int4
  414. s_PollForResults(const string & RID,
  415.                  bool           verbose,
  416.                  bool           raw_asn,
  417.                  CRef<CScope>   scope,
  418.                  CAlignParms  & alparms,
  419.                  bool           gapped)
  420. {
  421.     CRef<CBlast4_reply> r(s_GetSearchResults(RID, verbose));
  422.     
  423.     Int4 EOFtime    = 0;
  424.     Int4 MaxEOFtime = 120;
  425.     
  426.     bool pending = s_SearchPending(r);
  427.     
  428.     while (pending  &&  (EOFtime < MaxEOFtime)) {
  429.         SleepSec(BLAST4_POLL_DELAY_SEC);
  430.         
  431.         try {
  432.             r = s_GetSearchResults(RID, verbose);
  433.             pending = s_SearchPending(r);
  434.         }
  435.         catch(CEofException &) {
  436.             EOFtime += BLAST4_POLL_DELAY_SEC;
  437.         }
  438.     }
  439.     
  440.     bool raw_output = false;
  441.     
  442.     if (raw_asn) {
  443.         raw_output = true;
  444.     }
  445.     
  446.     if (! (r->CanGetBody()  &&  r->GetBody().IsGet_search_results())) {
  447.         raw_output = true;
  448.     }
  449.     
  450.     if (raw_output) {
  451.         s_Output(NcbiCout, r);
  452.     } else {
  453.         CBlast4_reply_body & repbody = r->SetBody();
  454.         s_ShowAlign(NcbiCout, repbody, scope, alparms, gapped);
  455.     }
  456.     
  457.     return 0;
  458. }
  459. Int4
  460. QueueAndPoll(string                program,
  461.              string                database,
  462.              string                service,
  463.              CNetblastSearchOpts & opts,
  464.              CNcbiIstream        & query_in,
  465.              bool                  verb,
  466.              bool                  trust_defline,
  467.              bool                  raw_asn,
  468.              CAlignParms         & alparms)
  469. {
  470.     Int4 err_ret = 0;
  471.         
  472.     // Read the FASTA input data
  473.     string fasta_line1;
  474.     string fasta_block;
  475.         
  476.     // Queue and poll
  477.     string RID;
  478.         
  479.     CRef<CObjectManager> objmgr(new CObjectManager);
  480.     CRef<CScope>         scope (new CScope(*objmgr));
  481.         
  482.     objmgr->RegisterDataLoader(*new CGBDataLoader("ID", 0, 2),
  483.                                CObjectManager::eDefault);
  484.     
  485.     scope->AddDefaults();
  486.         
  487.     CRef<CBioseq_set> cbss;
  488.     
  489.     bool amino         = s_IsAmino(program);
  490.     
  491.     int flags = fReadFasta_AllSeqIds; // | fReadFasta_OneSeq;
  492.     
  493.     if (amino) {
  494.         flags |= fReadFasta_AssumeProt;
  495.     } else {
  496.         flags |= fReadFasta_AssumeNuc;
  497.     }
  498.     
  499.     if (! trust_defline) {
  500.         flags |= fReadFasta_NoParseID;
  501.     }
  502.     
  503.     cbss = s_SetupQuery(query_in, scope, flags);
  504.     
  505.     string err;
  506.     
  507.     RID = s_QueueSearch(program,
  508.                         database,
  509.                         service,
  510.                         opts,
  511.                         cbss,
  512.                         verb,
  513.                         err);
  514.     
  515.     if (RID.size()) {
  516.         alparms.SetRID(RID);
  517.         
  518.         if (! err_ret) {
  519.             bool gapped = true;
  520.             
  521.             if (opts.Gapped().Exists()) {
  522.                 gapped = opts.Gapped().GetValue();
  523.             }
  524.             
  525.             err_ret =
  526.                 s_PollForResults(RID, verb, raw_asn, scope, alparms, gapped);
  527.         }
  528.     } else {
  529.         ERR_POST(Error << "Could not queue request.");
  530.         
  531.         if (err.size()) {
  532.             ERR_POST(Error << err);
  533.         }
  534.         
  535.         err_ret = -1;
  536.     }
  537.     
  538.     return err_ret;
  539. }
  540. /*
  541.  * ===========================================================================
  542.  *
  543.  * $Log: queue_poll.cpp,v $
  544.  * Revision 1000.3  2004/06/01 18:27:26  gouriano
  545.  * PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.9
  546.  *
  547.  * Revision 1.9  2004/05/21 21:41:38  gorelenk
  548.  * Added PCH ncbi_pch.hpp
  549.  *
  550.  * Revision 1.8  2004/05/05 18:20:39  bealer
  551.  * - Update for new ASN.1
  552.  *
  553.  * Revision 1.7  2004/01/30 23:49:59  bealer
  554.  * - Add better handling for results-not-found case.
  555.  *
  556.  * Revision 1.6  2004/01/05 17:59:31  vasilche
  557.  * Moved genbank loader and its readers sources to new location in objtools.
  558.  * Genbank is now in library libncbi_xloader_genbank.
  559.  * Id1 reader is now in library libncbi_xreader_id1.
  560.  * OBJMGR_LIBS macro updated correspondingly.
  561.  *
  562.  * Old headers temporarily will contain redirection to new location
  563.  * for compatibility:
  564.  * objmgr/gbloader.hpp > objtools/data_loaders/genbank/gbloader.hpp
  565.  * objmgr/reader_id1.hpp > objtools/data_loaders/genbank/readers/id1/reader_id1.hpp
  566.  *
  567.  * Revision 1.5  2003/12/29 19:48:30  bealer
  568.  * - Change code to accomodate first half of new ASN changes.
  569.  *
  570.  * Revision 1.4  2003/12/24 01:01:56  ucko
  571.  * Bandaid to compile with current ASN.1 spec.  All parameters are
  572.  * currently classified as algorithm options, and still need to be
  573.  * divided up properly.
  574.  *
  575.  * Revision 1.3  2003/11/05 19:17:45  bealer
  576.  * - Remove .data() from string argument passing.
  577.  *
  578.  * Revision 1.2  2003/09/26 20:01:43  bealer
  579.  * - Fix Solaris compile errors.
  580.  *
  581.  * Revision 1.1  2003/09/26 16:53:49  bealer
  582.  * - Add blast_client project for netblast protocol, initial code commit.
  583.  *
  584.  * ===========================================================================
  585.  */