From 1f5c0eedcb4a91f279a685665242c074ba72a86f Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 26 Aug 2018 18:19:47 +0200 Subject: [PATCH] MonitorFIFO: open() handle compute() error internally Introduce Error state when open() fails due to invalid pvRequest mapping. --- src/client/monitor.cpp | 77 ++++++++++++++++++------------ src/client/pv/monitor.h | 9 +++- testApp/remote/testmonitorfifo.cpp | 37 ++++++++++++-- 3 files changed, 89 insertions(+), 34 deletions(-) diff --git a/src/client/monitor.cpp b/src/client/monitor.cpp index eaa8784..ba85755 100644 --- a/src/client/monitor.cpp +++ b/src/client/monitor.cpp @@ -41,8 +41,8 @@ MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr &requester ,requester(requester) ,pvRequest(pvRequest) ,upstream(source) + ,state(Closed) ,pipeline(false) - ,opened(false) ,running(false) ,finished(false) ,needConnected(false) @@ -112,7 +112,13 @@ void MonitorFIFO::show(std::ostream& strm) const Guard G(mutex); - strm<<" open="<createPVStructure(type), *pvRequest, conf.mapperMode); - message = mapper.warnings(); + try { + mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode); + message = mapper.warnings(); - while(empty.size() < conf.actualCount+1) { - MonitorElementPtr elem(new MonitorElement(mapper.buildRequested())); - empty.push_back(elem); + while(empty.size() < conf.actualCount+1) { + MonitorElementPtr elem(new MonitorElement(mapper.buildRequested())); + empty.push_back(elem); + } + + state = Opened; + error = pvd::Status(); // ok + + assert(inuse.empty()); + assert(empty.size()>=2); + assert(returned.empty()); + assert(conf.actualCount>=1); + + }catch(std::runtime_error& e){ + // error from compute() + error = pvd::Status::error(e.what()); + state = Error; } - - opened = true; needConnected = true; - - assert(inuse.empty()); - assert(empty.size()>=2); - assert(returned.empty()); - assert(conf.actualCount>=1); } if(message.empty()) return; requester_type::shared_pointer req(requester.lock()); @@ -176,23 +190,20 @@ void MonitorFIFO::open(const pvd::StructureConstPtr& type) void MonitorFIFO::close() { Guard G(mutex); - if(!opened) - return; // no-op - - opened = false; - needClosed = true; + needClosed = state==Opened; + state = Closed; } void MonitorFIFO::finish() { Guard G(mutex); - if(!opened) - throw std::logic_error("Can not finish() a closed Monitor"); + if(state==Closed) + throw std::logic_error("Can not finish() a closed Monitor"); else if(finished) return; // no-op finished = true; - if(inuse.empty() && running) + if(inuse.empty() && running && state==Opened) needUnlisten = true; } @@ -203,7 +214,8 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value, { Guard G(mutex); - assert(opened && !finished); + assert(state!=Closed && !finished); + if(state!=Opened) return false; // when Error, act as always "full" assert(!empty.empty() || !inuse.empty()); const bool havefree = _freeCount()>0u; @@ -251,7 +263,8 @@ void MonitorFIFO::post(const pvData::PVStructure& value, { Guard G(mutex); - assert(opened && !finished); + assert(state!=Closed && !finished); + if(state!=Opened) return; assert(!empty.empty() || !inuse.empty()); const bool use_empty = !empty.empty(); @@ -313,6 +326,7 @@ void MonitorFIFO::notify() evt = false, unl = false, clo = false; + pvd::Status err; { Guard G(mutex); @@ -321,19 +335,22 @@ void MonitorFIFO::notify() std::swap(evt, needEvent); std::swap(unl, needUnlisten); std::swap(clo, needClosed); + std::swap(err, error); if(conn | evt | unl | clo) { req = requester.lock(); self = shared_from_this(); } - if(conn) - type = (!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure(); + if(conn && err.isSuccess()) + type = mapper.requested(); } if(!req) return; - if(conn) + if(conn && err.isSuccess()) req->monitorConnect(pvd::Status(), self, type); + else if(conn) + req->monitorConnect(err, self, type); if(evt) req->monitorEvent(self); if(unl) @@ -350,10 +367,10 @@ pvd::Status MonitorFIFO::start() { Guard G(mutex); - if(!opened) + if(state==Closed) throw std::logic_error("Monitor can't start() before open()"); - if(running) + if(running || state!=Opened) return pvd::Status(); if(!inuse.empty()) { diff --git a/src/client/pv/monitor.h b/src/client/pv/monitor.h index 5a9ef2f..b6ed4ac 100644 --- a/src/client/pv/monitor.h +++ b/src/client/pv/monitor.h @@ -353,6 +353,7 @@ private: // -> MonitorRequester::monitorEvent() // -> MonitorRequester::unlisten() // -> ChannelBaseRequester::channelDisconnect() + // start() -> MonitorRequester::monitorEvent() // release() -> Source::freeHighMark() // -> notify() -> ... // reportRemoteQueueStatus() -> Source::freeHighMark() @@ -375,8 +376,12 @@ private: // and expect that upstream will have only a weak ref to us. const Source::shared_pointer upstream; + enum state_t { + Closed, // not open()'d + Opened, // successful open() + Error, // unsuccessful open() + } state; bool pipeline; // const after ctor - bool opened; // open() vs. close() bool running; // start() vs. stop() bool finished; // finish() called epics::pvData::BitSet scratch, oscratch; // using during post to avoid re-alloc @@ -386,6 +391,8 @@ private: bool needUnlisten; bool needClosed; + epics::pvData::Status error; // Set when entering Error state + size_t freeHighLevel; epicsInt32 flowCount; diff --git a/testApp/remote/testmonitorfifo.cpp b/testApp/remote/testmonitorfifo.cpp index 034664e..7696338 100644 --- a/testApp/remote/testmonitorfifo.cpp +++ b/testApp/remote/testmonitorfifo.cpp @@ -30,6 +30,7 @@ struct Tester { // we only have one thread, so no need for sync. enum cb_t { Connect, + ConnectError, Event, Unlisten, Close, @@ -39,6 +40,7 @@ struct Tester { switch(cb) { #define CASE(NAME) case NAME: return #NAME CASE(Connect); + CASE(ConnectError); CASE(Event); CASE(Unlisten); CASE(Close); @@ -64,9 +66,12 @@ struct Tester { } virtual void monitorConnect(epics::pvData::Status const & status, pva::MonitorPtr const & monitor, epics::pvData::StructureConstPtr const & structure) OVERRIDE FINAL { - testDiag("In %s", CURRENT_FUNCTION); + testDiag("In %s : %s", CURRENT_FUNCTION, status.isSuccess() ? "OK" : status.getMessage().c_str()); Guard G(mutex); - Tester::timeline.push_back(Connect); + if(status.isSuccess()) + Tester::timeline.push_back(Connect); + else + Tester::timeline.push_back(ConnectError); } virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL { testDiag("In %s", CURRENT_FUNCTION); @@ -763,11 +768,36 @@ void checkCountdown() tester.testTimeline({Tester::Close}); } +void checkBadRequest() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf; + conf.maxCount=4; + conf.defCount=3; + Tester tester(pvd::createRequest("field(invalid)"), &conf); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::ConnectError}); + + // when in Error, all are no-op + tester.post(15); + tester.tryPost(4, false); + tester.tryPost(5, false, true); + tester.mon->finish(); + + tester.mon->notify(); + tester.testTimeline({}); // nothing happens + + tester.close(); + tester.testTimeline({}); +} + } // namespace MAIN(testmonitorfifo) { - testPlan(184); + testPlan(189); checkPlain(); checkAfterClose(); checkReOpenLost(); @@ -777,6 +807,7 @@ MAIN(testmonitorfifo) checkPipeline(); checkSpam(); checkCountdown(); + checkBadRequest(); return testDone(); }