Introspect can error

This commit is contained in:
Michael Davidsaver
2019-11-19 15:51:36 -08:00
parent f063bd26f5
commit 9ceab63d02
7 changed files with 261 additions and 52 deletions
+1
View File
@@ -48,6 +48,7 @@ LIB_SRCS += udp_collector.cpp
LIB_SRCS += server.cpp
LIB_SRCS += serverconn.cpp
LIB_SRCS += serverchan.cpp
LIB_SRCS += serverintrospect.cpp
LIB_LIBS += Com
+5
View File
@@ -261,6 +261,11 @@ struct PVXS_API Introspect : public OpBase
virtual void error(const std::string& msg) =0;
// void success(Data);
Introspect(const std::string& peerName,
const std::string& iface,
const std::string& name)
:OpBase (peerName, iface, name)
{}
virtual ~Introspect() =0;
};
+37 -26
View File
@@ -287,36 +287,47 @@ void ServerConn::handle_DestroyChan()
from_wire(M, sid);
from_wire(M, cid);
if(!M.good())
throw std::runtime_error("Decode error in DestroyChan");
auto it = chanBySID.find(sid);
if(M.good() && it!=chanBySID.end()) {
{
auto& chan = it->second;
if(chan->cid!=cid) {
log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(),
unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str());
}
}
auto n = chanByCID.erase(cid);
assert(n==1);
chanBySID.erase(it);
assert(it->second.use_count()==1); // we only take transient refs on this thread
// ServerChannel is delete'd
{
auto tx = bufferevent_get_output(bev.get());
constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG;
EvOutBuf R(be, tx);
to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8});
to_wire(R, sid);
to_wire(R, cid);
}
} else {
if(it==chanBySID.end()) {
log_printf(connsetup, PLVL_DEBUG, "Client %s DestroyChan non-existant sid=%d cid=%d\n", peerName.c_str(),
unsigned(sid), unsigned(cid));
return;
}
auto chan = it->second;
if(chan->cid!=cid) {
log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(),
unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str());
}
{
auto n = chanByCID.erase(cid);
assert(n==1);
}
chanBySID.erase(it);
// cleanup operations
for(auto& pair : chan->opByIOID) {
auto n = opByIOID.erase(pair.second->ioid);
assert(n==1);
}
assert(chan.use_count()==1); // we only take transient refs on this thread
// ServerChannel is delete'd
{
auto tx = bufferevent_get_output(bev.get());
constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG;
EvOutBuf R(be, tx);
to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8});
to_wire(R, sid);
to_wire(R, cid);
}
if(!M.good())
+82 -21
View File
@@ -27,6 +27,8 @@ DEFINE_LOGGER(connsetup, "tcp.setup");
// related to low level send/recv
DEFINE_LOGGER(connio, "tcp.io");
DEFINE_LOGGER(remote, "tcp.log");
ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen)
:iface(iface)
,peerAddr(peer, socklen)
@@ -90,6 +92,13 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
ServerConn::~ServerConn()
{}
const std::shared_ptr<ServerChan>& ServerConn::lookupSID(uint32_t sid)
{
auto it = chanBySID.find(sid);
if(it==chanBySID.end())
throw std::runtime_error(SB()<<"Client "<<peerName<<" non-existant SID "<<sid);
return it->second;
}
void ServerConn::handle_Echo()
{
@@ -184,39 +193,89 @@ void ServerConn::handle_PutGetOp()
{}
void ServerConn::handle_CancelOp()
{}
void ServerConn::handle_DestroyOp()
{}
void ServerConn::handle_Introspect()
{
// aka. GetField
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t sid = -1, ioid = -1;
std::string subfield;
uint32_t sid=0, ioid=0;
from_wire(M, sid);
from_wire(M, ioid);
from_wire(M, subfield);
Status sts{Status::Ok};
if(!M.good())
throw std::runtime_error("Error decoding DestroyOp");
auto it = chanBySID.find(sid);
if(M.good() && it!=chanBySID.end() && opByIOID.find(ioid)==opByIOID.end()) {
} else {
log_printf(connio, PLVL_DEBUG, "Client %s invalid GetField\n", peerName.c_str());
auto it = opByIOID.find(ioid);
if(it==opByIOID.end()) {
log_printf(connsetup, PLVL_WARN, "Client %s Cancel of non-existant Op\n", peerName.c_str());
return;
}
const auto& op = it->second;
auto chan = op->chan.lock();
if(!chan || chan->sid!=sid) {
log_printf(connsetup, PLVL_ERR, "Client %s Cancel inconsistent Op\n", peerName.c_str());
return;
}
if(op->state==ServerOp::Executing) {
op->state = ServerOp::Idle;
} else {
log_printf(connsetup, PLVL_WARN, "Client %s Cancel of non-executing Op\n", peerName.c_str());
}
}
void ServerConn::handle_DestroyOp()
{
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t sid=0, ioid=0;
from_wire(M, sid);
from_wire(M, ioid);
if(!M.good())
bev.reset();
throw std::runtime_error("Error decoding DestroyOp");
auto& chan = lookupSID(sid);
auto n = opByIOID.erase(ioid);
n += chan->opByIOID.erase(ioid);
if(n!=2) {
log_printf(connsetup, PLVL_WARN, "Client %s can't destroy non-existant op %u:%u\n",
peerName.c_str(), unsigned(sid), unsigned(ioid));
}
}
void ServerConn::handle_Message()
{}
{
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t ioid = 0;
uint8_t mtype = 0;
std::string msg;
from_wire(M, ioid);
from_wire(M, mtype);
from_wire(M, msg);
if(!M.good())
throw std::runtime_error("Decode error for Message");
auto it = opByIOID.find(ioid);
if(it==opByIOID.end()) {
log_printf(connsetup, PLVL_DEBUG, "Client %s Message on non-existant ioid\n", peerName.c_str());
return;
}
auto chan = it->second->chan.lock();
switch(mtype) {
case 0: mtype = PLVL_INFO;
case 1: mtype = PLVL_WARN;
case 2: mtype = PLVL_ERR;
default:mtype = PLVL_CRIT;
}
log_printf(remote, mtype, "Client %s Channel %s Remote message: %s\n",
peerName.c_str(), chan ? "<dead>" : chan->name.c_str(),
msg.c_str());
}
void ServerConn::cleanup()
@@ -459,4 +518,6 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s
ServerOp::~ServerOp() {}
void ServerOp::cancel() {}
}} // namespace pvxs::impl
+12 -5
View File
@@ -27,23 +27,26 @@ struct ServerChan;
struct ServerOp
{
ServerChan* const chan;
const std::weak_ptr<ServerChan> chan;
const uint32_t ioid;
enum state_t {
Creating,
Idle,
Active,
Executing,
Dead,
} state;
constexpr ServerOp(ServerChan *chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {}
ServerOp(const std::weak_ptr<ServerChan>& chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {}
virtual ~ServerOp() =0;
virtual void cancel();
};
struct ServerChannelControl : public server::ChannelControl
{
explicit ServerChannelControl(const std::shared_ptr<ServerConn>& conn, const std::shared_ptr<ServerChan>& chan);
ServerChannelControl(const std::shared_ptr<ServerConn>& conn, const std::shared_ptr<ServerChan>& chan);
virtual ~ServerChannelControl();
virtual std::shared_ptr<server::Handler> setHandler(const std::shared_ptr<server::Handler> &h) override final;
@@ -68,6 +71,8 @@ struct ServerChan
std::shared_ptr<server::Handler> handler;
std::map<uint32_t, std::shared_ptr<ServerOp> > opByIOID; // our subset of ServerConn::opByIOID
ServerChan(const std::shared_ptr<ServerConn>& conn, uint32_t sid, uint32_t cid, const std::string& name);
ServerChan(const ServerChan&) = delete;
ServerChan& operator=(const ServerChan&) = delete;
@@ -93,13 +98,15 @@ struct ServerConn : public std::enable_shared_from_this<ServerConn>
uint32_t nextSID;
std::map<uint32_t, std::shared_ptr<ServerChan> > chanBySID;
std::map<uint32_t, std::shared_ptr<ServerChan> > chanByCID;
std::map<uint32_t, std::unique_ptr<ServerOp> > opByIOID;
std::map<uint32_t, std::shared_ptr<ServerOp> > opByIOID;
ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen);
ServerConn(const ServerConn&) = delete;
ServerConn& operator=(const ServerConn&) = delete;
~ServerConn();
const std::shared_ptr<ServerChan>& lookupSID(uint32_t sid);
private:
#define CASE(Op) void handle_##Op();
CASE(Echo);
+119
View File
@@ -0,0 +1,119 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <cassert>
#include <pvxs/log.h>
#include "serverconn.h"
namespace pvxs { namespace impl {
DEFINE_LOGGER(connsetup, "tcp.setup");
namespace {
struct ServerIntrospect : public ServerOp
{
ServerIntrospect(const std::shared_ptr<ServerChan>& chan, uint32_t ioid)
:ServerOp(chan, ioid)
{}
virtual ~ServerIntrospect() {}
};
struct ServerIntrospectControl : public server::Introspect
{
ServerIntrospectControl(ServerConn* conn,
const std::weak_ptr<server::Server::Pvt>& server,
const std::string& name,
const std::weak_ptr<ServerIntrospect>& op)
:server::Introspect(conn->peerName, conn->iface->name, name)
,server(server)
,op(op)
{}
virtual ~ServerIntrospectControl() {
error("Implict Cancel");
}
virtual void error(const std::string &msg) override final
{
Status sts{Status::Error, msg};
auto serv = server.lock();
if(!serv)
return; // soft fail if already completed, cancelled, disconnected, ....
serv->acceptor_loop.call([this, &sts](){
auto oper = op.lock();
if(!oper || oper->state != ServerOp::Executing)
return;
auto chan = oper->chan.lock();
if(!chan)
return;
auto conn = chan->conn.lock();
if(!conn || !conn->bev)
return;
const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG;
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(be, conn->txBody.get());
to_wire(R, uint32_t(oper->ioid));
to_wire(R, sts);
// would be FieldDesc payload if Ok or Warn
}
auto tx = bufferevent_get_output(conn->bev.get());
to_evbuf(tx, Header{pva_app_msg::Introspect,
pva_flags::Server,
uint32_t(evbuffer_get_length(conn->txBody.get()))},
be);
auto err = evbuffer_add_buffer(tx, conn->txBody.get());
assert(!err);
oper->state = ServerOp::Dead;
conn->opByIOID.erase(oper->ioid);
chan->opByIOID.erase(oper->ioid);
});
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerIntrospect> op;
};
} // namespace
void ServerConn::handle_Introspect()
{
// aka. GetField
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t sid = -1, ioid = -1;
std::string subfield;
from_wire(M, sid);
from_wire(M, ioid);
from_wire(M, subfield);
if(!M.good())
throw std::runtime_error("Error decoding Introspect");
auto& chan = lookupSID(sid);
if(opByIOID.find(ioid)!=opByIOID.end()) {
log_printf(connsetup, PLVL_ERR, "Client %s reuses existing ioid %d\n", peerName.c_str(), unsigned(ioid));
return;
}
std::shared_ptr<ServerIntrospect> op(new ServerIntrospect(chan, ioid));
std::unique_ptr<ServerIntrospectControl> ctrl(new ServerIntrospectControl(this, iface->server->internal_self, chan->name, op));
op->state = ServerOp::Executing; // this is a one-shot operation
opByIOID[ioid] = op;
chan->opByIOID[ioid] = op;
if(chan->handler)
chan->handler->onIntrospect(std::move(ctrl));
}
}} // namespace pvxs::impl
+5
View File
@@ -22,7 +22,12 @@ DEFINE_LOGGER(dummy,"dummyserv");
struct DummyHandler : public Handler
{
virtual ~DummyHandler() {}
virtual void onIntrospect(std::unique_ptr<Introspect> &&op) override final
{
op->error("Got nothing");
}
};
struct DummySource : public Source