pullpin.cpp
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:10k
源码类别:

P2P编程

开发平台:

Visual C++

  1. //------------------------------------------------------------------------------
  2. // File: PullPin.cpp
  3. //
  4. // Desc: DirectShow base classes - implements CPullPin class that pulls data
  5. //       from IAsyncReader.
  6. //
  7. // Copyright (c) Microsoft Corporation.  All rights reserved.
  8. //------------------------------------------------------------------------------
  9. #include <streams.h>
  10. #include "pullpin.h"
  11. CPullPin::CPullPin()
  12.   : m_pReader(NULL),
  13.     m_pAlloc(NULL),
  14.     m_State(TM_Exit)
  15. {
  16. }
  17. CPullPin::~CPullPin()
  18. {
  19.     Disconnect();
  20. }
  21. // returns S_OK if successfully connected to an IAsyncReader interface
  22. // from this object
  23. // Optional allocator should be proposed as a preferred allocator if
  24. // necessary
  25. HRESULT
  26. CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
  27. {
  28.     CAutoLock lock(&m_AccessLock);
  29.     if (m_pReader) {
  30. return VFW_E_ALREADY_CONNECTED;
  31.     }
  32.     HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
  33.     if (FAILED(hr)) {
  34. return(hr);
  35.     }
  36.     hr = DecideAllocator(pAlloc, NULL);
  37.     if (FAILED(hr)) {
  38. Disconnect();
  39. return hr;
  40.     }
  41.     LONGLONG llTotal, llAvail;
  42.     hr = m_pReader->Length(&llTotal, &llAvail);
  43.     if (FAILED(hr)) {
  44. Disconnect();
  45. return hr;
  46.     }
  47.     // convert from file position to reference time
  48.     m_tDuration = llTotal * UNITS;
  49.     m_tStop = m_tDuration;
  50.     m_tStart = 0;
  51.     m_bSync = bSync;
  52.     return S_OK;
  53. }
  54. // disconnect any connection made in Connect
  55. HRESULT
  56. CPullPin::Disconnect()
  57. {
  58.     CAutoLock lock(&m_AccessLock);
  59.     StopThread();
  60.     if (m_pReader) {
  61. m_pReader->Release();
  62. m_pReader = NULL;
  63.     }
  64.     if (m_pAlloc) {
  65. m_pAlloc->Release();
  66. m_pAlloc = NULL;
  67.     }
  68.     return S_OK;
  69. }
  70. // agree an allocator using RequestAllocator - optional
  71. // props param specifies your requirements (non-zero fields).
  72. // returns an error code if fail to match requirements.
  73. // optional IMemAllocator interface is offered as a preferred allocator
  74. // but no error occurs if it can't be met.
  75. HRESULT
  76. CPullPin::DecideAllocator(
  77.     IMemAllocator * pAlloc,
  78.     ALLOCATOR_PROPERTIES * pProps)
  79. {
  80.     ALLOCATOR_PROPERTIES *pRequest;
  81.     ALLOCATOR_PROPERTIES Request;
  82.     if (pProps == NULL) {
  83. Request.cBuffers = 3;
  84. Request.cbBuffer = 64*1024;
  85. Request.cbAlign = 0;
  86. Request.cbPrefix = 0;
  87. pRequest = &Request;
  88.     } else {
  89. pRequest = pProps;
  90.     }
  91.     HRESULT hr = m_pReader->RequestAllocator(
  92.     pAlloc,
  93.     pRequest,
  94.     &m_pAlloc);
  95.     return hr;
  96. }
  97. // start pulling data
  98. HRESULT
  99. CPullPin::Active(void)
  100. {
  101.     ASSERT(!ThreadExists());
  102.     return StartThread();
  103. }
  104. // stop pulling data
  105. HRESULT
  106. CPullPin::Inactive(void)
  107. {
  108.     StopThread();
  109.     return S_OK;
  110. }
  111. HRESULT
  112. CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
  113. {
  114.     CAutoLock lock(&m_AccessLock);
  115.     ThreadMsg AtStart = m_State;
  116.     if (AtStart == TM_Start) {
  117. BeginFlush();
  118. PauseThread();
  119. EndFlush();
  120.     }
  121.     m_tStart = tStart;
  122.     m_tStop = tStop;
  123.     HRESULT hr = S_OK;
  124.     if (AtStart == TM_Start) {
  125. hr = StartThread();
  126.     }
  127.     return hr;
  128. }
  129. HRESULT
  130. CPullPin::Duration(REFERENCE_TIME* ptDuration)
  131. {
  132.     *ptDuration = m_tDuration;
  133.     return S_OK;
  134. }
  135. HRESULT
  136. CPullPin::StartThread()
  137. {
  138.     CAutoLock lock(&m_AccessLock);
  139.     if (!m_pAlloc || !m_pReader) {
  140. return E_UNEXPECTED;
  141.     }
  142.     HRESULT hr;
  143.     if (!ThreadExists()) {
  144. // commit allocator
  145. hr = m_pAlloc->Commit();
  146. if (FAILED(hr)) {
  147.     return hr;
  148. }
  149. // start thread
  150. if (!Create()) {
  151.     return E_FAIL;
  152. }
  153.     }
  154.     m_State = TM_Start;
  155.     hr = (HRESULT) CallWorker(m_State);
  156.     return hr;
  157. }
  158. HRESULT
  159. CPullPin::PauseThread()
  160. {
  161.     CAutoLock lock(&m_AccessLock);
  162.     if (!ThreadExists()) {
  163. return E_UNEXPECTED;
  164.     }
  165.     // need to flush to ensure the thread is not blocked
  166.     // in WaitForNext
  167.     HRESULT hr = m_pReader->BeginFlush();
  168.     if (FAILED(hr)) {
  169. return hr;
  170.     }
  171.     m_State = TM_Pause;
  172.     hr = CallWorker(TM_Pause);
  173.     m_pReader->EndFlush();
  174.     return hr;
  175. }
  176. HRESULT
  177. CPullPin::StopThread()
  178. {
  179.     CAutoLock lock(&m_AccessLock);
  180.     if (!ThreadExists()) {
  181. return S_FALSE;
  182.     }
  183.     // need to flush to ensure the thread is not blocked
  184.     // in WaitForNext
  185.     HRESULT hr = m_pReader->BeginFlush();
  186.     if (FAILED(hr)) {
  187. return hr;
  188.     }
  189.     m_State = TM_Exit;
  190.     hr = CallWorker(TM_Exit);
  191.     m_pReader->EndFlush();
  192.     // wait for thread to completely exit
  193.     Close();
  194.     // decommit allocator
  195.     if (m_pAlloc) {
  196. m_pAlloc->Decommit();
  197.     }
  198.     return S_OK;
  199. }
  200. DWORD
  201. CPullPin::ThreadProc(void)
  202. {
  203.     while(1) {
  204. DWORD cmd = GetRequest();
  205. switch(cmd) {
  206. case TM_Exit:
  207.     Reply(S_OK);
  208.     return 0;
  209. case TM_Pause:
  210.     // we are paused already
  211.     Reply(S_OK);
  212.     break;
  213. case TM_Start:
  214.     Reply(S_OK);
  215.     Process();
  216.     break;
  217. }
  218. // at this point, there should be no outstanding requests on the
  219. // upstream filter.
  220. // We should force begin/endflush to ensure that this is true.
  221. // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
  222. // on another thread, but the premature EndFlush will do no harm now
  223. // that we are idle.
  224. m_pReader->BeginFlush();
  225. CleanupCancelled();
  226. m_pReader->EndFlush();
  227.     }
  228. }
  229. HRESULT
  230. CPullPin::QueueSample(
  231.     REFERENCE_TIME& tCurrent,
  232.     REFERENCE_TIME tAlignStop,
  233.     BOOL bDiscontinuity
  234.     )
  235. {
  236.     IMediaSample* pSample;
  237.     HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  238.     if (FAILED(hr)) {
  239. return hr;
  240.     }
  241.     LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  242.     if (tStopThis > tAlignStop) {
  243. tStopThis = tAlignStop;
  244.     }
  245.     pSample->SetTime(&tCurrent, &tStopThis);
  246.     tCurrent = tStopThis;
  247.     pSample->SetDiscontinuity(bDiscontinuity);
  248.     hr = m_pReader->Request(
  249. pSample,
  250. 0);
  251.     if (FAILED(hr)) {
  252. pSample->Release();
  253. CleanupCancelled();
  254. OnError(hr);
  255.     }
  256.     return hr;
  257. }
  258. HRESULT
  259. CPullPin::CollectAndDeliver(
  260.     REFERENCE_TIME tStart,
  261.     REFERENCE_TIME tStop)
  262. {
  263.     IMediaSample* pSample = NULL;   // better be sure pSample is set
  264.     DWORD_PTR dwUnused;
  265.     HRESULT hr = m_pReader->WaitForNext(
  266. INFINITE,
  267. &pSample,
  268. &dwUnused);
  269.     if (FAILED(hr)) {
  270. if (pSample) {
  271.     pSample->Release();
  272. }
  273.     } else {
  274. hr = DeliverSample(pSample, tStart, tStop);
  275.     }
  276.     if (FAILED(hr)) {
  277. CleanupCancelled();
  278. OnError(hr);
  279.     }
  280.     return hr;
  281. }
  282. HRESULT
  283. CPullPin::DeliverSample(
  284.     IMediaSample* pSample,
  285.     REFERENCE_TIME tStart,
  286.     REFERENCE_TIME tStop
  287.     )
  288. {
  289.     // fix up sample if past actual stop (for sector alignment)
  290.     REFERENCE_TIME t1, t2;
  291.     pSample->GetTime(&t1, &t2);
  292.     if (t2 > tStop) {
  293. t2 = tStop;
  294.     }
  295.     // adjust times to be relative to (aligned) start time
  296.     t1 -= tStart;
  297.     t2 -= tStart;
  298.     pSample->SetTime(&t1, &t2);
  299.     HRESULT hr = Receive(pSample);
  300.     pSample->Release();
  301.     return hr;
  302. }
  303. void
  304. CPullPin::Process(void)
  305. {
  306.     // is there anything to do?
  307.     if (m_tStop <= m_tStart) {
  308. EndOfStream();
  309. return;
  310.     }
  311.     BOOL bDiscontinuity = TRUE;
  312.     // if there is more than one sample at the allocator,
  313.     // then try to queue 2 at once in order to overlap.
  314.     // -- get buffer count and required alignment
  315.     ALLOCATOR_PROPERTIES Actual;
  316.     HRESULT hr = m_pAlloc->GetProperties(&Actual);
  317.     // align the start position downwards
  318.     REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
  319.     REFERENCE_TIME tCurrent = tStart;
  320.     REFERENCE_TIME tStop = m_tStop;
  321.     if (tStop > m_tDuration) {
  322. tStop = m_tDuration;
  323.     }
  324.     // align the stop position - may be past stop, but that
  325.     // doesn't matter
  326.     REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
  327.     DWORD dwRequest;
  328.     if (!m_bSync) {
  329. //  Break out of the loop either if we get to the end or we're asked
  330. //  to do something else
  331. while (tCurrent < tAlignStop) {
  332.     // Break out without calling EndOfStream if we're asked to
  333.     // do something different
  334.     if (CheckRequest(&dwRequest)) {
  335. return;
  336.     }
  337.     // queue a first sample
  338.     if (Actual.cBuffers > 1) {
  339. hr = QueueSample(tCurrent, tAlignStop, TRUE);
  340. bDiscontinuity = FALSE;
  341. if (FAILED(hr)) {
  342.     return;
  343. }
  344.     }
  345.     // loop queueing second and waiting for first..
  346.     while (tCurrent < tAlignStop) {
  347. hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
  348. bDiscontinuity = FALSE;
  349. if (FAILED(hr)) {
  350.     return;
  351. }
  352. hr = CollectAndDeliver(tStart, tStop);
  353. if (S_OK != hr) {
  354.     // stop if error, or if downstream filter said
  355.     // to stop.
  356.     return;
  357. }
  358.     }
  359.     if (Actual.cBuffers > 1) {
  360. hr = CollectAndDeliver(tStart, tStop);
  361. if (FAILED(hr)) {
  362.     return;
  363. }
  364.     }
  365. }
  366.     } else {
  367. // sync version of above loop
  368. while (tCurrent < tAlignStop) {
  369.     // Break out without calling EndOfStream if we're asked to
  370.     // do something different
  371.     if (CheckRequest(&dwRequest)) {
  372. return;
  373.     }
  374.     IMediaSample* pSample;
  375.     hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  376.     if (FAILED(hr)) {
  377. OnError(hr);
  378. return;
  379.     }
  380.     LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  381.     if (tStopThis > tAlignStop) {
  382. tStopThis = tAlignStop;
  383.     }
  384.     pSample->SetTime(&tCurrent, &tStopThis);
  385.     tCurrent = tStopThis;
  386.     if (bDiscontinuity) {
  387. pSample->SetDiscontinuity(TRUE);
  388. bDiscontinuity = FALSE;
  389.     }
  390.     hr = m_pReader->SyncReadAligned(pSample);
  391.     if (FAILED(hr)) {
  392. pSample->Release();
  393. OnError(hr);
  394. return;
  395.     }
  396.     hr = DeliverSample(pSample, tStart, tStop);
  397.     if (hr != S_OK) {
  398. if (FAILED(hr)) {
  399.     OnError(hr);
  400. }
  401. return;
  402.     }
  403. }
  404.     }
  405.     EndOfStream();
  406. }
  407. // after a flush, cancelled i/o will be waiting for collection
  408. // and release
  409. void
  410. CPullPin::CleanupCancelled(void)
  411. {
  412.     while (1) {
  413. IMediaSample * pSample;
  414. DWORD_PTR dwUnused;
  415. HRESULT hr = m_pReader->WaitForNext(
  416.     0,          // no wait
  417.     &pSample,
  418.     &dwUnused);
  419. if(pSample) {
  420.     pSample->Release();
  421. } else {
  422.     // no more samples
  423.     return;
  424. }
  425.     }
  426. }