mbus.c
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:30k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. /*
  2.  * FILE:     mbus.c
  3.  * AUTHOR:   Colin Perkins
  4.  * MODIFIED: Orion Hodson
  5.  *           Markus Germeier
  6.  * 
  7.  * Copyright (c) 1997-2000 University College London
  8.  * All rights reserved.
  9.  *
  10.  * Redistribution and use in source and binary forms, with or without
  11.  * modification, is permitted provided that the following conditions 
  12.  * are met:
  13.  * 1. Redistributions of source code must retain the above copyright
  14.  *    notice, this list of conditions and the following disclaimer.
  15.  * 2. Redistributions in binary form must reproduce the above copyright
  16.  *    notice, this list of conditions and the following disclaimer in the
  17.  *    documentation and/or other materials provided with the distribution.
  18.  * 3. All advertising materials mentioning features or use of this software
  19.  *    must display the following acknowledgement:
  20.  *      This product includes software developed by the Computer Science
  21.  *      Department at University College London
  22.  * 4. Neither the name of the University nor of the Department may be used
  23.  *    to endorse or promote products derived from this software without
  24.  *    specific prior written permission.
  25.  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  26.  * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  27.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  28.  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  29.  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  30.  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  31.  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  32.  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  33.  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  34.  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  35.  * SUCH DAMAGE.
  36.  */
  37. #include "config_unix.h"
  38. #include "config_win32.h"
  39. #include "debug.h"
  40. #include "memory.h"
  41. #include "net_udp.h"
  42. #include "hmac.h"
  43. #include "qfDES.h"
  44. #include "base64.h"
  45. #include "gettimeofday.h"
  46. #include "vsnprintf.h"
  47. #include "mbus.h"
  48. #include "mbus_config.h"
  49. #include "mbus_parser.h"
  50. #include "mbus_addr.h"
  51. #define MBUS_BUF_SIZE   1500
  52. #define MBUS_ACK_BUF_SIZE 1500
  53. #define MBUS_MAX_ADDR     10
  54. #define MBUS_MAX_QLEN     50 /* Number of messages we can queue with mbus_qmsg() */
  55. #define MBUS_MAGIC 0x87654321
  56. #define MBUS_MSG_MAGIC 0x12345678
  57. struct mbus_msg {
  58. struct mbus_msg *next;
  59. struct timeval  send_time; /* Time the message was sent, to trigger a retransmit */
  60. struct timeval  comp_time; /* Time the message was composed, the timestamp in the packet header */
  61. char *dest;
  62. int  reliable;
  63. int  complete; /* Indicates that we've finished adding cmds to this message */
  64. int  seqnum;
  65. int  retransmit_count;
  66. int  message_size;
  67. int  num_cmds;
  68. char *cmd_list[MBUS_MAX_QLEN];
  69. char *arg_list[MBUS_MAX_QLEN];
  70. uint32_t  idx_list[MBUS_MAX_QLEN];
  71. uint32_t  magic; /* For debugging... */
  72. };
  73. struct mbus {
  74. socket_udp    *s;
  75. char    *addr; /* Addresses we respond to.  */
  76. int     max_other_addr;
  77. int     num_other_addr;
  78. char **other_addr; /* Addresses of other entities on the mbus.  */
  79.         struct timeval          **other_hello;                  /* Time of last mbus.hello we received from other entities      */
  80. int     seqnum;
  81. struct mbus_msg    *cmd_queue; /* Queue of messages waiting to be sent */
  82. struct mbus_msg    *waiting_ack; /* The last reliable message sent, if we have not yet got the ACK */
  83. char    *hashkey;
  84. int     hashkeylen;
  85. char    *encrkey;
  86. int     encrkeylen;
  87. struct timeval     last_heartbeat; /* Last time we sent a heartbeat message */
  88. struct mbus_config  *cfg;
  89. void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat);
  90. void (*err_handler)(int seqnum, int reason);
  91. uint32_t   magic; /* For debugging...                                             */
  92. uint32_t   index;
  93. uint32_t   index_sent;
  94. };
  95. static void mbus_validate(struct mbus *m)
  96. {
  97. #ifdef DEBUG
  98. int i;
  99. ASSERT(m->num_other_addr <= m->max_other_addr);
  100. ASSERT(m->num_other_addr >= 0);
  101. for (i = 0; i < m->num_other_addr; i++) {
  102. ASSERT(m->other_addr[i]  != NULL);
  103. ASSERT(m->other_hello[i] != NULL);
  104. }
  105. for (i = m->num_other_addr + 1; i < m->max_other_addr; i++) {
  106. ASSERT(m->other_addr[i]  == NULL);
  107. ASSERT(m->other_hello[i] == NULL);
  108. }
  109. #endif
  110. ASSERT(m->magic == MBUS_MAGIC);
  111. xmemchk();
  112. }
  113. static void mbus_msg_validate(struct mbus_msg *m)
  114. {
  115. #ifdef DEBUG
  116. int i;
  117. ASSERT((m->num_cmds < MBUS_MAX_QLEN) && (m->num_cmds >= 0));
  118. for (i = 0; i < m->num_cmds; i++) {
  119. ASSERT(m->cmd_list[i] != NULL);
  120. ASSERT(m->arg_list[i] != NULL);
  121. if (i > 0) {
  122. ASSERT(m->idx_list[i] > m->idx_list[i-1]);
  123. }
  124. }
  125. for (i = m->num_cmds + 1; i < MBUS_MAX_QLEN; i++) {
  126. ASSERT(m->cmd_list[i] == NULL);
  127. ASSERT(m->arg_list[i] == NULL);
  128. }
  129. ASSERT(m->dest != NULL);
  130. #endif
  131. ASSERT(m->magic == MBUS_MSG_MAGIC);
  132. }
  133. static void store_other_addr(struct mbus *m, char *a)
  134. {
  135. /* This takes the address a and ensures it is stored in the   */
  136. /* m->other_addr field of the mbus structure. The other_addr  */
  137. /* field should probably be a hash table, but for now we hope */
  138. /* that there are not too many entities on the mbus, so the   */
  139. /* list is small.                                             */
  140. int i;
  141. mbus_validate(m);
  142. for (i = 0; i < m->num_other_addr; i++) {
  143. if (mbus_addr_match(m->other_addr[i], a)) {
  144. /* Already in the list... */
  145. gettimeofday(m->other_hello[i],NULL);
  146. return;
  147. }
  148. }
  149. if (m->num_other_addr == m->max_other_addr) {
  150. /* Expand the list... */
  151. m->max_other_addr *= 2;
  152. m->other_addr = (char **) xrealloc(m->other_addr, m->max_other_addr * sizeof(char *));
  153. m->other_hello = (struct timeval **) xrealloc(m->other_hello, m->max_other_addr * sizeof(struct timeval *));
  154. }
  155. m->other_hello[m->num_other_addr]=(struct timeval *)xmalloc(sizeof(struct timeval));
  156. gettimeofday(m->other_hello[m->num_other_addr],NULL);
  157. m->other_addr[m->num_other_addr++] = xstrdup(a);
  158. }
  159. static void remove_other_addr(struct mbus *m, char *a)
  160. {
  161. /* Removes the address a from the m->other_addr field of the */
  162. /* mbus structure.                                           */
  163. int i, j;
  164. mbus_validate(m);
  165. for (i = 0; i < m->num_other_addr; i++) {
  166. if (mbus_addr_match(m->other_addr[i], a)) {
  167. xfree(m->other_addr[i]);
  168. xfree(m->other_hello[i]);
  169. for (j = i+1; j < m->num_other_addr; j++) {
  170. m->other_addr[j-1] = m->other_addr[j];
  171. m->other_hello[j-1] = m->other_hello[j];
  172. }
  173. m->other_addr[m->num_other_addr  - 1] = NULL;
  174. m->other_hello[m->num_other_addr - 1] = NULL;
  175. m->num_other_addr--;
  176. }
  177. }
  178. }
  179. static void remove_inactiv_other_addr(struct mbus *m, struct timeval t, int interval){
  180. /* Remove addresses we haven't heard from for about 5 * interval */
  181. /* Count backwards so it is safe to remove entries               */
  182. int i;
  183.     
  184. mbus_validate(m);
  185. for (i=m->num_other_addr-1; i>=0; i--){
  186. if ((t.tv_sec-(m->other_hello[i]->tv_sec)) > 5 * interval) {
  187. debug_msg("remove dead entity (%s)n", m->other_addr[i]);
  188. remove_other_addr(m, m->other_addr[i]);
  189. }
  190. }
  191. }
  192. int mbus_addr_valid(struct mbus *m, char *addr)
  193. {
  194. int i;
  195. mbus_validate(m);
  196. for (i = 0; i < m->num_other_addr; i++) {
  197. if (mbus_addr_match(m->other_addr[i], addr)) {
  198. return TRUE;
  199. }
  200. }
  201. return FALSE;
  202. }
  203. static int mbus_addr_unique(struct mbus *m, char *addr)
  204. {
  205. int     i, n = 0;
  206. mbus_validate(m);
  207. for (i = 0; i < m->num_other_addr; i++) {
  208. if (mbus_addr_match(m->other_addr[i], addr)) {
  209. n++;
  210. }
  211. }
  212. return n==1;
  213. }
  214. /* The mb_* functions are used to build an mbus message up in the */
  215. /* mb_buffer, and to add authentication and encryption before the */
  216. /* message is sent.                                               */
  217. char  mb_cryptbuf[MBUS_BUF_SIZE];
  218. char *mb_buffer;
  219. char *mb_bufpos;
  220. #define MBUS_AUTH_LEN 16
  221. static void mb_header(int seqnum, int ts, char reliable, const char *src, const char *dst, int ackseq)
  222. {
  223. xmemchk();
  224. mb_buffer   = (char *) xmalloc(MBUS_BUF_SIZE + 1);
  225. memset(mb_buffer,   0, MBUS_BUF_SIZE);
  226. memset(mb_buffer, ' ', MBUS_AUTH_LEN);
  227. mb_bufpos = mb_buffer + MBUS_AUTH_LEN;
  228. sprintf(mb_bufpos, "nmbus/1.0 %6d %9d %c (%s) %s ", seqnum, ts, reliable, src, dst);
  229. mb_bufpos += 33 + strlen(src) + strlen(dst);
  230. if (ackseq == -1) {
  231. sprintf(mb_bufpos, "()n");
  232. mb_bufpos += 3;
  233. } else {
  234. sprintf(mb_bufpos, "(%6d)n", ackseq);
  235. mb_bufpos += 9;
  236. }
  237. }
  238. static void mb_add_command(const char *cmnd, const char *args)
  239. {
  240. int offset = strlen(cmnd) + strlen(args) + 5;
  241. ASSERT((mb_bufpos + offset - mb_buffer) < MBUS_BUF_SIZE);
  242. sprintf(mb_bufpos, "%s (%s)n", cmnd, args);
  243. mb_bufpos += offset - 1; /* The -1 in offset means we're not NUL terminated - fix in mb_send */
  244. }
  245. static void mb_send(struct mbus *m)
  246. {
  247. char digest[16];
  248. int len;
  249. unsigned char initVec[8] = {0,0,0,0,0,0,0,0};
  250.  
  251. mbus_validate(m);
  252. *(mb_bufpos++) = '';
  253. ASSERT((mb_bufpos - mb_buffer) < MBUS_BUF_SIZE);
  254. ASSERT(strlen(mb_buffer) < MBUS_BUF_SIZE);
  255. /* Pad to a multiple of 8 bytes, so the encryption can work... */
  256. while (((mb_bufpos - mb_buffer) % 8) != 0) {
  257. *(mb_bufpos++) = '';
  258. }
  259. len = mb_bufpos - mb_buffer;
  260. ASSERT(len < MBUS_BUF_SIZE);
  261. ASSERT(strlen(mb_buffer) < MBUS_BUF_SIZE);
  262. xmemchk();
  263. if (m->hashkey != NULL) {
  264. /* Authenticate... */
  265. hmac_md5(mb_buffer + MBUS_AUTH_LEN+1, strlen(mb_buffer) - (MBUS_AUTH_LEN+1), m->hashkey, m->hashkeylen, digest);
  266. base64encode(digest, 12, mb_buffer, MBUS_AUTH_LEN);
  267. }
  268. xmemchk();
  269. if (m->encrkey != NULL) {
  270. /* Encrypt... */
  271. memset(mb_cryptbuf, 0, MBUS_BUF_SIZE);
  272. memcpy(mb_cryptbuf, mb_buffer, len);
  273. ASSERT((len % 8) == 0);
  274. ASSERT(len < MBUS_BUF_SIZE);
  275. ASSERT(m->encrkeylen == 8);
  276. xmemchk();
  277. qfDES_CBC_e(m->encrkey, mb_cryptbuf, len, initVec);
  278. xmemchk();
  279. memcpy(mb_buffer, mb_cryptbuf, len);
  280. }
  281. xmemchk();
  282. udp_send(m->s, mb_buffer, len);
  283. xfree(mb_buffer);
  284. }
  285. static void resend(struct mbus *m, struct mbus_msg *curr) 
  286. {
  287. /* Don't need to check for buffer overflows: this was done in mbus_send() when */
  288. /* this message was first transmitted. If it was okay then, it's okay now.     */
  289. int  i;
  290. mbus_validate(m);
  291. mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
  292. for (i = 0; i < curr->num_cmds; i++) {
  293. mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
  294. }
  295. mb_send(m);
  296. curr->retransmit_count++;
  297. }
  298. void mbus_retransmit(struct mbus *m)
  299. {
  300. struct mbus_msg *curr = m->waiting_ack;
  301. struct timeval time;
  302. long diff;
  303. mbus_validate(m);
  304. if (!mbus_waiting_ack(m)) {
  305. return;
  306. }
  307. mbus_msg_validate(curr);
  308. gettimeofday(&time, NULL);
  309. /* diff is time in milliseconds that the message has been awaiting an ACK */
  310. diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
  311. if (diff > 10000) {
  312. debug_msg("Reliable mbus message failed!n");
  313. if (m->err_handler == NULL) {
  314. abort();
  315. }
  316. m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
  317. /* if we don't delete this failed message, the error handler
  318.                    gets triggered every time we call mbus_retransmit */
  319. while (m->waiting_ack->num_cmds > 0) {
  320.     m->waiting_ack->num_cmds--;
  321.     xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
  322.     xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
  323. }
  324. xfree(m->waiting_ack->dest);
  325. xfree(m->waiting_ack);
  326. m->waiting_ack = NULL;
  327. return;
  328. /* Note: We only send one retransmission each time, to avoid
  329.  * overflowing the receiver with a burst of requests...
  330.  */
  331. if ((diff > 750) && (curr->retransmit_count == 2)) {
  332. resend(m, curr);
  333. return;
  334. if ((diff > 500) && (curr->retransmit_count == 1)) {
  335. resend(m, curr);
  336. return;
  337. if ((diff > 250) && (curr->retransmit_count == 0)) {
  338. resend(m, curr);
  339. return;
  340. }
  341. }
  342. void mbus_heartbeat(struct mbus *m, int interval)
  343. {
  344. struct timeval curr_time;
  345. char *a = (char *) xmalloc(3);
  346. sprintf(a, "()");
  347. mbus_validate(m);
  348. gettimeofday(&curr_time, NULL);
  349. if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
  350. mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
  351. mb_add_command("mbus.hello", "");
  352. mb_send(m);
  353. m->last_heartbeat = curr_time;
  354. /* Remove dead sources */
  355. remove_inactiv_other_addr(m, curr_time, interval);
  356. }
  357. xfree(a);
  358. }
  359. int mbus_waiting_ack(struct mbus *m)
  360. {
  361. mbus_validate(m);
  362. return m->waiting_ack != NULL;
  363. }
  364. int mbus_sent_all(struct mbus *m)
  365. {
  366. mbus_validate(m);
  367. return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
  368. }
  369. struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat), 
  370.        void  (*err_handler)(int seqnum, int reason),
  371.        char  *addr)
  372. {
  373. struct mbus *m;
  374. struct mbus_key    k;
  375. struct mbus_parser *mp;
  376. int    i;
  377. char             *net_addr, *tmp;
  378. uint16_t           net_port;
  379. int                net_scope;
  380. m = (struct mbus *) xmalloc(sizeof(struct mbus));
  381. if (m == NULL) {
  382. debug_msg("Unable to allocate memory for mbusn");
  383. return NULL;
  384. }
  385. m->cfg = mbus_create_config();
  386. mbus_lock_config_file(m->cfg);
  387. net_addr = (char *) xmalloc(20);
  388. mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
  389. m->s   = udp_init(net_addr, net_port, net_port, net_scope);
  390.         if (m->s == NULL) {
  391.                 debug_msg("Unable to initialize mbus addressn");
  392.                 xfree(m);
  393.                 return NULL;
  394.         }
  395. m->seqnum         = 0;
  396. m->cmd_handler    = cmd_handler;
  397. m->err_handler   = err_handler;
  398. m->num_other_addr = 0;
  399. m->max_other_addr = 10;
  400. m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
  401. m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
  402. for (i = 0; i < 10; i++) {
  403. m->other_addr[i]  = NULL;
  404. m->other_hello[i] = NULL;
  405. }
  406. m->cmd_queue   = NULL;
  407. m->waiting_ack   = NULL;
  408. m->magic          = MBUS_MAGIC;
  409. m->index          = 0;
  410. m->index_sent     = 0;
  411. mp = mbus_parse_init(xstrdup(addr));
  412. if (!mbus_parse_lst(mp, &tmp)) {
  413. debug_msg("Invalid mbus addressn");
  414. abort();
  415. }
  416. m->addr = xstrdup(tmp);
  417. mbus_parse_done(mp);
  418. ASSERT(m->addr != NULL);
  419. gettimeofday(&(m->last_heartbeat), NULL);
  420. mbus_get_encrkey(m->cfg, &k);
  421. m->encrkey    = k.key;
  422. m->encrkeylen = k.key_len;
  423. mbus_get_hashkey(m->cfg, &k);
  424. m->hashkey    = k.key;
  425. m->hashkeylen = k.key_len;
  426. mbus_unlock_config_file(m->cfg);
  427. xfree(net_addr);
  428. return m;
  429. }
  430. void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
  431. {
  432. mbus_validate(m);
  433. m->cmd_handler = cmd_handler;
  434. }
  435. static void mbus_flush_msgs(struct mbus_msg **queue)
  436. {
  437.         struct mbus_msg *curr, *next;
  438.         int i;
  439.         curr = *queue;
  440.         while(curr) {
  441.                 next = curr->next;
  442.                 xfree(curr->dest);
  443.                 for(i = 0; i < curr->num_cmds; i++) {
  444.                         xfree(curr->cmd_list[i]);
  445.                         xfree(curr->arg_list[i]);
  446.                 }
  447. xfree(curr);
  448.                 curr = next;
  449.         }
  450. *queue = NULL;
  451. }
  452. void mbus_exit(struct mbus *m) 
  453. {
  454.         int i;
  455.         ASSERT(m != NULL);
  456. mbus_validate(m);
  457. mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
  458. mbus_send(m);
  459. /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
  460. /*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */
  461.         mbus_flush_msgs(&m->cmd_queue);
  462.         mbus_flush_msgs(&m->waiting_ack);
  463.         if (m->encrkey != NULL) {
  464.                 xfree(m->encrkey);
  465.         }
  466.         if (m->hashkey != NULL) {
  467.          xfree(m->hashkey);
  468. }
  469.         udp_exit(m->s);
  470. /* Clean up other_* */
  471. for (i=m->num_other_addr-1; i>=0; i--){
  472.     remove_other_addr(m, m->other_addr[i]);
  473. }
  474.         xfree(m->addr);
  475. xfree(m->other_addr);
  476. xfree(m->other_hello);
  477. xfree(m->cfg);
  478.         xfree(m);
  479. }
  480. void mbus_send(struct mbus *m)
  481. {
  482. /* Send one, or more, messages previosly queued with mbus_qmsg(). */
  483. /* Messages for the same destination are batched together. Stops  */
  484. /* when a reliable message is sent, until the ACK is received.    */
  485. struct mbus_msg *curr = m->cmd_queue;
  486. int  i;
  487. mbus_validate(m);
  488. if (m->waiting_ack != NULL) {
  489. return;
  490. }
  491. while (curr != NULL) {
  492. mbus_msg_validate(curr);
  493. /* It's okay for us to send messages which haven't been marked as complete - */
  494. /* that just means we're sending something which has the potential to have   */
  495. /* more data piggybacked. However, if it's not complete it MUST be the last  */
  496. /* in the list, or something has been reordered - which is bad.              */
  497. if (!curr->complete) {
  498. ASSERT(curr->next == NULL);
  499. }
  500. if (curr->reliable) {
  501.         if (!mbus_addr_valid(m, curr->dest)) {
  502.     debug_msg("Trying to send reliably to an unknown address...n");
  503.     if (m->err_handler == NULL) {
  504. abort();
  505.     }
  506.     m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
  507. }
  508.         if (!mbus_addr_unique(m, curr->dest)) {
  509.     debug_msg("Trying to send reliably but address is not unique...n");
  510.     if (m->err_handler == NULL) {
  511. abort();
  512.     }
  513.     m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
  514. }
  515. }
  516. /* Create the message... */
  517. mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
  518. for (i = 0; i < curr->num_cmds; i++) {
  519. ASSERT(m->index_sent == (curr->idx_list[i] - 1));
  520. m->index_sent = curr->idx_list[i];
  521. mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
  522. }
  523. mb_send(m);
  524. m->cmd_queue = curr->next;
  525. if (curr->reliable) {
  526. /* Reliable message, wait for the ack... */
  527. gettimeofday(&(curr->send_time), NULL);
  528. m->waiting_ack = curr;
  529. curr->next = NULL;
  530. return;
  531. } else {
  532. while (curr->num_cmds > 0) {
  533. curr->num_cmds--;
  534. xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
  535. xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
  536. }
  537. xfree(curr->dest);
  538. xfree(curr);
  539. }
  540. curr = m->cmd_queue;
  541. }
  542. }
  543. void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
  544. {
  545. /* Queue up a message for sending. The message is not */
  546. /* actually sent until mbus_send() is called.         */
  547. struct mbus_msg *curr = m->cmd_queue;
  548. struct mbus_msg *prev = NULL;
  549. int  alen = strlen(cmnd) + strlen(args) + 4;
  550. int  i;
  551. mbus_validate(m);
  552. while (curr != NULL) {
  553. mbus_msg_validate(curr);
  554. if (!curr->complete) {
  555. /* This message is still open for new commands. It MUST be the last in the */
  556. /* cmd_queue, else commands will be reordered.                             */
  557. ASSERT(curr->next == NULL);
  558. if (mbus_addr_identical(curr->dest, dest) &&
  559.             (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
  560. curr->num_cmds++;
  561. curr->reliable |= reliable;
  562. curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
  563. curr->arg_list[curr->num_cmds-1] = xstrdup(args);
  564. curr->idx_list[curr->num_cmds-1] = ++(m->index);
  565. curr->message_size += alen;
  566. mbus_msg_validate(curr);
  567. return;
  568. } else {
  569. curr->complete = TRUE;
  570. }
  571. }
  572. prev = curr;
  573. curr = curr->next;
  574. }
  575. /* If we get here, we've not found an open message in the cmd_queue.  We */
  576. /* have to create a new message, and add it to the end of the cmd_queue. */
  577. curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
  578. curr->magic            = MBUS_MSG_MAGIC;
  579. curr->next             = NULL;
  580. curr->dest             = xstrdup(dest);
  581. curr->retransmit_count = 0;
  582. curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);
  583. curr->seqnum           = ++m->seqnum;
  584. curr->reliable         = reliable;
  585. curr->complete         = FALSE;
  586. curr->num_cmds         = 1;
  587. curr->cmd_list[0]      = xstrdup(cmnd);
  588. curr->arg_list[0]      = xstrdup(args);
  589. curr->idx_list[curr->num_cmds-1] = ++(m->index);
  590. for (i = 1; i < MBUS_MAX_QLEN; i++) {
  591. curr->cmd_list[i] = NULL;
  592. curr->arg_list[i] = NULL;
  593. }
  594. if (prev == NULL) {
  595. m->cmd_queue = curr;
  596. } else {
  597. prev->next = curr;
  598. }
  599. gettimeofday(&(curr->send_time), NULL);
  600. gettimeofday(&(curr->comp_time), NULL);
  601. mbus_msg_validate(curr);
  602. }
  603. #define mbus_qmsgf(m, dest, reliable, cmnd, format, var) 
  604. char buffer[MBUS_BUF_SIZE]; 
  605. mbus_validate(m); 
  606. snprintf(buffer, MBUS_BUF_SIZE, format, var); 
  607. mbus_qmsg(m, dest, cmnd, buffer, reliable); 
  608. }
  609. #if 0
  610. void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...)
  611. {
  612. /* This is a wrapper around mbus_qmsg() which does a printf() style format into  */
  613. /* a buffer. Saves the caller from having to a a malloc(), write the args string */
  614. /* and then do a free(), and also saves worring about overflowing the buffer, so */
  615. /* removing a common source of bugs!                                             */
  616. char buffer[MBUS_BUF_SIZE];
  617. va_list ap;
  618. mbus_validate(m);
  619. va_start(ap, format);
  620. #ifdef WIN32
  621.         _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
  622. #else
  623.         vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
  624. #endif
  625. va_end(ap);
  626. mbus_qmsg(m, dest, cmnd, buffer, reliable);
  627. }
  628. #endif
  629. int mbus_recv(struct mbus *m, void *data, struct timeval *timeout)
  630. {
  631. char *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;
  632. char    buffer[MBUS_BUF_SIZE];
  633. int    buffer_len, seq, a, rx, ts, authlen, loop_count;
  634. char    ackbuf[MBUS_ACK_BUF_SIZE];
  635. char    digest[16];
  636. unsigned char  initVec[8] = {0,0,0,0,0,0,0,0};
  637. struct timeval  t;
  638. struct mbus_parser *mp, *mp2;
  639. mbus_validate(m);
  640. rx = FALSE;
  641. loop_count = 0;
  642. while (loop_count++ < 10) {
  643. memset(buffer, 0, MBUS_BUF_SIZE);
  644.                 ASSERT(m->s != NULL);
  645. udp_fd_zero();
  646. udp_fd_set(m->s);
  647. t.tv_sec  = timeout->tv_sec;
  648. t.tv_usec = timeout->tv_usec;
  649.                 if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) {
  650. buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);
  651. if (buffer_len > 0) {
  652. rx = TRUE;
  653. } else {
  654. return rx;
  655. }
  656. } else {
  657. return FALSE;
  658. }
  659. if (m->encrkey != NULL) {
  660. /* Decrypt the message... */
  661. if ((buffer_len % 8) != 0) {
  662. debug_msg("Encrypted message not a multiple of 8 bytes in lengthn");
  663. continue;
  664. }
  665. memcpy(mb_cryptbuf, buffer, buffer_len);
  666. memset(initVec, 0, 8);
  667. qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec);
  668. memcpy(buffer, mb_cryptbuf, buffer_len);
  669. }
  670. /* Sanity check that this is a vaguely sensible format message... Should prevent */
  671. /* problems if we're fed complete garbage, but won't prevent determined hackers. */
  672. if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {
  673. continue;
  674. }
  675. mp = mbus_parse_init(buffer);
  676. /* remove trailing 0 bytes */
  677. npos = (char *) strchr(buffer,'');
  678. if(npos!=NULL) {
  679. buffer_len=npos-buffer;
  680. }
  681. /* Parse the authentication header */
  682. if (!mbus_parse_sym(mp, &auth)) {
  683. debug_msg("Failed to parse authentication headern");
  684. mbus_parse_done(mp);
  685. continue;
  686. }
  687. /* Check that the packet authenticates correctly... */
  688. authlen = strlen(auth);
  689. hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);
  690. base64encode(digest, 12, ackbuf, 16);
  691. if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {
  692. debug_msg("Failed to authenticate message...n");
  693. mbus_parse_done(mp);
  694. continue;
  695. }
  696. /* Parse the header */
  697. if (!mbus_parse_sym(mp, &ver)) {
  698. mbus_parse_done(mp);
  699. debug_msg("Parser failed version (1): %sn",ver);
  700. continue;
  701. }
  702. if (strcmp(ver, "mbus/1.0") != 0) {
  703. mbus_parse_done(mp);
  704. debug_msg("Parser failed version (2): %sn",ver);
  705. continue;
  706. }
  707. if (!mbus_parse_int(mp, &seq)) {
  708. mbus_parse_done(mp);
  709. debug_msg("Parser failed seqn");
  710. continue;
  711. }
  712. if (!mbus_parse_int(mp, &ts)) {
  713. mbus_parse_done(mp);
  714. debug_msg("Parser failed tsn");
  715. continue;
  716. }
  717. if (!mbus_parse_sym(mp, &r)) {
  718. mbus_parse_done(mp);
  719. debug_msg("Parser failed reliablen");
  720. continue;
  721. }
  722. if (!mbus_parse_lst(mp, &src)) {
  723. mbus_parse_done(mp);
  724. debug_msg("Parser failed srcn");
  725. continue;
  726. }
  727. if (!mbus_parse_lst(mp, &dst)) {
  728. mbus_parse_done(mp);
  729. debug_msg("Parser failed dstn");
  730. continue;
  731. }
  732. if (!mbus_parse_lst(mp, &ack)) {
  733. mbus_parse_done(mp);
  734. debug_msg("Parser failed ackn");
  735. continue;
  736. }
  737. store_other_addr(m, src);
  738. /* Check if the message was addressed to us... */
  739. if (mbus_addr_match(m->addr, dst)) {
  740. /* ...if so, process any ACKs received... */
  741. mp2 = mbus_parse_init(ack);
  742. while (mbus_parse_int(mp2, &a)) {
  743. if (mbus_waiting_ack(m)) {
  744. if (m->waiting_ack->seqnum == a) {
  745. while (m->waiting_ack->num_cmds > 0) {
  746. m->waiting_ack->num_cmds--;
  747. xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
  748. xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
  749. }
  750. xfree(m->waiting_ack->dest);
  751. xfree(m->waiting_ack);
  752. m->waiting_ack = NULL;
  753. } else {
  754. debug_msg("Got ACK %d but wanted %dn", a, m->waiting_ack->seqnum);
  755. }
  756. } else {
  757. debug_msg("Got ACK %d but wasn't expecting itn", a);
  758. }
  759. }
  760. mbus_parse_done(mp2);
  761. /* ...if an ACK was requested, send one... */
  762. if (strcmp(r, "R") == 0) {
  763. char  *newsrc = (char *) xmalloc(strlen(src) + 3);
  764. struct timeval  t;
  765. sprintf(newsrc, "(%s)", src); /* Yes, this is a kludge. */
  766. gettimeofday(&t, NULL);
  767. mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq);
  768. mb_send(m);
  769. xfree(newsrc);
  770. } else if (strcmp(r, "U") == 0) {
  771. /* Unreliable message.... not need to do anything */
  772. } else {
  773. debug_msg("Message with invalid reliability field "%s" ignoredn", r);
  774. }
  775. /* ...and process the commands contained in the message */
  776. while (mbus_parse_sym(mp, &cmd)) {
  777. if (mbus_parse_lst(mp, &param)) {
  778. char  *newsrc = (char *) xmalloc(strlen(src) + 3);
  779. sprintf(newsrc, "(%s)", src); /* Yes, this is a kludge. */
  780. /* Finally, we snoop on the message we just passed to the application, */
  781. /* to do housekeeping of our list of known mbus sources...             */
  782. if (strcmp(cmd, "mbus.bye") == 0) {
  783. remove_other_addr(m, newsrc);
  784. if (strcmp(cmd, "mbus.hello") == 0) {
  785. /* Mark this source as activ. We remove dead sources in mbus_heartbeat */
  786. store_other_addr(m, newsrc);
  787. }
  788. m->cmd_handler(newsrc, cmd, param, data);
  789. xfree(newsrc);
  790. } else {
  791. debug_msg("Unable to parse mbus command:n");
  792. debug_msg("cmd = %sn", cmd);
  793. debug_msg("arg = %sn", param);
  794. break;
  795. }
  796. }
  797. }
  798. mbus_parse_done(mp);
  799. }
  800. return rx;
  801. }
  802. #define RZ_HANDLE_WAITING 1
  803. #define RZ_HANDLE_GO      2
  804. struct mbus_rz {
  805. char *peer;
  806. char *token;
  807. struct mbus *m;
  808. void *data;
  809. int  mode;
  810. void (*cmd_handler)(char *src, char *cmd, char *args, void *data);
  811. };
  812. static void rz_handler(char *src, char *cmd, char *args, void *data)
  813. {
  814. struct mbus_rz *r = (struct mbus_rz *) data;
  815. struct mbus_parser *mp;
  816. if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) {
  817. char *t;
  818. mp = mbus_parse_init(args);
  819. mbus_parse_str(mp, &t);
  820. if (strcmp(mbus_decode_str(t), r->token) == 0) {
  821.                         if (r->peer != NULL) xfree(r->peer);
  822. r->peer = xstrdup(src);
  823. }
  824. mbus_parse_done(mp);
  825. } else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) {
  826. char *t;
  827. mp = mbus_parse_init(args);
  828. mbus_parse_str(mp, &t);
  829. if (strcmp(mbus_decode_str(t), r->token) == 0) {
  830.                         if (r->peer != NULL) xfree(r->peer);
  831. r->peer = xstrdup(src);
  832. }
  833. mbus_parse_done(mp);
  834. } else {
  835. r->cmd_handler(src, cmd, args, r->data);
  836. }
  837. }
  838. char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data)
  839. {
  840. /* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */
  841. /* back from our peer. Any other mbus commands received whilst waiting are  */
  842. /* processed in the normal manner, as if mbus_recv() had been called.       */
  843. char *token_e, *peer;
  844. struct timeval  timeout;
  845. struct mbus_rz *r;
  846. mbus_validate(m);
  847. r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
  848. r->peer        = NULL;
  849. r->token       = token;
  850. r->m           = m;
  851. r->data        = data;
  852. r->mode        = RZ_HANDLE_GO;
  853. r->cmd_handler = m->cmd_handler;
  854. m->cmd_handler = rz_handler;
  855. token_e        = mbus_encode_str(token);
  856. while (r->peer == NULL) {
  857. timeout.tv_sec  = 0;
  858. timeout.tv_usec = 100000;
  859. mbus_heartbeat(m, 1);
  860. mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e);
  861. mbus_send(m);
  862. mbus_recv(m, r, &timeout);
  863. mbus_retransmit(m);
  864. }
  865. m->cmd_handler = r->cmd_handler;
  866. peer = r->peer;
  867. xfree(r);
  868. xfree(token_e);
  869. return peer;
  870. }
  871. char *mbus_rendezvous_go(struct mbus *m, char *token, void *data)
  872. {
  873. /* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to   */
  874. /* the sender of that message. Whilst waiting, other mbus commands are processed */
  875. /* in the normal manner as if mbus_recv() had been called.                       */
  876. char *token_e, *peer;
  877. struct timeval  timeout;
  878. struct mbus_rz *r;
  879. mbus_validate(m);
  880. r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
  881. r->peer        = NULL;
  882. r->token       = token;
  883. r->m           = m;
  884. r->data        = data;
  885. r->mode        = RZ_HANDLE_WAITING;
  886. r->cmd_handler = m->cmd_handler;
  887. m->cmd_handler = rz_handler;
  888. token_e        = mbus_encode_str(token);
  889. while (r->peer == NULL) {
  890. timeout.tv_sec  = 0;
  891. timeout.tv_usec = 100000;
  892. mbus_heartbeat(m, 1);
  893. mbus_send(m);
  894. mbus_recv(m, r, &timeout);
  895. mbus_retransmit(m);
  896. }
  897. mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e);
  898. do {
  899. mbus_heartbeat(m, 1);
  900. mbus_retransmit(m);
  901. mbus_send(m);
  902. timeout.tv_sec  = 0;
  903. timeout.tv_usec = 100000;
  904. mbus_recv(m, r, &timeout);
  905. } while (!mbus_sent_all(m));
  906. m->cmd_handler = r->cmd_handler;
  907. peer = r->peer;
  908. xfree(r);
  909. xfree(token_e);
  910. return peer;
  911. }