simplified schedualing

This commit is contained in:
Jeff Hill
2001-08-02 00:34:46 +00:00
parent cd43af94e1
commit d0e4a93563
18 changed files with 772 additions and 845 deletions

View File

@@ -73,9 +73,15 @@ int CASG::block ( double timeout )
{
epicsTime cur_time;
epicsTime beg_time;
double delay;
double remaining;
int status;
double delay;
double remaining;
int status;
// prevent recursion nightmares by disabling blocking
// for IO from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
if ( timeout < 0.0 ) {
return ECA_TIMEOUT;
@@ -105,7 +111,15 @@ int CASG::block ( double timeout )
break;
}
this->client.blockForEventAndEnableCallbacks ( this->sem, remaining );
{
// serialize access the blocking mechanism below
epicsAutoMutex autoMutex ( this->serializeBlock );
status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining );
if ( status != ECA_NORMAL ) {
return status;
}
}
/*
* force a time update

View File

@@ -32,7 +32,7 @@
#include "oldAccess.h"
#include "autoPtrDestroy.h"
epicsThreadPrivateId caClientContextId;
static epicsThreadPrivateId caClientContextId;
static epicsThreadOnceId caClientContextIdOnce = EPICS_THREAD_ONCE_INIT;

File diff suppressed because it is too large Load Diff

View File

@@ -48,7 +48,9 @@ struct CASG;
class inetAddrID;
struct caHdrLargeArray;
class cac : private cacRecycle, private epicsThreadRunable
extern epicsThreadPrivateId caClientCallbackThreadId;
class cac : private cacRecycle
{
public:
cac ( cacNotify &, bool enablePreemptiveCallback = false );
@@ -59,9 +61,6 @@ public:
const epicsTime &currentTime );
void repeaterSubscribeConfirmNotify ();
// IIU routines
void signalRecvActivity ();
// outstanding IO count management routines
void incrementOutstandingIO ();
void decrementOutstandingIO ();
@@ -73,15 +72,12 @@ public:
void flushRequest ();
int pendIO ( const double &timeout );
int pendEvent ( const double &timeout );
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
void destroyAllIO ( nciu &chan );
bool executeResponse ( tcpiiu &, caHdrLargeArray &, char *pMsgBody );
void ioCancel ( nciu &chan, const cacChannel::ioid &id );
void ioShow ( const cacChannel::ioid &id, unsigned level ) const;
// channel routines
bool connectChannel ( unsigned id );
void installNetworkChannel ( nciu &, netiiu *&piiu );
bool lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber,
@@ -110,7 +106,7 @@ public:
const char *pFileName, unsigned lineNo );
// callback preemption control
void blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout );
int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout );
// diagnostics
unsigned connectionCount () const;
@@ -123,17 +119,22 @@ public:
void vSignal ( int ca_status, const char *pfilenm,
int lineno, const char *pFormat, va_list args );
// misc
const char * userNamePointer () const;
unsigned getInitializingThreadsPriority () const;
epicsMutex & mutexRef ();
void attachToClientCtx ();
// buffer management
char * allocateSmallBufferTCP ();
void releaseSmallBufferTCP ( char * );
unsigned largeBufferSizeTCP () const;
char * allocateLargeBufferTCP ();
void releaseLargeBufferTCP ( char * );
void selfTest ();
// misc
const char * userNamePointer () const;
unsigned getInitializingThreadsPriority () const;
epicsMutex & mutexRef ();
void attachToClientCtx ();
void selfTest () const;
void notifyNewFD ( SOCKET ) const;
void notifyDestroyFD ( SOCKET ) const;
void uninstallIIU ( tcpiiu &iiu );
private:
ipAddrToAsciiEngine ipToAEngine;
@@ -159,11 +160,12 @@ private:
epicsTime programBeginTime;
double connTMO;
mutable epicsMutex mutex;
epicsMutex preemptiveCallbackLock;
epicsEvent notifyCompletionEvent;
epicsEvent recvProcessActivityEvent;
epicsEvent recvProcessThreadExit;
epicsMutex callbackMutex;
epicsMutex serializePendIO;
epicsMutex serializePendEvent;
epicsEvent ioDone;
epicsEvent noRecvThreadsPending;
epicsEvent iiuUninstal;
epicsTimerQueueActive *pTimerQueue;
char *pUserName;
class udpiiu *pudpiiu;
@@ -172,32 +174,28 @@ private:
*pRepeaterSubscribeTmr;
void *tcpSmallRecvBufFreeList;
void *tcpLargeRecvBufFreeList;
epicsThread *pRecvProcessThread;
epicsThreadPrivateId isRecvProcessId;
cacNotify &notify;
unsigned ioNotifyInProgressId;
unsigned initializingThreadsPriority;
unsigned threadsBlockingOnNotifyCompletion;
unsigned maxRecvBytesTCP;
unsigned recvProcessEnableRefCount;
unsigned pndRecvCnt;
unsigned readSeq;
unsigned recvThreadsPendingCount;
bool enablePreemptiveCallback;
bool ioInProgress;
bool recvProcessThreadExitRequest;
void processRecvBacklog ();
void flushRequestPrivate ();
void run ();
bool setupUDP ();
void enableCallbackPreemption ();
void disableCallbackPreemption ();
void flushIfRequired ( nciu & ); // lock must be applied
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan, bool enableCallbacks );
void privateDestroyAllIO ( nciu & chan );
void ioCancelPrivate ( nciu &chan, const cacChannel::ioid &id );
void flushIfRequired ( netiiu & );
void recycleReadNotifyIO ( netReadNotifyIO &io );
void recycleWriteNotifyIO ( netWriteNotifyIO &io );
void recycleSubscription ( netSubscription &io );
bool recvProcessThreadIsCurrentThread () const;
void startRecvProcessThread ();
void preemptiveCallbackLock ();
void preemptiveCallbackUnlock ();
void ioCompletionNotify ( unsigned id, unsigned type,
arrayElementCount count, const void *pData );
void ioExceptionNotify ( unsigned id,
@@ -212,7 +210,6 @@ private:
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, arrayElementCount count );
bool blockForIOCallbackCompletion ( const cacChannel::ioid & id );
// recv protocol stubs
bool noopAction ( tcpiiu &, const caHdrLargeArray &, void *pMsgBdy );
@@ -249,6 +246,22 @@ private:
tcpiiu &iiu, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status );
static const pExcepProtoStubTCP tcpExcepJumpTableCAC [];
friend class callbackAutoMutex;
};
class callbackAutoMutex {
public:
callbackAutoMutex ( cac & ctxIn ) : ctx ( ctxIn )
{
this->ctx.preemptiveCallbackLock ();
}
~callbackAutoMutex ()
{
this->ctx.preemptiveCallbackUnlock ();
}
private:
cac & ctx;
};
inline const char * cac::userNamePointer () const
@@ -321,25 +334,10 @@ inline void cac::releaseLargeBufferTCP ( char *pBuf )
freeListFree ( this->tcpLargeRecvBufFreeList, pBuf );
}
inline bool cac::recvProcessThreadIsCurrentThread () const
{
if ( this->pRecvProcessThread ) {
return this->pRecvProcessThread->isCurrentThread();
}
else {
return false;
}
}
inline bool cac::ioComplete () const
{
return ( this->pndRecvCnt == 0u );
}
inline void cac::signalRecvActivity ()
{
this->recvProcessActivityEvent.signal ();
}
#endif // ifdef cach

View File

@@ -49,6 +49,7 @@ public:
unsigned unoccupiedBytes () const;
unsigned occupiedBytes () const;
static unsigned capacityBytes ();
void clear ();
unsigned copyInBytes ( const void *pBuf, unsigned nBytes );
unsigned copyIn ( comBuf & );
unsigned copyIn ( const epicsInt8 *pValue, unsigned nElem );
@@ -105,6 +106,12 @@ inline void comBuf::destroy ()
delete this;
}
inline void comBuf::clear ()
{
this->nextWriteIndex = 0u;
this->nextReadIndex = 0u;
}
inline void * comBuf::operator new ( size_t size, const std::nothrow_t & )
{
epicsAutoMutex locker ( comBuf::freeListMutex );

View File

@@ -44,5 +44,6 @@ void msgForMultiplyDefinedPV::ioCompletionNotify ( const char *pHostNameRej )
char buf[256];
sprintf ( buf, "Channel: \"%.64s\", Connecting to: %.64s, Ignored: %.64s",
this->channel, this->acc, pHostNameRej );
callbackAutoMutex autoMutex ( this->cacRef );
genLocalExcep ( this->cacRef, ECA_DBLCHNL, buf );
}

View File

@@ -118,37 +118,17 @@ void nciu::connect ( unsigned nativeType,
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access
* send access rights and there will always be access
*/
if ( ! v41Ok ) {
this->accessRightState.setReadPermit();
this->accessRightState.setWritePermit();
}
// resubscribe for monitors from this channel
this->cacCtx.connectAllIO ( *this );
this->notify().connectNotify ();
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
this->notify().accessRightsNotify ( this->accessRightState );
}
}
void nciu::disconnect ( netiiu &newiiu )
void nciu::disconnect ( netiiu & newiiu )
{
bool wasConnected;
this->piiu->disconnectAllIO ( *this );
this->piiu = &newiiu;
this->piiu = & newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
@@ -156,24 +136,7 @@ void nciu::disconnect ( netiiu &newiiu )
this->accessRightState.clrReadPermit();
this->accessRightState.clrWritePermit();
this->f_claimSent = false;
if ( this->f_connected ) {
wasConnected = true;
}
else {
wasConnected = false;
}
this->f_connected = false;
if ( wasConnected ) {
/*
* look for events that have an event cancel in progress
*/
this->notify().disconnectNotify ();
this->notify().accessRightsNotify ( this->accessRightState );
}
this->resetRetryCount ();
}
/*

View File

@@ -47,6 +47,8 @@ public:
void connect ( unsigned nativeType,
unsigned nativeCount, unsigned sid, bool v41Ok );
void connect ();
void connectStateNotify () const;
void accessRightsNotify () const;
void disconnect ( netiiu &newiiu );
bool searchMsg ( unsigned short retrySeqNumber,
unsigned &retryNoForThisChannel );
@@ -77,9 +79,9 @@ protected:
~nciu (); // force pool allocation
private:
caAccessRights accessRightState;
cac &cacCtx;
char *pNameStr;
netiiu *piiu;
cac & cacCtx;
char * pNameStr;
netiiu * piiu;
ca_uint32_t sid; // server id
unsigned count;
unsigned retry; // search retry number
@@ -139,7 +141,6 @@ inline void nciu::resetRetryCount ()
inline void nciu::accessRightsStateChange ( const caAccessRights &arIn )
{
this->accessRightState = arIn;
this->notify().accessRightsNotify ( arIn );
}
inline ca_uint32_t nciu::getSID () const
@@ -203,4 +204,19 @@ inline void nciu::writeException ( int status,
this->notify().writeException ( status, pContext, typeIn, countIn );
}
inline void nciu::connectStateNotify () const
{
if ( this->f_connected ) {
this->notify().connectNotify ();
}
else {
this->notify().disconnectNotify ();
}
}
inline void nciu::accessRightsNotify () const
{
this->notify().accessRightsNotify ( this->accessRightState );
}
#endif // ifdef nciuh

View File

@@ -43,7 +43,7 @@ protected:
// perhaps we should not store the channel here and instead fetch it out of the
// notify
//
nciu &chan;
nciu & chan;
};
class netSubscription : public baseNMIU {

View File

@@ -45,17 +45,11 @@ void netiiu::show ( unsigned level ) const
}
// cac lock must also be applied when calling this
void netiiu::disconnectAllChan ( netiiu & newiiu )
void netiiu::uninstallAllChan ( tsDLList < nciu > & dstList )
{
tsDLIterBD < nciu > chan = this->channelList.firstIter ();
while ( chan.valid () ) {
tsDLIterBD < nciu > next = chan;
next++;
this->clearChannelRequest ( *chan );
this->channelList.remove ( *chan );
chan->disconnect ( newiiu );
newiiu.channelList.add ( *chan );
chan = next;
while ( nciu *pChan = this->channelList.get () ) {
this->clearChannelRequest ( *pChan );
dstList.add ( *pChan );
}
}
@@ -160,14 +154,6 @@ const char * netiiu::pHostName () const
return "<disconnected>";
}
void netiiu::disconnectAllIO ( nciu & )
{
}
void netiiu::connectAllIO ( nciu & )
{
}
double netiiu::beaconPeriod () const
{
return ( - DBL_MAX );
@@ -186,7 +172,7 @@ void netiiu::flushRequestIfAboveEarlyThreshold ()
{
}
void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex & )
void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex *, epicsMutex & )
{
}

View File

@@ -35,7 +35,7 @@ public:
virtual ~netiiu ();
void show ( unsigned level ) const;
unsigned channelCount () const;
void disconnectAllChan ( netiiu & newiiu );
void uninstallAllChan ( tsDLList < nciu > & dstList );
void connectTimeoutNotify ();
bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel );
void resetChannelRetryCounts ();
@@ -51,8 +51,6 @@ public:
virtual void writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue );
virtual void readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem );
virtual void createChannelRequest ( nciu & );
virtual void connectAllIO ( nciu &chan );
virtual void disconnectAllIO ( nciu &chan );
virtual void clearChannelRequest ( nciu & );
virtual void subscriptionRequest ( nciu &, netSubscription &subscr );
virtual void subscriptionCancelRequest ( nciu &, netSubscription &subscr );
@@ -60,7 +58,7 @@ public:
virtual void flushRequest ();
virtual bool flushBlockThreshold () const;
virtual void flushRequestIfAboveEarlyThreshold ();
virtual void blockUntilSendBacklogIsReasonable ( epicsMutex & );
virtual void blockUntilSendBacklogIsReasonable ( epicsMutex *, epicsMutex & );
virtual void requestRecvProcessPostponedFlush ();
protected:
cac * pCAC () const;

View File

@@ -212,7 +212,7 @@ public:
CASG * lookupCASG ( unsigned id );
void installCASG ( CASG & );
void uninstallCASG ( CASG & );
void blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout );
int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout );
void selfTest ();
// perhaps these should be eliminated in deference to the exception mechanism
int printf ( const char *pformat, ... ) const;
@@ -492,10 +492,10 @@ inline void oldCAC::uninstallCASG ( CASG &sg )
this->clientCtx.uninstallCASG ( sg );
}
inline void oldCAC::blockForEventAndEnableCallbacks (
inline int oldCAC::blockForEventAndEnableCallbacks (
epicsEvent &event, double timeout )
{
this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout );
return this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout );
}
inline void oldCAC::vSignal ( int ca_status, const char *pfilenm,

View File

@@ -191,16 +191,6 @@ void oldCAC::show ( unsigned level ) const
{
::printf ( "oldCAC at %p\n",
static_cast <const void *> ( this ) );
#if 0 // gnu compiler does not like casting func ptr to void ptr
::printf ( "exception func at %p arg at %p\n",
static_cast <const void *> ( this->ca_exception_func ),
static_cast <const void *> ( this->ca_exception_arg ) );
::printf ( "printf func at %p\n",
static_cast <const void *> ( this->pVPrintfFunc ) );
::printf ( "fd registration func at %p arg at %p\n",
static_cast <const void *> ( this->fdRegFunc ),
static_cast <const void *> ( this->fdRegArg ) );
#endif
if ( level > 0u ) {
this->mutex.show ( level - 1u );
this->clientCtx.show ( level - 1u );

View File

@@ -130,6 +130,7 @@ protected:
private:
tsDLList < syncGroupNotify > ioList;
epicsMutex mutable mutex;
epicsMutex serializeBlock;
epicsEvent sem;
oldCAC & client;
unsigned magic;

View File

@@ -89,7 +89,7 @@ extern "C" void cacSendThreadTCP ( void *pParam )
}
}
catch ( ... ) {
piiu->printf ("cac: tcp send thread received an exception - diconnecting\n");
piiu->printf ("cac: tcp send thread received an exception - disconnecting\n");
piiu->forcedShutdown ();
}
@@ -158,8 +158,9 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
}
assert ( nBytesInBuf <= INT_MAX );
int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
static_cast <int> ( nBytesInBuf ), 0);
static_cast <int> ( nBytesInBuf ), this->recvFlag );
if ( status <= 0 ) {
int localErrno = SOCKERRNO;
@@ -207,102 +208,107 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
{
tcpiiu *piiu = ( tcpiiu * ) pParam;
piiu->pCAC()->attachToClientCtx ();
epicsThreadPrivateSet ( caClientCallbackThreadId, piiu );
piiu->connect ();
{
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
if ( piiu->state == iiu_connected ) {
unsigned priorityOfSend;
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
}
epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend,
epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu );
if ( ! tid ) {
piiu->recvThreadExitEvent.signal ();
piiu->sendThreadExitEvent.signal ();
piiu->cleanShutdown ();
return;
}
}
else {
piiu->recvThreadExitEvent.signal ();
piiu->sendThreadExitEvent.signal ();
piiu->cleanShutdown ();
return;
}
callbackAutoMutex autoMutex ( *piiu->pCAC() );
piiu->pCAC()->notifyNewFD ( piiu->sock );
}
unsigned nBytes = 0u;
while ( piiu->state == iiu_connected ) {
if ( nBytes >= maxBytesPendingTCP ) {
piiu->recvThreadRingBufferSpaceAvailableEvent.wait ();
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
nBytes = piiu->recvQue.occupiedBytes ();
if ( piiu->state == iiu_connected ) {
unsigned priorityOfSend;
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
piiu->pCAC()->getInitializingThreadsPriority (), &priorityOfSend );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
}
else {
comBuf * pComBuf = new ( std::nothrow ) comBuf;
if ( pComBuf ) {
unsigned nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( nBytesIn ) {
bool msgHeaderButNoBody;
{
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
nBytes = piiu->recvQue.occupiedBytes ();
msgHeaderButNoBody = piiu->oldMsgHeaderAvailable;
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
if ( nBytesIn == pComBuf->capacityBytes () ) {
if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) {
piiu->busyStateDetected = true;
}
else {
piiu->contigRecvMsgCount++;
}
}
else {
piiu->contigRecvMsgCount = 0u;
piiu->busyStateDetected = false;
}
piiu->unacknowledgedSendBytes = 0u;
}
// reschedule connection activity watchdog
// but dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
// of the recvDog expire which takes the lock
piiu->recvDog.messageArrivalNotify ();
epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend,
epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu );
if ( ! tid ) {
piiu->sendThreadExitEvent.signal ();
piiu->cleanShutdown ();
}
}
else {
piiu->sendThreadExitEvent.signal ();
piiu->cleanShutdown ();
}
// wake up recv thread only if
// 1) there are currently no bytes in the queue
// 2) if the recv thread is currently blocking for an incomplete msg
if ( nBytes < sizeof ( caHdr ) || msgHeaderButNoBody ) {
piiu->pCAC()->signalRecvActivity ();
}
while ( piiu->state == iiu_connected ) {
comBuf * pComBuf = new ( std::nothrow ) comBuf;
if ( pComBuf ) {
if ( piiu->preemptiveCallbackEnable ) {
piiu->recvFlag = 0;
}
else {
piiu->recvFlag = MSG_PEEK;
}
unsigned nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( nBytes <= UINT_MAX - nBytesIn ) {
nBytes += nBytesIn;
// only one recv thread at a time may call callbacks
callbackAutoMutex autoMutex ( *piiu->pCAC() );
//
// We leave the bytes pending and fetch them again after
// callbacks are enabled when running in the old preemptive
// call back disabled mode so that asynchronous wakeup via
// file manager call backs works correctly. Oddly enough,
// this does not appear to impact performance.
//
if ( ! piiu->preemptiveCallbackEnable ) {
pComBuf->clear ();
piiu->recvFlag = 0;
nBytesIn = pComBuf->fillFromWire ( *piiu );
}
if ( nBytesIn ) {
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
if ( nBytesIn == pComBuf->capacityBytes () ) {
if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) {
piiu->busyStateDetected = true;
}
else {
nBytes = UINT_MAX;
else {
piiu->contigRecvMsgCount++;
}
}
else {
pComBuf->destroy ();
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
nBytes = piiu->recvQue.occupiedBytes ();
}
piiu->contigRecvMsgCount = 0u;
piiu->busyStateDetected = false;
}
piiu->unacknowledgedSendBytes = 0u;
// reschedule connection activity watchdog
// but dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
// of the recvDog expire which takes the lock
piiu->recvDog.messageArrivalNotify ();
//
// execute receive labor
//
piiu->processIncoming ();
}
else {
// no way to be informed when memory is available
epicsThreadSleep ( 0.5 );
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
nBytes = piiu->recvQue.occupiedBytes ();
pComBuf->destroy ();
}
}
else {
// no way to be informed when memory is available
epicsThreadSleep ( 0.5 );
}
}
piiu->recvThreadExitEvent.signal ();
{
callbackAutoMutex autoMutex ( *piiu->pCAC() );
piiu->pCAC()->uninstallIIU ( *piiu );
piiu->pCAC()->notifyDestroyFD ( piiu->sock );
}
piiu->destroy ();
}
//
@@ -311,7 +317,8 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
epicsTimerQueue &timerQueue, const osiSockAddr &addrIn,
unsigned minorVersion, class bhe &bheIn,
ipAddrToAsciiEngine &engineIn ) :
ipAddrToAsciiEngine &engineIn,
bool preemptiveCallbackEnableIn ) :
netiiu ( &cac ),
recvDog ( *this, connectionTimeout, timerQueue ),
sendDog ( *this, connectionTimeout, timerQueue ),
@@ -329,15 +336,16 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
blockingForFlush ( 0u ),
socketLibrarySendBufferSize ( 0u ),
unacknowledgedSendBytes ( 0u ),
recvFlag ( 0 ),
busyStateDetected ( false ),
flowControlActive ( false ),
echoRequestPending ( false ),
oldMsgHeaderAvailable ( false ),
msgHeaderAvailable ( false ),
sockCloseCompleted ( false ),
f_trueOnceOnly ( true ),
earlyFlush ( false ),
recvProcessPostponedFlush ( false )
recvProcessPostponedFlush ( false ),
preemptiveCallbackEnable ( preemptiveCallbackEnableIn )
{
if ( ! this->pCurData ) {
throw std::bad_alloc ();
@@ -378,7 +386,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
this->userNameSetRequest ();
this->hostNameSetRequest ();
# if 0
{
int i;
@@ -424,7 +431,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
unsigned priorityOfRecv;
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv );
this->pCAC()->getInitializingThreadsPriority (), &priorityOfRecv );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();
}
@@ -524,7 +531,7 @@ void tcpiiu::cleanShutdown ()
else if ( this->state == iiu_connecting ) {
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
errlogPrintf ( "CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
else {
@@ -533,8 +540,6 @@ void tcpiiu::cleanShutdown ()
}
}
this->sendThreadFlushEvent.signal ();
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->pCAC ()->signalRecvActivity ();
}
/*
@@ -569,8 +574,6 @@ void tcpiiu::forcedShutdown ()
}
this->sendThreadFlushEvent.signal ();
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->pCAC()->signalRecvActivity ();
}
//
@@ -605,27 +608,6 @@ tcpiiu::~tcpiiu ()
}
}
// wait for recv thread to exit
while ( true ) {
bool signaled = this->recvThreadExitEvent.wait ( shutdownDelay );
if ( signaled ) {
break;
}
if ( ! this->sockCloseCompleted ) {
printf ( "Gave up waiting for \"shutdown()\" to force receive thread to exit after %f sec\n",
shutdownDelay);
printf ( "Closing socket\n" );
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
else {
this->sockCloseCompleted = true;
}
}
}
if ( ! this->sockCloseCompleted ) {
int status = socket_close ( this->sock );
if ( status ) {
@@ -722,12 +704,8 @@ void tcpiiu::show ( unsigned level ) const
::printf ( "\tvirtual circuit socket identifier %d\n", this->sock );
::printf ( "\tsend thread flush signal:\n" );
this->sendThreadFlushEvent.show ( level-3u );
::printf ( "\trecv thread buffer space available signal:\n" );
this->recvThreadRingBufferSpaceAvailableEvent.show ( level-3u );
::printf ( "\tsend thread exit signal:\n" );
this->sendThreadExitEvent.show ( level-3u );
::printf ( "\trecv thread exit signal:\n" );
this->recvThreadExitEvent.show ( level-3u );
::printf ("\techo pending bool = %u\n", this->echoRequestPending );
::printf ( "IO identifier hash table:\n" );
this->BHE.show ( level - 3u );
@@ -756,7 +734,7 @@ bool tcpiiu::setEchoRequestPending ()
//
void tcpiiu::processIncoming ()
{
while ( 1 ) {
while ( true ) {
//
// fetch a complete message header
@@ -825,7 +803,6 @@ void tcpiiu::processIncoming ()
&this->pCurData[this->curDataBytes],
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->flushIfRecvProcessRequested ();
return;
}
@@ -848,16 +825,10 @@ void tcpiiu::processIncoming ()
this->curDataBytes += this->recvQue.removeBytes (
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->flushIfRecvProcessRequested ();
return;
}
}
if ( nBytes >= maxBytesPendingTCP &&
this->recvQue.occupiedBytes () < maxBytesPendingTCP ) {
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
}
this->oldMsgHeaderAvailable = false;
this->msgHeaderAvailable = false;
@@ -1276,19 +1247,26 @@ bool tcpiiu::flush ()
}
// ~tcpiiu() will not return while this->blockingForFlush is greater than zero
void tcpiiu::blockUntilSendBacklogIsReasonable ( epicsMutex &mutex )
void tcpiiu::blockUntilSendBacklogIsReasonable (
epicsMutex *pCallbackMutex, epicsMutex & primaryMutex )
{
assert ( this->blockingForFlush < UINT_MAX );
this->blockingForFlush++;
while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) {
epicsAutoMutexRelease autoRelease ( mutex );
this->flushBlockEvent.wait ( 5.0 );
}
if ( this->blockingForFlush == 1 ) {
this->flushBlockEvent.signal ();
epicsAutoMutexRelease autoRelease ( primaryMutex );
if ( pCallbackMutex ) {
epicsAutoMutexRelease autoRelease ( *pCallbackMutex );
this->flushBlockEvent.wait ();
}
else {
this->flushBlockEvent.wait ();
}
}
assert ( this->blockingForFlush > 0u );
this->blockingForFlush--;
if ( this->blockingForFlush == 0 ) {
this->flushBlockEvent.signal ();
}
}
void tcpiiu::flushRequestIfAboveEarlyThreshold ()

View File

@@ -51,9 +51,9 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] =
//
// udpiiu::udpiiu ()
//
udpiiu::udpiiu ( cac &cac, epicsThreadPrivateId isRecvProcessIdIn ) :
netiiu ( &cac ), isRecvProcessId ( isRecvProcessIdIn ),
shutdownCmd ( false ), sockCloseCompleted ( false )
udpiiu::udpiiu ( cac &cac ) :
netiiu ( &cac ), shutdownCmd ( false ),
sockCloseCompleted ( false )
{
static const unsigned short PORT_ANY = 0u;
osiSockAddr addr;
@@ -146,6 +146,11 @@ udpiiu::udpiiu ( cac &cac, epicsThreadPrivateId isRecvProcessIdIn ) :
ellInit ( &this->dest );
configureChannelAccessAddressList ( &this->dest, this->sock, this->serverPort );
if ( ellCount ( &this->dest ) == 0 ) {
// no need to lock callbacks here because
// 1) this is called while in a CA client function
// 2) no auxiliary threads are running at this point
// (taking the callback lock here would break the
// lock hierarchy and risk deadlocks)
genLocalExcep ( *this->pCAC (), ECA_NOSEARCHADDR, NULL );
}
@@ -245,7 +250,7 @@ void udpiiu::recvMsg ()
extern "C" void cacRecvThreadUDP ( void *pParam )
{
udpiiu *piiu = (udpiiu *) pParam;
epicsThreadPrivateSet ( piiu->isRecvProcessId, pParam );
epicsThreadPrivateSet ( caClientCallbackThreadId, pParam );
do {
piiu->recvMsg ();
} while ( ! piiu->shutdownCmd );
@@ -643,12 +648,14 @@ bool udpiiu::exceptionRespAction ( const caHdr &msg,
return true;
}
void udpiiu::postMsg ( const osiSockAddr &net_addr,
char *pInBuf, arrayElementCount blockSize,
const epicsTime &currentTime)
void udpiiu::postMsg ( const osiSockAddr & net_addr,
char * pInBuf, arrayElementCount blockSize,
const epicsTime & currentTime )
{
caHdr *pCurMsg;
callbackAutoMutex autoMutex ( *this->pCAC() );
while ( blockSize ) {
arrayElementCount size;
@@ -656,8 +663,8 @@ void udpiiu::postMsg ( const osiSockAddr &net_addr,
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf (
"%s: undecipherable (too small) UDP msg from %s ignored\n", __FILE__,
buf );
"%s: undecipherable (too small) UDP msg from %s ignored\n",
__FILE__, buf );
return;
}

View File

@@ -43,7 +43,7 @@ class epicsTime;
class udpiiu : public netiiu {
public:
udpiiu ( class cac &, epicsThreadPrivateId );
udpiiu ( class cac & );
virtual ~udpiiu ();
void shutdown ();
void recvMsg ();
@@ -59,15 +59,12 @@ public:
// exceptions
class noSocket {};
SOCKET getSock () const;
private:
char xmitBuf [MAX_UDP_SEND];
char recvBuf [MAX_UDP_RECV];
ELLLIST dest;
epicsThreadId recvThreadId;
epicsEventId recvThreadExitSignal;
epicsThreadPrivateId isRecvProcessId;
unsigned nBytesInXmitBuf;
SOCKET sock;
unsigned short repeaterPort;
@@ -113,10 +110,5 @@ inline unsigned udpiiu::getPort () const
return this->localPort;
}
inline SOCKET udpiiu::getSock () const
{
return this->sock;
}
#endif // udpiiuh

View File

@@ -215,20 +215,21 @@ public:
tcpiiu ( cac &cac, double connectionTimeout,
epicsTimerQueue &timerQueue, const osiSockAddr &addrIn,
unsigned minorVersion, class bhe &bhe,
ipAddrToAsciiEngine &engineIn );
ipAddrToAsciiEngine & engineIn,
bool preemptiveCallbackEnable );
~tcpiiu ();
void connect ();
void processIncoming ();
void destroy ();
void cleanShutdown ();
void forcedShutdown ();
void cleanShutdown ();
void beaconAnomalyNotify ();
void beaconArrivalNotify ();
void flushRequest ();
bool flushBlockThreshold () const;
void flushRequestIfAboveEarlyThreshold ();
void blockUntilSendBacklogIsReasonable ( epicsMutex & );
void blockUntilSendBacklogIsReasonable
( epicsMutex * pCallBack, epicsMutex & primary );
virtual void show ( unsigned level ) const;
bool setEchoRequestPending ();
void requestRecvProcessPostponedFlush ();
@@ -245,9 +246,6 @@ public:
double beaconPeriod () const;
bhe & getBHE () const;
SOCKET getSock() const;
bool trueOnceOnly ();
private:
tcpRecvWatchdog recvDog;
tcpSendWatchdog sendDog;
@@ -263,24 +261,25 @@ private:
unsigned minorProtocolVersion;
iiu_conn_state state;
epicsEvent sendThreadFlushEvent;
epicsEvent recvThreadRingBufferSpaceAvailableEvent;
epicsEvent sendThreadExitEvent;
epicsEvent recvThreadExitEvent;
epicsEvent flushBlockEvent;
SOCKET sock;
unsigned contigRecvMsgCount;
unsigned blockingForFlush;
unsigned socketLibrarySendBufferSize;
unsigned unacknowledgedSendBytes;
int recvFlag;
bool busyStateDetected; // only modified by the recv thread
bool flowControlActive; // only modified by the send process thread
bool echoRequestPending;
bool oldMsgHeaderAvailable;
bool msgHeaderAvailable;
bool sockCloseCompleted;
bool f_trueOnceOnly;
bool earlyFlush;
bool recvProcessPostponedFlush;
bool preemptiveCallbackEnable;
void processIncoming ();
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
@@ -491,22 +490,6 @@ inline void tcpiiu::beaconArrivalNotify ()
this->recvDog.beaconArrivalNotify ();
}
inline bool tcpiiu::trueOnceOnly ()
{
if ( this->f_trueOnceOnly ) {
this->f_trueOnceOnly = false;
return true;
}
else {
return false;
}
}
inline SOCKET tcpiiu::getSock () const
{
return this->sock;
}
inline void tcpiiu::flushIfRecvProcessRequested ()
{
if ( this->recvProcessPostponedFlush ) {