From 25e712eb2ab6ea96fd419f1061bf966f47f9bc1b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 11 Mar 2020 11:41:22 -0700 Subject: [PATCH] client Add Operation::wait() --- src/client.cpp | 56 ++++++++++++++++++++++++++++++ src/clientget.cpp | 19 ++++++++-- src/clientimpl.h | 22 ++++++++++++ src/clientintrospect.cpp | 10 +++++- src/pvxs/client.h | 75 +++++++++++++++++++++++++++++++++------- test/testget.cpp | 19 +++++++++- 6 files changed, 184 insertions(+), 17 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 7bd8c21..3b51d91 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -19,6 +20,9 @@ DEFINE_LOGGER(setup, "pvxs.client.setup"); DEFINE_LOGGER(io, "pvxs.client.io"); DEFINE_LOGGER(duppv, "pvxs.client.dup"); +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + namespace pvxs { namespace client { @@ -52,6 +56,16 @@ Connected::Connected(const std::string& peerName) Connected::~Connected() {} +Interrupted::Interrupted() + :std::runtime_error ("Interrupted") +{} +Interrupted::~Interrupted() {} + +Timeout::Timeout() + :std::runtime_error ("Interrupted") +{} +Timeout::~Timeout() {} + Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) :context(context) ,name(name) @@ -117,6 +131,35 @@ void Channel::disconnect(const std::shared_ptr& self) } +Value ResultWaiter::wait(double timeout) +{ + Guard G(lock); + while(outcome==Busy) { + UnGuard U(G); + if(!notify.wait(timeout)) + throw Timeout(); + } + if(outcome==Done) + return result(); + else + throw Interrupted(); +} + +void ResultWaiter::complete(Result&& result, bool interrupt) +{ + bool wakeup; + { + Guard G(lock); + wakeup = outcome==Busy; + if(wakeup) { + this->result = std::move(result); + outcome = interrupt ? Abort : Done; + } + } + if(wakeup) + notify.trigger(); +} + OperationBase::OperationBase(operation_t op, const std::shared_ptr& chan) :Operation(op) ,chan(chan) @@ -124,6 +167,19 @@ OperationBase::OperationBase(operation_t op, const std::shared_ptr& cha OperationBase::~OperationBase() {} +Value OperationBase::wait(double timeout) +{ + if(!waiter) + throw std::logic_error("Operation has custom .result() callback"); + return waiter->wait(timeout); +} + +void OperationBase::interrupt() +{ + if(waiter) + waiter->complete(Result(), true); +} + RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr& handle) :sid(sid) ,ioid(ioid) diff --git a/src/clientget.cpp b/src/clientget.cpp index 87f7f34..b28a510 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -116,6 +116,18 @@ struct GPROp : public OperationBase _cancel(true); } + void setDone(decltype (done)&& cb) + { + if(cb) { + done = std::move(cb); + } else { + auto waiter = this->waiter = std::make_shared(); + done = [waiter](Result&& result) { + waiter->complete(std::move(result), false); + }; + } + } + void notify() { try { if(done) @@ -429,7 +441,7 @@ std::shared_ptr GetBuilder::_exec_get() auto chan = Channel::build(ctx, _name); auto op = std::make_shared(Operation::Get, chan); - op->done = std::move(_result); + op->setDone(std::move(_result)); op->pvRequest = _build(); chan->pending.push_back(op); @@ -452,7 +464,8 @@ std::shared_ptr PutBuilder::exec() auto chan = Channel::build(ctx, _name); auto op = std::make_shared(Operation::Put, chan); - op->done = std::move(_result); + op->setDone(std::move(_result)); + if(_builder) { op->builder = std::move(_builder); } else if(pvt) { @@ -483,7 +496,7 @@ std::shared_ptr RPCBuilder::exec() auto chan = Channel::build(ctx, _name); auto op = std::make_shared(Operation::RPC, chan); - op->done = std::move(_result); + op->setDone(std::move(_result)); op->rpcarg = std::move(_argument); op->pvRequest = _build(); diff --git a/src/clientimpl.h b/src/clientimpl.h index e2a43a9..e991e20 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -9,6 +9,8 @@ #include #include +#include +#include #include @@ -23,17 +25,37 @@ namespace client { struct Channel; +struct ResultWaiter { + epicsMutex lock; + epicsEvent notify; + Result result; + enum { + Busy, + Done, + Abort, + } outcome = Busy; + + Value wait(double timeout=-1.0); + void complete(Result&& result, bool interrupt); +}; + // internal actions on an Operation struct OperationBase : public Operation { std::shared_ptr chan; uint32_t ioid; + Value result; + bool done; + std::shared_ptr waiter; OperationBase(operation_t op, const std::shared_ptr& chan); virtual ~OperationBase(); virtual void createOp() =0; virtual void disconnected(const std::shared_ptr& self) =0; + + virtual Value wait(double timeout=-1.0) override final; + virtual void interrupt() override final; }; struct RequestInfo { diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 2d09e71..1999a45 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -174,7 +174,15 @@ std::shared_ptr GetBuilder::_exec_info() auto chan = Channel::build(ctx, _name); auto op = std::make_shared(chan); - op->done = std::move(_result); + + if(_result) { + op->done = std::move(_result); + } else { + auto waiter = op->waiter = std::make_shared(); + op->done = [waiter](Result&& result) { + waiter->complete(std::move(result), false); + }; + } chan->pending.push_back(op); chan->createOperations(); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 0a17c67..e7a343a 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -60,6 +60,18 @@ struct PVXS_API Connected : public std::runtime_error const epicsTime time; }; +struct PVXS_API Interrupted : public std::runtime_error +{ + Interrupted(); + virtual ~Interrupted(); +}; + +struct PVXS_API Timeout : public std::runtime_error +{ + Timeout(); + virtual ~Timeout(); +}; + //! Holder for a Value or an exception class Result { Value _result; @@ -101,6 +113,26 @@ struct PVXS_API Operation { //! Explicitly cancel a pending operation. //! Blocks until an in-progress callback has completed. virtual void cancel() =0; + + /** @brief Block until Operation completion + * + * As an alternative to a .result() callback, wait for operation competion, + * timeout, or interruption (via. interrupt() ). + * + * @param timeout Time to wait prior to throwing TimeoutError. cf. epicsEvent::wait(double) + * @return result Value. Always empty/invalid for put() + * @throws Timeout Timeout exceeded + * @throws Interrupted interrupt() called + */ + virtual Value wait(double timeout) =0; + + //! wait(double) without a timeout + Value wait() { + return wait(99999999.0); + } + + //! Queue an interruption of a wait() or wait(double) call. + virtual void interrupt() =0; }; //! Handle for monitor subscription @@ -175,6 +207,13 @@ public: const Config& config() const; /** Request the present value of a PV + * + * @code + * Context ctxt(...); + * auto result = ctxt.get("pv:name") + * .exec() + * .wait(); + * @endcode * * @code * Context ctxt(...); @@ -183,6 +222,7 @@ public: * std::cout<wait(5.0); + + testEq(result["value"].as(), 42); + } + void testWait() { client::Result actual; @@ -217,8 +233,9 @@ void testError(bool phase) MAIN(testget) { - testPlan(13); + testPlan(14); logger_config_env(); + Tester().testWaiter(); Tester().loopback(); Tester().lazy(); Tester().timeout();