response.cpp
上传用户:egreat
上传日期:2007-07-13
资源大小:29k
文件大小:12k
源码类别:

金融证券系统

开发平台:

Visual C++

  1. #include "config.h"
  2. #include <string>
  3. #include <vector>
  4. #include <set>
  5. #include <map>
  6. #include <iostream>
  7. #include <boost/thread/detail/config.hpp>
  8. #include <boost/thread/thread.hpp>
  9. #include <boost/thread/recursive_mutex.hpp>
  10. #include <boost/date_time/gregorian/gregorian.hpp>
  11. #include <boost/date_time/posix_time/posix_time_types.hpp>
  12. // #include <boost/smart_assert/assert.hpp>
  13. #include <boost/static_assert.hpp>
  14. #include <loki/Factory.h>
  15. #include "../zlib/zlib.h"
  16. #include "request.h"
  17. #include "response.h"
  18. #include "stock.h"
  19. #include "util.hpp"
  20. namespace StockMarket{
  21. recursive_mutex res_fact_mutex;
  22. recursive_mutex process_packet_mutex;
  23. StockRes::StockRes():raw_data(0), raw_len(0), data(0),len(0),current(0),left(0),dataNew(false)
  24. {}
  25. StockRes::StockRes(uchar* pData, ulong data_len) : raw_data(pData), raw_len(data_len)
  26. {
  27. this->read(pData, data_len);
  28. }
  29. StockRes::~StockRes()
  30. {
  31. if(dataNew) delete[] data;
  32. }
  33. uint StockRes::read(const uchar* pData, ulong data_len)
  34. {
  35. raw_data = current = data = pData;
  36. dataNew = false;
  37. ulong len1 = get_packet_len();
  38. len1 = len1 < data_len ? len1 : data_len;
  39. raw_len = left = len = len1;
  40. ulong len2 = *(ushort*)(pData + 14);
  41. skip_byte(16 - 4); // this time, the data and len are re-adjust !! // reserve the len2 for umcompress_if
  42. umcompress_if();
  43. if(len != len2)
  44. {
  45. throw StockResWrongData(8);
  46. }
  47. return len1;
  48. }
  49. ulong StockRes::buff_left()
  50. {
  51. return left;
  52. }
  53. bool StockRes::end()
  54. {
  55. return 0 == left;
  56. }
  57. void StockRes::reset()
  58. {
  59. current = data;
  60. left = len;
  61. }
  62. void StockRes::skip_byte(ulong count)
  63. {
  64. if(count > left) 
  65. throw StockResWrongData(1);
  66. current += count;
  67. left -= count;
  68. }
  69. void StockRes::skip_data(ulong count)
  70. {
  71. while(count --)
  72. {
  73. if(current[0] < 0x80) 
  74. skip_byte(1);
  75. else if(current[1] < 0x80)
  76. skip_byte(2);
  77. else
  78. skip_byte(3);
  79. }
  80. }
  81. char StockRes::get_char()
  82. {
  83. char v = (*current);
  84. skip_byte(1);
  85. return v;
  86. }
  87. uchar StockRes::get_uchar()
  88. {
  89. uchar v = (uchar)(*current);
  90. skip_byte(1);
  91. return v;
  92. }
  93. short StockRes::get_short()
  94. {
  95. short v = *(short*)current;
  96. skip_byte(2);
  97. return v;
  98. }
  99. ushort StockRes::get_ushort()
  100. {
  101. ushort v = *(ushort*)current;
  102. skip_byte(2);
  103. return v;
  104. }
  105. int StockRes::get_int()
  106. {
  107. int v = *(int*)current;
  108. skip_byte(4);
  109. return v;
  110. }
  111. uint StockRes::get_uint()
  112. {
  113. uint v = *(uint*)current;
  114. skip_byte(4);
  115. return v;
  116. }
  117. float StockRes::get_float()
  118. {
  119. float v = *(float*)current;
  120. BOOST_STATIC_ASSERT(sizeof(float) == 4);
  121. skip_byte(4);
  122. return v;
  123. }
  124. int StockRes::parse_data()
  125. {
  126. if((current[0] >= 0x40 && current[0] < 0x80)  || current[0] >= 0xc0) // negative
  127. {
  128. return 0x40 - parse_data2();
  129. }
  130. else
  131. return parse_data2();
  132. }
  133. int StockRes::parse_data2() // we don't know this is correct yet
  134. {
  135. // 8f ff ff ff 1f == -49
  136. // bd ff ff ff 1f == -3
  137. // b0 fe ff ff 1f == -80
  138. // 8c 01  == 76
  139. // a8 fb b6 01 == 1017 万
  140. // a3 8e 11    == 14.02 万
  141. // 82 27         == 2498
  142. int v;
  143. int nBytes = 0;
  144. while(current[nBytes] >= 0x80)
  145. {
  146. ++nBytes;
  147. }
  148. ++nBytes;
  149. switch(nBytes)
  150. {
  151. case 1:
  152. v = current[0];
  153. break;
  154. case 2:
  155. v = current[1] * 0x40 + current[0] - 0x80;
  156. break;
  157. case 3:
  158. v = (current[2] * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
  159. break;
  160. case 4:
  161. v = ((current[3] * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
  162. break;
  163. case 5:
  164. // over flow, positive to negative
  165. v = (((current[4] * 0x80 + current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80)* 0x40 + current[0] - 0x80;
  166. break;
  167. case 6:
  168. // over flow, positive to negative
  169. v = ((((current[5] * 0x80 + current[4] -0x80) * 0x80 +  current[3] - 0x80) * 0x80 + current[2] - 0x80) * 0x80 + current[1] - 0x80) * 0x40 + current[0] - 0x80;
  170. break;
  171. default:
  172. throw StockResWrongData(10);
  173. }
  174. skip_byte(nBytes);
  175. return v;
  176. }
  177. // if the data is compressed, then uncompressed them; Or else do nothing
  178. void StockRes::umcompress_if()
  179. {
  180. len = *(ushort*)(current);
  181. ulong new_len = *(ushort*)(current + 2);
  182. ulong new_len2 = new_len;
  183. skip_byte(4); // now the current moved to the real or compressed data
  184. if(0x78 == current[0] && 0x9c == (uchar)current[1])
  185. {
  186. uchar* new_data = new uchar[new_len];
  187. if(0 == uncompress((Byte*)new_data, &new_len, (Byte*)current, len) && new_len == new_len2)
  188. {
  189. dataNew = true;
  190. current = data = new_data;
  191. left = len = new_len;
  192. }
  193. else
  194. {
  195. delete[] new_data;
  196. throw StockResWrongData(2);
  197. }
  198. }
  199. else
  200. {
  201. data += 16;
  202. current = data;
  203. }
  204. }
  205. void StockHeartBeatRes ::operator()(){}
  206. ushort StockRes::get_packet_len()
  207. {
  208. return get_packet_len(raw_data);
  209. }
  210. ushort StockRes::get_packet_len(const uchar* pData)
  211. {
  212. return (*(ushort*)(pData + 12)) + 16;
  213. }
  214. uint StockRes::get_seq_id()
  215. {
  216. return get_seq_id(raw_data);
  217. }
  218. uint StockRes::get_seq_id(const uchar* pData)
  219. {
  220. return *(uint*)(pData + 5);
  221. }
  222. ushort StockRes::get_cmd_id()
  223. {
  224. return get_cmd_id(raw_data);
  225. }
  226. ushort StockRes::get_cmd_id(const uchar* pData)
  227. {
  228. return *(ushort*)(pData + 10);
  229. }
  230. StockListRes::StockListRes():StockRes()
  231. {}
  232. StockListRes::StockListRes(uchar* pData, ulong data_len):StockRes(pData, data_len)
  233. {
  234. // assert(CMD_STOCK_LIST == get_cmd_id());
  235. }
  236. void StockListRes::operator()()
  237. {
  238. posix_time::ptime now(posix_time::second_clock::local_time());
  239. posix_time::time_duration td(now.time_of_day());
  240. // uchar market = *(data + 4); // not market, but market location that is no use
  241. string code1((char*)(data + 5), MarketInfo::StocksCodeLen);
  242. short market = MarketInfo::get_market_type(code1);
  243. if(market >= MarketInfo::MARKET_UNKNOWN)
  244. {
  245. throw StockResWrongData(9);
  246. }
  247. MarketInfo::stocks_count[market] = get_ushort();
  248. StockBid::Bid bid; // zero it.
  249. ushort count = get_ushort();
  250. while(count--)
  251. {
  252. std::string code((char*)(current + 1), MarketInfo::StocksCodeLen);
  253. // Set up the lists of all stocks.
  254. if(MarketInfo::stocks_set[market].size() < MarketInfo::stocks_count[market]
  255. && market == MarketInfo::get_market_type(code)
  256. && (!(market == MarketInfo::MARKET_INDEX && code[0] < '3')))
  257. {
  258. MarketInfo::stocks_set[market].insert(code);
  259. }
  260. else
  261. {
  262. continue;
  263. }
  264. skip_byte(MarketInfo::StocksCodeLen + 1);
  265. bid.minute = (uint)(td.hours() * 60 + td.minutes());
  266. if(bid.minute > 15 * 60) 
  267. bid.minute = 15 * 60;
  268. else if(11 * 60 + 30 < bid.minute && bid.minute < 13 * 60)
  269. bid.minute = 11 * 60 + 30;
  270. bid.act = get_ushort();
  271. bid.price = parse_data();
  272. bid.y_close = parse_data() + bid.price;
  273. bid.open    = parse_data() + bid.price;
  274. bid.high    = parse_data() + bid.price;
  275. bid.low    = parse_data() + bid.price;
  276. bid.buy    = parse_data() + bid.price;
  277. bid.sell    = parse_data() + bid.price;
  278. bid.total_vol = parse_data2();
  279. bid.avail_vol = parse_data2();
  280. skip_byte(4);
  281. bid.inner_vol = parse_data2();
  282. bid.outer_vol = parse_data2();
  283. bid.updown   = parse_data();
  284. skip_data(1);
  285. bid.buy_price1 = parse_data() + bid.price;
  286. bid.sell_price1 = parse_data() + bid.price;
  287. bid.buy_vol1 =   parse_data2();
  288. bid.sell_vol1 =   parse_data2();
  289. bid.buy_price2 = parse_data() + bid.price;
  290. bid.sell_price2 = parse_data() + bid.price;
  291. bid.buy_vol2 =   parse_data2();
  292. bid.sell_vol2 =   parse_data2();
  293. bid.buy_price3 = parse_data() + bid.price;
  294. bid.sell_price3 = parse_data() + bid.price;
  295. bid.buy_vol3 =   parse_data2();
  296. bid.sell_vol3 =   parse_data2();
  297. bid.buy_price4 = parse_data() + bid.price;
  298. bid.sell_price4 = parse_data() + bid.price;
  299. bid.buy_vol4 =   parse_data2();
  300. bid.sell_vol4 =   parse_data2();
  301. bid.buy_price5 = parse_data() + bid.price;
  302. bid.sell_price5 = parse_data() + bid.price;
  303. bid.buy_vol5 =   parse_data2();
  304. bid.sell_vol5 =   parse_data2();
  305. recursive_mutex::scoped_lock guard(bid_mutex);
  306. if(!(market == MarketInfo::MARKET_INDEX && code[0] < '3'))
  307. instant_price_list[code][date_to_uint(gregorian::day_clock::local_day())].push_back(bid);
  308. // find next item.
  309. if(0 != count)
  310. {
  311. skip_byte(8);
  312. const uchar* p = 0;
  313. if(0 != (p = stock_code_search(current, 15)))
  314. {
  315. skip_byte(p - current - 1);
  316. }
  317. else
  318. {
  319. throw StockResWrongData(3);
  320. }
  321. }
  322. }
  323. return;
  324. }
  325. StockHoldChgRes::StockHoldChgRes():StockRes()
  326. {}
  327. StockHoldChgRes::StockHoldChgRes(uchar* pData, ulong data_len):StockRes(pData, data_len)
  328. {
  329. // assert(CMD_STOCKHOLD_CHANGE == get_cmd_id());
  330. }
  331. void StockHoldChgRes::operator()()
  332. {
  333. GBBQ gbbq;
  334. ushort stock_count = get_ushort();
  335. while(stock_count -- > 0)
  336. {
  337. skip_byte(1);
  338. if(!stockcode_is_valid(current))
  339. {
  340. throw StockResWrongData(4);
  341. }
  342. string s((char*)current, MarketInfo::StocksCodeLen);
  343. int uints = atoi(s.c_str());
  344. skip_byte(MarketInfo::StocksCodeLen);
  345. ushort record_count = get_ushort();
  346. while( record_count -- > 0)
  347. {
  348. skip_byte(MarketInfo::StocksCodeLen + 2);
  349. uint dt = get_uint();
  350. gbbq.code = s;
  351. gbbq.chg_type = get_uchar();
  352. gbbq.data.gb.old_cir = get_float();
  353. gbbq.data.gb.old_ttl = get_float();
  354. gbbq.data.gb.new_cir = get_float();
  355. gbbq.data.gb.new_ttl = get_float();
  356. stock_basic_info::Instance().add_stock_gbbq(dt, gbbq);
  357. }
  358. }
  359. }
  360. StockResWrongData::StockResWrongData( int i) : n_(i)
  361. {}
  362. const char * StockResWrongData::what() const throw()
  363. {
  364. return "StockResWrongData";
  365. }
  366. void ProcessResponsePacket(const uchar * pData , size_t len, bool test)
  367. {
  368. static uint packet_len = 0;
  369. static uint packet_curr_pos = 0;
  370. static uchar packet_buff[TCP_BUFSIZE_READ];
  371. recursive_mutex::scoped_lock guard(process_packet_mutex);
  372. // cout << hex<< (int)pData[0] << (int)pData[1] << (int)pData[2] << (int)pData[3];
  373. // cout << " len= " << len <<endl;
  374. if(test)
  375. {
  376. packet_len = len;
  377. }
  378. else
  379. {
  380. // application layer assembly 应用层组包
  381. if(pData[0] == 0xb1 && pData[1] == 0xcb && pData[2] == 0x74 && pData[3] == 0x00)
  382. {
  383. packet_curr_pos = 0;
  384. packet_len = StockRes::get_packet_len(pData);
  385. // cout  << "packet_len= " << packet_len << endl;
  386. }
  387. }
  388. if(packet_curr_pos + len <= packet_len)
  389. {
  390. memcpy(&packet_buff[0] + packet_curr_pos, pData, len);
  391. packet_curr_pos += len;
  392. }
  393. else
  394. {
  395. return;
  396. }
  397. if(packet_curr_pos == packet_len)
  398. {
  399. // 组包完成, 调整指针
  400. pData = &packet_buff[0];
  401. len = packet_len;
  402. packet_len = packet_curr_pos = 0;
  403. }
  404. else
  405. {
  406. return;
  407. }
  408. const uchar *orig_ptr = pData; uint orig_len = len;
  409. int cmd_id = StockRes::get_cmd_id(pData);
  410. try{
  411. boost::scoped_ptr <StockRes> pCmd(ResponseFactory::Instance().CreateObject(cmd_id));
  412. uint processed;
  413. while(len > 0)
  414. {
  415. processed = pCmd->read(pData, len);
  416. (*pCmd)();
  417. Request::res_seq_id(StockRes::get_seq_id(pData));
  418. len -= processed;
  419. pData += processed;
  420. }
  421. }
  422. catch(StockResWrongData& e)
  423. {
  424. if(!test)
  425. {
  426. log_packet((const char*)orig_ptr, orig_len);
  427. }
  428. cout << "Exception Id = " << e.n_ << endl;
  429. }
  430. catch(...)
  431. {
  432. cout << "Command" << cmd_id << " not implemented";
  433. }
  434. packet_len = packet_curr_pos = 0;
  435. }
  436. StockRes* CreateStockListRes()
  437. {
  438. return new StockListRes;
  439. }
  440. StockRes* CreateStockHoldChgRes()
  441. {
  442. return new StockHoldChgRes;
  443. }
  444. StockRes* CreateStockHeartBeatRes()
  445. {
  446. return new StockHeartBeatRes;
  447. }
  448. }