From d03ac9d00fbce24a709878b70ab21b013e69eaf5 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 20 Sep 2011 17:37:28 +0200 Subject: [PATCH] TCP read fixed, some unactive alignment code added --- pvAccessApp/ca/caConstants.h | 3 + pvAccessApp/remote/blockingTCP.h | 6 +- pvAccessApp/remote/blockingTCPTransport.cpp | 185 ++++++++++++-------- pvAccessApp/remote/blockingUDP.h | 8 + pvAccessApp/remote/blockingUDPTransport.cpp | 31 ++-- pvAccessApp/remote/channelSearchManager.h | 1 + testApp/remote/testServer.cpp | 2 +- testApp/utils/introspectionRegistryTest.cpp | 6 + testApp/utils/transportRegistryTest.cpp | 1 + 9 files changed, 157 insertions(+), 86 deletions(-) diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index 5ab07ef..a9759b0 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -75,6 +75,9 @@ namespace epics { /** Invalid IOID. */ const int32 INVALID_IOID = 0; + /** All messages must be aligned to 8-bytes (64-bit). */ + const int32 CA_ALIGNMENT = 8; + /** Default CA provider name. */ const String PVACCESS_DEFAULT_PROVIDER = "local"; } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 2b2fe1a..3bb8e48 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -141,8 +141,12 @@ namespace epics { virtual void ensureBuffer(int size); + virtual void alignBuffer(int alignment); + virtual void ensureData(int size); + virtual void alignData(int alignment); + virtual void close(bool force); SendQueueFlushStrategy getSendQueueFlushStrategy() { @@ -172,7 +176,7 @@ namespace epics { protected: virtual void processReadCached(bool nestedCall, - ReceiveStage inStage, int requiredBytes, bool addToBuffer); + ReceiveStage inStage, int requiredBytes); /** * Called to any resources just before closing transport diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 8e2d736..d9f3d46 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -23,11 +23,14 @@ #include #include +#include + /* standard */ #include #include #include + #ifdef _WIN32 #include typedef SSIZE_T ssize_t; @@ -39,6 +42,15 @@ using std::max; using std::min; using std::ostringstream; +// TODO moved to some compiler_utils.h? +#if defined(__GNUC__) + #define likely(x) __builtin_expect (x, 1) + #define unlikely(x) __builtin_expect (x, 0) +#else + #define likely(x) (x) + #define unlikely(x) (x) +#endif + namespace epics { namespace pvAccess { @@ -136,7 +148,7 @@ namespace epics { osiSocklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen); - if(retval<0) { + if(unlikely(retval<0)) { _socketSendBufferSize = MAX_TCP_RECV; char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -147,7 +159,7 @@ namespace epics { osiSocklen_t saSize = sizeof(sockaddr); retval = getpeername(_channel, &(_socketAddress.sa), &saSize); - if(retval<0) { + if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); LOG(logLevelError, @@ -161,7 +173,7 @@ namespace epics { timeout.tv_sec = 1; timeout.tv_usec = 0; - if (::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0) + if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -285,7 +297,7 @@ namespace epics { osiSocklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen); - if(retval<0) + if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -319,9 +331,10 @@ namespace epics { } _lastMessageStartPosition = _sendBuffer->getPosition(); + // start with last header - if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage( - _lastSegmentedMessageCommand, 0); + if (unlikely(!lastMessageCompleted && _lastSegmentedMessageType!=0)) + startMessage(_lastSegmentedMessageCommand, 0); } void BlockingTCPTransport::startMessage(int8 command, int ensureCapacity) { @@ -341,10 +354,10 @@ namespace epics { } void BlockingTCPTransport::ensureBuffer(int size) { - if((int)(_sendBuffer->getRemaining())>=size) return; + if(likely((int)(_sendBuffer->getRemaining())>=size)) return; // too large for buffer... - if(_maxPayloadSizegetRemaining())getRemaining())getRemaining())<(alignment-1))) + ensureBuffer(alignment-1); + _sendBuffer->align(alignment); + } + void BlockingTCPTransport::endMessage(bool hasMoreSegments) { - if(_lastMessageStartPosition>=0) { - - // TODO align? - // set message size + if(likely(_lastMessageStartPosition>=0)) { + + // align + // alignBuffer(CA_ALIGNMENT); + + // set paylaod size _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, _sendBuffer->getPosition()-_lastMessageStartPosition -CA_MESSAGE_HEADER_SIZE); int flagsPosition = _lastMessageStartPosition+sizeof(int16); // set segmented bit - if(hasMoreSegments) { + if(likely(hasMoreSegments)) { // first segment - if(_lastSegmentedMessageType==0) { + if(unlikely(_lastSegmentedMessageType==0)) { int8 type = _sendBuffer->getByte(flagsPosition); // set first segment bit @@ -386,7 +410,7 @@ namespace epics { } else { // last segment - if(_lastSegmentedMessageType!=0) { + if(unlikely(_lastSegmentedMessageType!=0)) { // set last segment bit (by clearing first segment bit) _sendBuffer->putByte(flagsPosition, (int8)(_lastSegmentedMessageType&0xEF)); @@ -398,14 +422,13 @@ namespace epics { int position = _sendBuffer->getPosition(); int bytesLeft = _sendBuffer->getRemaining(); - if(position>=_nextMarkerPosition && - bytesLeft>=CA_MESSAGE_HEADER_SIZE) { + if(unlikely(position>=_nextMarkerPosition && + bytesLeft>=CA_MESSAGE_HEADER_SIZE)) { _sendBuffer->putByte(CA_MAGIC); _sendBuffer->putByte(CA_VERSION); _sendBuffer->putByte(1); // control data _sendBuffer->putByte(0); // marker - _sendBuffer->putInt((int)(_totalBytesSent+position - +CA_MESSAGE_HEADER_SIZE)); + _sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE)); _nextMarkerPosition = position+_markerPeriodBytes; } } @@ -413,13 +436,13 @@ namespace epics { void BlockingTCPTransport::ensureData(int size) { // enough of data? - if(((int)_socketBuffer->getRemaining())>=size) return; + if(likely(((int)_socketBuffer->getRemaining())>=size)) return; // too large for buffer... - if(_maxPayloadSizegetPosition()-_storedPosition; // no more data and we have some payload left => read buffer - if(_storedPayloadSize>=size) { + if(likely(_storedPayloadSize>=size)) { //LOG(logLevelInfo, // "storedPayloadSize >= size, remaining: %d", // _socketBuffer->getRemaining()); - // just read up remaining payload - // since there is no data on the buffer, read to the beginning of it, at least size bytes - processReadCached(true, PROCESS_PAYLOAD, size, false); + // just read up remaining payload, move current (getPosition(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, @@ -450,7 +473,7 @@ namespace epics { _socketBuffer->setLimit(_storedLimit); _stage = PROCESS_HEADER; - processReadCached(true, NONE, size, false); + processReadCached(true, NONE, size-remainingBytes); // copy before position for(int i = remainingBytes-1, j = _socketBuffer->getPosition() @@ -467,39 +490,46 @@ namespace epics { // TODO sync _closed // add if missing... - if(!_closed&&((int)_socketBuffer->getRemaining())getRemaining())getRemaining())<(alignment-1))) + ensureData(alignment-1); + + _socketBuffer->align(alignment); } void BlockingTCPTransport::processReadCached(bool nestedCall, - ReceiveStage inStage, int requiredBytes, bool addToBuffer) { + ReceiveStage inStage, int requiredBytes) { try { // TODO sync _closed - while(!_closed) { + while(likely(!_closed)) { if(_stage==READ_FROM_SOCKET||inStage!=NONE) { - int currentStartPosition; - if(addToBuffer) { - currentStartPosition = _socketBuffer->getPosition(); - _socketBuffer->setPosition(_socketBuffer->getLimit()); - _socketBuffer->setLimit(_socketBuffer->getSize()); - } - else { - // add to bytes read - _totalBytesReceived += (_socketBuffer->getPosition() -_startPosition); - // copy remaining bytes, if any - int remainingBytes = _socketBuffer->getRemaining(); - int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes; - for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; iputByte(i, _socketBuffer->getByte()); + // add to bytes read + int currentPosition = _socketBuffer->getPosition(); + _totalBytesReceived += (currentPosition - _startPosition); - currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE; - _socketBuffer->setPosition(MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes); - _socketBuffer->setLimit(_socketBuffer->getSize()); - } + // preserve alignment + int currentStartPosition = _startPosition = + MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % CA_ALIGNMENT; + + // copy remaining bytes, if any + int remainingBytes = _socketBuffer->getRemaining(); + int endPosition = currentStartPosition + remainingBytes; + // TODO memmove + for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; iputByte(i, _socketBuffer->getByte()); + + _socketBuffer->setPosition(endPosition); + _socketBuffer->setLimit(_socketBuffer->getSize()); // read at least requiredBytes bytes @@ -509,9 +539,8 @@ namespace epics { int pos = _socketBuffer->getPosition(); ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos), _socketBuffer->getRemaining(), 0); - _socketBuffer->setPosition(pos+bytesRead); - if(bytesRead<=0) { + if(unlikely(bytesRead<=0)) { if (bytesRead<0) { @@ -532,10 +561,18 @@ namespace epics { return; } + + _socketBuffer->setPosition(pos+bytesRead); } _socketBuffer->setLimit(_socketBuffer->getPosition()); _socketBuffer->setPosition(currentStartPosition); + /* + hexDump("\n\n\n", "READ", + (const int8*)_socketBuffer->getArray(), + _socketBuffer->getPosition(), _socketBuffer->getRemaining()); + */ + // notify liveness aliveNotification(); @@ -545,16 +582,16 @@ namespace epics { _stage = PROCESS_HEADER; } - if(_stage==PROCESS_HEADER) { + if(likely(_stage==PROCESS_HEADER)) { // ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data - if(((int)_socketBuffer->getRemaining())getRemaining())getByte(); _version = _socketBuffer->getByte(); - if((magic != CA_MAGIC) || (((unsigned int8)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION) + if(unlikely((magic != CA_MAGIC) || (((unsigned int8)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION)) { // error... disconnect LOG( @@ -575,12 +612,12 @@ namespace epics { _payloadSize = _socketBuffer->getInt(); int8 type = (int8)(_packetType&0x0F); - if(type==0) + if(likely(type==0)) { // data _stage = PROCESS_PAYLOAD; } - else if(type==1) + else if(unlikely(type==1)) { // control @@ -633,7 +670,7 @@ namespace epics { } } - if(_stage==PROCESS_PAYLOAD) { + if(likely(_stage==PROCESS_PAYLOAD)) { // read header // last segment bit set (means in-between segment or last segment) @@ -660,10 +697,10 @@ namespace epics { _socketBuffer->setLimit(_storedLimit); int newPosition = _storedPosition+_storedPayloadSize; - if(newPosition>_storedLimit) { + if(unlikely(newPosition>_storedLimit)) { newPosition -= _storedLimit; _socketBuffer->setPosition(_storedLimit); - processReadCached(true, PROCESS_PAYLOAD,newPosition, false); + processReadCached(true, PROCESS_PAYLOAD,newPosition); newPosition += _startPosition; } _socketBuffer->setPosition(newPosition); @@ -685,7 +722,7 @@ namespace epics { bool BlockingTCPTransport::flush() { // request issues, has not sent anything yet (per partes) - if(!_sendPending) { + if(likely(!_sendPending)) { _sendPending = true; // start sending from the start @@ -714,7 +751,7 @@ namespace epics { success = send(_sendBuffer); // all sent? - if(success) + if(likely(success)) clearAndReleaseBuffer(); else { // remember position @@ -765,7 +802,7 @@ namespace epics { &buffer->getArray()[buffer->getPosition()], buffer->getRemaining(), 0); - if(bytesSent<0) { + if(unlikely(bytesSent<0)) { int socketError = SOCKERRNO; @@ -791,7 +828,7 @@ namespace epics { //LOG(logLevelError, "%s", temp.str().c_str()); THROW_BASE_EXCEPTION(temp.str().c_str()); } - else if(bytesSent==0) { + else if(unlikely(bytesSent==0)) { // TODO WINSOCK indicates disconnect by returning zero here !!! @@ -839,12 +876,12 @@ namespace epics { void BlockingTCPTransport::processSendQueue() { // TODO sync _closed - while(!_closed) { + while(unlikely(!_closed)) { _sendQueueMutex.lock(); // TODO optimize TransportSender::shared_pointer sender; - if (!_sendQueue.empty()) + if (likely(!_sendQueue.empty())) { sender = _sendQueue.front(); _sendQueue.pop_front(); @@ -852,10 +889,10 @@ namespace epics { _sendQueueMutex.unlock(); // wait for new message - while(sender.get()==0&&!_flushRequested&&!_closed) { + while(likely(sender.get()==0&&!_flushRequested&&!_closed)) { if(_flushStrategy==DELAYED) { if(_delay>0) epicsThreadSleep(_delay); - if(_sendQueue.empty()) { + if(unlikely(_sendQueue.empty())) { // if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE) if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE) _flushRequested = true; @@ -867,7 +904,7 @@ namespace epics { _sendQueueEvent.wait(); _sendQueueMutex.lock(); - if (!_sendQueue.empty()) + if (likely(!_sendQueue.empty())) { sender = _sendQueue.front(); _sendQueue.pop_front(); @@ -878,7 +915,7 @@ namespace epics { } // always do flush from this thread - if(_flushRequested) { + if(unlikely(_flushRequested)) { /* if (hasMonitors) { @@ -889,7 +926,7 @@ namespace epics { flush(); } - if(sender.get()) { + if(likely(sender.get() != 0)) { sender->lock(); try { _lastMessageStartPosition = _sendBuffer->getPosition(); @@ -932,7 +969,7 @@ namespace epics { Transport::shared_pointer ptr = obj->shared_from_this(); // hold reference try{ - obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); + obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE); } catch (...) { printf("rcvThreadRunnner exception\n"); } diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index eb19611..ac9a58f 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -117,6 +117,10 @@ namespace epics { // noop } + virtual void alignData(int alignment) { + _receiveBuffer->align(alignment); + } + virtual void startMessage(int8 command, int ensureCapacity); virtual void endMessage(); @@ -137,6 +141,10 @@ namespace epics { // noop } + virtual void alignBuffer(int alignment) { + _sendBuffer->align(alignment); + } + /** * Set ignore list. * @param addresses list of ignored addresses. diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 75955cc..583fcfa 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -28,6 +28,15 @@ using namespace epics::pvData; using namespace std; +// TODO moved to some compiler_utils.h? +#if defined(__GNUC__) + #define likely(x) __builtin_expect (x, 1) + #define unlikely(x) __builtin_expect (x, 0) +#else + #define likely(x) (x) + #define unlikely(x) (x) +#endif + namespace epics { namespace pvAccess { @@ -57,7 +66,7 @@ namespace epics { timeout.tv_sec = 1; timeout.tv_usec = 0; - if (::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0) + if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -152,6 +161,8 @@ namespace epics { } void BlockingUDPTransport::endMessage() { + //we always (for now) send by packet, so no need for this here... + //alignBuffer(CA_ALIGNMENT); _sendBuffer->putInt( _lastMessageStartPosition+(sizeof(int16)+2), _sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE); @@ -167,7 +178,7 @@ namespace epics { try { bool closed; - while(!_closed) + while(likely(!_closed)) { _mutex.lock(); @@ -187,7 +198,7 @@ namespace epics { _receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress, &addrStructSize); - if(bytesRead>0) { + if(likely(bytesRead>0)) { // successfully got datagram bool ignore = false; if(_ignoredAddresses!=0) @@ -210,7 +221,7 @@ namespace epics { processBuffer(thisTransport, fromAddress, _receiveBuffer); } } - else if (bytesRead == -1) { + else if (unlikely(bytesRead == -1)) { int socketError = SOCKERRNO; @@ -254,7 +265,7 @@ namespace epics { bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & thisTransport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { // handle response(s) - while((int)receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) { + while(likely((int)receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE)) { // // read header // @@ -263,7 +274,7 @@ namespace epics { // second byte version - major/minor nibble int8 magic = receiveBuffer->getByte(); int8 version = receiveBuffer->getByte(); - if((magic != CA_MAGIC) || (((unsigned int8)version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION) + if(unlikely((magic != CA_MAGIC) || (((unsigned int8)version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION)) return false; // only data for UDP @@ -284,7 +295,7 @@ namespace epics { int nextRequestPosition = receiveBuffer->getPosition() + payloadSize; // payload size check - if(nextRequestPosition>(int)receiveBuffer->getLimit()) return false; + if(unlikely(nextRequestPosition>(int)receiveBuffer->getLimit())) return false; // handle _responseHandler->handleResponse(&fromAddress, thisTransport, @@ -304,7 +315,7 @@ namespace epics { buffer->flip(); int retval = sendto(_channel, buffer->getArray(), buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr)); - if(retval<0) + if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -324,7 +335,7 @@ namespace epics { buffer->getLimit(), 0, &(_sendAddresses->at(i).sa), sizeof(sockaddr)); { - if(retval<0) + if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); @@ -346,7 +357,7 @@ namespace epics { osiSocklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen); - if(retval<0) + if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index 90854d0..c1af7c2 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -268,6 +268,7 @@ public: void setRecipient(const osiSockAddr& sendTo) {} void startMessage(int8 command, int ensureCapacity) {} void ensureBuffer(int size) {} + void alignBuffer(int alignment) {} void flushSerializeBuffer() {} }; diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index fba49a9..04f265a 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -1143,7 +1143,7 @@ int main(int argc, char *argv[]) cout << "Done" << endl; - epicsThreadSleep ( 1.0 ); + epicsThreadSleep ( 3.0 ); std::cout << "-----------------------------------------------------------------------" << std::endl; epicsExitCallAtExits(); CDRMonitor::get().show(stdout, true); diff --git a/testApp/utils/introspectionRegistryTest.cpp b/testApp/utils/introspectionRegistryTest.cpp index 9e9aec9..a3fbd33 100644 --- a/testApp/utils/introspectionRegistryTest.cpp +++ b/testApp/utils/introspectionRegistryTest.cpp @@ -31,6 +31,9 @@ namespace epics { virtual void ensureBuffer(int size) { } + virtual void alignBuffer(int alignment) { + } + SerializableControlImpl() { } @@ -44,6 +47,9 @@ namespace epics { virtual void ensureData(int size) { } + virtual void alignData(int alignment) { + } + DeserializableControlImpl() { } diff --git a/testApp/utils/transportRegistryTest.cpp b/testApp/utils/transportRegistryTest.cpp index 8b2c14d..ad6321d 100644 --- a/testApp/utils/transportRegistryTest.cpp +++ b/testApp/utils/transportRegistryTest.cpp @@ -44,6 +44,7 @@ namespace epics { virtual void verified(){}; virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender){}; virtual void ensureData(int) {}; + virtual void alignData(int) {}; virtual IntrospectionRegistry* getIntrospectionRegistry() {return NULL;}; private: string _type;