diff --git a/examples/epicschat.cpp b/examples/epicschat.cpp index 950799b..d79a28e 100644 --- a/examples/epicschat.cpp +++ b/examples/epicschat.cpp @@ -51,17 +51,17 @@ struct ChatHandler : public pvas::SharedPV::Handler virtual ~ChatHandler() { printf("Cleanup Room\n"); } - virtual void onLastDisconnect(pvas::SharedPV& self) { + virtual void onLastDisconnect(const pvas::SharedPV::shared_pointer& self) OVERRIDE FINAL { printf("Close Room %p\n", &self); } - virtual void onPut(pvas::SharedPV& self, pvas::Operation& op) { + virtual void onPut(const pvas::SharedPV::shared_pointer& self, pvas::Operation& op) OVERRIDE FINAL { pva::ChannelRequester::shared_pointer req(op.getChannel()->getChannelRequester()); std::ostringstream strm; if(req) { strm<getRequesterName()<<" says "; } else { - op.complete(pvd::Status::error("Defuct Put")); + op.complete(pvd::Status::error("Defunct Put")); return; } @@ -71,7 +71,7 @@ struct ChatHandler : public pvas::SharedPV::Handler replacement->getSubFieldT("value")->put(strm.str()); - self.post(*replacement, op.changed()); + self->post(*replacement, op.changed()); op.complete(); } }; diff --git a/examples/getme.cpp b/examples/getme.cpp index fdf2632..3e65d21 100644 --- a/examples/getme.cpp +++ b/examples/getme.cpp @@ -60,7 +60,7 @@ struct Getter : public pvac::ClientChannel::GetCallback, op.cancel(); } - virtual void getDone(const pvac::GetEvent& event) + virtual void getDone(const pvac::GetEvent& event) OVERRIDE FINAL { switch(event.event) { case pvac::GetEvent::Fail: @@ -78,7 +78,7 @@ struct Getter : public pvac::ClientChannel::GetCallback, } } - virtual void connectEvent(const pvac::ConnectEvent& evt) + virtual void connectEvent(const pvac::ConnectEvent& evt) OVERRIDE FINAL { if(evt.connected) { op = channel.get(this); diff --git a/pvtoolsSrc/pvget.cpp b/pvtoolsSrc/pvget.cpp index fe6c878..cdae143 100644 --- a/pvtoolsSrc/pvget.cpp +++ b/pvtoolsSrc/pvget.cpp @@ -216,6 +216,7 @@ struct ChannelGetRequesterImpl : public ChannelGetRequester, public Tracker { std::cerr << "[" << m_channelName << "] failed to get: " << status << '\n'; } + std::cout.flush(); done(); } @@ -325,6 +326,7 @@ struct MonitorRequesterImpl : public MonitorRequester, public Tracker myos << *(element->pvStructurePtr.get()) << "\n\n"; } + std::cout.flush(); } } diff --git a/pvtoolsSrc/pvput.cpp b/pvtoolsSrc/pvput.cpp index 98efae1..3635e76 100644 --- a/pvtoolsSrc/pvput.cpp +++ b/pvtoolsSrc/pvput.cpp @@ -175,6 +175,7 @@ void printValue(std::string const & channelName, PVStructure::const_shared_point terseStructure(std::cout, pv) << std::endl; else std::cout << std::endl << *(pv.get()) << std::endl << std::endl; + std::cout.flush(); } void early(const char *inp, unsigned pos) diff --git a/src/client/monitor.cpp b/src/client/monitor.cpp index 1514955..ba85755 100644 --- a/src/client/monitor.cpp +++ b/src/client/monitor.cpp @@ -26,7 +26,8 @@ MonitorFIFO::Config::Config() :maxCount(4) ,defCount(4) ,actualCount(0) // readback - ,ignoreRequestMask(false) + ,dropEmptyUpdates(true) + ,mapperMode(pvd::PVRequestMapper::Mask) {} size_t MonitorFIFO::num_instances; @@ -40,8 +41,8 @@ MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr &requester ,requester(requester) ,pvRequest(pvRequest) ,upstream(source) + ,state(Closed) ,pipeline(false) - ,opened(false) ,running(false) ,finished(false) ,needConnected(false) @@ -111,7 +112,13 @@ void MonitorFIFO::show(std::ostream& strm) const Guard G(mutex); - strm<<" open="<createPVStructure(type))); - empty.push_back(elem); - } - opened = true; + try { + mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode); + message = mapper.warnings(); + + while(empty.size() < conf.actualCount+1) { + MonitorElementPtr elem(new MonitorElement(mapper.buildRequested())); + empty.push_back(elem); + } + + state = Opened; + error = pvd::Status(); // ok + + assert(inuse.empty()); + assert(empty.size()>=2); + assert(returned.empty()); + assert(conf.actualCount>=1); + + }catch(std::runtime_error& e){ + // error from compute() + error = pvd::Status::error(e.what()); + state = Error; + } needConnected = true; - this->type = type; - - if(conf.ignoreRequestMask) { - selectMask.clear(); - for(size_t i=0, N=empty.back()->pvStructurePtr->getNextFieldOffset(); ipvStructurePtr, - pvRequest->getSubField("field")); - } - emptyselect = selectMask.isEmpty(); - - assert(inuse.empty()); - assert(empty.size()>=2); - assert(returned.empty()); - assert(conf.actualCount>=1); } - if(!emptyselect) return; + if(message.empty()) return; requester_type::shared_pointer req(requester.lock()); if(req) { - req->message("pvRequest with empty field mask", warningMessage); + req->message(message, warningMessage); } } void MonitorFIFO::close() { Guard G(mutex); - if(!opened) - return; // no-op - - opened = false; - needClosed = true; - selectMask.clear(); - type.reset(); + needClosed = state==Opened; + state = Closed; } void MonitorFIFO::finish() { Guard G(mutex); - if(!opened) - throw std::logic_error("Can not finish() a closed Monitor"); + if(state==Closed) + throw std::logic_error("Can not finish() a closed Monitor"); else if(finished) return; // no-op finished = true; - if(inuse.empty() && running) + if(inuse.empty() && running && state==Opened) needUnlisten = true; } @@ -211,17 +214,14 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value, { Guard G(mutex); - assert(opened && !finished); + assert(state!=Closed && !finished); + if(state!=Opened) return false; // when Error, act as always "full" assert(!empty.empty() || !inuse.empty()); - // compute effective changed mask for this subscription - scratch = changed; - scratch &= selectMask; - const bool havefree = _freeCount()>0u; MonitorElementPtr elem; - if(!conf.ignoreRequestMask && scratch.isEmpty()) { + if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask())) { // drop empty update } else if(havefree) { // take an empty element @@ -229,16 +229,16 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value, empty.pop_front(); } else if(force) { // allocate an extra element - elem.reset(new MonitorElement(pvd::getPVDataCreate()->createPVStructure(type))); + elem.reset(new MonitorElement(mapper.buildRequested())); } if(elem) { try { - assert(value.getStructure() == elem->pvStructurePtr->getStructure()); - elem->pvStructurePtr->copyUnchecked(value, scratch); - *elem->changedBitSet = scratch; - *elem->overrunBitSet = overrun; - *elem->overrunBitSet &= selectMask; + elem->changedBitSet->clear(); + mapper.copyBaseToRequested(value, changed, + *elem->pvStructurePtr, *elem->changedBitSet); + elem->overrunBitSet->clear(); + mapper.maskBaseToRequested(overrun, *elem->overrunBitSet); if(inuse.empty() && running) needEvent = true; @@ -263,7 +263,8 @@ void MonitorFIFO::post(const pvData::PVStructure& value, { Guard G(mutex); - assert(opened && !finished); + assert(state!=Closed && !finished); + if(state!=Opened) return; assert(!empty.empty() || !inuse.empty()); const bool use_empty = !empty.empty(); @@ -284,19 +285,16 @@ void MonitorFIFO::post(const pvData::PVStructure& value, elem = inuse.back(); } - scratch = changed; - scratch &= selectMask; - - if(!conf.ignoreRequestMask && scratch.isEmpty()) + if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask())) return; // drop empty update - assert(value.getStructure() == elem->pvStructurePtr->getStructure()); - elem->pvStructurePtr->copyUnchecked(value, scratch); + scratch.clear(); + mapper.copyBaseToRequested(value, changed, *elem->pvStructurePtr, scratch); if(use_empty) { *elem->changedBitSet = scratch; - *elem->overrunBitSet = overrun; - *elem->overrunBitSet &= selectMask; + elem->overrunBitSet->clear(); + mapper.maskBaseToRequested(overrun, *elem->overrunBitSet); if(inuse.empty() && running) needEvent = true; @@ -311,7 +309,9 @@ void MonitorFIFO::post(const pvData::PVStructure& value, // squash elem->overrunBitSet->or_and(*elem->changedBitSet, scratch); *elem->changedBitSet |= scratch; - elem->overrunBitSet->or_and(overrun, selectMask); + oscratch.clear(); + mapper.maskBaseToRequested(overrun, oscratch); + elem->overrunBitSet->or_and(oscratch, scratch); // leave as inuse.back() } @@ -326,6 +326,7 @@ void MonitorFIFO::notify() evt = false, unl = false, clo = false; + pvd::Status err; { Guard G(mutex); @@ -334,19 +335,22 @@ void MonitorFIFO::notify() std::swap(evt, needEvent); std::swap(unl, needUnlisten); std::swap(clo, needClosed); + std::swap(err, error); if(conn | evt | unl | clo) { req = requester.lock(); self = shared_from_this(); } - if(conn) - type = (!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure(); + if(conn && err.isSuccess()) + type = mapper.requested(); } if(!req) return; - if(conn) + if(conn && err.isSuccess()) req->monitorConnect(pvd::Status(), self, type); + else if(conn) + req->monitorConnect(err, self, type); if(evt) req->monitorEvent(self); if(unl) @@ -363,10 +367,10 @@ pvd::Status MonitorFIFO::start() { Guard G(mutex); - if(!opened) + if(state==Closed) throw std::logic_error("Monitor can't start() before open()"); - if(running) + if(running || state!=Opened) return pvd::Status(); if(!inuse.empty()) { diff --git a/src/client/pv/monitor.h b/src/client/pv/monitor.h index 14df362..b6ed4ac 100644 --- a/src/client/pv/monitor.h +++ b/src/client/pv/monitor.h @@ -22,6 +22,7 @@ #include #include #include +#include #ifdef monitorEpicsExportSharedSymbols # define epicsExportSharedSymbols @@ -274,7 +275,8 @@ public: size_t maxCount, //!< upper limit on requested FIFO size defCount, //!< FIFO size when client makes no request actualCount; //!< filled in with actual FIFO size - bool ignoreRequestMask; + bool dropEmptyUpdates; //!< default true. Drop updates which don't include an field values. + epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t Config(); }; @@ -351,6 +353,7 @@ private: // -> MonitorRequester::monitorEvent() // -> MonitorRequester::unlisten() // -> ChannelBaseRequester::channelDisconnect() + // start() -> MonitorRequester::monitorEvent() // release() -> Source::freeHighMark() // -> notify() -> ... // reportRemoteQueueStatus() -> Source::freeHighMark() @@ -373,22 +376,27 @@ private: // and expect that upstream will have only a weak ref to us. const Source::shared_pointer upstream; + enum state_t { + Closed, // not open()'d + Opened, // successful open() + Error, // unsuccessful open() + } state; bool pipeline; // const after ctor - bool opened; // open() vs. close() bool running; // start() vs. stop() bool finished; // finish() called - epics::pvData::BitSet selectMask, // set during open() - scratch; // using during post to avoid re-alloc + epics::pvData::BitSet scratch, oscratch; // using during post to avoid re-alloc bool needConnected; bool needEvent; bool needUnlisten; bool needClosed; + epics::pvData::Status error; // Set when entering Error state + size_t freeHighLevel; epicsInt32 flowCount; - epics::pvData::StructureConstPtr type; // NULL if not opened + epics::pvData::PVRequestMapper mapper; typedef std::list buffer_t; // we allocate one extra buffer element to hold data when post() diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index bb2c0e1..929b9a0 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -310,6 +310,8 @@ public: { startRequest(PURE_CANCEL_REQUEST); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); + } catch (std::runtime_error& e) { + // assume from checkAndGetTransport() due to wrong channel state } catch (std::exception& e) { // noop (do not complain if fails) LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what()); diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h index dd85c24..6d6d19f 100644 --- a/src/server/pva/sharedstate.h +++ b/src/server/pva/sharedstate.h @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -78,6 +79,12 @@ class epicsShareClass SharedPV friend struct SharedRPC; public: POINTER_DEFINITIONS(SharedPV); + struct epicsShareClass Config { + bool dropEmptyUpdates; //!< default true. Drop updates which don't include an field values. + epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t + Config(); + }; + /** Callbacks associated with a SharedPV. * * @note For the purposes of locking, this class is an Requester (see @ref provider_roles_requester_locking) @@ -96,15 +103,16 @@ public: /** Allocate a new PV in the closed state. * @param handler Our callbacks. May be NULL. Stored internally as a shared_ptr<> + * @param conf Optional. Extra configuration. If !NULL, will be modified to reflect configuration actually used. * @post In the closed state */ - static shared_pointer build(const std::tr1::shared_ptr& handler); - //! A SharedPV which fails all Put and RPC operations. - static shared_pointer buildReadOnly(); - //! A SharedPV which accepts all Put operations, and fails all RPC operations. - static shared_pointer buildMailbox(); + static shared_pointer build(const std::tr1::shared_ptr& handler, Config* conf=0); + //! A SharedPV which fails all Put and RPC operations. In closed state. + static shared_pointer buildReadOnly(Config* conf=0); + //! A SharedPV which accepts all Put operations, and fails all RPC operations. In closed state. + static shared_pointer buildMailbox(Config* conf=0); private: - explicit SharedPV(const std::tr1::shared_ptr& handler); + explicit SharedPV(const std::tr1::shared_ptr& handler, Config* conf); public: virtual ~SharedPV(); @@ -119,7 +127,7 @@ public: void open(const epics::pvData::PVStructure& value); //! Begin allowing clients to connect. - //! @param value The initial value of this PV. (any pending Get operation will complete this this) + //! @param value The initial value of this PV. (any pending Get/Monitor operation will complete with this) //! @param valid Only these marked fields are considered to have non-default values. //! @throws std::logic_error if not in the closed state. //! @post In the opened state @@ -169,6 +177,8 @@ private: weak_pointer internal_self; // const after build() + const Config config; + mutable epicsMutex mutex; std::tr1::shared_ptr handler; @@ -192,6 +202,8 @@ private: //! Used for initial Monitor update and Get operations. epics::pvData::BitSet valid; + bool notifiedConn; // whether onFirstConnect() has been, or is being, called + int debugLvl; EPICS_NOT_COPYABLE(SharedPV) diff --git a/src/server/server.cpp b/src/server/server.cpp index 138da51..ba4867e 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -118,7 +118,7 @@ StaticProvider::StaticProvider(const std::string &name) impl->external_self = impl; } -StaticProvider::~StaticProvider() { close(); } +StaticProvider::~StaticProvider() { close(true); } void StaticProvider::close(bool destroy) { diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 0de195a..b5912e7 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -51,6 +51,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, if(owner->channels.empty()) handler = owner->handler; owner->channels.push_back(this); + owner->notifiedConn = !!handler; } if(handler) { handler->onFirstConnect(owner); @@ -62,10 +63,11 @@ SharedChannel::~SharedChannel() std::tr1::shared_ptr handler; { Guard G(owner->mutex); + bool wasempty = owner->channels.empty(); owner->channels.remove(this); - if(owner->channels.empty()) { - Guard G(owner->mutex); + if(!wasempty && owner->channels.empty() && owner->notifiedConn) { handler = owner->handler; + owner->notifiedConn = false; } } if(handler) { @@ -74,7 +76,7 @@ SharedChannel::~SharedChannel() if(owner->debugLvl>5) { pva::ChannelRequester::shared_pointer req(requester.lock()); - errlogPrintf("%s : Open channel to %s > %p\n", + errlogPrintf("%s : Close channel to %s > %p\n", req ? req->getRequesterName().c_str() : "", channelName.c_str(), this); @@ -126,14 +128,27 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( std::tr1::shared_ptr ret(new SharedPut(shared_from_this(), requester, pvRequest)); pvd::StructureConstPtr type; - { - Guard G(owner->mutex); - // ~SharedPut removes - owner->puts.push_back(ret.get()); - type = owner->type; + std::string warning; + try { + { + Guard G(owner->mutex); + // ~SharedPut removes + owner->puts.push_back(ret.get()); + if(owner->current) { + ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); + type = ret->mapper.requested(); + warning = ret->mapper.warnings(); + } + } + if(!warning.empty()) + requester->message(warning, pvd::warningMessage); + if(type) + requester->channelPutConnect(pvd::Status(), ret, type); + }catch(std::runtime_error& e){ + ret.reset(); + type.reset(); + requester->channelPutConnect(pvd::Status::error(e.what()), ret, type); } - if(type) - requester->channelPutConnect(pvd::Status(), ret, type); return ret; } @@ -157,7 +172,10 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( pva::MonitorRequester::shared_pointer const & requester, pvd::PVStructure::shared_pointer const & pvRequest) { - std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest)); + SharedMonitorFIFO::Config mconf; + mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates; + mconf.mapperMode = owner->config.mapperMode; + std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf)); bool notify; { Guard G(owner->mutex); @@ -177,8 +195,9 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( SharedMonitorFIFO::SharedMonitorFIFO(const std::tr1::shared_ptr& channel, const requester_type::shared_pointer& requester, - const pvd::PVStructure::const_shared_pointer &pvRequest) - :pva::MonitorFIFO(requester, pvRequest) + const pvd::PVStructure::const_shared_pointer &pvRequest, + Config *conf) + :pva::MonitorFIFO(requester, pvRequest, pva::MonitorFIFO::Source::shared_pointer(), conf) ,channel(channel) {} diff --git a/src/server/sharedstate_put.cpp b/src/server/sharedstate_put.cpp index 29db288..f61f3b3 100644 --- a/src/server/sharedstate_put.cpp +++ b/src/server/sharedstate_put.cpp @@ -105,12 +105,26 @@ void SharedPut::put( pvd::BitSet::shared_pointer const & putBitSet) { std::tr1::shared_ptr handler; + pvd::PVStructure::shared_pointer realval; + pvd::BitSet changed; { Guard G(channel->owner->mutex); + + if(pvPutStructure->getStructure()!=mapper.requested()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->putDone(pvd::Status::error("Type changed"), shared_from_this()); + return; + } + handler = channel->owner->handler; + + realval = mapper.buildBase(); + + mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, pvPutStructure, *putBitSet), + std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), Operation::Impl::Cleanup()); if(handler) { @@ -130,20 +144,13 @@ void SharedPut::get() Guard G(channel->owner->mutex); if(channel->owner->current) { - const pvd::StructureConstPtr& currentType = channel->owner->current->getStructure(); + assert(!!mapper.requested()); - current = pvd::getPVDataCreate()->createPVStructure(currentType); + current = mapper.buildRequested(); + changed.reset(new pvd::BitSet); - if(currentType!=lastStruct) { - selectMask = pvd::extractRequestMask(current, pvRequest->getSubField("field")); - emptyselect = selectMask.isEmpty(); - lastStruct = currentType; - } - changed.reset(new pvd::BitSet(channel->owner->valid)); - *changed &= selectMask; - - // clone - current->copyUnchecked(*channel->owner->current, *changed); + mapper.copyBaseToRequested(*channel->owner->current, channel->owner->valid, + *current, *changed); } } diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 082168a..17f45f0 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -41,39 +41,47 @@ struct MailboxHandler : public pvas::SharedPV::Handler { namespace pvas { +SharedPV::Config::Config() + :dropEmptyUpdates(true) + ,mapperMode(pvd::PVRequestMapper::Mask) +{} + size_t SharedPV::num_instances; -SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr& handler) +SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr& handler, Config *conf) { assert(!!handler); - SharedPV::shared_pointer ret(new SharedPV(handler)); + SharedPV::shared_pointer ret(new SharedPV(handler, conf)); ret->internal_self = ret; return ret; } -SharedPV::shared_pointer SharedPV::buildReadOnly() +SharedPV::shared_pointer SharedPV::buildReadOnly(Config *conf) { - SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr())); + SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr(), conf)); ret->internal_self = ret; return ret; } -SharedPV::shared_pointer SharedPV::buildMailbox() +SharedPV::shared_pointer SharedPV::buildMailbox(pvas::SharedPV::Config *conf) { std::tr1::shared_ptr handler(new MailboxHandler); - SharedPV::shared_pointer ret(new SharedPV(handler)); + SharedPV::shared_pointer ret(new SharedPV(handler, conf)); ret->internal_self = ret; return ret; } -SharedPV::SharedPV(const std::tr1::shared_ptr &handler) - :handler(handler) +SharedPV::SharedPV(const std::tr1::shared_ptr &handler, pvas::SharedPV::Config *conf) + :config(conf ? *conf : Config()) + ,handler(handler) + ,notifiedConn(false) ,debugLvl(0) { REFTRACE_INCREMENT(num_instances); } SharedPV::~SharedPV() { + close(); REFTRACE_DECREMENT(num_instances); } @@ -96,9 +104,26 @@ bool SharedPV::isOpen() const return !!type; } +namespace { +struct PutInfo { // oh to be able to use std::tuple ... + std::tr1::shared_ptr put; + pvd::StructureConstPtr type; + pvd::Status status; + PutInfo(const std::tr1::shared_ptr& put, const pvd::StructureConstPtr& type, const pvd::Status& status) + :put(put), type(type), status(status) + {} + PutInfo(const std::tr1::shared_ptr& put, const pvd::StructureConstPtr& type, const std::string& message) + :put(put), type(type) + { + if(!message.empty()) + status = pvd::Status::warn(message); + } +}; +} + void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid) { - typedef std::vector > xputs_t; + typedef std::vector xputs_t; typedef std::vector > xrpcs_t; typedef std::vector > xmonitors_t; typedef std::vector > xgetfields_t; @@ -127,7 +152,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& FOR_EACH(puts_t::const_iterator, it, end, puts) { try { - p_put.push_back((*it)->shared_from_this()); + try { + (*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode); + p_put.push_back(PutInfo((*it)->shared_from_this(), (*it)->mapper.requested(), (*it)->mapper.warnings())); + }catch(std::runtime_error& e) { + // compute() error + p_put.push_back(PutInfo((*it)->shared_from_this(), pvd::StructureConstPtr(), pvd::Status::error(e.what()))); + } }catch(std::tr1::bad_weak_ptr&) { //racing destruction } @@ -151,9 +182,14 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } getfields.clear(); // consume } + // unlock for callbacks FOR_EACH(xputs_t::iterator, it, end, p_put) { - SharedPut::requester_type::shared_pointer requester((*it)->requester.lock()); - if(requester) requester->channelPutConnect(pvd::Status(), *it, newtype); + SharedPut::requester_type::shared_pointer requester(it->put->requester.lock()); + if(requester) { + if(it->status.getType()==pvd::Status::STATUSTYPE_WARNING) + requester->message(it->status.getMessage(), pvd::warningMessage); + requester->channelPutConnect(it->status, it->put, it->type); + } } FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) { SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock()); @@ -190,6 +226,7 @@ void SharedPV::close(bool destroy) xrpcs_t p_rpc; xmonitors_t p_monitor; xchannels_t p_channel; + Handler::shared_pointer p_handler; { Guard I(mutex); @@ -202,6 +239,7 @@ void SharedPV::close(bool destroy) p_channel.reserve(channels.size()); FOR_EACH(puts_t::const_iterator, it, end, puts) { + (*it)->mapper.reset(); p_put.push_back((*it)->requester.lock()); } FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { @@ -209,10 +247,14 @@ void SharedPV::close(bool destroy) } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { (*it)->close(); - p_monitor.push_back((*it)->shared_from_this()); + try { + p_monitor.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } } FOR_EACH(channels_t::const_iterator, it, end, channels) { - p_channel.push_back((*it)->shared_from_this()); + try { + p_channel.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } } type.reset(); @@ -223,6 +265,10 @@ void SharedPV::close(bool destroy) puts.clear(); rpcs.clear(); monitors.clear(); + if(!channels.empty() && notifiedConn) { + p_handler = handler; + notifiedConn = false; + } channels.clear(); } } @@ -240,6 +286,10 @@ void SharedPV::close(bool destroy) if(!req) continue; req->channelStateChange(*it, destroy ? pva::Channel::DESTROYED : pva::Channel::DISCONNECTED); } + if(p_handler) { + shared_pointer self(internal_self); + p_handler->onLastDisconnect(self); + } } pvd::PVStructure::shared_pointer SharedPV::build() diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h index 34126c6..de12eca 100644 --- a/src/server/sharedstateimpl.h +++ b/src/server/sharedstateimpl.h @@ -5,6 +5,8 @@ #ifndef SHAREDSTATEIMPL_H #define SHAREDSTATEIMPL_H +#include + #include "pva/sharedstate.h" #include #include @@ -62,7 +64,8 @@ struct SharedMonitorFIFO : public pva::MonitorFIFO const std::tr1::shared_ptr channel; SharedMonitorFIFO(const std::tr1::shared_ptr& channel, const requester_type::shared_pointer& requester, - const pvd::PVStructure::const_shared_pointer &pvRequest); + const pvd::PVStructure::const_shared_pointer &pvRequest, + Config *conf); virtual ~SharedMonitorFIFO(); }; @@ -74,8 +77,7 @@ struct SharedPut : public pva::ChannelPut, const pvd::PVStructure::const_shared_pointer pvRequest; // guarded by PV mutex - pvd::StructureConstPtr lastStruct; - pvd::BitSet selectMask; + pvd::PVRequestMapper mapper; static size_t num_instances; diff --git a/testApp/remote/testmonitorfifo.cpp b/testApp/remote/testmonitorfifo.cpp index 034664e..7696338 100644 --- a/testApp/remote/testmonitorfifo.cpp +++ b/testApp/remote/testmonitorfifo.cpp @@ -30,6 +30,7 @@ struct Tester { // we only have one thread, so no need for sync. enum cb_t { Connect, + ConnectError, Event, Unlisten, Close, @@ -39,6 +40,7 @@ struct Tester { switch(cb) { #define CASE(NAME) case NAME: return #NAME CASE(Connect); + CASE(ConnectError); CASE(Event); CASE(Unlisten); CASE(Close); @@ -64,9 +66,12 @@ struct Tester { } virtual void monitorConnect(epics::pvData::Status const & status, pva::MonitorPtr const & monitor, epics::pvData::StructureConstPtr const & structure) OVERRIDE FINAL { - testDiag("In %s", CURRENT_FUNCTION); + testDiag("In %s : %s", CURRENT_FUNCTION, status.isSuccess() ? "OK" : status.getMessage().c_str()); Guard G(mutex); - Tester::timeline.push_back(Connect); + if(status.isSuccess()) + Tester::timeline.push_back(Connect); + else + Tester::timeline.push_back(ConnectError); } virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL { testDiag("In %s", CURRENT_FUNCTION); @@ -763,11 +768,36 @@ void checkCountdown() tester.testTimeline({Tester::Close}); } +void checkBadRequest() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf; + conf.maxCount=4; + conf.defCount=3; + Tester tester(pvd::createRequest("field(invalid)"), &conf); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::ConnectError}); + + // when in Error, all are no-op + tester.post(15); + tester.tryPost(4, false); + tester.tryPost(5, false, true); + tester.mon->finish(); + + tester.mon->notify(); + tester.testTimeline({}); // nothing happens + + tester.close(); + tester.testTimeline({}); +} + } // namespace MAIN(testmonitorfifo) { - testPlan(184); + testPlan(189); checkPlain(); checkAfterClose(); checkReOpenLost(); @@ -777,6 +807,7 @@ MAIN(testmonitorfifo) checkPipeline(); checkSpam(); checkCountdown(); + checkBadRequest(); return testDone(); }