o reset pendingResponseStatus, reqPayloadNeedsByteSwap,
responseIsPending, and pValueRead whenever removing a message from the stream o eliminated error prone reuse of status variables o dont allow service to postpone IO when no IO is pending against the target o be extra careful not to do things twice when send blocked or IO postponed
This commit is contained in:
@@ -83,7 +83,7 @@ casStrmClient::casStrmClient ( caServerI & cas, clientBufMemoryManager & mgrIn )
|
||||
incommingBytesToDrain ( 0 ),
|
||||
pendingResponseStatus ( S_cas_success ),
|
||||
minor_version_number ( 0 ),
|
||||
payloadNeedsByteSwap ( true ),
|
||||
reqPayloadNeedsByteSwap ( true ),
|
||||
responseIsPending ( false )
|
||||
{
|
||||
this->pHostName = new char [1u];
|
||||
@@ -255,6 +255,10 @@ caStatus casStrmClient :: processMsg ()
|
||||
break;
|
||||
}
|
||||
this->in.removeMsg ( msgSize );
|
||||
this->pendingResponseStatus = S_cas_success;
|
||||
this->reqPayloadNeedsByteSwap = true;
|
||||
this->responseIsPending = false;
|
||||
this->pValueRead.set ( 0 );
|
||||
}
|
||||
}
|
||||
catch ( std::bad_alloc & ) {
|
||||
@@ -439,18 +443,38 @@ void casStrmClient::show ( unsigned level ) const
|
||||
caStatus casStrmClient::readAction ( epicsGuard < casClientMutex > & guard )
|
||||
{
|
||||
const caHdrLargeArray * mp = this->ctx.getMsg();
|
||||
caStatus status;
|
||||
casChannelI * pChan;
|
||||
|
||||
status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
if ( pChan ) {
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "get request" );
|
||||
{
|
||||
caStatus status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
if ( pChan ) {
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "get request" );
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp, invalidResID,
|
||||
status, "get request" );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dont allow a request that completed with the service in the
|
||||
// past, but was incomplete because no response was sent to be
|
||||
// executed twice with the service
|
||||
if ( this->responseIsPending ) {
|
||||
// dont read twice if we didnt finish in the past
|
||||
// because we were send blocked
|
||||
if ( this->pendingResponseStatus == S_cas_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
return this->readResponse ( guard, pChan, *mp,
|
||||
*pValueRead, S_cas_success );
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp, invalidResID,
|
||||
status, "get request" );
|
||||
return this->sendErrWithEpicsStatus (
|
||||
guard, mp, pChan->getCID(),
|
||||
this->pendingResponseStatus,
|
||||
ECA_GETFAIL );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,38 +482,43 @@ caStatus casStrmClient::readAction ( epicsGuard < casClientMutex > & guard )
|
||||
* verify read access
|
||||
*/
|
||||
if ( ! pChan->readAccess() ) {
|
||||
int v41;
|
||||
|
||||
v41 = CA_V41 ( this->minor_version_number );
|
||||
int v41 = CA_V41 ( this->minor_version_number );
|
||||
int cacStatus;
|
||||
if ( v41 ) {
|
||||
status = ECA_NORDACCESS;
|
||||
cacStatus = ECA_NORDACCESS;
|
||||
}
|
||||
else{
|
||||
status = ECA_GETFAIL;
|
||||
cacStatus = ECA_GETFAIL;
|
||||
}
|
||||
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "read access denied" );
|
||||
cacStatus, "read access denied" );
|
||||
}
|
||||
|
||||
const gdd * pDesc = 0;
|
||||
status = this->read ( pDesc );
|
||||
if ( status == S_casApp_success ) {
|
||||
status = this->readResponse ( guard, pChan, *mp, *pDesc, S_cas_success );
|
||||
pDesc->unreference ();
|
||||
}
|
||||
else if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
pChan->getPVI().addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
status = this->sendErrWithEpicsStatus ( guard, mp,
|
||||
pChan->getCID(), status, ECA_GETFAIL );
|
||||
}
|
||||
|
||||
return status;
|
||||
{
|
||||
caStatus servStat = this->read ();
|
||||
if ( servStat == S_casApp_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
caStatus status = this->readResponse ( guard, pChan, *mp,
|
||||
*pValueRead, S_cas_success );
|
||||
this->responseIsPending = ( status != S_cas_success );
|
||||
return status;
|
||||
}
|
||||
else if ( servStat == S_casApp_asyncCompletion ) {
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
caStatus status = this->sendErrWithEpicsStatus ( guard, mp,
|
||||
pChan->getCID(), servStat, ECA_GETFAIL );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = servStat;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@@ -556,15 +585,31 @@ caStatus casStrmClient::readNotifyAction ( epicsGuard < casClientMutex > & guard
|
||||
{
|
||||
const caHdrLargeArray * mp = this->ctx.getMsg();
|
||||
casChannelI * pChan;
|
||||
int status;
|
||||
|
||||
status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
return this->readNotifyFailureResponse ( guard, * mp, status );
|
||||
{
|
||||
caStatus status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
return this->readNotifyFailureResponse ( guard, * mp, status );
|
||||
}
|
||||
}
|
||||
|
||||
// dont allow a request that completed with the service in the
|
||||
// past, but was incomplete because no response was sent to be
|
||||
// executed twice with the service
|
||||
if ( this->responseIsPending ) {
|
||||
// dont read twice if we didnt finish in the past
|
||||
// because we were send blocked
|
||||
if ( this->pendingResponseStatus == S_cas_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
return this->readNotifyResponse ( guard, pChan, *mp,
|
||||
*pValueRead, S_cas_success );
|
||||
}
|
||||
else {
|
||||
return this->readNotifyFailureResponse (
|
||||
guard, *mp, ECA_GETFAIL );
|
||||
}
|
||||
}
|
||||
|
||||
this->ctx.setChannel ( pChan );
|
||||
|
||||
//
|
||||
// verify read access
|
||||
//
|
||||
@@ -572,23 +617,32 @@ caStatus casStrmClient::readNotifyAction ( epicsGuard < casClientMutex > & guard
|
||||
return this->readNotifyFailureResponse ( guard, *mp, ECA_NORDACCESS );
|
||||
}
|
||||
|
||||
const gdd * pDesc = 0;
|
||||
status = this->read ( pDesc );
|
||||
if ( status == S_casApp_success ) {
|
||||
status = this->readNotifyResponse ( guard, pChan, *mp, *pDesc, status );
|
||||
pDesc->unreference ();
|
||||
}
|
||||
else if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
pChan->getPVI().addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
status = this->readNotifyFailureResponse ( guard, *mp, ECA_GETFAIL );
|
||||
}
|
||||
|
||||
return status;
|
||||
{
|
||||
caStatus servStat = this->read ();
|
||||
if ( servStat == S_casApp_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
caStatus status = this->readNotifyResponse (
|
||||
guard, pChan, *mp,
|
||||
*pValueRead, servStat );
|
||||
this->responseIsPending = ( status != S_cas_success );
|
||||
return status;
|
||||
}
|
||||
else if ( servStat == S_casApp_asyncCompletion ) {
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
caStatus status = this->readNotifyFailureResponse (
|
||||
guard, *mp, ECA_GETFAIL );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = servStat;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@@ -924,31 +978,29 @@ caStatus casStrmClient ::
|
||||
caStatus casStrmClient::writeAction ( epicsGuard < casClientMutex > & guard )
|
||||
{
|
||||
const caHdrLargeArray *mp = this->ctx.getMsg();
|
||||
caStatus status;
|
||||
casChannelI *pChan;
|
||||
|
||||
status = this->verifyRequest ( pChan );
|
||||
if (status != ECA_NORMAL) {
|
||||
if ( pChan ) {
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "get request" );
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp, invalidResID,
|
||||
status, "get request" );
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
caStatus status = this->verifyRequest ( pChan );
|
||||
if (status != ECA_NORMAL) {
|
||||
if ( pChan ) {
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "get request" );
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp, invalidResID,
|
||||
status, "get request" );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dont allow a request that completed with the service in the
|
||||
// past, but was incomplete because no response was sent be
|
||||
// executed twice with the service
|
||||
if ( this->responseIsPending ) {
|
||||
status = this->writeActionSendFailureStatus ( guard, *mp,
|
||||
pChan->getCID(), this->pendingResponseStatus );
|
||||
if ( status == S_cas_success ) {
|
||||
this->pendingResponseStatus = S_cas_success;
|
||||
this->responseIsPending = false;
|
||||
}
|
||||
caStatus status = this->writeActionSendFailureStatus (
|
||||
guard, *mp, pChan->getCID(),
|
||||
this->pendingResponseStatus );
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -956,16 +1008,14 @@ caStatus casStrmClient::writeAction ( epicsGuard < casClientMutex > & guard )
|
||||
// verify write access
|
||||
//
|
||||
if ( ! pChan->writeAccess() ) {
|
||||
int v41;
|
||||
|
||||
v41 = CA_V41 ( this->minor_version_number );
|
||||
caStatus status;
|
||||
int v41 = CA_V41 ( this->minor_version_number );
|
||||
if (v41) {
|
||||
status = ECA_NOWTACCESS;
|
||||
}
|
||||
else{
|
||||
status = ECA_PUTFAIL;
|
||||
}
|
||||
|
||||
return this->sendErr ( guard, mp, pChan->getCID(),
|
||||
status, "write access denied");
|
||||
}
|
||||
@@ -973,27 +1023,30 @@ caStatus casStrmClient::writeAction ( epicsGuard < casClientMutex > & guard )
|
||||
//
|
||||
// initiate the write operation
|
||||
//
|
||||
status = this->write ( & casChannelI :: write );
|
||||
if ( status == S_casApp_success || status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
pChan->getPVI().addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
int writeServiceStatus = status;
|
||||
status = this->writeActionSendFailureStatus ( guard, *mp,
|
||||
pChan->getCID(), writeServiceStatus );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = writeServiceStatus;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
}
|
||||
{
|
||||
caStatus servStat = this->write ( & casChannelI :: write );
|
||||
if ( servStat == S_casApp_success ||
|
||||
servStat == S_casApp_asyncCompletion ) {
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
caStatus status =
|
||||
this->writeActionSendFailureStatus ( guard, *mp,
|
||||
pChan->getCID(), servStat );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = servStat;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// The gdd created above is deleted by the server tool
|
||||
//
|
||||
return status;
|
||||
}
|
||||
|
||||
//
|
||||
@@ -1026,21 +1079,19 @@ caStatus casStrmClient::writeNotifyAction (
|
||||
const caHdrLargeArray *mp = this->ctx.getMsg ();
|
||||
|
||||
casChannelI *pChan;
|
||||
int status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
return casStrmClient::writeNotifyResponseECA_XXX ( guard, *mp, status );
|
||||
}
|
||||
{
|
||||
caStatus status = this->verifyRequest ( pChan );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
return casStrmClient::writeNotifyResponseECA_XXX ( guard, *mp, status );
|
||||
}
|
||||
}
|
||||
|
||||
// dont allow a request that completed with the service in the
|
||||
// past, but was incomplete because no response was sent be
|
||||
// executed twice with the service
|
||||
if ( this->responseIsPending ) {
|
||||
int status = this->writeNotifyResponse ( guard, *pChan,
|
||||
caStatus status = this->writeNotifyResponse ( guard, *pChan,
|
||||
*mp, this->pendingResponseStatus );
|
||||
if ( status == S_cas_success ) {
|
||||
this->pendingResponseStatus = S_cas_success;
|
||||
this->responseIsPending = false;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -1061,24 +1112,24 @@ caStatus casStrmClient::writeNotifyAction (
|
||||
//
|
||||
// initiate the write operation
|
||||
//
|
||||
status = this->write ( & casChannelI :: writeNotify );
|
||||
if (status == S_casApp_asyncCompletion) {
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if (status==S_casApp_postponeAsyncIO) {
|
||||
pChan->getPVI().addItemToIOBLockedList(*this);
|
||||
}
|
||||
else {
|
||||
int writeNotifyServiceStatus = status;
|
||||
status = this->writeNotifyResponse ( guard, *pChan, *mp,
|
||||
writeNotifyServiceStatus );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = writeNotifyServiceStatus;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
{
|
||||
caStatus servStat = this->write ( & casChannelI :: writeNotify );
|
||||
if ( servStat == S_casApp_asyncCompletion ) {
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
caStatus status = this->writeNotifyResponse ( guard, *pChan,
|
||||
*mp, servStat );
|
||||
if ( status != S_cas_success ) {
|
||||
this->pendingResponseStatus = servStat;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1238,9 +1289,8 @@ caStatus casStrmClient::claimChannelAction (
|
||||
const caHdrLargeArray * mp = this->ctx.getMsg();
|
||||
char *pName = (char *) this->ctx.getData();
|
||||
caServerI & cas = *this->ctx.getServer();
|
||||
caStatus status;
|
||||
|
||||
/*
|
||||
/*
|
||||
* The available field is used (abused)
|
||||
* here to communicate the miner version number
|
||||
* starting with CA 4.1. The field was set to zero
|
||||
@@ -1265,7 +1315,7 @@ caStatus casStrmClient::claimChannelAction (
|
||||
// new API was added to the server (they must
|
||||
// now use clients at EPICS 3.12 or higher)
|
||||
//
|
||||
status = this->sendErr ( guard, mp, mp->m_cid, ECA_DEFUNCT,
|
||||
caStatus status = this->sendErr ( guard, mp, mp->m_cid, ECA_DEFUNCT,
|
||||
"R3.11 connect sequence from old client was ignored");
|
||||
if ( status ) {
|
||||
return status;
|
||||
@@ -1302,22 +1352,49 @@ caStatus casStrmClient::claimChannelAction (
|
||||
" - expected S_casApp_asyncCompletion\n",
|
||||
pvar.getStatus() );
|
||||
}
|
||||
status = S_cas_success;
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( pvar.getStatus() == S_casApp_asyncCompletion ) {
|
||||
status = this->createChanResponse ( guard,
|
||||
this->ctx, S_cas_badParameter );
|
||||
errMessage ( S_cas_badParameter,
|
||||
"- expected asynch IO creation from caServer::pvAttach()" );
|
||||
"- expected asynch IO creation "
|
||||
"from caServer::pvAttach()" );
|
||||
return this->createChanResponse ( guard,
|
||||
this->ctx, S_cas_badParameter );
|
||||
}
|
||||
else if ( pvar.getStatus() == S_casApp_postponeAsyncIO ) {
|
||||
status = S_casApp_postponeAsyncIO;
|
||||
this->ctx.getServer()->addItemToIOBLockedList ( *this );
|
||||
caServerI & casi ( * this->ctx.getServer() );
|
||||
if ( this->ioIsPending () ) {
|
||||
casi.addItemToIOBLockedList ( *this );
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
// Its not ok to postpone IO when there isnt at
|
||||
// least one request pending. In that situation
|
||||
// there is no event from the service telling us
|
||||
// when its ok to start issuing requests again!
|
||||
// So in that situation we tell the client that
|
||||
// the service refused the request, and this
|
||||
// caused the request to fail.
|
||||
this->issuePosponeWhenNonePendingWarning ( "create channel" );
|
||||
return this->createChanResponse ( guard, this->ctx,
|
||||
S_cas_posponeWhenNonePending );
|
||||
}
|
||||
}
|
||||
else {
|
||||
status = this->createChanResponse ( guard, this->ctx, pvar );
|
||||
return this->createChanResponse ( guard, this->ctx, pvar );
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
//
|
||||
// casStrmClient::issuePosponeWhenNonePendingWarning()
|
||||
//
|
||||
void casStrmClient ::
|
||||
issuePosponeWhenNonePendingWarning ( const char * pReqTypeStr )
|
||||
{
|
||||
errlogPrintf ( "service attempted to postpone %s IO when "
|
||||
"no IO was pending against the target\n" );
|
||||
errlogPrintf ( "server library will not receive a restart event, "
|
||||
"and so failure response was sent to client\n" );
|
||||
}
|
||||
|
||||
//
|
||||
@@ -1648,15 +1725,34 @@ caStatus casStrmClient::eventAddAction (
|
||||
this->ctx.getData();
|
||||
|
||||
casChannelI *pciu;
|
||||
caStatus status = casStrmClient::verifyRequest ( pciu );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
if ( pciu ) {
|
||||
return this->sendErr ( guard, mp,
|
||||
pciu->getCID(), status, NULL);
|
||||
{
|
||||
caStatus status = casStrmClient::verifyRequest ( pciu );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
if ( pciu ) {
|
||||
return this->sendErr ( guard, mp,
|
||||
pciu->getCID(), status, NULL);
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp,
|
||||
invalidResID, status, NULL );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dont allow a request that completed with the service in the
|
||||
// past, but was incomplete because no response was sent to be
|
||||
// executed twice with the service
|
||||
if ( this->responseIsPending ) {
|
||||
// dont read twice if we didnt finish in the past
|
||||
// because we were send blocked
|
||||
if ( this->pendingResponseStatus == S_cas_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
return this->monitorResponse ( guard, *pciu,
|
||||
*mp, *pValueRead, S_cas_success );
|
||||
}
|
||||
else {
|
||||
return this->sendErr ( guard, mp,
|
||||
invalidResID, status, NULL );
|
||||
return this->monitorFailureResponse (
|
||||
guard, *mp, ECA_GETFAIL );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1684,43 +1780,46 @@ caStatus casStrmClient::eventAddAction (
|
||||
ECA_BADMASK, errStr );
|
||||
}
|
||||
|
||||
casMonitor & mon = this->monitorFactory (
|
||||
*pciu, mp->m_available, mp->m_count,
|
||||
mp->m_dataType, mask );
|
||||
pciu->installMonitor ( mon );
|
||||
|
||||
|
||||
//
|
||||
// Attempt to read the first monitored value prior to creating
|
||||
// the monitor object so that if the server tool chooses
|
||||
// to postpone asynchronous IO we can safely restart this
|
||||
// request later.
|
||||
//
|
||||
const gdd * pDD = 0;
|
||||
status = this->read ( pDD );
|
||||
//
|
||||
// always send immediate monitor response at event add
|
||||
//
|
||||
if ( status == S_cas_success ) {
|
||||
status = this->monitorResponse ( guard, *pciu,
|
||||
*mp, *pDD, status );
|
||||
pDD->unreference ();
|
||||
}
|
||||
else if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_success;
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
//
|
||||
// try again later
|
||||
//
|
||||
pciu->getPVI().addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
status = this->monitorFailureResponse ( guard, *mp, ECA_GETFAIL );
|
||||
}
|
||||
|
||||
if ( status == S_cas_success ) {
|
||||
casMonitor & mon = this->monitorFactory (
|
||||
*pciu, mp->m_available, mp->m_count,
|
||||
mp->m_dataType, mask );
|
||||
pciu->installMonitor ( mon );
|
||||
}
|
||||
|
||||
return status;
|
||||
{
|
||||
caStatus servStat = this->read ();
|
||||
//
|
||||
// always send immediate monitor response at event add
|
||||
//
|
||||
if ( servStat == S_cas_success ) {
|
||||
assert ( pValueRead.valid () );
|
||||
caStatus status = this->monitorResponse ( guard, *pciu,
|
||||
*mp, *pValueRead, servStat );
|
||||
this->responseIsPending = ( status != S_cas_success );
|
||||
return status;
|
||||
}
|
||||
else if ( servStat == S_casApp_asyncCompletion ) {
|
||||
return S_cas_success;
|
||||
}
|
||||
else if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
caStatus status = this->monitorFailureResponse (
|
||||
guard, *mp, ECA_GETFAIL );
|
||||
if ( status != S_cas_success ) {
|
||||
pendingResponseStatus = servStat;
|
||||
this->responseIsPending = true;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2026,7 +2125,6 @@ caStatus casStrmClient::accessRightsResponse (
|
||||
caStatus casStrmClient :: write ( PWriteMethod pWriteMethod )
|
||||
{
|
||||
const caHdrLargeArray *pHdr = this->ctx.getMsg();
|
||||
caStatus status;
|
||||
|
||||
// no puts via compound types (for now)
|
||||
if (dbr_value_offset[pHdr->m_dataType]) {
|
||||
@@ -2034,14 +2132,14 @@ caStatus casStrmClient :: write ( PWriteMethod pWriteMethod )
|
||||
}
|
||||
|
||||
// dont byte swap twice
|
||||
if ( this->payloadNeedsByteSwap ) {
|
||||
if ( this->reqPayloadNeedsByteSwap ) {
|
||||
int cacStatus = caNetConvert (
|
||||
pHdr->m_dataType, this->ctx.getData(), this->ctx.getData(),
|
||||
false, pHdr->m_count );
|
||||
if ( cacStatus != ECA_NORMAL ) {
|
||||
return S_cas_badType;
|
||||
}
|
||||
this->payloadNeedsByteSwap = false;
|
||||
this->reqPayloadNeedsByteSwap = false;
|
||||
}
|
||||
|
||||
//
|
||||
@@ -2053,37 +2151,56 @@ caStatus casStrmClient :: write ( PWriteMethod pWriteMethod )
|
||||
// DBR_STRING is stored outside the DD so it
|
||||
// lumped in with arrays
|
||||
//
|
||||
if ( pHdr->m_count > 1u ) {
|
||||
status = this->writeArrayData ( pWriteMethod );
|
||||
}
|
||||
else {
|
||||
status = this->writeScalarData ( pWriteMethod );
|
||||
}
|
||||
{
|
||||
caStatus servStatus;
|
||||
if ( pHdr->m_count > 1u ) {
|
||||
servStatus = this->writeArrayData ( pWriteMethod );
|
||||
}
|
||||
else {
|
||||
servStatus = this->writeScalarData ( pWriteMethod );
|
||||
}
|
||||
|
||||
//
|
||||
// prevent problems when they initiate
|
||||
// async IO but dont return status
|
||||
// indicating so (and vise versa)
|
||||
//
|
||||
if ( this->userStartedAsyncIO ) {
|
||||
if ( status != S_casApp_asyncCompletion ) {
|
||||
fprintf(stderr,
|
||||
"Application returned %d from casChannel::write() - expected S_casApp_asyncCompletion\n",
|
||||
status);
|
||||
status = S_casApp_asyncCompletion;
|
||||
}
|
||||
this->payloadNeedsByteSwap = true;
|
||||
}
|
||||
else if ( status != S_casApp_postponeAsyncIO ) {
|
||||
if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_badParameter;
|
||||
errMessage ( status,
|
||||
"- expected asynch IO creation from casChannel::write()" );
|
||||
//
|
||||
// prevent problems when they initiate
|
||||
// async IO but dont return status
|
||||
// indicating so (and vise versa)
|
||||
//
|
||||
if ( this->userStartedAsyncIO ) {
|
||||
if ( servStatus != S_casApp_asyncCompletion ) {
|
||||
errlogPrintf (
|
||||
"Application returned %d from casChannel::write() - "
|
||||
"expected S_casApp_asyncCompletion\n",
|
||||
servStatus );
|
||||
servStatus = S_casApp_asyncCompletion;
|
||||
}
|
||||
}
|
||||
else if ( servStatus == S_casApp_postponeAsyncIO ) {
|
||||
casPVI & pvi ( this->ctx.getChannel()->getPVI() );
|
||||
if ( pvi.ioIsPending () ) {
|
||||
pvi.addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
// Its not ok to postpone IO when there isnt at
|
||||
// least one request pending. In that situation
|
||||
// there is no event from the service telling us
|
||||
// when its ok to start issuing requests again!
|
||||
// So in that situation we tell the client that
|
||||
// the service refused the request, and this
|
||||
// caused the request to fail.
|
||||
this->issuePosponeWhenNonePendingWarning ( "write" );
|
||||
servStatus = S_cas_posponeWhenNonePending;
|
||||
}
|
||||
}
|
||||
this->payloadNeedsByteSwap = true;
|
||||
}
|
||||
else {
|
||||
if ( servStatus == S_casApp_asyncCompletion ) {
|
||||
servStatus = S_cas_badParameter;
|
||||
errMessage ( servStatus,
|
||||
"- expected asynch IO creation from casChannel::write()" );
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
return servStatus;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@@ -2261,55 +2378,75 @@ caStatus casStrmClient :: writeArrayData ( PWriteMethod pWriteMethod )
|
||||
//
|
||||
// casStrmClient::read()
|
||||
//
|
||||
caStatus casStrmClient::read ( const gdd * & pDescRet )
|
||||
caStatus casStrmClient::read ()
|
||||
{
|
||||
const caHdrLargeArray * pHdr = this->ctx.getMsg();
|
||||
|
||||
pDescRet = 0;
|
||||
gdd * pDD = 0;
|
||||
caStatus status = createDBRDD ( pHdr->m_dataType,
|
||||
pHdr->m_count, pDD );
|
||||
if ( status != S_cas_success ) {
|
||||
return status;
|
||||
}
|
||||
{
|
||||
gdd * pDD = 0;
|
||||
caStatus status = createDBRDD ( pHdr->m_dataType,
|
||||
pHdr->m_count, pDD );
|
||||
if ( status != S_cas_success ) {
|
||||
return status;
|
||||
}
|
||||
pValueRead.set ( pDD );
|
||||
pDD->unreference ();
|
||||
}
|
||||
|
||||
//
|
||||
// clear the async IO flag
|
||||
//
|
||||
this->userStartedAsyncIO = false;
|
||||
|
||||
//
|
||||
// call the server tool's virtual function
|
||||
//
|
||||
status = this->ctx.getChannel()->read ( this->ctx, * pDD );
|
||||
{
|
||||
//
|
||||
// call the server tool's virtual function
|
||||
//
|
||||
caStatus servStat = this->ctx.getChannel()->
|
||||
read ( this->ctx, *pValueRead );
|
||||
|
||||
//
|
||||
// prevent problems when they initiate
|
||||
// async IO but dont return status
|
||||
// indicating so (and vise versa)
|
||||
//
|
||||
if ( this->userStartedAsyncIO ) {
|
||||
if ( status != S_casApp_asyncCompletion ) {
|
||||
fprintf(stderr,
|
||||
"Application returned %d from casChannel::read() - expected S_casApp_asyncCompletion\n",
|
||||
status);
|
||||
status = S_casApp_asyncCompletion;
|
||||
}
|
||||
}
|
||||
else if ( status == S_casApp_asyncCompletion ) {
|
||||
status = S_cas_badParameter;
|
||||
errMessage(status,
|
||||
"- expected asynch IO creation from casChannel::read()");
|
||||
}
|
||||
|
||||
if ( status == S_casApp_success ) {
|
||||
pDescRet = pDD;
|
||||
}
|
||||
else {
|
||||
pDD->unreference ();
|
||||
//
|
||||
// prevent problems when they initiate
|
||||
// async IO but dont return status
|
||||
// indicating so (and vise versa)
|
||||
//
|
||||
if ( this->userStartedAsyncIO ) {
|
||||
if ( servStat != S_casApp_asyncCompletion ) {
|
||||
errlogPrintf (
|
||||
"Application returned %d from casChannel::read() - "
|
||||
"expected S_casApp_asyncCompletion\n",
|
||||
servStat );
|
||||
servStat = S_casApp_asyncCompletion;
|
||||
}
|
||||
pValueRead.set ( 0 );
|
||||
}
|
||||
else if ( servStat == S_casApp_asyncCompletion ) {
|
||||
servStat = S_cas_badParameter;
|
||||
errMessage ( servStat,
|
||||
"- expected asynch IO creation from casChannel::read()");
|
||||
}
|
||||
else if ( servStat != S_casApp_success ) {
|
||||
pValueRead.set ( 0 );
|
||||
if ( servStat == S_casApp_postponeAsyncIO ) {
|
||||
casPVI & pvi ( this->ctx.getChannel()->getPVI() );
|
||||
if ( pvi.ioIsPending () ) {
|
||||
pvi.addItemToIOBLockedList ( *this );
|
||||
}
|
||||
else {
|
||||
// Its not ok to postpone IO when there isnt at
|
||||
// least one request pending. In that situation
|
||||
// there is no event from the service telling us
|
||||
// when its ok to start issuing requests again!
|
||||
// So in that situation we tell the client that
|
||||
// the service refused the request, and this
|
||||
// caused the request to fail.
|
||||
this->issuePosponeWhenNonePendingWarning ( "read" );
|
||||
servStat = S_cas_posponeWhenNonePending;
|
||||
}
|
||||
}
|
||||
}
|
||||
return servStat;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user