diff --git a/src/Makefile b/src/Makefile index eb0743e..a11fc25 100644 --- a/src/Makefile +++ b/src/Makefile @@ -78,6 +78,7 @@ LIB_SRCS += sharedpv.cpp LIB_SRCS += client.cpp LIB_SRCS += clientconn.cpp LIB_SRCS += clientintrospect.cpp +LIB_SRCS += clientget.cpp LIB_LIBS += Com diff --git a/src/client.cpp b/src/client.cpp index 861d3eb..bd9cd9d 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -70,7 +71,10 @@ void Channel::createOperations() ioid = conn->nextIOID++; } while(conn->opByIOID.find(ioid)!=conn->opByIOID.end()); - conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(ioid, op))); + //conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(sid, ioid, op))); + conn->opByIOID.emplace(std::piecewise_construct, + std::forward_as_tuple(ioid), + std::forward_as_tuple(sid, ioid, op)); op->ioid = ioid; op->createOp(); @@ -85,8 +89,9 @@ OperationBase::OperationBase(operation_t op, const std::shared_ptr& cha OperationBase::~OperationBase() {} -RequestInfo::RequestInfo(uint32_t ioid, std::shared_ptr& handle) - :ioid(ioid) +RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr& handle) + :sid(sid) + ,ioid(ioid) ,op(handle->op) ,handle(handle) {} diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 37a5a94..b118f6a 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -73,6 +73,21 @@ void Connection::createChannels() } } +void Connection::sendDestroyRequest(uint32_t sid, uint32_t ioid) +{ + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(hostBE, txBody.get()); + + to_wire(R, uint16_t(1u)); + to_wire(R, sid); + to_wire(R, ioid); + } + enqueueTxBody(CMD_DESTROY_CHANNEL); + +} + void Connection::bevEvent(short events) { ConnBase::bevEvent(events); @@ -347,6 +362,8 @@ void Connection::handle_DESTROY_CHANNEL() self = std::move(chan->conn); context->searchBuckets[context->currentBucket].push_back(chan); + // TODO: disconnect Operations + log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n", peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid)); } diff --git a/src/clientget.cpp b/src/clientget.cpp new file mode 100644 index 0000000..12904a0 --- /dev/null +++ b/src/clientget.cpp @@ -0,0 +1,398 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include "clientimpl.h" + +namespace pvxs { +namespace client { + +DEFINE_LOGGER(setup, "pvxs.client.setup"); +DEFINE_LOGGER(io, "pvxs.client.io"); + +namespace { + +struct GPROp : public OperationBase +{ + std::function builder; + std::function done; + Value pvRequest; + Value rpcarg; + Result result; + bool getOput = false; + + enum state_t : uint8_t { + Connecting, // waiting for an active Channel + Creating, // waiting for reply to INIT + GetOPut, // waiting for reply to GET (CMD_PUT only) + BuildPut, // waiting for PUT builder callback + Exec, // waiting for reply to EXEC + Done, + } state = Connecting; + + GPROp(operation_t op, const std::shared_ptr& chan) + :OperationBase (op, chan) + {} + ~GPROp() { + cancel(); + } + + void notify() { + try { + if(done) + done(std::move(result)); + } catch(std::exception& e) { + if(chan && chan->conn) + log_err_printf(io, "Server %s channel %s error in result cb : %s\n", + chan->conn->peerName.c_str(), chan->name.c_str(), e.what()); + + // keep first error (eg. from put builder) + if(!result.error()) + result = Result(std::current_exception()); + } + } + + virtual void cancel() override final + { + auto context = chan->context; + decltype (done) junk; + context->tcp_loop.call([this, &junk](){ + if(state==GetOPut || state==Exec) { + chan->conn->sendDestroyRequest(chan->sid, ioid); + + // This opens up a race with an in-flight reply. + chan->conn->opByIOID.erase(ioid); + } + state = Done; + chan.reset(); + junk = std::move(done); + // leave opByIOID for GC + }); + } + + virtual void createOp() override final + { + if(state!=Connecting) { + return; + } + + auto& conn = chan->conn; + + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + + to_wire(R, chan->sid); + to_wire(R, ioid); + to_wire(R, uint8_t(0x08)); // INIT + to_wire(R, Value::Helper::desc(pvRequest)); + to_wire_full(R, pvRequest); + } + conn->enqueueTxBody(pva_app_msg_t(uint8_t(op))); + + log_debug_printf(io, "Server %s channel '%s' op%02x INIT\n", + conn->peerName.c_str(), chan->name.c_str(), op); + + state = Creating; + } + + virtual void disconnected(const std::shared_ptr &self) override final + { + if(state==Connecting || state==Done) { + // noop + + } else if(state==Creating || state==GetOPut || (state==Exec && op==Get)) { + // return to pending + + chan->pending.push_back(self); + state = Connecting; + + } else if(state==Exec) { + // can't restart as server side-effects may occur + state = Done; + result = Result(std::make_exception_ptr(Disconnect())); + + notify(); + + } else { + throw std::logic_error("GPR Disconnect unexpected state"); + } + } +}; + +} // namespace + +void Connection::handle_GPR(pva_app_msg_t cmd) +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t ioid; + uint8_t subcmd; + Status sts; + Value data; // hold prototype (INIT) or reply data (GET) + + from_wire(M, ioid); + from_wire(M, subcmd); + from_wire(M, sts); + bool init = subcmd&0x08; + bool get = subcmd&0x40; + + // immediately deserialize in unambigous cases + + if(M.good() && cmd!=CMD_RPC && init && sts.isSuccess()) { + // INIT of PUT or GET, decode type description + + from_wire_type(M, rxRegistry, data); + + } else if(M.good() && cmd==CMD_RPC && !init && sts.isSuccess()) { + // RPC reply + + from_wire_type(M, rxRegistry, data); + if(data) + from_wire_full(M, rxRegistry, data); + } + + // need type info from INIT reply to decode PUT/GET + + RequestInfo* info=nullptr; + if(M.good()) { + auto it = opByIOID.find(ioid); + if(it!=opByIOID.end()) { + info = &it->second; + + } else { + auto lvl = Level::Debug; + if(cmd!=CMD_RPC && !init) { + // We don't have enough information to decode the rest of the payload. + // This *may* leave rxRegistry out of sync (if it contains Variant Unions). + // We can't know whether this is the case. + // Failing soft here may lead to failures decoding future replies. + // We could force close the Connection here to be "safe". + // However, we assume the such usage of Variant is relatively rare + + lvl = Level::Err; + } + + log_printf(io, lvl, "Server %s uses non-existant IOID %u. Ignoring...\n", + peerName.c_str(), unsigned(ioid)); + return; + } + + if(cmd!=CMD_RPC && init && sts.isSuccess()) { + // INIT of PUT or GET, store type description + info->prototype = data; + + } else if(M.good() && cmd!=CMD_RPC && !init && sts.isSuccess()) { + // GET/PUT reply + + data = info->prototype.cloneEmpty(); + if(data) + from_wire_full(M, rxRegistry, data); + } + } + + // validate received message against operation state + + std::shared_ptr op; + GPROp* gpr = nullptr; + if(M.good() && info) { + op = info->handle.lock(); + if(!op) { + // assume op has already sent CMD_DESTROY_REQUEST + log_debug_printf(io, "Server %s ignoring stake cmd%02x ioid %u\n", + peerName.c_str(), cmd, unsigned(ioid)); + return; + } + + if(uint8_t(op->op)!=cmd) { + // peer mixes up IOID and operation type + M.fault(); + + } else { + gpr = static_cast(op.get()); + + // check that subcmd is as expected based on operation state + if((gpr->state==GPROp::Creating) ^ init) { + M.fault(); + + } else if(gpr->state==GPROp::GetOPut && !get) { + M.fault(); + + } else if(gpr->state!=GPROp::Exec) { + M.fault(); + } + } + } + + if(!M.good() || !gpr) { + log_crit_printf(io, "Server %s sends invalid op%02x. Disconnecting...\n", peerName.c_str(), cmd); + bev.reset(); + return; + } + + // advance operation state + + decltype (gpr->state) prev = gpr->state; + + if(!sts.isSuccess()) { + gpr->result = Result(std::make_exception_ptr(RemoteError(sts.msg))); + gpr->state = GPROp::Done; + + } else if(gpr->state==GPROp::Creating) { + + if(cmd==CMD_PUT && gpr->getOput) { + gpr->state = GPROp::GetOPut; + + } else if(cmd==CMD_PUT && !gpr->getOput) { + gpr->state = GPROp::BuildPut; + + } else { + gpr->state = GPROp::Exec; + } + + } else if(gpr->state==GPROp::GetOPut) { + gpr->state = GPROp::BuildPut; + + info->prototype.assign(data); + + } else if(gpr->state==GPROp::Exec) { + gpr->state = GPROp::Done; + + if(cmd!=CMD_PUT) + gpr->result = Result(std::move(data)); + + } else { + // should be avoided above + throw std::logic_error("GPR advance state inconsistent"); + } + + // transient state (because builder callback is synchronous) + if(gpr->state==GPROp::BuildPut) { + Value arg(info->prototype.clone()); + + try { + info->prototype = gpr->builder(std::move(arg)); + gpr->state = GPROp::Exec; + + } catch(std::exception& e) { + gpr->result = Result(std::current_exception()); + gpr->state = GPROp::Done; + } + } + + log_debug_printf(io, "Server %s channel %s op%02x state %d -> %d\n", + peerName.c_str(), op->chan->name.c_str(), cmd, prev, gpr->state); + + // act on new operation state + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(hostBE, txBody.get()); + + to_wire(R, op->chan->sid); + to_wire(R, ioid); + if(gpr->state==GPROp::GetOPut) { + to_wire(R, 0x40); + + } else if(gpr->state==GPROp::Exec) { + to_wire(R, 0x00); + to_wire_valid(R, info->prototype); + + } else if(gpr->state==GPROp::Done) { + // we're actually building CMD_DESTROY_REQUEST + // nothing more needed + } + } + enqueueTxBody(gpr->state==GPROp::Done ? CMD_DESTROY_REQUEST : cmd); + + if(gpr->state==GPROp::Done) { + // CMD_DESTROY_REQUEST is not acknowledged (sigh...) + // but at this point a server should not send further GET/PUT/RPC w/ this IOID + // so we can ~safely forget about it. + // we might get CMD_MESSAGE, but these could be ignored with no ill effects. + opByIOID.erase(ioid); + + gpr->notify(); + } +} + +void Connection::handle_GET() { handle_GPR(CMD_GET); } +void Connection::handle_PUT() { handle_GPR(CMD_PUT); } +void Connection::handle_RPC() { handle_GPR(CMD_RPC); } + +std::shared_ptr Context::GetBuilder::_exec_get() +{ + std::shared_ptr ret; + assert(_get); + + pvt->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(pvt, _name); + + auto op = std::make_shared(Operation::Get, chan); + op->done = std::move(_result); + // TODO pvRequest + + chan->pending.push_back(op); + chan->createOperations(); + + ret = std::move(op); + }); + + return ret; +} + +std::shared_ptr Context::PutBuilder::exec() +{ + std::shared_ptr ret; + + if(!_builder) + throw std::logic_error("put() requires a builder()"); + + pvt->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(pvt, _name); + + auto op = std::make_shared(Operation::Put, chan); + op->done = std::move(_result); + op->builder = std::move(_builder); + op->getOput = _doGet; + // TODO pvRequest + + chan->pending.push_back(op); + chan->createOperations(); + + ret = std::move(op); + }); + + return ret; +} + +std::shared_ptr Context::RPCBuilder::exec() +{ + std::shared_ptr ret; + + pvt->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(pvt, _name); + + auto op = std::make_shared(Operation::Put, chan); + op->done = std::move(_result); + op->rpcarg = std::move(_argument); + // TODO pvRequest + + chan->pending.push_back(op); + chan->createOperations(); + + ret = std::move(op); + }); + + return ret; +} + +} // namespace client +} // namespace pvxs diff --git a/src/clientimpl.h b/src/clientimpl.h index 444448e..9df3501 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -37,13 +37,13 @@ struct OperationBase : public Operation }; struct RequestInfo { - const uint32_t ioid; + const uint32_t sid, ioid; const Operation::operation_t op; const std::weak_ptr handle; Value prototype; - RequestInfo(uint32_t ioid, std::shared_ptr& handle); + RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr& handle); }; struct Connection : public ConnBase { @@ -64,10 +64,12 @@ struct Connection : public ConnBase { uint32_t nextIOID = 0u; Connection(const std::shared_ptr& context, const SockAddr &peerAddr); - ~Connection(); + virtual ~Connection(); void createChannels(); + void sendDestroyRequest(uint32_t sid, uint32_t ioid); + virtual void bevEvent(short events) override final; virtual void cleanup() override final; @@ -79,9 +81,15 @@ struct Connection : public ConnBase { CASE(CREATE_CHANNEL); CASE(DESTROY_CHANNEL); + CASE(GET); + CASE(PUT); + //CASE(PUT_GET); + //CASE(MONITOR); + CASE(RPC); CASE(GET_FIELD); #undef CASE + void handle_GPR(pva_app_msg_t cmd); protected: void tickEcho(); static void tickEchoS(evutil_socket_t fd, short evt, void *raw); diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index b4d2f23..de2f360 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -41,9 +41,16 @@ struct InfoOp : public OperationBase auto context = chan->context; decltype (done) junk; context->tcp_loop.call([this, &junk](){ + if(state==Waiting) { + chan->conn->sendDestroyRequest(chan->sid, ioid); + + // This opens up a race with an in-flight reply. + chan->conn->opByIOID.erase(ioid); + } state = Done; chan.reset(); junk = std::move(done); + // leave opByIOID for GC }); } @@ -147,12 +154,11 @@ void Connection::handle_GET_FIELD() } } -std::shared_ptr Context::GetBuilder::exec() +std::shared_ptr Context::GetBuilder::_exec_info() { std::shared_ptr ret; - if(_get) - throw std::runtime_error("Get Not Implemented"); + assert(!_get); pvt->tcp_loop.call([&ret, this]() { auto chan = Channel::build(pvt, _name); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 16e64bc..bdfded2 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -80,11 +80,11 @@ private: //! Handle for in-progress operation struct PVXS_API Operation { const enum operation_t { - Info, - Get, - Put, - RPC, - Monitor, + Info = 17, // CMD_GET_FIELD + Get = 10, // CMD_GET + Put = 11, // CMD_PUT + RPC = 20, // CMD_RPC + Monitor = 13, // CMD_MONITOR } op; explicit constexpr Operation(operation_t op) :op(op) {} @@ -152,17 +152,22 @@ public: SubBuilder& server(const std::string& s) { _server = s; return _sb(); } }; - class GetBuilder : protected CommonBuilder { + class GetBuilder : public CommonBuilder { std::function _result; bool _get; + PVXS_API + std::shared_ptr _exec_info(); + PVXS_API + std::shared_ptr _exec_get(); public: GetBuilder(const std::shared_ptr& pvt, const std::string& name, bool get) :CommonBuilder{pvt,name}, _get(get) {} //! Callback through which result Value will be delivered GetBuilder& result(decltype (_result)&& cb) { _result = std::move(cb); return *this; } //! Initiate network operation. - PVXS_API - std::shared_ptr exec(); + inline std::shared_ptr exec() { + return _get ? _exec_get() : _exec_info(); + } friend struct Context::Pvt; }; @@ -173,8 +178,8 @@ public: * @code * Context ctxt(...); * auto op = ctxt.info("pv:name") - * .result([](Value&& prototype){ - * std::cout< { bool _doGet = true; std::function _builder; - std::function _result; + std::function _result; public: PutBuilder(const std::shared_ptr& pvt, const std::string& name) :CommonBuilder{pvt,name} {} + PutBuilder& result(decltype (_result)&& cb) { _result = std::move(cb); return *this; } PVXS_API std::shared_ptr exec(); @@ -197,9 +203,10 @@ public: struct RPCBuilder : protected CommonBuilder { Value _argument; - std::function _result; + std::function _result; public: RPCBuilder(const std::shared_ptr& pvt, const std::string& name, Value&& arg) :CommonBuilder{pvt,name}, _argument(std::move(arg)) {} + RPCBuilder& result(decltype (_result)&& cb) { _result = std::move(cb); return *this; } PVXS_API std::shared_ptr exec();