diff --git a/src/client.cpp b/src/client.cpp index e169be2..48d5b7e 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -26,6 +26,18 @@ constexpr size_t nBuckets = 30u; constexpr size_t maxSearchPayload = 0x4000; +Disconnect::Disconnect() + :std::runtime_error("Disconnected") +{} + +Disconnect::~Disconnect() {} + +RemoteError::RemoteError(const std::string& msg) + :std::runtime_error(msg) +{} + +RemoteError::~RemoteError() {} + Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) :context(context) ,name(name) diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 8923dac..37a5a94 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -123,6 +123,14 @@ void Connection::cleanup() log_debug_printf(io, "Server %s detach channel '%s' to re-search\n", peerName.c_str(), chan->name.c_str()); } + auto ops = std::move(opByIOID); + for (auto& pair : ops) { + auto op = pair.second.handle.lock(); + if(!op) + continue; + op->disconnected(op); + } + // paranoia pending.clear(); chanBySID.clear(); diff --git a/src/clientimpl.h b/src/clientimpl.h index a981e38..d37828d 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -33,6 +33,7 @@ struct OperationBase : public Operation virtual ~OperationBase(); virtual void createOp() =0; + virtual void disconnected(const std::shared_ptr& self) =0; }; struct RequestInfo { diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index caa22eb..b4d2f23 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -12,18 +12,19 @@ namespace pvxs { namespace client { +DEFINE_LOGGER(setup, "pvxs.client.setup"); DEFINE_LOGGER(io, "pvxs.client.io"); namespace { struct InfoOp : public OperationBase { - std::function done; + std::function done; Value result; enum state_t { - Connecting, - Waiting, + Connecting, // waiting for an active Channel + Waiting, // waiting for reply to GET_INFO Done, } state = Connecting; @@ -38,9 +39,11 @@ struct InfoOp : public OperationBase virtual void cancel() override final { auto context = chan->context; - context->tcp_loop.call([this](){ + decltype (done) junk; + context->tcp_loop.call([this, &junk](){ state = Done; chan.reset(); + junk = std::move(done); }); } @@ -68,6 +71,17 @@ struct InfoOp : public OperationBase state = Waiting; } + + virtual void disconnected(const std::shared_ptr& self) override final + { + // Do nothing when Connecting or Done + if(state==Waiting) { + // return to pending + + chan->pending.push_back(self); + state = Connecting; + } + } }; } // namespace @@ -77,12 +91,13 @@ void Connection::handle_GET_FIELD() EvInBuf M(peerBE, segBuf.get(), 16); uint32_t ioid=0u; - Status sts; + Status sts{Status::Fatal}; Value prototype; from_wire(M, ioid); from_wire(M, sts); - from_wire_type(M, rxRegistry, prototype); + if(sts.isSuccess()) + from_wire_type(M, rxRegistry, prototype); if(!M.good()) { log_crit_printf(io, "Server %s sends invalid GET_FIELD. Disconnecting...\n", peerName.c_str()); @@ -115,7 +130,17 @@ void Connection::handle_GET_FIELD() if(info->done) { auto done = std::move(info->done); - done(std::move(prototype)); + Result res; + if(sts.isSuccess()) { + res = Result(std::move(prototype)); + } else { + res = Result(std::make_exception_ptr(RemoteError(sts.msg))); + } + try { + done(std::move(res)); + }catch(std::exception& e){ + log_err_printf(setup, "Unhandled exception %s in Info result() callback: %s\n", typeid (e).name(), e.what()); + } } else { info->result = prototype; diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 427c41c..2c03909 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -6,6 +6,7 @@ #ifndef PVXS_CLIENT_H #define PVXS_CLIENT_H +#include #include #include #include @@ -22,6 +23,38 @@ namespace client { class Context; struct Config; +struct PVXS_API Disconnect : public std::runtime_error +{ + Disconnect(); + virtual ~Disconnect(); +}; + +struct PVXS_API RemoteError : public std::runtime_error +{ + RemoteError(const std::string& msg); + virtual ~RemoteError(); +}; + +//! Holder for a Value or an exception +class Result { + Value _result; + std::exception_ptr _error; +public: + Result() = default; + Result(Value&& val) :_result(std::move(val)) {} + explicit Result(const std::exception_ptr& err) :_error(err) {} + + //! Access to the Value, or rethrow the exception + Value& operator()() { + if(_error) + std::rethrow_exception(_error); + return _result; + } + + bool error() const { return !!_error; } + explicit operator bool() const { return _result || _error; } +}; + //! builder for pvRequest blob struct PVXS_API Request { @@ -120,7 +153,7 @@ public: }; class GetBuilder : protected CommonBuilder { - std::function _result; + std::function _result; bool _get; public: GetBuilder(const std::shared_ptr& pvt, const std::string& name, bool get) :CommonBuilder{pvt,name}, _get(get) {} diff --git a/test/testinfo.cpp b/test/testinfo.cpp index e177cd2..4472291 100644 --- a/test/testinfo.cpp +++ b/test/testinfo.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include namespace { @@ -39,12 +40,12 @@ struct Tester { void testWait() { - Value actual; + client::Result actual; epicsEvent done; auto op = cli.info("mailbox") - .result([&actual, &done](Value&& val) { - actual = std::move(val); + .result([&actual, &done](client::Result&& result) { + actual = std::move(result); done.trigger(); }) .exec(); @@ -52,7 +53,7 @@ struct Tester { cli.hurryUp(); if(testOk1(done.wait(5.0))) { - testEq(actual["value"].type(), TypeCode::Int32); + testEq(actual()["value"].type(), TypeCode::Int32); } else { testSkip(1, "timeout"); } @@ -91,14 +92,14 @@ struct Tester { { testShow()<<__func__; - Value actual; + client::Result actual; epicsEvent done; // server not started auto op = cli.info("mailbox") - .result([&actual, &done](Value&& val) { - actual = std::move(val); + .result([&actual, &done](client::Result&& result) { + actual = std::move(result); done.trigger(); }) .exec(); @@ -112,15 +113,15 @@ struct Tester { { testShow()<<__func__; - Value actual; + client::Result actual; epicsEvent done; serv.start(); // not storing Operation -> immediate cancel() cli.info("mailbox") - .result([&actual, &done](Value&& val) { - actual = std::move(val); + .result([&actual, &done](client::Result&& result) { + actual = std::move(result); done.trigger(); }) .exec(); @@ -131,15 +132,67 @@ struct Tester { } }; +struct ErrorSource : public server::Source +{ + virtual void onSearch(Search &op) override final + { + for(auto& name : op) { + name.claim(); + } + } + virtual void onCreate(std::unique_ptr &&op) override final + { + auto chan = std::move(op); + + chan->onOp([](std::unique_ptr&& op) { + op->error("haha"); + }); + } +}; + +void testError() +{ + testShow()<<__func__; + + auto serv = server::Config::localhost() + .build() + .addSource("err", std::make_shared()) + .start(); + + auto cli = serv.clientConfig().build(); + + client::Result actual; + epicsEvent done; + + auto op = cli.info("mailbox") + .result([&actual, &done](client::Result&& result) { + actual = std::move(result); + done.trigger(); + }) + .exec(); + + cli.hurryUp(); + + if(testOk1(done.wait(5.0))) { + testThrows([&actual]() { + actual(); + }); + + } else { + testSkip(1, "timeout"); + } +} + } // namespace MAIN(testinfo) { - testPlan(6); + testPlan(8); logger_config_env(); Tester().loopback(); Tester().lazy(); Tester().timeout(); Tester().cancel(); + testError(); return testDone(); } diff --git a/tools/info.cpp b/tools/info.cpp index 5423daf..a4259be 100644 --- a/tools/info.cpp +++ b/tools/info.cpp @@ -79,8 +79,8 @@ int main(int argc, char *argv[]) for(auto n : range(optind, argc)) { ops.push_back(ctxt.info(argv[n]) - .result([&argv, n, &remaining, &done](Value&& prototype) { - std::cout<