From c4d2cd4a1081cb4bf6f4ea238710dc5b61782257 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 30 Dec 2020 09:00:47 -0800 Subject: [PATCH] client: reorg context Pvt --- src/client.cpp | 115 ++++++++++++++++----------------------- src/clientconn.cpp | 2 +- src/clientget.cpp | 20 ++++--- src/clientimpl.h | 37 +++++++------ src/clientintrospect.cpp | 5 +- src/clientmon.cpp | 5 +- src/util.cpp | 4 +- src/utilpvt.h | 2 +- 8 files changed, 92 insertions(+), 98 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index cc0c6cb..4a75b94 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -69,7 +69,7 @@ Timeout::Timeout() {} Timeout::~Timeout() {} -Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) +Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) :context(context) ,name(name) ,cid(cid) @@ -159,7 +159,9 @@ std::shared_ptr ConnectBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(ctx->tcp_loop, _pvname)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(context->tcp_loop, _pvname)); op->_onConn = std::move(_onConn); op->_onDis = std::move(_onDis); @@ -179,7 +181,6 @@ std::shared_ptr ConnectBuilder::exec() op.reset(); }); - auto context(ctx->shared_from_this()); context->tcp_loop.dispatch([op, context]() { // on worker @@ -255,7 +256,7 @@ RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr Channel::build(const std::shared_ptr& context, const std::string& name) +std::shared_ptr Channel::build(const std::shared_ptr& context, const std::string& name) { std::shared_ptr chan; @@ -287,38 +288,8 @@ Operation::~Operation() {} Subscription::~Subscription() {} Context::Context(const Config& conf) -{ - /* Here be dragons. - * - * We keep two different ref. counters. - * - "externel" counter which keeps a server running. - * - "internal" which only keeps server storage from being destroyed. - * - * External refs are held as Server::pvt. Internal refs are - * held by various in-progress operations (OpBase sub-classes) - * Which need to safely access server storage, but should not - * prevent a server from stopping. - */ - auto internal(std::make_shared(conf)); - internal->internal_self = internal; - cnt_ClientPvtLive.fetch_add(1u); - - // external - pvt.reset(internal.get(), [internal](Pvt*) mutable { - auto temp(std::move(internal)); - try { - temp->close(); - }catch(std::exception& e){ - // called through ~shared_ptr and can't propagate exceptions. - // log and continue... - log_exc_printf(setup, "Error while closing Context (%s) : %s\n", - typeid(e).name(), e.what()); - } - cnt_ClientPvtLive.fetch_sub(1u); - }); - // we don't keep a weak_ptr to the external reference. - // Caller is entirely responsible for keeping this server running -} + :pvt(std::make_shared(conf)) +{} Context::~Context() {} @@ -327,7 +298,7 @@ const Config& Context::config() const if(!pvt) throw std::logic_error("NULL Context"); - return pvt->effective; + return pvt->impl->effective; } void Context::hurryUp() @@ -335,8 +306,8 @@ void Context::hurryUp() if(!pvt) throw std::logic_error("NULL Context"); - pvt->manager.loop().call([this](){ - pvt->poke(true); + pvt->impl->manager.loop().call([this](){ + pvt->impl->poke(true); }); } @@ -345,10 +316,10 @@ void Context::cacheClear() if(!pvt) throw std::logic_error("NULL Context"); - pvt->tcp_loop.call([this](){ + pvt->impl->tcp_loop.call([this](){ // run twice to ensure both mark and sweep of all unused channels - pvt->cacheClean(); - pvt->cacheClean(); + pvt->impl->cacheClean(); + pvt->impl->cacheClean(); }); } @@ -356,9 +327,9 @@ Report Context::report() const { Report ret; - pvt->tcp_loop.call([this, &ret](){ + pvt->impl->tcp_loop.call([this, &ret](){ - for(auto& pair : pvt->connByAddr) { + for(auto& pair : pvt->impl->connByAddr) { auto conn = pair.second.lock(); if(!conn) continue; @@ -400,16 +371,16 @@ Value buildCAMethod() }).create(); } -Context::Pvt::Pvt(const Config& conf) +ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) :effective(conf) ,caMethod(buildCAMethod()) ,searchTx(AF_INET, SOCK_DGRAM, 0) - ,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow) - ,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this)) - ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this)) + ,tcp_loop(tcp_loop) + ,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this)) + ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this)) ,manager(UDPManager::instance()) - ,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::tickBeaconCleanS, this)) - ,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::cacheCleanS, this)) + ,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this)) + ,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::cacheCleanS, this)) { effective.expand(); @@ -480,9 +451,9 @@ Context::Pvt::Pvt(const Config& conf) log_err_printf(setup, "Error enabling channel cache clean timer on\n%s", ""); } -Context::Pvt::~Pvt() {} +ContextImpl::~ContextImpl() {} -void Context::Pvt::close() +void ContextImpl::close() { // terminate all active connections tcp_loop.call([this]() { @@ -516,7 +487,7 @@ void Context::Pvt::close() manager.sync(); } -void Context::Pvt::poke(bool force) +void ContextImpl::poke(bool force) { { Guard G(pokeLock); @@ -541,7 +512,7 @@ void Context::Pvt::poke(bool force) throw std::runtime_error("Unable to schedule searchTimer"); } -void Context::Pvt::onBeacon(const UDPManager::Beacon& msg) +void ContextImpl::onBeacon(const UDPManager::Beacon& msg) { const auto& guid = msg.guid; @@ -565,7 +536,7 @@ void Context::Pvt::onBeacon(const UDPManager::Beacon& msg) poke(false); } -bool Context::Pvt::onSearch() +bool ContextImpl::onSearch() { searchMsg.resize(0x10000); SockAddr src; @@ -680,7 +651,7 @@ bool Context::Pvt::onSearch() auto it = connByAddr.find(serv); if(it==connByAddr.end() || !(chan->conn = it->second.lock())) { - connByAddr[serv] = chan->conn = std::make_shared(internal_self.lock(), serv); + connByAddr[serv] = chan->conn = std::make_shared(shared_from_this(), serv); } chan->conn->pending.push_back(chan); @@ -709,7 +680,7 @@ bool Context::Pvt::onSearch() return true; } -void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw) +void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw) { try { log_debug_printf(io, "UDP search Rx event %x\n", evt); @@ -719,7 +690,7 @@ void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw) // limit number of packets processed before going back to the reactor unsigned i; const unsigned limit = 40; - for(i=0; i(raw)->onSearch(); i++) {} + for(i=0; i(raw)->onSearch(); i++) {} log_debug_printf(io, "UDP search processed %u/%u\n", i, limit); }catch(std::exception& e){ @@ -727,7 +698,7 @@ void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw) } } -void Context::Pvt::tickSearch() +void ContextImpl::tickSearch() { { Guard G(pokeLock); @@ -862,16 +833,16 @@ void Context::Pvt::tickSearch() log_err_printf(setup, "Error re-enabling search timer on\n%s", ""); } -void Context::Pvt::tickSearchS(evutil_socket_t fd, short evt, void *raw) +void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickSearch(); + static_cast(raw)->tickSearch(); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what()); } } -void Context::Pvt::tickBeaconClean() +void ContextImpl::tickBeaconClean() { epicsTimeStamp now; epicsTimeGetCurrent(&now); @@ -894,16 +865,16 @@ void Context::Pvt::tickBeaconClean() } } -void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw) +void ContextImpl::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickBeaconClean(); + static_cast(raw)->tickBeaconClean(); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what()); } } -void Context::Pvt::cacheClean() +void ContextImpl::cacheClean() { std::set trash; @@ -928,15 +899,25 @@ void Context::Pvt::cacheClean() } } -void Context::Pvt::cacheCleanS(evutil_socket_t fd, short evt, void *raw) +void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickBeaconClean(); + static_cast(raw)->tickBeaconClean(); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what()); } } +Context::Pvt::Pvt(const Config& conf) + :loop("PVXCTCP", epicsThreadPriorityCAServerLow) + ,impl(std::make_shared(conf, loop)) +{} + +Context::Pvt::~Pvt() +{ + impl->close(); +} + } // namespace client } // namespace pvxs diff --git a/src/clientconn.cpp b/src/clientconn.cpp index ae22791..723701a 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -14,7 +14,7 @@ namespace client { DEFINE_LOGGER(io, "pvxs.client.io"); -Connection::Connection(const std::shared_ptr& context, const SockAddr& peerAddr) +Connection::Connection(const std::shared_ptr& context, const SockAddr& peerAddr) :ConnBase (true, bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), peerAddr) diff --git a/src/clientget.cpp b/src/clientget.cpp index 41c3ebb..a2dba98 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -528,7 +528,7 @@ void Connection::handle_PUT() { handle_GPR(CMD_PUT); } void Connection::handle_RPC() { handle_GPR(CMD_RPC); } static -std::shared_ptr gpr_setup(const std::shared_ptr& context, +std::shared_ptr gpr_setup(const std::shared_ptr& context, std::string name, // need to capture by value const std::shared_ptr& op) { @@ -566,12 +566,14 @@ std::shared_ptr GetBuilder::_exec_get() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(Operation::Get, ctx->tcp_loop)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(Operation::Get, context->tcp_loop)); op->setDone(std::move(_result), std::move(_onInit)); op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(ctx->shared_from_this(), _name, op); + return gpr_setup(context, _name, op); } std::shared_ptr PutBuilder::exec() @@ -579,7 +581,9 @@ std::shared_ptr PutBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(Operation::Put, ctx->tcp_loop)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(Operation::Put, context->tcp_loop)); op->setDone(std::move(_result), std::move(_onInit)); if(_builder) { @@ -599,7 +603,7 @@ std::shared_ptr PutBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(ctx->shared_from_this(), _name, op); + return gpr_setup(context, _name, op); } std::shared_ptr RPCBuilder::exec() @@ -607,7 +611,9 @@ std::shared_ptr RPCBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(Operation::RPC, ctx->tcp_loop)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(Operation::RPC, context->tcp_loop)); op->setDone(std::move(_result), std::move(_onInit)); if(_argument) { if(!_autoexec) @@ -620,7 +626,7 @@ std::shared_ptr RPCBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(ctx->shared_from_this(), _name, op); + return gpr_setup(context, _name, op); } } // namespace client diff --git a/src/clientimpl.h b/src/clientimpl.h index 55bf57f..b8862f0 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -24,6 +24,7 @@ namespace pvxs { namespace client { struct Channel; +struct ContextImpl; struct ResultWaiter { epicsMutex lock; @@ -72,7 +73,7 @@ struct RequestInfo { }; struct Connection : public ConnBase, public std::enable_shared_from_this { - const std::shared_ptr context; + const std::shared_ptr context; const evevent echoTimer; @@ -91,7 +92,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this& context, const SockAddr &peerAddr); + Connection(const std::shared_ptr& context, const SockAddr &peerAddr); virtual ~Connection(); void createChannels(); @@ -145,7 +146,7 @@ struct ConnectImpl : public Connect }; struct Channel { - const std::shared_ptr context; + const std::shared_ptr context; const std::string name; // Our choosen ID for this channel. // used as persistent CID and searchID @@ -181,26 +182,20 @@ struct Channel { INST_COUNTER(Channel); - Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); + Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); ~Channel(); void createOperations(); void disconnect(const std::shared_ptr& self); static - std::shared_ptr build(const std::shared_ptr& context, const std::string &name); + std::shared_ptr build(const std::shared_ptr& context, const std::string &name); }; -struct Context::Pvt +struct ContextImpl : public std::enable_shared_from_this { SockAttach attach; - std::weak_ptr internal_self; - std::shared_ptr shared_from_this() { - std::shared_ptr ret(internal_self); - return ret; - } - // "const" after ctor Config effective; @@ -235,7 +230,7 @@ struct Context::Pvt std::map> chanByCID; // strong ref. loop through Channel::context - // explicitly broken by Context::close(), Context::cacheClear, or Context::Pvt::cacheClean() + // explicitly broken by Context::close(), Context::cacheClear, or ContextImpl::cacheClean() std::map> chanByName; std::map> connByAddr; @@ -251,10 +246,10 @@ struct Context::Pvt const evevent beaconCleaner; const evevent cacheCleaner; - INST_COUNTER(ClientPvt); + INST_COUNTER(ClientContextImpl); - Pvt(const Config& conf); - ~Pvt(); + ContextImpl(const Config& conf, const evbase &tcp_loop); + ~ContextImpl(); void close(); @@ -272,6 +267,16 @@ struct Context::Pvt static void cacheCleanS(evutil_socket_t fd, short evt, void *raw); }; +struct Context::Pvt { + evbase loop; + std::shared_ptr impl; + + INST_COUNTER(ClientPvt); + + Pvt(const Config& conf); + ~Pvt(); // I call ContextImpl::close() +}; + } // namespace client } // namespace pvxs diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 5d46c12..fdee3f6 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -180,7 +180,9 @@ std::shared_ptr GetBuilder::_exec_info() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(ctx->tcp_loop)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(context->tcp_loop)); if(_result) { op->done = std::move(_result); } else { @@ -207,7 +209,6 @@ std::shared_ptr GetBuilder::_exec_info() }); auto name(std::move(_name)); - auto context(ctx->shared_from_this()); context->tcp_loop.dispatch([op, context, name]() { // on worker diff --git a/src/clientmon.cpp b/src/clientmon.cpp index c30f50f..8cead55 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -552,7 +552,9 @@ std::shared_ptr MonitorBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - auto op(std::make_shared(ctx->tcp_loop)); + auto context(ctx->impl->shared_from_this()); + + auto op(std::make_shared(context->tcp_loop)); op->self = op; op->event = std::move(_event); op->onInit = std::move(_onInit); @@ -619,7 +621,6 @@ std::shared_ptr MonitorBuilder::exec() }); auto name(std::move(_name)); - auto context(ctx->shared_from_this()); context->tcp_loop.dispatch([op, context, name]() { // on worker diff --git a/src/util.cpp b/src/util.cpp index 336ad80..b2aad80 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -67,7 +67,7 @@ CASE(GPROp); CASE(Connection); CASE(Channel); CASE(ClientPvt); -CASE(ClientPvtLive); +CASE(ClientContextImpl); CASE(InfoOp); CASE(SubScriptionImpl); @@ -104,7 +104,7 @@ CASE(GPROp); CASE(Connection); CASE(Channel); CASE(ClientPvt); -CASE(ClientPvtLive); +CASE(ClientContextImpl); CASE(InfoOp); CASE(SubScriptionImpl); diff --git a/src/utilpvt.h b/src/utilpvt.h index cb85efd..8a12bda 100644 --- a/src/utilpvt.h +++ b/src/utilpvt.h @@ -303,7 +303,7 @@ CASE(GPROp); CASE(Connection); CASE(Channel); CASE(ClientPvt); -CASE(ClientPvtLive); +CASE(ClientContextImpl); CASE(InfoOp); CASE(SubScriptionImpl);