pullpin.cpp
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:10k
源码类别:
P2P编程
开发平台:
Visual C++
- //------------------------------------------------------------------------------
- // File: PullPin.cpp
- //
- // Desc: DirectShow base classes - implements CPullPin class that pulls data
- // from IAsyncReader.
- //
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------------------------
- #include <streams.h>
- #include "pullpin.h"
- CPullPin::CPullPin()
- : m_pReader(NULL),
- m_pAlloc(NULL),
- m_State(TM_Exit)
- {
- }
- CPullPin::~CPullPin()
- {
- Disconnect();
- }
- // returns S_OK if successfully connected to an IAsyncReader interface
- // from this object
- // Optional allocator should be proposed as a preferred allocator if
- // necessary
- HRESULT
- CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
- {
- CAutoLock lock(&m_AccessLock);
- if (m_pReader) {
- return VFW_E_ALREADY_CONNECTED;
- }
- HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
- if (FAILED(hr)) {
- return(hr);
- }
- hr = DecideAllocator(pAlloc, NULL);
- if (FAILED(hr)) {
- Disconnect();
- return hr;
- }
- LONGLONG llTotal, llAvail;
- hr = m_pReader->Length(&llTotal, &llAvail);
- if (FAILED(hr)) {
- Disconnect();
- return hr;
- }
- // convert from file position to reference time
- m_tDuration = llTotal * UNITS;
- m_tStop = m_tDuration;
- m_tStart = 0;
- m_bSync = bSync;
- return S_OK;
- }
- // disconnect any connection made in Connect
- HRESULT
- CPullPin::Disconnect()
- {
- CAutoLock lock(&m_AccessLock);
- StopThread();
- if (m_pReader) {
- m_pReader->Release();
- m_pReader = NULL;
- }
- if (m_pAlloc) {
- m_pAlloc->Release();
- m_pAlloc = NULL;
- }
- return S_OK;
- }
- // agree an allocator using RequestAllocator - optional
- // props param specifies your requirements (non-zero fields).
- // returns an error code if fail to match requirements.
- // optional IMemAllocator interface is offered as a preferred allocator
- // but no error occurs if it can't be met.
- HRESULT
- CPullPin::DecideAllocator(
- IMemAllocator * pAlloc,
- ALLOCATOR_PROPERTIES * pProps)
- {
- ALLOCATOR_PROPERTIES *pRequest;
- ALLOCATOR_PROPERTIES Request;
- if (pProps == NULL) {
- Request.cBuffers = 3;
- Request.cbBuffer = 64*1024;
- Request.cbAlign = 0;
- Request.cbPrefix = 0;
- pRequest = &Request;
- } else {
- pRequest = pProps;
- }
- HRESULT hr = m_pReader->RequestAllocator(
- pAlloc,
- pRequest,
- &m_pAlloc);
- return hr;
- }
- // start pulling data
- HRESULT
- CPullPin::Active(void)
- {
- ASSERT(!ThreadExists());
- return StartThread();
- }
- // stop pulling data
- HRESULT
- CPullPin::Inactive(void)
- {
- StopThread();
- return S_OK;
- }
- HRESULT
- CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
- {
- CAutoLock lock(&m_AccessLock);
- ThreadMsg AtStart = m_State;
- if (AtStart == TM_Start) {
- BeginFlush();
- PauseThread();
- EndFlush();
- }
- m_tStart = tStart;
- m_tStop = tStop;
- HRESULT hr = S_OK;
- if (AtStart == TM_Start) {
- hr = StartThread();
- }
- return hr;
- }
- HRESULT
- CPullPin::Duration(REFERENCE_TIME* ptDuration)
- {
- *ptDuration = m_tDuration;
- return S_OK;
- }
- HRESULT
- CPullPin::StartThread()
- {
- CAutoLock lock(&m_AccessLock);
- if (!m_pAlloc || !m_pReader) {
- return E_UNEXPECTED;
- }
- HRESULT hr;
- if (!ThreadExists()) {
- // commit allocator
- hr = m_pAlloc->Commit();
- if (FAILED(hr)) {
- return hr;
- }
- // start thread
- if (!Create()) {
- return E_FAIL;
- }
- }
- m_State = TM_Start;
- hr = (HRESULT) CallWorker(m_State);
- return hr;
- }
- HRESULT
- CPullPin::PauseThread()
- {
- CAutoLock lock(&m_AccessLock);
- if (!ThreadExists()) {
- return E_UNEXPECTED;
- }
- // need to flush to ensure the thread is not blocked
- // in WaitForNext
- HRESULT hr = m_pReader->BeginFlush();
- if (FAILED(hr)) {
- return hr;
- }
- m_State = TM_Pause;
- hr = CallWorker(TM_Pause);
- m_pReader->EndFlush();
- return hr;
- }
- HRESULT
- CPullPin::StopThread()
- {
- CAutoLock lock(&m_AccessLock);
- if (!ThreadExists()) {
- return S_FALSE;
- }
- // need to flush to ensure the thread is not blocked
- // in WaitForNext
- HRESULT hr = m_pReader->BeginFlush();
- if (FAILED(hr)) {
- return hr;
- }
- m_State = TM_Exit;
- hr = CallWorker(TM_Exit);
- m_pReader->EndFlush();
- // wait for thread to completely exit
- Close();
- // decommit allocator
- if (m_pAlloc) {
- m_pAlloc->Decommit();
- }
- return S_OK;
- }
- DWORD
- CPullPin::ThreadProc(void)
- {
- while(1) {
- DWORD cmd = GetRequest();
- switch(cmd) {
- case TM_Exit:
- Reply(S_OK);
- return 0;
- case TM_Pause:
- // we are paused already
- Reply(S_OK);
- break;
- case TM_Start:
- Reply(S_OK);
- Process();
- break;
- }
- // at this point, there should be no outstanding requests on the
- // upstream filter.
- // We should force begin/endflush to ensure that this is true.
- // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
- // on another thread, but the premature EndFlush will do no harm now
- // that we are idle.
- m_pReader->BeginFlush();
- CleanupCancelled();
- m_pReader->EndFlush();
- }
- }
- HRESULT
- CPullPin::QueueSample(
- REFERENCE_TIME& tCurrent,
- REFERENCE_TIME tAlignStop,
- BOOL bDiscontinuity
- )
- {
- IMediaSample* pSample;
- HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
- if (FAILED(hr)) {
- return hr;
- }
- LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
- if (tStopThis > tAlignStop) {
- tStopThis = tAlignStop;
- }
- pSample->SetTime(&tCurrent, &tStopThis);
- tCurrent = tStopThis;
- pSample->SetDiscontinuity(bDiscontinuity);
- hr = m_pReader->Request(
- pSample,
- 0);
- if (FAILED(hr)) {
- pSample->Release();
- CleanupCancelled();
- OnError(hr);
- }
- return hr;
- }
- HRESULT
- CPullPin::CollectAndDeliver(
- REFERENCE_TIME tStart,
- REFERENCE_TIME tStop)
- {
- IMediaSample* pSample = NULL; // better be sure pSample is set
- DWORD_PTR dwUnused;
- HRESULT hr = m_pReader->WaitForNext(
- INFINITE,
- &pSample,
- &dwUnused);
- if (FAILED(hr)) {
- if (pSample) {
- pSample->Release();
- }
- } else {
- hr = DeliverSample(pSample, tStart, tStop);
- }
- if (FAILED(hr)) {
- CleanupCancelled();
- OnError(hr);
- }
- return hr;
- }
- HRESULT
- CPullPin::DeliverSample(
- IMediaSample* pSample,
- REFERENCE_TIME tStart,
- REFERENCE_TIME tStop
- )
- {
- // fix up sample if past actual stop (for sector alignment)
- REFERENCE_TIME t1, t2;
- pSample->GetTime(&t1, &t2);
- if (t2 > tStop) {
- t2 = tStop;
- }
- // adjust times to be relative to (aligned) start time
- t1 -= tStart;
- t2 -= tStart;
- pSample->SetTime(&t1, &t2);
- HRESULT hr = Receive(pSample);
- pSample->Release();
- return hr;
- }
- void
- CPullPin::Process(void)
- {
- // is there anything to do?
- if (m_tStop <= m_tStart) {
- EndOfStream();
- return;
- }
- BOOL bDiscontinuity = TRUE;
- // if there is more than one sample at the allocator,
- // then try to queue 2 at once in order to overlap.
- // -- get buffer count and required alignment
- ALLOCATOR_PROPERTIES Actual;
- HRESULT hr = m_pAlloc->GetProperties(&Actual);
- // align the start position downwards
- REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
- REFERENCE_TIME tCurrent = tStart;
- REFERENCE_TIME tStop = m_tStop;
- if (tStop > m_tDuration) {
- tStop = m_tDuration;
- }
- // align the stop position - may be past stop, but that
- // doesn't matter
- REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
- DWORD dwRequest;
- if (!m_bSync) {
- // Break out of the loop either if we get to the end or we're asked
- // to do something else
- while (tCurrent < tAlignStop) {
- // Break out without calling EndOfStream if we're asked to
- // do something different
- if (CheckRequest(&dwRequest)) {
- return;
- }
- // queue a first sample
- if (Actual.cBuffers > 1) {
- hr = QueueSample(tCurrent, tAlignStop, TRUE);
- bDiscontinuity = FALSE;
- if (FAILED(hr)) {
- return;
- }
- }
- // loop queueing second and waiting for first..
- while (tCurrent < tAlignStop) {
- hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
- bDiscontinuity = FALSE;
- if (FAILED(hr)) {
- return;
- }
- hr = CollectAndDeliver(tStart, tStop);
- if (S_OK != hr) {
- // stop if error, or if downstream filter said
- // to stop.
- return;
- }
- }
- if (Actual.cBuffers > 1) {
- hr = CollectAndDeliver(tStart, tStop);
- if (FAILED(hr)) {
- return;
- }
- }
- }
- } else {
- // sync version of above loop
- while (tCurrent < tAlignStop) {
- // Break out without calling EndOfStream if we're asked to
- // do something different
- if (CheckRequest(&dwRequest)) {
- return;
- }
- IMediaSample* pSample;
- hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
- if (FAILED(hr)) {
- OnError(hr);
- return;
- }
- LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
- if (tStopThis > tAlignStop) {
- tStopThis = tAlignStop;
- }
- pSample->SetTime(&tCurrent, &tStopThis);
- tCurrent = tStopThis;
- if (bDiscontinuity) {
- pSample->SetDiscontinuity(TRUE);
- bDiscontinuity = FALSE;
- }
- hr = m_pReader->SyncReadAligned(pSample);
- if (FAILED(hr)) {
- pSample->Release();
- OnError(hr);
- return;
- }
- hr = DeliverSample(pSample, tStart, tStop);
- if (hr != S_OK) {
- if (FAILED(hr)) {
- OnError(hr);
- }
- return;
- }
- }
- }
- EndOfStream();
- }
- // after a flush, cancelled i/o will be waiting for collection
- // and release
- void
- CPullPin::CleanupCancelled(void)
- {
- while (1) {
- IMediaSample * pSample;
- DWORD_PTR dwUnused;
- HRESULT hr = m_pReader->WaitForNext(
- 0, // no wait
- &pSample,
- &dwUnused);
- if(pSample) {
- pSample->Release();
- } else {
- // no more samples
- return;
- }
- }
- }