copy lastelem
can't keep this when the default queue size is 2.
This commit is contained in:
@ -34,7 +34,8 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester
|
||||
|
||||
typedef std::vector<epicsUInt8> pvrequest_t;
|
||||
|
||||
bool done;
|
||||
bool havedata; // set when initial update is received
|
||||
bool done; // set when unlisten() is received
|
||||
size_t nwakeups; // # of upstream monitorEvent() calls
|
||||
size_t nevents; // # of upstream events poll()'d
|
||||
|
||||
|
@ -35,6 +35,7 @@ T getS(const pvd::PVStructurePtr& s, const char* name, T dft)
|
||||
MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent, const pvd::PVStructure::shared_pointer& pvr)
|
||||
:chan(ent)
|
||||
,bufferSize(getS<pvd::uint32>(pvr, "record._options.queueSize", 2)) // should be same default as pvAccess, but not required
|
||||
,havedata(false)
|
||||
,done(false)
|
||||
,nwakeups(0)
|
||||
,nevents(0)
|
||||
@ -69,6 +70,10 @@ MonitorCacheEntry::monitorConnect(pvd::Status const & status,
|
||||
startresult = status;
|
||||
}
|
||||
|
||||
if(startresult.isSuccess()) {
|
||||
lastelem.reset(new pvd::MonitorElement(pvd::getPVDataCreate()->createPVStructure(structure)));
|
||||
}
|
||||
|
||||
// set typedesc and startresult for futured MonitorUsers
|
||||
// and copy snapshot of already interested MonitorUsers
|
||||
tonotify = interested.lock_vector();
|
||||
@ -119,16 +124,19 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
|
||||
|
||||
{
|
||||
Guard G(mutex()); // MCE and MU guarded by the same mutex
|
||||
if(!havedata)
|
||||
havedata = true;
|
||||
|
||||
//TODO: flow control, if all MU buffers are full, break before poll()==NULL
|
||||
while((update=monitor->poll()))
|
||||
{
|
||||
epicsAtomicIncrSizeT(&nevents);
|
||||
|
||||
if(lastelem)
|
||||
monitor->release(lastelem);
|
||||
lastelem = update;
|
||||
|
||||
lastelem->pvStructurePtr->copyUnchecked(*update->pvStructurePtr,
|
||||
*update->changedBitSet);
|
||||
*lastelem->changedBitSet = *update->changedBitSet;
|
||||
*lastelem->overrunBitSet = *update->overrunBitSet;
|
||||
monitor->release(update);
|
||||
|
||||
interested_t::iterator IIT(interested); // recursively locks interested.mutex() (assumes this->mutex() is interestd.mutex())
|
||||
for(interested_t::value_pointer pusr = IIT.next(); pusr; pusr = IIT.next())
|
||||
@ -193,8 +201,6 @@ MonitorCacheEntry::unlisten(pvd::MonitorPtr const & monitor)
|
||||
M.swap(mon);
|
||||
tonotify = interested.lock_vector();
|
||||
// assume that upstream won't call monitorEvent() again
|
||||
monitor->release(lastelem);
|
||||
lastelem.reset();
|
||||
|
||||
// cause future downstream start() to error
|
||||
startresult = pvd::Status(pvd::Status::STATUSTYPE_ERROR, "upstream unlisten()");
|
||||
@ -257,21 +263,16 @@ MonitorUser::start()
|
||||
return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead");
|
||||
|
||||
bool doEvt = false;
|
||||
pvd::PVStructurePtr lval;
|
||||
pvd::StructureConstPtr typedesc;
|
||||
{
|
||||
Guard G(entry->mutex());
|
||||
Guard G(entry->mutex()); // MCE and MU have share a lock
|
||||
|
||||
if(!entry->startresult.isSuccess())
|
||||
return entry->startresult;
|
||||
|
||||
if(entry->lastelem)
|
||||
pvd::PVStructurePtr lval;
|
||||
if(entry->havedata)
|
||||
lval = entry->lastelem->pvStructurePtr;
|
||||
typedesc = entry->typedesc;
|
||||
}
|
||||
|
||||
{
|
||||
Guard G(mutex());
|
||||
pvd::StructureConstPtr typedesc(entry->typedesc);
|
||||
|
||||
if(initial) {
|
||||
initial = false;
|
||||
|
Reference in New Issue
Block a user