TCP transport cleanup, lots of printf still in use

This commit is contained in:
Matej Sekoranja
2011-01-24 18:07:37 +01:00
parent b23e7f13d8
commit 5cb78eac05
5 changed files with 300 additions and 207 deletions

View File

@@ -65,7 +65,7 @@ namespace epics {
BlockingTCPTransport::BlockingTCPTransport(Context* context,
SOCKET channel, ResponseHandler* responseHandler,
int receiveBufferSize, int16 priority) :
_closed(false), _channel(channel), _socketAddress(new osiSockAddr),
_closed(false), _channel(channel),
_remoteTransportRevision(0),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
@@ -75,9 +75,7 @@ namespace epics {
LONG_LONG_MAX), _autoDelete(true),
_markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition(
_markerPeriodBytes), _sendPending(false),
_lastMessageStartPosition(0), _mutex(new Mutex()),
_sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()),
_monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET),
_lastMessageStartPosition(0), _stage(READ_FROM_SOCKET),
_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(
0), _storedPayloadSize(0), _storedPosition(0),
_storedLimit(0), _magicAndVersion(0), _packetType(0),
@@ -87,9 +85,9 @@ namespace epics {
new GrowingCircularBuffer<TransportSender*> (100)),
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSender(new MonitorSender(&_monitorMutex,
_monitorSendQueue)), _context(context),
_sendThreadRunning(false) {
_sendThreadExited(false) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize), EPICS_ENDIAN_BIG);
@@ -114,7 +112,7 @@ namespace epics {
}
socklen_t saSize = sizeof(sockaddr);
retval = getpeername(_channel, &(_socketAddress->sa), &saSize);
retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
if(retval<0) {
errlogSevPrintf(errlogMajor,
"Error fetching socket remote address: %s", strerror(
@@ -130,26 +128,20 @@ namespace epics {
BlockingTCPTransport::~BlockingTCPTransport() {
close(true);
// TODO remove
epicsThreadSleep(3.0);
delete _socketAddress;
delete _sendQueue;
delete _socketBuffer;
delete _sendBuffer;
delete _mutex;
delete _sendQueueMutex;
delete _verifiedMutex;
delete _monitorMutex;
delete _responseHandler;
}
void BlockingTCPTransport::start() {
_sendThreadRunning = true;
// TODO consuder epics::pvData::Thread
String threadName = "TCP-receive "+inetAddressToString(
* _socketAddress);
_socketAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
@@ -159,7 +151,7 @@ namespace epics {
epicsThreadStackMedium),
BlockingTCPTransport::rcvThreadRunner, this);
threadName = "TCP-send "+inetAddressToString(*_socketAddress);
threadName = "TCP-send "+inetAddressToString(_socketAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
@@ -178,9 +170,9 @@ namespace epics {
_nextMarkerPosition -= _sendBuffer->getPosition()
-CA_MESSAGE_HEADER_SIZE;
_sendQueueMutex->lock();
_sendQueueMutex.lock();
_flushRequested = false;
_sendQueueMutex->unlock();
_sendQueueMutex.unlock();
_sendBuffer->clear();
@@ -194,12 +186,13 @@ namespace epics {
}
void BlockingTCPTransport::close(bool force) {
Lock lock(_mutex);
Lock lock(&_mutex);
// already closed check
if(_closed) return;
_closed = true;
printf("closing.\n");
// remove from registry
_context->getTransportRegistry()->remove(this);
@@ -207,12 +200,8 @@ namespace epics {
// clean resources
internalClose(force);
// threads cannot "wait" Epics, no need to notify
// TODO check alternatives to "wait"
// notify send queue
//synchronized (sendQueue) {
// sendQueue.notifyAll();
//}
_sendQueueEvent.signal();
}
void BlockingTCPTransport::internalClose(bool force) {
@@ -239,21 +228,22 @@ namespace epics {
return sockBufSize;
}
// TODO reimplement using Event
bool BlockingTCPTransport::waitUntilVerified(double timeout) {
double internalTimeout = timeout;
bool internalVerified = false;
_verifiedMutex->lock();
_verifiedMutex.lock();
internalVerified = _verified;
_verifiedMutex->unlock();
_verifiedMutex.unlock();
while(!internalVerified&&internalTimeout>0) {
epicsThreadSleep(min(0.1, internalTimeout));
internalTimeout -= 0.1;
_verifiedMutex->lock();
_verifiedMutex.lock();
internalVerified = _verified;
_verifiedMutex->unlock();
_verifiedMutex.unlock();
}
return internalVerified;
}
@@ -510,7 +500,7 @@ namespace epics {
errlogSevPrintf(
errlogMinor,
"Invalid header received from client %s, disconnecting...",
inetAddressToString(*_socketAddress).c_str());
inetAddressToString(_socketAddress).c_str());
close(true);
return;
}
@@ -531,11 +521,14 @@ namespace epics {
}
else if(type==1) {
if(_command==0) {
if(_markerToSend==0) _markerToSend
= _payloadSize; // TODO send back response
_flowControlMutex.lock();
if(_markerToSend==0)
_markerToSend = _payloadSize; // TODO send back response
_flowControlMutex.unlock();
}
else //if (command == 1)
{
_flowControlMutex.lock();
int difference = (int)_totalBytesSent
-_payloadSize+CA_MESSAGE_HEADER_SIZE;
// overrun check
@@ -545,6 +538,7 @@ namespace epics {
+_remoteTransportSocketReceiveBufferSize
-difference;
// TODO if this is calculated wrong, this can be critical !!!
_flowControlMutex.unlock();
}
// no payload
@@ -556,7 +550,7 @@ namespace epics {
errlogMajor,
"Unknown packet type %d, received from client %s, disconnecting...",
type,
inetAddressToString(*_socketAddress).c_str());
inetAddressToString(_socketAddress).c_str());
close(true);
return;
}
@@ -580,7 +574,7 @@ namespace epics {
+_storedPayloadSize, _storedLimit));
try {
// handle response
_responseHandler->handleResponse(_socketAddress,
_responseHandler->handleResponse(&_socketAddress,
this, version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
@@ -628,8 +622,10 @@ namespace epics {
_sendBufferSentPosition = 0;
// if not set skip marker otherwise set it
_flowControlMutex.lock();
int markerValue = _markerToSend;
_markerToSend = 0;
_flowControlMutex.unlock();
if(markerValue==0)
_sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE;
else
@@ -694,7 +690,7 @@ namespace epics {
//errlogSevPrintf(errlogInfo,
// "Sending %d of total %d bytes in the packet to %s.",
// bytesToSend, limit,
// inetAddressToString(*_socketAddress).c_str());
// inetAddressToString(_socketAddress).c_str());
while(buffer->getRemaining()>0) {
ssize_t bytesSent = ::send(_channel,
@@ -720,13 +716,15 @@ namespace epics {
//errlogSevPrintf(errlogInfo,
// "Send buffer full for %s, waiting...",
// inetAddressToString(*_socketAddress));
// inetAddressToString(_socketAddress));
return false;
}
buffer->setPosition(buffer->getPosition()+bytesSent);
_flowControlMutex.lock();
_totalBytesSent += bytesSent;
_flowControlMutex.unlock();
// readjust limit
if(bytesToSend==maxBytesToSend) {
@@ -752,18 +750,9 @@ namespace epics {
TransportSender* BlockingTCPTransport::extractFromSendQueue() {
TransportSender* retval;
_sendQueueMutex->lock();
try {
if(_sendQueue->size()>0)
retval = _sendQueue->extract();
else
retval = NULL;
} catch(...) {
// not expecting the exception here, but just to be safe
retval = NULL;
}
_sendQueueMutex->unlock();
_sendQueueMutex.lock();
retval = _sendQueue->extract();
_sendQueueMutex.unlock();
return retval;
}
@@ -772,23 +761,35 @@ namespace epics {
while(!_closed) {
TransportSender* sender;
// TODO race!
sender = extractFromSendQueue();
printf("extraced %d\n", sender);
// wait for new message
while(sender==NULL&&!_flushRequested&&!_closed) {
while(sender==NULL&&!_flushRequested/*&&!_closed*/) {
bool c;
_mutex.lock();
c = _closed;
printf("closed %d\n", c);
_mutex.unlock();
if (c)
break;
if(_flushStrategy==DELAYED) {
if(delay>0) epicsThreadSleep(delay);
if(_delay>0) epicsThreadSleep(_delay);
if(_sendQueue->size()==0) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
if(_sendBuffer->getPosition()
>CA_MESSAGE_HEADER_SIZE)
if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE)
_flushRequested = true;
else
epicsThreadSleep(0);
_sendQueueEvent.wait();
}
}
else
epicsThreadSleep(0);
_sendQueueEvent.wait();
sender = extractFromSendQueue();
printf("extraced2 %d\n", sender);
}
// always do flush from this thread
@@ -827,13 +828,6 @@ namespace epics {
} // while(!_closed)
}
void BlockingTCPTransport::requestFlush() {
// needless lock, manipulating a single byte
//Lock lock(_sendQueueMutex);
if(_flushRequested) return;
_flushRequested = true;
}
void BlockingTCPTransport::freeSendBuffers() {
// TODO ?
}
@@ -842,7 +836,7 @@ namespace epics {
freeSendBuffers();
errlogSevPrintf(errlogInfo, "Connection to %s closed.",
inetAddressToString(*_socketAddress).c_str());
inetAddressToString(_socketAddress).c_str());
if(_channel!=INVALID_SOCKET) {
epicsSocketDestroy(_channel);
@@ -853,53 +847,68 @@ namespace epics {
void BlockingTCPTransport::rcvThreadRunner(void* param) {
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
try{
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
} catch (...) {
printf("rcvThreadRunnner exception\n");
}
printf("rcvThreadRunner done, autodelete %d-\n", obj->_autoDelete);
if(obj->_autoDelete) {
while(obj->_sendThreadRunning)
while(true)
{
printf("waiting send thread to exit.\n");
bool exited;
obj->_mutex.lock();
exited = obj->_sendThreadExited;
obj->_mutex.unlock();
if (exited)
break;
epicsThreadSleep(0.1);
}
printf("deleting.\n");
delete obj;
}
}
void BlockingTCPTransport::sendThreadRunner(void* param) {
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
try {
obj->processSendQueue();
} catch (...) {
printf("sendThreadRunnner exception\n");
}
obj->freeConnectionResorces();
printf("exited.\n");
obj->_sendThreadRunning = false;
// TODO possible crash on unlock
obj->_mutex.lock();
obj->_sendThreadExited = true;
obj->_mutex.unlock();
}
void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) {
Lock lock(&_sendQueueMutex);
if(_closed) return;
Lock lock(_sendQueueMutex);
_sendQueue->insert(sender);
_sendQueueEvent.signal();
}
void BlockingTCPTransport::enqueueMonitorSendRequest(
TransportSender* sender) {
void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) {
Lock lock(&_monitorMutex);
if(_closed) return;
Lock lock(_monitorMutex);
_monitorSendQueue->insert(sender);
if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender);
}
void MonitorSender::send(ByteBuffer* buffer,
TransportSendControl* control) {
void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage(19, 0);
while(true) {
TransportSender* sender;
_monitorMutex->lock();
if(_monitorSendQueue->size()>0)
try {
sender = _monitorSendQueue->extract();
} catch(...) {
sender = NULL;
}
sender = _monitorSendQueue->extract();
else
sender = NULL;
_monitorMutex->unlock();