fixed channel not destroyed if client disconnects during asynchronous
enum string table fetch
This commit is contained in:
@@ -38,9 +38,17 @@ caStatus casAsyncPVAttachIOI::cbFuncAsyncIO (
|
||||
{
|
||||
caStatus status;
|
||||
|
||||
// uninstall here in case the channel is deleted
|
||||
// further down the call stack
|
||||
this->client.uninstallAsynchIO ( *this );
|
||||
|
||||
if ( this->msg.m_cmmd == CA_PROTO_CREATE_CHAN ) {
|
||||
casCtx tmpCtx;
|
||||
tmpCtx.setMsg ( this->msg, 0 );
|
||||
tmpCtx.setServer ( & this->client.getCAS() );
|
||||
tmpCtx.setClient ( & this->client );
|
||||
status = this->client.createChanResponse ( guard,
|
||||
this->msg, this->retVal );
|
||||
tmpCtx, this->retVal );
|
||||
}
|
||||
else {
|
||||
errPrintf ( S_cas_invalidAsynchIO, __FILE__, __LINE__,
|
||||
@@ -48,8 +56,8 @@ caStatus casAsyncPVAttachIOI::cbFuncAsyncIO (
|
||||
status = S_cas_invalidAsynchIO;
|
||||
}
|
||||
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
this->client.uninstallAsynchIO ( *this );
|
||||
if ( status == S_cas_sendBlocked ) {
|
||||
this->client.installAsynchIO ( *this );
|
||||
}
|
||||
|
||||
return status;
|
||||
|
||||
@@ -23,8 +23,7 @@ casAsyncReadIOI::casAsyncReadIOI (
|
||||
casAsyncReadIO & intf, const casCtx & ctx ) :
|
||||
casAsyncIOI ( ctx ), msg ( *ctx.getMsg() ),
|
||||
asyncReadIO ( intf ), chan ( *ctx.getChannel () ),
|
||||
pDD ( NULL ), completionStatus ( S_cas_internal ),
|
||||
createChannelWasSuccessful ( false )
|
||||
pDD ( NULL ), completionStatus ( S_cas_internal )
|
||||
{
|
||||
this->chan.installIO ( *this );
|
||||
}
|
||||
@@ -32,10 +31,6 @@ casAsyncReadIOI::casAsyncReadIOI (
|
||||
casAsyncReadIOI::~casAsyncReadIOI ()
|
||||
{
|
||||
this->asyncReadIO.serverInitiatedDestroy ();
|
||||
if ( this->msg.m_cmmd == CA_PROTO_CREATE_CHAN &&
|
||||
! this->createChannelWasSuccessful ) {
|
||||
delete & this->chan;
|
||||
}
|
||||
}
|
||||
|
||||
caStatus casAsyncReadIOI::postIOCompletion (
|
||||
@@ -54,7 +49,11 @@ bool casAsyncReadIOI::oneShotReadOP () const
|
||||
caStatus casAsyncReadIOI::cbFuncAsyncIO (
|
||||
epicsGuard < casClientMutex > & guard )
|
||||
{
|
||||
caStatus status;
|
||||
caStatus status;
|
||||
|
||||
// uninstall the io early on to prevent a channel delete from
|
||||
// destroying this object twice
|
||||
this->chan.uninstallIO ( *this );
|
||||
|
||||
switch ( this->msg.m_cmmd ) {
|
||||
case CA_PROTO_READ:
|
||||
@@ -76,19 +75,11 @@ caStatus casAsyncReadIOI::cbFuncAsyncIO (
|
||||
break;
|
||||
|
||||
case CA_PROTO_CREATE_CHAN:
|
||||
unsigned nativeTypeDBR;
|
||||
status = this->chan.getPVI().bestDBRType ( nativeTypeDBR );
|
||||
if ( status ) {
|
||||
errMessage ( status, "best external dbr type fetch failed" );
|
||||
status = client.channelCreateFailedResp (
|
||||
guard, this->msg, status );
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
delete & this->chan;
|
||||
}
|
||||
}
|
||||
else {
|
||||
// we end up here if the channel claim protocol response is delayed
|
||||
// by an asynchronous enum string table fetch response
|
||||
// we end up here if the channel claim protocol response is delayed
|
||||
// by an asynchronous enum string table fetch response
|
||||
status = client.enumPostponedCreateChanResponse (
|
||||
guard, this->chan, this->msg );
|
||||
if ( status == S_cas_success ) {
|
||||
if ( this->completionStatus == S_cas_success && this->pDD.valid() ) {
|
||||
this->chan.getPVI().updateEnumStringTableAsyncCompletion ( *this->pDD );
|
||||
}
|
||||
@@ -97,10 +88,6 @@ caStatus casAsyncReadIOI::cbFuncAsyncIO (
|
||||
"unable to read application type \"enums\" string"
|
||||
" conversion table for enumerated PV" );
|
||||
}
|
||||
status = client.enumPostponedCreateChanResponse (
|
||||
guard, this->chan, this->msg, nativeTypeDBR );
|
||||
this->createChannelWasSuccessful =
|
||||
( status == S_cas_success );
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -111,8 +98,8 @@ caStatus casAsyncReadIOI::cbFuncAsyncIO (
|
||||
break;
|
||||
}
|
||||
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
this->chan.uninstallIO ( *this );
|
||||
if ( status == S_cas_sendBlocked ) {
|
||||
this->chan.installIO ( *this );
|
||||
}
|
||||
|
||||
return status;
|
||||
|
||||
@@ -37,7 +37,6 @@ private:
|
||||
class casChannelI & chan;
|
||||
smartConstGDDPointer pDD;
|
||||
caStatus completionStatus;
|
||||
bool createChannelWasSuccessful;
|
||||
epicsShareFunc bool oneShotReadOP () const;
|
||||
epicsShareFunc caStatus cbFuncAsyncIO (
|
||||
epicsGuard < casClientMutex > & );
|
||||
|
||||
@@ -72,7 +72,7 @@ caStatus casCoreClient::asyncSearchResponse (
|
||||
}
|
||||
caStatus casCoreClient::createChanResponse (
|
||||
epicsGuard < casClientMutex > &,
|
||||
const caHdrLargeArray &, const pvAttachReturn & )
|
||||
casCtx &, const pvAttachReturn & )
|
||||
{
|
||||
return S_casApp_noSupport;
|
||||
}
|
||||
@@ -113,7 +113,7 @@ caStatus casCoreClient::accessRightsResponse (
|
||||
}
|
||||
caStatus casCoreClient::enumPostponedCreateChanResponse (
|
||||
epicsGuard < casClientMutex > &, casChannelI &,
|
||||
const caHdrLargeArray &, unsigned )
|
||||
const caHdrLargeArray & )
|
||||
{
|
||||
return S_casApp_noSupport;
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ public:
|
||||
ca_uint16_t protocolRevision, ca_uint32_t sequenceNumber );
|
||||
virtual caStatus createChanResponse (
|
||||
epicsGuard < casClientMutex > &,
|
||||
const caHdrLargeArray &, const pvAttachReturn &);
|
||||
casCtx &, const pvAttachReturn &);
|
||||
virtual caStatus readResponse (
|
||||
epicsGuard < casClientMutex > &,
|
||||
casChannelI *, const caHdrLargeArray &,
|
||||
@@ -80,8 +80,7 @@ public:
|
||||
epicsGuard < casClientMutex > &, casChannelI * );
|
||||
virtual caStatus enumPostponedCreateChanResponse (
|
||||
epicsGuard < casClientMutex > &,
|
||||
casChannelI &, const caHdrLargeArray &,
|
||||
unsigned dbrType );
|
||||
casChannelI &, const caHdrLargeArray & );
|
||||
virtual caStatus channelCreateFailedResp (
|
||||
epicsGuard < casClientMutex > &,
|
||||
const caHdrLargeArray &, const caStatus createStatus );
|
||||
|
||||
@@ -539,6 +539,8 @@ caStatus casStrmClient::readNotifyAction ( epicsGuard < casClientMutex > & guard
|
||||
return this->readNotifyFailureResponse ( guard, * mp, status );
|
||||
}
|
||||
|
||||
this->ctx.setChannel ( pChan );
|
||||
|
||||
//
|
||||
// verify read access
|
||||
//
|
||||
@@ -1229,7 +1231,8 @@ caStatus casStrmClient::claimChannelAction (
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( pvar.getStatus() == S_casApp_asyncCompletion ) {
|
||||
status = this->createChanResponse ( guard, *mp, S_cas_badParameter );
|
||||
status = this->createChanResponse ( guard,
|
||||
this->ctx, S_cas_badParameter );
|
||||
errMessage ( S_cas_badParameter,
|
||||
"- expected asynch IO creation from caServer::pvAttach()" );
|
||||
}
|
||||
@@ -1238,7 +1241,7 @@ caStatus casStrmClient::claimChannelAction (
|
||||
this->ctx.getServer()->addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
status = this->createChanResponse ( guard, *mp, pvar );
|
||||
status = this->createChanResponse ( guard, this->ctx, pvar );
|
||||
}
|
||||
return status;
|
||||
}
|
||||
@@ -1248,9 +1251,10 @@ caStatus casStrmClient::claimChannelAction (
|
||||
//
|
||||
caStatus casStrmClient::createChanResponse (
|
||||
epicsGuard < casClientMutex > & guard,
|
||||
const caHdrLargeArray & hdr,
|
||||
const pvAttachReturn & pvar )
|
||||
casCtx & ctxIn, const pvAttachReturn & pvar )
|
||||
{
|
||||
const caHdrLargeArray & hdr = *ctxIn.getMsg();
|
||||
|
||||
if ( pvar.getStatus() != S_cas_success ) {
|
||||
return this->channelCreateFailedResp ( guard,
|
||||
hdr, pvar.getStatus() );
|
||||
@@ -1296,11 +1300,11 @@ caStatus casStrmClient::createChanResponse (
|
||||
|
||||
//
|
||||
// create server tool XXX derived from casChannel
|
||||
// (use temp context because this can be called asynchronously)
|
||||
//
|
||||
casChannel * pChan = pvar.getPV()->pPVI->createChannel (
|
||||
this->ctx, this->pUserName, this->pHostName );
|
||||
ctxIn, this->pUserName, this->pHostName );
|
||||
if ( ! pChan ) {
|
||||
pvar.getPV()->pPVI->deleteSignal();
|
||||
return this->channelCreateFailedResp (
|
||||
guard, hdr, S_cas_noMemory );
|
||||
}
|
||||
@@ -1330,6 +1334,18 @@ caStatus casStrmClient::createChanResponse (
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Install the channel now so that the server will
|
||||
// clean up properly if the client disconnects
|
||||
// while an asynchronous IO fetching the enum
|
||||
// string table is outstanding
|
||||
//
|
||||
this->chanTable.add ( *pChan->pChanI );
|
||||
this->chanList.add ( *pChan->pChanI );
|
||||
pChan->pChanI->installIntoPV ();
|
||||
|
||||
assert ( hdr.m_cid == pChan->pChanI->getCID() );
|
||||
|
||||
//
|
||||
// check to see if the enum table is empty and therefore
|
||||
// an update is needed every time that a PV attaches
|
||||
@@ -1337,10 +1353,10 @@ caStatus casStrmClient::createChanResponse (
|
||||
// an asynchronous IO to get the table completed
|
||||
//
|
||||
if ( nativeTypeDBR == DBR_ENUM ) {
|
||||
this->ctx.setPV ( pvar.getPV()->pPVI );
|
||||
this->ctx.setChannel ( pChan->pChanI );
|
||||
ctxIn.setChannel ( pChan->pChanI );
|
||||
ctxIn.setPV ( pvar.getPV()->pPVI );
|
||||
this->userStartedAsyncIO = false;
|
||||
status = pvar.getPV()->pPVI->updateEnumStringTable ( this->ctx );
|
||||
status = pvar.getPV()->pPVI->updateEnumStringTable ( ctxIn );
|
||||
if ( this->userStartedAsyncIO ) {
|
||||
if ( status != S_casApp_asyncCompletion ) {
|
||||
fprintf ( stderr,
|
||||
@@ -1349,31 +1365,40 @@ caStatus casStrmClient::createChanResponse (
|
||||
}
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( status == S_casApp_success ) {
|
||||
status = enumPostponedCreateChanResponse (
|
||||
guard, * pChan->pChanI, hdr, nativeTypeDBR );
|
||||
else if ( status == S_cas_success ) {
|
||||
status = privateCreateChanResponse (
|
||||
guard, * pChan->pChanI, hdr, nativeTypeDBR );
|
||||
}
|
||||
else if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_badParameter;
|
||||
errMessage ( status,
|
||||
"- asynch IO creation status returned, but async IO not started?");
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
errlogPrintf ( "The server library does not currently support postponment of " );
|
||||
errlogPrintf ( "string table cache update of casChannel::read()." );
|
||||
errlogPrintf ( "To postpone this request please postpone the PC attach IO request." );
|
||||
errlogPrintf ( "String table cache update did not occur." );
|
||||
status = enumPostponedCreateChanResponse (
|
||||
else {
|
||||
if ( status == S_casApp_asyncCompletion ) {
|
||||
errMessage ( status,
|
||||
"- enum string tbl cache read returned asynch IO creation, but async IO not started?");
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
errMessage ( status, "- enum string tbl cache read ASYNC IO postponed ?");
|
||||
errlogPrintf ( "The server library does not currently support postponment of\n" );
|
||||
errlogPrintf ( "string table cache update of casChannel::read().\n" );
|
||||
errlogPrintf ( "To postpone this request please postpone the PC attach IO request.\n" );
|
||||
errlogPrintf ( "String table cache update did not occur.\n" );
|
||||
}
|
||||
else {
|
||||
errMessage ( status, "- enum string tbl cache read failed ?");
|
||||
}
|
||||
status = privateCreateChanResponse (
|
||||
guard, * pChan->pChanI, hdr, nativeTypeDBR );
|
||||
}
|
||||
}
|
||||
else {
|
||||
status = enumPostponedCreateChanResponse (
|
||||
status = privateCreateChanResponse (
|
||||
guard, * pChan->pChanI, hdr, nativeTypeDBR );
|
||||
}
|
||||
|
||||
if ( status != S_cas_success ) {
|
||||
delete ctx.getChannel();
|
||||
this->chanTable.remove ( *pChan->pChanI );
|
||||
this->chanList.remove ( *pChan->pChanI );
|
||||
pChan->pChanI->uninstallFromPV ( this->eventSys );
|
||||
pChan->getPV()->pPVI->deleteSignal ();
|
||||
delete pChan->pChanI;
|
||||
}
|
||||
|
||||
return status;
|
||||
@@ -1385,8 +1410,29 @@ caStatus casStrmClient::createChanResponse (
|
||||
// LOCK must be applied
|
||||
//
|
||||
caStatus casStrmClient::enumPostponedCreateChanResponse (
|
||||
epicsGuard < casClientMutex > & guard, casChannelI & chan,
|
||||
const caHdrLargeArray & hdr )
|
||||
{
|
||||
caStatus status = this->privateCreateChanResponse (
|
||||
guard, chan, hdr, DBR_ENUM );
|
||||
if ( status != S_cas_success ) {
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
this->chanTable.remove ( chan );
|
||||
this->chanList.remove ( chan );
|
||||
chan.uninstallFromPV ( this->eventSys );
|
||||
delete & chan;
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
//
|
||||
// privateCreateChanResponse
|
||||
//
|
||||
caStatus casStrmClient::privateCreateChanResponse (
|
||||
epicsGuard < casClientMutex > & guard,
|
||||
casChannelI & chan, const caHdrLargeArray & hdr, unsigned nativeTypeDBR )
|
||||
casChannelI & chan, const caHdrLargeArray & hdr,
|
||||
unsigned nativeTypeDBR )
|
||||
{
|
||||
//
|
||||
// We are allocating enough space for both the claim
|
||||
@@ -1413,18 +1459,15 @@ caStatus casStrmClient::enumPostponedCreateChanResponse (
|
||||
this->out.popCtx ( outctx );
|
||||
errMessage ( status, "incomplete channel create?" );
|
||||
status = this->channelCreateFailedResp ( guard, hdr, status );
|
||||
if ( status == S_cas_success ) {
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
this->chanTable.remove ( chan );
|
||||
this->chanList.remove ( chan );
|
||||
chan.uninstallFromPV ( this->eventSys );
|
||||
delete & chan;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
// must install into server table before using server id
|
||||
// member of channel
|
||||
this->chanTable.add ( chan );
|
||||
this->chanList.add ( chan );
|
||||
chan.installIntoPV ();
|
||||
|
||||
//
|
||||
// We are allocated enough space for both the claim
|
||||
// response and the access response so that we know for
|
||||
@@ -1437,16 +1480,16 @@ caStatus casStrmClient::enumPostponedCreateChanResponse (
|
||||
assert ( nativeTypeDBR <= 0xffff );
|
||||
aitIndex nativeCount = chan.getPVI().nativeCount();
|
||||
assert ( nativeCount <= 0xffffffff );
|
||||
assert ( hdr.m_cid == chan.getCID() );
|
||||
status = this->out.copyInHeader ( CA_PROTO_CREATE_CHAN, 0,
|
||||
static_cast <ca_uint16_t> ( nativeTypeDBR ),
|
||||
static_cast <ca_uint32_t> ( nativeCount ), // X aCC 392
|
||||
hdr.m_cid, chan.getSID(), 0 );
|
||||
chan.getCID(), chan.getSID(), 0 );
|
||||
if ( status != S_cas_success ) {
|
||||
|
||||
this->out.popCtx ( outctx );
|
||||
errMessage ( status, "incomplete channel create?" );
|
||||
status = this->channelCreateFailedResp ( guard, hdr, status );
|
||||
if ( status == S_cas_success ) {
|
||||
if ( status != S_cas_sendBlocked ) {
|
||||
this->chanTable.remove ( chan );
|
||||
this->chanList.remove ( chan );
|
||||
chan.uninstallFromPV ( this->eventSys );
|
||||
|
||||
@@ -109,7 +109,7 @@ private:
|
||||
// asynchronous completion
|
||||
//
|
||||
caStatus createChanResponse ( epicsGuard < casClientMutex > &,
|
||||
const caHdrLargeArray &, const pvAttachReturn & );
|
||||
casCtx &, const pvAttachReturn & );
|
||||
caStatus readResponse ( epicsGuard < casClientMutex > &,
|
||||
casChannelI * pChan, const caHdrLargeArray & msg,
|
||||
const gdd & desc, const caStatus status );
|
||||
@@ -124,6 +124,8 @@ private:
|
||||
casChannelI & chan, const caHdrLargeArray & msg,
|
||||
const gdd & desc, const caStatus status );
|
||||
caStatus enumPostponedCreateChanResponse ( epicsGuard < casClientMutex > &,
|
||||
casChannelI & chan, const caHdrLargeArray & hdr );
|
||||
caStatus privateCreateChanResponse ( epicsGuard < casClientMutex > &,
|
||||
casChannelI & chan, const caHdrLargeArray & hdr, unsigned dbrType );
|
||||
caStatus channelCreateFailedResp ( epicsGuard < casClientMutex > &,
|
||||
const caHdrLargeArray &, const caStatus createStatus );
|
||||
|
||||
Reference in New Issue
Block a user