From cd990fb4599be5337307ff01d6e0ed416728bc3b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 31 Dec 2020 11:12:19 -0800 Subject: [PATCH] reExecGet/Put() --- src/clientget.cpp | 81 ++++++++++++++++++++++++++++++---------- src/clientintrospect.cpp | 6 ++- src/clientmon.cpp | 7 +++- src/pvxs/client.h | 11 ++++-- src/serverget.cpp | 2 +- test/testget.cpp | 4 +- test/testput.cpp | 80 ++++++++++++++++++++++++++++++++++++++- 7 files changed, 161 insertions(+), 30 deletions(-) diff --git a/src/clientget.cpp b/src/clientget.cpp index cd2f797..87ab752 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -109,6 +109,8 @@ namespace { struct GPROp : public OperationBase { + std::weak_ptr internal_self; + std::function builder; std::function done; std::function onInit; @@ -199,36 +201,61 @@ struct GPROp : public OperationBase return ret; } - virtual void reExec(const Value& arg, std::function&& resultcb) override final + void _reExecImpl(bool put, const Value& arg, std::function&& resultcb) { - if(op!=RPC && arg) - throw std::invalid_argument("Only RPC may reExec() with Value"); - auto a(arg); auto cb(std::move(resultcb)); + std::shared_ptr self(internal_self); - loop.dispatch([this, a, cb]() mutable { - if(autoExec) { + loop.dispatch([self, a, cb, put]() mutable { + if(self->autoExec) { client::Result ret(std::make_exception_ptr(std::invalid_argument("reExec() requires Operation creation with .autoExec(false)"))); cb(std::move(ret)); return; } - if(state!=Idle) + if(self->state!=Idle) return; - this->arg = std::move(a); - this->done = std::move(cb); + if(self->op==RPC) { + self->arg = std::move(a); - _reExec(); + } else if(put && self->op==Put) { + self->builder = [a](Value&&) -> Value { + // caller should be passing a Value of the correct prototype + // given through onInit(). + return a; + }; + } + self->done = std::move(cb); + + self->_reExec(put); }); } - void _reExec() + void reExecGet(std::function&& resultcb) override final { - if(op==Put && getOput) { + if(op!=Get && op!=Put) + throw std::logic_error("reExecGet() only meaningful for .get() and .put()"); + + _reExecImpl(false, Value(), std::move(resultcb)); + } + void reExecPut(const Value& arg, std::function&& resultcb) override final + { + if(op!=Get && op!=Put) { + throw std::logic_error("reExecPut() only meaningful for .put()"); + + } else if(!arg) { + throw std::invalid_argument("reExecPut() Put requires Value"); + } + _reExecImpl(true, arg, std::move(resultcb)); + } + + void _reExec(bool put) + { + if(op==Put && !put) { state = GPROp::GetOPut; - } else if(op==Put && !getOput) { + } else if(op==Put && put) { state = GPROp::BuildPut; } else { @@ -492,12 +519,22 @@ void Connection::handle_GPR(pva_app_msg_t cmd) } if(gpr->state==GPROp::Idle && gpr->autoExec) - gpr->_reExec(); + gpr->_reExec(!gpr->getOput); // reply may now be sent, or deferred return; } else if(gpr->state==GPROp::GetOPut) { - gpr->state = GPROp::BuildPut; + if(gpr->autoExec) { + // proceed to execute put + gpr->state = GPROp::BuildPut; + + } else { + // deliver get result + gpr->state = GPROp::Idle; + gpr->result = Result(std::move(data), peerName); + gpr->notify(); + return; + } info->prototype.assign(data); @@ -530,10 +567,12 @@ void Connection::handle_RPC() { handle_GPR(CMD_RPC); } static std::shared_ptr gpr_setup(const std::shared_ptr& context, std::string name, // need to capture by value - const std::shared_ptr& op, + std::shared_ptr&& op, bool syncCancel) { - auto internal(op); + auto internal(std::move(op)); + internal->internal_self = internal; + std::shared_ptr external(internal.get(), [internal, syncCancel](GPROp*) mutable { // (maybe) user thread auto temp(std::move(internal)); @@ -574,7 +613,7 @@ std::shared_ptr GetBuilder::_exec_get() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op, _syncCancel); + return gpr_setup(context, _name, std::move(op), _syncCancel); } std::shared_ptr PutBuilder::exec() @@ -604,13 +643,15 @@ std::shared_ptr PutBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op, _syncCancel); + return gpr_setup(context, _name, std::move(op), _syncCancel); } std::shared_ptr RPCBuilder::exec() { if(!ctx) throw std::logic_error("NULL Builder"); + if(!_autoexec) + throw std::logic_error("autoExec(false) not possible for rpc()"); auto context(ctx->impl->shared_from_this()); @@ -627,7 +668,7 @@ std::shared_ptr RPCBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op, _syncCancel); + return gpr_setup(context, _name, std::move(op), _syncCancel); } } // namespace client diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 57aad6a..ae2e48b 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -68,7 +68,9 @@ struct InfoOp : public OperationBase return ret; } - void reExec(const Value& arg, std::function&& resultcb) override final {} + // not meaningful for GET_FIELD operation + void reExecGet(std::function&& resultcb) override final {} + void reExecPut(const Value& arg, std::function&& resultcb) override final {} virtual void createOp() override final { @@ -179,6 +181,8 @@ std::shared_ptr GetBuilder::_exec_info() { if(!ctx) throw std::logic_error("NULL Builder"); + if(!_autoexec) + throw std::logic_error("autoExec(false) not possible for info()"); auto context(ctx->impl->shared_from_this()); diff --git a/src/clientmon.cpp b/src/clientmon.cpp index f6a42bb..8736dd8 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -213,7 +213,10 @@ struct SubscriptionImpl : public OperationBase, public Subscription return ret; } - void reExec(const Value& arg, std::function&& resultcb) override final {} + // not actually visible through Subscription. + // an artifact of using OperationBase for convenience + void reExecGet(std::function&& resultcb) override final {} + void reExecPut(const Value& arg, std::function&& resultcb) override final {} virtual void createOp() override final { @@ -551,6 +554,8 @@ std::shared_ptr MonitorBuilder::exec() { if(!ctx) throw std::logic_error("NULL Builder"); + if(!_autoexec) + throw std::logic_error("autoExec(false) not possible for monitor()"); auto context(ctx->impl->shared_from_this()); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index b8e0bc2..29e7a1f 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -143,8 +143,11 @@ struct PVXS_API Operation { virtual void interrupt() =0; // Expert API - // (Re)issue request built with autoExec(false) - virtual void reExec(const Value& arg, std::function&& resultcb) =0; + // usable when Builder::autoExec(false) + // For GET/PUT, (re)issue request for current value + virtual void reExecGet(std::function&& resultcb) =0; + // For PUT (re)issue request to set current value + virtual void reExecPut(const Value& arg, std::function&& resultcb) =0; }; //! Handle for monitor subscription @@ -562,8 +565,8 @@ public: SubBuilder& onInit(std::function&& cb) { this->_onInit = std::move(cb); return _sb(); } // Expert API - // control whether operations automatically proceed from INIT to EXEC - // cf. reExec() + // for GET/PUT control whether operations automatically proceed from INIT to EXEC + // cf. Operation::reExec() SubBuilder& autoExec(bool b) { this->_autoexec = b; return _sb(); } /** Controls whether Operation::cancel() and Subscription::cancel() synchronize. diff --git a/src/serverget.cpp b/src/serverget.cpp index eff493b..1a44882 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -457,7 +457,7 @@ void ServerConn::handle_GPR(pva_app_msg_t cmd) op->subcmd = subcmd; op->state = ServerOp::Executing; - log_debug_printf(connsetup, "CLient %s Get executing\n", peerName.c_str()); + log_debug_printf(connsetup, "Client %s op%x executing\n", peerName.c_str(), cmd); try { if(cmd==CMD_RPC && isput) { diff --git a/test/testget.cpp b/test/testget.cpp index 5453fc3..68826b7 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -334,7 +334,7 @@ struct Tester { testOk1(initd.wait(5.0)); testDiag("reExec() 1"); - op->reExec(Value(), [&done](client::Result&& result) { + op->reExecGet([&done](client::Result&& result) { testTrue(!!result()); testDiag("result() 1"); done.signal(); @@ -342,7 +342,7 @@ struct Tester { testOk1(done.wait(5.0)); testDiag("reExec() 2"); - op->reExec(Value(), [&done](client::Result&& result) { + op->reExecGet([&done](client::Result&& result) { testTrue(!!result()); testDiag("result() 2"); done.signal(); diff --git a/test/testput.cpp b/test/testput.cpp index e798ad2..8920880 100644 --- a/test/testput.cpp +++ b/test/testput.cpp @@ -189,6 +189,83 @@ struct Tester : public TesterBase cli = client::Context(); op.reset(); } + + void manualExec() + { + testShow()<<__func__; + + epicsEvent initd; + epicsEvent done; + Value top; + + mbox.open(initial); + serv.start(); + + auto op = cli.put("mailbox") + .autoExec(false) + .onInit([&initd, &top](const Value& prototype) { + testDiag("onInit()"); + top = prototype; + initd.signal(); + }) + .result([&initd](client::Result&& result) { + testFail("result() unexpected error prior to onInit()"); + initd.signal(); + }) + .exec(); + + testOk1(initd.wait(5.0)); + + testDiag("reExec() GET 1"); + top["value"] = 123u; + + op->reExecGet([&done](client::Result&& result) { + testDiag("result() GET 1"); + if(testFalse(result.error())) { + testEq(result()["value"].as(), 1u); + } + done.signal(); + }); + + testOk1(done.wait(5.0)); + + testDiag("reExec() PUT 1"); + top["value"] = 123u; + + op->reExecPut(top.clone(), [&done](client::Result&& result) { + testDiag("result() PUT 1"); + testFalse(result.error()); + done.signal(); + }); + + testOk1(done.wait(5.0)); + testEq(mbox.fetch()["value"].as(), 123u); + + testDiag("reExec() GET 2"); + top["value"] = 123u; + + op->reExecGet([&done](client::Result&& result) { + testDiag("result() GET 2"); + if(testFalse(result.error())) { + testEq(result()["value"].as(), 123u); + } + done.signal(); + }); + + testOk1(done.wait(5.0)); + + testDiag("reExec() 2"); + top["value"] = 124u; + + op->reExecPut(top, [&done](client::Result&& result) { + testDiag("result() PUT 2"); + testFalse(result.error()); + done.signal(); + }); + + testOk1(done.wait(5.0)); + testEq(mbox.fetch()["value"].as(), 124u); + } }; struct TestPutBuilder : public TesterBase @@ -333,7 +410,7 @@ void testError() MAIN(testput) { - testPlan(26); + testPlan(39); testSetup(); logger_config_env(); Tester().loopback(false); @@ -342,6 +419,7 @@ MAIN(testput) Tester().timeout(); Tester().cancel(); Tester().orphan(); + Tester().manualExec(); TestPutBuilder().testSet(); testRO(); testError();