lliosocket.cpp
上传用户:king477883
上传日期:2021-03-01
资源大小:9553k
文件大小:16k
源码类别:

游戏引擎

开发平台:

C++ Builder

  1. /** 
  2.  * @file lliosocket.cpp
  3.  * @author Phoenix
  4.  * @date 2005-07-31
  5.  * @brief Sockets declarations for use with the io pipes
  6.  *
  7.  * $LicenseInfo:firstyear=2005&license=viewergpl$
  8.  * 
  9.  * Copyright (c) 2005-2010, Linden Research, Inc.
  10.  * 
  11.  * Second Life Viewer Source Code
  12.  * The source code in this file ("Source Code") is provided by Linden Lab
  13.  * to you under the terms of the GNU General Public License, version 2.0
  14.  * ("GPL"), unless you have obtained a separate licensing agreement
  15.  * ("Other License"), formally executed by you and Linden Lab.  Terms of
  16.  * the GPL can be found in doc/GPL-license.txt in this distribution, or
  17.  * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2
  18.  * 
  19.  * There are special exceptions to the terms and conditions of the GPL as
  20.  * it is applied to this Source Code. View the full text of the exception
  21.  * in the file doc/FLOSS-exception.txt in this software distribution, or
  22.  * online at
  23.  * http://secondlifegrid.net/programs/open_source/licensing/flossexception
  24.  * 
  25.  * By copying, modifying or distributing this software, you acknowledge
  26.  * that you have read and understood your obligations described above,
  27.  * and agree to abide by those obligations.
  28.  * 
  29.  * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO
  30.  * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY,
  31.  * COMPLETENESS OR PERFORMANCE.
  32.  * $/LicenseInfo$
  33.  */
  34. #include "linden_common.h"
  35. #include "lliosocket.h"
  36. #include "llapr.h"
  37. #include "llbuffer.h"
  38. #include "llhost.h"
  39. #include "llmemtype.h"
  40. #include "llpumpio.h"
  41. //
  42. // constants
  43. //
  44. static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
  45. static const S32 LL_SEND_BUFFER_SIZE = 40000;
  46. static const S32 LL_RECV_BUFFER_SIZE = 40000;
  47. //static const U16 LL_PORT_DISCOVERY_RANGE_MIN = 13000;
  48. //static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;
  49. //
  50. // local methods 
  51. //
  52. bool is_addr_in_use(apr_status_t status)
  53. {
  54. #if LL_WINDOWS
  55. return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
  56. #else
  57. return (EADDRINUSE == APR_TO_OS_ERROR(status));
  58. #endif
  59. }
  60. #if LL_LINUX
  61. // Define this to see the actual file descriptors being tossed around.
  62. //#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1
  63. #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
  64. #include "apr_portable.h"
  65. #endif
  66. #endif
  67. // Quick function 
  68. void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
  69. {
  70. #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
  71. if(!apr_sock)
  72. {
  73. lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
  74. return;
  75. }
  76. // *TODO: Why doesn't this work?
  77. //apr_os_sock_t os_sock;
  78. int os_sock;
  79. if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
  80. {
  81. lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
  82. << " at " << apr_sock << llendl;
  83. }
  84. else
  85. {
  86. lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
  87. << " at " << apr_sock << llendl;
  88. }
  89. #endif
  90. }
  91. ///
  92. /// LLSocket
  93. ///
  94. // static
  95. LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
  96. {
  97. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  98. LLSocket::ptr_t rv;
  99. apr_socket_t* socket = NULL;
  100. apr_pool_t* new_pool = NULL;
  101. apr_status_t status = APR_EGENERAL;
  102. // create a pool for the socket
  103. status = apr_pool_create(&new_pool, pool);
  104. if(ll_apr_warn_status(status))
  105. {
  106. if(new_pool) apr_pool_destroy(new_pool);
  107. return rv;
  108. }
  109. if(STREAM_TCP == type)
  110. {
  111. status = apr_socket_create(
  112. &socket,
  113. APR_INET,
  114. SOCK_STREAM,
  115. APR_PROTO_TCP,
  116. new_pool);
  117. }
  118. else if(DATAGRAM_UDP == type)
  119. {
  120. status = apr_socket_create(
  121. &socket,
  122. APR_INET,
  123. SOCK_DGRAM,
  124. APR_PROTO_UDP,
  125. new_pool);
  126. }
  127. else
  128. {
  129. if(new_pool) apr_pool_destroy(new_pool);
  130. return rv;
  131. }
  132. if(ll_apr_warn_status(status))
  133. {
  134. if(new_pool) apr_pool_destroy(new_pool);
  135. return rv;
  136. }
  137. rv = ptr_t(new LLSocket(socket, new_pool));
  138. if(port > 0)
  139. {
  140. apr_sockaddr_t* sa = NULL;
  141. status = apr_sockaddr_info_get(
  142. &sa,
  143. APR_ANYADDR,
  144. APR_UNSPEC,
  145. port,
  146. 0,
  147. new_pool);
  148. if(ll_apr_warn_status(status))
  149. {
  150. rv.reset();
  151. return rv;
  152. }
  153. // This allows us to reuse the address on quick down/up. This
  154. // is unlikely to create problems.
  155. ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
  156. status = apr_socket_bind(socket, sa);
  157. if(ll_apr_warn_status(status))
  158. {
  159. rv.reset();
  160. return rv;
  161. }
  162. lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
  163.  << " socket to port: " << sa->port << llendl;
  164. if(STREAM_TCP == type)
  165. {
  166. // If it's a stream based socket, we need to tell the OS
  167. // to keep a queue of incoming connections for ACCEPT.
  168. lldebugs << "Setting listen state for socket." << llendl;
  169. status = apr_socket_listen(
  170. socket,
  171. LL_DEFAULT_LISTEN_BACKLOG);
  172. if(ll_apr_warn_status(status))
  173. {
  174. rv.reset();
  175. return rv;
  176. }
  177. }
  178. }
  179. else
  180. {
  181. // we need to indicate that we have an ephemeral port if the
  182. // previous calls were successful. It will
  183. port = PORT_EPHEMERAL;
  184. }
  185. rv->mPort = port;
  186. rv->setOptions();
  187. return rv;
  188. }
  189. // static
  190. LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
  191. {
  192. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  193. LLSocket::ptr_t rv;
  194. if(!socket)
  195. {
  196. return rv;
  197. }
  198. rv = ptr_t(new LLSocket(socket, pool));
  199. rv->mPort = PORT_EPHEMERAL;
  200. rv->setOptions();
  201. return rv;
  202. }
  203. bool LLSocket::blockingConnect(const LLHost& host)
  204. {
  205. if(!mSocket) return false;
  206. apr_sockaddr_t* sa = NULL;
  207. std::string ip_address;
  208. ip_address = host.getIPString();
  209. if(ll_apr_warn_status(apr_sockaddr_info_get(
  210. &sa,
  211. ip_address.c_str(),
  212. APR_UNSPEC,
  213. host.getPort(),
  214. 0,
  215. mPool)))
  216. {
  217. return false;
  218. }
  219. apr_socket_timeout_set(mSocket, 1000);
  220. ll_debug_socket("Blocking connect", mSocket);
  221. if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
  222. setOptions();
  223. return true;
  224. }
  225. LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
  226. mSocket(socket),
  227. mPool(pool),
  228. mPort(PORT_INVALID)
  229. {
  230. ll_debug_socket("Constructing wholely formed socket", mSocket);
  231. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  232. }
  233. LLSocket::~LLSocket()
  234. {
  235. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  236. // *FIX: clean up memory we are holding.
  237. if(mSocket)
  238. {
  239. ll_debug_socket("Destroying socket", mSocket);
  240. apr_socket_close(mSocket);
  241. }
  242. if(mPool)
  243. {
  244. apr_pool_destroy(mPool);
  245. }
  246. }
  247. void LLSocket::setOptions()
  248. {
  249. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  250. // set up the socket options
  251. ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
  252. ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
  253. ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
  254. }
  255. ///
  256. /// LLIOSocketReader
  257. ///
  258. LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
  259. mSource(socket),
  260. mInitialized(false)
  261. {
  262. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  263. }
  264. LLIOSocketReader::~LLIOSocketReader()
  265. {
  266. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  267. //lldebugs << "Destroying LLIOSocketReader" << llendl;
  268. }
  269. // virtual
  270. LLIOPipe::EStatus LLIOSocketReader::process_impl(
  271. const LLChannelDescriptors& channels,
  272. buffer_ptr_t& buffer,
  273. bool& eos,
  274. LLSD& context,
  275. LLPumpIO* pump)
  276. {
  277. PUMP_DEBUG;
  278. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  279. if(!mSource) return STATUS_PRECONDITION_NOT_MET;
  280. if(!mInitialized)
  281. {
  282. PUMP_DEBUG;
  283. // Since the read will not block, it's ok to initialize and
  284. // attempt to read off the descriptor immediately.
  285. mInitialized = true;
  286. if(pump)
  287. {
  288. PUMP_DEBUG;
  289. lldebugs << "Initializing poll descriptor for LLIOSocketReader."
  290.  << llendl;
  291. apr_pollfd_t poll_fd;
  292. poll_fd.p = NULL;
  293. poll_fd.desc_type = APR_POLL_SOCKET;
  294. poll_fd.reqevents = APR_POLLIN;
  295. poll_fd.rtnevents = 0x0;
  296. poll_fd.desc.s = mSource->getSocket();
  297. poll_fd.client_data = NULL;
  298. pump->setConditional(this, &poll_fd);
  299. }
  300. }
  301. //if(!buffer)
  302. //{
  303. // buffer = new LLBufferArray;
  304. //}
  305. PUMP_DEBUG;
  306. const apr_size_t READ_BUFFER_SIZE = 1024;
  307. char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
  308. apr_size_t len;
  309. apr_status_t status = APR_SUCCESS;
  310. do
  311. {
  312. PUMP_DEBUG;
  313. len = READ_BUFFER_SIZE;
  314. status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
  315. buffer->append(channels.out(), (U8*)read_buf, len);
  316. } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
  317. lldebugs << "socket read status: " << status << llendl;
  318. LLIOPipe::EStatus rv = STATUS_OK;
  319. PUMP_DEBUG;
  320. // *FIX: Also need to check for broken pipe
  321. if(APR_STATUS_IS_EOF(status))
  322. {
  323. // *FIX: Should we shut down the socket read?
  324. if(pump)
  325. {
  326. pump->setConditional(this, NULL);
  327. }
  328. rv = STATUS_DONE;
  329. eos = true;
  330. }
  331. else if(APR_STATUS_IS_EAGAIN(status))
  332. {
  333. /*Commented out by Aura 9-9-8 for DEV-19961.
  334. // everything is fine, but we can terminate this process pump.
  335. rv = STATUS_BREAK;
  336. */
  337. }
  338. else
  339. {
  340. if(ll_apr_warn_status(status))
  341. {
  342. rv = STATUS_ERROR;
  343. }
  344. }
  345. PUMP_DEBUG;
  346. return rv;
  347. }
  348. ///
  349. /// LLIOSocketWriter
  350. ///
  351. LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
  352. mDestination(socket),
  353. mLastWritten(NULL),
  354. mInitialized(false)
  355. {
  356. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  357. }
  358. LLIOSocketWriter::~LLIOSocketWriter()
  359. {
  360. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  361. //lldebugs << "Destroying LLIOSocketWriter" << llendl;
  362. }
  363. // virtual
  364. LLIOPipe::EStatus LLIOSocketWriter::process_impl(
  365. const LLChannelDescriptors& channels,
  366. buffer_ptr_t& buffer,
  367. bool& eos,
  368. LLSD& context,
  369. LLPumpIO* pump)
  370. {
  371. PUMP_DEBUG;
  372. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  373. if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
  374. if(!mInitialized)
  375. {
  376. PUMP_DEBUG;
  377. // Since the write will not block, it's ok to initialize and
  378. // attempt to write immediately.
  379. mInitialized = true;
  380. if(pump)
  381. {
  382. PUMP_DEBUG;
  383. lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
  384.  << llendl;
  385. apr_pollfd_t poll_fd;
  386. poll_fd.p = NULL;
  387. poll_fd.desc_type = APR_POLL_SOCKET;
  388. poll_fd.reqevents = APR_POLLOUT;
  389. poll_fd.rtnevents = 0x0;
  390. poll_fd.desc.s = mDestination->getSocket();
  391. poll_fd.client_data = NULL;
  392. pump->setConditional(this, &poll_fd);
  393. }
  394. }
  395. PUMP_DEBUG;
  396. // *FIX: Some sort of writev implementation would be much more
  397. // efficient - not only because writev() is better, but also
  398. // because we won't have to do as much work to find the start
  399. // address.
  400. LLBufferArray::segment_iterator_t it;
  401. LLBufferArray::segment_iterator_t end = buffer->endSegment();
  402. LLSegment segment;
  403. it = buffer->constructSegmentAfter(mLastWritten, segment);
  404. /*
  405. if(NULL == mLastWritten)
  406. {
  407. it = buffer->beginSegment();
  408. segment = (*it);
  409. }
  410. else
  411. {
  412. it = buffer->getSegment(mLastWritten);
  413. segment = (*it);
  414. S32 size = segment.size();
  415. U8* data = segment.data();
  416. if((data + size) == mLastWritten)
  417. {
  418. ++it;
  419. segment = (*it);
  420. }
  421. else
  422. {
  423. // *FIX: check the math on this one
  424. segment = LLSegment(
  425. (*it).getChannelMask(),
  426. mLastWritten + 1,
  427. size - (mLastWritten - data));
  428. }
  429. }
  430. */
  431. PUMP_DEBUG;
  432. apr_size_t len;
  433. bool done = false;
  434. apr_status_t status = APR_SUCCESS;
  435. while(it != end)
  436. {
  437. PUMP_DEBUG;
  438. if((*it).isOnChannel(channels.in()))
  439. {
  440. PUMP_DEBUG;
  441. len = (apr_size_t)segment.size();
  442. status = apr_socket_send(
  443. mDestination->getSocket(),
  444. (const char*)segment.data(),
  445. &len);
  446. // We sometimes get a 'non-blocking socket operation could not be 
  447. // completed immediately' error from apr_socket_send.  In this
  448. // case we break and the data will be sent the next time the chain
  449. // is pumped.
  450. if(APR_STATUS_IS_EAGAIN(status))
  451. {
  452. ll_apr_warn_status(status);
  453. break;
  454. }
  455. mLastWritten = segment.data() + len - 1;
  456. PUMP_DEBUG;
  457. if((S32)len < segment.size())
  458. {
  459. break;
  460. }
  461. }
  462. ++it;
  463. if(it != end)
  464. {
  465. segment = (*it);
  466. }
  467. else
  468. {
  469. done = true;
  470. }
  471. }
  472. PUMP_DEBUG;
  473. if(done && eos)
  474. {
  475. return STATUS_DONE;
  476. }
  477. return STATUS_OK;
  478. }
  479. ///
  480. /// LLIOServerSocket
  481. ///
  482. LLIOServerSocket::LLIOServerSocket(
  483. apr_pool_t* pool,
  484. LLIOServerSocket::socket_t listener,
  485. factory_t factory) :
  486. mPool(pool),
  487. mListenSocket(listener),
  488. mReactor(factory),
  489. mInitialized(false),
  490. mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
  491. {
  492. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  493. }
  494. LLIOServerSocket::~LLIOServerSocket()
  495. {
  496. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  497. //lldebugs << "Destroying LLIOServerSocket" << llendl;
  498. }
  499. void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
  500. {
  501. mResponseTimeout = timeout_secs;
  502. }
  503. // virtual
  504. LLIOPipe::EStatus LLIOServerSocket::process_impl(
  505. const LLChannelDescriptors& channels,
  506. buffer_ptr_t& buffer,
  507. bool& eos,
  508. LLSD& context,
  509. LLPumpIO* pump)
  510. {
  511. PUMP_DEBUG;
  512. LLMemType m1(LLMemType::MTYPE_IO_TCP);
  513. if(!pump)
  514. {
  515. llwarns << "Need a pump for server socket." << llendl;
  516. return STATUS_ERROR;
  517. }
  518. if(!mInitialized)
  519. {
  520. PUMP_DEBUG;
  521. // This segment sets up the pump so that we do not call
  522. // process again until we have an incoming read, aka connect()
  523. // from a remote host.
  524. lldebugs << "Initializing poll descriptor for LLIOServerSocket."
  525.  << llendl;
  526. apr_pollfd_t poll_fd;
  527. poll_fd.p = NULL;
  528. poll_fd.desc_type = APR_POLL_SOCKET;
  529. poll_fd.reqevents = APR_POLLIN;
  530. poll_fd.rtnevents = 0x0;
  531. poll_fd.desc.s = mListenSocket->getSocket();
  532. poll_fd.client_data = NULL;
  533. pump->setConditional(this, &poll_fd);
  534. mInitialized = true;
  535. return STATUS_OK;
  536. }
  537. // we are initialized, and told to process, so we must have a
  538. // socket waiting for a connection.
  539. lldebugs << "accepting socket" << llendl;
  540. PUMP_DEBUG;
  541. apr_pool_t* new_pool = NULL;
  542. apr_status_t status = apr_pool_create(&new_pool, mPool);
  543. apr_socket_t* socket = NULL;
  544. status = apr_socket_accept(
  545. &socket,
  546. mListenSocket->getSocket(),
  547. new_pool);
  548. LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
  549. //EStatus rv = STATUS_ERROR;
  550. if(llsocket)
  551. {
  552. PUMP_DEBUG;
  553. apr_sockaddr_t* remote_addr;
  554. apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
  555. char* remote_host_string;
  556. apr_sockaddr_ip_get(&remote_host_string, remote_addr);
  557. LLSD context;
  558. context["remote-host"] = remote_host_string;
  559. context["remote-port"] = remote_addr->port;
  560. LLPumpIO::chain_t chain;
  561. chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
  562. if(mReactor->build(chain, context))
  563. {
  564. chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
  565. pump->addChain(chain, mResponseTimeout);
  566. status = STATUS_OK;
  567. }
  568. else
  569. {
  570. llwarns << "Unable to build reactor to socket." << llendl;
  571. }
  572. }
  573. else
  574. {
  575. llwarns << "Unable to create linden socket." << llendl;
  576. }
  577. PUMP_DEBUG;
  578. // This needs to always return success, lest it get removed from
  579. // the pump.
  580. return STATUS_OK;
  581. }
  582. #if 0
  583. LLIODataSocket::LLIODataSocket(
  584. U16 suggested_port,
  585. U16 start_discovery_port,
  586. apr_pool_t* pool) : 
  587. mSocket(NULL)
  588. {
  589. if(!pool || (PORT_INVALID == suggested_port)) return;
  590. if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
  591. apr_sockaddr_t* sa = NULL;
  592. if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
  593. apr_status_t status = apr_socket_bind(mSocket, sa);
  594. if((start_discovery_port > 0) && is_addr_in_use(status))
  595. {
  596. const U16 MAX_ATTEMPT_PORTS = 50;
  597. for(U16 attempt_port = start_discovery_port;
  598. attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
  599. ++attempt_port)
  600. {
  601. sa->port = attempt_port;
  602. sa->sa.sin.sin_port = htons(attempt_port);
  603. status = apr_socket_bind(mSocket, sa);
  604. if(APR_SUCCESS == status) break;
  605. if(is_addr_in_use(status)) continue;
  606. (void)ll_apr_warn_status(status);
  607. }
  608. }
  609. if(ll_apr_warn_status(status)) return;
  610. if(sa->port)
  611. {
  612. lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
  613. mPort = sa->port;
  614. }
  615. else
  616. {
  617. mPort = LLIOSocket::PORT_EPHEMERAL;
  618. }
  619. // set up the socket options options
  620. ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
  621. ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
  622. ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
  623. }
  624. LLIODataSocket::~LLIODataSocket()
  625. {
  626. }
  627. #endif