bb_store.c
上传用户:gzpyjq
上传日期:2013-01-31
资源大小:1852k
文件大小:12k
源码类别:

手机WAP编程

开发平台:

WINDOWS

  1. /*
  2.  * bb_store.c : bearerbox box SMS storage/retrieval module
  3.  *
  4.  * Kalle Marjola 2001 for project Kannel
  5.  */
  6. #include <errno.h>
  7. #include <stdlib.h>
  8. #include <stdio.h>
  9. #include <time.h>
  10. #include <string.h>
  11. #include <sys/time.h>
  12. #include <sys/types.h>
  13. #include <sys/socket.h>
  14. #include <unistd.h>
  15. #include <signal.h>
  16. #include "gwlib/gwlib.h"
  17. #include "msg.h"
  18. #include "bearerbox.h"
  19. /* passed from bearerbox core */
  20. extern List *incoming_sms;
  21. extern List *outgoing_sms;
  22. extern List *flow_threads;
  23. /* growing number to be given to messages */
  24. static Counter *msg_id;
  25. static FILE *file = NULL;
  26. static Octstr *filename = NULL;
  27. static Octstr *newfile = NULL;
  28. static Octstr *bakfile = NULL;
  29. static Mutex *file_mutex = NULL;
  30. static long cleanup_thread;
  31. static List *sms_store;
  32. static List *ack_store;
  33. static void write_msg(Msg *msg)
  34. {
  35.     Octstr *pack, *line;
  36.     pack = msg_pack(msg);
  37.     line = octstr_duplicate(pack);
  38.     octstr_url_encode(line);
  39.     octstr_print(file, line);
  40.     fprintf(file, "n");
  41.     octstr_destroy(pack);
  42.     octstr_destroy(line);
  43.     
  44. }    
  45. static int open_file(Octstr *name)
  46. {
  47.     file = fopen(octstr_get_cstr(name), "w");
  48.     if (file == NULL) {
  49. error(errno, "Failed to open '%s' for writing, cannot create store-file",
  50.       octstr_get_cstr(name));
  51. return -1;
  52.     }
  53.     return 0;
  54. }
  55. static int rename_store(void)
  56. {
  57.     if (rename(octstr_get_cstr(filename), octstr_get_cstr(bakfile)) == -1) {
  58. if (errno != ENOENT) {
  59.     error(errno, "Failed to rename old store '%s' as '%s'",
  60.     octstr_get_cstr(filename), octstr_get_cstr(bakfile));
  61.     return -1;
  62. }
  63.     }
  64.     if (rename(octstr_get_cstr(newfile), octstr_get_cstr(filename)) == -1) {
  65. error(errno, "Failed to rename new store '%s' as '%s'",
  66.       octstr_get_cstr(newfile), octstr_get_cstr(filename));
  67. return -1;
  68.     }
  69.     return 0;
  70. }
  71. static int do_dump(void)
  72. {
  73.     Msg *msg;
  74.     long l;
  75.     if (filename == NULL)
  76. return 0;
  77.     /* create a new store-file and save all non-acknowledged
  78.      * messages into it
  79.      */
  80.     if (open_file(newfile)==-1)
  81. return -1;
  82.     for (l=0; l < list_len(sms_store); l++) {
  83. msg = list_get(sms_store, l);
  84. write_msg(msg);
  85.     }    
  86.     for (l=0; l < list_len(ack_store); l++) {
  87. msg = list_get(ack_store, l);
  88. write_msg(msg);
  89.     }    
  90.     fflush(file);
  91.     /* rename old storefile as .bak, and then new as regular file
  92.      * without .new ending */
  93.     return rename_store();
  94. }
  95. static int cmp_msgs(void *item, void *pattern) {
  96.     Msg *smsm, *ackm;
  97.     ackm = pattern;
  98.     smsm = item;
  99.     if (   ackm->ack.time == smsm->sms.time
  100. && ackm->ack.id == smsm->sms.id)
  101. return 1;
  102.     else
  103. return 0;
  104. }
  105. /*
  106.  * thread to cleanup store and to write it to file now and then
  107.  */
  108. static void store_cleanup(void *arg)
  109. {
  110.     Msg *ack;
  111.     List *match;
  112.     time_t last, now;
  113.     long len;
  114.     int cleanup = 0;
  115.     list_add_producer(flow_threads);
  116.     last = time(NULL);
  117.     
  118.     while((ack = list_consume(ack_store)) != NULL) {
  119. match = list_extract_matching(sms_store, ack, cmp_msgs);
  120. msg_destroy(ack);
  121. if (match == NULL) {
  122.     warning(0, "bb_store: get ACK of message not found "
  123.     "from store, strange?");
  124.     continue;
  125. }
  126. if (list_len(match) > 1)
  127.     warning(0, "bb-store cleanup: Found %ld matches!?",
  128.     list_len(match));
  129. list_destroy(match, msg_destroy_item);
  130. len = list_len(ack_store);
  131. if (len > 100)
  132.     cleanup = 1;
  133. now = time(NULL);
  134. /*
  135.  * write store to file up to each 10. second, providing
  136.  * that something happened, of course
  137.  */
  138. if (now - last > 10 || (len == 0 && cleanup)) {
  139.     store_dump();
  140.     last = now;
  141.     if (len == 0)
  142. cleanup = 0;
  143. }
  144.     }
  145.     store_dump();
  146.     if (file != NULL)
  147. fclose(file);
  148.     octstr_destroy(filename);
  149.     octstr_destroy(newfile);
  150.     octstr_destroy(bakfile);
  151.     mutex_destroy(file_mutex);
  152.     counter_destroy(msg_id);
  153.     
  154.     list_destroy(ack_store, msg_destroy_item);
  155.     list_destroy(sms_store, msg_destroy_item);
  156.     list_remove_producer(flow_threads);
  157. }
  158. /*------------------------------------------------------*/
  159. Octstr *store_status(int status_type)
  160. {
  161.     char *frmt;
  162.     char buf[1024], p[22];
  163.     Octstr *ret, *str, *t;
  164.     unsigned long l;
  165.     struct tm tm;
  166.     Msg *msg;
  167.     ret = octstr_create("");
  168.     /* set the type based header */
  169.     if (status_type == BBSTATUS_HTML) {
  170.         octstr_append_cstr(ret, "<table border=1>n"
  171.             "<tr><td>SMS ID</td><td>Sender</td><td>Receiver</td>"
  172.             "<td>SMSC ID</td><td>UDH</td><td>Message</td>"
  173.             "<td>Time</td></tr>n");
  174.     } else if (status_type == BBSTATUS_TEXT) {
  175.         octstr_append_cstr(ret, "[SMS ID] [Sender] [Receiver] [SMSC ID] [UDH] [Message] [Time]n");
  176.     }
  177.    
  178.     list_lock(sms_store);
  179.     for (l = 0; l < list_len(sms_store); l++) {
  180.         msg = list_get(sms_store, l);
  181.         if (msg_type(msg) == sms) {
  182.             
  183.             if (status_type == BBSTATUS_HTML) {
  184.                 frmt = "<tr><td>%d</td><td>%s</td><td>%s</td><td>%s</td>"
  185.                        "<td>%s</td><td>%s</td><td>%s</td></tr>n";
  186.             } else if (status_type == BBSTATUS_XML) {
  187.                 frmt = "<message>nt<id>%d</id>nt<sender>%s</sender>nt"
  188.                        "<receiver>%s</receiver>nt<smsc-id>%s</smsc-id>nt"
  189.                        "<udh-data>%s</udh-data>nt<msg-data>%s</msg-data>nt"
  190.                        "<time>%s</time>n</message>n";
  191.             } else {
  192.                 frmt = "[%d] [%s] [%s] [%s] [%s] [%s] [%s]n";
  193.             }
  194.             /* transform the time value */
  195. #if LOG_TIMESTAMP_LOCALTIME
  196.             tm = gw_localtime(msg->sms.time);
  197. #else
  198.             tm = gw_gmtime(msg->sms.time);
  199. #endif
  200.             sprintf(p, "%04d-%02d-%02d %02d:%02d:%02d",
  201.                     tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
  202.                     tm.tm_hour, tm.tm_min, tm.tm_sec);
  203.             t = octstr_create(p);
  204.             sprintf(buf, frmt,
  205.                 msg->sms.id,
  206.                 (msg->sms.sender ? octstr_get_cstr(msg->sms.sender) : ""),
  207.                 (msg->sms.receiver ? octstr_get_cstr(msg->sms.receiver) : ""),
  208.                 (msg->sms.smsc_id ? octstr_get_cstr(msg->sms.smsc_id) : ""),
  209.                 (msg->sms.udhdata ? octstr_get_cstr(msg->sms.udhdata) : ""),
  210.                 (msg->sms.msgdata ? octstr_get_cstr(msg->sms.msgdata) : ""),
  211.                 (t ? octstr_get_cstr(t) : ""));
  212.             octstr_destroy(t);
  213.             str = octstr_create(buf);
  214.             octstr_append(ret, str);
  215.         }
  216.     }
  217.     list_unlock(sms_store);
  218.     /* set the type based footer */
  219.     if (status_type == BBSTATUS_HTML) {
  220.         octstr_append_cstr(ret,"</table>");
  221.     }
  222.     return ret;
  223. }
  224. long store_messages(void)
  225. {
  226.     return list_len(sms_store);
  227. }
  228. int store_save(Msg *msg)
  229. {
  230.     Msg *copy;
  231.     
  232.     if (file == NULL)
  233. return 0;
  234.     if (msg_type(msg) == sms) {
  235. msg->sms.id = counter_increase(msg_id);
  236. if (counter_value(msg_id) >= 1000000)   /* limit to 1,000,000
  237.  * distinct msg/s */
  238.     counter_set(msg_id, 1);
  239. copy = msg_duplicate(msg);
  240. list_produce(sms_store, copy);
  241.     }
  242.     else if (msg_type(msg) == ack) {
  243. copy = msg_duplicate(msg);
  244. list_produce(ack_store, copy);
  245.     }
  246.     else
  247. return -1;
  248.     /* write to file, too */
  249.     mutex_lock(file_mutex);
  250.     write_msg(msg);
  251.     fflush(file);
  252.     mutex_unlock(file_mutex);
  253.     return 0;
  254. }
  255. int store_load(void)
  256. {
  257.     List *keys;
  258.     Octstr *store_file, *pack, *key;
  259.     Dict *msg_hash;
  260.     Msg *msg, *dmsg, *copy;
  261.     int retval, msgs;
  262.     long end, pos;
  263.     long store_size;
  264.     
  265.     if (filename == NULL)
  266. return 0;
  267.     list_lock(sms_store);
  268.     list_lock(ack_store);
  269.     while((msg = list_extract_first(sms_store))!=NULL)
  270. msg_destroy(msg);
  271.     while((msg = list_extract_first(ack_store))!=NULL)
  272. msg_destroy(msg);
  273.     mutex_lock(file_mutex);
  274.     if (file != NULL) {
  275. fclose(file);
  276. file = NULL;
  277.     }
  278.     
  279.     store_file = octstr_read_file(octstr_get_cstr(filename));
  280.     if (store_file != NULL)
  281. info(0, "Loading store file %s", octstr_get_cstr(filename));
  282.     else {
  283. store_file = octstr_read_file(octstr_get_cstr(newfile));
  284. if (store_file != NULL)
  285.     info(0, "Loading store file %s", octstr_get_cstr(newfile));
  286. else {
  287.     store_file = octstr_read_file(octstr_get_cstr(bakfile));
  288.     if (store_file != NULL)
  289. info(0, "Loading store file %s", octstr_get_cstr(bakfile));
  290.     else {
  291. info(0, "Cannot open any store file, starting new one");
  292. retval = open_file(filename);
  293. list_unlock(sms_store);
  294. list_unlock(ack_store);
  295. mutex_unlock(file_mutex);
  296. return retval;
  297.     }
  298. }
  299.     }
  300.     info(0, "Store-file size %ld, starting to unpack%s", octstr_len(store_file),
  301.  octstr_len(store_file) > 10000 ? " (may take awhile)" : "");
  302.     msg_hash = dict_create(101, NULL);  /* XXX should be different? */
  303.     pos = 0;
  304.     msgs = 0;
  305.     
  306.     while ((end = octstr_search(store_file, octstr_imm("n"), pos)) != -1) {
  307. pack = octstr_copy(store_file, pos, end-pos);
  308. pos = end+1;
  309. if (octstr_url_decode(pack) == -1) {
  310.     debug("bb.store", 0, "Garbage at store-file, skipped");
  311.     continue;
  312. }
  313. msg = msg_unpack(pack);
  314. if (msg == NULL) 
  315.     continue;
  316. if (msg_type(msg) == sms) {
  317.     if (msg->sms.sms_type == report) {
  318. octstr_destroy(pack);
  319. continue;
  320.     }
  321.     key = octstr_format("%d-%d", msg->sms.time, msg->sms.id);
  322.     dict_put(msg_hash, key, msg);
  323.     octstr_destroy(key);
  324.     msgs++;
  325. }
  326. else if (msg_type(msg) == ack) {
  327.     if (msg->sms.sms_type == report) {
  328. octstr_destroy(pack);
  329. continue;
  330.     }
  331.     key = octstr_format("%d-%d", msg->ack.time, msg->ack.id);
  332.     dmsg = dict_remove(msg_hash, key);
  333.     if (dmsg != NULL) 
  334. msg_destroy(dmsg);
  335.     else
  336. info(0, "Acknowledge of non-existant message found '%s', "
  337.    "discarded", octstr_get_cstr(key));
  338.     msg_destroy(msg);
  339.     octstr_destroy(key);
  340. } else {
  341.     warning(0, "Strange message in store-file, discarded, "
  342.     "dump follows:");
  343.     msg_dump(msg, 0);
  344.     msg_destroy(msg);
  345. }
  346. octstr_destroy(pack);
  347.     }
  348.     octstr_destroy(store_file);
  349.     store_size = dict_key_count(msg_hash);
  350.     info(0, "Retrieved %d messages, non-acknowledged messages: %ld",
  351.  msgs, store_size);
  352.     /* now create a new sms_store out of messages left
  353.      */
  354.     keys = dict_keys(msg_hash);
  355.     while((key = list_extract_first(keys))!=NULL) {
  356. msg = dict_get(msg_hash, key);
  357. if (msg_type(msg) != sms) {
  358.     msg_destroy(msg);
  359.     octstr_destroy(key);
  360.     continue;
  361. }
  362. copy = msg_duplicate(msg);
  363. list_produce(sms_store, copy);
  364. if (msg->sms.sms_type == mo)
  365.     list_produce(incoming_sms, msg);
  366. if (msg->sms.sms_type == mt_push ||
  367.     msg->sms.sms_type == mt_reply)
  368.     list_produce(outgoing_sms, msg);
  369. octstr_destroy(key);
  370.     }
  371.     list_destroy(keys, NULL);
  372.     /* Finally, generate new store file out of left messages
  373.      */
  374.     retval = do_dump();
  375.     
  376.     mutex_unlock(file_mutex);
  377.     /* destroy the hash */
  378.     dict_destroy(msg_hash);
  379.     list_unlock(sms_store);
  380.     list_unlock(ack_store);
  381.     return retval;
  382. }
  383. int store_dump(void)
  384. {
  385.     int retval;
  386.     list_lock(sms_store);
  387.     list_lock(ack_store);
  388.     debug("bb.store", 0, "Dumping %ld messages and %ld acks to store",
  389.   list_len(sms_store), list_len(ack_store));
  390.     mutex_lock(file_mutex);
  391.     if (file != NULL) {
  392. fclose(file);
  393. file = NULL;
  394.     }
  395.     retval = do_dump();
  396.     mutex_unlock(file_mutex);
  397.     list_unlock(ack_store);
  398.     list_unlock(sms_store);
  399.     return retval;
  400. }
  401. int store_init(Octstr *fname)
  402. {
  403.     if (octstr_len(fname) > (FILENAME_MAX-5)) {
  404.         error(0, "Store file filename too long: `%s', failed to init.",
  405.       octstr_get_cstr(fname));
  406. return -1;
  407.     }
  408.     filename = octstr_duplicate(fname);
  409.     newfile = octstr_format("%s.new", octstr_get_cstr(filename));
  410.     bakfile = octstr_format("%s.bak", octstr_get_cstr(filename));
  411.     msg_id = counter_create();
  412.     counter_set(msg_id, 1);
  413.     sms_store = list_create();
  414.     ack_store = list_create();
  415.     file_mutex = mutex_create();
  416.     list_add_producer(ack_store);
  417.     
  418.     if ((cleanup_thread = gwthread_create(store_cleanup, NULL))==-1)
  419. panic(0, "Failed to create a cleanup thread!");
  420.     return 0;
  421. }
  422. void store_shutdown(void)
  423. {
  424.     if (filename == NULL)
  425. return;
  426.     list_remove_producer(ack_store);
  427. }