From 564b9ec2cc0cb80b82e995075edba5e1e7e5e40d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 24 Aug 2020 18:57:53 -0700 Subject: [PATCH] client: add Context::connect() A means of manually (pre)populating the channel cache. --- src/client.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++ src/clientconn.cpp | 8 +++++++ src/clientimpl.h | 21 ++++++++++++++++++ src/pvxs/client.h | 48 +++++++++++++++++++++++++++++++++++++++++ test/testget.cpp | 51 +++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 180 insertions(+), 2 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 42b7020..b616fdc 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -127,12 +127,66 @@ void Channel::disconnect(const std::shared_ptr& self) self->sid = 0xdeadbeef; // spoil context->searchBuckets[context->currentBucket].push_back(self); + auto conns(connectors); // copy list + + for(auto& conn : conns) { + conn->_connected.store(false, std::memory_order_relaxed); + if(conn->_onDis) + conn->_onDis(); + } + log_debug_printf(io, "Server %s detach channel '%s' to re-search\n", conn ? conn->peerName.c_str() : "", self->name.c_str()); } +Connect::~Connect() {} + +ConnectImpl::~ConnectImpl() {} + +const std::string& ConnectImpl::name() const +{ + return _name; +} +bool ConnectImpl::connected() const +{ + return _connected.load(std::memory_order_relaxed); +} + +std::shared_ptr ConnectBuilder::exec() +{ + if(!ctx) + throw std::logic_error("NULL Builder"); + + std::shared_ptr ret; + + ctx->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(ctx->shared_from_this(), _pvname); + + auto loop(this->ctx->tcp_loop); + std::shared_ptr internal(new ConnectImpl(chan, _pvname)); + internal->_connected = chan->state==Channel::Active; + + ret.reset(internal.get(), [internal, loop](ConnectImpl* ptr) mutable { + // cleanup from user code (maybe user thread) + auto self(std::move(internal)); + auto L(std::move(loop)); + + L.call([&self]() { + self->chan->connectors.remove(self.get()); + }); + }); + + internal->_onConn = std::move(_onConn); + internal->_onDis = std::move(_onDis); + + chan->connectors.push_back(internal.get()); + }); + + return ret; +} + Value ResultWaiter::wait(double timeout) { Guard G(lock); diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 7680d24..35dabd4 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -340,6 +340,14 @@ void Connection::handle_CREATE_CHANNEL() chan->name.c_str(), unsigned(chan->cid), unsigned(chan->sid)); chan->createOperations(); + + auto conns(chan->connectors); // copy list + + for(auto& conn : conns) { + conn->_connected.store(true, std::memory_order_relaxed); + if(conn->_onConn) + conn->_onConn(); + } } } diff --git a/src/clientimpl.h b/src/clientimpl.h index 38b971c..de542e5 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -122,6 +122,25 @@ protected: static void tickEchoS(evutil_socket_t fd, short evt, void *raw); }; +struct ConnectImpl : public Connect +{ + const std::shared_ptr chan; + const std::string _name; + std::atomic _connected; + std::function _onConn; + std::function _onDis; + + ConnectImpl(const std::shared_ptr& chan, const std::string& name) + :chan(chan) + ,_name(name) + ,_connected{false} + {} + virtual ~ConnectImpl(); + + virtual const std::string &name() const override final; + virtual bool connected() const override final; +}; + struct Channel { const std::shared_ptr context; const std::string name; @@ -153,6 +172,8 @@ struct Channel { // points to storage of Connection::opByIOID std::map opByIOID; + std::list connectors; + INST_COUNTER(Channel); Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 3059271..2f98912 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -191,11 +191,22 @@ public: virtual Value pop() =0; }; +//! Handle for entry in Channel cache +struct PVXS_API Connect { + virtual ~Connect() =0; + + //! Name passed to Context::connect() + virtual const std::string& name() const =0; + //! Poll (momentary) connection status + virtual bool connected() const =0; +}; + class GetBuilder; class PutBuilder; class RPCBuilder; class MonitorBuilder; class RequestBuilder; +class ConnectBuilder; /** An independent PVA protocol client instance * @@ -378,6 +389,19 @@ public: inline MonitorBuilder monitor(const std::string& pvname); + /** Manually add, and maintain, an entry in the Channel cache. + * + * This optional method may be used when it is known that a given PV + * will be needed in future. + * ConnectBuilder::onConnect() and ConnectBuilder::onDisconnect() + * may be used to get asynchronous notification, or + * the returned Connect object may be used to poll Channel (dis)connect state. + * + * @since UNRELEASED + */ + inline + ConnectBuilder connect(const std::string& pvname); + /** Compose a pvRequest independently of a network operation. * * This is not a network operation. @@ -700,6 +724,30 @@ public: }; RequestBuilder Context::request() { return RequestBuilder{}; } +//! cf. Context::connect() +//! @since UNRELEASED +class ConnectBuilder +{ + std::shared_ptr ctx; + std::string _pvname; + std::function _onConn; + std::function _onDis; +public: + ConnectBuilder(const std::shared_ptr& ctx, const std::string& pvname) + :ctx(ctx) + ,_pvname(pvname) + {} + + //! Handler to be invoked when channel becomes connected. + ConnectBuilder& onConnect(std::function&& cb) { _onConn = std::move(cb); return *this; } + //! Handler to be invoked when channel becomes disconnected. + ConnectBuilder& onDisconnect(std::function&& cb) { _onDis = std::move(cb); return *this; } + + PVXS_API + std::shared_ptr exec(); +}; +ConnectBuilder Context::connect(const std::string& pvname) { return ConnectBuilder{pvt, pvname}; } + struct PVXS_API Config { //! List of unicast and broadcast addresses std::vector addressList; diff --git a/test/testget.cpp b/test/testget.cpp index b1de0cb..02e3acc 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -46,7 +46,53 @@ struct Tester { ~Tester() { if(cli.use_count()>1u) - testAbort("Tester Context leak"); + testAbort("Tester Context leak: %u", unsigned(cli.use_count())); + } + + void testConnector() + { + testShow()<<__func__; + + mbox.open(initial); + serv.start(); + + epicsEvent evt; + bool connd = false, + discd = false; + + auto ctor = cli.connect("mailbox") + .onConnect([&evt, &connd]() + { + testDiag("onConnect%c", !connd ? '.' : '?'); + connd = true; + evt.signal(); + }) + .onDisconnect([&evt, &discd]() + { + testDiag("onDisconnect%c", !discd ? '.' : '?'); + discd = true; + evt.signal(); + }) + .exec(); + + // ensure de-dup + auto ctor2 = cli.connect("mailbox").exec(); + + testTrue(evt.wait(5.0))<<"Wait for Connect"; + testTrue(connd); + + // ensure de-dup + auto ctor3 = cli.connect("mailbox").exec(); + + testTrue(ctor->connected()); + testTrue(ctor2->connected()); + testTrue(ctor3->connected()); + + serv.stop(); + + testTrue(evt.wait(5.0))<<"Wait for Disconnect"; + testTrue(discd); + testFalse(ctor->connected()); } void testWaiter() @@ -256,9 +302,10 @@ void testError(bool phase) MAIN(testget) { - testPlan(15); + testPlan(23); testSetup(); logger_config_env(); + Tester().testConnector(); Tester().testWaiter(); Tester().loopback(); Tester().lazy();