CSClient.cpp
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:18k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
- #include "StdAfx.h"
- #include "CSClient.h"
- #include "Communicator.h"
- namespace NPLayer1 {
- CSClient::CSClient(Communicator* c) :comm(c) {
- memset(recvBuf, 0, MAX_UDP_LEN);
- maxFailCount = MAX_LOGIN_FAIL_COUNT;
- bStopLogin = false;
- }
- CSClient::~CSClient() {
- if(m_Socket != INVALID_SOCKET)
- closesocket(m_Socket);
- }
- P2P_RETURN_TYPE CSClient::Init() {
- // 初始化成员
- m_Socket = INVALID_SOCKET;
- isReqResSucceed = false;
- content = recvBuf;
- isLogin = false;
- loginFailCount = 0;
- // Create UDP Socket
- m_Socket = socket(AF_INET, SOCK_DGRAM, 0);
- if (m_Socket == INVALID_SOCKET) {
- comm->logFile.StatusErr("Creating UDP socket", WSAGetLastError());
- return PRT_NET;
- }
- // SO_EXCLUSIVEADDRUSE is only supported in 2000/XP/2003 and higher OS.
- if(comm->osvi.dwMajorVersion > 5) {
- // 防止窃听, 设定独占端口
- BOOL bExAddrUse = TRUE;
- if(setsockopt(m_Socket, SOL_SOCKET, ((int)(~SO_REUSEADDR)), (const char*)&bExAddrUse, sizeof(BOOL)) == SOCKET_ERROR) {
- comm->logFile.StatusErr("Setting UDP socket as SO_EXCLUSIVEADDRUSE", WSAGetLastError());
- return PRT_NET;
- }
- }
- //int newSize = SO_MAX_MSG_SIZE*100;
- //setsockopt(m_Socket, SOL_SOCKET, SO_RCVBUF, (const char*)&newSize, sizeof(int));
- //setsockopt(m_Socket, SOL_SOCKET, SO_SNDBUF, (const char*)&newSize, sizeof(int));
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- // 绑定所有本机IP
- addr.sin_addr.s_addr = INADDR_ANY;
- USHORT port = 0;
- for(int i = 0; i < 1000; ++i) {
- port = static_cast<USHORT>(rand(&comm->ctx))%10000 + 50000;
- addr.sin_port = htons(port);
- if(bind(m_Socket, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
- break;
- comm->logFile.StatusOut("Bind UDP socket at port %d. Failed", port);
- }
- if(i == 1000)
- return PRT_INIT_BIND;
- comm->logFile.StatusOut("Bind UDP socket at port %d.", port);
- return PRT_OK;
- }
- BOOL CSClient::ParseMsg(sockaddr_in& addr, int& addrlen) {
- CriticalSection::Owner lock(sendCS);
- int recvSize = recvfrom(m_Socket, recvBuf, MAX_UDP_LEN, 0, (sockaddr*)&addr, &addrlen);
- if(recvSize == SOCKET_ERROR)
- return FALSE;
- // 如果来源地址不是TS,就返回
- if(memcmp(&addr, &comm->trackerIP, sizeof(comm->trackerIP)) != 0)
- return FALSE;
- // 如果过小,则不是正常的包
- if(recvSize < sizeof(UINT)+sizeof(UINT8))
- return FALSE;
- // 把移动指针放到数据的起始地址
- content = recvBuf;
- // 读取消息大小
- UINT msgSize;
- CopyMoveSrc(&msgSize, content, sizeof(msgSize));
- if(msgSize != recvSize)
- return FALSE;
- // 读取消息类型
- UINT8 msgType;
- CopyMoveSrc(&msgType, content, sizeof(msgType));
- if(!SwitchMsg(msgType)) {
- // 重新登录
- SendLogout();
- SendLogin();
- return FALSE;
- }
- return TRUE;
- }
- BOOL CSClient::SwitchMsg(UINT8 msgType) {
- switch(msgType) {
- case TS2NP_WELCOME:
- if(!OnWelcome())
- return FALSE;
- break;
- case TS2NP_PEERS:
- if(!OnPeers())
- return FALSE;
- break;
- case TS2NP_CONNECT_TO:
- if(!OnConnectTo())
- return FALSE;
- break;
- case TS2NP_RESINFO:
- if(!OnResInfo())
- return FALSE;
- break;
- case TS2NP_MSG:
- if(!OnMsg())
- return FALSE;
- break;
- default:
- return FALSE;
- }
- return TRUE;
- }
- BOOL CSClient::OnWelcome() {
- // 复制验证码
- CopyMoveSrc(checkCode, content, 7);
- // 复制本机地址
- CopyMoveSrc(&comm->localAddress, content, sizeof(comm->localAddress));
- assert(comm->localAddress.outerIP.sin_addr.s_addr != 0);
- comm->logFile.StatusOut("CSClient Logon TS...%s", comm->p2pMgr.FormatIPAddress(comm->localAddress));
- isLogin = true;
- loginFailCount = 0;
- lastRecvPeers = timeGetTime();
- // 如果当前资源没有告知TS,则发送ReqRes
- if(!isReqResSucceed && comm->currRes) {
- SendReqRes();
- }
- return TRUE;
- }
- BOOL CSClient::OnPeers() {
- // 是否需要尝试连接CP
- bool tryCP = false;
- // 复制CP地址个数
- UINT8 listSize = 0;
- CopyMoveSrc(&listSize, content, sizeof(listSize));
- if(listSize > 0 && listSize < 100) {
- // 复制CP地址
- for(UINT8 i = 0; i < listSize; ++i) {
- PeerInfoWithAddr nAddr;
- CopyMoveSrc(&nAddr.outerIP, content, sizeof(NormalAddress));
- nAddr.subnetIP.sin_addr.s_addr = 0xffffffff;
- nAddr.subnetIP.sin_port = nAddr.outerIP.sin_port;
- comm->logFile.StatusOut("CSClient got CP address %s...", comm->p2pMgr.FormatIPAddress(nAddr));
- nAddr.layer = 0;
- nAddr.isCachePeer = true;
- if(nAddr.outerIP.sin_addr.s_addr != 0) {
- comm->p2pMgr.AddPeerInfo(nAddr);
- // 如果得到CP地址,就尝试连接
- tryCP = true;
- }
- }
- }
- // 复制NP地址个数
- CopyMoveSrc(&listSize, content, sizeof(listSize));
- if(listSize > 0 && listSize < 100) {
- for(UINT8 i = 0; i < listSize; ++i) {
- PeerInfoWithAddr nAddr;
- CopyMoveSrc(&nAddr, content, sizeof(nAddr));
- assert(nAddr.outerIP.sin_addr.s_addr != 0);
- nAddr.isCachePeer = false;
- // put in known np list
- comm->p2pMgr.AddPeerInfo(nAddr);
- }
- comm->logFile.StatusOut("CSClient got %d NP.", listSize);
- }
- lastRecvPeers = timeGetTime();
- // 尝试连接CP
- if(tryCP)
- comm->tryClient.Try();
- return TRUE;
- }
- BOOL CSClient::OnConnectTo() {
- // 复制NP的地址
- P2PAddress nAddr;
- CopyMoveSrc(&nAddr, content, sizeof(nAddr));
- // 是否使用free outgoing
- bool connectForFree;
- CopyMoveSrc(&connectForFree, content, sizeof(connectForFree));
- // add to connectto peer list
- comm->p2pMgr.AddConnectTo(nAddr, connectForFree);
- comm->logFile.StatusOut("CSClient got ConnectTo.");
- return TRUE;
- }
- BOOL CSClient::OnResInfo() {
- // TODO: 接收返回的资源信息
- return TRUE;
- }
- BOOL CSClient::OnMsg() {
- // 复制错误代码
- UINT16 errCode;
- CopyMoveSrc(&errCode, content, sizeof(errCode));
- // 是否需要退出
- bool shouldQuit;
- CopyMoveSrc(&shouldQuit, content, sizeof(shouldQuit));
- // 是否不能继续登录
- bool bShouldStopLogin = false;
- // 根据错误代码处理
- switch(errCode) {
- case ERR_PROTOCOL_FORMAT:
- comm->logFile.StatusOut("TS sent ERR_PROTOCOL_FORMAT error!");
- bShouldStopLogin = true;
- break;
- case ERR_AUTHORIZATION:
- comm->logFile.StatusOut("TS sent ERR_AUTHORIZATION error!");
- bShouldStopLogin = true;
- break;
- case ERR_INTERNAL:
- comm->logFile.StatusOut("TS sent ERR_INTERNAL error!");
- break;
- case ERR_LOW_VERSION:
- comm->logFile.StatusOut("TS sent ERR_LOW_VERSION error!");
- // 发送版本太低的消息给外界
- comm->PostErrMessage(PNT_LOW_VERSION, 0, true);
- bShouldStopLogin = true;
- break;
- case ERR_NO_SUCH_PEER:
- comm->logFile.StatusOut("TS sent ERR_NO_SUCH_PEER error!");
- break;
- case ERR_NO_SUCH_RES:
- comm->logFile.StatusOut("TS sent ERR_NO_SUCH_RES error!");
- break;
- case ERR_CHECK_BYTES:
- comm->logFile.StatusOut("TS sent ERR_CHECK_BYTES error!");
- break;
- case ERR_ADD_RES_OK:
- // 请求资源成功!
- isReqResSucceed = true;
- comm->logFile.StatusOut("TS sent ERR_ADD_RES_OK!");
- // 发送第一次Report
- SendReport(true);
- break;
- default:
- shouldQuit = true;
- }
- if(shouldQuit) {
- isLogin = false;
- loginFailCount = 0;
- }
- if(bShouldStopLogin)
- bStopLogin = true;
- return TRUE;
- }
- void CSClient::SendLogin() {
- CriticalSection::Owner lock(sendCS);
- if(isLogin)
- return;
- if(bStopLogin)
- return;
- if(!SendBegin(NP2TS_LOGIN))
- return;
- // 用户ID和密码
- CopyMoveDes(content, &comm->userID, sizeof(comm->userID));
- CopyMoveDes(content, comm->userPass, MD5_LEN);
- // 版本号和监听端口
- CopyMoveDes(content, &comm->cfgData.COMMUNICATOR_VERSION,
- sizeof(comm->cfgData.COMMUNICATOR_VERSION));
- // 写入监听端口
- CopyMoveDes(content, &comm->localAddress.subnetIP.sin_port, sizeof(comm->localAddress.subnetIP.sin_port));
- // 本机IP列表
- in_addr* addrList = NULL;
- UINT8 ipSize = GetLocalIPList(addrList);
- if(ipSize == 0) {
- comm->logFile.StatusErr("Get Local IP List", WSAGetLastError());
- return;
- }
- CopyMoveDes(content, &ipSize, sizeof(ipSize));
- CopyMoveDes(content, addrList, ipSize*sizeof(in_addr));
- delete [] addrList;
- addrList = NULL;
- comm->logFile.StatusOut("CSClient sent NP2TS_LOGIN.");
- isReqResSucceed = false;
- if(loginFailCount == 2) {
- // 两次登录失败,设置默认CP地址
- if(comm->currRes)
- comm->currRes->SetDefaultCP();
- }
- loginFailCount++;
- if(loginFailCount == maxFailCount) {
- // 发送登陆失败的消息给外界
- comm->PostErrMessage(PNT_CANNOT_LOGON_TS, 0, true);
- loginFailCount = 0;
- // 按MAX_LOGIN_FAIL_COUNT倍递增
- maxFailCount *= MAX_LOGIN_FAIL_COUNT;
- }
- SendEnd();
- }
- void CSClient::SendResList() {
- CriticalSection::Owner lock(sendCS);
- if(!isLogin || !comm->currRes)
- return;
- if(!SendBegin(NP2TS_RES_LIST))
- return;
- // 要共享的资源个数
- UINT8 resNum = 1;
- CopyMoveDes(content, &resNum, sizeof(resNum));
- // 依次写入所有要共享的资源
- CopyMoveDes(content, comm->currRes->GetHashCode().data(), MD5_LEN);
- // send block intervals
- UINT8 intervalNum = MAX_BLOCK_INTERVALS;
- comm->currRes->GetAllIntervals((BlockInterval*)(content+sizeof(intervalNum)), intervalNum);
- CopyMoveDes(content, &intervalNum, sizeof(intervalNum));
- content += intervalNum*sizeof(BlockInterval);
- comm->logFile.StatusOut("CSClient sent NP2TS_RES_LIST.");
- SendEnd();
- }
- void CSClient::SendReqRes() {
- CriticalSection::Owner lock(sendCS);
- assert(comm->currRes);
- // 如果已经登录,则发送请求资源;否则,登录成功之后再发送
- if(!isLogin) {
- comm->logFile.StatusOut("NOT LOGIN YET, first request will be sent after recving TS2NP_WELCOME...");
- return;
- }
- if(!comm->currRes)
- return;
- if(!SendBegin(NP2TS_REQ_RES))
- return;
- // request resource md5
- CopyMoveDes(content, comm->currRes->GetHashCode().data(), MD5_LEN);
- // send block intervals
- UINT8 intervalNum = MAX_BLOCK_INTERVALS;
- comm->currRes->GetAllIntervals((BlockInterval*)(content+sizeof(intervalNum)), intervalNum);
- CopyMoveDes(content, &intervalNum, sizeof(intervalNum));
- content += intervalNum*sizeof(BlockInterval);
- // current block ID
- UINT currBlockID = comm->currRes->GetPlayingBlock();
- CopyMoveDes(content, &currBlockID, sizeof(currBlockID));
- // need CP?
- bool needCP = (comm->p2pMgr.GetCPConCount(true) < MAX_CONNECTION_OF_CP);
- CopyMoveDes(content, &needCP, sizeof(needCP));
- comm->logFile.StatusOut("CSClient sent NP2TS_REQ_RES.");
- SendEnd();
- // 发送了请求资源的消息,等待请求成功的消息返回
- isReqResSucceed = false;
- }
- void CSClient::SendDelRes(BaseResource* res) {
- CriticalSection::Owner lock(sendCS);
- if(!res || !isLogin)
- return;
- if(!SendBegin(NP2TS_DEL_RES))
- return;
- // request resource md5
- CopyMoveDes(content, res->GetHashCode().data(), MD5_LEN);
- comm->logFile.StatusOut("CSClient sent NP2TS_DEL_RES(%s).", res->GetHashCode().data());
- SendEnd();
- }
- void CSClient::SendReport(bool bRefresh) {
- CriticalSection::Owner lock(sendCS);
- if(!isLogin)
- return;
- if(!SendBegin(NP2TS_REPORT))
- return;
- // 本机信息
- CorePeerInfo thisPeer;
- comm->p2pMgr.GetSelfInfo(thisPeer);
- CopyMoveDes(content, &thisPeer, sizeof(thisPeer));
- // 如果没有当前资源,则发送bRefresh=true和intervalNum=0
- if(!comm->currRes)
- bRefresh = true;
- // 是否更新全部区间列表
- CSClient::CopyMoveDes(content, &bRefresh, sizeof(bRefresh));
- // send block intervals
- // TODO: 这里存在重复代码,有待改进
- UINT8 intervalNum = comm->currRes?MAX_BLOCK_INTERVALS:0;
- if(bRefresh) {
- if(comm->currRes) {
- // 如果是更新全部区间,则取得当前资源全部区间的列表,并发送
- comm->currRes->GetAllIntervals((BlockInterval*)(content+sizeof(intervalNum)), intervalNum);
- }
- CSClient::CopyMoveDes(content, &intervalNum, sizeof(intervalNum));
- content += intervalNum*sizeof(BlockInterval);
- assert(intervalNum <= 20);
- }
- else {
- // 如果是更新变化的区间,则先后写入增加的区间和减少的区间
- comm->currRes->GetDiffIntervals((BlockInterval*)(content+sizeof(intervalNum)), intervalNum, true, true);
- CSClient::CopyMoveDes(content, &intervalNum, sizeof(intervalNum));
- content += intervalNum*sizeof(BlockInterval);
- assert(intervalNum <= 20);
- comm->currRes->GetDiffIntervals((BlockInterval*)(content+sizeof(intervalNum)), intervalNum, true, false);
- CSClient::CopyMoveDes(content, &intervalNum, sizeof(intervalNum));
- content += intervalNum*sizeof(BlockInterval);
- assert(intervalNum <= 20);
- }
- // 发送本机传输信息
- TransferInfo ti;
- comm->p2pMgr.GetTransferInfo(ti);
- CSClient::CopyMoveDes(content, &ti, sizeof(TransferInfo));
- comm->logFile.StatusOut("CSClient sent NP2TS_REPORT.");
- SendEnd();
- }
- void CSClient::SendNeedPeers() {
- CriticalSection::Owner lock(sendCS);
- if(!comm->currRes || !isLogin)
- return;
- // 如果请求资源还没有成功,就发送请求消息,然后返回
- if(!isReqResSucceed) {
- SendReqRes();
- return;
- }
- // 如果当前下载速度大于直播的码率,则不必发送NeedPeers
- if(comm->p2pMgr.GetCurDownSpeed()/1024 > comm->currRes->GetBitRate())
- return;
- // 如果已知的Peer列表足够多,就不用发送NeedPeers
- if(comm->p2pMgr.GetKnownPeerCount() + comm->p2pMgr.GetAllCount() >= MIN_KNOWN_PEERS)
- return;
- if(!SendBegin(NP2TS_NEED_PEERS))
- return;
- // 本机是否需要CachePeer的地址
- bool needCP = (comm->p2pMgr.GetCPConCount(true) < MAX_CONNECTION_OF_CP);
- CopyMoveDes(content, &needCP, sizeof(needCP));
- UINT currBlockID = comm->currRes->GetPlayingBlock();
- CopyMoveDes(content, &currBlockID, sizeof(currBlockID));
- // 本机在P2P网络中所属的层
- UINT8 layer = comm->p2pMgr.GetSelfLayer();
- CopyMoveDes(content, &layer, sizeof(layer));
- comm->logFile.StatusOut("CSClient sent NP2TS_NEED_PEERS.");
- SendEnd();
- }
- void CSClient::SendQueryRes() {
- // TODO: 什么时候发送请求的,目前客户端不关心现在有多少频道(md5),每个频道有多少人
- }
- void CSClient::SendLogout() {
- CriticalSection::Owner lock(sendCS);
- if(!comm->currRes || !isLogin)
- return;
- if(!SendBegin(NP2TS_LOGOUT))
- return;
- SendEnd();
- comm->logFile.StatusOut("CSClient sent NP2TS_LOGOUT.");
- // 退出登录
- isLogin = false;
- loginFailCount = 0;
- }
- void CSClient::SendReport2() {
- CriticalSection::Owner lock(sendCS);
- if(!comm->currRes || !isLogin)
- return;
- if(!SendBegin(NP2TS_REPORT2))
- return;
- UINT16 value = 0;
- // playing block
- UINT currBlockID = comm->currRes->GetPlayingBlock();
- CopyMoveDes(content, &currBlockID, sizeof(currBlockID));
- // 当前缓冲的时间,没有缓冲的话就是0
- value = comm->currRes->GetBufferingTime();
- CopyMoveDes(content, &value, sizeof(value));
- // 之前缓冲的次数
- value = comm->currRes->GetBufferCount();
- CopyMoveDes(content, &value, sizeof(value));
- // 之前所有缓冲所用的时间
- value = comm->currRes->GetBufferTime();
- CopyMoveDes(content, &value, sizeof(value));
- // connect Fail Count
- value = comm->p2pMgr.GetConnectFailCount();
- CopyMoveDes(content, &value, sizeof(value));
- // incoming connection count
- value = comm->p2pMgr.GetTotalIncomingCount();
- CopyMoveDes(content, &value, sizeof(value));
- // outgoing connection count
- value = comm->p2pMgr.GetTotalOutgoingCount();
- CopyMoveDes(content, &value, sizeof(value));
- // avg incoming connection elapsed time
- value = comm->p2pMgr.GetAvgIncomingTime();
- CopyMoveDes(content, &value, sizeof(value));
- // avg outgoing connection elapsed time
- value = comm->p2pMgr.GetAvgOutgoingTime();
- CopyMoveDes(content, &value, sizeof(value));
- // message percent
- float msgPercent = comm->p2pMgr.GetMessagePercent();
- CopyMoveDes(content, &msgPercent, sizeof(msgPercent));
- // 发送本机传输信息
- TransferInfo ti;
- comm->p2pMgr.GetTransferInfo(ti);
- CSClient::CopyMoveDes(content, &ti, sizeof(TransferInfo));
- comm->logFile.StatusOut("CSClient sent NP2TS_REPORT2.");
- SendEnd();
- }
- BOOL CSClient::SendBegin(UINT8 msgType) {
- // 先留着消息大小不写,到最后再写
- content = recvBuf+sizeof(UINT);
- // 消息类型
- CopyMoveDes(content, &msgType, sizeof(msgType));
- if(msgType != NP2TS_LOGIN) {
- // TS的验证码
- CopyMoveDes(content, checkCode, 7);
- }
- return TRUE;
- }
- void CSClient::SendEnd() {
- // 消息的大小就是移动的指针减去初始的指针
- UINT msgSize = content-recvBuf;
- memcpy(recvBuf, &msgSize, sizeof(msgSize));
- // 发送
- sockaddr_in addr;
- memcpy(&addr, &comm->trackerIP, sizeof(comm->trackerIP));
- int ret = sendto(m_Socket, (const char*)recvBuf, msgSize,
- 0, (SOCKADDR*)&addr, sizeof(addr));
- if(ret == SOCKET_ERROR) {
- comm->logFile.StatusErr("sendto() error, rebind udp socket", WSAGetLastError());
- // 重新连接TS
- Reinit();
- }
- }
- void CSClient::CopyMoveSrc(void * des, const char *& src, size_t size) {
- assert(des && src);
- if(!des || !src)
- return;
- memcpy(des, src, size);
- src += size;
- }
- void CSClient::CopyMoveDes(char *& des, const void * src, size_t size) {
- assert(des && src);
- if(!des || !src)
- return;
- memcpy(des, src, size);
- des += size;
- }
- UINT8 CSClient::GetLocalIPList(in_addr*& addrList) {
- UINT8 ipCount = 0;
- addrList = NULL;
- // Get local host name
- char szHostName[128] = "";
- if(gethostname(szHostName, sizeof(szHostName)))
- return 0;
- // Get local IP addresses
- hostent *pHost = 0;
- if((pHost = gethostbyname(szHostName)) == NULL)
- return 0;
- // 计算IP地址个数
- while(pHost->h_addr_list[ipCount]) {
- ipCount++;
- // 注意不能越界了,超过0xff个IP地址将不被支持
- if(ipCount == 0xff)
- break;
- }
- // 分配数组,要在外部释放
- addrList = new in_addr[ipCount];
- if(!addrList)
- return 0;
- // 复制IP列表
- for(int i = 0; i < ipCount; ++i) {
- memcpy(&addrList[i], pHost->h_addr_list[i], pHost->h_length);
- }
- // 返回IP地址个数
- return ipCount;
- }
- // 重新初始化
- void CSClient::Reinit() {
- if(m_Socket != INVALID_SOCKET)
- closesocket(m_Socket);
- Init();
- SendLogin();
- }
- }