zbuffer.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:17k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #include <stdafx.h>
  2. #include "zbuffer.h"
  3. #ifdef ZBUFFER_H
  4. #include <string.h>
  5. #include <stdlib.h>
  6. ZPerf::ZPerf() {
  7. packet_number = remain_number = 0;
  8. total_size = remain_size = 0;
  9. max_size = min_size = -1;
  10. max_interval = min_interval = -1;
  11. }
  12. void ZPerf::start() {
  13. #ifdef WIN32
  14. tick_count = ZTimer::now();
  15. #endif
  16. }
  17. void ZPerf::stop() {
  18. #ifdef WIN32
  19. tick_count = ZTimer::now() - tick_count;
  20. #endif
  21. }
  22. void ZPerf::perfPacket(int size, connection_info *connection) { //发送或者接收一个数据包
  23. if(size > max_size || max_size == -1)
  24. max_size = size;
  25. if(size < min_size || min_size == -1)
  26. min_size = size;
  27. total_size += size;
  28. #ifdef WIN32
  29. unsigned long interval = ZTimer::now();
  30. if(connection->packet_time == 0)
  31. connection->packet_time = interval;
  32. else {
  33. interval -= connection->packet_time;
  34. if(interval > max_interval || max_interval == -1)
  35. max_interval = (short)interval;
  36. if(interval < min_interval || min_interval == -1)
  37. min_interval = (short)interval;
  38. connection->packet_time += interval;
  39. }
  40. #endif
  41. }
  42. void ZPerf::useBuffer(int size, connection_info *connection)
  43. {
  44. remain_size += size;
  45. remain_number++;
  46. connection->used_bufsize += size;
  47. }
  48. void ZPerf::freeBuffer(int size, connection_info *connection)
  49. {
  50. remain_size -= size;
  51. remain_number--;
  52. connection->used_bufsize -= size;
  53. }
  54. bool ZBuffer::getNode(int index, int size) {
  55. int data_size = size;
  56. if (size >= 64 * 1024)
  57. {
  58. printf("too large blockn");
  59. return false;
  60. }
  61. bool bDebug = false;
  62. size += sizeof(packet_info);
  63. unsigned long align = size % MINIMIZE_BLOCK_SIZE;
  64. if(align)
  65. size += MINIMIZE_BLOCK_SIZE - align;
  66. if(size > buffer_size)
  67. {
  68. printf("size too large... <%08X> %d > %dn", this, size, buffer_size);
  69. return false;
  70. }
  71. packet_info *next;
  72. int nDeadLock = 0;
  73. while(free_packet->size < size) { //删除后面的项目直到有足够的空间为止
  74. if(bDebug) {
  75. printf("free %d %dn", (char *)free_packet - buffer, free_packet->size);
  76. }
  77. next = (packet_info *)((char *)free_packet + free_packet->size); //下一个节点
  78. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  79. free_packet = (packet_info *)buffer;
  80. if (free_packet->state != STATE_FREE)
  81. {
  82. printf("free_packet at head not freen");
  83. while (free_packet->state != STATE_FREE)
  84. free_packet = (packet_info *)((char *)free_packet + free_packet->size);
  85. }
  86. printf("buffer <%08X> recycle...n", this);
  87. printf("free_packet %d size = %dn", (char *)free_packet - buffer, free_packet->size);
  88. continue;
  89. }
  90. else {
  91. if (next->state != STATE_FREE) { //需要删除最老的节点
  92. printf("node alloc failed....request size = %d.n", size);
  93. printf("skip for free node...n");
  94. while (next->state != STATE_FREE && nDeadLock < 1024)
  95. {
  96. next = (packet_info *)((char *)next + next->size);
  97. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  98. next = (packet_info *)buffer;
  99. printf("abnormal buffer <%08X> recycle...n", this);
  100. }
  101. nDeadLock++;
  102. }
  103. if(nDeadLock == 1024) {
  104. printf("Dead lockn");
  105. }
  106. if (next->state == STATE_FREE)
  107. {
  108. free_packet = next;
  109. printf("skip for free node success %d...n", (char *)free_packet - buffer);
  110. bDebug = true;
  111. continue;
  112. }
  113. /* printf("force recoveryn");
  114. int i;
  115. for (i = 0; i < max_connection; i++)
  116. {
  117. if (connections[i].state != CONNECTION_FREE &&
  118. connections[i].head_offset == (char*)next - buffer)
  119. {
  120. packet_info *packet = (packet_info *)(buffer + connections[i].head_offset);
  121. if (packet->next == -1)
  122. connections[i].head_offset = connections[i].tail_offset = -1;
  123. else
  124. connections[i].head_offset = packet->next;
  125. next->state = STATE_FREE;
  126. break;
  127. }
  128. }
  129. if (i >= max_connection)*/
  130. {
  131. printf("skip for free node failed...n");
  132. printf("node pos %d | %dn", (char*)free_packet - buffer, (char*)next - buffer);
  133. printf("status: total->%d | send-> %d / %d| recv-> %d / %dn", buffer_size, send_perf.remain_size, send_perf.remain_number, receive_perf.remain_size, receive_perf.remain_number);
  134. /* for (int i = 0; i < max_connection; i++)
  135. {
  136. if (connections[i].state != CONNECTION_FREE && connections[i].used_bufsize)
  137. {
  138. printf("    [%d] connection use %dn", i, connections[i].used_bufsize);
  139. }
  140. }*/
  141. return false;
  142. }
  143. }
  144. free_packet->size += next->size;
  145. free_packet->next = -1;
  146. }
  147. }
  148. next = (packet_info *)((char *)free_packet + size);
  149. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size)
  150. {
  151. printf("buffer <%08X> recycle...n", this);
  152. next = (packet_info *)buffer;
  153. nDeadLock = 0;
  154. if (next->state != STATE_FREE)
  155. {
  156. printf("recycle failed....n");
  157. printf("skip for free node...n");
  158. while (next->state != STATE_FREE && nDeadLock < 1024)
  159. {
  160. next = (packet_info *)((char *)next + next->size);
  161. if ((char *)next - buffer + sizeof(packet_info) >= buffer_size) { //已经到了结束的位置,回第一个
  162. next = (packet_info *)buffer;
  163. printf("abnormal buffer <%08X> recycle...n", this);
  164. }
  165. nDeadLock++;
  166. }
  167. if(nDeadLock == 1024) {
  168. printf("Dead lockn");
  169. }
  170. if (next->state != STATE_FREE)
  171. {
  172. printf("skip for free node failed...n");
  173. return false;
  174. }
  175. else {
  176. printf("skip for free node success...n");
  177. }
  178. }
  179. }
  180. else if (free_packet->size > size) { //切分当前空闲块为占用和非占用两块
  181. next->size = free_packet->size - size;
  182. next->data_size = 0;
  183. next->current_size = 0;
  184. next->state = STATE_FREE;
  185. next->next = -1;
  186. }
  187. free_packet->size = size;
  188. free_packet->data_size = data_size;
  189. free_packet->current_size = 0;
  190. free_packet->next = -1;
  191. free_packet->state = STATE_IDLE;
  192. connection_info *current = &connections[index];
  193. if(current->head_offset == -1) { //没有数据
  194. current->head_offset = current->tail_offset = (char *)free_packet - buffer;
  195. }
  196. else {
  197. packet_info *tail = (packet_info *)(buffer + current->tail_offset);
  198. tail->next = (char *)free_packet - buffer;
  199. current->tail_offset = tail->next;
  200. }
  201. free_packet = next; //重新设置空闲块
  202. return true;
  203. }
  204. ZBuffer::ZBuffer(long size, int number) {
  205. printf("alloc..ZBuffer<%08X>..", this);
  206. buffer_size = max_connection = 0;
  207. connections = new connection_info[number];
  208. if(!connections)
  209. return;
  210. max_connection = number;
  211. size += sizeof(packet_info);
  212. unsigned long align = size % MINIMIZE_BLOCK_SIZE;
  213. if(align)
  214. size += MINIMIZE_BLOCK_SIZE - align;
  215. buffer = new char[size];
  216. if(!buffer) {
  217. delete[] connections;
  218. return;
  219. }
  220. buffer_size = size;
  221. printf("size = %dn", buffer_size);
  222. free_packet = (packet_info *)buffer;
  223. free_packet->size = buffer_size;
  224. free_packet->data_size = 0;
  225. free_packet->current_size = 0;
  226. free_packet->next = -1;
  227. bPerf = false;
  228. recv_buffers = new char*[max_connection];
  229. memset(recv_buffers, 0, sizeof(recv_buffers[0]) * max_connection);
  230. recv_buffer_size = new int[max_connection];
  231. memset(recv_buffer_size, 0, sizeof(recv_buffer_size[0]) * max_connection);
  232. }
  233. ZBuffer::~ZBuffer() {
  234. if(buffer_size) {
  235. delete[] buffer;
  236. delete[] connections;
  237. }
  238. if (recv_buffers)
  239. {
  240. for (int i = 0; i < max_connection; i++)
  241. {
  242. if (recv_buffers[i])
  243. delete recv_buffers[i];
  244. }
  245. delete recv_buffers;
  246. }
  247. if (recv_buffer_size)
  248. delete recv_buffer_size;
  249. }
  250. int ZBuffer::getConnection() {
  251. int index;
  252. for(index = 0; index < max_connection; index++) {
  253. if(connections[index].state == CONNECTION_FREE) {
  254. connections[index].state = CONNECTION_USED;
  255. return index;
  256. }
  257. }
  258. return -1;
  259. }
  260. long ZBuffer::getUsedBufferSize(int index)
  261. {
  262. return connections[index].used_bufsize;
  263. }
  264. void ZBuffer::clear(int index, bool bSendOrRecv) {
  265. mutex.lock();
  266. connection_info *current = &connections[index];
  267. if(current->head_offset >= 0 ) {
  268. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  269. while(packet) {
  270. packet->state = STATE_FREE;
  271. if (bPerf)
  272. {
  273. if (bSendOrRecv)
  274. send_perf.freeBuffer(packet->size, current);
  275. else
  276. receive_perf.freeBuffer(packet->size, current);
  277. }
  278. if(packet->next == -1) break;
  279. packet = (packet_info *)(buffer + packet->next);
  280. }
  281. current->head_offset = current->tail_offset = -1;
  282. }
  283. connections[index].reset();
  284. mutex.unlock();
  285. }
  286. bool ZBuffer::receiveData(int index, const char *data, int actsize) { //接收到一块数据,可能不是正好在边界上,可能有几个数据包
  287. //接收到一块数据,可能不是正好在边界上,可能有几个数据包
  288. int size = actsize;
  289. mutex.lock();
  290. //printf("receiving buffer %d - %dn", index, actsize);
  291. connection_info *current = &connections[index];
  292. const char *data_ptr = data;
  293. packet_info *packet = NULL;
  294. unsigned short packet_size = 0xFFFF;
  295. while (size > 0) {
  296. if (current->tail_offset >= 0) {
  297. packet = (packet_info *)(buffer + current->tail_offset);
  298. }
  299. //正好收到末尾一个字节的边界处理 [wxb 2003-8-5]
  300. if (1 == size &&
  301. (packet && packet->current_size == packet->data_size))
  302. {
  303. //ASSERT(current->halfword == 0);
  304. current->halfword = (((unsigned short)0x8000) | (unsigned short)(*((unsigned char*)data_ptr)));
  305. //printf("Half word received..[%d]..tail<%02X>n", index, *((unsigned char*)data_ptr));
  306. break;
  307. }
  308. if (current->head_offset == -1 || 
  309. (packet && packet->current_size == packet->data_size)) {
  310. //没有数据包或者已经是完整的数据包了,需要分配
  311. if (current->halfword)
  312. {
  313. //处理上一个包尾的单个字节 [wxb 2003-8-5]
  314. packet_size = (unsigned short)((current->halfword & 0x7FFF) | ((unsigned short)(*((unsigned char*)data_ptr))) << 8) - sizeof(unsigned short);
  315. //printf("Half word received..[%d]..head<%02X>...processing<%02X>...yes perfect<%d>n", index, *((unsigned char*)data_ptr), (current->halfword & 0x7FFF), packet_size);
  316. current->halfword = 0;
  317. data_ptr += 1;
  318. size -= 1;
  319. }
  320. else
  321. {
  322. packet_size = *(unsigned short *)data_ptr - sizeof(unsigned short);
  323. data_ptr += sizeof(unsigned short);
  324. size -= sizeof(unsigned short);
  325. }
  326. //printf("packet size = %dn", packet_size);
  327. if (!getNode(index, packet_size)) {
  328.   mutex.unlock();
  329. printf("getNode fail while receiveData()n");
  330. exit(0);
  331. return false;
  332. }
  333. packet = (packet_info *)(buffer + current->tail_offset); //重新获取尾部包的指针
  334. packet->state = STATE_RECEIVING; //正在接收数据
  335. //得到一个新的数据包了,进行性能分析
  336. if (bPerf) {
  337. receive_perf.perfPacket(packet_size, current);
  338. receive_perf.packet_number++;
  339. receive_perf.useBuffer(packet->size, current);
  340. }
  341. }
  342. int copy_size = packet->data_size - packet->current_size;
  343. if(copy_size > size)
  344. copy_size = size;
  345. if (copy_size < 0)
  346. {
  347. mutex.unlock();
  348. printf("copy_size negative %08X, %d, %d, %d. <?> %d, %d | pos = %dn", this, copy_size, packet->data_size, packet->current_size, actsize, data_ptr - data, (char*)packet - buffer);
  349. return false;
  350. }
  351. memcpy((char *)packet + sizeof(packet_info) + packet->current_size, data_ptr, copy_size);
  352. size -= copy_size;
  353. data_ptr += copy_size;
  354. packet->current_size += copy_size;
  355. if(packet->current_size == packet->data_size) {
  356. packet->state = STATE_WAITING;
  357. }
  358. }
  359. mutex.unlock();
  360. return true;
  361. }
  362. char *ZBuffer::getPacket(int index, int &size) { //获取一个完整的数据包
  363. mutex.lock();
  364. connection_info *current = &connections[index];
  365. if(current->head_offset == -1) {
  366. mutex.unlock();
  367. return NULL;
  368. }
  369. // flying comment here, just to make sure that this argument
  370. // is correct.
  371. if (index == -1)
  372. {
  373. printf("call getPacket with illegal index!n");
  374. mutex.unlock();
  375. return NULL;
  376. }
  377. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  378. char *result = (char *)packet + sizeof(packet_info);
  379. if(current->head_offset == current->tail_offset) { //只有一个数据包
  380. if(packet->current_size != packet->data_size) {
  381. mutex.unlock();
  382. return NULL; //不完整
  383. }
  384. current->head_offset = current->tail_offset = -1;
  385. }
  386. else current->head_offset = packet->next;
  387. size = packet->data_size;
  388. packet->current_size = 0;
  389. packet->data_size = 0;
  390. packet->state = STATE_FREE;
  391. if(bPerf)
  392. {
  393. receive_perf.freeBuffer(packet->size, current);
  394. }
  395. result = getRecvBuffer(index, size);
  396. memcpy(result, (char *)packet + sizeof(packet_info), size);
  397. mutex.unlock();
  398. return result;
  399. }
  400. //发送数据的接口分成两个函数,一个是获取当前发送的缓冲区和长度,另外一个是从缓冲区清除指定大小的数据
  401. //还有就是同时发送很多数据包的功能。
  402. char *ZBuffer::getSendData(int index, int &size) {
  403. connection_info *current = &connections[index];
  404. if(current->head_offset == -1) return NULL; //没有数据
  405. mutex.lock();
  406. char *result = NULL;
  407. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  408. if(packet->state == STATE_SENDING) {
  409. size = packet->data_size - packet->current_size;
  410. if(size == 0) {
  411. result = NULL;
  412. }
  413. else {
  414. result = (char *)packet + sizeof(packet_info) + packet->current_size;
  415. }
  416. }
  417. mutex.unlock();
  418. return result;
  419. }
  420. void ZBuffer::sendData(int index, int size) {
  421. //一定是获取发送缓冲区成功之后才调用,可以省掉一些错误处理
  422. connection_info *current = &connections[index];
  423. mutex.lock();
  424. packet_info *packet = (packet_info *)(buffer + current->head_offset);
  425. if(packet->state == STATE_SENDING) {
  426. packet->current_size += size;
  427. if(packet->current_size == packet->data_size) { //一个数据包发送完成
  428. if(bPerf) {
  429. send_perf.freeBuffer(packet->size, current);
  430. send_perf.perfPacket(packet->data_size, current); //统计性能
  431. }
  432. packet->state = STATE_FREE;
  433. current->head_offset = packet->next;
  434. if(current->head_offset < 0) 
  435. current->tail_offset = -1;
  436. }
  437. }
  438. //printf("data sent: %d - %dn", index, size);
  439. mutex.unlock();
  440. }
  441. bool ZBuffer::packData(int index, const char *data, int size) { //发送一个数据包的数据
  442. mutex.lock();
  443. connection_info *current = &connections[index];
  444. packet_info *packet = NULL;
  445. if(current->tail_offset >= 0 ) packet = (packet_info *)(buffer + current->tail_offset);
  446. if(!packet || packet->state != STATE_PACKING) {
  447. //记录当前尾部的偏移量
  448. if(!getNode(index, MAX_PACKET_SIZE)) {
  449. mutex.unlock();
  450. printf("getNode fail while packData()n");
  451. exit(0);
  452. return false;
  453. }
  454. packet = (packet_info *)(buffer + current->tail_offset);
  455. packet->state = STATE_PACKING;
  456. packet->current_size += sizeof(unsigned short);
  457. if (bPerf)
  458. {
  459. send_perf.useBuffer(packet->size, current);
  460. }
  461. }
  462. if(size + packet->current_size + sizeof(packet_info) > packet->size) { //超出了一个块的大小
  463. //printf("packing size a bit large...%08X, %d,%d,%dn", this, size,packet->current_size,packet->size);
  464. packet_info *old_packet = packet;
  465. packet_info *previous = (packet_info *)(buffer + current->head_offset);
  466. unsigned long old_tail = current->tail_offset;
  467. while(previous) {
  468. if(current->tail_offset == (char *)previous - buffer || previous->next == (char *)old_packet - buffer) { //找到尾巴了
  469. if(current->tail_offset == (char *)previous - buffer)
  470. current->head_offset = current->tail_offset = -1;
  471. else
  472. current->tail_offset = (char *)previous - buffer;
  473. if(!getNode(index, old_packet->size + MAX_PACKET_SIZE)) { //增加大小
  474. mutex.unlock();
  475. printf("getNode fail while packData() 2n");
  476. exit(0);
  477. return false;
  478. }
  479. old_packet->state = STATE_FREE;
  480. packet = (packet_info *)(buffer + current->tail_offset);
  481. if (bPerf)
  482. {
  483. send_perf.useBuffer(packet->size, current);
  484. send_perf.freeBuffer(old_packet->size, current);
  485. }
  486. memcpy((char *)packet + sizeof(packet_info),
  487. (char *)old_packet + sizeof(packet_info), old_packet->current_size);
  488. packet->current_size = old_packet->current_size;
  489. packet->state = STATE_PACKING;
  490. break;
  491. }
  492. if(previous->next == -1) {
  493. mutex.unlock();
  494. return false;
  495. }
  496. previous = (packet_info *)(buffer + previous->next);
  497. }
  498. }
  499. char *data_ptr = (char *)packet + sizeof(packet_info) + packet->current_size;
  500. memcpy(data_ptr, data, size);
  501. packet->current_size += size;
  502. mutex.unlock();
  503. return true;
  504. }
  505. char *ZBuffer::sendPacket(int index, int &datalength) {
  506. char *result = NULL;
  507. mutex.lock();
  508. connection_info *current = &connections[index];
  509. if(current->tail_offset >= 0) {
  510. packet_info *packet = (packet_info *)(buffer + current->tail_offset);
  511. if(packet->current_size > 0 && packet->state != STATE_SENDING) {
  512. packet->data_size = packet->current_size;
  513. datalength = packet->current_size - sizeof(unsigned short);
  514. result = (char *)packet + sizeof(packet_info) + sizeof(unsigned short);
  515. *(unsigned short *)((char *)packet + sizeof(packet_info)) = packet->data_size;
  516. packet->state = STATE_SENDING;
  517. packet->current_size = 0;
  518. if(bPerf) {
  519. send_perf.packet_number++;
  520. }
  521. }
  522. }
  523. mutex.unlock();
  524. // if (datalength > 0)
  525. //printf("prepare to send %d - %dn", index, datalength);
  526. return result;
  527. }
  528. #endif