// Overview (reviewer notes; describes only what is visible in this text):
//
//  * Worker    - interface with a single pure virtual process(const TestMonitorEvent&).
//  * WorkQueue - epicsThreadRunable that owns a pvd::Thread named "Monitor handler",
//                auto-started from the constructor.
//      push()  - under the mutex, appends make_pair(cb, evt) to the deque; returns
//                without queueing once `running` is false; signals `event` only when
//                the deque was empty before the append.
//      run()   - loops while `running`; waits on `event` when the deque is empty,
//                otherwise pops the front entry, lock()s the weak callback reference,
//                skips the entry if that yields null, and calls cb->process(). The
//                mutex is released (UnGuard) around both the wait and the callback.
//                std::exception from the callback is caught and reported with the
//                "Error in monitor handler : " prefix.
//      close() - clears `running`, signals `event`, then calls worker.exitWait().
//                Also invoked by the destructor.
//  * MonTracker - monitorEvent() only forwards (shared_from_this(), evt) to `monwork`;
//                process() switches on evt.event (a Fail case and an "Event " print of
//                the "value" sub-field, falling back to mon.root, are visible).
//  * Trailing fragment - for each entry in `pvs`: creates a MonTracker, calls
//                provider.connect(pv) and chan.monitor(mon.get()), keeps the tracker in
//                `monitors`; then waits on `done`, without a timeout when waitTime<0.0,
//                otherwise with done.wait(waitTime).
//
// NOTE(review): this text looks like it lost every span that sat between '<' and '>'
// (bare `#include` directives, template-less typedefs, stream expressions that stop at
// '<'), and its original line breaks - so the `//` comments below swallow the remainder
// of their physical line. The declarations of `monwork`, `done`, `provider`, the
// MonTracker base-class list and the head of the final function are not visible here.
// Presumably this is an extraction artifact; restore from the upstream file before
// attempting to build or modify - TODO confirm.
/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvAccessCPP is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #include #include #include #include #include #if !defined(_WIN32) #include #define USE_SIGNAL #endif #include #include #include #include #include #include #include #include namespace pvd = epics::pvData; namespace pva = epics::pvAccess; namespace { typedef epicsGuard Guard; typedef epicsGuardRelease UnGuard; struct Worker { virtual ~Worker() {} virtual void process(const TestMonitorEvent& event) =0; }; // simple work queue with thread. // moves monitor queue handling off of PVA thread(s) struct WorkQueue : public epicsThreadRunable { epicsMutex mutex; typedef std::tr1::shared_ptr value_type; typedef std::tr1::weak_ptr weak_type; // work queue holds only weak_ptr // so jobs must be kept alive seperately typedef std::deque > queue_t; queue_t queue; epicsEvent event; bool running; pvd::Thread worker; WorkQueue() :running(true) ,worker(pvd::Thread::Config() .name("Monitor handler") .autostart(true) .run(this)) {} ~WorkQueue() {close();} void close() { { Guard G(mutex); running = false; } event.signal(); worker.exitWait(); } void push(const weak_type& cb, const TestMonitorEvent& evt) { bool wake; { Guard G(mutex); if(!running) return; // silently refuse to queue during/after close() wake = queue.empty(); queue.push_back(std::make_pair(cb, evt)); } if(wake) event.signal(); } virtual void run() OVERRIDE FINAL { Guard G(mutex); while(running) { if(queue.empty()) { UnGuard U(G); event.wait(); } else { queue_t::value_type ent(queue.front()); value_type cb(ent.first.lock()); queue.pop_front(); if(!cb) continue; try { UnGuard U(G); cb->process(ent.second); }catch(std::exception& e){ std::cout<<"Error in monitor handler : "< { POINTER_DEFINITIONS(MonTracker); MonTracker(const std::string& name) :name(name) {} virtual ~MonTracker() {} const std::string name; 
TestMonitor mon; virtual void monitorEvent(const TestMonitorEvent& evt) OVERRIDE FINAL { // running on internal provider worker thread // minimize work here. // TODO: bound queue size monwork.push(shared_from_this(), evt); } virtual void process(const TestMonitorEvent& evt) OVERRIDE FINAL { // running on our worker thread switch(evt.event) { case TestMonitorEvent::Fail: std::cout<<"Error "<getSubField("value")); if(!fld) fld = mon.root; std::cout<<"Event "< pvs_t; pvs_t pvs; for(int i=1; i(argv[++i]); } else { std::cout << "--timeout requires value\n"; return 1; } } else if(strcmp(argv[i], "-r")==0 || strcmp(argv[i], "--request")==0) { if(i monitors; for(pvs_t::const_iterator it=pvs.begin(); it!=pvs.end(); ++it) { const std::string& pv = *it; MonTracker::shared_pointer mon(new MonTracker(pv)); TestClientChannel chan(provider.connect(pv)); mon->mon = chan.monitor(mon.get()); monitors.push_back(mon); } if(waitTime<0.0) done.wait(); else done.wait(waitTime); } catch(std::exception& e){ std::cout<<"Error: "<