From 9a8c205d54df42eacbbd6bf2f01044e692a854fc Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 31 Dec 2010 12:00:40 +0100 Subject: [PATCH] Some GrowingCircularBuffer and Mutex cleanup. --- pvAccessApp/remote/blockingTCP.h | 9 +++-- pvAccessApp/remote/blockingTCPTransport.cpp | 44 ++++++++++++++++----- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index ed13b5f..7884c32 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -225,11 +225,11 @@ namespace epics { /** * Marker to send. */ - int volatile _markerToSend; + volatile int _markerToSend; - bool _verified; + volatile bool _verified; - int64 volatile _remoteBufferFreeSpace; + volatile int64 _remoteBufferFreeSpace; virtual void processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer); @@ -312,7 +312,7 @@ namespace epics { int8 _command; int _payloadSize; - bool _flushRequested; + volatile bool _flushRequested; int _sendBufferSentPosition; @@ -349,6 +349,7 @@ namespace epics { */ void freeSendBuffers(); + TransportSender* extractFromSendQueue(); }; } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 6d06846..7edca6d 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -737,13 +737,30 @@ namespace epics { return true; } + TransportSender* BlockingTCPTransport::extractFromSendQueue() { + TransportSender* retval; + + _sendQueueMutex->lock(); + try { + if(_sendQueue->size()>0) + retval = _sendQueue->extract(); + else + retval = NULL; + } catch(...) { + // not expecting the exception here, but just to be safe + retval = NULL; + } + + _sendQueueMutex->unlock(); + + return retval; + } + void BlockingTCPTransport::processSendQueue() { while(!_closed) { TransportSender* sender; - _sendQueueMutex->lock(); - - sender = _sendQueue->extract(); + sender = extractFromSendQueue(); // wait for new message while(sender==NULL&&!_flushRequested&&!_closed) { if(_flushStrategy==DELAYED) { @@ -754,14 +771,13 @@ namespace epics { >CA_MESSAGE_HEADER_SIZE) _flushRequested = true; else - epicsThreadSleep(delay); + epicsThreadSleep(0); } } - //else - // epicsThreadSleep(delay); - sender = _sendQueue->extract(); + else + epicsThreadSleep(0); + sender = extractFromSendQueue(); } - _sendQueueMutex->unlock(); // always do flush from this thread if(_flushRequested) { @@ -800,7 +816,8 @@ namespace epics { } void BlockingTCPTransport::requestFlush() { - Lock lock(_sendQueueMutex); + // needless lock, manipulating a single byte + //Lock lock(_sendQueueMutex); if(_flushRequested) return; _flushRequested = true; } @@ -854,7 +871,14 @@ namespace epics { while(true) { TransportSender* sender; _monitorMutex->lock(); - sender = _monitorSendQueue->extract(); + if(_monitorSendQueue->size()>0) + try { + sender = _monitorSendQueue->extract(); + } catch(...) { + sender = NULL; + } + else + sender = NULL; _monitorMutex->unlock(); if(sender==NULL) {