P2PClient.cpp
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:39k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
- // P2PClient.cpp: implementation of the P2PClient class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "stdafx.h"
- #include "P2PClient.h"
- #include "Communicator.h"
- namespace NPLayer1 {
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- P2PClient::P2PClient()
- : recvPointer(0)
- , sendPointer(0)
- , isSameLan(false)
- {
- valid = FALSE;
- errStr[0] = 0;
- }
- P2PClient::~P2PClient() {
- if(valid)
- SetInvalid();
- }
- BOOL P2PClient::SetValid(Communicator* c, const ConnectingPeer& peer) {
- comm = c;
- recvOff = 0;
- valid = TRUE;
- m_Socket = peer.sock;
- isIncoming= peer.isIncoming;
- isForFree = peer.isForFree;
- isPassive = peer.isPassive;
- isSameRes = false;
- isSameLan = peer.isSameLan;
- // 到这里为止,我们完全不知道对方有哪些块
- remoteInterval.Clear();
- remotePush.Clear();
- localPush.Clear();
- sentMediaArray.Clear();
- connectionBeginTime = lastRecvDataTime = lastSendDataTime = lastRecvDataHdTime = timeGetTime();
- bGotFirstMsg = FALSE;
- memcpy((PeerInfoWithAddr*)&remotePeer, (PeerInfoWithAddr*)&peer, sizeof(PeerInfoWithAddr));
- comm->logFile.StatusOut("Connected on %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- if(GetIsCachePeer()) {
- remoteVersion = comm->cfgData.COMMUNICATOR_VERSION;
- remotePeer.layer = 0;
- }
- else {
- remoteVersion = 0.0f;
- remotePeer.layer = 0xff;
- }
- // clear transfer calculator members
- ClearTransferInfo();
- lastRequestStartTime = 0;
- lastRequestBytes = 0;
- m_reqStartTime = 0;
- m_transUsedTime = 0;
- if(!SendHello())
- return FALSE;
- CorePeerInfo thisPeer;
- comm->p2pMgr.GetSelfInfo(thisPeer);
- if(!SendReport(thisPeer, true))
- return FALSE;
- if(GetIsCachePeer()) {
- // 请求重新分配所有push list
- comm->p2pMgr.RedistributeAllBlocks(this);
- }
- return TRUE;
- }
- void P2PClient::SetInvalid() {
- if(valid) {
- valid = FALSE;
- remotePush.Clear();
- comm->logFile.StatusOut("Clear TCPPacket Send list...");
- while (!m_sendList.empty()) {
- comm->p2pMgr.ReleasePacket(m_sendList.front());
- m_sendList.pop_front();
- }
- comm->logFile.StatusOut("Disconnected from %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- comm->p2pMgr.SafeCloseSocket(m_Socket); // no force disconnect
- // 此处没有排除低版本客户端的情况,所以统计会有少许偏差
- comm->p2pMgr.ConnectionClosed(isIncoming, timeGetTime()-connectionBeginTime);
- }
- }
- BOOL P2PClient::BaseRecv() {
- int ret = recv(m_Socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
- if(ret < 0) {
- DWORD lastError = ::WSAGetLastError();
- if (WSAEWOULDBLOCK != lastError) {
- comm->logFile.StatusErr("Receiving data on TCP", lastError);
- return FALSE;
- }
- else
- return TRUE;
- }
- else if(0 == ret) {
- comm->logFile.StatusOut("Connection has been disconnected gracefully.");
- return FALSE;
- }
- recvOff += ret;
- AddIncomingBytes(ret);
- comm->p2pMgr.AddIncomingBytes(ret);
- BOOL retVal = FALSE;
- for(;;) {
- // because multiple msgs can be received at once.
- // keep call parseMsg() till !MSG_COMPLETE
- MSG_STATE ms = ParseMsg();
- bool bBadAddr = false;
- switch(ms) {
- case MSG_COMPLETE:
- continue;
- case MSG_UNCOMPLETE:
- retVal = TRUE;
- break;
- case MSG_ERR_SIZE:
- sprintf(errStr, "错误的消息大小!");
- break;
- case MSG_ERR_TYPE:
- sprintf(errStr, "错误的消息类型!");
- break;
- case MSG_DIFF_CHNL:
- sprintf(errStr, "属于不同的频道!");
- bBadAddr = true;
- break;
- case MSG_NOMORE_CONS:
- sprintf(errStr, "已经存在一个连接!");
- break;
- case MSG_ERR_LIST_SIZE:
- sprintf(errStr, "错误的列表大小!");
- break;
- case MSG_SEND_ERR:
- sprintf(errStr, "发送消息错误!");
- break;
- case MSG_SAVEDATA_ERR:
- sprintf(errStr, "无法保存数据!");
- break;
- case MSG_UNMATCH_BLOCKID:
- sprintf(errStr, "返回了并未请求的块!");
- break;
- case MSG_NOSUCH_RES_HERE:
- sprintf(errStr, "本机没有这个资源!");
- bBadAddr = true;
- break;
- case MSG_REMOTE_ERR:
- // 错误信息已经在errStr中了
- break;
- case MSG_CHNL_CLOSED:
- sprintf(errStr, "频道关闭!!");
- // 发送没有频道关闭的消息给外界
- comm->PostErrMessage(PNT_CHNL_CLOSED, 0, true);
- break;
- case MSG_NOSUCH_RES_SP:
- sprintf(errStr, "SP没有这个资源!!");
- // 发送没有资源的消息给外界
- comm->PostErrMessage(PNT_NO_SUCH_RES, 0, true);
- break;
- case MSG_CHNL_END:
- sprintf(errStr, "频道结束了!");
- // 发送频道结束的消息给外界
- comm->PostErrMessage(PNT_CHNL_ENDED, 0, true);
- break;
- case MSG_LOW_VERSION:
- sprintf(errStr, "对方客户端版本过低!");
- bBadAddr = true;
- comm->p2pMgr.AddLowVersionConCount();
- break;
- default:
- sprintf(errStr, "未知错误类型!");
- bBadAddr = true;
- break;
- }
- if(strlen(errStr) > 0) {
- comm->logFile.StatusOut("来自%s的错误: %s", comm->p2pMgr.FormatIPAddress(remotePeer), errStr);
- errStr[0] = 0;
- // reject this client
- comm->p2pMgr.AddBadAddr(remotePeer);
- }
- break;
- }
- return retVal;
- }
- MSG_STATE P2PClient::ParseMsg() {
- // 如果过小,则不是正常的包
- if(recvOff < sizeof(int)+sizeof(BYTE))
- return MSG_UNCOMPLETE;
- // 把移动指针放到数据的起始地址
- recvPointer = recvBuf;
- // 读取消息大小
- CSClient::CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));
- // 读取消息类型
- UINT8 msgType;
- CSClient::CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));
- // msgSize是否正常
- if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
- return MSG_ERR_SIZE;
- // 因为P2P_RESPONSE包含数据,可能传输比较慢
- // 在这里至少我们知道发送的请求的到回应了,要根据这个判断请求是否超时
- if(msgType == P2P_RESPONSE) {
- lastRecvDataHdTime = timeGetTime();
- }
- // 是否包含完成的消息
- if(recvOff < msgSize)
- return MSG_UNCOMPLETE;
- MSG_STATE ret = MSG_COMPLETE;
- switch(msgType) {
- case P2P_HELLO:
- ret = OnHello();
- break;
- case P2P_SPUPDATE:
- ret = OnSPUpdate();
- break;
- case P2P_REPORT:
- ret = OnReport();
- break;
- case P2P_NEAR_PEERS:
- ret = OnNearPeers();
- break;
- case P2P_PUSHLIST:
- ret = OnPushList();
- break;
- case P2P_RESPONSE:
- ret = OnResponse();
- break;
- case P2P_MSG:
- ret = OnMsg();
- break;
- case P2P_REQMEDIA:
- ret = OnReqMedia();
- break;
- case P2P_MEDIATYPE:
- ret = OnMediaType();
- break;
- default:
- ret = MSG_ERR_TYPE;
- break;
- }
- // copy left data to start of recvBuf
- if(recvOff >= msgSize) {
- memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize);
- recvOff -= msgSize;
- }
- return ret;
- }
- MSG_STATE P2PClient::OnHello() {
- assert(!GetIsCachePeer());
- // NP version
- CSClient::CopyMoveSrc(&remoteVersion, recvPointer, sizeof(remoteVersion));
- if(remoteVersion < comm->cfgData.ACCEPT_VERSION) {
- comm->logFile.StatusOut("Reject low version client %s %.5f.", comm->p2pMgr.FormatIPAddress(remotePeer), remoteVersion);
- return MSG_LOW_VERSION;
- }
- // 对方需要的资源Hash码
- char resHashCode[MD5_LEN+1];
- resHashCode[MD5_LEN] = 0;
- CSClient::CopyMoveSrc(resHashCode, recvPointer, MD5_LEN);
- // 是否被动连接,因为对方可能是因为收到TS2NP_CONNECTO,被动连接过来的
- // 那本连接就算做outgoing,对方会将本连接算做incoming
- bool passiveConnect = false;
- CSClient::CopyMoveSrc(&passiveConnect, recvPointer, sizeof(passiveConnect));
- if(passiveConnect)
- isIncoming = false;
- // 对方Peer信息
- CSClient::CopyMoveSrc(&remotePeer, recvPointer, sizeof(remotePeer));
- comm->logFile.StatusOut("Got P2P_HELLO from to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- // 如果对方资源和本机当前资源相同,则是非Free的连接
- if(comm->currRes && memcmp(resHashCode, comm->currRes->GetHashCode().data(), MD5_LEN) == 0) {
- isForFree = false;
- isSameRes = true;
- }
- else {
- return MSG_NOSUCH_RES_HERE;
- }
- if(isIncoming || passiveConnect) {
- // 此时isIncoming为true,说明是accept进来的连接,要在这里判断是否重复连接
- // 前面的passiveConnect可能改变了isIncoming的值,
- // 而passiveConnect必定是incoming的连接,所以一并在这里判断
- if(!comm->p2pMgr.CheckN4One(remotePeer, isForFree, true, MAX_CONNECTION_PER_NP))
- return MSG_NOMORE_CONS;
- }
- // 如果对方不是CachePeer, 则第一个收到的消息应该在这里
- bGotFirstMsg = TRUE;
- if (remotePeer.outerIP.sin_addr.s_addr == comm->localAddress.outerIP.sin_addr.s_addr) {
- isSameLan = true;
- }
- comm->p2pMgr.ConnectionEstablished(isIncoming);
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnSPUpdate() {
- if(!comm->currRes) {
- assert(0);
- return MSG_ERR_TYPE;
- }
- MSG_STATE state = MSG_COMPLETE;
- UINT oldMaxBlockID = UINT_MAX;
- // 记录旧的最大块ID
- oldMaxBlockID = comm->currRes->GetSPUpdate().maxBlockID;
- // 收到的SPUpdate
- SPUpdate tmpUpdate;
- CSClient::CopyMoveSrc(&tmpUpdate, recvPointer, sizeof(tmpUpdate));
- comm->logFile.StatusOut("Recv SPUpdate %d->%d from %s", tmpUpdate.minBlockID, tmpUpdate.maxBlockID,
- comm->p2pMgr.FormatIPAddress(remotePeer));
- // 计算收到的SPUpdate的校验码
- PBYTE temp = reinterpret_cast<PBYTE>(recvPointer-sizeof(tmpUpdate));
- BYTE calsum = 0;
- for(int i = 0; i < sizeof(tmpUpdate); ++i) {
- calsum += *temp;
- temp++;
- }
- // 如果是不带校验的SPUpdate,则不接受
- if(msgSize != 5+sizeof(SPUpdate)+1) {
- comm->logFile.StatusOut("Old SPUpdate!");
- return MSG_COMPLETE;
- }
- // 读取对方发送的SPUpdate校验码
- BYTE sum = 0;
- CSClient::CopyMoveSrc(&sum, recvPointer, sizeof(sum));
- // 比较两个SPUpdate的校验码,必须符合
- if(calsum != sum) {
- comm->logFile.StatusOut("Bad SPUpdate, err sum!");
- return MSG_COMPLETE;
- }
- if(tmpUpdate.minBlockID == UINT_MAX && tmpUpdate.maxBlockID == UINT_MAX) {
- if(tmpUpdate.minKeySample == 0xffffffffffffffff && tmpUpdate.maxKeySample == 0xffffffffffffffff)
- state = MSG_NOSUCH_RES_SP; // SP 上没有这个资源
- else if(tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
- state = MSG_CHNL_CLOSED; // 这个频道已经关闭
- else
- assert(0); // 错误的消息
- }
- if(tmpUpdate.minBlockID == 0 && tmpUpdate.maxBlockID == 0 &&
- tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
- state = MSG_CHNL_END;
- if(state == MSG_COMPLETE) {
- if(GetIsCachePeer()) {
- remoteInterval.Clear();
- remoteInterval.AddInterval(tmpUpdate.minBlockID, tmpUpdate.maxBlockID-tmpUpdate.minBlockID);
- }
- // 如果收到SPUpdate的maxBlockID比本机SPUpdate的maxBlockID更大
- // 则更新本机SPUpdate,并向比本机层数更高的连接广播
- if(tmpUpdate.maxBlockID > oldMaxBlockID) {
- comm->currRes->SetSPUpdate(tmpUpdate, sum);
- // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
- // 直播系统中连上CP的连接,不会接收到P2P_REPORT,所以只能根据spUpdate决定是否开始请求数据
- if(remotePush.GetValidSize() == 0 && GetIsCachePeer())
- comm->p2pMgr.RedistributeAllBlocks(this);
- // 广播最新的SPUpdate
- comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
- }
- }
- else {
- // 广播频道的非正常状态
- comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
- }
- return state;
- }
- MSG_STATE P2PClient::OnReport() {
- // 复制对方的信息
- CSClient::CopyMoveSrc((CorePeerInfo*)&remotePeer, recvPointer, sizeof(CorePeerInfo));
- // 是否更新全部区间列表
- bool bRefresh;
- CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
- if(bRefresh)
- remoteInterval.Clear();
- // 如果REFRESH,那么只有一组区间;如果不是,那么有两组区间,先后是增加和删除
- for(UINT8 j = 0; j < (bRefresh?1:2); ++j) {
- // read interval count
- UINT8 intervalNum = 0;
- CSClient::CopyMoveSrc(&intervalNum, recvPointer, sizeof(intervalNum));
- assert(recvPointer - recvBuf < 1000);
- // read intervals
- for(UINT8 i = 0; i < intervalNum; ++i) {
- BlockInterval temp;
- CSClient::CopyMoveSrc(&temp, recvPointer, sizeof(temp));
- if(j == 0)
- remoteInterval.AddInterval(temp.start, temp.size);
- else
- remoteInterval.DelInterval(temp.start, temp.size);
- }
- // 如果对方新增了Block
- if(j == 0 && intervalNum > 0) {
- // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
- if(remotePush.GetValidSize() == 0 && comm->currRes)
- comm->p2pMgr.RedistributeAllBlocks(this);
- }
- }
- //comm->logFile.StatusOut("Recv P2P_REPORT from %s", comm->p2pMgr.FormatIPAddress(remotePeer));
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnNearPeers() {
- // get size of peer list and move the pointer
- UINT8 listSize;
- CSClient::CopyMoveSrc(&listSize, recvPointer, sizeof(listSize));
- // 检查listSize是否正常
- if(static_cast<int>(listSize*sizeof(PeerInfoWithAddr)) > recvBuf+P2P_BUF_SIZE-recvPointer)
- return MSG_ERR_LIST_SIZE;
- // read peer one by one, and add to known peer list
- for(UINT8 i = 0; i < listSize; ++i) {
- PeerInfoWithAddr peer;
- CSClient::CopyMoveSrc(&peer, recvPointer, sizeof(peer));
- peer.isCachePeer = false;
- comm->p2pMgr.AddPeerInfo(peer);
- }
- comm->logFile.StatusOut("Recv P2P_NEAR_PEERS(%d NP) from %s.", listSize, comm->p2pMgr.FormatIPAddress(remotePeer));
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnPushList() {
- // 当没有当前资源时,不给对方发送数据
- if(!comm->currRes)
- return MSG_SEND_ERR;
- // 是否更新整个PushList
- bool bRefresh;
- CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
- if(bRefresh) {
- // 清空所有准备发送的块(已经开始发送的块,不用清空)
- while(DeleteSendingBlock(UINT_MAX));
- localPush.Clear();
- }
- UINT blockID = UINT_MAX;
- if(bRefresh) {
- UINT8 count;
- CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
- assert(count < 30);
- for(UINT8 i = 0; i < count; ++i) {
- CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
- // 查找本机有没有这个块,如果没有,立即告知对方;
- if(!comm->currRes->FindBlock(blockID)) {
- if(!SendResponse(blockID, false))
- return MSG_SEND_ERR;
- }
- else
- localPush.AddBlock(blockID);
- }
- if(count)
- comm->logFile.StatusOut("Got request refresh(%d...) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- else {
- UINT8 count;
- CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
- assert(count < 2);
- for(UINT8 i = 0; i < count; ++i) {
- CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
- // 查找本机有没有这个块,如果没有,立即告知对方;
- if(!comm->currRes->FindBlock(blockID)) {
- if(!SendResponse(blockID, false))
- return MSG_SEND_ERR;
- }
- else
- localPush.AddBlock(blockID);
- }
- if(count)
- comm->logFile.StatusOut("Got request add(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
- CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
- assert(count < 2);
- for(UINT8 i = 0; i < count; ++i) {
- CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
- // 如果在localPush中查找并删除此块
- if(!localPush.DelBlock(blockID)) {
- // 如果在localPush中没有找到,则说明此块已经在发送列表中了,甚至可能已经被发送了
- // 尝试从发送列表中查找并删除此块
- if(!DeleteSendingBlock(blockID)) {
- comm->logFile.StatusOut("Ahhhhh..., the block has already been sent!");
- }
- }
- }
- if(count)
- comm->logFile.StatusOut("Got request del(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- // 发送push列表中的第一个块
- if(!SendFirstPushBlock())
- return MSG_SEND_ERR;
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnResponse() {
- // 如果对方是CachePeer, 则第一个收到的消息应该在这里
- if(GetIsCachePeer())
- bGotFirstMsg = TRUE;
- MSG_STATE ret = MSG_COMPLETE;
- // 回应的BlockID和大小
- UINT blockID;
- CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
- UINT blockSize;
- CSClient::CopyMoveSrc(&blockSize, recvPointer, sizeof(blockSize));
- // 检查Block大小是否正常
- if(static_cast<int>(blockSize) > recvBuf+P2P_BUF_SIZE-recvPointer) {
- ret = MSG_ERR_SIZE;
- assert(0);
- }
- // 检查对方返回的Block是不是在Push List之中,如果在,则删除之;
- // 如果不在,则检查是否在其他连接的Push List之中,如果在,则删除之
- DWORD reqTime;
- if(!remotePush.DelBlock(blockID, &reqTime))
- comm->p2pMgr.ReclaimBlocks(this, blockID);
- if(ret == MSG_COMPLETE) {
- if(blockSize > 0) {
- // 记录此次得到数据的时间
- lastRecvDataTime = timeGetTime();
- // 记录本次传输所用的时间
- m_transUsedTime += timeGetTime()-m_reqStartTime;
- //assert(lastRecvDataTime-m_reqStartTime < 25000);
- // 确实已经下载了,虽然可能发生存储错误
- comm->p2pMgr.AddBlockDataDown(blockSize);
- if(GetIsCachePeer())
- comm->p2pMgr.AddCPData(blockSize);
- // 存储此块
- P2P_RETURN_TYPE bSuccess = comm->currRes->PutBlock(blockID, blockSize, reinterpret_cast<PBYTE>(recvPointer));
- recvPointer += blockSize;
- if(bSuccess < PRT_OK) {
- if(bSuccess == PRT_BUFFER_FULL) {
- comm->logFile.StatusOut("缓冲区已满");
- Sleep(300);
- }
- else if(bSuccess == PRT_BAD_BLOCK) {
- if(!remotePeer.isCachePeer) {
- // 这个块是错误的,不再从这个NP下载此块
- remoteInterval.DelInterval(blockID, 1);
- }
- }
- else {
- comm->logFile.StatusErr("写入磁盘错误", GetLastError());
- // 发送写入磁盘错误消息
- comm->PostErrMessage(PNT_DISK_ERR, 0, true);
- ret = MSG_SAVEDATA_ERR;
- assert(0);
- }
- }
- char* temp = "";
- #ifdef _DEBUG
- MD5 md5(reinterpret_cast<BYTE*>(recvPointer-blockSize), blockSize);
- temp = md5.hex_digest();
- #endif
- comm->logFile.StatusOut("Got %sblock %d(%d)(%s)(used %dms) from %s.",
- (bSuccess==PRT_BAD_BLOCK?"bad ":""), blockID, blockSize, temp, lastRecvDataTime-m_reqStartTime,
- comm->p2pMgr.FormatIPAddress(remotePeer));
- #ifdef _DEBUG
- delete [] temp;
- // 记录下次传输开始的时间
- m_reqStartTime = timeGetTime();
- #endif
- }
- else {
- comm->logFile.StatusOut("Got block %d(%d) from %s.",
- blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- // 既然得到了数据,登录不上TS的问题就算了:)
- comm->csClient.ResetLoginFail();
- }
- if(ret == MSG_COMPLETE) {
- if(blockSize == 0) {
- // 对方没有这个块,从区间表中删除,并将此块交给其他连接
- remoteInterval.DelInterval(blockID, 1);
- }
- // 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
- if(remotePush.GetValidSize() == 0)
- comm->p2pMgr.RedistributeAllBlocks(this);
- }
- return ret;
- }
- MSG_STATE P2PClient::OnMsg() {
- //assert(GetIsCachePeer());
- // 错误代码
- UINT16 errCode;
- CSClient::CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));
- // 是否需要断开连接
- bool shouldDisconnect;
- CSClient::CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect));
- // 根据错误代码处理
- switch(errCode) {
- case ERR_PROTOCOL_FORMAT:
- sprintf(errStr, "协议错误");
- break;
- case ERR_AUTHORIZATION:
- sprintf(errStr, "验证错误");
- break;
- case ERR_INTERNAL:
- sprintf(errStr, "未知错误");
- break;
- case ERR_CONNECTION_FULL:
- sprintf(errStr, "对方连接已满");
- break;
- default:
- shouldDisconnect = true;
- }
- if(shouldDisconnect) {
- // 停止连接这个地址
- comm->p2pMgr.AddBadAddr(remotePeer);
- return MSG_REMOTE_ERR;
- }
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnReqMedia() {
- UINT blockID = 0;
- CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
- if(!SendMediaType(blockID))
- return MSG_SEND_ERR;
- return MSG_COMPLETE;
- }
- MSG_STATE P2PClient::OnMediaType() {
- assert(comm->currRes);
- if(!comm->currRes)
- return MSG_DIFF_CHNL;
- // 一些较早版本的客户端会读取gtv文件中的媒体类型,然后发送很大的mediainterval,这种消息不予接受
- if(remoteVersion < comm->cfgData.ADD_PROGNAME_VERSION) {
- return MSG_COMPLETE;
- }
- MediaInterval mInterval;
- // 区间
- CSClient::CopyMoveSrc(&mInterval.start, recvPointer, sizeof(mInterval.start));
- CSClient::CopyMoveSrc(&mInterval.size, recvPointer, sizeof(mInterval.size));
- UINT len = 0;
- CSClient::CopyMoveSrc(&len, recvPointer, sizeof(len));
- if(len > 1024) {
- assert(0);
- return MSG_ERR_LIST_SIZE;
- }
- // 首先读取视频编码格式
- CSClient::CopyMoveSrc(&mInterval.videoType, recvPointer, sizeof(mInterval.videoType));
- if(mInterval.videoType.cbFormat > 1024) {
- assert(0);
- return MSG_ERR_LIST_SIZE;
- }
- mInterval.videoData = new BYTE[mInterval.videoType.cbFormat];
- CSClient::CopyMoveSrc(mInterval.videoData, recvPointer, mInterval.videoType.cbFormat);
- // 然后读取音频编码格式
- CSClient::CopyMoveSrc(&mInterval.audioType, recvPointer, sizeof(mInterval.audioType));
- if(mInterval.audioType.cbFormat > 1024) {
- assert(0);
- return MSG_ERR_LIST_SIZE;
- }
- mInterval.audioData = new BYTE[mInterval.audioType.cbFormat];
- CSClient::CopyMoveSrc(mInterval.audioData, recvPointer, mInterval.audioType.cbFormat);
- assert(len == mInterval.videoType.cbFormat+sizeof(mInterval.videoType)+mInterval.audioType.cbFormat+sizeof(mInterval.audioType));
- // 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
- if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
- CSClient::CopyMoveSrc(&mInterval.pnamesize, recvPointer, sizeof(mInterval.pnamesize));
- /// 直播频道没有轮播节目名
- if(mInterval.pnamesize && mInterval.pnamesize + recvPointer-recvBuf <= msgSize) {
- mInterval.pname = new char[mInterval.pnamesize+1];
- CSClient::CopyMoveSrc(mInterval.pname, recvPointer, mInterval.pnamesize);
- mInterval.pname[mInterval.pnamesize] = 0;
- }
- else
- mInterval.pnamesize = 0; // 既然不能复制, 一定要置空!
- // 超过ADD_PROGNAME_VERSION的版本将发送节目的时间长度和频道的名字
- if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
- CSClient::CopyMoveSrc(&mInterval.progtime, recvPointer, sizeof(mInterval.progtime));
- CSClient::CopyMoveSrc(&mInterval.cnamesize, recvPointer, sizeof(mInterval.cnamesize));
- if(mInterval.cnamesize && mInterval.cnamesize + recvPointer-recvBuf <= msgSize) {
- mInterval.cname = new char[mInterval.cnamesize+1];
- CSClient::CopyMoveSrc(mInterval.cname, recvPointer, mInterval.cnamesize);
- mInterval.cname[mInterval.cnamesize] = 0;
- }
- else
- mInterval.cnamesize = 0; // 既然不能复制, 一定要置空!
- }
- }
- // add to interval array
- comm->currRes->AddMediaInterval(mInterval);
- return MSG_COMPLETE;
- }
- BOOL P2PClient::SendHello() {
- // 不可能没有currRes
- if(!comm->currRes)
- return FALSE;
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_HELLO))
- return FALSE;
- // NP version
- CSClient::CopyMoveDes(sendPointer, &comm->cfgData.COMMUNICATOR_VERSION, sizeof(float));
- // 当前资源的Hash码
- CSClient::CopyMoveDes(sendPointer, comm->currRes->GetHashCode().data(), MD5_LEN);
- // 是否被动连接
- CSClient::CopyMoveDes(sendPointer, &isPassive, sizeof(isPassive));
- // 本机信息
- PeerInfoWithAddr thisPeer;
- memcpy(static_cast<P2PAddress*>(&thisPeer), &comm->localAddress, sizeof(comm->localAddress));
- comm->p2pMgr.GetSelfInfo(thisPeer);
- CSClient::CopyMoveDes(sendPointer, &thisPeer, sizeof(thisPeer));
- // 如果对方是CachePeer,还要发送SP的地址列表
- if(GetIsCachePeer()) {
- NormalAddress* spIPList = comm->currRes->GetSPList();
- if(spIPList) {
- UINT8 spIPListSize = comm->currRes->GetSPListSize();
- CSClient::CopyMoveDes(sendPointer, &spIPListSize, sizeof(spIPListSize));
- for(UINT8 i = 0; i < spIPListSize; ++i)
- CSClient::CopyMoveDes(sendPointer, &spIPList[i], sizeof(spIPList[i]));
- }
- }
- comm->logFile.StatusOut("Sending P2P_HELLO to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- SendEnd(packet);
- return TRUE;
- }
- BOOL P2PClient::SendReport(const CorePeerInfo& peer, bool bRefresh) {
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_REPORT))
- return FALSE;
- // PeerInfoWithAddr of local peer
- CSClient::CopyMoveDes(sendPointer, &peer, sizeof(peer));
- // 如果没有当前资源,则发送bRefresh=true和intervalNum=0
- if(!comm->currRes)
- bRefresh = true;
- // 是否更新全部区间列表
- CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));
- // send block intervals
- // TODO: 这里存在重复代码,有待改进
- UINT8 intervalNum = comm->currRes?0xff:0;
- if(bRefresh) {
- if(comm->currRes) {
- // 如果是更新全部区间,则取得当前资源全部区间的列表,并发送
- comm->currRes->GetAllIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum);
- }
- CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
- sendPointer += intervalNum*sizeof(BlockInterval);
- }
- else {
- // 如果是更新变化的区间,则先后写入增加的区间和减少的区间
- if(comm->currRes)
- comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, true);
- CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
- sendPointer += intervalNum*sizeof(BlockInterval);
- if(comm->currRes)
- comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, false);
- CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
- sendPointer += intervalNum*sizeof(BlockInterval);
- }
- SendEnd(packet);
- //comm->logFile.StatusOut("Sending Report To %s", comm->p2pMgr.FormatIPAddress(remotePeer));
- return TRUE;
- }
- BOOL P2PClient::SendSPUpdate(const SPUpdate& spUpdate, UINT8 selfLayer, BYTE sum) {
- // 如果对方比自己离CP更近,那就不用发了
- if(selfLayer > remotePeer.layer)
- return TRUE;
- if(!isSameRes)
- return TRUE;
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_SPUPDATE))
- return FALSE;
- // SPUpdate
- CSClient::CopyMoveDes(sendPointer, &spUpdate, sizeof(spUpdate));
- // SPUpdate sum
- CSClient::CopyMoveDes(sendPointer, &sum, sizeof(sum));
- SendEnd(packet);
- return TRUE;
- }
- BOOL P2PClient::SendNearPeers() {
- // 对方资源与本机相同时,才能发送有用的PeerInfo给它
- if(!isSameRes)
- return TRUE;
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_NEAR_PEERS))
- return FALSE;
- list<PeerInfoWithAddr> peerList;
- comm->p2pMgr.GetPeersForNeighbour(peerList, 20, remotePeer);
- // 如果PeerList中元素个数大于0xff,则删除更多的元素
- UINT8 listSize = min(0xff, peerList.size());
- peerList.resize(listSize);
- if(listSize == 0) {
- // 释放这个Packet
- comm->p2pMgr.ReleasePacket(packet);
- return TRUE;
- }
- // 列表大小
- CSClient::CopyMoveDes(sendPointer, &listSize, sizeof(listSize));
- // write peerInfo one by one
- for(PeerAddrIt it = peerList.begin(); it != peerList.end(); ++it) {
- CSClient::CopyMoveDes(sendPointer, &*it, sizeof(PeerInfoWithAddr));
- }
- comm->logFile.StatusOut("Sending P2P_NEAR_PEERS to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- SendEnd(packet);
- return TRUE;
- }
- BOOL P2PClient::SendPushList(UINT blockID, bool bAdd) {
- // 是否更新全部
- bool bRefresh = (blockID == UINT_MAX);
- // 如果大小为零就不必更新
- if(bRefresh && remotePush.GetValidSize() == 0)
- return TRUE;
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_PUSHLIST))
- return FALSE;
- CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));
- if(bRefresh) {
- // 更新整个PushList
- UINT8 count = 0xff;
- remotePush.CopyPushList(reinterpret_cast<UINT*>(sendPointer+sizeof(count)), count);
- CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
- sendPointer += count*sizeof(UINT);
- comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block(%d) ... to %s.",
- bRefresh?"Refresh":(bAdd?"Add":"Del"), count, comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- else {
- UINT8 count = bAdd?1:0;
- CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
- if(bAdd)
- CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
- count = bAdd?0:1;
- CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
- if(!bAdd)
- CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
- comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block %d to %s.",
- bAdd?"Add":"Del",blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- SendEnd(packet);
- return TRUE;
- }
- BOOL P2PClient::SendResponse(UINT blockID, bool tryGetData) {
- // 此块所在的区间,其媒体类型是否已经发送过了
- if(!sentMediaArray.FindBlock(blockID)) {
- // 没有发送过,则发送此块所在区间的媒体类型
- if(!SendMediaType(blockID))
- return FALSE;
- }
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_RESPONSE))
- return FALSE;
- // 请求的BlockID
- CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
- // 检查是否存在这个Block,取出他
- UINT blockSize = 0;
- // 如果需要获取数据,则尝试获取数据;如果只是为了告诉对方此块不存在,则不作尝试;
- if(comm->currRes && tryGetData) {
- P2P_RETURN_TYPE ret = comm->currRes->GetBlock(blockID, blockSize, sendPointer+sizeof(blockSize));
- if(ret < PRT_OK) {
- // 一定要释放这个packet
- comm->p2pMgr.ReleasePacket(packet);
- return ret;
- }
- }
- // write block size (bSize==0 means no data)
- CSClient::CopyMoveDes(sendPointer, &blockSize, sizeof(blockSize));
- // 将指针移动到数据的末尾
- sendPointer += blockSize;
- comm->logFile.StatusOut("Sending P2P_RESPONSE block(%d)(%d) to %s.",
- blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
- if(blockSize > 0) {
- // 记录此次发送数据的时间
- lastSendDataTime = timeGetTime();
- }
- SendEnd(packet);
- if(tryGetData && blockSize == 0) {
- // 如果想要发送这个块,结果发现本机没有,则尝试发送下一个块
- return SendFirstPushBlock();
- }
- return TRUE;
- }
- BOOL P2PClient::SendReqMedia(UINT blockID) {
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_REQMEDIA))
- return FALSE;
- CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
- SendEnd(packet);
- return TRUE;
- }
- BOOL P2PClient::SendMediaType(UINT blockID) {
- if(!comm->currRes)
- return FALSE;
- // 查找媒体类型
- MediaInterval mInterval;
- if(!comm->currRes->GetMediaInterval(blockID, mInterval))
- return TRUE;
- TCPPacket* packet;
- if(!SendBegin(packet, P2P_MEDIATYPE))
- return FALSE;
- // 写入区间
- CSClient::CopyMoveDes(sendPointer, &mInterval.start, sizeof(mInterval.start));
- CSClient::CopyMoveDes(sendPointer, &mInterval.size, sizeof(mInterval.size));
- // 写入媒体数据长度
- UINT len = sizeof(mInterval.videoType)+mInterval.videoType.cbFormat+sizeof(mInterval.audioType)+mInterval.audioType.cbFormat;
- CSClient::CopyMoveDes(sendPointer, &len, sizeof(len));
- // 写入媒体数据
- CSClient::CopyMoveDes(sendPointer, &mInterval.videoType, sizeof(mInterval.videoType));
- CSClient::CopyMoveDes(sendPointer, mInterval.videoData, mInterval.videoType.cbFormat);
- CSClient::CopyMoveDes(sendPointer, &mInterval.audioType, sizeof(mInterval.audioType));
- CSClient::CopyMoveDes(sendPointer, mInterval.audioData, mInterval.audioType.cbFormat);
- // 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
- if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
- // 写入节目的名字
- CSClient::CopyMoveDes(sendPointer, &mInterval.pnamesize, sizeof(mInterval.pnamesize));
- CSClient::CopyMoveDes(sendPointer, mInterval.pname, mInterval.pnamesize);
- // 超过ADD_PROGTIME_VERSION的版本将发送节目的时间和频道名
- if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
- // 写入节目的时间长度
- CSClient::CopyMoveDes(sendPointer, &mInterval.progtime, sizeof(mInterval.progtime));
- // 写入频道的名字
- CSClient::CopyMoveDes(sendPointer, &mInterval.cnamesize, sizeof(mInterval.cnamesize));
- CSClient::CopyMoveDes(sendPointer, mInterval.cname, mInterval.cnamesize);
- }
- }
- SendEnd(packet);
- sentMediaArray.AddInterval(mInterval);
- return TRUE;
- }
- BOOL P2PClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
- packet= comm->p2pMgr.AllocatePacket();
- if(!packet)
- return FALSE;
- // 先留着消息大小不写,到最后再写
- sendPointer = packet->buf+sizeof(UINT);
- CSClient::CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
- return TRUE;
- }
- void P2PClient::SendEnd(TCPPacket*& packet) {
- // 消息的大小就是移动的指针减去初始的指针
- packet->size = sendPointer-packet->buf;
- packet->sent = 0;
- memcpy(packet->buf, &packet->size, sizeof(packet->size));
- m_sendList.push_back(packet);
- }
- // 添加一个要Push的块
- bool P2PClient::AddPushBlock(UINT blockID, bool bRefreshing) {
- if(!remotePush.AddBlock(blockID, timeGetTime()))
- return false;
- if(remotePush.GetValidSize() == 1) {
- // pushlist大小从0到1,说明新的下载即将开始
- lastRecvDataHdTime = timeGetTime();
- // pushlist大小从0到1,记录此时的时间,用于统计实际传输速度
- m_reqStartTime = timeGetTime();
- //comm->logFile.StatusOut("Request Start Time %d", m_reqStartTime);
- }
- if(!bRefreshing) {
- // 如果不是正在更新整个PushList,就立即发送消息
- if(!SendPushList(blockID, true)) {
- if(!remotePush.DelBlock(blockID))
- assert(0);
- return false;
- }
- }
- return true;
- }
- // 查找一个PushList中的块
- bool P2PClient::FindPushBlock(const UINT blockID) const {
- return remotePush.FindBlock(blockID);
- }
- // 删除一个要Push的块
- bool P2PClient::DelPushBlock(UINT blockID) {
- // 从PushList中查找并删除此块
- if(!remotePush.DelBlock(blockID))
- return false;
- // 发送删除一个块的消息
- return SendPushList(blockID, false) == TRUE;
- }
- // Push List是否已满
- bool P2PClient::IsPushListFull() {
- return (remotePush.GetValidSize() == remotePush.GetTotalSize());
- }
- // 取得Pushlist大小
- UINT8 P2PClient::GetTotalPushSize() {
- return remotePush.GetTotalSize();
- }
- // 清空PushList
- void P2PClient::ClearPush() {
- remotePush.Clear();
- }
- // 发送整个PushList,因为之前重新分配了所有连接的块
- void P2PClient::SendPush() {
- if(!SendPushList(UINT_MAX, false))
- assert(0);
- }
- // 重新加载PushList,只有在普通的下载时才会被调用
- void P2PClient::ReloadPushList() {
- if(remotePush.GetValidSize() > 0)
- return;
- UINT array[P2P_HIGH_PUSH];
- std::fill(array, array+P2P_HIGH_PUSH, UINT_MAX);
- UINT count = 0;
- // 计算此连接一次push的大小
- UINT pushNum = P2P_MID_PUSH; // default
- // 注意两者单位不同,前者是BPS, 后者是KBPS
- if(avgDownSpeed/1024/comm->currRes->GetBitRate() < 0.1667f)
- pushNum = P2P_LOW_PUSH;
- else if(avgDownSpeed/1024/comm->currRes->GetBitRate() > 0.5f)
- pushNum = P2P_HIGH_PUSH;
- //else
- // the default value
- // 1. 获取可以下载的区间列表
- IntervalArray downloadable;
- comm->currRes->GetDownloadableArray(remoteInterval, downloadable);
- if(downloadable.IsEmpty()) {
- return;
- }
- // 2. 遍历此区间列表,根据条件排除不可以下载的块,直到遍历完毕或者找到pushNum个Block
- BlockInterval temp;
- while(!downloadable.IsEmpty() && count < pushNum) {
- downloadable.PopFront(temp);
- if(temp.size == 0)
- break;
- for(UINT i = temp.start; i < temp.start+temp.size && count < pushNum; ++i) {
- // 如果此块有别人正在下载,则去除此块
- if(comm->p2pMgr.FindInAllPushList(i, this))
- continue;
- // 如果有其他NP有此块,那么就不要从CP上下载了, 前提是缓冲进度100%而且有此块的NP下载速度大于5.0KBPS
- if(GetIsCachePeer() &&
- comm->currRes->GetBufferPercent() == 100 &&
- comm->p2pMgr.FindInAllRemoteInterval(i, this, false, 5.0f) > 0)
- continue;
- // 如果是外网节点,且内网节点上有,则不从这下载
- if (GetIsSameLan() &&
- comm->p2pMgr.FindInAllLanRemoteInterval(i, this, 5.0f) > 0)
- continue;
- // 记录一个可以下载的块
- array[count++] = i;
- }
- }
- if(count) {
- // 根据缓冲情况决定是否要随机下载
- if(comm->currRes->GetBufferPercent() == 100) {
- random_shuffle(array, array+count);
- comm->logFile.StatusOut("Do by random %d.", pushNum);
- }
- else
- comm->logFile.StatusOut("Do by order %d.", pushNum);
- // 添加到pushlist中,并发送
- UINT addNum = 0;
- for(UINT i = 0; i < count && addNum < pushNum; ++i) {
- if(array[i] != UINT_MAX) {
- if(!AddPushBlock(array[i], true))
- break;
- addNum++;
- }
- }
- if(addNum)
- SendPush();
- }
- }
- // 发送push列表中的第一个块
- BOOL P2PClient::SendFirstPushBlock() {
- // 是否有需要发送的块,如果有,取出第一个(也是最小的一个)
- UINT blockID;
- localPush.GetFirstBlock(blockID);
- if(blockID == UINT_MAX)
- return TRUE;
- // 检查当前发送列表中是否已经有本连接的块,
- // 1. 如果没有,那么发送;
- // 2. 如果有,则暂时不发送
- if(!CheckSendingBlock())
- return SendResponse(blockID, true);
- localPush.AddBlock(blockID);
- return TRUE;
- }
- BOOL P2PClient::IsIdleTooLong() {
- DWORD tNow = timeGetTime();
- // 对于Listener, 是上次发送数据的时间;对于Connector,是上次收到数据的时间
- //int transferIdle = isIncoming?tNow-lastSendDataTime:tNow-lastRecvDataTime;
- int transferIdle = min(tNow-lastSendDataTime, tNow-lastRecvDataTime);
- // 明明push list非空,但是却没有收到数据头部的时间
- int responseIdle = tNow-lastRecvDataHdTime;
- GenerateTransferInfo(TRUE);
- TransferInfo tiTemp;
- GetTransferInfo(tiTemp);
- comm->logFile.StatusOut("%s: transfer idle %d. response idle %d. rn
- tcurr down/up:%.2f/%.2f, avg down/up:%.2f/%.2f, total down/up:%.2f/%.2fMB. layer: %d. bandwidth: %.4fKBPS %s.",
- comm->p2pMgr.FormatIPAddress(remotePeer),
- transferIdle, responseIdle,
- tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024,
- tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024,
- tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f,
- GetLayer(), GetBandWidth(), isIncoming?"in":"out");
- BOOL kill = FALSE;
- if(bGotFirstMsg) {
- if(isIncoming)
- kill = (transferIdle > MAX_INCOMING_SENDDATA_IDLE);
- else
- kill = (transferIdle > MAX_OUTGOING_RECVDATA_IDLE);
- // TOOD: 如果pushlist非空,但是有一段时间没有数据的头部了,那么就杀掉此连接
- // 注意此timeout时间比transfer timeout要短一些
- kill = kill || (remotePush.GetValidSize() != 0 && responseIdle > MAX_RESPONSE_IDLE);
- }
- else {
- int startIdle = transferIdle;
- if(GetIsCachePeer()) {
- kill = (startIdle > MAX_CP_FIRST_MSG_IDLE);
- }
- else
- kill = (startIdle > MAX_NP_FIRST_MSG_IDLE);
- }
- if(kill) {
- comm->logFile.StatusOut("Kill Idle %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
- }
- return kill;
- }
- // 检查当前发送列表中是否已经有某连接的块,
- // 1. 如果有,返回TRUE
- // 2. 如果没有,那么返回FALSE;
- BOOL P2PClient::CheckSendingBlock() {
- TCPPacket* packet = NULL;
- UINT temp;
- for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it) {
- packet = *it;
- // 如果消息类型不是P2P_RESPONSE,接着找下一个
- if(packet->buf[4] != P2P_RESPONSE)
- continue;
- // 如果blockSize为0,接着找下一个
- memcpy(&temp, packet->GetBlockSize(), sizeof(temp));
- if(temp == 0)
- continue;
- // 已经有准备发送的块
- return TRUE;
- }
- // 没有找到,说明可以发送blockID
- return FALSE;
- }
- // 从发送列表中查找一个块,如果此块准备发送(packet->send == 0),则删除并返回TRUE;
- // 如果此块已经开始发送,则直接返回TRUE。如未找到,则返回FALSE
- // 如果blockID是UINT_MAX,则删除任意一个准备发送的块
- BOOL P2PClient::DeleteSendingBlock(UINT blockID) {
- TCPPacket* packet = NULL;
- UINT temp;
- UINT index = 0;
- for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it, index++) {
- packet = *it;
- // 如果消息类型不是P2P_RESPONSE,接着找下一个
- if(packet->GetMsgType() != P2P_RESPONSE)
- continue;
- // 如果不是目标块,则接着找下一个
- // 如果blockID是UINT_MAX,则删除所有准备发送的块
- memcpy(&temp, packet->buf+5, sizeof(temp));
- if(temp != blockID && blockID != UINT_MAX)
- continue;
- // 如果已经开始发送,就不能打断,返回TRUE
- if(packet->sent > 0)
- return TRUE;
- // 如果是尚未开始发送的块,则删除,返回TRUE
- comm->p2pMgr.ReleasePacket(packet);
- m_sendList.erase(it);
- return TRUE;
- }
- // 没有找到
- return FALSE;
- }
- }