Asyncio.cpp
上传用户:hxb_1234
上传日期:2010-03-30
资源大小:8328k
文件大小:17k
源码类别:

VC书籍

开发平台:

Visual C++

  1. //------------------------------------------------------------------------------
  2. // File: AsyncIo.cpp
  3. //
  4. // Desc: DirectShow sample code - base library with I/O functionality.
  5. //
  6. // Copyright (c) Microsoft Corporation.  All rights reserved.
  7. //------------------------------------------------------------------------------
  8. #include "stdafx.h"
  9. #include <streams.h>
  10. #include "asyncio.h"
  11. // --- CAsyncRequest ---
  12. // implementation of CAsyncRequest representing a single
  13. // outstanding request. All the i/o for this object is done
  14. // in the Complete method.
  15. // init the params for this request.
  16. // Read is not issued until the complete call
  17. HRESULT
  18. CAsyncRequest::Request(
  19.     CAsyncIo *pIo,
  20.     CAsyncStream *pStream,
  21.     LONGLONG llPos,
  22.     LONG lLength,
  23.     BOOL bAligned,
  24.     BYTE* pBuffer,
  25.     LPVOID pContext,    // filter's context
  26.     DWORD dwUser)   // downstream filter's context
  27. {
  28.     m_pIo = pIo;
  29.     m_pStream = pStream;
  30.     m_llPos = llPos;
  31.     m_lLength = lLength;
  32.     m_bAligned = bAligned;
  33.     m_pBuffer = pBuffer;
  34.     m_pContext = pContext;
  35.     m_dwUser = dwUser;
  36.     m_hr = VFW_E_TIMEOUT;   // not done yet
  37.     return S_OK;
  38. }
  39. // issue the i/o if not overlapped, and block until i/o complete.
  40. // returns error code of file i/o
  41. //
  42. //
  43. HRESULT
  44. CAsyncRequest::Complete()
  45. {
  46.     m_pStream->Lock();
  47.     m_hr = m_pStream->SetPointer(m_llPos);
  48.     if(S_OK == m_hr)
  49.     {
  50.         DWORD dwActual;
  51.         m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
  52.         if(m_hr == OLE_S_FIRST)
  53.         {
  54.             if(m_pContext)
  55.             {
  56.                 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
  57.                 pSample->SetDiscontinuity(TRUE);
  58.                 m_hr = S_OK;
  59.             }
  60.         }
  61.         if(FAILED(m_hr))
  62.         {
  63.         }
  64.         else if(dwActual != (DWORD)m_lLength)
  65.         {
  66.             // tell caller size changed - probably because of EOF
  67.             m_lLength = (LONG) dwActual;
  68.             m_hr = S_FALSE;
  69.         }
  70.         else
  71.         {
  72.             m_hr = S_OK;
  73.         }
  74.     }
  75.     m_pStream->Unlock();
  76.     return m_hr;
  77. }
  78. // --- CAsyncIo ---
  79. // note - all events created manual reset
  80. CAsyncIo::CAsyncIo(CAsyncStream *pStream)
  81.          : m_hThread(NULL),
  82.            m_evWork(TRUE),
  83.            m_evDone(TRUE),
  84.            m_evStop(TRUE),
  85.            m_listWork(NAME("Work list")),
  86.            m_listDone(NAME("Done list")),
  87.            m_bFlushing(FALSE),
  88.            m_cItemsOut(0),
  89.            m_bWaiting(FALSE),
  90.            m_pStream(pStream)
  91. {
  92. }
  93. CAsyncIo::~CAsyncIo()
  94. {
  95.     // move everything to the done list
  96.     BeginFlush();
  97.     // shutdown worker thread
  98.     CloseThread();
  99.     // empty the done list
  100.     POSITION pos = m_listDone.GetHeadPosition();
  101.     while(pos)
  102.     {
  103.         CAsyncRequest* pRequest = m_listDone.GetNext(pos);
  104.         delete pRequest;
  105.     }
  106.     m_listDone.RemoveAll();
  107. }
  108. // ready for async activity - call this before calling Request.
  109. //
  110. // start the worker thread if we need to
  111. //
  112. // !!! use overlapped i/o if possible
  113. HRESULT
  114. CAsyncIo::AsyncActive(void)
  115. {
  116.     return StartThread();
  117. }
  118. // call this when no more async activity will happen before
  119. // the next AsyncActive call
  120. //
  121. // stop the worker thread if active
  122. HRESULT
  123. CAsyncIo::AsyncInactive(void)
  124. {
  125.     return CloseThread();
  126. }
  127. // add a request to the queue.
  128. HRESULT
  129. CAsyncIo::Request(
  130.                 LONGLONG llPos,
  131.                 LONG lLength,
  132.                 BOOL bAligned,
  133.                 BYTE * pBuffer,
  134.                 LPVOID pContext,
  135.                 DWORD dwUser)
  136. {
  137.     if(bAligned)
  138.     {
  139.         if(!IsAligned(llPos) ||
  140.             !IsAligned(lLength) ||
  141.             !IsAligned((LONG) pBuffer))
  142.         {
  143.             return VFW_E_BADALIGN;
  144.         }
  145.     }
  146.     CAsyncRequest* pRequest = new CAsyncRequest;
  147.     if (!pRequest)
  148.         return E_OUTOFMEMORY;
  149.     HRESULT hr = pRequest->Request(this,
  150.                                    m_pStream,
  151.                                    llPos,
  152.                                    lLength,
  153.                                    bAligned,
  154.                                    pBuffer,
  155.                                    pContext,
  156.                                    dwUser);
  157.     if(SUCCEEDED(hr))
  158.     {
  159.         // might fail if flushing
  160.         hr = PutWorkItem(pRequest);
  161.     }
  162.     if(FAILED(hr))
  163.     {
  164.         delete pRequest;
  165.     }
  166.     return hr;
  167. }
  168. // wait for the next request to complete
  169. HRESULT
  170. CAsyncIo::WaitForNext(
  171.     DWORD dwTimeout,
  172.     LPVOID * ppContext,
  173.     DWORD  * pdwUser,
  174.     LONG   * pcbActual)
  175. {
  176.     CheckPointer(ppContext,E_POINTER);
  177.     CheckPointer(pdwUser,E_POINTER);
  178.     CheckPointer(pcbActual,E_POINTER);
  179.     // some errors find a sample, others don't. Ensure that
  180.     // *ppContext is NULL if no sample found
  181.     *ppContext = NULL;
  182.     // wait until the event is set, but since we are not
  183.     // holding the critsec when waiting, we may need to re-wait
  184.     for(;;)
  185.     {
  186.         if(!m_evDone.Wait(dwTimeout))
  187.         {
  188.             // timeout occurred
  189.             return VFW_E_TIMEOUT;
  190.         }
  191.         // get next event from list
  192.         CAsyncRequest* pRequest = GetDoneItem();
  193.         if(pRequest)
  194.         {
  195.             // found a completed request
  196.             // check if ok
  197.             HRESULT hr = pRequest->GetHResult();
  198.             if(hr == S_FALSE)
  199.             {
  200.                 // this means the actual length was less than
  201.                 // requested - may be ok if he aligned the end of file
  202.                 if((pRequest->GetActualLength() +
  203.                     pRequest->GetStart()) == Size())
  204.                 {
  205.                     hr = S_OK;
  206.                 }
  207.                 else
  208.                 {
  209.                     // it was an actual read error
  210.                     hr = E_FAIL;
  211.                 }
  212.             }
  213.             // return actual bytes read
  214.             *pcbActual = pRequest->GetActualLength();
  215.             // return his context
  216.             *ppContext = pRequest->GetContext();
  217.             *pdwUser = pRequest->GetUser();
  218.             delete pRequest;
  219.             return hr;
  220.         }
  221.         else
  222.         {
  223.             //  Hold the critical section while checking the list state
  224.             CAutoLock lck(&m_csLists);
  225.             if(m_bFlushing && !m_bWaiting)
  226.             {
  227.                 // can't block as we are between BeginFlush and EndFlush
  228.                 // but note that if m_bWaiting is set, then there are some
  229.                 // items not yet complete that we should block for.
  230.                 return VFW_E_WRONG_STATE;
  231.             }
  232.         }
  233.         // done item was grabbed between completion and
  234.         // us locking m_csLists.
  235.     }
  236. }
  237. // perform a synchronous read request on this thread.
  238. // Need to hold m_csFile while doing this (done in request object)
  239. HRESULT
  240. CAsyncIo::SyncReadAligned(
  241.                         LONGLONG llPos,
  242.                         LONG lLength,
  243.                         BYTE * pBuffer,
  244.                         LONG * pcbActual,
  245.                         PVOID pvContext)
  246. {
  247.     CheckPointer(pcbActual,E_POINTER);
  248.     if(!IsAligned(llPos) ||
  249.         !IsAligned(lLength) ||
  250.         !IsAligned((LONG) pBuffer))
  251.     {
  252.         return VFW_E_BADALIGN;
  253.     }
  254.     CAsyncRequest request;
  255.     HRESULT hr = request.Request(this,
  256.                                 m_pStream,
  257.                                 llPos,
  258.                                 lLength,
  259.                                 TRUE,
  260.                                 pBuffer,
  261.                                 pvContext,
  262.                                 0);
  263.     if(FAILED(hr))
  264.         return hr;
  265.     hr = request.Complete();
  266.     // return actual data length
  267.     *pcbActual = request.GetActualLength();
  268.     return hr;
  269. }
  270. HRESULT
  271. CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG *pllAvailable)
  272. {
  273.     CheckPointer(pllTotal,E_POINTER);
  274.     *pllTotal = m_pStream->Size(pllAvailable);
  275.     return S_OK;
  276. }
  277. // cancel all items on the worklist onto the done list
  278. // and refuse further requests or further WaitForNext calls
  279. // until the end flush
  280. //
  281. // WaitForNext must return with NULL only if there are no successful requests.
  282. // So Flush does the following:
  283. // 1. set m_bFlushing ensures no more requests succeed
  284. // 2. move all items from work list to the done list.
  285. // 3. If there are any outstanding requests, then we need to release the
  286. //    critsec to allow them to complete. The m_bWaiting as well as ensuring
  287. //    that we are signalled when they are all done is also used to indicate
  288. //    to WaitForNext that it should continue to block.
  289. // 4. Once all outstanding requests are complete, we force m_evDone set and
  290. //    m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
  291. //    not block when the done list is empty.
  292. HRESULT
  293. CAsyncIo::BeginFlush()
  294. {
  295.     // hold the lock while emptying the work list
  296.     {
  297.         CAutoLock lock(&m_csLists);
  298.         // prevent further requests being queued.
  299.         // Also WaitForNext will refuse to block if this is set
  300.         // unless m_bWaiting is also set which it will be when we release
  301.         // the critsec if there are any outstanding).
  302.         m_bFlushing = TRUE;
  303.         CAsyncRequest * preq;
  304.         while((preq = GetWorkItem()) != 0)
  305.         {
  306.             preq->Cancel();
  307.             PutDoneItem(preq);
  308.         }
  309.         // now wait for any outstanding requests to complete
  310.         if(m_cItemsOut > 0)
  311.         {
  312.             // can be only one person waiting
  313.             ASSERT(!m_bWaiting);
  314.             // this tells the completion routine that we need to be
  315.             // signalled via m_evAllDone when all outstanding items are
  316.             // done. It also tells WaitForNext to continue blocking.
  317.             m_bWaiting = TRUE;
  318.         }
  319.         else
  320.         {
  321.             // all done
  322.             // force m_evDone set so that even if list is empty,
  323.             // WaitForNext will not block
  324.             // don't do this until we are sure that all
  325.             // requests are on the done list.
  326.             m_evDone.Set();
  327.             return S_OK;
  328.         }
  329.     }
  330.     ASSERT(m_bWaiting);
  331.     // wait without holding critsec
  332.     for(;;)
  333.     {
  334.         m_evAllDone.Wait();
  335.         {
  336.             // hold critsec to check
  337.             CAutoLock lock(&m_csLists);
  338.             if(m_cItemsOut == 0)
  339.             {
  340.                 // now we are sure that all outstanding requests are on
  341.                 // the done list and no more will be accepted
  342.                 m_bWaiting = FALSE;
  343.                 // force m_evDone set so that even if list is empty,
  344.                 // WaitForNext will not block
  345.                 // don't do this until we are sure that all
  346.                 // requests are on the done list.
  347.                 m_evDone.Set();
  348.                 return S_OK;
  349.             }
  350.         }
  351.     }
  352. }
  353. // end a flushing state
  354. HRESULT
  355. CAsyncIo::EndFlush()
  356. {
  357.     CAutoLock lock(&m_csLists);
  358.     m_bFlushing = FALSE;
  359.     ASSERT(!m_bWaiting);
  360.     // m_evDone might have been set by BeginFlush - ensure it is
  361.     // set IFF m_listDone is non-empty
  362.     if(m_listDone.GetCount() > 0)
  363.     {
  364.         m_evDone.Set();
  365.     }
  366.     else
  367.     {
  368.         m_evDone.Reset();
  369.     }
  370.     return S_OK;
  371. }
  372. // start the thread
  373. HRESULT
  374. CAsyncIo::StartThread(void)
  375. {
  376.     if(m_hThread)
  377.     {
  378.         return S_OK;
  379.     }
  380.     // clear the stop event before starting
  381.     m_evStop.Reset();
  382.     DWORD dwThreadID;
  383.     m_hThread = CreateThread(NULL,
  384.                             0,
  385.                             InitialThreadProc,
  386.                             this,
  387.                             0,
  388.                             &dwThreadID);
  389.     if(!m_hThread)
  390.     {
  391.         DWORD dwErr = GetLastError();
  392.         return HRESULT_FROM_WIN32(dwErr);
  393.     }
  394.     return S_OK;
  395. }
  396. // stop the thread and close the handle
  397. HRESULT
  398. CAsyncIo::CloseThread(void)
  399. {
  400.     // signal the thread-exit object
  401.     m_evStop.Set();
  402.     if(m_hThread)
  403.     {
  404.         WaitForSingleObject(m_hThread, INFINITE);
  405.         CloseHandle(m_hThread);
  406.         m_hThread = NULL;
  407.     }
  408.     return S_OK;
  409. }
  410. // manage the list of requests. hold m_csLists and ensure
  411. // that the (manual reset) event hevList is set when things on
  412. // the list but reset when the list is empty.
  413. // returns null if list empty
  414. CAsyncRequest*
  415. CAsyncIo::GetWorkItem()
  416. {
  417.     CAutoLock lck(&m_csLists);
  418.     CAsyncRequest * preq  = m_listWork.RemoveHead();
  419.     // force event set correctly
  420.     if(m_listWork.GetCount() == 0)
  421.     {
  422.         m_evWork.Reset();
  423.     }
  424.     return preq;
  425. }
  426. // get an item from the done list
  427. CAsyncRequest*
  428. CAsyncIo::GetDoneItem()
  429. {
  430.     CAutoLock lock(&m_csLists);
  431.     CAsyncRequest * preq  = m_listDone.RemoveHead();
  432.     // force event set correctly if list now empty
  433.     // or we're in the final stages of flushing
  434.     // Note that during flushing the way it's supposed to work is that
  435.     // everything is shoved on the Done list then the application is
  436.     // supposed to pull until it gets nothing more
  437.     //
  438.     // Thus we should not set m_evDone unconditionally until everything
  439.     // has moved to the done list which means we must wait until
  440.     // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
  441.     if(m_listDone.GetCount() == 0 &&
  442.         (!m_bFlushing || m_bWaiting))
  443.     {
  444.         m_evDone.Reset();
  445.     }
  446.     return preq;
  447. }
  448. // put an item on the work list - fail if bFlushing
  449. HRESULT
  450. CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
  451. {
  452.     CAutoLock lock(&m_csLists);
  453.     HRESULT hr;
  454.     if(m_bFlushing)
  455.     {
  456.         hr = VFW_E_WRONG_STATE;
  457.     }
  458.     else if(m_listWork.AddTail(pRequest))
  459.     {
  460.         // event should now be in a set state - force this
  461.         m_evWork.Set();
  462.         // start the thread now if not already started
  463.         hr = StartThread();
  464.     }
  465.     else
  466.     {
  467.         hr = E_OUTOFMEMORY;
  468.     }
  469.     return(hr);
  470. }
  471. // put an item on the done list - ok to do this when
  472. // flushing
  473. HRESULT
  474. CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
  475. {
  476.     ASSERT(CritCheckIn(&m_csLists));
  477.     if(m_listDone.AddTail(pRequest))
  478.     {
  479.         // event should now be in a set state - force this
  480.         m_evDone.Set();
  481.         return S_OK;
  482.     }
  483.     else
  484.     {
  485.         return E_OUTOFMEMORY;
  486.     }
  487. }
  488. // called on thread to process any active requests
  489. void
  490. CAsyncIo::ProcessRequests(void)
  491. {
  492.     // lock to get the item and increment the outstanding count
  493.     CAsyncRequest * preq = NULL;
  494.     for(;;)
  495.     {
  496.         {
  497.             CAutoLock lock(&m_csLists);
  498.             preq = GetWorkItem();
  499.             if(preq == NULL)
  500.             {
  501.                 // done
  502.                 return;
  503.             }
  504.             // one more item not on the done or work list
  505.             m_cItemsOut++;
  506.             // release critsec
  507.         }
  508.         preq->Complete();
  509.         // regain critsec to replace on done list
  510.         {
  511.             CAutoLock l(&m_csLists);
  512.             PutDoneItem(preq);
  513.             if(--m_cItemsOut == 0)
  514.             {
  515.                 if(m_bWaiting)
  516.                     m_evAllDone.Set();
  517.             }
  518.         }
  519.     }
  520. }
  521. // the thread proc - assumes that DWORD thread param is the
  522. // this pointer
  523. DWORD
  524. CAsyncIo::ThreadProc(void)
  525. {
  526.     HANDLE ahev[] = {m_evStop, m_evWork};
  527.     for(;;)
  528.     {
  529.         DWORD dw = WaitForMultipleObjects(2,
  530.                                           ahev,
  531.                                           FALSE,
  532.                                           INFINITE);
  533.         if(dw == WAIT_OBJECT_0+1)
  534.         {
  535.             // requests need processing
  536.             ProcessRequests();
  537.         }
  538.         else
  539.         {
  540.             // any error or stop event - we should exit
  541.             return 0;
  542.         }
  543.     }
  544. }
  545. // perform a synchronous read request on this thread.
  546. // may not be aligned - so we will have to buffer.
  547. HRESULT
  548. CAsyncIo::SyncRead(
  549.                 LONGLONG llPos,
  550.                 LONG lLength,
  551.                 BYTE * pBuffer)
  552. {
  553.     if(IsAligned(llPos) &&
  554.         IsAligned(lLength) &&
  555.         IsAligned((LONG) pBuffer))
  556.     {
  557.         LONG cbUnused;
  558.         return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
  559.     }
  560.     // not aligned with requirements - use buffered file handle.
  561.     //!!! might want to fix this to buffer the data ourselves?
  562.     CAsyncRequest request;
  563.     HRESULT hr = request.Request(this,
  564.                                 m_pStream,
  565.                                 llPos,
  566.                                 lLength,
  567.                                 FALSE,
  568.                                 pBuffer,
  569.                                 NULL,
  570.                                 0);
  571.     if(FAILED(hr))
  572.     {
  573.         return hr;
  574.     }
  575.     return request.Complete();
  576. }
  577. //  Return the alignment
  578. HRESULT
  579. CAsyncIo::Alignment(LONG *pAlignment)
  580. {
  581.     CheckPointer(pAlignment,E_POINTER);
  582.     *pAlignment = Alignment();
  583.     return S_OK;
  584. }