dao.cc
上传用户:weiliju62
上传日期:2007-01-06
资源大小:619k
文件大小:28k
源码类别:

SCSI/ASPI

开发平台:

MultiPlatform

  1. /*  cdrdao - write audio CD-Rs in disc-at-once mode
  2.  *
  3.  *  Copyright (C) 1998, 1999  Andreas Mueller <mueller@daneb.ping.de>
  4.  *
  5.  *  This program is free software; you can redistribute it and/or modify
  6.  *  it under the terms of the GNU General Public License as published by
  7.  *  the Free Software Foundation; either version 2 of the License, or
  8.  *  (at your option) any later version.
  9.  *
  10.  *  This program is distributed in the hope that it will be useful,
  11.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.  *  GNU General Public License for more details.
  14.  *
  15.  *  You should have received a copy of the GNU General Public License
  16.  *  along with this program; if not, write to the Free Software
  17.  *  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  18.  */
  19. /*
  20.  * $Log: dao.cc,v $
  21.  * Revision 1.12  1999/05/11 20:04:09  mueller
  22.  * SYSV message queues are not used anymore. The communication between
  23.  * reader and writer is based on a polling mechanism, now.
  24.  * Added posix thread support.
  25.  *
  26.  * Revision 1.11  1999/03/27 20:57:27  mueller
  27.  * Improved buffer code. Does not use semaphores anymore.
  28.  * The reader is now the forked process.
  29.  *
  30.  * Revision 1.10  1999/01/24 16:03:57  mueller
  31.  * Applied Radek Doulik's ring buffer patch. Added some cleanups and
  32.  * improved behavior in error case.
  33.  *
  34.  * Revision 1.9  1998/10/03 14:33:46  mueller
  35.  * Applied patch from Bjoern Fischer <bfischer@Techfak.Uni-Bielefeld.DE>:
  36.  * Cosmetic changes and correction of real time scheduling selection.
  37.  *
  38.  * Revision 1.8  1998/09/22 19:14:29  mueller
  39.  * Added locking of memory pages.
  40.  *
  41.  * Revision 1.7  1998/09/21 19:35:51  mueller
  42.  * Added real time scheduling. Submitted by Bjoern Fischer <bfischer@Techfak.Uni-Bielefeld.DE>.
  43.  *
  44.  * Revision 1.6  1998/09/06 13:34:22  mueller
  45.  * Use 'message()' for printing messages.
  46.  *
  47.  * Revision 1.5  1998/08/30 19:21:15  mueller
  48.  * Rearranged the code.
  49.  *
  50.  * Revision 1.4  1998/08/15 20:47:16  mueller
  51.  * Added support for GenericMMC driver.
  52.  *
  53.  */
  54. static char rcsid[] = "$Id: dao.cc,v 1.12 1999/05/11 20:04:09 mueller Exp mueller $";
  55. #include <config.h>
  56. #include <stdio.h>
  57. #include <unistd.h>
  58. #include <fcntl.h>
  59. #include <errno.h>
  60. #include <string.h>
  61. #include <signal.h>
  62. #include <sys/types.h>
  63. #include <sys/wait.h>
  64. #include <assert.h>
  65. #ifdef linux
  66. #include <linux/unistd.h>
  67. #include <linux/types.h>
  68. #include <linux/sysctl.h>
  69. #include <sys/sysctl.h>
  70. #endif
  71. #ifdef HAVE_SYS_MMAN_H
  72. #include <sys/mman.h>
  73. #endif
  74. #ifdef USE_POSIX_THREADS
  75. #include <pthread.h>
  76. #include <sched.h>
  77. #else
  78. #include <sys/ipc.h>
  79. #include <sys/shm.h>
  80. #endif
  81. #include "port.h"
  82. #define DEBUG_WRITE 0
  83. #if defined(__FreeBSD__)
  84. #define IPC_ARG_T void
  85. #else
  86. #define IPC_ARG_T msgbuf
  87. #endif
  88. /* Select POSIX scheduler interface for real time scheduling if possible */
  89. #if (defined HAVE_SCHED_GETPARAM) && (defined HAVE_SCHED_GET_PRIORITY_MAX) && (defined HAVE_SCHED_SETSCHEDULER) && (!defined LINUX_QNX_SCHEDULING)
  90. #define POSIX_SCHEDULING
  91. #endif
  92. #ifdef LINUX_QNX_SCHEDULING
  93. #define SCHED_OTHER     0
  94. #define SCHED_FIFO      1
  95. #define SCHED_RR        2
  96. struct sched_param {
  97.   unsigned int priority;
  98.   int fork_penalty_threshold;
  99.   unsigned int starvation_threshold;
  100.   unsigned int ts_max;
  101.   unsigned int run_q, run_q_min, run_q_max;
  102. };
  103. extern "C" int sched_setparam __P((pid_t __pid, const struct sched_param *__param));
  104. extern "C" int sched_getparam __P((pid_t __pid, struct sched_param *__param));
  105. extern "C" int sched_setscheduler __P((pid_t __pid, int __policy, const struct sched_param *__param));
  106. extern "C" int sched_getscheduler __P((pid_t __pid));
  107. #elif defined POSIX_SCHEDULING
  108. #include <sched.h>
  109. #endif
  110. #include "dao.h"
  111. #include "util.h"
  112. #include "remote.h"
  113. struct ShmSegment {
  114.   int id;
  115.   char *buffer;
  116. };
  117. struct Buffer {
  118.   long bufLen;  // number of blocks in buffer that should be written
  119.   TrackData::Mode mode; // data mode for writing
  120.   TrackData::Mode trackMode; // mode of track may differ from 'mode' if data
  121.                              // blocks must be encoded in audio blocks,
  122.                              // only used for message printing
  123.   int trackNr; // if != 0 a new track with given number has started
  124.   char *buffer; // address of buffer that should be written
  125. };
  126.   
  127. struct BufferHeader {
  128.   long buffersRead;    // number of blocks that are read and put to the buffer
  129.   long buffersWritten; // number of blocks that were taken from the buffer
  130.   int buffersFilled;   // set to 1 by reader process when buffer is filled the
  131.                        // first time
  132.   int readerFinished;  
  133.   int readerTerminated;
  134.   int terminateReader;
  135.   long nofBuffers;     // number of available buffers
  136.   Buffer *buffers;
  137. };
  138. // buffer size in blocks
  139. int BUFFER_SIZE = 75;
  140. static int TERMINATE = 0;
  141. static int getSharedMemory(long nofBuffers, BufferHeader **header,
  142.    long *nofSegments, ShmSegment **shmSegments);
  143. static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegments);
  144. static RETSIGTYPE terminationRequest(int sig)
  145. {
  146.   if (sig == SIGQUIT || sig == SIGTERM) 
  147.     TERMINATE = 1;
  148. #if 0
  149.   if (sig == SIGCHLD) {
  150.     message(0, "SIGCHLD received.");
  151.   }
  152. #endif
  153. }
  154. #ifndef USE_POSIX_THREADS
  155. // Waits or polls for termination of a child process.
  156. // noHang: 0: wait until child terminates, 1: just poll if child terminated
  157. // status: filled with status information, only valid if 0 is returned
  158. // return: 0: child exited
  159. //         1: no child exited, can only happen if 'noHang' is 1
  160. //         2: wait failed, 'errno' contains cause
  161. static int waitForChild(int noHang, int *status)
  162. {
  163.   int ret;
  164.   do {
  165.     if (noHang)
  166.       ret = wait3(status, WNOHANG, NULL);
  167.     else
  168.       ret = wait(status);
  169.     if (ret > 0)
  170.       return 0;
  171.     if (ret < 0 && errno != EINTR 
  172. #ifdef ERESTARTSYS
  173. && errno != ERESTARTSYS
  174. #endif
  175. ) {
  176.       return 2;
  177.     }
  178.   } while (ret < 0);
  179.   return 1;
  180. }
  181. #endif
  182. // Blocks all signals that are handled by this module.
  183. static void blockSignals()
  184. {
  185.   sigset_t set;
  186.   sigemptyset(&set);
  187.   sigaddset(&set, SIGCHLD);
  188.   sigaddset(&set, SIGQUIT);
  189.   sigaddset(&set, SIGTERM);
  190. #ifdef USE_POSIX_THREADS
  191. #ifdef HAVE_PTHREAD_SIGMASK
  192.   pthread_sigmask(SIG_BLOCK, &set, NULL);
  193. #endif
  194. #else
  195.   sigprocmask(SIG_BLOCK, &set, NULL);
  196. #endif
  197. }
  198. // Blocks all signals that are handled by this module.
  199. static void unblockSignals()
  200. {
  201.   sigset_t set;
  202.   sigemptyset(&set);
  203.   sigaddset(&set, SIGCHLD);
  204.   sigaddset(&set, SIGQUIT);
  205.   sigaddset(&set, SIGTERM);
  206. #ifdef USE_POSIX_THREADS
  207. #ifdef HAVE_PTHREAD_SIGMASK
  208.   pthread_sigmask(SIG_UNBLOCK, &set, NULL);
  209. #endif
  210. #else
  211.   sigprocmask(SIG_UNBLOCK, &set, NULL);
  212. #endif
  213. }
  214. // return: 0: OK 
  215. //         1: child process terminated and has been collected with 'wait()'
  216. //         2: error -> child process must be terminated
  217. static int writer(CdrDriver *cdr, long total, BufferHeader *header, long lba,
  218.   long remoteMode)
  219. {
  220.   long cnt = 0;
  221.   long blkCount = 0;
  222.   long len = 0;
  223.   long cntMb;
  224.   long lastMb = 0;
  225.   long buffered;
  226.   int buffFill;
  227.   int minFill = 100;
  228.   int maxFill = 0;
  229.   int actTrackNr = 0;
  230.   long actProgress;
  231.   TrackData::Mode dataMode;
  232.   DaoWritingProgress remoteMsg;
  233.   char remoteMsgSync[4];
  234. #ifndef USE_POSIX_THREADS
  235.   int status;
  236. #endif
  237.   remoteMsgSync[0] = 0xff;
  238.   remoteMsgSync[1] = 0x00;
  239.   remoteMsgSync[2] = 0xff;
  240.   remoteMsgSync[3] = 0x00;
  241.   message (3, "Waiting for reader process");
  242.   while (header->buffersFilled == 0) {
  243.     sleep(1);
  244.     if (header->readerTerminated) {
  245.       message(-2, "Reader process terminated abnormally.");
  246.       return 1;
  247.     }
  248. #ifndef USE_POSIX_THREADS
  249.   // Check if child has terminated
  250.     switch (waitForChild(1, &status)) {
  251.     case 0: // Child exited
  252.       message(-2, "Reader process terminated abnormally.");
  253.       return 1;
  254.     case 2:
  255.       message(-2, "wait failed: %s", strerror(errno));
  256.       return 2;
  257.     }
  258. #endif
  259.   }
  260. #if DEBUG_WRITE
  261.   FILE *fp = fopen("test.out", "w");
  262. #endif
  263.   message (3, "Awaken, will start writing");
  264.   if (cdr != NULL) {
  265.     if (remoteMode) {
  266.       remoteMsg.status = 1; // writing lead-in
  267.       remoteMsg.track = 0;
  268.       remoteMsg.totalProgress = 0;
  269.       remoteMsg.bufferFillRate = 100;
  270.       if (write(3/*fileno(stdout)*/, remoteMsgSync, sizeof(remoteMsgSync)) < 0 ||
  271.   write(3/*fileno(stdout)*/, (const char*)&remoteMsg,
  272. sizeof(remoteMsg)) < 0) {
  273. message(-1, "Disabling remote messages.");
  274. remoteMode = 0;
  275.       }
  276.     }
  277.     blockSignals();
  278.     if (cdr->startDao() != 0) {
  279.       unblockSignals();
  280.       return 2;
  281.     }
  282.     unblockSignals();
  283.   }
  284.   do {
  285.     //message(3, "Slave: waiting for master.");
  286.     while (header->buffersWritten == header->buffersRead) {
  287.       if (header->readerTerminated) {
  288. message(-2, "Reader process terminated abnormally.");
  289. return 1;
  290.       }
  291. #ifndef USE_POSIX_THREADS
  292.       // Check if child has terminated
  293.       switch (waitForChild(1, &status)) {
  294.       case 0: // Child exited
  295. message(-2, "Reader process terminated abnormally.");
  296. return 1;
  297.       case 2:
  298. message(-2, "wait failed: %s", strerror(errno));
  299. return 2;
  300.       }
  301. #endif
  302.       mSleep(20);
  303.     }
  304.     Buffer &buf = header->buffers[header->buffersWritten % header->nofBuffers];
  305.     len = buf.bufLen;
  306.     dataMode = buf.mode;
  307.     if (header->readerFinished) {
  308.       buffFill = 100;
  309.       if (maxFill == 0)
  310. maxFill = 100;
  311.     }
  312.     else {
  313.       buffered = header->buffersRead - header->buffersWritten;
  314.       if (buffered == header->nofBuffers ||
  315.   buffered == header->nofBuffers - 1) {
  316. buffFill = 100;
  317.       }
  318.       else {
  319. buffFill = 100 * buffered;
  320. buffFill /= header->nofBuffers;
  321.       }
  322.       if (buffFill > maxFill)
  323. maxFill = buffFill;
  324.     }
  325.     if (buffFill < minFill)
  326.       minFill = buffFill;
  327.     if (len == 0) {
  328.       // all data is written
  329.       message(1, "");
  330.       if (cdr == NULL)
  331. message(1, "Read %ld blocks.", blkCount);
  332.       else
  333. message(1, "Wrote %ld blocks. Buffer fill min %d%%/max %d%%.",
  334. blkCount, minFill, maxFill);
  335. #if DEBUG_WRITE
  336.       if (fp != NULL)
  337. fclose(fp);
  338. #endif
  339.       if (remoteMode) {
  340. remoteMsg.status = 3; // writing lead-out
  341. remoteMsg.track = 0xaa;
  342. remoteMsg.totalProgress = 1000;
  343. remoteMsg.bufferFillRate = 100;
  344. if (write(3/*fileno(stdout)*/, remoteMsgSync, sizeof(remoteMsgSync)) < 0 ||
  345.     write(3/*fileno(stdout)*/, (const char*)&remoteMsg,
  346.   sizeof(remoteMsg)) < 0) {
  347.   message(-1, "Disabling remote messages.");
  348.   remoteMode = 0;
  349. }
  350.       }
  351.       if (cdr != NULL) {
  352. blockSignals();
  353. if (cdr->finishDao() != 0) {
  354.   unblockSignals();
  355.   return 2;
  356. }
  357. unblockSignals();
  358.       }
  359.       return 0;
  360.     }
  361.     cnt += len * AUDIO_BLOCK_LEN;
  362.     blkCount += len;
  363.     if (buf.trackNr > 0) {
  364.       message(1, "Writing track %02d (mode %s/%s)...", buf.trackNr,
  365.       TrackData::mode2String(buf.trackMode),
  366.       TrackData::mode2String(dataMode));
  367.       actTrackNr = buf.trackNr;
  368.     }
  369.     //message(3, "Slave: writing buffer %p (%ld).", buf, len);
  370. #if DEBUG_WRITE
  371.     if (fp != NULL) {
  372.       if (cdr != NULL)
  373. fwrite(buf.buffer, cdr->blockSize(dataMode), len, fp);
  374.       else
  375. fwrite(buf.buffer, 2352, len, fp);
  376.     }
  377. #endif
  378.     if (cdr != NULL) {
  379.       blockSignals();
  380.       if (cdr->writeData(dataMode, lba, buf.buffer, len) != 0) {
  381. message(-2, "Writing failed - buffer under run?");
  382. unblockSignals();
  383. return 2;
  384.       }
  385.       else {
  386. cntMb = cnt >> 20;
  387. if (cntMb > lastMb) {
  388.   message(1, "Wrote %ld of %ld MB (Buffer %3d%%).r", cnt >> 20,
  389.   total >> 20, buffFill);
  390.   lastMb = cntMb;
  391. }
  392.       }
  393.       unblockSignals();
  394.     }
  395.     else {
  396.       message(1, "Read %ld of %ld MB.r", cnt >> 20, total >> 20);
  397.     }
  398.     if (remoteMode) {
  399.       actProgress = cnt;
  400.       actProgress /= total / 1000;
  401.       remoteMsg.status = 2; // writing data
  402.       remoteMsg.track = actTrackNr;
  403.       remoteMsg.totalProgress = actProgress;
  404.       remoteMsg.bufferFillRate = buffFill;
  405.       
  406.       if (write(3/*fileno(stdout)*/, remoteMsgSync, sizeof(remoteMsgSync)) < 0 ||
  407.   write(3/*fileno(stdout)*/, (const char*)&remoteMsg,
  408. sizeof(remoteMsg)) < 0) {
  409. message(-1, "Disabling remote messages.");
  410. remoteMode = 0;
  411.       }
  412.     }
  413.     header->buffersWritten += 1;
  414.   } while (!TERMINATE);
  415.   message(-1, "Writing/simulation/read-test aborted on user request.");
  416.   return 2;
  417. }
  418. struct ReaderArgs {
  419.   const Toc *toc;
  420.   CdrDriver *cdr;
  421.   int swap;
  422.   BufferHeader *header;
  423.   long startLba;
  424. };
  425. static void *reader(void *args)
  426. {
  427.   const Toc *toc = ((ReaderArgs*)args)->toc;
  428.   CdrDriver *cdr = ((ReaderArgs*)args)->cdr;
  429.   int swap = ((ReaderArgs*)args)->swap;
  430.   BufferHeader *header = ((ReaderArgs*)args)->header;
  431.   long lba = ((ReaderArgs*)args)->startLba + 150; // used to encode the sector
  432.                                                   // header (MSF)
  433.   long length = toc->length().lba();
  434.   long n, rn;
  435.   int first = header->nofBuffers;
  436.   const Track *track;
  437.   int trackNr = 1;
  438.   Msf tstart, tend;
  439.   TrackData::Mode dataMode;
  440.   int encodingMode = 1;
  441.   int newTrack;
  442.   if (cdr != NULL) {
  443.     if (cdr->bigEndianSamples() == 0) {
  444.       // swap samples for little endian recorders
  445.       swap = !swap;
  446.     }
  447.     encodingMode = cdr->encodingMode();
  448.   }
  449.   message(3, "Swap: %d", swap);
  450.   TrackIterator itr(toc);
  451.   TrackReader reader;
  452.   track = itr.first(tstart, tend);
  453.   reader.init(track);
  454.   if (reader.openData() != 0) {
  455.     message(-2, "Opening of track data failed.");
  456.     goto fail;
  457.   }
  458.   newTrack = 1;
  459.   dataMode = (encodingMode == 0) ? TrackData::AUDIO : track->type();
  460.   do {
  461.     n = (length > BUFFER_SIZE ? BUFFER_SIZE : length);
  462.     Buffer &buf = header->buffers[header->buffersRead % header->nofBuffers];
  463.     do {
  464.       rn = reader.readData(encodingMode, lba, buf.buffer, n);
  465.     
  466.       if (rn < 0) {
  467. message(-2, "Reading of track data failed.");
  468. goto fail;
  469.       }
  470.       
  471.       if (rn == 0) {
  472. track = itr.next(tstart, tend);
  473. reader.init(track);
  474. if (reader.openData() != 0) {
  475.   message(-2, "Opening of track data failed.");
  476.   goto fail;
  477. }
  478. trackNr++;
  479. if (encodingMode != 0)
  480.   dataMode = track->type();
  481. newTrack = 1;
  482.       }
  483.     } while (rn == 0);
  484.     lba += rn;
  485.     if (track->type() == TrackData::AUDIO) {
  486.       if (swap) {
  487. // swap audio data
  488. swapSamples((Sample *)(buf.buffer), rn * SAMPLES_PER_BLOCK);
  489.       }
  490.     }
  491.     else {
  492.       if (encodingMode == 0 && cdr != NULL && cdr->bigEndianSamples() == 0) {
  493. // swap encoded data blocks
  494. swapSamples((Sample *)(buf.buffer), rn * SAMPLES_PER_BLOCK);
  495.       }
  496.     }
  497.     // notify writer that it can write buffer
  498.     buf.bufLen = rn;
  499.     buf.mode = dataMode;
  500.     buf.trackMode = track->type();
  501.     if (newTrack) {
  502.       // inform write process that it should print message about new track
  503.       buf.trackNr = trackNr;
  504.     }
  505.     else {
  506.       buf.trackNr = 0;
  507.     }
  508.     header->buffersRead += 1;
  509.     length -= rn;
  510.     if (first > 0) {
  511.       first--;
  512.       if (first == 0 || length == 0) {
  513. message (3, "Buffer filled");
  514. header->buffersFilled = 1;
  515.       }
  516.     }
  517.     
  518.     // wait for writing process to finish writing of previous buffer
  519.     //message(3, "Reader: waiting for Writer.");
  520.     while (header->buffersRead - header->buffersWritten 
  521.    == header->nofBuffers &&
  522.    header->terminateReader == 0) {
  523.       mSleep(20);
  524.     }
  525.     newTrack = 0;
  526.   } while (length > 0 && header->terminateReader == 0);
  527.   header->readerFinished = 1;
  528.   if (header->terminateReader == 0) {
  529.     Buffer &buf1 = header->buffers[header->buffersRead % header->nofBuffers];
  530.     buf1.bufLen = 0;
  531.     buf1.trackNr = 0;
  532.     header->buffersRead += 1;
  533.   }
  534. #ifndef USE_POSIX_THREADS
  535.   // wait until we get killed
  536.   while (1)
  537.     sleep(1000);
  538.   exit(0);
  539. #endif
  540.   return NULL;
  541. fail:
  542.   header->readerTerminated = 1;
  543. #ifndef USE_POSIX_THREADS
  544.   exit(1);
  545. #endif
  546.   return NULL;
  547. }
  548. int writeDiskAtOnce(const Toc *toc, CdrDriver *cdr, int nofBuffers, int swap,
  549.     int testMode, int remoteMode)
  550. {
  551.   long length = toc->length().lba();
  552.   long total = length * AUDIO_BLOCK_LEN;
  553.   int err = 0;
  554.   BufferHeader *header = NULL;
  555.   long nofShmSegments = 0;
  556.   ShmSegment *shmSegments = NULL;
  557.   long startLba = 0;
  558. #if defined(POSIX_SCHEDULING) || defined(LINUX_QNX_SCHEDULING) || 
  559.     defined(USE_POSIX_THREADS)
  560.   struct sched_param schedp;
  561. #endif
  562. #ifdef USE_POSIX_THREADS
  563.   pthread_t readerThread;
  564.   pthread_attr_t readerThreadAttr;
  565.   int threadStarted = 0;
  566. #else
  567.   int pid = 0;
  568.   int status;
  569. #endif
  570. #if 1
  571.   if (nofBuffers < 10) {
  572.     nofBuffers = 10;
  573.     message(-1, "Adjusted number of FIFO buffers to 10.");
  574.   }
  575. #endif
  576.   if (remoteMode) {
  577.     // switch stdout to non blocking IO
  578.     int flags;
  579.     if ((flags = fcntl(3/*fileno(stdout)*/, F_GETFL)) == -1) {
  580.       message(-2, "Cannot get flags of stdout: %s", strerror(errno));
  581.       return 1;
  582.     }
  583.     flags |= O_NONBLOCK;
  584.     if (fcntl(3/*fileno(stdout)*/, F_SETFL, flags) < 0) {
  585.       message(-2, "Cannot set flags of stdout: %s", strerror(errno));
  586.       return 1;
  587.     }
  588.   }
  589.   if (getSharedMemory(nofBuffers, &header, &nofShmSegments,
  590.       &shmSegments)  != 0) {
  591.     releaseSharedMemory(nofShmSegments, shmSegments);
  592.     return 1;
  593.   }
  594.   header->buffersRead = 0;
  595.   header->buffersWritten = 0;
  596.   header->buffersFilled = 0;
  597.   header->readerFinished = 0;
  598.   header->readerTerminated = 0;
  599.   header->terminateReader = 0;
  600.   TERMINATE = 0;
  601.   installSignalHandler(SIGINT, SIG_IGN);
  602.   installSignalHandler(SIGPIPE, SIG_IGN);
  603.   installSignalHandler(SIGALRM, SIG_IGN);
  604.   installSignalHandler(SIGCHLD, terminationRequest);
  605.   installSignalHandler(SIGQUIT, terminationRequest);
  606.   installSignalHandler(SIGTERM, terminationRequest);
  607.   if (!testMode) {
  608.     const DiskInfo *di;
  609.     if (cdr->initDao(toc) != 0) {
  610.       err = 1; goto fail;
  611.     }
  612.     if ((di = cdr->diskInfo()) != NULL) {
  613.       startLba = di->thisSessionLba;
  614.     }
  615.   }
  616.   // start reader process
  617. #ifdef USE_POSIX_THREADS
  618.   if (pthread_attr_init(&readerThreadAttr) != 0) {
  619.     message(-2, "pthread_attr_init failed: %s", strerror(errno));
  620.     err = 1; goto fail;
  621.   }
  622.   
  623. #if defined(POSIX_SCHEDULING) && defined(HAVE_PTHREAD_ATTR_SETSCHEDPOLICY) && 
  624.   defined(HAVE_PTHREAD_ATTR_SETSCHEDPARAM)
  625.   if (geteuid() == 0) {
  626.     if (pthread_attr_setschedpolicy(&readerThreadAttr, SCHED_RR) == 0) {
  627.       struct sched_param schedParam;
  628.       if (pthread_attr_getschedparam(&readerThreadAttr, &schedParam) == 0) {
  629. schedParam.sched_priority = sched_get_priority_max(SCHED_RR) - 4;
  630. if (pthread_attr_setschedparam(&readerThreadAttr, &schedParam) != 0) {
  631.   message(-1, "pthread_attr_setschedparam failed: %s",
  632.   strerror(errno));
  633. }
  634.       }
  635.     }
  636.   }
  637. #else
  638.   message(-1, "Real time scheduling is not available for reader thread.");
  639. #endif
  640.   ReaderArgs rargs;
  641.   rargs.toc = toc;
  642.   rargs.cdr = cdr;
  643.   rargs.swap = swap;
  644.   rargs.header = header;
  645.   rargs.startLba = startLba;
  646.   if (pthread_create(&readerThread, &readerThreadAttr, reader, &rargs) != 0) {
  647.     message(-2, "Cannot create thread: %s", strerror(errno));
  648.     pthread_attr_destroy(&readerThreadAttr);
  649.     err = 1; goto fail;
  650.   }
  651.   else {
  652.     threadStarted = 1;
  653.     
  654.   }
  655. #else /* USE_POSIX_THREADS */
  656.   if ((pid = fork()) == 0) {
  657.     // we are the new process
  658.     setsid(); // detach from controlling terminal
  659. #ifdef LINUX_QNX_SCHEDULING
  660.     if (geteuid() == 0) {
  661.       sched_getparam (0, &schedp);
  662.       schedp.run_q_min = schedp.run_q_max = 1;
  663.       if (sched_setscheduler (0, SCHED_RR, &schedp) != 0) {
  664. message(-1, "Cannot setup real time scheduling: %s", strerror(errno));
  665.       }
  666.     }
  667. #elif defined POSIX_SCHEDULING
  668.     if (geteuid() == 0) {
  669.       sched_getparam (0, &schedp);
  670.       schedp.sched_priority = sched_get_priority_max (SCHED_RR) - 4;
  671.       if (sched_setscheduler (0, SCHED_RR, &schedp) != 0) {
  672. message(-1, "Cannot setup real time scheduling: %s", strerror(errno));
  673.       }
  674.     }
  675. #endif
  676. #ifdef HAVE_MLOCKALL
  677.     if (geteuid() == 0) {
  678.       if (mlockall(MCL_CURRENT|MCL_FUTURE) != 0) {
  679. message(-1, "Cannot lock memory pages: %s", strerror(errno));
  680.       }
  681.       message(3, "Reader process memory locked");
  682.     }
  683. #endif
  684.     ReaderArgs rargs;
  685.     rargs.toc = toc;
  686.     rargs.cdr = cdr;
  687.     rargs.swap = swap;
  688.     rargs.header = header;
  689.     rargs.startLba = startLba;
  690.     reader(&rargs);
  691.   }
  692.   else if (pid < 0) {
  693.     message(-2, "fork failed: %s", strerror(errno));
  694.     err = 1; goto fail;
  695.   }
  696. #endif /* USE_POSIX_THREADS */
  697.   // setup scheduler for writing process/thread
  698. #if defined(USE_POSIX_THREADS)
  699. #if defined(HAVE_PTHREAD_GETSCHEDPARAM) && defined(HAVE_PTHREAD_SETSCHEDPARAM)
  700.   if (geteuid() == 0) {
  701.     int schedPolicy;
  702.     pthread_getschedparam(pthread_self(), &schedPolicy, &schedp);
  703.     schedp.sched_priority = sched_get_priority_max(SCHED_RR) - 5;
  704.     if (pthread_setschedparam(pthread_self(), SCHED_RR, &schedp) != 0) {
  705.       message(-1, "Cannot setup real time scheduling: %s", strerror(errno));
  706.     }
  707.     else {
  708.       message(1, "Using POSIX real time scheduling.");
  709.     }
  710.   }
  711.   else {
  712.     message(-1, "No root permissions for real time scheduling.");
  713.   }
  714. #else
  715.   message(-1, "Real time scheduling is not available.");
  716. #endif
  717. #elif defined(LINUX_QNX_SCHEDULING)
  718.   
  719.   if (geteuid() == 0) {
  720.     sched_getparam (0, &schedp);
  721.     schedp.run_q_min = schedp.run_q_max = 2;
  722.     if (sched_setscheduler (0, SCHED_RR, &schedp) != 0) {
  723.       message(-1, "Cannot setup real time scheduling: %s", strerror(errno));
  724.     }
  725.     else {
  726.       message(1, "Using Linux QNX real time scheduling.");
  727.     }
  728.   }
  729.   else {
  730.     message(-1, "No root permissions for real time scheduling.");
  731.   }
  732. #elif defined POSIX_SCHEDULING
  733.   
  734.   if (geteuid() == 0) {
  735.     sched_getparam (0, &schedp);
  736.     schedp.sched_priority = sched_get_priority_max (SCHED_RR) - 5;
  737.     if (sched_setscheduler (0, SCHED_RR, &schedp) != 0) {
  738.       message(-1, "Cannot setup real time scheduling: %s", strerror(errno));
  739.     }
  740.     else {
  741.       message(1, "Using POSIX real time scheduling.");
  742.     }
  743.   }
  744.   else {
  745.     message(-1, "No root permissions for real time scheduling.");
  746.   }
  747.   
  748. #endif
  749. #ifdef HAVE_MLOCKALL
  750.   if (geteuid() == 0) {
  751.     if (mlockall(MCL_CURRENT|MCL_FUTURE) != 0) {
  752.       message(-1, "Cannot lock memory pages: %s", strerror(errno));
  753.     }
  754.     message(3, "Memory locked");
  755.   }
  756. #endif
  757.   switch (writer(cdr, total, header, startLba, remoteMode)) {
  758.   case 1: // error, reader process terminated abnormally
  759. #ifndef USE_POSIX_THREADS
  760.     pid = 0;
  761. #endif
  762.     err = 1;
  763.     break;
  764.   case 2: // error, reader process must be terminated
  765.     err = 1;
  766.     break;
  767.   }
  768.   if (err != 0 && cdr != NULL)
  769.     cdr->abortDao(); // abort writing process
  770.  fail:
  771. #ifdef HAVE_MUNLOCKALL
  772.   munlockall();
  773. #endif
  774. #ifdef USE_POSIX_THREADS
  775.   if (threadStarted) {
  776.     header->terminateReader = 1;
  777.     if (pthread_join(readerThread, NULL) != 0) {
  778.       message(-2, "pthread_join failed: %s", strerror(errno));
  779.       err = 1;
  780.     }
  781.     pthread_attr_destroy(&readerThreadAttr);
  782.   }
  783. #else
  784.   if (pid != 0) {
  785.     if (kill(pid, SIGKILL) == 0) {
  786.       waitForChild(0, &status);
  787.     }
  788.   }
  789. #endif
  790.   releaseSharedMemory(nofShmSegments, shmSegments);
  791.   installSignalHandler(SIGINT, SIG_DFL);
  792.   installSignalHandler(SIGPIPE, SIG_DFL);
  793.   installSignalHandler(SIGALRM, SIG_DFL);
  794.   installSignalHandler(SIGCHLD, SIG_DFL);
  795.   installSignalHandler(SIGQUIT, SIG_DFL);
  796.   installSignalHandler(SIGTERM, SIG_DFL);
  797.   return err;
  798. }
  799. #ifdef USE_POSIX_THREADS
  800. static int getSharedMemory(long nofBuffers,
  801.    BufferHeader **header, long *nofSegments,
  802.    ShmSegment **shmSegment)
  803. {
  804.   long b;
  805.   long bufferSize = BUFFER_SIZE * AUDIO_BLOCK_LEN;
  806.   *header = NULL;
  807.   *nofSegments = 0;
  808.   *shmSegment = NULL;
  809.   if (nofBuffers <= 0) {
  810.     return 1;
  811.   }
  812.   *shmSegment = new ShmSegment;
  813.   *nofSegments = 1;
  814.   (*shmSegment)->id = -1;
  815.   (*shmSegment)->buffer = new char[sizeof(BufferHeader) +
  816.    nofBuffers * sizeof(Buffer) +
  817.    nofBuffers * bufferSize];
  818.   if ( (*shmSegment)->buffer == NULL) {
  819.     message(-2, "Cannot allocated memory for ring buffer.");
  820.     return 1;
  821.   }
  822.   *header = (BufferHeader*)((*shmSegment)->buffer);
  823.   (*header)->nofBuffers = nofBuffers;
  824.   (*header)->buffers =
  825. (Buffer*)((*shmSegment)->buffer + sizeof(BufferHeader));
  826.   char *bufferBase = (*shmSegment)->buffer + sizeof(BufferHeader) +
  827.                       nofBuffers * sizeof(Buffer);
  828.   for (b = 0; b < nofBuffers; b++)
  829.     (*header)->buffers[b].buffer = bufferBase + b * bufferSize;
  830.   return 0;
  831. }
  832. static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegment)
  833. {
  834.   if (shmSegment == NULL || nofSegments == 0)
  835.     return;
  836.   if (shmSegment->buffer != NULL) {
  837.     delete[] shmSegment->buffer;
  838.     shmSegment->buffer = NULL;
  839.   }
  840.   delete shmSegment;
  841. }
  842. #else /* USE_POSIX_THREADS */
  843. static int getSharedMemory(long nofBuffers,
  844.    BufferHeader **header, long *nofSegments,
  845.    ShmSegment **shmSegments)
  846. {
  847.   long i, b;
  848.   long bufferSize = BUFFER_SIZE * AUDIO_BLOCK_LEN;
  849.   long maxSegmentSize = 0;
  850.   long bcnt = 0;
  851.   *header = NULL;
  852.   *nofSegments = 0;
  853.   *shmSegments = NULL;
  854.   if (nofBuffers <= 0) {
  855.     return 1;
  856.   }
  857. #if defined(linux) && defined(IPC_INFO)
  858.   struct shminfo info;
  859.   if (shmctl(0, IPC_INFO, (struct shmid_ds*)&info) < 0) {
  860.     message(-1, "Cannot get IPC info: %s", strerror(errno));
  861.     maxSegmentSize = 4 * 1024 * 1024;
  862.     message(-1, "Assuming %ld MB shared memory segment size.",
  863.     maxSegmentSize >> 20);
  864.   }
  865.   else {
  866.     maxSegmentSize = info.shmmax;
  867.   }
  868. #elif defined(__FreeBSD__)
  869.   maxSegmentSize = 4 * 1024 * 1024; // 4 MB
  870. #else
  871.   maxSegmentSize = 1 * 1024 * 1024; // 1 MB
  872. #endif
  873.   message(3, "Shm max segement size: %ld (%ld MB)", maxSegmentSize,
  874.   maxSegmentSize >> 20);
  875.   if (maxSegmentSize < sizeof(BufferHeader) + nofBuffers * sizeof(Buffer)) {
  876.     message(-2, "Shared memory segment cannot hold a single buffer.");
  877.     return 1;
  878.   }
  879.   maxSegmentSize -= sizeof(BufferHeader) + nofBuffers * sizeof(Buffer);
  880.   long buffersPerSegment = maxSegmentSize / bufferSize;
  881.   if (buffersPerSegment == 0) {
  882.     message(-2, "Shared memory segment cannot hold a single buffer.");
  883.     return 1;
  884.   }
  885.   *nofSegments = nofBuffers / buffersPerSegment;
  886.   if (nofBuffers % buffersPerSegment != 0)
  887.     *nofSegments += 1;
  888.   *shmSegments = new ShmSegment[*nofSegments];
  889.   message(3, "Using %ld shared memory segments.", *nofSegments);
  890.   for (i = 0; i < *nofSegments; i++) {
  891.     (*shmSegments)[i].id = -1;
  892.     (*shmSegments)[i].buffer = NULL;
  893.   }
  894.   long bufCnt = nofBuffers;
  895.   long n;
  896.   long segmentLength;
  897.   char *bufferBase;
  898.   for (i = 0; i < *nofSegments; i++) {
  899.     n = (bufCnt > buffersPerSegment ? buffersPerSegment : bufCnt);
  900.     segmentLength = n * bufferSize;
  901.     if (*header == NULL) {
  902.       // first segment contains the buffer header
  903.       segmentLength += sizeof(BufferHeader) + nofBuffers * sizeof(Buffer);
  904.     }
  905.     (*shmSegments)[i].id = shmget(IPC_PRIVATE, segmentLength, 0600);
  906.     if ((*shmSegments)[i].id < 0) {
  907.       message(-2, "Cannot create shared memory segment: %s",
  908.       strerror(errno));
  909.       message(-2, "Try to reduce the buffer count (option --buffers).");
  910.       return 1;
  911.     }
  912.     (*shmSegments)[i].buffer = (char *)shmat((*shmSegments)[i].id, 0, 0);
  913.     if (((*shmSegments)[i].buffer) == NULL ||
  914. ((*shmSegments)[i].buffer) == (char *)-1) {
  915.       (*shmSegments)[i].buffer = NULL;
  916.       message(-2, "Cannot get shared memory: %s", strerror(errno));
  917.       message(-2, "Try to reduce the buffer count (option --buffers).");
  918.       return 1;
  919.     }
  920.     
  921.     if (*header == NULL) {
  922.       bufferBase = (*shmSegments)[i].buffer + sizeof(BufferHeader) +
  923.            nofBuffers * sizeof(Buffer);
  924.       *header = (BufferHeader*)(*shmSegments)[i].buffer;
  925.       (*header)->nofBuffers = nofBuffers;
  926.       (*header)->buffers =
  927. (Buffer*)((*shmSegments)[i].buffer + sizeof(BufferHeader));
  928.     }
  929.     else {
  930.       bufferBase = (*shmSegments)[i].buffer;
  931.     }
  932.     for (b = 0; b < n; b++)
  933.       (*header)->buffers[bcnt++].buffer = bufferBase + b * bufferSize;
  934.     bufCnt -= n;
  935.   }
  936.   assert(bcnt == nofBuffers);
  937.   return 0;
  938. }
  939. static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegments)
  940. {
  941.   long i;
  942.   if (shmSegments == NULL || nofSegments == 0)
  943.     return;
  944.   for (i = 0; i < nofSegments; i++) {
  945.     if (shmSegments[i].id >= 0) {
  946.       if (shmSegments[i].buffer != NULL) {
  947. if (shmdt(shmSegments[i].buffer) != 0) {
  948.   message(-2, "shmdt: %s", strerror(errno));
  949. }
  950.       }
  951.       if (shmctl(shmSegments[i].id, IPC_RMID, NULL) != 0) {
  952. message(-2, "Cannot remove shared memory: %s", strerror(errno));
  953.       }
  954.     }
  955.   }
  956.   delete[] shmSegments;
  957. }
  958. #endif /* USE_POSIX_THREADS */