From 708fbc806237c51a30e55ae576fc68eeaf898e3b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 30 Dec 2020 10:43:15 -0800 Subject: [PATCH] client: add *Builder::syncCancel(bool) Option for asynchronous cancel (eg. implicitly through ~Operation). --- src/client.cpp | 2 +- src/clientget.cpp | 31 ++++++++++++++++--------------- src/clientimpl.h | 4 ++++ src/clientintrospect.cpp | 24 ++++++++++++------------ src/clientmon.cpp | 24 ++++++++++++------------ src/pvxs/client.h | 10 ++++++++++ test/testget.cpp | 29 ++++++++++++++++++++++++++++- test/testinfo.cpp | 29 ++++++++++++++++++++++++++++- test/testmon.cpp | 33 ++++++++++++++++++++++++++++++++- 9 files changed, 143 insertions(+), 43 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 4a75b94..e889169 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -910,7 +910,7 @@ void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw) Context::Pvt::Pvt(const Config& conf) :loop("PVXCTCP", epicsThreadPriorityCAServerLow) - ,impl(std::make_shared(conf, loop)) + ,impl(std::make_shared(conf, loop.internal())) {} Context::Pvt::~Pvt() diff --git a/src/clientget.cpp b/src/clientget.cpp index a2dba98..cd2f797 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -134,8 +134,8 @@ struct GPROp : public OperationBase :OperationBase (op, loop) {} ~GPROp() { - loop.assertInLoop(); - _cancel(true); + if(loop.assertInRunningLoop()) + _cancel(true); } void setDone(decltype (done)&& donecb, decltype (onInit)&& initcb) @@ -530,22 +530,23 @@ void Connection::handle_RPC() { handle_GPR(CMD_RPC); } static std::shared_ptr gpr_setup(const std::shared_ptr& context, std::string name, // need to capture by value - const std::shared_ptr& op) + const std::shared_ptr& op, + bool syncCancel) { auto internal(op); - std::shared_ptr external(internal.get(), [internal](GPROp*) mutable { + std::shared_ptr external(internal.get(), [internal, syncCancel](GPROp*) mutable { // (maybe) user thread - auto loop(internal->loop); + auto temp(std::move(internal)); + auto loop(temp->loop); // std::bind for lack of c++14 generalized capture // to move internal ref to worker for dtor - loop.call(std::bind([](std::shared_ptr& op) { - // on worker + loop.tryInvoke(syncCancel, std::bind([](std::shared_ptr& op) { + // on worker - // ordering of dispatch()/call() ensures creation before destruction - assert(op->chan); - op->_cancel(true); }, std::move(internal))); - assert(!internal); - internal.reset(); + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); + }, std::move(temp))); }); context->tcp_loop.dispatch([internal, context, name]() { @@ -573,7 +574,7 @@ std::shared_ptr GetBuilder::_exec_get() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op); + return gpr_setup(context, _name, op, _syncCancel); } std::shared_ptr PutBuilder::exec() @@ -603,7 +604,7 @@ std::shared_ptr PutBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op); + return gpr_setup(context, _name, op, _syncCancel); } std::shared_ptr RPCBuilder::exec() @@ -626,7 +627,7 @@ std::shared_ptr RPCBuilder::exec() op->autoExec = _autoexec; op->pvRequest = _buildReq(); - return gpr_setup(context, _name, op); + return gpr_setup(context, _name, op, _syncCancel); } } // namespace client diff --git a/src/clientimpl.h b/src/clientimpl.h index b8862f0..78f1347 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -268,7 +268,11 @@ struct ContextImpl : public std::enable_shared_from_this }; struct Context::Pvt { + // external ref to running loop. + // impl directly, and indirectly, contains internal refs +private: evbase loop; +public: std::shared_ptr impl; INST_COUNTER(ClientPvt); diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index fdee3f6..57aad6a 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -36,8 +36,8 @@ struct InfoOp : public OperationBase virtual ~InfoOp() { - loop.assertInLoop(); - _cancel(true); + if(loop.assertInRunningLoop()) + _cancel(true); } virtual bool cancel() override final { @@ -192,20 +192,20 @@ std::shared_ptr GetBuilder::_exec_info() }; } - std::shared_ptr external(op.get(), [op](InfoOp*) mutable { + auto syncCancel(_syncCancel); + std::shared_ptr external(op.get(), [op, syncCancel](InfoOp*) mutable { // from user thread - auto loop(op->loop); + auto temp(std::move(op)); + auto loop(temp->loop); // std::bind for lack of c++14 generalized capture // to move internal ref to worker for dtor - loop.call(std::bind([](std::shared_ptr& op) { - // on worker + loop.tryInvoke(syncCancel, std::bind([](std::shared_ptr& op) { + // on worker - // ordering of dispatch()/call() ensures creation before destruction - assert(op->chan); - op->_cancel(true); - }, std::move(op))); - assert(!op); - op.reset(); + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); + }, std::move(temp))); }); auto name(std::move(_name)); diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 8cead55..f6a42bb 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -74,8 +74,8 @@ struct SubscriptionImpl : public OperationBase, public Subscription ,ackTick(event_new(loop.base, -1, EV_TIMEOUT, &tickAckS, this)) {} virtual ~SubscriptionImpl() { - loop.assertInLoop(); - _cancel(true); + if(loop.assertInRunningLoop()) + _cancel(true); } virtual const std::string& _name() override final { @@ -604,20 +604,20 @@ std::shared_ptr MonitorBuilder::exec() op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize)); - std::shared_ptr external(op.get(), [op](SubscriptionImpl*) mutable { + auto syncCancel(_syncCancel); + std::shared_ptr external(op.get(), [op, syncCancel](SubscriptionImpl*) mutable { // from user thread - auto loop(op->loop); + auto temp(std::move(op)); + auto loop(temp->loop); // std::bind for lack of c++14 generalized capture // to move internal ref to worker for dtor - loop.call(std::bind([](std::shared_ptr& op) { - // on worker + loop.tryInvoke(syncCancel, std::bind([](std::shared_ptr& op) { + // on worker - // ordering of dispatch()/call() ensures creation before destruction - assert(op->chan); - op->_cancel(true); - }, std::move(op))); - assert(!op); - op.reset(); + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); + }, std::move(temp))); }); auto name(std::move(_name)); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index f83cd1f..b8e0bc2 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -480,6 +480,7 @@ protected: std::shared_ptr req; unsigned _prio = 0u; bool _autoexec = true; + bool _syncCancel = true; CommonBase() = default; CommonBase(const std::shared_ptr& ctx, const std::string& name) : ctx(ctx), _name(name) {} @@ -564,6 +565,15 @@ public: // control whether operations automatically proceed from INIT to EXEC // cf. reExec() SubBuilder& autoExec(bool b) { this->_autoexec = b; return _sb(); } + + /** Controls whether Operation::cancel() and Subscription::cancel() synchronize. + * + * When true (the default) explicit or implicit cancel blocks until any + * in progress callback has completed. This makes safe some use of + * references in callbacks. + * @since UNRELEASED + */ + SubBuilder& syncCancel(bool b) { this->_syncCancel = b; return _sb(); } }; } // namespace detail diff --git a/test/testget.cpp b/test/testget.cpp index 31a8ed0..5453fc3 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -273,6 +273,32 @@ struct Tester { testOk1(!done.wait(2.1)); } + void asyncCancel() + { + testShow()<<__func__; + + struct info_t { + client::Result actual; + epicsEvent done; + }; + auto info(std::make_shared()); + + serv.start(); + + // not storing Operation -> immediate cancel() + cli.info("mailbox") + .syncCancel(false) + .result([info](client::Result&& result) { + info->actual = std::move(result); + info->done.signal(); + }) + .exec(); + + cli.hurryUp(); + + testOk1(!info->done.wait(2.1)); + } + void orphan() { testShow()<<__func__; @@ -395,7 +421,7 @@ void testError(bool phase) MAIN(testget) { - testPlan(51); + testPlan(52); testSetup(); logger_config_env(); Tester().testConnector(); @@ -404,6 +430,7 @@ MAIN(testget) Tester().lazy(); Tester().timeout(); Tester().cancel(); + Tester().asyncCancel(); Tester().orphan(); Tester().manualExec(); testError(false); diff --git a/test/testinfo.cpp b/test/testinfo.cpp index aaed0d3..8e5aa53 100644 --- a/test/testinfo.cpp +++ b/test/testinfo.cpp @@ -150,6 +150,32 @@ struct Tester { testOk1(!done.wait(2.1)); } + void asyncCancel() + { + testShow()<<__func__; + + struct info_t { + client::Result actual; + epicsEvent done; + }; + auto info(std::make_shared()); + + serv.start(); + + // not storing Operation -> immediate cancel() + cli.info("mailbox") + .syncCancel(false) + .result([info](client::Result&& result) { + info->actual = std::move(result); + info->done.signal(); + }) + .exec(); + + cli.hurryUp(); + + testOk1(!info->done.wait(2.1)); + } + void orphan() { testShow()<<__func__; @@ -217,13 +243,14 @@ void testError() MAIN(testinfo) { - testPlan(12); + testPlan(13); testSetup(); logger_config_env(); Tester().loopback(); Tester().lazy(); Tester().timeout(); Tester().cancel(); + Tester().asyncCancel(); Tester().orphan(); testError(); return testDone(); diff --git a/test/testmon.cpp b/test/testmon.cpp index c409127..0bfb973 100644 --- a/test/testmon.cpp +++ b/test/testmon.cpp @@ -95,6 +95,35 @@ struct BasicTest { cli = client::Context(); op.reset(); } + + void cancel() + { + testShow()<<__func__; + epicsEvent done; + + cli.monitor("nonexistent") + .onInit([&done](const Value&) { + done.signal(); + }) + .exec(); + + testOk1(!done.wait(1.1)); + } + + void asyncCancel() + { + testShow()<<__func__; + auto done(std::make_shared()); + + cli.monitor("nonexistent") + .syncCancel(false) + .onInit([done](const Value&) { + done->signal(); + }) + .exec(); + + testOk1(!done->wait(1.1)); + } }; struct TestLifeCycle : public BasicTest @@ -261,10 +290,12 @@ struct TestReconn : public BasicTest MAIN(testmon) { - testPlan(22); + testPlan(24); testSetup(); logger_config_env(); BasicTest().orphan(); + BasicTest().cancel(); + BasicTest().asyncCancel(); TestLifeCycle().testBasic(true); TestLifeCycle().testBasic(false); TestLifeCycle().testSecond();