From e67e00c96889cdbb2b31789b56230cd0cec177f4 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 3 Oct 2018 10:26:47 -0700 Subject: [PATCH 01/12] client.h minor --- src/client/clientPut.cpp | 4 ++-- src/client/clientRPC.cpp | 2 ++ src/client/pva/client.h | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index bb481e8..1468c21 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -205,13 +205,13 @@ namespace pvac { Operation ClientChannel::put(PutCallback* cb, epics::pvData::PVStructure::const_shared_pointer pvRequest, - bool getcurrent) + bool getprevious) { if(!impl) throw std::logic_error("Dead Channel"); if(!pvRequest) pvRequest = pvd::createRequest("field()"); - std::tr1::shared_ptr ret(Putter::build(cb, getcurrent)); + std::tr1::shared_ptr ret(Putter::build(cb, getprevious)); { Guard G(ret->mutex); diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index ff6d54d..c555645 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -29,6 +29,7 @@ struct RPCer : public pvac::detail::CallbackStorage, operation_type::shared_pointer op; pvac::ClientChannel::GetCallback *cb; + // 'event' may be modified as long as cb!=NULL pvac::GetEvent event; pvd::PVStructure::const_shared_pointer args; @@ -110,6 +111,7 @@ struct RPCer : public pvac::detail::CallbackStorage, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); + if(!cb) return; event.message = "Disconnect"; callEvent(G); diff --git a/src/client/pva/client.h b/src/client/pva/client.h index b5e25e1..89c3a74 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -385,7 +385,7 @@ public: //! If false, then previous=NULL Operation put(PutCallback* cb, epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer(), - bool getcurrent = false); + bool getprevious = false); //! Synchronious put operation inline From 421d50e3bac21b5e37faf69c6c8a72a5448bed6f Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 3 Oct 2018 10:54:26 -0700 Subject: [PATCH 02/12] SharedPV too many notifiedConn notifiedConn set more often than it should. --- src/server/sharedstate_channel.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 76b6c64..4859ca7 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -48,10 +48,11 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->channels.empty()) + if(owner->channels.empty()) { handler = owner->handler; + owner->notifiedConn = true; + } owner->channels.push_back(this); - owner->notifiedConn = true; } if(handler) { handler->onFirstConnect(owner); From abc1b448d584338d982ad5135738ff2211750e7e Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 3 Oct 2018 19:07:34 -0700 Subject: [PATCH 03/12] minor --- src/client/clientRPC.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index c555645..e3d0ce9 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -71,7 +71,7 @@ struct RPCer : public pvac::detail::CallbackStorage, } // called automatically via wrapped_shared_from_this - virtual void cancel() + virtual void cancel() OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); @@ -87,7 +87,7 @@ struct RPCer : public pvac::detail::CallbackStorage, virtual void channelRPCConnect( const epics::pvData::Status& status, - pva::ChannelRPC::shared_pointer const & operation) + pva::ChannelRPC::shared_pointer const & operation) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); @@ -120,7 +120,7 @@ struct RPCer : public pvac::detail::CallbackStorage, virtual void requestDone( const epics::pvData::Status& status, pva::ChannelRPC::shared_pointer const & operation, - epics::pvData::PVStructure::shared_pointer const & pvResponse) + epics::pvData::PVStructure::shared_pointer const & pvResponse) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); From 542d51a215e3bc745dabc478371f1a953ab45249 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 3 Oct 2018 23:41:12 -0700 Subject: [PATCH 04/12] RPC troubleshooting something funny is going on --- src/server/responseHandlers.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 80d1d88..d69835f 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -2988,6 +2988,7 @@ ServerChannelRPCRequesterImpl::ServerChannelRPCRequesterImpl( const pvAccessID ioid, Transport::shared_pointer const & transport): BaseChannelRequester(context, channel, ioid, transport), _channelRPC(), _pvResponse() + ,_status(Status::fatal("Invalid State")) { } @@ -3090,6 +3091,7 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro SerializationHelper::serializeStructureFull(buffer, control, _pvResponse); } } + _status = Status::fatal("Stale state"); } stopRequest(); From 605c172d3c2e86cce378aa751c745adc547bad4e Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 4 Oct 2018 07:30:24 -0700 Subject: [PATCH 05/12] fix SharedPV allow rpc() while close()'d Don't call channelRPCConnect() again during open(). Turns out that a second channelRPCConnect() has the effect of sending a reply with OK and NULL. --- src/server/sharedstate_channel.cpp | 2 +- src/server/sharedstate_pv.cpp | 12 ------------ 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 4859ca7..9379a9e 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -158,11 +158,11 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( pvd::PVStructure::shared_pointer const & pvRequest) { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); + requester->channelRPCConnect(pvd::Status(), ret); { Guard G(owner->mutex); owner->rpcs.push_back(ret.get()); } - requester->channelRPCConnect(pvd::Status(), ret); return ret; } diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 98851dc..8f06029 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -124,7 +124,6 @@ struct PutInfo { // oh to be able to use std::tuple ... void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid) { typedef std::vector xputs_t; - typedef std::vector > xrpcs_t; typedef std::vector > xmonitors_t; typedef std::vector > xgetfields_t; @@ -133,7 +132,6 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& newvalue->copyUnchecked(value, valid); xputs_t p_put; - xrpcs_t p_rpc; xmonitors_t p_monitor; xgetfields_t p_getfield; { @@ -143,7 +141,6 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& throw std::logic_error("Already open()"); p_put.reserve(puts.size()); - p_rpc.reserve(rpcs.size()); p_monitor.reserve(monitors.size()); p_getfield.reserve(getfields.size()); @@ -164,11 +161,6 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& //racing destruction } } - FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - try { - p_rpc.push_back((*it)->shared_from_this()); - }catch(std::tr1::bad_weak_ptr&) {} - } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { try { (*it)->open(newtype); @@ -192,10 +184,6 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& requester->channelPutConnect(it->status, it->put, it->type); } } - FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) { - SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock()); - if(requester) requester->channelRPCConnect(pvd::Status(), *it); - } FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) { (*it)->notify(); } From 48770f132fcbc1acd3a2944b55500c7b4ed0165d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 4 Oct 2018 20:56:12 -0700 Subject: [PATCH 06/12] fix global string constants --- src/pva/pv/pvaConstants.h | 7 ++++--- src/pva/pvaVersion.cpp | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/pva/pv/pvaConstants.h b/src/pva/pv/pvaConstants.h index 9ee652d..d090141 100644 --- a/src/pva/pv/pvaConstants.h +++ b/src/pva/pv/pvaConstants.h @@ -18,6 +18,7 @@ # define epicsExportSharedSymbols # undef pvaConstantsepicsExportSharedSymbols #endif +#include namespace epics { namespace pvAccess { @@ -73,13 +74,13 @@ const epics::pvData::int16 INVALID_DATA_TYPE = 0xFFFF; const epics::pvData::int32 INVALID_IOID = 0; /** Default PVA provider name. */ -const std::string PVACCESS_DEFAULT_PROVIDER; +epicsShareExtern const std::string PVACCESS_DEFAULT_PROVIDER; /** "All-providers registered" PVA provider name. */ -const std::string PVACCESS_ALL_PROVIDERS; +epicsShareExtern const std::string PVACCESS_ALL_PROVIDERS; /** Name of the system env. variable to turn on debugging. */ -const std::string PVACCESS_DEBUG; +epicsShareExtern const std::string PVACCESS_DEBUG; } } diff --git a/src/pva/pvaVersion.cpp b/src/pva/pvaVersion.cpp index f196242..815a3a5 100644 --- a/src/pva/pvaVersion.cpp +++ b/src/pva/pvaVersion.cpp @@ -8,6 +8,7 @@ #define epicsExportSharedSymbols #include +#include using std::stringstream; using std::string; @@ -15,9 +16,9 @@ using std::string; namespace epics { namespace pvAccess { -const std::string PVACCESS_DEFAULT_PROVIDER = "local"; -const std::string PVACCESS_ALL_PROVIDERS = ""; -const std::string PVACCESS_DEBUG = "EPICS_PVA_DEBUG"; +const std::string PVACCESS_DEFAULT_PROVIDER("local"); +const std::string PVACCESS_ALL_PROVIDERS(""); +const std::string PVACCESS_DEBUG("EPICS_PVA_DEBUG"); Version::Version(std::string const & productName, std::string const & implementationLangugage, From f6eeeac649f3137cd4c074173e625353a71d70e2 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 4 Oct 2018 21:03:24 -0700 Subject: [PATCH 07/12] more UDP debugging --- src/remote/blockingUDPTransport.cpp | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index a0db1ef..084a510 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -225,7 +225,7 @@ void BlockingUDPTransport::run() { 0, (sockaddr*)&fromAddress, &addrStructSize); - if(likely(bytesRead>0)) { + if(likely(bytesRead>=0)) { // successfully got datagram bool ignore = false; for(size_t i = 0; i <_ignoredAddresses.size(); i++) @@ -233,11 +233,22 @@ void BlockingUDPTransport::run() { if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr) { ignore = true; + if(pvAccessIsLoggable(logLevelDebug)) { + char strBuffer[64]; + sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer); + } break; } } if(likely(!ignore)) { + if(pvAccessIsLoggable(logLevelDebug)) { + char strBuffer[64]; + sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "UDP Rx (%d) %s <- %s", bytesRead, _remoteName.c_str(), strBuffer); + } + _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE); _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead); @@ -253,8 +264,7 @@ void BlockingUDPTransport::run() { __FILE__, __LINE__); } } - } - else if (unlikely(bytesRead == -1)) { + } else { int socketError = SOCKERRNO; @@ -400,7 +410,7 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", + LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.", length, _remoteName.c_str(), inetAddressToString(address).c_str()); } From 53b95dd336e9ca187c8d112daa4639086be46350 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sat, 6 Oct 2018 16:34:14 -0700 Subject: [PATCH 08/12] more SharedPV rpc state tracking handle re-open() with RPC --- src/server/sharedstate_channel.cpp | 1 + src/server/sharedstate_pv.cpp | 21 ++++++++++++++++++--- src/server/sharedstate_rpc.cpp | 1 + src/server/sharedstateimpl.h | 2 ++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 9379a9e..f499ee1 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -158,6 +158,7 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( pvd::PVStructure::shared_pointer const & pvRequest) { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); + ret->connected = true; requester->channelRPCConnect(pvd::Status(), ret); { Guard G(owner->mutex); diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 8f06029..6ccebc0 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -124,6 +124,7 @@ struct PutInfo { // oh to be able to use std::tuple ... void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid) { typedef std::vector xputs_t; + typedef std::vector > xrpcs_t; typedef std::vector > xmonitors_t; typedef std::vector > xgetfields_t; @@ -132,6 +133,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& newvalue->copyUnchecked(value, valid); xputs_t p_put; + xrpcs_t p_rpc; xmonitors_t p_monitor; xgetfields_t p_getfield; { @@ -141,6 +143,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& throw std::logic_error("Already open()"); p_put.reserve(puts.size()); + p_rpc.reserve(rpcs.size()); p_monitor.reserve(monitors.size()); p_getfield.reserve(getfields.size()); @@ -161,6 +164,12 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& //racing destruction } } + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + if((*it)->connected) continue; + try { + p_rpc.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) {} + } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { try { (*it)->open(newtype); @@ -184,6 +193,10 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& requester->channelPutConnect(it->status, it->put, it->type); } } + FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) { + SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock()); + if(requester) requester->channelRPCConnect(pvd::Status(), *it); + } FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) { (*it)->notify(); } @@ -219,6 +232,11 @@ void SharedPV::close(bool destroy) { Guard I(mutex); + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + if(!(*it)->connected) continue; + p_rpc.push_back((*it)->requester.lock()); + } + if(type) { p_put.reserve(puts.size()); @@ -230,9 +248,6 @@ void SharedPV::close(bool destroy) (*it)->mapper.reset(); p_put.push_back((*it)->requester.lock()); } - FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - p_rpc.push_back((*it)->requester.lock()); - } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { (*it)->close(); try { diff --git a/src/server/sharedstate_rpc.cpp b/src/server/sharedstate_rpc.cpp index 3f192f6..336d083 100644 --- a/src/server/sharedstate_rpc.cpp +++ b/src/server/sharedstate_rpc.cpp @@ -84,6 +84,7 @@ SharedRPC::SharedRPC(const std::tr1::shared_ptr& channel, :channel(channel) ,requester(requester) ,pvRequest(pvRequest) + ,connected(false) { REFTRACE_INCREMENT(num_instances); } diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h index de12eca..8fbf362 100644 --- a/src/server/sharedstateimpl.h +++ b/src/server/sharedstateimpl.h @@ -107,6 +107,8 @@ struct SharedRPC : public pva::ChannelRPC, static size_t num_instances; + bool connected; // have I called requester->channelRPCConnect(Ok) ? + SharedRPC(const std::tr1::shared_ptr& channel, const requester_type::shared_pointer& requester, const pvd::PVStructure::const_shared_pointer &pvRequest); From 165313d8f5742947c887022d1cbabe2c855ddf73 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 7 Oct 2018 13:29:11 -0700 Subject: [PATCH 09/12] client.h add some exception guards These are one shot operations anyway, and allowing user exceptions to propagate to core PVA isn't helpful. --- src/client/clientGet.cpp | 6 +++++- src/client/clientPut.cpp | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 6314db0..d72e794 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -50,7 +50,11 @@ struct Getter : public pvac::detail::CallbackStorage, pvac::ClientChannel::GetCallback *C=cb; cb = 0; CallbackUse U(G); - C->getDone(event); + try { + C->getDone(event); + } catch(std::exception& e) { + LOG(pva::logLevelInfo, "Lost exception during getDone(): %s", e.what()); + } } virtual std::string name() const OVERRIDE FINAL diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index 1468c21..86130aa 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -54,7 +54,11 @@ struct Putter : public pvac::detail::CallbackStorage, pvac::ClientChannel::PutCallback *C=cb; cb = 0; CallbackUse U(G); - C->putDone(event); + try { + C->putDone(event); + } catch(std::exception& e) { + LOG(pva::logLevelInfo, "Lost exception during putDone(): %s", e.what()); + } } virtual std::string name() const OVERRIDE FINAL From ed1bd1b962b48c0973f211b462a599abc11fc4c0 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 7 Oct 2018 19:27:29 -0700 Subject: [PATCH 10/12] SharedPV defer onFirstConnect() to first getField/Put/Monitor --- src/server/pva/sharedstate.h | 5 +++- src/server/sharedstate_channel.cpp | 44 ++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h index e424dd7..a9e2991 100644 --- a/src/server/pva/sharedstate.h +++ b/src/server/pva/sharedstate.h @@ -206,7 +206,10 @@ private: //! Used for initial Monitor update and Get operations. epics::pvData::BitSet valid; - bool notifiedConn; // whether onFirstConnect() has been, or is being, called + // whether onFirstConnect() has been, or is being, called. + // Set when the first getField, Put, or Monitor (but not RPC) is created. + // Cleared when the last Channel is destroyed. + bool notifiedConn; int debugLvl; diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index f499ee1..59e1809 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -45,18 +45,10 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, this); } - SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->channels.empty()) { - handler = owner->handler; - owner->notifiedConn = true; - } owner->channels.push_back(this); } - if(handler) { - handler->onFirstConnect(owner); - } } SharedChannel::~SharedChannel() @@ -111,15 +103,25 @@ std::tr1::shared_ptr SharedChannel::getChannelRequester() void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) { epics::pvData::FieldConstPtr desc; + SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->type) + if(owner->type) { desc = owner->type; - else - owner->getfields.push_back(requester); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } + owner->getfields.push_back(requester); } - if(desc) + if(desc) { requester->getDone(pvd::Status(), desc); + } + if(handler) { + handler->onFirstConnect(owner); + } } pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( @@ -130,6 +132,7 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( pvd::StructureConstPtr type; std::string warning; + SharedPV::Handler::shared_pointer handler; try { { Guard G(owner->mutex); @@ -140,6 +143,11 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( type = ret->mapper.requested(); warning = ret->mapper.warnings(); } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } if(!warning.empty()) requester->message(warning, pvd::warningMessage); @@ -150,6 +158,9 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( type.reset(); requester->channelPutConnect(pvd::Status::error(e.what()), ret, type); } + if(handler) { + handler->onFirstConnect(owner); + } return ret; } @@ -172,6 +183,7 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( pvd::PVStructure::shared_pointer const & pvRequest) { SharedMonitorFIFO::Config mconf; + SharedPV::Handler::shared_pointer handler; mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates; mconf.mapperMode = owner->config.mapperMode; std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf)); @@ -185,9 +197,17 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( // post initial update ret->post(*owner->current, owner->valid); } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } if(notify) ret->notify(); + if(handler) { + handler->onFirstConnect(owner); + } return ret; } From 9ca7487e29bf912fc18a3c41618d9014a830bcb0 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 7 Oct 2018 20:00:38 -0700 Subject: [PATCH 11/12] SharedPV allow use of Channel::destroy() to trigger onLastDisconnect() Add dead flag to catch double destroy() or use after destroy() --- src/server/sharedstate_channel.cpp | 123 +++++++++++++++++++---------- src/server/sharedstate_put.cpp | 54 ++++++++----- src/server/sharedstate_pv.cpp | 5 +- src/server/sharedstate_rpc.cpp | 25 ++++-- src/server/sharedstateimpl.h | 2 + 5 files changed, 140 insertions(+), 69 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 59e1809..e837331 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -35,6 +35,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, ,channelName(channelName) ,requester(requester) ,provider(provider) + ,dead(false) { REFTRACE_INCREMENT(num_instances); @@ -52,10 +53,20 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, } SharedChannel::~SharedChannel() +{ + destroy(); + REFTRACE_DECREMENT(num_instances); +} + +void SharedChannel::destroy() { std::tr1::shared_ptr handler; { Guard G(owner->mutex); + + if(dead) return; + dead = true; + bool wasempty = owner->channels.empty(); owner->channels.remove(this); if(!wasempty && owner->channels.empty() && owner->notifiedConn) { @@ -74,12 +85,8 @@ SharedChannel::~SharedChannel() channelName.c_str(), this); } - - REFTRACE_DECREMENT(num_instances); } -void SharedChannel::destroy() {} - std::tr1::shared_ptr SharedChannel::getProvider() { return provider.lock(); @@ -102,22 +109,28 @@ std::tr1::shared_ptr SharedChannel::getChannelRequester() void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) { - epics::pvData::FieldConstPtr desc; + pvd::FieldConstPtr desc; + pvd::Status sts; SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->type) { - desc = owner->type; - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + if(owner->type) { + desc = owner->type; + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } + owner->getfields.push_back(requester); } - owner->getfields.push_back(requester); } - if(desc) { - requester->getDone(pvd::Status(), desc); + if(desc || !sts.isOK()) { + requester->getDone(sts, desc); } if(handler) { handler->onFirstConnect(owner); @@ -131,28 +144,34 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( std::tr1::shared_ptr ret(new SharedPut(shared_from_this(), requester, pvRequest)); pvd::StructureConstPtr type; + pvd::Status sts; std::string warning; SharedPV::Handler::shared_pointer handler; try { { Guard G(owner->mutex); - // ~SharedPut removes - owner->puts.push_back(ret.get()); - if(owner->current) { - ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); - type = ret->mapper.requested(); - warning = ret->mapper.warnings(); - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + // ~SharedPut removes + owner->puts.push_back(ret.get()); + if(owner->current) { + ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); + type = ret->mapper.requested(); + warning = ret->mapper.warnings(); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } } if(!warning.empty()) requester->message(warning, pvd::warningMessage); - if(type) - requester->channelPutConnect(pvd::Status(), ret, type); + if(type || !sts.isOK()) + requester->channelPutConnect(sts, ret, type); }catch(std::runtime_error& e){ ret.reset(); type.reset(); @@ -170,11 +189,18 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); ret->connected = true; - requester->channelRPCConnect(pvd::Status(), ret); + + pvd::Status sts; { Guard G(owner->mutex); - owner->rpcs.push_back(ret.get()); + if(dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + owner->rpcs.push_back(ret.get()); + } } + requester->channelRPCConnect(sts, ret); return ret; } @@ -186,27 +212,42 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( SharedPV::Handler::shared_pointer handler; mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates; mconf.mapperMode = owner->config.mapperMode; + std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf)); + bool notify; + pvd::Status sts; { Guard G(owner->mutex); - owner->monitors.push_back(ret.get()); - notify = !!owner->type; - if(notify) { - ret->open(owner->type); - // post initial update - ret->post(*owner->current, owner->valid); - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); + notify = false; - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + owner->monitors.push_back(ret.get()); + notify = !!owner->type; + if(notify) { + ret->open(owner->type); + // post initial update + ret->post(*owner->current, owner->valid); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } } - if(notify) + if(!sts.isOK()) { + requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr()); + ret.reset(); + + } else if(notify) { ret->notify(); - if(handler) { - handler->onFirstConnect(owner); + + if(handler) { + handler->onFirstConnect(owner); + } } return ret; } diff --git a/src/server/sharedstate_put.cpp b/src/server/sharedstate_put.cpp index f61f3b3..3afa2c6 100644 --- a/src/server/sharedstate_put.cpp +++ b/src/server/sharedstate_put.cpp @@ -107,43 +107,55 @@ void SharedPut::put( std::tr1::shared_ptr handler; pvd::PVStructure::shared_pointer realval; pvd::BitSet changed; + pvd::Status sts; { Guard G(channel->owner->mutex); - if(pvPutStructure->getStructure()!=mapper.requested()) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(pvPutStructure->getStructure()!=mapper.requested()) { requester_type::shared_pointer req(requester.lock()); - if(req) - req->putDone(pvd::Status::error("Type changed"), shared_from_this()); - return; + sts = pvd::Status::error("Type changed"); + + } else { + + handler = channel->owner->handler; + + realval = mapper.buildBase(); + + mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - - handler = channel->owner->handler; - - realval = mapper.buildBase(); - - mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->putDone(sts, pva::ChannelPut::shared_pointer()); - if(handler) { - Operation op(impl); - handler->onPut(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onPut(channel->owner, op); + } } } void SharedPut::get() { - pvd::Status sts; pvd::PVStructurePtr current; pvd::BitSetPtr changed; - bool emptyselect = false; { Guard G(channel->owner->mutex); - if(channel->owner->current) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(channel->owner->current) { assert(!!mapper.requested()); current = mapper.buildRequested(); @@ -157,10 +169,10 @@ void SharedPut::get() requester_type::shared_pointer req(requester.lock()); if(!req) return; - if(!current) { + if(!sts.isOK()) { + // no-op + } else if(!current) { sts = pvd::Status::error("Get not possible, cache disabled"); - } else if(emptyselect) { - sts = pvd::Status::warn("pvRequest with empty field mask"); } req->getDone(sts, shared_from_this(), current, changed); diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 6ccebc0..b10edb1 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -152,6 +152,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& this->valid = valid; FOR_EACH(puts_t::const_iterator, it, end, puts) { + if((*it)->channel->dead) continue; try { try { (*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode); @@ -165,12 +166,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } } FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - if((*it)->connected) continue; + if((*it)->connected || (*it)->channel->dead) continue; try { p_rpc.push_back((*it)->shared_from_this()); }catch(std::tr1::bad_weak_ptr&) {} } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + if((*it)->channel->dead) continue; try { (*it)->open(newtype); // post initial update @@ -180,6 +182,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } // consume getField FOR_EACH(getfields_t::iterator, it, end, getfields) { + // TODO: this may be on a dead Channel p_getfield.push_back(it->lock()); } getfields.clear(); // consume diff --git a/src/server/sharedstate_rpc.cpp b/src/server/sharedstate_rpc.cpp index 336d083..e0dd540 100644 --- a/src/server/sharedstate_rpc.cpp +++ b/src/server/sharedstate_rpc.cpp @@ -109,17 +109,30 @@ void SharedRPC::lastRequest() {} void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { std::tr1::shared_ptr handler; + pvd::Status sts; { Guard G(channel->owner->mutex); - handler = channel->owner->handler; + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + handler = channel->owner->handler; + } } - std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->requestDone(sts, shared_from_this(), pvd::PVStructurePtr()); - if(handler) { - Operation op(impl); - handler->onRPC(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onRPC(channel->owner, op); + } } } diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h index 8fbf362..c1a72ef 100644 --- a/src/server/sharedstateimpl.h +++ b/src/server/sharedstateimpl.h @@ -31,6 +31,8 @@ struct SharedChannel : public pva::Channel, const requester_type::weak_pointer requester; const pva::ChannelProvider::weak_pointer provider; + bool dead; // has destroy() been called? + SharedChannel(const std::tr1::shared_ptr& owner, const pva::ChannelProvider::shared_pointer provider, const std::string& channelName, From 731767b086e8146f31c1f9038568e3c852c609f7 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 8 Oct 2018 07:10:03 -0700 Subject: [PATCH 12/12] SharedPV fix onFirstConnect() on monitor creation --- src/server/sharedstate_channel.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index e837331..41ebbe0 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -242,9 +242,10 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr()); ret.reset(); - } else if(notify) { - ret->notify(); - + } else { + if(notify) { + ret->notify(); + } if(handler) { handler->onFirstConnect(owner); }