#include #include #include #include #include #if !defined(_WIN32) #include #define USE_SIGNAL #endif #include #include #include #include #include #include #include #include namespace pvd = epics::pvData; namespace pva = epics::pvAccess; namespace { typedef epicsGuard Guard; typedef epicsGuardRelease UnGuard; // simple work queue with thread. // moves monitor queue handling off of PVA thread(s) struct WorkQueue : public epicsThreadRunable { epicsMutex mutex; typedef std::tr1::shared_ptr value_t; // work queue holds only weak_ptr // so jobs must be kept alive seperately typedef std::deque > queue_t; queue_t queue; epicsEvent event; bool running; pvd::Thread worker; WorkQueue() :running(true) ,worker(pvd::Thread::Config() .name("Monitor handler") .autostart(true) .run(this)) {} ~WorkQueue() {close();} void close() { { Guard G(mutex); running = false; } event.signal(); worker.exitWait(); } void push(const queue_t::value_type& v) { bool wake; { Guard G(mutex); if(!running) return; // silently refuse to queue during/after close() wake = queue.empty(); queue.push_back(v); } if(wake) event.signal(); } virtual void run() { Guard G(mutex); while(running) { if(queue.empty()) { UnGuard U(G); event.wait(); } else { value_t ent(queue.front().lock()); queue.pop_front(); if(!ent) continue; try { UnGuard U(G); ent->run(); }catch(std::exception& e){ std::cout<<"Error in monitor handler : "<chan->getChannelName()<<" "<alldone) { Guard G(self->mutex); if(!self->op) { // called during createMonitor() self->op = monitor; } // store type info // also serves as "connected" flag self->cur_type = structure; // use 'monitor' arg as owner->mon may not be assigned yet pvd::Status msts(monitor->start()); std::cout<<"monitorConnect "<chan->getChannelName()<<" start "<mutex); std::cout<<"channelDisconnect "<chan->getChannelName()<<"\n"; self->cur_type.reset(); self->alldone |= destroy; // no need to call self->op->stop() // monitor implicitly stopped on disconnect pvd::Status msts(self->op->stop()); } virtual void monitorEvent(pva::MonitorPtr const & monitor) { MonTracker::shared_pointer self(owner.lock()); if(!self) return; std::cout<<"monitorEvent "<chan->getChannelName()<<"\n"; { Guard G(self->mutex); if(self->queued) return; self->queued = true; } try { monwork.push(owner); }catch(std::exception& e){ Guard G(self->mutex); self->queued = false; std::cout<<"monitorEvent failed to queue "< req; epicsMutex mutex; pvd::StructureConstPtr cur_type; bool alldone; bool queued; MonTracker() :alldone(false), queued(false) {} virtual ~MonTracker() {} virtual void run() { { Guard G(mutex); queued = false; } while(true) { pva::MonitorElementPtr elem(op->poll()); if(!elem) break; try { std::cout<<"Event "<getChannelName()<<"\n"<pvStructurePtr<<"\n"; } catch(...) { op->release(elem); throw; } op->release(elem); } } }; } // namespace int main(int argc, char *argv[]) { try { double waitTime = -1.0; std::string providerName("pva"); typedef std::vector pvs_t; pvs_t pvs; for(int i=1; i(argv[++i]); } else { std::cout << "--timeout requires value\n"; return 1; } } else { std::cout<<"Unknown argument: "<createProvider(providerName, conf)); if(!provider) throw std::logic_error("pva provider not registered"); std::vector monitors; for(pvs_t::const_iterator it=pvs.begin(); it!=pvs.end(); ++it) { const std::string& pv = *it; MonTracker::shared_pointer mon(new MonTracker); mon->req.reset(new MonTracker::Req(mon)); pva::Channel::shared_pointer chan(provider->createChannel(pv)); { Guard G(mon->mutex); mon->chan = chan; } pva::Monitor::shared_pointer M(chan->createMonitor(mon->req, pvReq)); // monitorConnect may already be called { Guard G(mon->mutex); assert(!mon->op || mon->op==M); mon->op = M; } monitors.push_back(mon); } if(waitTime<0.0) done.wait(); else done.wait(waitTime); } catch(std::exception& e){ std::cout<<"Error: "<