Some GrowingCircularBuffer and Mutex cleanup.
This commit is contained in:
@@ -225,11 +225,11 @@ namespace epics {
|
||||
/**
|
||||
* Marker to send.
|
||||
*/
|
||||
int volatile _markerToSend;
|
||||
volatile int _markerToSend;
|
||||
|
||||
bool _verified;
|
||||
volatile bool _verified;
|
||||
|
||||
int64 volatile _remoteBufferFreeSpace;
|
||||
volatile int64 _remoteBufferFreeSpace;
|
||||
|
||||
virtual void processReadCached(bool nestedCall,
|
||||
ReceiveStage inStage, int requiredBytes, bool addToBuffer);
|
||||
@@ -312,7 +312,7 @@ namespace epics {
|
||||
int8 _command;
|
||||
int _payloadSize;
|
||||
|
||||
bool _flushRequested;
|
||||
volatile bool _flushRequested;
|
||||
|
||||
int _sendBufferSentPosition;
|
||||
|
||||
@@ -349,6 +349,7 @@ namespace epics {
|
||||
*/
|
||||
void freeSendBuffers();
|
||||
|
||||
TransportSender* extractFromSendQueue();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -737,13 +737,30 @@ namespace epics {
|
||||
return true;
|
||||
}
|
||||
|
||||
TransportSender* BlockingTCPTransport::extractFromSendQueue() {
|
||||
TransportSender* retval;
|
||||
|
||||
_sendQueueMutex->lock();
|
||||
try {
|
||||
if(_sendQueue->size()>0)
|
||||
retval = _sendQueue->extract();
|
||||
else
|
||||
retval = NULL;
|
||||
} catch(...) {
|
||||
// not expecting the exception here, but just to be safe
|
||||
retval = NULL;
|
||||
}
|
||||
|
||||
_sendQueueMutex->unlock();
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
void BlockingTCPTransport::processSendQueue() {
|
||||
while(!_closed) {
|
||||
TransportSender* sender;
|
||||
|
||||
_sendQueueMutex->lock();
|
||||
|
||||
sender = _sendQueue->extract();
|
||||
sender = extractFromSendQueue();
|
||||
// wait for new message
|
||||
while(sender==NULL&&!_flushRequested&&!_closed) {
|
||||
if(_flushStrategy==DELAYED) {
|
||||
@@ -754,14 +771,13 @@ namespace epics {
|
||||
>CA_MESSAGE_HEADER_SIZE)
|
||||
_flushRequested = true;
|
||||
else
|
||||
epicsThreadSleep(delay);
|
||||
epicsThreadSleep(0);
|
||||
}
|
||||
}
|
||||
//else
|
||||
// epicsThreadSleep(delay);
|
||||
sender = _sendQueue->extract();
|
||||
else
|
||||
epicsThreadSleep(0);
|
||||
sender = extractFromSendQueue();
|
||||
}
|
||||
_sendQueueMutex->unlock();
|
||||
|
||||
// always do flush from this thread
|
||||
if(_flushRequested) {
|
||||
@@ -800,7 +816,8 @@ namespace epics {
|
||||
}
|
||||
|
||||
void BlockingTCPTransport::requestFlush() {
|
||||
Lock lock(_sendQueueMutex);
|
||||
// needless lock, manipulating a single byte
|
||||
//Lock lock(_sendQueueMutex);
|
||||
if(_flushRequested) return;
|
||||
_flushRequested = true;
|
||||
}
|
||||
@@ -854,7 +871,14 @@ namespace epics {
|
||||
while(true) {
|
||||
TransportSender* sender;
|
||||
_monitorMutex->lock();
|
||||
sender = _monitorSendQueue->extract();
|
||||
if(_monitorSendQueue->size()>0)
|
||||
try {
|
||||
sender = _monitorSendQueue->extract();
|
||||
} catch(...) {
|
||||
sender = NULL;
|
||||
}
|
||||
else
|
||||
sender = NULL;
|
||||
_monitorMutex->unlock();
|
||||
|
||||
if(sender==NULL) {
|
||||
|
||||
Reference in New Issue
Block a user