Connector.cpp
上传用户:lds876
上传日期:2013-05-25
资源大小:567k
文件大小:10k
源码类别:

P2P编程

开发平台:

Visual C++

  1. // Connector.cpp: implementation of the CConnector class.
  2. //
  3. //////////////////////////////////////////////////////////////////////
  4. #include "stdafx.h"
  5. #include "testbt.h"
  6. #include "Connector.h"
  7. #include "Connection.h"
  8. #include "Encrypter.h"
  9. #include "Upload.h"
  10. #include "Downloader.h"
  11. #include "Choker.h"
  12. #ifdef _DEBUG
  13. #undef THIS_FILE
  14. static char THIS_FILE[]=__FILE__;
  15. #define new DEBUG_NEW
  16. #endif
  // Decodes the 4-byte big-endian (network order) integer stored at pBuf.
  //
  // pBuf must point at at least 4 readable bytes. The bytes are combined in
  // unsigned arithmetic; the previous int multiplication overflowed (undefined
  // behaviour) whenever the first byte was >= 0x80. The resulting 32-bit
  // pattern is then reinterpreted as a signed 32-bit value, so inputs with the
  // top bit set still come back negative and range checks of the form
  // "index < 0" performed by callers keep working on every platform.
  long toint(char* pBuf)
  {
    const unsigned char* p = reinterpret_cast<const unsigned char*>(pBuf);
    const unsigned int uValue =
        (static_cast<unsigned int>(p[0]) << 24) |
        (static_cast<unsigned int>(p[1]) << 16) |
        (static_cast<unsigned int>(p[2]) << 8) |
         static_cast<unsigned int>(p[3]);
    // unsigned -> int conversion is modular on the supported compilers.
    return static_cast<long>(static_cast<int>(uValue));
  }
  // Encodes the low 32 bits of lnum into pBuf as 4 big-endian (network order)
  // bytes and returns pBuf.
  //
  // pBuf must have room for 4 bytes. The bytes are extracted with shifts so
  // the result no longer depends on the byte order of the host (the previous
  // version read lnum's in-memory bytes and was only correct on little-endian
  // machines).
  //
  // length is accepted for interface compatibility but is not used: exactly
  // 4 bytes are always written.
  char* tobinary(long lnum, char* pBuf, long length=4)
  {
    (void)length;
    const unsigned long uValue = static_cast<unsigned long>(lnum);
    pBuf[0] = static_cast<char>((uValue >> 24) & 0xFF);
    pBuf[1] = static_cast<char>((uValue >> 16) & 0xFF);
    pBuf[2] = static_cast<char>((uValue >> 8) & 0xFF);
    pBuf[3] = static_cast<char>(uValue & 0xFF);
    return pBuf;
  }
  31. //////////////////////////////////////////////////////////////////////
  32. // Construction/Destruction
  33. //////////////////////////////////////////////////////////////////////
  34. CConnector::CConnector(long lMaxUploadRate)
  35. {
  36. m_lMaxUploadRate = lMaxUploadRate;
  37. }
  38. CConnector::~CConnector()
  39. {
  40. for (int i=0;  i<m_connections.size(); i++)
  41. {
  42. delete m_connections[i]->m_pDownload;
  43. delete m_connections[i]->m_pUpload;
  44. delete m_connections[i];
  45. }
  46. }
  47. void CConnector::connection_made(CEncryptedConnection *pEConnection)
  48. {
  49. CConnection* pnew = new CConnection(pEConnection, this);
  50. m_connections.push_back(pnew);
  51. pnew->m_pUpload = m_pUploader->make_upload(pnew);
  52. pnew->m_pDownload = m_pDownloader->make_download(pnew);
  53. m_pChoker->connection_made(pnew);
  54. }
  55. void CConnector::Create(CDownloader* pDownloader, CUploader* pUploader, CChoker* pchoker, 
  56.  long lNumPieces, CStorageWrapper* pStorage,  CMeasure* pUpTotalMeasure)
  57. {
  58. m_pDownloader = pDownloader;
  59. m_pUploader = pUploader;
  60. m_lNumPieces = lNumPieces;
  61. m_pStorage = pStorage;
  62. m_pUpTotalMeasure = pUpTotalMeasure;
  63. m_pChoker = pchoker;
  64. m_bEndgame = false;
  65. m_bRateCapped = false;
  66. m_tcap = 0;
  67. m_tTimeUntilUncap = 0;
  68. check_endgame();
  69. }
  70. void CConnector::connection_lost(CEncryptedConnection* pEConnection)
  71. {
  72. bool bfind = false;
  73. for (int i=0;  i<m_connections.size(); i++)
  74. {
  75. if (m_connections[i]->m_pEConnection == pEConnection)
  76. {
  77. m_pChoker->connection_lost(m_connections[i]);
  78. m_connections[i]->m_pDownload->disconnected();
  79. delete m_connections[i]->m_pDownload;
  80. delete m_connections[i]->m_pUpload;
  81. delete m_connections[i];
  82. m_connections.erase(m_connections.begin() + i);
  83. bfind = true;
  84. break;
  85. }
  86. }
  87. assert(bfind);
  88. }
  89. void CConnector::connection_flushed(CEncryptedConnection *pEConnection)
  90. {
  91. bool bfind = false;
  92. for (int i=0;  i<m_connections.size(); i++)
  93. {
  94. if (m_connections[i]->m_pEConnection == pEConnection)
  95. {
  96. m_connections[i]->m_pUpload->flushed();
  97. bfind = true;
  98. break;
  99. }
  100. }
  101. assert(bfind);
  102. }
  103. long CConnector::how_many_connections()
  104. {
  105. return m_connections.size();
  106. }
  107. const CDownloader* const CConnector::GetDownloader() const
  108. {
  109. return m_pDownloader;
  110. };
  111. const CMeasure* const CConnector::GetUpTotalMeasure() const
  112. {
  113. return m_pUpTotalMeasure;
  114. }
  115. void CConnector::got_message(CEncryptedConnection* pEConnection, memstream& memMessage)
  116. {
  117. CConnection* pCon = 0;
  118. for (int i=0;  i<m_connections.size(); i++)
  119. {
  120. if (m_connections[i]->m_pEConnection == pEConnection)
  121. {
  122. pCon = m_connections[i];
  123. break;
  124. }
  125. }
  126. assert(pCon);
  127. char* pBuf = memMessage;
  128. long length = memMessage.size();
  129. char t = (*pBuf);
  130. if (t == BITFIELD && pCon->m_bGotAnything)
  131. {
  132. pEConnection->Close();
  133. return;
  134. }
  135. pCon->m_bGotAnything = true;
  136. if (t == CHOKE || t == UNCHOKE || t == INTERESTED || t == NOT_INTERESTED)
  137. {
  138. if (length != 1)
  139. {
  140. pEConnection->Close();
  141. return;
  142. }
  143. }
  144. switch (t)
  145. {
  146. case CHOKE:
  147. {
  148. pCon->m_pDownload->got_choke();
  149. }
  150. break;
  151. case UNCHOKE:
  152. {
  153. pCon->m_pDownload->got_unchoke();
  154. check_endgame();
  155. }
  156. break;
  157. case INTERESTED:
  158. {
  159. pCon->m_pUpload->got_interested();
  160. }
  161. break;
  162. case NOT_INTERESTED:
  163. {
  164. pCon->m_pUpload->got_not_interested();
  165. }
  166. break;
  167. case HAVE:
  168. {
  169. if (length != 5)
  170. {
  171. pCon->Close();
  172. return;
  173. }
  174. long index = toint(pBuf + 1);
  175. if (index >= m_lNumPieces || index < 0)
  176. {
  177. pCon->Close();
  178. return;
  179. }
  180. pCon->m_pDownload->got_have(index);
  181. check_endgame();
  182. }
  183. break;
  184. case BITFIELD:
  185. {
  186. memstream memBitfield;
  187. memBitfield.write(pBuf+1, length - 1);
  188. vector<bool> vHave;
  189. if (!bitfield_to_booleans(memBitfield, m_lNumPieces, vHave))
  190. {
  191. pCon->Close();
  192. return;
  193. }
  194. pCon->m_pDownload->got_have_bitfield(vHave);
  195. check_endgame();
  196. }
  197. break;
  198. case REQUEST:
  199. {
  200. if (length != 13)
  201. {
  202. pCon->Close();
  203. return;
  204. }
  205. long index = toint(pBuf + 1);
  206. if (index >= m_lNumPieces)
  207. {
  208. pCon->Close();
  209. return;
  210. }
  211. pCon->m_pUpload->got_request(index, toint(pBuf + 5), toint(pBuf+9));
  212. }
  213. break;
  214. case CANCEL:
  215. {
  216. if (length != 13)
  217. {
  218. pCon->Close();
  219. return;
  220. }
  221. long index = toint(pBuf + 1);
  222. if (index >= m_lNumPieces)
  223. {
  224. pCon->Close();
  225. return;
  226. }
  227. pCon->m_pUpload->got_cancel(index, toint(pBuf + 5), toint(pBuf+9));
  228. }
  229. break;
  230. case PIECE:
  231. {
  232. if (length <= 9)
  233. {
  234. pCon->Close();
  235. return;
  236. }
  237. long index = toint(pBuf + 1);
  238. long begin = toint(pBuf+5);
  239. if (index >= m_lNumPieces)
  240. {
  241. pCon->Close();
  242. return;
  243. }
  244. memMessage.TrimLeft(9);
  245. // memstream memPiece;
  246. // memPiece.write(pBuf+9, length - 9);
  247. // if (pCon->m_pDownload->got_piece(index, toint(pBuf+5), memPiece))
  248. if (pCon->m_pDownload->got_piece(index, begin, memMessage))
  249. {
  250. for (int i=0;  i<m_connections.size(); i++)
  251. {
  252. m_connections[i]->send_have(index);
  253. }
  254. }
  255. check_endgame();
  256. }
  257. break;
  258. default:
  259. {
  260. pCon->Close();
  261. return;
  262. }
  263. break;
  264. }
  265. }
  266. void CConnector::check_endgame()
  267. {
  268. if (!m_pDownloader->is_endgame() && m_pStorage->is_everything_pending())
  269. {
  270. m_bEndgame = true;
  271. m_pDownloader->make_endgame();
  272. }
  273. }
  274. void CConnector::_update_upload_rate(long lAmount)
  275. {
  276. m_pUpTotalMeasure->update_rate(lAmount);
  277. if (m_lMaxUploadRate> 0 && m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
  278. {
  279. m_bRateCapped = true;
  280. time(&m_tcap);
  281. m_tTimeUntilUncap = m_pUpTotalMeasure->time_until_rate(m_lMaxUploadRate);
  282. TRACE("rate capeped() wait time :(%d)rn", m_tTimeUntilUncap);
  283. }
  284. }
  285. void CConnector::uncap()
  286. {
  287. if (!m_bRateCapped)
  288. return;
  289. // check the time to excute?
  290. time_t t;
  291. time(&t);
  292. time_t tspan = t - m_tcap;
  293. assert(tspan >= 0);
  294. if (tspan < 0)
  295. m_tcap = t;
  296. if (tspan < m_tTimeUntilUncap)
  297. return ;
  298. TRACE("Rate uncap()rn");
  299. // flush the smaller rate connection to up the rate, until rate are recapped or all data flushed.
  300. m_bRateCapped = false;
  301. while (!m_bRateCapped)
  302. {
  303. CUpload * pUp = 0;
  304. long lminrate = -1;
  305. for (int i=0; i<m_connections.size(); i++)
  306. {
  307. if (!m_connections[i]->m_pUpload->is_choked() && 
  308. m_connections[i]->m_pUpload->has_queries() &&
  309. m_connections[i]->IsFlush())
  310. {
  311. long lrate = m_connections[i]->m_pUpload->get_rate();
  312. if (lminrate == -1 || lrate < lminrate)
  313. {
  314. pUp = m_connections[i]->m_pUpload;
  315. lminrate = lrate;
  316. }
  317. }
  318. }
  319. if (!pUp) 
  320. break;
  321. TRACE("nRate uncap() excute flush() minrate(%d)rn", lminrate);
  322. pUp->flushed();
  323. if (m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
  324. break;
  325. }
  326. }
  327. bool CConnector::IsRatecap()
  328. {
  329. return m_bRateCapped;
  330. }
  331. void CConnector::SetMaxUploadRate(long lMaxUploadRate)
  332. {
  333. m_lMaxUploadRate = lMaxUploadRate;
  334. }
  335. void CConnector::OnIdle()
  336. {
  337. bool bSth = false;
  338. vector<CAddrPort> vToPause, vToContinue, vToClose;
  339. BOOL bRet = m_criticalSection.Lock();
  340. if (!bRet) return;
  341. if (!m_vToPause.empty() ||
  342. !m_vToContinue.empty() || 
  343. !m_vToClose.empty())
  344. {
  345. bSth = true;
  346. vToPause = m_vToPause;
  347. vToContinue = m_vToContinue;
  348. vToClose = m_vToClose;
  349. m_vToPause.erase(m_vToPause.begin(), m_vToPause.end());
  350. m_vToContinue.erase(m_vToContinue.begin(), m_vToContinue.end());
  351. m_vToClose.erase(m_vToClose.begin(), m_vToClose.end());
  352. }
  353. bRet = m_criticalSection.Unlock();
  354. if (!bRet) return;
  355. if (bSth)
  356. _OnIdle(vToPause, vToContinue, vToClose);
  357. }
  358. void CConnector::_OnIdle(vector<CAddrPort>& vToPause, vector<CAddrPort>& vToContinue, vector<CAddrPort>& vToClose)
  359. {
  360. for (int i=0; i<vToPause.size(); i++)
  361. {
  362. CAddrPort addrPort = vToPause[i];
  363. for (int j=0; j<m_connections.size(); j++)
  364. {
  365. long lAddr = 0;
  366. short sPort = 0;
  367. m_connections[j]->GetIP(lAddr, sPort);
  368. if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
  369. {
  370. TRACE("user pause connectionrn");
  371. m_connections[j]->Pause(true);
  372. break;
  373. }
  374. }
  375. }
  376. for (i=0; i<vToContinue.size(); i++)
  377. {
  378. CAddrPort addrPort = vToContinue[i];
  379. for (int j=0; j<m_connections.size(); j++)
  380. {
  381. long lAddr = 0;
  382. short sPort = 0;
  383. m_connections[j]->GetIP(lAddr, sPort);
  384. if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
  385. {
  386. TRACE("user unpause connectionrn");
  387. m_connections[j]->Pause(false);
  388. break;
  389. }
  390. }
  391. }
  392. for (i=0; i<vToClose.size(); i++)
  393. {
  394. CAddrPort addrPort = vToClose[i];
  395. for (int j=0; j<m_connections.size(); j++)
  396. {
  397. long lAddr = 0;
  398. short sPort = 0;
  399. m_connections[j]->GetIP(lAddr, sPort);
  400. if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
  401. {
  402. TRACE("user close connectionrn");
  403. m_connections[j]->Close();
  404. break;
  405. }
  406. }
  407. }
  408. }
  409. void CConnector::PausePeer(CAddrPort addrPort, bool bPause)
  410. {
  411. BOOL bRet = m_criticalSection.Lock();
  412. if (!bRet) return;
  413. if (bPause)
  414. m_vToPause.push_back(addrPort);
  415. else
  416. m_vToContinue.push_back(addrPort);
  417. bRet = m_criticalSection.Unlock();
  418. if (!bRet) return;
  419. }
  420. void CConnector::ClosePeer(CAddrPort addrPort)
  421. {
  422. BOOL bRet = m_criticalSection.Lock();
  423. if (!bRet) return;
  424. m_vToClose.push_back(addrPort);
  425. bRet = m_criticalSection.Unlock();
  426. if (!bRet) return;
  427. }