asyncio.cpp
上传用户:hxb_1234
上传日期:2010-03-30
资源大小:8328k
文件大小:16k
源码类别:

VC书籍

开发平台:

Visual C++

  1. //==========================================================================;
  2. //
  3. //  THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
  4. //  KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
  5. //  IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
  6. //  PURPOSE.
  7. //
  8. //  Copyright (c) 1992 - 1997  Microsoft Corporation.  All Rights Reserved.
  9. //
  10. //--------------------------------------------------------------------------;
  11.  
  12. #include "stdafx.h"
  13. #include <streams.h>
  14. #include "asyncio.h"
  15. // --- CAsyncRequest ---
  16. // implementation of CAsyncRequest representing a single
  17. // outstanding request. All the i/o for this object is done
  18. // in the Complete method.
  19. // init the params for this request.
  20. // Read is not issued until the complete call
  21. HRESULT
  22. CAsyncRequest::Request(
  23.     CAsyncIo *pIo,
  24.     CAsyncStream *pStream,
  25.     LONGLONG llPos,
  26.     LONG lLength,
  27.     BOOL bAligned,
  28.     BYTE* pBuffer,
  29.     LPVOID pContext, // filter's context
  30.     DWORD dwUser) // downstream filter's context
  31. {
  32.     m_pIo = pIo;
  33.     m_pStream = pStream;
  34.     m_llPos = llPos;
  35.     m_lLength = lLength;
  36.     m_bAligned = bAligned;
  37.     m_pBuffer = pBuffer;
  38.     m_pContext = pContext;
  39.     m_dwUser = dwUser;
  40.     m_hr = VFW_E_TIMEOUT;   // not done yet
  41.     return S_OK;
  42. }
  43. // issue the i/o if not overlapped, and block until i/o complete.
  44. // returns error code of file i/o
  45. //
  46. //
  47. HRESULT
  48. CAsyncRequest::Complete()
  49. {
  50.     m_pStream->Lock();
  51.     m_hr = m_pStream->SetPointer(m_llPos);
  52.     if (S_OK == m_hr) {
  53.         DWORD dwActual;
  54.         m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
  55.         if (FAILED(m_hr)) {
  56.         } else if (dwActual != (DWORD)m_lLength) {
  57.             // tell caller size changed - probably because of EOF
  58.             m_lLength = (LONG) dwActual;
  59.             m_hr = S_FALSE;
  60.         } else {
  61.             m_hr = S_OK;
  62.         }
  63.     }
  64.     m_pStream->Unlock();
  65.     return m_hr;
  66. }
  67. // --- CAsyncIo ---
  68. // note - all events created manual reset
  69. CAsyncIo::CAsyncIo(CAsyncStream *pStream)
  70.  : m_hThread(NULL),
  71.    m_evWork(TRUE),
  72.    m_evDone(TRUE),
  73.    m_evStop(TRUE),
  74.    m_listWork(NAME("Work list")),
  75.    m_listDone(NAME("Done list")),
  76.    m_bFlushing(FALSE),
  77.    m_cItemsOut(0),
  78.    m_bWaiting(FALSE),
  79.    m_pStream(pStream)
  80. {
  81. }
  82. CAsyncIo::~CAsyncIo()
  83. {
  84.     // move everything to the done list
  85.     BeginFlush();
  86.     // shutdown worker thread
  87.     CloseThread();
  88.     // empty the done list
  89.     POSITION pos = m_listDone.GetHeadPosition();
  90.     while (pos) {
  91.         CAsyncRequest* pRequest = m_listDone.GetNext(pos);
  92.         delete pRequest;
  93.     }
  94.     m_listDone.RemoveAll();
  95. }
  96. // ready for async activity - call this before
  97. // calling Request.
  98. //
  99. // start the worker thread if we need to
  100. //
  101. // !!! use overlapped i/o if possible
  102. HRESULT
  103. CAsyncIo::AsyncActive(void)
  104. {
  105.     return StartThread();
  106. }
  107. // call this when no more async activity will happen before
  108. // the next AsyncActive call
  109. //
  110. // stop the worker thread if active
  111. HRESULT
  112. CAsyncIo::AsyncInactive(void)
  113. {
  114.     return CloseThread();
  115. }
  116. // add a request to the queue.
  117. HRESULT
  118. CAsyncIo::Request(
  119.             LONGLONG llPos,
  120.             LONG lLength,
  121.             BOOL bAligned,
  122.             BYTE* pBuffer,
  123.             LPVOID pContext,
  124.             DWORD dwUser)
  125. {
  126.     if (bAligned) {
  127.         if (!IsAligned(llPos) ||
  128.      !IsAligned(lLength) ||
  129.      !IsAligned((LONG) pBuffer)) {
  130.             return VFW_E_BADALIGN;
  131.         }
  132.     }
  133.     CAsyncRequest* pRequest = new CAsyncRequest;
  134.     HRESULT hr = pRequest->Request(
  135.                             this,
  136.                             m_pStream,
  137.                             llPos,
  138.                             lLength,
  139.                             bAligned,
  140.                             pBuffer,
  141.                             pContext,
  142.                             dwUser);
  143.     if (SUCCEEDED(hr)) {
  144.         // might fail if flushing
  145.         hr = PutWorkItem(pRequest);
  146.     }
  147.     if (FAILED(hr)) {
  148.         delete pRequest;
  149.     }
  150.     return hr;
  151. }
  152. // wait for the next request to complete
  153. HRESULT
  154. CAsyncIo::WaitForNext(
  155.     DWORD dwTimeout,
  156.     LPVOID *ppContext,
  157.     DWORD * pdwUser,
  158.     LONG* pcbActual)
  159. {
  160.     // some errors find a sample, others don't. Ensure that
  161.     // *ppContext is NULL if no sample found
  162.     *ppContext = NULL;
  163.     // wait until the event is set, but since we are not
  164.     // holding the critsec when waiting, we may need to re-wait
  165.     for (;;) {
  166.         if (!m_evDone.Wait(dwTimeout)) {
  167.             // timeout occurred
  168.             return VFW_E_TIMEOUT;
  169.         }
  170.         // get next event from list
  171.         CAsyncRequest* pRequest = GetDoneItem();
  172.         if (pRequest) {
  173.             // found a completed request
  174.             // check if ok
  175.             HRESULT hr = pRequest->GetHResult();
  176.             if (hr == S_FALSE) {
  177.                 // this means the actual length was less than
  178.                 // requested - may be ok if he aligned the end of file
  179.                 if ((pRequest->GetActualLength() +
  180.                      pRequest->GetStart()) == Size()) {
  181.                         hr = S_OK;
  182.                 } else {
  183.                     // it was an actual read error
  184.                     hr = E_FAIL;
  185.                 }
  186.             }
  187.             // return actual bytes read
  188.             *pcbActual = pRequest->GetActualLength();
  189.             // return his context
  190.             *ppContext = pRequest->GetContext();
  191.             *pdwUser = pRequest->GetUser();
  192.             delete pRequest;
  193.             return hr;
  194.         } else {
  195.             //  Hold the critical section while checking the
  196.             //  list state
  197.             CAutoLock lck(&m_csLists);
  198.             if (m_bFlushing && !m_bWaiting) {
  199.                 // can't block as we are between BeginFlush and EndFlush
  200.                 // but note that if m_bWaiting is set, then there are some
  201.                 // items not yet complete that we should block for.
  202.                 return VFW_E_WRONG_STATE;
  203.             }
  204.         }
  205.         // done item was grabbed between completion and
  206.         // us locking m_csLists.
  207.     }
  208. }
  209. // perform a synchronous read request on this thread.
  210. // Need to hold m_csFile while doing this (done in
  211. // request object)
  212. HRESULT
  213. CAsyncIo::SyncReadAligned(
  214.             LONGLONG llPos,
  215.             LONG lLength,
  216.             BYTE* pBuffer,
  217.             LONG* pcbActual
  218.             )
  219. {
  220.     if (!IsAligned(llPos) ||
  221. !IsAligned(lLength) ||
  222. !IsAligned((LONG) pBuffer)) {
  223.             return VFW_E_BADALIGN;
  224.     }
  225.     CAsyncRequest request;
  226.     HRESULT hr = request.Request(
  227.                     this,
  228.                     m_pStream,
  229.                     llPos,
  230.                     lLength,
  231.                     TRUE,
  232.                     pBuffer,
  233.                     NULL,
  234.                     0);
  235.     if (FAILED(hr)) {
  236.         return hr;
  237.     }
  238.     hr = request.Complete();
  239.     // return actual data length
  240.     *pcbActual = request.GetActualLength();
  241.     return hr;
  242. }
  243. HRESULT
  244. CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG* pllAvailable)
  245. {
  246.     *pllTotal = m_pStream->Size(pllAvailable);
  247.     return S_OK;
  248. }
  249. // cancel all items on the worklist onto the done list
  250. // and refuse further requests or further WaitForNext calls
  251. // until the end flush
  252. //
  253. // WaitForNext must return with NULL only if there are no successful requests.
  254. // So Flush does the following:
  255. // 1. set m_bFlushing ensures no more requests succeed
  256. // 2. move all items from work list to the done list.
  257. // 3. If there are any outstanding requests, then we need to release the
  258. //    critsec to allow them to complete. The m_bWaiting as well as ensuring
  259. //    that we are signalled when they are all done is also used to indicate
  260. //    to WaitForNext that it should continue to block.
  261. // 4. Once all outstanding requests are complete, we force m_evDone set and
  262. //    m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
  263. //    not block when the done list is empty.
  264. HRESULT
  265. CAsyncIo::BeginFlush()
  266. {
  267.     // hold the lock while emptying the work list
  268.     {
  269.         CAutoLock lock(&m_csLists);
  270.         // prevent further requests being queued.
  271.         // Also WaitForNext will refuse to block if this is set
  272.         // unless m_bWaiting is also set which it will be when we release
  273.         // the critsec if there are any outstanding).
  274.         m_bFlushing = TRUE;
  275.         CAsyncRequest * preq;
  276.         while(preq = GetWorkItem()) {
  277.             preq->Cancel();
  278.             PutDoneItem(preq);
  279.         }
  280.         // now wait for any outstanding requests to complete
  281.         if (m_cItemsOut > 0) {
  282.             // can be only one person waiting
  283.             ASSERT(!m_bWaiting);
  284.             // this tells the completion routine that we need to be
  285.             // signalled via m_evAllDone when all outstanding items are
  286.             // done. It also tells WaitForNext to continue blocking.
  287.             m_bWaiting = TRUE;
  288.         } else {
  289.             // all done
  290.             // force m_evDone set so that even if list is empty,
  291.             // WaitForNext will not block
  292.             // don't do this until we are sure that all
  293.             // requests are on the done list.
  294.             m_evDone.Set();
  295.             return S_OK;
  296.         }
  297.     }
  298.     ASSERT(m_bWaiting);
  299.     // wait without holding critsec
  300.     for (;;) {
  301.         m_evAllDone.Wait();
  302.         {
  303.             // hold critsec to check
  304.             CAutoLock lock(&m_csLists);
  305.             if (m_cItemsOut == 0) {
  306.                 // now we are sure that all outstanding requests are on
  307.                 // the done list and no more will be accepted
  308.                 m_bWaiting = FALSE;
  309.                 // force m_evDone set so that even if list is empty,
  310.                 // WaitForNext will not block
  311.                 // don't do this until we are sure that all
  312.                 // requests are on the done list.
  313.                 m_evDone.Set();
  314.                 return S_OK;
  315.             }
  316.         }
  317.     }
  318. }
  319. // end a flushing state
  320. HRESULT
  321. CAsyncIo::EndFlush()
  322. {
  323.     CAutoLock lock(&m_csLists);
  324.     m_bFlushing = FALSE;
  325.     ASSERT(!m_bWaiting);
  326.     // m_evDone might have been set by BeginFlush - ensure it is
  327.     // set IFF m_listDone is non-empty
  328.     if (m_listDone.GetCount() > 0) {
  329.         m_evDone.Set();
  330.     } else {
  331.         m_evDone.Reset();
  332.     }
  333.     return S_OK;
  334. }
  335. // start the thread
  336. HRESULT
  337. CAsyncIo::StartThread(void)
  338. {
  339.     if (m_hThread) {
  340.         return S_OK;
  341.     }
  342.     // clear the stop event before starting
  343.     m_evStop.Reset();
  344.     DWORD dwThreadID;
  345.     m_hThread = CreateThread(
  346.                     NULL,
  347.                     0,
  348.                     (unsigned long (__stdcall *)(void *))InitialThreadProc,
  349.                     this,
  350.                     0,
  351.                     &dwThreadID);
  352.     if (!m_hThread) {
  353. DWORD dwErr = GetLastError();
  354.         return HRESULT_FROM_WIN32(dwErr);
  355.     }
  356.     return S_OK;
  357. }
  358. // stop the thread and close the handle
  359. HRESULT
  360. CAsyncIo::CloseThread(void)
  361. {
  362.     // signal the thread-exit object
  363.     m_evStop.Set();
  364.     if (m_hThread) {
  365.         WaitForSingleObject(m_hThread, INFINITE);
  366.         CloseHandle(m_hThread);
  367.         m_hThread = NULL;
  368.     }
  369.     return S_OK;
  370. }
  371. // manage the list of requests. hold m_csLists and ensure
  372. // that the (manual reset) event hevList is set when things on
  373. // the list but reset when the list is empty.
  374. // returns null if list empty
  375. CAsyncRequest*
  376. CAsyncIo::GetWorkItem()
  377. {
  378.     CAutoLock lck(&m_csLists);
  379.     CAsyncRequest * preq  = m_listWork.RemoveHead();
  380.     // force event set correctly
  381.     if (m_listWork.GetCount() == 0) {
  382.         m_evWork.Reset();
  383.     }
  384.     return preq;
  385. }
  386. // get an item from the done list
  387. CAsyncRequest*
  388. CAsyncIo::GetDoneItem()
  389. {
  390.     CAutoLock lock(&m_csLists);
  391.     CAsyncRequest * preq  = m_listDone.RemoveHead();
  392.     // force event set correctly if list now empty
  393.     // or we're in the final stages of flushing
  394.     // Note that during flushing the way it's supposed to work is that
  395.     // everything is shoved on the Done list then the application is
  396.     // supposed to suck until it gets nothing more
  397.     //
  398.     // Thus we should not set m_evDone unconditionally until everything
  399.     // has moved to the done list which means we must wait until
  400.     // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
  401.     if (m_listDone.GetCount() == 0 &&
  402.         (!m_bFlushing || m_bWaiting)) {
  403.         m_evDone.Reset();
  404.     }
  405.     return preq;
  406. }
  407. // put an item on the work list - fail if bFlushing
  408. HRESULT
  409. CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
  410. {
  411.     CAutoLock lock(&m_csLists);
  412.     HRESULT hr;
  413.     if (m_bFlushing) {
  414.         hr = VFW_E_WRONG_STATE;
  415.     }
  416.     else if (m_listWork.AddTail(pRequest)) {
  417.         // event should now be in a set state - force this
  418.         m_evWork.Set();
  419.         // start the thread now if not already started
  420.         hr = StartThread();
  421.     } else {
  422.         hr = E_OUTOFMEMORY;
  423.     }
  424.     return(hr);
  425. }
  426. // put an item on the done list - ok to do this when
  427. // flushing
  428. HRESULT
  429. CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
  430. {
  431.     ASSERT(CritCheckIn(&m_csLists));
  432.     if (m_listDone.AddTail(pRequest)) {
  433.         // event should now be in a set state - force this
  434.         m_evDone.Set();
  435.         return S_OK;
  436.     } else {
  437.         return E_OUTOFMEMORY;
  438.     }
  439. }
  440. // called on thread to process any active requests
  441. void
  442. CAsyncIo::ProcessRequests(void)
  443. {
  444.     // lock to get the item and increment the outstanding count
  445.     CAsyncRequest * preq = NULL;
  446.     for (;;) {
  447.         {
  448.             CAutoLock lock(&m_csLists);
  449.             preq = GetWorkItem();
  450.             if (preq == NULL) {
  451.                 // done
  452.                 return;
  453.             }
  454.             // one more item not on the done or work list
  455.             m_cItemsOut++;
  456.             // release critsec
  457.         }
  458.         preq->Complete();
  459.         // regain critsec to replace on done list
  460.         {
  461.             CAutoLock l(&m_csLists);
  462.             PutDoneItem(preq);
  463.             if (--m_cItemsOut == 0) {
  464.                 if (m_bWaiting) {
  465.                     m_evAllDone.Set();
  466.                 }
  467.             }
  468.         }
  469.     }
  470. }
  471. // the thread proc - assumes that DWORD thread param is the
  472. // this pointer
  473. DWORD
  474. CAsyncIo::ThreadProc(void)
  475. {
  476.     HANDLE ahev[] = {m_evStop, m_evWork};
  477.     for (;;) {
  478. DWORD dw = WaitForMultipleObjects(
  479.     2,
  480.     ahev,
  481.     FALSE,
  482.     INFINITE);
  483. if (dw == WAIT_OBJECT_0+1) {
  484.     // requests need processing
  485.     ProcessRequests();
  486. } else {
  487.     // any error or stop event - we should exit
  488.     return 0;
  489. }
  490.     }
  491. }
  492. // perform a synchronous read request on this thread.
  493. // may not be aligned - so we will have to buffer.
  494. HRESULT
  495. CAsyncIo::SyncRead(
  496.             LONGLONG llPos,
  497.             LONG lLength,
  498.             BYTE* pBuffer)
  499. {
  500.     if (IsAligned(llPos) &&
  501. IsAligned(lLength) &&
  502. IsAligned((LONG) pBuffer)) {
  503.             LONG cbUnused;
  504.     return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused);
  505.     }
  506.     // not aligned with requirements - use buffered file handle.
  507.     //!!! might want to fix this to buffer the data ourselves?
  508.     CAsyncRequest request;
  509.     HRESULT hr = request.Request(
  510.                     this,
  511.                     m_pStream,
  512.                     llPos,
  513.                     lLength,
  514.                     FALSE,
  515.                     pBuffer,
  516.                     NULL,
  517.                     0);
  518.     if (FAILED(hr)) {
  519.         return hr;
  520.     }
  521.     return request.Complete();
  522. }
  523. //  Return the alignment
  524. HRESULT
  525. CAsyncIo::Alignment(LONG *pl)
  526. {
  527.     *pl = Alignment();
  528.     return S_OK;
  529. }