asyncio.cpp
上传用户:xjjlds
上传日期:2015-12-05
资源大小:22823k
文件大小:16k
源码类别:

多媒体编程

开发平台:

Visual C++

  1. //------------------------------------------------------------------------------
  2. // File: AsyncIo.cpp
  3. //
  4. // Desc: DirectShow sample code - base library with I/O functionality.
  5. //
  6. // Copyright (c) 1992-2001 Microsoft Corporation.  All rights reserved.
  7. //------------------------------------------------------------------------------
  8. #include "stdafx.h"
  9. #include "asyncio.h"
  10. // --- CAsyncRequest ---
  11. // implementation of CAsyncRequest representing a single
  12. // outstanding request. All the i/o for this object is done
  13. // in the Complete method.
  14. // init the params for this request.
  15. // Read is not issued until the complete call
  16. HRESULT
  17. CAsyncRequest::Request(
  18.     CAsyncIo *pIo,
  19.     CAsyncStream *pStream,
  20.     LONGLONG llPos,
  21.     LONG lLength,
  22.     BOOL bAligned,
  23.     BYTE* pBuffer,
  24.     LPVOID pContext, // filter's context
  25.     DWORD dwUser) // downstream filter's context
  26. {
  27.     m_pIo = pIo;
  28.     m_pStream = pStream;
  29.     m_llPos = llPos;
  30.     m_lLength = lLength;
  31.     m_bAligned = bAligned;
  32.     m_pBuffer = pBuffer;
  33.     m_pContext = pContext;
  34.     m_dwUser = dwUser;
  35.     m_hr = VFW_E_TIMEOUT;   // not done yet
  36.     return S_OK;
  37. }
  38. // issue the i/o if not overlapped, and block until i/o complete.
  39. // returns error code of file i/o
  40. //
  41. //
  42. HRESULT
  43. CAsyncRequest::Complete()
  44. {
  45.     m_pStream->Lock();
  46.     m_hr = m_pStream->SetPointer(m_llPos);
  47.     if (S_OK == m_hr) {
  48.         DWORD dwActual;
  49.         m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
  50.         if (m_hr == OLE_S_FIRST) {
  51.             if (m_pContext) {
  52.                 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
  53.                 pSample->SetDiscontinuity(TRUE);
  54.                 m_hr = S_OK;
  55.             }
  56.         }
  57.         if (FAILED(m_hr)) {
  58.         } else if (dwActual != (DWORD)m_lLength) {
  59.             // tell caller size changed - probably because of EOF
  60.             m_lLength = (LONG) dwActual;
  61.             m_hr = S_FALSE;
  62.         } else {
  63.             m_hr = S_OK;
  64.         }
  65.     }
  66.     m_pStream->Unlock();
  67.     return m_hr;
  68. }
  69. // --- CAsyncIo ---
  70. // note - all events created manual reset
  71. CAsyncIo::CAsyncIo(CAsyncStream *pStream)
  72.  : m_hThread(NULL),
  73.    m_evWork(TRUE),
  74.    m_evDone(TRUE),
  75.    m_evStop(TRUE),
  76.    m_listWork(NAME("Work list")),
  77.    m_listDone(NAME("Done list")),
  78.    m_bFlushing(FALSE),
  79.    m_cItemsOut(0),
  80.    m_bWaiting(FALSE),
  81.    m_pStream(pStream)
  82. {
  83. }
  84. CAsyncIo::~CAsyncIo()
  85. {
  86.     // move everything to the done list
  87.     BeginFlush();
  88.     // shutdown worker thread
  89.     CloseThread();
  90.     // empty the done list
  91.     POSITION pos = m_listDone.GetHeadPosition();
  92.     while (pos) {
  93.         CAsyncRequest* pRequest = m_listDone.GetNext(pos);
  94.         delete pRequest;
  95.     }
  96.     m_listDone.RemoveAll();
  97. }
  98. // ready for async activity - call this before calling Request.
  99. //
  100. // start the worker thread if we need to
  101. //
  102. // !!! use overlapped i/o if possible
  103. HRESULT
  104. CAsyncIo::AsyncActive(void)
  105. {
  106.     return StartThread();
  107. }
  108. // call this when no more async activity will happen before
  109. // the next AsyncActive call
  110. //
  111. // stop the worker thread if active
  112. HRESULT
  113. CAsyncIo::AsyncInactive(void)
  114. {
  115.     return CloseThread();
  116. }
  117. // add a request to the queue.
  118. HRESULT
  119. CAsyncIo::Request(
  120.             LONGLONG llPos,
  121.             LONG lLength,
  122.             BOOL bAligned,
  123.             BYTE* pBuffer,
  124.             LPVOID pContext,
  125.             DWORD dwUser)
  126. {
  127.     if (bAligned) {
  128.         if (!IsAligned(llPos) ||
  129.      !IsAligned(lLength) ||
  130.      !IsAligned((LONG) pBuffer)) {
  131.             return VFW_E_BADALIGN;
  132.         }
  133.     }
  134.     CAsyncRequest* pRequest = new CAsyncRequest;
  135.     HRESULT hr = pRequest->Request(
  136.                             this,
  137.                             m_pStream,
  138.                             llPos,
  139.                             lLength,
  140.                             bAligned,
  141.                             pBuffer,
  142.                             pContext,
  143.                             dwUser);
  144.     if (SUCCEEDED(hr)) {
  145.         // might fail if flushing
  146.         hr = PutWorkItem(pRequest);
  147.     }
  148.     if (FAILED(hr)) {
  149.         delete pRequest;
  150.     }
  151.     return hr;
  152. }
  153. // wait for the next request to complete
  154. HRESULT
  155. CAsyncIo::WaitForNext(
  156.     DWORD dwTimeout,
  157.     LPVOID *ppContext,
  158.     DWORD * pdwUser,
  159.     LONG* pcbActual)
  160. {
  161.     // some errors find a sample, others don't. Ensure that
  162.     // *ppContext is NULL if no sample found
  163.     *ppContext = NULL;
  164.     // wait until the event is set, but since we are not
  165.     // holding the critsec when waiting, we may need to re-wait
  166.     for (;;) {
  167.         if (!m_evDone.Wait(dwTimeout)) {
  168.             // timeout occurred
  169.             return VFW_E_TIMEOUT;
  170.         }
  171.         // get next event from list
  172.         CAsyncRequest* pRequest = GetDoneItem();
  173.         if (pRequest) {
  174.             // found a completed request
  175.             // check if ok
  176.             HRESULT hr = pRequest->GetHResult();
  177.             if (hr == S_FALSE) {
  178.                 // this means the actual length was less than
  179.                 // requested - may be ok if he aligned the end of file
  180.                 if ((pRequest->GetActualLength() +
  181.                      pRequest->GetStart()) == Size()) {
  182.                     hr = S_OK;
  183.                 } else {
  184.                     // it was an actual read error
  185.                     hr = E_FAIL;
  186.                 }
  187.             }
  188.             // return actual bytes read
  189.             *pcbActual = pRequest->GetActualLength();
  190.             // return his context
  191.             *ppContext = pRequest->GetContext();
  192.             *pdwUser = pRequest->GetUser();
  193.             delete pRequest;
  194.             return hr;
  195.         } else {
  196.             //  Hold the critical section while checking the list state
  197.             CAutoLock lck(&m_csLists);
  198.             if (m_bFlushing && !m_bWaiting) {
  199.                 // can't block as we are between BeginFlush and EndFlush
  200.                 // but note that if m_bWaiting is set, then there are some
  201.                 // items not yet complete that we should block for.
  202.                 return VFW_E_WRONG_STATE;
  203.             }
  204.         }
  205.         // done item was grabbed between completion and
  206.         // us locking m_csLists.
  207.     }
  208. }
  209. // perform a synchronous read request on this thread.
  210. // Need to hold m_csFile while doing this (done in request object)
  211. HRESULT
  212. CAsyncIo::SyncReadAligned(
  213.             LONGLONG llPos,
  214.             LONG lLength,
  215.             BYTE* pBuffer,
  216.             LONG* pcbActual,
  217.             PVOID pvContext
  218.             )
  219. {
  220.     if (!IsAligned(llPos) ||
  221. !IsAligned(lLength) ||
  222. !IsAligned((LONG) pBuffer)) {
  223.         return VFW_E_BADALIGN;
  224.     }
  225.     CAsyncRequest request;
  226.     HRESULT hr = request.Request(
  227.                     this,
  228.                     m_pStream,
  229.                     llPos,
  230.                     lLength,
  231.                     TRUE,
  232.                     pBuffer,
  233.                     pvContext,
  234.                     0);
  235.     if (FAILED(hr)) {
  236.         return hr;
  237.     }
  238.     hr = request.Complete();
  239.     // return actual data length
  240.     *pcbActual = request.GetActualLength();
  241.     return hr;
  242. }
  243. HRESULT
  244. CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG* pllAvailable)
  245. {
  246.     *pllTotal = m_pStream->Size(pllAvailable);
  247.     return S_OK;
  248. }
  249. // cancel all items on the worklist onto the done list
  250. // and refuse further requests or further WaitForNext calls
  251. // until the end flush
  252. //
  253. // WaitForNext must return with NULL only if there are no successful requests.
  254. // So Flush does the following:
  255. // 1. set m_bFlushing ensures no more requests succeed
  256. // 2. move all items from work list to the done list.
  257. // 3. If there are any outstanding requests, then we need to release the
  258. //    critsec to allow them to complete. The m_bWaiting as well as ensuring
  259. //    that we are signalled when they are all done is also used to indicate
  260. //    to WaitForNext that it should continue to block.
  261. // 4. Once all outstanding requests are complete, we force m_evDone set and
  262. //    m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
  263. //    not block when the done list is empty.
  264. HRESULT
  265. CAsyncIo::BeginFlush()
  266. {
  267.     // hold the lock while emptying the work list
  268.     {
  269.         CAutoLock lock(&m_csLists);
  270.         // prevent further requests being queued.
  271.         // Also WaitForNext will refuse to block if this is set
  272.         // unless m_bWaiting is also set which it will be when we release
  273.         // the critsec if there are any outstanding).
  274.         m_bFlushing = TRUE;
  275.         CAsyncRequest * preq;
  276.         while((preq = GetWorkItem()) != 0) {
  277.             preq->Cancel();
  278.             PutDoneItem(preq);
  279.         }
  280.         // now wait for any outstanding requests to complete
  281.         if (m_cItemsOut > 0) {
  282.             // can be only one person waiting
  283.             ASSERT(!m_bWaiting);
  284.             // this tells the completion routine that we need to be
  285.             // signalled via m_evAllDone when all outstanding items are
  286.             // done. It also tells WaitForNext to continue blocking.
  287.             m_bWaiting = TRUE;
  288.         } else {
  289.             // all done
  290.             // force m_evDone set so that even if list is empty,
  291.             // WaitForNext will not block
  292.             // don't do this until we are sure that all
  293.             // requests are on the done list.
  294.             m_evDone.Set();
  295.             return S_OK;
  296.         }
  297.     }
  298.     ASSERT(m_bWaiting);
  299.     // wait without holding critsec
  300.     for (;;) {
  301.         m_evAllDone.Wait();
  302.         {
  303.             // hold critsec to check
  304.             CAutoLock lock(&m_csLists);
  305.             if (m_cItemsOut == 0) {
  306.                 // now we are sure that all outstanding requests are on
  307.                 // the done list and no more will be accepted
  308.                 m_bWaiting = FALSE;
  309.                 // force m_evDone set so that even if list is empty,
  310.                 // WaitForNext will not block
  311.                 // don't do this until we are sure that all
  312.                 // requests are on the done list.
  313.                 m_evDone.Set();
  314.                 return S_OK;
  315.             }
  316.         }
  317.     }
  318. }
  319. // end a flushing state
  320. HRESULT
  321. CAsyncIo::EndFlush()
  322. {
  323.     CAutoLock lock(&m_csLists);
  324.     m_bFlushing = FALSE;
  325.     ASSERT(!m_bWaiting);
  326.     // m_evDone might have been set by BeginFlush - ensure it is
  327.     // set IFF m_listDone is non-empty
  328.     if (m_listDone.GetCount() > 0) {
  329.         m_evDone.Set();
  330.     } else {
  331.         m_evDone.Reset();
  332.     }
  333.     return S_OK;
  334. }
  335. // start the thread
  336. HRESULT
  337. CAsyncIo::StartThread(void)
  338. {
  339.     if (m_hThread) {
  340.         return S_OK;
  341.     }
  342.     // clear the stop event before starting
  343.     m_evStop.Reset();
  344.     DWORD dwThreadID;
  345.     m_hThread = CreateThread(
  346.                     NULL,
  347.                     0,
  348.                     InitialThreadProc,
  349.                     this,
  350.                     0,
  351.                     &dwThreadID);
  352.     if (!m_hThread) {
  353. DWORD dwErr = GetLastError();
  354.         return HRESULT_FROM_WIN32(dwErr);
  355.     }
  356.     return S_OK;
  357. }
  358. // stop the thread and close the handle
  359. HRESULT
  360. CAsyncIo::CloseThread(void)
  361. {
  362.     // signal the thread-exit object
  363.     m_evStop.Set();
  364.     if (m_hThread) {
  365.         WaitForSingleObject(m_hThread, INFINITE);
  366.         CloseHandle(m_hThread);
  367.         m_hThread = NULL;
  368.     }
  369.     return S_OK;
  370. }
  371. // manage the list of requests. hold m_csLists and ensure
  372. // that the (manual reset) event hevList is set when things on
  373. // the list but reset when the list is empty.
  374. // returns null if list empty
  375. CAsyncRequest*
  376. CAsyncIo::GetWorkItem()
  377. {
  378.     CAutoLock lck(&m_csLists);
  379.     CAsyncRequest * preq  = m_listWork.RemoveHead();
  380.     // force event set correctly
  381.     if (m_listWork.GetCount() == 0) {
  382.         m_evWork.Reset();
  383.     }
  384.     return preq;
  385. }
  386. // get an item from the done list
  387. CAsyncRequest*
  388. CAsyncIo::GetDoneItem()
  389. {
  390.     CAutoLock lock(&m_csLists);
  391.     CAsyncRequest * preq  = m_listDone.RemoveHead();
  392.     // force event set correctly if list now empty
  393.     // or we're in the final stages of flushing
  394.     // Note that during flushing the way it's supposed to work is that
  395.     // everything is shoved on the Done list then the application is
  396.     // supposed to pull until it gets nothing more
  397.     //
  398.     // Thus we should not set m_evDone unconditionally until everything
  399.     // has moved to the done list which means we must wait until
  400.     // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
  401.     if (m_listDone.GetCount() == 0 &&
  402.         (!m_bFlushing || m_bWaiting)) {
  403.         m_evDone.Reset();
  404.     }
  405.     return preq;
  406. }
  407. // put an item on the work list - fail if bFlushing
  408. HRESULT
  409. CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
  410. {
  411.     CAutoLock lock(&m_csLists);
  412.     HRESULT hr;
  413.     if (m_bFlushing) {
  414.         hr = VFW_E_WRONG_STATE;
  415.     }
  416.     else if (m_listWork.AddTail(pRequest)) {
  417.         // event should now be in a set state - force this
  418.         m_evWork.Set();
  419.         // start the thread now if not already started
  420.         hr = StartThread();
  421.     } else {
  422.         hr = E_OUTOFMEMORY;
  423.     }
  424.     return(hr);
  425. }
  426. // put an item on the done list - ok to do this when
  427. // flushing
  428. HRESULT
  429. CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
  430. {
  431.     ASSERT(CritCheckIn(&m_csLists));
  432.     if (m_listDone.AddTail(pRequest)) {
  433.         // event should now be in a set state - force this
  434.         m_evDone.Set();
  435.         return S_OK;
  436.     } else {
  437.         return E_OUTOFMEMORY;
  438.     }
  439. }
  440. // called on thread to process any active requests
  441. void
  442. CAsyncIo::ProcessRequests(void)
  443. {
  444.     // lock to get the item and increment the outstanding count
  445.     CAsyncRequest * preq = NULL;
  446.     for (;;) {
  447.         {
  448.             CAutoLock lock(&m_csLists);
  449.             preq = GetWorkItem();
  450.             if (preq == NULL) {
  451.                 // done
  452.                 return;
  453.             }
  454.             // one more item not on the done or work list
  455.             m_cItemsOut++;
  456.             // release critsec
  457.         }
  458.         preq->Complete();
  459.         // regain critsec to replace on done list
  460.         {
  461.             CAutoLock l(&m_csLists);
  462.             PutDoneItem(preq);
  463.             if (--m_cItemsOut == 0) {
  464.                 if (m_bWaiting) {
  465.                     m_evAllDone.Set();
  466.                 }
  467.             }
  468.         }
  469.     }
  470. }
  471. // the thread proc - assumes that DWORD thread param is the
  472. // this pointer
  473. DWORD
  474. CAsyncIo::ThreadProc(void)
  475. {
  476.     HANDLE ahev[] = {m_evStop, m_evWork};
  477.     for (;;) {
  478.     DWORD dw = WaitForMultipleObjects(
  479.         2,
  480.         ahev,
  481.         FALSE,
  482.         INFINITE);
  483.     if (dw == WAIT_OBJECT_0+1) {
  484.         // requests need processing
  485.         ProcessRequests();
  486.     } else {
  487.         // any error or stop event - we should exit
  488.         return 0;
  489.     }
  490.     }
  491. }
  492. // perform a synchronous read request on this thread.
  493. // may not be aligned - so we will have to buffer.
  494. HRESULT
  495. CAsyncIo::SyncRead(
  496.             LONGLONG llPos,
  497.             LONG lLength,
  498.             BYTE* pBuffer)
  499. {
  500.     if (IsAligned(llPos) &&
  501. IsAligned(lLength) &&
  502. IsAligned((LONG) pBuffer)) {
  503.         LONG cbUnused;
  504.     return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
  505.     }
  506.     // not aligned with requirements - use buffered file handle.
  507.     //!!! might want to fix this to buffer the data ourselves?
  508.     CAsyncRequest request;
  509.     HRESULT hr = request.Request(
  510.                     this,
  511.                     m_pStream,
  512.                     llPos,
  513.                     lLength,
  514.                     FALSE,
  515.                     pBuffer,
  516.                     NULL,
  517.                     0);
  518.     if (FAILED(hr)) {
  519.         return hr;
  520.     }
  521.     return request.Complete();
  522. }
  523. //  Return the alignment
  524. HRESULT
  525. CAsyncIo::Alignment(LONG *pl)
  526. {
  527.     *pl = Alignment();
  528.     return S_OK;
  529. }