MessQueue.cpp
上传用户:woshihumen
上传日期:2013-07-18
资源大小:484k
文件大小:40k
源码类别:

Email服务器

开发平台:

Visual C++

  1. /*
  2.  *  XMail by Davide Libenzi ( Intranet and Internet mail server )
  3.  *  Copyright (C) 1999,..,2004  Davide Libenzi
  4.  *
  5.  *  This program is free software; you can redistribute it and/or modify
  6.  *  it under the terms of the GNU General Public License as published by
  7.  *  the Free Software Foundation; either version 2 of the License, or
  8.  *  (at your option) any later version.
  9.  *
  10.  *  This program is distributed in the hope that it will be useful,
  11.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.  *  GNU General Public License for more details.
  14.  *
  15.  *  You should have received a copy of the GNU General Public License
  16.  *  along with this program; if not, write to the Free Software
  17.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  18.  *
  19.  *  Davide Libenzi <davidel@xmailserver.org>
  20.  *
  21.  */
  22. #include "SysInclude.h"
  23. #include "SysDep.h"
  24. #include "SvrDefines.h"
  25. #include "ShBlocks.h"
  26. #include "ResLocks.h"
  27. #include "StrUtils.h"
  28. #include "SList.h"
  29. #include "MD5.h"
  30. #include "Base64Enc.h"
  31. #include "BuffSock.h"
  32. #include "MailConfig.h"
  33. #include "UsrUtils.h"
  34. #include "SvrUtils.h"
  35. #include "AppDefines.h"
  36. #include "MiscUtils.h"
  37. #include "SMTPUtils.h"
  38. #include "MessQueue.h"
  39. #include "SMAILUtils.h"
  40. #define QUEF_SHUTDOWN               (1 << 0)
  41. #define QUMF_DELETED                (1 << 0)
  42. #define QUMF_FREEZE                 (1 << 1)
  43. #define QUE_MASK_TMPFLAGS(v)        ((v) & ~(QUMF_DELETED | QUMF_FREEZE))
  44. #define QUE_ARENA_SCAN_INTERVAL     15
  45. #define QUE_ARENA_SCAN_WAIT         2
  46. #define QUE_SCAN_THREAD_MAXWAIT     60
  47. struct MessageQueue {
  48. SysListHead ReadyQueue;
  49. SysListHead RsndArenaQueue;
  50. int iReadyCount;
  51. int iRsndArenaCount;
  52. SYS_MUTEX hMutex;
  53. SYS_EVENT hReadyEvent;
  54. char *pszRootPath;
  55. int iMaxRetry;
  56. int iRetryTimeout;
  57. int iRetryIncrRatio;
  58. int iNumDirsLevel;
  59. unsigned long ulFlags;
  60. SYS_THREAD hRsndScanThread;
  61. };
  62. struct QueueMessage {
  63. SysListHead LLink;
  64. int iLevel1;
  65. int iLevel2;
  66. char const *pszQueueDir;
  67. char *pszFileName;
  68. int iNumTries;
  69. time_t tLastTry;
  70. unsigned long ulFlags;
  71. };
  72. static int QueCreateStruct(char const *pszRootPath);
  73. static int QueLoad(MessageQueue * pMQ);
  74. static int QueLoadMessages(MessageQueue * pMQ, int iLevel1, int iLevel2);
  75. static QueueMessage *QueAllocMessage(int iLevel1, int iLevel2, char const *pszQueueDir,
  76.      char const *pszFileName, int iNumTries, time_t tLastTry);
  77. static int QueFreeMessage(QueueMessage * pQM);
  78. static int QueFreeMessList(SysListHead * pHead);
  79. static int QueLoadMessageStat(MessageQueue * pMQ, QueueMessage * pQM);
  80. static int QueStatMessage(MessageQueue * pMQ, QueueMessage * pQM);
  81. static int QueGetFilePath(MessageQueue * pMQ, QueueMessage * pQM, char *pszFilePath,
  82.   char const *pszQueueDir = NULL);
  83. static int QueDoMessageCleanup(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage);
  84. static int QueAddNew(MessageQueue * pMQ, QueueMessage * pQM);
  85. static bool QueMessageExpired(MessageQueue * pMQ, QueueMessage * pQM);
  86. static time_t QueNextRetryOp(int iNumTries, unsigned int uRetryTimeout,
  87.      unsigned int uRetryIncrRatio);
  88. static bool QueMessageReadyToSend(MessageQueue * pMQ, QueueMessage * pQM);
  89. static int QueAddRsnd(MessageQueue * pMQ, QueueMessage * pQM);
  90. static unsigned int QueRsndThread(void *pThreadData);
  91. static int QueScanRsndArena(MessageQueue * pMQ);
  92. static bool QueMessageDestMatch(MessageQueue * pMQ, QueueMessage * pQM,
  93. char const *pszAddressMatch);
  94. QUEUE_HANDLE QueOpen(char const *pszRootPath, int iMaxRetry, int iRetryTimeout,
  95.      int iRetryIncrRatio, int iNumDirsLevel)
  96. {
  97. MessageQueue *pMQ = (MessageQueue *) SysAlloc(sizeof(MessageQueue));
  98. if (pMQ == NULL)
  99. return (INVALID_QUEUE_HANDLE);
  100. SYS_INIT_LIST_HEAD(&pMQ->ReadyQueue);
  101. SYS_INIT_LIST_HEAD(&pMQ->RsndArenaQueue);
  102. pMQ->iReadyCount = 0;
  103. pMQ->iRsndArenaCount = 0;
  104. pMQ->iMaxRetry = iMaxRetry;
  105. pMQ->iRetryTimeout = iRetryTimeout;
  106. pMQ->iRetryIncrRatio = iRetryIncrRatio;
  107. pMQ->iNumDirsLevel = iNumDirsLevel;
  108. pMQ->ulFlags = 0;
  109. if ((pMQ->hMutex = SysCreateMutex()) == SYS_INVALID_MUTEX) {
  110. SysFree(pMQ);
  111. return (INVALID_QUEUE_HANDLE);
  112. }
  113. if ((pMQ->hReadyEvent = SysCreateEvent(1)) == SYS_INVALID_EVENT) {
  114. SysCloseMutex(pMQ->hMutex);
  115. SysFree(pMQ);
  116. return (INVALID_QUEUE_HANDLE);
  117. }
  118. ///////////////////////////////////////////////////////////////////////////////
  119. //  Set the queue root path
  120. ///////////////////////////////////////////////////////////////////////////////
  121. char szRootPath[SYS_MAX_PATH] = "";
  122. StrSNCpy(szRootPath, pszRootPath);
  123. AppendSlash(szRootPath);
  124. pMQ->pszRootPath = SysStrDup(szRootPath);
  125. ///////////////////////////////////////////////////////////////////////////////
  126. //  Load queue status
  127. ///////////////////////////////////////////////////////////////////////////////
  128. if (QueLoad(pMQ) < 0) {
  129. ErrorPush();
  130. SysFree(pMQ->pszRootPath);
  131. SysCloseEvent(pMQ->hReadyEvent);
  132. SysCloseMutex(pMQ->hMutex);
  133. SysFree(pMQ);
  134. ErrSetErrorCode(ErrorPop());
  135. return (INVALID_QUEUE_HANDLE);
  136. }
  137. ///////////////////////////////////////////////////////////////////////////////
  138. //  Start rsnd arena scan thread
  139. ///////////////////////////////////////////////////////////////////////////////
  140. if ((pMQ->hRsndScanThread = SysCreateThread(QueRsndThread, pMQ)) == SYS_INVALID_THREAD) {
  141. ErrorPush();
  142. QueFreeMessList(&pMQ->ReadyQueue);
  143. QueFreeMessList(&pMQ->RsndArenaQueue);
  144. SysFree(pMQ->pszRootPath);
  145. SysCloseEvent(pMQ->hReadyEvent);
  146. SysCloseMutex(pMQ->hMutex);
  147. SysFree(pMQ);
  148. ErrSetErrorCode(ErrorPop());
  149. return (INVALID_QUEUE_HANDLE);
  150. }
  151. return ((QUEUE_HANDLE) pMQ);
  152. }
  153. int QueClose(QUEUE_HANDLE hQueue)
  154. {
  155. MessageQueue *pMQ = (MessageQueue *) hQueue;
  156. ///////////////////////////////////////////////////////////////////////////////
  157. //  Set the shutdown flag and wait for rsnd scan thread to terminate
  158. ///////////////////////////////////////////////////////////////////////////////
  159. pMQ->ulFlags |= QUEF_SHUTDOWN;
  160. SysWaitThread(pMQ->hRsndScanThread, QUE_SCAN_THREAD_MAXWAIT);
  161. SysCloseThread(pMQ->hRsndScanThread, 1);
  162. ///////////////////////////////////////////////////////////////////////////////
  163. //  Clear queues
  164. ///////////////////////////////////////////////////////////////////////////////
  165. QueFreeMessList(&pMQ->ReadyQueue);
  166. QueFreeMessList(&pMQ->RsndArenaQueue);
  167. SysCloseEvent(pMQ->hReadyEvent);
  168. SysCloseMutex(pMQ->hMutex);
  169. SysFree(pMQ->pszRootPath);
  170. SysFree(pMQ);
  171. return (0);
  172. }
  173. int QueGetDirsLevel(QUEUE_HANDLE hQueue)
  174. {
  175. MessageQueue *pMQ = (MessageQueue *) hQueue;
  176. return (pMQ->iNumDirsLevel);
  177. }
  178. char const *QueGetRootPath(QUEUE_HANDLE hQueue)
  179. {
  180. MessageQueue *pMQ = (MessageQueue *) hQueue;
  181. return (pMQ->pszRootPath);
  182. }
  183. static int QueCreateStruct(char const *pszRootPath)
  184. {
  185. ///////////////////////////////////////////////////////////////////////////////
  186. //  Create message dir ( new messages queue )
  187. ///////////////////////////////////////////////////////////////////////////////
  188. char szDirPath[SYS_MAX_PATH] = "";
  189. StrSNCpy(szDirPath, pszRootPath);
  190. AppendSlash(szDirPath);
  191. StrSNCat(szDirPath, QUEUE_MESS_DIR);
  192. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  193. return (ErrGetErrorCode());
  194. ///////////////////////////////////////////////////////////////////////////////
  195. //  Create message resend dir ( resend messages queue )
  196. ///////////////////////////////////////////////////////////////////////////////
  197. StrSNCpy(szDirPath, pszRootPath);
  198. AppendSlash(szDirPath);
  199. StrSNCat(szDirPath, QUEUE_RSND_DIR);
  200. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  201. return (ErrGetErrorCode());
  202. ///////////////////////////////////////////////////////////////////////////////
  203. //  Create info dir
  204. ///////////////////////////////////////////////////////////////////////////////
  205. StrSNCpy(szDirPath, pszRootPath);
  206. AppendSlash(szDirPath);
  207. StrSNCat(szDirPath, QUEUE_INFO_DIR);
  208. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  209. return (ErrGetErrorCode());
  210. ///////////////////////////////////////////////////////////////////////////////
  211. //  Create temp dir
  212. ///////////////////////////////////////////////////////////////////////////////
  213. StrSNCpy(szDirPath, pszRootPath);
  214. AppendSlash(szDirPath);
  215. StrSNCat(szDirPath, QUEUE_TEMP_DIR);
  216. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  217. return (ErrGetErrorCode());
  218. ///////////////////////////////////////////////////////////////////////////////
  219. //  Create send log dir
  220. ///////////////////////////////////////////////////////////////////////////////
  221. StrSNCpy(szDirPath, pszRootPath);
  222. AppendSlash(szDirPath);
  223. StrSNCat(szDirPath, QUEUE_SLOG_DIR);
  224. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  225. return (ErrGetErrorCode());
  226. ///////////////////////////////////////////////////////////////////////////////
  227. //  Create custom message processing dir
  228. ///////////////////////////////////////////////////////////////////////////////
  229. StrSNCpy(szDirPath, pszRootPath);
  230. AppendSlash(szDirPath);
  231. StrSNCat(szDirPath, QUEUE_CUST_DIR);
  232. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  233. return (ErrGetErrorCode());
  234. ///////////////////////////////////////////////////////////////////////////////
  235. //  Create user custom message processing dir ( mailproc.tab cache )
  236. ///////////////////////////////////////////////////////////////////////////////
  237. StrSNCpy(szDirPath, pszRootPath);
  238. AppendSlash(szDirPath);
  239. StrSNCat(szDirPath, QUEUE_MPRC_DIR);
  240. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  241. return (ErrGetErrorCode());
  242. ///////////////////////////////////////////////////////////////////////////////
  243. //  Create frozen dir
  244. ///////////////////////////////////////////////////////////////////////////////
  245. StrSNCpy(szDirPath, pszRootPath);
  246. AppendSlash(szDirPath);
  247. StrSNCat(szDirPath, QUEUE_FROZ_DIR);
  248. if (!SysExistDir(szDirPath) && (SysMakeDir(szDirPath) < 0))
  249. return (ErrGetErrorCode());
  250. return (0);
  251. }
  252. static int QueLoad(MessageQueue * pMQ)
  253. {
  254. char szCurrPath[SYS_MAX_PATH] = "";
  255. for (int ii = 0; ii < pMQ->iNumDirsLevel; ii++) {
  256. SysSNPrintf(szCurrPath, sizeof(szCurrPath) - 1, "%s%d", pMQ->pszRootPath, ii);
  257. if (!SysExistDir(szCurrPath) && (SysMakeDir(szCurrPath) < 0))
  258. return (ErrGetErrorCode());
  259. for (int jj = 0; jj < pMQ->iNumDirsLevel; jj++) {
  260. SysSNPrintf(szCurrPath, sizeof(szCurrPath) - 1, "%s%d%s%d",
  261.     pMQ->pszRootPath, ii, SYS_SLASH_STR, jj);
  262. if (!SysExistDir(szCurrPath) && (SysMakeDir(szCurrPath) < 0))
  263. return (ErrGetErrorCode());
  264. if (QueCreateStruct(szCurrPath) < 0)
  265. return (ErrGetErrorCode());
  266. ///////////////////////////////////////////////////////////////////////////////
  267. //  Load queue directory
  268. ///////////////////////////////////////////////////////////////////////////////
  269. if (QueLoadMessages(pMQ, ii, jj) < 0)
  270. return (ErrGetErrorCode());
  271. }
  272. }
  273. return (0);
  274. }
  275. static int QueLoadMessages(MessageQueue * pMQ, int iLevel1, int iLevel2)
  276. {
  277. ///////////////////////////////////////////////////////////////////////////////
  278. //  File scan the new messages dir
  279. ///////////////////////////////////////////////////////////////////////////////
  280. char szDirPath[SYS_MAX_PATH] = "";
  281. SysSNPrintf(szDirPath, sizeof(szDirPath) - 1, "%s%d%s%d%s%s",
  282.     pMQ->pszRootPath, iLevel1, SYS_SLASH_STR, iLevel2,
  283.     SYS_SLASH_STR, QUEUE_MESS_DIR);
  284. char szMsgFileName[SYS_MAX_PATH] = "";
  285. FSCAN_HANDLE hFileScan = MscFirstFile(szDirPath, 0, szMsgFileName);
  286. if (hFileScan != INVALID_FSCAN_HANDLE) {
  287. do {
  288. if (!IsDotFilename(szMsgFileName)) {
  289. QueueMessage *pQM =
  290.     QueAllocMessage(iLevel1, iLevel2, QUEUE_MESS_DIR,
  291.     szMsgFileName, 0, 0);
  292. if (pQM != NULL) {
  293. ///////////////////////////////////////////////////////////////////////////////
  294. //  Add the file to the message queue
  295. ///////////////////////////////////////////////////////////////////////////////
  296. SYS_LIST_ADDT(&pQM->LLink, &pMQ->ReadyQueue);
  297. ++pMQ->iReadyCount;
  298. }
  299. }
  300. } while (MscNextFile(hFileScan, szMsgFileName));
  301. MscCloseFindFile(hFileScan);
  302. ///////////////////////////////////////////////////////////////////////////////
  303. //  Set the mess event if the queue is not empty
  304. ///////////////////////////////////////////////////////////////////////////////
  305. if (pMQ->iReadyCount > 0)
  306. SysSetEvent(pMQ->hReadyEvent);
  307. }
  308. ///////////////////////////////////////////////////////////////////////////////
  309. //  File scan the resend messages dir
  310. ///////////////////////////////////////////////////////////////////////////////
  311. SysSNPrintf(szDirPath, sizeof(szDirPath) - 1, "%s%d%s%d%s%s",
  312.     pMQ->pszRootPath, iLevel1, SYS_SLASH_STR, iLevel2,
  313.     SYS_SLASH_STR, QUEUE_RSND_DIR);
  314. if ((hFileScan = MscFirstFile(szDirPath, 0, szMsgFileName)) != INVALID_FSCAN_HANDLE) {
  315. do {
  316. if (!IsDotFilename(szMsgFileName)) {
  317. QueueMessage *pQM =
  318.     QueAllocMessage(iLevel1, iLevel2, QUEUE_RSND_DIR,
  319.     szMsgFileName, 0, 0);
  320. if (pQM != NULL) {
  321. ///////////////////////////////////////////////////////////////////////////////
  322. //  Load message statistics
  323. ///////////////////////////////////////////////////////////////////////////////
  324. if (QueLoadMessageStat(pMQ, pQM) < 0) {
  325. SysLogMessage(LOG_LEV_ERROR,
  326.       "Error loading queue file: '%s%d%s%d%s%s%s%s'n",
  327.       pMQ->pszRootPath, iLevel1,
  328.       SYS_SLASH_STR, iLevel2,
  329.       SYS_SLASH_STR, QUEUE_RSND_DIR,
  330.       SYS_SLASH_STR, szMsgFileName);
  331. QueFreeMessage(pQM);
  332. } else {
  333. ///////////////////////////////////////////////////////////////////////////////
  334. //  Add the file to the resend queue
  335. ///////////////////////////////////////////////////////////////////////////////
  336. SYS_LIST_ADDT(&pQM->LLink, &pMQ->RsndArenaQueue);
  337. ++pMQ->iRsndArenaCount;
  338. }
  339. }
  340. }
  341. } while (MscNextFile(hFileScan, szMsgFileName));
  342. MscCloseFindFile(hFileScan);
  343. }
  344. return (0);
  345. }
  346. static QueueMessage *QueAllocMessage(int iLevel1, int iLevel2, char const *pszQueueDir,
  347.      char const *pszFileName, int iNumTries, time_t tLastTry)
  348. {
  349. QueueMessage *pQM = (QueueMessage *) SysAlloc(sizeof(QueueMessage));
  350. if (pQM == NULL)
  351. return (NULL);
  352. SYS_INIT_LIST_LINK(&pQM->LLink);
  353. pQM->iLevel1 = iLevel1;
  354. pQM->iLevel2 = iLevel2;
  355. pQM->pszQueueDir = pszQueueDir;
  356. pQM->pszFileName = SysStrDup(pszFileName);
  357. pQM->iNumTries = iNumTries;
  358. pQM->tLastTry = tLastTry;
  359. pQM->ulFlags = 0;
  360. return (pQM);
  361. }
  362. static int QueFreeMessage(QueueMessage * pQM)
  363. {
  364. SysFree(pQM->pszFileName);
  365. SysFree(pQM);
  366. return (0);
  367. }
  368. static int QueFreeMessList(SysListHead * pHead)
  369. {
  370. SysListHead *pLLink;
  371. while ((pLLink = SYS_LIST_FIRST(pHead)) != NULL) {
  372. QueueMessage *pQM = SYS_LIST_ENTRY(pLLink, QueueMessage, LLink);
  373. SYS_LIST_DEL(pLLink);
  374. QueFreeMessage(pQM);
  375. }
  376. return (0);
  377. }
  378. char *QueLoadLastLogEntry(char const *pszLogFilePath)
  379. {
  380. FILE *pLogFile = fopen(pszLogFilePath, "rb");
  381. if (pLogFile == NULL) {
  382. ErrSetErrorCode(ERR_FILE_OPEN, pszLogFilePath);
  383. return (NULL);
  384. }
  385. ///////////////////////////////////////////////////////////////////////////////
  386. //  Walk log entries
  387. ///////////////////////////////////////////////////////////////////////////////
  388. unsigned long ulCurrOffset = 0;
  389. unsigned long ulBaseOffset = (unsigned long) -1;
  390. unsigned long ulEndOffset;
  391. unsigned long ulPeekTime;
  392. char szLogLine[1024] = "";
  393. for (;;) {
  394. ulCurrOffset = (unsigned long) ftell(pLogFile);
  395. if (MscFGets(szLogLine, sizeof(szLogLine) - 1, pLogFile) == NULL)
  396. break;
  397. if (sscanf(szLogLine, "[PeekTime] %lu", &ulPeekTime) == 1)
  398. ulBaseOffset = ulCurrOffset;
  399. }
  400. if (ulBaseOffset == (unsigned long) -1) {
  401. fclose(pLogFile);
  402. ErrSetErrorCode(ERR_EMPTY_LOG, pszLogFilePath);
  403. return (NULL);
  404. }
  405. ///////////////////////////////////////////////////////////////////////////////
  406. //  Get end offset ( end of file )
  407. ///////////////////////////////////////////////////////////////////////////////
  408. fseek(pLogFile, 0, SEEK_END);
  409. ulEndOffset = (unsigned long) ftell(pLogFile);
  410. ///////////////////////////////////////////////////////////////////////////////
  411. //  Load last entry
  412. ///////////////////////////////////////////////////////////////////////////////
  413. unsigned int uEntrySize = (unsigned int) (ulEndOffset - ulBaseOffset);
  414. char *pszEntry = (char *) SysAlloc(uEntrySize + 1);
  415. if (pszEntry == NULL) {
  416. fclose(pLogFile);
  417. return (NULL);
  418. }
  419. fseek(pLogFile, ulBaseOffset, SEEK_SET);
  420. if (!fread(pszEntry, uEntrySize, 1, pLogFile)) {
  421. SysFree(pszEntry);
  422. fclose(pLogFile);
  423. ErrSetErrorCode(ERR_FILE_READ, pszLogFilePath);
  424. return (NULL);
  425. }
  426. pszEntry[uEntrySize] = '';
  427. fclose(pLogFile);
  428. return (pszEntry);
  429. }
  430. static int QueLoadMessageStat(MessageQueue * pMQ, QueueMessage * pQM)
  431. {
  432. ///////////////////////////////////////////////////////////////////////////////
  433. //  Build the slog file path
  434. ///////////////////////////////////////////////////////////////////////////////
  435. char szSlogFilePath[SYS_MAX_PATH] = "";
  436. QueGetFilePath(pMQ, pQM, szSlogFilePath, QUEUE_SLOG_DIR);
  437. ///////////////////////////////////////////////////////////////////////////////
  438. //  Try to load message statistics
  439. ///////////////////////////////////////////////////////////////////////////////
  440. FILE *pLogFile = fopen(szSlogFilePath, "rt");
  441. if (pLogFile != NULL) {
  442. int iNumTries = 0;
  443. unsigned long ulLastTime = 0;
  444. unsigned long ulPeekTime;
  445. char szLogLine[1024] = "";
  446. while (MscFGets(szLogLine, sizeof(szLogLine) - 1, pLogFile) != NULL)
  447. if (sscanf(szLogLine, "[PeekTime] %lu", &ulPeekTime) == 1)
  448. ++iNumTries, ulLastTime = ulPeekTime;
  449. fclose(pLogFile);
  450. pQM->iNumTries = iNumTries;
  451. pQM->tLastTry = (time_t) ulLastTime;
  452. }
  453. return (0);
  454. }
  455. static int QueStatMessage(MessageQueue * pMQ, QueueMessage * pQM)
  456. {
  457. ///////////////////////////////////////////////////////////////////////////////
  458. //  Build the slog file path
  459. ///////////////////////////////////////////////////////////////////////////////
  460. char szSlogFilePath[SYS_MAX_PATH] = "";
  461. QueGetFilePath(pMQ, pQM, szSlogFilePath, QUEUE_SLOG_DIR);
  462. FILE *pLogFile = fopen(szSlogFilePath, "a+t");
  463. if (pLogFile == NULL) {
  464. ErrSetErrorCode(ERR_FILE_OPEN, szSlogFilePath);
  465. return (ERR_FILE_OPEN);
  466. }
  467. ///////////////////////////////////////////////////////////////////////////////
  468. //  Dump peek time
  469. ///////////////////////////////////////////////////////////////////////////////
  470. time_t tCurr = time(NULL);
  471. char szTime[128] = "";
  472. MscGetTimeStr(szTime, sizeof(szTime) - 1, tCurr);
  473. fprintf(pLogFile, "[PeekTime] %lu : %sn", (unsigned long) tCurr, szTime);
  474. fclose(pLogFile);
  475. return (0);
  476. }
  477. QMSG_HANDLE QueCreateMessage(QUEUE_HANDLE hQueue)
  478. {
  479. MessageQueue *pMQ = (MessageQueue *) hQueue;
  480. ///////////////////////////////////////////////////////////////////////////////
  481. //  Initialize random number generator
  482. ///////////////////////////////////////////////////////////////////////////////
  483. SRand();
  484. ///////////////////////////////////////////////////////////////////////////////
  485. //  Build message file path
  486. ///////////////////////////////////////////////////////////////////////////////
  487. int iLevel1 = rand() % pMQ->iNumDirsLevel;
  488. int iLevel2 = rand() % pMQ->iNumDirsLevel;
  489. char szSubPath[SYS_MAX_PATH] = "";
  490. char szMsgFilePath[SYS_MAX_PATH] = "";
  491. SysSNPrintf(szSubPath, sizeof(szSubPath) - 1, "%s%d%s%d%s%s",
  492.     pMQ->pszRootPath, iLevel1, SYS_SLASH_STR,
  493.     iLevel2, SYS_SLASH_STR, QUEUE_TEMP_DIR);
  494. if (MscUniqueFile(szSubPath, szMsgFilePath) < 0)
  495. return (INVALID_QMSG_HANDLE);
  496. ///////////////////////////////////////////////////////////////////////////////
  497. //  Extract file name
  498. ///////////////////////////////////////////////////////////////////////////////
  499. char szMsgFileName[SYS_MAX_PATH] = "";
  500. MscGetFileName(szMsgFilePath, szMsgFileName);
  501. ///////////////////////////////////////////////////////////////////////////////
  502. //  Create queue message data
  503. ///////////////////////////////////////////////////////////////////////////////
  504. QueueMessage *pQM = QueAllocMessage(iLevel1, iLevel2, QUEUE_TEMP_DIR,
  505.     szMsgFileName, 0, 0);
  506. if (pQM == NULL)
  507. return (INVALID_QMSG_HANDLE);
  508. return ((QMSG_HANDLE) pQM);
  509. }
  510. static int QueGetFilePath(MessageQueue * pMQ, QueueMessage * pQM, char *pszFilePath,
  511.   char const *pszQueueDir)
  512. {
  513. if (pszQueueDir == NULL)
  514. pszQueueDir = pQM->pszQueueDir;
  515. SysSNPrintf(pszFilePath, SYS_MAX_PATH - 1, "%s%d%s%d%s%s%s%s",
  516.     pMQ->pszRootPath, pQM->iLevel1, SYS_SLASH_STR,
  517.     pQM->iLevel2, SYS_SLASH_STR, pszQueueDir, SYS_SLASH_STR, pQM->pszFileName);
  518. return (0);
  519. }
  520. int QueGetFilePath(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage, char *pszFilePath,
  521.    char const *pszQueueDir)
  522. {
  523. MessageQueue *pMQ = (MessageQueue *) hQueue;
  524. QueueMessage *pQM = (QueueMessage *) hMessage;
  525. return (QueGetFilePath(pMQ, pQM, pszFilePath, pszQueueDir));
  526. }
  527. int QueCloseMessage(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  528. {
  529. MessageQueue *pMQ = (MessageQueue *) hQueue;
  530. QueueMessage *pQM = (QueueMessage *) hMessage;
  531. if (pQM->ulFlags & QUMF_DELETED)
  532. QueDoMessageCleanup(hQueue, hMessage);
  533. QueFreeMessage(pQM);
  534. return (0);
  535. }
  536. QMSG_HANDLE QueGetHandle(QUEUE_HANDLE hQueue, int iLevel1, int iLevel2, char const *pszQueueDir,
  537.  char const *pszFileName)
  538. {
  539. MessageQueue *pMQ = (MessageQueue *) hQueue;
  540. QueueMessage *pQM = QueAllocMessage(iLevel1, iLevel2, pszQueueDir, pszFileName, 0, 0);
  541. if (pQM == NULL)
  542. return (INVALID_QMSG_HANDLE);
  543. ///////////////////////////////////////////////////////////////////////////////
  544. //  Load message statistics
  545. ///////////////////////////////////////////////////////////////////////////////
  546. QueLoadMessageStat(pMQ, pQM);
  547. return ((QMSG_HANDLE) pQM);
  548. }
  549. char const *QueGetFileName(QMSG_HANDLE hMessage)
  550. {
  551. QueueMessage *pQM = (QueueMessage *) hMessage;
  552. return (pQM->pszFileName);
  553. }
  554. int QueGetLevel1(QMSG_HANDLE hMessage)
  555. {
  556. QueueMessage *pQM = (QueueMessage *) hMessage;
  557. return (pQM->iLevel1);
  558. }
  559. int QueGetLevel2(QMSG_HANDLE hMessage)
  560. {
  561. QueueMessage *pQM = (QueueMessage *) hMessage;
  562. return (pQM->iLevel2);
  563. }
  564. int QueGetTryCount(QMSG_HANDLE hMessage)
  565. {
  566. QueueMessage *pQM = (QueueMessage *) hMessage;
  567. return (pQM->iNumTries);
  568. }
  569. time_t QueGetLastTryTime(QMSG_HANDLE hMessage)
  570. {
  571. QueueMessage *pQM = (QueueMessage *) hMessage;
  572. return (pQM->tLastTry);
  573. }
  574. time_t QueGetMessageNextOp(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  575. {
  576. MessageQueue *pMQ = (MessageQueue *) hQueue;
  577. QueueMessage *pQM = (QueueMessage *) hMessage;
  578. return (pQM->tLastTry + QueNextRetryOp(pQM->iNumTries, (unsigned int) pMQ->iRetryTimeout,
  579.        (unsigned int) pMQ->iRetryIncrRatio));
  580. }
  581. int QueInitMessageStats(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  582. {
  583. MessageQueue *pMQ = (MessageQueue *) hQueue;
  584. QueueMessage *pQM = (QueueMessage *) hMessage;
  585. ///////////////////////////////////////////////////////////////////////////////
  586. //  Clean 'slog' file
  587. ///////////////////////////////////////////////////////////////////////////////
  588. char szQueueFilePath[SYS_MAX_PATH] = "";
  589. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_SLOG_DIR);
  590. CheckRemoveFile(szQueueFilePath);
  591. ///////////////////////////////////////////////////////////////////////////////
  592. //  Init message statistics
  593. ///////////////////////////////////////////////////////////////////////////////
  594. pQM->iNumTries = 0;
  595. pQM->tLastTry = 0;
  596. return (0);
  597. }
  598. static int QueDoMessageCleanup(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  599. {
  600. MessageQueue *pMQ = (MessageQueue *) hQueue;
  601. QueueMessage *pQM = (QueueMessage *) hMessage;
  602. char szQueueFilePath[SYS_MAX_PATH] = "";
  603. if (pQM->ulFlags & QUMF_FREEZE) {
  604. ///////////////////////////////////////////////////////////////////////////////
  605. //  Move message file
  606. ///////////////////////////////////////////////////////////////////////////////
  607. char szTargetFile[SYS_MAX_PATH] = "";
  608. QueGetFilePath(pMQ, pQM, szQueueFilePath);
  609. QueGetFilePath(pMQ, pQM, szTargetFile, QUEUE_FROZ_DIR);
  610. if (SysMoveFile(szQueueFilePath, szTargetFile) < 0)
  611. return (ErrGetErrorCode());
  612. ///////////////////////////////////////////////////////////////////////////////
  613. //  Change message location
  614. ///////////////////////////////////////////////////////////////////////////////
  615. pQM->pszQueueDir = QUEUE_FROZ_DIR;
  616. } else {
  617. ///////////////////////////////////////////////////////////////////////////////
  618. //  Clean message file
  619. ///////////////////////////////////////////////////////////////////////////////
  620. QueGetFilePath(pMQ, pQM, szQueueFilePath);
  621. CheckRemoveFile(szQueueFilePath);
  622. ///////////////////////////////////////////////////////////////////////////////
  623. //  Clean 'info' file
  624. ///////////////////////////////////////////////////////////////////////////////
  625. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_INFO_DIR);
  626. CheckRemoveFile(szQueueFilePath);
  627. ///////////////////////////////////////////////////////////////////////////////
  628. //  Clean 'slog' file
  629. ///////////////////////////////////////////////////////////////////////////////
  630. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_SLOG_DIR);
  631. CheckRemoveFile(szQueueFilePath);
  632. }
  633. ///////////////////////////////////////////////////////////////////////////////
  634. //  Clean 'temp' file
  635. ///////////////////////////////////////////////////////////////////////////////
  636. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_TEMP_DIR);
  637. CheckRemoveFile(szQueueFilePath);
  638. ///////////////////////////////////////////////////////////////////////////////
  639. //  Clean 'cust' file
  640. ///////////////////////////////////////////////////////////////////////////////
  641. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_CUST_DIR);
  642. CheckRemoveFile(szQueueFilePath);
  643. ///////////////////////////////////////////////////////////////////////////////
  644. //  Clean 'mprc' file
  645. ///////////////////////////////////////////////////////////////////////////////
  646. QueGetFilePath(pMQ, pQM, szQueueFilePath, QUEUE_MPRC_DIR);
  647. CheckRemoveFile(szQueueFilePath);
  648. return (0);
  649. }
  650. int QueCleanupMessage(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage, bool bFreeze)
  651. {
  652. MessageQueue *pMQ = (MessageQueue *) hQueue;
  653. QueueMessage *pQM = (QueueMessage *) hMessage;
  654. pQM->ulFlags |= QUMF_DELETED;
  655. if (bFreeze)
  656. pQM->ulFlags |= QUMF_FREEZE;
  657. return (0);
  658. }
  659. int QueCommitMessage(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  660. {
  661. MessageQueue *pMQ = (MessageQueue *) hQueue;
  662. QueueMessage *pQM = (QueueMessage *) hMessage;
  663. ///////////////////////////////////////////////////////////////////////////////
  664. //  Move message file ( if not in mess )
  665. ///////////////////////////////////////////////////////////////////////////////
  666. if (strcmp(pQM->pszQueueDir, QUEUE_MESS_DIR) != 0) {
  667. char szSourceFile[SYS_MAX_PATH] = "";
  668. char szTargetFile[SYS_MAX_PATH] = "";
  669. QueGetFilePath(pMQ, pQM, szSourceFile);
  670. QueGetFilePath(pMQ, pQM, szTargetFile, QUEUE_MESS_DIR);
  671. if (SysMoveFile(szSourceFile, szTargetFile) < 0)
  672. return (ErrGetErrorCode());
  673. ///////////////////////////////////////////////////////////////////////////////
  674. //  Change message location
  675. ///////////////////////////////////////////////////////////////////////////////
  676. pQM->pszQueueDir = QUEUE_MESS_DIR;
  677. }
  678. ///////////////////////////////////////////////////////////////////////////////
  679. //  Unmask temporary flags
  680. ///////////////////////////////////////////////////////////////////////////////
  681. pQM->ulFlags = QUE_MASK_TMPFLAGS(pQM->ulFlags);
  682. ///////////////////////////////////////////////////////////////////////////////
  683. //  Add to queue
  684. ///////////////////////////////////////////////////////////////////////////////
  685. if (QueAddNew(pMQ, pQM) < 0)
  686. return (ErrGetErrorCode());
  687. return (0);
  688. }
  689. static int QueAddNew(MessageQueue * pMQ, QueueMessage * pQM)
  690. {
  691. ///////////////////////////////////////////////////////////////////////////////
  692. //  Add the queue entry
  693. ///////////////////////////////////////////////////////////////////////////////
  694. if (SysLockMutex(pMQ->hMutex, SYS_INFINITE_TIMEOUT) < 0)
  695. return (ErrGetErrorCode());
  696. SYS_LIST_ADDT(&pQM->LLink, &pMQ->ReadyQueue);
  697. ++pMQ->iReadyCount;
  698. SysSetEvent(pMQ->hReadyEvent);
  699. SysUnlockMutex(pMQ->hMutex);
  700. return (0);
  701. }
  702. static bool QueMessageExpired(MessageQueue * pMQ, QueueMessage * pQM)
  703. {
  704. return (pQM->iNumTries >= pMQ->iMaxRetry);
  705. }
  706. static time_t QueNextRetryOp(int iNumTries, unsigned int uRetryTimeout,
  707.      unsigned int uRetryIncrRatio)
  708. {
  709. unsigned int uNextOp = uRetryTimeout;
  710. if (uRetryIncrRatio != 0)
  711. for (int ii = 1; ii < iNumTries; ii++)
  712. uNextOp += uNextOp / uRetryIncrRatio;
  713. return ((time_t) uNextOp);
  714. }
  715. static bool QueMessageReadyToSend(MessageQueue * pMQ, QueueMessage * pQM)
  716. {
  717. return (time(NULL) > (pQM->tLastTry +
  718.       QueNextRetryOp(pQM->iNumTries, (unsigned int) pMQ->iRetryTimeout,
  719.      (unsigned int) pMQ->iRetryIncrRatio)));
  720. }
  721. int QueResendMessage(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  722. {
  723. MessageQueue *pMQ = (MessageQueue *) hQueue;
  724. QueueMessage *pQM = (QueueMessage *) hMessage;
  725. ///////////////////////////////////////////////////////////////////////////////
  726. //  Check for message expired
  727. ///////////////////////////////////////////////////////////////////////////////
  728. if (QueMessageExpired(pMQ, pQM)) {
  729. ErrSetErrorCode(ERR_SPOOL_FILE_EXPIRED);
  730. return (ERR_SPOOL_FILE_EXPIRED);
  731. }
  732. ///////////////////////////////////////////////////////////////////////////////
  733. //  Move message file ( if not in rsnd dir )
  734. ///////////////////////////////////////////////////////////////////////////////
  735. if (strcmp(pQM->pszQueueDir, QUEUE_RSND_DIR) != 0) {
  736. char szSourceFile[SYS_MAX_PATH] = "";
  737. char szTargetFile[SYS_MAX_PATH] = "";
  738. QueGetFilePath(pMQ, pQM, szSourceFile);
  739. QueGetFilePath(pMQ, pQM, szTargetFile, QUEUE_RSND_DIR);
  740. if (SysMoveFile(szSourceFile, szTargetFile) < 0)
  741. return (ErrGetErrorCode());
  742. ///////////////////////////////////////////////////////////////////////////////
  743. //  Change message location
  744. ///////////////////////////////////////////////////////////////////////////////
  745. pQM->pszQueueDir = QUEUE_RSND_DIR;
  746. }
  747. ///////////////////////////////////////////////////////////////////////////////
  748. //  Unmask temporary flags
  749. ///////////////////////////////////////////////////////////////////////////////
  750. pQM->ulFlags = QUE_MASK_TMPFLAGS(pQM->ulFlags);
  751. ///////////////////////////////////////////////////////////////////////////////
  752. //  Add to queue
  753. ///////////////////////////////////////////////////////////////////////////////
  754. if (QueAddRsnd(pMQ, pQM) < 0)
  755. return (ErrGetErrorCode());
  756. return (0);
  757. }
  758. static int QueAddRsnd(MessageQueue * pMQ, QueueMessage * pQM)
  759. {
  760. ///////////////////////////////////////////////////////////////////////////////
  761. //  Add the queue entry
  762. ///////////////////////////////////////////////////////////////////////////////
  763. if (SysLockMutex(pMQ->hMutex, SYS_INFINITE_TIMEOUT) < 0)
  764. return (ErrGetErrorCode());
  765. SYS_LIST_ADDT(&pQM->LLink, &pMQ->RsndArenaQueue);
  766. ++pMQ->iRsndArenaCount;
  767. SysUnlockMutex(pMQ->hMutex);
  768. return (0);
  769. }
  770. QMSG_HANDLE QueExtractMessage(QUEUE_HANDLE hQueue, int iTimeout)
  771. {
  772. MessageQueue *pMQ = (MessageQueue *) hQueue;
  773. ///////////////////////////////////////////////////////////////////////////////
  774. //  Wait for message available
  775. ///////////////////////////////////////////////////////////////////////////////
  776. if (SysWaitEvent(pMQ->hReadyEvent, iTimeout) < 0)
  777. return (INVALID_QMSG_HANDLE);
  778. if (SysLockMutex(pMQ->hMutex, SYS_INFINITE_TIMEOUT) < 0)
  779. return (INVALID_QMSG_HANDLE);
  780. ///////////////////////////////////////////////////////////////////////////////
  781. //  Get the first message of the queue
  782. ///////////////////////////////////////////////////////////////////////////////
  783. SysListHead *pLLink = SYS_LIST_FIRST(&pMQ->ReadyQueue);
  784. if (pLLink == NULL) {
  785. SysUnlockMutex(pMQ->hMutex);
  786. return (INVALID_QMSG_HANDLE);
  787. }
  788. ///////////////////////////////////////////////////////////////////////////////
  789. //  Remove the message from the list
  790. ///////////////////////////////////////////////////////////////////////////////
  791. SYS_LIST_DEL(pLLink);
  792. ///////////////////////////////////////////////////////////////////////////////
  793. //  Decrement message count by resetting the event if no more messages are in
  794. ///////////////////////////////////////////////////////////////////////////////
  795. if (--pMQ->iReadyCount == 0)
  796. SysResetEvent(pMQ->hReadyEvent);
  797. SysUnlockMutex(pMQ->hMutex);
  798. ///////////////////////////////////////////////////////////////////////////////
  799. //  Get queue message pointer
  800. ///////////////////////////////////////////////////////////////////////////////
  801. QueueMessage *pQM = SYS_LIST_ENTRY(pLLink, QueueMessage, LLink);
  802. ///////////////////////////////////////////////////////////////////////////////
  803. //  Update message statistics
  804. ///////////////////////////////////////////////////////////////////////////////
  805. ++pQM->iNumTries;
  806. pQM->tLastTry = time(NULL);
  807. ///////////////////////////////////////////////////////////////////////////////
  808. //  Update log file
  809. ///////////////////////////////////////////////////////////////////////////////
  810. QueStatMessage(pMQ, pQM);
  811. return ((QMSG_HANDLE) pQM);
  812. }
  813. static unsigned int QueRsndThread(void *pThreadData)
  814. {
  815. MessageQueue *pMQ = (MessageQueue *) pThreadData;
  816. ///////////////////////////////////////////////////////////////////////////////
  817. //  Scan the rsnd arena to find out ready-to-send messages
  818. ///////////////////////////////////////////////////////////////////////////////
  819. int iElapsedTime = 0;
  820. while ((pMQ->ulFlags & QUEF_SHUTDOWN) == 0) {
  821. SysSleep(QUE_ARENA_SCAN_WAIT);
  822. iElapsedTime += QUE_ARENA_SCAN_WAIT;
  823. if (iElapsedTime > QUE_ARENA_SCAN_INTERVAL) {
  824. iElapsedTime = 0;
  825. ///////////////////////////////////////////////////////////////////////////////
  826. //  Scan rsnd arena to prepare messages to resend
  827. ///////////////////////////////////////////////////////////////////////////////
  828. QueScanRsndArena(pMQ);
  829. }
  830. }
  831. ///////////////////////////////////////////////////////////////////////////////
  832. //  Reset the shutdown flag
  833. ///////////////////////////////////////////////////////////////////////////////
  834. pMQ->ulFlags &= ~QUEF_SHUTDOWN;
  835. return (0);
  836. }
  837. static int QueScanRsndArena(MessageQueue * pMQ)
  838. {
  839. if (SysLockMutex(pMQ->hMutex, SYS_INFINITE_TIMEOUT) < 0)
  840. return (ErrGetErrorCode());
  841. SysListHead *pLLink;
  842. SYS_LIST_FOR_EACH(pLLink, &pMQ->RsndArenaQueue) {
  843. QueueMessage *pQM = SYS_LIST_ENTRY(pLLink, QueueMessage, LLink);
  844. if (QueMessageReadyToSend(pMQ, pQM)) {
  845. ///////////////////////////////////////////////////////////////////////////////
  846. //  Set the list pointer to the next item
  847. ///////////////////////////////////////////////////////////////////////////////
  848. pLLink = pLLink->pPrev;
  849. ///////////////////////////////////////////////////////////////////////////////
  850. //  Remove item from resend arena
  851. ///////////////////////////////////////////////////////////////////////////////
  852. SYS_LIST_DEL(&pQM->LLink);
  853. --pMQ->iRsndArenaCount;
  854. ///////////////////////////////////////////////////////////////////////////////
  855. //  Add item from resend queue
  856. ///////////////////////////////////////////////////////////////////////////////
  857. SYS_LIST_ADDT(&pQM->LLink, &pMQ->ReadyQueue);
  858. ++pMQ->iReadyCount;
  859. }
  860. }
  861. ///////////////////////////////////////////////////////////////////////////////
  862. //  If the count of rsnd queue is not zero, set the event
  863. ///////////////////////////////////////////////////////////////////////////////
  864. if (pMQ->iReadyCount > 0)
  865. SysSetEvent(pMQ->hReadyEvent);
  866. SysUnlockMutex(pMQ->hMutex);
  867. return (0);
  868. }
  869. int QueCheckMessage(QUEUE_HANDLE hQueue, QMSG_HANDLE hMessage)
  870. {
  871. MessageQueue *pMQ = (MessageQueue *) hQueue;
  872. QueueMessage *pQM = (QueueMessage *) hMessage;
  873. char szQueueFilePath[SYS_MAX_PATH] = "";
  874. if (pQM->ulFlags & QUMF_DELETED) {
  875. ErrSetErrorCode(ERR_MESSAGE_DELETED);
  876. return (ERR_MESSAGE_DELETED);
  877. }
  878. QueGetFilePath(hQueue, hMessage, szQueueFilePath, QUEUE_MESS_DIR);
  879. if (!SysExistFile(szQueueFilePath)) {
  880. QueGetFilePath(hQueue, hMessage, szQueueFilePath, QUEUE_RSND_DIR);
  881. if (!SysExistFile(szQueueFilePath)) {
  882. ErrSetErrorCode(ERR_NO_MESSAGE_FILE);
  883. return (ERR_NO_MESSAGE_FILE);
  884. }
  885. }
  886. return (0);
  887. }
  888. static bool QueMessageDestMatch(MessageQueue * pMQ, QueueMessage * pQM,
  889. char const *pszAddressMatch)
  890. {
  891. ///////////////////////////////////////////////////////////////////////////////
  892. //  Get the queue file path
  893. ///////////////////////////////////////////////////////////////////////////////
  894. char szQueueFilePath[SYS_MAX_PATH] = "";
  895. QueGetFilePath(pMQ, pQM, szQueueFilePath);
  896. ///////////////////////////////////////////////////////////////////////////////
  897. //  Load queue file header
  898. ///////////////////////////////////////////////////////////////////////////////
  899. SpoolFileHeader SFH;
  900. if (USmlLoadSpoolFileHeader(szQueueFilePath, SFH) < 0)
  901. return (false);
  902. bool bAddressMatch = false;
  903. if (strchr(pszAddressMatch, '@') == NULL) {
  904. ///////////////////////////////////////////////////////////////////////////////
  905. //  RFC style ETRN ( domain based )
  906. ///////////////////////////////////////////////////////////////////////////////
  907. char szDestUser[MAX_ADDR_NAME] = "";
  908. char szDestDomain[MAX_ADDR_NAME] = "";
  909. if ((StrStringsCount(SFH.ppszRcpt) < 1) ||
  910.     (USmtpSplitEmailAddr(SFH.ppszRcpt[0], szDestUser, szDestDomain) < 0)) {
  911. USmlCleanupSpoolFileHeader(SFH);
  912. return (false);
  913. }
  914. bAddressMatch = (StrIWildMatch(szDestDomain, pszAddressMatch) != 0);
  915. } else {
  916. ///////////////////////////////////////////////////////////////////////////////
  917. //  XMail style ETRN ( email based )
  918. ///////////////////////////////////////////////////////////////////////////////
  919. bAddressMatch = (StrIWildMatch(SFH.ppszRcpt[0], pszAddressMatch) != 0);
  920. }
  921. USmlCleanupSpoolFileHeader(SFH);
  922. return (bAddressMatch);
  923. }
  924. int QueFlushRsndArena(QUEUE_HANDLE hQueue, char const *pszAddressMatch)
  925. {
  926. MessageQueue *pMQ = (MessageQueue *) hQueue;
  927. if (SysLockMutex(pMQ->hMutex, SYS_INFINITE_TIMEOUT) < 0)
  928. return (ErrGetErrorCode());
  929. SysListHead *pLLink;
  930. SYS_LIST_FOR_EACH(pLLink, &pMQ->RsndArenaQueue) {
  931. QueueMessage *pQM = SYS_LIST_ENTRY(pLLink, QueueMessage, LLink);
  932. if ((pszAddressMatch == NULL) || QueMessageDestMatch(pMQ, pQM, pszAddressMatch)) {
  933. ///////////////////////////////////////////////////////////////////////////////
  934. //  Set the list pointer to the next item
  935. ///////////////////////////////////////////////////////////////////////////////
  936. pLLink = pLLink->pPrev;
  937. ///////////////////////////////////////////////////////////////////////////////
  938. //  Remove item from resend arena
  939. ///////////////////////////////////////////////////////////////////////////////
  940. SYS_LIST_DEL(&pQM->LLink);
  941. --pMQ->iRsndArenaCount;
  942. ///////////////////////////////////////////////////////////////////////////////
  943. //  Add item from resend queue
  944. ///////////////////////////////////////////////////////////////////////////////
  945. SYS_LIST_ADDT(&pQM->LLink, &pMQ->ReadyQueue);
  946. ++pMQ->iReadyCount;
  947. }
  948. }
  949. ///////////////////////////////////////////////////////////////////////////////
  950. //  If the count of rsnd queue is not zero, set the event
  951. ///////////////////////////////////////////////////////////////////////////////
  952. if (pMQ->iReadyCount > 0)
  953. SysSetEvent(pMQ->hReadyEvent);
  954. SysUnlockMutex(pMQ->hMutex);
  955. return (0);
  956. }