llqueuedthread.cpp
上传用户:king477883
上传日期:2021-03-01
资源大小:9553k
文件大小:11k
源码类别:

游戏引擎

开发平台:

C++ Builder

  1. /** 
  2.  * @file llqueuedthread.cpp
  3.  *
  4.  * $LicenseInfo:firstyear=2004&license=viewergpl$
  5.  * 
  6.  * Copyright (c) 2004-2010, Linden Research, Inc.
  7.  * 
  8.  * Second Life Viewer Source Code
  9.  * The source code in this file ("Source Code") is provided by Linden Lab
  10.  * to you under the terms of the GNU General Public License, version 2.0
  11.  * ("GPL"), unless you have obtained a separate licensing agreement
  12.  * ("Other License"), formally executed by you and Linden Lab.  Terms of
  13.  * the GPL can be found in doc/GPL-license.txt in this distribution, or
  14.  * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  15.  * 
  16.  * There are special exceptions to the terms and conditions of the GPL as
  17.  * it is applied to this Source Code. View the full text of the exception
  18.  * in the file doc/FLOSS-exception.txt in this software distribution, or
  19.  * online at
  20.  * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  21.  * 
  22.  * By copying, modifying or distributing this software, you acknowledge
  23.  * that you have read and understood your obligations described above,
  24.  * and agree to abide by those obligations.
  25.  * 
  26.  * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  27.  * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  28.  * COMPLETENESS OR PERFORMANCE.
  29.  * $/LicenseInfo$
  30.  */
  31. #include "linden_common.h"
  32. #include "llqueuedthread.h"
  33. #include "llstl.h"
  34. #include "lltimer.h" // ms_sleep()
  35. //============================================================================
  36. // MAIN THREAD
  37. LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded) :
  38. LLThread(name),
  39. mThreaded(threaded),
  40. mIdleThread(TRUE),
  41. mNextHandle(0),
  42. mStarted(FALSE)
  43. {
  44. if (mThreaded)
  45. {
  46. start();
  47. }
  48. }
  49. // MAIN THREAD
  50. LLQueuedThread::~LLQueuedThread()
  51. {
  52. if (!mThreaded)
  53. {
  54. endThread();
  55. }
  56. shutdown();
  57. // ~LLThread() will be called here
  58. }
  59. void LLQueuedThread::shutdown()
  60. {
  61. setQuitting();
  62. unpause(); // MAIN THREAD
  63. if (mThreaded)
  64. {
  65. S32 timeout = 100;
  66. for ( ; timeout>0; timeout--)
  67. {
  68. if (isStopped())
  69. {
  70. break;
  71. }
  72. ms_sleep(100);
  73. LLThread::yield();
  74. }
  75. if (timeout == 0)
  76. {
  77. llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl;
  78. }
  79. }
  80. else
  81. {
  82. mStatus = STOPPED;
  83. }
  84. QueuedRequest* req;
  85. S32 active_count = 0;
  86. while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
  87. {
  88. if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
  89. {
  90. ++active_count;
  91. req->setStatus(STATUS_ABORTED); // avoid assert in deleteRequest
  92. }
  93. req->deleteRequest();
  94. }
  95. if (active_count)
  96. {
  97. llwarns << "~LLQueuedThread() called with active requests: " << active_count << llendl;
  98. }
  99. }
  100. //----------------------------------------------------------------------------
  101. // MAIN THREAD
  102. // virtual
  103. S32 LLQueuedThread::update(U32 max_time_ms)
  104. {
  105. if (!mStarted)
  106. {
  107. if (!mThreaded)
  108. {
  109. startThread();
  110. mStarted = TRUE;
  111. }
  112. }
  113. return updateQueue(max_time_ms);
  114. }
  115. S32 LLQueuedThread::updateQueue(U32 max_time_ms)
  116. {
  117. F64 max_time = (F64)max_time_ms * .001;
  118. LLTimer timer;
  119. S32 pending = 1;
  120. // Frame Update
  121. if (mThreaded)
  122. {
  123. pending = getPending();
  124. unpause();
  125. }
  126. else
  127. {
  128. while (pending > 0)
  129. {
  130. pending = processNextRequest();
  131. if (max_time && timer.getElapsedTimeF64() > max_time)
  132. break;
  133. }
  134. }
  135. return pending;
  136. }
  137. void LLQueuedThread::incQueue()
  138. {
  139. // Something has been added to the queue
  140. if (!isPaused())
  141. {
  142. if (mThreaded)
  143. {
  144. wake(); // Wake the thread up if necessary.
  145. }
  146. }
  147. }
  148. //virtual
  149. // May be called from any thread
  150. S32 LLQueuedThread::getPending()
  151. {
  152. S32 res;
  153. lockData();
  154. res = mRequestQueue.size();
  155. unlockData();
  156. return res;
  157. }
  158. // MAIN thread
  159. void LLQueuedThread::waitOnPending()
  160. {
  161. while(1)
  162. {
  163. update(0);
  164. if (mIdleThread)
  165. {
  166. break;
  167. }
  168. if (mThreaded)
  169. {
  170. yield();
  171. }
  172. }
  173. return;
  174. }
  175. // MAIN thread
  176. void LLQueuedThread::printQueueStats()
  177. {
  178. lockData();
  179. if (!mRequestQueue.empty())
  180. {
  181. QueuedRequest *req = *mRequestQueue.begin();
  182. llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl;
  183. }
  184. else
  185. {
  186. llinfos << "Queued Thread Idle" << llendl;
  187. }
  188. unlockData();
  189. }
  190. // MAIN thread
  191. LLQueuedThread::handle_t LLQueuedThread::generateHandle()
  192. {
  193. lockData();
  194. while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
  195. {
  196. mNextHandle++;
  197. }
  198. const LLQueuedThread::handle_t res = mNextHandle++;
  199. unlockData();
  200. return res;
  201. }
  202. // MAIN thread
  203. bool LLQueuedThread::addRequest(QueuedRequest* req)
  204. {
  205. if (mStatus == QUITTING)
  206. {
  207. return false;
  208. }
  209. lockData();
  210. req->setStatus(STATUS_QUEUED);
  211. mRequestQueue.insert(req);
  212. mRequestHash.insert(req);
  213. #if _DEBUG
  214. //  llinfos << llformat("LLQueuedThread::Added req [%08d]",handle) << llendl;
  215. #endif
  216. unlockData();
  217. incQueue();
  218. return true;
  219. }
  220. // MAIN thread
  221. bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
  222. {
  223. llassert (handle != nullHandle())
  224. bool res = false;
  225. bool waspaused = isPaused();
  226. bool done = false;
  227. while(!done)
  228. {
  229. update(0); // unpauses
  230. lockData();
  231. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  232. if (!req)
  233. {
  234. done = true; // request does not exist
  235. }
  236. else if (req->getStatus() == STATUS_COMPLETE)
  237. {
  238. res = true;
  239. if (auto_complete)
  240. {
  241. mRequestHash.erase(handle);
  242. req->deleteRequest();
  243. //  check();
  244. }
  245. done = true;
  246. }
  247. unlockData();
  248. if (!done && mThreaded)
  249. {
  250. yield();
  251. }
  252. }
  253. if (waspaused)
  254. {
  255. pause();
  256. }
  257. return res;
  258. }
  259. // MAIN thread
  260. LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
  261. {
  262. if (handle == nullHandle())
  263. {
  264. return 0;
  265. }
  266. lockData();
  267. QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
  268. unlockData();
  269. return res;
  270. }
  271. LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
  272. {
  273. status_t res = STATUS_EXPIRED;
  274. lockData();
  275. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  276. if (req)
  277. {
  278. res = req->getStatus();
  279. }
  280. unlockData();
  281. return res;
  282. }
  283. void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
  284. {
  285. lockData();
  286. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  287. if (req)
  288. {
  289. req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
  290. }
  291. unlockData();
  292. }
  293. // MAIN thread
  294. void LLQueuedThread::setFlags(handle_t handle, U32 flags)
  295. {
  296. lockData();
  297. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  298. if (req)
  299. {
  300. req->setFlags(flags);
  301. }
  302. unlockData();
  303. }
  304. void LLQueuedThread::setPriority(handle_t handle, U32 priority)
  305. {
  306. lockData();
  307. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  308. if (req)
  309. {
  310. if(req->getStatus() == STATUS_INPROGRESS)
  311. {
  312. // not in list
  313. req->setPriority(priority);
  314. }
  315. else if(req->getStatus() == STATUS_QUEUED)
  316. {
  317. // remove from list then re-insert
  318. llverify(mRequestQueue.erase(req) == 1);
  319. req->setPriority(priority);
  320. mRequestQueue.insert(req);
  321. }
  322. }
  323. unlockData();
  324. }
  325. bool LLQueuedThread::completeRequest(handle_t handle)
  326. {
  327. bool res = false;
  328. lockData();
  329. QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
  330. if (req)
  331. {
  332. llassert_always(req->getStatus() != STATUS_QUEUED);
  333. llassert_always(req->getStatus() != STATUS_INPROGRESS);
  334. #if _DEBUG
  335. //  llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl;
  336. #endif
  337. mRequestHash.erase(handle);
  338. req->deleteRequest();
  339. //  check();
  340. res = true;
  341. }
  342. unlockData();
  343. return res;
  344. }
  345. bool LLQueuedThread::check()
  346. {
  347. #if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
  348. for (int i=0; i<REQUEST_HASH_SIZE; i++)
  349. {
  350. LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i);
  351. while (entry)
  352. {
  353. if (entry->getHashKey() > mNextHandle)
  354. {
  355. llerrs << "Hash Error" << llendl;
  356. return false;
  357. }
  358. entry = entry->getNextEntry();
  359. }
  360. }
  361. #endif
  362. return true;
  363. }
  364. //============================================================================
  365. // Runs on its OWN thread
  366. S32 LLQueuedThread::processNextRequest()
  367. {
  368. QueuedRequest *req;
  369. // Get next request from pool
  370. lockData();
  371. while(1)
  372. {
  373. req = NULL;
  374. if (mRequestQueue.empty())
  375. {
  376. break;
  377. }
  378. req = *mRequestQueue.begin();
  379. mRequestQueue.erase(mRequestQueue.begin());
  380. if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
  381. {
  382. req->setStatus(STATUS_ABORTED);
  383. req->finishRequest(false);
  384. if (req->getFlags() & FLAG_AUTO_COMPLETE)
  385. {
  386. mRequestHash.erase(req);
  387. req->deleteRequest();
  388. //  check();
  389. }
  390. continue;
  391. }
  392. llassert_always(req->getStatus() == STATUS_QUEUED);
  393. break;
  394. }
  395. if (req)
  396. {
  397. req->setStatus(STATUS_INPROGRESS);
  398. }
  399. unlockData();
  400. // This is the only place we will call req->setStatus() after
  401. // it has initially been seet to STATUS_QUEUED, so it is
  402. // safe to access req.
  403. if (req)
  404. {
  405. // process request
  406. U32 start_priority = req->getPriority();
  407. bool complete = req->processRequest();
  408. if (complete)
  409. {
  410. lockData();
  411. req->setStatus(STATUS_COMPLETE);
  412. req->finishRequest(true);
  413. if (req->getFlags() & FLAG_AUTO_COMPLETE)
  414. {
  415. mRequestHash.erase(req);
  416. req->deleteRequest();
  417. //  check();
  418. }
  419. unlockData();
  420. }
  421. else
  422. {
  423. lockData();
  424. req->setStatus(STATUS_QUEUED);
  425. mRequestQueue.insert(req);
  426. unlockData();
  427. if (mThreaded && start_priority <= PRIORITY_LOW)
  428. {
  429. ms_sleep(1); // sleep the thread a little
  430. }
  431. }
  432. }
  433. S32 pending = getPending();
  434. return pending;
  435. }
  436. // virtual
  437. bool LLQueuedThread::runCondition()
  438. {
  439. // mRunCondition must be locked here
  440. if (mRequestQueue.empty() && mIdleThread)
  441. return false;
  442. else
  443. return true;
  444. }
  445. // virtual
  446. void LLQueuedThread::run()
  447. {
  448. // call checPause() immediately so we don't try to do anything before the class is fully constructed
  449. checkPause();
  450. startThread();
  451. mStarted = TRUE;
  452. while (1)
  453. {
  454. // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
  455. checkPause();
  456. if (isQuitting())
  457. {
  458. endThread();
  459. break;
  460. }
  461. mIdleThread = FALSE;
  462. threadedUpdate();
  463. int res = processNextRequest();
  464. if (res == 0)
  465. {
  466. mIdleThread = TRUE;
  467. ms_sleep(1);
  468. }
  469. //LLThread::yield(); // thread should yield after each request
  470. }
  471. llinfos << "LLQueuedThread " << mName << " EXITING." << llendl;
  472. }
  473. // virtual
  474. void LLQueuedThread::startThread()
  475. {
  476. }
  477. // virtual
  478. void LLQueuedThread::endThread()
  479. {
  480. }
  481. // virtual
  482. void LLQueuedThread::threadedUpdate()
  483. {
  484. }
  485. //============================================================================
  486. LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) :
  487. LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
  488. mStatus(STATUS_UNKNOWN),
  489. mPriority(priority),
  490. mFlags(flags)
  491. {
  492. }
  493. LLQueuedThread::QueuedRequest::~QueuedRequest()
  494. {
  495. llassert_always(mStatus == STATUS_DELETE);
  496. }
  497. //virtual
  498. void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
  499. {
  500. }
  501. //virtual
  502. void LLQueuedThread::QueuedRequest::deleteRequest()
  503. {
  504. llassert_always(mStatus != STATUS_INPROGRESS);
  505. setStatus(STATUS_DELETE);
  506. delete this;
  507. }