Merge remote-tracking branch 'zimoch/iocLogClientFixesTry2' into 7.0

* zimoch/iocLogClientFixesTry2: (22 commits)
  use EPICS_PRIVATE_API macro and fix bug with darwin/ios
  renamed epicsSocketCountUnsentBytes to epicsSocketUnsentCount and moved it to osi/os/
  fix bug from commit f85454. Apparently epicsExportSharedSymbols is needed even though epicsExport.h is included
  epicsSocketCountUnsentBytes returns -1 on failure
  bugfix: memmove'ed to much
  epicsSockCountUnsentBytes renamed to epicsSocketCountUnsentBytes
  moved logClientSendMessage and made it static
  fix wrong function name in comment
  sending 0 bytes helps to detect broken connections on some systems (but is undefined behavior on Linux, fails on vxWorks and is a documented no-op on Windows)
  cannot print sockets with %d in Windows, they are not small ints but maybe pointers.
  ask logClient socket how many bytes are still in the send queue and don't discard them in case the connection turns out broken.
  use dynamic debug flag for logClient
  increase error message buffer size for long (Windows) error messges
  improve logClientShow to show unsent bytes on level 2 (and fix level 1)
  removed unneeded include
  no need to delay startup only because log server is currently not available
  send pending log messages directly after connecting
  avoid needless memmove calls
  elimitate duplicate code in logClient
  do not discard unsent messages when log server has closed connection, instead try to send them after reconnect
  ...
This commit is contained in:
Michael Davidsaver
2019-10-24 10:22:07 -07:00
11 changed files with 238 additions and 153 deletions

View File

@@ -33,3 +33,6 @@ variable(callbackParallelThreadsDefault,int)
# Real-time operation
variable(dbThreadRealtimeLock,int)
# show logClient network activity
variable(logClientDebug,int)

View File

@@ -18,8 +18,10 @@
#define epicsExportSharedSymbols
#include "envDefs.h"
#include "errlog.h"
#include "logClient.h"
#include "iocLog.h"
#include "epicsExit.h"
int iocLogDisable = 0;
@@ -74,6 +76,24 @@ void epicsShareAPI epicsShareAPI iocLogFlush (void)
}
}
/*
* logClientSendMessage ()
*/
static void logClientSendMessage ( logClientId id, const char * message )
{
if ( !iocLogDisable ) {
logClientSend (id, message);
}
}
/*
* iocLogClientDestroy()
*/
static void iocLogClientDestroy (logClientId id)
{
errlogRemoveListeners (logClientSendMessage, id);
}
/*
* iocLogClientInit()
*/
@@ -89,6 +109,10 @@ static logClientId iocLogClientInit (void)
return NULL;
}
id = logClientCreate (addr, port);
if (id != NULL) {
errlogAddListener (logClientSendMessage, id);
epicsAtExit (iocLogClientDestroy, id);
}
return id;
}
@@ -135,3 +159,4 @@ logClientId epicsShareAPI logClientInit (void)
{
return iocLogClientInit ();
}

View File

@@ -21,11 +21,11 @@
#include <string.h>
#include <stdio.h>
#define EPICS_PRIVATE_API
#define epicsExportSharedSymbols
#include "dbDefs.h"
#include "epicsEvent.h"
#include "iocLog.h"
#include "errlog.h"
#include "epicsMutex.h"
#include "epicsThread.h"
#include "epicsTime.h"
@@ -33,9 +33,13 @@
#include "epicsAssert.h"
#include "epicsExit.h"
#include "epicsSignal.h"
#include "epicsExport.h"
#include "logClient.h"
int logClientDebug = 0;
epicsExportAddress (int, logClientDebug);
typedef struct {
char msgBuf[0x4000];
struct sockaddr_in addr;
@@ -44,8 +48,10 @@ typedef struct {
SOCKET sock;
epicsThreadId restartThreadId;
epicsEventId stateChangeNotify;
epicsEventId shutdownNotify;
unsigned connectCount;
unsigned nextMsgIndex;
unsigned backlog;
unsigned connected;
unsigned shutdown;
unsigned shutdownConfirm;
@@ -53,7 +59,6 @@ typedef struct {
} logClient;
static const double LOG_RESTART_DELAY = 5.0; /* sec */
static const double LOG_SERVER_CREATE_CONNECT_SYNC_TIMEOUT = 5.0; /* sec */
static const double LOG_SERVER_SHUTDOWN_TIMEOUT = 30.0; /* sec */
/*
@@ -66,10 +71,10 @@ static char* logClientPrefix = NULL;
*/
static void logClientClose ( logClient *pClient )
{
# ifdef DEBUG
if (logClientDebug) {
fprintf (stderr, "log client: lingering for connection close...");
fflush (stderr);
# endif
}
/*
* mutex on
@@ -84,8 +89,6 @@ static void logClientClose ( logClient *pClient )
pClient->sock = INVALID_SOCKET;
}
pClient->nextMsgIndex = 0u;
memset ( pClient->msgBuf, '\0', sizeof ( pClient->msgBuf ) );
pClient->connected = 0u;
/*
@@ -93,9 +96,8 @@ static void logClientClose ( logClient *pClient )
*/
epicsMutexUnlock (pClient->mutex);
# ifdef DEBUG
if (logClientDebug)
fprintf (stderr, "done\n");
# endif
}
/*
@@ -113,6 +115,7 @@ static void logClientDestroy (logClientId id)
epicsMutexMustLock ( pClient->mutex );
pClient->shutdown = 1u;
epicsMutexUnlock ( pClient->mutex );
epicsEventSignal ( pClient->shutdownNotify );
/* unblock log client thread blocking in send() or connect() */
interruptInfo =
@@ -154,13 +157,11 @@ static void logClientDestroy (logClientId id)
return;
}
errlogRemoveListeners ( logClientSendMessage, (void *) pClient );
logClientClose ( pClient );
epicsMutexDestroy ( pClient->mutex );
epicsEventDestroy ( pClient->stateChangeNotify );
epicsEventDestroy ( pClient->shutdownNotify );
free ( pClient );
}
@@ -176,61 +177,26 @@ static void sendMessageChunk(logClient * pClient, const char * message) {
unsigned msgBufBytesLeft =
sizeof ( pClient->msgBuf ) - pClient->nextMsgIndex;
if ( strSize > msgBufBytesLeft ) {
int status;
if ( ! pClient->connected ) {
break;
}
if ( msgBufBytesLeft > 0u ) {
memcpy ( & pClient->msgBuf[pClient->nextMsgIndex],
message, msgBufBytesLeft );
pClient->nextMsgIndex += msgBufBytesLeft;
strSize -= msgBufBytesLeft;
message += msgBufBytesLeft;
}
status = send ( pClient->sock, pClient->msgBuf,
pClient->nextMsgIndex, 0 );
if ( status > 0 ) {
unsigned nSent = (unsigned) status;
if ( nSent < pClient->nextMsgIndex ) {
unsigned newNextMsgIndex = pClient->nextMsgIndex - nSent;
memmove ( pClient->msgBuf, & pClient->msgBuf[nSent],
newNextMsgIndex );
pClient->nextMsgIndex = newNextMsgIndex;
}
else {
pClient->nextMsgIndex = 0u;
}
}
else {
if ( ! pClient->shutdown ) {
char sockErrBuf[64];
if ( status ) {
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
}
else {
strcpy ( sockErrBuf, "server initiated disconnect" );
}
fprintf ( stderr, "log client: lost contact with log server at \"%s\" because \"%s\"\n",
pClient->name, sockErrBuf );
}
logClientClose ( pClient );
break;
}
if ( msgBufBytesLeft < strSize && pClient->nextMsgIndex != 0u && pClient->connected)
{
/* buffer is full, thus flush it */
logClientFlush ( pClient );
msgBufBytesLeft = sizeof ( pClient->msgBuf ) - pClient->nextMsgIndex;
}
else {
memcpy ( & pClient->msgBuf[pClient->nextMsgIndex],
message, strSize );
pClient->nextMsgIndex += strSize;
if ( msgBufBytesLeft == 0u ) {
fprintf ( stderr, "log client: messages to \"%s\" are lost\n",
pClient->name );
break;
}
if ( msgBufBytesLeft > strSize) msgBufBytesLeft = strSize;
memcpy ( & pClient->msgBuf[pClient->nextMsgIndex],
message, msgBufBytesLeft );
pClient->nextMsgIndex += msgBufBytesLeft;
strSize -= msgBufBytesLeft;
message += msgBufBytesLeft;
}
}
/*
* logClientSend ()
*/
@@ -255,43 +221,54 @@ void epicsShareAPI logClientSend ( logClientId id, const char * message )
void epicsShareAPI logClientFlush ( logClientId id )
{
unsigned nSent;
int status = 0;
logClient * pClient = ( logClient * ) id;
if ( ! pClient ) {
if ( ! pClient || ! pClient->connected ) {
return;
}
epicsMutexMustLock ( pClient->mutex );
while ( pClient->nextMsgIndex && pClient->connected ) {
int status = send ( pClient->sock, pClient->msgBuf,
pClient->nextMsgIndex, 0 );
if ( status > 0 ) {
unsigned nSent = (unsigned) status;
if ( nSent < pClient->nextMsgIndex ) {
unsigned newNextMsgIndex = pClient->nextMsgIndex - nSent;
memmove ( pClient->msgBuf, & pClient->msgBuf[nSent],
newNextMsgIndex );
pClient->nextMsgIndex = newNextMsgIndex;
}
else {
pClient->nextMsgIndex = 0u;
}
nSent = pClient->backlog;
while ( nSent < pClient->nextMsgIndex && pClient->connected ) {
status = send ( pClient->sock, pClient->msgBuf + nSent,
pClient->nextMsgIndex - nSent, 0 );
if ( status < 0 ) break;
nSent += status;
}
if ( pClient->backlog > 0 && status >= 0 )
{
/* On Linux send 0 bytes can detect EPIPE */
/* NOOP on Windows, fails on vxWorks */
errno = 0;
status = send ( pClient->sock, NULL, 0, 0 );
if (!(errno == ECONNRESET || errno == EPIPE)) status = 0;
}
if ( status < 0 ) {
if ( ! pClient->shutdown ) {
char sockErrBuf[128];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
fprintf ( stderr, "log client: lost contact with log server at \"%s\" because \"%s\"\n",
pClient->name, sockErrBuf );
}
else {
if ( ! pClient->shutdown ) {
char sockErrBuf[64];
if ( status ) {
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
}
else {
strcpy ( sockErrBuf, "server initiated disconnect" );
}
fprintf ( stderr, "log client: lost contact with log server at \"%s\" because \"%s\"\n",
pClient->name, sockErrBuf );
}
logClientClose ( pClient );
break;
pClient->backlog = 0;
logClientClose ( pClient );
}
else if ( nSent > 0 && pClient->nextMsgIndex > 0 ) {
int backlog = epicsSocketUnsentCount ( pClient->sock );
if (backlog >= 0) {
pClient->backlog = backlog;
nSent -= backlog;
}
pClient->nextMsgIndex -= nSent;
if ( nSent > 0 && pClient->nextMsgIndex > 0 ) {
memmove ( pClient->msgBuf, & pClient->msgBuf[nSent],
pClient->nextMsgIndex );
}
}
epicsMutexUnlock ( pClient->mutex );
@@ -302,10 +279,10 @@ void epicsShareAPI logClientFlush ( logClientId id )
*/
static void logClientMakeSock (logClient *pClient)
{
# ifdef DEBUG
if (logClientDebug) {
fprintf (stderr, "log client: creating socket...");
# endif
fflush (stderr);
}
epicsMutexMustLock (pClient->mutex);
@@ -314,7 +291,7 @@ static void logClientMakeSock (logClient *pClient)
*/
pClient->sock = epicsSocketCreate ( AF_INET, SOCK_STREAM, 0 );
if ( pClient->sock == INVALID_SOCKET ) {
char sockErrBuf[64];
char sockErrBuf[128];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf ( stderr, "log client: no socket error %s\n",
@@ -323,10 +300,8 @@ static void logClientMakeSock (logClient *pClient)
epicsMutexUnlock (pClient->mutex);
# ifdef DEBUG
if (logClientDebug)
fprintf (stderr, "done\n");
# endif
}
/*
@@ -366,7 +341,7 @@ static void logClientConnect (logClient *pClient)
}
else {
if ( pClient->connFailStatus != errnoCpy && ! pClient->shutdown ) {
char sockErrBuf[64];
char sockErrBuf[128];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf (stderr,
@@ -392,7 +367,7 @@ static void logClientConnect (logClient *pClient)
optval = TRUE;
status = setsockopt (pClient->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(optval));
if (status<0) {
char sockErrBuf[64];
char sockErrBuf[128];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf (stderr, "log client: unable to enable keepalive option because \"%s\"\n", sockErrBuf);
@@ -404,11 +379,11 @@ static void logClientConnect (logClient *pClient)
*/
status = shutdown (pClient->sock, SHUT_RD);
if (status < 0) {
char sockErrBuf[64];
char sockErrBuf[128];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf (stderr, "%s:%d shutdown(%d,SHUT_RD) error was \"%s\"\n",
__FILE__, __LINE__, pClient->sock, sockErrBuf);
fprintf (stderr, "%s:%d shutdown(sock,SHUT_RD) error was \"%s\"\n",
__FILE__, __LINE__, sockErrBuf);
/* not fatal (although it shouldn't happen) */
}
@@ -425,7 +400,7 @@ static void logClientConnect (logClient *pClient)
lingerval.l_linger = 60*5;
status = setsockopt (pClient->sock, SOL_SOCKET, SO_LINGER, (char *) &lingerval, sizeof(lingerval));
if (status<0) {
char sockErrBuf[64];
char sockErrBuf[128];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf (stderr, "log client: unable to set linger options because \"%s\"\n", sockErrBuf);
@@ -457,14 +432,10 @@ static void logClientRestart ( logClientId id )
epicsMutexUnlock ( pClient->mutex );
if ( isConn ) {
logClientFlush ( pClient );
}
else {
logClientConnect ( pClient );
}
epicsThreadSleep ( LOG_RESTART_DELAY );
if ( ! isConn ) logClientConnect ( pClient );
logClientFlush ( pClient );
epicsEventWaitWithTimeout ( pClient->shutdownNotify, LOG_RESTART_DELAY);
epicsMutexMustLock ( pClient->mutex );
}
@@ -480,9 +451,7 @@ static void logClientRestart ( logClientId id )
logClientId epicsShareAPI logClientCreate (
struct in_addr server_addr, unsigned short server_port)
{
epicsTimeStamp begin, current;
logClient *pClient;
double diff;
pClient = calloc (1, sizeof (*pClient));
if (pClient==NULL) {
@@ -507,14 +476,22 @@ logClientId epicsShareAPI logClientCreate (
pClient->shutdownConfirm = 0;
epicsAtExit (logClientDestroy, (void*) pClient);
pClient->stateChangeNotify = epicsEventCreate (epicsEventEmpty);
if ( ! pClient->stateChangeNotify ) {
epicsMutexDestroy ( pClient->mutex );
free ( pClient );
return NULL;
}
pClient->shutdownNotify = epicsEventCreate (epicsEventEmpty);
if ( ! pClient->shutdownNotify ) {
epicsMutexDestroy ( pClient->mutex );
epicsEventDestroy ( pClient->stateChangeNotify );
free ( pClient );
return NULL;
}
pClient->restartThreadId = epicsThreadCreate (
"logRestart", epicsThreadPriorityLow,
epicsThreadGetStackSize(epicsThreadStackSmall),
@@ -522,35 +499,12 @@ logClientId epicsShareAPI logClientCreate (
if ( pClient->restartThreadId == NULL ) {
epicsMutexDestroy ( pClient->mutex );
epicsEventDestroy ( pClient->stateChangeNotify );
epicsEventDestroy ( pClient->shutdownNotify );
free (pClient);
fprintf(stderr, "log client: unable to start log client connection watch dog thread\n");
return NULL;
}
/*
* attempt to synchronize with circuit connect
*/
epicsTimeGetCurrent ( & begin );
epicsMutexMustLock ( pClient->mutex );
do {
epicsMutexUnlock ( pClient->mutex );
epicsEventWaitWithTimeout (
pClient->stateChangeNotify,
LOG_SERVER_CREATE_CONNECT_SYNC_TIMEOUT / 10.0 );
epicsTimeGetCurrent ( & current );
diff = epicsTimeDiffInSeconds ( & current, & begin );
epicsMutexMustLock ( pClient->mutex );
}
while ( ! pClient->connected && diff < LOG_SERVER_CREATE_CONNECT_SYNC_TIMEOUT );
epicsMutexUnlock ( pClient->mutex );
if ( ! pClient->connected ) {
fprintf (stderr, "log client create: timed out synchronizing with circuit connect to \"%s\" after %.1f seconds\n",
pClient->name, LOG_SERVER_CREATE_CONNECT_SYNC_TIMEOUT );
}
errlogAddListener ( logClientSendMessage, (void *) pClient );
return (void *) pClient;
}
@@ -568,24 +522,21 @@ void epicsShareAPI logClientShow (logClientId id, unsigned level)
printf ("log client: disconnected from log server at \"%s\"\n", pClient->name);
}
if (level>1) {
printf ("log client: sock=%s, connect cycles = %u\n",
pClient->sock==INVALID_SOCKET?"INVALID":"OK",
pClient->connectCount);
}
if (logClientPrefix) {
printf ("log client: prefix is \"%s\"\n", logClientPrefix);
}
}
/*
* logClientSendMessage (); deprecated
*/
void logClientSendMessage ( logClientId id, const char * message )
{
if ( !iocLogDisable ) {
logClientSend (id, message);
if (level>0) {
printf ("log client: sock %s, connect cycles = %u\n",
pClient->sock==INVALID_SOCKET?"INVALID":"OK",
pClient->connectCount);
}
if (level>1) {
printf ("log client: %u bytes in buffer\n", pClient->nextMsgIndex);
if (pClient->nextMsgIndex)
printf("-------------------------\n"
"%.*s-------------------------\n",
(int)(pClient->nextMsgIndex), pClient->msgBuf);
}
}

View File

@@ -38,7 +38,6 @@ epicsShareFunc void epicsShareAPI iocLogPrefix(const char* prefix);
/* deprecated interface; retained for backward compatibility */
/* note: implementations are in iocLog.c, not logClient.c */
epicsShareFunc logClientId epicsShareAPI logClientInit (void);
epicsShareFunc void logClientSendMessage (logClientId id, const char *message);
#ifdef __cplusplus
}

View File

@@ -86,6 +86,7 @@ endif
Com_SRCS += osdSock.c
Com_SRCS += osdSockAddrReuse.cpp
Com_SRCS += osdSockUnsentCount.c
Com_SRCS += osiSock.c
Com_SRCS += systemCallIntMech.cpp
Com_SRCS += epicsSocketConvertErrnoToString.cpp

View File

@@ -0,0 +1,19 @@
/*************************************************************************\
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#define EPICS_PRIVATE_API
#include "osiSock.h"
/*
* epicsSocketUnsentCount ()
* See https://www.unix.com/man-page/osx/2/setsockopt
*/
int epicsSocketUnsentCount(SOCKET sock) {
int unsent;
socklen_t len = sizeof(unsent);
if (getsockopt(sock, SOL_SOCKET, SO_NWRITE, &unsent, &len) == 0)
return unsent;
return -1;
}

View File

@@ -0,0 +1,19 @@
/*************************************************************************\
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#include <linux/sockios.h>
#define EPICS_PRIVATE_API
#include "osiSock.h"
/*
* epicsSocketUnsentCount ()
* See https://linux.die.net/man/7/tcp
*/
int epicsSocketUnsentCount(SOCKET sock) {
int unsent;
if (ioctl(sock, SIOCOUTQ, &unsent) == 0)
return unsent;
return -1;
}

View File

@@ -0,0 +1,26 @@
/*************************************************************************\
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#define epicsExportSharedSymbols
#define EPICS_PRIVATE_API
#include "osiSock.h"
#include <mstcpip.h>
/*
* epicsSocketUnsentCount ()
* See https://docs.microsoft.com/en-us/windows/win32/api/mstcpip/ns-mstcpip-tcp_info_v0
*/
int epicsSocketUnsentCount(SOCKET sock) {
#if defined (_WIN32) && WINVER >= _WIN32_WINNT_WIN10
/* Windows 10 Version 1703 / Server 2016 */
DWORD infoVersion = 0, bytesReturned;
TCP_INFO_v0 tcpInfo;
int status;
if ((status = WSAIoctl(sock, SIO_TCP_INFO, &infoVersion, sizeof(infoVersion),
&tcpInfo, sizeof(tcpInfo), &bytesReturned, NULL, NULL)) == 0)
return tcpInfo.BytesInFlight;
#endif
return -1;
}

View File

@@ -0,0 +1,15 @@
/*************************************************************************\
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#define EPICS_PRIVATE_API
#include "osiSock.h"
/*
* epicsSocketUnsentCount ()
*/
int epicsSocketUnsentCount(SOCKET sock) {
/* not implemented */
return -1;
}

View File

@@ -0,0 +1,19 @@
/*************************************************************************\
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#define EPICS_PRIVATE_API
#include "osiSock.h"
/*
* epicsSocketUnsentCount ()
* See https://www.unix.com/man-page/osx/2/setsockopt
*/
int epicsSocketUnsentCount(SOCKET sock) {
int unsent;
socklen_t len = sizeof(unsent);
if (getsockopt(sock, SOL_SOCKET, SO_NWRITE, &unsent, &len) == 0)
return unsent;
return -1;
}

View File

@@ -52,6 +52,14 @@ enum epicsSocketSystemCallInterruptMechanismQueryInfo {
epicsShareFunc enum epicsSocketSystemCallInterruptMechanismQueryInfo
epicsSocketSystemCallInterruptMechanismQuery ();
#ifdef EPICS_PRIVATE_API
/*
* Some systems (e.g Linux and Windows 10) allow to check the amount
* of unsent data in the output queue.
* Returns -1 if the information is not available.
*/
epicsShareFunc int epicsSocketUnsentCount(SOCKET sock);
#endif
/*
* convert socket address to ASCII in this order