xprt.c
上传用户:jlfgdled
上传日期:2013-04-10
资源大小:33168k
文件大小:36k
源码类别:

Linux/Unix编程

开发平台:

Unix_Linux

  1. /*
  2.  *  linux/net/sunrpc/xprt.c
  3.  *
  4.  *  This is a generic RPC call interface supporting congestion avoidance,
  5.  *  and asynchronous calls.
  6.  *
  7.  *  The interface works like this:
  8.  *
  9.  *  - When a process places a call, it allocates a request slot if
  10.  * one is available. Otherwise, it sleeps on the backlog queue
  11.  * (xprt_reserve).
  12.  *  - Next, the caller puts together the RPC message, stuffs it into
  13.  * the request struct, and calls xprt_call().
  14.  *  - xprt_call transmits the message and installs the caller on the
  15.  * socket's wait list. At the same time, it installs a timer that
  16.  * is run after the packet's timeout has expired.
  17.  *  - When a packet arrives, the data_ready handler walks the list of
  18.  * pending requests for that socket. If a matching XID is found, the
  19.  * caller is woken up, and the timer removed.
  20.  *  - When no reply arrives within the timeout interval, the timer is
  21.  * fired by the kernel and runs xprt_timer(). It either adjusts the
  22.  * timeout values (minor timeout) or wakes up the caller with a status
  23.  * of -ETIMEDOUT.
  24.  *  - When the caller receives a notification from RPC that a reply arrived,
  25.  * it should release the RPC slot, and process the reply.
  26.  * If the call timed out, it may choose to retry the operation by
  27.  * adjusting the initial timeout value, and simply calling rpc_call
  28.  * again.
  29.  *
  30.  *  Support for async RPC is done through a set of RPC-specific scheduling
  31.  *  primitives that `transparently' work for processes as well as async
  32.  *  tasks that rely on callbacks.
  33.  *
  34.  *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
  35.  *
  36.  *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
  37.  *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
  38.  *  TCP NFS related read + write fixes
  39.  *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
  40.  *
  41.  *  Rewrite of larges part of the code in order to stabilize TCP stuff.
  42.  *  Fix behaviour when socket buffer is full.
  43.  *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  44.  */
  45. #define __KERNEL_SYSCALLS__
  46. #include <linux/version.h>
  47. #include <linux/types.h>
  48. #include <linux/slab.h>
  49. #include <linux/capability.h>
  50. #include <linux/sched.h>
  51. #include <linux/errno.h>
  52. #include <linux/socket.h>
  53. #include <linux/in.h>
  54. #include <linux/net.h>
  55. #include <linux/mm.h>
  56. #include <linux/udp.h>
  57. #include <linux/unistd.h>
  58. #include <linux/sunrpc/clnt.h>
  59. #include <linux/file.h>
  60. #include <net/sock.h>
  61. #include <net/checksum.h>
  62. #include <net/udp.h>
  63. #include <net/tcp.h>
  64. #include <asm/uaccess.h>
  65. /*
  66.  * Local variables
  67.  */
  68. #ifdef RPC_DEBUG
  69. # undef  RPC_DEBUG_DATA
  70. # define RPCDBG_FACILITY RPCDBG_XPRT
  71. #endif
  72. #define XPRT_MAX_BACKOFF (8)
  73. /*
  74.  * Local functions
  75.  */
  76. static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
  77. static void do_xprt_transmit(struct rpc_task *);
  78. static void xprt_reserve_status(struct rpc_task *task);
  79. static void xprt_disconnect(struct rpc_xprt *);
  80. static void xprt_reconn_status(struct rpc_task *task);
  81. static struct socket *xprt_create_socket(int, struct rpc_timeout *);
  82. static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
  83. static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
  84. #ifdef RPC_DEBUG_DATA
  85. /*
  86.  * Print the buffer contents (first 128 bytes only--just enough for
  87.  * diropres return).
  88.  */
  89. static void
  90. xprt_pktdump(char *msg, u32 *packet, unsigned int count)
  91. {
  92. u8 *buf = (u8 *) packet;
  93. int j;
  94. dprintk("RPC:      %sn", msg);
  95. for (j = 0; j < count && j < 128; j += 4) {
  96. if (!(j & 31)) {
  97. if (j)
  98. dprintk("n");
  99. dprintk("0x%04x ", j);
  100. }
  101. dprintk("%02x%02x%02x%02x ",
  102. buf[j], buf[j+1], buf[j+2], buf[j+3]);
  103. }
  104. dprintk("n");
  105. }
  106. #else
  107. static inline void
  108. xprt_pktdump(char *msg, u32 *packet, unsigned int count)
  109. {
  110. /* NOP */
  111. }
  112. #endif
  113. /*
  114.  * Look up RPC transport given an INET socket
  115.  */
  116. static inline struct rpc_xprt *
  117. xprt_from_sock(struct sock *sk)
  118. {
  119. return (struct rpc_xprt *) sk->user_data;
  120. }
  121. /*
  122.  * Serialize write access to sockets, in order to prevent different
  123.  * requests from interfering with each other.
  124.  * Also prevents TCP socket reconnections from colliding with writes.
  125.  */
  126. static int
  127. __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
  128. {
  129. if (!xprt->snd_task) {
  130. if (xprt->nocong || __xprt_get_cong(xprt, task))
  131. xprt->snd_task = task;
  132. }
  133. if (xprt->snd_task != task) {
  134. dprintk("RPC: %4d TCP write queue fulln", task->tk_pid);
  135. task->tk_timeout = 0;
  136. task->tk_status = -EAGAIN;
  137. if (task->tk_rqstp && task->tk_rqstp->rq_nresend)
  138. rpc_sleep_on(&xprt->resend, task, NULL, NULL);
  139. else
  140. rpc_sleep_on(&xprt->sending, task, NULL, NULL);
  141. }
  142. return xprt->snd_task == task;
  143. }
  144. static inline int
  145. xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
  146. {
  147. int retval;
  148. spin_lock_bh(&xprt->sock_lock);
  149. retval = __xprt_lock_write(xprt, task);
  150. spin_unlock_bh(&xprt->sock_lock);
  151. return retval;
  152. }
  153. static void
  154. __xprt_lock_write_next(struct rpc_xprt *xprt)
  155. {
  156. struct rpc_task *task;
  157. if (xprt->snd_task)
  158. return;
  159. task = rpc_wake_up_next(&xprt->resend);
  160. if (!task) {
  161. if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
  162. return;
  163. task = rpc_wake_up_next(&xprt->sending);
  164. if (!task)
  165. return;
  166. }
  167. if (xprt->nocong || __xprt_get_cong(xprt, task))
  168. xprt->snd_task = task;
  169. }
  170. /*
  171.  * Releases the socket for use by other requests.
  172.  */
  173. static void
  174. __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
  175. {
  176. if (xprt->snd_task == task)
  177. xprt->snd_task = NULL;
  178. __xprt_lock_write_next(xprt);
  179. }
  180. static inline void
  181. xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
  182. {
  183. spin_lock_bh(&xprt->sock_lock);
  184. __xprt_release_write(xprt, task);
  185. spin_unlock_bh(&xprt->sock_lock);
  186. }
  187. /*
  188.  * Write data to socket.
  189.  */
  190. static inline int
  191. xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
  192. {
  193. struct socket *sock = xprt->sock;
  194. struct msghdr msg;
  195. struct xdr_buf *xdr = &req->rq_snd_buf;
  196. struct iovec niv[MAX_IOVEC];
  197. unsigned int niov, slen, skip;
  198. mm_segment_t oldfs;
  199. int result;
  200. if (!sock)
  201. return -ENOTCONN;
  202. xprt_pktdump("packet data:",
  203. req->rq_svec->iov_base,
  204. req->rq_svec->iov_len);
  205. /* Dont repeat bytes */
  206. skip = req->rq_bytes_sent;
  207. slen = xdr->len - skip;
  208. niov = xdr_kmap(niv, xdr, skip);
  209. msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
  210. msg.msg_iov = niv;
  211. msg.msg_iovlen = niov;
  212. msg.msg_name = (struct sockaddr *) &xprt->addr;
  213. msg.msg_namelen = sizeof(xprt->addr);
  214. msg.msg_control = NULL;
  215. msg.msg_controllen = 0;
  216. oldfs = get_fs(); set_fs(get_ds());
  217. clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
  218. result = sock_sendmsg(sock, &msg, slen);
  219. set_fs(oldfs);
  220. xdr_kunmap(xdr, skip);
  221. dprintk("RPC:      xprt_sendmsg(%d) = %dn", slen, result);
  222. if (result >= 0)
  223. return result;
  224. switch (result) {
  225. case -ECONNREFUSED:
  226. /* When the server has died, an ICMP port unreachable message
  227.  * prompts ECONNREFUSED.
  228.  */
  229. case -EAGAIN:
  230. break;
  231. case -ENOTCONN:
  232. case -EPIPE:
  233. /* connection broken */
  234. if (xprt->stream)
  235. result = -ENOTCONN;
  236. break;
  237. default:
  238. printk(KERN_NOTICE "RPC: sendmsg returned error %dn", -result);
  239. }
  240. return result;
  241. }
  242. /*
  243.  * Van Jacobson congestion avoidance. Check if the congestion window
  244.  * overflowed. Put the task to sleep if this is the case.
  245.  */
  246. static int
  247. __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
  248. {
  249. struct rpc_rqst *req = task->tk_rqstp;
  250. if (req->rq_cong)
  251. return 1;
  252. dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ldn",
  253. task->tk_pid, xprt->cong, xprt->cwnd);
  254. if (RPCXPRT_CONGESTED(xprt))
  255. return 0;
  256. req->rq_cong = 1;
  257. xprt->cong += RPC_CWNDSCALE;
  258. return 1;
  259. }
  260. /*
  261.  * Adjust the congestion window, and wake up the next task
  262.  * that has been sleeping due to congestion
  263.  */
  264. static void
  265. __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
  266. {
  267. if (!req->rq_cong)
  268. return;
  269. req->rq_cong = 0;
  270. xprt->cong -= RPC_CWNDSCALE;
  271. __xprt_lock_write_next(xprt);
  272. }
  273. /*
  274.  * Adjust RPC congestion window
  275.  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
  276.  */
  277. static void
  278. xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
  279. {
  280. unsigned long cwnd;
  281. cwnd = xprt->cwnd;
  282. if (result >= 0 && cwnd <= xprt->cong) {
  283. /* The (cwnd >> 1) term makes sure
  284.  * the result gets rounded properly. */
  285. cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
  286. if (cwnd > RPC_MAXCWND)
  287. cwnd = RPC_MAXCWND;
  288. __xprt_lock_write_next(xprt);
  289. } else if (result == -ETIMEDOUT) {
  290. cwnd >>= 1;
  291. if (cwnd < RPC_CWNDSCALE)
  292. cwnd = RPC_CWNDSCALE;
  293. }
  294. dprintk("RPC:      cong %ld, cwnd was %ld, now %ldn",
  295. xprt->cong, xprt->cwnd, cwnd);
  296. xprt->cwnd = cwnd;
  297. }
  298. /*
  299.  * Adjust timeout values etc for next retransmit
  300.  */
  301. int
  302. xprt_adjust_timeout(struct rpc_timeout *to)
  303. {
  304. if (to->to_retries > 0) {
  305. if (to->to_exponential)
  306. to->to_current <<= 1;
  307. else
  308. to->to_current += to->to_increment;
  309. if (to->to_maxval && to->to_current >= to->to_maxval)
  310. to->to_current = to->to_maxval;
  311. } else {
  312. if (to->to_exponential)
  313. to->to_initval <<= 1;
  314. else
  315. to->to_initval += to->to_increment;
  316. if (to->to_maxval && to->to_initval >= to->to_maxval)
  317. to->to_initval = to->to_maxval;
  318. to->to_current = to->to_initval;
  319. }
  320. if (!to->to_current) {
  321. printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!n");
  322. to->to_current = 5 * HZ;
  323. }
  324. pprintk("RPC: %lu %sn", jiffies,
  325. to->to_retries? "retrans" : "timeout");
  326. return to->to_retries-- > 0;
  327. }
  328. /*
  329.  * Close down a transport socket
  330.  */
  331. static void
  332. xprt_close(struct rpc_xprt *xprt)
  333. {
  334. struct socket *sock = xprt->sock;
  335. struct sock *sk = xprt->inet;
  336. if (!sk)
  337. return;
  338. xprt->inet = NULL;
  339. xprt->sock = NULL;
  340. sk->user_data    = NULL;
  341. sk->data_ready   = xprt->old_data_ready;
  342. sk->state_change = xprt->old_state_change;
  343. sk->write_space  = xprt->old_write_space;
  344. xprt_disconnect(xprt);
  345. sk->no_check  = 0;
  346. sock_release(sock);
  347. /*
  348.  * TCP doesn't require the rpciod now - other things may
  349.  * but rpciod handles that not us.
  350.  */
  351. if(xprt->stream)
  352. rpciod_down();
  353. }
  354. /*
  355.  * Mark a transport as disconnected
  356.  */
  357. static void
  358. xprt_disconnect(struct rpc_xprt *xprt)
  359. {
  360. dprintk("RPC:      disconnected transport %pn", xprt);
  361. xprt_clear_connected(xprt);
  362. rpc_wake_up_status(&xprt->pending, -ENOTCONN);
  363. }
  364. /*
  365.  * Reconnect a broken TCP connection.
  366.  *
  367.  * Note: This cannot collide with the TCP reads, as both run from rpciod
  368.  */
  369. void
  370. xprt_reconnect(struct rpc_task *task)
  371. {
  372. struct rpc_xprt *xprt = task->tk_xprt;
  373. struct socket *sock = xprt->sock;
  374. struct sock *inet;
  375. int status;
  376. dprintk("RPC: %4d xprt_reconnect %p connected %dn",
  377. task->tk_pid, xprt, xprt_connected(xprt));
  378. if (xprt->shutdown)
  379. return;
  380. if (!xprt->stream)
  381. return;
  382. if (!xprt->addr.sin_port) {
  383. task->tk_status = -EIO;
  384. return;
  385. }
  386. if (!xprt_lock_write(xprt, task))
  387. return;
  388. if (xprt_connected(xprt))
  389. goto out_write;
  390. if (sock && sock->state != SS_UNCONNECTED)
  391. xprt_close(xprt);
  392. status = -ENOTCONN;
  393. if (!(inet = xprt->inet)) {
  394. /* Create an unconnected socket */
  395. if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
  396. goto defer;
  397. xprt_bind_socket(xprt, sock);
  398. inet = sock->sk;
  399. }
  400. /* Now connect it asynchronously. */
  401. dprintk("RPC: %4d connecting new socketn", task->tk_pid);
  402. status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
  403. sizeof(xprt->addr), O_NONBLOCK);
  404. if (status < 0) {
  405. switch (status) {
  406. case -EALREADY:
  407. case -EINPROGRESS:
  408. status = 0;
  409. break;
  410. case -EISCONN:
  411. case -EPIPE:
  412. status = 0;
  413. xprt_close(xprt);
  414. goto defer;
  415. default:
  416. printk("RPC: TCP connect error %d!n", -status);
  417. xprt_close(xprt);
  418. goto defer;
  419. }
  420. /* Protect against TCP socket state changes */
  421. lock_sock(inet);
  422. dprintk("RPC: %4d connect status %d connected %dn",
  423. task->tk_pid, status, xprt_connected(xprt));
  424. if (inet->state != TCP_ESTABLISHED) {
  425. task->tk_timeout = xprt->timeout.to_maxval;
  426. /* if the socket is already closing, delay 5 secs */
  427. if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
  428. task->tk_timeout = 5*HZ;
  429. rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
  430. release_sock(inet);
  431. return;
  432. }
  433. release_sock(inet);
  434. }
  435. defer:
  436. if (status < 0) {
  437. rpc_delay(task, 5*HZ);
  438. task->tk_status = -ENOTCONN;
  439. }
  440.  out_write:
  441. xprt_release_write(xprt, task);
  442. }
  443. /*
  444.  * Reconnect timeout. We just mark the transport as not being in the
  445.  * process of reconnecting, and leave the rest to the upper layers.
  446.  */
  447. static void
  448. xprt_reconn_status(struct rpc_task *task)
  449. {
  450. struct rpc_xprt *xprt = task->tk_xprt;
  451. dprintk("RPC: %4d xprt_reconn_timeout %dn",
  452. task->tk_pid, task->tk_status);
  453. xprt_release_write(xprt, task);
  454. }
  455. /*
  456.  * Look up the RPC request corresponding to a reply, and then lock it.
  457.  */
  458. static inline struct rpc_rqst *
  459. xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
  460. {
  461. struct list_head *pos;
  462. struct rpc_rqst *req = NULL;
  463. list_for_each(pos, &xprt->recv) {
  464. struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
  465. if (entry->rq_xid == xid) {
  466. req = entry;
  467. break;
  468. }
  469. }
  470. return req;
  471. }
  472. /*
  473.  * Complete reply received.
  474.  * The TCP code relies on us to remove the request from xprt->pending.
  475.  */
  476. static void
  477. xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
  478. {
  479. struct rpc_task *task = req->rq_task;
  480. struct rpc_clnt *clnt = task->tk_client;
  481. /* Adjust congestion window */
  482. if (!xprt->nocong) {
  483. xprt_adjust_cwnd(xprt, copied);
  484. __xprt_put_cong(xprt, req);
  485.         if (!req->rq_nresend) {
  486. int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
  487. if (timer)
  488. rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
  489. }
  490. rpc_clear_timeo(&clnt->cl_rtt);
  491. }
  492. #ifdef RPC_PROFILE
  493. /* Profile only reads for now */
  494. if (copied > 1024) {
  495. static unsigned long nextstat = 0;
  496. static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
  497. pkt_cnt++;
  498. pkt_len += req->rq_slen + copied;
  499. pkt_rtt += jiffies - req->rq_xtime;
  500. if (time_before(nextstat, jiffies)) {
  501. printk("RPC: %lu %ld cwndn", jiffies, xprt->cwnd);
  502. printk("RPC: %ld %ld %ld %ld statn",
  503. jiffies, pkt_cnt, pkt_len, pkt_rtt);
  504. pkt_rtt = pkt_len = pkt_cnt = 0;
  505. nextstat = jiffies + 5 * HZ;
  506. }
  507. }
  508. #endif
  509. dprintk("RPC: %4d has input (%d bytes)n", task->tk_pid, copied);
  510. req->rq_received = copied;
  511. list_del_init(&req->rq_list);
  512. /* ... and wake up the process. */
  513. rpc_wake_up_task(task);
  514. return;
  515. }
  516. static size_t
  517. skb_read_bits(skb_reader_t *desc, void *to, size_t len)
  518. {
  519. if (len > desc->count)
  520. len = desc->count;
  521. skb_copy_bits(desc->skb, desc->offset, to, len);
  522. desc->count -= len;
  523. desc->offset += len;
  524. return len;
  525. }
  526. static size_t
  527. skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
  528. {
  529. unsigned int csum2, pos;
  530. if (len > desc->count)
  531. len = desc->count;
  532. pos = desc->offset;
  533. csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
  534. desc->csum = csum_block_add(desc->csum, csum2, pos);
  535. desc->count -= len;
  536. desc->offset += len;
  537. return len;
  538. }
  539. /*
  540.  * We have set things up such that we perform the checksum of the UDP
  541.  * packet in parallel with the copies into the RPC client iovec.  -DaveM
  542.  */
  543. static int
  544. csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
  545. {
  546. skb_reader_t desc;
  547. desc.skb = skb;
  548. desc.offset = sizeof(struct udphdr);
  549. desc.count = skb->len - desc.offset;
  550. if (skb->ip_summed == CHECKSUM_UNNECESSARY)
  551. goto no_checksum;
  552. desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
  553. xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
  554. if (desc.offset != skb->len) {
  555. unsigned int csum2;
  556. csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
  557. desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
  558. }
  559. if ((unsigned short)csum_fold(desc.csum))
  560. return -1;
  561. return 0;
  562. no_checksum:
  563. xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
  564. return 0;
  565. }
  566. /*
  567.  * Input handler for RPC replies. Called from a bottom half and hence
  568.  * atomic.
  569.  */
  570. static void
  571. udp_data_ready(struct sock *sk, int len)
  572. {
  573. struct rpc_task *task;
  574. struct rpc_xprt *xprt;
  575. struct rpc_rqst *rovr;
  576. struct sk_buff *skb;
  577. int err, repsize, copied;
  578. dprintk("RPC:      udp_data_ready...n");
  579. if (!(xprt = xprt_from_sock(sk))) {
  580. printk("RPC:      udp_data_ready request not found!n");
  581. goto out;
  582. }
  583. dprintk("RPC:      udp_data_ready client %pn", xprt);
  584. if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
  585. goto out;
  586. if (xprt->shutdown)
  587. goto dropit;
  588. repsize = skb->len - sizeof(struct udphdr);
  589. if (repsize < 4) {
  590. printk("RPC: impossible RPC reply size %d!n", repsize);
  591. goto dropit;
  592. }
  593. /* Look up and lock the request corresponding to the given XID */
  594. spin_lock(&xprt->sock_lock);
  595. rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
  596. if (!rovr)
  597. goto out_unlock;
  598. task = rovr->rq_task;
  599. dprintk("RPC: %4d received replyn", task->tk_pid);
  600. xprt_pktdump("packet data:",
  601.      (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);
  602. if ((copied = rovr->rq_rlen) > repsize)
  603. copied = repsize;
  604. /* Suck it into the iovec, verify checksum if not done by hw. */
  605. if (csum_partial_copy_to_xdr(&rovr->rq_rcv_buf, skb))
  606. goto out_unlock;
  607. /* Something worked... */
  608. dst_confirm(skb->dst);
  609. xprt_complete_rqst(xprt, rovr, copied);
  610.  out_unlock:
  611. spin_unlock(&xprt->sock_lock);
  612.  dropit:
  613. skb_free_datagram(sk, skb);
  614.  out:
  615. if (sk->sleep && waitqueue_active(sk->sleep))
  616. wake_up_interruptible(sk->sleep);
  617. }
  618. /*
  619.  * Copy from an skb into memory and shrink the skb.
  620.  */
  621. static inline size_t
  622. tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
  623. {
  624. if (len > desc->count)
  625. len = desc->count;
  626. skb_copy_bits(desc->skb, desc->offset, p, len);
  627. desc->offset += len;
  628. desc->count -= len;
  629. return len;
  630. }
  631. /*
  632.  * TCP read fragment marker
  633.  */
  634. static inline void
  635. tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
  636. {
  637. size_t len, used;
  638. char *p;
  639. p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
  640. len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
  641. used = tcp_copy_data(desc, p, len);
  642. xprt->tcp_offset += used;
  643. if (used != len)
  644. return;
  645. xprt->tcp_reclen = ntohl(xprt->tcp_recm);
  646. if (xprt->tcp_reclen & 0x80000000)
  647. xprt->tcp_flags |= XPRT_LAST_FRAG;
  648. else
  649. xprt->tcp_flags &= ~XPRT_LAST_FRAG;
  650. xprt->tcp_reclen &= 0x7fffffff;
  651. xprt->tcp_flags &= ~XPRT_COPY_RECM;
  652. xprt->tcp_offset = 0;
  653. /* Sanity check of the record length */
  654. if (xprt->tcp_reclen < 4) {
  655. printk(KERN_ERR "RPC: Invalid TCP record fragment lengthn");
  656. xprt_disconnect(xprt);
  657. }
  658. dprintk("RPC:      reading TCP record fragment of length %dn",
  659. xprt->tcp_reclen);
  660. }
  661. static void
  662. tcp_check_recm(struct rpc_xprt *xprt)
  663. {
  664. if (xprt->tcp_offset == xprt->tcp_reclen) {
  665. xprt->tcp_flags |= XPRT_COPY_RECM;
  666. xprt->tcp_offset = 0;
  667. if (xprt->tcp_flags & XPRT_LAST_FRAG) {
  668. xprt->tcp_flags &= ~XPRT_COPY_DATA;
  669. xprt->tcp_flags |= XPRT_COPY_XID;
  670. xprt->tcp_copied = 0;
  671. }
  672. }
  673. }
  674. /*
  675.  * TCP read xid
  676.  */
  677. static inline void
  678. tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
  679. {
  680. size_t len, used;
  681. char *p;
  682. len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
  683. dprintk("RPC:      reading XID (%Zu bytes)n", len);
  684. p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
  685. used = tcp_copy_data(desc, p, len);
  686. xprt->tcp_offset += used;
  687. if (used != len)
  688. return;
  689. xprt->tcp_flags &= ~XPRT_COPY_XID;
  690. xprt->tcp_flags |= XPRT_COPY_DATA;
  691. xprt->tcp_copied = 4;
  692. dprintk("RPC:      reading reply for XID %08xn", xprt->tcp_xid);
  693. tcp_check_recm(xprt);
  694. }
  695. /*
  696.  * TCP read and complete request
  697.  */
  698. static inline void
  699. tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
  700. {
  701. struct rpc_rqst *req;
  702. struct xdr_buf *rcvbuf;
  703. size_t len;
  704. /* Find and lock the request corresponding to this xid */
  705. spin_lock(&xprt->sock_lock);
  706. req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
  707. if (!req) {
  708. xprt->tcp_flags &= ~XPRT_COPY_DATA;
  709. dprintk("RPC:      XID %08x request not found!n",
  710. xprt->tcp_xid);
  711. spin_unlock(&xprt->sock_lock);
  712. return;
  713. }
  714. rcvbuf = &req->rq_rcv_buf;
  715. len = desc->count;
  716. if (len > xprt->tcp_reclen - xprt->tcp_offset) {
  717. skb_reader_t my_desc;
  718. len = xprt->tcp_reclen - xprt->tcp_offset;
  719. memcpy(&my_desc, desc, sizeof(my_desc));
  720. my_desc.count = len;
  721. xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
  722.   &my_desc, tcp_copy_data);
  723. desc->count -= len;
  724. desc->offset += len;
  725. } else
  726. xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
  727.   desc, tcp_copy_data);
  728. xprt->tcp_copied += len;
  729. xprt->tcp_offset += len;
  730. if (xprt->tcp_copied == req->rq_rlen)
  731. xprt->tcp_flags &= ~XPRT_COPY_DATA;
  732. else if (xprt->tcp_offset == xprt->tcp_reclen) {
  733. if (xprt->tcp_flags & XPRT_LAST_FRAG)
  734. xprt->tcp_flags &= ~XPRT_COPY_DATA;
  735. }
  736. if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
  737. dprintk("RPC: %4d received reply completen",
  738. req->rq_task->tk_pid);
  739. xprt_complete_rqst(xprt, req, xprt->tcp_copied);
  740. }
  741. spin_unlock(&xprt->sock_lock);
  742. tcp_check_recm(xprt);
  743. }
  744. /*
  745.  * TCP discard extra bytes from a short read
  746.  */
  747. static inline void
  748. tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
  749. {
  750. size_t len;
  751. len = xprt->tcp_reclen - xprt->tcp_offset;
  752. if (len > desc->count)
  753. len = desc->count;
  754. desc->count -= len;
  755. desc->offset += len;
  756. xprt->tcp_offset += len;
  757. tcp_check_recm(xprt);
  758. }
  759. /*
  760.  * TCP record receive routine
  761.  * We first have to grab the record marker, then the XID, then the data.
  762.  */
  763. static int
  764. tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
  765. unsigned int offset, size_t len)
  766. {
  767. struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
  768. skb_reader_t desc = { skb, offset, len };
  769. dprintk("RPC:      tcp_data_recvn");
  770. do {
  771. /* Read in a new fragment marker if necessary */
  772. /* Can we ever really expect to get completely empty fragments? */
  773. if (xprt->tcp_flags & XPRT_COPY_RECM) {
  774. tcp_read_fraghdr(xprt, &desc);
  775. continue;
  776. }
  777. /* Read in the xid if necessary */
  778. if (xprt->tcp_flags & XPRT_COPY_XID) {
  779. tcp_read_xid(xprt, &desc);
  780. continue;
  781. }
  782. /* Read in the request data */
  783. if (xprt->tcp_flags & XPRT_COPY_DATA) {
  784. tcp_read_request(xprt, &desc);
  785. continue;
  786. }
  787. /* Skip over any trailing bytes on short reads */
  788. tcp_read_discard(xprt, &desc);
  789. } while (desc.count && xprt_connected(xprt));
  790. dprintk("RPC:      tcp_data_recv donen");
  791. return len - desc.count;
  792. }
  793. static void tcp_data_ready(struct sock *sk, int bytes)
  794. {
  795. struct rpc_xprt *xprt;
  796. read_descriptor_t rd_desc;
  797. dprintk("RPC:      tcp_data_ready...n");
  798. if (!(xprt = xprt_from_sock(sk))) {
  799. printk("RPC:      tcp_data_ready socket info not found!n");
  800. return;
  801. }
  802. if (xprt->shutdown)
  803. return;
  804. /* We use rd_desc to pass struct xprt to tcp_data_recv */
  805. rd_desc.buf = (char *)xprt;
  806. rd_desc.count = 65536;
  807. tcp_read_sock(sk, &rd_desc, tcp_data_recv);
  808. }
  809. static void
  810. tcp_state_change(struct sock *sk)
  811. {
  812. struct rpc_xprt *xprt;
  813. if (!(xprt = xprt_from_sock(sk)))
  814. goto out;
  815. dprintk("RPC:      tcp_state_change client %p...n", xprt);
  816. dprintk("RPC:      state %x conn %d dead %d zapped %dn",
  817. sk->state, xprt_connected(xprt),
  818. sk->dead, sk->zapped);
  819. switch (sk->state) {
  820. case TCP_ESTABLISHED:
  821. if (xprt_test_and_set_connected(xprt))
  822. break;
  823. /* Reset TCP record info */
  824. xprt->tcp_offset = 0;
  825. xprt->tcp_reclen = 0;
  826. xprt->tcp_copied = 0;
  827. xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
  828. spin_lock(&xprt->sock_lock);
  829. if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
  830. rpc_wake_up_task(xprt->snd_task);
  831. spin_unlock(&xprt->sock_lock);
  832. break;
  833. case TCP_SYN_SENT:
  834. case TCP_SYN_RECV:
  835. break;
  836. default:
  837. xprt_disconnect(xprt);
  838. break;
  839. }
  840.  out:
  841. if (sk->sleep && waitqueue_active(sk->sleep))
  842. wake_up_interruptible_all(sk->sleep);
  843. }
  844. /*
  845.  * Called when more output buffer space is available for this socket.
  846.  * We try not to wake our writers until they can make "significant"
  847.  * progress, otherwise we'll waste resources thrashing sock_sendmsg
  848.  * with a bunch of small requests.
  849.  */
  850. static void
  851. xprt_write_space(struct sock *sk)
  852. {
  853. struct rpc_xprt *xprt;
  854. struct socket *sock;
  855. if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
  856. return;
  857. if (xprt->shutdown)
  858. return;
  859. /* Wait until we have enough socket memory */
  860. if (xprt->stream) {
  861. /* from net/ipv4/tcp.c:tcp_write_space */
  862. if (tcp_wspace(sk) < tcp_min_write_space(sk))
  863. return;
  864. } else {
  865. /* from net/core/sock.c:sock_def_write_space */
  866. if (!sock_writeable(sk))
  867. return;
  868. }
  869. if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
  870. return;
  871. spin_lock_bh(&xprt->sock_lock);
  872. if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
  873. rpc_wake_up_task(xprt->snd_task);
  874. spin_unlock_bh(&xprt->sock_lock);
  875. if (sk->sleep && waitqueue_active(sk->sleep))
  876. wake_up_interruptible(sk->sleep);
  877. }
  878. /*
  879.  * Exponential backoff for UDP retries
  880.  */
  881. static inline int
  882. xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
  883. {
  884. int backoff;
  885. req->rq_ntimeo++;
  886. backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
  887. if (req->rq_ntimeo < (1 << backoff))
  888. return 1;
  889. return 0;
  890. }
  891. /*
  892.  * RPC receive timeout handler.
  893.  */
  894. static void
  895. xprt_timer(struct rpc_task *task)
  896. {
  897. struct rpc_rqst *req = task->tk_rqstp;
  898. struct rpc_xprt *xprt = req->rq_xprt;
  899. spin_lock(&xprt->sock_lock);
  900. if (req->rq_received)
  901. goto out;
  902. if (!xprt->nocong) {
  903. if (xprt_expbackoff(task, req)) {
  904. rpc_add_timer(task, xprt_timer);
  905. goto out_unlock;
  906. }
  907. rpc_inc_timeo(&task->tk_client->cl_rtt);
  908. xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
  909. }
  910. req->rq_nresend++;
  911. dprintk("RPC: %4d xprt_timer (%s request)n",
  912. task->tk_pid, req ? "pending" : "backlogged");
  913. task->tk_status  = -ETIMEDOUT;
  914. out:
  915. task->tk_timeout = 0;
  916. rpc_wake_up_task(task);
  917. out_unlock:
  918. spin_unlock(&xprt->sock_lock);
  919. }
  920. /*
  921.  * Place the actual RPC call.
  922.  * We have to copy the iovec because sendmsg fiddles with its contents.
  923.  */
  924. void
  925. xprt_transmit(struct rpc_task *task)
  926. {
  927. struct rpc_rqst *req = task->tk_rqstp;
  928. struct rpc_xprt *xprt = req->rq_xprt;
  929. dprintk("RPC: %4d xprt_transmit(%x)n", task->tk_pid, 
  930. *(u32 *)(req->rq_svec[0].iov_base));
  931. if (xprt->shutdown)
  932. task->tk_status = -EIO;
  933. if (!xprt_connected(xprt))
  934. task->tk_status = -ENOTCONN;
  935. if (task->tk_status < 0)
  936. return;
  937. if (task->tk_rpcwait)
  938. rpc_remove_wait_queue(task);
  939. /* set up everything as needed. */
  940. /* Write the record marker */
  941. if (xprt->stream) {
  942. u32 *marker = req->rq_svec[0].iov_base;
  943. *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
  944. }
  945. spin_lock_bh(&xprt->sock_lock);
  946. if (!__xprt_lock_write(xprt, task)) {
  947. spin_unlock_bh(&xprt->sock_lock);
  948. return;
  949. }
  950. if (list_empty(&req->rq_list)) {
  951. list_add_tail(&req->rq_list, &xprt->recv);
  952. req->rq_received = 0;
  953. }
  954. spin_unlock_bh(&xprt->sock_lock);
  955. do_xprt_transmit(task);
  956. }
  957. static void
  958. do_xprt_transmit(struct rpc_task *task)
  959. {
  960. struct rpc_clnt *clnt = task->tk_client;
  961. struct rpc_rqst *req = task->tk_rqstp;
  962. struct rpc_xprt *xprt = req->rq_xprt;
  963. int status, retry = 0;
  964. /* Continue transmitting the packet/record. We must be careful
  965.  * to cope with writespace callbacks arriving _after_ we have
  966.  * called xprt_sendmsg().
  967.  */
  968. while (1) {
  969. req->rq_xtime = jiffies;
  970. status = xprt_sendmsg(xprt, req);
  971. if (status < 0)
  972. break;
  973. if (xprt->stream) {
  974. req->rq_bytes_sent += status;
  975. if (req->rq_bytes_sent >= req->rq_slen)
  976. goto out_receive;
  977. } else {
  978. if (status >= req->rq_slen)
  979. goto out_receive;
  980. status = -EAGAIN;
  981. break;
  982. }
  983. dprintk("RPC: %4d xmit incomplete (%d left of %d)n",
  984. task->tk_pid, req->rq_slen - req->rq_bytes_sent,
  985. req->rq_slen);
  986. status = -EAGAIN;
  987. if (retry++ > 50)
  988. break;
  989. }
  990. /* Note: at this point, task->tk_sleeping has not yet been set,
  991.  *  hence there is no danger of the waking up task being put on
  992.  *  schedq, and being picked up by a parallel run of rpciod().
  993.  */
  994. if (req->rq_received)
  995. goto out_release;
  996. task->tk_status = status;
  997. switch (status) {
  998. case -EAGAIN:
  999. if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
  1000. /* Protect against races with xprt_write_space */
  1001. spin_lock_bh(&xprt->sock_lock);
  1002. if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
  1003. task->tk_timeout = req->rq_timeout.to_current;
  1004. rpc_sleep_on(&xprt->pending, task, NULL, NULL);
  1005. }
  1006. spin_unlock_bh(&xprt->sock_lock);
  1007. return;
  1008. }
  1009. /* Keep holding the socket if it is blocked */
  1010. rpc_delay(task, HZ>>4);
  1011. return;
  1012. case -ECONNREFUSED:
  1013. case -ENOTCONN:
  1014. if (!xprt->stream)
  1015. return;
  1016. default:
  1017. if (xprt->stream)
  1018. xprt_disconnect(xprt);
  1019. req->rq_bytes_sent = 0;
  1020. }
  1021.  out_release:
  1022. xprt_release_write(xprt, task);
  1023. return;
  1024.  out_receive:
  1025. dprintk("RPC: %4d xmit completen", task->tk_pid);
  1026. /* Set the task's receive timeout value */
  1027. if (!xprt->nocong) {
  1028. task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
  1029. rpcproc_timer(clnt, task->tk_msg.rpc_proc));
  1030. req->rq_ntimeo = 0;
  1031. if (task->tk_timeout > req->rq_timeout.to_maxval)
  1032. task->tk_timeout = req->rq_timeout.to_maxval;
  1033. } else
  1034. task->tk_timeout = req->rq_timeout.to_current;
  1035. spin_lock_bh(&xprt->sock_lock);
  1036. if (!req->rq_received)
  1037. rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
  1038. __xprt_release_write(xprt, task);
  1039. spin_unlock_bh(&xprt->sock_lock);
  1040. }
  1041. /*
  1042.  * Reserve an RPC call slot.
  1043.  */
  1044. int
  1045. xprt_reserve(struct rpc_task *task)
  1046. {
  1047. struct rpc_xprt *xprt = task->tk_xprt;
  1048. /* We already have an initialized request. */
  1049. if (task->tk_rqstp)
  1050. return 0;
  1051. spin_lock(&xprt->xprt_lock);
  1052. xprt_reserve_status(task);
  1053. if (task->tk_rqstp) {
  1054. task->tk_timeout = 0;
  1055. } else if (!task->tk_timeout) {
  1056. task->tk_status = -ENOBUFS;
  1057. } else {
  1058. dprintk("RPC:      xprt_reserve waiting on backlogn");
  1059. task->tk_status = -EAGAIN;
  1060. rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
  1061. }
  1062. spin_unlock(&xprt->xprt_lock);
  1063. dprintk("RPC: %4d xprt_reserve returns %dn",
  1064. task->tk_pid, task->tk_status);
  1065. return task->tk_status;
  1066. }
  1067. /*
  1068.  * Reservation callback
  1069.  */
  1070. static void
  1071. xprt_reserve_status(struct rpc_task *task)
  1072. {
  1073. struct rpc_xprt *xprt = task->tk_xprt;
  1074. struct rpc_rqst *req;
  1075. if (xprt->shutdown) {
  1076. task->tk_status = -EIO;
  1077. } else if (task->tk_status < 0) {
  1078. /* NOP */
  1079. } else if (task->tk_rqstp) {
  1080. /* We've already been given a request slot: NOP */
  1081. } else {
  1082. if (!(req = xprt->free))
  1083. goto out_nofree;
  1084. /* OK: There's room for us. Grab a free slot */
  1085. xprt->free     = req->rq_next;
  1086. req->rq_next   = NULL;
  1087. task->tk_rqstp = req;
  1088. xprt_request_init(task, xprt);
  1089. }
  1090. return;
  1091. out_nofree:
  1092. task->tk_status = -EAGAIN;
  1093. }
  1094. /*
  1095.  * Initialize RPC request
  1096.  */
  1097. static void
  1098. xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
  1099. {
  1100. struct rpc_rqst *req = task->tk_rqstp;
  1101. static u32 xid = 0;
  1102. if (!xid)
  1103. xid = CURRENT_TIME << 12;
  1104. dprintk("RPC: %4d reserved req %p xid %08xn", task->tk_pid, req, xid);
  1105. task->tk_status = 0;
  1106. req->rq_timeout = xprt->timeout;
  1107. req->rq_task = task;
  1108. req->rq_xprt    = xprt;
  1109. req->rq_xid     = xid++;
  1110. if (!xid)
  1111. xid++;
  1112. INIT_LIST_HEAD(&req->rq_list);
  1113. }
  1114. /*
  1115.  * Release an RPC call slot
  1116.  */
  1117. void
  1118. xprt_release(struct rpc_task *task)
  1119. {
  1120. struct rpc_xprt *xprt = task->tk_xprt;
  1121. struct rpc_rqst *req;
  1122. if (!(req = task->tk_rqstp))
  1123. return;
  1124. spin_lock_bh(&xprt->sock_lock);
  1125. __xprt_release_write(xprt, task);
  1126. __xprt_put_cong(xprt, req);
  1127. if (!list_empty(&req->rq_list))
  1128. list_del(&req->rq_list);
  1129. spin_unlock_bh(&xprt->sock_lock);
  1130. task->tk_rqstp = NULL;
  1131. memset(req, 0, sizeof(*req)); /* mark unused */
  1132. dprintk("RPC: %4d release request %pn", task->tk_pid, req);
  1133. spin_lock(&xprt->xprt_lock);
  1134. req->rq_next = xprt->free;
  1135. xprt->free   = req;
  1136. xprt_clear_backlog(xprt);
  1137. spin_unlock(&xprt->xprt_lock);
  1138. }
  1139. /*
  1140.  * Set default timeout parameters
  1141.  */
  1142. void
  1143. xprt_default_timeout(struct rpc_timeout *to, int proto)
  1144. {
  1145. if (proto == IPPROTO_UDP)
  1146. xprt_set_timeout(to, 5,  5 * HZ);
  1147. else
  1148. xprt_set_timeout(to, 5, 60 * HZ);
  1149. }
  1150. /*
  1151.  * Set constant timeout
  1152.  */
  1153. void
  1154. xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
  1155. {
  1156. to->to_current   = 
  1157. to->to_initval   = 
  1158. to->to_increment = incr;
  1159. to->to_maxval    = incr * retr;
  1160. to->to_resrvval  = incr * retr;
  1161. to->to_retries   = retr;
  1162. to->to_exponential = 0;
  1163. }
  1164. /*
  1165.  * Initialize an RPC client
  1166.  */
  1167. static struct rpc_xprt *
  1168. xprt_setup(struct socket *sock, int proto,
  1169. struct sockaddr_in *ap, struct rpc_timeout *to)
  1170. {
  1171. struct rpc_xprt *xprt;
  1172. struct rpc_rqst *req;
  1173. int i;
  1174. dprintk("RPC:      setting up %s transport...n",
  1175. proto == IPPROTO_UDP? "UDP" : "TCP");
  1176. if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
  1177. return NULL;
  1178. memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
  1179. xprt->addr = *ap;
  1180. xprt->prot = proto;
  1181. xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
  1182. if (xprt->stream) {
  1183. xprt->cwnd = RPC_MAXCWND;
  1184. xprt->nocong = 1;
  1185. } else
  1186. xprt->cwnd = RPC_INITCWND;
  1187. spin_lock_init(&xprt->sock_lock);
  1188. spin_lock_init(&xprt->xprt_lock);
  1189. init_waitqueue_head(&xprt->cong_wait);
  1190. INIT_LIST_HEAD(&xprt->recv);
  1191. /* Set timeout parameters */
  1192. if (to) {
  1193. xprt->timeout = *to;
  1194. xprt->timeout.to_current = to->to_initval;
  1195. xprt->timeout.to_resrvval = to->to_maxval << 1;
  1196. } else
  1197. xprt_default_timeout(&xprt->timeout, xprt->prot);
  1198. INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
  1199. INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
  1200. INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
  1201. INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
  1202. /* initialize free list */
  1203. for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
  1204. req->rq_next = req + 1;
  1205. req->rq_next = NULL;
  1206. xprt->free = xprt->slot;
  1207. dprintk("RPC:      created transport %pn", xprt);
  1208. xprt_bind_socket(xprt, sock);
  1209. return xprt;
  1210. }
  1211. /*
  1212.  * Bind to a reserved port
  1213.  */
  1214. static inline int
  1215. xprt_bindresvport(struct socket *sock)
  1216. {
  1217. struct sockaddr_in myaddr;
  1218. int err, port;
  1219. memset(&myaddr, 0, sizeof(myaddr));
  1220. myaddr.sin_family = AF_INET;
  1221. port = 800;
  1222. do {
  1223. myaddr.sin_port = htons(port);
  1224. err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
  1225. sizeof(myaddr));
  1226. } while (err == -EADDRINUSE && --port > 0);
  1227. if (err < 0)
  1228. printk("RPC: Can't bind to reserved port (%d).n", -err);
  1229. return err;
  1230. }
  1231. static int 
  1232. xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
  1233. {
  1234. struct sock *sk = sock->sk;
  1235. if (xprt->inet)
  1236. return -EBUSY;
  1237. sk->user_data = xprt;
  1238. xprt->old_data_ready = sk->data_ready;
  1239. xprt->old_state_change = sk->state_change;
  1240. xprt->old_write_space = sk->write_space;
  1241. if (xprt->prot == IPPROTO_UDP) {
  1242. sk->data_ready = udp_data_ready;
  1243. sk->no_check = UDP_CSUM_NORCV;
  1244. xprt_set_connected(xprt);
  1245. } else {
  1246. sk->data_ready = tcp_data_ready;
  1247. sk->state_change = tcp_state_change;
  1248. xprt_clear_connected(xprt);
  1249. }
  1250. sk->write_space = xprt_write_space;
  1251. /* Reset to new socket */
  1252. xprt->sock = sock;
  1253. xprt->inet = sk;
  1254. /*
  1255.  * TCP requires the rpc I/O daemon is present
  1256.  */
  1257. if(xprt->stream)
  1258. rpciod_up();
  1259. return 0;
  1260. }
  1261. /*
  1262.  * Set socket buffer length
  1263.  */
  1264. void
  1265. xprt_sock_setbufsize(struct rpc_xprt *xprt)
  1266. {
  1267. struct sock *sk = xprt->inet;
  1268. if (xprt->stream)
  1269. return;
  1270. if (xprt->rcvsize) {
  1271. sk->userlocks |= SOCK_RCVBUF_LOCK;
  1272. sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
  1273. }
  1274. if (xprt->sndsize) {
  1275. sk->userlocks |= SOCK_SNDBUF_LOCK;
  1276. sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
  1277. sk->write_space(sk);
  1278. }
  1279. }
  1280. /*
  1281.  * Create a client socket given the protocol and peer address.
  1282.  */
  1283. static struct socket *
  1284. xprt_create_socket(int proto, struct rpc_timeout *to)
  1285. {
  1286. struct socket *sock;
  1287. int type, err;
  1288. dprintk("RPC:      xprt_create_socket(%s %d)n",
  1289.    (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
  1290. type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
  1291. if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
  1292. printk("RPC: can't create socket (%d).n", -err);
  1293. goto failed;
  1294. }
  1295. /* If the caller has the capability, bind to a reserved port */
  1296. if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
  1297. goto failed;
  1298. return sock;
  1299. failed:
  1300. sock_release(sock);
  1301. return NULL;
  1302. }
  1303. /*
  1304.  * Create an RPC client transport given the protocol and peer address.
  1305.  */
  1306. struct rpc_xprt *
  1307. xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
  1308. {
  1309. struct socket *sock;
  1310. struct rpc_xprt *xprt;
  1311. dprintk("RPC:      xprt_create_proto calledn");
  1312. if (!(sock = xprt_create_socket(proto, to)))
  1313. return NULL;
  1314. if (!(xprt = xprt_setup(sock, proto, sap, to)))
  1315. sock_release(sock);
  1316. return xprt;
  1317. }
  1318. /*
  1319.  * Prepare for transport shutdown.
  1320.  */
  1321. void
  1322. xprt_shutdown(struct rpc_xprt *xprt)
  1323. {
  1324. xprt->shutdown = 1;
  1325. rpc_wake_up(&xprt->sending);
  1326. rpc_wake_up(&xprt->resend);
  1327. rpc_wake_up(&xprt->pending);
  1328. rpc_wake_up(&xprt->backlog);
  1329. if (waitqueue_active(&xprt->cong_wait))
  1330. wake_up(&xprt->cong_wait);
  1331. }
  1332. /*
  1333.  * Clear the xprt backlog queue
  1334.  */
  1335. int
  1336. xprt_clear_backlog(struct rpc_xprt *xprt) {
  1337. rpc_wake_up_next(&xprt->backlog);
  1338. if (waitqueue_active(&xprt->cong_wait))
  1339. wake_up(&xprt->cong_wait);
  1340. return 1;
  1341. }
  1342. /*
  1343.  * Destroy an RPC transport, killing off all requests.
  1344.  */
  1345. int
  1346. xprt_destroy(struct rpc_xprt *xprt)
  1347. {
  1348. dprintk("RPC:      destroying transport %pn", xprt);
  1349. xprt_shutdown(xprt);
  1350. xprt_close(xprt);
  1351. kfree(xprt);
  1352. return 0;
  1353. }