improved exception handling
This commit is contained in:
+107
-114
@@ -94,7 +94,7 @@ void tcpSendThread::run ()
|
||||
}
|
||||
}
|
||||
catch ( ... ) {
|
||||
this->iiu.printf ("cac: tcp send thread received an exception - disconnecting\n");
|
||||
this->iiu.printf ("cac: tcp send thread received an unexpected exception - disconnecting\n");
|
||||
this->iiu.forcedShutdown ();
|
||||
}
|
||||
|
||||
@@ -226,120 +226,66 @@ void tcpRecvThread::start ()
|
||||
|
||||
void tcpRecvThread::run ()
|
||||
{
|
||||
this->iiu.pCAC()->attachToClientCtx ();
|
||||
try {
|
||||
this->iiu.pCAC()->attachToClientCtx ();
|
||||
|
||||
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
|
||||
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
|
||||
|
||||
this->iiu.connect ();
|
||||
this->iiu.connect ();
|
||||
|
||||
this->iiu.versionMessage ( this->iiu.priority() );
|
||||
this->iiu.versionMessage ( this->iiu.priority() );
|
||||
|
||||
if ( this->iiu.state == iiu_connected ) {
|
||||
this->iiu.sendThread.start ();
|
||||
}
|
||||
else {
|
||||
this->iiu.sendThreadExitEvent.signal ();
|
||||
this->iiu.cleanShutdown ();
|
||||
}
|
||||
|
||||
comBuf * pComBuf = new ( std::nothrow ) comBuf;
|
||||
while ( this->iiu.state == iiu_connected ) {
|
||||
if ( ! pComBuf ) {
|
||||
// no way to be informed when memory is available
|
||||
epicsThreadSleep ( 0.5 );
|
||||
pComBuf = new ( std::nothrow ) comBuf;
|
||||
continue;
|
||||
}
|
||||
|
||||
//
|
||||
// We leave the bytes pending and fetch them after
|
||||
// callbacks are enabled when running in the old preemptive
|
||||
// call back disabled mode so that asynchronous wakeup via
|
||||
// file manager call backs works correctly. This does not
|
||||
// appear to impact performance.
|
||||
//
|
||||
unsigned nBytesIn;
|
||||
if ( this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( this->iiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
break;
|
||||
}
|
||||
if ( this->iiu.state == iiu_connected ) {
|
||||
this->iiu.sendThread.start ();
|
||||
}
|
||||
else {
|
||||
char buf;
|
||||
::recv ( this->iiu.sock, &buf, 1, MSG_PEEK );
|
||||
nBytesIn = 0u; // make gnu hoppy
|
||||
}
|
||||
|
||||
if ( this->iiu.state != iiu_connected ) {
|
||||
break;
|
||||
this->iiu.sendThreadExitEvent.signal ();
|
||||
this->iiu.cleanShutdown ();
|
||||
}
|
||||
|
||||
// reschedule connection activity watchdog
|
||||
// but dont hold the lock for fear of deadlocking
|
||||
// because cancel is blocking for the completion
|
||||
// of the recvDog expire which takes the lock
|
||||
// - it take also the callback lock
|
||||
this->iiu.recvDog.messageArrivalNotify ();
|
||||
|
||||
// only one recv thread at a time may call callbacks
|
||||
// - pendEvent() blocks until threads waiting for
|
||||
// this lock get a chance to run
|
||||
epicsGuard < callbackMutex > guard ( this->cbMutex );
|
||||
|
||||
if ( ! this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( this->iiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
// outer loop checks to see if state is connected
|
||||
// ( properly set by fillFromWire() )
|
||||
break;
|
||||
comBuf * pComBuf = new comBuf;
|
||||
while ( this->iiu.state == iiu_connected ) {
|
||||
if ( ! pComBuf ) {
|
||||
pComBuf = new comBuf;
|
||||
}
|
||||
}
|
||||
|
||||
// force the receive watchdog to be reset every 50 frames
|
||||
unsigned contiguousFrameCount = 0;
|
||||
while ( contiguousFrameCount++ < 50 ) {
|
||||
|
||||
if ( nBytesIn == pComBuf->capacityBytes () ) {
|
||||
if ( this->iiu.contigRecvMsgCount >=
|
||||
contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
this->iiu.busyStateDetected = true;
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount++;
|
||||
//
|
||||
// We leave the bytes pending and fetch them after
|
||||
// callbacks are enabled when running in the old preemptive
|
||||
// call back disabled mode so that asynchronous wakeup via
|
||||
// file manager call backs works correctly. This does not
|
||||
// appear to impact performance.
|
||||
//
|
||||
unsigned nBytesIn;
|
||||
if ( this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( this->iiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount = 0u;
|
||||
this->iiu.busyStateDetected = false;
|
||||
}
|
||||
this->iiu.unacknowledgedSendBytes = 0u;
|
||||
|
||||
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
pComBuf = 0;
|
||||
|
||||
// execute receive labor
|
||||
bool noProtocolViolation = this->iiu.processIncoming ( guard );
|
||||
if ( ! noProtocolViolation ) {
|
||||
this->iiu.state = iiu_disconnected;
|
||||
char buf;
|
||||
::recv ( this->iiu.sock, &buf, 1, MSG_PEEK );
|
||||
nBytesIn = 0u; // make gnu hoppy
|
||||
}
|
||||
|
||||
if ( this->iiu.state != iiu_connected ) {
|
||||
break;
|
||||
}
|
||||
|
||||
// allocate a new com buf
|
||||
pComBuf = new ( std::nothrow ) comBuf;
|
||||
nBytesIn = 0u;
|
||||
if ( ! pComBuf ) {
|
||||
break;
|
||||
}
|
||||
// reschedule connection activity watchdog
|
||||
// but dont hold the lock for fear of deadlocking
|
||||
// because cancel is blocking for the completion
|
||||
// of the recvDog expire which takes the lock
|
||||
// - it take also the callback lock
|
||||
this->iiu.recvDog.messageArrivalNotify ();
|
||||
|
||||
{
|
||||
int status;
|
||||
osiSockIoctl_t bytesPending = 0;
|
||||
status = socket_ioctl ( this->iiu.sock, // X aCC 392
|
||||
FIONREAD, & bytesPending );
|
||||
if ( status || bytesPending == 0u ) {
|
||||
break;
|
||||
}
|
||||
// only one recv thread at a time may call callbacks
|
||||
// - pendEvent() blocks until threads waiting for
|
||||
// this lock get a chance to run
|
||||
epicsGuard < callbackMutex > guard ( this->cbMutex );
|
||||
|
||||
if ( ! this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( this->iiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
// outer loop checks to see if state is connected
|
||||
@@ -347,19 +293,74 @@ void tcpRecvThread::run ()
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// force the receive watchdog to be reset every 50 frames
|
||||
unsigned contiguousFrameCount = 0;
|
||||
while ( contiguousFrameCount++ < 50 ) {
|
||||
|
||||
if ( nBytesIn == pComBuf->capacityBytes () ) {
|
||||
if ( this->iiu.contigRecvMsgCount >=
|
||||
contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
this->iiu.busyStateDetected = true;
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount = 0u;
|
||||
this->iiu.busyStateDetected = false;
|
||||
}
|
||||
this->iiu.unacknowledgedSendBytes = 0u;
|
||||
|
||||
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
pComBuf = 0;
|
||||
|
||||
// execute receive labor
|
||||
bool noProtocolViolation = this->iiu.processIncoming ( guard );
|
||||
if ( ! noProtocolViolation ) {
|
||||
this->iiu.state = iiu_disconnected;
|
||||
break;
|
||||
}
|
||||
|
||||
// allocate a new com buf
|
||||
pComBuf = new comBuf;
|
||||
nBytesIn = 0u;
|
||||
|
||||
{
|
||||
int status;
|
||||
osiSockIoctl_t bytesPending = 0;
|
||||
status = socket_ioctl ( this->iiu.sock, // X aCC 392
|
||||
FIONREAD, & bytesPending );
|
||||
if ( status || bytesPending == 0u ) {
|
||||
break;
|
||||
}
|
||||
nBytesIn = pComBuf->fillFromWire ( this->iiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
// outer loop checks to see if state is connected
|
||||
// ( properly set by fillFromWire() )
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ( pComBuf ) {
|
||||
pComBuf->destroy ();
|
||||
}
|
||||
|
||||
this->iiu.stopThreads ();
|
||||
|
||||
// arrange for delete through the timer thread
|
||||
this->iiu.killTimer.start ();
|
||||
}
|
||||
|
||||
if ( pComBuf ) {
|
||||
pComBuf->destroy ();
|
||||
catch ( ... ) {
|
||||
errlogPrintf ("cac tcp receive thread terminating due to a c++ exception\n" );
|
||||
this->iiu.killTimer.start ();
|
||||
}
|
||||
|
||||
this->iiu.stopThreads ();
|
||||
|
||||
// arrange for delete through the timer thread
|
||||
this->iiu.killTimer.start ();
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// tcpiiu::tcpiiu ()
|
||||
//
|
||||
@@ -401,14 +402,6 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
earlyFlush ( false ),
|
||||
recvProcessPostponedFlush ( false )
|
||||
{
|
||||
if ( ! this->pCurData ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
|
||||
if ( ! this->pHostNameCache.get () ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
|
||||
this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
|
||||
if ( this->sock == INVALID_SOCKET ) {
|
||||
this->printf ( "CAC: unable to create virtual circuit because \"%s\"\n",
|
||||
|
||||
Reference in New Issue
Block a user