rearranged code so that we have a late start
process receive thread again
This commit is contained in:
@@ -22,7 +22,6 @@ LIBSRCS += cacWriteNotify.cpp
|
||||
LIBSRCS += cacStateNotify.cpp
|
||||
LIBSRCS += cacServiceList.cpp
|
||||
LIBSRCS += access.cpp
|
||||
LIBSRCS += recvProcessThread.cpp
|
||||
LIBSRCS += iocinf.cpp
|
||||
LIBSRCS += convert.cpp
|
||||
LIBSRCS += test_event.cpp
|
||||
@@ -57,7 +56,6 @@ LIBSRCS += comQueRecv.cpp
|
||||
LIBSRCS += comQueSend.cpp
|
||||
LIBSRCS += comBuf.cpp
|
||||
LIBSRCS += hostNameCache.cpp
|
||||
LIBSRCS += ioCounterNet.cpp
|
||||
LIBSRCS += msgForMultiplyDefinedPV.cpp
|
||||
LIBSRCS += limboiiu.cpp
|
||||
|
||||
|
||||
+215
-66
@@ -110,13 +110,20 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
pRepeaterSubscribeTmr ( 0 ),
|
||||
tcpSmallRecvBufFreeList ( 0 ),
|
||||
tcpLargeRecvBufFreeList ( 0 ),
|
||||
pRecvProcessThread ( 0 ),
|
||||
notify ( notifyIn ),
|
||||
ioNotifyInProgressId ( 0 ),
|
||||
initializingThreadsPriority ( epicsThreadGetPrioritySelf () ),
|
||||
threadsBlockingOnNotifyCompletion ( 0u ),
|
||||
maxRecvBytesTCP ( MAX_TCP ),
|
||||
recvProcessEnableRefCount ( 0u ),
|
||||
recvProcessCompletionNBlockers ( 0u ),
|
||||
pndRecvCnt ( 0u ),
|
||||
readSeq ( 0u ),
|
||||
enablePreemptiveCallback ( enablePreemptiveCallbackIn ),
|
||||
ioInProgress ( false )
|
||||
ioInProgress ( false ),
|
||||
recvProcessInProgress ( false ),
|
||||
recvProcessThreadExitRequest ( false )
|
||||
{
|
||||
long status;
|
||||
unsigned abovePriority;
|
||||
@@ -204,37 +211,19 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
||||
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
||||
}
|
||||
|
||||
//
|
||||
// unfortunately, this must be created here in the
|
||||
// constructor, and not on demand (only when it is needed)
|
||||
// because the enable reference count must be
|
||||
// maintained whenever this object exists.
|
||||
//
|
||||
this->pRecvProcThread = new recvProcessThread ( this );
|
||||
if ( ! this->pRecvProcThread ) {
|
||||
this->pTimerQueue->release ();
|
||||
free ( this->pUserName );
|
||||
freeListCleanup ( this->tcpSmallRecvBufFreeList );
|
||||
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
||||
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
||||
}
|
||||
else if ( this->enablePreemptiveCallback ) {
|
||||
// only after this->pRecvProcThread is valid
|
||||
this->enableCallbackPreemption ();
|
||||
}
|
||||
}
|
||||
|
||||
cac::~cac ()
|
||||
{
|
||||
this->enableCallbackPreemption ();
|
||||
|
||||
//
|
||||
// make certain that process thread isnt deleting
|
||||
// tcpiiu objects at the same that this thread is
|
||||
//
|
||||
if ( this->pRecvProcThread ) {
|
||||
this->pRecvProcThread->disable ();
|
||||
this->recvProcessThreadExitRequest = true;
|
||||
this->recvProcessActivityEvent.signal ();
|
||||
this->recvProcessThreadExit.wait ();
|
||||
delete this->pRecvProcThread;
|
||||
}
|
||||
|
||||
{
|
||||
@@ -263,7 +252,6 @@ cac::~cac ()
|
||||
piiu->destroy ();
|
||||
}
|
||||
|
||||
delete this->pRecvProcThread;
|
||||
delete this->pRepeaterSubscribeTmr;
|
||||
delete this->pSearchTmr;
|
||||
|
||||
@@ -302,12 +290,11 @@ cac::~cac ()
|
||||
this->pTimerQueue->release ();
|
||||
}
|
||||
|
||||
// lock must be applied
|
||||
void cac::processRecvBacklog ()
|
||||
{
|
||||
tsDLList < tcpiiu > deadIIU;
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
tsDLIterBD < tcpiiu > piiu = this->iiuList.firstIter ();
|
||||
while ( piiu.valid () ) {
|
||||
tsDLIterBD < tcpiiu > pNext = piiu;
|
||||
@@ -351,13 +338,13 @@ void cac::processRecvBacklog ()
|
||||
}
|
||||
}
|
||||
if ( deadIIU.count() ) {
|
||||
while ( tcpiiu *piiu = deadIIU.get() ) {
|
||||
piiu->destroy ();
|
||||
}
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->pSearchTmr->resetPeriod ( 0.0 );
|
||||
epicsAutoMutex autoRelease ( this->mutex );
|
||||
while ( tcpiiu *piiu = deadIIU.get() ) {
|
||||
piiu->destroy ();
|
||||
}
|
||||
}
|
||||
this->pSearchTmr->resetPeriod ( 0.0 );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -384,11 +371,12 @@ void cac::show ( unsigned level ) const
|
||||
{
|
||||
epicsAutoMutex autoMutex2 ( this->mutex );
|
||||
|
||||
::printf ( "Channel Access Client Context at %p for user %s\n",
|
||||
static_cast <const void *> ( this ), this->pUserName );
|
||||
::printf ( "Channel Access Client Context at %p for user %s %s\n",
|
||||
static_cast <const void *> ( this ), this->pUserName,
|
||||
this->recvProcessInProgress ? "busy" : "idle" );
|
||||
if ( level > 0u ) {
|
||||
tsDLIterConstBD < tcpiiu > piiu = this->iiuList.firstIter ();
|
||||
while ( piiu.valid () ) {
|
||||
while ( piiu.valid() ) {
|
||||
piiu->show ( level - 1u );
|
||||
piiu++;
|
||||
}
|
||||
@@ -404,7 +392,8 @@ void cac::show ( unsigned level ) const
|
||||
if ( this->pudpiiu ) {
|
||||
this->pudpiiu->show ( level - 2u );
|
||||
}
|
||||
this->ioCounter.show ( level - 2u );
|
||||
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
|
||||
this->pndRecvCnt );
|
||||
}
|
||||
|
||||
if ( level > 2u ) {
|
||||
@@ -422,10 +411,6 @@ void cac::show ( unsigned level ) const
|
||||
::printf ( "Timer queue:\n" );
|
||||
this->pTimerQueue->show ( level - 3u );
|
||||
}
|
||||
if ( this->pRecvProcThread ) {
|
||||
::printf ( "incoming messages processing thread:\n" );
|
||||
this->pRecvProcThread->show ( level - 3u );
|
||||
}
|
||||
if ( this->pSearchTmr ) {
|
||||
::printf ( "search message timer:\n" );
|
||||
this->pSearchTmr->show ( level - 3u );
|
||||
@@ -436,18 +421,29 @@ void cac::show ( unsigned level ) const
|
||||
}
|
||||
::printf ( "IP address to name conversion engine:\n" );
|
||||
this->ipToAEngine.show ( level - 3u );
|
||||
::printf ( "recv process enable count %u\n",
|
||||
this->recvProcessEnableRefCount );
|
||||
::printf ( "recv process blocking for completion count %u\n",
|
||||
this->recvProcessCompletionNBlockers );
|
||||
::printf ( "\trecv process shutdown command boolean %u\n",
|
||||
this->recvProcessThreadExitRequest );
|
||||
::printf ( "\tthe current read sequence number is %u\n",
|
||||
this->readSeq );
|
||||
}
|
||||
|
||||
if ( level > 3u ) {
|
||||
::printf ( "Default mutex:\n");
|
||||
this->mutex.show ( level - 4u );
|
||||
}
|
||||
}
|
||||
|
||||
void cac::signalRecvActivity ()
|
||||
{
|
||||
if ( this->pRecvProcThread ) {
|
||||
this->pRecvProcThread->signalActivity ();
|
||||
::printf ( "Receive process activity event:\n" );
|
||||
this->recvProcessActivityEvent.show ( level - 3u );
|
||||
::printf ( "Receive process exit event:\n" );
|
||||
this->recvProcessThreadExit.show ( level - 3u );
|
||||
::printf ( "Receive processing done event:\n" );
|
||||
this->recvProcessCompletion.show ( level - 3u );
|
||||
::printf ( "IO done event:\n");
|
||||
this->ioDone.show ( level - 3u );
|
||||
::printf ( "mutex:\n" );
|
||||
this->mutex.show ( level - 3u );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,7 +523,7 @@ int cac::pendIO ( const double &timeout )
|
||||
{
|
||||
// prevent recursion nightmares by disabling calls to
|
||||
// pendIO () from within a CA callback
|
||||
if ( this->pRecvProcThread->isCurrentThread () ) {
|
||||
if ( this->recvProcessThreadIsCurrentThread () ) {
|
||||
return ECA_EVDISALLOW;
|
||||
}
|
||||
|
||||
@@ -544,21 +540,22 @@ int cac::pendIO ( const double &timeout )
|
||||
else{
|
||||
remaining = timeout;
|
||||
}
|
||||
while ( this->ioCounter.currentCount () > 0 ) {
|
||||
while ( this->pndRecvCnt > 0 ) {
|
||||
if ( remaining < CAC_SIGNIFICANT_SELECT_DELAY ) {
|
||||
status = ECA_TIMEOUT;
|
||||
break;
|
||||
}
|
||||
this->ioCounter.waitForCompletion ( remaining );
|
||||
this->ioDone.wait ( remaining );
|
||||
if ( timeout != 0.0 ) {
|
||||
double delay = epicsTime::getCurrent () - beg_time;
|
||||
remaining = timeout - delay;
|
||||
}
|
||||
}
|
||||
|
||||
this->ioCounter.cleanUp ();
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->readSeq++;
|
||||
this->pndRecvCnt = 0u;
|
||||
if ( this->pudpiiu ) {
|
||||
this->pudpiiu->connectTimeoutNotify ();
|
||||
}
|
||||
@@ -573,7 +570,7 @@ int cac::pendEvent ( const double &timeout )
|
||||
{
|
||||
// prevent recursion nightmares by disabling calls to
|
||||
// pendIO () from within a CA callback
|
||||
if ( this->pRecvProcThread->isCurrentThread () ) {
|
||||
if ( this->recvProcessThreadIsCurrentThread () ) {
|
||||
return ECA_EVDISALLOW;
|
||||
}
|
||||
|
||||
@@ -595,16 +592,6 @@ int cac::pendEvent ( const double &timeout )
|
||||
return ECA_TIMEOUT;
|
||||
}
|
||||
|
||||
bool cac::ioComplete () const
|
||||
{
|
||||
if ( this->ioCounter.currentCount () == 0u ) {
|
||||
return true;
|
||||
}
|
||||
else{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// this is to only be used by early protocol revisions
|
||||
bool cac::connectChannel ( unsigned id )
|
||||
{
|
||||
@@ -648,6 +635,63 @@ void cac::registerService ( cacService &service )
|
||||
this->services.registerService ( service );
|
||||
}
|
||||
|
||||
void cac::startRecvProcessThread ()
|
||||
{
|
||||
bool newThread = false;
|
||||
{
|
||||
epicsAutoMutex epicsMutex ( this->mutex );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process",
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ),
|
||||
this->getInitializingThreadsPriority () );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
this->pRecvProcessThread->start ();
|
||||
newThread = true;
|
||||
}
|
||||
}
|
||||
if ( this->enablePreemptiveCallback && newThread ) {
|
||||
this->enableCallbackPreemption ();
|
||||
}
|
||||
}
|
||||
|
||||
// this is the recv process thread entry point
|
||||
void cac::run ()
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
this->attachToClientCtx ();
|
||||
|
||||
while ( ! this->recvProcessThreadExitRequest ) {
|
||||
|
||||
{
|
||||
if ( this->recvProcessEnableRefCount ) {
|
||||
this->recvProcessInProgress = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ( this->recvProcessInProgress ) {
|
||||
this->processRecvBacklog ();
|
||||
}
|
||||
|
||||
bool signalNeeded;
|
||||
{
|
||||
this->recvProcessInProgress = false;
|
||||
signalNeeded = this->recvProcessCompletionNBlockers > 0u;
|
||||
}
|
||||
|
||||
{
|
||||
epicsAutoMutexRelease autoRelease ( this->mutex );
|
||||
if ( signalNeeded ) {
|
||||
this->recvProcessCompletion.signal ();
|
||||
}
|
||||
this->recvProcessActivityEvent.wait ();
|
||||
}
|
||||
}
|
||||
this->recvProcessThreadExit.signal ();
|
||||
}
|
||||
|
||||
cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan )
|
||||
{
|
||||
cacChannel *pIO;
|
||||
@@ -664,6 +708,7 @@ cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan )
|
||||
epics_auto_ptr < cacChannel > pNetChan
|
||||
( new nciu ( *this, limboIIU, chan, pName ) );
|
||||
if ( pNetChan.get() ) {
|
||||
this->startRecvProcessThread ();
|
||||
return *pNetChan.release ();
|
||||
}
|
||||
else {
|
||||
@@ -717,15 +762,54 @@ bool cac::setupUDP ()
|
||||
|
||||
void cac::enableCallbackPreemption ()
|
||||
{
|
||||
if ( this->pRecvProcThread ) {
|
||||
this->pRecvProcThread->enable ();
|
||||
unsigned copy;
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
assert ( this->recvProcessEnableRefCount < UINT_MAX );
|
||||
copy = this->recvProcessEnableRefCount;
|
||||
this->recvProcessEnableRefCount++;
|
||||
}
|
||||
if ( copy == 0u ) {
|
||||
this->recvProcessActivityEvent.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void cac::disableCallbackPreemption ()
|
||||
{
|
||||
if ( this->pRecvProcThread ) {
|
||||
this->pRecvProcThread->disable ();
|
||||
bool wakeupNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
if ( ! this->recvProcessInProgress ) {
|
||||
assert ( this->recvProcessEnableRefCount != 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
return;
|
||||
}
|
||||
else {
|
||||
this->recvProcessCompletionNBlockers++;
|
||||
}
|
||||
}
|
||||
|
||||
while ( true ) {
|
||||
this->recvProcessCompletion.wait ();
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
if ( ! this->recvProcessInProgress ) {
|
||||
assert ( this->recvProcessEnableRefCount > 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
assert ( this->recvProcessCompletionNBlockers > 0u );
|
||||
this->recvProcessCompletionNBlockers--;
|
||||
wakeupNeeded = this->recvProcessCompletionNBlockers > 0u;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ( wakeupNeeded ) {
|
||||
this->recvProcessCompletion.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -864,7 +948,7 @@ void cac::flushIfRequired ( nciu &chan )
|
||||
// locking hierarchy reasons.
|
||||
bool blockPermit = true;
|
||||
if ( this->pRecvProcThread ) {
|
||||
if ( this->pRecvProcThread->isCurrentThread () ) {
|
||||
if ( this->recvProcessThreadIsCurrentThread () ) {
|
||||
blockPermit = false;
|
||||
}
|
||||
}
|
||||
@@ -1600,3 +1684,68 @@ void cac::vSignal ( int ca_status, const char *pfilenm,
|
||||
this->printf ( "..................................................................\n" );
|
||||
}
|
||||
|
||||
void cac::incrementOutstandingIO ()
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->pndRecvCnt < UINT_MAX ) {
|
||||
this->pndRecvCnt++;
|
||||
}
|
||||
else {
|
||||
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
|
||||
}
|
||||
}
|
||||
|
||||
void cac::decrementOutstandingIO ()
|
||||
{
|
||||
bool signalNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->pndRecvCnt > 0u ) {
|
||||
this->pndRecvCnt--;
|
||||
if ( this->pndRecvCnt == 0u ) {
|
||||
signalNeeded = true;
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ( signalNeeded ) {
|
||||
this->ioDone.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void cac::decrementOutstandingIO ( unsigned sequenceNo )
|
||||
{
|
||||
bool signalNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->readSeq == sequenceNo ) {
|
||||
if ( this->pndRecvCnt > 0u ) {
|
||||
this->pndRecvCnt--;
|
||||
if ( this->pndRecvCnt == 0u ) {
|
||||
signalNeeded = true;
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
|
||||
if ( signalNeeded ) {
|
||||
this->ioDone.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
+35
-70
@@ -30,54 +30,6 @@
|
||||
#include "cacIO.h"
|
||||
#undef epicsExportSharedSymbols
|
||||
|
||||
class ioCounterNet {
|
||||
public:
|
||||
ioCounterNet ();
|
||||
void increment ();
|
||||
void decrement ();
|
||||
void decrement ( unsigned seqNumber );
|
||||
unsigned sequenceNumber () const;
|
||||
unsigned currentCount () const;
|
||||
void cleanUp ();
|
||||
void show ( unsigned level ) const;
|
||||
void waitForCompletion ( double delaySec );
|
||||
private:
|
||||
unsigned pndrecvcnt;
|
||||
unsigned readSeq;
|
||||
epicsMutex mutex;
|
||||
epicsEvent ioDone;
|
||||
};
|
||||
|
||||
class recvProcessThread : public epicsThreadRunable {
|
||||
public:
|
||||
recvProcessThread ( class cac *pcacIn );
|
||||
virtual ~recvProcessThread ();
|
||||
void run ();
|
||||
void enable ();
|
||||
void disable ();
|
||||
void signalActivity ();
|
||||
bool isCurrentThread () const;
|
||||
void show ( unsigned level ) const;
|
||||
private:
|
||||
//
|
||||
// The additional complexity associated with
|
||||
// "processingDone" event and the "processing" flag
|
||||
// avoid complex locking hierarchy constraints
|
||||
// and therefore reduces the chance of creating
|
||||
// a deadlock window during code maintenance.
|
||||
//
|
||||
epicsThread thread;
|
||||
epicsEvent recvActivity;
|
||||
class cac *pcac;
|
||||
epicsEvent exit;
|
||||
epicsEvent processingDone;
|
||||
mutable epicsMutex mutex;
|
||||
unsigned enableRefCount;
|
||||
unsigned blockingForCompletion;
|
||||
bool processing;
|
||||
bool shutDown;
|
||||
};
|
||||
|
||||
class netWriteNotifyIO;
|
||||
class netReadNotifyIO;
|
||||
class netSubscription;
|
||||
@@ -96,7 +48,7 @@ struct CASG;
|
||||
class inetAddrID;
|
||||
struct caHdrLargeArray;
|
||||
|
||||
class cac : private cacRecycle
|
||||
class cac : private cacRecycle, private epicsThreadRunable
|
||||
{
|
||||
public:
|
||||
cac ( cacNotify &, bool enablePreemptiveCallback = false );
|
||||
@@ -185,7 +137,6 @@ public:
|
||||
void releaseLargeBufferTCP ( char * );
|
||||
|
||||
private:
|
||||
ioCounterNet ioCounter;
|
||||
ipAddrToAsciiEngine ipToAEngine;
|
||||
cacServiceList services;
|
||||
tsDLList < tcpiiu > iiuList;
|
||||
@@ -210,28 +161,42 @@ private:
|
||||
double connTMO;
|
||||
mutable epicsMutex mutex;
|
||||
epicsEvent notifyCompletionEvent;
|
||||
epicsEvent recvProcessActivityEvent;
|
||||
epicsEvent recvProcessCompletion;
|
||||
epicsEvent recvProcessThreadExit;
|
||||
epicsEvent ioDone;
|
||||
epicsTimerQueueActive *pTimerQueue;
|
||||
char *pUserName;
|
||||
recvProcessThread *pRecvProcThread;
|
||||
epicsThread *pRecvProcThread;
|
||||
class udpiiu *pudpiiu;
|
||||
class searchTimer *pSearchTmr;
|
||||
class repeaterSubscribeTimer
|
||||
*pRepeaterSubscribeTmr;
|
||||
void *tcpSmallRecvBufFreeList;
|
||||
void *tcpLargeRecvBufFreeList;
|
||||
epicsThread *pRecvProcessThread;
|
||||
cacNotify ¬ify;
|
||||
unsigned ioNotifyInProgressId;
|
||||
unsigned initializingThreadsPriority;
|
||||
unsigned threadsBlockingOnNotifyCompletion;
|
||||
unsigned maxRecvBytesTCP;
|
||||
unsigned recvProcessEnableRefCount;
|
||||
unsigned recvProcessCompletionNBlockers;
|
||||
unsigned pndRecvCnt;
|
||||
unsigned readSeq;
|
||||
bool enablePreemptiveCallback;
|
||||
bool ioInProgress;
|
||||
bool recvProcessInProgress;
|
||||
bool recvProcessThreadExitRequest;
|
||||
|
||||
void run ();
|
||||
bool setupUDP ();
|
||||
void flushIfRequired ( nciu & ); // lock must be applied
|
||||
void recycleReadNotifyIO ( netReadNotifyIO &io );
|
||||
void recycleWriteNotifyIO ( netWriteNotifyIO &io );
|
||||
void recycleSubscription ( netSubscription &io );
|
||||
|
||||
bool recvProcessThreadIsCurrentThread () const;
|
||||
void startRecvProcessThread ();
|
||||
void ioCompletionNotify ( unsigned id, unsigned type,
|
||||
arrayElementCount count, const void *pData );
|
||||
void ioExceptionNotify ( unsigned id,
|
||||
@@ -300,24 +265,9 @@ inline unsigned cac::getInitializingThreadsPriority () const
|
||||
return this->initializingThreadsPriority;
|
||||
}
|
||||
|
||||
inline void cac::incrementOutstandingIO ()
|
||||
{
|
||||
this->ioCounter.increment ();
|
||||
}
|
||||
|
||||
inline void cac::decrementOutstandingIO ()
|
||||
{
|
||||
this->ioCounter.decrement ();
|
||||
}
|
||||
|
||||
inline void cac::decrementOutstandingIO ( unsigned sequenceNo )
|
||||
{
|
||||
this->ioCounter.decrement ( sequenceNo );
|
||||
}
|
||||
|
||||
inline unsigned cac::sequenceNumberOfOutstandingIO () const
|
||||
{
|
||||
return this->ioCounter.sequenceNumber ();
|
||||
return this->readSeq;
|
||||
}
|
||||
|
||||
inline epicsMutex & cac::mutexRef ()
|
||||
@@ -366,9 +316,24 @@ inline void cac::releaseLargeBufferTCP ( char *pBuf )
|
||||
freeListFree ( this->tcpLargeRecvBufFreeList, pBuf );
|
||||
}
|
||||
|
||||
inline bool recvProcessThread::isCurrentThread () const
|
||||
inline bool cac::recvProcessThreadIsCurrentThread () const
|
||||
{
|
||||
return this->thread.isCurrentThread ();
|
||||
if ( this->pRecvProcessThread ) {
|
||||
return this->pRecvProcessThread->isCurrentThread();
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
inline bool cac::ioComplete () const
|
||||
{
|
||||
return ( this->pndRecvCnt == 0u );
|
||||
}
|
||||
|
||||
inline void cac::signalRecvActivity ()
|
||||
{
|
||||
this->recvProcessActivityEvent.signal ();
|
||||
}
|
||||
|
||||
#endif // ifdef cach
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
|
||||
/*
|
||||
* $Id$
|
||||
*
|
||||
*
|
||||
* L O S A L A M O S
|
||||
* Los Alamos National Laboratory
|
||||
* Los Alamos, New Mexico 87545
|
||||
*
|
||||
* Copyright, 1986, The Regents of the University of California.
|
||||
*
|
||||
*
|
||||
* Author Jeffrey O. Hill
|
||||
* johill@lanl.gov
|
||||
* 505 665 1831
|
||||
*/
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
|
||||
|
||||
#include "iocinf.h"
|
||||
#include "cac.h"
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "caerr.h" // for CA_INTERNAL
|
||||
#undef epicsExportSharedSymbols
|
||||
|
||||
ioCounterNet::ioCounterNet () : pndrecvcnt ( 0u ), readSeq ( 0u )
|
||||
{
|
||||
}
|
||||
|
||||
void ioCounterNet::increment ()
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->pndrecvcnt < UINT_MAX ) {
|
||||
this->pndrecvcnt++;
|
||||
}
|
||||
else {
|
||||
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
|
||||
}
|
||||
}
|
||||
|
||||
unsigned ioCounterNet::sequenceNumber () const
|
||||
{
|
||||
return this->readSeq;
|
||||
}
|
||||
|
||||
unsigned ioCounterNet::currentCount () const
|
||||
{
|
||||
return this->pndrecvcnt;
|
||||
}
|
||||
|
||||
void ioCounterNet::waitForCompletion ( double delaySec )
|
||||
{
|
||||
this->ioDone.wait ( delaySec );
|
||||
}
|
||||
|
||||
void ioCounterNet::cleanUp ()
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
this->readSeq++;
|
||||
this->pndrecvcnt = 0u;
|
||||
}
|
||||
|
||||
void ioCounterNet::decrement ()
|
||||
{
|
||||
bool signalNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->pndrecvcnt > 0u ) {
|
||||
this->pndrecvcnt--;
|
||||
if ( this->pndrecvcnt == 0u ) {
|
||||
signalNeeded = true;
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ( signalNeeded ) {
|
||||
this->ioDone.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void ioCounterNet::decrement ( unsigned seqNumber )
|
||||
{
|
||||
bool signalNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex locker ( this->mutex );
|
||||
if ( this->readSeq == seqNumber ) {
|
||||
if ( this->pndrecvcnt > 0u ) {
|
||||
this->pndrecvcnt--;
|
||||
if ( this->pndrecvcnt == 0u ) {
|
||||
signalNeeded = true;
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalNeeded = false;
|
||||
}
|
||||
}
|
||||
|
||||
if ( signalNeeded ) {
|
||||
this->ioDone.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void ioCounterNet::show ( unsigned level ) const
|
||||
{
|
||||
::printf ( "ioCounterNet at %p\n",
|
||||
static_cast <const void *> ( this ) );
|
||||
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
|
||||
this->pndrecvcnt );
|
||||
if ( level > 0u ) {
|
||||
::printf ( "\tthe current read sequence number is %u\n",
|
||||
this->readSeq );
|
||||
}
|
||||
if ( level > 1u ) {
|
||||
::printf ( "IO done event:\n");
|
||||
this->ioDone.show ( level - 2u );
|
||||
}
|
||||
}
|
||||
@@ -1,164 +0,0 @@
|
||||
|
||||
/*
|
||||
* $Id$
|
||||
*
|
||||
*
|
||||
* L O S A L A M O S
|
||||
* Los Alamos National Laboratory
|
||||
* Los Alamos, New Mexico 87545
|
||||
*
|
||||
* Copyright, 1986, The Regents of the University of California.
|
||||
*
|
||||
*
|
||||
* Author Jeffrey O. Hill
|
||||
* johill@lanl.gov
|
||||
* 505 665 1831
|
||||
*
|
||||
*/
|
||||
|
||||
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
|
||||
|
||||
#include "iocinf.h"
|
||||
#include "cac.h"
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "cadef.h"
|
||||
#undef epicsExportSharedSymbols
|
||||
|
||||
recvProcessThread::recvProcessThread (cac *pcacIn) :
|
||||
thread ( *this, "CAC-recv-process",
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ),
|
||||
pcacIn->getInitializingThreadsPriority () ),
|
||||
pcac ( pcacIn ),
|
||||
enableRefCount ( 0u ),
|
||||
blockingForCompletion ( 0u ),
|
||||
processing ( false ),
|
||||
shutDown ( false )
|
||||
{
|
||||
this->thread.start ();
|
||||
}
|
||||
|
||||
recvProcessThread::~recvProcessThread ()
|
||||
{
|
||||
this->shutDown = true;
|
||||
this->recvActivity.signal ();
|
||||
this->exit.wait ();
|
||||
}
|
||||
|
||||
void recvProcessThread::run ()
|
||||
{
|
||||
this->pcac->attachToClientCtx ();
|
||||
|
||||
while ( ! this->shutDown ) {
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
if ( this->enableRefCount ) {
|
||||
this->processing = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ( this->processing ) {
|
||||
pcac->processRecvBacklog ();
|
||||
}
|
||||
|
||||
bool signalNeeded;
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->processing = false;
|
||||
signalNeeded = this->blockingForCompletion > 0u;
|
||||
}
|
||||
|
||||
if ( signalNeeded ) {
|
||||
this->processingDone.signal ();
|
||||
}
|
||||
|
||||
this->recvActivity.wait ();
|
||||
}
|
||||
this->exit.signal ();
|
||||
}
|
||||
|
||||
void recvProcessThread::enable ()
|
||||
{
|
||||
unsigned copy;
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
assert ( this->enableRefCount < UINT_MAX );
|
||||
copy = this->enableRefCount;
|
||||
this->enableRefCount++;
|
||||
}
|
||||
|
||||
if ( copy == 0u ) {
|
||||
this->recvActivity.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void recvProcessThread::disable ()
|
||||
{
|
||||
bool wakeupNeeded;
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
if ( ! this->processing ) {
|
||||
assert ( this->enableRefCount != 0u );
|
||||
this->enableRefCount--;
|
||||
return;
|
||||
}
|
||||
else {
|
||||
this->blockingForCompletion++;
|
||||
}
|
||||
}
|
||||
|
||||
while ( true ) {
|
||||
this->processingDone.wait ();
|
||||
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
|
||||
if ( ! this->processing ) {
|
||||
assert ( this->enableRefCount > 0u );
|
||||
this->enableRefCount--;
|
||||
assert ( this->blockingForCompletion > 0u );
|
||||
this->blockingForCompletion--;
|
||||
wakeupNeeded = this->blockingForCompletion > 0u;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ( wakeupNeeded ) {
|
||||
this->processingDone.signal ();
|
||||
}
|
||||
}
|
||||
|
||||
void recvProcessThread::signalActivity ()
|
||||
{
|
||||
this->recvActivity.signal ();
|
||||
}
|
||||
|
||||
void recvProcessThread::show ( unsigned level ) const
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
::printf ( "CA receive processing thread at %p state=%s\n",
|
||||
static_cast <const void *> ( this ), this->processing ? "busy" : "idle");
|
||||
if ( level > 0u ) {
|
||||
::printf ( "enable count %u\n", this->enableRefCount );
|
||||
::printf ( "blocking for completion count %u\n", this->blockingForCompletion );
|
||||
}
|
||||
if ( level > 1u ) {
|
||||
::printf ( "\tCA client at %p\n", static_cast < void * > ( this->pcac ) );
|
||||
::printf ( "\tshutdown command boolean %u\n", this->shutDown );
|
||||
}
|
||||
if ( level > 2u ) {
|
||||
::printf ( "Receive activity event:\n" );
|
||||
this->recvActivity.show ( level - 3u );
|
||||
::printf ( "exit event:\n" );
|
||||
this->exit.show ( level - 3u );
|
||||
::printf ( "processing done event:\n" );
|
||||
this->processingDone.show ( level - 3u );
|
||||
::printf ( "mutex:\n" );
|
||||
this->mutex.show ( level - 3u );
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user