P2PClient.cpp
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:39k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19. // P2PClient.cpp: implementation of the P2PClient class.
  20. //
  21. //////////////////////////////////////////////////////////////////////
  22. #include "stdafx.h"
  23. #include "P2PClient.h"
  24. #include "Communicator.h"
  25. namespace NPLayer1 {
  26. //////////////////////////////////////////////////////////////////////
  27. // Construction/Destruction
  28. //////////////////////////////////////////////////////////////////////
  29. P2PClient::P2PClient() 
  30. : recvPointer(0)
  31. , sendPointer(0) 
  32. , isSameLan(false)
  33. {
  34. valid = FALSE;
  35. errStr[0] = 0;
  36. }
  37. P2PClient::~P2PClient() {
  38. if(valid)
  39. SetInvalid();
  40. }
  41. BOOL P2PClient::SetValid(Communicator* c, const ConnectingPeer& peer) {
  42. comm = c;
  43. recvOff = 0;
  44. valid = TRUE;
  45. m_Socket = peer.sock;
  46. isIncoming= peer.isIncoming;
  47. isForFree = peer.isForFree;
  48. isPassive = peer.isPassive;
  49. isSameRes = false;
  50. isSameLan = peer.isSameLan;
  51. // 到这里为止,我们完全不知道对方有哪些块
  52. remoteInterval.Clear();
  53. remotePush.Clear();
  54. localPush.Clear();
  55. sentMediaArray.Clear();
  56. connectionBeginTime = lastRecvDataTime = lastSendDataTime = lastRecvDataHdTime = timeGetTime();
  57. bGotFirstMsg = FALSE;
  58. memcpy((PeerInfoWithAddr*)&remotePeer, (PeerInfoWithAddr*)&peer, sizeof(PeerInfoWithAddr));
  59. comm->logFile.StatusOut("Connected on %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  60. if(GetIsCachePeer()) {
  61. remoteVersion = comm->cfgData.COMMUNICATOR_VERSION;
  62. remotePeer.layer = 0;
  63. }
  64. else {
  65. remoteVersion = 0.0f;
  66. remotePeer.layer = 0xff;
  67. }
  68. // clear transfer calculator members
  69. ClearTransferInfo();
  70. lastRequestStartTime = 0;
  71. lastRequestBytes = 0;
  72. m_reqStartTime = 0;
  73. m_transUsedTime = 0;
  74. if(!SendHello())
  75. return FALSE;
  76. CorePeerInfo thisPeer;
  77. comm->p2pMgr.GetSelfInfo(thisPeer);
  78. if(!SendReport(thisPeer, true))
  79. return FALSE;
  80. if(GetIsCachePeer()) {
  81. // 请求重新分配所有push list
  82. comm->p2pMgr.RedistributeAllBlocks(this);
  83. }
  84. return TRUE;
  85. }
  86. void P2PClient::SetInvalid() {
  87. if(valid) {
  88. valid = FALSE;
  89. remotePush.Clear();
  90. comm->logFile.StatusOut("Clear TCPPacket Send list...");
  91. while (!m_sendList.empty()) {
  92. comm->p2pMgr.ReleasePacket(m_sendList.front());
  93. m_sendList.pop_front();
  94. }
  95. comm->logFile.StatusOut("Disconnected from %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  96. comm->p2pMgr.SafeCloseSocket(m_Socket); // no force disconnect
  97. // 此处没有排除低版本客户端的情况,所以统计会有少许偏差
  98. comm->p2pMgr.ConnectionClosed(isIncoming, timeGetTime()-connectionBeginTime);
  99. }
  100. }
  101. BOOL P2PClient::BaseRecv() {
  102. int ret = recv(m_Socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
  103. if(ret < 0) {
  104. DWORD lastError = ::WSAGetLastError();
  105. if (WSAEWOULDBLOCK != lastError) {
  106. comm->logFile.StatusErr("Receiving data on TCP", lastError);
  107. return FALSE;
  108. }
  109. else
  110. return TRUE;
  111. }
  112. else if(0 == ret) {
  113. comm->logFile.StatusOut("Connection has been disconnected gracefully.");
  114. return FALSE;
  115. }
  116. recvOff += ret;
  117. AddIncomingBytes(ret);
  118. comm->p2pMgr.AddIncomingBytes(ret);
  119. BOOL retVal = FALSE;
  120. for(;;) {
  121. // because multiple msgs can be received at once.
  122. // keep call parseMsg() till !MSG_COMPLETE
  123. MSG_STATE ms = ParseMsg(); 
  124. bool bBadAddr = false;
  125. switch(ms) {
  126. case MSG_COMPLETE:
  127. continue;
  128. case MSG_UNCOMPLETE:
  129. retVal = TRUE;
  130. break;
  131. case MSG_ERR_SIZE:
  132. sprintf(errStr, "错误的消息大小!");
  133. break;
  134. case MSG_ERR_TYPE: 
  135. sprintf(errStr, "错误的消息类型!");
  136. break;
  137. case MSG_DIFF_CHNL:
  138. sprintf(errStr, "属于不同的频道!");
  139. bBadAddr = true;
  140. break;
  141. case MSG_NOMORE_CONS:
  142. sprintf(errStr, "已经存在一个连接!");
  143. break;
  144. case MSG_ERR_LIST_SIZE:
  145. sprintf(errStr, "错误的列表大小!");
  146. break;
  147. case MSG_SEND_ERR:
  148. sprintf(errStr, "发送消息错误!");
  149. break;
  150. case MSG_SAVEDATA_ERR:
  151. sprintf(errStr, "无法保存数据!");
  152. break;
  153. case MSG_UNMATCH_BLOCKID:
  154. sprintf(errStr, "返回了并未请求的块!");
  155. break;
  156. case MSG_NOSUCH_RES_HERE:
  157. sprintf(errStr, "本机没有这个资源!");
  158. bBadAddr = true;
  159. break;
  160. case MSG_REMOTE_ERR:
  161. // 错误信息已经在errStr中了
  162. break;
  163. case MSG_CHNL_CLOSED:
  164. sprintf(errStr, "频道关闭!!");
  165. // 发送没有频道关闭的消息给外界
  166. comm->PostErrMessage(PNT_CHNL_CLOSED, 0, true);
  167. break;
  168. case MSG_NOSUCH_RES_SP:
  169. sprintf(errStr, "SP没有这个资源!!");
  170. // 发送没有资源的消息给外界
  171. comm->PostErrMessage(PNT_NO_SUCH_RES, 0, true);
  172. break;
  173. case MSG_CHNL_END:
  174. sprintf(errStr, "频道结束了!");
  175. // 发送频道结束的消息给外界
  176. comm->PostErrMessage(PNT_CHNL_ENDED, 0, true);
  177. break;
  178. case MSG_LOW_VERSION:
  179. sprintf(errStr, "对方客户端版本过低!");
  180. bBadAddr = true;
  181. comm->p2pMgr.AddLowVersionConCount();
  182. break;
  183. default:
  184. sprintf(errStr, "未知错误类型!");
  185. bBadAddr = true;
  186. break;
  187. }
  188. if(strlen(errStr) > 0) {
  189. comm->logFile.StatusOut("来自%s的错误: %s", comm->p2pMgr.FormatIPAddress(remotePeer), errStr);
  190. errStr[0] = 0;
  191. // reject this client
  192. comm->p2pMgr.AddBadAddr(remotePeer);
  193. }
  194. break;
  195. }
  196. return retVal;
  197. }
  198. MSG_STATE P2PClient::ParseMsg() {
  199. // 如果过小,则不是正常的包
  200. if(recvOff < sizeof(int)+sizeof(BYTE))
  201. return MSG_UNCOMPLETE;
  202. // 把移动指针放到数据的起始地址
  203. recvPointer = recvBuf;
  204. // 读取消息大小
  205. CSClient::CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));
  206. // 读取消息类型
  207. UINT8 msgType;
  208. CSClient::CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));
  209. // msgSize是否正常
  210. if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
  211. return MSG_ERR_SIZE;
  212. // 因为P2P_RESPONSE包含数据,可能传输比较慢
  213. // 在这里至少我们知道发送的请求的到回应了,要根据这个判断请求是否超时
  214. if(msgType == P2P_RESPONSE) {
  215. lastRecvDataHdTime = timeGetTime();
  216. }
  217. // 是否包含完成的消息
  218. if(recvOff < msgSize)
  219. return MSG_UNCOMPLETE;
  220. MSG_STATE ret = MSG_COMPLETE;
  221. switch(msgType) {
  222. case P2P_HELLO:
  223. ret = OnHello();
  224. break;
  225. case P2P_SPUPDATE:
  226. ret = OnSPUpdate();
  227. break;
  228. case P2P_REPORT:
  229. ret = OnReport();
  230. break;
  231. case P2P_NEAR_PEERS:
  232. ret = OnNearPeers();
  233. break;
  234. case P2P_PUSHLIST:
  235. ret = OnPushList();
  236. break;
  237. case P2P_RESPONSE:
  238. ret = OnResponse();
  239. break;
  240. case P2P_MSG:
  241. ret = OnMsg();
  242. break;
  243. case P2P_REQMEDIA:
  244. ret = OnReqMedia();
  245. break;
  246. case P2P_MEDIATYPE:
  247. ret = OnMediaType();
  248. break;
  249. default:
  250. ret = MSG_ERR_TYPE;
  251. break;
  252. }
  253. // copy left data to start of recvBuf
  254. if(recvOff >= msgSize) {
  255. memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize);
  256. recvOff -= msgSize;
  257. }
  258. return ret;
  259. }
  260. MSG_STATE P2PClient::OnHello() {
  261. assert(!GetIsCachePeer());
  262. // NP version
  263. CSClient::CopyMoveSrc(&remoteVersion, recvPointer, sizeof(remoteVersion));
  264. if(remoteVersion < comm->cfgData.ACCEPT_VERSION) {
  265. comm->logFile.StatusOut("Reject low version client %s %.5f.", comm->p2pMgr.FormatIPAddress(remotePeer), remoteVersion);
  266. return MSG_LOW_VERSION;
  267. }
  268. // 对方需要的资源Hash码
  269. char resHashCode[MD5_LEN+1];
  270. resHashCode[MD5_LEN] = 0;
  271. CSClient::CopyMoveSrc(resHashCode, recvPointer, MD5_LEN);
  272. // 是否被动连接,因为对方可能是因为收到TS2NP_CONNECTO,被动连接过来的
  273. // 那本连接就算做outgoing,对方会将本连接算做incoming
  274. bool passiveConnect = false;
  275. CSClient::CopyMoveSrc(&passiveConnect, recvPointer, sizeof(passiveConnect));
  276. if(passiveConnect)
  277. isIncoming = false;
  278. // 对方Peer信息
  279. CSClient::CopyMoveSrc(&remotePeer, recvPointer, sizeof(remotePeer));
  280. comm->logFile.StatusOut("Got P2P_HELLO from to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  281. // 如果对方资源和本机当前资源相同,则是非Free的连接
  282. if(comm->currRes && memcmp(resHashCode, comm->currRes->GetHashCode().data(), MD5_LEN) == 0) {
  283. isForFree = false;
  284. isSameRes = true;
  285. }
  286. else {
  287. return MSG_NOSUCH_RES_HERE;
  288. }
  289. if(isIncoming || passiveConnect) {
  290. // 此时isIncoming为true,说明是accept进来的连接,要在这里判断是否重复连接
  291. // 前面的passiveConnect可能改变了isIncoming的值, 
  292. // 而passiveConnect必定是incoming的连接,所以一并在这里判断
  293. if(!comm->p2pMgr.CheckN4One(remotePeer, isForFree, true, MAX_CONNECTION_PER_NP))
  294. return MSG_NOMORE_CONS;
  295. }
  296. // 如果对方不是CachePeer, 则第一个收到的消息应该在这里
  297. bGotFirstMsg = TRUE;
  298. if (remotePeer.outerIP.sin_addr.s_addr == comm->localAddress.outerIP.sin_addr.s_addr) {
  299. isSameLan = true;
  300. }
  301. comm->p2pMgr.ConnectionEstablished(isIncoming);
  302. return MSG_COMPLETE;
  303. }
  304. MSG_STATE P2PClient::OnSPUpdate() {
  305. if(!comm->currRes) {
  306. assert(0);
  307. return MSG_ERR_TYPE;
  308. }
  309. MSG_STATE state = MSG_COMPLETE;
  310. UINT oldMaxBlockID = UINT_MAX;
  311. // 记录旧的最大块ID
  312. oldMaxBlockID = comm->currRes->GetSPUpdate().maxBlockID;
  313. // 收到的SPUpdate
  314. SPUpdate tmpUpdate;
  315. CSClient::CopyMoveSrc(&tmpUpdate, recvPointer, sizeof(tmpUpdate));
  316. comm->logFile.StatusOut("Recv SPUpdate %d->%d from %s", tmpUpdate.minBlockID, tmpUpdate.maxBlockID, 
  317. comm->p2pMgr.FormatIPAddress(remotePeer));
  318. // 计算收到的SPUpdate的校验码
  319. PBYTE temp = reinterpret_cast<PBYTE>(recvPointer-sizeof(tmpUpdate));
  320. BYTE calsum = 0;
  321. for(int i = 0; i < sizeof(tmpUpdate); ++i) {
  322. calsum += *temp;
  323. temp++;
  324. }
  325. // 如果是不带校验的SPUpdate,则不接受
  326. if(msgSize != 5+sizeof(SPUpdate)+1) {
  327. comm->logFile.StatusOut("Old SPUpdate!");
  328. return MSG_COMPLETE;
  329. }
  330. // 读取对方发送的SPUpdate校验码
  331. BYTE sum = 0;
  332. CSClient::CopyMoveSrc(&sum, recvPointer, sizeof(sum));
  333. // 比较两个SPUpdate的校验码,必须符合
  334. if(calsum != sum) {
  335. comm->logFile.StatusOut("Bad SPUpdate, err sum!");
  336. return MSG_COMPLETE;
  337. }
  338. if(tmpUpdate.minBlockID == UINT_MAX && tmpUpdate.maxBlockID == UINT_MAX) {
  339. if(tmpUpdate.minKeySample == 0xffffffffffffffff && tmpUpdate.maxKeySample == 0xffffffffffffffff)
  340. state = MSG_NOSUCH_RES_SP; // SP 上没有这个资源
  341. else if(tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
  342. state = MSG_CHNL_CLOSED; // 这个频道已经关闭
  343. else 
  344. assert(0); // 错误的消息
  345. }
  346. if(tmpUpdate.minBlockID == 0 && tmpUpdate.maxBlockID == 0 && 
  347. tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
  348. state = MSG_CHNL_END;
  349. if(state == MSG_COMPLETE) {
  350. if(GetIsCachePeer()) {
  351. remoteInterval.Clear();
  352. remoteInterval.AddInterval(tmpUpdate.minBlockID, tmpUpdate.maxBlockID-tmpUpdate.minBlockID);
  353. }
  354. // 如果收到SPUpdate的maxBlockID比本机SPUpdate的maxBlockID更大
  355. // 则更新本机SPUpdate,并向比本机层数更高的连接广播
  356. if(tmpUpdate.maxBlockID > oldMaxBlockID) {
  357. comm->currRes->SetSPUpdate(tmpUpdate, sum);
  358. // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
  359. // 直播系统中连上CP的连接,不会接收到P2P_REPORT,所以只能根据spUpdate决定是否开始请求数据
  360. if(remotePush.GetValidSize() == 0 && GetIsCachePeer())
  361. comm->p2pMgr.RedistributeAllBlocks(this);
  362. // 广播最新的SPUpdate
  363. comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
  364. }
  365. }
  366. else {
  367. // 广播频道的非正常状态
  368. comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
  369. }
  370. return state;
  371. }
  372. MSG_STATE P2PClient::OnReport() {
  373. // 复制对方的信息
  374. CSClient::CopyMoveSrc((CorePeerInfo*)&remotePeer, recvPointer, sizeof(CorePeerInfo));
  375. // 是否更新全部区间列表
  376. bool bRefresh;
  377. CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
  378. if(bRefresh)
  379. remoteInterval.Clear();
  380. // 如果REFRESH,那么只有一组区间;如果不是,那么有两组区间,先后是增加和删除
  381. for(UINT8 j = 0; j < (bRefresh?1:2); ++j) {
  382. // read interval count
  383. UINT8 intervalNum = 0;
  384. CSClient::CopyMoveSrc(&intervalNum, recvPointer, sizeof(intervalNum));
  385. assert(recvPointer - recvBuf < 1000);
  386. // read intervals
  387. for(UINT8 i = 0; i < intervalNum; ++i) {
  388. BlockInterval temp;
  389. CSClient::CopyMoveSrc(&temp, recvPointer, sizeof(temp));
  390. if(j == 0)
  391. remoteInterval.AddInterval(temp.start, temp.size);
  392. else
  393. remoteInterval.DelInterval(temp.start, temp.size);
  394. }
  395. // 如果对方新增了Block
  396. if(j == 0 && intervalNum > 0) {
  397. // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
  398. if(remotePush.GetValidSize() == 0 && comm->currRes)
  399. comm->p2pMgr.RedistributeAllBlocks(this);
  400. }
  401. }
  402. //comm->logFile.StatusOut("Recv P2P_REPORT from %s", comm->p2pMgr.FormatIPAddress(remotePeer));
  403. return MSG_COMPLETE;
  404. }
  405. MSG_STATE P2PClient::OnNearPeers() {
  406. // get size of peer list and move the pointer
  407. UINT8 listSize;
  408. CSClient::CopyMoveSrc(&listSize, recvPointer, sizeof(listSize));
  409. // 检查listSize是否正常
  410. if(static_cast<int>(listSize*sizeof(PeerInfoWithAddr)) > recvBuf+P2P_BUF_SIZE-recvPointer)
  411. return MSG_ERR_LIST_SIZE;
  412. // read peer one by one, and add to known peer list
  413. for(UINT8 i = 0; i < listSize; ++i) {
  414. PeerInfoWithAddr peer;
  415. CSClient::CopyMoveSrc(&peer, recvPointer, sizeof(peer));
  416. peer.isCachePeer = false;
  417. comm->p2pMgr.AddPeerInfo(peer);
  418. }
  419. comm->logFile.StatusOut("Recv P2P_NEAR_PEERS(%d NP) from %s.", listSize, comm->p2pMgr.FormatIPAddress(remotePeer));
  420. return MSG_COMPLETE;
  421. }
  422. MSG_STATE P2PClient::OnPushList() {
  423. // 当没有当前资源时,不给对方发送数据
  424. if(!comm->currRes)
  425. return MSG_SEND_ERR;
  426. // 是否更新整个PushList
  427. bool bRefresh;
  428. CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
  429. if(bRefresh) {
  430. // 清空所有准备发送的块(已经开始发送的块,不用清空)
  431. while(DeleteSendingBlock(UINT_MAX));
  432. localPush.Clear();
  433. }
  434. UINT blockID = UINT_MAX;
  435. if(bRefresh) {
  436. UINT8 count;
  437. CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
  438. assert(count < 30);
  439. for(UINT8 i = 0; i < count; ++i) {
  440. CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
  441. // 查找本机有没有这个块,如果没有,立即告知对方;
  442. if(!comm->currRes->FindBlock(blockID)) {
  443. if(!SendResponse(blockID, false))
  444. return MSG_SEND_ERR;
  445. }
  446. else 
  447. localPush.AddBlock(blockID);
  448. }
  449. if(count)
  450. comm->logFile.StatusOut("Got request refresh(%d...) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
  451. }
  452. else {
  453. UINT8 count;
  454. CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
  455. assert(count < 2);
  456. for(UINT8 i = 0; i < count; ++i) {
  457. CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
  458. // 查找本机有没有这个块,如果没有,立即告知对方;
  459. if(!comm->currRes->FindBlock(blockID)) {
  460. if(!SendResponse(blockID, false))
  461. return MSG_SEND_ERR;
  462. }
  463. else 
  464. localPush.AddBlock(blockID);
  465. }
  466. if(count)
  467. comm->logFile.StatusOut("Got request add(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
  468. CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
  469. assert(count < 2);
  470. for(UINT8 i = 0; i < count; ++i) {
  471. CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
  472. // 如果在localPush中查找并删除此块
  473. if(!localPush.DelBlock(blockID)) {
  474. // 如果在localPush中没有找到,则说明此块已经在发送列表中了,甚至可能已经被发送了
  475. // 尝试从发送列表中查找并删除此块
  476. if(!DeleteSendingBlock(blockID)) {
  477. comm->logFile.StatusOut("Ahhhhh..., the block has already been sent!");
  478. }
  479. }
  480. }
  481. if(count)
  482. comm->logFile.StatusOut("Got request del(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
  483. }
  484. // 发送push列表中的第一个块
  485. if(!SendFirstPushBlock())
  486. return MSG_SEND_ERR;
  487. return MSG_COMPLETE;
  488. }
  489. MSG_STATE P2PClient::OnResponse() {
  490. // 如果对方是CachePeer, 则第一个收到的消息应该在这里
  491. if(GetIsCachePeer())
  492. bGotFirstMsg = TRUE;
  493. MSG_STATE ret = MSG_COMPLETE;
  494. // 回应的BlockID和大小
  495. UINT blockID;
  496. CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
  497. UINT blockSize;
  498. CSClient::CopyMoveSrc(&blockSize, recvPointer, sizeof(blockSize));
  499. // 检查Block大小是否正常
  500. if(static_cast<int>(blockSize) > recvBuf+P2P_BUF_SIZE-recvPointer) {
  501. ret = MSG_ERR_SIZE;
  502. assert(0);
  503. }
  504. // 检查对方返回的Block是不是在Push List之中,如果在,则删除之;
  505. // 如果不在,则检查是否在其他连接的Push List之中,如果在,则删除之
  506. DWORD reqTime;
  507. if(!remotePush.DelBlock(blockID, &reqTime))
  508. comm->p2pMgr.ReclaimBlocks(this, blockID);
  509. if(ret == MSG_COMPLETE) {
  510. if(blockSize > 0) {
  511. // 记录此次得到数据的时间
  512. lastRecvDataTime = timeGetTime();
  513. // 记录本次传输所用的时间
  514. m_transUsedTime += timeGetTime()-m_reqStartTime;
  515. //assert(lastRecvDataTime-m_reqStartTime < 25000);
  516. // 确实已经下载了,虽然可能发生存储错误
  517. comm->p2pMgr.AddBlockDataDown(blockSize);
  518. if(GetIsCachePeer())
  519. comm->p2pMgr.AddCPData(blockSize);
  520. // 存储此块
  521. P2P_RETURN_TYPE bSuccess = comm->currRes->PutBlock(blockID, blockSize, reinterpret_cast<PBYTE>(recvPointer));
  522. recvPointer += blockSize;
  523. if(bSuccess < PRT_OK) {
  524. if(bSuccess == PRT_BUFFER_FULL) {
  525. comm->logFile.StatusOut("缓冲区已满");
  526. Sleep(300);
  527. }
  528. else if(bSuccess == PRT_BAD_BLOCK) {
  529. if(!remotePeer.isCachePeer) {
  530. // 这个块是错误的,不再从这个NP下载此块
  531. remoteInterval.DelInterval(blockID, 1);
  532. }
  533. }
  534. else {
  535. comm->logFile.StatusErr("写入磁盘错误", GetLastError());
  536. // 发送写入磁盘错误消息
  537. comm->PostErrMessage(PNT_DISK_ERR, 0, true);
  538. ret = MSG_SAVEDATA_ERR;
  539. assert(0);
  540. }
  541. }
  542. char* temp = "";
  543. #ifdef _DEBUG
  544. MD5 md5(reinterpret_cast<BYTE*>(recvPointer-blockSize), blockSize);
  545. temp = md5.hex_digest();
  546. #endif
  547. comm->logFile.StatusOut("Got %sblock %d(%d)(%s)(used %dms) from %s.", 
  548. (bSuccess==PRT_BAD_BLOCK?"bad ":""), blockID, blockSize, temp, lastRecvDataTime-m_reqStartTime, 
  549. comm->p2pMgr.FormatIPAddress(remotePeer));
  550. #ifdef _DEBUG
  551. delete [] temp;
  552. // 记录下次传输开始的时间
  553. m_reqStartTime = timeGetTime();
  554. #endif
  555. }
  556. else {
  557. comm->logFile.StatusOut("Got block %d(%d) from %s.", 
  558. blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
  559. }
  560. // 既然得到了数据,登录不上TS的问题就算了:)
  561. comm->csClient.ResetLoginFail();
  562. }
  563. if(ret == MSG_COMPLETE) {
  564. if(blockSize == 0) {
  565. // 对方没有这个块,从区间表中删除,并将此块交给其他连接
  566. remoteInterval.DelInterval(blockID, 1);
  567. }
  568. // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
  569. if(remotePush.GetValidSize() == 0)
  570. comm->p2pMgr.RedistributeAllBlocks(this);
  571. }
  572. return ret;
  573. }
  574. MSG_STATE P2PClient::OnMsg() {
  575. //assert(GetIsCachePeer());
  576. // 错误代码
  577. UINT16 errCode;
  578. CSClient::CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));
  579. // 是否需要断开连接
  580. bool shouldDisconnect;
  581. CSClient::CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect));
  582. // 根据错误代码处理
  583. switch(errCode) {
  584. case ERR_PROTOCOL_FORMAT:
  585. sprintf(errStr, "协议错误");
  586. break;
  587. case ERR_AUTHORIZATION:
  588. sprintf(errStr, "验证错误");
  589. break;
  590. case ERR_INTERNAL:
  591. sprintf(errStr, "未知错误");
  592. break;
  593. case ERR_CONNECTION_FULL:
  594. sprintf(errStr, "对方连接已满");
  595. break;
  596. default:
  597. shouldDisconnect = true;
  598. }
  599. if(shouldDisconnect) {
  600. // 停止连接这个地址
  601. comm->p2pMgr.AddBadAddr(remotePeer);
  602. return MSG_REMOTE_ERR;
  603. }
  604. return MSG_COMPLETE;
  605. }
  606. MSG_STATE P2PClient::OnReqMedia() {
  607. UINT blockID = 0;
  608. CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
  609. if(!SendMediaType(blockID))
  610. return MSG_SEND_ERR;
  611. return MSG_COMPLETE;
  612. }
  613. MSG_STATE P2PClient::OnMediaType() {
  614. assert(comm->currRes);
  615. if(!comm->currRes)
  616. return MSG_DIFF_CHNL;
  617. // 一些较早版本的客户端会读取gtv文件中的媒体类型,然后发送很大的mediainterval,这种消息不予接受
  618. if(remoteVersion < comm->cfgData.ADD_PROGNAME_VERSION) {
  619. return MSG_COMPLETE;
  620. }
  621. MediaInterval mInterval;
  622. // 区间
  623. CSClient::CopyMoveSrc(&mInterval.start, recvPointer, sizeof(mInterval.start));
  624. CSClient::CopyMoveSrc(&mInterval.size, recvPointer, sizeof(mInterval.size));
  625. UINT len = 0;
  626. CSClient::CopyMoveSrc(&len, recvPointer, sizeof(len));
  627. if(len > 1024) {
  628. assert(0);
  629. return MSG_ERR_LIST_SIZE;
  630. }
  631. // 首先读取视频编码格式
  632. CSClient::CopyMoveSrc(&mInterval.videoType, recvPointer, sizeof(mInterval.videoType));
  633. if(mInterval.videoType.cbFormat > 1024) {
  634. assert(0);
  635. return MSG_ERR_LIST_SIZE;
  636. }
  637. mInterval.videoData = new BYTE[mInterval.videoType.cbFormat];
  638. CSClient::CopyMoveSrc(mInterval.videoData, recvPointer, mInterval.videoType.cbFormat);
  639. // 然后读取音频编码格式
  640. CSClient::CopyMoveSrc(&mInterval.audioType, recvPointer, sizeof(mInterval.audioType));
  641. if(mInterval.audioType.cbFormat > 1024) {
  642. assert(0);
  643. return MSG_ERR_LIST_SIZE;
  644. }
  645. mInterval.audioData = new BYTE[mInterval.audioType.cbFormat];
  646. CSClient::CopyMoveSrc(mInterval.audioData, recvPointer, mInterval.audioType.cbFormat);
  647. assert(len == mInterval.videoType.cbFormat+sizeof(mInterval.videoType)+mInterval.audioType.cbFormat+sizeof(mInterval.audioType));
  648. // 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
  649. if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
  650. CSClient::CopyMoveSrc(&mInterval.pnamesize, recvPointer, sizeof(mInterval.pnamesize));
  651. /// 直播频道没有轮播节目名
  652. if(mInterval.pnamesize && mInterval.pnamesize + recvPointer-recvBuf <= msgSize) {
  653. mInterval.pname = new char[mInterval.pnamesize+1];
  654. CSClient::CopyMoveSrc(mInterval.pname, recvPointer, mInterval.pnamesize);
  655. mInterval.pname[mInterval.pnamesize] = 0;
  656. }
  657. else
  658. mInterval.pnamesize = 0; // 既然不能复制, 一定要置空!
  659. // 超过ADD_PROGNAME_VERSION的版本将发送节目的时间长度和频道的名字
  660. if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
  661. CSClient::CopyMoveSrc(&mInterval.progtime, recvPointer, sizeof(mInterval.progtime));
  662. CSClient::CopyMoveSrc(&mInterval.cnamesize, recvPointer, sizeof(mInterval.cnamesize));
  663. if(mInterval.cnamesize && mInterval.cnamesize + recvPointer-recvBuf <= msgSize) {
  664. mInterval.cname = new char[mInterval.cnamesize+1];
  665. CSClient::CopyMoveSrc(mInterval.cname, recvPointer, mInterval.cnamesize);
  666. mInterval.cname[mInterval.cnamesize] = 0;
  667. }
  668. else
  669. mInterval.cnamesize = 0; // 既然不能复制, 一定要置空!
  670. }
  671. }
  672. // add to interval array
  673. comm->currRes->AddMediaInterval(mInterval);
  674. return MSG_COMPLETE;
  675. }
  676. BOOL P2PClient::SendHello() {
  677. // 不可能没有currRes
  678. if(!comm->currRes)
  679. return FALSE;
  680. TCPPacket* packet;
  681. if(!SendBegin(packet, P2P_HELLO))
  682. return FALSE;
  683. // NP version
  684. CSClient::CopyMoveDes(sendPointer, &comm->cfgData.COMMUNICATOR_VERSION, sizeof(float));
  685. // 当前资源的Hash码
  686. CSClient::CopyMoveDes(sendPointer, comm->currRes->GetHashCode().data(), MD5_LEN);
  687. // 是否被动连接
  688. CSClient::CopyMoveDes(sendPointer, &isPassive, sizeof(isPassive));
  689. // 本机信息
  690. PeerInfoWithAddr thisPeer;
  691. memcpy(static_cast<P2PAddress*>(&thisPeer), &comm->localAddress, sizeof(comm->localAddress));
  692. comm->p2pMgr.GetSelfInfo(thisPeer);
  693. CSClient::CopyMoveDes(sendPointer, &thisPeer, sizeof(thisPeer));
  694. // 如果对方是CachePeer,还要发送SP的地址列表
  695. if(GetIsCachePeer()) {
  696. NormalAddress* spIPList = comm->currRes->GetSPList();
  697. if(spIPList) {
  698. UINT8 spIPListSize = comm->currRes->GetSPListSize();
  699. CSClient::CopyMoveDes(sendPointer, &spIPListSize, sizeof(spIPListSize));
  700. for(UINT8 i = 0; i < spIPListSize; ++i)
  701. CSClient::CopyMoveDes(sendPointer, &spIPList[i], sizeof(spIPList[i]));
  702. }
  703. }
  704. comm->logFile.StatusOut("Sending P2P_HELLO to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  705. SendEnd(packet);
  706. return TRUE;
  707. }
  708. BOOL P2PClient::SendReport(const CorePeerInfo& peer, bool bRefresh) {
  709. TCPPacket* packet;
  710. if(!SendBegin(packet, P2P_REPORT))
  711. return FALSE;
  712. // PeerInfoWithAddr of local peer
  713. CSClient::CopyMoveDes(sendPointer, &peer, sizeof(peer));
  714. // 如果没有当前资源,则发送bRefresh=true和intervalNum=0
  715. if(!comm->currRes)
  716. bRefresh = true;
  717. // 是否更新全部区间列表
  718. CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));
  719. // send block intervals
  720. // TODO: 这里存在重复代码,有待改进
  721. UINT8 intervalNum = comm->currRes?0xff:0;
  722. if(bRefresh) {
  723. if(comm->currRes) {
  724. // 如果是更新全部区间,则取得当前资源全部区间的列表,并发送
  725. comm->currRes->GetAllIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum);
  726. }
  727. CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
  728. sendPointer += intervalNum*sizeof(BlockInterval);
  729. }
  730. else {
  731. // 如果是更新变化的区间,则先后写入增加的区间和减少的区间
  732. if(comm->currRes)
  733. comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, true);
  734. CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
  735. sendPointer += intervalNum*sizeof(BlockInterval);
  736. if(comm->currRes)
  737. comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, false);
  738. CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
  739. sendPointer += intervalNum*sizeof(BlockInterval);
  740. }
  741. SendEnd(packet);
  742. //comm->logFile.StatusOut("Sending Report To %s", comm->p2pMgr.FormatIPAddress(remotePeer));
  743. return TRUE;
  744. }
  745. BOOL P2PClient::SendSPUpdate(const SPUpdate& spUpdate, UINT8 selfLayer, BYTE sum) {
  746. // 如果对方比自己离CP更近,那就不用发了
  747. if(selfLayer > remotePeer.layer)
  748. return TRUE;
  749. if(!isSameRes)
  750. return TRUE;
  751. TCPPacket* packet;
  752. if(!SendBegin(packet, P2P_SPUPDATE))
  753. return FALSE;
  754. // SPUpdate
  755. CSClient::CopyMoveDes(sendPointer, &spUpdate, sizeof(spUpdate));
  756. // SPUpdate sum
  757. CSClient::CopyMoveDes(sendPointer, &sum, sizeof(sum));
  758. SendEnd(packet);
  759. return TRUE;
  760. }
  761. BOOL P2PClient::SendNearPeers() {
  762. // 对方资源与本机相同时,才能发送有用的PeerInfo给它
  763. if(!isSameRes)
  764. return TRUE;
  765. TCPPacket* packet;
  766. if(!SendBegin(packet, P2P_NEAR_PEERS))
  767. return FALSE;
  768. list<PeerInfoWithAddr> peerList;
  769. comm->p2pMgr.GetPeersForNeighbour(peerList, 20, remotePeer);
  770. // 如果PeerList中元素个数大于0xff,则删除更多的元素
  771. UINT8 listSize = min(0xff, peerList.size());
  772. peerList.resize(listSize);
  773. if(listSize == 0) {
  774. // 释放这个Packet
  775. comm->p2pMgr.ReleasePacket(packet);
  776. return TRUE;
  777. }
  778. // 列表大小
  779. CSClient::CopyMoveDes(sendPointer, &listSize, sizeof(listSize));
  780. // write peerInfo one by one
  781. for(PeerAddrIt it = peerList.begin(); it != peerList.end(); ++it) {
  782. CSClient::CopyMoveDes(sendPointer, &*it, sizeof(PeerInfoWithAddr));
  783. }
  784. comm->logFile.StatusOut("Sending P2P_NEAR_PEERS to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  785. SendEnd(packet);
  786. return TRUE;
  787. }
  788. BOOL P2PClient::SendPushList(UINT blockID, bool bAdd) {
  789. // 是否更新全部
  790. bool bRefresh = (blockID == UINT_MAX);
  791. // 如果大小为零就不必更新
  792. if(bRefresh && remotePush.GetValidSize() == 0)
  793. return TRUE;
  794. TCPPacket* packet;
  795. if(!SendBegin(packet, P2P_PUSHLIST))
  796. return FALSE;
  797. CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));
  798. if(bRefresh) {
  799. // 更新整个PushList
  800. UINT8 count = 0xff;
  801. remotePush.CopyPushList(reinterpret_cast<UINT*>(sendPointer+sizeof(count)), count);
  802. CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
  803. sendPointer += count*sizeof(UINT);
  804. comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block(%d) ... to %s.", 
  805. bRefresh?"Refresh":(bAdd?"Add":"Del"), count, comm->p2pMgr.FormatIPAddress(remotePeer));
  806. }
  807. else {
  808. UINT8 count = bAdd?1:0;
  809. CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
  810. if(bAdd)
  811. CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
  812. count = bAdd?0:1;
  813. CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
  814. if(!bAdd)
  815. CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
  816. comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block %d to %s.", 
  817. bAdd?"Add":"Del",blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
  818. }
  819. SendEnd(packet);
  820. return TRUE;
  821. BOOL P2PClient::SendResponse(UINT blockID, bool tryGetData) {
  822. // 此块所在的区间,其媒体类型是否已经发送过了
  823. if(!sentMediaArray.FindBlock(blockID)) {
  824. // 没有发送过,则发送此块所在区间的媒体类型
  825. if(!SendMediaType(blockID))
  826. return FALSE;
  827. }
  828. TCPPacket* packet;
  829. if(!SendBegin(packet, P2P_RESPONSE))
  830. return FALSE;
  831. // 请求的BlockID
  832. CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
  833. // 检查是否存在这个Block,取出他
  834. UINT blockSize = 0;
  835. // 如果需要获取数据,则尝试获取数据;如果只是为了告诉对方此块不存在,则不作尝试;
  836. if(comm->currRes && tryGetData) {
  837. P2P_RETURN_TYPE ret = comm->currRes->GetBlock(blockID, blockSize, sendPointer+sizeof(blockSize));
  838. if(ret < PRT_OK) {
  839. // 一定要释放这个packet
  840. comm->p2pMgr.ReleasePacket(packet);
  841. return ret;
  842. }
  843. }
  844. // write block size (bSize==0 means no data)
  845. CSClient::CopyMoveDes(sendPointer, &blockSize, sizeof(blockSize));
  846. // 将指针移动到数据的末尾
  847. sendPointer += blockSize;
  848. comm->logFile.StatusOut("Sending P2P_RESPONSE block(%d)(%d) to %s.", 
  849. blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
  850. if(blockSize > 0) {
  851. // 记录此次发送数据的时间
  852. lastSendDataTime = timeGetTime();
  853. }
  854. SendEnd(packet);
  855. if(tryGetData && blockSize == 0) {
  856. // 如果想要发送这个块,结果发现本机没有,则尝试发送下一个块
  857. return SendFirstPushBlock();
  858. }
  859. return TRUE;
  860. }
  861. BOOL P2PClient::SendReqMedia(UINT blockID) {
  862. TCPPacket* packet;
  863. if(!SendBegin(packet, P2P_REQMEDIA))
  864. return FALSE;
  865. CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
  866. SendEnd(packet);
  867. return TRUE;
  868. }
  869. BOOL P2PClient::SendMediaType(UINT blockID) {
  870. if(!comm->currRes)
  871. return FALSE;
  872. // 查找媒体类型
  873. MediaInterval mInterval;
  874. if(!comm->currRes->GetMediaInterval(blockID, mInterval))
  875. return TRUE;
  876. TCPPacket* packet;
  877. if(!SendBegin(packet, P2P_MEDIATYPE))
  878. return FALSE;
  879. // 写入区间
  880. CSClient::CopyMoveDes(sendPointer, &mInterval.start, sizeof(mInterval.start));
  881. CSClient::CopyMoveDes(sendPointer, &mInterval.size, sizeof(mInterval.size));
  882. // 写入媒体数据长度
  883. UINT len = sizeof(mInterval.videoType)+mInterval.videoType.cbFormat+sizeof(mInterval.audioType)+mInterval.audioType.cbFormat;
  884. CSClient::CopyMoveDes(sendPointer, &len, sizeof(len));
  885. // 写入媒体数据
  886. CSClient::CopyMoveDes(sendPointer, &mInterval.videoType, sizeof(mInterval.videoType));
  887. CSClient::CopyMoveDes(sendPointer, mInterval.videoData, mInterval.videoType.cbFormat);
  888. CSClient::CopyMoveDes(sendPointer, &mInterval.audioType, sizeof(mInterval.audioType));
  889. CSClient::CopyMoveDes(sendPointer, mInterval.audioData, mInterval.audioType.cbFormat);
  890. // 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
  891. if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
  892. // 写入节目的名字
  893. CSClient::CopyMoveDes(sendPointer, &mInterval.pnamesize, sizeof(mInterval.pnamesize));
  894. CSClient::CopyMoveDes(sendPointer, mInterval.pname, mInterval.pnamesize);
  895. // 超过ADD_PROGTIME_VERSION的版本将发送节目的时间和频道名
  896. if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
  897. // 写入节目的时间长度
  898. CSClient::CopyMoveDes(sendPointer, &mInterval.progtime, sizeof(mInterval.progtime));
  899. // 写入频道的名字
  900. CSClient::CopyMoveDes(sendPointer, &mInterval.cnamesize, sizeof(mInterval.cnamesize));
  901. CSClient::CopyMoveDes(sendPointer, mInterval.cname, mInterval.cnamesize);
  902. }
  903. }
  904. SendEnd(packet);
  905. sentMediaArray.AddInterval(mInterval);
  906. return TRUE;
  907. }
  908. BOOL P2PClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
  909. packet= comm->p2pMgr.AllocatePacket();
  910. if(!packet)
  911. return FALSE;
  912. // 先留着消息大小不写,到最后再写
  913. sendPointer = packet->buf+sizeof(UINT);
  914. CSClient::CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
  915. return TRUE;
  916. }
  917. void P2PClient::SendEnd(TCPPacket*& packet) {
  918. // 消息的大小就是移动的指针减去初始的指针
  919. packet->size = sendPointer-packet->buf;
  920. packet->sent = 0;
  921. memcpy(packet->buf, &packet->size, sizeof(packet->size));
  922. m_sendList.push_back(packet);
  923. }
  924. // 添加一个要Push的块
  925. bool P2PClient::AddPushBlock(UINT blockID, bool bRefreshing) {
  926. if(!remotePush.AddBlock(blockID, timeGetTime()))
  927. return false;
  928. if(remotePush.GetValidSize() == 1) {
  929. // pushlist大小从0到1,说明新的下载即将开始
  930. lastRecvDataHdTime = timeGetTime();
  931. // pushlist大小从0到1,记录此时的时间,用于统计实际传输速度
  932. m_reqStartTime = timeGetTime();
  933. //comm->logFile.StatusOut("Request Start Time %d", m_reqStartTime);
  934. }
  935. if(!bRefreshing) {
  936. // 如果不是正在更新整个PushList,就立即发送消息
  937. if(!SendPushList(blockID, true)) {
  938. if(!remotePush.DelBlock(blockID))
  939. assert(0);
  940. return false;
  941. }
  942. }
  943. return true;
  944. }
  945. // 查找一个PushList中的块
  946. bool P2PClient::FindPushBlock(const UINT blockID) const {
  947. return remotePush.FindBlock(blockID);
  948. }
  949. // 删除一个要Push的块
  950. bool P2PClient::DelPushBlock(UINT blockID) {
  951. // 从PushList中查找并删除此块
  952. if(!remotePush.DelBlock(blockID))
  953. return false;
  954. // 发送删除一个块的消息
  955. return SendPushList(blockID, false) == TRUE;
  956. }
  957. // Push List是否已满
  958. bool P2PClient::IsPushListFull() {
  959. return (remotePush.GetValidSize() == remotePush.GetTotalSize());
  960. }
  961. // 取得Pushlist大小
  962. UINT8 P2PClient::GetTotalPushSize() {
  963. return remotePush.GetTotalSize();
  964. }
  965. // 清空PushList
  966. void P2PClient::ClearPush() {
  967. remotePush.Clear();
  968. }
  969. // 发送整个PushList,因为之前重新分配了所有连接的块
  970. void P2PClient::SendPush() {
  971. if(!SendPushList(UINT_MAX, false))
  972. assert(0);
  973. }
  974. // 重新加载PushList,只有在普通的下载时才会被调用
  975. void P2PClient::ReloadPushList() {
  976. if(remotePush.GetValidSize() > 0)
  977. return;
  978. UINT array[P2P_HIGH_PUSH];
  979. std::fill(array, array+P2P_HIGH_PUSH, UINT_MAX);
  980. UINT count = 0;
  981. // 计算此连接一次push的大小
  982. UINT pushNum = P2P_MID_PUSH; // default
  983. // 注意两者单位不同,前者是BPS, 后者是KBPS
  984. if(avgDownSpeed/1024/comm->currRes->GetBitRate() < 0.1667f)
  985. pushNum = P2P_LOW_PUSH;
  986. else if(avgDownSpeed/1024/comm->currRes->GetBitRate() > 0.5f)
  987. pushNum = P2P_HIGH_PUSH;
  988. //else 
  989. // the default value
  990. // 1. 获取可以下载的区间列表
  991. IntervalArray downloadable;
  992. comm->currRes->GetDownloadableArray(remoteInterval, downloadable);
  993. if(downloadable.IsEmpty()) {
  994. return;
  995. }
  996. // 2. 遍历此区间列表,根据条件排除不可以下载的块,直到遍历完毕或者找到pushNum个Block
  997. BlockInterval temp;
  998. while(!downloadable.IsEmpty() && count < pushNum) {
  999. downloadable.PopFront(temp);
  1000. if(temp.size == 0)
  1001. break;
  1002. for(UINT i = temp.start; i < temp.start+temp.size && count < pushNum; ++i) {
  1003. // 如果此块有别人正在下载,则去除此块
  1004. if(comm->p2pMgr.FindInAllPushList(i, this))
  1005. continue;
  1006. // 如果有其他NP有此块,那么就不要从CP上下载了, 前提是缓冲进度100%而且有此块的NP下载速度大于5.0KBPS
  1007. if(GetIsCachePeer() && 
  1008. comm->currRes->GetBufferPercent() == 100 && 
  1009. comm->p2pMgr.FindInAllRemoteInterval(i, this, false, 5.0f) > 0)
  1010. continue;
  1011. // 如果是外网节点,且内网节点上有,则不从这下载
  1012. if (GetIsSameLan() &&
  1013. comm->p2pMgr.FindInAllLanRemoteInterval(i, this, 5.0f) > 0)
  1014. continue;
  1015. // 记录一个可以下载的块
  1016. array[count++] = i;
  1017. }
  1018. }
  1019. if(count) {
  1020. // 根据缓冲情况决定是否要随机下载
  1021. if(comm->currRes->GetBufferPercent() == 100) {
  1022. random_shuffle(array, array+count);
  1023. comm->logFile.StatusOut("Do by random %d.", pushNum);
  1024. }
  1025. else
  1026. comm->logFile.StatusOut("Do by order %d.", pushNum);
  1027. // 添加到pushlist中,并发送
  1028. UINT addNum = 0;
  1029. for(UINT i = 0; i < count && addNum < pushNum; ++i) {
  1030. if(array[i] != UINT_MAX) {
  1031. if(!AddPushBlock(array[i], true))
  1032. break;
  1033. addNum++;
  1034. }
  1035. }
  1036. if(addNum)
  1037. SendPush();
  1038. }
  1039. }
  1040. // 发送push列表中的第一个块
  1041. BOOL P2PClient::SendFirstPushBlock() {
  1042. // 是否有需要发送的块,如果有,取出第一个(也是最小的一个)
  1043. UINT blockID;
  1044. localPush.GetFirstBlock(blockID);
  1045. if(blockID == UINT_MAX)
  1046. return TRUE;
  1047. // 检查当前发送列表中是否已经有本连接的块,
  1048. // 1. 如果没有,那么发送;
  1049. // 2. 如果有,则暂时不发送
  1050. if(!CheckSendingBlock())
  1051. return SendResponse(blockID, true);
  1052. localPush.AddBlock(blockID);
  1053. return TRUE;
  1054. }
  1055. BOOL P2PClient::IsIdleTooLong() {
  1056. DWORD tNow = timeGetTime();
  1057. // 对于Listener, 是上次发送数据的时间;对于Connector,是上次收到数据的时间
  1058. //int transferIdle = isIncoming?tNow-lastSendDataTime:tNow-lastRecvDataTime;
  1059. int transferIdle = min(tNow-lastSendDataTime, tNow-lastRecvDataTime);
  1060. // 明明push list非空,但是却没有收到数据头部的时间
  1061. int responseIdle = tNow-lastRecvDataHdTime;
  1062. GenerateTransferInfo(TRUE);
  1063. TransferInfo tiTemp;
  1064. GetTransferInfo(tiTemp);
  1065. comm->logFile.StatusOut("%s: transfer idle %d. response idle %d. rn
  1066. tcurr down/up:%.2f/%.2f, avg down/up:%.2f/%.2f, total down/up:%.2f/%.2fMB. layer: %d. bandwidth: %.4fKBPS %s.", 
  1067. comm->p2pMgr.FormatIPAddress(remotePeer), 
  1068. transferIdle, responseIdle, 
  1069. tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024, 
  1070. tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024, 
  1071. tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f, 
  1072. GetLayer(), GetBandWidth(), isIncoming?"in":"out");
  1073. BOOL kill = FALSE;
  1074. if(bGotFirstMsg) {
  1075. if(isIncoming)
  1076. kill = (transferIdle > MAX_INCOMING_SENDDATA_IDLE);
  1077. else
  1078. kill = (transferIdle > MAX_OUTGOING_RECVDATA_IDLE);
  1079. // TOOD: 如果pushlist非空,但是有一段时间没有数据的头部了,那么就杀掉此连接
  1080. //       注意此timeout时间比transfer timeout要短一些
  1081. kill = kill || (remotePush.GetValidSize() != 0 && responseIdle > MAX_RESPONSE_IDLE);
  1082. }
  1083. else {
  1084. int startIdle = transferIdle;
  1085. if(GetIsCachePeer()) {
  1086. kill = (startIdle > MAX_CP_FIRST_MSG_IDLE);
  1087. }
  1088. else
  1089. kill = (startIdle > MAX_NP_FIRST_MSG_IDLE);
  1090. }
  1091. if(kill) {
  1092. comm->logFile.StatusOut("Kill Idle %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
  1093. }
  1094. return kill;
  1095. }
  1096. // 检查当前发送列表中是否已经有某连接的块,
  1097. // 1. 如果有,返回TRUE
  1098. // 2. 如果没有,那么返回FALSE;
  1099. BOOL P2PClient::CheckSendingBlock() {
  1100. TCPPacket* packet = NULL;
  1101. UINT temp;
  1102. for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it) {
  1103. packet = *it;
  1104. // 如果消息类型不是P2P_RESPONSE,接着找下一个
  1105. if(packet->buf[4] != P2P_RESPONSE)
  1106. continue;
  1107. // 如果blockSize为0,接着找下一个
  1108. memcpy(&temp, packet->GetBlockSize(), sizeof(temp));
  1109. if(temp == 0)
  1110. continue;
  1111. // 已经有准备发送的块
  1112. return TRUE;
  1113. }
  1114. // 没有找到,说明可以发送blockID
  1115. return FALSE;
  1116. }
  1117. // 从发送列表中查找一个块,如果此块准备发送(packet->send == 0),则删除并返回TRUE;
  1118. // 如果此块已经开始发送,则直接返回TRUE。如未找到,则返回FALSE
  1119. // 如果blockID是UINT_MAX,则删除任意一个准备发送的块
  1120. BOOL P2PClient::DeleteSendingBlock(UINT blockID) {
  1121. TCPPacket* packet = NULL;
  1122. UINT temp;
  1123. UINT index = 0;
  1124. for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it, index++) {
  1125. packet = *it;
  1126. // 如果消息类型不是P2P_RESPONSE,接着找下一个
  1127. if(packet->GetMsgType() != P2P_RESPONSE)
  1128. continue;
  1129. // 如果不是目标块,则接着找下一个
  1130. // 如果blockID是UINT_MAX,则删除所有准备发送的块
  1131. memcpy(&temp, packet->buf+5, sizeof(temp));
  1132. if(temp != blockID && blockID != UINT_MAX)
  1133. continue;
  1134. // 如果已经开始发送,就不能打断,返回TRUE
  1135. if(packet->sent > 0)
  1136. return TRUE;
  1137. // 如果是尚未开始发送的块,则删除,返回TRUE
  1138. comm->p2pMgr.ReleasePacket(packet);
  1139. m_sendList.erase(it);
  1140. return TRUE;
  1141. }
  1142. // 没有找到
  1143. return FALSE;
  1144. }
  1145. }