zbuffer.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:17k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #include "xbuffer.h"
  2. #include <string.h>
  3. #include <stdlib.h>
  4. ZPerf::ZPerf() {
  5. packet_number = remain_number = 0;
  6. total_size = remain_size = 0;
  7. max_size = min_size = -1;
  8. max_interval = min_interval = -1;
  9. }
  10. void ZPerf::start() {
  11. #ifdef WIN32
  12. tick_count = ZTimer::now();
  13. #endif
  14. }
  15. void ZPerf::stop() {
  16. #ifdef WIN32
  17. tick_count = ZTimer::now() - tick_count;
  18. #endif
  19. }
  20. void ZPerf::perfPacket(int size) { //发送或者接收一个数据包
  21. if(size > max_size || max_size == -1)
  22. max_size = size;
  23. if(size < min_size || min_size == -1)
  24. min_size = size;
  25. total_size += size;
  26. #ifdef WIN32
  27. /* unsigned long interval = ZTimer::now();
  28. if(connection->packet_time == 0)
  29. connection->packet_time = interval;
  30. else {
  31. interval -= connection->packet_time;
  32. if(interval > max_interval || max_interval == -1)
  33. max_interval = (short)interval;
  34. if(interval < min_interval || min_interval == -1)
  35. min_interval = (short)interval;
  36. connection->packet_time += interval;
  37. }*/
  38. #endif
  39. }
  40. void ZPerf::useBuffer(int size)
  41. {
  42. remain_size += size;
  43. remain_number++;
  44. }
  45. void ZPerf::freeBuffer(int size)
  46. {
  47. remain_size -= size;
  48. remain_number--;
  49. }
  50. #ifdef ZBUFFER_H
  51. bool ZBuffer::getNode(int index, int size) {
  52. int data_size = size;
  53. if (size >= 64 * 1024)
  54. {
  55. printf("too large blockn");
  56. return false;
  57. }
  58. bool bDebug = false;
  59. size += sizeof(packet_info);
  60. unsigned long align = size % MINIMIZE_BLOCK_SIZE;
  61. if(align)
  62. size += MINIMIZE_BLOCK_SIZE - align;
  63. if(size > buffer_size)
  64. {
  65. printf("size too large... <%08X> %d > %dn", this, size, buffer_size);
  66. return false;
  67. }
  68. packet_info *next;
  69. int nDeadLock = 0;
  70. while(free_packet->size < size) { //删除后面的项目直到有足够的空间为止
  71. if(bDebug) {
  72. printf("free %d %dn", (char *)free_packet - buffer, free_packet->size);
  73. }
  74. next = (packet_info *)((char *)free_packet + free_packet->size); //下一个节点
  75. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  76. free_packet = (packet_info *)buffer;
  77. if (free_packet->state != STATE_FREE)
  78. {
  79. printf("free_packet at head not freen");
  80. while (free_packet->state != STATE_FREE)
  81. free_packet = (packet_info *)((char *)free_packet + free_packet->size);
  82. }
  83. printf("buffer <%08X> recycle...n", this);
  84. printf("free_packet %d size = %dn", (char *)free_packet - buffer, free_packet->size);
  85. continue;
  86. }
  87. else {
  88. if (next->state != STATE_FREE) { //需要删除最老的节点
  89. printf("node alloc failed....request size = %d.n", size);
  90. printf("skip for free node...n");
  91. while (next->state != STATE_FREE && nDeadLock < 1024)
  92. {
  93. next = (packet_info *)((char *)next + next->size);
  94. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  95. next = (packet_info *)buffer;
  96. printf("abnormal buffer <%08X> recycle...n", this);
  97. }
  98. nDeadLock++;
  99. }
  100. if(nDeadLock == 1024) {
  101. printf("Dead lockn");
  102. }
  103. if (next->state == STATE_FREE)
  104. {
  105. free_packet = next;
  106. printf("skip for free node success %d...n", (char *)free_packet - buffer);
  107. bDebug = true;
  108. continue;
  109. }
  110. /* printf("force recoveryn");
  111. int i;
  112. for (i = 0; i < max_connection; i++)
  113. {
  114. if (connections[i].state != CONNECTION_FREE &&
  115. connections[i].head_offset == (char*)next - buffer)
  116. {
  117. packet_info *packet = (packet_info *)(buffer + connections[i].head_offset);
  118. if (packet->next == -1)
  119. connections[i].head_offset = connections[i].tail_offset = -1;
  120. else
  121. connections[i].head_offset = packet->next;
  122. next->state = STATE_FREE;
  123. break;
  124. }
  125. }
  126. if (i >= max_connection)*/
  127. {
  128. printf("skip for free node failed...n");
  129. printf("node pos %d | %dn", (char*)free_packet - buffer, (char*)next - buffer);
  130. printf("status: total->%d | send-> %d / %d| recv-> %d / %dn", buffer_size, send_perf.remain_size, send_perf.remain_number, receive_perf.remain_size, receive_perf.remain_number);
  131. /* for (int i = 0; i < max_connection; i++)
  132. {
  133. if (connections[i].state != CONNECTION_FREE && connections[i].used_bufsize)
  134. {
  135. printf("    [%d] connection use %dn", i, connections[i].used_bufsize);
  136. }
  137. }*/
  138. return false;
  139. }
  140. }
  141. free_packet->size += next->size;
  142. free_packet->next = -1;
  143. }
  144. }
  145. next = (packet_info *)((char *)free_packet + size);
  146. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size)
  147. {
  148. printf("buffer <%08X> recycle...n", this);
  149. next = (packet_info *)buffer;
  150. nDeadLock = 0;
  151. if (next->state != STATE_FREE)
  152. {
  153. printf("recycle failed....n");
  154. printf("skip for free node...n");
  155. while (next->state != STATE_FREE && nDeadLock < 1024)
  156. {
  157. next = (packet_info *)((char *)next + next->size);
  158. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  159. next = (packet_info *)buffer;
  160. printf("abnormal buffer <%08X> recycle...n", this);
  161. }
  162. nDeadLock++;
  163. }
  164. if(nDeadLock == 1024) {
  165. printf("Dead lockn");
  166. }
  167. if (next->state != STATE_FREE)
  168. {
  169. printf("skip for free node failed...n");
  170. return false;
  171. }
  172. else {
  173. printf("skip for free node success...n");
  174. }
  175. }
  176. }
  177. else if (free_packet->size > size) { //切分当前空闲块为占用和非占用两块
  178. next->size = free_packet->size - size;
  179. next->data_size = 0;
  180. next->current_size = 0;
  181. next->state = STATE_FREE;
  182. next->next = -1;
  183. }
  184. free_packet->size = size;
  185. free_packet->data_size = data_size;
  186. free_packet->current_size = 0;
  187. free_packet->next = -1;
  188. free_packet->state = STATE_IDLE;
  189. connection_info *current = &connections[index];
  190. if(current->head_offset == -1) { //没有数据
  191. current->head_offset = current->tail_offset = (char *)free_packet - buffer;
  192. }
  193. else {
  194. packet_info *tail = (packet_info *)(buffer + current->tail_offset);
  195. tail->next = (char *)free_packet - buffer;
  196. current->tail_offset = tail->next;
  197. }
  198. free_packet = next; //重新设置空闲块
  199. return true;
  200. }
  201. ZBuffer::ZBuffer(long size, int number) {
  202. printf("alloc..ZBuffer<%08X>..", this);
  203. buffer_size = max_connection = 0;
  204. connections = new connection_info[number];
  205. if(!connections)
  206. return;
  207. max_connection = number;
  208. size += sizeof(packet_info);
  209. unsigned long align = size % MINIMIZE_BLOCK_SIZE;
  210. if(align)
  211. size += MINIMIZE_BLOCK_SIZE - align;
  212. buffer = new char[size];
  213. if(!buffer) {
  214. delete[] connections;
  215. return;
  216. }
  217. buffer_size = size;
  218. printf("size = %dn", buffer_size);
  219. free_packet = (packet_info *)buffer;
  220. free_packet->size = buffer_size;
  221. free_packet->data_size = 0;
  222. free_packet->current_size = 0;
  223. free_packet->next = -1;
  224. bPerf = false;
  225. recv_buffers = new char*[max_connection];
  226. memset(recv_buffers, 0, sizeof(recv_buffers[0]) * max_connection);
  227. recv_buffer_size = new int[max_connection];
  228. memset(recv_buffer_size, 0, sizeof(recv_buffer_size[0]) * max_connection);
  229. }
  230. ZBuffer::~ZBuffer() {
  231. if(buffer_size) {
  232. delete[] buffer;
  233. delete[] connections;
  234. }
  235. if (recv_buffers)
  236. {
  237. for (int i = 0; i < max_connection; i++)
  238. {
  239. if (recv_buffers[i])
  240. delete recv_buffers[i];
  241. }
  242. delete recv_buffers;
  243. }
  244. if (recv_buffer_size)
  245. delete recv_buffer_size;
  246. }
  247. int ZBuffer::getConnection() {
  248. int index;
  249. for(index = 0; index < max_connection; index++) {
  250. if(connections[index].state == CONNECTION_FREE) {
  251. connections[index].state = CONNECTION_USED;
  252. return index;
  253. }
  254. }
  255. return -1;
  256. }
  257. long ZBuffer::getUsedBufferSize(int index)
  258. {
  259. return connections[index].used_bufsize;
  260. }
  261. void ZBuffer::clear(int index, bool bSendOrRecv) {
  262. mutex.lock();
  263. connection_info *current = &connections[index];
  264. if(current->head_offset >= 0 ) {
  265. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  266. while(packet) {
  267. packet->state = STATE_FREE;
  268. if (bPerf)
  269. {
  270. if (bSendOrRecv)
  271. send_perf.freeBuffer(packet->size, current);
  272. else
  273. receive_perf.freeBuffer(packet->size, current);
  274. }
  275. if(packet->next == -1) break;
  276. packet = (packet_info *)(buffer + packet->next);
  277. }
  278. current->head_offset = current->tail_offset = -1;
  279. }
  280. connections[index].reset();
  281. mutex.unlock();
  282. }
  283. bool ZBuffer::receiveData(int index, const char *data, int actsize) {
  284. //接收到一块数据,可能不是正好在边界上,可能有几个数据包
  285. //接收到一块数据,可能不是正好在边界上,可能有几个数据包
  286. int size = actsize;
  287. mutex.lock();
  288. //printf("receiving buffer %d - %dn", index, actsize);
  289. connection_info *current = &connections[index];
  290. const char *data_ptr = data;
  291. packet_info *packet = NULL;
  292. unsigned short packet_size = 0xFFFF;
  293. while (size > 0) {
  294. if (current->tail_offset >= 0) {
  295. packet = (packet_info *)(buffer + current->tail_offset);
  296. }
  297. //正好收到末尾一个字节的边界处理 [wxb 2003-8-5]
  298. if (1 == size &&
  299. (packet && packet->current_size == packet->data_size))
  300. {
  301. //ASSERT(current->halfword == 0);
  302. current->halfword = (((unsigned short)0x8000) | (unsigned short)(*((unsigned char*)data_ptr)));
  303. //printf("Half word received..[%d]..tail<%02X>n", index, *((unsigned char*)data_ptr));
  304. break;
  305. }
  306. if (current->head_offset == -1 || 
  307. (packet && packet->current_size == packet->data_size)) {
  308. //没有数据包或者已经是完整的数据包了,需要分配
  309. if (current->halfword)
  310. {
  311. //处理上一个包尾的单个字节 [wxb 2003-8-5]
  312. packet_size = (unsigned short)((current->halfword & 0x7FFF) | ((unsigned short)(*((unsigned char*)data_ptr))) << 8) - sizeof(unsigned short);
  313. //printf("Half word received..[%d]..head<%02X>...processing<%02X>...yes perfect<%d>n", index, *((unsigned char*)data_ptr), (current->halfword & 0x7FFF), packet_size);
  314. current->halfword = 0;
  315. data_ptr += 1;
  316. size -= 1;
  317. }
  318. else
  319. {
  320. packet_size = *(unsigned short *)data_ptr - sizeof(unsigned short);
  321. data_ptr += sizeof(unsigned short);
  322. size -= sizeof(unsigned short);
  323. }
  324. //printf("packet size = %dn", packet_size);
  325. if (!getNode(index, packet_size)) {
  326.   mutex.unlock();
  327. printf("getNode fail while receiveData()n");
  328. exit(0);
  329. return false;
  330. }
  331. packet = (packet_info *)(buffer + current->tail_offset); //重新获取尾部包的指针
  332. packet->state = STATE_RECEIVING; //正在接收数据
  333. //得到一个新的数据包了,进行性能分析
  334. if (bPerf) {
  335. receive_perf.perfPacket(packet_size, current);
  336. receive_perf.packet_number++;
  337. receive_perf.useBuffer(packet->size, current);
  338. }
  339. }
  340. int copy_size = packet->data_size - packet->current_size;
  341. if(copy_size > size)
  342. copy_size = size;
  343. if (copy_size < 0)
  344. {
  345. mutex.unlock();
  346. printf("copy_size negative %08X, %d, %d, %d. <?> %d, %d | pos = %dn", this, copy_size, packet->data_size, packet->current_size, actsize, data_ptr - data, (char*)packet - buffer);
  347. return false;
  348. }
  349. memcpy((char *)packet + sizeof(packet_info) + packet->current_size, data_ptr, copy_size);
  350. size -= copy_size;
  351. data_ptr += copy_size;
  352. packet->current_size += copy_size;
  353. if(packet->current_size == packet->data_size) {
  354. packet->state = STATE_WAITING;
  355. }
  356. }
  357. mutex.unlock();
  358. return true;
  359. }
  360. char *ZBuffer::getPacket(int index, int &size) { //获取一个完整的数据包
  361. mutex.lock();
  362. connection_info *current = &connections[index];
  363. if(current->head_offset == -1) {
  364. mutex.unlock();
  365. return NULL;
  366. }
  367. // flying comment here, just to make sure that this argument
  368. // is correct.
  369. if (index == -1)
  370. {
  371. printf("call getPacket with illegal index!n");
  372. mutex.unlock();
  373. return NULL;
  374. }
  375. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  376. char *result = (char *)packet + sizeof(packet_info);
  377. if(current->head_offset == current->tail_offset) { //只有一个数据包
  378. if(packet->current_size != packet->data_size) {
  379. mutex.unlock();
  380. return NULL; //不完整
  381. }
  382. current->head_offset = current->tail_offset = -1;
  383. }
  384. else current->head_offset = packet->next;
  385. size = packet->data_size;
  386. packet->current_size = 0;
  387. packet->data_size = 0;
  388. packet->state = STATE_FREE;
  389. if(bPerf)
  390. {
  391. receive_perf.freeBuffer(packet->size, current);
  392. }
  393. result = getRecvBuffer(index, size);
  394. memcpy(result, (char *)packet + sizeof(packet_info), size);
  395. mutex.unlock();
  396. return result;
  397. }
  398. //发送数据的接口分成两个函数,一个是获取当前发送的缓冲区和长度,另外一个是从缓冲区清除指定大小的数据
  399. //还有就是同时发送很多数据包的功能。
  400. char *ZBuffer::getSendData(int index, int &size) {
  401. connection_info *current = &connections[index];
  402. if(current->head_offset == -1) return NULL; //没有数据
  403. mutex.lock();
  404. char *result = NULL;
  405. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  406. if(packet->state == STATE_SENDING) {
  407. size = packet->data_size - packet->current_size;
  408. if(size == 0) {
  409. result = NULL;
  410. }
  411. else {
  412. result = (char *)packet + sizeof(packet_info) + packet->current_size;
  413. }
  414. }
  415. mutex.unlock();
  416. return result;
  417. }
  418. void ZBuffer::sendData(int index, int size) {
  419. //一定是获取发送缓冲区成功之后才调用,可以省掉一些错误处理
  420. connection_info *current = &connections[index];
  421. mutex.lock();
  422. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  423. if(packet->state == STATE_SENDING) {
  424. packet->current_size += size;
  425. if(packet->current_size == packet->data_size) { //一个数据包发送完成
  426. if(bPerf) {
  427. send_perf.freeBuffer(packet->size, current);
  428. send_perf.perfPacket(packet->data_size, current); //统计性能
  429. }
  430. packet->state = STATE_FREE;
  431. current->head_offset = packet->next;
  432. if(current->head_offset < 0) 
  433. current->tail_offset = -1;
  434. }
  435. }
  436. //printf("data sent: %d - %dn", index, size);
  437. mutex.unlock();
  438. }
  439. bool ZBuffer::packData(int index, const char *data, int size) { //发送一个数据包的数据
  440. mutex.lock();
  441. connection_info *current = &connections[index];
  442. packet_info *packet = NULL;
  443. if(current->tail_offset >= 0 ) packet = (packet_info *)(buffer + current->tail_offset);
  444. if(!packet || packet->state != STATE_PACKING) {
  445. //记录当前尾部的偏移量
  446. if(!getNode(index, MAX_PACKET_SIZE)) {
  447. mutex.unlock();
  448. printf("getNode fail while packData()n");
  449. exit(0);
  450. return false;
  451. }
  452. packet = (packet_info *)(buffer + current->tail_offset);
  453. packet->state = STATE_PACKING;
  454. packet->current_size += sizeof(unsigned short);
  455. if (bPerf)
  456. {
  457. send_perf.useBuffer(packet->size, current);
  458. }
  459. }
  460. if(size + packet->current_size + sizeof(packet_info) > packet->size) { //超出了一个块的大小
  461. //printf("packing size a bit large...%08X, %d,%d,%dn", this, size,packet->current_size,packet->size);
  462. packet_info *old_packet = packet;
  463. packet_info *previous = (packet_info *)(buffer + current->head_offset);
  464. unsigned long old_tail = current->tail_offset;
  465. while(previous) {
  466. if(current->tail_offset == (char *)previous - buffer || previous->next == (char *)old_packet - buffer) { //找到尾巴了
  467. if(current->tail_offset == (char *)previous - buffer)
  468. current->head_offset = current->tail_offset = -1;
  469. else
  470. current->tail_offset = (char *)previous - buffer;
  471. if(!getNode(index, old_packet->size + MAX_PACKET_SIZE)) { //增加大小
  472. mutex.unlock();
  473. printf("getNode fail while packData() 2n");
  474. exit(0);
  475. return false;
  476. }
  477. old_packet->state = STATE_FREE;
  478. packet = (packet_info *)(buffer + current->tail_offset);
  479. if (bPerf)
  480. {
  481. send_perf.useBuffer(packet->size, current);
  482. send_perf.freeBuffer(old_packet->size, current);
  483. }
  484. memcpy((char *)packet + sizeof(packet_info),
  485. (char *)old_packet + sizeof(packet_info), old_packet->current_size);
  486. packet->current_size = old_packet->current_size;
  487. packet->state = STATE_PACKING;
  488. break;
  489. }
  490. if(previous->next == -1) {
  491. mutex.unlock();
  492. return false;
  493. }
  494. previous = (packet_info *)(buffer + previous->next);
  495. }
  496. }
  497. char *data_ptr = (char *)packet + sizeof(packet_info) + packet->current_size;
  498. memcpy(data_ptr, data, size);
  499. packet->current_size += size;
  500. mutex.unlock();
  501. return true;
  502. }
  503. char *ZBuffer::sendPacket(int index, int &datalength) {
  504. char *result = NULL;
  505. mutex.lock();
  506. connection_info *current = &connections[index];
  507. if(current->tail_offset >= 0) {
  508. packet_info *packet = (packet_info *)(buffer + current->tail_offset);
  509. if(packet->current_size > 0 && packet->state != STATE_SENDING) {
  510. packet->data_size = packet->current_size;
  511. datalength = packet->current_size - sizeof(unsigned short);
  512. result = (char *)packet + sizeof(packet_info) + sizeof(unsigned short);
  513. *(unsigned short *)((char *)packet + sizeof(packet_info)) = packet->data_size;
  514. packet->state = STATE_SENDING;
  515. packet->current_size = 0;
  516. if(bPerf) {
  517. send_perf.packet_number++;
  518. }
  519. }
  520. }
  521. mutex.unlock();
  522. // if (datalength > 0)
  523. //printf("prepare to send %d - %dn", index, datalength);
  524. return result;
  525. }
  526. #endif