faxQueueApp.c++
上传用户:weiyuanprp
上传日期:2020-05-20
资源大小:1169k
文件大小:116k
源码类别:

传真(Fax)编程

开发平台:

C/C++

  1. /* $Id: faxQueueApp.c++,v 1.88 2009/10/17 21:20:59 faxguy Exp $ */
  2. /*
  3.  * Copyright (c) 1990-1996 Sam Leffler
  4.  * Copyright (c) 1991-1996 Silicon Graphics, Inc.
  5.  * HylaFAX is a trademark of Silicon Graphics
  6.  *
  7.  * Permission to use, copy, modify, distribute, and sell this software and 
  8.  * its documentation for any purpose is hereby granted without fee, provided
  9.  * that (i) the above copyright notices and this permission notice appear in
  10.  * all copies of the software and related documentation, and (ii) the names of
  11.  * Sam Leffler and Silicon Graphics may not be used in any advertising or
  12.  * publicity relating to the software without the specific, prior written
  13.  * permission of Sam Leffler and Silicon Graphics.
  14.  * 
  15.  * THE SOFTWARE IS PROVIDED "AS-IS" AND WITHOUT WARRANTY OF ANY KIND, 
  16.  * EXPRESS, IMPLIED OR OTHERWISE, INCLUDING WITHOUT LIMITATION, ANY 
  17.  * WARRANTY OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE.  
  18.  * 
  19.  * IN NO EVENT SHALL SAM LEFFLER OR SILICON GRAPHICS BE LIABLE FOR
  20.  * ANY SPECIAL, INCIDENTAL, INDIRECT OR CONSEQUENTIAL DAMAGES OF ANY KIND,
  21.  * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
  22.  * WHETHER OR NOT ADVISED OF THE POSSIBILITY OF DAMAGE, AND ON ANY THEORY OF 
  23.  * LIABILITY, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE 
  24.  * OF THIS SOFTWARE.
  25.  */
  26. #include "Sys.h"
  27. #include <ctype.h>
  28. #include <errno.h>
  29. #include <math.h>
  30. #include <limits.h>
  31. #include <sys/file.h>
  32. #include <tiffio.h>
  33. #include "Dispatcher.h"
  34. #include "FaxMachineInfo.h"
  35. #include "FaxAcctInfo.h"
  36. #include "FaxRequest.h"
  37. #include "FaxTrace.h"
  38. #include "FaxRecvInfo.h"
  39. #include "Timeout.h"
  40. #include "UUCPLock.h"
  41. #include "DialRules.h"
  42. #include "RE.h"
  43. #include "Modem.h"
  44. #include "Trigger.h"
  45. #include "faxQueueApp.h"
  46. #include "HylaClient.h"
  47. #include "MemoryDecoder.h"
  48. #include "FaxSendInfo.h"
  49. #include "config.h"
  50. /*
  51.  * HylaFAX Spooling and Command Agent.
  52.  */
  53. const fxStr faxQueueApp::sendDir = FAX_SENDDIR;
  54. const fxStr faxQueueApp::docDir = FAX_DOCDIR;
  55. const fxStr faxQueueApp::clientDir = FAX_CLIENTDIR;
  56. fxStr strTime(time_t t) { return fxStr(fmtTime(t)); }
  57. #define JOBHASH(pri) (((pri) >> 4) & (NQHASH-1))
  58. faxQueueApp::SchedTimeout::SchedTimeout()
  59. {
  60.     started = false;
  61.     pending = false;
  62.     lastRun = Sys::now() - 1;
  63. }
  64. faxQueueApp::SchedTimeout::~SchedTimeout() {}
  65. void
  66. faxQueueApp::SchedTimeout::timerExpired(long, long)
  67. {
  68.     if (pending && lastRun <= Sys::now()) pending = false;
  69.     if (faxQueueApp::instance().scheduling() ) {
  70. start(0);
  71. return;
  72.     }
  73.     faxQueueApp::instance().runScheduler();
  74.     started = false;
  75. }
  76. void
  77. faxQueueApp::SchedTimeout::start(u_short s)
  78. {
  79.     /*
  80.      * If we don't throttle the scheduler then large
  81.      * queues can halt the system with CPU consumption.
  82.      * So we keep the scheduler from running more than
  83.      * once per second.
  84.      */
  85.     if (!started && Sys::now() != lastRun) {
  86. started = true;
  87. pending = false;
  88. Dispatcher::instance().startTimer(s, 1, this);
  89. lastRun = Sys::now() + s;
  90.     } else {
  91. if (!pending && lastRun <= Sys::now()) {
  92.     /*
  93.      * The scheduler is either running now or has been run
  94.      * within the last second and there are no timers set
  95.      * to trigger another scheduler run.  So we set a
  96.      * timer to go off in one second to avoid a stalled
  97.      * run queue.
  98.      */
  99.     Dispatcher::instance().startTimer(s + 1, 0, this);
  100.     lastRun = Sys::now() + 1 + s;
  101.     pending = true;
  102. }
  103.     }
  104. }
  105. faxQueueApp* faxQueueApp::_instance = NULL;
  106. faxQueueApp::faxQueueApp()
  107.     : configFile(FAX_CONFIG)
  108. {
  109.     fifo = -1;
  110.     quit = false;
  111.     dialRules = NULL;
  112.     inSchedule = false;
  113.     setupConfig();
  114.     fxAssert(_instance == NULL, "Cannot create multiple faxQueueApp instances");
  115.     _instance = this;
  116. }
  117. faxQueueApp::~faxQueueApp()
  118. {
  119.     HylaClient::purge();
  120.     delete dialRules;
  121. }
  122. faxQueueApp& faxQueueApp::instance() { return *_instance; }
  123. void
  124. faxQueueApp::initialize(int argc, char** argv)
  125. {
  126.     updateConfig(configFile); // read config file
  127.     faxApp::initialize(argc, argv);
  128.     for (GetoptIter iter(argc, argv, getOpts()); iter.notDone(); iter++)
  129. switch (iter.option()) {
  130. case 'c': // set configuration parameter
  131.     readConfigItem(iter.optArg());
  132.     break;
  133. }
  134.     logInfo("%s", HYLAFAX_VERSION);
  135.     logInfo("%s", "Copyright (c) 1990-1996 Sam Leffler");
  136.     logInfo("%s", "Copyright (c) 1991-1996 Silicon Graphics, Inc.");
  137.     scanForModems();
  138. }
  139. void
  140. faxQueueApp::open()
  141. {
  142.     faxApp::open();
  143.     scanQueueDirectory();
  144.     Modem::broadcast("HELLO"); // announce queuer presence
  145.     scanClientDirectory(); // announce queuer presence
  146.     pokeScheduler();
  147. }
  148. void
  149. faxQueueApp::blockSignals()
  150. {
  151.     sigset_t block;
  152.     sigemptyset(&block);
  153.     sigaddset(&block, SIGCHLD);
  154.     sigprocmask(SIG_BLOCK, &block, NULL);
  155. }
  156. void
  157. faxQueueApp::releaseSignals()
  158. {
  159.     sigset_t release;
  160.     sigemptyset(&release);
  161.     sigaddset(&release, SIGCHLD);
  162.     sigprocmask (SIG_UNBLOCK, &release, NULL);
  163. }
  164. /*
  165.  * Scan the spool area for modems.  We can't be certain the
  166.  * modems are actively working without probing them; this
  167.  * work is done simply to buildup the internal database for
  168.  * broadcasting a ``HELLO'' message.  Later on, individual
  169.  * modems are enabled for use based on messages received
  170.  * through the FIFO.
  171.  */
  172. void
  173. faxQueueApp::scanForModems()
  174. {
  175.     DIR* dir = Sys::opendir(".");
  176.     if (dir == NULL) {
  177. logError("Could not scan directory for modems");
  178. return;
  179.     }
  180.     fxStr fifoMatch(fifoName | ".");
  181.     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  182. if (dp->d_name[0] != fifoName[0])
  183.     continue;
  184. if (!strneq(dp->d_name, fifoMatch, fifoMatch.length()))
  185.     continue;
  186. if (Sys::isFIFOFile(dp->d_name)) {
  187.     fxStr devid(dp->d_name);
  188.     devid.remove(0, fifoMatch.length()-1); // NB: leave "."
  189.     if (Sys::isRegularFile(FAX_CONFIG | devid)) {
  190. devid.remove(0); // strip "."
  191. (void) Modem::getModemByID(devid); // adds to list
  192.     }
  193. }
  194.     }
  195.     closedir(dir);
  196. }
  197. /*
  198.  * Scan the spool directory for queue files and
  199.  * enter them in the queues of outgoing jobs.
  200.  */
  201. void
  202. faxQueueApp::scanQueueDirectory()
  203. {
  204.     DIR* dir = Sys::opendir(sendDir);
  205.     if (dir == NULL) {
  206. logError("Could not scan %s directory for outbound jobs",
  207. (const char*)sendDir);
  208. return;
  209.     }
  210.     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  211. if (dp->d_name[0] == 'q')
  212.     submitJob(&dp->d_name[1], true);
  213.     }
  214.     closedir(dir);
  215. }
  216. /*
  217.  * Scan the client area for active client processes
  218.  * and send a ``HELLO message'' to notify them the
  219.  * queuer process has restarted.  If no process is
  220.  * listening on the FIFO, remove it; the associated
  221.  * client state will be purged later.
  222.  */
  223. void
  224. faxQueueApp::scanClientDirectory()
  225. {
  226.     DIR* dir = Sys::opendir(clientDir);
  227.     if (dir == NULL) {
  228. logError("Could not scan %s directory for clients",
  229. (const char*) clientDir);
  230. return;
  231.     }
  232.     for (dirent* dp = readdir(dir); dp; dp = readdir(dir)) {
  233. if (!isdigit(dp->d_name[0]))
  234.     continue;
  235. fxStr fifo(clientDir | "/" | dp->d_name);
  236. if (Sys::isFIFOFile((const char*) fifo))
  237.     if (!HylaClient::getClient(fifo).send("HELLO", 6))
  238. Sys::unlink(fifo);
  239.     }
  240.     closedir(dir);
  241. }
  242. /*
  243.  * Process a job.  Prepare it for transmission and
  244.  * pass it on to the thread that does the actual
  245.  * transmission work.  The job is marked ``active to
  246.  * this destination'' prior to preparing it because
  247.  * preparation may involve asynchronous activities.
  248.  * The job is placed on the active list so that it
  249.  * can be located by filename if necessary.
  250.  */
  251. void
  252. faxQueueApp::processJob(Job& job, FaxRequest* req, DestInfo& di)
  253. {
  254.     JobStatus status;
  255.     FaxMachineInfo& info = di.getInfo(job.dest);
  256.     Job* bjob = job.bfirst(); // first job in batch
  257.     Job* cjob = &job; // current job
  258.     FaxRequest* creq = req; // current request
  259.     Job* njob = NULL; // next job
  260.     
  261.     for (; cjob != NULL; cjob = njob) {
  262. creq = cjob->breq;
  263. njob = cjob->bnext;
  264. cjob->commid = ""; // set on return
  265. req->notice = ""; // Clear for new procssing
  266. di.active(*cjob);
  267. setActive(*cjob); // place job on active list
  268. updateRequest(*creq, *cjob);
  269. if (!prepareJobNeeded(*cjob, *creq, status)) {
  270.     if (status != Job::done) {
  271. if (cjob->bprev == NULL)
  272.     bjob = njob;
  273. cjob->state = FaxRequest::state_failed;
  274. deleteRequest(*cjob, creq, status, true);
  275. setDead(*cjob);
  276.     }
  277. } else {
  278.     if (prepareJobStart(*cjob, creq, info))
  279. return;
  280.     else if (cjob->bprev == NULL)
  281. bjob = njob;
  282. }
  283.     }
  284.     if (bjob != NULL)
  285. sendJobStart(*bjob, bjob->breq);
  286. }
  287. /*
  288.  * Check if the job requires preparation that should
  289.  * done in a fork'd copy of the server.  A sub-fork is
  290.  * used if documents must be converted or a continuation
  291.  * cover page must be crafted (i.e. the work may take
  292.  * a while).
  293.  */
  294. bool
  295. faxQueueApp::prepareJobNeeded(Job& job, FaxRequest& req, JobStatus& status)
  296. {
  297.     if (!req.items.length()) {
  298. req.notice = "Job contains no documents {E323}";
  299. status = Job::rejected;
  300. jobError(job, "SEND REJECT: %s", (const char*) req.notice);
  301. return (false);
  302.     }
  303.     for (u_int i = 0, n = req.items.length(); i < n; i++)
  304. switch (req.items[i].op) {
  305. case FaxRequest::send_postscript: // convert PostScript
  306. case FaxRequest::send_pcl: // convert PCL
  307. case FaxRequest::send_tiff: // verify&possibly convert TIFF
  308. case FaxRequest::send_pdf: // convert PDF
  309.     return (true);
  310. case FaxRequest::send_poll: // verify modem is capable
  311.     if (!job.modem->supportsPolling()) {
  312. req.notice = "Modem does not support polling {E324}";
  313. status = Job::rejected;
  314. jobError(job, "SEND REJECT: %s", (const char*) req.notice);
  315. return (false);
  316.     }
  317.     break;
  318. }
  319.     status = Job::done;
  320.     return (req.cover != ""); // need continuation cover page
  321. }
  322. /*
  323.  * Handler used by job preparation subprocess
  324.  * to pass signal from parent queuer process.
  325.  * We mark state so job preparation will be aborted
  326.  * at the next safe point in the procedure.
  327.  */
  328. void
  329. faxQueueApp::prepareCleanup(int s)
  330. {
  331.     int old_errno = errno;
  332.     signal(s, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  333.     logError("CAUGHT SIGNAL %d, ABORT JOB PREPARATION", s);
  334.     faxQueueApp::instance().abortPrepare = true;
  335.     errno = old_errno;
  336. }
  337. /*
  338.  * Start job preparation in a sub-fork.  The server process
  339.  * forks and sets up a Dispatcher handler to reap the child
  340.  * process.  The exit status from the child is actually the
  341.  * return value from the prepareJob method; this and a
  342.  * reference to the original Job are passed back into the
  343.  * server thread at which point the transmit work is actually
  344.  * initiated.
  345.  */
  346. bool
  347. faxQueueApp::prepareJobStart(Job& job, FaxRequest* req,
  348.     FaxMachineInfo& info)
  349. {
  350.     traceQueue(job, "PREPARE START");
  351.     abortPrepare = false;
  352.     pid_t pid = fork();
  353.     switch (pid) {
  354.     case 0: // child, do work
  355. /*
  356.  * NB: There is a window here where the subprocess
  357.  * doing the job preparation can have the old signal
  358.  * handlers installed when a signal comes in.  This
  359.  * could be fixed by using the appropriate POSIX calls
  360.  * to block and unblock signals, but signal usage is
  361.  * quite tenuous (i.e. what is and is not supported
  362.  * on a system), so rather than depend on this
  363.  * functionality being supported, we'll just leave
  364.  * the (small) window in until it shows itself to
  365.  * be a problem.
  366.  */
  367. signal(SIGTERM, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  368. signal(SIGINT, fxSIGHANDLER(faxQueueApp::prepareCleanup));
  369. _exit(prepareJob(job, *req, info));
  370. /*NOTREACHED*/
  371.     case -1: // fork failed, sleep and retry
  372. if (job.isOnList()) job.remove(); // Remove from active queue
  373. delayJob(job, *req, "Could not fork to prepare job for transmission {E340}",
  374.     Sys::now() + random() % requeueInterval);
  375. delete req;
  376. return false;
  377.     default: // parent, setup handler to wait
  378. job.startPrepare(pid);
  379. delete req; // must reread after preparation
  380. job.breq = NULL;
  381. Trigger::post(Trigger::JOB_PREP_BEGIN, job);
  382. return true;
  383.     }
  384. }
  385. /*
  386.  * Handle notification from the sub-fork that job preparation
  387.  * is done.  The exit status is checked and interpreted as the
  388.  * return value from prepareJob if it was passed via _exit.
  389.  */
  390. void
  391. faxQueueApp::prepareJobDone(Job& job, int status)
  392. {
  393.     traceQueue(job, "PREPARE DONE");
  394.     Trigger::post(Trigger::JOB_PREP_END, job);
  395.     if (status&0xff) {
  396. logError("JOB %s: bad exit status %#x from sub-fork",
  397.     (const char*) job.jobid, status);
  398. status = Job::failed;
  399.     } else
  400. status >>= 8;
  401.     if (job.suspendPending) { // co-thread waiting
  402. job.suspendPending = false;
  403. releaseModem(job);
  404.     } else {
  405. FaxRequest* req = readRequest(job);
  406. if (!req) {
  407.     // NB: no way to notify the user (XXX)
  408.     logError("JOB %s: qfile vanished during preparation",
  409. (const char*) job.jobid);
  410.     setDead(job);
  411. } else {
  412.     bool processnext = false;
  413.     bool startsendjob = false;
  414.     Job* targetjob = &job;
  415.     if (status == Job::done) { // preparation completed successfully
  416. job.breq = req;
  417. startsendjob = (job.bnext == NULL);
  418. processnext = !startsendjob;
  419. if (processnext) {
  420.     targetjob = job.bnext;
  421. }
  422.     } else {
  423. /*
  424.  * Job preparation did not complete successfully.
  425.  *
  426.  * If there is more than one job in this batch, then remove this job
  427.  * and adjust the batch accordingly.
  428.  */
  429. if (job.bnext == NULL) { // no jobs left in batch
  430.     targetjob = job.bprev;
  431.     startsendjob = (targetjob != NULL);
  432. } else { // more jobs in batch
  433.     targetjob = job.bnext;
  434.     processnext = true;
  435. }
  436. /*
  437.  * If there are other jobs in the batch, we have to be
  438.  * careful to *not* release the modem, otherwise faxq will
  439.  * schedule new jobs on this modem while the rest of the jobs
  440.  * in the batch are still using it.
  441.  */
  442. if (startsendjob || processnext)
  443. job.modem = NULL;
  444. if (status == Job::requeued) {
  445.     if (job.isOnList()) job.remove();
  446.     delayJob(job, *req, "Could not fork to prepare job for transmission {E340}",
  447. Sys::now() + random() % requeueInterval);
  448.     delete req;
  449. } else {
  450.     deleteRequest(job, req, status, true);
  451.     setDead(job);
  452. }
  453.     }
  454.     if (processnext)
  455. processJob(*targetjob, targetjob->breq, destJobs[targetjob->dest]);
  456.     else if (startsendjob)
  457. sendJobStart(*targetjob->bfirst(), targetjob->bfirst()->breq);
  458.     else {
  459. /*
  460.  * This destination was marked as called, but all jobs to this
  461.  * destination failed preparation, so we must undo the call marking.
  462.  */
  463. DestInfo& di = destJobs[job.dest];
  464. di.hangup(); // do before unblockDestJobs
  465. unblockDestJobs(di); // release any blocked jobs
  466. removeDestInfoJob(job);
  467. pokeScheduler();
  468.     }
  469. }
  470.     }
  471. }
  472. /*
  473.  * Document Use Database.
  474.  *
  475.  * The server minimizes imaging operations by checking for the
  476.  * existence of compatible, previously imaged, versions of documents.
  477.  * This is done by using a file naming convention that includes the
  478.  * source document name and the remote machine capabilities that are
  479.  * used for imaging.  The work done here (and in other HylaFAX apps)
  480.  * also assumes certain naming convention used by hfaxd when creating
  481.  * document files.  Source documents are named:
  482.  *
  483.  *     doc<docnum>.<type>
  484.  *
  485.  * where <docnum> is a unique document number that is assigned by
  486.  * hfaxd at the time the document is stored on the server.  Document
  487.  * references by a job are then done using filenames (i.e. hard
  488.  * links) of the form:
  489.  *
  490.  * doc<docnum>.<type>.<jobid>
  491.  *
  492.  * where <jobid> is the unique identifier assigned to each outbound
  493.  * job.  Then, each imaged document is named:
  494.  *
  495.  * doc<docnum>.<type>;<encoded-capabilities>
  496.  *
  497.  * where <encoded-capabilities> is a string that encodes the remote
  498.  * machine's capabilities.
  499.  *
  500.  * Before imaging a document the scheduler checks to see if there is
  501.  * an existing file with the appropriate name.  If so then the file
  502.  * is used and no preparation work is needed for sending the document.
  503.  * Otherwise the document must be converted for transmission; this
  504.  * result is written to a file with the appropriate name.  After an
  505.  * imaged document has been transmitted it is not immediately removed,
  506.  * but rather the scheduler is informed that the job no longer holds
  507.  * (needs) a reference to the document and the scheduler records this
  508.  * information so that when no jobs reference the original source
  509.  * document, all imaged forms may be expunged.  As documents are
  510.  * transmitted the job references to the original source documents are
  511.  * converted to references to the ``base document name'' (the form
  512.  * without the <jobid>) so that the link count on the inode for this
  513.  * file reflects the number of references from jobs that are still
  514.  * pending transmission.  This means that the scheduler can use the
  515.  * link count to decide when to expunge imaged versions of a document.
  516.  *
  517.  * Note that the reference counts used here do not necessarily
  518.  * *guarantee* that a pre-imaged version of a document will be available.
  519.  * There are race conditions under which a document may be re-imaged
  520.  * because a previously imaged version was removed.
  521.  *
  522.  * A separate document scavenger program should be run periodically
  523.  * to expunge any document files that might be left in the docq for
  524.  * unexpected reasons.  This program should read the set of jobs in
  525.  * the sendq to build a onetime table of uses and then remove any
  526.  * files found in the docq that are not referenced by a job.
  527.  */
  528. /*
  529.  * Remove a reference to an imaged document and if no
  530.  * references exist for the corresponding source document,
  531.  * expunge all imaged versions of the document.
  532.  */
  533. void
  534. faxQueueApp::unrefDoc(const fxStr& file)
  535. {
  536.     /*
  537.      * Convert imaged document name to the base
  538.      * (source) document name by removing the
  539.      * encoded session parameters used for imaging.
  540.      */
  541.     u_int l = file.nextR(file.length(), ';');
  542.     if (l == 0) {
  543. logError("Bogus document handed to unrefDoc: %s", (const char*) file);
  544. return;
  545.     }
  546.     fxStr doc = file.head(l-1);
  547.     /*
  548.      * Add file to the list of pending removals.  We
  549.      * do this before checking below so that the list
  550.      * of files will always have something on it.
  551.      */
  552.     fxStr& files = pendingDocs[doc];
  553.     if (files.find(0, file) == files.length()) // suppress duplicates
  554. files.append(file | " ");
  555.     if (tracingLevel & FAXTRACE_DOCREFS)
  556. logInfo("DOC UNREF: %s files %s",
  557.     (const char*) file, (const char*) files);
  558.     /*
  559.      * The following assumes that any source document has
  560.      * been renamed to the base document name *before* this
  561.      * routine is invoked (either directly or via a msg
  562.      * received on a FIFO).  Specifically, if the stat
  563.      * call fails we assume the file does not exist and
  564.      * that it is safe to remove the imaged documents.
  565.      * This is conservative and if wrong will not break
  566.      * anything; just potentially cause extra imaging
  567.      * work to be done.
  568.      */
  569.     struct stat sb;
  570.     if (Sys::stat(doc, sb) < 0 || sb.st_nlink == 1) {
  571. if (tracingLevel & FAXTRACE_DOCREFS)
  572.     logInfo("DOC UNREF: expunge imaged files");
  573. /*
  574.  * There are no additional references to the
  575.  * original source document (all references
  576.  * should be from completed jobs that reference
  577.  * the original source document by its basename).
  578.  * Expunge imaged documents that were waiting for
  579.  * all potential uses to complete.
  580.  */
  581. l = 0;
  582. do {
  583.     (void) Sys::unlink(files.token(l, ' '));
  584. } while (l < files.length());
  585. pendingDocs.remove(doc);
  586.     }
  587. }
  588. #include "class2.h"
  589. /*
  590.  * Prepare a job by converting any user-submitted documents
  591.  * to a format suitable for transmission.
  592.  */
  593. JobStatus
  594. faxQueueApp::prepareJob(Job& job, FaxRequest& req,
  595.     const FaxMachineInfo& info)
  596. {
  597.     /*
  598.      * Select imaging parameters according to requested
  599.      * values, client capabilities, and modem capabilities.
  600.      * Note that by this time we believe the modem is capable
  601.      * of certain requirements needed to transmit the document
  602.      * (based on the capabilities passed to us by faxgetty).
  603.      */
  604.     Class2Params params;
  605.     
  606.     /*
  607.      * User requested vres (98 or 196) and usexvres (1=true or 0=false)
  608.      */
  609.     int vres = req.resolution;
  610.     int usexvres = req.usexvres;
  611.     /*
  612.      * System overrides in JobControl:
  613.      * VRes: we check for vres = 98 or vres = 196 in JobControl;
  614.      *       if vres is not set getVRes returns 0.
  615.      * UseXVres: we check for usexvres = 0 or usexvres = 1 in JobControl;
  616.      *           if usexvres is not set getUseXVRes retuns -1.
  617.      */
  618.     if (job.getJCI().getVRes() == 98)
  619. vres = 98;
  620.     else if (job.getJCI().getVRes() == 196)
  621. vres = 196;
  622.     if (job.getJCI().getUseXVRes() == 0)
  623. usexvres = 0;
  624.     else if (job.getJCI().getUseXVRes() == 1)
  625. usexvres = 1;
  626.     // use the highest resolution the client supports
  627.     params.vr = VR_NORMAL;
  628.     if (usexvres) {
  629. if (info.getSupportsVRes() & VR_200X100 && job.modem->supportsVR(VR_200X100))
  630.     params.vr = VR_200X100;
  631. if (info.getSupportsVRes() & VR_FINE && job.modem->supportsVR(VR_FINE))
  632.     params.vr = VR_FINE;
  633. if (info.getSupportsVRes() & VR_200X200 && job.modem->supportsVR(VR_200X200))
  634.     params.vr = VR_200X200;
  635. if (info.getSupportsVRes() & VR_R8 && job.modem->supportsVR(VR_R8))
  636.     params.vr = VR_R8;
  637. if (info.getSupportsVRes() & VR_200X400 && job.modem->supportsVR(VR_200X400))
  638.     params.vr = VR_200X400;
  639. if (info.getSupportsVRes() & VR_300X300 && job.modem->supportsVR(VR_300X300))
  640.     params.vr = VR_300X300;
  641. if (info.getSupportsVRes() & VR_R16 && job.modem->supportsVR(VR_R16))
  642.     params.vr = VR_R16;
  643.     } else { // limit ourselves to normal and fine
  644. if (vres > 150) {
  645.     if (info.getSupportsVRes() & VR_FINE && job.modem->supportsVR(VR_FINE))
  646. params.vr = VR_FINE;
  647. }
  648.     }
  649.     params.setPageWidthInMM(
  650. fxmin((u_int) req.pagewidth, (u_int) info.getMaxPageWidthInMM()));
  651.     /*
  652.      * Follow faxsend and use unlimited page length whenever possible.
  653.      */
  654.     useUnlimitedLN = (info.getMaxPageLengthInMM() == (u_short) -1);
  655.     params.setPageLengthInMM(
  656. fxmin((u_int) req.pagelength, (u_int) info.getMaxPageLengthInMM()));
  657.     /*
  658.      * Generate MMR or 2D-encoded facsimile if:
  659.      * o the server is permitted to generate it,
  660.      * o the modem is capable of sending it,
  661.      * o the remote side is known to be capable of it, and
  662.      * o the user hasn't specified a desire to send 1D data.
  663.      */
  664.     int jcdf = job.getJCI().getDesiredDF();
  665.     if (jcdf != -1) req.desireddf = jcdf;
  666.     if (req.desireddf == DF_2DMMR && (req.desiredec != EC_DISABLE) && 
  667. use2D && job.modem->supportsMMR() &&
  668.  (! info.getCalledBefore() || info.getSupportsMMR()) )
  669.     params.df = DF_2DMMR;
  670.     else if (req.desireddf > DF_1DMH) {
  671. params.df = (use2D && job.modem->supports2D() &&
  672.     (! info.getCalledBefore() || info.getSupports2DEncoding()) ) ?
  673. DF_2DMR : DF_1DMH;
  674.     } else
  675. params.df = DF_1DMH;
  676.     /*
  677.      * Check and process the documents to be sent
  678.      * using the parameter selected above.
  679.      */
  680.     JobStatus status = Job::done;
  681.     bool updateQFile = false;
  682.     fxStr tmp; // NB: here to avoid compiler complaint
  683.     u_int i = 0;
  684.     while (i < req.items.length() && status == Job::done && !abortPrepare) {
  685. FaxItem& fitem = req.items[i];
  686. switch (fitem.op) {
  687. case FaxRequest::send_postscript: // convert PostScript
  688. case FaxRequest::send_pcl: // convert PCL
  689. case FaxRequest::send_tiff: // verify&possibly convert TIFF
  690.         case FaxRequest::send_pdf: // convert PDF
  691.     tmp = FaxRequest::mkbasedoc(fitem.item) | ";" | params.encodePage();
  692.     status = convertDocument(job, fitem, tmp, params, req.notice);
  693.     if (status == Job::done) {
  694. /*
  695.  * Insert converted file into list and mark the
  696.  * original document so that it's saved, but
  697.  * not processed again.  The converted file
  698.  * is sent, while the saved file is kept around
  699.  * in case it needs to be returned to the sender.
  700.  */
  701. fitem.op++; // NB: assumes order of enum
  702. req.insertFax(i+1, tmp);
  703.     } else
  704. Sys::unlink(tmp); // bail out
  705.     updateQFile = true;
  706.     break;
  707. }
  708. i++;
  709.     }
  710.     if (status == Job::done && !abortPrepare) {
  711. if (req.pagehandling == "" && !abortPrepare) {
  712.     /*
  713.      * Calculate/recalculate the per-page session parameters
  714.      * and check the page count against the max pages.  We
  715.      * do this before generating continuation any cover page
  716.      * to prevent any skippages setting from trying to skip
  717.      * the continuation cover page.
  718.      */
  719.     if (!preparePageHandling(job, req, info, req.notice)) {
  720. status = Job::rejected; // XXX
  721. req.notice.insert("Document preparation failed: ");
  722.     }
  723.     updateQFile = true;
  724. }
  725. if (req.cover != "" && !abortPrepare) {
  726.     /*
  727.      * Generate a continuation cover page if necessary.
  728.      * Note that a failure in doing this is not considered
  729.      * fatal; perhaps this should be configurable?
  730.      */
  731.     if (updateQFile) 
  732. updateRequest(req, job); // cover-page generation may look at the file
  733.     if (makeCoverPage(job, req, params))
  734. req.nocountcover++;
  735.     updateQFile = true;
  736.     /*
  737.      * Recalculate the per-page session parameters.
  738.      */
  739.     if (!preparePageHandling(job, req, info, req.notice)) {
  740.   status = Job::rejected; // XXX
  741. req.notice.insert("Document preparation failed: ");
  742.     }
  743. }    
  744.     }
  745.     if (updateQFile)
  746. updateRequest(req, job);
  747.     return (status);
  748. }
  749. /*
  750.  * Prepare the job for transmission by analysing
  751.  * the page characteristics and determining whether
  752.  * or not the page transfer parameters will have
  753.  * to be renegotiated after the page is sent.  This
  754.  * is done before the call is placed because it can
  755.  * be slow and there can be timing problems if this
  756.  * is done during transmission.
  757.  */
  758. bool
  759. faxQueueApp::preparePageHandling(Job& job, FaxRequest& req,
  760.     const FaxMachineInfo& info, fxStr& emsg)
  761. {
  762.     /*
  763.      * Figure out whether to try chopping off white space
  764.      * from the bottom of pages.  This can only be done
  765.      * if the remote device is thought to be capable of
  766.      * accepting variable-length pages.
  767.      */
  768.     u_int pagechop;
  769.     if (info.getMaxPageLengthInMM() == (u_short)-1) {
  770. pagechop = req.pagechop;
  771. if (pagechop == FaxRequest::chop_default)
  772.     pagechop = pageChop;
  773.     } else
  774. pagechop = FaxRequest::chop_none;
  775.     u_int maxPages = job.getJCI().getMaxSendPages();
  776.     /*
  777.      * Scan the pages and figure out where session parameters
  778.      * will need to be renegotiated.  Construct a string of
  779.      * indicators to use when doing the actual transmission.
  780.      *
  781.      * NB: all files are coalesced into a single fax document
  782.      *     if possible
  783.      */
  784.     Class2Params params; // current parameters
  785.     Class2Params next; // parameters for ``next'' page
  786.     TIFF* tif = NULL; // current open TIFF image
  787.     req.totpages = req.npages; // count pages previously transmitted
  788.     if (req.skippages) { // job instructs us to skip pages...
  789. u_int i = req.findItem(FaxRequest::send_fax, 0);
  790. if (i) req.items[0].dirnum = req.skippages; // mark original
  791. req.items[i].dirnum = req.skippages; // mark prepared image
  792. req.skippedpages += req.skippages; // update tagline indicators
  793. req.skippages = 0;
  794.     }
  795.     for (u_int i = 0;;) {
  796. if (!tif || TIFFLastDirectory(tif)) {
  797.     /*
  798.      * Locate the next file to be sent.
  799.      */
  800.     if (tif) // close previous file
  801. TIFFClose(tif), tif = NULL;
  802.     if (i >= req.items.length()) {
  803. req.pagehandling.append('P'); // EOP
  804. return (true);
  805.     }
  806.     i = req.findItem(FaxRequest::send_fax, i);
  807.     if (i == fx_invalidArrayIndex) {
  808. req.pagehandling.append('P'); // EOP
  809. return (true);
  810.     }
  811.     const FaxItem& fitem = req.items[i];
  812.     tif = TIFFOpen(fitem.item, "r");
  813.     if (tif == NULL) {
  814. emsg = "Can not open document file " | fitem.item | " {E314}";
  815. if (tif)
  816.     TIFFClose(tif);
  817. return (false);
  818.     }
  819.     if (fitem.dirnum != 0 && !TIFFSetDirectory(tif, fitem.dirnum)) {
  820. emsg = fxStr::format(
  821.     "Can not set directory %u in document file %s {E315}"
  822.     , fitem.dirnum
  823.     , (const char*) fitem.item
  824. );
  825. if (tif)
  826.     TIFFClose(tif);
  827. return (false);
  828.     }
  829.     i++; // advance for next find
  830. } else {
  831.     /*
  832.      * Read the next TIFF directory.
  833.      */
  834.     if (!TIFFReadDirectory(tif)) {
  835. emsg = fxStr::format(
  836.     "Error reading directory %u in document file %s {E316}"
  837.     , TIFFCurrentDirectory(tif)
  838.     , TIFFFileName(tif)
  839. );
  840. if (tif)
  841.     TIFFClose(tif);
  842. return (false);
  843.     }
  844. }
  845. if (++req.totpages > maxPages) {
  846.     emsg = fxStr::format("Too many pages in submission; max %u {E317}",
  847. maxPages);
  848.     if (tif)
  849. TIFFClose(tif);
  850.     return (false);
  851. }
  852. next = params;
  853. setupParams(tif, next, info);
  854. if (params.df != (u_int) -1) {
  855.     /*
  856.      * The pagehandling string has:
  857.      * 'M' = EOM, for when parameters must be renegotiated
  858.      * 'S' = MPS, for when next page uses the same parameters
  859.      * 'P' = EOP, for the last page to be transmitted
  860.      */
  861.     req.pagehandling.append(next == params ? 'S' : 'M');
  862. }
  863. /*
  864.  * Record the session parameters needed by each page
  865.  * so that we can set the initial session parameters
  866.  * as needed *before* dialing the telephone.  This is
  867.  * to cope with Class 2 modems that do not properly
  868.  * implement the +FDIS command.
  869.  */
  870. req.pagehandling.append(next.encodePage());
  871. /*
  872.  * If page is to be chopped (i.e. sent with trailing white
  873.  * space removed so the received page uses minimal paper),
  874.  * scan the data and, if possible, record the amount of data
  875.  * that should not be sent.  The modem drivers will use this
  876.  * information during transmission if it's actually possible
  877.  * to do the chop (based on the negotiated session parameters).
  878.  */
  879. if (pagechop == FaxRequest::chop_all ||
  880.   (pagechop == FaxRequest::chop_last && TIFFLastDirectory(tif)))
  881.     preparePageChop(req, tif, next, req.pagehandling);
  882. params = next;
  883.     }
  884. }
  885. /*
  886.  * Select session parameters according to the info
  887.  * in the TIFF file.  We setup the encoding scheme,
  888.  * page width & length, and vertical-resolution
  889.  * parameters.
  890.  */
  891. void
  892. faxQueueApp::setupParams(TIFF* tif, Class2Params& params, const FaxMachineInfo& info)
  893. {
  894.     uint16 compression = 0;
  895.     (void) TIFFGetField(tif, TIFFTAG_COMPRESSION, &compression);
  896.     if (compression == COMPRESSION_CCITTFAX4) {
  897. params.df = DF_2DMMR;
  898.     } else {
  899. uint32 g3opts = 0;
  900. TIFFGetField(tif, TIFFTAG_GROUP3OPTIONS, &g3opts);
  901. params.df = (g3opts&GROUP3OPT_2DENCODING ? DF_2DMR : DF_1DMH);
  902.     }
  903.     uint32 w;
  904.     TIFFGetField(tif, TIFFTAG_IMAGEWIDTH, &w);
  905.     params.setPageWidthInPixels((u_int) w);
  906.     /*
  907.      * Try to deduce the vertical resolution of the image
  908.      * image.  This can be problematical for arbitrary TIFF
  909.      * images 'cuz vendors sometimes don't give the units.
  910.      * We, however, can depend on the info in images that
  911.      * we generate 'cuz we're careful to include valid info.
  912.      */
  913.     float yres, xres;
  914.     if (TIFFGetField(tif, TIFFTAG_YRESOLUTION, &yres) && TIFFGetField(tif, TIFFTAG_XRESOLUTION, &xres)) {
  915. uint16 resunit;
  916. TIFFGetFieldDefaulted(tif, TIFFTAG_RESOLUTIONUNIT, &resunit);
  917. if (resunit == RESUNIT_CENTIMETER)
  918.     yres *= 25.4;
  919.     xres *= 25.4;
  920. params.setRes((u_int) xres, (u_int) yres);
  921.     } else {
  922. /*
  923.  * No resolution is specified, try
  924.  * to deduce one from the image length.
  925.  */
  926. uint32 l;
  927. TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &l);
  928. // B4 at 98 lpi is ~1400 lines
  929. params.setRes(204, (l < 1450 ? 98 : 196));
  930.     }
  931.     /*
  932.      * Select page length according to the image size and
  933.      * vertical resolution.  Note that if the resolution
  934.      * info is bogus, we may select the wrong page size.
  935.      */
  936.     if (info.getMaxPageLengthInMM() != (u_short)-1) {
  937. uint32 h;
  938. TIFFGetField(tif, TIFFTAG_IMAGELENGTH, &h);
  939. params.setPageLengthInMM((u_int)(h / yres));
  940.     } else
  941. params.ln = LN_INF;
  942. }
  943. void
  944. faxQueueApp::preparePageChop(const FaxRequest& req,
  945.     TIFF* tif, const Class2Params& params, fxStr& pagehandling)
  946. {
  947.     tstrip_t s = TIFFNumberOfStrips(tif)-1;
  948.     uint32* stripbytecount;
  949.     (void) TIFFGetField(tif, TIFFTAG_STRIPBYTECOUNTS, &stripbytecount);
  950.     u_int stripSize = (u_int) stripbytecount[s];
  951.     if (stripSize == 0)
  952. return;
  953.     u_char* data = new u_char[stripSize];
  954.     if (TIFFReadRawStrip(tif, s, data, stripSize) >= 0) {
  955. uint16 fillorder;
  956. TIFFGetFieldDefaulted(tif, TIFFTAG_FILLORDER, &fillorder);
  957. MemoryDecoder dec(data, stripSize);
  958. dec.scanPageForBlanks(fillorder, params);
  959. float threshold = req.chopthreshold;
  960. if (threshold == -1)
  961.     threshold = pageChopThreshold;
  962. u_int minRows = 0;
  963. switch(params.vr) {
  964.     case VR_NORMAL:
  965.     case VR_200X100:
  966. minRows = (u_int) (98. * threshold);
  967. break;
  968.     case VR_FINE:
  969.     case VR_200X200:
  970. minRows = (u_int) (196. * threshold);
  971. break;
  972.     case VR_300X300:
  973. minRows = (u_int) (300. * threshold);
  974. break;
  975.     case VR_R8:
  976.     case VR_R16:
  977.     case VR_200X400:
  978. minRows = (u_int) (391. * threshold);
  979. break;
  980. }
  981. if (dec.getLastBlanks() > minRows)
  982. {
  983.     pagehandling.append(fxStr::format("Z%04x",
  984. fxmin((unsigned)0xFFFF, stripSize - (dec.getEndOfPage() - data))));
  985. }
  986.     }
  987.     delete [] data;
  988. }
  989. /*
  990.  * Convert a document into a form suitable
  991.  * for transmission to the remote fax machine.
  992.  */
  993. JobStatus
  994. faxQueueApp::convertDocument(Job& job,
  995.     const FaxItem& req,
  996.     const fxStr& outFile,
  997.     const Class2Params& params,
  998.     fxStr& emsg)
  999. {
  1000.     JobStatus status;
  1001.     /*
  1002.      * Open/create the target file and lock it to guard against
  1003.      * concurrent jobs imaging the same document with the same
  1004.      * parameters.  The parent will hold the open file descriptor
  1005.      * for the duration of the imaging job.  Concurrent jobs will
  1006.      * block on flock and wait for the imaging to be completed.
  1007.      * Previously imaged documents will be flock'd immediately
  1008.      * and reused without delays after verifying that they were
  1009.      * last modified *after* the source image.
  1010.      *
  1011.      * NB: There is a race condition here.  One process may create
  1012.      * the file but another may get the shared lock above before
  1013.      * the exclusive lock below is captured.  If this happens
  1014.      * then the exclusive lock will block temporarily, but the
  1015.      * process with the shared lock may attempt to send a document
  1016.      * before it's preparation is completed.  We could add a delay
  1017.      * before the shared lock but that would slow down the normal
  1018.      * case and the window is small--so let's leave it there for now.
  1019.      */
  1020.     struct stat sin;
  1021.     struct stat sout;
  1022.     if (Sys::stat(outFile, sout) == 0 && Sys::stat(req.item, sin) == 0) {
  1023. if (sout.st_mtime < sin.st_mtime) {
  1024.     /*
  1025.      * It appears that the target file exists and is 
  1026.      * older than the source image.  (Thus the old target is an image 
  1027.      * file from a previous job.)  This can happen, for example,
  1028.      * if faxqclean isn't being run frequently-enough and faxq
  1029.      * for some reason did not delete the old target file after its job 
  1030.      * completion.  Thus, we delete the old file before continuing.
  1031.      */
  1032.      jobError(job, "Removing old image file: %s (run faxqclean more often)", (const char*) outFile);
  1033.      (void) Sys::unlink(outFile);
  1034. }
  1035.     }
  1036.     int fd = Sys::open(outFile, O_RDWR|O_CREAT|O_EXCL, 0600);
  1037.     if (fd == -1) {
  1038. if (errno == EEXIST) {
  1039.     /*
  1040.      * The file already exist, flock it in case it's
  1041.      * being created (we'll block until the imaging
  1042.      * is completed).  Otherwise, the document imaging
  1043.      * has already been completed and we can just use it.
  1044.      */
  1045.     fd = Sys::open(outFile, O_RDWR); // NB: RDWR for flock emulations
  1046.     if (fd != -1) {
  1047. if (flock(fd, LOCK_SH) == -1) {
  1048.     status = Job::format_failed;
  1049.     emsg = "Unable to lock shared document file {E318}";
  1050. } else
  1051.     status = Job::done;
  1052. (void) Sys::close(fd); // NB: implicit unlock
  1053.     } else {
  1054. /*
  1055.  * This *can* happen if document preparation done
  1056.  * by another job fails (e.g. because of a time
  1057.  * limit or a malformed PostScript submission).
  1058.  */
  1059. status = Job::format_failed;
  1060. emsg = "Unable to open shared document file {E319}";
  1061.     }
  1062. } else {
  1063.     status = Job::format_failed;
  1064.     emsg = "Unable to create document file {E320}";
  1065. }
  1066. /*
  1067.  * We were unable to open, create, or flock
  1068.  * the file.  This should not happen.
  1069.  */
  1070. if (status != Job::done)
  1071.     jobError(job, "CONVERT DOCUMENT: %s: %m", (const char*) emsg);
  1072.     } else {
  1073. (void) flock(fd, LOCK_EX); // XXX check for errors?
  1074. /*
  1075.  * Imaged document does not exist, run the document converter
  1076.  * to generate it.  The converter is invoked according to:
  1077.  *   -i jobid jobid number
  1078.  *   -o file output (temp) file
  1079.  *   -r <res> output resolution (dpi)
  1080.  *   -w <pagewidth> output page width (pixels)
  1081.  *   -l <pagelength> output page length (mm)
  1082.  *   -m <maxpages> max pages to generate
  1083.  *   -1|-2|-3 1d, 2d, or 2d-mmr encoding
  1084.  */
  1085. fxStr rbuf = fxStr::format("%u", params.verticalRes());
  1086. fxStr wbuf = fxStr::format("%u", params.pageWidth());
  1087. fxStr lbuf = fxStr::format("%d", params.pageLength());
  1088. fxStr mbuf = fxStr::format("%u", job.getJCI().getMaxSendPages());
  1089. const char* argv[30];
  1090. int ac = 0;
  1091. switch (req.op) {
  1092. case FaxRequest::send_postscript: argv[ac++] = ps2faxCmd; break;
  1093. case FaxRequest::send_pdf:   argv[ac++] = pdf2faxCmd; break;
  1094. case FaxRequest::send_pcl:   argv[ac++] = pcl2faxCmd; break;
  1095. case FaxRequest::send_tiff:   argv[ac++] = tiff2faxCmd; break;
  1096. }
  1097. argv[ac++] = "-i"; argv[ac++] = (const char*)job.jobid;
  1098. argv[ac++] = "-o"; argv[ac++] = outFile;
  1099. argv[ac++] = "-r"; argv[ac++] = (const char*)rbuf;
  1100. argv[ac++] = "-w"; argv[ac++] = (const char*)wbuf;
  1101. argv[ac++] = "-l"; argv[ac++] = (const char*)lbuf;
  1102. argv[ac++] = "-m"; argv[ac++] = (const char*)mbuf;
  1103. if (useUnlimitedLN) argv[ac++] = "-U";
  1104. if (params.df == DF_2DMMR)
  1105.     argv[ac++] = "-3";
  1106. else
  1107.     argv[ac++] = params.df == DF_1DMH ? "-1" : "-2";
  1108. argv[ac++] = req.item;
  1109. argv[ac] = NULL;
  1110. // XXX the (char* const*) is a hack to force type compatibility
  1111. status = runConverter(job, argv[0], (char* const*) argv, emsg);
  1112. if (status == Job::done) {
  1113.     /*
  1114.      * Many converters exit with zero status even when
  1115.      * there are problems so scan the the generated TIFF
  1116.      * to verify the integrity of the converted data.
  1117.      *
  1118.      * NB: We must reopen the file instead of using the
  1119.      *     previously opened file descriptor in case the
  1120.      *     converter creates a new file with the given
  1121.      *     output filename instead of just overwriting the
  1122.      *     file created above.  This can easily happen if,
  1123.      *     for example, the converter creates a link from
  1124.      *     the input file to the target (e.g. tiff2fax
  1125.      *     does this when no conversion is required).
  1126.      */
  1127.     TIFF* tif = TIFFOpen(outFile, "r");
  1128.     if (tif) {
  1129. while (!TIFFLastDirectory(tif))
  1130.     if (!TIFFReadDirectory(tif)) {
  1131. status = Job::format_failed;
  1132. emsg = "Converted document is not valid TIFF {E321}";
  1133. break;
  1134.     }
  1135. TIFFClose(tif);
  1136.     } else {
  1137. status = Job::format_failed;
  1138. emsg = "Could not reopen converted document to verify format {E322}";
  1139.     }
  1140.     if (status == Job::done) // discard any debugging output
  1141. emsg = "";
  1142.     else
  1143. jobError(job, "CONVERT DOCUMENT: %s", (const char*) emsg);
  1144. } else if (status == Job::rejected)
  1145.     jobError(job, "SEND REJECT: %s", (const char*) emsg);
  1146. (void) Sys::close(fd); // NB: implicit unlock
  1147.     }
  1148.     return (status);
  1149. }
  1150. static void
  1151. closeAllBut(int fd)
  1152. {
  1153.     for (int f = Sys::getOpenMax()-1; f >= 0; f--)
  1154. if (f != fd)
  1155.     Sys::close(f);
  1156. }
  1157. /*
  1158.  * Startup a document converter program in a subprocess
  1159.  * with the output returned through a pipe.  We could just use
  1160.  * popen or similar here, but we want to detect fork failure
  1161.  * separately from others so that jobs can be requeued instead
  1162.  * of rejected.
  1163.  */
  1164. JobStatus
  1165. faxQueueApp::runConverter(Job& job, const char* app, char* const* argv, fxStr& emsg)
  1166. {
  1167.     fxStr cmdline(argv[0]);
  1168.     for (u_int i = 1; argv[i] != NULL; i++)
  1169. cmdline.append(fxStr::format(" %s", argv[i]));
  1170.     traceQueue(job, "CONVERT DOCUMENT: %s", (const char*)cmdline);
  1171.     JobStatus status;
  1172.     int pfd[2];
  1173.     if (pipe(pfd) >= 0) {
  1174. int fd;
  1175. pid_t pid = fork();
  1176. switch (pid) {
  1177. case -1: // error
  1178.     jobError(job, "CONVERT DOCUMENT: fork: %m");
  1179.     status = Job::requeued; // job should be retried
  1180.     Sys::close(pfd[1]);
  1181.     break;
  1182. case 0: // child, exec command
  1183.     if (pfd[1] != STDOUT_FILENO)
  1184. dup2(pfd[1], STDOUT_FILENO);
  1185.     closeAllBut(STDOUT_FILENO);
  1186.     dup2(STDOUT_FILENO, STDERR_FILENO);
  1187.     fd = Sys::open(_PATH_DEVNULL, O_RDWR);
  1188.     if (fd != STDIN_FILENO)
  1189.     {
  1190.     dup2(fd, STDIN_FILENO);
  1191.     Sys::close(fd);
  1192.     }
  1193.     Sys::execv(app, argv);
  1194.     sleep(3); // XXX give parent time to catch signal
  1195.     _exit(255);
  1196.     /*NOTREACHED*/
  1197. default: // parent, read from pipe and wait
  1198.     Sys::close(pfd[1]);
  1199.     if (runConverter1(job, pfd[0], emsg)) {
  1200. int estat = -1;
  1201. (void) Sys::waitpid(pid, estat);
  1202. if (estat)
  1203.     jobError(job, "CONVERT DOCUMENT: exit status %#x", estat);
  1204. switch (estat) {
  1205. case 0:  status = Job::done; break;
  1206.         case (254<<8):  status = Job::rejected; break;
  1207. case (255<<8): case 255: status = Job::no_formatter; break;
  1208. default:  status = Job::format_failed; break;
  1209. }
  1210.     } else {
  1211. kill(pid, SIGTERM);
  1212. (void) Sys::waitpid(pid);
  1213. status = Job::format_failed;
  1214.     }
  1215.     break;
  1216. }
  1217. Sys::close(pfd[0]);
  1218.     } else {
  1219. jobError(job, "CONVERT DOCUMENT: pipe: %m");
  1220. status = Job::format_failed;
  1221.     }
  1222.     return (status);
  1223. }
  1224. /*
  1225.  * Replace unprintable characters with ``?''s.
  1226.  */
  1227. static void
  1228. cleanse(char buf[], int n)
  1229. {
  1230.     while (--n >= 0)
  1231. if (!isprint(buf[n]) && !isspace(buf[n]))
  1232.     buf[n] = '?';
  1233. }
  1234. /*
  1235.  * Run the interpreter with the configured timeout and
  1236.  * collect the output from the interpreter in case there
  1237.  * is an error -- this is sent back to the user that
  1238.  * submitted the job.
  1239.  */
  1240. bool
  1241. faxQueueApp::runConverter1(Job& job, int fd, fxStr& output)
  1242. {
  1243.     int n;
  1244.     Timeout timer;
  1245.     timer.startTimeout(postscriptTimeout*1000);
  1246.     char buf[1024];
  1247.     while ((n = Sys::read(fd, buf, sizeof (buf))) > 0 && !timer.wasTimeout()) {
  1248. cleanse(buf, n);
  1249. output.append(buf, n);
  1250.     }
  1251.     timer.stopTimeout();
  1252.     if (timer.wasTimeout()) {
  1253. jobError(job, "CONVERT DOCUMENT: job time limit exceeded");
  1254. if (output.length()) output.append("n");
  1255. output.append("[Job time limit exceeded]");
  1256. return (false);
  1257.     } else
  1258. return (true);
  1259. }
  1260. /*
  1261.  * Generate a continuation cover page and insert it in
  1262.  * the array of files to be sent.  Note that we assume
  1263.  * the cover page command generates PostScript which we
  1264.  * immediately image, discarding the PostScript.  We
  1265.  * could have the cover page command script do this, but
  1266.  * then it would need to know how to invoke the PostScript
  1267.  * imager per the job characteristics.  Note that we could
  1268.  * optimize things here by updating the pagehandling and
  1269.  * page counts for the job instead of resetting pagehandling
  1270.  * so that everything just gets recalculated from scratch.
  1271.  */
  1272. bool
  1273. faxQueueApp::makeCoverPage(Job& job, FaxRequest& req, const Class2Params& params)
  1274. {
  1275.     bool ok = true;
  1276.     FaxItem fitem(FaxRequest::send_postscript, 0, fxStr::null, req.cover);
  1277.     fxStr cmd(coverCmd
  1278. | quote | quoted(req.qfile)             | enquote
  1279. | quote | quoted(contCoverPageTemplate) | enquote
  1280. | quote | fitem.item                    | enquote
  1281.     );
  1282.     traceQueue(job, "COVER PAGE: %s", (const char*)cmd);
  1283.     if (runCmd(cmd, true)) {
  1284. fxStr emsg;
  1285. fxStr tmp = fitem.item | ";" | params.encodePage();
  1286. if (convertDocument(job, fitem, tmp, params, emsg)) {
  1287.     req.insertFax(0, tmp);
  1288.     req.cover = tmp; // needed in sendJobDone
  1289.     req.pagehandling = ""; // XXX force recalculation
  1290. } else {
  1291.     jobError(job, "SEND: No continuation cover page, "
  1292. " document conversion failed: %s", (const char*) emsg);
  1293.     ok = false;
  1294. }
  1295. Sys::unlink(fitem.item);
  1296.     } else {
  1297. jobError(job,
  1298.     "SEND: No continuation cover page, generation cmd failed");
  1299. ok = false;
  1300.     }
  1301.     return (ok);
  1302. }
  1303. const fxStr&
  1304. faxQueueApp::pickCmd(const FaxRequest& req)
  1305. {
  1306.     if (req.jobtype == "pager")
  1307. return (sendPageCmd);
  1308.     if (req.jobtype == "uucp")
  1309. return (sendUUCPCmd);
  1310.     return (sendFaxCmd); // XXX gotta return something
  1311. }
  1312. /*
  1313.  * Setup the argument vector and exec a subprocess.
  1314.  * This code assumes the command and dargs strings have
  1315.  * previously been processed to insert  characters
  1316.  * between each argument string (see crackArgv below).
  1317.  */
  1318. static void
  1319. doexec(const char* cmd, const fxStr& dargs, const char* devid, const char* files, int nfiles)
  1320. {
  1321. #define MAXARGS 128
  1322.     const char* av[MAXARGS];
  1323.     int ac = 0;
  1324.     const char* cp = strrchr(cmd, '/');
  1325.     // NB: can't use ?: 'cuz of AIX compiler (XXX)
  1326.     if (cp)
  1327. av[ac++] = cp+1; // program name
  1328.     else
  1329. av[ac++] = cmd;
  1330.     cp = strchr(cmd,'');
  1331.     const char* ep = strchr(cmd, '');
  1332.     while (cp < ep && ac < MAXARGS-4) { // additional pre-split args
  1333. av[ac++] = ++cp;
  1334. cp = strchr(cp,'');
  1335.     }
  1336.     cp = dargs;
  1337.     ep = cp + dargs.length();
  1338.     while (cp < ep && ac < MAXARGS-4) { // pre-split dargs
  1339. av[ac++] = cp;
  1340. cp = strchr(cp,'')+1;
  1341.     }
  1342.     av[ac++] = "-m"; av[ac++] = devid;
  1343.     if (! (MAXARGS > ac + nfiles))
  1344.     {
  1345. sleep(1);
  1346.      logError("%d files requires %d arguments, max %d", nfiles, ac+nfiles+1, MAXARGS);
  1347. return;
  1348.     }
  1349.     while (files)
  1350.     {
  1351. av[ac++] = files;
  1352. files = strchr(files, ' ');
  1353. /*
  1354.  * We can be naster with memory here - we're exec()ing right way
  1355.  */
  1356. if (files)
  1357.     *(char*)files++ = '';
  1358.     }
  1359.     av[ac] = NULL;
  1360.     Sys::execv(cmd, (char* const*) av);
  1361. }
  1362. #undef MAXARGS
  1363. static void
  1364. join(fxStr& s, const fxStr& a)
  1365. {
  1366.     const char* cp = a;
  1367.     const char* ep = cp + a.length();
  1368.     while (cp < ep) {
  1369. s.append(' ');
  1370. s.append(cp);
  1371. cp = strchr(cp,'')+1;
  1372.     }
  1373. }
  1374. static fxStr
  1375. joinargs(const fxStr& cmd, const fxStr& dargs)
  1376. {
  1377.     fxStr s;
  1378.     join(s, cmd);
  1379.     join(s, dargs);
  1380.     return s;
  1381. }
  1382. void
  1383. faxQueueApp::sendJobStart(Job& job, FaxRequest* req)
  1384. {
  1385.     Job* cjob;
  1386.     int nfiles = 1;
  1387.     job.start = Sys::now(); // start of transmission
  1388.     fxStr files = job.file;
  1389.     for (cjob = job.bnext; cjob != NULL; cjob = cjob->bnext) {
  1390. files = files | " " | cjob->file;
  1391. cjob->start = job.start;
  1392. // XXX start deadman timeout on active jobs
  1393. nfiles++;
  1394.     }
  1395.     
  1396.     const fxStr& cmd = pickCmd(*req);
  1397.     fxStr dargs(job.getJCI().getArgs());
  1398.     pid_t pid = fork();
  1399.     switch (pid) {
  1400.     case 0: // child, startup command
  1401. closeAllBut(-1); // NB: close 'em all
  1402. doexec(cmd, dargs, job.modem->getDeviceID(), files, nfiles);
  1403. sleep(10); // XXX give parent time to catch signal
  1404. _exit(127);
  1405. /*NOTREACHED*/
  1406.     case -1: // fork failed, sleep and retry
  1407. /*
  1408.  * We were unable to start the command because the
  1409.  * system is out of processes.  Take the jobs off the
  1410.  * active list and requeue them for a future time. 
  1411.  * If it appears that the we're doing this a lot,
  1412.  * then lengthen the backoff.
  1413.  */
  1414. Job* njob;
  1415. for (cjob = &job; cjob != NULL; cjob = njob) {
  1416.     njob = cjob->bnext;
  1417.     req = cjob->breq;
  1418.     cjob->remove(); // Remove from active queue
  1419.     delayJob(*cjob, *req, "Could not fork to start job transmission {E341}",
  1420. cjob->start + random() % requeueInterval);
  1421.     delete req;
  1422. }
  1423. break;
  1424.     default: // parent, setup handler to wait
  1425. // joinargs puts a leading space so this looks funny here
  1426. traceQueue(job, "CMD START%s -m %s %s (PID %d)"
  1427.     , (const char*) joinargs(cmd, dargs)
  1428.     , (const char*) job.modem->getDeviceID()
  1429.     , (const char*) files
  1430.     , pid
  1431. );
  1432. job.startSend(pid);
  1433. for (cjob = &job; cjob != NULL; cjob = njob) {
  1434.     cjob->pid = pid;
  1435.     njob = cjob->bnext;
  1436.     Trigger::post(Trigger::SEND_BEGIN, *cjob);
  1437.     delete cjob->breq; // discard handle (NB: releases lock)
  1438.     cjob->breq = NULL;
  1439. }
  1440. break;
  1441.     }
  1442. }
  1443. void
  1444. faxQueueApp::sendJobDone(Job& job, int status)
  1445. {
  1446.     traceQueue(job, "CMD DONE: exit status %#x", status);
  1447.     if (status&0xff)
  1448. logError("Send program terminated abnormally with exit status %#x", status);
  1449.     Job* cjob;
  1450.     Job* njob;
  1451.     DestInfo& di = destJobs[job.dest];
  1452.     di.hangup(); // do before unblockDestJobs
  1453.     releaseModem(job); // done with modem
  1454.     FaxRequest* req = readRequest(job);
  1455.     if (req && req->status == send_retry) {
  1456. // prevent turnaround-redialing, delay any blocked jobs
  1457. time_t newtts = req->tts;
  1458. while ((cjob = di.nextBlocked())) {
  1459.     FaxRequest* blockedreq = readRequest(*cjob);
  1460.     if (blockedreq) {
  1461. delayJob(*cjob, *blockedreq, "Delayed by prior call {E342}", newtts);
  1462. delete blockedreq;
  1463.     }
  1464. }
  1465.     } else {
  1466. unblockDestJobs(di);
  1467.     }
  1468.     removeDestInfoJob(job);
  1469.     for (cjob = &job; cjob != NULL; cjob = njob) {
  1470. njob = cjob->bnext;
  1471. if (cjob != &job) req = readRequest(*cjob); // the first was already read
  1472. if (!req) {
  1473.     time_t now = Sys::now();
  1474.     time_t duration = now - job.start;
  1475.     logError("JOB %s: SEND FINISHED: %s; but job file vanished",
  1476. (const char*) cjob->jobid, fmtTime(duration));
  1477.     setDead(*cjob);
  1478.     continue;
  1479. }
  1480. sendJobDone(*cjob, req);
  1481.     }
  1482.     pokeScheduler();
  1483. }
  1484. void
  1485. faxQueueApp::sendJobDone(Job& job, FaxRequest* req)
  1486. {
  1487.     time_t now = Sys::now();
  1488.     time_t duration = now - job.start;
  1489.     Trigger::post(Trigger::SEND_END, job);
  1490.     job.bnext = NULL; job.bprev = NULL; // clear any batching
  1491.     job.commid = req->commid; // passed from subprocess
  1492.     if (req->status == 127) {
  1493. req->notice = "Send program terminated abnormally; unable to exec " |
  1494.     pickCmd(*req) | "{E343}";
  1495. req->status = send_failed;
  1496. logError("JOB %s: %s",
  1497. (const char*)job.jobid, (const char*)req->notice);
  1498.     }
  1499.     if (req->status == send_reformat) {
  1500. /*
  1501.  * Job requires reformatting to deal with the discovery
  1502.  * of unexpected remote capabilities (either because
  1503.  * the capabilities changed or because the remote side
  1504.  * has never been called before and the default setup
  1505.  * created a capabilities mismatch).  Purge the job of
  1506.  * any formatted information and reset the state so that
  1507.  * when the job is retried it will be reformatted according
  1508.  * to the updated remote capabilities.
  1509.  */
  1510. Trigger::post(Trigger::SEND_REFORMAT, job);
  1511. u_int i = 0;
  1512. while (i < req->items.length()) {
  1513.     FaxItem& fitem = req->items[i];
  1514.     if (fitem.op == FaxRequest::send_fax) {
  1515. unrefDoc(fitem.item);
  1516. req->items.remove(i);
  1517. continue;
  1518.     }
  1519.     if (fitem.isSavedOp())
  1520. fitem.op--; // assumes order of enum
  1521.     i++;
  1522. }
  1523. req->pagehandling = ""; // force recalculation
  1524. req->status = send_retry; // ... force retry
  1525.     }
  1526.     /*
  1527.      * If the job did not finish and it is due to be
  1528.      * suspended (possibly followed by termination),
  1529.      * then treat it as if it is to be retried in case
  1530.      * it does get rescheduled.
  1531.      */
  1532.     if (req->status != send_done && job.suspendPending) {
  1533. req->notice = "Job interrupted by user {E344}";
  1534. req->status = send_retry;
  1535.     }
  1536.     if (job.killtime == 0 && !job.suspendPending && req->status == send_retry) {
  1537. /*
  1538.  * The job timed out during the send attempt.  We
  1539.  * couldn't do anything then, but now the job can
  1540.  * be cleaned up.  Not sure if the user should be
  1541.  * notified of the requeue as well as the timeout?
  1542.  */
  1543. req->notice = "Kill time expired {E325}";
  1544. updateRequest(*req, job);
  1545. job.state = FaxRequest::state_failed;
  1546. deleteRequest(job, req, Job::timedout, true);
  1547. setDead(job);
  1548.     } else if (req->status == send_retry) {
  1549. /*
  1550.  * If a continuation cover page is required for
  1551.  * the retransmission, fixup the job state so
  1552.  * that it'll get one when it's next processed.
  1553.  */
  1554. if (req->cover != "") {
  1555.     /*
  1556.      * Job was previously setup to get a continuation
  1557.      * cover page.  If the generated cover page was not
  1558.      * sent, then delete it so that it'll get recreated.
  1559.      */
  1560.     if (req->items[0].item == req->cover) {
  1561. Sys::unlink(req->cover);
  1562. req->items.remove(0);
  1563.     }
  1564. } else if (req->useccover &&
  1565.   req->npages > 0 && contCoverPageTemplate != "") {
  1566.     /*
  1567.      * Setup to generate a cover page when the job is
  1568.      * retried.  Note that we assume the continuation
  1569.      * cover page will be PostScript (though the
  1570.      * type is not used anywhere just now).
  1571.      */
  1572.     req->cover = docDir | "/cover" | req->jobid | ".ps";
  1573. }
  1574. if (req->tts < now) {
  1575.     /*
  1576.      * Send failed and send app didn't specify a new
  1577.      * tts, bump the ``time to send'' by the requeue
  1578.      * interval, then rewrite the queue file.  This causes
  1579.      * the job to be rescheduled for transmission
  1580.      * at a future time.
  1581.      */
  1582.     req->tts = now + (req->retrytime != 0
  1583. ? req->retrytime
  1584. : (requeueInterval>>1) + (random()%requeueInterval));
  1585.     job.tts = req->tts;
  1586. }
  1587. /*
  1588.  * Bump the job priority if is not bulk-style in which case
  1589.  * we dip the job job priority.  This is intended to prevent
  1590.  * non-bulk faxes from becoming buried by new jobs which
  1591.  * could prevent a timely retry.  However, it is also intended
  1592.  * to allow all bulk faxes to be attempted before retrying
  1593.  * any that could not complete on the first attempt.  This 
  1594.  * aids in timely delivery of bulk faxes as a group rather than
  1595.  * preoccupation with individual jobs as is the case with 
  1596.  * non-bulk style jobs.  We bound the priority to keep it
  1597.  * within a fixed "range" around it's starting priority.  This
  1598.  * is intended to keep "normal" and "high" priority jobs
  1599.  * from conflicting.
  1600.  */
  1601. if (job.pri != 255 && job.pri > 190) job.pri++;
  1602. else if (JOBHASH(job.pri-1) == JOBHASH(req->usrpri))
  1603.     job.pri--; 
  1604. job.state = (req->tts > now) ?
  1605.     FaxRequest::state_sleeping : FaxRequest::state_ready;
  1606. updateRequest(*req, job); // update on-disk status
  1607. if (!job.suspendPending) {
  1608.     if (job.isOnList()) job.remove(); // remove from active list
  1609.     if (req->tts > now) {
  1610. traceQueue(job, "SEND INCOMPLETE: requeue for %s; %s",
  1611.     (const char*)strTime(req->tts - now), (const char*)req->notice);
  1612. setSleep(job, req->tts);
  1613. Trigger::post(Trigger::SEND_REQUEUE, job);
  1614. if (job.getJCI().getNotify() != -1) {
  1615.     if (job.getJCI().isNotify(FaxRequest::when_requeued))
  1616. notifySender(job, Job::requeued);
  1617. } else
  1618.     if (req->isNotify(FaxRequest::when_requeued))
  1619. notifySender(job, Job::requeued);
  1620.     } else {
  1621. traceQueue(job, "SEND INCOMPLETE: retry immediately; %s",
  1622.     (const char*)req->notice);
  1623. setReadyToRun(job, false); // NB: job.tts will be <= now
  1624.     }
  1625. } else // signal waiting co-thread
  1626.     job.suspendPending = false;
  1627. delete req; // implicit unlock of q file
  1628.     } else {
  1629. // NB: always notify client if job failed
  1630. if (req->status == send_failed) {
  1631.     job.state = FaxRequest::state_failed;
  1632.     deleteRequest(job, req, Job::failed, true, fmtTime(duration));
  1633. } else {
  1634.     job.state = FaxRequest::state_done;
  1635.     deleteRequest(job, req, Job::done, false, fmtTime(duration));
  1636. }
  1637. traceQueue(job, "SEND DONE: %s", (const char*)strTime(duration));
  1638. Trigger::post(Trigger::SEND_DONE, job);
  1639. setDead(job);
  1640.     }
  1641. }
  1642. /*
  1643.  * Job Queue Management Routines.
  1644.  */
  1645. /*
  1646.  * Begin the process to insert a job in the queue
  1647.  * of ready-to-run jobs.  We run JobControl, and when it's done
  1648.  * the job is placed on the ready-to-run queue.
  1649.  */
  1650. void
  1651. faxQueueApp::setReadyToRun(Job& job, bool wait)
  1652. {
  1653.     if (jobCtrlCmd.length()) {
  1654. const char *app[3];
  1655. app[0] = jobCtrlCmd;
  1656. app[1] = job.jobid;
  1657. app[2] = NULL;
  1658. traceJob(job, "CONTROL");
  1659. int pfd[2];
  1660. if (pipe(pfd) >= 0) {
  1661.     pid_t pid = fork();
  1662.     switch (pid) {
  1663.     case -1: // error - continue with no JCI
  1664. jobError(job, "JOB CONTROL: fork: %m");
  1665. Sys::close(pfd[1]);
  1666. // When fork fails we need to run ctrlJobDone, since there
  1667. // will be no child signal to start it.
  1668. ctrlJobDone(job, -1);
  1669. break;
  1670.     case 0: // child, exec command
  1671. if (pfd[1] != STDOUT_FILENO)
  1672.     dup2(pfd[1], STDOUT_FILENO);
  1673. closeAllBut(STDOUT_FILENO);
  1674. traceQueue(job, "JOB CONTROL: %s %s", app[0], app[1]);
  1675. Sys::execv(app[0], (char * const*)app);
  1676. sleep(1); // XXX give parent time to catch signal
  1677. traceQueue(job, "JOB CONTROL: failed to exec: %m");
  1678. _exit(255);
  1679. /*NOTREACHED*/
  1680.     default: // parent, read from pipe and wait
  1681. {
  1682.     Sys::close(pfd[1]);
  1683.     int estat = -1;
  1684.     char data[1024];
  1685.     int n;
  1686.     fxStr buf;
  1687.     while ((n = Sys::read(pfd[0], data, sizeof(data))) > 0) {
  1688. buf.append(data, n);
  1689.     }
  1690.     Sys::close(pfd[0]);
  1691.     job.jci = new JobControlInfo(buf);
  1692.                     (void) Sys::waitpid(pid, estat);
  1693.     /*
  1694.      * JobControl modification of job priority must be
  1695.      * handled before ctrlJobDone, as that's where the
  1696.      * job is placed into runq based on the priority.
  1697.      */
  1698.     if (job.getJCI().getPriority() != -1) {
  1699. job.pri = job.getJCI().getPriority();
  1700.     }
  1701.     ctrlJobDone(job, estat);
  1702. }
  1703. break;
  1704.     }
  1705. } else {
  1706.     // If our pipe fails, we can't run the child, but we still
  1707.     // Need ctrlJobDone to be called to proceed this job
  1708.     ctrlJobDone(job, -1);
  1709. }
  1710.     } else {
  1711.      ctrlJobDone(job, 0);
  1712.     }
  1713. }
  1714. /*
  1715.  * Insert the job into the runq.  We have finished
  1716.  * all the JobControl execution
  1717.  */
  1718. void
  1719. faxQueueApp::ctrlJobDone(Job& job, int status)
  1720. {
  1721.     if (status) {
  1722. logError("JOB %s: bad exit status %#x from sub-fork",
  1723.     (const char*) job.jobid, status);
  1724.     }
  1725.     blockSignals();
  1726.     JobIter iter(runqs[JOBHASH(job.pri)]);
  1727.     for (; iter.notDone() && (iter.job().pri < job.pri || 
  1728.       (iter.job().pri == job.pri && iter.job().tts <= job.tts)); iter++)
  1729. ;
  1730.     job.state = FaxRequest::state_ready;
  1731.     job.insert(iter.job());
  1732.     job.pid = 0;
  1733.     releaseSignals();
  1734.     traceJob(job, "READY");
  1735.     Trigger::post(Trigger::JOB_READY, job);
  1736. }
  1737. /*
  1738.  * Place a job on the queue of jobs waiting to run
  1739.  * and start the associated timer.
  1740.  */
  1741. void
  1742. faxQueueApp::setSleep(Job& job, time_t tts)
  1743. {
  1744.     blockSignals();
  1745.     JobIter iter(sleepq);
  1746.     for (; iter.notDone() && iter.job().tts <= tts; iter++)
  1747. ;
  1748.     job.insert(iter.job());
  1749.     job.startTTSTimer(tts);
  1750.     releaseSignals();
  1751.     traceJob(job, "SLEEP FOR %s", (const char*)strTime(tts - Sys::now()));
  1752.     Trigger::post(Trigger::JOB_SLEEP, job);
  1753. }
  1754. /*
  1755.  * Process a job that's finished.  The corpse gets placed
  1756.  * on the deadq and is reaped the next time the scheduler
  1757.  * runs.  If any jobs are blocked waiting for this job to
  1758.  * complete, one is made ready to run.
  1759.  */
  1760. void
  1761. faxQueueApp::setDead(Job& job)
  1762. {
  1763.     job.stopTTSTimer();
  1764.     if (job.state != FaxRequest::state_done 
  1765.       && job.state != FaxRequest::state_failed)
  1766. job.state = FaxRequest::state_failed;
  1767.     job.suspendPending = false;
  1768.     traceJob(job, "DEAD");
  1769.     Trigger::post(Trigger::JOB_DEAD, job);
  1770.     removeDestInfoJob(job);
  1771.     if (job.isOnList()) // lazy remove from active list
  1772. job.remove();
  1773.     job.insert(*deadq.next); // setup job corpus for reaping
  1774.     if (job.modem) // called from many places
  1775. releaseModem(job);
  1776.     pokeScheduler();
  1777. }
  1778. /*
  1779.  * Place a job on the list of jobs actively being processed.
  1780.  */
  1781. void
  1782. faxQueueApp::setActive(Job& job)
  1783. {
  1784.     job.state = FaxRequest::state_active;
  1785.     traceJob(job, "ACTIVE");
  1786.     Trigger::post(Trigger::JOB_ACTIVE, job);
  1787.     job.insert(*activeq.next);
  1788. }
  1789. /*
  1790.  * Place a job on the list of jobs not being scheduled.
  1791.  */
  1792. void
  1793. faxQueueApp::setSuspend(Job& job)
  1794. {
  1795.     job.state = FaxRequest::state_suspended;
  1796.     traceJob(job, "SUSPEND");
  1797.     Trigger::post(Trigger::JOB_SUSPEND, job);
  1798.     job.insert(*suspendq.next);
  1799. }
  1800. /*
  1801.  * Create a new job entry and place them on the
  1802.  * appropriate queue.  A kill timer is also setup
  1803.  * for the job.
  1804.  */
  1805. bool
  1806. faxQueueApp::submitJob(FaxRequest& req, bool checkState)
  1807. {
  1808.     Job* job = new Job(req);
  1809.     traceJob(*job, "CREATE");
  1810.     Trigger::post(Trigger::JOB_CREATE, *job);
  1811.     return (submitJob(*job, req, checkState));
  1812. }
  1813. bool
  1814. faxQueueApp::submitJob(Job& job, FaxRequest& req, bool checkState)
  1815. {
  1816.     /*
  1817.      * Check various submission parameters.  We setup the
  1818.      * canonical version of the destination phone number
  1819.      * first so that any rejections that cause the notification
  1820.      * script to be run will return a proper value for the
  1821.      * destination phone number.
  1822.      */
  1823.     job.dest = canonicalizePhoneNumber(req.number);
  1824.     if (job.dest == "") {
  1825. if (req.external == "") // NB: for notification logic
  1826.     req.external = req.number;
  1827. rejectSubmission(job, req,
  1828.     "REJECT: Unable to convert dial string to canonical format {E327}");
  1829. return (false);
  1830.     }
  1831.     req.canonical = job.dest;
  1832.     time_t now = Sys::now();
  1833.     if (req.killtime <= now) {
  1834. timeoutJob(job, req);
  1835. return (false);
  1836.     }
  1837.     if (!Modem::modemExists(req.modem, true) && !ModemGroup::find(req.modem)) {
  1838. rejectSubmission(job, req,
  1839.     "REJECT: Requested modem " | req.modem | " is not registered {E328}");
  1840. return (false);
  1841.     }
  1842.     if (req.items.length() == 0) {
  1843. rejectSubmission(job, req, "REJECT: No work found in job file {E329}");
  1844. return (false);
  1845.     }
  1846.     if (req.pagewidth > 303) {
  1847. rejectSubmission(job, req,
  1848.     fxStr::format("REJECT: Page width (%u) appears invalid {E330}",
  1849. req.pagewidth));
  1850. return (false);
  1851.     }
  1852.     /*
  1853.      * Verify the killtime is ``reasonable''; otherwise
  1854.      * select (through the Dispatcher) may be given a
  1855.      * crazy time value, potentially causing problems.
  1856.      */
  1857.     if (req.killtime-now > 365*24*60*60) { // XXX should be based on tts
  1858. rejectSubmission(job, req,
  1859.     fxStr::format("REJECT: Job expiration time (%u) appears invalid {E331}",
  1860. req.killtime));
  1861. return (false);
  1862.     }
  1863.     if (checkState) {
  1864. /*
  1865.  * Check the state from queue file and if
  1866.  * it indicates the job was not being
  1867.  * scheduled before then don't schedule it
  1868.  * now.  This is used when the scheduler
  1869.  * is restarted and reading the queue for
  1870.  * the first time.
  1871.  *
  1872.  * NB: We reschedule blocked jobs in case
  1873.  *     the job that was previously blocking
  1874.  *     it was removed somehow.
  1875.  */
  1876. switch (req.state) {
  1877. case FaxRequest::state_suspended:
  1878.     setSuspend(job);
  1879.     return (true);
  1880. case FaxRequest::state_done:
  1881. case FaxRequest::state_failed:
  1882.     setDead(job);
  1883.     return (true);
  1884. }
  1885.     }
  1886.     if (req.useccover && (req.serverdocover || req.skippedpages || req.skippages) && contCoverPageTemplate != "") {
  1887. /*
  1888.  * The user submitted a job with "skipped" pages.  This equates
  1889.  * to a user-initiated resubmission.  Add a continuation coverpage
  1890.  * if appropriate.
  1891.  */
  1892. req.cover = docDir | "/cover" | req.jobid | ".ps";
  1893.     }
  1894.     /*
  1895.      * Put the job on the appropriate queue
  1896.      * and start the job kill timer.
  1897.      */
  1898.     if (req.tts > now) { // scheduled for future
  1899. /*
  1900.  * Check time-to-send as for killtime above.
  1901.  */
  1902. if (req.tts - now > 365*24*60*60) {
  1903.     rejectSubmission(job, req,
  1904. fxStr::format("REJECT: Time-to-send (%u) appears invalid {E332}",
  1905.     req.tts));
  1906.     return (false);
  1907. }
  1908. job.startKillTimer(req.killtime);
  1909. job.state = FaxRequest::state_pending;
  1910. setSleep(job, job.tts);
  1911.     } else { // ready to go now
  1912. job.startKillTimer(req.killtime);
  1913. setReadyToRun(job, true);
  1914. pokeScheduler();
  1915.     }
  1916.     updateRequest(req, job);
  1917.     return (true);
  1918. }
  1919. /*
  1920.  * Reject a job submission.
  1921.  */
  1922. void
  1923. faxQueueApp::rejectSubmission(Job& job, FaxRequest& req, const fxStr& reason)
  1924. {
  1925.     Trigger::post(Trigger::JOB_REJECT, job);
  1926.     req.status = send_failed;
  1927.     req.notice = reason;
  1928.     traceServer("JOB %s: ", (const char*)job.jobid, (const char*)reason);
  1929.     deleteRequest(job, req, Job::rejected, true);
  1930.     setDead(job); // dispose of job
  1931. }
  1932. /*
  1933.  * Suspend a job by removing it from whatever
  1934.  * queue it's currently on and/or stopping any
  1935.  * timers.  If the job has an active subprocess
  1936.  * then the process is optionally sent a signal
  1937.  * and we wait for the process to stop before
  1938.  * returning to the caller.
  1939.  */
  1940. bool
  1941. faxQueueApp::suspendJob(Job& job, bool abortActive)
  1942. {
  1943.     if (job.suspendPending) // already being suspended
  1944. return (false);
  1945.     switch (job.state) {
  1946.     case FaxRequest::state_active:
  1947. /*
  1948.  * Job is being handled by a subprocess; optionally
  1949.  * signal the process and wait for it to terminate
  1950.  * before returning.  We disable the kill timer so
  1951.  * that if it goes off while we wait for the process
  1952.  * to terminate the process completion work will not
  1953.  * mistakenly terminate the job (see sendJobDone).
  1954.  */
  1955. job.suspendPending = true; // mark thread waiting
  1956. if (abortActive)
  1957.     (void) kill(job.pid, SIGTERM); // signal subprocess
  1958. job.stopKillTimer();
  1959. while (job.suspendPending) // wait for subprocess to exit
  1960.     Dispatcher::instance().dispatch();
  1961. /*
  1962.  * Recheck the job state; it may have changed while
  1963.  * we were waiting for the subprocess to terminate.
  1964.  */
  1965. if (job.state != FaxRequest::state_done &&
  1966.   job.state != FaxRequest::state_failed)
  1967.     break;
  1968. /* fall thru... */
  1969.     case FaxRequest::state_done:
  1970.     case FaxRequest::state_failed:
  1971. return (false);
  1972.     case FaxRequest::state_sleeping:
  1973.     case FaxRequest::state_pending:
  1974. job.stopTTSTimer(); // cancel timeout
  1975. /* fall thru... */
  1976.     case FaxRequest::state_suspended:
  1977.     case FaxRequest::state_ready:
  1978. break;
  1979.     case FaxRequest::state_blocked:
  1980. /*
  1981.  * Decrement the count of job blocked to
  1982.  * to the same destination.
  1983.  */
  1984. destJobs[job.dest].unblock(job);
  1985. break;
  1986.     }
  1987.     /*
  1988.      * We must remove any DestInfo stuff this is recorded in
  1989.      * When the job is resubmitted (or killed), we don't know
  1990.      * when (could be hours/never), or even if the dest number
  1991.      * will be the same
  1992.      */
  1993.     removeDestInfoJob(job);
  1994.     if (job.isOnList()) job.remove(); // remove from old queue
  1995.     job.stopKillTimer(); // clear kill timer
  1996.     return (true);
  1997. }
  1998. /*
  1999.  * Suspend a job and place it on the suspend queue.
  2000.  * If the job is currently active then we wait for
  2001.  * it to reach a state where it can be safely suspended.
  2002.  * This control is used by clients that want to modify
  2003.  * the state of a job (i.e. suspend, modify, submit).
  2004.  */
  2005. bool
  2006. faxQueueApp::suspendJob(const fxStr& jobid, bool abortActive)
  2007. {
  2008.     Job* job = Job::getJobByID(jobid);
  2009.     if (job && suspendJob(*job, abortActive)) {
  2010. setSuspend(*job);
  2011. FaxRequest* req = readRequest(*job);
  2012. if (req) {
  2013.     updateRequest(*req, *job);
  2014.     delete req;
  2015. }
  2016. return (true);
  2017.     } else
  2018. return (false);
  2019. }
  2020. /*
  2021.  * Terminate a job in response to a command message.
  2022.  * If the job is currently running the subprocess is
  2023.  * sent a signal to request that it abort whatever
  2024.  * it's doing and we wait for the process to terminate.
  2025.  * Otherwise, the job is immediately removed from
  2026.  * the appropriate queue and any associated resources
  2027.  * are purged.
  2028.  */
  2029. bool
  2030. faxQueueApp::terminateJob(const fxStr& jobid, JobStatus why)
  2031. {
  2032.     Job* job = Job::getJobByID(jobid);
  2033.     if (job && suspendJob(*job, true)) {
  2034. job->state = FaxRequest::state_failed;
  2035. Trigger::post(Trigger::JOB_KILL, *job);
  2036. FaxRequest* req = readRequest(*job);
  2037. if (req) {
  2038.     req->notice = "Job aborted by request {E345}";
  2039.     deleteRequest(*job, req, why, why != Job::removed);
  2040. }
  2041. setDead(*job);
  2042. return (true);
  2043.     } else
  2044. return (false);
  2045. }
  2046. /*
  2047.  * Reject a job at some time before it's handed off to the server thread.
  2048.  */
  2049. void
  2050. faxQueueApp::rejectJob(Job& job, FaxRequest& req, const fxStr& reason)
  2051. {
  2052.     req.status = send_failed;
  2053.     req.notice = reason;
  2054.     traceServer("JOB %s: %s",
  2055.     (const char*)job.jobid, (const char*)reason);
  2056.     job.state = FaxRequest::state_failed;
  2057.     Trigger::post(Trigger::JOB_REJECT, job);
  2058.     setDead(job); // dispose of job
  2059. }
  2060. /*
  2061.  * Deal with a job that's blocked by a concurrent call.
  2062.  */
  2063. void
  2064. faxQueueApp::blockJob(Job& job, FaxRequest& req, const char* mesg)
  2065. {
  2066.     int old_state = job.state;
  2067.     job.state = FaxRequest::state_blocked;
  2068.     req.notice = mesg;
  2069.     updateRequest(req, job);
  2070.     traceQueue(job, "%s", mesg);
  2071.     if (old_state != FaxRequest::state_blocked) {
  2072. if (job.getJCI().getNotify() != -1) {
  2073.     if (job.getJCI().isNotify(FaxRequest::when_requeued))
  2074. notifySender(job, Job::blocked);
  2075. } else
  2076.     if (req.isNotify(FaxRequest::when_requeued))
  2077. notifySender(job, Job::blocked);
  2078.     }
  2079.     Trigger::post(Trigger::JOB_BLOCKED, job);
  2080. }
  2081. /*
  2082.  * Requeue a job that's delayed for some reason.
  2083.  */
  2084. void
  2085. faxQueueApp::delayJob(Job& job, FaxRequest& req, const char* mesg, time_t tts)
  2086. {
  2087.     job.state = FaxRequest::state_sleeping;
  2088.     fxStr reason(mesg);
  2089.     job.tts = tts;
  2090.     req.tts = tts;
  2091.     time_t delay = tts - Sys::now();
  2092.     req.notice = reason;
  2093.     updateRequest(req, job);
  2094.     traceQueue(job, "%s: requeue for %s",
  2095.     (const char*)mesg, (const char*)strTime(delay));
  2096.     setSleep(job, tts);
  2097.     Trigger::post(Trigger::JOB_DELAYED, job);
  2098.     if (job.getJCI().getNotify() != -1) {
  2099. if (job.getJCI().isNotify(FaxRequest::when_requeued))
  2100.     notifySender(job, Job::requeued); 
  2101.     } else
  2102. if (req.isNotify(FaxRequest::when_requeued))
  2103.     notifySender(job, Job::requeued); 
  2104.     if (job.modem != NULL)
  2105. releaseModem(job);
  2106. }
  2107. void
  2108. faxQueueApp::queueAccounting(Job& job, FaxRequest& req, const char* type)
  2109. {
  2110.     FaxAcctInfo ai;
  2111.     ai.jobid = (const char*) req.jobid;
  2112.     ai.jobtag = (const char*) req.jobtag;
  2113.     ai.user = (const char*) req.mailaddr;
  2114.     ai.start = Sys::now();
  2115.     ai.duration = 0;
  2116.     ai.conntime = 0;
  2117.     ai.commid = "";
  2118.     ai.device = "";
  2119.     ai.dest = (const char*) req.external;
  2120.     ai.csi = "";
  2121.     ai.npages = req.npages;
  2122.     ai.params = 0;
  2123.     if (req.status == send_done)
  2124. ai.status = "";
  2125.     else {
  2126. ai.status = req.notice;
  2127.     }
  2128.     if (strstr(type, "SUBMIT"))
  2129. ai.status = "Submitted";
  2130.     CallID empty_callid;
  2131.     ai.callid = empty_callid;
  2132.     ai.owner = (const char*) req.owner;
  2133.     ai.faxdcs = "";
  2134.     ai.jobinfo = fxStr::format("%u/%u/%u/%u/%u/%u/%u", 
  2135. req.totpages, req.ntries, req.ndials, req.totdials, req.maxdials, req.tottries, req.maxtries);
  2136.     pid_t pid = fork();
  2137.     switch (pid) {
  2138. case -1: // error
  2139.     if (!ai.record(type))
  2140. logError("Error writing %s accounting record, dest=%s",
  2141.     type, (const char*) ai.dest);
  2142.     break;
  2143. case 0: // child
  2144.     if (!ai.record(type))
  2145. logError("Error writing %s accounting record, dest=%s",
  2146.     type, (const char*) ai.dest);
  2147.     _exit(255);
  2148.     /*NOTREACHED*/
  2149. default: // parent
  2150.     Dispatcher::instance().startChild(pid, this);
  2151.     break;
  2152.     }
  2153. }
  2154. /*
  2155.  * Process the job who's kill time expires.  The job is
  2156.  * terminated unless it is currently being tried, in which
  2157.  * case it's marked for termination after the attempt is
  2158.  * completed.
  2159.  */
  2160. void
  2161. faxQueueApp::timeoutJob(Job& job)
  2162. {
  2163.     traceQueue(job, "KILL TIME EXPIRED");
  2164.     Trigger::post(Trigger::JOB_TIMEDOUT, job);
  2165.     if (job.state != FaxRequest::state_active) {
  2166. if (job.isOnList())
  2167.     job.remove(); // i.e. remove from sleep queue
  2168. job.state = FaxRequest::state_failed;
  2169. FaxRequest* req = readRequest(job);
  2170. if (req) {
  2171.     req->notice = "Kill time expired {E325}";
  2172.     deleteRequest(job, req, Job::timedout, true);
  2173. }
  2174. setDead(job);
  2175.     } else
  2176. job.killtime = 0; // mark job to be removed
  2177. }
  2178. /*
  2179.  * Like above, but called for a job that times
  2180.  * out at the point at which it is submitted (e.g.
  2181.  * after the server is restarted).  The work here
  2182.  * is subtley different; the q file must not be
  2183.  * re-read because it may result in recursive flock
  2184.  * calls which on some systems may cause deadlock
  2185.  * (systems that emulate flock with lockf do not
  2186.  * properly emulate flock).
  2187.  */
  2188. void
  2189. faxQueueApp::timeoutJob(Job& job, FaxRequest& req)
  2190. {
  2191.     job.state = FaxRequest::state_failed;
  2192.     traceQueue(job, "KILL TIME EXPIRED");
  2193.     Trigger::post(Trigger::JOB_TIMEDOUT, job);
  2194.     req.notice = "Kill time expired {E325}";
  2195.     deleteRequest(job, req, Job::timedout, true);
  2196.     setDead(job);
  2197. }
  2198. /*
  2199.  * Resubmit an existing job or create a new job
  2200.  * using the specified job description file.
  2201.  */
  2202. bool
  2203. faxQueueApp::submitJob(const fxStr& jobid, bool checkState, bool nascent)
  2204. {
  2205.     Job* job = Job::getJobByID(jobid);
  2206.     if (job) {
  2207. bool ok = false;
  2208. if (job->state == FaxRequest::state_suspended) {
  2209.     job->remove(); // remove from suspend queue
  2210.     FaxRequest* req = readRequest(*job);// XXX need better mechanism
  2211.     if (req) {
  2212. job->update(*req); // update job state from file
  2213. ok = submitJob(*job, *req); // resubmit to scheduler
  2214. delete req; // NB: unlock qfile
  2215.     } else
  2216. setDead(*job); // XXX???
  2217. } else if (job->state == FaxRequest::state_done ||
  2218.   job->state == FaxRequest::state_failed)
  2219.     jobError(*job, "Cannot resubmit a completed job");
  2220. else
  2221.     ok = true; // other, nothing to do
  2222. return (ok);
  2223.     }
  2224.     /*
  2225.      * Create a job from a queue file and add it
  2226.      * to the scheduling queues.
  2227.      */
  2228.     fxStr filename(FAX_SENDDIR "/" FAX_QFILEPREF | jobid);
  2229.     if (!Sys::isRegularFile(filename)) {
  2230. logError("JOB %s: qfile %s is not a regular file.",
  2231.     (const char*) jobid, (const char*) filename);
  2232. return (false);
  2233.     }
  2234.     bool status = false;
  2235.     int fd = Sys::open(filename, O_RDWR);
  2236.     if (fd >= 0) {
  2237. if (flock(fd, LOCK_SH) >= 0) {
  2238.     FaxRequest req(filename, fd);
  2239.     /*
  2240.      * There are four possibilities:
  2241.      *
  2242.      * 1. The queue file was read properly and the job
  2243.      *    can be submitted.
  2244.      * 2. There were problems reading the file, but
  2245.      *    enough information was obtained to purge the
  2246.      *    job from the queue.
  2247.      * 3. The job was previously submitted and completed
  2248.      *    (either with success or failure).
  2249.      * 4. Insufficient information was obtained to purge
  2250.      *    the job; just skip it.
  2251.      */
  2252.     bool reject;
  2253.     if (req.readQFile(reject) && !reject &&
  2254.       req.state != FaxRequest::state_done &&
  2255.       req.state != FaxRequest::state_failed) {
  2256. status = submitJob(req, checkState);
  2257. if (nascent) queueAccounting(*job, req, "SUBMIT");
  2258.     } else if (reject) {
  2259. Job job(req);
  2260. job.state = FaxRequest::state_failed;
  2261. req.status = send_failed;
  2262. req.notice = "Invalid or corrupted job description file {E326}";
  2263. traceServer("JOB %s : %s", (const char*)jobid, (const char*) req.notice);
  2264. // NB: this may not work, but we try...
  2265. deleteRequest(job, req, Job::rejected, true);
  2266.     } else if (req.state == FaxRequest::state_done ||
  2267.       req.state == FaxRequest::state_failed) {
  2268. logError("JOB %s: Cannot resubmit a completed job",
  2269.     (const char*) jobid);
  2270.     } else
  2271. traceServer("%s: Unable to purge job, ignoring it",
  2272. (const char*)filename);
  2273. } else
  2274.     logError("JOB %s: Could not lock job file; %m.",
  2275. (const char*) jobid);
  2276. Sys::close(fd);
  2277.     } else
  2278. logError("JOB %s: Could not open job file; %m.", (const char*) jobid);
  2279.     return (status);
  2280. }
  2281. /*
  2282.  * Process the expiration of a job's time-to-send timer.
  2283.  * The job is moved to the ready-to-run queues and the
  2284.  * scheduler is poked.
  2285.  */
  2286. void
  2287. faxQueueApp::runJob(Job& job)
  2288. {
  2289.     if (job.state != FaxRequest::state_failed) { // don't run a dead job corpus
  2290. if (job.isOnList()) job.remove();
  2291. setReadyToRun(job, true);
  2292. FaxRequest* req = readRequest(job);
  2293. if (req) {
  2294.     updateRequest(*req, job);
  2295.     delete req;
  2296. }
  2297.     }
  2298.     /*
  2299.      * In order to deliberately batch jobs by using a common
  2300.      * time-to-send we need to give time for the other jobs'
  2301.      * timers to expire and to enter the run queue before
  2302.      * running the scheduler.  Thus the scheduler is poked
  2303.      * with a delay.
  2304.      */
  2305.     pokeScheduler(1);
  2306. }
  2307. /*
  2308.  * Process the DestInfo job-block list
  2309.  * for this job.  If the job is active and blocking other
  2310.  * jobs, we need to unblock...
  2311.  */
  2312. #define isOKToCall(di, dci, n) 
  2313.     (di.getCalls()+n <= dci.getMaxConcurrentCalls())
  2314. void
  2315. faxQueueApp::unblockDestJobs(DestInfo& di)
  2316. {
  2317.     /*
  2318.      * Check if there are blocked jobs waiting to run
  2319.      * and that there is now room to run one.  If so,
  2320.      * take jobs off the blocked queue and make them
  2321.      * ready for processing.
  2322.      */
  2323.     Job* jb;
  2324.     u_int n = 1, b = 1;
  2325.     while ((jb = di.nextBlocked())) {
  2326. if (isOKToCall(di, jb->getJCI(), n)) {
  2327.     FaxRequest* req = readRequest(*jb);
  2328.     if (!req) {
  2329. setDead(*jb);
  2330. continue;
  2331.     }
  2332.     setReadyToRun(*jb, false);
  2333.     if (!di.supportsBatching()) n++;
  2334.     else if (++b > maxBatchJobs) {
  2335. n++;
  2336. b -= maxBatchJobs;
  2337.     }
  2338.     req->notice = "";
  2339.     updateRequest(*req, *jb);
  2340.     delete req;
  2341.     /*
  2342.      * We check isOKToCall again here now to avoid di.nextBlocked
  2343.      * which would pull jb from the blocked list and then possibly
  2344.      * require us to re-block it.
  2345.      */
  2346.     if (di.getBlocked() && !isOKToCall(di, jb->getJCI(), n)) {
  2347. traceQueue("Continue BLOCK on %d job(s) to %s, current calls: %d, max concurrent calls: %d", 
  2348.     di.getBlocked(), (const char*) jb->dest, di.getCalls()+n-1, jb->getJCI().getMaxConcurrentCalls());
  2349. break;
  2350.     }
  2351. } else {
  2352.     /*
  2353.      * unblockDestJobs was called, but a new
  2354.      * call cannot be placed.  This would be
  2355.      * unusual, but because di.nextBlocked
  2356.      * removed jb from the di list, we need
  2357.      * to put it back.
  2358.      */
  2359.     di.block(*jb);
  2360.     traceQueue("Continue BLOCK on %d job(s) to %s, current calls: %d, max concurrent calls: %d", 
  2361. di.getBlocked(), (const char*) jb->dest, di.getCalls()+n-1, jb->getJCI().getMaxConcurrentCalls());
  2362.     break;
  2363. }
  2364.     }
  2365. }
  2366. void
  2367. faxQueueApp::removeDestInfoJob(Job& job)
  2368. {
  2369.     DestInfo& di = destJobs[job.dest];
  2370.     di.done(job); // remove from active destination list
  2371.     di.updateConfig(); // update file if something changed
  2372.     if (di.isEmpty()) {
  2373. /*
  2374.  * This is the last job to the destination; purge
  2375.  * the entry from the destination jobs database.
  2376.  */
  2377. destJobs.remove(job.dest);
  2378.     }
  2379. }
  2380. /*
  2381.  * Compare two job requests to each other and to a selected
  2382.  * job to see if they can be batched together.
  2383.  */
  2384. bool
  2385. faxQueueApp::areBatchable(FaxRequest& reqa, FaxRequest& reqb, Job& job, Job& cjob)
  2386. {
  2387.     // make sure the job's modem is in the requested ModemGroup 
  2388.     if (!job.modem->isInGroup(reqb.modem)) return(false);
  2389.     // make sure cjob's TimeOfDay is for now
  2390.     time_t now = Sys::now();
  2391.     if ((cjob.getJCI().nextTimeToSend(now) != now) || (cjob.tod.nextTimeOfDay(now) != now)) return (false);
  2392.     return(true);
  2393. }
  2394. /*
  2395.  * Scan the list of jobs and process those that are ready
  2396.  * to go.  Note that the scheduler should only ever be
  2397.  * invoked from the dispatcher via a timeout.  This way we
  2398.  * can be certain there are no active contexts holding
  2399.  * references to job corpses (or other data structures) that
  2400.  * we want to reap.  To invoke the scheduler the pokeScheduler
  2401.  * method should be called to setup an immediate timeout that
  2402.  * will cause the scheduler to be invoked from the dispatcher.
  2403.  */
  2404. void
  2405. faxQueueApp::runScheduler()
  2406. {
  2407.     /*
  2408.      * Terminate the server if there are no jobs currently
  2409.      * being processed.  We must be sure to wait for jobs
  2410.      * so that we can capture exit status from subprocesses
  2411.      * and so that any locks held on behalf of outbound jobs
  2412.      * do not appear to be stale (since they are held by this
  2413.      * process).
  2414.      */
  2415.     if (quit && activeq.next == &activeq) {
  2416. close();
  2417. return;
  2418.     }
  2419.     fxAssert(inSchedule == false, "Scheduler running twice");
  2420.     inSchedule = true;
  2421.     /*
  2422.      * Reread the configuration file if it has been
  2423.      * changed.  We do this before each scheduler run
  2424.      * since we are a long-running process and it should
  2425.      * not be necessary to restart the process to have
  2426.      * config file changes take effect.
  2427.      */
  2428.     (void) updateConfig(configFile);
  2429.     /*
  2430.      * Scan the job queue and locate a compatible modem to
  2431.      * use in processing the job.  Doing things in this order
  2432.      * insures the highest priority job is always processed
  2433.      * first.
  2434.      */
  2435.     blockSignals();
  2436.     if (! quit) {
  2437. for (u_int i = 0; i < NQHASH; i++) {
  2438.     for (JobIter iter(runqs[i]); iter.notDone(); iter++) {
  2439. Job& job = iter;
  2440. if (job.bprev != NULL) {
  2441.     /*
  2442.      * The batching sub-loop below already allocated this job to a batch.
  2443.      * Thus, this loop's copy of the run queue is incorrect.
  2444.      */
  2445.     pokeScheduler();
  2446.     break;
  2447. }
  2448. fxAssert(job.modem == NULL, "Job on run queue holding modem");
  2449. /*
  2450.  * Read the on-disk job state and process the job.
  2451.  * Doing all the processing below each time the job
  2452.  * is considered for processing could be avoided by
  2453.  * doing it only after assigning a modem but that
  2454.  * would potentially cause the order of dispatch
  2455.  * to be significantly different from the order
  2456.  * of submission; something some folks care about.
  2457.  */
  2458. traceJob(job, "PROCESS");
  2459. Trigger::post(Trigger::JOB_PROCESS, job);
  2460. FaxRequest* req = readRequest(job);
  2461. if (!req) { // problem reading job state on-disk
  2462.     setDead(job);
  2463.     continue;
  2464. }
  2465. time_t tts;
  2466. time_t now = Sys::now();
  2467. /*
  2468.  * A computer's clock can jump backwards.  For example, if 
  2469.  * the system runs ntp and regularly syncs the system clock
  2470.  * with some outside source it is possible that the local
  2471.  * clock will move backwards.  We cannot die, then, simply
  2472.  * because we find a job on the run queue that has a future 
  2473.  * tts.  The possibility exists that it is due to some 
  2474.  * adjustment in the system clock.
  2475.  */
  2476. if (job.tts > now) {
  2477.     traceJob(job, "WARNING: Job tts is %d seconds in the future.  Proceeding anyway.", job.tts - now);
  2478.     job.tts = now;
  2479. }
  2480. /*
  2481.  * Do per-destination processing and checking.
  2482.  */
  2483. DestInfo& di = destJobs[job.dest];
  2484. /*
  2485.  * Constrain the maximum number of times the phone
  2486.  * will be dialed and/or the number of attempts that
  2487.  * will be made (and reject jobs accordingly).
  2488.  */
  2489. u_short maxdials = fxmin((u_short) job.getJCI().getMaxDials(),req->maxdials);
  2490. if (req->totdials >= maxdials) {
  2491.     rejectJob(job, *req, fxStr::format(
  2492. "REJECT: Too many attempts to dial {E333}: %u, max %u",
  2493. req->totdials, maxdials));
  2494.     deleteRequest(job, req, Job::rejected, true);
  2495.     continue;
  2496. }
  2497. u_short maxtries = fxmin((u_short) job.getJCI().getMaxTries(),req->maxtries);
  2498. if (req->tottries >= maxtries) {
  2499.     rejectJob(job, *req, fxStr::format(
  2500. "REJECT: Too many attempts to transmit: %u, max %u {E334}",
  2501. req->tottries, maxtries));
  2502.     deleteRequest(job, req, Job::rejected, true);
  2503.     continue;
  2504. }
  2505. // NB: repeat this check so changes in max pages are applied
  2506. u_int maxpages = job.getJCI().getMaxSendPages();
  2507. if (req->totpages > maxpages) {
  2508.     rejectJob(job, *req, fxStr::format(
  2509. "REJECT: Too many pages in submission: %u, max %u {E335}",
  2510. req->totpages, maxpages));
  2511.     deleteRequest(job, req, Job::rejected, true);
  2512.     continue;
  2513. }
  2514. if (job.getJCI().getRejectNotice() != "") {
  2515.     /*
  2516.      * Calls to this destination are being rejected for
  2517.      * a specified reason that we return to the sender.
  2518.      */
  2519.     rejectJob(job, *req, "REJECT: " | job.getJCI().getRejectNotice());
  2520.     deleteRequest(job, req, Job::rejected, true);
  2521.     continue;
  2522. }
  2523. if (!isOKToCall(di, job.getJCI(), 1)) {
  2524.     /*
  2525.      * This job would exceed the max number of concurrent
  2526.      * calls that may be made to this destination.  Put it
  2527.      * on a ``blocked queue'' for the destination; the job
  2528.      * will be made ready to run when one of the existing
  2529.      * jobs terminates.
  2530.      */
  2531.     blockJob(job, *req, "Blocked by concurrent calls {E337}");
  2532.     if (job.isOnList()) job.remove(); // remove from run queue
  2533.     di.block(job); // place at tail of di queue, honors job priority
  2534.     delete req;
  2535. } else if (((tts = job.getJCI().nextTimeToSend(now)) != now) || ((tts = job.tod.nextTimeOfDay(now)) != now)) {
  2536.     /*
  2537.      * This job may not be started now because of time-of-day
  2538.      * restrictions.  Reschedule it for the next possible time.
  2539.      */
  2540.     if (job.isOnList()) job.remove(); // remove from run queue
  2541.     delayJob(job, *req, "Delayed by time-of-day restrictions {E338}", tts);
  2542.     delete req;
  2543. } else if (staggerCalls && ((u_int) lastCall + staggerCalls) > (u_int) now) {
  2544.     /*
  2545.      * This job may not be started now because we last started
  2546.      * another job too recently and we're staggering jobs.
  2547.      * Reschedule it for the time when next okay.
  2548.      */
  2549.     if (job.isOnList()) job.remove();
  2550.     delayJob(job, *req, "Delayed by outbound call staggering {E339}", lastCall + staggerCalls);
  2551.     delete req;
  2552. } else if (assignModem(job, (allowIgnoreModemBusy && req->ignoremodembusy))) {
  2553.     lastCall = now;
  2554.     if (job.isOnList()) job.remove(); // remove from run queue
  2555.     job.breq = req;
  2556.     /*
  2557.      * We have a modem and have assigned it to the
  2558.      * job.  The job is not on any list; processJob
  2559.      * is responsible for requeing the job according
  2560.      * to the outcome of the work it does (which may
  2561.      * take place asynchronously in a sub-process).
  2562.      * Likewise the release of the assigned modem is
  2563.      * also assumed to take place asynchronously in
  2564.      * the context of the job's processing.
  2565.      */
  2566.     (void) di.getInfo(job.dest); // must read file for supportsBatching
  2567.     FaxMachineInfo info;
  2568.     if (di.supportsBatching() && maxBatchJobs > 1
  2569.      && (req->jobtype == "facsimile"
  2570.      || (req->jobtype == "pager" 
  2571.      && streq(info.getPagingProtocol(), "ixo")))) { 
  2572. // fax and IXO pages only for now
  2573. /*
  2574.  * The destination supports batching and batching is enabled.  
  2575.  * Continue down the queue and build an array of all processable 
  2576.  * jobs to this destination allowed on this modem which are not 
  2577.  * of a lesser priority than jobs to other destinations.
  2578.  */
  2579. unblockDestJobs(di);
  2580. /*
  2581.  * Since job files are passed to the send program as command-line
  2582.  * parameters, our batch size is limited by that number of
  2583.  * parameters.  64 should be a portable number.
  2584.  */
  2585. if (maxBatchJobs > 64) maxBatchJobs = 64;
  2586. Job* bjob = &job; // Last batched Job
  2587. Job* cjob = &job; // current Job
  2588. u_int batchedjobs = 1;
  2589. for (u_int j = 0; batchedjobs < maxBatchJobs && j < NQHASH; j++) {
  2590.     blockSignals();
  2591.     for (JobIter joblist(runqs[j]); batchedjobs < maxBatchJobs && joblist.notDone(); joblist++) {
  2592. if (joblist.job().dest != cjob->dest)
  2593.     continue;
  2594. cjob = joblist;
  2595. if (job.jobid == cjob->jobid)
  2596.     continue; // Skip the current job
  2597. fxAssert(cjob->tts <= Sys::now(), "Sleeping job on run queue");
  2598. fxAssert(cjob->modem == NULL, "Job on run queue holding modem");
  2599. FaxRequest* creq = readRequest(*cjob);
  2600. if (!areBatchable(*req, *creq, job, *cjob)) {
  2601.     delete creq;
  2602.     continue;
  2603. }
  2604. if (iter.notDone() && &iter.job() == bjob)
  2605.     iter++;
  2606. traceJob(job, "ADDING JOB %s TO BATCH", (const char*) cjob->jobid);
  2607. cjob->modem = job.modem;
  2608. cjob->remove();
  2609. bjob->bnext = cjob;
  2610. cjob->bprev = bjob;
  2611. bjob = cjob;
  2612. cjob->breq = creq;
  2613. batchedjobs++;
  2614.     }
  2615.     releaseSignals();
  2616. }
  2617. /*
  2618.  * Jobs that are on the sleep queue with state_sleeping
  2619.  * can be batched because the tts that the submitter requested
  2620.  * is known to have passed already.  So we pull these jobs out
  2621.  * of the sleep queue and batch them directly.
  2622.  */
  2623. blockSignals();
  2624. for (JobIter sleepiter(sleepq); batchedjobs < maxBatchJobs && sleepiter.notDone(); sleepiter++) {
  2625.     cjob = sleepiter;
  2626.     if (cjob->dest != job.dest || cjob->state != FaxRequest::state_sleeping)
  2627. continue;
  2628.     FaxRequest* creq = readRequest(*cjob);
  2629.     if (!(req && areBatchable(*req, *creq, job, *cjob))) {
  2630. delete creq;
  2631. continue;
  2632.     }
  2633.     traceJob(job, "ADDING JOB %s TO BATCH", (const char*) cjob->jobid);
  2634.     cjob->stopTTSTimer();
  2635.     cjob->tts = now;
  2636.     cjob->state = FaxRequest::state_ready;
  2637.     cjob->remove();
  2638.     cjob->modem = job.modem;
  2639.     bjob->bnext = cjob;
  2640.     cjob->bprev = bjob;
  2641.     cjob->breq = creq;
  2642.     bjob = cjob;
  2643.     // This job was batched from sleeping, things have
  2644.     // changed; Update the queue file for onlookers.
  2645.     creq->tts = now;
  2646.     updateRequest(*creq, *cjob);
  2647.     batchedjobs++;
  2648. }
  2649. bjob->bnext = NULL;
  2650. releaseSignals();
  2651.     } else
  2652. job.bnext = NULL;
  2653.     di.call(); // mark as called to correctly block other jobs
  2654.     processJob(job, req, di);
  2655. } else if (job.state == FaxRequest::state_failed) {
  2656.     rejectJob(job, *req, fxStr::format("REJECT: Modem is configured as exempt from accepting jobs {E336}"));
  2657.     deleteRequest(job, req, Job::rejected, true);
  2658.     continue;
  2659. } else // leave job on run queue
  2660.     delete req;
  2661.     }
  2662. }
  2663.     }
  2664.     /*
  2665.      * Reap dead jobs.
  2666.      */
  2667.     for (JobIter iter(deadq); iter.notDone(); iter++) {
  2668. Job* job = iter;
  2669. job->remove();
  2670. traceJob(*job, "DELETE");
  2671. Trigger::post(Trigger::JOB_REAP, *job);
  2672. delete job;
  2673.     }
  2674.     releaseSignals();
  2675.     /*
  2676.      * Reclaim resources associated with clients
  2677.      * that terminated without telling us.
  2678.      */
  2679.     HylaClient::purge(); // XXX maybe do this less often
  2680.     inSchedule = false;
  2681. }
  2682. bool
  2683. faxQueueApp::scheduling(void)
  2684. {
  2685.     return inSchedule;
  2686. }
  2687. /*
  2688.  * Attempt to assign a modem to a job.  If we are
  2689.  * unsuccessful and it was due to the modem being
  2690.  * locked for use by another program then we start
  2691.  * a thread to poll for the removal of the lock file;
  2692.  * this is necessary for send-only setups where we
  2693.  * do not get information about when modems are in
  2694.  * use from faxgetty processes.
  2695.  */
  2696. bool
  2697. faxQueueApp::assignModem(Job& job, bool ignorebusy)
  2698. {
  2699.     fxAssert(job.modem == NULL, "Assigning modem to job that already has one");
  2700.     bool retryModemLookup;
  2701.     do {
  2702. retryModemLookup = false;
  2703. Modem* modem = Modem::findModem(job, ignorebusy);
  2704. if (modem) {
  2705.     if (modem->getState() == Modem::EXEMPT) {
  2706. job.state = FaxRequest::state_failed;
  2707. return (false);
  2708.     }
  2709.     if (modem->assign(job, (modem->getState() == Modem::BUSY && ignorebusy))) {
  2710. Trigger::post(Trigger::MODEM_ASSIGN, *modem);
  2711. return (true);
  2712.     }
  2713.     /*
  2714.      * Modem could not be assigned to job.  The
  2715.      * modem is assumed to be ``removed'' from
  2716.      * the list of potential modems scanned by
  2717.      * findModem so we arrange to re-lookup a
  2718.      * suitable modem for this job.  (a goto would
  2719.      * be fine here but too many C++ compilers
  2720.      * can't handle jumping past the above code...)
  2721.      */
  2722.     traceJob(job, "Unable to assign modem %s (cannot lock)",
  2723. (const char*) modem->getDeviceID());
  2724.     modem->startLockPolling(pollLockWait);
  2725.     traceModem(*modem, "BUSY (begin polling)");
  2726.     retryModemLookup = true;
  2727. } else
  2728.     traceJob(job, "No assignable modem located");
  2729.     } while (retryModemLookup);
  2730.     return (false);
  2731. }
  2732. /*
  2733.  * Release a modem assigned to a job.  The scheduler
  2734.  * is prodded since doing this may permit something
  2735.  * else to be processed.
  2736.  */
  2737. void
  2738. faxQueueApp::releaseModem(Job& job)
  2739. {
  2740.     Trigger::post(Trigger::MODEM_RELEASE, *job.modem);
  2741.     job.modem->release();
  2742.     pokeScheduler();
  2743.     Job* cjob;
  2744.     for (cjob = &job; cjob != NULL; cjob = cjob->bnext) {
  2745. fxAssert(cjob->modem != NULL, "No assigned modem to release");
  2746. cjob->modem = NULL; // remove reference to modem
  2747.     }
  2748. }
  2749. /*
  2750.  * Poll to see if a modem's UUCP lock file is still
  2751.  * present.  If the lock has been removed then mark
  2752.  * the modem ready for use and poke the job scheduler
  2753.  * in case jobs were waiting for an available modem.
  2754.  * This work is only done when a modem is ``discovered''
  2755.  * to be in-use by an outbound process when operating
  2756.  * in a send-only environment (i.e. one w/o a faxgetty
  2757.  * process monitoring the state of each modem).
  2758.  */
  2759. void
  2760. faxQueueApp::pollForModemLock(Modem& modem)
  2761. {
  2762.     if (modem.lock->lock()) {
  2763. modem.release();
  2764. traceModem(modem, "READY (end polling)");
  2765. pokeScheduler();
  2766.     } else
  2767. modem.startLockPolling(pollLockWait);
  2768. }
  2769. /*
  2770.  * Set a timeout so that the job scheduler runs the
  2771.  * next time the dispatcher is invoked.
  2772.  */
  2773. void
  2774. faxQueueApp::pokeScheduler(u_short s)
  2775. {
  2776.     schedTimeout.start(s);
  2777. }
  2778. /*
  2779.  * Create a request instance and read the
  2780.  * associated queue file into it.
  2781.  */
  2782. FaxRequest*
  2783. faxQueueApp::readRequest(Job& job)
  2784. {
  2785.     int fd = Sys::open(job.file, O_RDWR);
  2786.     if (fd >= 0) {
  2787. if (flock(fd, LOCK_EX) >= 0) {
  2788.     FaxRequest* req = new FaxRequest(job.file, fd);
  2789.     bool reject;
  2790.     if (req->readQFile(reject) && !reject) {
  2791. if (req->external == "")
  2792.     req->external = job.dest;
  2793. return (req);
  2794.     }
  2795.     jobError(job, "Could not read job file");
  2796.     delete req;
  2797. } else
  2798.     jobError(job, "Could not lock job file: %m");
  2799. Sys::close(fd);
  2800.     } else {
  2801. // file might have been removed by another server
  2802. if (errno != ENOENT)
  2803.     jobError(job, "Could not open job file: %m");
  2804.     }
  2805.     return (NULL);
  2806. }
  2807. /*
  2808.  * Update the request instance with information
  2809.  * from the job structure and then write the
  2810.  * associated queue file.
  2811.  */
  2812. void
  2813. faxQueueApp::updateRequest(FaxRequest& req, Job& job)
  2814. {
  2815.     req.state = job.state;
  2816.     req.pri = job.pri;
  2817.     req.writeQFile();
  2818. }
  2819. /*
  2820.  * Delete a request and associated state.
  2821.  */
  2822. void
  2823. faxQueueApp::deleteRequest(Job& job, FaxRequest* req, JobStatus why,
  2824.     bool force, const char* duration)
  2825. {
  2826.     if (why != Job::done) queueAccounting(job, *req, "UNSENT");
  2827.     deleteRequest(job, *req, why, force, duration);
  2828.     delete req;
  2829. }
  2830. void
  2831. faxQueueApp::deleteRequest(Job& job, FaxRequest& req, JobStatus why,
  2832.     bool force, const char* duration)
  2833. {
  2834.     fxStr dest = FAX_DONEDIR |
  2835. req.qfile.tail(req.qfile.length() - (sizeof (FAX_SENDDIR)-1));
  2836.     /*
  2837.      * Move completed jobs to the doneq area where
  2838.      * they can be retrieved for a period of time;
  2839.      * after which they are either removed or archived.
  2840.      */
  2841.     if (Sys::rename(req.qfile, dest) >= 0) {
  2842. u_int i = 0;
  2843. /*
  2844.  * Remove entries for imaged documents and
  2845.  * delete/rename references to source documents
  2846.  * so the imaged versions can be expunged.
  2847.  */
  2848. while (i < req.items.length()) {
  2849.     FaxItem& fitem = req.items[i];
  2850.     if (fitem.op == FaxRequest::send_fax) {
  2851. req.renameSaved(i);
  2852. unrefDoc(fitem.item);
  2853. req.items.remove(i);
  2854.     } else
  2855. i++;
  2856. }
  2857. req.qfile = dest; // moved to doneq
  2858. job.file = req.qfile; // ...and track change
  2859. if (why == Job::done)
  2860.     req.state = FaxRequest::state_done; // job is definitely done
  2861. else
  2862.     req.state = FaxRequest::state_failed;// job is definitely done
  2863. req.pri = job.pri; // just in case someone cares
  2864. req.tts = Sys::now(); // mark job termination time
  2865. job.tts = req.tts;
  2866. req.writeQFile();
  2867. if (force) {
  2868.     notifySender(job, why, duration);
  2869. } else {
  2870.     if (job.getJCI().getNotify() != -1) {
  2871. if (job.getJCI().isNotify(FaxRequest::notify_any))
  2872.     notifySender(job, why, duration);
  2873.     } else
  2874. if (req.isNotify(FaxRequest::notify_any))
  2875.     notifySender(job, why, duration);
  2876. }
  2877.     } else {
  2878. /*
  2879.  * Move failed, probably because there's no
  2880.  * directory.  Treat the job the way we used
  2881.  * to: purge everything.  This avoids filling
  2882.  * the disk with stuff that'll not get removed;
  2883.  * except for a scavenger program.
  2884.  */
  2885. jobError(job, "rename to %s failed: %s",
  2886.     (const char*) dest, strerror(errno));
  2887. req.writeQFile();
  2888. if (force) {
  2889.     notifySender(job, why, duration);
  2890. } else {
  2891.     if (job.getJCI().getNotify() != -1) {
  2892. if (job.getJCI().isNotify(FaxRequest::notify_any))
  2893.     notifySender(job, why, duration);
  2894.     } else
  2895. if (req.isNotify(FaxRequest::notify_any))
  2896.     notifySender(job, why, duration);
  2897. }
  2898. u_int n = req.items.length();
  2899. for (u_int i = 0; i < n; i++) {
  2900.     const FaxItem& fitem = req.items[i];
  2901.     switch (fitem.op) {
  2902.     case FaxRequest::send_fax:
  2903. unrefDoc(fitem.item);
  2904. break;
  2905.     case FaxRequest::send_tiff:
  2906.     case FaxRequest::send_tiff_saved:
  2907.     case FaxRequest::send_pdf:
  2908.     case FaxRequest::send_pdf_saved:
  2909.     case FaxRequest::send_postscript:
  2910.     case FaxRequest::send_postscript_saved:
  2911.     case FaxRequest::send_pcl:
  2912.     case FaxRequest::send_pcl_saved:
  2913. Sys::unlink(fitem.item);
  2914. break;
  2915.     }
  2916. }
  2917. req.items.remove(0, n);
  2918. Sys::unlink(req.qfile);
  2919.     }
  2920. }
  2921. /*
  2922.  * FIFO-related support.
  2923.  */
  2924. /*
  2925.  * Open the requisite FIFO special files.
  2926.  */
  2927. void
  2928. faxQueueApp::openFIFOs()
  2929. {
  2930.     fifo = openFIFO(fifoName, 0600, true);
  2931.     Dispatcher::instance().link(fifo, Dispatcher::ReadMask, this);
  2932. }
  2933. void
  2934. faxQueueApp::closeFIFOs()
  2935. {
  2936.     Sys::close(fifo), fifo = -1;
  2937. }
  2938. int faxQueueApp::inputReady(int fd) { return FIFOInput(fd); }
  2939. /*
  2940.  * Process a message received through a FIFO.
  2941.  */
  2942. void
  2943. faxQueueApp::FIFOMessage(const char* cp)
  2944. {
  2945.     if (tracingLevel & FAXTRACE_FIFO)
  2946. logInfo("FIFO RECV "%s"", cp);
  2947.     if (cp[0] == '') {
  2948. logError("Empty FIFO message");
  2949. return;
  2950.     }
  2951.     const char* tp = strchr(++cp, ':');
  2952.     if (tp && (tp[1] != ''))
  2953. FIFOMessage(cp[-1], fxStr(cp,tp-cp), tp+1);
  2954.     else
  2955. FIFOMessage(cp[-1], fxStr::null, cp);
  2956. }
  2957. /*
  2958.  * Process a parsed FIFO message.
  2959.  *
  2960.  * If an application goes crazy, or if the FIFO overflows, then it's possible 
  2961.  * to see corrupt FIFO messages.  Thus, the previous parsing of the FIFO message
  2962.  * cannot be entirely trusted.  Here, "id" and "args" must be checked for size
  2963.  * before continued processing.  The downstream functions will need to make sure 
  2964.  * that the id and args are actually meaningful.
  2965.  */
  2966. void
  2967. faxQueueApp::FIFOMessage(char cmd, const fxStr& id, const char* args)
  2968. {
  2969.     bool status = false;
  2970.     switch (cmd) {
  2971.     case '+': // modem status msg
  2972. if (id.length()) FIFOModemMessage(id, args);
  2973. return;
  2974.     case '*': // job status msg from subproc's
  2975. if (id.length()) FIFOJobMessage(id, args);
  2976. return;
  2977.     case '@': // receive status msg
  2978. if (id.length()) FIFORecvMessage(id, args);
  2979. return;
  2980.     case 'Q': // quit
  2981. if (!id.length()) {
  2982.     traceServer("QUIT");
  2983.     quit = true;
  2984.     pokeScheduler();
  2985. }
  2986. return; // NB: no return value expected
  2987.     case 'T': // create new trigger 
  2988. if (id.length()) {
  2989.     traceServer("TRIGGER %s", args);
  2990.     Trigger::create(id, args);
  2991. }
  2992. return; // NB: trigger id returned specially
  2993.     /*
  2994.      * The remaining commands generate a response if
  2995.      * the client has included a return address.
  2996.      */
  2997.     case 'C': // configuration control
  2998. if (args[0] == '') return;
  2999. traceServer("CONFIG %s", args);
  3000. status = readConfigItem(args);
  3001. break;
  3002.     case 'D': // cancel an existing trigger
  3003. if (args[0] == '') return;
  3004. traceServer("DELETE %s", args);
  3005. status = Trigger::cancel(args);
  3006. break;
  3007.     case 'R': // remove job
  3008. if (args[0] == '') return;
  3009. traceServer("REMOVE JOB %s", args);
  3010. status = terminateJob(args, Job::removed);
  3011. break;
  3012.     case 'K': // kill job
  3013. if (args[0] == '') return;
  3014. traceServer("KILL JOB %s", args);
  3015. status = terminateJob(args, Job::killed);
  3016. break;
  3017.     case 'S': // submit an outbound job
  3018. if (args[0] == '') return;
  3019. traceServer("SUBMIT JOB %s", args);
  3020. status = submitJob(args, false, true);
  3021. if (status)
  3022.     pokeScheduler();
  3023. break;
  3024.     case 'U': // unreference file
  3025. if (args[0] == '') return;
  3026. traceServer("UNREF DOC %s", args);
  3027. unrefDoc(args);
  3028. status = true;
  3029. break;
  3030.     case 'X': // suspend job
  3031. if (args[0] == '') return;
  3032. traceServer("SUSPEND JOB %s", args);
  3033. status = suspendJob(args, false);
  3034. if (status)
  3035.     pokeScheduler();
  3036. break;
  3037.     case 'Y': // interrupt job
  3038. if (args[0] == '') return;
  3039. traceServer("INTERRUPT JOB %s", args);
  3040. status = suspendJob(args, true);
  3041. if (status)
  3042.     pokeScheduler();
  3043. break;
  3044.     case 'N': // noop
  3045. status = true;
  3046. break;
  3047.     case 'Z':
  3048. showDebugState();
  3049. break;
  3050.     default:
  3051. logError("Bad FIFO cmd '%c' from client %s", cmd, (const char*) id);
  3052. break;
  3053.     }
  3054.     if (id != fxStr::null) {
  3055. char msg[3];
  3056. msg[0] = cmd;
  3057. msg[1] = (status ? '*' : '!');
  3058. msg[2] = '';
  3059. if (tracingLevel & FAXTRACE_FIFO)
  3060.     logInfo("FIFO SEND %s msg "%s"", (const char*) id, msg);
  3061. HylaClient::getClient(id).send(msg, sizeof (msg));
  3062.     }
  3063. }
  3064. void
  3065. faxQueueApp::notifyModemWedged(Modem& modem)
  3066. {
  3067.     fxStr dev(idToDev(modem.getDeviceID()));
  3068.     logError("MODEM %s appears to be wedged", (const char*)dev);
  3069.     fxStr cmd(wedgedCmd
  3070. | quote | quoted(modem.getDeviceID()) | enquote
  3071. | quote |                 quoted(dev) | enquote
  3072.     );
  3073.     traceServer("MODEM WEDGED: %s", (const char*) cmd);
  3074.     runCmd(cmd, true, this);
  3075. }
  3076. void
  3077. faxQueueApp::FIFOModemMessage(const fxStr& devid, const char* msg)
  3078. {
  3079.     if (! devid.length() > 0)
  3080.     {
  3081. traceServer("Invalid modem FIFO message");
  3082. return;
  3083.     }
  3084.     Modem& modem = Modem::getModemByID(devid);
  3085.     switch (msg[0]) {
  3086.     case 'R': // modem ready, parse capabilities
  3087. modem.stopLockPolling();
  3088. if (msg[1] != '') {
  3089.     modem.setCapabilities(msg+1); // NB: also sets modem READY
  3090.     traceModem(modem, "READY, capabilities %s", msg+1);
  3091. } else {
  3092.     modem.setState(Modem::READY);
  3093.     traceModem(modem, "READY (no capabilities)");
  3094. }
  3095. Trigger::post(Trigger::MODEM_READY, modem);
  3096. pokeScheduler();
  3097. break;
  3098.     case 'B': // modem busy doing something
  3099. modem.stopLockPolling();
  3100. traceModem(modem, "BUSY");
  3101. modem.setState(Modem::BUSY);
  3102. Trigger::post(Trigger::MODEM_BUSY, modem);
  3103. break;
  3104.     case 'D': // modem to be marked down
  3105. modem.stopLockPolling();
  3106. traceModem(modem, "DOWN");
  3107. modem.setState(Modem::DOWN);
  3108. Trigger::post(Trigger::MODEM_DOWN, modem);
  3109. break;
  3110.     case 'E': // modem exempt from sending use
  3111. modem.stopLockPolling();
  3112. traceModem(modem, "EXEMPT");
  3113. modem.setState(Modem::EXEMPT);
  3114. Trigger::post(Trigger::MODEM_EXEMPT, modem);
  3115. // clear any pending jobs for this modem
  3116. pokeScheduler();
  3117. break;
  3118.     case 'N': // modem phone number updated
  3119. traceModem(modem, "NUMBER %s", msg+1);
  3120. modem.setNumber(msg+1);
  3121. break;
  3122.     case 'I': // modem communication ID
  3123. traceModem(modem, "COMID %s", msg+1);
  3124. modem.setCommID(msg+1);
  3125. break;
  3126.     case 'W': // modem appears wedged
  3127. // NB: modem should be marked down in a separate message
  3128. notifyModemWedged(modem);
  3129.         Trigger::post(Trigger::MODEM_WEDGED, modem);
  3130. break;
  3131.     case 'U': // modem inuse by outbound job
  3132. modem.stopLockPolling();
  3133. traceModem(modem, "BUSY");
  3134. modem.setState(Modem::BUSY);
  3135. Trigger::post(Trigger::MODEM_INUSE, modem);
  3136. break;
  3137.     case 'C': // caller-ID information
  3138. Trigger::post(Trigger::MODEM_CID, modem, msg+1);
  3139. break;
  3140.     case 'd': // data call begun
  3141. Trigger::post(Trigger::MODEM_DATA_BEGIN, modem);
  3142. break;
  3143.     case 'e': // data call finished
  3144. Trigger::post(Trigger::MODEM_DATA_END, modem);
  3145. break;
  3146.     case 'v': // voice call begun
  3147. Trigger::post(Trigger::MODEM_VOICE_BEGIN, modem);
  3148. break;
  3149.     case 'w': // voice call finished
  3150. Trigger::post(Trigger::MODEM_VOICE_END, modem);
  3151. break;
  3152.     default:
  3153. traceServer("FIFO: Bad modem message "%s" for modem %s",
  3154. msg, (const char*)devid);
  3155. break;
  3156.     }
  3157. }
  3158. void
  3159. faxQueueApp::FIFOJobMessage(const fxStr& jobid, const char* msg)
  3160. {
  3161.     Job* jp = Job::getJobByID(jobid);
  3162.     if (!jp) {
  3163. traceServer("FIFO: JOB %s not found for msg "%s"",
  3164.     (const char*) jobid, msg);
  3165. return;
  3166.     }
  3167.     switch (msg[0]) {
  3168.     case 'c': // call placed
  3169. Trigger::post(Trigger::SEND_CALL, *jp);
  3170. break;
  3171.     case 'C': // call connected with fax
  3172. Trigger::post(Trigger::SEND_CONNECTED, *jp);
  3173. break;
  3174.     case 'd': // page sent
  3175. Trigger::post(Trigger::SEND_PAGE, *jp, msg+1);
  3176. break;
  3177.     case 'D': // document sent
  3178. { FaxSendInfo si; si.decode(msg+1); unrefDoc(si.qfile); }
  3179. Trigger::post(Trigger::SEND_DOC, *jp, msg+1);
  3180. break;
  3181.     case 'p': // polled document received
  3182. Trigger::post(Trigger::SEND_POLLRCVD, *jp, msg+1);
  3183. break;
  3184.     case 'P': // polling operation done
  3185. Trigger::post(Trigger::SEND_POLLDONE, *jp, msg+1);
  3186. break;
  3187.     default:
  3188. traceServer("FIFO: Unknown job message "%s" for job %s",
  3189. msg, (const char*)jobid);
  3190. break;
  3191.     }
  3192. }
  3193. void
  3194. faxQueueApp::FIFORecvMessage(const fxStr& devid, const char* msg)
  3195. {
  3196.     if (! devid.length() > 0)
  3197.     {
  3198. traceServer("Invalid modem FIFO message");
  3199. return;
  3200.     }
  3201.     Modem& modem = Modem::getModemByID(devid);
  3202.     switch (msg[0]) {
  3203.     case 'B': // inbound call started
  3204. Trigger::post(Trigger::RECV_BEGIN, modem);
  3205. break;
  3206.     case 'E': // inbound call finished
  3207. Trigger::post(Trigger::RECV_END, modem);
  3208. break;
  3209.     case 'S': // session started (received initial parameters)
  3210. Trigger::post(Trigger::RECV_START, modem, msg+1);
  3211. break;
  3212.     case 'P': // page done
  3213. Trigger::post(Trigger::RECV_PAGE, modem, msg+1);
  3214. break;
  3215.     case 'D': // document done
  3216. Trigger::post(Trigger::RECV_DOC, modem, msg+1);
  3217. break;
  3218.     default:
  3219. traceServer("FIFO: Unknown recv message "%s" for modem %s",
  3220. msg, (const char*)devid);
  3221. break;
  3222.     }
  3223. }
  3224. /*
  3225.  * Configuration support.
  3226.  */
  3227. void
  3228. faxQueueApp::resetConfig()
  3229. {
  3230.     FaxConfig::resetConfig();
  3231.     dialRules = NULL;
  3232.     setupConfig();
  3233. }
  3234. #define N(a) (sizeof (a) / sizeof (a[0]))
  3235. faxQueueApp::stringtag faxQueueApp::strings[] = {
  3236. { "logfacility", &faxQueueApp::logFacility, LOG_FAX },
  3237. { "areacode", &faxQueueApp::areaCode },
  3238. { "countrycode", &faxQueueApp::countryCode },
  3239. { "longdistanceprefix", &faxQueueApp::longDistancePrefix },
  3240. { "internationalprefix",&faxQueueApp::internationalPrefix },
  3241. { "uucplockdir", &faxQueueApp::uucpLockDir, UUCP_LOCKDIR },
  3242. { "uucplocktype", &faxQueueApp::uucpLockType, UUCP_LOCKTYPE },
  3243. { "contcoverpage", &faxQueueApp::contCoverPageTemplate },
  3244. { "contcovercmd", &faxQueueApp::coverCmd, FAX_COVERCMD },
  3245. { "notifycmd", &faxQueueApp::notifyCmd, FAX_NOTIFYCMD },
  3246. { "ps2faxcmd", &faxQueueApp::ps2faxCmd, FAX_PS2FAXCMD },
  3247. { "pdf2faxcmd", &faxQueueApp::pdf2faxCmd, FAX_PDF2FAXCMD },
  3248. { "pcl2faxcmd", &faxQueueApp::pcl2faxCmd, FAX_PCL2FAXCMD },
  3249. { "tiff2faxcmd", &faxQueueApp::tiff2faxCmd, FAX_TIFF2FAXCMD },
  3250. { "sendfaxcmd", &faxQueueApp::sendFaxCmd,
  3251.    FAX_LIBEXEC "/faxsend" },
  3252. { "sendpagecmd", &faxQueueApp::sendPageCmd,
  3253.    FAX_LIBEXEC "/pagesend" },
  3254. { "senduucpcmd", &faxQueueApp::sendUUCPCmd,
  3255.    FAX_LIBEXEC "/uucpsend" },
  3256. { "wedgedcmd", &faxQueueApp::wedgedCmd, FAX_WEDGEDCMD },
  3257. { "jobcontrolcmd", &faxQueueApp::jobCtrlCmd, "" },
  3258. };
  3259. faxQueueApp::numbertag faxQueueApp::numbers[] = {
  3260. { "tracingmask", &faxQueueApp::tracingMask, // NB: must be first
  3261.    FAXTRACE_MODEMIO|FAXTRACE_TIMEOUTS },
  3262. { "servertracing", &faxQueueApp::tracingLevel, FAXTRACE_SERVER },
  3263. { "uucplocktimeout", &faxQueueApp::uucpLockTimeout, 0 },
  3264. { "postscripttimeout", &faxQueueApp::postscriptTimeout, 3*60 },
  3265. { "maxconcurrentjobs", &faxQueueApp::maxConcurrentCalls, 1 },
  3266. { "maxconcurrentcalls", &faxQueueApp::maxConcurrentCalls, 1 },
  3267. { "maxbatchjobs", &faxQueueApp::maxBatchJobs, (u_int) 64 },
  3268. { "maxsendpages", &faxQueueApp::maxSendPages, (u_int) -1 },
  3269. { "maxtries", &faxQueueApp::maxTries, (u_int) FAX_RETRIES },
  3270. { "maxdials", &faxQueueApp::maxDials, (u_int) FAX_REDIALS },
  3271. { "jobreqother", &faxQueueApp::requeueInterval, FAX_REQUEUE },
  3272. { "polllockwait", &faxQueueApp::pollLockWait, 30 },
  3273. { "staggercalls", &faxQueueApp::staggerCalls, 0 },
  3274. };
  3275. void
  3276. faxQueueApp::setupConfig()
  3277. {
  3278.     int i;
  3279.     for (i = N(strings)-1; i >= 0; i--)
  3280. (*this).*strings[i].p = (strings[i].def ? strings[i].def : "");
  3281.     for (i = N(numbers)-1; i >= 0; i--)
  3282. (*this).*numbers[i].p = numbers[i].def;
  3283.     tod.reset(); // any day, any time
  3284.     use2D = true; // ok to use 2D data
  3285.     useUnlimitedLN = true; // ok to use LN_INF
  3286.     allowIgnoreModemBusy = false; // to allow jobs to ignore modem busy status
  3287.     uucpLockMode = UUCP_LOCKMODE;
  3288.     delete dialRules, dialRules = NULL;
  3289.     ModemGroup::reset(); // clear+add ``any modem'' class
  3290.     ModemGroup::set(MODEM_ANY, new RE(".*"));
  3291.     pageChop = FaxRequest::chop_last;
  3292.     pageChopThreshold = 3.0; // minimum of 3" of white space
  3293.     lastCall = Sys::now() - 3600;
  3294. }
  3295. void
  3296. faxQueueApp::configError(const char* fmt, ...)
  3297. {
  3298.     va_list ap;
  3299.     va_start(ap, fmt);
  3300.     vlogError(fmt, ap);
  3301.     va_end(ap);
  3302. }
  3303. void
  3304. faxQueueApp::configTrace(const char* fmt, ...)
  3305. {
  3306.     if (tracingLevel & FAXTRACE_CONFIG) {
  3307. va_list ap;
  3308. va_start(ap, fmt);
  3309. vlogError(fmt, ap);
  3310. va_end(ap);
  3311.     }
  3312. }
  3313. static void
  3314. crackArgv(fxStr& s)
  3315. {
  3316.     u_int i = 0;
  3317.     do {
  3318.         while (i < s.length() && !isspace(s[i])) i++;
  3319.         if (i < s.length()) {
  3320.             s[i++] = '';
  3321.             u_int j = i;
  3322.             while (j < s.length() && isspace(s[j])) j++;
  3323.             if (j > i) {
  3324.                 s.remove(i, j - i);
  3325.             }
  3326.         }
  3327.     } while (i < s.length());
  3328.     s.resize(i);
  3329. }
  3330. static void
  3331. tiffErrorHandler(const char* module, const char* fmt0, va_list ap)
  3332. {
  3333.     fxStr fmt = (module != NULL) ?
  3334.         fxStr::format("%s: Warning, %s.", module, fmt0)
  3335.         : fxStr::format("Warning, %s.", fmt0);
  3336.     vlogError(fmt, ap);
  3337. }
  3338. static void
  3339. tiffWarningHandler(const char* module, const char* fmt0, va_list ap)
  3340. {
  3341.     fxStr fmt = (module != NULL) ?
  3342.         fxStr::format("%s: Warning, %s.", module, fmt0)
  3343.         : fxStr::format("Warning, %s.", fmt0);
  3344.     vlogWarning(fmt, ap);
  3345. }
  3346. bool
  3347. faxQueueApp::setConfigItem(const char* tag, const char* value)
  3348. {
  3349.     u_int ix;
  3350.     if (findTag(tag, (const tags*) strings, N(strings), ix)) {
  3351. (*this).*strings[ix].p = value;
  3352. switch (ix) {
  3353. case 0: faxApp::setLogFacility(logFacility); break;
  3354. }
  3355. if (ix >= 8)
  3356.     crackArgv((*this).*strings[ix].p);
  3357.     } else if (findTag(tag, (const tags*) numbers, N(numbers), ix)) {
  3358. (*this).*numbers[ix].p = getNumber(value);
  3359. switch (ix) {
  3360. case 1:
  3361.     tracingLevel &= ~tracingMask;
  3362.     if (dialRules)
  3363. dialRules->setVerbose((tracingLevel&FAXTRACE_DIALRULES) != 0);
  3364.     if (tracingLevel&FAXTRACE_TIFF) {
  3365. TIFFSetErrorHandler(tiffErrorHandler);
  3366. TIFFSetWarningHandler(tiffWarningHandler);
  3367.     } else {
  3368. TIFFSetErrorHandler(NULL);
  3369. TIFFSetWarningHandler(NULL);
  3370.     }
  3371.     break;
  3372. case 2: UUCPLock::setLockTimeout(uucpLockTimeout); break;
  3373. }
  3374.     } else if (streq(tag, "dialstringrules"))
  3375. setDialRules(value);
  3376.     else if (streq(tag, "timeofday"))
  3377. tod.parse(value);
  3378.     else if (streq(tag, "use2d"))
  3379. use2D = getBoolean(value);
  3380.     else if (streq(tag, "allowignoremodembusy"))
  3381. allowIgnoreModemBusy = getBoolean(value);
  3382.     else if (streq(tag, "uucplockmode"))
  3383. uucpLockMode = (mode_t) strtol(value, 0, 8);
  3384.     else if (streq(tag, "modemgroup")) {
  3385. const char* cp;
  3386. for (cp = value; *cp && *cp != ':'; cp++)
  3387.     ;
  3388. if (*cp == ':') {
  3389.     fxStr name(value, cp-value);
  3390.     for (cp++; *cp && isspace(*cp); cp++)
  3391. ;
  3392.     if (*cp != '') {
  3393. RE* re = new RE(cp);
  3394. if (re->getErrorCode() > REG_NOMATCH) {
  3395.     fxStr emsg;
  3396.     re->getError(emsg);
  3397.     configError("Bad pattern for modem group "%s": %s: %s", (const char*) emsg,
  3398. (const char*) name, re->pattern());
  3399. } else
  3400.     ModemGroup::set(name, re);
  3401.     } else
  3402. configError("No regular expression for modem group");
  3403. } else
  3404.     configError("Missing ':' separator in modem group specification");
  3405.     } else if (streq(tag, "pagechop")) {
  3406. if (streq(value, "all"))
  3407.     pageChop = FaxRequest::chop_all;
  3408. else if (streq(value, "none"))
  3409.     pageChop = FaxRequest::chop_none;
  3410. else if (streq(value, "last"))
  3411.     pageChop = FaxRequest::chop_last;
  3412.     } else if (streq(tag, "pagechopthreshold"))
  3413. pageChopThreshold = atof(value);
  3414.     else if (streq(tag, "audithook") )
  3415.     {
  3416.         const char* cp;
  3417. for (cp = value; *cp && *cp != ':'; cp++)
  3418.     ;
  3419. if (*cp == ':') {
  3420.     fxStr cmd(value, cp-value);
  3421.     for (cp++; *cp && isspace(*cp); cp++)
  3422. ;
  3423.     if (*cp != '') {
  3424.      Trigger::setTriggerHook(cmd, cp);
  3425.     } else
  3426. configError("No trigger specification for audit hook");
  3427. } else
  3428.     configError("Missing ':' separator in audit hook specification");
  3429.     
  3430.     } else
  3431. return (false);
  3432.     return (true);
  3433. }
  3434. /*
  3435.  * Subclass DialStringRules so that we can redirect the
  3436.  * diagnostic and tracing interfaces through the server.
  3437.  */
  3438. class MyDialStringRules : public DialStringRules {
  3439. private:
  3440.     virtual void parseError(const char* fmt ...);
  3441.     virtual void traceParse(const char* fmt ...);
  3442.     virtual void traceRules(const char* fmt ...);
  3443. public:
  3444.     MyDialStringRules(const char* filename);
  3445.     ~MyDialStringRules();
  3446. };
  3447. MyDialStringRules::MyDialStringRules(const char* f) : DialStringRules(f) {}
  3448. MyDialStringRules::~MyDialStringRules() {}
  3449. void
  3450. MyDialStringRules::parseError(const char* fmt ...)
  3451. {
  3452.     va_list ap;
  3453.     va_start(ap, fmt);
  3454.     vlogError(fmt, ap);
  3455.     va_end(ap);
  3456. }
  3457. void
  3458. MyDialStringRules::traceParse(const char* fmt ...)
  3459. {
  3460.     if (faxQueueApp::instance().getTracingLevel() & FAXTRACE_DIALRULES) {
  3461. va_list ap;
  3462. va_start(ap, fmt);
  3463. vlogInfo(fmt, ap);
  3464. va_end(ap);
  3465.     }
  3466. }
  3467. void
  3468. MyDialStringRules::traceRules(const char* fmt ...)
  3469. {
  3470.     if (faxQueueApp::instance().getTracingLevel() & FAXTRACE_DIALRULES) {
  3471. va_list ap;
  3472. va_start(ap, fmt);
  3473. vlogInfo(fmt, ap);
  3474. va_end(ap);
  3475.     }
  3476. }
  3477. void
  3478. faxQueueApp::setDialRules(const char* name)
  3479. {
  3480.     delete dialRules;
  3481.     dialRules = new MyDialStringRules(name);
  3482.     dialRules->setVerbose((tracingLevel & FAXTRACE_DIALRULES) != 0);
  3483.     /*
  3484.      * Setup configuration environment.
  3485.      */
  3486.     dialRules->def("AreaCode", areaCode);
  3487.     dialRules->def("CountryCode", countryCode);
  3488.     dialRules->def("LongDistancePrefix", longDistancePrefix);
  3489.     dialRules->def("InternationalPrefix", internationalPrefix);
  3490.     if (!dialRules->parse()) {
  3491. configError("Parse error in dial string rules "%s"", name);
  3492. delete dialRules, dialRules = NULL;
  3493.     }
  3494. }
  3495. /*
  3496.  * Convert a dialing string to a canonical format.
  3497.  */
  3498. fxStr
  3499. faxQueueApp::canonicalizePhoneNumber(const fxStr& ds)
  3500. {
  3501.     if (dialRules)
  3502. return dialRules->canonicalNumber(ds);
  3503.     else
  3504. return ds;
  3505. }
  3506. /*
  3507.  * Create an appropriate UUCP lock instance.
  3508.  */
  3509. UUCPLock*
  3510. faxQueueApp::getUUCPLock(const fxStr& deviceName)
  3511. {
  3512.     return UUCPLock::newLock(uucpLockType,
  3513. uucpLockDir, deviceName, uucpLockMode);
  3514. }
  3515. u_int faxQueueApp::getTracingLevel() const
  3516.     { return tracingLevel; }
  3517. u_int faxQueueApp::getMaxConcurrentCalls() const
  3518.     { return maxConcurrentCalls; }
  3519. u_int faxQueueApp::getMaxSendPages() const
  3520.     { return maxSendPages; }
  3521. u_int faxQueueApp::getMaxDials() const
  3522.     { return maxDials; }
  3523. u_int faxQueueApp::getMaxTries() const
  3524.     { return maxTries; }
  3525. time_t faxQueueApp::nextTimeToSend(time_t t) const
  3526.     { return tod.nextTimeOfDay(t); }
  3527. /*
  3528.  * Miscellaneous stuff.
  3529.  */
  3530. /*
  3531.  * Notify the sender of a job that something has
  3532.  * happened -- the job has completed, it's been requeued
  3533.  * for later processing, etc.
  3534.  */
  3535. void
  3536. faxQueueApp::notifySender(Job& job, JobStatus why, const char* duration)
  3537. {
  3538.     fxStr cmd(notifyCmd
  3539. | quote |  quoted(job.file) | enquote
  3540. | quote | quoted(Job::jobStatusName(why)) | enquote
  3541. | quote |  quoted(duration) | enquote
  3542.     );
  3543.     if (why == Job::requeued) {
  3544. /*
  3545.  * It's too hard to do localtime in an awk script,
  3546.  * so if we may need it, we calculate it here
  3547.  * and pass the result as an optional argument.
  3548.  */
  3549. char buf[30];
  3550. strftime(buf, sizeof (buf), " '%H:%M'", localtime(&job.tts));
  3551. cmd.append(buf);
  3552.     }
  3553.     traceServer("NOTIFY: %s", (const char*) cmd);
  3554.     runCmd(cmd, true, this);
  3555. }
  3556. void
  3557. faxQueueApp::vtraceServer(const char* fmt, va_list ap)
  3558. {
  3559.     if (tracingLevel & FAXTRACE_SERVER)
  3560. vlogInfo(fmt, ap);
  3561. }
  3562. void
  3563. faxQueueApp::traceServer(const char* fmt ...)
  3564. {
  3565.     if (tracingLevel & FAXTRACE_SERVER) {
  3566. va_list ap;
  3567. va_start(ap, fmt);
  3568. vlogInfo(fmt, ap);
  3569. va_end(ap);
  3570.     }
  3571. }
  3572. static void
  3573. vtraceJob(const Job& job, const char* fmt, va_list ap)
  3574. {
  3575.     static const char* stateNames[] = {
  3576.         "state#0", "suspended", "pending", "sleeping", "blocked",
  3577. "ready", "active", "done", "failed"
  3578.     };
  3579.     time_t now = Sys::now();
  3580.     vlogInfo(
  3581.   "JOB " | job.jobid
  3582. | " (" | stateNames[job.state%9]
  3583. | " dest " | job.dest
  3584. | fxStr::format(" pri %u", job.pri)
  3585. | " tts " | strTime(job.tts - now)
  3586. | " killtime " | strTime(job.killtime - now)
  3587. | "): "
  3588. | fmt, ap);
  3589. }
  3590. void
  3591. faxQueueApp::traceQueue(const Job& job, const char* fmt ...)
  3592. {
  3593.     if (tracingLevel & FAXTRACE_QUEUEMGMT) {
  3594. va_list ap;
  3595. va_start(ap, fmt);
  3596. vtraceJob(job, fmt, ap);
  3597. va_end(ap);
  3598.     }
  3599. }
  3600. void
  3601. faxQueueApp::traceJob(const Job& job, const char* fmt ...)
  3602. {
  3603.     if (tracingLevel & FAXTRACE_JOBMGMT) {
  3604. va_list ap;
  3605. va_start(ap, fmt);
  3606. vtraceJob(job, fmt, ap);
  3607. va_end(ap);
  3608.     }
  3609. }
  3610. void
  3611. faxQueueApp::traceQueue(const char* fmt ...)
  3612. {
  3613.     if (tracingLevel & FAXTRACE_QUEUEMGMT) {
  3614. va_list ap;
  3615. va_start(ap, fmt);
  3616. vlogInfo(fmt, ap);
  3617. va_end(ap);
  3618.     }
  3619. }
  3620. void
  3621. faxQueueApp::traceModem(const Modem& modem, const char* fmt ...)
  3622. {
  3623.     if (tracingLevel & FAXTRACE_MODEMSTATE) {
  3624. va_list ap;
  3625. va_start(ap, fmt);
  3626. vlogInfo("MODEM " | modem.getDeviceID() | ": " | fmt, ap);
  3627. va_end(ap);
  3628.     }
  3629. }
  3630. void
  3631. faxQueueApp::jobError(const Job& job, const char* fmt ...)
  3632. {
  3633.     va_list ap;
  3634.     va_start(ap, fmt);
  3635.     vlogError("JOB " | job.jobid | ": " | fmt, ap);
  3636.     va_end(ap);
  3637. }
  3638. void
  3639. faxQueueApp::showDebugState(void)
  3640. {
  3641.     traceServer("DEBUG: Listing destJobs with %d items", destJobs.size());
  3642.     for (DestInfoDictIter iter(destJobs); iter.notDone(); iter++)
  3643.     {
  3644. const fxStr& dest(iter.key());
  3645. const DestInfo& di(iter.value());
  3646. traceServer("DestInfo (%p) to %s", &di, (const char*)dest);
  3647.     }
  3648.     for (int i = 0; i < NQHASH; i++)
  3649.     {
  3650. traceServer("DEBUG: runqs[%d](%p) next %p", i, &runqs[i], runqs[i].next);
  3651. for (JobIter iter(runqs[i]); iter.notDone(); iter++)
  3652. {
  3653.     Job& job(iter);
  3654.     traceJob(job, "In run queue");
  3655. }
  3656.     }
  3657.     traceServer("DEBUG: sleepq(%p) next %p", &sleepq, sleepq.next);
  3658.     for (JobIter iter(sleepq); iter.notDone(); iter++)
  3659.     {
  3660. Job& job(iter);
  3661. traceJob(job, "In sleep queue");
  3662.     }
  3663.     traceServer("DEBUG: suspendq(%p) next %p", &suspendq, suspendq.next);
  3664.     for (JobIter iter(suspendq); iter.notDone(); iter++)
  3665.     {
  3666. Job& job(iter);
  3667. traceJob(job, "In suspend queue");
  3668.     }
  3669.     traceServer("DEBUG: activeq(%p) next %p", &activeq, activeq.next);
  3670.     for (JobIter iter(activeq); iter.notDone(); iter++)
  3671.     {
  3672. Job& job(iter);
  3673. traceJob(job, "In active queue");
  3674.     }
  3675.     traceServer("DEBUG: inSchedule: %s", inSchedule ? "YES" : "NO");
  3676. }
  3677. void faxQueueApp::childStatus(pid_t pid, int status)
  3678. {
  3679.     // We don't do anything here - nothing to act on.
  3680.     //traceServer("Child exit status: %#o (%u)", status, pid);
  3681. }
  3682. static void
  3683. usage(const char* appName)
  3684. {
  3685.     faxApp::fatal("usage: %s [-q queue-directory] [-D]", appName);
  3686. }
  3687. static void
  3688. sigCleanup(int)
  3689. {
  3690.     faxQueueApp::instance().close();
  3691.     _exit(-1);
  3692. }
  3693. int
  3694. main(int argc, char** argv)
  3695. {
  3696.     faxApp::setupLogging("FaxQueuer");
  3697.     fxStr appName = argv[0];
  3698.     u_int l = appName.length();
  3699.     appName = appName.tokenR(l, '/');
  3700.     faxApp::setupPermissions();
  3701.     faxApp::setOpts("q:Dc:");
  3702.     bool detach = true;
  3703.     fxStr queueDir(FAX_SPOOLDIR);
  3704.     for (GetoptIter iter(argc, argv, faxApp::getOpts()); iter.notDone(); iter++)
  3705. switch (iter.option()) {
  3706. case 'q': queueDir = iter.optArg(); break;
  3707. case 'D': detach = false; break;
  3708. case '?': usage(appName);
  3709. }
  3710.     if (Sys::chdir(queueDir) < 0)
  3711. faxApp::fatal(queueDir | ": Can not change directory");
  3712.     if (!Sys::isRegularFile(FAX_ETCDIR "/setup.cache"))
  3713. faxApp::fatal("No " FAX_ETCDIR "/setup.cache file; run faxsetup first");
  3714.     if (detach)
  3715. faxApp::detachFromTTY();
  3716.     faxQueueApp* app = new faxQueueApp;
  3717.     signal(SIGTERM, fxSIGHANDLER(sigCleanup));
  3718.     signal(SIGINT, fxSIGHANDLER(sigCleanup));
  3719.     app->initialize(argc, argv);
  3720.     app->open();
  3721.     while (app->isRunning())
  3722. Dispatcher::instance().dispatch();
  3723.     app->close();
  3724.     delete app;
  3725.     Modem::CLEANUP();
  3726.     delete &Dispatcher::instance();
  3727.     
  3728.     return 0;
  3729. }