diff --git a/src/clientget.cpp b/src/clientget.cpp index c1df8a1..10aebf4 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -111,6 +111,7 @@ struct GPROp : public OperationBase { std::function builder; std::function done; + std::function onInit; Value pvRequest; Value rpcarg; Result result; @@ -135,10 +136,11 @@ struct GPROp : public OperationBase _cancel(true); } - void setDone(decltype (done)&& cb) + void setDone(decltype (done)&& donecb, decltype (onInit)&& initcb) { - if(cb) { - done = std::move(cb); + onInit = std::move(initcb); + if(donecb) { + done = std::move(donecb); } else { auto waiter = this->waiter = std::make_shared(); done = [waiter](Result&& result) { @@ -166,10 +168,12 @@ struct GPROp : public OperationBase { auto context = chan->context; decltype (done) junk; + decltype (onInit) junkI; bool ret; - context->tcp_loop.call([this, &junk, &ret](){ + context->tcp_loop.call([this, &junk, &junkI, &ret](){ ret = _cancel(false); junk = std::move(done); + junkI = std::move(onInit); // leave opByIOID for GC }); return ret; @@ -369,6 +373,14 @@ void Connection::handle_GPR(pva_app_msg_t cmd) } else if(gpr->state==GPROp::Creating) { + try { + if(gpr->onInit) + gpr->onInit(data); + } catch(std::exception& e) { + gpr->result = Result(std::current_exception()); + gpr->state = GPROp::Done; + } + if(cmd==CMD_PUT && gpr->getOput) { gpr->state = GPROp::GetOPut; @@ -493,7 +505,7 @@ std::shared_ptr GetBuilder::_exec_get() auto chan = Channel::build(ctx->shared_from_this(), _name); auto op = std::make_shared(Operation::Get, chan); - op->setDone(std::move(_result)); + op->setDone(std::move(_result), std::move(_onInit)); op->pvRequest = _buildReq(); chan->pending.push_back(op); @@ -520,7 +532,7 @@ std::shared_ptr PutBuilder::exec() auto chan = Channel::build(ctx->shared_from_this(), _name); auto op = std::make_shared(Operation::Put, chan); - op->setDone(std::move(_result)); + op->setDone(std::move(_result), std::move(_onInit)); if(_builder) { op->builder = std::move(_builder); @@ -561,7 +573,7 @@ std::shared_ptr RPCBuilder::exec() auto chan = Channel::build(ctx->shared_from_this(), _name); auto op = std::make_shared(Operation::RPC, chan); - op->setDone(std::move(_result)); + op->setDone(std::move(_result), std::move(_onInit)); if(_argument) { op->rpcarg = std::move(_argument); } else if(_args) { diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 1cd7075..5f0cf67 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -39,6 +39,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription evevent ackTick; // const after exec() + std::function onInit; std::function event; Value pvRequest; bool pipeline = false; @@ -447,6 +448,9 @@ void Connection::handle_MONITOR() peerName.c_str(), mon->chan->name.c_str()); + if(mon->onInit) + mon->onInit(info->prototype); + mon->state = SubscriptionImpl::Idle; if(mon->autostart) @@ -547,6 +551,7 @@ std::shared_ptr MonitorBuilder::exec() auto op = std::make_shared(Operation::Monitor, chan); op->event = std::move(_event); + op->onInit = std::move(_onInit); op->pvRequest = _buildReq(); op->maskConn = _maskConn; op->maskDiscon = _maskDisconn; diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 2f98912..1bdc36e 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -496,6 +496,7 @@ protected: template class CommonBuilder : public Base { protected: + std::function _onInit; CommonBuilder() = default; constexpr CommonBuilder(const std::shared_ptr& ctx, const std::string& name) : Base(ctx, name) {} inline SubBuilder& _sb() { return static_cast(*this); } @@ -537,6 +538,11 @@ public: SubBuilder& priority(int p) { this->_prio = p; return _sb(); } SubBuilder& server(const std::string& s) { this->_server = s; return _sb(); } + + // Expert API + // called during operation INIT phase for Get/Put/Monitor when remote type + // description is available. + SubBuilder& onInit(std::function&& cb) { this->_onInit = std::move(cb); return _sb(); } }; } // namespace detail diff --git a/test/testget.cpp b/test/testget.cpp index 02e3acc..907644a 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -101,14 +101,21 @@ struct Tester { mbox.open(initial); serv.start(); + std::atomic hadInit{false}; - auto op = cli.get("mailbox").exec(); + auto op = cli.get("mailbox") + .onInit([&hadInit](const Value& prototype) { + testShow()<<"onInit() << "<wait(5.0); testEq(result["value"].as(), 42); + testTrue(hadInit.load()); } void testWait() @@ -302,7 +309,7 @@ void testError(bool phase) MAIN(testget) { - testPlan(23); + testPlan(24); testSetup(); logger_config_env(); Tester().testConnector();