llpumpio.cpp
上传用户:king477883
上传日期:2021-03-01
资源大小:9553k
文件大小:28k
源码类别:

游戏引擎

开发平台:

C++ Builder

  1. /** 
  2.  * @file llpumpio.cpp
  3.  * @author Phoenix
  4.  * @date 2004-11-21
  5.  * @brief Implementation of the i/o pump and related functions.
  6.  *
  7.  * $LicenseInfo:firstyear=2004&license=viewergpl$
  8.  * 
  9.  * Copyright (c) 2004-2010, Linden Research, Inc.
  10.  * 
  11.  * Second Life Viewer Source Code
  12.  * The source code in this file ("Source Code") is provided by Linden Lab
  13.  * to you under the terms of the GNU General Public License, version 2.0
  14.  * ("GPL"), unless you have obtained a separate licensing agreement
  15.  * ("Other License"), formally executed by you and Linden Lab.  Terms of
  16.  * the GPL can be found in doc/GPL-license.txt in this distribution, or
  17.  * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  18.  * 
  19.  * There are special exceptions to the terms and conditions of the GPL as
  20.  * it is applied to this Source Code. View the full text of the exception
  21.  * in the file doc/FLOSS-exception.txt in this software distribution, or
  22.  * online at
  23.  * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  24.  * 
  25.  * By copying, modifying or distributing this software, you acknowledge
  26.  * that you have read and understood your obligations described above,
  27.  * and agree to abide by those obligations.
  28.  * 
  29.  * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  30.  * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  31.  * COMPLETENESS OR PERFORMANCE.
  32.  * $/LicenseInfo$
  33.  */
  34. #include "linden_common.h"
  35. #include "llpumpio.h"
  36. #include <map>
  37. #include <set>
  38. #include "apr_poll.h"
  39. #include "llapr.h"
  40. #include "llmemtype.h"
  41. #include "llstl.h"
  42. #include "llstat.h"
  43. // These should not be enabled in production, but they can be
  44. // intensely useful during development for finding certain kinds of
  45. // bugs.
  46. #if LL_LINUX
  47. //#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
  48. //#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
  49. #if LL_DEBUG_POLL_FILE_DESCRIPTORS
  50. #include "apr_portable.h"
  51. #endif
  52. #endif
  53. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  54. #include <typeinfo>
  55. #endif
  56. // constants for poll timeout. if we are threading, we want to have a
  57. // longer poll timeout.
  58. #if LL_THREADS_APR
  59. static const S32 DEFAULT_POLL_TIMEOUT = 1000;
  60. #else
  61. static const S32 DEFAULT_POLL_TIMEOUT = 0;
  62. #endif
  63. // The default (and fallback) expiration time for chains
  64. const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
  65. extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
  66. extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
  67. // sorta spammy debug modes.
  68. //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
  69. //#define LL_DEBUG_PROCESS_LINK 1
  70. //#define LL_DEBUG_PROCESS_RETURN_VALUE 1
  71. // Super spammy debug mode.
  72. //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
  73. //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
  74. //
  75. // local functions
  76. //
  77. void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
  78. {
  79. #if LL_DEBUG_POLL_FILE_DESCRIPTORS
  80. if(!poll)
  81. {
  82. lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
  83. return;
  84. }
  85. if(poll->desc.s)
  86. {
  87. apr_os_sock_t os_sock;
  88. if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
  89. {
  90. lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
  91.  << " at " << poll->desc.s << llendl;
  92. }
  93. else
  94. {
  95. lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
  96.  << " at " << poll->desc.s << llendl;
  97. }
  98. }
  99. else if(poll->desc.f)
  100. {
  101. apr_os_file_t os_file;
  102. if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
  103. {
  104. lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
  105.  << " at " << poll->desc.f << llendl;
  106. }
  107. else
  108. {
  109. lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
  110.  << " at " << poll->desc.f << llendl;
  111. }
  112. }
  113. else
  114. {
  115. lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
  116. }
  117. #endif
  118. }
  119. /**
  120.  * @class
  121.  */
  122. class LLChainSleeper : public LLRunnable
  123. {
  124. public:
  125. static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
  126. {
  127. return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
  128. }
  129. virtual void run(LLRunner* runner, S64 handle)
  130. {
  131. mPump->clearLock(mKey);
  132. }
  133. protected:
  134. LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
  135. LLPumpIO* mPump;
  136. S32 mKey;
  137. };
  138. /**
  139.  * @struct ll_delete_apr_pollset_fd_client_data
  140.  * @brief This is a simple helper class to clean up our client data.
  141.  */
  142. struct ll_delete_apr_pollset_fd_client_data
  143. {
  144. typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
  145. void operator()(const pipe_conditional_t& conditional)
  146. {
  147. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  148. S32* client_id = (S32*)conditional.second.client_data;
  149. delete client_id;
  150. }
  151. };
  152. /**
  153.  * LLPumpIO
  154.  */
  155. LLPumpIO::LLPumpIO(apr_pool_t* pool) :
  156. mState(LLPumpIO::NORMAL),
  157. mRebuildPollset(false),
  158. mPollset(NULL),
  159. mPollsetClientID(0),
  160. mNextLock(0),
  161. mPool(NULL),
  162. mCurrentPool(NULL),
  163. mCurrentPoolReallocCount(0),
  164. mChainsMutex(NULL),
  165. mCallbackMutex(NULL),
  166. mCurrentChain(mRunningChains.end())
  167. {
  168. mCurrentChain = mRunningChains.end();
  169. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  170. initialize(pool);
  171. }
  172. LLPumpIO::~LLPumpIO()
  173. {
  174. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  175. cleanup();
  176. }
  177. bool LLPumpIO::prime(apr_pool_t* pool)
  178. {
  179. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  180. cleanup();
  181. initialize(pool);
  182. return ((pool == NULL) ? false : true);
  183. }
  184. bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
  185. {
  186. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  187. if(chain.empty()) return false;
  188. #if LL_THREADS_APR
  189. LLScopedLock lock(mChainsMutex);
  190. #endif
  191. LLChainInfo info;
  192. info.setTimeoutSeconds(timeout);
  193. info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
  194. LLLinkInfo link;
  195. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  196. lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
  197. << typeid(*(chain[0])).name() << "'" << llendl;
  198. #else
  199. lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
  200. #endif
  201. chain_t::const_iterator it = chain.begin();
  202. chain_t::const_iterator end = chain.end();
  203. for(; it != end; ++it)
  204. {
  205. link.mPipe = (*it);
  206. link.mChannels = info.mData->nextChannel();
  207. info.mChainLinks.push_back(link);
  208. }
  209. mPendingChains.push_back(info);
  210. return true;
  211. }
  212. bool LLPumpIO::addChain(
  213. const LLPumpIO::links_t& links,
  214. LLIOPipe::buffer_ptr_t data,
  215. LLSD context,
  216. F32 timeout)
  217. {
  218. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  219. // remember that if the caller is providing a full link
  220. // description, we need to have that description matched to a
  221. // particular buffer.
  222. if(!data) return false;
  223. if(links.empty()) return false;
  224. #if LL_THREADS_APR
  225. LLScopedLock lock(mChainsMutex);
  226. #endif
  227. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  228. lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
  229. << typeid(*(links[0].mPipe)).name() << "'" << llendl;
  230. #else
  231. lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
  232. #endif
  233. LLChainInfo info;
  234. info.setTimeoutSeconds(timeout);
  235. info.mChainLinks = links;
  236. info.mData = data;
  237. info.mContext = context;
  238. mPendingChains.push_back(info);
  239. return true;
  240. }
  241. bool LLPumpIO::setTimeoutSeconds(F32 timeout)
  242. {
  243. // If no chain is running, return failure.
  244. if(mRunningChains.end() == mCurrentChain)
  245. {
  246. return false;
  247. }
  248. (*mCurrentChain).setTimeoutSeconds(timeout);
  249. return true;
  250. }
  251. void LLPumpIO::adjustTimeoutSeconds(F32 delta)
  252. {
  253. // Ensure a chain is running
  254. if(mRunningChains.end() != mCurrentChain)
  255. {
  256. (*mCurrentChain).adjustTimeoutSeconds(delta);
  257. }
  258. }
  259. static std::string events_2_string(apr_int16_t events)
  260. {
  261. std::ostringstream ostr;
  262. if(events & APR_POLLIN)
  263. {
  264. ostr << "read,";
  265. }
  266. if(events & APR_POLLPRI)
  267. {
  268. ostr << "priority,";
  269. }
  270. if(events & APR_POLLOUT)
  271. {
  272. ostr << "write,";
  273. }
  274. if(events & APR_POLLERR)
  275. {
  276. ostr << "error,";
  277. }
  278. if(events & APR_POLLHUP)
  279. {
  280. ostr << "hangup,";
  281. }
  282. if(events & APR_POLLNVAL)
  283. {
  284. ostr << "invalid,";
  285. }
  286. return chop_tail_copy(ostr.str(), 1);
  287. }
  288. bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
  289. {
  290. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  291. if(!pipe) return false;
  292. ll_debug_poll_fd("Set conditional", poll);
  293. lldebugs << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null")
  294.  << ") "
  295. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  296.  << "on pipe " << typeid(*pipe).name() 
  297. #endif
  298.  << " at " << pipe << llendl;
  299. // remove any matching poll file descriptors for this pipe.
  300. LLIOPipe::ptr_t pipe_ptr(pipe);
  301. LLChainInfo::conditionals_t::iterator it;
  302. it = (*mCurrentChain).mDescriptors.begin();
  303. while(it != (*mCurrentChain).mDescriptors.end())
  304. {
  305. LLChainInfo::pipe_conditional_t& value = (*it);
  306. if(pipe_ptr == value.first)
  307. {
  308. ll_delete_apr_pollset_fd_client_data()(value);
  309. it = (*mCurrentChain).mDescriptors.erase(it);
  310. mRebuildPollset = true;
  311. }
  312. else
  313. {
  314. ++it;
  315. }
  316. }
  317. if(!poll)
  318. {
  319. mRebuildPollset = true;
  320. return true;
  321. }
  322. LLChainInfo::pipe_conditional_t value;
  323. value.first = pipe_ptr;
  324. value.second = *poll;
  325. value.second.rtnevents = 0;
  326. if(!poll->p)
  327. {
  328. // each fd needs a pool to work with, so if one was
  329. // not specified, use this pool.
  330. // *FIX: Should it always be this pool?
  331. value.second.p = mPool;
  332. }
  333. value.second.client_data = new S32(++mPollsetClientID);
  334. (*mCurrentChain).mDescriptors.push_back(value);
  335. mRebuildPollset = true;
  336. return true;
  337. }
  338. S32 LLPumpIO::setLock()
  339. {
  340. // *NOTE: I do not think it is necessary to acquire a mutex here
  341. // since this should only be called during the pump(), and should
  342. // only change the running chain. Any other use of this method is
  343. // incorrect usage. If it becomes necessary to acquire a lock
  344. // here, be sure to lock here and call a protected method to get
  345. // the lock, and sleepChain() should probably acquire the same
  346. // lock while and calling the same protected implementation to
  347. // lock the runner at the same time.
  348. // If no chain is running, return failure.
  349. if(mRunningChains.end() == mCurrentChain)
  350. {
  351. return 0;
  352. }
  353. // deal with wrap.
  354. if(++mNextLock <= 0)
  355. {
  356. mNextLock = 1;
  357. }
  358. // set the lock
  359. (*mCurrentChain).mLock = mNextLock;
  360. return mNextLock;
  361. }
  362. void LLPumpIO::clearLock(S32 key)
  363. {
  364. // We need to lock it here since we do not want to be iterating
  365. // over the chains twice. We can safely call process() while this
  366. // is happening since we should not be erasing a locked pipe, and
  367. // therefore won't be treading into deleted memory. I think we can
  368. // also clear the lock on the chain safely since the pump only
  369. // reads that value.
  370. #if LL_THREADS_APR
  371. LLScopedLock lock(mChainsMutex);
  372. #endif
  373. mClearLocks.insert(key);
  374. }
  375. bool LLPumpIO::sleepChain(F64 seconds)
  376. {
  377. // Much like the call to setLock(), this should only be called
  378. // from one chain during processing, so there is no need to
  379. // acquire a mutex.
  380. if(seconds <= 0.0) return false;
  381. S32 key = setLock();
  382. if(!key) return false;
  383. LLRunner::run_handle_t handle = mRunner.addRunnable(
  384. LLChainSleeper::build(this, key),
  385. LLRunner::RUN_IN,
  386. seconds);
  387. if(0 == handle) return false;
  388. return true;
  389. }
  390. bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
  391. {
  392. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  393. if(mRunningChains.end() == mCurrentChain)
  394. {
  395. return false;
  396. }
  397. std::copy(
  398. (*mCurrentChain).mChainLinks.begin(),
  399. (*mCurrentChain).mChainLinks.end(),
  400. std::back_insert_iterator<links_t>(links));
  401. return true;
  402. }
  403. void LLPumpIO::pump()
  404. {
  405. pump(DEFAULT_POLL_TIMEOUT);
  406. }
  407. static LLFastTimer::DeclareTimer FTM_PUMP("Pump");
  408. //timeout is in microseconds
  409. void LLPumpIO::pump(const S32& poll_timeout)
  410. {
  411. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  412. LLFastTimer t1(FTM_PUMP);
  413. //llinfos << "LLPumpIO::pump()" << llendl;
  414. // Run any pending runners.
  415. mRunner.run();
  416. // We need to move all of the pending heads over to the running
  417. // chains.
  418. PUMP_DEBUG;
  419. if(true)
  420. {
  421. #if LL_THREADS_APR
  422. LLScopedLock lock(mChainsMutex);
  423. #endif
  424. // bail if this pump is paused.
  425. if(PAUSING == mState)
  426. {
  427. mState = PAUSED;
  428. }
  429. if(PAUSED == mState)
  430. {
  431. return;
  432. }
  433. PUMP_DEBUG;
  434. // Move the pending chains over to the running chaings
  435. if(!mPendingChains.empty())
  436. {
  437. PUMP_DEBUG;
  438. //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
  439. std::copy(
  440. mPendingChains.begin(),
  441. mPendingChains.end(),
  442. std::back_insert_iterator<running_chains_t>(mRunningChains));
  443. mPendingChains.clear();
  444. PUMP_DEBUG;
  445. }
  446. // Clear any locks. This needs to be done here so that we do
  447. // not clash during a call to clearLock().
  448. if(!mClearLocks.empty())
  449. {
  450. PUMP_DEBUG;
  451. running_chains_t::iterator it = mRunningChains.begin();
  452. running_chains_t::iterator end = mRunningChains.end();
  453. std::set<S32>::iterator not_cleared = mClearLocks.end();
  454. for(; it != end; ++it)
  455. {
  456. if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
  457. {
  458. (*it).mLock = 0;
  459. }
  460. }
  461. PUMP_DEBUG;
  462. mClearLocks.clear();
  463. }
  464. }
  465. PUMP_DEBUG;
  466. // rebuild the pollset if necessary
  467. if(mRebuildPollset)
  468. {
  469. PUMP_DEBUG;
  470. rebuildPollset();
  471. mRebuildPollset = false;
  472. }
  473. // Poll based on the last known pollset
  474. // *TODO: may want to pass in a poll timeout so it works correctly
  475. // in single and multi threaded processes.
  476. PUMP_DEBUG;
  477. typedef std::map<S32, S32> signal_client_t;
  478. signal_client_t signalled_client;
  479. const apr_pollfd_t* poll_fd = NULL;
  480. if(mPollset)
  481. {
  482. PUMP_DEBUG;
  483. //llinfos << "polling" << llendl;
  484. S32 count = 0;
  485. S32 client_id = 0;
  486.         {
  487.             LLPerfBlock polltime("pump_poll");
  488.             apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
  489.         }
  490. PUMP_DEBUG;
  491. for(S32 ii = 0; ii < count; ++ii)
  492. {
  493. ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
  494. client_id = *((S32*)poll_fd[ii].client_data);
  495. signalled_client[client_id] = ii;
  496. }
  497. PUMP_DEBUG;
  498. }
  499. PUMP_DEBUG;
  500. // set up for a check to see if each one was signalled
  501. signal_client_t::iterator not_signalled = signalled_client.end();
  502. // Process everything as appropriate
  503. //lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
  504. running_chains_t::iterator run_chain = mRunningChains.begin();
  505. bool process_this_chain = false;
  506. while( run_chain != mRunningChains.end() )
  507. {
  508. PUMP_DEBUG;
  509. if((*run_chain).mInit
  510.    && (*run_chain).mTimer.getStarted()
  511.    && (*run_chain).mTimer.hasExpired())
  512. {
  513. PUMP_DEBUG;
  514. if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
  515. {
  516. // the pipe probably handled the error. If the handler
  517. // forgot to reset the expiration then we need to do
  518. // that here.
  519. if((*run_chain).mTimer.getStarted()
  520.    && (*run_chain).mTimer.hasExpired())
  521. {
  522. PUMP_DEBUG;
  523. llinfos << "Error handler forgot to reset timeout. "
  524. << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
  525. << " seconds." << llendl;
  526. (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
  527. }
  528. }
  529. else
  530. {
  531. PUMP_DEBUG;
  532. // it timed out and no one handled it, so we need to
  533. // retire the chain
  534. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  535. lldebugs << "Removing chain "
  536. << (*run_chain).mChainLinks[0].mPipe
  537. << " '"
  538. << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
  539. << "' because it timed out." << llendl;
  540. #else
  541. // lldebugs << "Removing chain "
  542. // << (*run_chain).mChainLinks[0].mPipe
  543. // << " because we reached the end." << llendl;
  544. #endif
  545. run_chain = mRunningChains.erase(run_chain);
  546. continue;
  547. }
  548. }
  549. PUMP_DEBUG;
  550. if((*run_chain).mLock)
  551. {
  552. ++run_chain;
  553. continue;
  554. }
  555. PUMP_DEBUG;
  556. mCurrentChain = run_chain;
  557. if((*run_chain).mDescriptors.empty())
  558. {
  559. // if there are no conditionals, just process this chain.
  560. process_this_chain = true;
  561. //lldebugs << "no conditionals - processing" << llendl;
  562. }
  563. else
  564. {
  565. PUMP_DEBUG;
  566. //lldebugs << "checking conditionals" << llendl;
  567. // Check if this run chain was signalled. If any file
  568. // descriptor is ready for something, then go ahead and
  569. // process this chian.
  570. process_this_chain = false;
  571. if(!signalled_client.empty())
  572. {
  573. PUMP_DEBUG;
  574. LLChainInfo::conditionals_t::iterator it;
  575. it = (*run_chain).mDescriptors.begin();
  576. LLChainInfo::conditionals_t::iterator end;
  577. end = (*run_chain).mDescriptors.end();
  578. S32 client_id = 0;
  579. signal_client_t::iterator signal;
  580. for(; it != end; ++it)
  581. {
  582. PUMP_DEBUG;
  583. client_id = *((S32*)((*it).second.client_data));
  584. signal = signalled_client.find(client_id);
  585. if (signal == not_signalled) continue;
  586. static const apr_int16_t POLL_CHAIN_ERROR =
  587. APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
  588. const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
  589. if(poll->rtnevents & POLL_CHAIN_ERROR)
  590. {
  591. // Potential eror condition has been
  592. // returned. If HUP was one of them, we pass
  593. // that as the error even though there may be
  594. // more. If there are in fact more errors,
  595. // we'll just wait for that detection until
  596. // the next pump() cycle to catch it so that
  597. // the logic here gets no more strained than
  598. // it already is.
  599. LLIOPipe::EStatus error_status;
  600. if(poll->rtnevents & APR_POLLHUP)
  601. error_status = LLIOPipe::STATUS_LOST_CONNECTION;
  602. else
  603. error_status = LLIOPipe::STATUS_ERROR;
  604. if(handleChainError(*run_chain, error_status)) break;
  605. ll_debug_poll_fd("Removing pipe", poll);
  606. llwarns << "Removing pipe "
  607. << (*run_chain).mChainLinks[0].mPipe
  608. << " '"
  609. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  610. << typeid(
  611. *((*run_chain).mChainLinks[0].mPipe)).name()
  612. #endif
  613. << "' because: "
  614. << events_2_string(poll->rtnevents)
  615. << llendl;
  616. (*run_chain).mHead = (*run_chain).mChainLinks.end();
  617. break;
  618. }
  619. // at least 1 fd got signalled, and there were no
  620. // errors. That means we process this chain.
  621. process_this_chain = true;
  622. break;
  623. }
  624. }
  625. }
  626. if(process_this_chain)
  627. {
  628. PUMP_DEBUG;
  629. if(!((*run_chain).mInit))
  630. {
  631. (*run_chain).mHead = (*run_chain).mChainLinks.begin();
  632. (*run_chain).mInit = true;
  633. }
  634. PUMP_DEBUG;
  635. processChain(*run_chain);
  636. }
  637. PUMP_DEBUG;
  638. if((*run_chain).mHead == (*run_chain).mChainLinks.end())
  639. {
  640. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  641. lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
  642. << " '"
  643. << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
  644. << "' because we reached the end." << llendl;
  645. #else
  646. // lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
  647. // << " because we reached the end." << llendl;
  648. #endif
  649. PUMP_DEBUG;
  650. // This chain is done. Clean up any allocated memory and
  651. // erase the chain info.
  652. std::for_each(
  653. (*run_chain).mDescriptors.begin(),
  654. (*run_chain).mDescriptors.end(),
  655. ll_delete_apr_pollset_fd_client_data());
  656. run_chain = mRunningChains.erase(run_chain);
  657. // *NOTE: may not always need to rebuild the pollset.
  658. mRebuildPollset = true;
  659. }
  660. else
  661. {
  662. PUMP_DEBUG;
  663. // this chain needs more processing - just go to the next
  664. // chain.
  665. ++run_chain;
  666. }
  667. }
  668. PUMP_DEBUG;
  669. // null out the chain
  670. mCurrentChain = mRunningChains.end();
  671. END_PUMP_DEBUG;
  672. }
  673. //bool LLPumpIO::respond(const chain_t& pipes)
  674. //{
  675. //#if LL_THREADS_APR
  676. // LLScopedLock lock(mCallbackMutex);
  677. //#endif
  678. // LLChainInfo info;
  679. // links_t links;
  680. //
  681. // mPendingCallbacks.push_back(info);
  682. // return true;
  683. //}
  684. bool LLPumpIO::respond(LLIOPipe* pipe)
  685. {
  686. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  687. if(NULL == pipe) return false;
  688. #if LL_THREADS_APR
  689. LLScopedLock lock(mCallbackMutex);
  690. #endif
  691. LLChainInfo info;
  692. LLLinkInfo link;
  693. link.mPipe = pipe;
  694. info.mChainLinks.push_back(link);
  695. mPendingCallbacks.push_back(info);
  696. return true;
  697. }
  698. bool LLPumpIO::respond(
  699. const links_t& links,
  700. LLIOPipe::buffer_ptr_t data,
  701. LLSD context)
  702. {
  703. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  704. // if the caller is providing a full link description, we need to
  705. // have that description matched to a particular buffer.
  706. if(!data) return false;
  707. if(links.empty()) return false;
  708. #if LL_THREADS_APR
  709. LLScopedLock lock(mCallbackMutex);
  710. #endif
  711. // Add the callback response
  712. LLChainInfo info;
  713. info.mChainLinks = links;
  714. info.mData = data;
  715. info.mContext = context;
  716. mPendingCallbacks.push_back(info);
  717. return true;
  718. }
  719. void LLPumpIO::callback()
  720. {
  721. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  722. //llinfos << "LLPumpIO::callback()" << llendl;
  723. if(true)
  724. {
  725. #if LL_THREADS_APR
  726. LLScopedLock lock(mCallbackMutex);
  727. #endif
  728. std::copy(
  729. mPendingCallbacks.begin(),
  730. mPendingCallbacks.end(),
  731. std::back_insert_iterator<callbacks_t>(mCallbacks));
  732. mPendingCallbacks.clear();
  733. }
  734. if(!mCallbacks.empty())
  735. {
  736. callbacks_t::iterator it = mCallbacks.begin();
  737. callbacks_t::iterator end = mCallbacks.end();
  738. for(; it != end; ++it)
  739. {
  740. // it's always the first and last time for respone chains
  741. (*it).mHead = (*it).mChainLinks.begin();
  742. (*it).mInit = true;
  743. (*it).mEOS = true;
  744. processChain(*it);
  745. }
  746. mCallbacks.clear();
  747. }
  748. }
  749. void LLPumpIO::control(LLPumpIO::EControl op)
  750. {
  751. #if LL_THREADS_APR
  752. LLScopedLock lock(mChainsMutex);
  753. #endif
  754. switch(op)
  755. {
  756. case PAUSE:
  757. mState = PAUSING;
  758. break;
  759. case RESUME:
  760. mState = NORMAL;
  761. break;
  762. default:
  763. // no-op
  764. break;
  765. }
  766. }
  767. void LLPumpIO::initialize(apr_pool_t* pool)
  768. {
  769. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  770. if(!pool) return;
  771. #if LL_THREADS_APR
  772. // SJB: Windows defaults to NESTED and OSX defaults to UNNESTED, so use UNNESTED explicitly.
  773. apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, pool);
  774. apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, pool);
  775. #endif
  776. mPool = pool;
  777. }
  778. void LLPumpIO::cleanup()
  779. {
  780. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  781. #if LL_THREADS_APR
  782. if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
  783. if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
  784. #endif
  785. mChainsMutex = NULL;
  786. mCallbackMutex = NULL;
  787. if(mPollset)
  788. {
  789. // lldebugs << "cleaning up pollset" << llendl;
  790. apr_pollset_destroy(mPollset);
  791. mPollset = NULL;
  792. }
  793. if(mCurrentPool)
  794. {
  795. apr_pool_destroy(mCurrentPool);
  796. mCurrentPool = NULL;
  797. }
  798. mPool = NULL;
  799. }
  800. void LLPumpIO::rebuildPollset()
  801. {
  802. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  803. // lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
  804. if(mPollset)
  805. {
  806. //lldebugs << "destroying pollset" << llendl;
  807. apr_pollset_destroy(mPollset);
  808. mPollset = NULL;
  809. }
  810. U32 size = 0;
  811. running_chains_t::iterator run_it = mRunningChains.begin();
  812. running_chains_t::iterator run_end = mRunningChains.end();
  813. for(; run_it != run_end; ++run_it)
  814. {
  815. size += (*run_it).mDescriptors.size();
  816. }
  817. //lldebugs << "found " << size << " descriptors." << llendl;
  818. if(size)
  819. {
  820. // Recycle the memory pool
  821. const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
  822. if(mCurrentPool
  823.    && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
  824. {
  825. apr_pool_destroy(mCurrentPool);
  826. mCurrentPool = NULL;
  827. mCurrentPoolReallocCount = 0;
  828. }
  829. if(!mCurrentPool)
  830. {
  831. apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
  832. (void)ll_apr_warn_status(status);
  833. }
  834. // add all of the file descriptors
  835. run_it = mRunningChains.begin();
  836. LLChainInfo::conditionals_t::iterator fd_it;
  837. LLChainInfo::conditionals_t::iterator fd_end;
  838. apr_pollset_create(&mPollset, size, mCurrentPool, 0);
  839. for(; run_it != run_end; ++run_it)
  840. {
  841. fd_it = (*run_it).mDescriptors.begin();
  842. fd_end = (*run_it).mDescriptors.end();
  843. for(; fd_it != fd_end; ++fd_it)
  844. {
  845. apr_pollset_add(mPollset, &((*fd_it).second));
  846. }
  847. }
  848. }
  849. }
  850. void LLPumpIO::processChain(LLChainInfo& chain)
  851. {
  852. PUMP_DEBUG;
  853. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  854. LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
  855. links_t::iterator it = chain.mHead;
  856. links_t::iterator end = chain.mChainLinks.end();
  857. bool need_process_signaled = false;
  858. bool keep_going = true;
  859. do
  860. {
  861. #if LL_DEBUG_PROCESS_LINK
  862. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  863. llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
  864. << llendl;
  865. #else
  866. llinfos << "Processing link " << (*it).mPipe << "." << llendl;
  867. #endif
  868. #endif
  869. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
  870. if(chain.mData)
  871. {
  872. char* buf = NULL;
  873. S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
  874. if(bytes)
  875. {
  876. buf = new char[bytes + 1];
  877. chain.mData->readAfter(
  878. (*it).mChannels.in(),
  879. NULL,
  880. (U8*)buf,
  881. bytes);
  882. buf[bytes] = '';
  883. llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
  884. << buf << llendl;
  885. delete[] buf;
  886. buf = NULL;
  887. }
  888. else
  889. {
  890. llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
  891. << llendl;
  892. }
  893. }
  894. #endif
  895. PUMP_DEBUG;
  896. status = (*it).mPipe->process(
  897. (*it).mChannels,
  898. chain.mData,
  899. chain.mEOS,
  900. chain.mContext,
  901. this);
  902. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
  903. if(chain.mData)
  904. {
  905. char* buf = NULL;
  906. S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
  907. if(bytes)
  908. {
  909. buf = new char[bytes + 1];
  910. chain.mData->readAfter(
  911. (*it).mChannels.out(),
  912. NULL,
  913. (U8*)buf,
  914. bytes);
  915. buf[bytes] = '';
  916. llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
  917. << buf << llendl;
  918. delete[] buf;
  919. buf = NULL;
  920. }
  921. else
  922. {
  923. llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
  924. << llendl;
  925. }
  926. }
  927. #endif
  928. #if LL_DEBUG_PROCESS_RETURN_VALUE
  929. // Only bother with the success codes - error codes are logged
  930. // below.
  931. if(LLIOPipe::isSuccess(status))
  932. {
  933. llinfos << "Pipe returned: '"
  934. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  935. << typeid(*((*it).mPipe)).name() << "':'"
  936. #endif
  937. << LLIOPipe::lookupStatusString(status) << "'" << llendl;
  938. }
  939. #endif
  940. PUMP_DEBUG;
  941. switch(status)
  942. {
  943. case LLIOPipe::STATUS_OK:
  944. // no-op
  945. break;
  946. case LLIOPipe::STATUS_STOP:
  947. PUMP_DEBUG;
  948. status = LLIOPipe::STATUS_OK;
  949. chain.mHead = end;
  950. keep_going = false;
  951. break;
  952. case LLIOPipe::STATUS_DONE:
  953. PUMP_DEBUG;
  954. status = LLIOPipe::STATUS_OK;
  955. chain.mHead = (it + 1);
  956. chain.mEOS = true;
  957. break;
  958. case LLIOPipe::STATUS_BREAK:
  959. PUMP_DEBUG;
  960. status = LLIOPipe::STATUS_OK;
  961. keep_going = false;
  962. break;
  963. case LLIOPipe::STATUS_NEED_PROCESS:
  964. PUMP_DEBUG;
  965. status = LLIOPipe::STATUS_OK;
  966. if(!need_process_signaled)
  967. {
  968. need_process_signaled = true;
  969. chain.mHead = it;
  970. }
  971. break;
  972. default:
  973. PUMP_DEBUG;
  974. if(LLIOPipe::isError(status))
  975. {
  976. llinfos << "Pump generated pipe err: '"
  977. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  978. << typeid(*((*it).mPipe)).name() << "':'"
  979. #endif
  980. << LLIOPipe::lookupStatusString(status)
  981. << "'" << llendl;
  982. #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
  983. if(chain.mData)
  984. {
  985. char* buf = NULL;
  986. S32 bytes = chain.mData->countAfter(
  987. (*it).mChannels.in(),
  988. NULL);
  989. if(bytes)
  990. {
  991. buf = new char[bytes + 1];
  992. chain.mData->readAfter(
  993. (*it).mChannels.in(),
  994. NULL,
  995. (U8*)buf,
  996. bytes);
  997. buf[bytes] = '';
  998. llinfos << "Input After Error: " << buf << llendl;
  999. delete[] buf;
  1000. buf = NULL;
  1001. }
  1002. else
  1003. {
  1004. llinfos << "Input After Error: (null)" << llendl;
  1005. }
  1006. }
  1007. else
  1008. {
  1009. llinfos << "Input After Error: (null)" << llendl;
  1010. }
  1011. #endif
  1012. keep_going = false;
  1013. chain.mHead  = it;
  1014. if(!handleChainError(chain, status))
  1015. {
  1016. chain.mHead = end;
  1017. }
  1018. }
  1019. else
  1020. {
  1021. llinfos << "Unhandled status code: " << status << ":"
  1022. << LLIOPipe::lookupStatusString(status) << llendl;
  1023. }
  1024. break;
  1025. }
  1026. PUMP_DEBUG;
  1027. } while(keep_going && (++it != end));
  1028. PUMP_DEBUG;
  1029. }
  1030. bool LLPumpIO::handleChainError(
  1031. LLChainInfo& chain,
  1032. LLIOPipe::EStatus error)
  1033. {
  1034. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  1035. links_t::reverse_iterator rit;
  1036. if(chain.mHead == chain.mChainLinks.end())
  1037. {
  1038. rit = links_t::reverse_iterator(chain.mHead);
  1039. }
  1040. else
  1041. {
  1042. rit = links_t::reverse_iterator(chain.mHead + 1);
  1043. }
  1044. links_t::reverse_iterator rend = chain.mChainLinks.rend();
  1045. bool handled = false;
  1046. bool keep_going = true;
  1047. do
  1048. {
  1049. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  1050. lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
  1051.  << "." << llendl;
  1052. #endif
  1053. error = (*rit).mPipe->handleError(error, this);
  1054. switch(error)
  1055. {
  1056. case LLIOPipe::STATUS_OK:
  1057. handled = true;
  1058. chain.mHead = rit.base();
  1059. break;
  1060. case LLIOPipe::STATUS_STOP:
  1061. case LLIOPipe::STATUS_DONE:
  1062. case LLIOPipe::STATUS_BREAK:
  1063. case LLIOPipe::STATUS_NEED_PROCESS:
  1064. #if LL_DEBUG_PIPE_TYPE_IN_PUMP
  1065. lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
  1066.  << " returned code to stop error handler." << llendl;
  1067. #endif
  1068. keep_going = false;
  1069. break;
  1070. default:
  1071. if(LLIOPipe::isSuccess(error))
  1072. {
  1073. llinfos << "Unhandled status code: " << error << ":"
  1074. << LLIOPipe::lookupStatusString(error) << llendl;
  1075. error = LLIOPipe::STATUS_ERROR;
  1076. keep_going = false;
  1077. }
  1078. break;
  1079. }
  1080. } while(keep_going && !handled && (++rit != rend));
  1081. return handled;
  1082. }
  1083. /**
  1084.  * LLPumpIO::LLChainInfo
  1085.  */
  1086. LLPumpIO::LLChainInfo::LLChainInfo() :
  1087. mInit(false),
  1088. mLock(0),
  1089. mEOS(false)
  1090. {
  1091. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  1092. mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
  1093. }
  1094. void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
  1095. {
  1096. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  1097. if(timeout > 0.0f)
  1098. {
  1099. mTimer.start();
  1100. mTimer.reset();
  1101. mTimer.setTimerExpirySec(timeout);
  1102. }
  1103. else
  1104. {
  1105. mTimer.stop();
  1106. }
  1107. }
  1108. void LLPumpIO::LLChainInfo::adjustTimeoutSeconds(F32 delta)
  1109. {
  1110. LLMemType m1(LLMemType::MTYPE_IO_PUMP);
  1111. if(mTimer.getStarted())
  1112. {
  1113. F64 expiry = mTimer.expiresAt();
  1114. expiry += delta;
  1115. mTimer.setExpiryAt(expiry);
  1116. }
  1117. }