MonitorFIFO: open() handle compute() error internally

Introduce Error state when open() fails due to invalid
pvRequest mapping.
This commit is contained in:
Michael Davidsaver
2018-08-26 18:19:47 +02:00
parent 643f7e47c8
commit 1f5c0eedcb
3 changed files with 89 additions and 34 deletions

View File

@ -41,8 +41,8 @@ MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr<MonitorRequester> &requester
,requester(requester)
,pvRequest(pvRequest)
,upstream(source)
,state(Closed)
,pipeline(false)
,opened(false)
,running(false)
,finished(false)
,needConnected(false)
@ -112,7 +112,13 @@ void MonitorFIFO::show(std::ostream& strm) const
Guard G(mutex);
strm<<" open="<<opened<<" running="<<running<<" finished="<<finished<<"\n";
switch(state) {
case Closed: strm<<" Closed"; break;
case Opened: strm<<" Opened"; break;
case Error: strm<<" Error:"<<error; break;
}
strm<<" running="<<running<<" finished="<<finished<<"\n";
strm<<" #empty="<<empty.size()<<" #returned="<<returned.size()<<" #inuse="<<inuse.size()<<" flowCount="<<flowCount<<"\n";
strm<<" events "<<(needConnected?'C':'_')<<(needEvent?'E':'_')<<(needUnlisten?'U':'_')<<(needClosed?'X':'_')
<<"\n";
@ -134,7 +140,7 @@ void MonitorFIFO::open(const pvd::StructureConstPtr& type)
{
Guard G(mutex);
if(opened)
if(state!=Closed)
throw std::logic_error("Monitor already open. Must close() before re-openning");
else if(needClosed)
throw std::logic_error("Monitor needs notify() between close() and open().");
@ -150,21 +156,29 @@ void MonitorFIFO::open(const pvd::StructureConstPtr& type)
// fill up empty.
pvd::PVDataCreatePtr create(pvd::getPVDataCreate());
mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode);
message = mapper.warnings();
try {
mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode);
message = mapper.warnings();
while(empty.size() < conf.actualCount+1) {
MonitorElementPtr elem(new MonitorElement(mapper.buildRequested()));
empty.push_back(elem);
while(empty.size() < conf.actualCount+1) {
MonitorElementPtr elem(new MonitorElement(mapper.buildRequested()));
empty.push_back(elem);
}
state = Opened;
error = pvd::Status(); // ok
assert(inuse.empty());
assert(empty.size()>=2);
assert(returned.empty());
assert(conf.actualCount>=1);
}catch(std::runtime_error& e){
// error from compute()
error = pvd::Status::error(e.what());
state = Error;
}
opened = true;
needConnected = true;
assert(inuse.empty());
assert(empty.size()>=2);
assert(returned.empty());
assert(conf.actualCount>=1);
}
if(message.empty()) return;
requester_type::shared_pointer req(requester.lock());
@ -176,23 +190,20 @@ void MonitorFIFO::open(const pvd::StructureConstPtr& type)
void MonitorFIFO::close()
{
Guard G(mutex);
if(!opened)
return; // no-op
opened = false;
needClosed = true;
needClosed = state==Opened;
state = Closed;
}
void MonitorFIFO::finish()
{
Guard G(mutex);
if(!opened)
throw std::logic_error("Can not finish() a closed Monitor");
if(state==Closed)
throw std::logic_error("Can not finish() a closed Monitor");
else if(finished)
return; // no-op
finished = true;
if(inuse.empty() && running)
if(inuse.empty() && running && state==Opened)
needUnlisten = true;
}
@ -203,7 +214,8 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value,
{
Guard G(mutex);
assert(opened && !finished);
assert(state!=Closed && !finished);
if(state!=Opened) return false; // when Error, act as always "full"
assert(!empty.empty() || !inuse.empty());
const bool havefree = _freeCount()>0u;
@ -251,7 +263,8 @@ void MonitorFIFO::post(const pvData::PVStructure& value,
{
Guard G(mutex);
assert(opened && !finished);
assert(state!=Closed && !finished);
if(state!=Opened) return;
assert(!empty.empty() || !inuse.empty());
const bool use_empty = !empty.empty();
@ -313,6 +326,7 @@ void MonitorFIFO::notify()
evt = false,
unl = false,
clo = false;
pvd::Status err;
{
Guard G(mutex);
@ -321,19 +335,22 @@ void MonitorFIFO::notify()
std::swap(evt, needEvent);
std::swap(unl, needUnlisten);
std::swap(clo, needClosed);
std::swap(err, error);
if(conn | evt | unl | clo) {
req = requester.lock();
self = shared_from_this();
}
if(conn)
type = (!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure();
if(conn && err.isSuccess())
type = mapper.requested();
}
if(!req)
return;
if(conn)
if(conn && err.isSuccess())
req->monitorConnect(pvd::Status(), self, type);
else if(conn)
req->monitorConnect(err, self, type);
if(evt)
req->monitorEvent(self);
if(unl)
@ -350,10 +367,10 @@ pvd::Status MonitorFIFO::start()
{
Guard G(mutex);
if(!opened)
if(state==Closed)
throw std::logic_error("Monitor can't start() before open()");
if(running)
if(running || state!=Opened)
return pvd::Status();
if(!inuse.empty()) {

View File

@ -353,6 +353,7 @@ private:
// -> MonitorRequester::monitorEvent()
// -> MonitorRequester::unlisten()
// -> ChannelBaseRequester::channelDisconnect()
// start() -> MonitorRequester::monitorEvent()
// release() -> Source::freeHighMark()
// -> notify() -> ...
// reportRemoteQueueStatus() -> Source::freeHighMark()
@ -375,8 +376,12 @@ private:
// and expect that upstream will have only a weak ref to us.
const Source::shared_pointer upstream;
enum state_t {
Closed, // not open()'d
Opened, // successful open()
Error, // unsuccessful open()
} state;
bool pipeline; // const after ctor
bool opened; // open() vs. close()
bool running; // start() vs. stop()
bool finished; // finish() called
epics::pvData::BitSet scratch, oscratch; // using during post to avoid re-alloc
@ -386,6 +391,8 @@ private:
bool needUnlisten;
bool needClosed;
epics::pvData::Status error; // Set when entering Error state
size_t freeHighLevel;
epicsInt32 flowCount;

View File

@ -30,6 +30,7 @@ struct Tester {
// we only have one thread, so no need for sync.
enum cb_t {
Connect,
ConnectError,
Event,
Unlisten,
Close,
@ -39,6 +40,7 @@ struct Tester {
switch(cb) {
#define CASE(NAME) case NAME: return #NAME
CASE(Connect);
CASE(ConnectError);
CASE(Event);
CASE(Unlisten);
CASE(Close);
@ -64,9 +66,12 @@ struct Tester {
}
virtual void monitorConnect(epics::pvData::Status const & status,
pva::MonitorPtr const & monitor, epics::pvData::StructureConstPtr const & structure) OVERRIDE FINAL {
testDiag("In %s", CURRENT_FUNCTION);
testDiag("In %s : %s", CURRENT_FUNCTION, status.isSuccess() ? "OK" : status.getMessage().c_str());
Guard G(mutex);
Tester::timeline.push_back(Connect);
if(status.isSuccess())
Tester::timeline.push_back(Connect);
else
Tester::timeline.push_back(ConnectError);
}
virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL {
testDiag("In %s", CURRENT_FUNCTION);
@ -763,11 +768,36 @@ void checkCountdown()
tester.testTimeline({Tester::Close});
}
void checkBadRequest()
{
testDiag("==== %s ====", CURRENT_FUNCTION);
pva::MonitorFIFO::Config conf;
conf.maxCount=4;
conf.defCount=3;
Tester tester(pvd::createRequest("field(invalid)"), &conf);
tester.connect(pvd::pvInt);
tester.mon->notify();
tester.testTimeline({Tester::ConnectError});
// when in Error, all are no-op
tester.post(15);
tester.tryPost(4, false);
tester.tryPost(5, false, true);
tester.mon->finish();
tester.mon->notify();
tester.testTimeline({}); // nothing happens
tester.close();
tester.testTimeline({});
}
} // namespace
MAIN(testmonitorfifo)
{
testPlan(184);
testPlan(189);
checkPlain();
checkAfterClose();
checkReOpenLost();
@ -777,6 +807,7 @@ MAIN(testmonitorfifo)
checkPipeline();
checkSpam();
checkCountdown();
checkBadRequest();
return testDone();
}