Connector.cpp
资源名称:GGBT.rar [点击查看]
上传用户:lds876
上传日期:2013-05-25
资源大小:567k
文件大小:10k
源码类别:
P2P编程
开发平台:
Visual C++
- // Connector.cpp: implementation of the CConnector class.
- //
- //////////////////////////////////////////////////////////////////////
- #include "stdafx.h"
- #include "testbt.h"
- #include "Connector.h"
- #include "Connection.h"
- #include "Encrypter.h"
- #include "Upload.h"
- #include "Downloader.h"
- #include "Choker.h"
- #ifdef _DEBUG
- #undef THIS_FILE
- static char THIS_FILE[]=__FILE__;
- #define new DEBUG_NEW
- #endif
- long toint(char* pBuf)
- {
- unsigned char* p = (unsigned char*) pBuf;
- return p[0]*0x1000000 + p[1]*0x10000 + p[2]*0x100 + p[3];
- }
- char* tobinary(long lnum, char* pBuf, long length=4)
- {
- char* p = (char*)&lnum;
- pBuf[0] = p[3];
- pBuf[1] = p[2];
- pBuf[2] = p[1];
- pBuf[3] = p[0];
- return pBuf;
- }
- //////////////////////////////////////////////////////////////////////
- // Construction/Destruction
- //////////////////////////////////////////////////////////////////////
- CConnector::CConnector(long lMaxUploadRate)
- {
- m_lMaxUploadRate = lMaxUploadRate;
- }
- CConnector::~CConnector()
- {
- for (int i=0; i<m_connections.size(); i++)
- {
- delete m_connections[i]->m_pDownload;
- delete m_connections[i]->m_pUpload;
- delete m_connections[i];
- }
- }
- void CConnector::connection_made(CEncryptedConnection *pEConnection)
- {
- CConnection* pnew = new CConnection(pEConnection, this);
- m_connections.push_back(pnew);
- pnew->m_pUpload = m_pUploader->make_upload(pnew);
- pnew->m_pDownload = m_pDownloader->make_download(pnew);
- m_pChoker->connection_made(pnew);
- }
- void CConnector::Create(CDownloader* pDownloader, CUploader* pUploader, CChoker* pchoker,
- long lNumPieces, CStorageWrapper* pStorage, CMeasure* pUpTotalMeasure)
- {
- m_pDownloader = pDownloader;
- m_pUploader = pUploader;
- m_lNumPieces = lNumPieces;
- m_pStorage = pStorage;
- m_pUpTotalMeasure = pUpTotalMeasure;
- m_pChoker = pchoker;
- m_bEndgame = false;
- m_bRateCapped = false;
- m_tcap = 0;
- m_tTimeUntilUncap = 0;
- check_endgame();
- }
- void CConnector::connection_lost(CEncryptedConnection* pEConnection)
- {
- bool bfind = false;
- for (int i=0; i<m_connections.size(); i++)
- {
- if (m_connections[i]->m_pEConnection == pEConnection)
- {
- m_pChoker->connection_lost(m_connections[i]);
- m_connections[i]->m_pDownload->disconnected();
- delete m_connections[i]->m_pDownload;
- delete m_connections[i]->m_pUpload;
- delete m_connections[i];
- m_connections.erase(m_connections.begin() + i);
- bfind = true;
- break;
- }
- }
- assert(bfind);
- }
- void CConnector::connection_flushed(CEncryptedConnection *pEConnection)
- {
- bool bfind = false;
- for (int i=0; i<m_connections.size(); i++)
- {
- if (m_connections[i]->m_pEConnection == pEConnection)
- {
- m_connections[i]->m_pUpload->flushed();
- bfind = true;
- break;
- }
- }
- assert(bfind);
- }
- long CConnector::how_many_connections()
- {
- return m_connections.size();
- }
- const CDownloader* const CConnector::GetDownloader() const
- {
- return m_pDownloader;
- };
- const CMeasure* const CConnector::GetUpTotalMeasure() const
- {
- return m_pUpTotalMeasure;
- }
- void CConnector::got_message(CEncryptedConnection* pEConnection, memstream& memMessage)
- {
- CConnection* pCon = 0;
- for (int i=0; i<m_connections.size(); i++)
- {
- if (m_connections[i]->m_pEConnection == pEConnection)
- {
- pCon = m_connections[i];
- break;
- }
- }
- assert(pCon);
- char* pBuf = memMessage;
- long length = memMessage.size();
- char t = (*pBuf);
- if (t == BITFIELD && pCon->m_bGotAnything)
- {
- pEConnection->Close();
- return;
- }
- pCon->m_bGotAnything = true;
- if (t == CHOKE || t == UNCHOKE || t == INTERESTED || t == NOT_INTERESTED)
- {
- if (length != 1)
- {
- pEConnection->Close();
- return;
- }
- }
- switch (t)
- {
- case CHOKE:
- {
- pCon->m_pDownload->got_choke();
- }
- break;
- case UNCHOKE:
- {
- pCon->m_pDownload->got_unchoke();
- check_endgame();
- }
- break;
- case INTERESTED:
- {
- pCon->m_pUpload->got_interested();
- }
- break;
- case NOT_INTERESTED:
- {
- pCon->m_pUpload->got_not_interested();
- }
- break;
- case HAVE:
- {
- if (length != 5)
- {
- pCon->Close();
- return;
- }
- long index = toint(pBuf + 1);
- if (index >= m_lNumPieces || index < 0)
- {
- pCon->Close();
- return;
- }
- pCon->m_pDownload->got_have(index);
- check_endgame();
- }
- break;
- case BITFIELD:
- {
- memstream memBitfield;
- memBitfield.write(pBuf+1, length - 1);
- vector<bool> vHave;
- if (!bitfield_to_booleans(memBitfield, m_lNumPieces, vHave))
- {
- pCon->Close();
- return;
- }
- pCon->m_pDownload->got_have_bitfield(vHave);
- check_endgame();
- }
- break;
- case REQUEST:
- {
- if (length != 13)
- {
- pCon->Close();
- return;
- }
- long index = toint(pBuf + 1);
- if (index >= m_lNumPieces)
- {
- pCon->Close();
- return;
- }
- pCon->m_pUpload->got_request(index, toint(pBuf + 5), toint(pBuf+9));
- }
- break;
- case CANCEL:
- {
- if (length != 13)
- {
- pCon->Close();
- return;
- }
- long index = toint(pBuf + 1);
- if (index >= m_lNumPieces)
- {
- pCon->Close();
- return;
- }
- pCon->m_pUpload->got_cancel(index, toint(pBuf + 5), toint(pBuf+9));
- }
- break;
- case PIECE:
- {
- if (length <= 9)
- {
- pCon->Close();
- return;
- }
- long index = toint(pBuf + 1);
- long begin = toint(pBuf+5);
- if (index >= m_lNumPieces)
- {
- pCon->Close();
- return;
- }
- memMessage.TrimLeft(9);
- // memstream memPiece;
- // memPiece.write(pBuf+9, length - 9);
- // if (pCon->m_pDownload->got_piece(index, toint(pBuf+5), memPiece))
- if (pCon->m_pDownload->got_piece(index, begin, memMessage))
- {
- for (int i=0; i<m_connections.size(); i++)
- {
- m_connections[i]->send_have(index);
- }
- }
- check_endgame();
- }
- break;
- default:
- {
- pCon->Close();
- return;
- }
- break;
- }
- }
- void CConnector::check_endgame()
- {
- if (!m_pDownloader->is_endgame() && m_pStorage->is_everything_pending())
- {
- m_bEndgame = true;
- m_pDownloader->make_endgame();
- }
- }
- void CConnector::_update_upload_rate(long lAmount)
- {
- m_pUpTotalMeasure->update_rate(lAmount);
- if (m_lMaxUploadRate> 0 && m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
- {
- m_bRateCapped = true;
- time(&m_tcap);
- m_tTimeUntilUncap = m_pUpTotalMeasure->time_until_rate(m_lMaxUploadRate);
- TRACE("rate capeped() wait time :(%d)rn", m_tTimeUntilUncap);
- }
- }
- void CConnector::uncap()
- {
- if (!m_bRateCapped)
- return;
- // check the time to excute?
- time_t t;
- time(&t);
- time_t tspan = t - m_tcap;
- assert(tspan >= 0);
- if (tspan < 0)
- m_tcap = t;
- if (tspan < m_tTimeUntilUncap)
- return ;
- TRACE("Rate uncap()rn");
- // flush the smaller rate connection to up the rate, until rate are recapped or all data flushed.
- m_bRateCapped = false;
- while (!m_bRateCapped)
- {
- CUpload * pUp = 0;
- long lminrate = -1;
- for (int i=0; i<m_connections.size(); i++)
- {
- if (!m_connections[i]->m_pUpload->is_choked() &&
- m_connections[i]->m_pUpload->has_queries() &&
- m_connections[i]->IsFlush())
- {
- long lrate = m_connections[i]->m_pUpload->get_rate();
- if (lminrate == -1 || lrate < lminrate)
- {
- pUp = m_connections[i]->m_pUpload;
- lminrate = lrate;
- }
- }
- }
- if (!pUp)
- break;
- TRACE("nRate uncap() excute flush() minrate(%d)rn", lminrate);
- pUp->flushed();
- if (m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
- break;
- }
- }
- bool CConnector::IsRatecap()
- {
- return m_bRateCapped;
- }
- void CConnector::SetMaxUploadRate(long lMaxUploadRate)
- {
- m_lMaxUploadRate = lMaxUploadRate;
- }
- void CConnector::OnIdle()
- {
- bool bSth = false;
- vector<CAddrPort> vToPause, vToContinue, vToClose;
- BOOL bRet = m_criticalSection.Lock();
- if (!bRet) return;
- if (!m_vToPause.empty() ||
- !m_vToContinue.empty() ||
- !m_vToClose.empty())
- {
- bSth = true;
- vToPause = m_vToPause;
- vToContinue = m_vToContinue;
- vToClose = m_vToClose;
- m_vToPause.erase(m_vToPause.begin(), m_vToPause.end());
- m_vToContinue.erase(m_vToContinue.begin(), m_vToContinue.end());
- m_vToClose.erase(m_vToClose.begin(), m_vToClose.end());
- }
- bRet = m_criticalSection.Unlock();
- if (!bRet) return;
- if (bSth)
- _OnIdle(vToPause, vToContinue, vToClose);
- }
- void CConnector::_OnIdle(vector<CAddrPort>& vToPause, vector<CAddrPort>& vToContinue, vector<CAddrPort>& vToClose)
- {
- for (int i=0; i<vToPause.size(); i++)
- {
- CAddrPort addrPort = vToPause[i];
- for (int j=0; j<m_connections.size(); j++)
- {
- long lAddr = 0;
- short sPort = 0;
- m_connections[j]->GetIP(lAddr, sPort);
- if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
- {
- TRACE("user pause connectionrn");
- m_connections[j]->Pause(true);
- break;
- }
- }
- }
- for (i=0; i<vToContinue.size(); i++)
- {
- CAddrPort addrPort = vToContinue[i];
- for (int j=0; j<m_connections.size(); j++)
- {
- long lAddr = 0;
- short sPort = 0;
- m_connections[j]->GetIP(lAddr, sPort);
- if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
- {
- TRACE("user unpause connectionrn");
- m_connections[j]->Pause(false);
- break;
- }
- }
- }
- for (i=0; i<vToClose.size(); i++)
- {
- CAddrPort addrPort = vToClose[i];
- for (int j=0; j<m_connections.size(); j++)
- {
- long lAddr = 0;
- short sPort = 0;
- m_connections[j]->GetIP(lAddr, sPort);
- if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
- {
- TRACE("user close connectionrn");
- m_connections[j]->Close();
- break;
- }
- }
- }
- }
- void CConnector::PausePeer(CAddrPort addrPort, bool bPause)
- {
- BOOL bRet = m_criticalSection.Lock();
- if (!bRet) return;
- if (bPause)
- m_vToPause.push_back(addrPort);
- else
- m_vToContinue.push_back(addrPort);
- bRet = m_criticalSection.Unlock();
- if (!bRet) return;
- }
- void CConnector::ClosePeer(CAddrPort addrPort)
- {
- BOOL bRet = m_criticalSection.Lock();
- if (!bRet) return;
- m_vToClose.push_back(addrPort);
- bRet = m_criticalSection.Unlock();
- if (!bRet) return;
- }