From 29fa0621d7b1ed51920a5f8de81ee71c2a5935f8 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 19 Feb 2021 17:50:04 -0800 Subject: [PATCH] Com: rewrite errlog Switch to double buffering to allow errlogThread to unlock while printing. --- modules/libcom/src/error/errlog.c | 778 +++++++++++--------------- modules/libcom/test/epicsErrlogTest.c | 3 +- 2 files changed, 332 insertions(+), 449 deletions(-) diff --git a/modules/libcom/src/error/errlog.c b/modules/libcom/src/error/errlog.c index 90241c473..7ed6a8bd7 100644 --- a/modules/libcom/src/error/errlog.c +++ b/modules/libcom/src/error/errlog.c @@ -20,12 +20,12 @@ #include #define ERRLOG_INIT -#include "adjustment.h" #include "dbDefs.h" #include "epicsThread.h" #include "cantProceed.h" #include "epicsMutex.h" #include "epicsEvent.h" +#include "epicsString.h" #include "epicsInterrupt.h" #include "errMdef.h" #include "errSymTbl.h" @@ -35,8 +35,20 @@ #include "epicsExit.h" -#define BUFFER_SIZE 1280 -#define MAX_MESSAGE_SIZE 256 +#define MIN_BUFFER_SIZE 1280 +#define MIN_MESSAGE_SIZE 256 +#define MAX_MESSAGE_SIZE 0x00ffffff + +/* errlog buffers contain null terminated strings, each prefixed + * with a 1 byte header containing flags. + */ +/* State of entries in a buffer. */ +#define ERL_STATE_MASK 0xc0 +#define ERL_STATE_FREE 0x00 +#define ERL_STATE_WRITE 0x80 +#define ERL_STATE_READY 0x40 +/* should this message be echoed to the console? */ +#define ERL_LOCALECHO 0x20 /*Declare storage for errVerbose */ int errVerbose = 0; @@ -44,137 +56,172 @@ int errVerbose = 0; static void errlogExitHandler(void *); static void errlogThread(void); -static char *msgbufGetFree(int noConsoleMessage); -static void msgbufSetSize(int size); /* Send 'size' chars plus trailing '\0' */ -static char *msgbufGetSend(int *noConsoleMessage); -static void msgbufFreeSend(void); - typedef struct listenerNode{ ELLNODE node; errlogListener listener; void *pPrivate; } listenerNode; -/*each message consists of a msgNode immediately followed by the message */ -typedef struct msgNode { - ELLNODE node; - char *message; - int length; - int noConsoleMessage; -} msgNode; +typedef struct { + char *base; + size_t pos; +} buffer_t; static struct { - epicsEventId waitForWork; /*errlogThread waits for this*/ - epicsMutexId msgQueueLock; + /* const after errlogInit() */ + size_t maxMsgSize; + /* alloc size of both buffer_t::base */ + size_t bufSize; + int errlogInitFailed; + epicsMutexId listenerLock; - epicsEventId waitForFlush; /*errlogFlush waits for this*/ - epicsEventId flush; /*errlogFlush sets errlogThread does a Try*/ - epicsMutexId flushLock; - epicsEventId waitForExit; /*errlogExitHandler waits for this*/ - int atExit; /*TRUE when errlogExitHandler is active*/ ELLLIST listenerList; - ELLLIST msgQueue; - msgNode *pnextSend; - int errlogInitFailed; - int buffersize; - int maxMsgSize; - int msgNeeded; + + /* notify when log->size!=0 */ + epicsEventId waitForWork; + /* signals when worker increments flushSeq */ + epicsEventId waitForSeq; + epicsMutexId msgQueueLock; + + /* guarded by msgQueueLock */ + int atExit; int sevToLog; int toConsole; FILE *console; - int missedMessages; - char *pbuffer; -} pvtData; + /* A loop counter maintained by errlogThread. */ + epicsUInt32 flushSeq; + size_t nFlushers; + size_t nLost; -/* - * vsnprintf with truncation message + /* 'log' and 'print' combine to form a double buffer. */ + buffer_t *log; + buffer_t *print; + + /* actual storage which 'log' and 'print' point to */ + buffer_t bufs[2]; +} pvt; + +/* Returns an pointer to pvt.maxMsgSize bytes, or NULL if ring buffer is full. + * When !NULL, caller _must_ later msgbufCommit() */ -static int tvsnPrint(char *str, size_t size, const char *format, va_list ap) +static +char* msgbufAlloc(void) { - static const char tmsg[] = "<>\n"; - int nchar = epicsVsnprintf(str, size, format ? format : "", ap); + char *ret = NULL; - if (nchar >= size) { - if (size > sizeof tmsg) - strcpy(str + size - sizeof tmsg, tmsg); - nchar = size - 1; + if (epicsInterruptIsInterruptContext()) { + epicsInterruptContextMessage + ("errlog called from interrupt level\n"); + return ret; } + + errlogInit(0); + epicsMutexMustLock(pvt.msgQueueLock); /* matched in msgbufCommit() */ + if(pvt.bufSize - pvt.log->pos >= 1+pvt.maxMsgSize) { + /* there is enough space for the worst cast */ + ret = pvt.log->base + pvt.log->pos; + ret[0] = ERL_STATE_WRITE; + ret++; + } + + if(!ret) { + pvt.nLost++; + epicsMutexUnlock(pvt.msgQueueLock); + } + return ret; +} + +static +size_t msgbufCommit(size_t nchar, int localEcho) +{ + int isOkToBlock = epicsThreadIsOkToBlock(); + int wasEmpty = pvt.log->pos==0; + int atExit = pvt.atExit; + char *start = pvt.log->base + pvt.log->pos; + + /* nchar returned by snprintf() is >= maxMsgSize when truncated */ + if(nchar >= pvt.maxMsgSize) { + const char *trunc = "<>\n"; + nchar = pvt.maxMsgSize - 1u; + + strcpy(start + 1u + nchar - strlen(trunc), trunc); + /* assert(strlen(start+1u)==nchar); */ + } + + start[1u + nchar] = '\0'; + + if(localEcho && isOkToBlock && atExit) { + /* errlogThread is not running, so we print directly + * and then abandon the buffer. + */ + fprintf(pvt.console, "%s", start); + + } else { + start[0u] = ERL_STATE_READY | (localEcho ? ERL_LOCALECHO : 0); + + pvt.log->pos += 1u + nchar + 1u; + } + + epicsMutexUnlock(pvt.msgQueueLock); /* matched in msgbufAlloc() */ + + if(wasEmpty && !atExit) + epicsEventMustTrigger(pvt.waitForWork); + + if(localEcho && isOkToBlock && !atExit) + errlogFlush(); + return nchar; } +static +void errlogSequence(void) +{ + int wakeNext = 0; + size_t seq; + + if (pvt.atExit) + return; + + epicsMutexMustLock(pvt.msgQueueLock); + pvt.nFlushers++; + seq = pvt.flushSeq; + + while(seq == pvt.flushSeq && !pvt.atExit) { + epicsMutexUnlock(pvt.msgQueueLock); + /* force worker to wake and increment seq */ + epicsEventMustTrigger(pvt.waitForWork); + epicsEventMustWait(pvt.waitForSeq); + epicsMutexMustLock(pvt.msgQueueLock); + } + + pvt.nFlushers--; + wakeNext = pvt.nFlushers!=0u; + epicsMutexUnlock(pvt.msgQueueLock); + + if(wakeNext) + epicsEventMustTrigger(pvt.waitForSeq); +} + int errlogPrintf(const char *pFormat, ...) { - va_list pvar; - char *pbuffer; - int nchar; - int isOkToBlock; - - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogPrintf called from interrupt level\n"); - return 0; - } - - errlogInit(0); - isOkToBlock = epicsThreadIsOkToBlock(); - - if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) { - FILE *console = pvtData.console ? pvtData.console : stderr; - - va_start(pvar, pFormat); - nchar = vfprintf(console, pFormat, pvar); - va_end (pvar); - fflush(console); - } - - if (pvtData.atExit) - return nchar; - - pbuffer = msgbufGetFree(isOkToBlock); - if (!pbuffer) - return 0; - - va_start(pvar, pFormat); - nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar); - va_end(pvar); - msgbufSetSize(nchar); - return nchar; + int ret; + va_list args; + va_start(args, pFormat); + ret = errlogVprintf(pFormat, args); + va_end(args); + return ret; } int errlogVprintf(const char *pFormat,va_list pvar) { - int nchar; - char *pbuffer; - int isOkToBlock; - FILE *console; + int nchar = 0; + char *buf = msgbufAlloc(); - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogVprintf called from interrupt level\n"); - return 0; + if(buf) { + nchar = epicsVsnprintf(buf, pvt.maxMsgSize, pFormat, pvar); + nchar = msgbufCommit(nchar, pvt.toConsole); } - - errlogInit(0); - if (pvtData.atExit) - return 0; - isOkToBlock = epicsThreadIsOkToBlock(); - - pbuffer = msgbufGetFree(isOkToBlock); - if (!pbuffer) { - console = pvtData.console ? pvtData.console : stderr; - vfprintf(console, pFormat, pvar); - fflush(console); - return 0; - } - - nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar); - if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) { - console = pvtData.console ? pvtData.console : stderr; - fprintf(console, "%s", pbuffer); - fflush(console); - } - msgbufSetSize(nchar); return nchar; } @@ -188,14 +235,6 @@ int errlogPrintfNoConsole(const char *pFormat, ...) { va_list pvar; int nchar; - - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogPrintfNoConsole called from interrupt level\n"); - return 0; - } - - errlogInit(0); va_start(pvar, pFormat); nchar = errlogVprintfNoConsole(pFormat, pvar); va_end(pvar); @@ -204,25 +243,13 @@ int errlogPrintfNoConsole(const char *pFormat, ...) int errlogVprintfNoConsole(const char *pFormat, va_list pvar) { - int nchar; - char *pbuffer; + int nchar = 0; + char *buf = msgbufAlloc(); - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogVprintfNoConsole called from interrupt level\n"); - return 0; + if(buf) { + nchar = epicsVsnprintf(buf, pvt.maxMsgSize, pFormat, pvar); + nchar = msgbufCommit(nchar, 0); } - - errlogInit(0); - if (pvtData.atExit) - return 0; - - pbuffer = msgbufGetFree(1); - if (!pbuffer) - return 0; - - nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar); - msgbufSetSize(nchar); return nchar; } @@ -231,29 +258,6 @@ int errlogSevPrintf(errlogSevEnum severity, const char *pFormat, ...) { va_list pvar; int nchar; - int isOkToBlock; - - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogSevPrintf called from interrupt level\n"); - return 0; - } - - errlogInit(0); - if (pvtData.sevToLog > severity) - return 0; - - isOkToBlock = epicsThreadIsOkToBlock(); - if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) { - FILE *console = pvtData.console ? pvtData.console : stderr; - - fprintf(console, "sevr=%s ", errlogGetSevEnumString(severity)); - va_start(pvar, pFormat); - vfprintf(console, pFormat, pvar); - va_end(pvar); - fflush(console); - } - va_start(pvar, pFormat); nchar = errlogSevVprintf(severity, pFormat, pvar); va_end(pvar); @@ -262,35 +266,15 @@ int errlogSevPrintf(errlogSevEnum severity, const char *pFormat, ...) int errlogSevVprintf(errlogSevEnum severity, const char *pFormat, va_list pvar) { - char *pnext; - int nchar; - int totalChar = 0; - int isOkToBlock; + int nchar = 0; + char *buf = msgbufAlloc(); - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage - ("errlogSevVprintf called from interrupt level\n"); - return 0; + if(buf) { + nchar = sprintf(buf, "sevr=%s ", errlogGetSevEnumString(severity)); + if(nchar < pvt.maxMsgSize) + nchar += epicsVsnprintf(buf + nchar, pvt.maxMsgSize - nchar, pFormat, pvar); + nchar = msgbufCommit(nchar, pvt.toConsole); } - - errlogInit(0); - if (pvtData.atExit) - return 0; - - isOkToBlock = epicsThreadIsOkToBlock(); - pnext = msgbufGetFree(isOkToBlock); - if (!pnext) - return 0; - - nchar = sprintf(pnext, "sevr=%s ", errlogGetSevEnumString(severity)); - pnext += nchar; totalChar += nchar; - nchar = tvsnPrint(pnext, pvtData.maxMsgSize - totalChar - 1, pFormat, pvar); - pnext += nchar; totalChar += nchar; - if (pnext[-1] != '\n') { - strcpy(pnext,"\n"); - totalChar++; - } - msgbufSetSize(totalChar); return nchar; } @@ -306,13 +290,19 @@ const char * errlogGetSevEnumString(errlogSevEnum severity) void errlogSetSevToLog(errlogSevEnum severity) { errlogInit(0); - pvtData.sevToLog = severity; + epicsMutexMustLock(pvt.msgQueueLock); + pvt.sevToLog = severity; + epicsMutexUnlock(pvt.msgQueueLock); } errlogSevEnum errlogGetSevToLog(void) { + errlogSevEnum ret; errlogInit(0); - return pvtData.sevToLog; + epicsMutexMustLock(pvt.msgQueueLock); + ret = pvt.sevToLog; + epicsMutexUnlock(pvt.msgQueueLock); + return ret; } void errlogAddListener(errlogListener listener, void *pPrivate) @@ -320,16 +310,16 @@ void errlogAddListener(errlogListener listener, void *pPrivate) listenerNode *plistenerNode; errlogInit(0); - if (pvtData.atExit) + if (pvt.atExit) return; plistenerNode = callocMustSucceed(1,sizeof(listenerNode), "errlogAddListener"); - epicsMutexMustLock(pvtData.listenerLock); + epicsMutexMustLock(pvt.listenerLock); plistenerNode->listener = listener; plistenerNode->pPrivate = pPrivate; - ellAdd(&pvtData.listenerList,&plistenerNode->node); - epicsMutexUnlock(pvtData.listenerLock); + ellAdd(&pvt.listenerList,&plistenerNode->node); + epicsMutexUnlock(pvt.listenerLock); } int errlogRemoveListeners(errlogListener listener, void *pPrivate) @@ -338,46 +328,42 @@ int errlogRemoveListeners(errlogListener listener, void *pPrivate) int count = 0; errlogInit(0); - if (!pvtData.atExit) - epicsMutexMustLock(pvtData.listenerLock); + epicsMutexMustLock(pvt.listenerLock); - plistenerNode = (listenerNode *)ellFirst(&pvtData.listenerList); + plistenerNode = (listenerNode *)ellFirst(&pvt.listenerList); while (plistenerNode) { listenerNode *pnext = (listenerNode *)ellNext(&plistenerNode->node); if (plistenerNode->listener == listener && plistenerNode->pPrivate == pPrivate) { - ellDelete(&pvtData.listenerList, &plistenerNode->node); + ellDelete(&pvt.listenerList, &plistenerNode->node); free(plistenerNode); ++count; } plistenerNode = pnext; } - if (!pvtData.atExit) - epicsMutexUnlock(pvtData.listenerLock); - - if (count == 0) { - FILE *console = pvtData.console ? pvtData.console : stderr; - - fprintf(console, - "errlogRemoveListeners: No listeners found\n"); - } + epicsMutexUnlock(pvt.listenerLock); return count; } int eltc(int yesno) { errlogInit(0); - errlogFlush(); - pvtData.toConsole = yesno; + epicsMutexMustLock(pvt.msgQueueLock); + pvt.toConsole = yesno; + epicsMutexUnlock(pvt.msgQueueLock); return 0; } int errlogSetConsole(FILE *stream) { errlogInit(0); - pvtData.console = stream; + epicsMutexMustLock(pvt.msgQueueLock); + pvt.console = stream ? stream : stderr; + epicsMutexUnlock(pvt.msgQueueLock); + /* make sure worker has stopped writing to the previous stream */ + errlogSequence(); return 0; } @@ -385,115 +371,93 @@ void errPrintf(long status, const char *pFileName, int lineno, const char *pformat, ...) { va_list pvar; - char *pnext; - int nchar; - int totalChar=0; - int isOkToBlock; - char name[256]; + int nchar = 0; + char *buf = msgbufAlloc(); - if (epicsInterruptIsInterruptContext()) { - epicsInterruptContextMessage("errPrintf called from interrupt level\n"); - return; - } - - errlogInit(0); - isOkToBlock = epicsThreadIsOkToBlock(); - if (status == 0) - status = errno; - - if (status > 0) { - errSymLookup(status, name, sizeof(name)); - } - - if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) { - FILE *console = pvtData.console ? pvtData.console : stderr; - - if (pFileName) - fprintf(console, "filename=\"%s\" line number=%d\n", - pFileName, lineno); - if (status > 0) - fprintf(console, "%s ", name); - - va_start(pvar, pformat); - vfprintf(console, pformat, pvar); - va_end(pvar); - fputc('\n', console); - fflush(console); - } - - if (pvtData.atExit) - return; - - pnext = msgbufGetFree(isOkToBlock); - if (!pnext) - return; - - if (pFileName) { - nchar = sprintf(pnext,"filename=\"%s\" line number=%d\n", - pFileName, lineno); - pnext += nchar; totalChar += nchar; - } - - if (status > 0) { - nchar = sprintf(pnext,"%s ",name); - pnext += nchar; totalChar += nchar; - } va_start(pvar, pformat); - nchar = tvsnPrint(pnext, pvtData.maxMsgSize - totalChar - 1, pformat, pvar); - va_end(pvar); - if (nchar>0) { - pnext += nchar; - totalChar += nchar; + + if(buf) { + char name[256] = ""; + + if (status > 0) { + errSymLookup(status, name, sizeof(name)); + } + + nchar = epicsSnprintf(buf, pvt.maxMsgSize, "%s%sfilename=\"%s\" line number=%d", + name, status ? " " : "", pFileName, lineno); + if(nchar < pvt.maxMsgSize) + nchar += epicsVsnprintf(buf + nchar, pvt.maxMsgSize - nchar, pformat, pvar); + msgbufCommit(nchar, pvt.toConsole); } - strcpy(pnext, "\n"); - totalChar++ ; /*include the \n */ - msgbufSetSize(totalChar); + + va_end(pvar); } - -static void errlogExitHandler(void *pvt) +/* On *NIX. also RTEM and vxWorks during controlled shutdown. + * On Windows when main() explicitly calls epicsExit(0), like default IOC main(). + * Switch to sync. print and join errlogThread. + * + * On Windows otherwise, errlogThread killed by exit(), and this handler is never + * invoked. Use of errlog from OS atexit() handler is undefined. + */ +static void errlogExitHandler(void *raw) { - pvtData.atExit = 1; - epicsEventSignal(pvtData.waitForWork); - epicsEventMustWait(pvtData.waitForExit); + epicsThreadId tid = raw; + epicsMutexMustLock(pvt.msgQueueLock); + pvt.atExit = 1; + epicsMutexUnlock(pvt.msgQueueLock); + epicsEventSignal(pvt.waitForWork); + epicsThreadMustJoin(tid); } struct initArgs { - int bufsize; - int maxMsgSize; + size_t bufsize; + size_t maxMsgSize; }; static void errlogInitPvt(void *arg) { struct initArgs *pconfig = (struct initArgs *) arg; - epicsThreadId tid; + epicsThreadId tid = NULL; + epicsThreadOpts topts = EPICS_THREAD_OPTS_INIT; - pvtData.errlogInitFailed = TRUE; - pvtData.buffersize = pconfig->bufsize; - pvtData.maxMsgSize = pconfig->maxMsgSize; - pvtData.msgNeeded = adjustToWorstCaseAlignment(pvtData.maxMsgSize + - sizeof(msgNode)); - ellInit(&pvtData.listenerList); - ellInit(&pvtData.msgQueue); - pvtData.toConsole = TRUE; - pvtData.console = NULL; - pvtData.waitForWork = epicsEventMustCreate(epicsEventEmpty); - pvtData.listenerLock = epicsMutexMustCreate(); - pvtData.msgQueueLock = epicsMutexMustCreate(); - pvtData.waitForFlush = epicsEventMustCreate(epicsEventEmpty); - pvtData.flush = epicsEventMustCreate(epicsEventEmpty); - pvtData.flushLock = epicsMutexMustCreate(); - pvtData.waitForExit = epicsEventMustCreate(epicsEventEmpty); - pvtData.pbuffer = callocMustSucceed(1, pvtData.buffersize, - "errlogInitPvt"); + topts.priority = epicsThreadPriorityLow; + topts.stackSize = epicsThreadStackSmall; + topts.joinable = 1; + + /* Use of *Must* alloc functions would recurse on failure since + * cantProceed() calls us. + */ + + pvt.errlogInitFailed = TRUE; + pvt.bufSize = pconfig->bufsize; + pvt.maxMsgSize = pconfig->maxMsgSize; + ellInit(&pvt.listenerList); + pvt.toConsole = TRUE; + pvt.console = stderr; + pvt.waitForWork = epicsEventCreate(epicsEventEmpty); + pvt.listenerLock = epicsMutexCreate(); + pvt.msgQueueLock = epicsMutexCreate(); + pvt.waitForSeq = epicsEventCreate(epicsEventEmpty); + pvt.log = &pvt.bufs[0]; + pvt.print = &pvt.bufs[1]; + pvt.log->base = calloc(1, pvt.bufSize); + pvt.print->base = calloc(1, pvt.bufSize); errSymBld(); /* Better not to do this lazily... */ - tid = epicsThreadCreate("errlog", epicsThreadPriorityLow, - epicsThreadGetStackSize(epicsThreadStackSmall), - (EPICSTHREADFUNC)errlogThread, 0); + if(pvt.waitForWork + && pvt.listenerLock + && pvt.msgQueueLock + && pvt.waitForSeq + && pvt.log->base + && pvt.print->base + ) { + tid = epicsThreadCreateOpt("errlog", (EPICSTHREADFUNC)errlogThread, 0, &topts); + } if (tid) { - pvtData.errlogInitFailed = FALSE; + pvt.errlogInitFailed = FALSE; + epicsAtExit(errlogExitHandler, tid); } } @@ -503,19 +467,21 @@ int errlogInit2(int bufsize, int maxMsgSize) static epicsThreadOnceId errlogOnceFlag = EPICS_THREAD_ONCE_INIT; struct initArgs config; - if (pvtData.atExit) + if (pvt.atExit) return 0; - if (bufsize < BUFFER_SIZE) - bufsize = BUFFER_SIZE; + if (bufsize < MIN_BUFFER_SIZE) + bufsize = MIN_BUFFER_SIZE; config.bufsize = bufsize; - if (maxMsgSize < MAX_MESSAGE_SIZE) + if (maxMsgSize < MIN_MESSAGE_SIZE) + maxMsgSize = MIN_MESSAGE_SIZE; + else if (maxMsgSize > MAX_MESSAGE_SIZE) maxMsgSize = MAX_MESSAGE_SIZE; config.maxMsgSize = maxMsgSize; epicsThreadOnce(&errlogOnceFlag, errlogInitPvt, &config); - if (pvtData.errlogInitFailed) { + if (pvt.errlogInitFailed) { fprintf(stderr,"errlogInit failed\n"); exit(1); } @@ -524,172 +490,88 @@ int errlogInit2(int bufsize, int maxMsgSize) int errlogInit(int bufsize) { - return errlogInit2(bufsize, MAX_MESSAGE_SIZE); + return errlogInit2(bufsize, MIN_MESSAGE_SIZE); } void errlogFlush(void) { - int count; - + /* wait for both buffers to be handled to know that all currently + * logged message have been seen/sent. + */ errlogInit(0); - if (pvtData.atExit) - return; - - /*If nothing in queue don't wake up errlogThread*/ - epicsMutexMustLock(pvtData.msgQueueLock); - count = ellCount(&pvtData.msgQueue); - epicsMutexUnlock(pvtData.msgQueueLock); - if (count <= 0) - return; - - /*must let errlogThread empty queue*/ - epicsMutexMustLock(pvtData.flushLock); - epicsEventSignal(pvtData.flush); - epicsEventSignal(pvtData.waitForWork); - epicsEventMustWait(pvtData.waitForFlush); - epicsMutexUnlock(pvtData.flushLock); + errlogSequence(); + errlogSequence(); } static void errlogThread(void) { - listenerNode *plistenerNode; - int noConsoleMessage; - char *pmessage; + epicsMutexMustLock(pvt.msgQueueLock); + while (!pvt.atExit) { + pvt.flushSeq++; - epicsAtExit(errlogExitHandler,0); - while (TRUE) { - epicsEventMustWait(pvtData.waitForWork); - while ((pmessage = msgbufGetSend(&noConsoleMessage))) { - epicsMutexMustLock(pvtData.listenerLock); - if (pvtData.toConsole && !noConsoleMessage) { - FILE *console = pvtData.console ? pvtData.console : stderr; + if(pvt.log->pos==0u) { + int wakeFlusher = pvt.nFlushers!=0; + epicsMutexUnlock(pvt.msgQueueLock); + if(wakeFlusher) + epicsEventMustTrigger(pvt.waitForSeq); + epicsEventMustWait(pvt.waitForWork); + epicsMutexMustLock(pvt.msgQueueLock); - fprintf(console, "%s", pmessage); + } else { + /* snapshot and swap buffers for use while unlocked */ + size_t nLost = pvt.nLost; + FILE *console = pvt.toConsole ? pvt.console : NULL; + size_t pos = 0u; + buffer_t *print; + + { + buffer_t *temp = pvt.log; + pvt.log = pvt.print; + pvt.print = print = temp; + } + + pvt.nLost = 0u; + epicsMutexUnlock(pvt.msgQueueLock); + + while(pos < print->pos) { + listenerNode *plistenerNode; + const char* base = print->base + pos; + size_t mlen = epicsStrnLen(base+1u, pvt.bufSize - pos); + + if((base[0]&ERL_STATE_MASK) != ERL_STATE_READY || mlen>=pvt.bufSize - pos) { + fprintf(stderr, "Logic Error: errlog buffer corruption. %02x, %zu\n", + (unsigned)base[0], mlen); + /* try to reset and recover */ + break; + } + + if(base[0]&ERL_LOCALECHO && console) { + fprintf(console, "%s", base+1u); + } + + epicsMutexMustLock(pvt.listenerLock); + plistenerNode = (listenerNode *)ellFirst(&pvt.listenerList); + while (plistenerNode) { + (*plistenerNode->listener)(plistenerNode->pPrivate, base+1u); + plistenerNode = (listenerNode *)ellNext(&plistenerNode->node); + } + epicsMutexUnlock(pvt.listenerLock); + + pos += 1u + mlen+1u; + } + + memset(print->base, 0, pvt.bufSize); + print->pos = 0u; + + if(nLost && console) + fprintf(console, "errlog: lost %zu messages\n", nLost); + + if(console) fflush(console); - } - plistenerNode = (listenerNode *)ellFirst(&pvtData.listenerList); - while (plistenerNode) { - (*plistenerNode->listener)(plistenerNode->pPrivate, pmessage); - plistenerNode = (listenerNode *)ellNext(&plistenerNode->node); - } + epicsMutexMustLock(pvt.msgQueueLock); - epicsMutexUnlock(pvtData.listenerLock); - msgbufFreeSend(); - } - - if (pvtData.atExit) - break; - if (epicsEventTryWait(pvtData.flush) != epicsEventWaitOK) - continue; - - epicsThreadSleep(.2); /*just wait an extra .2 seconds*/ - epicsEventSignal(pvtData.waitForFlush); - } - epicsEventSignal(pvtData.waitForExit); -} - - -static msgNode * msgbufGetNode(void) -{ - char *pbuffer = pvtData.pbuffer; - char *pnextFree; - msgNode *pnextSend; - - if (ellCount(&pvtData.msgQueue) == 0 ) { - pnextFree = pbuffer; /* Reset if empty */ - } - else { - msgNode *pfirst = (msgNode *)ellFirst(&pvtData.msgQueue); - msgNode *plast = (msgNode *)ellLast(&pvtData.msgQueue); - char *plimit = pbuffer + pvtData.buffersize; - - pnextFree = plast->message + adjustToWorstCaseAlignment(plast->length); - if (pfirst > plast) { - plimit = (char *)pfirst; - } - else if (pnextFree + pvtData.msgNeeded > plimit) { - pnextFree = pbuffer; /* Hit end, wrap to start */ - plimit = (char *)pfirst; - } - if (pnextFree + pvtData.msgNeeded > plimit) { - return 0; /* No room */ } } - - pnextSend = (msgNode *)pnextFree; - pnextSend->message = pnextFree + sizeof(msgNode); - pnextSend->length = 0; - return pnextSend; -} - -static char * msgbufGetFree(int noConsoleMessage) -{ - msgNode *pnextSend; - - if (epicsMutexLock(pvtData.msgQueueLock) != epicsMutexLockOK) - return 0; - - if ((ellCount(&pvtData.msgQueue) == 0) && pvtData.missedMessages) { - int nchar; - - pnextSend = msgbufGetNode(); - nchar = sprintf(pnextSend->message, - "errlog: %d messages were discarded\n", pvtData.missedMessages); - pnextSend->length = nchar + 1; - pvtData.missedMessages = 0; - ellAdd(&pvtData.msgQueue, &pnextSend->node); - } - - pvtData.pnextSend = pnextSend = msgbufGetNode(); - if (pnextSend) { - pnextSend->noConsoleMessage = noConsoleMessage; - pnextSend->length = 0; - return pnextSend->message; /* NB: msgQueueLock is still locked */ - } - - ++pvtData.missedMessages; - epicsMutexUnlock(pvtData.msgQueueLock); - return 0; -} - -static void msgbufSetSize(int size) -{ - msgNode *pnextSend = pvtData.pnextSend; - - pnextSend->length = size+1; - ellAdd(&pvtData.msgQueue, &pnextSend->node); - epicsMutexUnlock(pvtData.msgQueueLock); - epicsEventSignal(pvtData.waitForWork); -} - - -static char * msgbufGetSend(int *noConsoleMessage) -{ - msgNode *pnextSend; - - epicsMutexMustLock(pvtData.msgQueueLock); - pnextSend = (msgNode *)ellFirst(&pvtData.msgQueue); - epicsMutexUnlock(pvtData.msgQueueLock); - if (!pnextSend) - return 0; - - *noConsoleMessage = pnextSend->noConsoleMessage; - return pnextSend->message; -} - -static void msgbufFreeSend(void) -{ - msgNode *pnextSend; - - epicsMutexMustLock(pvtData.msgQueueLock); - pnextSend = (msgNode *)ellFirst(&pvtData.msgQueue); - if (!pnextSend) { - FILE *console = pvtData.console ? pvtData.console : stderr; - - fprintf(console, "errlog: msgbufFreeSend logic error\n"); - epicsThreadSuspendSelf(); - } - ellDelete(&pvtData.msgQueue, &pnextSend->node); - epicsMutexUnlock(pvtData.msgQueueLock); + epicsMutexUnlock(pvt.msgQueueLock); } diff --git a/modules/libcom/test/epicsErrlogTest.c b/modules/libcom/test/epicsErrlogTest.c index 5d6bbdcad..105eecc05 100644 --- a/modules/libcom/test/epicsErrlogTest.c +++ b/modules/libcom/test/epicsErrlogTest.c @@ -370,7 +370,8 @@ MAIN(epicsErrlogTest) testDiag("Logged %u messages", pvt.count); epicsEventMustWait(pvt.done); - testEqInt(pvt.count, N+1); + /* Expect N+1 messages +- 1 depending on impl */ + testOk(pvt.count >= N && pvt.count<=N+2, "Logged %u messages, expected %zu", pvt.count, N+1); /* Clean up */ testOk(1 == errlogRemoveListeners(&logClient, &pvt),