From 583ee684abc6fae3fdbc4358f827b36e4e41d50a Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 15 Dec 2019 12:04:50 -0800 Subject: [PATCH] generalize Get/Put/RPC handling --- src/dataencode.cpp | 5 + src/pvaproto.h | 5 + src/pvxs/server.h | 167 +++++--------- src/server.cpp | 36 +--- src/serverchan.cpp | 57 ++++- src/serverconn.cpp | 20 +- src/serverconn.h | 24 ++- src/serverget.cpp | 456 ++++++++++++++++++++++++++++----------- src/serverintrospect.cpp | 94 +++++--- test/dummyserv.cpp | 111 ++++++---- 10 files changed, 594 insertions(+), 381 deletions(-) diff --git a/src/dataencode.cpp b/src/dataencode.cpp index 7c2907d..edbdfac 100644 --- a/src/dataencode.cpp +++ b/src/dataencode.cpp @@ -29,6 +29,11 @@ namespace impl { void to_wire(Buffer& buf, const FieldDesc* cur) { + if(!cur) { + to_wire(buf, uint8_t(0xff)); + return; + } + // we assume FieldDesc* is valid (checked on creation) to_wire(buf, cur->code.code); diff --git a/src/pvaproto.h b/src/pvaproto.h index 9eda0a9..355f6d9 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -348,6 +348,11 @@ struct Status { std::string trace; inline bool isSuccess() const { return code==Ok || code==Warn; } + + static inline Status error(const std::string& msg, const std::string& trace = std::string()) + { + return Status{Error, msg, trace}; + } }; inline diff --git a/src/pvxs/server.h b/src/pvxs/server.h index a9300e8..4aa1431 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -25,7 +25,6 @@ struct ServerConn; } namespace server { -struct Handler; struct Source; /** PV Access protocol server instance @@ -109,38 +108,70 @@ private: std::shared_ptr pvt; }; -struct OpBase { +struct PVXS_API OpBase { + enum op_t { + None, + Info, + Get, + Put, + RPC, + }; +protected: + std::string _peerName; + std::string _ifaceName; + std::string _name; + op_t _op; +public: //! The Client endpoint address in "X.X.X.X:Y" format. - const std::string peerName; + const std::string& peerName() const { return _peerName; } //! The local endpoint address in "X.X.X.X:Y" format. - const std::string ifaceName; + const std::string& ifaceName() const { return _ifaceName; } //! The Channel name - const std::string name; + const std::string& name() const { return _name; } + op_t op() const { return _op; } // TODO credentials - OpBase(const std::string& peerName, - const std::string& iface, - const std::string& name); virtual ~OpBase() =0; }; +struct PVXS_API ExecOp : public OpBase { + + virtual void reply() =0; + virtual void reply(const Value& val) =0; + virtual void error(const std::string& msg) =0; + + virtual void onCancel(std::function&&) =0; + + virtual ~ExecOp(); +}; + +struct PVXS_API ConnectOp : public OpBase { + Value pvRequest; + + virtual void connect(const Value& prototype) =0; + virtual void error(const std::string& msg) =0; + + virtual ~ConnectOp(); + + virtual void onGet(std::function&&)>&& fn) =0; + virtual void onPut(std::function&&, Value&&)>&& fn) =0; + virtual void onClose(std::function&&) =0; +}; + /** Manipulate an active Channel, and any in-progress Operations through it. * */ struct PVXS_API ChannelControl : public OpBase { - ChannelControl(const std::string& peerName, - const std::string& iface, - const std::string& name) - :OpBase (peerName, iface, name) - {} virtual ~ChannelControl() =0; - //! Set/replace Handler associated with this Channel - //! If called from outside a Handler method, blocks until in-progress Handler methods have returned. - virtual std::shared_ptr setHandler(const std::shared_ptr& h) =0; + //! invoked when a new GET, PUT, or RPC Operation is requested through this Channel + virtual void onOp(std::function&&)>&& ) =0; + virtual void onRPC(std::function&&, Value&&)>&& fn)=0; + + virtual void onClose(std::function&&) =0; //! Force disconnection - //! If called from outside a Handler method, blocks until in-progress Handler methods have returned. + //! If called from outside a handler method, blocks until in-progress Handler methods have returned. //! Reference to currently attached Handler is released. virtual void close() =0; @@ -199,108 +230,6 @@ struct PVXS_API Source { virtual void onCreate(std::unique_ptr&& op) =0; }; -//! Token for an in-progress request for Channel data type information. -struct PVXS_API Introspect : public OpBase -{ - //! Positive reply. Only the type of the provided Value is used. Any field values are ignored. - virtual void reply(const Value& prototype) =0; - //! Negative reply w/ error message - virtual void error(const std::string& msg) =0; - - Introspect(const std::string& peerName, - const std::string& iface, - const std::string& name) - :OpBase (peerName, iface, name) - {} - virtual ~Introspect() =0; -}; - -struct PVXS_API Get : public OpBase -{ - const Value request; - - //! Define (and communicate) the type of this Get operation. - virtual void connect(const Value& prototype, - std::function&& onExec) =0; - //! Positive reply w/ data - virtual void reply(const Value& data) =0; - //! Negative reply w/ error message - virtual void error(const std::string& msg) =0; - - Get(const std::string& peerName, - const std::string& iface, - const std::string& name, - const Value& request) - :OpBase (peerName, iface, name) - ,request(request) - {} - virtual ~Get() =0; -}; - -struct PVXS_API Put : public OpBase -{ - const Value request; - const Value value; - - //! Positive reply - virtual void complete() =0; - //! Negative reply w/ error message - virtual void error(const std::string& msg) =0; - - Put(const std::string& peerName, - const std::string& iface, - const std::string& name, - const Value& request, - const Value& value) - :OpBase (peerName, iface, name) - ,request(request) - ,value(value) - {} - virtual ~Put() =0; -}; - -struct PVXS_API RPC : public OpBase -{ - const Value request; - const Value value; - - //! Positive reply w/ data - virtual void reply(const Value& prototype) =0; - //! Negative reply w/ error message - virtual void error(const std::string& msg) =0; - - RPC(const std::string& peerName, - const std::string& iface, - const std::string& name, - const Value& request, - const Value& value) - :OpBase (peerName, iface, name) - ,request(request) - ,value(value) - {} - virtual ~RPC() =0; -}; - -/** Requests for a particular Channel are dispatched through me. - * - * User code will sub-class. - */ -struct PVXS_API Handler { - virtual ~Handler(); - - /** Request for Channel data type information - * - * Ownership of the Introspect instance is passed to the callee. - * The request will be implicitly errored if the callee allows - * the Introspect to be deleted prior to replying. - */ - virtual void onIntrospect(std::unique_ptr&& op); - - virtual void onGet(std::unique_ptr&& op); - virtual void onPut(std::unique_ptr&& op); - virtual void onRPC(std::unique_ptr&& op); -}; - }} // namespace pvxs::server #endif // PVXS_SERVER_H diff --git a/src/server.cpp b/src/server.cpp index 56a77d2..14d9244 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -597,41 +597,11 @@ void Server::Pvt::doBeaconsS(evutil_socket_t fd, short evt, void *raw) Source::~Source() {} -OpBase::OpBase(const std::string& peerName, - const std::string& iface, - const std::string& name) - :peerName(peerName) - ,ifaceName(iface) - ,name(name) -{} - OpBase::~OpBase() {} ChannelControl::~ChannelControl() {} -Introspect::~Introspect() {} -Get::~Get() {} -Put::~Put() {} -RPC::~RPC() {} +ConnectOp::~ConnectOp() {} +ExecOp::~ExecOp() {} -Handler::~Handler() {} - -void Handler::onIntrospect(std::unique_ptr&& op) -{ - op->error("Not Implemented"); -} -void Handler::onGet(std::unique_ptr&& op) -{ - op->error("Not Implemented"); -} -void Handler::onPut(std::unique_ptr&& op) -{ - op->error("Not Implemented"); -} -void Handler::onRPC(std::unique_ptr &&op) -{ - op->error("Not Implemented");} - -} - -} // namespace pvxs::server +}} // namespace pvxs::server diff --git a/src/serverchan.cpp b/src/serverchan.cpp index bf8d6c0..fbbb78f 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -33,25 +33,60 @@ ServerChan::ServerChan(const std::shared_ptr &conn, ServerChan::~ServerChan() {} ServerChannelControl::ServerChannelControl(const std::shared_ptr &conn, const std::shared_ptr& channel) - :server::ChannelControl(conn->peerName, conn->iface->name, channel->name) - ,server(conn->iface->server->internal_self) + :server(conn->iface->server->internal_self) ,chan(channel) -{} +{ + _op = None; + _name = channel->name; + _peerName = conn->peerName; + _ifaceName = conn->iface->name; +} ServerChannelControl::~ServerChannelControl() {} -std::shared_ptr ServerChannelControl::setHandler(const std::shared_ptr &h) +void ServerChannelControl::onOp(std::function&&)>&& fn) { - std::shared_ptr ret(h); - std::shared_ptr serv(server); + auto serv = server.lock(); + if(!serv) + return; - serv->acceptor_loop.call([this, &ret](){ - std::shared_ptr ch(chan); + serv->acceptor_loop.call([this, &fn](){ + auto ch = chan.lock(); + if(!ch) + return; - ch->handler.swap(ret); + ch->onOp = std::move(fn); }); +} - return ret; +void ServerChannelControl::onRPC(std::function&&, Value&&)>&& fn) +{ + auto serv = server.lock(); + if(!serv) + return; + + serv->acceptor_loop.call([this, &fn](){ + auto ch = chan.lock(); + if(!ch) + return; + + ch->onRPC = std::move(fn); + }); +} + +void ServerChannelControl::onClose(std::function&& fn) +{ + auto serv = server.lock(); + if(!serv) + return; + + serv->acceptor_loop.call([this, &fn](){ + auto ch = chan.lock(); + if(!ch) + return; + + ch->onClose = std::move(fn); + }); } static @@ -246,7 +281,7 @@ void ServerConn::handle_CREATE_CHANNEL() for(auto& pair : iface->server->sources) { try { pair.second->onCreate(std::move(op)); - if(!op || chan->handler || chan->state!=ServerChan::Creating) { + if(!op || chan->onOp || chan->onClose || chan->state!=ServerChan::Creating) { claimed = chan->state==ServerChan::Creating; log_printf(connsetup, PLVL_DEBUG, "Client %s %s channel to %s through %s\n", peerName.c_str(), claimed?"accepted":"rejected", name.c_str(), pair.first.second.c_str()); diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 5f1f244..1b57fb2 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -190,12 +190,6 @@ void ServerConn::handle_AUTHNZ() // ignored (so far no auth plugin actually uses) } -void ServerConn::handle_PUT() -{} - -void ServerConn::handle_RPC() -{} - void ServerConn::handle_PUT_GET() {} @@ -225,6 +219,9 @@ void ServerConn::handle_CANCEL_REQUEST() if(op->state==ServerOp::Executing) { op->state = ServerOp::Idle; + if(op->onCancel) + op->onCancel(); + } else { // an allowed race log_printf(connsetup, PLVL_DEBUG, "Client %s Cancel of non-executing Op\n", peerName.c_str()); @@ -251,9 +248,12 @@ void ServerConn::handle_DESTROY_REQUEST() peerName.c_str(), unsigned(sid), unsigned(ioid)); } else { - it->second->state = ServerOp::Dead; - // TODO interface to notify Op of cancel/destroy + auto op = it->second; opByIOID.erase(it); + op->state = ServerOp::Dead; + + if(op->onClose) + op->onClose(""); } } @@ -424,6 +424,8 @@ void ServerConn::bevRead() #undef CASE } // handlers may be cleared bev to force disconnect + if(!bev) + break; // silently drain any unprocessed body (forward compatibility) if(auto n = evbuffer_get_length(segBuf.get())) @@ -532,6 +534,4 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s ServerOp::~ServerOp() {} -void ServerOp::cancel() {} - }} // namespace pvxs::impl diff --git a/src/serverconn.h b/src/serverconn.h index ad459fb..6b1d8c9 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -33,6 +33,9 @@ struct ServerOp const uint32_t ioid; + std::function onClose; + std::function onCancel; + enum state_t { Creating, Idle, @@ -41,9 +44,9 @@ struct ServerOp } state; ServerOp(const std::weak_ptr& chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {} + ServerOp(const ServerOp&) = delete; + ServerOp& operator=(const ServerOp&) = delete; virtual ~ServerOp() =0; - - virtual void cancel(); }; struct ServerChannelControl : public server::ChannelControl @@ -51,7 +54,10 @@ struct ServerChannelControl : public server::ChannelControl ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan); virtual ~ServerChannelControl(); - virtual std::shared_ptr setHandler(const std::shared_ptr &h) override final; + virtual void onOp(std::function&&)>&& fn) override final; + virtual void onRPC(std::function&&, Value&&)>&& fn) override final; + + virtual void onClose(std::function&& fn) override final; virtual void close() override final; const std::weak_ptr server; @@ -66,12 +72,14 @@ struct ServerChan const std::string name; enum { - Creating, - Active, - Destroy, + Creating, // CREATE_CHANNEL request received, reply not sent + Active, // reply sent + Destroy, // DESTROY_CHANNEL request received and/or reply sent } state; - std::shared_ptr handler; + std::function&&)> onOp; + std::function&&, Value&&)> onRPC; + std::function onClose; std::map > opByIOID; // our subset of ServerConn::opByIOID @@ -133,6 +141,8 @@ private: CASE(MESSAGE); #undef CASE + void handle_GPR(pva_app_msg_t cmd); + void cleanup(); void bevEvent(short events); void bevRead(); diff --git a/src/serverget.cpp b/src/serverget.cpp index c654d4d..4b59e04 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -15,147 +15,271 @@ DEFINE_LOGGER(connsetup, "tcp.setup"); DEFINE_LOGGER(connio, "tcp.io"); namespace { -struct ServerGet : public ServerOp + +// generalized Get/Put/RPC +struct ServerGPR : public ServerOp { - ServerGet(const std::shared_ptr& chan, uint32_t ioid) + ServerGPR(const std::shared_ptr& chan, uint32_t ioid) :ServerOp(chan, ioid) {} - virtual ~ServerGet() {} + virtual ~ServerGPR() {} + void doReply(const std::shared_ptr& type, + const Value& value, + const std::string& msg) + { + auto ch = chan.lock(); + if(!ch) + return; + auto conn = ch->conn.lock(); + if(!conn || !conn->bev) + return; + + if(state==Dead || state==Idle) { + // no warn if Idle as this may result from a remote Cancel + return; + } + + if(type && this->type) + throw std::logic_error("Operation already connected (has a type)"); + if(cmd!=CMD_PUT && this->type && (!value || Value::Helper::desc(value)!=this->type.get())) + throw std::logic_error("PUT Must reply with exact type previously passed to connect()"); + + Status sts{}; + if(!msg.empty()) + sts = Status::error(msg); + + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + to_wire(R, uint32_t(ioid)); + to_wire(R, subcmd); + to_wire(R, sts); + + if(!sts.isSuccess()) { + // error() + + if(state==Executing) + state = Idle; + else // Creating + state = Dead; + + } else if(state==Creating) { + // connect() + if(cmd!=CMD_RPC) { + this->type = type; + to_wire(R, type.get()); + } + state = Idle; + + } else if(state==Executing) { + if(cmd==CMD_GET || (cmd==CMD_PUT && (subcmd&0x40))) { + to_wire_valid(R, value); // GET and PUT/Get reply with bitmask and partial value + + } else if(cmd==CMD_RPC) { + auto type = Value::Helper::desc(value); + to_wire(R, type); + if(value) + to_wire_full(R, value); + } + state = lastRequest ? Dead : Idle; + + } else { + assert(false); + } + assert(R.good()); + } + + conn->enqueueTxBody(cmd); + + if(state == ServerOp::Dead) { + ch->opByIOID.erase(ioid); + auto it = conn->opByIOID.find(ioid); + if(it!=conn->opByIOID.end()) { + auto self(it->second); + conn->opByIOID.erase(it); + + conn->iface->server->acceptor_loop.dispatch([self](){ + self->onClose(""); + }); + + } else { + assert(false); // really shouldn't happen + } + conn->opByIOID.erase(ioid); + } + } + + pva_app_msg_t cmd; + uint8_t subcmd; // valid when state==Executing or Creating bool lastRequest=false; - std::function onExec; + + std::shared_ptr type; + + std::function onClose; + std::function&&, Value&&)> onPut; + + std::function&&)> onGet; }; -struct ServerGetControl : public server::Get + +struct ServerGPRConnect : public server::ConnectOp { - ServerGetControl(ServerConn* conn, + ServerGPRConnect(ServerConn* conn, const std::weak_ptr& server, const std::string& name, const Value& request, - const std::weak_ptr& op) - :server::Get(conn->peerName, conn->iface->name, name, request) - ,server(server) + const std::weak_ptr& op) + :server(server) ,op(op) - {} - virtual ~ServerGetControl() { - error("Implict Cancel"); + { + _op = Info; + _name = name; + _peerName = conn->peerName; + _ifaceName = conn->iface->name; + } + virtual ~ServerGPRConnect() { + error("Op Create implied error"); } - virtual void connect(const Value& prototype, - std::function&& onExec) override final + virtual void connect(const Value& prototype) override final { - if(!onExec || !prototype) - throw std::logic_error("connect() requires prototype and onExec()"); - action(ServerOp::Creating, prototype, std::string(), std::move(onExec)); + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &prototype](){ + if(auto oper = op.lock()) { + if(oper->state!=ServerOp::Creating) + return; + + if(!prototype && oper->cmd!=CMD_RPC) + throw std::invalid_argument("Must provide prototype"); + + std::shared_ptr type; + if(prototype) + type = Value::Helper::type(prototype); + + oper->doReply(type, Value(), std::string()); + } + }); + } + virtual void error(const std::string& msg) override final + { + if(msg.empty()) + throw std::invalid_argument("Must provide error message"); + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &msg](){ + if(auto oper = op.lock()) { + if(oper->state==ServerOp::Creating) + oper->doReply(nullptr, Value(), msg); + } + }); } - virtual void reply(const Value& value) override final + virtual void onGet(std::function&&)>&& fn) override final { - if(!value) - throw std::logic_error("reply() requires Value"); - action(ServerOp::Executing, value, std::string(), nullptr); + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onGet = std::move(fn); + }); + } + virtual void onPut(std::function&&, Value&&)>&& fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onPut = std::move(fn); + }); + } + virtual void onClose(std::function&& fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onClose = std::move(fn); + }); + } + + const std::weak_ptr server; + const std::weak_ptr op; +}; + +struct ServerGPRExec : public server::ExecOp +{ + ServerGPRExec(ServerConn* conn, + const std::weak_ptr& server, + const std::string& name, + const Value& request, + const std::weak_ptr& op) + :server(server) + ,op(op) + { + _op = Info; + _name = name; + _peerName = conn->peerName; + _ifaceName = conn->iface->name; + } + virtual ~ServerGPRExec() {} + + virtual void reply() override final + { + reply(Value()); + } + + virtual void reply(const Value& val) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &val](){ + if(auto oper = op.lock()) { + oper->doReply(nullptr, val, std::string()); + } + }); } virtual void error(const std::string& msg) override final { - action(ServerOp::Dead, Value(), msg, nullptr); - } - - void action(ServerOp::state_t action, - const Value& value, - const std::string& msg, - std::function&& onExec) - { - + if(msg.empty()) + throw std::invalid_argument("Must provide error message"); auto serv = server.lock(); if(!serv) return; - - serv->acceptor_loop.call([this, action, &value, &msg, &onExec](){ - auto oper = op.lock(); - if(!oper || oper->state == ServerOp::Dead) - return; - auto chan = oper->chan.lock(); - if(!chan) - return; - auto conn = chan->conn.lock(); - if(!conn || !conn->bev) - return; - - Status sts{}; - uint8_t cmd = oper->state==ServerOp::Creating ? 0x08 : oper->lastRequest ? 0x50 : 0x40; - - if(action==ServerOp::Dead) { - // error() - sts.code = Status::Error; - sts.msg = msg; - sts.trace = ""; // TODO - - if(oper->state == ServerOp::Executing) - oper->state = ServerOp::Idle; - else - oper->state = ServerOp::Dead; - log_printf(connsetup, PLVL_DEBUG, "CLient %s Get error\n", peerName.c_str()); - - } else if(oper->state == ServerOp::Creating && action==ServerOp::Creating) { - // connect() - type = Value::Helper::type(value); - oper->onExec = std::move(onExec); - oper->state = ServerOp::Idle; - log_printf(connsetup, PLVL_DEBUG, "CLient %s Get connected\n", peerName.c_str()); - - } else if(oper->state == ServerOp::Executing && action==ServerOp::Executing) { - // reply() - if(type.get()!=Value::Helper::desc(value)) - throw std::logic_error("Can't reply() w/ type change"); - log_printf(connsetup, PLVL_DEBUG, "CLient %s Get complete\n", peerName.c_str()); - - oper->state = oper->lastRequest ? ServerOp::Dead : ServerOp::Idle; - - } else { - log_printf(connsetup, PLVL_DEBUG, "Client %s Get operation not possible %d %d\n", - peerName.c_str(), action, oper->state); - return; - } - - { - (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); - - EvOutBuf R(hostBE, conn->txBody.get()); - to_wire(R, uint32_t(oper->ioid)); - to_wire(R, cmd); - to_wire(R, sts); - if(sts.code!=Status::Ok) { - // error() - - } else if(action==ServerOp::Creating) { - // connect() - to_wire(R, type.get()); // type - - } else { - // reply() - to_wire_valid(R, value); - - } - } - - conn->enqueueTxBody(CMD_GET); - - if(oper->state == ServerOp::Dead) { - conn->opByIOID.erase(oper->ioid); - chan->opByIOID.erase(oper->ioid); + serv->acceptor_loop.call([this, &msg](){ + if(auto oper = op.lock()) { + oper->doReply(nullptr, Value(), msg); } }); + } + virtual void onCancel(std::function&& fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onCancel = std::move(fn); + }); } const std::weak_ptr server; - const std::weak_ptr op; - std::shared_ptr type; + const std::weak_ptr op; }; + } // namespace -void ServerConn::handle_GET() +void ServerConn::handle_GPR(pva_app_msg_t cmd) { EvInBuf M(peerBE, segBuf.get(), 16); @@ -168,13 +292,21 @@ void ServerConn::handle_GET() Status reply{}; - if(subcmd&0x08) { // INIT - Value pvRequest; + // subcmd bitmask + // 0x08 - Init + // 0x10 - Destroy + // 0x40 - Get + // 0x00 - context dependent. for CMD_GET the same as 0x40, for CMD_PUT and CMD_RPC the opposite of Get + bool isput = cmd!=CMD_GET && !(subcmd&0x40); + if(subcmd&0x08) { // INIT + // type and full value + Value pvRequest; from_wire_type_value(M, rxRegistry, pvRequest); if(!M.good()) { - log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid Get INIT\n", peerName.c_str()); + log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid op=%x/%x INIT\n", + peerName.c_str(), cmd, subcmd); bev.reset(); return; } @@ -187,9 +319,11 @@ void ServerConn::handle_GET() return; } - std::shared_ptr op(new ServerGet(chan, ioid)); - std::unique_ptr ctrl(new ServerGetControl(this, iface->server->internal_self, chan->name, pvRequest, op)); + std::shared_ptr op(new ServerGPR(chan, ioid)); + op->cmd = cmd; + std::unique_ptr ctrl(new ServerGPRConnect(this, iface->server->internal_self, chan->name, pvRequest, op)); + op->subcmd = subcmd; op->state = ServerOp::Creating; opByIOID[ioid] = op; @@ -199,35 +333,94 @@ void ServerConn::handle_GET() peerName.c_str(), unsigned(ioid), std::string(SB()<handler) - chan->handler->onGet(std::move(ctrl)); + if(cmd==CMD_RPC) { + ctrl->connect(Value()); - } else { // EXEC should be 0x40 however, some clients are lax - // no additional message fields - if(!M.good()) { - log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid Get EXEC\n", peerName.c_str()); - bev.reset(); - return; + } else if(chan->onOp) { // GET, PUT + chan->onOp(std::move(ctrl)); } - std::shared_ptr op; + } else { // EXEC, maybe Get or Put + + std::shared_ptr op; auto it = opByIOID.find(ioid); - if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast(it->second))) { + if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast(it->second))) { log_printf(connio, PLVL_ERR, "Client %s Gets %s IOID %u\n", peerName.c_str(), it==opByIOID.end() ? "non-existant" : "invalid", unsigned(ioid)); bev.reset(); return; } - op->lastRequest = subcmd&0x10; + if(cmd!=CMD_RPC && !op->type) { + log_printf(connsetup, PLVL_ERR, "Client %s tries to Exec to early\n", peerName.c_str()); + bev.reset(); + return; + } + + Value val; + if(cmd==CMD_RPC) { + // type and full value + from_wire_type_value(M, rxRegistry, val); + + } else if(isput) { + // bitmask and partial value + val = Value::Helper::build(op->type); + from_wire_valid(M, rxRegistry, val); + } + + if(!M.good()) { + log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid op=%x/%x Get\n", + peerName.c_str(), cmd, subcmd); + bev.reset(); + return; + } + + auto chan = op->chan.lock(); + if(!chan) + throw std::logic_error("live op on dead channel"); if(op->state==ServerOp::Idle) { // all set + if(!op->lastRequest) + op->lastRequest = subcmd&0x10; + + std::unique_ptr ctrl{new ServerGPRExec(this, iface->server->internal_self, chan->name, val, op)}; + + op->subcmd = subcmd; op->state = ServerOp::Executing; log_printf(connsetup, PLVL_DEBUG, "CLient %s Get executing\n", peerName.c_str()); - op->onExec(); // notify + + try { + if(cmd==CMD_RPC && isput) { + if(chan->onRPC) + chan->onRPC(std::move(ctrl), std::move(val)); + else + ctrl->error("RPC Not Implemented"); + + } else if(cmd==CMD_PUT && isput) { + if(op->onPut) + op->onPut(std::move(ctrl), std::move(val)); + else + ctrl->error("PUT Not Implemented"); + + } else if(cmd!=CMD_RPC && !isput) { + if(op->onGet) + op->onGet(std::move(ctrl)); + else + ctrl->error("GET Not Implemented"); + + } else { + log_printf(connsetup, PLVL_ERR, "Client %s Get exec in incorrect command %d\n", + peerName.c_str(), subcmd); + } + } catch(std::exception& e) { + log_printf(connsetup, PLVL_ERR, "Client %s Unhandled exception in onGet/Put/RPC %s : %s\n", + peerName.c_str(), typeid(e).name(), e.what()); + if(ctrl) + ctrl->error(e.what()); + } } else { log_printf(connsetup, PLVL_ERR, "CLient %s Get exec in incorrect state %d\n", @@ -237,4 +430,19 @@ void ServerConn::handle_GET() } +void ServerConn::handle_GET() +{ + handle_GPR(CMD_GET); +} + +void ServerConn::handle_PUT() +{ + handle_GPR(CMD_PUT); +} + +void ServerConn::handle_RPC() +{ + handle_GPR(CMD_RPC); +} + }} // namespace pvxs::impl diff --git a/src/serverintrospect.cpp b/src/serverintrospect.cpp index 14ef199..fefc612 100644 --- a/src/serverintrospect.cpp +++ b/src/serverintrospect.cpp @@ -20,23 +20,54 @@ struct ServerIntrospect : public ServerOp :ServerOp(chan, ioid) {} virtual ~ServerIntrospect() {} + + void doReply(const FieldDesc* type, const Status& sts) + { + if(state != ServerOp::Executing) + return; + auto ch = chan.lock(); + if(!ch) + return; + auto conn = ch->conn.lock(); + if(!conn || !conn->bev) + return; + + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + to_wire(R, uint32_t(ioid)); + to_wire(R, sts); + if(type) + to_wire(R, type); + } + + conn->enqueueTxBody(CMD_GET_FIELD); + + state = ServerOp::Dead; + conn->opByIOID.erase(ioid); + ch->opByIOID.erase(ioid); + } }; -struct ServerIntrospectControl : public server::Introspect +struct ServerIntrospectControl : public server::ConnectOp { - ServerIntrospectControl(ServerConn* conn, + ServerIntrospectControl(ServerConn *conn, ServerChan *chan, const std::weak_ptr& server, - const std::string& name, const std::weak_ptr& op) - :server::Introspect(conn->peerName, conn->iface->name, name) - ,server(server) + :server(server) ,op(op) - {} + { + _op = Info; + _name = chan->name; + _peerName = conn->peerName; + _ifaceName = conn->iface->name; + } virtual ~ServerIntrospectControl() { error("Implict Cancel"); } - virtual void reply(const Value& prototype) override final + virtual void connect(const Value& prototype) override final { auto desc = Value::Helper::desc(prototype); if(!desc) @@ -58,34 +89,27 @@ struct ServerIntrospectControl : public server::Introspect return; // soft fail if already completed, cancelled, disconnected, .... serv->acceptor_loop.call([this, type, &sts](){ - auto oper = op.lock(); - if(!oper || oper->state != ServerOp::Executing) - return; - auto chan = oper->chan.lock(); - if(!chan) - return; - auto conn = chan->conn.lock(); - if(!conn || !conn->bev) - return; - - { - (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); - - EvOutBuf R(hostBE, conn->txBody.get()); - to_wire(R, uint32_t(oper->ioid)); - to_wire(R, sts); - if(type) - to_wire(R, type); - } - - conn->enqueueTxBody(CMD_GET_FIELD); - - oper->state = ServerOp::Dead; - conn->opByIOID.erase(oper->ioid); - chan->opByIOID.erase(oper->ioid); + if(auto oper = op.lock()) + oper->doReply(type, sts); }); } + virtual void onClose(std::function&& fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + if(auto chan = oper->chan.lock()) + chan->onClose = std::move(fn); + }); + } + + // we'll never use these, so no reason to store + virtual void onGet(std::function&& fn)>&& fn) override final {} + virtual void onPut(std::function&& fn, Value&&)>&& fn) override final {} + const std::weak_ptr server; const std::weak_ptr op; }; @@ -114,15 +138,15 @@ void ServerConn::handle_GET_FIELD() } std::shared_ptr op(new ServerIntrospect(chan, ioid)); - std::unique_ptr ctrl(new ServerIntrospectControl(this, iface->server->internal_self, chan->name, op)); + std::unique_ptr ctrl(new ServerIntrospectControl(this, chan.get(), iface->server->internal_self, op)); op->state = ServerOp::Executing; // this is a one-shot operation opByIOID[ioid] = op; chan->opByIOID[ioid] = op; - if(chan->handler) - chan->handler->onIntrospect(std::move(ctrl)); + if(chan->onOp) + chan->onOp(std::move(ctrl)); } }} // namespace pvxs::impl diff --git a/test/dummyserv.cpp b/test/dummyserv.cpp index 274cc12..20e968b 100644 --- a/test/dummyserv.cpp +++ b/test/dummyserv.cpp @@ -11,6 +11,8 @@ #include #include +#include +#include #include #include @@ -18,6 +20,7 @@ #include #include +#include "utilpvt.h" namespace { using namespace pvxs; @@ -25,50 +28,22 @@ using namespace pvxs::server; DEFINE_LOGGER(dummy,"dummyserv"); -struct DummyHandler : public Handler +struct DummySource : public Source { - static const Value mytype; - static std::atomic count; - + const std::string name; + epicsMutex lock; Value current; - DummyHandler() - :current(mytype.cloneEmpty()) + explicit DummySource(const std::string& name) + :name(name) + ,current(nt::NTScalar{TypeCode::Int32}.build().create()) { epicsTimeStamp now; epicsTimeGetCurrent(&now); - current["value"] = count.fetch_add(1); + current["value"] = 0u; current["timeStamp.secondsPastEpoch"] = now.secPastEpoch+POSIX_TIME_AT_EPICS_EPOCH; current["timeStamp.nanoseconds"] = now.nsec; } - virtual ~DummyHandler() {} - - virtual void onIntrospect(std::unique_ptr &&op) override final - { - log_printf(dummy, PLVL_INFO, "GET_FIELD\n"); - op->reply(mytype); - } - - virtual void onGet(std::unique_ptr&& op) override final - { - log_printf(dummy, PLVL_INFO, "Create GET\n"); - std::shared_ptr sop(std::move(op)); - sop->connect(current, [this, sop]() - { - log_printf(dummy, PLVL_INFO, "Execute GET\n"); - // executing - sop->reply(current); // "current" never changes for us, so no locking - }); - } -}; - - -const Value DummyHandler::mytype = nt::NTScalar{TypeCode::Int32}.build().create(); -std::atomic DummyHandler::count{}; - -struct DummySource : public Source -{ - std::set names; virtual ~DummySource() {} // Source interface @@ -76,7 +51,7 @@ public: virtual void onSearch(Search &search) override final { for(auto& op : search) { - if(names.find(op.name())!=names.end()) { + if(op.name()==name) { log_printf(dummy, PLVL_INFO, "Claiming '%s'\n", op.name()); op.claim(); } else { @@ -84,10 +59,63 @@ public: } } } - virtual void onCreate(std::unique_ptr&& op) override final + virtual void onCreate(std::unique_ptr&& raw) override final { - log_printf(dummy, PLVL_INFO, "Create '%s'\n", op->name.c_str()); - op->setHandler(std::unique_ptr{new DummyHandler}); + if(raw->name()!=name) + return; + + std::shared_ptr chan(std::move(raw)); + + log_printf(dummy, PLVL_INFO, "Create '%s'\n", chan->name().c_str()); + + // callback when client creating Get/Put + chan->onOp([this, chan](std::shared_ptr&& raw){ + std::shared_ptr conn(std::move(raw)); + + log_printf(dummy, PLVL_INFO, "Begin Operation on '%s'\n", chan->name().c_str()); + + conn->onGet([this, chan](std::unique_ptr&& raw) { + // client executing Get or Put + log_printf(dummy, PLVL_INFO, "Exec Get on '%s'\n", chan->name().c_str()); + + { + epicsGuard G(lock); + raw->reply(current); + } + }); + + conn->onPut([this, chan](std::unique_ptr&& raw, Value&& top) { + log_printf(dummy, PLVL_INFO, "Exec Put on '%s'\n", chan->name().c_str()); + + { + epicsTimeStamp now; + epicsTimeGetCurrent(&now); + + epicsGuard G(lock); + + current["value"] = top["value"].as(); + current["timeStamp.secondsPastEpoch"] = now.secPastEpoch+POSIX_TIME_AT_EPICS_EPOCH; + current["timeStamp.nanoseconds"] = now.nsec; + } + + raw->reply(); // inform client that Put was successful + }); + + epicsGuard G(lock); + conn->connect(current); // only type is used + }); + + // callback when client executing RPC + chan->onRPC([this, chan](std::unique_ptr&& raw, Value&& top) { + log_printf(dummy, PLVL_INFO, "Begin RPC on '%s' with %s\n", chan->name().c_str(), + std::string(SB()<reply(ret); + }); } }; @@ -97,11 +125,10 @@ int main(int argc, char *argv[]) { int ret = 0; try { - pvxs::logger_level_set("dummy", PLVL_INFO); + pvxs::logger_level_set("dummyserv", PLVL_INFO); pvxs::logger_config_env(); - std::shared_ptr src(new DummySource); - src->names.emplace("blah"); + auto src = std::make_shared("blah"); auto serv = Server::Config::from_env() .build()