- #include <ncbi_pch.hpp>
- #include <objtools/data_loaders/genbank/readers/id1/reader_id1_cache.hpp>
- #include <objtools/data_loaders/genbank/reader_snp.hpp>
- #include <objtools/data_loaders/genbank/split_parser.hpp>
- #include <corelib/ncbitime.hpp>
- #include <util/cache/blob_cache.hpp>
- #include <util/cache/int_cache.hpp>
- #include <util/cache/icache.hpp>
- #include <util/rwstream.hpp>
- #include <util/bytesrc.hpp>
- #include <serial/objistr.hpp>
- #include <serial/objistrasnb.hpp>
- #include <serial/objostrasnb.hpp>
- #include <objmgr/objmgr_exception.hpp>
- #include <objmgr/impl/snp_annot_info.hpp>
- #include <objmgr/impl/tse_chunk_info.hpp>
- #include <util/compress/reader_zlib.hpp>
- #include <connect/ncbi_conn_stream.hpp>
- #include <objects/seqset/Seq_entry.hpp>
- #include <objects/seqsplit/ID2S_Split_Info.hpp>
- #include <objects/seqsplit/ID2S_Chunk_Info.hpp>
- #include <objects/seqsplit/ID2S_Chunk.hpp>
- #include <objects/seqsplit/ID2S_Chunk_Id.hpp>
- #include <objects/id1/id1__.hpp>
- #include <objects/id2/ID2_Reply_Data.hpp>
- #include <serial/serial.hpp>
- #include <stdio.h>
- BEGIN_SCOPE(objects)
- /// Utility function to skip part of the input byte source
- void Id1ReaderSkipBytes(CByteSourceReader& reader, size_t to_skip);
- static size_t resolve_id_count = 0; // file-scope statistics, reported by PrintStatistics(): ICache id-cache reads/stores
- static double resolve_id_time = 0; // sum of CStopWatch::Elapsed() values for those id-cache operations
- static size_t resolve_gi_count = 0; // IIntCache gi -> seqrefs reads/stores (GetSeqrefs/StoreSeqrefs by gi)
- static double resolve_gi_time = 0; // elapsed time accumulated for those gi operations
- static size_t resolve_ver_count = 0; // IIntCache blob-version reads/stores
- static double resolve_ver_time = 0; // elapsed time accumulated for blob-version operations
- static size_t main_blob_count = 0; // main (whole or split) blob load attempts, including failed ones
- static double main_bytes = 0; // stream offsets summed over successfully loaded main blobs
- static double main_time = 0; // elapsed time accumulated for main blob loads
- static size_t chunk_blob_count = 0; // split chunks read in GetTSEChunk
- static double chunk_bytes = 0; // stream offsets summed over loaded chunks
- static double chunk_time = 0; // elapsed time accumulated for chunk loads
- static size_t snp_load_count = 0; // SNP table load attempts, including failed ones
- static double snp_load_bytes = 0; // cache record sizes summed over successfully loaded SNP tables
- static double snp_load_time = 0; // elapsed time accumulated for SNP table loads
- static size_t snp_store_count = 0; // SNP table store attempts, including failed ones
- static double snp_store_bytes = 0; // cache record sizes summed over successfully stored SNP tables
- static double snp_store_time = 0; // elapsed time accumulated for SNP table stores
- /// Construct a reader on top of the deprecated IBLOB_Cache / IIntCache pair.
- /// All four cache members start out null; the setters then install the
- /// supplied caches (and post a deprecation warning for non-null ones).
- CCachedId1Reader::CCachedId1Reader(TConn noConn,
-                                    IBLOB_Cache* blob_cache,
-                                    IIntCache* id_cache)
-     : CId1Reader(noConn),
-       m_BlobCache(0),
-       m_IdCache(0),
-       m_OldBlobCache(0),
-       m_OldIdCache(0)
- {
-     // Blob cache first, id cache second: keeps the order of any warnings.
-     SetBlobCache(blob_cache);
-     SetIdCache(id_cache);
- }
- /// Construct a reader on top of ICache-based blob and id caches.
- /// All four cache members start out null; the ICache setters then install
- /// the supplied caches.
- CCachedId1Reader::CCachedId1Reader(TConn noConn,
-                                    ICache* blob_cache,
-                                    ICache* id_cache)
-     : CId1Reader(noConn),
-       m_BlobCache(0),
-       m_IdCache(0),
-       m_OldBlobCache(0),
-       m_OldIdCache(0)
- {
-     SetBlobCache(blob_cache);
-     SetIdCache(id_cache);
- }
- /// On destruction, print the accumulated cache statistics if statistics
- /// collection is enabled.
- CCachedId1Reader::~CCachedId1Reader()
- {
-     if ( !CollectStatistics() ) {
-         return;
-     }
-     PrintStatistics();
- }
- /// Report every file-scope statistics counter: the three resolution
- /// counters through PrintStat, then the four blob counters through
- /// PrintBlobStat.
- void CCachedId1Reader::PrintStatistics(void) const
- {
-     const char* const resolved_label = "Cache resolution: resolved";
-     PrintStat(resolved_label, resolve_id_count, "ids", resolve_id_time);
-     PrintStat(resolved_label, resolve_gi_count, "gis", resolve_gi_time);
-     PrintStat(resolved_label, resolve_ver_count, "blob vers", resolve_ver_time);
-     PrintBlobStat("Cache main: loaded",
-                   main_blob_count, main_bytes, main_time);
-     PrintBlobStat("Cache chunk: loaded",
-                   chunk_blob_count, chunk_bytes, chunk_time);
-     PrintBlobStat("Cache SNP: loaded",
-                   snp_load_count, snp_load_bytes, snp_load_time);
-     PrintBlobStat("Cache SNP: stored",
-                   snp_store_count, snp_store_bytes, snp_store_time);
- }
- /// Install an ICache blob cache and drop any deprecated IBLOB_Cache, so
- /// that only one of the two blob caches is ever set.
- void CCachedId1Reader::SetBlobCache(ICache* blob_cache)
- {
-     m_BlobCache = blob_cache;
-     m_OldBlobCache = 0;
- }
- /// Install an ICache id cache and drop any deprecated IIntCache, so that
- /// only one of the two id caches is ever set.
- void CCachedId1Reader::SetIdCache(ICache* id_cache)
- {
-     m_IdCache = id_cache;
-     m_OldIdCache = 0;
- }
- /// Install a deprecated IBLOB_Cache blob cache and drop any ICache one.
- /// A warning is posted when a non-null cache different from the current
- /// one is supplied.
- void CCachedId1Reader::SetBlobCache(IBLOB_Cache* blob_cache)
- {
-     m_BlobCache = 0;
-     const bool is_new_cache = blob_cache && blob_cache != m_OldBlobCache;
-     if ( is_new_cache ) {
-         ERR_POST(Warning << "CCachedId1Reader: "
-                  "IBLOB_Cache is deprecated, use ICache instead");
-     }
-     m_OldBlobCache = blob_cache;
- }
- /// Install a deprecated IIntCache id cache and drop any ICache one.
- /// A warning is posted when a non-null cache different from the current
- /// one is supplied.
- void CCachedId1Reader::SetIdCache(IIntCache* id_cache)
- {
-     m_IdCache = 0;
-     const bool is_new_cache = id_cache && id_cache != m_OldIdCache;
-     if ( is_new_cache ) {
-         ERR_POST(Warning << "CCachedId1Reader: "
-                  "IIntCache is deprecated, use ICache instead");
-     }
-     m_OldIdCache = id_cache;
- }
- /// Build the cache key of a blob: "<sat>-<sat_key>" in decimal.
- ///
- /// Uses NStr::IntToString (as the other key builders in this file do)
- /// instead of sprintf into a fixed-size stack buffer.
- ///
- /// @param seqref  blob reference supplying GetSat() and GetSatKey()
- /// @return the key string, e.g. "3-12345"
- string CCachedId1Reader::GetBlobKey(const CSeqref& seqref) const
- {
-     const int sat = seqref.GetSat();
-     const int sat_key = seqref.GetSatKey();
-     return NStr::IntToString(sat) + "-" + NStr::IntToString(sat_key);
- }
- /// Id-cache key for a gi: its decimal representation.
- string CCachedId1Reader::GetIdKey(int gi) const
- {
-     const string key = NStr::IntToString(gi);
-     return key;
- }
- /// Id-cache key for a Seq-id: the gi key when the id is a gi, otherwise
- /// the id's FASTA string.
- string CCachedId1Reader::GetIdKey(const CSeq_id& id) const
- {
-     if ( id.IsGi() ) {
-         return GetIdKey(id.GetGi());
-     }
-     return id.AsFastaString();
- }
- /// Subkey under which the packed seqref list of an id is cached.
- const char* CCachedId1Reader::GetSeqrefsSubkey(void) const
- {
-     const char* const subkey = "srs";
-     return subkey;
- }
- /// Subkey under which the resolved gi of a Seq-id is cached.
- const char* CCachedId1Reader::GetGiSubkey(void) const
- {
-     const char* const subkey = "gi";
-     return subkey;
- }
- /// Subkey under which the version of a blob is cached.
- const char* CCachedId1Reader::GetBlobVersionSubkey(void) const
- {
-     const char* const subkey = "ver";
-     return subkey;
- }
- /// Subkey under which a whole (unsplit) blob is cached.
- const char* CCachedId1Reader::GetSeqEntrySubkey(void) const
- {
-     const char* const subkey = "Seq-entry";
-     return subkey;
- }
- /// Subkey under which the SNP table of a blob is cached.
- const char* CCachedId1Reader::GetSNPTableSubkey(void) const
- {
-     const char* const subkey = "SNP table";
-     return subkey;
- }
- /// Subkey under which the main part (skeleton) of a split blob is cached.
- const char* CCachedId1Reader::GetSkeletonSubkey(void) const
- {
-     const char* const subkey = "Skeleton";
-     return subkey;
- }
- /// Subkey under which the split info of a split blob is cached.
- const char* CCachedId1Reader::GetSplitInfoSubkey(void) const
- {
-     const char* const subkey = "ID2S-Split-Info";
-     return subkey;
- }
- /// Subkey under which a split chunk is cached: "ID2S-Chunk <chunk_id>".
- string CCachedId1Reader::GetChunkSubkey(int chunk_id) const
- {
-     string subkey("ID2S-Chunk ");
-     subkey += NStr::IntToString(chunk_id);
-     return subkey;
- }
- /// Remove id-cache records related to 'id' and to each seqref in 'srs'.
- ///
- /// With an ICache the record keyed by the id and the records keyed by each
- /// blob key are removed.  With the deprecated IIntCache the (gi, 0) and
- /// (sat_key, sat) records of each seqref are removed.  Does nothing when no
- /// id cache is configured.
- void CCachedId1Reader::PurgeSeqrefs(const TSeqrefs& srs, const CSeq_id& id)
- {
-     if ( m_IdCache ) {
-         m_IdCache->Remove(GetIdKey(id));
-         ITERATE ( TSeqrefs, ref_it, srs ) {
-             m_IdCache->Remove(GetBlobKey(**ref_it));
-         }
-         return;
-     }
-     if ( m_OldIdCache ) {
-         ITERATE ( TSeqrefs, ref_it, srs ) {
-             const CSeqref& ref = **ref_it;
-             m_OldIdCache->Remove(ref.GetGi(), 0);
-             m_OldIdCache->Remove(ref.GetSatKey(), ref.GetSat());
-         }
-     }
- }
- /// Read an array of ints stored under (key, version 0, subkey) in the
- /// ICache id cache.
- ///
- /// 'ints' is resized to the number of whole ints in the record before the
- /// record is validated.  The read fails when the record is empty, when its
- /// size is not a multiple of sizeof(int), or when the cache read fails.
- /// Every call, successful or not, is counted in the id statistics when
- /// statistics are enabled.
- ///
- /// @return true when 'ints' has been filled from the cache
- bool CCachedId1Reader::x_GetIdCache(const string& key,
-                                     const string& subkey,
-                                     vector<int>& ints)
- {
-     CStopWatch timer;
-     if ( CollectStatistics() ) {
-         timer.Start();
-     }
-     const size_t byte_count = m_IdCache->GetSize(key, 0, subkey);
-     ints.resize(byte_count / sizeof(int));
-     // Short-circuit order matters: &ints[0] is only evaluated for a
-     // non-empty, well-sized record.
-     const bool ok =
-         byte_count != 0 &&
-         byte_count % sizeof(int) == 0 &&
-         m_IdCache->Read(key, 0, subkey, &ints[0], byte_count);
-     if ( CollectStatistics() ) {
-         const double elapsed = timer.Elapsed();
-         if ( ok ) {
-             LogStat("CId1Cache: resolved id", key, subkey, elapsed);
-         }
-         else {
-             LogStat("CId1Cache: failed to read id cache record for id",
-                     key, subkey, elapsed);
-         }
-         ++resolve_id_count;
-         resolve_id_time += elapsed;
-     }
-     return ok;
- }
- /// Read a single int stored under (key, version 0, subkey) in the ICache
- /// id cache.
- ///
- /// The read fails when the record size is not exactly sizeof(int) or when
- /// the cache read fails.  Every call is counted in the id statistics when
- /// statistics are enabled.
- ///
- /// @return true when 'value' has been filled from the cache
- bool CCachedId1Reader::x_GetIdCache(const string& key,
-                                     const string& subkey,
-                                     int& value)
- {
-     CStopWatch timer;
-     if ( CollectStatistics() ) {
-         timer.Start();
-     }
-     const size_t byte_count = m_IdCache->GetSize(key, 0, subkey);
-     const bool ok =
-         byte_count == sizeof(int) &&
-         m_IdCache->Read(key, 0, subkey, &value, byte_count);
-     if ( CollectStatistics() ) {
-         const double elapsed = timer.Elapsed();
-         if ( ok ) {
-             LogStat("CId1Cache: resolved id", key, subkey, elapsed);
-         }
-         else {
-             LogStat("CId1Cache: failed to read id cache record for id",
-                     key, subkey, elapsed);
-         }
-         ++resolve_id_count;
-         resolve_id_time += elapsed;
-     }
-     return ok;
- }
- /// Store an array of ints under (key, version 0, subkey) in the ICache id
- /// cache.
- ///
- /// An empty vector is stored as a zero-length record with a null data
- /// pointer; taking &ints[0] of an empty vector would be undefined
- /// behaviour.  The store is counted in the id statistics when statistics
- /// are enabled.
- ///
- /// @param key     cache key
- /// @param subkey  cache subkey
- /// @param ints    values to store (may be empty)
- void CCachedId1Reader::x_StoreIdCache(const string& key,
-                                       const string& subkey,
-                                       const vector<int>& ints)
- {
-     CStopWatch sw;
-     if ( CollectStatistics() ) {
-         sw.Start();
-     }
-     const int* const buffer = ints.empty() ? 0 : &ints[0];
-     m_IdCache->Store(key, 0, subkey, buffer, ints.size()*sizeof(int));
-     if ( CollectStatistics() ) {
-         double time = sw.Elapsed();
-         LogStat("CId1Cache: stored id", key, subkey, time);
-         resolve_id_count++;
-         resolve_id_time += time;
-     }
- }
- /// Store a single int under (key, version 0, subkey) in the ICache id
- /// cache.  The store is counted in the id statistics when statistics are
- /// enabled.
- void CCachedId1Reader::x_StoreIdCache(const string& key,
-                                       const string& subkey,
-                                       const int& value)
- {
-     CStopWatch timer;
-     if ( CollectStatistics() ) {
-         timer.Start();
-     }
-     m_IdCache->Store(key, 0, subkey, &value, sizeof(value));
-     if ( CollectStatistics() ) {
-         const double elapsed = timer.Elapsed();
-         LogStat("CId1Cache: stored id", key, subkey, elapsed);
-         ++resolve_id_count;
-         resolve_id_time += elapsed;
-     }
- }
- /// Load the seqrefs cached for 'key' from the ICache id cache and append
- /// them to 'srs'.
- ///
- /// The record is a flat int array of 5 values per seqref: gi, sat,
- /// sat_key, version, flags.  Records whose length is not a multiple of 5,
- /// or that hold more than 50 ints, are rejected.
- ///
- /// @return true when the record was found and decoded
- bool CCachedId1Reader::GetSeqrefs(const string& key, TSeqrefs& srs)
- {
-     vector<int> fields;
-     if ( !x_GetIdCache(key, GetSeqrefsSubkey(), fields) ) {
-         return false;
-     }
-     const size_t field_count = fields.size();
-     if ( field_count % 5 != 0 || field_count > 50 ) {
-         return false;
-     }
-     for ( size_t pos = 0; pos < field_count; pos += 5 ) {
-         CRef<CSeqref> ref(new CSeqref(fields[pos],
-                                       fields[pos+1],
-                                       fields[pos+2]));
-         ref->SetVersion(fields[pos+3]);
-         ref->SetFlags(fields[pos+4]);
-         srs.push_back(ref);
-     }
-     return true;
- }
- /// Store 'srs' for 'key' in the ICache id cache as a flat int array of 5
- /// values per seqref: gi, sat, sat_key, version, flags.
- void CCachedId1Reader::StoreSeqrefs(const string& key, const TSeqrefs& srs)
- {
-     vector<int> packed;
-     ITERATE ( TSeqrefs, ref_it, srs ) {
-         const CSeqref& ref = **ref_it;
-         packed.push_back(ref.GetGi());
-         packed.push_back(ref.GetSat());
-         packed.push_back(ref.GetSatKey());
-         packed.push_back(ref.GetVersion());
-         packed.push_back(ref.GetFlags());
-     }
-     x_StoreIdCache(key, GetSeqrefsSubkey(), packed);
- }
- /// Load the seqrefs cached for a gi and append them to 'srs'.
- ///
- /// With an ICache this delegates to the string-key overload.  With the
- /// deprecated IIntCache the record (gi, 0) is a flat int array of 4 values
- /// per seqref: sat, sat_key, version, flags.  Each IIntCache lookup is
- /// counted in the gi statistics when statistics are enabled.
- ///
- /// @return true on a cache hit, false on a miss or when no id cache is set
- bool CCachedId1Reader::GetSeqrefs(int gi, TSeqrefs& srs)
- {
-     if ( m_IdCache ) {
-         return GetSeqrefs(GetIdKey(gi), srs);
-     }
-     if ( m_OldIdCache ) {
-         CStopWatch timer;
-         if ( CollectStatistics() ) {
-             timer.Start();
-         }
-         vector<int> fields;
-         const bool found = m_OldIdCache->Read(gi, 0, fields);
-         if ( found ) {
-             _ASSERT(fields.size() == 4 || fields.size() == 8);
-             for ( size_t pos = 0; pos + 4 <= fields.size(); pos += 4 ) {
-                 CRef<CSeqref> ref(new CSeqref(gi,
-                                               fields[pos],
-                                               fields[pos+1]));
-                 ref->SetVersion(fields[pos+2]);
-                 ref->SetFlags(fields[pos+3]);
-                 srs.push_back(ref);
-             }
-         }
-         if ( CollectStatistics() ) {
-             const double elapsed = timer.Elapsed();
-             if ( found ) {
-                 LogStat("CId1Cache: resolved gi", gi, elapsed);
-             }
-             else {
-                 LogStat("CId1Cache: failed to resolve gi", gi, elapsed);
-             }
-             ++resolve_gi_count;
-             resolve_gi_time += elapsed;
-         }
-         return found;
-     }
-     return false;
- }
- /// Store the seqrefs of a gi in the id cache.
- ///
- /// With an ICache this delegates to the string-key overload.  With the
- /// deprecated IIntCache the record (gi, 0) holds 4 ints per seqref: sat,
- /// sat_key, version, flags.  Does nothing when no id cache is set.
- void CCachedId1Reader::StoreSeqrefs(int gi, const TSeqrefs& srs)
- {
-     if ( m_IdCache ) {
-         StoreSeqrefs(GetIdKey(gi), srs);
-         return;
-     }
-     if ( m_OldIdCache ) {
-         CStopWatch timer;
-         if ( CollectStatistics() ) {
-             timer.Start();
-         }
-         vector<int> packed;
-         ITERATE ( TSeqrefs, ref_it, srs ) {
-             const CSeqref& ref = **ref_it;
-             packed.push_back(ref.GetSat());
-             packed.push_back(ref.GetSatKey());
-             packed.push_back(ref.GetVersion());
-             packed.push_back(ref.GetFlags());
-         }
-         _ASSERT(packed.size() == 4 || packed.size() == 8);
-         m_OldIdCache->Store(gi, 0, packed);
-         if ( CollectStatistics() ) {
-             const double elapsed = timer.Elapsed();
-             LogStat("CId1Cache: saved gi", gi, elapsed);
-             ++resolve_gi_count;
-             resolve_gi_time += elapsed;
-         }
-     }
- }
- /// Load the seqrefs cached for a Seq-id.  Only the ICache id cache is
- /// consulted; without it the lookup is always a miss.
- bool CCachedId1Reader::GetSeqrefs(const CSeq_id& id, TSeqrefs& srs)
- {
-     if ( m_IdCache ) {
-         return GetSeqrefs(GetIdKey(id), srs);
-     }
-     return false;
- }
- /// Store the seqrefs of a Seq-id.  Only the ICache id cache is written;
- /// without it this is a no-op.
- void CCachedId1Reader::StoreSeqrefs(const CSeq_id& id, const TSeqrefs& srs)
- {
-     if ( m_IdCache ) {
-         const string key = GetIdKey(id);
-         StoreSeqrefs(key, srs);
-     }
- }
- /// Look up the cached version of a blob.
- ///
- /// With an ICache the version is stored under the blob key and the version
- /// subkey.  With the deprecated IIntCache it is the single int stored
- /// under (sat_key, sat).  An IIntCache record that was read but is empty
- /// is treated as a miss: previously only the debug-only _ASSERT guarded
- /// data[0], so a release build would read out of bounds.
- ///
- /// @return the cached version, or 0 on a miss or when no id cache is set
- int CCachedId1Reader::GetBlobVersion(const CSeqref& seqref)
- {
-     if ( m_IdCache ) {
-         int version = 0;
-         if ( x_GetIdCache(GetBlobKey(seqref),
-                           GetBlobVersionSubkey(),
-                           version) ) {
-             return version;
-         }
-     }
-     else if ( m_OldIdCache ) {
-         CStopWatch sw;
-         if ( CollectStatistics() ) {
-             sw.Start();
-         }
-         vector<int> data;
-         const bool read_ok =
-             m_OldIdCache->Read(seqref.GetSatKey(), seqref.GetSat(), data);
-         // Debug builds still flag a malformed record.
-         _ASSERT(!read_ok || data.size() == 1);
-         const bool found = read_ok && !data.empty();
-         if ( CollectStatistics() ) {
-             double time = sw.Elapsed();
-             if ( found ) {
-                 LogStat("CId1Cache: got blob version",
-                         seqref.printTSE(), time);
-             }
-             else {
-                 LogStat("CId1Cache: failed to get blob version",
-                         seqref.printTSE(), time);
-             }
-             resolve_ver_count++;
-             resolve_ver_time += time;
-         }
-         return found ? data[0] : 0;
-     }
-     return 0;
- }
- /// Cache the version of a blob.
- ///
- /// With an ICache the version goes under the blob key and the version
- /// subkey.  With the deprecated IIntCache it is stored as a one-element
- /// record under (sat_key, sat).  Does nothing when no id cache is set.
- void CCachedId1Reader::StoreBlobVersion(const CSeqref& seqref, int version)
- {
-     if ( m_IdCache ) {
-         x_StoreIdCache(GetBlobKey(seqref), GetBlobVersionSubkey(), version);
-         return;
-     }
-     if ( m_OldIdCache ) {
-         CStopWatch timer;
-         if ( CollectStatistics() ) {
-             timer.Start();
-         }
-         vector<int> record(1, version);
-         _ASSERT(record.size() == 1);
-         m_OldIdCache->Store(seqref.GetSatKey(), seqref.GetSat(), record);
-         if ( CollectStatistics() ) {
-             const double elapsed = timer.Elapsed();
-             LogStat("CId1Cache: saved blob version",
-                     seqref.printTSE(), elapsed);
-             ++resolve_ver_count;
-             resolve_ver_time += elapsed;
-         }
-     }
- }
- /// Resolve a Seq-id to a gi, using the ICache id cache when available.
- ///
- /// On a cache miss the base class resolves the id and the result is stored
- /// in the cache, whatever its value.  Without an ICache id cache the call
- /// goes straight to the base class.
- int CCachedId1Reader::ResolveSeq_id_to_gi(const CSeq_id& id, TConn conn)
- {
-     if ( m_IdCache ) {
-         const string key = GetIdKey(id);
-         const string subkey = GetGiSubkey();
-         int gi = 0;
-         if ( x_GetIdCache(key, subkey, gi) ) {
-             return gi;
-         }
-         gi = CId1Reader::ResolveSeq_id_to_gi(id, conn);
-         x_StoreIdCache(key, subkey, gi);
-         return gi;
-     }
-     return CId1Reader::ResolveSeq_id_to_gi(id, conn);
- }
- /// Fill 'srs' with the seqrefs of a gi: from the cache when present,
- /// otherwise from the base class, in which case the result is cached.
- void CCachedId1Reader::RetrieveSeqrefs(TSeqrefs& srs, int gi, TConn conn)
- {
-     if ( GetSeqrefs(gi, srs) ) {
-         return;
-     }
-     CId1Reader::RetrieveSeqrefs(srs, gi, conn);
-     StoreSeqrefs(gi, srs);
- }
- /// Load one split chunk of a TSE from the blob cache into 'chunk_info'.
- ///
- /// The chunk is looked up under the blob key of 'seqref' with version
- /// seqref.GetVersion(): with an ICache the subkey is
- /// GetChunkSubkey(chunk id), with the deprecated IBLOB_Cache the key gets
- /// a "-chunk-<id>" suffix.  Does nothing when no blob cache is configured.
- /// The connection argument is unused.
- ///
- /// Chunk statistics are now recorded for both cache kinds; previously the
- /// ICache path computed the size but never counted it.
- ///
- /// @throws CLoaderException (eLoaderFailed) when the chunk is not cached
- void CCachedId1Reader::GetTSEChunk(const CSeqref& seqref,
-                                    CTSE_Chunk_Info& chunk_info,
-                                    TConn /*conn*/)
- {
-     const bool use_icache = m_BlobCache ? true : false;
-     const bool use_old_cache = m_OldBlobCache ? true : false;
-     if ( !use_icache && !use_old_cache ) {
-         return;
-     }
-     CStopWatch sw;
-     if ( CollectStatistics() ) {
-         sw.Start();
-     }
-     CID2_Reply_Data chunk_data;
-     const string key = GetBlobKey(seqref);
-     bool found = false;
-     if ( use_icache ) {
-         const string subkey = GetChunkSubkey(chunk_info.GetChunkId());
-         found = LoadData(key, seqref.GetVersion(), subkey.c_str(),
-                          chunk_data);
-     }
-     else {
-         const string suffix =
-             "-chunk-"+NStr::IntToString(chunk_info.GetChunkId());
-         found = LoadData(key, suffix.c_str(), seqref.GetVersion(),
-                          chunk_data);
-     }
-     if ( !found ) {
-         NCBI_THROW(CLoaderException, eLoaderFailed,
-                    "CCachedId1Reader::GetTSEChunk: chunk is missing");
-     }
-     CRef<CID2S_Chunk> chunk(new CID2S_Chunk);
-     size_t size = 0;
-     {{
-         // The stream must be destroyed before the chunk is attached.
-         CRef<CByteSourceReader> reader = GetReader(chunk_data,
-                                                    eDataType_Chunk);
-         AutoPtr<CObjectIStream> in(OpenData(chunk_data, *reader));
-         CReader::SetSeqEntryReadHooks(*in);
-         *in >> *chunk;
-         size = in->GetStreamOffset();
-     }}
-     CSplitParser::Load(chunk_info, *chunk);
-     if ( CollectStatistics() ) {
-         double time = sw.Elapsed();
-         LogBlobStat("CId1Cache: read chunk", seqref, size, time);
-         chunk_blob_count++;
-         chunk_bytes += size;
-         chunk_time += time;
-     }
- }
- /// Return the version of a blob: the cached one when non-zero, otherwise
- /// the one obtained from the base class, which is then cached.
- int CCachedId1Reader::x_GetVersion(const CSeqref& seqref, TConn conn)
- {
-     const int cached_version = GetBlobVersion(seqref);
-     if ( cached_version != 0 ) {
-         return cached_version;
-     }
-     const int fresh_version = CId1Reader::x_GetVersion(seqref, conn);
-     _ASSERT(fresh_version != 0);
-     StoreBlobVersion(seqref, fresh_version);
-     return fresh_version;
- }
- /// Get a TSE blob: from the cache when LoadBlob succeeds, otherwise through
- /// the base class.
- void CCachedId1Reader::x_GetTSEBlob(CID1server_back& id1_reply,
-                                     CRef<CID2S_Split_Info>& split_info,
-                                     const CSeqref& seqref,
-                                     TConn conn)
- {
-     // Make sure the seqref's version is up to date before the lookup.
-     GetVersion(seqref, conn);
-     if ( LoadBlob(id1_reply, split_info, seqref) ) {
-         return;
-     }
-     // Cache miss: loading is intercepted deeper (x_ReadTSEBlob) so that
-     // the data is written to the cache on the fly.
-     CId1Reader::x_GetTSEBlob(id1_reply, split_info, seqref, conn);
- }
- void CCachedId1Reader::x_ReadTSEBlob(CID1server_back& id1_reply, // Parse an ID1 reply from 'stream'; when a blob cache is set, go through a cache write stream
- const CSeqref& seqref, // supplies the blob key and version used for the cache record
- CNcbiIstream& stream) // raw input; read by the base class when no cache writer is available
- {
- if ( m_BlobCache ) { // ICache path: record is (blob key, version, Seq-entry subkey)
- string key = GetBlobKey(seqref);
- int ver = seqref.GetVersion();
- string subkey = GetSeqEntrySubkey();
- try {
- auto_ptr<IWriter> writer(
- m_BlobCache->GetWriteStream(key, ver, subkey));
- if ( writer.get() ) { // a null writer falls through to the plain read at the end
- {{ // inner scope: the streams are destroyed before the writer is flushed
- CWriterByteSourceReader proxy(&stream, writer.get()); // built from both input and writer; presumably copies what it reads into the writer - confirm
- CObjectIStreamAsnBinary obj_stream(proxy);
- CStreamDelayBufferGuard guard(obj_stream);
- CId1Reader::x_ReadTSEBlob(id1_reply, obj_stream);
- }}
- writer->Flush();
- // everything is fine
- return;
- }
- }
- catch ( ... ) {
- // In case of an error we need to remove incomplete BLOB
- // from the cache.
- try {
- m_BlobCache->Remove(key);
- }
- catch ( exception& /*exc*/ ) {
- // ignored
- }
- // continue with exception
- throw;
- }
- }
- else if ( m_OldBlobCache ) { // deprecated IBLOB_Cache path: record is (blob key, version)
- string key = GetBlobKey(seqref);
- int version = seqref.GetVersion();
- try {
- auto_ptr<IWriter> writer(m_OldBlobCache->GetWriteStream(key,
- version));
- if ( writer.get() ) { // a null writer falls through to the plain read at the end
- {{ // inner scope: the streams are destroyed before the writer is flushed
- CWriterByteSourceReader proxy(&stream, writer.get());
- CObjectIStreamAsnBinary obj_stream(proxy);
- CStreamDelayBufferGuard guard(obj_stream);
- CId1Reader::x_ReadTSEBlob(id1_reply, obj_stream);
- }}
- writer->Flush();
- writer.reset(); // release the writer explicitly before returning
- // everything is fine
- return;
- }
- }
- catch ( ... ) {
- // In case of an error we need to remove incomplete BLOB
- // from the cache.
- try {
- m_OldBlobCache->Remove(key);
- }
- catch ( exception& /*exc*/ ) {
- // ignored
- }
- // continue with exception
- throw;
- }
- }
- // by default read from ID1
- CId1Reader::x_ReadTSEBlob(id1_reply, seqref, stream);
- }
- /// Get the SNP table of a blob: from the cache when LoadSNPTable succeeds,
- /// otherwise through the base class, in which case the table is cached.
- void CCachedId1Reader::x_GetSNPAnnot(CSeq_annot_SNP_Info& snp_info,
-                                      const CSeqref& seqref,
-                                      TConn conn)
- {
-     // Make sure the seqref's version is up to date before the lookup.
-     GetVersion(seqref, conn);
-     if ( LoadSNPTable(snp_info, seqref) ) {
-         return;
-     }
-     // Cache miss: start from a clean table, load it from GenBank ...
-     snp_info.Reset();
-     CId1Reader::x_GetSNPAnnot(snp_info, seqref, conn);
-     // ... and keep it for the next time.
-     StoreSNPTable(snp_info, seqref);
- }
- /// Load a blob from the cache, trying the split form first and the whole
- /// blob second.
- ///
- /// @return true when either form was loaded
- bool CCachedId1Reader::LoadBlob(CID1server_back& id1_reply,
-                                 CRef<CID2S_Split_Info>& split_info,
-                                 const CSeqref& seqref)
- {
-     if ( LoadSplitBlob(id1_reply, split_info, seqref) ) {
-         return true;
-     }
-     return LoadWholeBlob(id1_reply, seqref);
- }
- bool CCachedId1Reader::LoadWholeBlob(CID1server_back& id1_reply, // Read a whole cached blob into 'id1_reply'; returns true on success
- const CSeqref& seqref) // supplies the blob key and version of the cache record
- {
- if ( m_BlobCache ) { // ICache path: record is (blob key, version, Seq-entry subkey)
- string key = GetBlobKey(seqref);
- int ver = seqref.GetVersion();
- string subkey = GetSeqEntrySubkey();
- CStopWatch sw;
- if ( CollectStatistics() ) {
- sw.Start();
- }
- try {
- auto_ptr<IReader> reader(
- m_BlobCache->GetReadStream(key, ver, subkey));
- if ( !reader.get() ) { // no read stream: reported as a miss, not counted in statistics
- return false;
- }
- CIRByteSourceReader rd(reader.get());
- CObjectIStreamAsnBinary in(rd);
- CReader::SetSeqEntryReadHooks(in);
- in >> id1_reply;
- // everything is fine
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- size_t size = in.GetStreamOffset(); // stream offset after the read is used as the blob size
- LogBlobStat("CId1Cache: read blob", seqref, size, time);
- main_blob_count++;
- main_bytes += size;
- main_time += time;
- }
- return true;
- }
- catch ( exception& exc ) { // a failed read is logged and reported as a miss
- ERR_POST("CId1Cache: Exception while loading cached blob: " <<
- seqref.printTSE() << ": " << exc.what());
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- LogBlobStat("CId1Cache: read fail blob",
- seqref, 0, time);
- main_blob_count++;
- main_time += time;
- }
- return false;
- }
- }
- else if ( m_OldBlobCache ) { // deprecated IBLOB_Cache path: record is (blob key, version)
- string key = GetBlobKey(seqref);
- int ver = seqref.GetVersion();
- CStopWatch sw;
- if ( CollectStatistics() ) {
- sw.Start();
- }
- try {
- auto_ptr<IReader> reader(m_OldBlobCache->GetReadStream(key, ver));
- if ( !reader.get() ) { // no read stream: reported as a miss, not counted in statistics
- return false;
- }
- CIRByteSourceReader rd(reader.get());
- CObjectIStreamAsnBinary in(rd);
- CReader::SetSeqEntryReadHooks(in);
- in >> id1_reply;
- // everything is fine
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- size_t size = in.GetStreamOffset(); // stream offset after the read is used as the blob size
- LogBlobStat("CId1Cache: read blob", seqref, size, time);
- main_blob_count++;
- main_bytes += size;
- main_time += time;
- }
- return true;
- }
- catch ( exception& exc ) { // a failed read is logged and reported as a miss
- ERR_POST("CId1Cache: Exception while loading cached blob: " <<
- seqref.printTSE() << ": " << exc.what());
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- LogBlobStat("CId1Cache: read fail blob",
- seqref, 0, time);
- main_blob_count++;
- main_time += time;
- }
- return false;
- }
- }
- else { // no blob cache configured
- return false;
- }
- }
- /// Load a split blob (main Seq-entry plus split info) from the blob cache.
- ///
- /// Both parts are looked up under the blob key and version of 'seqref':
- /// with an ICache under the skeleton and split-info subkeys, with the
- /// deprecated IBLOB_Cache under the key suffixes "-main" and "-split".
- /// On success the main entry is stored in 'id1_reply' and 'split_info' is
- /// set.  Exceptions raised while reading are logged and reported as a miss.
- ///
- /// Main-blob statistics are now recorded for both cache kinds; previously
- /// the ICache path computed the size but never counted it.
- ///
- /// @return true when both parts were loaded; false on a miss, on a read
- ///         error, or when no blob cache is configured
- bool CCachedId1Reader::LoadSplitBlob(CID1server_back& id1_reply,
-                                      CRef<CID2S_Split_Info>& split_info,
-                                      const CSeqref& seqref)
- {
-     const bool use_icache = m_BlobCache ? true : false;
-     const bool use_old_cache = m_OldBlobCache ? true : false;
-     if ( !use_icache && !use_old_cache ) {
-         return false;
-     }
-     const string key = GetBlobKey(seqref);
-     const int ver = seqref.GetVersion();
-     CStopWatch sw;
-     if ( CollectStatistics() ) {
-         sw.Start();
-     }
-     try {
-         CID2_Reply_Data main_data, split_data;
-         bool have_both = false;
-         if ( use_icache ) {
-             have_both =
-                 LoadData(key, ver, GetSkeletonSubkey(), main_data) &&
-                 LoadData(key, ver, GetSplitInfoSubkey(), split_data);
-         }
-         else {
-             have_both =
-                 LoadData(key, "-main", ver, main_data) &&
-                 LoadData(key, "-split", ver, split_data);
-         }
-         if ( !have_both ) {
-             // Plain miss: not counted in the statistics.
-             return false;
-         }
-         size_t size = 0;
-         CRef<CSeq_entry> main(new CSeq_entry);
-         {{
-             CRef<CByteSourceReader> reader(GetReader(main_data,
-                                                      eDataType_MainBlob));
-             AutoPtr<CObjectIStream> in(OpenData(main_data, *reader));
-             CReader::SetSeqEntryReadHooks(*in);
-             *in >> *main;
-             size += in->GetStreamOffset();
-         }}
-         CRef<CID2S_Split_Info> split(new CID2S_Split_Info);
-         {{
-             CRef<CByteSourceReader> reader(GetReader(split_data,
-                                                      eDataType_SplitInfo));
-             AutoPtr<CObjectIStream> in(OpenData(split_data, *reader));
-             CReader::SetSeqEntryReadHooks(*in);
-             *in >> *split;
-             size += in->GetStreamOffset();
-         }}
-         // Outputs are only touched once both parts have been parsed.
-         id1_reply.SetGotseqentry(*main);
-         split_info = split;
-         if ( CollectStatistics() ) {
-             double time = sw.Elapsed();
-             LogBlobStat("CId1Cache: read blob", seqref, size, time);
-             main_blob_count++;
-             main_bytes += size;
-             main_time += time;
-         }
-         return true;
-     }
-     catch ( exception& exc ) {
-         ERR_POST("CId1Cache: Exception while loading cached blob: " <<
-                  seqref.printTSE() << ": " << exc.what());
-         if ( CollectStatistics() ) {
-             double time = sw.Elapsed();
-             LogBlobStat("CId1Cache: read fail blob",
-                         seqref, 0, time);
-             main_blob_count++;
-             main_time += time;
-         }
-         return false;
-     }
- }
- /// Load the SNP table of a blob from the blob cache into 'snp_info'.
- ///
- /// With an ICache the table is stored under the blob key, version and the
- /// SNP-table subkey.  With the deprecated IBLOB_Cache it is stored under
- /// (blob key, version) and starts with the 4-byte tag "STBL", which is
- /// checked before the table is read.  The tag check had been corrupted
- /// into `!, 4)`; it is restored to `!stream.read(type, 4)`.
- ///
- /// Exceptions raised while reading are logged, 'snp_info' is reset, and
- /// the call reports a miss.  Load attempts are counted in the SNP load
- /// statistics when statistics are enabled.
- ///
- /// @return true when the table was loaded; false on a miss, a bad tag, a
- ///         read error, or when no blob cache is configured
- bool CCachedId1Reader::LoadSNPTable(CSeq_annot_SNP_Info& snp_info,
-                                     const CSeqref& seqref)
- {
-     const bool use_icache = m_BlobCache ? true : false;
-     const bool use_old_cache = m_OldBlobCache ? true : false;
-     if ( !use_icache && !use_old_cache ) {
-         return false;
-     }
-     const string key = GetBlobKey(seqref);
-     const int ver = seqref.GetVersion();
-     const string subkey = GetSNPTableSubkey(); // used by the ICache path only
-     CStopWatch sw;
-     if ( CollectStatistics() ) {
-         sw.Start();
-     }
-     try {
-         auto_ptr<IReader> reader(
-             use_icache ?
-             m_BlobCache->GetReadStream(key, ver, subkey) :
-             m_OldBlobCache->GetReadStream(key, ver));
-         if ( !reader.get() ) {
-             // Plain miss: not counted in the statistics.
-             return false;
-         }
-         CRStream stream(reader.get());
-         if ( !use_icache ) {
-             // blob type
-             char type[4];
-             if ( !stream.read(type, 4) || memcmp(type, "STBL", 4) != 0 ) {
-                 if ( CollectStatistics() ) {
-                     double time = sw.Elapsed();
-                     LogBlobStat("CId1Cache: read fail SNP blob",
-                                 seqref, 0, time);
-                     snp_load_count++;
-                     snp_load_time += time;
-                 }
-                 return false;
-             }
-         }
-         // table
-         CSeq_annot_SNP_Info_Reader::Read(stream, snp_info);
-         if ( CollectStatistics() ) {
-             double time = sw.Elapsed();
-             size_t size = use_icache ?
-                 m_BlobCache->GetSize(key, ver, subkey) :
-                 m_OldBlobCache->GetSize(key, ver);
-             LogBlobStat("CId1Cache: read SNP blob",
-                         seqref, size, time);
-             snp_load_count++;
-             snp_load_bytes += size;
-             snp_load_time += time;
-         }
-         return true;
-     }
-     catch ( exception& exc ) {
-         ERR_POST("CId1Cache: "
-                  "Exception while loading cached SNP table: "<<
-                  seqref.printTSE() << ": " << exc.what());
-         // Do not leave a partially read table behind.
-         snp_info.Reset();
-         if ( CollectStatistics() ) {
-             double time = sw.Elapsed();
-             LogBlobStat("CId1Cache: read fail SNP blob",
-                         seqref, 0, time);
-             snp_load_count++;
-             snp_load_time += time;
-         }
-         return false;
-     }
- }
- void CCachedId1Reader::StoreSNPTable(const CSeq_annot_SNP_Info& snp_info, // Write the SNP table to the blob cache; errors are logged, not propagated
- const CSeqref& seqref) // supplies the blob key and version of the cache record
- {
- if ( m_BlobCache ) { // ICache path: record is (blob key, version, SNP-table subkey)
- string key = GetBlobKey(seqref);
- int ver = seqref.GetVersion();
- string subkey = GetSNPTableSubkey();
- CStopWatch sw;
- if ( CollectStatistics() ) {
- sw.Start();
- }
- try {
- {{
- auto_ptr<IWriter> writer;
- writer.reset(m_BlobCache->GetWriteStream(key, ver, subkey));
- if ( !writer.get() ) { // no write stream available: nothing is stored or counted
- return;
- }
- {{ // inner scope: the stream is destroyed before the writer is flushed
- CWStream stream(writer.get());
- CSeq_annot_SNP_Info_Reader::Write(stream, snp_info);
- }}
- writer->Flush();
- writer.reset(); // release the writer before the record size is queried below
- }}
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- size_t size = m_BlobCache->GetSize(key, ver, subkey);
- LogBlobStat("CId1Cache: saved SNP blob",
- seqref, size, time);
- snp_store_count++;
- snp_store_bytes += size;
- snp_store_time += time;
- }
- }
- catch ( exception& exc ) {
- ERR_POST("CId1Cache: "
- "Exception while storing SNP table: "<<
- seqref.printTSE() << ": " << exc.what());
- try {
- m_BlobCache->Remove(key); // drop the possibly incomplete record
- }
- catch ( exception& /*exc*/ ) {
- // ignored
- }
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- LogBlobStat("CId1Cache: save fail SNP blob",
- seqref, 0, time);
- snp_store_count++;
- snp_store_time += time;
- }
- }
- }
- else if ( m_OldBlobCache ) { // deprecated IBLOB_Cache path: record is (blob key, version)
- string key = GetBlobKey(seqref);
- int ver = seqref.GetVersion();
- CStopWatch sw;
- if ( CollectStatistics() ) {
- sw.Start();
- }
- try {
- {{
- auto_ptr<IWriter> writer;
- writer.reset(m_OldBlobCache->GetWriteStream(key, ver));
- if ( !writer.get() ) { // no write stream available: nothing is stored or counted
- return;
- }
- {{ // inner scope: the stream is destroyed before the writer is flushed
- CWStream stream(writer.get());
- stream.write("STBL", 4); // 4-byte type tag that LoadSNPTable compares against
- CSeq_annot_SNP_Info_Reader::Write(stream, snp_info);
- }}
- writer->Flush();
- writer.reset(); // release the writer before the record size is queried below
- }}
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- size_t size = m_OldBlobCache->GetSize(key, ver);
- LogBlobStat("CId1Cache: saved SNP blob",
- seqref, size, time);
- snp_store_count++;
- snp_store_bytes += size;
- snp_store_time += time;
- }
- }
- catch ( exception& exc ) {
- ERR_POST("CId1Cache: "
- "Exception while storing SNP table: "<<
- seqref.printTSE() << ": " << exc.what());
- try {
- m_OldBlobCache->Remove(key); // drop the possibly incomplete record
- }
- catch ( exception& /*exc*/ ) {
- // ignored
- }
- if ( CollectStatistics() ) {
- double time = sw.Elapsed();
- LogBlobStat("CId1Cache: save fail SNP blob",
- seqref, 0, time);
- snp_store_count++;
- snp_store_time += time;
- }
- }
- }
- }
- /// Read an ASN.1 binary CID2_Reply_Data record from the ICache blob cache.
- ///
- /// @param key      blob key
- /// @param version  blob version
- /// @param suffix   subkey of the record
- /// @param data     receives the decoded record
- /// @return false when the cache has no read stream for the record
- bool CCachedId1Reader::LoadData(const string& key, int version,
-                                 const char* suffix,
-                                 CID2_Reply_Data& data)
- {
-     AutoPtr<IReader> cache_reader(
-         m_BlobCache->GetReadStream(key, version, suffix));
-     if ( cache_reader.get() ) {
-         CIRByteSourceReader byte_source(cache_reader.get());
-         CObjectIStreamAsnBinary obj_stream(byte_source);
-         obj_stream >> data;
-         return true;
-     }
-     return false;
- }
- /// Read an ASN.1 binary CID2_Reply_Data record from the deprecated
- /// IBLOB_Cache.  The record key is 'key' with 'suffix' appended.
- ///
- /// @return false when the cache has no read stream for the record
- bool CCachedId1Reader::LoadData(const string& key, const char* suffix,
-                                 int version, CID2_Reply_Data& data)
- {
-     AutoPtr<IReader> cache_reader(
-         m_OldBlobCache->GetReadStream(key + suffix, version));
-     if ( cache_reader.get() ) {
-         CIRByteSourceReader byte_source(cache_reader.get());
-         CObjectIStreamAsnBinary obj_stream(byte_source);
-         obj_stream >> data;
-         return true;
-     }
-     return false;
- }
- /// Byte source reader that presents a list of byte vectors as one
- /// continuous stream.
- ///
- /// The list is held by reference, not copied.
- /// NOTE(review): the list therefore has to outlive the reader - confirm at
- /// the call sites.
- class CVectorListReader : public CByteSourceReader
- {
- public:
-     typedef list< vector< char >* > TData;
-     CVectorListReader(const TData& data)
-         : m_Data(data),
-           m_CurrentIter(data.begin()),
-           m_CurrentOffset(0)
-         {
-         }
-     /// Copy up to 'bufferLength' bytes into 'buffer'.
-     ///
-     /// A single call never crosses a vector boundary, so it may return
-     /// fewer bytes than requested even when more data follows.
-     ///
-     /// @return number of bytes copied; 0 once every vector is exhausted
-     size_t Read(char* buffer, size_t bufferLength)
-         {
-             while ( m_CurrentIter != m_Data.end() ) {
-                 // Bind by reference: copying the whole vector on every
-                 // call made reading a vector quadratic in its size.
-                 const vector<char>& curr = **m_CurrentIter;
-                 if ( m_CurrentOffset < curr.size() ) {
-                     size_t remaining = curr.size() - m_CurrentOffset;
-                     size_t count = min(bufferLength, remaining);
-                     memcpy(buffer, &curr[m_CurrentOffset], count);
-                     m_CurrentOffset += count;
-                     return count;
-                 }
-                 // Current vector is exhausted (or empty): go to the next.
-                 ++m_CurrentIter;
-                 m_CurrentOffset = 0;
-             }
-             return 0;
-         }
- private:
-     const TData& m_Data;                ///< list being read, not owned
-     TData::const_iterator m_CurrentIter; ///< vector currently being read
-     size_t m_CurrentOffset;             ///< bytes consumed of that vector
- };
- /// Create a byte source reader over the payload of 'data'.
- ///
- /// Returns a null reference when the data type of 'data' differs from
- /// 'data_type'.  Uncompressed data is read directly; nlm-zip data is
- /// wrapped in a CNlmZipBtRdr.
- ///
- /// @throws CLoaderException (eLoaderFailed) for any other compression
- CRef<CByteSourceReader> CCachedId1Reader::GetReader(CID2_Reply_Data& data,
-                                                     EDataType data_type)
- {
-     if ( data.GetData_type() != data_type ) {
-         return CRef<CByteSourceReader>();
-     }
-     CRef<CByteSourceReader> reader(new CVectorListReader(data.GetData()));
-     switch ( data.GetData_compression() ) {
-     case eCompression_none:
-         break;
-     case eCompression_nlm_zip:
-         // The unzipping reader is built on top of the raw one.
-         reader.Reset(new CNlmZipBtRdr(reader.GetPointer()));
-         break;
-     default:
-         NCBI_THROW(CLoaderException, eLoaderFailed,
-                    "unknown compression");
-     }
-     return reader;
- }
- /// Open an object input stream over 'reader' for the payload of 'data'.
- /// Only the ASN.1 binary format is supported.
- ///
- /// @throws CLoaderException (eLoaderFailed) for any other serial format
- AutoPtr<CObjectIStream> CCachedId1Reader::OpenData(CID2_Reply_Data& data,
-                                                    CByteSourceReader& reader)
- {
-     const bool is_asn_binary = data.GetData_format() == eSerial_AsnBinary;
-     if ( !is_asn_binary ) {
-         NCBI_THROW(CLoaderException, eLoaderFailed,
-                    "unknown serial format");
-     }
-     return new CObjectIStreamAsnBinary(reader);
- }
- END_SCOPE(objects)
