outputq.cpp
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:23k
源码类别:

P2P编程

开发平台:

Visual C++

  1. //------------------------------------------------------------------------------
  2. // File: OutputQ.cpp
  3. //
  4. // Desc: DirectShow base classes - implements COutputQueue class used by an
  5. //       output pin which may sometimes want to queue output samples on a
  6. //       separate thread and sometimes call Receive() directly on the input
  7. //       pin.
  8. //
  9. // Copyright (c) Microsoft Corporation.  All rights reserved.
  10. //------------------------------------------------------------------------------
  11. #include <streams.h>
  12. //
  13. //  COutputQueue Constructor :
  14. //
  15. //  Determines if a thread is to be created and creates resources
  16. //
  17. //     pInputPin  - the downstream input pin we're queueing samples to
  18. //
  19. //     phr        - changed to a failure code if this function fails
  20. //                  (otherwise unchanges)
  21. //
  22. //     bAuto      - Ask pInputPin if it can block in Receive by calling
  23. //                  its ReceiveCanBlock method and create a thread if
  24. //                  it can block, otherwise not.
  25. //
  26. //     bQueue     - if bAuto == FALSE then we create a thread if and only
  27. //                  if bQueue == TRUE
  28. //
  29. //     lBatchSize - work in batches of lBatchSize
  30. //
  31. //     bBatchEact - Use exact batch sizes so don't send until the
  32. //                  batch is full or SendAnyway() is called
  33. //
  34. //     lListSize  - If we create a thread make the list of samples queued
  35. //                  to the thread have this size cache
  36. //
  37. //     dwPriority - If we create a thread set its priority to this
  38. //
  39. COutputQueue::COutputQueue(
  40.              IPin         *pInputPin,          //  Pin to send stuff to
  41.              HRESULT      *phr,                //  'Return code'
  42.              BOOL          bAuto,              //  Ask pin if queue or not
  43.              BOOL          bQueue,             //  Send through queue
  44.              LONG          lBatchSize,         //  Batch
  45.              BOOL          bBatchExact,        //  Batch exactly to BatchSize
  46.              LONG          lListSize,
  47.              DWORD         dwPriority,
  48.              bool          bFlushingOpt        // flushing optimization
  49.             ) : m_lBatchSize(lBatchSize),
  50.                 m_bBatchExact(bBatchExact && (lBatchSize > 1)),
  51.                 m_hThread(NULL),
  52.                 m_hSem(NULL),
  53.                 m_List(NULL),
  54.                 m_pPin(pInputPin),
  55.                 m_ppSamples(NULL),
  56.                 m_lWaiting(0),
  57.                 m_pInputPin(NULL),
  58.                 m_bSendAnyway(FALSE),
  59.                 m_nBatched(0),
  60.                 m_bFlushing(FALSE),
  61.                 m_bFlushed(TRUE),
  62.                 m_bFlushingOpt(bFlushingOpt),
  63.                 m_bTerminate(FALSE),
  64.                 m_hEventPop(NULL),
  65.                 m_hr(S_OK)
  66. {
  67.     ASSERT(m_lBatchSize > 0);
  68.     if (FAILED(*phr)) {
  69.         return;
  70.     }
  71.     //  Check the input pin is OK and cache its IMemInputPin interface
  72.     *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
  73.     if (FAILED(*phr)) {
  74.         return;
  75.     }
  76.     // See if we should ask the downstream pin
  77.     if (bAuto) {
  78.         HRESULT hr = m_pInputPin->ReceiveCanBlock();
  79.         if (SUCCEEDED(hr)) {
  80.             bQueue = hr == S_OK;
  81.         }
  82.     }
  83.     //  Create our sample batch
  84.     m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
  85.     if (m_ppSamples == NULL) {
  86.         *phr = E_OUTOFMEMORY;
  87.         return;
  88.     }
  89.     //  If we're queueing allocate resources
  90.     if (bQueue) {
  91.         DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
  92.         m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
  93.         if (m_hSem == NULL) {
  94.             DWORD dwError = GetLastError();
  95.             *phr = AmHresultFromWin32(dwError);
  96.             return;
  97.         }
  98.         m_List = new CSampleList(NAME("Sample Queue List"),
  99.                                  lListSize,
  100.                                  FALSE         // No lock
  101.                                 );
  102.         if (m_List == NULL) {
  103.             *phr = E_OUTOFMEMORY;
  104.             return;
  105.         }
  106.         DWORD dwThreadId;
  107.         m_hThread = CreateThread(NULL,
  108.                                  0,
  109.                                  InitialThreadProc,
  110.                                  (LPVOID)this,
  111.                                  0,
  112.                                  &dwThreadId);
  113.         if (m_hThread == NULL) {
  114.             DWORD dwError = GetLastError();
  115.             *phr = AmHresultFromWin32(dwError);
  116.             return;
  117.         }
  118.         SetThreadPriority(m_hThread, dwPriority);
  119.     } else {
  120.         DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
  121.     }
  122. }
  123. //
  124. //  COutputQueuee Destructor :
  125. //
  126. //  Free all resources -
  127. //
  128. //      Thread,
  129. //      Batched samples
  130. //
  131. COutputQueue::~COutputQueue()
  132. {
  133.     DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
  134.     /*  Free our pointer */
  135.     if (m_pInputPin != NULL) {
  136.         m_pInputPin->Release();
  137.     }
  138.     if (m_hThread != NULL) {
  139.         {
  140.             CAutoLock lck(this);
  141.             m_bTerminate = TRUE;
  142.             m_hr = S_FALSE;
  143.             NotifyThread();
  144.         }
  145.         DbgWaitForSingleObject(m_hThread);
  146.         EXECUTE_ASSERT(CloseHandle(m_hThread));
  147.         //  The thread frees the samples when asked to terminate
  148.         ASSERT(m_List->GetCount() == 0);
  149.         delete m_List;
  150.     } else {
  151.         FreeSamples();
  152.     }
  153.     if (m_hSem != NULL) {
  154.         EXECUTE_ASSERT(CloseHandle(m_hSem));
  155.     }
  156.     delete [] m_ppSamples;
  157. }
  158. //
  159. //  Call the real thread proc as a member function
  160. //
  161. DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv)
  162. {
  163.     HRESULT hrCoInit = CAMThread::CoInitializeHelper();
  164.     
  165.     COutputQueue *pSampleQueue = (COutputQueue *)pv;
  166.     DWORD dwReturn = pSampleQueue->ThreadProc();
  167.     if(hrCoInit == S_OK) {
  168.         CoUninitialize();
  169.     }
  170.     
  171.     return dwReturn;
  172. }
  173. //
  174. //  Thread sending the samples downstream :
  175. //
  176. //  When there is nothing to do the thread sets m_lWaiting (while
  177. //  holding the critical section) and then waits for m_hSem to be
  178. //  set (not holding the critical section)
  179. //
  180. DWORD COutputQueue::ThreadProc()
  181. {
  182.     while (TRUE) {
  183.         BOOL          bWait = FALSE;
  184.         IMediaSample *pSample;
  185.         LONG          lNumberToSend; // Local copy
  186.         NewSegmentPacket* ppacket;
  187.         //
  188.         //  Get a batch of samples and send it if possible
  189.         //  In any case exit the loop if there is a control action
  190.         //  requested
  191.         //
  192.         {
  193.             CAutoLock lck(this);
  194.             while (TRUE) {
  195.                 if (m_bTerminate) {
  196.                     FreeSamples();
  197.                     return 0;
  198.                 }
  199.                 if (m_bFlushing) {
  200.                     FreeSamples();
  201.                     SetEvent(m_evFlushComplete);
  202.                 }
  203.                 //  Get a sample off the list
  204.                 pSample = m_List->RemoveHead();
  205. // inform derived class we took something off the queue
  206. if (m_hEventPop) {
  207.                     //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  208.     SetEvent(m_hEventPop);
  209. }
  210.                 if (pSample != NULL &&
  211.                     !IsSpecialSample(pSample)) {
  212.                     //  If its just a regular sample just add it to the batch
  213.                     //  and exit the loop if the batch is full
  214.                     m_ppSamples[m_nBatched++] = pSample;
  215.                     if (m_nBatched == m_lBatchSize) {
  216.                         break;
  217.                     }
  218.                 } else {
  219.                     //  If there was nothing in the queue and there's nothing
  220.                     //  to send (either because there's nothing or the batch
  221.                     //  isn't full) then prepare to wait
  222.                     if (pSample == NULL &&
  223.                         (m_bBatchExact || m_nBatched == 0)) {
  224.                         //  Tell other thread to set the event when there's
  225.                         //  something do to
  226.                         ASSERT(m_lWaiting == 0);
  227.                         m_lWaiting++;
  228.                         bWait      = TRUE;
  229.                     } else {
  230.                         //  We break out of the loop on SEND_PACKET unless
  231.                         //  there's nothing to send
  232.                         if (pSample == SEND_PACKET && m_nBatched == 0) {
  233.                             continue;
  234.                         }
  235.                         if (pSample == NEW_SEGMENT) {
  236.                             // now we need the parameters - we are
  237.                             // guaranteed that the next packet contains them
  238.                             ppacket = (NewSegmentPacket *) m_List->RemoveHead();
  239.     // we took something off the queue
  240.     if (m_hEventPop) {
  241.                              //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  242.              SetEvent(m_hEventPop);
  243.     }
  244.                             ASSERT(ppacket);
  245.                         }
  246.                         //  EOS_PACKET falls through here and we exit the loop
  247.                         //  In this way it acts like SEND_PACKET
  248.                     }
  249.                     break;
  250.                 }
  251.             }
  252.             if (!bWait) {
  253.                 // We look at m_nBatched from the client side so keep
  254.                 // it up to date inside the critical section
  255.                 lNumberToSend = m_nBatched;  // Local copy
  256.                 m_nBatched = 0;
  257.             }
  258.         }
  259.         //  Wait for some more data
  260.         if (bWait) {
  261.             DbgWaitForSingleObject(m_hSem);
  262.             continue;
  263.         }
  264.         //  OK - send it if there's anything to send
  265.         //  We DON'T check m_bBatchExact here because either we've got
  266.         //  a full batch or we dropped through because we got
  267.         //  SEND_PACKET or EOS_PACKET - both of which imply we should
  268.         //  flush our batch
  269.         if (lNumberToSend != 0) {
  270.             long nProcessed;
  271.             if (m_hr == S_OK) {
  272.                 ASSERT(!m_bFlushed);
  273.                 HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  274.                                                           lNumberToSend,
  275.                                                           &nProcessed);
  276.                 /*  Don't overwrite a flushing state HRESULT */
  277.                 CAutoLock lck(this);
  278.                 if (m_hr == S_OK) {
  279.                     m_hr = hr;
  280.                 }
  281.                 ASSERT(!m_bFlushed);
  282.             }
  283.             while (lNumberToSend != 0) {
  284.                 m_ppSamples[--lNumberToSend]->Release();
  285.             }
  286.             if (m_hr != S_OK) {
  287.                 //  In any case wait for more data - S_OK just
  288.                 //  means there wasn't an error
  289.                 DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
  290.                        m_hr));
  291.             }
  292.         }
  293.         //  Check for end of stream
  294.         if (pSample == EOS_PACKET) {
  295.             //  We don't send even end of stream on if we've previously
  296.             //  returned something other than S_OK
  297.             //  This is because in that case the pin which returned
  298.             //  something other than S_OK should have either sent
  299.             //  EndOfStream() or notified the filter graph
  300.             if (m_hr == S_OK) {
  301.                 DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  302.                 HRESULT hr = m_pPin->EndOfStream();
  303.                 if (FAILED(hr)) {
  304.                     DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  305.                 }
  306.             }
  307.         }
  308.         //  Data from a new source
  309.         if (pSample == RESET_PACKET) {
  310.             m_hr = S_OK;
  311.             SetEvent(m_evFlushComplete);
  312.         }
  313.         if (pSample == NEW_SEGMENT) {
  314.             m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
  315.             delete ppacket;
  316.         }
  317.     }
  318. }
  319. //  Send batched stuff anyway
  320. void COutputQueue::SendAnyway()
  321. {
  322.     if (!IsQueued()) {
  323.         //  m_bSendAnyway is a private parameter checked in ReceiveMultiple
  324.         m_bSendAnyway = TRUE;
  325.         LONG nProcessed;
  326.         ReceiveMultiple(NULL, 0, &nProcessed);
  327.         m_bSendAnyway = FALSE;
  328.     } else {
  329.         CAutoLock lck(this);
  330.         QueueSample(SEND_PACKET);
  331.         NotifyThread();
  332.     }
  333. }
  334. void
  335. COutputQueue::NewSegment(
  336.     REFERENCE_TIME tStart,
  337.     REFERENCE_TIME tStop,
  338.     double dRate)
  339. {
  340.     if (!IsQueued()) {
  341.         if (S_OK == m_hr) {
  342.             if (m_bBatchExact) {
  343.                 SendAnyway();
  344.             }
  345.             m_pPin->NewSegment(tStart, tStop, dRate);
  346.         }
  347.     } else {
  348.         if (m_hr == S_OK) {
  349.             //
  350.             // we need to queue the new segment to appear in order in the
  351.             // data, but we need to pass parameters to it. Rather than
  352.             // take the hit of wrapping every single sample so we can tell
  353.             // special ones apart, we queue special pointers to indicate
  354.             // special packets, and we guarantee (by holding the
  355.             // critical section) that the packet immediately following a
  356.             // NEW_SEGMENT value is a NewSegmentPacket containing the
  357.             // parameters.
  358.             NewSegmentPacket * ppack = new NewSegmentPacket;
  359.             if (ppack == NULL) {
  360.                 return;
  361.             }
  362.             ppack->tStart = tStart;
  363.             ppack->tStop = tStop;
  364.             ppack->dRate = dRate;
  365.             CAutoLock lck(this);
  366.             QueueSample(NEW_SEGMENT);
  367.             QueueSample( (IMediaSample*) ppack);
  368.             NotifyThread();
  369.         }
  370.     }
  371. }
  372. //
  373. //  End of Stream is queued to output device
  374. //
  375. void COutputQueue::EOS()
  376. {
  377.     CAutoLock lck(this);
  378.     if (!IsQueued()) {
  379.         if (m_bBatchExact) {
  380.             SendAnyway();
  381.         }
  382.         if (m_hr == S_OK) {
  383.             DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  384.             m_bFlushed = FALSE;
  385.             HRESULT hr = m_pPin->EndOfStream();
  386.             if (FAILED(hr)) {
  387.                 DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  388.             }
  389.         }
  390.     } else {
  391.         if (m_hr == S_OK) {
  392.             m_bFlushed = FALSE;
  393.             QueueSample(EOS_PACKET);
  394.             NotifyThread();
  395.         }
  396.     }
  397. }
  398. //
  399. //  Flush all the samples in the queue
  400. //
  401. void COutputQueue::BeginFlush()
  402. {
  403.     if (IsQueued()) {
  404.         {
  405.             CAutoLock lck(this);
  406.             // block receives -- we assume this is done by the
  407.             // filter in which we are a component
  408.             // discard all queued data
  409.             m_bFlushing = TRUE;
  410.             //  Make sure we discard all samples from now on
  411.             if (m_hr == S_OK) {
  412.                 m_hr = S_FALSE;
  413.             }
  414.             // Optimize so we don't keep calling downstream all the time
  415.             if (m_bFlushed && m_bFlushingOpt) {
  416.                 return;
  417.             }
  418.             // Make sure we really wait for the flush to complete
  419.             m_evFlushComplete.Reset();
  420.             NotifyThread();
  421.         }
  422.         // pass this downstream
  423.         m_pPin->BeginFlush();
  424.     } else {
  425.         // pass downstream first to avoid deadlocks
  426.         m_pPin->BeginFlush();
  427.         CAutoLock lck(this);
  428.         // discard all queued data
  429.         m_bFlushing = TRUE;
  430.         //  Make sure we discard all samples from now on
  431.         if (m_hr == S_OK) {
  432.             m_hr = S_FALSE;
  433.         }
  434.     }
  435. }
  436. //
  437. // leave flush mode - pass this downstream
  438. void COutputQueue::EndFlush()
  439. {
  440.     {
  441.         CAutoLock lck(this);
  442.         ASSERT(m_bFlushing);
  443.         if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
  444.             m_bFlushing = FALSE;
  445.             m_hr = S_OK;
  446.             return;
  447.         }
  448.     }
  449.     // sync with pushing thread -- done in BeginFlush
  450.     // ensure no more data to go downstream -- done in BeginFlush
  451.     //
  452.     // Because we are synching here there is no need to hold the critical
  453.     // section (in fact we'd deadlock if we did!)
  454.     if (IsQueued()) {
  455.         m_evFlushComplete.Wait();
  456.     } else {
  457.         FreeSamples();
  458.     }
  459.     //  Be daring - the caller has guaranteed no samples will arrive
  460.     //  before EndFlush() returns
  461.     m_bFlushing = FALSE;
  462.     m_bFlushed  = TRUE;
  463.     // call EndFlush on downstream pins
  464.     m_pPin->EndFlush();
  465.     m_hr = S_OK;
  466. }
  467. //  COutputQueue::QueueSample
  468. //
  469. //  private method to Send a sample to the output queue
  470. //  The critical section MUST be held when this is called
  471. void COutputQueue::QueueSample(IMediaSample *pSample)
  472. {
  473.     if (NULL == m_List->AddTail(pSample)) {
  474.         if (!IsSpecialSample(pSample)) {
  475.             pSample->Release();
  476.         }
  477.     }
  478. }
  479. //
  480. //  COutputQueue::Receive()
  481. //
  482. //  Send a single sample by the multiple sample route
  483. //  (NOTE - this could be optimized if necessary)
  484. //
  485. //  On return the sample will have been Release()'d
  486. //
  487. HRESULT COutputQueue::Receive(IMediaSample *pSample)
  488. {
  489.     LONG nProcessed;
  490.     return ReceiveMultiple(&pSample, 1, &nProcessed);
  491. }
  492. //
  493. //  COutputQueue::ReceiveMultiple()
  494. //
  495. //  Send a set of samples to the downstream pin
  496. //
  497. //      ppSamples           - array of samples
  498. //      nSamples            - how many
  499. //      nSamplesProcessed   - How many were processed
  500. //
  501. //  On return all samples will have been Release()'d
  502. //
  503. HRESULT COutputQueue::ReceiveMultiple (
  504.     IMediaSample **ppSamples,
  505.     long nSamples,
  506.     long *nSamplesProcessed)
  507. {
  508.     CAutoLock lck(this);
  509.     //  Either call directly or queue up the samples
  510.     if (!IsQueued()) {
  511.         //  If we already had a bad return code then just return
  512.         if (S_OK != m_hr) {
  513.             //  If we've never received anything since the last Flush()
  514.             //  and the sticky return code is not S_OK we must be
  515.             //  flushing
  516.             //  ((!A || B) is equivalent to A implies B)
  517.             ASSERT(!m_bFlushed || m_bFlushing);
  518.             //  We're supposed to Release() them anyway!
  519.             *nSamplesProcessed = 0;
  520.             for (int i = 0; i < nSamples; i++) {
  521.                 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
  522.                         nSamples, m_hr));
  523.                 ppSamples[i]->Release();
  524.             }
  525.             return m_hr;
  526.         }
  527.         //
  528.         //  If we're flushing the sticky return code should be S_FALSE
  529.         //
  530.         ASSERT(!m_bFlushing);
  531.         m_bFlushed = FALSE;
  532.         ASSERT(m_nBatched < m_lBatchSize);
  533.         ASSERT(m_nBatched == 0 || m_bBatchExact);
  534.         //  Loop processing the samples in batches
  535.         LONG iLost = 0;
  536.         for (long iDone = 0;
  537.              iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
  538.             ) {
  539. //pragma message (REMIND("Implement threshold scheme"))
  540.             ASSERT(m_nBatched < m_lBatchSize);
  541.             if (iDone < nSamples) {
  542.                 m_ppSamples[m_nBatched++] = ppSamples[iDone++];
  543.             }
  544.             if (m_nBatched == m_lBatchSize ||
  545.                 nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
  546.                 LONG nDone;
  547.                 DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
  548.                        m_nBatched));
  549.                 if (m_hr == S_OK) {
  550.                     m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  551.                                                         m_nBatched,
  552.                                                         &nDone);
  553.                 } else {
  554.                     nDone = 0;
  555.                 }
  556.                 iLost += m_nBatched - nDone;
  557.                 for (LONG i = 0; i < m_nBatched; i++) {
  558.                     m_ppSamples[i]->Release();
  559.                 }
  560.                 m_nBatched = 0;
  561.             }
  562.         }
  563.         *nSamplesProcessed = iDone - iLost;
  564.         if (*nSamplesProcessed < 0) {
  565.             *nSamplesProcessed = 0;
  566.         }
  567.         return m_hr;
  568.     } else {
  569.         /*  We're sending to our thread */
  570.         if (m_hr != S_OK) {
  571.             *nSamplesProcessed = 0;
  572.             DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
  573.                     nSamples, m_hr));
  574.             for (int i = 0; i < nSamples; i++) {
  575.                 ppSamples[i]->Release();
  576.             }
  577.             return m_hr;
  578.         }
  579.         m_bFlushed = FALSE;
  580.         for (long i = 0; i < nSamples; i++) {
  581.             QueueSample(ppSamples[i]);
  582.         }
  583.         *nSamplesProcessed = nSamples;
  584.         if (!m_bBatchExact ||
  585.             m_nBatched + m_List->GetCount() >= m_lBatchSize) {
  586.             NotifyThread();
  587.         }
  588.         return S_OK;
  589.     }
  590. }
  591. //  Get ready for new data - cancels sticky m_hr
  592. void COutputQueue::Reset()
  593. {
  594.     if (!IsQueued()) {
  595.         m_hr = S_OK;
  596.     } else {
  597.         CAutoLock lck(this);
  598.         QueueSample(RESET_PACKET);
  599.         NotifyThread();
  600.         m_evFlushComplete.Wait();
  601.     }
  602. }
  603. //  Remove and Release() all queued and Batched samples
  604. void COutputQueue::FreeSamples()
  605. {
  606.     CAutoLock lck(this);
  607.     if (IsQueued()) {
  608.         while (TRUE) {
  609.             IMediaSample *pSample = m_List->RemoveHead();
  610.     // inform derived class we took something off the queue
  611.     if (m_hEventPop) {
  612.                 //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  613.         SetEvent(m_hEventPop);
  614.     }
  615.             if (pSample == NULL) {
  616.                 break;
  617.             }
  618.             if (!IsSpecialSample(pSample)) {
  619.                 pSample->Release();
  620.             } else {
  621.                 if (pSample == NEW_SEGMENT) {
  622.                     //  Free NEW_SEGMENT packet
  623.                     NewSegmentPacket *ppacket =
  624.                         (NewSegmentPacket *) m_List->RemoveHead();
  625.     // inform derived class we took something off the queue
  626.     if (m_hEventPop) {
  627.                         //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  628.         SetEvent(m_hEventPop);
  629.     }
  630.                     ASSERT(ppacket != NULL);
  631.                     delete ppacket;
  632.                 }
  633.             }
  634.         }
  635.     }
  636.     for (int i = 0; i < m_nBatched; i++) {
  637.         m_ppSamples[i]->Release();
  638.     }
  639.     m_nBatched = 0;
  640. }
  641. //  Notify the thread if there is something to do
  642. //
  643. //  The critical section MUST be held when this is called
  644. void COutputQueue::NotifyThread()
  645. {
  646.     //  Optimize - no need to signal if it's not waiting
  647.     ASSERT(IsQueued());
  648.     if (m_lWaiting) {
  649.         ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
  650.         m_lWaiting = 0;
  651.     }
  652. }
  653. //  See if there's any work to do
  654. //  Returns
  655. //      TRUE  if there is nothing on the queue and nothing in the batch
  656. //            and all data has been sent
  657. //      FALSE otherwise
  658. //
  659. BOOL COutputQueue::IsIdle()
  660. {
  661.     CAutoLock lck(this);
  662.     //  We're idle if
  663.     //      there is no thread (!IsQueued()) OR
  664.     //      the thread is waiting for more work  (m_lWaiting != 0)
  665.     //  AND
  666.     //      there's nothing in the current batch (m_nBatched == 0)
  667.     if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
  668.         return FALSE;
  669.     } else {
  670.         //  If we're idle it shouldn't be possible for there
  671.         //  to be anything on the work queue
  672.         ASSERT(!IsQueued() || m_List->GetCount() == 0);
  673.         return TRUE;
  674.     }
  675. }
  676. void COutputQueue::SetPopEvent(HANDLE hEvent)
  677. {
  678.     m_hEventPop = hEvent;
  679. }