diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 6314db0..d72e794 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -50,7 +50,11 @@ struct Getter : public pvac::detail::CallbackStorage, pvac::ClientChannel::GetCallback *C=cb; cb = 0; CallbackUse U(G); - C->getDone(event); + try { + C->getDone(event); + } catch(std::exception& e) { + LOG(pva::logLevelInfo, "Lost exception during getDone(): %s", e.what()); + } } virtual std::string name() const OVERRIDE FINAL diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index bb481e8..86130aa 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -54,7 +54,11 @@ struct Putter : public pvac::detail::CallbackStorage, pvac::ClientChannel::PutCallback *C=cb; cb = 0; CallbackUse U(G); - C->putDone(event); + try { + C->putDone(event); + } catch(std::exception& e) { + LOG(pva::logLevelInfo, "Lost exception during putDone(): %s", e.what()); + } } virtual std::string name() const OVERRIDE FINAL @@ -205,13 +209,13 @@ namespace pvac { Operation ClientChannel::put(PutCallback* cb, epics::pvData::PVStructure::const_shared_pointer pvRequest, - bool getcurrent) + bool getprevious) { if(!impl) throw std::logic_error("Dead Channel"); if(!pvRequest) pvRequest = pvd::createRequest("field()"); - std::tr1::shared_ptr ret(Putter::build(cb, getcurrent)); + std::tr1::shared_ptr ret(Putter::build(cb, getprevious)); { Guard G(ret->mutex); diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index ff6d54d..e3d0ce9 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -29,6 +29,7 @@ struct RPCer : public pvac::detail::CallbackStorage, operation_type::shared_pointer op; pvac::ClientChannel::GetCallback *cb; + // 'event' may be modified as long as cb!=NULL pvac::GetEvent event; pvd::PVStructure::const_shared_pointer args; @@ -70,7 +71,7 @@ struct RPCer : public pvac::detail::CallbackStorage, } // called automatically via wrapped_shared_from_this - virtual void cancel() + virtual void cancel() OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); @@ -86,7 +87,7 @@ struct RPCer : public pvac::detail::CallbackStorage, virtual void channelRPCConnect( const epics::pvData::Status& status, - pva::ChannelRPC::shared_pointer const & operation) + pva::ChannelRPC::shared_pointer const & operation) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); @@ -110,6 +111,7 @@ struct RPCer : public pvac::detail::CallbackStorage, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); + if(!cb) return; event.message = "Disconnect"; callEvent(G); @@ -118,7 +120,7 @@ struct RPCer : public pvac::detail::CallbackStorage, virtual void requestDone( const epics::pvData::Status& status, pva::ChannelRPC::shared_pointer const & operation, - epics::pvData::PVStructure::shared_pointer const & pvResponse) + epics::pvData::PVStructure::shared_pointer const & pvResponse) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); CallbackGuard G(*this); diff --git a/src/client/pva/client.h b/src/client/pva/client.h index b5e25e1..89c3a74 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -385,7 +385,7 @@ public: //! If false, then previous=NULL Operation put(PutCallback* cb, epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer(), - bool getcurrent = false); + bool getprevious = false); //! Synchronious put operation inline diff --git a/src/pva/pv/pvaConstants.h b/src/pva/pv/pvaConstants.h index 9ee652d..d090141 100644 --- a/src/pva/pv/pvaConstants.h +++ b/src/pva/pv/pvaConstants.h @@ -18,6 +18,7 @@ # define epicsExportSharedSymbols # undef pvaConstantsepicsExportSharedSymbols #endif +#include namespace epics { namespace pvAccess { @@ -73,13 +74,13 @@ const epics::pvData::int16 INVALID_DATA_TYPE = 0xFFFF; const epics::pvData::int32 INVALID_IOID = 0; /** Default PVA provider name. */ -const std::string PVACCESS_DEFAULT_PROVIDER; +epicsShareExtern const std::string PVACCESS_DEFAULT_PROVIDER; /** "All-providers registered" PVA provider name. */ -const std::string PVACCESS_ALL_PROVIDERS; +epicsShareExtern const std::string PVACCESS_ALL_PROVIDERS; /** Name of the system env. variable to turn on debugging. */ -const std::string PVACCESS_DEBUG; +epicsShareExtern const std::string PVACCESS_DEBUG; } } diff --git a/src/pva/pvaVersion.cpp b/src/pva/pvaVersion.cpp index f196242..815a3a5 100644 --- a/src/pva/pvaVersion.cpp +++ b/src/pva/pvaVersion.cpp @@ -8,6 +8,7 @@ #define epicsExportSharedSymbols #include +#include using std::stringstream; using std::string; @@ -15,9 +16,9 @@ using std::string; namespace epics { namespace pvAccess { -const std::string PVACCESS_DEFAULT_PROVIDER = "local"; -const std::string PVACCESS_ALL_PROVIDERS = ""; -const std::string PVACCESS_DEBUG = "EPICS_PVA_DEBUG"; +const std::string PVACCESS_DEFAULT_PROVIDER("local"); +const std::string PVACCESS_ALL_PROVIDERS(""); +const std::string PVACCESS_DEBUG("EPICS_PVA_DEBUG"); Version::Version(std::string const & productName, std::string const & implementationLangugage, diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index a0db1ef..084a510 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -225,7 +225,7 @@ void BlockingUDPTransport::run() { 0, (sockaddr*)&fromAddress, &addrStructSize); - if(likely(bytesRead>0)) { + if(likely(bytesRead>=0)) { // successfully got datagram bool ignore = false; for(size_t i = 0; i <_ignoredAddresses.size(); i++) @@ -233,11 +233,22 @@ void BlockingUDPTransport::run() { if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr) { ignore = true; + if(pvAccessIsLoggable(logLevelDebug)) { + char strBuffer[64]; + sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer); + } break; } } if(likely(!ignore)) { + if(pvAccessIsLoggable(logLevelDebug)) { + char strBuffer[64]; + sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "UDP Rx (%d) %s <- %s", bytesRead, _remoteName.c_str(), strBuffer); + } + _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE); _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead); @@ -253,8 +264,7 @@ void BlockingUDPTransport::run() { __FILE__, __LINE__); } } - } - else if (unlikely(bytesRead == -1)) { + } else { int socketError = SOCKERRNO; @@ -400,7 +410,7 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", + LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.", length, _remoteName.c_str(), inetAddressToString(address).c_str()); } diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h index e424dd7..a9e2991 100644 --- a/src/server/pva/sharedstate.h +++ b/src/server/pva/sharedstate.h @@ -206,7 +206,10 @@ private: //! Used for initial Monitor update and Get operations. epics::pvData::BitSet valid; - bool notifiedConn; // whether onFirstConnect() has been, or is being, called + // whether onFirstConnect() has been, or is being, called. + // Set when the first getField, Put, or Monitor (but not RPC) is created. + // Cleared when the last Channel is destroyed. + bool notifiedConn; int debugLvl; diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 80d1d88..d69835f 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -2988,6 +2988,7 @@ ServerChannelRPCRequesterImpl::ServerChannelRPCRequesterImpl( const pvAccessID ioid, Transport::shared_pointer const & transport): BaseChannelRequester(context, channel, ioid, transport), _channelRPC(), _pvResponse() + ,_status(Status::fatal("Invalid State")) { } @@ -3090,6 +3091,7 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro SerializationHelper::serializeStructureFull(buffer, control, _pvResponse); } } + _status = Status::fatal("Stale state"); } stopRequest(); diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 76b6c64..41ebbe0 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -35,6 +35,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, ,channelName(channelName) ,requester(requester) ,provider(provider) + ,dead(false) { REFTRACE_INCREMENT(num_instances); @@ -45,24 +46,27 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, this); } - SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->channels.empty()) - handler = owner->handler; owner->channels.push_back(this); - owner->notifiedConn = true; - } - if(handler) { - handler->onFirstConnect(owner); } } SharedChannel::~SharedChannel() +{ + destroy(); + REFTRACE_DECREMENT(num_instances); +} + +void SharedChannel::destroy() { std::tr1::shared_ptr handler; { Guard G(owner->mutex); + + if(dead) return; + dead = true; + bool wasempty = owner->channels.empty(); owner->channels.remove(this); if(!wasempty && owner->channels.empty() && owner->notifiedConn) { @@ -81,12 +85,8 @@ SharedChannel::~SharedChannel() channelName.c_str(), this); } - - REFTRACE_DECREMENT(num_instances); } -void SharedChannel::destroy() {} - std::tr1::shared_ptr SharedChannel::getProvider() { return provider.lock(); @@ -109,16 +109,32 @@ std::tr1::shared_ptr SharedChannel::getChannelRequester() void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) { - epics::pvData::FieldConstPtr desc; + pvd::FieldConstPtr desc; + pvd::Status sts; + SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->type) - desc = owner->type; - else + if(dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + if(owner->type) { + desc = owner->type; + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } owner->getfields.push_back(requester); + } + } + if(desc || !sts.isOK()) { + requester->getDone(sts, desc); + } + if(handler) { + handler->onFirstConnect(owner); } - if(desc) - requester->getDone(pvd::Status(), desc); } pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( @@ -128,27 +144,42 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( std::tr1::shared_ptr ret(new SharedPut(shared_from_this(), requester, pvRequest)); pvd::StructureConstPtr type; + pvd::Status sts; std::string warning; + SharedPV::Handler::shared_pointer handler; try { { Guard G(owner->mutex); - // ~SharedPut removes - owner->puts.push_back(ret.get()); - if(owner->current) { - ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); - type = ret->mapper.requested(); - warning = ret->mapper.warnings(); + if(dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + // ~SharedPut removes + owner->puts.push_back(ret.get()); + if(owner->current) { + ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); + type = ret->mapper.requested(); + warning = ret->mapper.warnings(); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } } if(!warning.empty()) requester->message(warning, pvd::warningMessage); - if(type) - requester->channelPutConnect(pvd::Status(), ret, type); + if(type || !sts.isOK()) + requester->channelPutConnect(sts, ret, type); }catch(std::runtime_error& e){ ret.reset(); type.reset(); requester->channelPutConnect(pvd::Status::error(e.what()), ret, type); } + if(handler) { + handler->onFirstConnect(owner); + } return ret; } @@ -157,11 +188,19 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( pvd::PVStructure::shared_pointer const & pvRequest) { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); + ret->connected = true; + + pvd::Status sts; { Guard G(owner->mutex); - owner->rpcs.push_back(ret.get()); + if(dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + owner->rpcs.push_back(ret.get()); + } } - requester->channelRPCConnect(pvd::Status(), ret); + requester->channelRPCConnect(sts, ret); return ret; } @@ -170,22 +209,47 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( pvd::PVStructure::shared_pointer const & pvRequest) { SharedMonitorFIFO::Config mconf; + SharedPV::Handler::shared_pointer handler; mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates; mconf.mapperMode = owner->config.mapperMode; + std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf)); + bool notify; + pvd::Status sts; { Guard G(owner->mutex); - owner->monitors.push_back(ret.get()); - notify = !!owner->type; - if(notify) { - ret->open(owner->type); - // post initial update - ret->post(*owner->current, owner->valid); + if(dead) { + sts = pvd::Status::error("Dead Channel"); + notify = false; + + } else { + owner->monitors.push_back(ret.get()); + notify = !!owner->type; + if(notify) { + ret->open(owner->type); + // post initial update + ret->post(*owner->current, owner->valid); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } + } + } + if(!sts.isOK()) { + requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr()); + ret.reset(); + + } else { + if(notify) { + ret->notify(); + } + if(handler) { + handler->onFirstConnect(owner); } } - if(notify) - ret->notify(); return ret; } diff --git a/src/server/sharedstate_put.cpp b/src/server/sharedstate_put.cpp index f61f3b3..3afa2c6 100644 --- a/src/server/sharedstate_put.cpp +++ b/src/server/sharedstate_put.cpp @@ -107,43 +107,55 @@ void SharedPut::put( std::tr1::shared_ptr handler; pvd::PVStructure::shared_pointer realval; pvd::BitSet changed; + pvd::Status sts; { Guard G(channel->owner->mutex); - if(pvPutStructure->getStructure()!=mapper.requested()) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(pvPutStructure->getStructure()!=mapper.requested()) { requester_type::shared_pointer req(requester.lock()); - if(req) - req->putDone(pvd::Status::error("Type changed"), shared_from_this()); - return; + sts = pvd::Status::error("Type changed"); + + } else { + + handler = channel->owner->handler; + + realval = mapper.buildBase(); + + mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - - handler = channel->owner->handler; - - realval = mapper.buildBase(); - - mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->putDone(sts, pva::ChannelPut::shared_pointer()); - if(handler) { - Operation op(impl); - handler->onPut(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onPut(channel->owner, op); + } } } void SharedPut::get() { - pvd::Status sts; pvd::PVStructurePtr current; pvd::BitSetPtr changed; - bool emptyselect = false; { Guard G(channel->owner->mutex); - if(channel->owner->current) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(channel->owner->current) { assert(!!mapper.requested()); current = mapper.buildRequested(); @@ -157,10 +169,10 @@ void SharedPut::get() requester_type::shared_pointer req(requester.lock()); if(!req) return; - if(!current) { + if(!sts.isOK()) { + // no-op + } else if(!current) { sts = pvd::Status::error("Get not possible, cache disabled"); - } else if(emptyselect) { - sts = pvd::Status::warn("pvRequest with empty field mask"); } req->getDone(sts, shared_from_this(), current, changed); diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 98851dc..b10edb1 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -152,6 +152,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& this->valid = valid; FOR_EACH(puts_t::const_iterator, it, end, puts) { + if((*it)->channel->dead) continue; try { try { (*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode); @@ -165,11 +166,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } } FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + if((*it)->connected || (*it)->channel->dead) continue; try { p_rpc.push_back((*it)->shared_from_this()); }catch(std::tr1::bad_weak_ptr&) {} } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + if((*it)->channel->dead) continue; try { (*it)->open(newtype); // post initial update @@ -179,6 +182,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } // consume getField FOR_EACH(getfields_t::iterator, it, end, getfields) { + // TODO: this may be on a dead Channel p_getfield.push_back(it->lock()); } getfields.clear(); // consume @@ -231,6 +235,11 @@ void SharedPV::close(bool destroy) { Guard I(mutex); + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + if(!(*it)->connected) continue; + p_rpc.push_back((*it)->requester.lock()); + } + if(type) { p_put.reserve(puts.size()); @@ -242,9 +251,6 @@ void SharedPV::close(bool destroy) (*it)->mapper.reset(); p_put.push_back((*it)->requester.lock()); } - FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - p_rpc.push_back((*it)->requester.lock()); - } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { (*it)->close(); try { diff --git a/src/server/sharedstate_rpc.cpp b/src/server/sharedstate_rpc.cpp index 3f192f6..e0dd540 100644 --- a/src/server/sharedstate_rpc.cpp +++ b/src/server/sharedstate_rpc.cpp @@ -84,6 +84,7 @@ SharedRPC::SharedRPC(const std::tr1::shared_ptr& channel, :channel(channel) ,requester(requester) ,pvRequest(pvRequest) + ,connected(false) { REFTRACE_INCREMENT(num_instances); } @@ -108,17 +109,30 @@ void SharedRPC::lastRequest() {} void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { std::tr1::shared_ptr handler; + pvd::Status sts; { Guard G(channel->owner->mutex); - handler = channel->owner->handler; + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + handler = channel->owner->handler; + } } - std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->requestDone(sts, shared_from_this(), pvd::PVStructurePtr()); - if(handler) { - Operation op(impl); - handler->onRPC(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onRPC(channel->owner, op); + } } } diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h index de12eca..c1a72ef 100644 --- a/src/server/sharedstateimpl.h +++ b/src/server/sharedstateimpl.h @@ -31,6 +31,8 @@ struct SharedChannel : public pva::Channel, const requester_type::weak_pointer requester; const pva::ChannelProvider::weak_pointer provider; + bool dead; // has destroy() been called? + SharedChannel(const std::tr1::shared_ptr& owner, const pva::ChannelProvider::shared_pointer provider, const std::string& channelName, @@ -107,6 +109,8 @@ struct SharedRPC : public pva::ChannelRPC, static size_t num_instances; + bool connected; // have I called requester->channelRPCConnect(Ok) ? + SharedRPC(const std::tr1::shared_ptr& channel, const requester_type::shared_pointer& requester, const pvd::PVStructure::const_shared_pointer &pvRequest);