client: add Context::connect()

A means of manually (pre)populating
the channel cache.
This commit is contained in:
Michael Davidsaver
2020-08-24 18:57:53 -07:00
parent da004bc54b
commit 564b9ec2cc
5 changed files with 180 additions and 2 deletions
+54
View File
@@ -127,12 +127,66 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
self->sid = 0xdeadbeef; // spoil
context->searchBuckets[context->currentBucket].push_back(self);
auto conns(connectors); // copy list
for(auto& conn : conns) {
conn->_connected.store(false, std::memory_order_relaxed);
if(conn->_onDis)
conn->_onDis();
}
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n",
conn ? conn->peerName.c_str() : "<disconnected>",
self->name.c_str());
}
Connect::~Connect() {}
ConnectImpl::~ConnectImpl() {}
const std::string& ConnectImpl::name() const
{
return _name;
}
bool ConnectImpl::connected() const
{
return _connected.load(std::memory_order_relaxed);
}
std::shared_ptr<Connect> ConnectBuilder::exec()
{
if(!ctx)
throw std::logic_error("NULL Builder");
std::shared_ptr<ConnectImpl> ret;
ctx->tcp_loop.call([&ret, this]() {
auto chan = Channel::build(ctx->shared_from_this(), _pvname);
auto loop(this->ctx->tcp_loop);
std::shared_ptr<ConnectImpl> internal(new ConnectImpl(chan, _pvname));
internal->_connected = chan->state==Channel::Active;
ret.reset(internal.get(), [internal, loop](ConnectImpl* ptr) mutable {
// cleanup from user code (maybe user thread)
auto self(std::move(internal));
auto L(std::move(loop));
L.call([&self]() {
self->chan->connectors.remove(self.get());
});
});
internal->_onConn = std::move(_onConn);
internal->_onDis = std::move(_onDis);
chan->connectors.push_back(internal.get());
});
return ret;
}
Value ResultWaiter::wait(double timeout)
{
Guard G(lock);
+8
View File
@@ -340,6 +340,14 @@ void Connection::handle_CREATE_CHANNEL()
chan->name.c_str(), unsigned(chan->cid), unsigned(chan->sid));
chan->createOperations();
auto conns(chan->connectors); // copy list
for(auto& conn : conns) {
conn->_connected.store(true, std::memory_order_relaxed);
if(conn->_onConn)
conn->_onConn();
}
}
}
+21
View File
@@ -122,6 +122,25 @@ protected:
static void tickEchoS(evutil_socket_t fd, short evt, void *raw);
};
struct ConnectImpl : public Connect
{
const std::shared_ptr<Channel> chan;
const std::string _name;
std::atomic<bool> _connected;
std::function<void()> _onConn;
std::function<void()> _onDis;
ConnectImpl(const std::shared_ptr<Channel>& chan, const std::string& name)
:chan(chan)
,_name(name)
,_connected{false}
{}
virtual ~ConnectImpl();
virtual const std::string &name() const override final;
virtual bool connected() const override final;
};
struct Channel {
const std::shared_ptr<Context::Pvt> context;
const std::string name;
@@ -153,6 +172,8 @@ struct Channel {
// points to storage of Connection::opByIOID
std::map<uint32_t, RequestInfo*> opByIOID;
std::list<ConnectImpl*> connectors;
INST_COUNTER(Channel);
Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid);
+48
View File
@@ -191,11 +191,22 @@ public:
virtual Value pop() =0;
};
//! Handle for entry in Channel cache
struct PVXS_API Connect {
virtual ~Connect() =0;
//! Name passed to Context::connect()
virtual const std::string& name() const =0;
//! Poll (momentary) connection status
virtual bool connected() const =0;
};
class GetBuilder;
class PutBuilder;
class RPCBuilder;
class MonitorBuilder;
class RequestBuilder;
class ConnectBuilder;
/** An independent PVA protocol client instance
*
@@ -378,6 +389,19 @@ public:
inline
MonitorBuilder monitor(const std::string& pvname);
/** Manually add, and maintain, an entry in the Channel cache.
*
* This optional method may be used when it is known that a given PV
* will be needed in future.
* ConnectBuilder::onConnect() and ConnectBuilder::onDisconnect()
* may be used to get asynchronous notification, or
* the returned Connect object may be used to poll Channel (dis)connect state.
*
* @since UNRELEASED
*/
inline
ConnectBuilder connect(const std::string& pvname);
/** Compose a pvRequest independently of a network operation.
*
* This is not a network operation.
@@ -700,6 +724,30 @@ public:
};
RequestBuilder Context::request() { return RequestBuilder{}; }
//! cf. Context::connect()
//! @since UNRELEASED
class ConnectBuilder
{
std::shared_ptr<Context::Pvt> ctx;
std::string _pvname;
std::function<void()> _onConn;
std::function<void()> _onDis;
public:
ConnectBuilder(const std::shared_ptr<Context::Pvt>& ctx, const std::string& pvname)
:ctx(ctx)
,_pvname(pvname)
{}
//! Handler to be invoked when channel becomes connected.
ConnectBuilder& onConnect(std::function<void()>&& cb) { _onConn = std::move(cb); return *this; }
//! Handler to be invoked when channel becomes disconnected.
ConnectBuilder& onDisconnect(std::function<void()>&& cb) { _onDis = std::move(cb); return *this; }
PVXS_API
std::shared_ptr<Connect> exec();
};
ConnectBuilder Context::connect(const std::string& pvname) { return ConnectBuilder{pvt, pvname}; }
struct PVXS_API Config {
//! List of unicast and broadcast addresses
std::vector<std::string> addressList;
+49 -2
View File
@@ -46,7 +46,53 @@ struct Tester {
~Tester()
{
if(cli.use_count()>1u)
testAbort("Tester Context leak");
testAbort("Tester Context leak: %u", unsigned(cli.use_count()));
}
void testConnector()
{
testShow()<<__func__;
mbox.open(initial);
serv.start();
epicsEvent evt;
bool connd = false,
discd = false;
auto ctor = cli.connect("mailbox")
.onConnect([&evt, &connd]()
{
testDiag("onConnect%c", !connd ? '.' : '?');
connd = true;
evt.signal();
})
.onDisconnect([&evt, &discd]()
{
testDiag("onDisconnect%c", !discd ? '.' : '?');
discd = true;
evt.signal();
})
.exec();
// ensure de-dup
auto ctor2 = cli.connect("mailbox").exec();
testTrue(evt.wait(5.0))<<"Wait for Connect";
testTrue(connd);
// ensure de-dup
auto ctor3 = cli.connect("mailbox").exec();
testTrue(ctor->connected());
testTrue(ctor2->connected());
testTrue(ctor3->connected());
serv.stop();
testTrue(evt.wait(5.0))<<"Wait for Disconnect";
testTrue(discd);
testFalse(ctor->connected());
}
void testWaiter()
@@ -256,9 +302,10 @@ void testError(bool phase)
MAIN(testget)
{
testPlan(15);
testPlan(23);
testSetup();
logger_config_env();
Tester().testConnector();
Tester().testWaiter();
Tester().loopback();
Tester().lazy();