Com: rewrite errlog

Switch to double buffering to allow errlogThread
to unlock while printing.
This commit is contained in:
Michael Davidsaver
2021-02-19 17:50:04 -08:00
parent 465920fcf1
commit 29fa0621d7
2 changed files with 332 additions and 449 deletions

View File

@@ -20,12 +20,12 @@
#include <errno.h>
#define ERRLOG_INIT
#include "adjustment.h"
#include "dbDefs.h"
#include "epicsThread.h"
#include "cantProceed.h"
#include "epicsMutex.h"
#include "epicsEvent.h"
#include "epicsString.h"
#include "epicsInterrupt.h"
#include "errMdef.h"
#include "errSymTbl.h"
@@ -35,8 +35,20 @@
#include "epicsExit.h"
#define BUFFER_SIZE 1280
#define MAX_MESSAGE_SIZE 256
#define MIN_BUFFER_SIZE 1280
#define MIN_MESSAGE_SIZE 256
#define MAX_MESSAGE_SIZE 0x00ffffff
/* errlog buffers contain null terminated strings, each prefixed
* with a 1 byte header containing flags.
*/
/* State of entries in a buffer. */
#define ERL_STATE_MASK 0xc0
#define ERL_STATE_FREE 0x00
#define ERL_STATE_WRITE 0x80
#define ERL_STATE_READY 0x40
/* should this message be echoed to the console? */
#define ERL_LOCALECHO 0x20
/*Declare storage for errVerbose */
int errVerbose = 0;
@@ -44,137 +56,172 @@ int errVerbose = 0;
static void errlogExitHandler(void *);
static void errlogThread(void);
static char *msgbufGetFree(int noConsoleMessage);
static void msgbufSetSize(int size); /* Send 'size' chars plus trailing '\0' */
static char *msgbufGetSend(int *noConsoleMessage);
static void msgbufFreeSend(void);
typedef struct listenerNode{
ELLNODE node;
errlogListener listener;
void *pPrivate;
} listenerNode;
/*each message consists of a msgNode immediately followed by the message */
typedef struct msgNode {
ELLNODE node;
char *message;
int length;
int noConsoleMessage;
} msgNode;
typedef struct {
char *base;
size_t pos;
} buffer_t;
static struct {
epicsEventId waitForWork; /*errlogThread waits for this*/
epicsMutexId msgQueueLock;
/* const after errlogInit() */
size_t maxMsgSize;
/* alloc size of both buffer_t::base */
size_t bufSize;
int errlogInitFailed;
epicsMutexId listenerLock;
epicsEventId waitForFlush; /*errlogFlush waits for this*/
epicsEventId flush; /*errlogFlush sets errlogThread does a Try*/
epicsMutexId flushLock;
epicsEventId waitForExit; /*errlogExitHandler waits for this*/
int atExit; /*TRUE when errlogExitHandler is active*/
ELLLIST listenerList;
ELLLIST msgQueue;
msgNode *pnextSend;
int errlogInitFailed;
int buffersize;
int maxMsgSize;
int msgNeeded;
/* notify when log->size!=0 */
epicsEventId waitForWork;
/* signals when worker increments flushSeq */
epicsEventId waitForSeq;
epicsMutexId msgQueueLock;
/* guarded by msgQueueLock */
int atExit;
int sevToLog;
int toConsole;
FILE *console;
int missedMessages;
char *pbuffer;
} pvtData;
/* A loop counter maintained by errlogThread. */
epicsUInt32 flushSeq;
size_t nFlushers;
size_t nLost;
/*
* vsnprintf with truncation message
/* 'log' and 'print' combine to form a double buffer. */
buffer_t *log;
buffer_t *print;
/* actual storage which 'log' and 'print' point to */
buffer_t bufs[2];
} pvt;
/* Returns an pointer to pvt.maxMsgSize bytes, or NULL if ring buffer is full.
* When !NULL, caller _must_ later msgbufCommit()
*/
static int tvsnPrint(char *str, size_t size, const char *format, va_list ap)
static
char* msgbufAlloc(void)
{
static const char tmsg[] = "<<TRUNCATED>>\n";
int nchar = epicsVsnprintf(str, size, format ? format : "", ap);
char *ret = NULL;
if (nchar >= size) {
if (size > sizeof tmsg)
strcpy(str + size - sizeof tmsg, tmsg);
nchar = size - 1;
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlog called from interrupt level\n");
return ret;
}
errlogInit(0);
epicsMutexMustLock(pvt.msgQueueLock); /* matched in msgbufCommit() */
if(pvt.bufSize - pvt.log->pos >= 1+pvt.maxMsgSize) {
/* there is enough space for the worst cast */
ret = pvt.log->base + pvt.log->pos;
ret[0] = ERL_STATE_WRITE;
ret++;
}
if(!ret) {
pvt.nLost++;
epicsMutexUnlock(pvt.msgQueueLock);
}
return ret;
}
static
size_t msgbufCommit(size_t nchar, int localEcho)
{
int isOkToBlock = epicsThreadIsOkToBlock();
int wasEmpty = pvt.log->pos==0;
int atExit = pvt.atExit;
char *start = pvt.log->base + pvt.log->pos;
/* nchar returned by snprintf() is >= maxMsgSize when truncated */
if(nchar >= pvt.maxMsgSize) {
const char *trunc = "<<TRUNCATED>>\n";
nchar = pvt.maxMsgSize - 1u;
strcpy(start + 1u + nchar - strlen(trunc), trunc);
/* assert(strlen(start+1u)==nchar); */
}
start[1u + nchar] = '\0';
if(localEcho && isOkToBlock && atExit) {
/* errlogThread is not running, so we print directly
* and then abandon the buffer.
*/
fprintf(pvt.console, "%s", start);
} else {
start[0u] = ERL_STATE_READY | (localEcho ? ERL_LOCALECHO : 0);
pvt.log->pos += 1u + nchar + 1u;
}
epicsMutexUnlock(pvt.msgQueueLock); /* matched in msgbufAlloc() */
if(wasEmpty && !atExit)
epicsEventMustTrigger(pvt.waitForWork);
if(localEcho && isOkToBlock && !atExit)
errlogFlush();
return nchar;
}
static
void errlogSequence(void)
{
int wakeNext = 0;
size_t seq;
if (pvt.atExit)
return;
epicsMutexMustLock(pvt.msgQueueLock);
pvt.nFlushers++;
seq = pvt.flushSeq;
while(seq == pvt.flushSeq && !pvt.atExit) {
epicsMutexUnlock(pvt.msgQueueLock);
/* force worker to wake and increment seq */
epicsEventMustTrigger(pvt.waitForWork);
epicsEventMustWait(pvt.waitForSeq);
epicsMutexMustLock(pvt.msgQueueLock);
}
pvt.nFlushers--;
wakeNext = pvt.nFlushers!=0u;
epicsMutexUnlock(pvt.msgQueueLock);
if(wakeNext)
epicsEventMustTrigger(pvt.waitForSeq);
}
int errlogPrintf(const char *pFormat, ...)
{
va_list pvar;
char *pbuffer;
int nchar;
int isOkToBlock;
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogPrintf called from interrupt level\n");
return 0;
}
errlogInit(0);
isOkToBlock = epicsThreadIsOkToBlock();
if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) {
FILE *console = pvtData.console ? pvtData.console : stderr;
va_start(pvar, pFormat);
nchar = vfprintf(console, pFormat, pvar);
va_end (pvar);
fflush(console);
}
if (pvtData.atExit)
return nchar;
pbuffer = msgbufGetFree(isOkToBlock);
if (!pbuffer)
return 0;
va_start(pvar, pFormat);
nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar);
va_end(pvar);
msgbufSetSize(nchar);
return nchar;
int ret;
va_list args;
va_start(args, pFormat);
ret = errlogVprintf(pFormat, args);
va_end(args);
return ret;
}
int errlogVprintf(const char *pFormat,va_list pvar)
{
int nchar;
char *pbuffer;
int isOkToBlock;
FILE *console;
int nchar = 0;
char *buf = msgbufAlloc();
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogVprintf called from interrupt level\n");
return 0;
if(buf) {
nchar = epicsVsnprintf(buf, pvt.maxMsgSize, pFormat, pvar);
nchar = msgbufCommit(nchar, pvt.toConsole);
}
errlogInit(0);
if (pvtData.atExit)
return 0;
isOkToBlock = epicsThreadIsOkToBlock();
pbuffer = msgbufGetFree(isOkToBlock);
if (!pbuffer) {
console = pvtData.console ? pvtData.console : stderr;
vfprintf(console, pFormat, pvar);
fflush(console);
return 0;
}
nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar);
if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) {
console = pvtData.console ? pvtData.console : stderr;
fprintf(console, "%s", pbuffer);
fflush(console);
}
msgbufSetSize(nchar);
return nchar;
}
@@ -188,14 +235,6 @@ int errlogPrintfNoConsole(const char *pFormat, ...)
{
va_list pvar;
int nchar;
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogPrintfNoConsole called from interrupt level\n");
return 0;
}
errlogInit(0);
va_start(pvar, pFormat);
nchar = errlogVprintfNoConsole(pFormat, pvar);
va_end(pvar);
@@ -204,25 +243,13 @@ int errlogPrintfNoConsole(const char *pFormat, ...)
int errlogVprintfNoConsole(const char *pFormat, va_list pvar)
{
int nchar;
char *pbuffer;
int nchar = 0;
char *buf = msgbufAlloc();
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogVprintfNoConsole called from interrupt level\n");
return 0;
if(buf) {
nchar = epicsVsnprintf(buf, pvt.maxMsgSize, pFormat, pvar);
nchar = msgbufCommit(nchar, 0);
}
errlogInit(0);
if (pvtData.atExit)
return 0;
pbuffer = msgbufGetFree(1);
if (!pbuffer)
return 0;
nchar = tvsnPrint(pbuffer, pvtData.maxMsgSize, pFormat?pFormat:"", pvar);
msgbufSetSize(nchar);
return nchar;
}
@@ -231,29 +258,6 @@ int errlogSevPrintf(errlogSevEnum severity, const char *pFormat, ...)
{
va_list pvar;
int nchar;
int isOkToBlock;
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogSevPrintf called from interrupt level\n");
return 0;
}
errlogInit(0);
if (pvtData.sevToLog > severity)
return 0;
isOkToBlock = epicsThreadIsOkToBlock();
if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) {
FILE *console = pvtData.console ? pvtData.console : stderr;
fprintf(console, "sevr=%s ", errlogGetSevEnumString(severity));
va_start(pvar, pFormat);
vfprintf(console, pFormat, pvar);
va_end(pvar);
fflush(console);
}
va_start(pvar, pFormat);
nchar = errlogSevVprintf(severity, pFormat, pvar);
va_end(pvar);
@@ -262,35 +266,15 @@ int errlogSevPrintf(errlogSevEnum severity, const char *pFormat, ...)
int errlogSevVprintf(errlogSevEnum severity, const char *pFormat, va_list pvar)
{
char *pnext;
int nchar;
int totalChar = 0;
int isOkToBlock;
int nchar = 0;
char *buf = msgbufAlloc();
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage
("errlogSevVprintf called from interrupt level\n");
return 0;
if(buf) {
nchar = sprintf(buf, "sevr=%s ", errlogGetSevEnumString(severity));
if(nchar < pvt.maxMsgSize)
nchar += epicsVsnprintf(buf + nchar, pvt.maxMsgSize - nchar, pFormat, pvar);
nchar = msgbufCommit(nchar, pvt.toConsole);
}
errlogInit(0);
if (pvtData.atExit)
return 0;
isOkToBlock = epicsThreadIsOkToBlock();
pnext = msgbufGetFree(isOkToBlock);
if (!pnext)
return 0;
nchar = sprintf(pnext, "sevr=%s ", errlogGetSevEnumString(severity));
pnext += nchar; totalChar += nchar;
nchar = tvsnPrint(pnext, pvtData.maxMsgSize - totalChar - 1, pFormat, pvar);
pnext += nchar; totalChar += nchar;
if (pnext[-1] != '\n') {
strcpy(pnext,"\n");
totalChar++;
}
msgbufSetSize(totalChar);
return nchar;
}
@@ -306,13 +290,19 @@ const char * errlogGetSevEnumString(errlogSevEnum severity)
void errlogSetSevToLog(errlogSevEnum severity)
{
errlogInit(0);
pvtData.sevToLog = severity;
epicsMutexMustLock(pvt.msgQueueLock);
pvt.sevToLog = severity;
epicsMutexUnlock(pvt.msgQueueLock);
}
errlogSevEnum errlogGetSevToLog(void)
{
errlogSevEnum ret;
errlogInit(0);
return pvtData.sevToLog;
epicsMutexMustLock(pvt.msgQueueLock);
ret = pvt.sevToLog;
epicsMutexUnlock(pvt.msgQueueLock);
return ret;
}
void errlogAddListener(errlogListener listener, void *pPrivate)
@@ -320,16 +310,16 @@ void errlogAddListener(errlogListener listener, void *pPrivate)
listenerNode *plistenerNode;
errlogInit(0);
if (pvtData.atExit)
if (pvt.atExit)
return;
plistenerNode = callocMustSucceed(1,sizeof(listenerNode),
"errlogAddListener");
epicsMutexMustLock(pvtData.listenerLock);
epicsMutexMustLock(pvt.listenerLock);
plistenerNode->listener = listener;
plistenerNode->pPrivate = pPrivate;
ellAdd(&pvtData.listenerList,&plistenerNode->node);
epicsMutexUnlock(pvtData.listenerLock);
ellAdd(&pvt.listenerList,&plistenerNode->node);
epicsMutexUnlock(pvt.listenerLock);
}
int errlogRemoveListeners(errlogListener listener, void *pPrivate)
@@ -338,46 +328,42 @@ int errlogRemoveListeners(errlogListener listener, void *pPrivate)
int count = 0;
errlogInit(0);
if (!pvtData.atExit)
epicsMutexMustLock(pvtData.listenerLock);
epicsMutexMustLock(pvt.listenerLock);
plistenerNode = (listenerNode *)ellFirst(&pvtData.listenerList);
plistenerNode = (listenerNode *)ellFirst(&pvt.listenerList);
while (plistenerNode) {
listenerNode *pnext = (listenerNode *)ellNext(&plistenerNode->node);
if (plistenerNode->listener == listener &&
plistenerNode->pPrivate == pPrivate) {
ellDelete(&pvtData.listenerList, &plistenerNode->node);
ellDelete(&pvt.listenerList, &plistenerNode->node);
free(plistenerNode);
++count;
}
plistenerNode = pnext;
}
if (!pvtData.atExit)
epicsMutexUnlock(pvtData.listenerLock);
if (count == 0) {
FILE *console = pvtData.console ? pvtData.console : stderr;
fprintf(console,
"errlogRemoveListeners: No listeners found\n");
}
epicsMutexUnlock(pvt.listenerLock);
return count;
}
int eltc(int yesno)
{
errlogInit(0);
errlogFlush();
pvtData.toConsole = yesno;
epicsMutexMustLock(pvt.msgQueueLock);
pvt.toConsole = yesno;
epicsMutexUnlock(pvt.msgQueueLock);
return 0;
}
int errlogSetConsole(FILE *stream)
{
errlogInit(0);
pvtData.console = stream;
epicsMutexMustLock(pvt.msgQueueLock);
pvt.console = stream ? stream : stderr;
epicsMutexUnlock(pvt.msgQueueLock);
/* make sure worker has stopped writing to the previous stream */
errlogSequence();
return 0;
}
@@ -385,115 +371,93 @@ void errPrintf(long status, const char *pFileName, int lineno,
const char *pformat, ...)
{
va_list pvar;
char *pnext;
int nchar;
int totalChar=0;
int isOkToBlock;
char name[256];
int nchar = 0;
char *buf = msgbufAlloc();
if (epicsInterruptIsInterruptContext()) {
epicsInterruptContextMessage("errPrintf called from interrupt level\n");
return;
}
errlogInit(0);
isOkToBlock = epicsThreadIsOkToBlock();
if (status == 0)
status = errno;
if (status > 0) {
errSymLookup(status, name, sizeof(name));
}
if (pvtData.atExit || (isOkToBlock && pvtData.toConsole)) {
FILE *console = pvtData.console ? pvtData.console : stderr;
if (pFileName)
fprintf(console, "filename=\"%s\" line number=%d\n",
pFileName, lineno);
if (status > 0)
fprintf(console, "%s ", name);
va_start(pvar, pformat);
vfprintf(console, pformat, pvar);
va_end(pvar);
fputc('\n', console);
fflush(console);
}
if (pvtData.atExit)
return;
pnext = msgbufGetFree(isOkToBlock);
if (!pnext)
return;
if (pFileName) {
nchar = sprintf(pnext,"filename=\"%s\" line number=%d\n",
pFileName, lineno);
pnext += nchar; totalChar += nchar;
}
if (status > 0) {
nchar = sprintf(pnext,"%s ",name);
pnext += nchar; totalChar += nchar;
}
va_start(pvar, pformat);
nchar = tvsnPrint(pnext, pvtData.maxMsgSize - totalChar - 1, pformat, pvar);
va_end(pvar);
if (nchar>0) {
pnext += nchar;
totalChar += nchar;
if(buf) {
char name[256] = "";
if (status > 0) {
errSymLookup(status, name, sizeof(name));
}
nchar = epicsSnprintf(buf, pvt.maxMsgSize, "%s%sfilename=\"%s\" line number=%d",
name, status ? " " : "", pFileName, lineno);
if(nchar < pvt.maxMsgSize)
nchar += epicsVsnprintf(buf + nchar, pvt.maxMsgSize - nchar, pformat, pvar);
msgbufCommit(nchar, pvt.toConsole);
}
strcpy(pnext, "\n");
totalChar++ ; /*include the \n */
msgbufSetSize(totalChar);
va_end(pvar);
}
static void errlogExitHandler(void *pvt)
/* On *NIX. also RTEM and vxWorks during controlled shutdown.
* On Windows when main() explicitly calls epicsExit(0), like default IOC main().
* Switch to sync. print and join errlogThread.
*
* On Windows otherwise, errlogThread killed by exit(), and this handler is never
* invoked. Use of errlog from OS atexit() handler is undefined.
*/
static void errlogExitHandler(void *raw)
{
pvtData.atExit = 1;
epicsEventSignal(pvtData.waitForWork);
epicsEventMustWait(pvtData.waitForExit);
epicsThreadId tid = raw;
epicsMutexMustLock(pvt.msgQueueLock);
pvt.atExit = 1;
epicsMutexUnlock(pvt.msgQueueLock);
epicsEventSignal(pvt.waitForWork);
epicsThreadMustJoin(tid);
}
struct initArgs {
int bufsize;
int maxMsgSize;
size_t bufsize;
size_t maxMsgSize;
};
static void errlogInitPvt(void *arg)
{
struct initArgs *pconfig = (struct initArgs *) arg;
epicsThreadId tid;
epicsThreadId tid = NULL;
epicsThreadOpts topts = EPICS_THREAD_OPTS_INIT;
pvtData.errlogInitFailed = TRUE;
pvtData.buffersize = pconfig->bufsize;
pvtData.maxMsgSize = pconfig->maxMsgSize;
pvtData.msgNeeded = adjustToWorstCaseAlignment(pvtData.maxMsgSize +
sizeof(msgNode));
ellInit(&pvtData.listenerList);
ellInit(&pvtData.msgQueue);
pvtData.toConsole = TRUE;
pvtData.console = NULL;
pvtData.waitForWork = epicsEventMustCreate(epicsEventEmpty);
pvtData.listenerLock = epicsMutexMustCreate();
pvtData.msgQueueLock = epicsMutexMustCreate();
pvtData.waitForFlush = epicsEventMustCreate(epicsEventEmpty);
pvtData.flush = epicsEventMustCreate(epicsEventEmpty);
pvtData.flushLock = epicsMutexMustCreate();
pvtData.waitForExit = epicsEventMustCreate(epicsEventEmpty);
pvtData.pbuffer = callocMustSucceed(1, pvtData.buffersize,
"errlogInitPvt");
topts.priority = epicsThreadPriorityLow;
topts.stackSize = epicsThreadStackSmall;
topts.joinable = 1;
/* Use of *Must* alloc functions would recurse on failure since
* cantProceed() calls us.
*/
pvt.errlogInitFailed = TRUE;
pvt.bufSize = pconfig->bufsize;
pvt.maxMsgSize = pconfig->maxMsgSize;
ellInit(&pvt.listenerList);
pvt.toConsole = TRUE;
pvt.console = stderr;
pvt.waitForWork = epicsEventCreate(epicsEventEmpty);
pvt.listenerLock = epicsMutexCreate();
pvt.msgQueueLock = epicsMutexCreate();
pvt.waitForSeq = epicsEventCreate(epicsEventEmpty);
pvt.log = &pvt.bufs[0];
pvt.print = &pvt.bufs[1];
pvt.log->base = calloc(1, pvt.bufSize);
pvt.print->base = calloc(1, pvt.bufSize);
errSymBld(); /* Better not to do this lazily... */
tid = epicsThreadCreate("errlog", epicsThreadPriorityLow,
epicsThreadGetStackSize(epicsThreadStackSmall),
(EPICSTHREADFUNC)errlogThread, 0);
if(pvt.waitForWork
&& pvt.listenerLock
&& pvt.msgQueueLock
&& pvt.waitForSeq
&& pvt.log->base
&& pvt.print->base
) {
tid = epicsThreadCreateOpt("errlog", (EPICSTHREADFUNC)errlogThread, 0, &topts);
}
if (tid) {
pvtData.errlogInitFailed = FALSE;
pvt.errlogInitFailed = FALSE;
epicsAtExit(errlogExitHandler, tid);
}
}
@@ -503,19 +467,21 @@ int errlogInit2(int bufsize, int maxMsgSize)
static epicsThreadOnceId errlogOnceFlag = EPICS_THREAD_ONCE_INIT;
struct initArgs config;
if (pvtData.atExit)
if (pvt.atExit)
return 0;
if (bufsize < BUFFER_SIZE)
bufsize = BUFFER_SIZE;
if (bufsize < MIN_BUFFER_SIZE)
bufsize = MIN_BUFFER_SIZE;
config.bufsize = bufsize;
if (maxMsgSize < MAX_MESSAGE_SIZE)
if (maxMsgSize < MIN_MESSAGE_SIZE)
maxMsgSize = MIN_MESSAGE_SIZE;
else if (maxMsgSize > MAX_MESSAGE_SIZE)
maxMsgSize = MAX_MESSAGE_SIZE;
config.maxMsgSize = maxMsgSize;
epicsThreadOnce(&errlogOnceFlag, errlogInitPvt, &config);
if (pvtData.errlogInitFailed) {
if (pvt.errlogInitFailed) {
fprintf(stderr,"errlogInit failed\n");
exit(1);
}
@@ -524,172 +490,88 @@ int errlogInit2(int bufsize, int maxMsgSize)
int errlogInit(int bufsize)
{
return errlogInit2(bufsize, MAX_MESSAGE_SIZE);
return errlogInit2(bufsize, MIN_MESSAGE_SIZE);
}
void errlogFlush(void)
{
int count;
/* wait for both buffers to be handled to know that all currently
* logged message have been seen/sent.
*/
errlogInit(0);
if (pvtData.atExit)
return;
/*If nothing in queue don't wake up errlogThread*/
epicsMutexMustLock(pvtData.msgQueueLock);
count = ellCount(&pvtData.msgQueue);
epicsMutexUnlock(pvtData.msgQueueLock);
if (count <= 0)
return;
/*must let errlogThread empty queue*/
epicsMutexMustLock(pvtData.flushLock);
epicsEventSignal(pvtData.flush);
epicsEventSignal(pvtData.waitForWork);
epicsEventMustWait(pvtData.waitForFlush);
epicsMutexUnlock(pvtData.flushLock);
errlogSequence();
errlogSequence();
}
static void errlogThread(void)
{
listenerNode *plistenerNode;
int noConsoleMessage;
char *pmessage;
epicsMutexMustLock(pvt.msgQueueLock);
while (!pvt.atExit) {
pvt.flushSeq++;
epicsAtExit(errlogExitHandler,0);
while (TRUE) {
epicsEventMustWait(pvtData.waitForWork);
while ((pmessage = msgbufGetSend(&noConsoleMessage))) {
epicsMutexMustLock(pvtData.listenerLock);
if (pvtData.toConsole && !noConsoleMessage) {
FILE *console = pvtData.console ? pvtData.console : stderr;
if(pvt.log->pos==0u) {
int wakeFlusher = pvt.nFlushers!=0;
epicsMutexUnlock(pvt.msgQueueLock);
if(wakeFlusher)
epicsEventMustTrigger(pvt.waitForSeq);
epicsEventMustWait(pvt.waitForWork);
epicsMutexMustLock(pvt.msgQueueLock);
fprintf(console, "%s", pmessage);
} else {
/* snapshot and swap buffers for use while unlocked */
size_t nLost = pvt.nLost;
FILE *console = pvt.toConsole ? pvt.console : NULL;
size_t pos = 0u;
buffer_t *print;
{
buffer_t *temp = pvt.log;
pvt.log = pvt.print;
pvt.print = print = temp;
}
pvt.nLost = 0u;
epicsMutexUnlock(pvt.msgQueueLock);
while(pos < print->pos) {
listenerNode *plistenerNode;
const char* base = print->base + pos;
size_t mlen = epicsStrnLen(base+1u, pvt.bufSize - pos);
if((base[0]&ERL_STATE_MASK) != ERL_STATE_READY || mlen>=pvt.bufSize - pos) {
fprintf(stderr, "Logic Error: errlog buffer corruption. %02x, %zu\n",
(unsigned)base[0], mlen);
/* try to reset and recover */
break;
}
if(base[0]&ERL_LOCALECHO && console) {
fprintf(console, "%s", base+1u);
}
epicsMutexMustLock(pvt.listenerLock);
plistenerNode = (listenerNode *)ellFirst(&pvt.listenerList);
while (plistenerNode) {
(*plistenerNode->listener)(plistenerNode->pPrivate, base+1u);
plistenerNode = (listenerNode *)ellNext(&plistenerNode->node);
}
epicsMutexUnlock(pvt.listenerLock);
pos += 1u + mlen+1u;
}
memset(print->base, 0, pvt.bufSize);
print->pos = 0u;
if(nLost && console)
fprintf(console, "errlog: lost %zu messages\n", nLost);
if(console)
fflush(console);
}
plistenerNode = (listenerNode *)ellFirst(&pvtData.listenerList);
while (plistenerNode) {
(*plistenerNode->listener)(plistenerNode->pPrivate, pmessage);
plistenerNode = (listenerNode *)ellNext(&plistenerNode->node);
}
epicsMutexMustLock(pvt.msgQueueLock);
epicsMutexUnlock(pvtData.listenerLock);
msgbufFreeSend();
}
if (pvtData.atExit)
break;
if (epicsEventTryWait(pvtData.flush) != epicsEventWaitOK)
continue;
epicsThreadSleep(.2); /*just wait an extra .2 seconds*/
epicsEventSignal(pvtData.waitForFlush);
}
epicsEventSignal(pvtData.waitForExit);
}
static msgNode * msgbufGetNode(void)
{
char *pbuffer = pvtData.pbuffer;
char *pnextFree;
msgNode *pnextSend;
if (ellCount(&pvtData.msgQueue) == 0 ) {
pnextFree = pbuffer; /* Reset if empty */
}
else {
msgNode *pfirst = (msgNode *)ellFirst(&pvtData.msgQueue);
msgNode *plast = (msgNode *)ellLast(&pvtData.msgQueue);
char *plimit = pbuffer + pvtData.buffersize;
pnextFree = plast->message + adjustToWorstCaseAlignment(plast->length);
if (pfirst > plast) {
plimit = (char *)pfirst;
}
else if (pnextFree + pvtData.msgNeeded > plimit) {
pnextFree = pbuffer; /* Hit end, wrap to start */
plimit = (char *)pfirst;
}
if (pnextFree + pvtData.msgNeeded > plimit) {
return 0; /* No room */
}
}
pnextSend = (msgNode *)pnextFree;
pnextSend->message = pnextFree + sizeof(msgNode);
pnextSend->length = 0;
return pnextSend;
}
static char * msgbufGetFree(int noConsoleMessage)
{
msgNode *pnextSend;
if (epicsMutexLock(pvtData.msgQueueLock) != epicsMutexLockOK)
return 0;
if ((ellCount(&pvtData.msgQueue) == 0) && pvtData.missedMessages) {
int nchar;
pnextSend = msgbufGetNode();
nchar = sprintf(pnextSend->message,
"errlog: %d messages were discarded\n", pvtData.missedMessages);
pnextSend->length = nchar + 1;
pvtData.missedMessages = 0;
ellAdd(&pvtData.msgQueue, &pnextSend->node);
}
pvtData.pnextSend = pnextSend = msgbufGetNode();
if (pnextSend) {
pnextSend->noConsoleMessage = noConsoleMessage;
pnextSend->length = 0;
return pnextSend->message; /* NB: msgQueueLock is still locked */
}
++pvtData.missedMessages;
epicsMutexUnlock(pvtData.msgQueueLock);
return 0;
}
static void msgbufSetSize(int size)
{
msgNode *pnextSend = pvtData.pnextSend;
pnextSend->length = size+1;
ellAdd(&pvtData.msgQueue, &pnextSend->node);
epicsMutexUnlock(pvtData.msgQueueLock);
epicsEventSignal(pvtData.waitForWork);
}
static char * msgbufGetSend(int *noConsoleMessage)
{
msgNode *pnextSend;
epicsMutexMustLock(pvtData.msgQueueLock);
pnextSend = (msgNode *)ellFirst(&pvtData.msgQueue);
epicsMutexUnlock(pvtData.msgQueueLock);
if (!pnextSend)
return 0;
*noConsoleMessage = pnextSend->noConsoleMessage;
return pnextSend->message;
}
static void msgbufFreeSend(void)
{
msgNode *pnextSend;
epicsMutexMustLock(pvtData.msgQueueLock);
pnextSend = (msgNode *)ellFirst(&pvtData.msgQueue);
if (!pnextSend) {
FILE *console = pvtData.console ? pvtData.console : stderr;
fprintf(console, "errlog: msgbufFreeSend logic error\n");
epicsThreadSuspendSelf();
}
ellDelete(&pvtData.msgQueue, &pnextSend->node);
epicsMutexUnlock(pvtData.msgQueueLock);
epicsMutexUnlock(pvt.msgQueueLock);
}

View File

@@ -370,7 +370,8 @@ MAIN(epicsErrlogTest)
testDiag("Logged %u messages", pvt.count);
epicsEventMustWait(pvt.done);
testEqInt(pvt.count, N+1);
/* Expect N+1 messages +- 1 depending on impl */
testOk(pvt.count >= N && pvt.count<=N+2, "Logged %u messages, expected %zu", pvt.count, N+1);
/* Clean up */
testOk(1 == errlogRemoveListeners(&logClient, &pvt),