diff --git a/TODO b/TODO index c0e6c4d..cc19617 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,3 @@ -redefine size encoding to be able to carry negative values (to handle negative offsets), i.e. -128 means take larger integer size (same pattern for 32-bit integer). This also bings symmetric integer range (-value ... value). - opt) connection validation message sends max paylaod size readSize checks if size is in limits of size_t? @@ -19,5 +17,3 @@ readSize checks if size is in limits of size_t? void transportUnresponsive() { not implemented (also in Java) -socket termination - usage of epicsSocketSystemCallInterruptMechanismQuery() API - diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index bd99b9a..f778f6e 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -73,7 +73,7 @@ namespace pvAccess { int retval = ::bind(_serverSocketChannel, &_bindAddress.sa, sizeof(sockaddr)); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Socket bind error: %s", strBuffer); + LOG(logLevelDebug, "Socket bind error: %s.", strBuffer); if(_bindAddress.ia.sin_port!=0) { // failed to bind to specified bind address, // try to get port dynamically, but only once @@ -163,21 +163,21 @@ namespace pvAccess { if(newClient!=INVALID_SOCKET) { // accept succeeded ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr)); - LOG(logLevelDebug, "Accepted connection from PVA client: %s", ipAddrStr); + LOG(logLevelDebug, "Accepted connection from PVA client: %s.", ipAddrStr); // enable TCP_NODELAY (disable Nagle's algorithm) int optval = 1; // true int retval = ::setsockopt(newClient, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(int)); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Error setting TCP_NODELAY: %s", strBuffer); + LOG(logLevelDebug, "Error setting TCP_NODELAY: %s.", strBuffer); } // enable TCP_KEEPALIVE retval = ::setsockopt(newClient, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(int)); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Error setting SO_KEEPALIVE: %s", strBuffer); + LOG(logLevelDebug, "Error setting SO_KEEPALIVE: %s.", strBuffer); } // TODO tune buffer sizes?! @@ -188,7 +188,7 @@ namespace pvAccess { retval = getsockopt(newClient, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); + LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer); } /** @@ -213,7 +213,7 @@ namespace pvAccess { return; } - LOG(logLevelDebug, "Serving to PVA client: %s", ipAddrStr); + LOG(logLevelDebug, "Serving to PVA client: %s.", ipAddrStr); }// accept succeeded else diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 2e12404..7552035 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -50,7 +50,7 @@ namespace epics { if (socket == INVALID_SOCKET) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelWarn, "Socket create error: %s", strBuffer); + LOG(logLevelWarn, "Socket create error: %s.", strBuffer); return INVALID_SOCKET; } else { @@ -60,7 +60,7 @@ namespace epics { else { epicsSocketDestroy (socket); epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Socket connect error: %s", strBuffer); + LOG(logLevelDebug, "Socket connect error: %s.", strBuffer); } } } @@ -82,7 +82,7 @@ namespace epics { Transport::shared_pointer transport = context->getTransportRegistry()->get("TCP", &address, priority); if(transport.get()) { LOG(logLevelDebug, - "Reusing existing connection to PVA server: %s", + "Reusing existing connection to PVA server: %s.", ipAddrStr); if (transport->acquire(client)) return transport; @@ -95,13 +95,13 @@ namespace epics { transport = context->getTransportRegistry()->get("TCP", &address, priority); if(transport.get()) { LOG(logLevelDebug, - "Reusing existing connection to PVA server: %s", + "Reusing existing connection to PVA server: %s.", ipAddrStr); if (transport->acquire(client)) return transport; } - LOG(logLevelDebug, "Connecting to PVA server: %s", ipAddrStr); + LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr); socket = tryConnect(address, 3); @@ -123,7 +123,7 @@ namespace epics { if(retval<0) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelWarn, "Error setting TCP_NODELAY: %s", errStr); + LOG(logLevelWarn, "Error setting TCP_NODELAY: %s.", errStr); } // enable TCP_KEEPALIVE @@ -133,7 +133,7 @@ namespace epics { { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelWarn, "Error setting SO_KEEPALIVE: %s", errStr); + LOG(logLevelWarn, "Error setting SO_KEEPALIVE: %s.", errStr); } // TODO tune buffer sizes?! Win32 defaults are 8k, which is OK @@ -147,7 +147,7 @@ namespace epics { if(retval<0) { char strBuffer[64]; epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); - LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); + LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer); } transport = detail::BlockingClientTCPTransportCodec::create( @@ -168,7 +168,7 @@ namespace epics { // TODO send security token - LOG(logLevelDebug, "Connected to PVA server: %s", ipAddrStr); + LOG(logLevelDebug, "Connected to PVA server: %s.", ipAddrStr); _namedLocker.releaseSynchronizationObject(&address); return transport; diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 1c46b8c..3d20eae 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -52,23 +52,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _threadId(0) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingUDPTransport); - - // set receive timeout so that we do not have problems at shutdown (recvfrom would block) - struct timeval timeout; - memset(&timeout, 0, sizeof(struct timeval)); - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) - { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, - "Failed to set SO_RCVTIMEO for UDP socket %s: %s.", - inetAddressToString(_bindAddress).c_str(), errStr); - } - - } BlockingUDPTransport::~BlockingUDPTransport() { @@ -84,9 +67,13 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so void BlockingUDPTransport::start() { - String threadName = "UDP-receive "+inetAddressToString(_bindAddress); - LOG(logLevelDebug, "Starting thread: %s",threadName.c_str()); - + String threadName = "UDP-receive " + inetAddressToString(_bindAddress); + + if (IS_LOGGABLE(logLevelTrace)) + { + LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str()); + } + _threadId = epicsThreadCreate(threadName.c_str(), epicsThreadPriorityMedium, epicsThreadGetStackSize(epicsThreadStackSmall), @@ -102,39 +89,44 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so Lock guard(_mutex); if(_closed.get()) return; _closed.set(); - + } + + if (IS_LOGGABLE(logLevelDebug)) + { LOG(logLevelDebug, "UDP socket %s closed.", inetAddressToString(_bindAddress).c_str()); - - epicsSocketSystemCallInterruptMechanismQueryInfo info = - epicsSocketSystemCallInterruptMechanismQuery (); - switch ( info ) { - case esscimqi_socketCloseRequired: - epicsSocketDestroy ( _channel ); - break; - case esscimqi_socketBothShutdownRequired: - { - int status = ::shutdown ( _channel, SHUT_RDWR ); - if ( status ) { - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - LOG(logLevelDebug, - "UDP socket %s failed to shutdown: %s.", - inetAddressToString(_bindAddress).c_str(), sockErrBuf); - } - epicsSocketDestroy ( _channel ); } - break; - case esscimqi_socketSigAlarmRequired: - // TODO (not supported anymore anyway) - default: - epicsSocketDestroy(_channel); - } -} - // TODO send yourself a packet + epicsSocketSystemCallInterruptMechanismQueryInfo info = + epicsSocketSystemCallInterruptMechanismQuery (); + switch ( info ) + { + case esscimqi_socketCloseRequired: + epicsSocketDestroy ( _channel ); + break; + case esscimqi_socketBothShutdownRequired: + { + /*int status =*/ ::shutdown ( _channel, SHUT_RDWR ); + /* + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + LOG(logLevelDebug, + "UDP socket %s failed to shutdown: %s.", + inetAddressToString(_bindAddress).c_str(), sockErrBuf); + } + */ + epicsSocketDestroy ( _channel ); + } + break; + case esscimqi_socketSigAlarmRequired: + // not supported anymore anyway + default: + epicsSocketDestroy(_channel); + } + // wait for send thread to exit cleanly if (waitForThreadToComplete) @@ -255,7 +247,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, "Socket recvfrom error: %s", errStr); + LOG(logLevelError, "Socket recvfrom error: %s.", errStr); } close(false); @@ -268,12 +260,11 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so close(false); } - String threadName = "UDP-receive "+inetAddressToString(_bindAddress); - /* - char threadName[40]; - epicsThreadGetName(_threadId, threadName, 40); - */ - LOG(logLevelDebug, "Thread '%s' exiting", threadName.c_str()); + if (IS_LOGGABLE(logLevelTrace)) + { + String threadName = "UDP-receive "+inetAddressToString(_bindAddress); + LOG(logLevelTrace, "Thread '%s' exiting.", threadName.c_str()); + } _shutdownEvent.signal(); } @@ -337,7 +328,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelDebug, "Socket sendto error: %s", errStr); + LOG(logLevelDebug, "Socket sendto error: %s.", errStr); return false; } @@ -358,7 +349,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelDebug, "Socket sendto error: %s", errStr); + LOG(logLevelDebug, "Socket sendto error: %s.", errStr); allOK = false; } } @@ -379,7 +370,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, "Socket getsockopt SO_RCVBUF error: %s", errStr); + LOG(logLevelError, "Socket getsockopt SO_RCVBUF error: %s.", errStr); } return (size_t)sockBufSize; diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index a96bebe..5cf50b0 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -131,7 +131,7 @@ namespace epics { if (magicCode != PVA_MAGIC) { LOG(logLevelError, - "Invalid header received from the client at %s:%d: %s," + "Invalid header received from the client at %s:%d: %s.," " disconnecting...", __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str()); invalidDataStreamHandler(); @@ -406,7 +406,7 @@ namespace epics { msg << "requested for buffer size " << size << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed."; LOG(logLevelWarn, - "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); + "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__); std::string s = msg.str(); throw std::invalid_argument(s); } @@ -691,7 +691,7 @@ namespace epics { size << ", but only " << _maxSendPayloadSize << " available."; std::string s = msg.str(); LOG(logLevelWarn, - "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); + "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__); throw std::invalid_argument(s); } @@ -890,7 +890,7 @@ namespace epics { std::ostringstream msg; msg << "an exception caught while processing a send message: " << e.what(); - LOG(logLevelWarn, "%s at %s:%d", + LOG(logLevelWarn, "%s at %s:%d.", msg.str().c_str(), __FILE__, __LINE__); try { @@ -1029,7 +1029,7 @@ namespace epics { __FILE__, __LINE__, e.what()); } catch (...) { LOG(logLevelWarn, - "unknown exception caught while in sendThread at %s:%d", + "unknown exception caught while in sendThread at %s:%d.", __FILE__, __LINE__); } } @@ -1063,7 +1063,7 @@ namespace epics { __FILE__, __LINE__, e.what()); } catch (...) { LOG(logLevelWarn, - "unknown exception caught while in sendThread at %s:%d", + "unknown exception caught while in sendThread at %s:%d.", __FILE__, __LINE__); } } @@ -1111,38 +1111,47 @@ namespace epics { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); LOG(logLevelError, - "Error fetching socket remote address: %s", + "Error fetching socket remote address: %s.", errStr); } - - // set receive timeout so that we do not have problems at - //shutdown (recvfrom would block) - struct timeval timeout; - memset(&timeout, 0, sizeof(struct timeval)); - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - // TODO remove this and implement use epicsSocketSystemCallInterruptMechanismQuery - if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, - (char*)&timeout, sizeof(timeout)) < 0)) - { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, - "Failed to set SO_RCVTIMEO for TDP socket %s: %s.", - inetAddressToString(_socketAddress).c_str(), errStr); - } - } // must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists) void BlockingSocketAbstractCodec::internalDestroy() { - if(_channel != INVALID_SOCKET) { - // TODO ::shutdown for some OS??!!! - epicsSocketDestroy(_channel); - _channel = INVALID_SOCKET; - } + if(_channel != INVALID_SOCKET) { + + epicsSocketSystemCallInterruptMechanismQueryInfo info = + epicsSocketSystemCallInterruptMechanismQuery (); + switch ( info ) + { + case esscimqi_socketCloseRequired: + epicsSocketDestroy ( _channel ); + break; + case esscimqi_socketBothShutdownRequired: + { + /*int status =*/ ::shutdown ( _channel, SHUT_RDWR ); + /* + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + LOG(logLevelDebug, + "TCP socket to %s failed to shutdown: %s.", + inetAddressToString(_socketAddress).c_str(), sockErrBuf); + } + */ + epicsSocketDestroy ( _channel ); + } + break; + case esscimqi_socketSigAlarmRequired: + // not supported anymore anyway + default: + epicsSocketDestroy(_channel); + } + + _channel = INVALID_SOCKET; + } } @@ -1250,6 +1259,16 @@ namespace epics { } + void BlockingTCPTransportCodec::internalClose(bool force) { + BlockingSocketAbstractCodec::internalClose(force); + if (IS_LOGGABLE(logLevelDebug)) + { + LOG(logLevelDebug, + "TCP socket to %s closed.", + inetAddressToString(_socketAddress).c_str()); + } + } + BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec( Context::shared_pointer const & context, SOCKET channel, @@ -1545,7 +1564,7 @@ namespace epics { { char ipAddrStr[48]; ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); - LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr); + LOG(logLevelDebug, "Releasing TCP transport to %s.", ipAddrStr); } _owners.erase(clientID); diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index ce4a5d3..a64406a 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -535,6 +535,8 @@ namespace epics { { } + virtual void internalClose(bool force); + Context::shared_pointer _context; IntrospectionRegistry _incomingIR; diff --git a/runTestServer b/runTestServer index 67f0e05..3a7c66a 100755 --- a/runTestServer +++ b/runTestServer @@ -16,4 +16,4 @@ fi # testServer echo "Starting pvAccess C++ test server..." -./bin/$EPICS_HOST_ARCH/testServer +./bin/$EPICS_HOST_ARCH/testServer $*