fixed subscription install race

This commit is contained in:
Jeff Hill
2001-03-08 21:29:04 +00:00
parent 6447488b9f
commit 994f1a9b0c
3 changed files with 44 additions and 36 deletions
+11 -8
View File
@@ -444,17 +444,20 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
unsigned mask, cacNotify &notify,
cacNotifyIO *&pNotifyIO )
{
if ( INVALID_DB_REQ (type) ) {
return ECA_BADTYPE;
}
if ( mask > 0xffff || mask == 0u ) {
return ECA_BADMASK;
}
netSubscription *pSubcr = new netSubscription ( *this,
type, nElem, mask, notify );
if ( pSubcr ) {
int status = this->piiu->installSubscription ( *pSubcr );
if ( status != ECA_NORMAL ) {
delete static_cast < baseNMIU * > ( pSubcr );
}
else {
pNotifyIO = pSubcr;
}
return status;
this->piiu->installSubscription ( *pSubcr );
pNotifyIO = pSubcr;
return ECA_NORMAL;;
}
else {
return ECA_ALLOCMEM;
+7 -8
View File
@@ -206,27 +206,26 @@ int netiiu::clearChannelRequest ( nciu & )
return ECA_DISCONNCHID;
}
int netiiu::subscriptionRequest ( netSubscription &, bool )
void netiiu::subscriptionRequest ( netSubscription &, bool )
{
return ECA_NORMAL;
}
void netiiu::subscriptionCancelRequest ( netSubscription &, bool )
{
}
int netiiu::installSubscription ( netSubscription &subscr )
void netiiu::installSubscription ( netSubscription &subscr )
{
bool connectedWhenInstalled;
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
connectedWhenInstalled = subscr.channel ().connected ();
}
int status = this->subscriptionRequest ( subscr, true );
if ( status != ECA_NORMAL ) {
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.remove ( subscr );
// iiu pointer briefly points at tcpiiu before the channel is connected
if ( connectedWhenInstalled ) {
this->subscriptionRequest ( subscr, true );
}
return status;
}
void netiiu::hostName ( char *pBuf, unsigned bufLength ) const
+26 -20
View File
@@ -335,7 +335,7 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
//
// tcpiiu::tcpiiu ()
//
tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue ) :
tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue ) :
netiiu ( &cac ),
recvDog ( *this, connectionTimeout, timerQueue ),
sendDog ( *this, connectionTimeout, timerQueue ),
@@ -1666,20 +1666,26 @@ int tcpiiu::clearChannelRequest ( nciu &chan )
return status;
}
int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
//
// this routine return void because if this internally fails the best response
// is to try again the next time that we reconnect
//
void tcpiiu::subscriptionRequest ( netSubscription & subscr, bool userThread )
{
if ( subscr.getType() > 0xffff ) {
this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad type that was checked earlier\n" );
return;
}
if ( subscr.getMask() > 0xffff || subscr.getMask () == 0u ) {
this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad mask that was checked earlier\n" );
return;
}
unsigned long count = subscr.getCount ();
if ( count == 0u || count > 0xffff ) {
return ECA_BADCOUNT;
}
if ( subscr.getType () > 0xffff ) {
return ECA_BADTYPE;
}
if ( subscr.getMask () > 0xffff ) {
return ECA_BADMASK;
this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad count that was checked earlier\n" );
return;
}
if ( this->sendQue.flushThreshold ( 32u ) ) {
@@ -1694,18 +1700,18 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
epicsAutoMutex autoMutex ( this->mutex );
int status;
if ( subscr.channel ().verifyConnected ( *this ) ) {
if ( subscr.channel().verifyConnected ( *this ) ) {
status = this->sendQue.reserveSpace ( 32u );
if ( status == ECA_NORMAL ) {
this->ioTable.add ( subscr );
this->ioTable . add ( subscr );
// header
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd
this->sendQue.pushUInt16 ( 16u ); // postsize
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( count ) ); // count
this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid
this->sendQue.pushUInt32 ( subscr.getID () ); // available
this->sendQue.pushUInt32 ( subscr . channel () . getSID () ); // cid
this->sendQue.pushUInt32 ( subscr . getID () ); // available
// extension
this->sendQue.pushFloat32 ( 0.0 ); // m_lval
@@ -1716,15 +1722,15 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
}
}
else {
status = ECA_NORMAL;
this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of insufficient memory\n" );
}
return status;
return;
}
void tcpiiu::subscriptionCancelRequest ( netSubscription &subscr, bool userThread )
{
if ( this->sendQue.flushThreshold ( 16u ) ) {
if ( this->sendQue.flushThreshold(16u) ) {
if ( userThread ) {
this->threadContextSensitiveFlushToWire ( true );
}
@@ -1737,7 +1743,7 @@ void tcpiiu::subscriptionCancelRequest ( netSubscription &subscr, bool userThrea
int status = this->sendQue.reserveSpace ( 16u );
if ( status == ECA_NORMAL ) {
if ( subscr.channel ().verifyConnected ( *this ) ) {
if ( subscr.channel().verifyConnected(*this) ) {
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType