response.cpp
上传用户:egreat
上传日期:2007-07-13
资源大小:29k
文件大小:12k
- #include "config.h"
- #include <string>
- #include <vector>
- #include <set>
- #include <map>
- #include <iostream>
- #include <boost/thread/detail/config.hpp>
- #include <boost/thread/thread.hpp>
- #include <boost/thread/recursive_mutex.hpp>
- #include <boost/date_time/gregorian/gregorian.hpp>
- #include <boost/date_time/posix_time/posix_time_types.hpp>
- // #include <boost/smart_assert/assert.hpp>
- #include <boost/static_assert.hpp>
- #include <loki/Factory.h>
- #include "../zlib/zlib.h"
- #include "request.h"
- #include "response.h"
- #include "stock.h"
- #include "util.hpp"
- namespace StockMarket{
- recursive_mutex res_fact_mutex;
- recursive_mutex process_packet_mutex;
- StockRes::StockRes():raw_data(0), raw_len(0), data(0),len(0),current(0),left(0),dataNew(false)
- {}
- StockRes::StockRes(uchar* pData, ulong data_len) : raw_data(pData), raw_len(data_len)
- {
- this->read(pData, data_len);
- }
- StockRes::~StockRes()
- {
- if(dataNew) delete[] data;
- }
- uint StockRes::read(const uchar* pData, ulong data_len)
- {
- raw_data = current = data = pData;
- dataNew = false;
- ulong len1 = get_packet_len();
- len1 = len1 < data_len ? len1 : data_len;
- raw_len = left = len = len1;
- ulong len2 = *(ushort*)(pData + 14);
- skip_byte(16 - 4); // this time, the data and len are re-adjust !! // reserve the len2 for umcompress_if
- umcompress_if();
- if(len != len2)
- {
- throw StockResWrongData(8);
- }
- return len1;
- }
- ulong StockRes::buff_left()
- {
- return left;
- }
- bool StockRes::end()
- {
- return 0 == left;
- }
- void StockRes::reset()
- {
- current = data;
- left = len;
- }
- void StockRes::skip_byte(ulong count)
- {
- if(count > left)
- throw StockResWrongData(1);
- current += count;
- left -= count;
- }
- void StockRes::skip_data(ulong count)
- {
- while(count --)
- {
- if(current[0] < 0x80)
- skip_byte(1);
- else if(current[1] < 0x80)
- skip_byte(2);
- else
- skip_byte(3);
- }
- }
- char StockRes::get_char()
- {
- char v = (*current);
- skip_byte(1);
- return v;
- }
- uchar StockRes::get_uchar()
- {
- uchar v = (uchar)(*current);
- skip_byte(1);
- return v;
- }
- short StockRes::get_short()
- {
- short v = *(short*)current;
- skip_byte(2);
- return v;
- }
- ushort StockRes::get_ushort()
- {
- ushort v = *(ushort*)current;
- skip_byte(2);
- return v;
- }
- int StockRes::get_int()
- {
- int v = *(int*)current;
- skip_byte(4);
- return v;
- }
- uint StockRes::get_uint()
- {
- uint v = *(uint*)current;
- skip_byte(4);
- return v;
- }
- float StockRes::get_float()
- {
- float v = *(float*)current;
- BOOST_STATIC_ASSERT(sizeof(float) == 4);
- skip_byte(4);
- return v;
- }
-
- int StockRes::parse_data()
- {
- if((current[0] >= 0x40 && current[0] < 0x80) || current[0] >= 0xc0) // negative
- {
- return 0x40 - parse_data2();
- }
- else
- return parse_data2();
- }
- int StockRes::parse_data2() // we don't know this is correct yet
- {
- // 8f ff ff ff 1f == -49
- // bd ff ff ff 1f == -3
- // b0 fe ff ff 1f == -80
- // 8c 01 == 76
- // a8 fb b6 01 == 1017 万
- // a3 8e 11 == 14.02 万
- // 82 27 == 2498
- int v;
- int nBytes = 0;
- while(current[nBytes] >= 0x80)
- {
- ++nBytes;
- }
- ++nBytes;
- switch(nBytes)
- {
- case 1:
- v = current[0];
- break;
- case 2:
- v = current[1] * 0x40 + current[0] - 0x80;
- break;
- case 3:
- v = (current[2] * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
- break;
- case 4:
- v = ((current[3] * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
- break;
- case 5:
- // over flow, positive to negative
- v = (((current[4] * 0x80 + current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80)* 0x40 + current[0] - 0x80;
- break;
- case 6:
- // over flow, positive to negative
- v = ((((current[5] * 0x80 + current[4] -0x80) * 0x80 + current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
- break;
- default:
- throw StockResWrongData(10);
- }
- skip_byte(nBytes);
- return v;
- }
- // if the data is compressed, then uncompressed them; Or else do nothing
- void StockRes::umcompress_if()
- {
- len = *(ushort*)(current);
- ulong new_len = *(ushort*)(current + 2);
- ulong new_len2 = new_len;
- skip_byte(4); // now the current moved to the real or compressed data
- if(0x78 == current[0] && 0x9c == (uchar)current[1])
- {
- uchar* new_data = new uchar[new_len];
- if(0 == uncompress((Byte*)new_data, &new_len, (Byte*)current, len) && new_len == new_len2)
- {
- dataNew = true;
- current = data = new_data;
- left = len = new_len;
- }
- else
- {
- delete[] new_data;
- throw StockResWrongData(2);
- }
- }
- else
- {
- data += 16;
- current = data;
- }
- }
- void StockHeartBeatRes ::operator()(){}
-
- ushort StockRes::get_packet_len()
- {
- return get_packet_len(raw_data);
- }
- ushort StockRes::get_packet_len(const uchar* pData)
- {
- return (*(ushort*)(pData + 12)) + 16;
- }
- uint StockRes::get_seq_id()
- {
- return get_seq_id(raw_data);
- }
- uint StockRes::get_seq_id(const uchar* pData)
- {
- return *(uint*)(pData + 5);
- }
- ushort StockRes::get_cmd_id()
- {
- return get_cmd_id(raw_data);
- }
- ushort StockRes::get_cmd_id(const uchar* pData)
- {
- return *(ushort*)(pData + 10);
- }
- StockListRes::StockListRes():StockRes()
- {}
- StockListRes::StockListRes(uchar* pData, ulong data_len):StockRes(pData, data_len)
- {
- // assert(CMD_STOCK_LIST == get_cmd_id());
- }
- void StockListRes::operator()()
- {
- posix_time::ptime now(posix_time::second_clock::local_time());
- posix_time::time_duration td(now.time_of_day());
- // uchar market = *(data + 4); // not market, but market location that is no use
- string code1((char*)(data + 5), MarketInfo::StocksCodeLen);
- short market = MarketInfo::get_market_type(code1);
- if(market >= MarketInfo::MARKET_UNKNOWN)
- {
- throw StockResWrongData(9);
- }
- MarketInfo::stocks_count[market] = get_ushort();
-
- StockBid::Bid bid; // zero it.
- ushort count = get_ushort();
- while(count--)
- {
- std::string code((char*)(current + 1), MarketInfo::StocksCodeLen);
- // Set up the lists of all stocks.
- if(MarketInfo::stocks_set[market].size() < MarketInfo::stocks_count[market]
- && market == MarketInfo::get_market_type(code)
- && (!(market == MarketInfo::MARKET_INDEX && code[0] < '3')))
- {
- MarketInfo::stocks_set[market].insert(code);
- }
- else
- {
- continue;
- }
- skip_byte(MarketInfo::StocksCodeLen + 1);
- bid.minute = (uint)(td.hours() * 60 + td.minutes());
- if(bid.minute > 15 * 60)
- bid.minute = 15 * 60;
- else if(11 * 60 + 30 < bid.minute && bid.minute < 13 * 60)
- bid.minute = 11 * 60 + 30;
- bid.act = get_ushort();
- bid.price = parse_data();
- bid.y_close = parse_data() + bid.price;
- bid.open = parse_data() + bid.price;
- bid.high = parse_data() + bid.price;
- bid.low = parse_data() + bid.price;
- bid.buy = parse_data() + bid.price;
- bid.sell = parse_data() + bid.price;
- bid.total_vol = parse_data2();
- bid.avail_vol = parse_data2();
- skip_byte(4);
- bid.inner_vol = parse_data2();
- bid.outer_vol = parse_data2();
- bid.updown = parse_data();
- skip_data(1);
- bid.buy_price1 = parse_data() + bid.price;
- bid.sell_price1 = parse_data() + bid.price;
- bid.buy_vol1 = parse_data2();
- bid.sell_vol1 = parse_data2();
- bid.buy_price2 = parse_data() + bid.price;
- bid.sell_price2 = parse_data() + bid.price;
- bid.buy_vol2 = parse_data2();
- bid.sell_vol2 = parse_data2();
- bid.buy_price3 = parse_data() + bid.price;
- bid.sell_price3 = parse_data() + bid.price;
- bid.buy_vol3 = parse_data2();
- bid.sell_vol3 = parse_data2();
- bid.buy_price4 = parse_data() + bid.price;
- bid.sell_price4 = parse_data() + bid.price;
- bid.buy_vol4 = parse_data2();
- bid.sell_vol4 = parse_data2();
- bid.buy_price5 = parse_data() + bid.price;
- bid.sell_price5 = parse_data() + bid.price;
- bid.buy_vol5 = parse_data2();
- bid.sell_vol5 = parse_data2();
- recursive_mutex::scoped_lock guard(bid_mutex);
- if(!(market == MarketInfo::MARKET_INDEX && code[0] < '3'))
- instant_price_list[code][date_to_uint(gregorian::day_clock::local_day())].push_back(bid);
- // find next item.
- if(0 != count)
- {
- skip_byte(8);
- const uchar* p = 0;
- if(0 != (p = stock_code_search(current, 15)))
- {
- skip_byte(p - current - 1);
- }
- else
- {
- throw StockResWrongData(3);
- }
- }
- }
- return;
- }
- StockHoldChgRes::StockHoldChgRes():StockRes()
- {}
- StockHoldChgRes::StockHoldChgRes(uchar* pData, ulong data_len):StockRes(pData, data_len)
- {
- // assert(CMD_STOCKHOLD_CHANGE == get_cmd_id());
- }
- void StockHoldChgRes::operator()()
- {
- GBBQ gbbq;
- ushort stock_count = get_ushort();
- while(stock_count -- > 0)
- {
- skip_byte(1);
- if(!stockcode_is_valid(current))
- {
- throw StockResWrongData(4);
- }
- string s((char*)current, MarketInfo::StocksCodeLen);
- int uints = atoi(s.c_str());
- skip_byte(MarketInfo::StocksCodeLen);
- ushort record_count = get_ushort();
- while( record_count -- > 0)
- {
- skip_byte(MarketInfo::StocksCodeLen + 2);
- uint dt = get_uint();
- gbbq.code = s;
- gbbq.chg_type = get_uchar();
- gbbq.data.gb.old_cir = get_float();
- gbbq.data.gb.old_ttl = get_float();
- gbbq.data.gb.new_cir = get_float();
- gbbq.data.gb.new_ttl = get_float();
- stock_basic_info::Instance().add_stock_gbbq(dt, gbbq);
- }
- }
- }
- StockResWrongData::StockResWrongData( int i) : n_(i)
- {}
- const char * StockResWrongData::what() const throw()
- {
- return "StockResWrongData";
- }
- void ProcessResponsePacket(const uchar * pData , size_t len, bool test)
- {
- static uint packet_len = 0;
- static uint packet_curr_pos = 0;
- static uchar packet_buff[TCP_BUFSIZE_READ];
- recursive_mutex::scoped_lock guard(process_packet_mutex);
- // cout << hex<< (int)pData[0] << (int)pData[1] << (int)pData[2] << (int)pData[3];
- // cout << " len= " << len <<endl;
- if(test)
- {
- packet_len = len;
- }
- else
- {
- // application layer assembly 应用层组包
- if(pData[0] == 0xb1 && pData[1] == 0xcb && pData[2] == 0x74 && pData[3] == 0x00)
- {
- packet_curr_pos = 0;
- packet_len = StockRes::get_packet_len(pData);
- // cout << "packet_len= " << packet_len << endl;
- }
- }
- if(packet_curr_pos + len <= packet_len)
- {
- memcpy(&packet_buff[0] + packet_curr_pos, pData, len);
- packet_curr_pos += len;
- }
- else
- {
- return;
- }
- if(packet_curr_pos == packet_len)
- {
- // 组包完成, 调整指针
- pData = &packet_buff[0];
- len = packet_len;
- packet_len = packet_curr_pos = 0;
- }
- else
- {
- return;
- }
-
- const uchar *orig_ptr = pData; uint orig_len = len;
- int cmd_id = StockRes::get_cmd_id(pData);
- try{
- boost::scoped_ptr <StockRes> pCmd(ResponseFactory::Instance().CreateObject(cmd_id));
- uint processed;
- while(len > 0)
- {
- processed = pCmd->read(pData, len);
- (*pCmd)();
- Request::res_seq_id(StockRes::get_seq_id(pData));
- len -= processed;
- pData += processed;
- }
- }
- catch(StockResWrongData& e)
- {
- if(!test)
- {
- log_packet((const char*)orig_ptr, orig_len);
- }
- cout << "Exception Id = " << e.n_ << endl;
- }
- catch(...)
- {
- cout << "Command" << cmd_id << " not implemented";
- }
- packet_len = packet_curr_pos = 0;
- }
-
- StockRes* CreateStockListRes()
- {
- return new StockListRes;
- }
- StockRes* CreateStockHoldChgRes()
- {
- return new StockHoldChgRes;
- }
-
- StockRes* CreateStockHeartBeatRes()
- {
- return new StockHeartBeatRes;
- }
-
- }