SPClient.cpp
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:17k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
- #include "StdAfx.h"
- #include "SPClient.h"
- #include "CaptureServer.h"
- #include "MD5.h"
- SPClient::SPClient(CaptureServer* cServer, NormalAddress address, BufferMgr* bufferMgr, LogMgr* log) : m_freeList(8, 1) {
- cs = cServer;
- m_bufferMgr = bufferMgr;
- logFile = log;
- addr = address;
- isRunning = TRUE;
- isLogin = FALSE;
- m_socket = INVALID_SOCKET;
- sendPointer = NULL;
- recvPointer = NULL;
- recvOff = 0;
- errStr[0] = 0;
- lastSentBlockID = 0;
- memset(&header, 0, sizeof(header));
- readData = 0;
- DWORD threadID;
- hThread = CreateThread(
- NULL, 0, (LPTHREAD_START_ROUTINE)(SPClient::RunReceiver), this, 0, &threadID);
- }
- SPClient::~SPClient() {
- isRunning = FALSE;
- Disconnect();
- if(hThread) {
- // 如果一段时间内不能正常停止,就强行终止线程
- DWORD ret = WaitForSingleObject(hThread, 5000);
- if(ret == WAIT_TIMEOUT) {
- TerminateThread(hThread, 0);
- }
- CloseHandle(hThread);
- hThread = NULL;
- }
- }
- BOOL SPClient::SendRegister() {
- UINT8 nameSize = cs->cfgData.chnlStr.size();
- if(nameSize >= 0xff)
- return FALSE;
- TCPPacket* packet;
- if(!SendBegin(packet, CS2SP_REGISTER))
- return FALSE;
- //2.write channel name
- CopyMoveDes(sendPointer, &nameSize, sizeof(nameSize));
- CopyMoveDes(sendPointer, cs->cfgData.chnlStr.data(), nameSize);
- //3.write userinfo.
- CopyMoveDes(sendPointer, &cs->cfgData.userID, sizeof(UINT));
- CopyMoveDes(sendPointer, cs->cfgData.password.data(), MD5_LEN);
- //4. ratio ,maxblocksize
- UINT maxblockSize = BLOCK_SIZE;
- CopyMoveDes(sendPointer, &maxblockSize, sizeof(maxblockSize));
- float ratio = cs->GetSpeedInKBPS();
- CopyMoveDes(sendPointer, &ratio, sizeof(ratio));
- //5. write video & audio
- TVMEDIATYPESECTION videoTV, audioTV;
- PBYTE videoData = NULL;
- PBYTE audioData = NULL;
- do{
- delete [] videoData;
- delete [] audioData;
- Sleep(100);
- if(!isRunning) {
- m_freeList.Release(packet);
- return FALSE;
- }
- }
- while(!cs->GetFormatData(videoTV, videoData, FALSE) || !cs->GetFormatData(audioTV, audioData, TRUE) );
- // 检查媒体类型数据是否正确,关键是是否为空
- if(videoTV.cbFormat <= 0 && audioTV.cbFormat <= 0) {
- MessageBox(cs->parentWindow, "媒体数据为空!请重新启动采集端。", "错误", MB_OK|MB_ICONSTOP);
- return FALSE;
- }
- USHORT channelinfoLen = 2*sizeof(TVMEDIATYPESECTION)+videoTV.cbFormat + audioTV.cbFormat;
- CopyMoveDes(sendPointer, &channelinfoLen, sizeof(channelinfoLen));
- CopyMoveDes(sendPointer, &videoTV, sizeof(TVMEDIATYPESECTION));
- if(videoData)
- CopyMoveDes(sendPointer, videoData, videoTV.cbFormat);
- CopyMoveDes(sendPointer, &audioTV, sizeof(TVMEDIATYPESECTION));
- if(audioData)
- CopyMoveDes(sendPointer, audioData, audioTV.cbFormat);
- SendEnd(packet);
- return TRUE;
- }
- /**********发送消息的函数**********/
- BOOL SPClient::SendBlock() {
- if(m_sendList.size() > 10)
- return TRUE;
- TCPPacket* packet;
- if(!SendBegin(packet, CS2SP_BLOCK))
- return FALSE;
- //2.write ID.
- CopyMoveDes(sendPointer, &lastSentBlockID, sizeof(lastSentBlockID));
- //4.write data
- UINT size = 0;
- if(!m_bufferMgr->GetBlock(lastSentBlockID, reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size)) {
- // 一定要释放这个packet
- m_freeList.Release(packet);
- return TRUE;
- }
- char* hashcode = "";
- #ifdef DEBUG
- MD5 md5(reinterpret_cast<PBYTE>(sendPointer+sizeof(size)), size);
- hashcode = md5.hex_digest();
- #endif
- // verify block
- char* startPos = sendPointer+sizeof(size);
- char* tempPos = startPos + 8;
- if(lastSentBlockID == 0) {
- if(*((UINT*)startPos + 1) != UINT_MAX)
- tempPos = startPos + *((UINT*)startPos + 1);
- else
- tempPos = startPos + size;
- }
- while(header.size < 1638400) {
- //写header信息
- if((UINT)readData < sizeof(header)) {
- if(startPos+size - tempPos < sizeof(header)-readData){
- memcpy((char*)&header + readData, tempPos, startPos+size - tempPos);
- readData += startPos+size - tempPos;
- break;
- }
- else {
- memcpy((char*)&header + readData, tempPos, sizeof(header)-readData);
- tempPos += sizeof(header)-readData;//头的位置tempPos
- readData = sizeof(header);
- //assert(header.size < 1638400);
- if(header.size >= 1638400) {
- logFile->StatusOut("Header size %d, blockID %d, offset %d",
- header.size, lastSentBlockID, readData);
- break;
- }
- }
- }
- if(readData >= sizeof(header)) {
- if(startPos+size - tempPos < header.size - readData) {
- readData += startPos+size - tempPos;
- break;
- }
- else {
- tempPos += header.size - readData;
- readData = 0;
- }
- }
- }
- //3.write data size
- CopyMoveDes(sendPointer, &size, sizeof(size));
- int firstKeySampleOffset;
- memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
- LONGLONG keySample;
- memcpy(&keySample, packet->buf+sizeof(UINT)*3+sizeof(char)+firstKeySampleOffset, sizeof(keySample));
- memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset));
- if(firstKeySampleOffset == 0)
- keySample = 0;
- char temp[64];
- _i64toa(keySample, temp, 10);
- logFile->StatusOut("Queue Block. ID:%d/%d, offset: %d, keysample: %s, hash %s",
- lastSentBlockID, m_bufferMgr->GetMaxBlockID(), firstKeySampleOffset, temp, hashcode);
- #ifdef DEBUG
- delete [] hashcode;
- #endif
- sendPointer += size;
- SendEnd(packet);
- lastSentBlockID++;
- return TRUE;
- }
- BOOL SPClient::SendUpdate(){
- TCPPacket* packet;
- if(!SendBegin(packet, CS2SP_UPDATE))
- return FALSE;
- float ratio = cs->GetSpeedInKBPS();
- CopyMoveDes(sendPointer, &ratio, sizeof(ratio));
- SendEnd(packet);
- return TRUE;
- }
- BOOL SPClient::SendMediaType() {
- MediaData mediaData;
- if(!m_bufferMgr->GetMediaData(lastSentBlockID, mediaData))
- return FALSE;
- TCPPacket* packet;
- if(!SendBegin(packet, CS2SP_MEDIA_TYPE))
- return FALSE;
- UINT size = mediaData.audioType.cbFormat + mediaData.videoType.cbFormat +
- sizeof(mediaData.audioType) + sizeof(mediaData.videoType);
- CopyMoveDes(sendPointer, &size, sizeof(UINT));
- CopyMoveDes(sendPointer, &mediaData.videoType, sizeof(mediaData.videoType));
- CopyMoveDes(sendPointer, mediaData.videoData, mediaData.videoType.cbFormat);
- CopyMoveDes(sendPointer, &mediaData.audioType, sizeof(mediaData.audioType));
- CopyMoveDes(sendPointer, mediaData.audioData, mediaData.audioType.cbFormat);
- logFile->StatusOut("Queue Block media data. ID:%d/%d", lastSentBlockID, m_bufferMgr->GetMaxBlockID());
- SendEnd(packet);
- return TRUE;
- }
- CONNECT_RESULT SPClient::Connecting() {
- logFile->StatusOut("Connecting to %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
- CONNECT_RESULT ret = CR_ERROR;
- // Create a TCP/IP socket that is bound to the server.
- // Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets
- // http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942
- m_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- if(m_socket == INVALID_SOCKET) {
- logFile->StatusErr("Creating socket", WSAGetLastError());
- return ret;
- }
- // 不使用Nagle算法
- BOOL bNoDelay = TRUE;
- if(setsockopt(m_socket, SOL_SOCKET, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) {
- logFile->StatusErr("Setting UDP socket as TCP_NODELAY", WSAGetLastError());
- return ret;
- }
- // Set this socket as a Non-blocking socket.
- ULONG flag = 1;
- if(ioctlsocket(m_socket, FIONBIO, &flag) == SOCKET_ERROR) {
- logFile->StatusErr("Setting socket as non-blocking", WSAGetLastError());
- return ret;
- }
- // Connect to remote address
- if(WSAConnect(m_socket, (sockaddr*)&addr, sizeof(sockaddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR) {
- if(WSAGetLastError() != WSAEWOULDBLOCK) {
- logFile->StatusErr("Connecting socket", WSAGetLastError());
- return ret;
- }
- else {
- ret = CR_WOULDBLOCK;
- logFile->StatusOut("%s:%d is blocking.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
- }
- }
- else {
- ret = CR_CONNECTED;
- logFile->StatusOut("%s:%d is connected.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
- }
- return ret;
- }
- void SPClient::Disconnect() {
- isLogin = FALSE;
- m_bufferMgr->StopSave();
- ClearTransferInfo();
- m_sendList.clear();
- if(m_socket != INVALID_SOCKET) {
- ::TE_CloseSocket(m_socket, FALSE);
- logFile->StatusOut("SPClient: disconnected from %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
- m_socket = INVALID_SOCKET;
- }
- }
- BOOL SPClient::BaseRecv() {
- int ret = recv(m_socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
- if(ret < 0) {
- DWORD lastError = ::WSAGetLastError();
- if (WSAEWOULDBLOCK != lastError) {
- logFile->StatusErr("Receiving data on TCP", lastError);
- return FALSE;
- }
- else
- return TRUE;
- }
- else if(0 == ret) {
- logFile->StatusOut("Connection has been disconnected gracefully.");
- return FALSE;
- }
- recvOff += ret;
- totalDownBytes += ret;
- BOOL retVal = FALSE;
- for(;;) {
- // because multiple msgs can be received at once.
- // keep call parseMsg() till !MSG_COMPLETE
- MSG_STATE ms = ParseMsg();
- switch(ms) {
- case MSG_COMPLETE:
- continue;
- case MSG_UNCOMPLETE:
- retVal = TRUE;
- break;
- case MSG_ERR_SIZE:
- sprintf(errStr, "错误的消息大小!");
- break;
- case MSG_ERR_TYPE:
- sprintf(errStr, "错误的消息类型!");
- break;
- break;
- case MSG_REMOTE_ERR:
- // 错误信息已经在errStr中了
- break;
- default:
- sprintf(errStr, "未知错误类型!");
- break;
- }
- if(strlen(errStr) > 0)
- logFile->StatusOut("错误: %s", errStr);
- break;
- }
- return retVal;
- }
- MSG_STATE SPClient::ParseMsg() {
- // 如果过小,则不是正常的包
- if(recvOff < sizeof(int)+sizeof(BYTE))
- return MSG_UNCOMPLETE;
- // 把移动指针放到数据的起始地址
- recvPointer = recvBuf;
- // 读取消息大小
- UINT msgSize;
- CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));
- // 读取消息类型
- UINT8 msgType;
- CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));
- // msgSize是否正常
- if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
- return MSG_ERR_SIZE;
- // 是否包含完成的消息
- if(recvOff < msgSize)
- return MSG_UNCOMPLETE;
- MSG_STATE ret = MSG_COMPLETE;
- switch(msgType) {
- case SP2CS_WELCOME:
- ret = OnWelcome();
- break;
- case SP2CS_MSG:
- ret = OnMsg();
- break;
- default:
- ret = MSG_ERR_TYPE;
- break;
- }
- // copy left data to start of recvBuf
- if(recvOff >= msgSize) {
- memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize);
- recvOff -= msgSize;
- }
- return ret;
- }
- //解析注册成功消息
- MSG_STATE SPClient::OnWelcome() {
- UINT startBlockID;
- CopyMoveSrc(&startBlockID, recvPointer, sizeof(startBlockID));
- lastSentBlockID = startBlockID;
- logFile->StatusOut("Start Block %d.", lastSentBlockID);
- isLogin = TRUE;
- return MSG_COMPLETE;
- }
- //收到SP_MSG消息
- MSG_STATE SPClient::OnMsg() {
- // 错误代码
- UINT16 errCode;
- CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));
- // 是否需要断开连接
- bool shouldDisconnect;
- CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect));
- // 根据错误代码处理
- switch(errCode) {
- case ERR_PROTOCOL_FORMAT:
- sprintf(errStr, "协议错误");
- break;
- case ERR_AUTHORIZATION:
- sprintf(errStr, "验证错误");
- break;
- case ERR_INTERNAL:
- sprintf(errStr, "未知错误");
- break;
- default:
- shouldDisconnect = true;
- }
- if(shouldDisconnect) {
- return MSG_REMOTE_ERR;
- }
- return MSG_COMPLETE;
- }
- BOOL SPClient::SendPackets() {
- if(m_sendList.empty())
- return TRUE;
- TCPPacket* packet = m_sendList.front();
- m_sendList.pop_front();
- int ret = send(m_socket, packet->buf+packet->sent, min(packet->size-packet->sent, 40000), 0);
- if(SOCKET_ERROR == ret) {
- DWORD lastError = ::WSAGetLastError();
- if (WSAEWOULDBLOCK == lastError) {
- m_sendList.push_front(packet);
- }
- else {
- m_sendList.push_front(packet);
- logFile->StatusErr("Sending data on TCP", lastError);
- return FALSE;
- }
- }
- else {
- totalUpBytes += ret;
- logFile->StatusOut("Send data %d of %d", ret, packet->size-packet->sent);
- assert(ret <= packet->size-packet->sent);
- if(ret == packet->size-packet->sent) {
- if(packet->size > 16384)
- logFile->StatusOut("Sent Block %d.", *packet->GetBlockID());
- // 已经发送完毕,释放Buffer
- m_freeList.Release(packet);
- }
- else { // 尚未发送完毕,下次继续发送
- packet->sent += ret;
- m_sendList.push_front(packet);
- logFile->StatusOut("!!!!!!!!!!!");
- }
- }
- return TRUE;
- }
- void SPClient::RunReceiver(SPClient* client)
- {
- timeval timeout;
- DWORD lastSentUpdate = 0;
- fd_set read_set, write_set;
- DWORD lastManageTime=0, currTime=0;
- while(client->isRunning)
- {
- if(client->m_socket == INVALID_SOCKET)
- {
- if(!client->m_bufferMgr->ShouldConnect()) {
- Sleep(500);
- continue;
- }
- CONNECT_RESULT ret = client->Connecting();
- if(ret == CR_WOULDBLOCK) {
- // 等待8秒钟,看能否连接上
- FD_ZERO(&write_set);
- FD_SET(client->m_socket, &write_set);
- timeout.tv_sec = 8;
- timeout.tv_usec = 0;
- int s = select(0, NULL, &write_set, NULL, &timeout);
- if(s > 0)
- ret = CR_CONNECTED;
- }
- if(ret == CR_CONNECTED) {
- client->lastSentBlockID = 0;
- client->readData = 0;
- // 可以开始存储数据了
- client->m_bufferMgr->StartSave();
- }
- else {
- TE_CloseSocket(client->m_socket, FALSE);
- client->m_socket = INVALID_SOCKET;
- client->logFile->StatusOut("无法连接SP,请检查网络。");
- //MessageBox(NULL, "无法连接SP,请检查网络。然后重新打开本程序。", "错误", MB_OK|MB_ICONINFORMATION);
- for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
- if(!client->isRunning)
- break;
- Sleep(1000);
- }
- continue;
- }
- }
- if(client->SendRegister()) {
- // 开始接收发送数据
- while(client->isRunning) {
- // 获取当前时间
- currTime = timeGetTime();
- timeout.tv_sec = 0;
- timeout.tv_usec = 20000;
- FD_ZERO(&read_set);
- FD_SET(client->m_socket, &read_set);
- int s = select(0, &read_set, 0, NULL, &timeout);
- if(s > 0) {
- if(FD_ISSET(client->m_socket, &read_set)) {
- // 接收信息
- if(!client->BaseRecv()) {
- for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) {
- if(!client->isRunning)
- break;
- Sleep(1000);
- }
- break;
- }
- }
- }
- else if(s == SOCKET_ERROR) {
- client->logFile->StatusErr("selecting", WSAGetLastError());
- Sleep(1); // prevent dead loop
- }
- if(currTime-lastManageTime >= 10000) {
- lastManageTime = currTime;
- // 生成传输信息并打印
- client->GenerateTransferInfo(TRUE);
- client->logFile->StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB.",
- client->currDownSpeed/1024, client->currUpSpeed/1024,
- client->avgDownSpeed/1024, client->avgUpSpeed/1024,
- client->totalDownBytes/1024.f/1024.f, client->totalUpBytes/1024.f/1024.f);
- }
- if(!client->m_bufferMgr->CheckRecvingSample()) {
- client->logFile->StatusOut("No Sample Any More!");
- //MessageBox(client->cs->parentWindow, "CaptureServer 20秒钟没有接收到Sample了!", "Sample中断", MB_OK|MB_ICONSTOP);
- if(client->cs->parentWindow)
- SendMessage(client->cs->parentWindow, WM_NOMORESAMPLE, 0, 0);
- }
- // 发送数据
- if(client->isLogin && client->isRunning) {
- // 目前SP尚不支持这个协议
- //client->SendMediaType();
- client->SendBlock();
- }
- if(currTime-lastSentUpdate > 10000) {
- if(!client->SendUpdate())
- break;
- lastSentUpdate = currTime;
- }
- if(!client->SendPackets())
- break;
- }
- }
- client->Disconnect();
- }
- }
- BOOL SPClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
- packet= m_freeList.Allocate();
- if(!packet)
- return FALSE;
- // 先留着消息大小不写,到最后再写
- sendPointer = packet->buf+sizeof(UINT);
- CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
- return TRUE;
- }
- void SPClient::SendEnd(TCPPacket*& packet) {
- // 消息的大小就是移动的指针减去初始的指针
- packet->size = sendPointer-packet->buf;
- packet->sent = 0;
- memcpy(packet->buf, &packet->size, sizeof(packet->size));
- m_sendList.push_back(packet);
- }
- void SPClient::CopyMoveSrc(void * des, const char *& src, size_t size) {
- assert(des && src);
- if(!des || !src)
- return;
- memcpy(des, src, size);
- src += size;
- }
- void SPClient::CopyMoveDes(char *& des, const void * src, size_t size) {
- assert(des && src);
- if(!des || !src)
- return;
- memcpy(des, src, size);
- des += size;
- }