spamme example use MonitorFIFO

This commit is contained in:
Michael Davidsaver
2018-05-23 13:57:43 -07:00
parent 5e887a6d02
commit 3341d9a2ba

View File

@ -32,6 +32,8 @@ typedef epicsGuard<epicsMutex> Guard;
epicsEvent done; epicsEvent done;
bool debug;
#ifdef USE_SIGNAL #ifdef USE_SIGNAL
void alldone(int num) void alldone(int num)
{ {
@ -48,145 +50,39 @@ pvd::Structure::const_shared_pointer spamtype(pvd::getFieldCreate()->createField
struct SpamProvider; struct SpamProvider;
struct SpamChannel; struct SpamChannel;
struct SpamMonitor : public pva::Monitor, struct SpamSource : public pva::MonitorFIFO::Source
public std::tr1::enable_shared_from_this<SpamMonitor>
{ {
const std::tr1::shared_ptr<SpamChannel> channel;
const requester_type::weak_pointer requester;
pvd::int32 maxQueue;
bool pipeline;
// Has the client seen poll()==NULL
bool clientEmpty;
epicsMutex mutex; epicsMutex mutex;
bool running;
epicsUInt32 remoteQueue;
std::deque<epics::pvData::MonitorElementPtr> filled, empty;
pvd::PVStructure::shared_pointer value;
epicsUInt32 counter; epicsUInt32 counter;
SpamMonitor(const std::tr1::shared_ptr<SpamChannel>& chan, const pvd::PVStructurePtr cur;
const pva::MonitorRequester::shared_pointer &requester, const pvd::PVIntPtr val;
const pvd::PVStructure::shared_pointer &pvRequest) pvd::BitSet changed;
:channel(chan)
,requester(requester) SpamSource()
,maxQueue(0) :counter(0)
,pipeline(false) ,cur(pvd::getPVDataCreate()->createPVStructure(spamtype))
,clientEmpty(true) ,val(cur->getSubFieldT<pvd::PVInt>("value"))
,running(false)
,remoteQueue(0)
,counter(0)
{ {
pvd::PVScalar::shared_pointer fld; changed.set(val->getFieldOffset()); // our value always changes
fld = pvRequest->getSubField<pvd::PVScalar>("record._options.queueSize");
if(fld)
maxQueue = fld->getAs<pvd::int32>();
if(maxQueue<3)
maxQueue = 3;
fld = pvRequest->getSubField<pvd::PVScalar>("record._options.pipeline");
if(fld)
pipeline = fld->getAs<pvd::boolean>();
pvd::PVDataCreatePtr create(pvd::getPVDataCreate());
value = create->createPVStructure(spamtype);
for(pvd::int32 i=0; i<maxQueue; i++)
{
pvd::MonitorElementPtr elem(new pvd::MonitorElement(create->createPVStructure(spamtype)));
empty.push_back(elem);
}
} }
virtual ~SpamMonitor() {} virtual ~SpamSource() {}
virtual void freeHighMark(pva::MonitorFIFO *mon, size_t numEmpty) OVERRIDE FINAL
virtual void destroy() OVERRIDE FINAL {(void)stop();}
virtual pvd::Status start() OVERRIDE FINAL
{
{
Guard G(mutex);
running = true;
clientEmpty = true;
}
pushall();
return pvd::Status::Ok;
}
virtual pvd::Status stop() OVERRIDE FINAL
{
{
Guard G(mutex);
running = false;
}
return pvd::Status::Ok;
}
virtual pva::MonitorElementPtr poll() OVERRIDE FINAL
{ {
Guard G(mutex); Guard G(mutex);
pva::MonitorElementPtr ret; for(;numEmpty; numEmpty--)
if(!filled.empty()) {
ret = filled.front();
filled.pop_front();
}
clientEmpty = !ret;
return ret;
}
virtual void release(const pva::MonitorElementPtr& elem) OVERRIDE FINAL
{
if(elem->pvStructurePtr->getField().get()!=spamtype.get())
return;
{ {
Guard G(mutex); val->put(counter);
empty.push_back(elem); if(!mon->tryPost(*cur, changed) && numEmpty!=1) {
} std::cerr<<"spam tryPost() inconsistent "<<numEmpty<<"\n";
pushall(); return;
}
virtual void reportRemoteQueueStatus(pvd::int32 freeElements) OVERRIDE FINAL
{
{
Guard G(mutex);
remoteQueue += freeElements;
}
pushall();
}
void pushall()
{
bool signal;
{
Guard G(mutex);
signal = clientEmpty && filled.empty();
while(!empty.empty() && (!pipeline || remoteQueue>0)) {
pva::MonitorElementPtr elem(empty.front());
pvd::PVIntPtr fld(value->getSubFieldT<pvd::PVInt>("value"));
fld->put(counter++);
elem->pvStructurePtr->copyUnchecked(*value);
elem->changedBitSet->clear();
elem->changedBitSet->set(0);
elem->overrunBitSet->clear();
filled.push_back(elem);
empty.pop_front();
remoteQueue--;
} }
counter++;
signal &= !filled.empty();
if(signal)
clientEmpty = false;
}
if(signal) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->monitorEvent(shared_from_this());
} }
} }
}; };
struct SpamChannel : public pva::Channel, struct SpamChannel : public pva::Channel,
public std::tr1::enable_shared_from_this<SpamChannel> public std::tr1::enable_shared_from_this<SpamChannel>
{ {
@ -226,8 +122,11 @@ struct SpamChannel : public pva::Channel,
virtual pva::Monitor::shared_pointer createMonitor(const pva::MonitorRequester::shared_pointer &requester, virtual pva::Monitor::shared_pointer createMonitor(const pva::MonitorRequester::shared_pointer &requester,
const pvd::PVStructure::shared_pointer &pvRequest) OVERRIDE FINAL const pvd::PVStructure::shared_pointer &pvRequest) OVERRIDE FINAL
{ {
std::tr1::shared_ptr<SpamMonitor> ret(new SpamMonitor(shared_from_this(), requester, pvRequest)); std::tr1::shared_ptr<SpamSource> us(new SpamSource);
requester->monitorConnect(pvd::Status::Ok, ret, spamtype); std::tr1::shared_ptr<pva::MonitorFIFO> ret(new pva::MonitorFIFO(requester, pvRequest, us));
// ret holds strong ref. to us
ret->open(spamtype);
ret->notify();
return ret; return ret;
} }
}; };
@ -277,6 +176,10 @@ struct SpamProvider : public pva::ChannelProvider,
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
try { try {
for(int i=1; i<argc; i++) {
if(strcmp("-d", argv[i])==0)
debug = true;
}
std::tr1::shared_ptr<SpamProvider> provider(new SpamProvider("spam")); std::tr1::shared_ptr<SpamProvider> provider(new SpamProvider("spam"));
pva::ServerContext::shared_pointer server(pva::ServerContext::create(pva::ServerContext::Config() pva::ServerContext::shared_pointer server(pva::ServerContext::create(pva::ServerContext::Config()
.provider(provider) .provider(provider)