asyncio.cpp
上传用户:hhs829
上传日期:2022-06-17
资源大小:586k
文件大小:17k
源码类别:

DirextX编程

开发平台:

Visual C++

  1. //------------------------------------------------------------------------------
  2. // File: AsyncIo.cpp
  3. //
  4. // Desc: DirectShow sample code - base library with I/O functionality.
  5. //
  6. // Copyright (c) Microsoft Corporation.  All rights reserved.
  7. //------------------------------------------------------------------------------
  8. #include <streams.h>
  9. #include "asyncio.h"
  10. // --- CAsyncRequest ---
  11. // implementation of CAsyncRequest representing a single
  12. // outstanding request. All the i/o for this object is done
  13. // in the Complete method.
  14. // init the params for this request.
  15. // Read is not issued until the complete call
  16. HRESULT
  17. CAsyncRequest::Request(
  18.     CAsyncIo *pIo,
  19.     CAsyncStream *pStream,
  20.     LONGLONG llPos,
  21.     LONG lLength,
  22.     BOOL bAligned,
  23.     BYTE* pBuffer,
  24.     LPVOID pContext,    // filter's context
  25.     DWORD dwUser)   // downstream filter's context
  26. {
  27.     m_pIo = pIo;
  28.     m_pStream = pStream;
  29.     m_llPos = llPos;
  30.     m_lLength = lLength;
  31.     m_bAligned = bAligned;
  32.     m_pBuffer = pBuffer;
  33.     m_pContext = pContext;
  34.     m_dwUser = dwUser;
  35.     m_hr = VFW_E_TIMEOUT;   // not done yet
  36.     return S_OK;
  37. }
  38. // issue the i/o if not overlapped, and block until i/o complete.
  39. // returns error code of file i/o
  40. //
  41. //
  42. HRESULT
  43. CAsyncRequest::Complete()
  44. {
  45.     m_pStream->Lock();
  46.     m_hr = m_pStream->SetPointer(m_llPos);
  47.     if(S_OK == m_hr)
  48.     {
  49.         DWORD dwActual;
  50.         m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
  51.         if(m_hr == OLE_S_FIRST)
  52.         {
  53.             if(m_pContext)
  54.             {
  55.                 IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
  56.                 pSample->SetDiscontinuity(TRUE);
  57.                 m_hr = S_OK;
  58.             }
  59.         }
  60.         if(FAILED(m_hr))
  61.         {
  62.         }
  63.         else if(dwActual != (DWORD)m_lLength)
  64.         {
  65.             // tell caller size changed - probably because of EOF
  66.             m_lLength = (LONG) dwActual;
  67.             m_hr = S_FALSE;
  68.         }
  69.         else
  70.         {
  71.             m_hr = S_OK;
  72.         }
  73.     }
  74.     m_pStream->Unlock();
  75.     return m_hr;
  76. }
  77. // --- CAsyncIo ---
  78. // note - all events created manual reset
  79. CAsyncIo::CAsyncIo(CAsyncStream *pStream)
  80.          : m_hThread(NULL),
  81.            m_evWork(TRUE),
  82.            m_evDone(TRUE),
  83.            m_evStop(TRUE),
  84.            m_listWork(NAME("Work list")),
  85.            m_listDone(NAME("Done list")),
  86.            m_bFlushing(FALSE),
  87.            m_cItemsOut(0),
  88.            m_bWaiting(FALSE),
  89.            m_pStream(pStream)
  90. {
  91. }
  92. CAsyncIo::~CAsyncIo()
  93. {
  94.     // move everything to the done list
  95.     BeginFlush();
  96.     // shutdown worker thread
  97.     CloseThread();
  98.     // empty the done list
  99.     POSITION pos = m_listDone.GetHeadPosition();
  100.     while(pos)
  101.     {
  102.         CAsyncRequest* pRequest = m_listDone.GetNext(pos);
  103.         delete pRequest;
  104.     }
  105.     m_listDone.RemoveAll();
  106. }
  107. // ready for async activity - call this before calling Request.
  108. //
  109. // start the worker thread if we need to
  110. //
  111. // !!! use overlapped i/o if possible
  112. HRESULT
  113. CAsyncIo::AsyncActive(void)
  114. {
  115.     return StartThread();
  116. }
  117. // call this when no more async activity will happen before
  118. // the next AsyncActive call
  119. //
  120. // stop the worker thread if active
  121. HRESULT
  122. CAsyncIo::AsyncInactive(void)
  123. {
  124.     return CloseThread();
  125. }
  126. // add a request to the queue.
  127. HRESULT
  128. CAsyncIo::Request(
  129.                 LONGLONG llPos,
  130.                 LONG lLength,
  131.                 BOOL bAligned,
  132.                 BYTE * pBuffer,
  133.                 LPVOID pContext,
  134.                 DWORD dwUser)
  135. {
  136.     if(bAligned)
  137.     {
  138.         if(!IsAligned(llPos) ||
  139.             !IsAligned(lLength) ||
  140.             !IsAligned((LONG) pBuffer))
  141.         {
  142.             return VFW_E_BADALIGN;
  143.         }
  144.     }
  145.     CAsyncRequest* pRequest = new CAsyncRequest;
  146.     if (!pRequest)
  147.         return E_OUTOFMEMORY;
  148.     HRESULT hr = pRequest->Request(this,
  149.                                    m_pStream,
  150.                                    llPos,
  151.                                    lLength,
  152.                                    bAligned,
  153.                                    pBuffer,
  154.                                    pContext,
  155.                                    dwUser);
  156.     if(SUCCEEDED(hr))
  157.     {
  158.         // might fail if flushing
  159.         hr = PutWorkItem(pRequest);
  160.     }
  161.     if(FAILED(hr))
  162.     {
  163.         delete pRequest;
  164.     }
  165.     return hr;
  166. }
  167. // wait for the next request to complete
  168. HRESULT
  169. CAsyncIo::WaitForNext(
  170.     DWORD dwTimeout,
  171.     LPVOID * ppContext,
  172.     DWORD  * pdwUser,
  173.     LONG   * pcbActual)
  174. {
  175.     CheckPointer(ppContext,E_POINTER);
  176.     CheckPointer(pdwUser,E_POINTER);
  177.     CheckPointer(pcbActual,E_POINTER);
  178.     // some errors find a sample, others don't. Ensure that
  179.     // *ppContext is NULL if no sample found
  180.     *ppContext = NULL;
  181.     // wait until the event is set, but since we are not
  182.     // holding the critsec when waiting, we may need to re-wait
  183.     for(;;)
  184.     {
  185.         if(!m_evDone.Wait(dwTimeout))
  186.         {
  187.             // timeout occurred
  188.             return VFW_E_TIMEOUT;
  189.         }
  190.         // get next event from list
  191.         CAsyncRequest* pRequest = GetDoneItem();
  192.         if(pRequest)
  193.         {
  194.             // found a completed request
  195.             // check if ok
  196.             HRESULT hr = pRequest->GetHResult();
  197.             if(hr == S_FALSE)
  198.             {
  199.                 // this means the actual length was less than
  200.                 // requested - may be ok if he aligned the end of file
  201.                 if((pRequest->GetActualLength() +
  202.                     pRequest->GetStart()) == Size())
  203.                 {
  204.                     hr = S_OK;
  205.                 }
  206.                 else
  207.                 {
  208.                     // it was an actual read error
  209.                     hr = E_FAIL;
  210.                 }
  211.             }
  212.             // return actual bytes read
  213.             *pcbActual = pRequest->GetActualLength();
  214.             // return his context
  215.             *ppContext = pRequest->GetContext();
  216.             *pdwUser = pRequest->GetUser();
  217.             delete pRequest;
  218.             return hr;
  219.         }
  220.         else
  221.         {
  222.             //  Hold the critical section while checking the list state
  223.             CAutoLock lck(&m_csLists);
  224.             if(m_bFlushing && !m_bWaiting)
  225.             {
  226.                 // can't block as we are between BeginFlush and EndFlush
  227.                 // but note that if m_bWaiting is set, then there are some
  228.                 // items not yet complete that we should block for.
  229.                 return VFW_E_WRONG_STATE;
  230.             }
  231.         }
  232.         // done item was grabbed between completion and
  233.         // us locking m_csLists.
  234.     }
  235. }
  236. // perform a synchronous read request on this thread.
  237. // Need to hold m_csFile while doing this (done in request object)
  238. HRESULT
  239. CAsyncIo::SyncReadAligned(
  240.                         LONGLONG llPos,
  241.                         LONG lLength,
  242.                         BYTE * pBuffer,
  243.                         LONG * pcbActual,
  244.                         PVOID pvContext)
  245. {
  246.     CheckPointer(pcbActual,E_POINTER);
  247.     if(!IsAligned(llPos) ||
  248.         !IsAligned(lLength) ||
  249.         !IsAligned((LONG) pBuffer))
  250.     {
  251.         return VFW_E_BADALIGN;
  252.     }
  253.     CAsyncRequest request;
  254.     HRESULT hr = request.Request(this,
  255.                                 m_pStream,
  256.                                 llPos,
  257.                                 lLength,
  258.                                 TRUE,
  259.                                 pBuffer,
  260.                                 pvContext,
  261.                                 0);
  262.     if(FAILED(hr))
  263.         return hr;
  264.     hr = request.Complete();
  265.     // return actual data length
  266.     *pcbActual = request.GetActualLength();
  267.     return hr;
  268. }
  269. HRESULT
  270. CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG *pllAvailable)
  271. {
  272.     CheckPointer(pllTotal,E_POINTER);
  273.     *pllTotal = m_pStream->Size(pllAvailable);
  274.     return S_OK;
  275. }
  276. // cancel all items on the worklist onto the done list
  277. // and refuse further requests or further WaitForNext calls
  278. // until the end flush
  279. //
  280. // WaitForNext must return with NULL only if there are no successful requests.
  281. // So Flush does the following:
  282. // 1. set m_bFlushing ensures no more requests succeed
  283. // 2. move all items from work list to the done list.
  284. // 3. If there are any outstanding requests, then we need to release the
  285. //    critsec to allow them to complete. The m_bWaiting as well as ensuring
  286. //    that we are signalled when they are all done is also used to indicate
  287. //    to WaitForNext that it should continue to block.
  288. // 4. Once all outstanding requests are complete, we force m_evDone set and
  289. //    m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
  290. //    not block when the done list is empty.
  291. HRESULT
  292. CAsyncIo::BeginFlush()
  293. {
  294.     // hold the lock while emptying the work list
  295.     {
  296.         CAutoLock lock(&m_csLists);
  297.         // prevent further requests being queued.
  298.         // Also WaitForNext will refuse to block if this is set
  299.         // unless m_bWaiting is also set which it will be when we release
  300.         // the critsec if there are any outstanding).
  301.         m_bFlushing = TRUE;
  302.         CAsyncRequest * preq;
  303.         while((preq = GetWorkItem()) != 0)
  304.         {
  305.             preq->Cancel();
  306.             PutDoneItem(preq);
  307.         }
  308.         // now wait for any outstanding requests to complete
  309.         if(m_cItemsOut > 0)
  310.         {
  311.             // can be only one person waiting
  312.             ASSERT(!m_bWaiting);
  313.             // this tells the completion routine that we need to be
  314.             // signalled via m_evAllDone when all outstanding items are
  315.             // done. It also tells WaitForNext to continue blocking.
  316.             m_bWaiting = TRUE;
  317.         }
  318.         else
  319.         {
  320.             // all done
  321.             // force m_evDone set so that even if list is empty,
  322.             // WaitForNext will not block
  323.             // don't do this until we are sure that all
  324.             // requests are on the done list.
  325.             m_evDone.Set();
  326.             return S_OK;
  327.         }
  328.     }
  329.     ASSERT(m_bWaiting);
  330.     // wait without holding critsec
  331.     for(;;)
  332.     {
  333.         m_evAllDone.Wait();
  334.         {
  335.             // hold critsec to check
  336.             CAutoLock lock(&m_csLists);
  337.             if(m_cItemsOut == 0)
  338.             {
  339.                 // now we are sure that all outstanding requests are on
  340.                 // the done list and no more will be accepted
  341.                 m_bWaiting = FALSE;
  342.                 // force m_evDone set so that even if list is empty,
  343.                 // WaitForNext will not block
  344.                 // don't do this until we are sure that all
  345.                 // requests are on the done list.
  346.                 m_evDone.Set();
  347.                 return S_OK;
  348.             }
  349.         }
  350.     }
  351. }
  352. // end a flushing state
  353. HRESULT
  354. CAsyncIo::EndFlush()
  355. {
  356.     CAutoLock lock(&m_csLists);
  357.     m_bFlushing = FALSE;
  358.     ASSERT(!m_bWaiting);
  359.     // m_evDone might have been set by BeginFlush - ensure it is
  360.     // set IFF m_listDone is non-empty
  361.     if(m_listDone.GetCount() > 0)
  362.     {
  363.         m_evDone.Set();
  364.     }
  365.     else
  366.     {
  367.         m_evDone.Reset();
  368.     }
  369.     return S_OK;
  370. }
  371. // start the thread
  372. HRESULT
  373. CAsyncIo::StartThread(void)
  374. {
  375.     if(m_hThread)
  376.     {
  377.         return S_OK;
  378.     }
  379.     // clear the stop event before starting
  380.     m_evStop.Reset();
  381.     DWORD dwThreadID;
  382.     m_hThread = CreateThread(NULL,
  383.                             0,
  384.                             InitialThreadProc,
  385.                             this,
  386.                             0,
  387.                             &dwThreadID);
  388.     if(!m_hThread)
  389.     {
  390.         DWORD dwErr = GetLastError();
  391.         return HRESULT_FROM_WIN32(dwErr);
  392.     }
  393.     return S_OK;
  394. }
  395. // stop the thread and close the handle
  396. HRESULT
  397. CAsyncIo::CloseThread(void)
  398. {
  399.     // signal the thread-exit object
  400.     m_evStop.Set();
  401.     if(m_hThread)
  402.     {
  403.         WaitForSingleObject(m_hThread, INFINITE);
  404.         CloseHandle(m_hThread);
  405.         m_hThread = NULL;
  406.     }
  407.     return S_OK;
  408. }
  409. // manage the list of requests. hold m_csLists and ensure
  410. // that the (manual reset) event hevList is set when things on
  411. // the list but reset when the list is empty.
  412. // returns null if list empty
  413. CAsyncRequest*
  414. CAsyncIo::GetWorkItem()
  415. {
  416.     CAutoLock lck(&m_csLists);
  417.     CAsyncRequest * preq  = m_listWork.RemoveHead();
  418.     // force event set correctly
  419.     if(m_listWork.GetCount() == 0)
  420.     {
  421.         m_evWork.Reset();
  422.     }
  423.     return preq;
  424. }
  425. // get an item from the done list
  426. CAsyncRequest*
  427. CAsyncIo::GetDoneItem()
  428. {
  429.     CAutoLock lock(&m_csLists);
  430.     CAsyncRequest * preq  = m_listDone.RemoveHead();
  431.     // force event set correctly if list now empty
  432.     // or we're in the final stages of flushing
  433.     // Note that during flushing the way it's supposed to work is that
  434.     // everything is shoved on the Done list then the application is
  435.     // supposed to pull until it gets nothing more
  436.     //
  437.     // Thus we should not set m_evDone unconditionally until everything
  438.     // has moved to the done list which means we must wait until
  439.     // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
  440.     if(m_listDone.GetCount() == 0 &&
  441.         (!m_bFlushing || m_bWaiting))
  442.     {
  443.         m_evDone.Reset();
  444.     }
  445.     return preq;
  446. }
  447. // put an item on the work list - fail if bFlushing
  448. HRESULT
  449. CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
  450. {
  451.     CAutoLock lock(&m_csLists);
  452.     HRESULT hr;
  453.     if(m_bFlushing)
  454.     {
  455.         hr = VFW_E_WRONG_STATE;
  456.     }
  457.     else if(m_listWork.AddTail(pRequest))
  458.     {
  459.         // event should now be in a set state - force this
  460.         m_evWork.Set();
  461.         // start the thread now if not already started
  462.         hr = StartThread();
  463.     }
  464.     else
  465.     {
  466.         hr = E_OUTOFMEMORY;
  467.     }
  468.     return(hr);
  469. }
  470. // put an item on the done list - ok to do this when
  471. // flushing
  472. HRESULT
  473. CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
  474. {
  475.     ASSERT(CritCheckIn(&m_csLists));
  476.     if(m_listDone.AddTail(pRequest))
  477.     {
  478.         // event should now be in a set state - force this
  479.         m_evDone.Set();
  480.         return S_OK;
  481.     }
  482.     else
  483.     {
  484.         return E_OUTOFMEMORY;
  485.     }
  486. }
  487. // called on thread to process any active requests
  488. void
  489. CAsyncIo::ProcessRequests(void)
  490. {
  491.     // lock to get the item and increment the outstanding count
  492.     CAsyncRequest * preq = NULL;
  493.     for(;;)
  494.     {
  495.         {
  496.             CAutoLock lock(&m_csLists);
  497.             preq = GetWorkItem();
  498.             if(preq == NULL)
  499.             {
  500.                 // done
  501.                 return;
  502.             }
  503.             // one more item not on the done or work list
  504.             m_cItemsOut++;
  505.             // release critsec
  506.         }
  507.         preq->Complete();
  508.         // regain critsec to replace on done list
  509.         {
  510.             CAutoLock l(&m_csLists);
  511.             PutDoneItem(preq);
  512.             if(--m_cItemsOut == 0)
  513.             {
  514.                 if(m_bWaiting)
  515.                     m_evAllDone.Set();
  516.             }
  517.         }
  518.     }
  519. }
  520. // the thread proc - assumes that DWORD thread param is the
  521. // this pointer
  522. DWORD
  523. CAsyncIo::ThreadProc(void)
  524. {
  525.     HANDLE ahev[] = {m_evStop, m_evWork};
  526.     for(;;)
  527.     {
  528.         DWORD dw = WaitForMultipleObjects(2,
  529.                                           ahev,
  530.                                           FALSE,
  531.                                           INFINITE);
  532.         if(dw == WAIT_OBJECT_0+1)
  533.         {
  534.             // requests need processing
  535.             ProcessRequests();
  536.         }
  537.         else
  538.         {
  539.             // any error or stop event - we should exit
  540.             return 0;
  541.         }
  542.     }
  543. }
  544. // perform a synchronous read request on this thread.
  545. // may not be aligned - so we will have to buffer.
  546. HRESULT
  547. CAsyncIo::SyncRead(
  548.                 LONGLONG llPos,
  549.                 LONG lLength,
  550.                 BYTE * pBuffer)
  551. {
  552.     if(IsAligned(llPos) &&
  553.         IsAligned(lLength) &&
  554.         IsAligned((LONG) pBuffer))
  555.     {
  556.         LONG cbUnused;
  557.         return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
  558.     }
  559.     // not aligned with requirements - use buffered file handle.
  560.     //!!! might want to fix this to buffer the data ourselves?
  561.     CAsyncRequest request;
  562.     HRESULT hr = request.Request(this,
  563.                                 m_pStream,
  564.                                 llPos,
  565.                                 lLength,
  566.                                 FALSE,
  567.                                 pBuffer,
  568.                                 NULL,
  569.                                 0);
  570.     if(FAILED(hr))
  571.     {
  572.         return hr;
  573.     }
  574.     return request.Complete();
  575. }
  576. //  Return the alignment
  577. HRESULT
  578. CAsyncIo::Alignment(LONG *pAlignment)
  579. {
  580.     CheckPointer(pAlignment,E_POINTER);
  581.     *pAlignment = Alignment();
  582.     return S_OK;
  583. }