From 33b6f362db4b766893cabaaf91b0c49757d23a33 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 27 Jul 2020 11:41:07 -0700 Subject: [PATCH] client: Handle orphaned Operations Allow Operation instances to outlive the Context through which they were created. --- src/client.cpp | 5 +++-- src/clientget.cpp | 6 ++++-- src/clientintrospect.cpp | 6 ++++-- src/clientmon.cpp | 6 ++++-- src/evhelper.cpp | 2 ++ src/evhelper.h | 10 +++++++--- test/testget.cpp | 14 +++++++++++++- test/testinfo.cpp | 12 ++++++++++++ test/testmon.cpp | 14 +++++++++++++- test/testput.cpp | 14 ++++++++++++++ test/testrpc.cpp | 12 ++++++++++++ 11 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 7d161b7..665a0c1 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -412,10 +412,11 @@ void Context::Pvt::close() conns.clear(); chans.clear(); - assert(internal_self.use_count()==1); + // internal_self.use_count() may be >1 if + // we are orphaning some Operations }); - tcp_loop.join(); + tcp_loop.sync(); // ensure any in-progress callbacks have completed manager.sync(); diff --git a/src/clientget.cpp b/src/clientget.cpp index c57598f..eb6b864 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -456,9 +456,11 @@ static void gpr_cleanup(std::shared_ptr& ret, std::shared_ptr&& op) { auto cap(std::move(op)); - ret.reset(cap.get(), [cap](Operation*) mutable { + auto loop(cap->chan->context->tcp_loop); + ret.reset(cap.get(), [cap, loop](Operation*) mutable { + auto L(std::move(loop)); // from use thread - cap->chan->context->tcp_loop.call([&cap]() { + L.call([&cap]() { auto temp(std::move(cap)); // on worker try { diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 6a66a99..8c43007 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -192,10 +192,12 @@ std::shared_ptr GetBuilder::_exec_info() chan->pending.push_back(op); chan->createOperations(); - ret.reset(op.get(), [op](Operation*) mutable { + auto loop(op->chan->context->tcp_loop); + ret.reset(op.get(), [op, loop](Operation*) mutable { // on user thread auto temp(std::move(op)); - temp->chan->context->tcp_loop.call([&temp]() { + auto L(std::move(loop)); + L.call([&temp]() { // on worker try { temp->_cancel(true); diff --git a/src/clientmon.cpp b/src/clientmon.cpp index d623abd..da86d85 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -563,10 +563,12 @@ std::shared_ptr MonitorBuilder::exec() chan->pending.push_back(op); chan->createOperations(); - ret.reset(op.get(), [op](Subscription*) mutable { + auto loop(op->chan->context->tcp_loop); + ret.reset(op.get(), [op, loop](Subscription*) mutable { // on user thread auto temp(std::move(op)); - temp->chan->context->tcp_loop.call([&temp]() { + auto L(std::move(loop)); + L.call([&temp]() { // on worker try { temp->_cancel(true); diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 4e927f2..776234e 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -122,6 +122,8 @@ struct evbase::Pvt : public epicsThreadRunable epicsThread worker; bool running = true; + INST_COUNTER(evbase); + Pvt(const std::string& name, unsigned prio) :worker(*this, name.c_str(), epicsThreadGetStackSize(epicsThreadStackBig), diff --git a/src/evhelper.h b/src/evhelper.h index 7b4f8be..0bcfe76 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -60,6 +60,9 @@ struct owned_ptr : public std::unique_ptr }; struct PVXS_API evbase { + evbase() = default; + evbase(const evbase&) = default; + evbase(evbase&&) = default; explicit evbase(const std::string& name, unsigned prio=0); ~evbase(); void join(); @@ -75,12 +78,13 @@ struct PVXS_API evbase { void assertInLoop(); bool inLoop(); - INST_COUNTER(evbase); + inline void reset() { pvt.reset(); } + private: struct Pvt; - std::unique_ptr pvt; + std::shared_ptr pvt; public: - event_base* const base; + event_base* base = nullptr; }; typedef owned_ptr evevent; diff --git a/test/testget.cpp b/test/testget.cpp index a1e0112..b1de0cb 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -45,7 +45,7 @@ struct Tester { ~Tester() { - if(cli.use_count()!=1u) + if(cli.use_count()>1u) testAbort("Tester Context leak"); } @@ -173,6 +173,17 @@ struct Tester { testOk1(!done.wait(2.1)); } + + void orphan() + { + testShow()<<__func__; + + auto op = cli.get("nonexistent").exec(); + + // clear Context to orphan in-progress operation + cli = client::Context(); + op.reset(); + } }; struct ErrorSource : public server::Source @@ -253,6 +264,7 @@ MAIN(testget) Tester().lazy(); Tester().timeout(); Tester().cancel(); + Tester().orphan(); testError(false); testError(true); cleanup_for_valgrind(); diff --git a/test/testinfo.cpp b/test/testinfo.cpp index 3e873ea..aaed0d3 100644 --- a/test/testinfo.cpp +++ b/test/testinfo.cpp @@ -149,6 +149,17 @@ struct Tester { testOk1(!done.wait(2.1)); } + + void orphan() + { + testShow()<<__func__; + + auto op = cli.info("nonexistent").exec(); + + // clear Context to orphan in-progress operation + cli = client::Context(); + op.reset(); + } }; struct ErrorSource : public server::Source @@ -213,6 +224,7 @@ MAIN(testinfo) Tester().lazy(); Tester().timeout(); Tester().cancel(); + Tester().orphan(); testError(); return testDone(); } diff --git a/test/testmon.cpp b/test/testmon.cpp index 788fc80..c409127 100644 --- a/test/testmon.cpp +++ b/test/testmon.cpp @@ -48,7 +48,7 @@ struct BasicTest { ~BasicTest() { - if(cli.use_count()!=1u) + if(cli.use_count()>1u) testAbort("Tester Context leak"); } @@ -84,6 +84,17 @@ struct BasicTest { } return ret; } + + void orphan() + { + testShow()<<__func__; + + auto op = cli.monitor("nonexistent").exec(); + + // clear Context to orphan in-progress operation + cli = client::Context(); + op.reset(); + } }; struct TestLifeCycle : public BasicTest @@ -253,6 +264,7 @@ MAIN(testmon) testPlan(22); testSetup(); logger_config_env(); + BasicTest().orphan(); TestLifeCycle().testBasic(true); TestLifeCycle().testBasic(false); TestLifeCycle().testSecond(); diff --git a/test/testput.cpp b/test/testput.cpp index c68dcd9..5d2cb3e 100644 --- a/test/testput.cpp +++ b/test/testput.cpp @@ -174,6 +174,19 @@ struct Tester : public TesterBase testOk1(!done.wait(2.1)); } + + void orphan() + { + testShow()<<__func__; + + auto op = cli.put("nonexistent") + .set("value", "foo") + .exec(); + + // clear Context to orphan in-progress operation + cli = client::Context(); + op.reset(); + } }; struct TestPutBuilder : public TesterBase @@ -326,6 +339,7 @@ MAIN(testput) Tester().lazy(); Tester().timeout(); Tester().cancel(); + Tester().orphan(); TestPutBuilder().testSet(); testRO(); testError(); diff --git a/test/testrpc.cpp b/test/testrpc.cpp index 8d17796..cf1e34b 100644 --- a/test/testrpc.cpp +++ b/test/testrpc.cpp @@ -201,6 +201,17 @@ struct Tester { testEq(result["query.a"].as(), 5); testEq(result["query.b"].as(), "hello"); } + + void orphan() + { + testShow()<<__func__; + + auto op = cli.rpc("nonexistent").exec(); + + // clear Context to orphan in-progress operation + cli = client::Context(); + op.reset(); + } }; } // namespace @@ -216,6 +227,7 @@ MAIN(testrpc) Tester().cancel(); Tester().error(); Tester().builder(); + Tester().orphan(); cleanup_for_valgrind(); return testDone(); }