From 9ceab63d0282258ffdda86cdd1c4c915ff98ea18 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 19 Nov 2019 15:51:36 -0800 Subject: [PATCH] Introspect can error --- src/Makefile | 1 + src/pvxs/server.h | 5 ++ src/serverchan.cpp | 63 ++++++++++++--------- src/serverconn.cpp | 103 ++++++++++++++++++++++++++------- src/serverconn.h | 17 ++++-- src/serverintrospect.cpp | 119 +++++++++++++++++++++++++++++++++++++++ test/dummyserv.cpp | 5 ++ 7 files changed, 261 insertions(+), 52 deletions(-) create mode 100644 src/serverintrospect.cpp diff --git a/src/Makefile b/src/Makefile index e65bdca..1cc1bd2 100644 --- a/src/Makefile +++ b/src/Makefile @@ -48,6 +48,7 @@ LIB_SRCS += udp_collector.cpp LIB_SRCS += server.cpp LIB_SRCS += serverconn.cpp LIB_SRCS += serverchan.cpp +LIB_SRCS += serverintrospect.cpp LIB_LIBS += Com diff --git a/src/pvxs/server.h b/src/pvxs/server.h index b7a7578..07085a2 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -261,6 +261,11 @@ struct PVXS_API Introspect : public OpBase virtual void error(const std::string& msg) =0; // void success(Data); + Introspect(const std::string& peerName, + const std::string& iface, + const std::string& name) + :OpBase (peerName, iface, name) + {} virtual ~Introspect() =0; }; diff --git a/src/serverchan.cpp b/src/serverchan.cpp index 53dcacd..92ae897 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -287,36 +287,47 @@ void ServerConn::handle_DestroyChan() from_wire(M, sid); from_wire(M, cid); + if(!M.good()) + throw std::runtime_error("Decode error in DestroyChan"); auto it = chanBySID.find(sid); - if(M.good() && it!=chanBySID.end()) { - { - auto& chan = it->second; - if(chan->cid!=cid) { - log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(), - unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str()); - } - } - - auto n = chanByCID.erase(cid); - assert(n==1); - - chanBySID.erase(it); - assert(it->second.use_count()==1); // we only take transient refs on this thread - // ServerChannel is delete'd - - { - auto tx = bufferevent_get_output(bev.get()); - constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; - EvOutBuf R(be, tx); - to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8}); - to_wire(R, sid); - to_wire(R, cid); - } - - } else { + if(it==chanBySID.end()) { log_printf(connsetup, PLVL_DEBUG, "Client %s DestroyChan non-existant sid=%d cid=%d\n", peerName.c_str(), unsigned(sid), unsigned(cid)); + return; + } + + + auto chan = it->second; + if(chan->cid!=cid) { + log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(), + unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str()); + } + + { + auto n = chanByCID.erase(cid); + assert(n==1); + } + + chanBySID.erase(it); + + // cleanup operations + + for(auto& pair : chan->opByIOID) { + auto n = opByIOID.erase(pair.second->ioid); + assert(n==1); + } + + assert(chan.use_count()==1); // we only take transient refs on this thread + // ServerChannel is delete'd + + { + auto tx = bufferevent_get_output(bev.get()); + constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + EvOutBuf R(be, tx); + to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8}); + to_wire(R, sid); + to_wire(R, cid); } if(!M.good()) diff --git a/src/serverconn.cpp b/src/serverconn.cpp index d29c954..accd094 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -27,6 +27,8 @@ DEFINE_LOGGER(connsetup, "tcp.setup"); // related to low level send/recv DEFINE_LOGGER(connio, "tcp.io"); +DEFINE_LOGGER(remote, "tcp.log"); + ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen) :iface(iface) ,peerAddr(peer, socklen) @@ -90,6 +92,13 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * ServerConn::~ServerConn() {} +const std::shared_ptr& ServerConn::lookupSID(uint32_t sid) +{ + auto it = chanBySID.find(sid); + if(it==chanBySID.end()) + throw std::runtime_error(SB()<<"Client "<second; +} void ServerConn::handle_Echo() { @@ -184,39 +193,89 @@ void ServerConn::handle_PutGetOp() {} void ServerConn::handle_CancelOp() -{} - -void ServerConn::handle_DestroyOp() -{} - -void ServerConn::handle_Introspect() { - // aka. GetField - EvInBuf M(peerBE, segBuf.get(), 16); - uint32_t sid = -1, ioid = -1; - std::string subfield; - + uint32_t sid=0, ioid=0; from_wire(M, sid); from_wire(M, ioid); - from_wire(M, subfield); - Status sts{Status::Ok}; + if(!M.good()) + throw std::runtime_error("Error decoding DestroyOp"); - auto it = chanBySID.find(sid); - - if(M.good() && it!=chanBySID.end() && opByIOID.find(ioid)==opByIOID.end()) { - - } else { - log_printf(connio, PLVL_DEBUG, "Client %s invalid GetField\n", peerName.c_str()); + auto it = opByIOID.find(ioid); + if(it==opByIOID.end()) { + log_printf(connsetup, PLVL_WARN, "Client %s Cancel of non-existant Op\n", peerName.c_str()); + return; } + const auto& op = it->second; + auto chan = op->chan.lock(); + if(!chan || chan->sid!=sid) { + log_printf(connsetup, PLVL_ERR, "Client %s Cancel inconsistent Op\n", peerName.c_str()); + return; + } + + if(op->state==ServerOp::Executing) { + op->state = ServerOp::Idle; + + } else { + log_printf(connsetup, PLVL_WARN, "Client %s Cancel of non-executing Op\n", peerName.c_str()); + } +} + +void ServerConn::handle_DestroyOp() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t sid=0, ioid=0; + from_wire(M, sid); + from_wire(M, ioid); if(!M.good()) - bev.reset(); + throw std::runtime_error("Error decoding DestroyOp"); + + auto& chan = lookupSID(sid); + + auto n = opByIOID.erase(ioid); + n += chan->opByIOID.erase(ioid); + if(n!=2) { + log_printf(connsetup, PLVL_WARN, "Client %s can't destroy non-existant op %u:%u\n", + peerName.c_str(), unsigned(sid), unsigned(ioid)); + } } void ServerConn::handle_Message() -{} +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t ioid = 0; + uint8_t mtype = 0; + std::string msg; + + from_wire(M, ioid); + from_wire(M, mtype); + from_wire(M, msg); + + if(!M.good()) + throw std::runtime_error("Decode error for Message"); + + auto it = opByIOID.find(ioid); + if(it==opByIOID.end()) { + log_printf(connsetup, PLVL_DEBUG, "Client %s Message on non-existant ioid\n", peerName.c_str()); + return; + } + auto chan = it->second->chan.lock(); + + switch(mtype) { + case 0: mtype = PLVL_INFO; + case 1: mtype = PLVL_WARN; + case 2: mtype = PLVL_ERR; + default:mtype = PLVL_CRIT; + } + + log_printf(remote, mtype, "Client %s Channel %s Remote message: %s\n", + peerName.c_str(), chan ? "" : chan->name.c_str(), + msg.c_str()); +} void ServerConn::cleanup() @@ -459,4 +518,6 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s ServerOp::~ServerOp() {} +void ServerOp::cancel() {} + }} // namespace pvxs::impl diff --git a/src/serverconn.h b/src/serverconn.h index ff9866d..552564a 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -27,23 +27,26 @@ struct ServerChan; struct ServerOp { - ServerChan* const chan; + const std::weak_ptr chan; const uint32_t ioid; enum state_t { + Creating, Idle, - Active, + Executing, Dead, } state; - constexpr ServerOp(ServerChan *chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {} + ServerOp(const std::weak_ptr& chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {} virtual ~ServerOp() =0; + + virtual void cancel(); }; struct ServerChannelControl : public server::ChannelControl { - explicit ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan); + ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan); virtual ~ServerChannelControl(); virtual std::shared_ptr setHandler(const std::shared_ptr &h) override final; @@ -68,6 +71,8 @@ struct ServerChan std::shared_ptr handler; + std::map > opByIOID; // our subset of ServerConn::opByIOID + ServerChan(const std::shared_ptr& conn, uint32_t sid, uint32_t cid, const std::string& name); ServerChan(const ServerChan&) = delete; ServerChan& operator=(const ServerChan&) = delete; @@ -93,13 +98,15 @@ struct ServerConn : public std::enable_shared_from_this uint32_t nextSID; std::map > chanBySID; std::map > chanByCID; - std::map > opByIOID; + std::map > opByIOID; ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen); ServerConn(const ServerConn&) = delete; ServerConn& operator=(const ServerConn&) = delete; ~ServerConn(); + const std::shared_ptr& lookupSID(uint32_t sid); + private: #define CASE(Op) void handle_##Op(); CASE(Echo); diff --git a/src/serverintrospect.cpp b/src/serverintrospect.cpp new file mode 100644 index 0000000..577ecbe --- /dev/null +++ b/src/serverintrospect.cpp @@ -0,0 +1,119 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include "serverconn.h" + +namespace pvxs { namespace impl { +DEFINE_LOGGER(connsetup, "tcp.setup"); + +namespace { +struct ServerIntrospect : public ServerOp +{ + ServerIntrospect(const std::shared_ptr& chan, uint32_t ioid) + :ServerOp(chan, ioid) + {} + virtual ~ServerIntrospect() {} +}; + +struct ServerIntrospectControl : public server::Introspect +{ + ServerIntrospectControl(ServerConn* conn, + const std::weak_ptr& server, + const std::string& name, + const std::weak_ptr& op) + :server::Introspect(conn->peerName, conn->iface->name, name) + ,server(server) + ,op(op) + {} + virtual ~ServerIntrospectControl() { + error("Implict Cancel"); + } + + virtual void error(const std::string &msg) override final + { + Status sts{Status::Error, msg}; + auto serv = server.lock(); + if(!serv) + return; // soft fail if already completed, cancelled, disconnected, .... + + serv->acceptor_loop.call([this, &sts](){ + auto oper = op.lock(); + if(!oper || oper->state != ServerOp::Executing) + return; + auto chan = oper->chan.lock(); + if(!chan) + return; + auto conn = chan->conn.lock(); + if(!conn || !conn->bev) + return; + + const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(be, conn->txBody.get()); + to_wire(R, uint32_t(oper->ioid)); + to_wire(R, sts); + // would be FieldDesc payload if Ok or Warn + } + + auto tx = bufferevent_get_output(conn->bev.get()); + to_evbuf(tx, Header{pva_app_msg::Introspect, + pva_flags::Server, + uint32_t(evbuffer_get_length(conn->txBody.get()))}, + be); + auto err = evbuffer_add_buffer(tx, conn->txBody.get()); + assert(!err); + + oper->state = ServerOp::Dead; + conn->opByIOID.erase(oper->ioid); + chan->opByIOID.erase(oper->ioid); + }); + } + + const std::weak_ptr server; + const std::weak_ptr op; +}; +} // namespace + +void ServerConn::handle_Introspect() +{ + // aka. GetField + + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t sid = -1, ioid = -1; + std::string subfield; + + from_wire(M, sid); + from_wire(M, ioid); + from_wire(M, subfield); + if(!M.good()) + throw std::runtime_error("Error decoding Introspect"); + + auto& chan = lookupSID(sid); + + if(opByIOID.find(ioid)!=opByIOID.end()) { + log_printf(connsetup, PLVL_ERR, "Client %s reuses existing ioid %d\n", peerName.c_str(), unsigned(ioid)); + return; + } + + std::shared_ptr op(new ServerIntrospect(chan, ioid)); + std::unique_ptr ctrl(new ServerIntrospectControl(this, iface->server->internal_self, chan->name, op)); + + op->state = ServerOp::Executing; // this is a one-shot operation + + opByIOID[ioid] = op; + chan->opByIOID[ioid] = op; + + if(chan->handler) + chan->handler->onIntrospect(std::move(ctrl)); +} + +}} // namespace pvxs::impl diff --git a/test/dummyserv.cpp b/test/dummyserv.cpp index 93aebbc..38a55b7 100644 --- a/test/dummyserv.cpp +++ b/test/dummyserv.cpp @@ -22,7 +22,12 @@ DEFINE_LOGGER(dummy,"dummyserv"); struct DummyHandler : public Handler { + virtual ~DummyHandler() {} + virtual void onIntrospect(std::unique_ptr &&op) override final + { + op->error("Got nothing"); + } }; struct DummySource : public Source