o track number of async read and writes independently
o when the last async write occurs, check to see if a value is cached and write it immediately (instead of waiting for aync io object's ctor to run. This allows the regression tests to pass.
This commit is contained in:
@@ -15,6 +15,16 @@
|
||||
|
||||
#include "exServer.h"
|
||||
|
||||
exAsyncPV::exAsyncPV ( exServer & cas, pvInfo & setup,
|
||||
bool preCreateFlag, bool scanOnIn,
|
||||
double asyncDelayIn ) :
|
||||
exScalarPV ( cas, setup, preCreateFlag, scanOnIn ),
|
||||
asyncDelay ( asyncDelayIn ),
|
||||
simultAsychReadIOCount ( 0u ),
|
||||
simultAsychWriteIOCount ( 0u )
|
||||
{
|
||||
}
|
||||
|
||||
//
|
||||
// exAsyncPV::read()
|
||||
//
|
||||
@@ -22,16 +32,21 @@ caStatus exAsyncPV::read (const casCtx &ctx, gdd &valueIn)
|
||||
{
|
||||
exAsyncReadIO *pIO;
|
||||
|
||||
if ( this->simultAsychIOCount >= this->cas.maxSimultAsyncIO () ) {
|
||||
if ( this->simultAsychReadIOCount >= this->cas.maxSimultAsyncIO () ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
|
||||
pIO = new exAsyncReadIO ( this->cas, ctx,
|
||||
*this, valueIn, this->asyncDelay );
|
||||
if ( ! pIO ) {
|
||||
return S_casApp_noMemory;
|
||||
if ( this->simultAsychReadIOCount > 0 ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
return S_casApp_noMemory;
|
||||
}
|
||||
}
|
||||
this->simultAsychIOCount++;
|
||||
this->simultAsychReadIOCount++;
|
||||
return S_casApp_asyncCompletion;
|
||||
}
|
||||
|
||||
@@ -40,7 +55,7 @@ caStatus exAsyncPV::read (const casCtx &ctx, gdd &valueIn)
|
||||
//
|
||||
caStatus exAsyncPV::writeNotify ( const casCtx &ctx, const gdd &valueIn )
|
||||
{
|
||||
if ( this->simultAsychIOCount >= this->cas.maxSimultAsyncIO() ) {
|
||||
if ( this->simultAsychWriteIOCount >= this->cas.maxSimultAsyncIO() ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
|
||||
@@ -48,9 +63,14 @@ caStatus exAsyncPV::writeNotify ( const casCtx &ctx, const gdd &valueIn )
|
||||
exAsyncWriteIO ( this->cas, ctx, *this,
|
||||
valueIn, this->asyncDelay );
|
||||
if ( ! pIO ) {
|
||||
return S_casApp_noMemory;
|
||||
}
|
||||
this->simultAsychIOCount++;
|
||||
if ( this->simultAsychReadIOCount > 0 ) {
|
||||
return S_casApp_postponeAsyncIO;
|
||||
}
|
||||
else {
|
||||
return S_casApp_noMemory;
|
||||
}
|
||||
}
|
||||
this->simultAsychWriteIOCount++;
|
||||
return S_casApp_asyncCompletion;
|
||||
}
|
||||
|
||||
@@ -63,7 +83,7 @@ caStatus exAsyncPV::write ( const casCtx &ctx, const gdd &valueIn )
|
||||
// sent always applied behavior that IOCs provide excepting
|
||||
// that we will alow N requests to pend instead of a limit
|
||||
// of only one imposed in the IOC
|
||||
if ( this->simultAsychIOCount >= this->cas.maxSimultAsyncIO() ) {
|
||||
if ( this->simultAsychWriteIOCount >= this->cas.maxSimultAsyncIO() ) {
|
||||
pStandbyValue.set ( & valueIn );
|
||||
return S_casApp_success;
|
||||
}
|
||||
@@ -72,24 +92,55 @@ caStatus exAsyncPV::write ( const casCtx &ctx, const gdd &valueIn )
|
||||
exAsyncWriteIO ( this->cas, ctx, *this,
|
||||
valueIn, this->asyncDelay );
|
||||
if ( ! pIO ) {
|
||||
return S_casApp_noMemory;
|
||||
}
|
||||
this->simultAsychIOCount++;
|
||||
pStandbyValue.set ( & valueIn );
|
||||
return S_casApp_success;
|
||||
}
|
||||
this->simultAsychWriteIOCount++;
|
||||
return S_casApp_asyncCompletion;
|
||||
}
|
||||
|
||||
void exAsyncPV::removeIO ()
|
||||
// Implementing a specialized update for exAsyncPV
|
||||
// allows standby value to update when we update
|
||||
// the PV from an asynchronous write timer expiration
|
||||
// which is a better time compared to removeIO below
|
||||
// which, if used, gets the reads and writes out of
|
||||
// order. This type of reordering can cause the
|
||||
// regression tests to fail.
|
||||
caStatus exAsyncPV :: updateFromAsyncWrite ( const gdd & src )
|
||||
{
|
||||
if ( this->simultAsychIOCount > 0u ) {
|
||||
this->simultAsychIOCount--;
|
||||
if ( this->simultAsychIOCount == 0 &&
|
||||
caStatus stat = this->update ( src );
|
||||
if ( this->simultAsychWriteIOCount <=1 &&
|
||||
pStandbyValue.valid () ) {
|
||||
//printf("updateFromAsyncWrite: write standby\n");
|
||||
stat = this->update ( *this->pStandbyValue );
|
||||
this->pStandbyValue.set ( 0 );
|
||||
}
|
||||
return stat;
|
||||
}
|
||||
|
||||
void exAsyncPV::removeReadIO ()
|
||||
{
|
||||
if ( this->simultAsychReadIOCount > 0u ) {
|
||||
this->simultAsychReadIOCount--;
|
||||
}
|
||||
else {
|
||||
fprintf ( stderr, "inconsistent simultAsychReadIOCount?\n" );
|
||||
}
|
||||
}
|
||||
|
||||
void exAsyncPV::removeWriteIO ()
|
||||
{
|
||||
if ( this->simultAsychWriteIOCount > 0u ) {
|
||||
this->simultAsychWriteIOCount--;
|
||||
if ( this->simultAsychWriteIOCount == 0 &&
|
||||
pStandbyValue.valid () ) {
|
||||
//printf("removeIO: write standby\n");
|
||||
this->update ( *this->pStandbyValue );
|
||||
this->pStandbyValue.set ( 0 );
|
||||
}
|
||||
}
|
||||
else {
|
||||
fprintf ( stderr, "inconsistent simultAsychIOCount?\n" );
|
||||
fprintf ( stderr, "inconsistent simultAsychWriteIOCount?\n" );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,9 +166,9 @@ exAsyncWriteIO::~exAsyncWriteIO()
|
||||
// hasnt been written then force it to happen
|
||||
// now so that regression testing works
|
||||
if ( this->pValue.valid () ) {
|
||||
this->pv.update ( *this->pValue );
|
||||
this->pv.updateFromAsyncWrite ( *this->pValue );
|
||||
}
|
||||
this->pv.removeIO();
|
||||
this->pv.removeWriteIO();
|
||||
}
|
||||
|
||||
//
|
||||
@@ -127,7 +178,8 @@ exAsyncWriteIO::~exAsyncWriteIO()
|
||||
epicsTimerNotify::expireStatus exAsyncWriteIO::
|
||||
expire ( const epicsTime & /* currentTime */ )
|
||||
{
|
||||
caStatus status = this->pv.update ( *this->pValue );
|
||||
assert ( this->pValue.valid () );
|
||||
caStatus status = this->pv.updateFromAsyncWrite ( *this->pValue );
|
||||
this->pValue.set ( 0 );
|
||||
this->postIOCompletion ( status );
|
||||
return noRestart;
|
||||
@@ -150,16 +202,16 @@ exAsyncReadIO::exAsyncReadIO ( exServer & cas, const casCtx & ctxIn,
|
||||
//
|
||||
exAsyncReadIO::~exAsyncReadIO()
|
||||
{
|
||||
this->pv.removeIO ();
|
||||
this->pv.removeReadIO ();
|
||||
this->timer.destroy ();
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// exAsyncReadIO::expire()
|
||||
// (a virtual function that runs when the base timer expires)
|
||||
//
|
||||
epicsTimerNotify::expireStatus exAsyncReadIO::expire ( const epicsTime & /* currentTime */ )
|
||||
epicsTimerNotify::expireStatus
|
||||
exAsyncReadIO::expire ( const epicsTime & /* currentTime */ )
|
||||
{
|
||||
//
|
||||
// map between the prototype in and the
|
||||
|
||||
@@ -334,11 +334,14 @@ public:
|
||||
caStatus read ( const casCtx & ctxIn, gdd & protoIn );
|
||||
caStatus write ( const casCtx & ctxIn, const gdd & value );
|
||||
caStatus writeNotify ( const casCtx & ctxIn, const gdd & value );
|
||||
void removeIO();
|
||||
void removeReadIO();
|
||||
void removeWriteIO();
|
||||
caStatus updateFromAsyncWrite ( const gdd & );
|
||||
private:
|
||||
double asyncDelay;
|
||||
smartConstGDDPointer pStandbyValue;
|
||||
unsigned simultAsychIOCount;
|
||||
unsigned simultAsychReadIOCount;
|
||||
unsigned simultAsychWriteIOCount;
|
||||
exAsyncPV & operator = ( const exAsyncPV & );
|
||||
exAsyncPV ( const exAsyncPV & );
|
||||
};
|
||||
@@ -580,15 +583,6 @@ inline unsigned exServer :: maxSimultAsyncIO () const
|
||||
return this->_maxSimultAsyncIO;
|
||||
}
|
||||
|
||||
inline exAsyncPV::exAsyncPV ( exServer & cas, pvInfo & setup,
|
||||
bool preCreateFlag, bool scanOnIn,
|
||||
double asyncDelayIn ) :
|
||||
exScalarPV ( cas, setup, preCreateFlag, scanOnIn ),
|
||||
asyncDelay ( asyncDelayIn ),
|
||||
simultAsychIOCount ( 0u )
|
||||
{
|
||||
}
|
||||
|
||||
inline exChannel::exChannel ( const casCtx & ctxIn ) :
|
||||
casChannel(ctxIn)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user