generalize Get/Put/RPC handling

This commit is contained in:
Michael Davidsaver
2019-12-15 12:04:50 -08:00
parent 851182c7a1
commit 583ee684ab
10 changed files with 594 additions and 381 deletions
+5
View File
@@ -29,6 +29,11 @@ namespace impl {
void to_wire(Buffer& buf, const FieldDesc* cur)
{
if(!cur) {
to_wire(buf, uint8_t(0xff));
return;
}
// we assume FieldDesc* is valid (checked on creation)
to_wire(buf, cur->code.code);
+5
View File
@@ -348,6 +348,11 @@ struct Status {
std::string trace;
inline bool isSuccess() const { return code==Ok || code==Warn; }
static inline Status error(const std::string& msg, const std::string& trace = std::string())
{
return Status{Error, msg, trace};
}
};
inline
+48 -119
View File
@@ -25,7 +25,6 @@ struct ServerConn;
}
namespace server {
struct Handler;
struct Source;
/** PV Access protocol server instance
@@ -109,38 +108,70 @@ private:
std::shared_ptr<Pvt> pvt;
};
struct OpBase {
struct PVXS_API OpBase {
enum op_t {
None,
Info,
Get,
Put,
RPC,
};
protected:
std::string _peerName;
std::string _ifaceName;
std::string _name;
op_t _op;
public:
//! The Client endpoint address in "X.X.X.X:Y" format.
const std::string peerName;
const std::string& peerName() const { return _peerName; }
//! The local endpoint address in "X.X.X.X:Y" format.
const std::string ifaceName;
const std::string& ifaceName() const { return _ifaceName; }
//! The Channel name
const std::string name;
const std::string& name() const { return _name; }
op_t op() const { return _op; }
// TODO credentials
OpBase(const std::string& peerName,
const std::string& iface,
const std::string& name);
virtual ~OpBase() =0;
};
struct PVXS_API ExecOp : public OpBase {
virtual void reply() =0;
virtual void reply(const Value& val) =0;
virtual void error(const std::string& msg) =0;
virtual void onCancel(std::function<void()>&&) =0;
virtual ~ExecOp();
};
struct PVXS_API ConnectOp : public OpBase {
Value pvRequest;
virtual void connect(const Value& prototype) =0;
virtual void error(const std::string& msg) =0;
virtual ~ConnectOp();
virtual void onGet(std::function<void(std::unique_ptr<ExecOp>&&)>&& fn) =0;
virtual void onPut(std::function<void(std::unique_ptr<ExecOp>&&, Value&&)>&& fn) =0;
virtual void onClose(std::function<void(const std::string&)>&&) =0;
};
/** Manipulate an active Channel, and any in-progress Operations through it.
*
*/
struct PVXS_API ChannelControl : public OpBase {
ChannelControl(const std::string& peerName,
const std::string& iface,
const std::string& name)
:OpBase (peerName, iface, name)
{}
virtual ~ChannelControl() =0;
//! Set/replace Handler associated with this Channel
//! If called from outside a Handler method, blocks until in-progress Handler methods have returned.
virtual std::shared_ptr<Handler> setHandler(const std::shared_ptr<Handler>& h) =0;
//! invoked when a new GET, PUT, or RPC Operation is requested through this Channel
virtual void onOp(std::function<void(std::unique_ptr<ConnectOp>&&)>&& ) =0;
virtual void onRPC(std::function<void(std::unique_ptr<ExecOp>&&, Value&&)>&& fn)=0;
virtual void onClose(std::function<void(const std::string&)>&&) =0;
//! Force disconnection
//! If called from outside a Handler method, blocks until in-progress Handler methods have returned.
//! If called from outside a handler method, blocks until in-progress Handler methods have returned.
//! Reference to currently attached Handler is released.
virtual void close() =0;
@@ -199,108 +230,6 @@ struct PVXS_API Source {
virtual void onCreate(std::unique_ptr<ChannelControl>&& op) =0;
};
//! Token for an in-progress request for Channel data type information.
struct PVXS_API Introspect : public OpBase
{
//! Positive reply. Only the type of the provided Value is used. Any field values are ignored.
virtual void reply(const Value& prototype) =0;
//! Negative reply w/ error message
virtual void error(const std::string& msg) =0;
Introspect(const std::string& peerName,
const std::string& iface,
const std::string& name)
:OpBase (peerName, iface, name)
{}
virtual ~Introspect() =0;
};
struct PVXS_API Get : public OpBase
{
const Value request;
//! Define (and communicate) the type of this Get operation.
virtual void connect(const Value& prototype,
std::function<void()>&& onExec) =0;
//! Positive reply w/ data
virtual void reply(const Value& data) =0;
//! Negative reply w/ error message
virtual void error(const std::string& msg) =0;
Get(const std::string& peerName,
const std::string& iface,
const std::string& name,
const Value& request)
:OpBase (peerName, iface, name)
,request(request)
{}
virtual ~Get() =0;
};
struct PVXS_API Put : public OpBase
{
const Value request;
const Value value;
//! Positive reply
virtual void complete() =0;
//! Negative reply w/ error message
virtual void error(const std::string& msg) =0;
Put(const std::string& peerName,
const std::string& iface,
const std::string& name,
const Value& request,
const Value& value)
:OpBase (peerName, iface, name)
,request(request)
,value(value)
{}
virtual ~Put() =0;
};
struct PVXS_API RPC : public OpBase
{
const Value request;
const Value value;
//! Positive reply w/ data
virtual void reply(const Value& prototype) =0;
//! Negative reply w/ error message
virtual void error(const std::string& msg) =0;
RPC(const std::string& peerName,
const std::string& iface,
const std::string& name,
const Value& request,
const Value& value)
:OpBase (peerName, iface, name)
,request(request)
,value(value)
{}
virtual ~RPC() =0;
};
/** Requests for a particular Channel are dispatched through me.
*
* User code will sub-class.
*/
struct PVXS_API Handler {
virtual ~Handler();
/** Request for Channel data type information
*
* Ownership of the Introspect instance is passed to the callee.
* The request will be implicitly errored if the callee allows
* the Introspect to be deleted prior to replying.
*/
virtual void onIntrospect(std::unique_ptr<Introspect>&& op);
virtual void onGet(std::unique_ptr<Get>&& op);
virtual void onPut(std::unique_ptr<Put>&& op);
virtual void onRPC(std::unique_ptr<RPC>&& op);
};
}} // namespace pvxs::server
#endif // PVXS_SERVER_H
+3 -33
View File
@@ -597,41 +597,11 @@ void Server::Pvt::doBeaconsS(evutil_socket_t fd, short evt, void *raw)
Source::~Source() {}
OpBase::OpBase(const std::string& peerName,
const std::string& iface,
const std::string& name)
:peerName(peerName)
,ifaceName(iface)
,name(name)
{}
OpBase::~OpBase() {}
ChannelControl::~ChannelControl() {}
Introspect::~Introspect() {}
Get::~Get() {}
Put::~Put() {}
RPC::~RPC() {}
ConnectOp::~ConnectOp() {}
ExecOp::~ExecOp() {}
Handler::~Handler() {}
void Handler::onIntrospect(std::unique_ptr<Introspect>&& op)
{
op->error("Not Implemented");
}
void Handler::onGet(std::unique_ptr<Get>&& op)
{
op->error("Not Implemented");
}
void Handler::onPut(std::unique_ptr<Put>&& op)
{
op->error("Not Implemented");
}
void Handler::onRPC(std::unique_ptr<RPC> &&op)
{
op->error("Not Implemented");}
}
} // namespace pvxs::server
}} // namespace pvxs::server
+46 -11
View File
@@ -33,25 +33,60 @@ ServerChan::ServerChan(const std::shared_ptr<ServerConn> &conn,
ServerChan::~ServerChan() {}
ServerChannelControl::ServerChannelControl(const std::shared_ptr<ServerConn> &conn, const std::shared_ptr<ServerChan>& channel)
:server::ChannelControl(conn->peerName, conn->iface->name, channel->name)
,server(conn->iface->server->internal_self)
:server(conn->iface->server->internal_self)
,chan(channel)
{}
{
_op = None;
_name = channel->name;
_peerName = conn->peerName;
_ifaceName = conn->iface->name;
}
ServerChannelControl::~ServerChannelControl() {}
std::shared_ptr<server::Handler> ServerChannelControl::setHandler(const std::shared_ptr<server::Handler> &h)
void ServerChannelControl::onOp(std::function<void(std::unique_ptr<server::ConnectOp>&&)>&& fn)
{
std::shared_ptr<server::Handler> ret(h);
std::shared_ptr<server::Server::Pvt> serv(server);
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &ret](){
std::shared_ptr<ServerChan> ch(chan);
serv->acceptor_loop.call([this, &fn](){
auto ch = chan.lock();
if(!ch)
return;
ch->handler.swap(ret);
ch->onOp = std::move(fn);
});
}
return ret;
void ServerChannelControl::onRPC(std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)>&& fn)
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
auto ch = chan.lock();
if(!ch)
return;
ch->onRPC = std::move(fn);
});
}
void ServerChannelControl::onClose(std::function<void(const std::string&)>&& fn)
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
auto ch = chan.lock();
if(!ch)
return;
ch->onClose = std::move(fn);
});
}
static
@@ -246,7 +281,7 @@ void ServerConn::handle_CREATE_CHANNEL()
for(auto& pair : iface->server->sources) {
try {
pair.second->onCreate(std::move(op));
if(!op || chan->handler || chan->state!=ServerChan::Creating) {
if(!op || chan->onOp || chan->onClose || chan->state!=ServerChan::Creating) {
claimed = chan->state==ServerChan::Creating;
log_printf(connsetup, PLVL_DEBUG, "Client %s %s channel to %s through %s\n", peerName.c_str(),
claimed?"accepted":"rejected", name.c_str(), pair.first.second.c_str());
+10 -10
View File
@@ -190,12 +190,6 @@ void ServerConn::handle_AUTHNZ()
// ignored (so far no auth plugin actually uses)
}
void ServerConn::handle_PUT()
{}
void ServerConn::handle_RPC()
{}
void ServerConn::handle_PUT_GET()
{}
@@ -225,6 +219,9 @@ void ServerConn::handle_CANCEL_REQUEST()
if(op->state==ServerOp::Executing) {
op->state = ServerOp::Idle;
if(op->onCancel)
op->onCancel();
} else {
// an allowed race
log_printf(connsetup, PLVL_DEBUG, "Client %s Cancel of non-executing Op\n", peerName.c_str());
@@ -251,9 +248,12 @@ void ServerConn::handle_DESTROY_REQUEST()
peerName.c_str(), unsigned(sid), unsigned(ioid));
} else {
it->second->state = ServerOp::Dead;
// TODO interface to notify Op of cancel/destroy
auto op = it->second;
opByIOID.erase(it);
op->state = ServerOp::Dead;
if(op->onClose)
op->onClose("");
}
}
@@ -424,6 +424,8 @@ void ServerConn::bevRead()
#undef CASE
}
// handlers may be cleared bev to force disconnect
if(!bev)
break;
// silently drain any unprocessed body (forward compatibility)
if(auto n = evbuffer_get_length(segBuf.get()))
@@ -532,6 +534,4 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s
ServerOp::~ServerOp() {}
void ServerOp::cancel() {}
}} // namespace pvxs::impl
+17 -7
View File
@@ -33,6 +33,9 @@ struct ServerOp
const uint32_t ioid;
std::function<void(const std::string&)> onClose;
std::function<void()> onCancel;
enum state_t {
Creating,
Idle,
@@ -41,9 +44,9 @@ struct ServerOp
} state;
ServerOp(const std::weak_ptr<ServerChan>& chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {}
ServerOp(const ServerOp&) = delete;
ServerOp& operator=(const ServerOp&) = delete;
virtual ~ServerOp() =0;
virtual void cancel();
};
struct ServerChannelControl : public server::ChannelControl
@@ -51,7 +54,10 @@ struct ServerChannelControl : public server::ChannelControl
ServerChannelControl(const std::shared_ptr<ServerConn>& conn, const std::shared_ptr<ServerChan>& chan);
virtual ~ServerChannelControl();
virtual std::shared_ptr<server::Handler> setHandler(const std::shared_ptr<server::Handler> &h) override final;
virtual void onOp(std::function<void(std::unique_ptr<server::ConnectOp>&&)>&& fn) override final;
virtual void onRPC(std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)>&& fn) override final;
virtual void onClose(std::function<void(const std::string&)>&& fn) override final;
virtual void close() override final;
const std::weak_ptr<server::Server::Pvt> server;
@@ -66,12 +72,14 @@ struct ServerChan
const std::string name;
enum {
Creating,
Active,
Destroy,
Creating, // CREATE_CHANNEL request received, reply not sent
Active, // reply sent
Destroy, // DESTROY_CHANNEL request received and/or reply sent
} state;
std::shared_ptr<server::Handler> handler;
std::function<void(std::unique_ptr<server::ConnectOp>&&)> onOp;
std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)> onRPC;
std::function<void(const std::string&)> onClose;
std::map<uint32_t, std::shared_ptr<ServerOp> > opByIOID; // our subset of ServerConn::opByIOID
@@ -133,6 +141,8 @@ private:
CASE(MESSAGE);
#undef CASE
void handle_GPR(pva_app_msg_t cmd);
void cleanup();
void bevEvent(short events);
void bevRead();
+332 -124
View File
@@ -15,147 +15,271 @@ DEFINE_LOGGER(connsetup, "tcp.setup");
DEFINE_LOGGER(connio, "tcp.io");
namespace {
struct ServerGet : public ServerOp
// generalized Get/Put/RPC
struct ServerGPR : public ServerOp
{
ServerGet(const std::shared_ptr<ServerChan>& chan, uint32_t ioid)
ServerGPR(const std::shared_ptr<ServerChan>& chan, uint32_t ioid)
:ServerOp(chan, ioid)
{}
virtual ~ServerGet() {}
virtual ~ServerGPR() {}
void doReply(const std::shared_ptr<const FieldDesc>& type,
const Value& value,
const std::string& msg)
{
auto ch = chan.lock();
if(!ch)
return;
auto conn = ch->conn.lock();
if(!conn || !conn->bev)
return;
if(state==Dead || state==Idle) {
// no warn if Idle as this may result from a remote Cancel
return;
}
if(type && this->type)
throw std::logic_error("Operation already connected (has a type)");
if(cmd!=CMD_PUT && this->type && (!value || Value::Helper::desc(value)!=this->type.get()))
throw std::logic_error("PUT Must reply with exact type previously passed to connect()");
Status sts{};
if(!msg.empty())
sts = Status::error(msg);
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, subcmd);
to_wire(R, sts);
if(!sts.isSuccess()) {
// error()
if(state==Executing)
state = Idle;
else // Creating
state = Dead;
} else if(state==Creating) {
// connect()
if(cmd!=CMD_RPC) {
this->type = type;
to_wire(R, type.get());
}
state = Idle;
} else if(state==Executing) {
if(cmd==CMD_GET || (cmd==CMD_PUT && (subcmd&0x40))) {
to_wire_valid(R, value); // GET and PUT/Get reply with bitmask and partial value
} else if(cmd==CMD_RPC) {
auto type = Value::Helper::desc(value);
to_wire(R, type);
if(value)
to_wire_full(R, value);
}
state = lastRequest ? Dead : Idle;
} else {
assert(false);
}
assert(R.good());
}
conn->enqueueTxBody(cmd);
if(state == ServerOp::Dead) {
ch->opByIOID.erase(ioid);
auto it = conn->opByIOID.find(ioid);
if(it!=conn->opByIOID.end()) {
auto self(it->second);
conn->opByIOID.erase(it);
conn->iface->server->acceptor_loop.dispatch([self](){
self->onClose("");
});
} else {
assert(false); // really shouldn't happen
}
conn->opByIOID.erase(ioid);
}
}
pva_app_msg_t cmd;
uint8_t subcmd; // valid when state==Executing or Creating
bool lastRequest=false;
std::function<void()> onExec;
std::shared_ptr<const FieldDesc> type;
std::function<void(const std::string&)> onClose;
std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)> onPut;
std::function<void(std::unique_ptr<server::ExecOp>&&)> onGet;
};
struct ServerGetControl : public server::Get
struct ServerGPRConnect : public server::ConnectOp
{
ServerGetControl(ServerConn* conn,
ServerGPRConnect(ServerConn* conn,
const std::weak_ptr<server::Server::Pvt>& server,
const std::string& name,
const Value& request,
const std::weak_ptr<ServerGet>& op)
:server::Get(conn->peerName, conn->iface->name, name, request)
,server(server)
const std::weak_ptr<ServerGPR>& op)
:server(server)
,op(op)
{}
virtual ~ServerGetControl() {
error("Implict Cancel");
{
_op = Info;
_name = name;
_peerName = conn->peerName;
_ifaceName = conn->iface->name;
}
virtual ~ServerGPRConnect() {
error("Op Create implied error");
}
virtual void connect(const Value& prototype,
std::function<void()>&& onExec) override final
virtual void connect(const Value& prototype) override final
{
if(!onExec || !prototype)
throw std::logic_error("connect() requires prototype and onExec()");
action(ServerOp::Creating, prototype, std::string(), std::move(onExec));
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &prototype](){
if(auto oper = op.lock()) {
if(oper->state!=ServerOp::Creating)
return;
if(!prototype && oper->cmd!=CMD_RPC)
throw std::invalid_argument("Must provide prototype");
std::shared_ptr<const FieldDesc> type;
if(prototype)
type = Value::Helper::type(prototype);
oper->doReply(type, Value(), std::string());
}
});
}
virtual void error(const std::string& msg) override final
{
if(msg.empty())
throw std::invalid_argument("Must provide error message");
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &msg](){
if(auto oper = op.lock()) {
if(oper->state==ServerOp::Creating)
oper->doReply(nullptr, Value(), msg);
}
});
}
virtual void reply(const Value& value) override final
virtual void onGet(std::function<void(std::unique_ptr<server::ExecOp>&&)>&& fn) override final
{
if(!value)
throw std::logic_error("reply() requires Value");
action(ServerOp::Executing, value, std::string(), nullptr);
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
if(auto oper = op.lock())
oper->onGet = std::move(fn);
});
}
virtual void onPut(std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)>&& fn) override final
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
if(auto oper = op.lock())
oper->onPut = std::move(fn);
});
}
virtual void onClose(std::function<void(const std::string&)>&& fn) override final
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
if(auto oper = op.lock())
oper->onClose = std::move(fn);
});
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGPR> op;
};
struct ServerGPRExec : public server::ExecOp
{
ServerGPRExec(ServerConn* conn,
const std::weak_ptr<server::Server::Pvt>& server,
const std::string& name,
const Value& request,
const std::weak_ptr<ServerGPR>& op)
:server(server)
,op(op)
{
_op = Info;
_name = name;
_peerName = conn->peerName;
_ifaceName = conn->iface->name;
}
virtual ~ServerGPRExec() {}
virtual void reply() override final
{
reply(Value());
}
virtual void reply(const Value& val) override final
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &val](){
if(auto oper = op.lock()) {
oper->doReply(nullptr, val, std::string());
}
});
}
virtual void error(const std::string& msg) override final
{
action(ServerOp::Dead, Value(), msg, nullptr);
}
void action(ServerOp::state_t action,
const Value& value,
const std::string& msg,
std::function<void()>&& onExec)
{
if(msg.empty())
throw std::invalid_argument("Must provide error message");
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, action, &value, &msg, &onExec](){
auto oper = op.lock();
if(!oper || oper->state == ServerOp::Dead)
return;
auto chan = oper->chan.lock();
if(!chan)
return;
auto conn = chan->conn.lock();
if(!conn || !conn->bev)
return;
Status sts{};
uint8_t cmd = oper->state==ServerOp::Creating ? 0x08 : oper->lastRequest ? 0x50 : 0x40;
if(action==ServerOp::Dead) {
// error()
sts.code = Status::Error;
sts.msg = msg;
sts.trace = ""; // TODO
if(oper->state == ServerOp::Executing)
oper->state = ServerOp::Idle;
else
oper->state = ServerOp::Dead;
log_printf(connsetup, PLVL_DEBUG, "CLient %s Get error\n", peerName.c_str());
} else if(oper->state == ServerOp::Creating && action==ServerOp::Creating) {
// connect()
type = Value::Helper::type(value);
oper->onExec = std::move(onExec);
oper->state = ServerOp::Idle;
log_printf(connsetup, PLVL_DEBUG, "CLient %s Get connected\n", peerName.c_str());
} else if(oper->state == ServerOp::Executing && action==ServerOp::Executing) {
// reply()
if(type.get()!=Value::Helper::desc(value))
throw std::logic_error("Can't reply() w/ type change");
log_printf(connsetup, PLVL_DEBUG, "CLient %s Get complete\n", peerName.c_str());
oper->state = oper->lastRequest ? ServerOp::Dead : ServerOp::Idle;
} else {
log_printf(connsetup, PLVL_DEBUG, "Client %s Get operation not possible %d %d\n",
peerName.c_str(), action, oper->state);
return;
}
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, uint32_t(oper->ioid));
to_wire(R, cmd);
to_wire(R, sts);
if(sts.code!=Status::Ok) {
// error()
} else if(action==ServerOp::Creating) {
// connect()
to_wire(R, type.get()); // type
} else {
// reply()
to_wire_valid(R, value);
}
}
conn->enqueueTxBody(CMD_GET);
if(oper->state == ServerOp::Dead) {
conn->opByIOID.erase(oper->ioid);
chan->opByIOID.erase(oper->ioid);
serv->acceptor_loop.call([this, &msg](){
if(auto oper = op.lock()) {
oper->doReply(nullptr, Value(), msg);
}
});
}
virtual void onCancel(std::function<void()>&& fn) override final
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
if(auto oper = op.lock())
oper->onCancel = std::move(fn);
});
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGet> op;
std::shared_ptr<const FieldDesc> type;
const std::weak_ptr<ServerGPR> op;
};
} // namespace
void ServerConn::handle_GET()
void ServerConn::handle_GPR(pva_app_msg_t cmd)
{
EvInBuf M(peerBE, segBuf.get(), 16);
@@ -168,13 +292,21 @@ void ServerConn::handle_GET()
Status reply{};
if(subcmd&0x08) { // INIT
Value pvRequest;
// subcmd bitmask
// 0x08 - Init
// 0x10 - Destroy
// 0x40 - Get
// 0x00 - context dependent. for CMD_GET the same as 0x40, for CMD_PUT and CMD_RPC the opposite of Get
bool isput = cmd!=CMD_GET && !(subcmd&0x40);
if(subcmd&0x08) { // INIT
// type and full value
Value pvRequest;
from_wire_type_value(M, rxRegistry, pvRequest);
if(!M.good()) {
log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid Get INIT\n", peerName.c_str());
log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid op=%x/%x INIT\n",
peerName.c_str(), cmd, subcmd);
bev.reset();
return;
}
@@ -187,9 +319,11 @@ void ServerConn::handle_GET()
return;
}
std::shared_ptr<ServerGet> op(new ServerGet(chan, ioid));
std::unique_ptr<ServerGetControl> ctrl(new ServerGetControl(this, iface->server->internal_self, chan->name, pvRequest, op));
std::shared_ptr<ServerGPR> op(new ServerGPR(chan, ioid));
op->cmd = cmd;
std::unique_ptr<ServerGPRConnect> ctrl(new ServerGPRConnect(this, iface->server->internal_self, chan->name, pvRequest, op));
op->subcmd = subcmd;
op->state = ServerOp::Creating;
opByIOID[ioid] = op;
@@ -199,35 +333,94 @@ void ServerConn::handle_GET()
peerName.c_str(), unsigned(ioid),
std::string(SB()<<pvRequest).c_str());
if(chan->handler)
chan->handler->onGet(std::move(ctrl));
if(cmd==CMD_RPC) {
ctrl->connect(Value());
} else { // EXEC should be 0x40 however, some clients are lax
// no additional message fields
if(!M.good()) {
log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid Get EXEC\n", peerName.c_str());
bev.reset();
return;
} else if(chan->onOp) { // GET, PUT
chan->onOp(std::move(ctrl));
}
std::shared_ptr<ServerGet> op;
} else { // EXEC, maybe Get or Put
std::shared_ptr<ServerGPR> op;
auto it = opByIOID.find(ioid);
if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast<ServerGet>(it->second))) {
if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast<ServerGPR>(it->second))) {
log_printf(connio, PLVL_ERR, "Client %s Gets %s IOID %u\n", peerName.c_str(),
it==opByIOID.end() ? "non-existant" : "invalid", unsigned(ioid));
bev.reset();
return;
}
op->lastRequest = subcmd&0x10;
if(cmd!=CMD_RPC && !op->type) {
log_printf(connsetup, PLVL_ERR, "Client %s tries to Exec to early\n", peerName.c_str());
bev.reset();
return;
}
Value val;
if(cmd==CMD_RPC) {
// type and full value
from_wire_type_value(M, rxRegistry, val);
} else if(isput) {
// bitmask and partial value
val = Value::Helper::build(op->type);
from_wire_valid(M, rxRegistry, val);
}
if(!M.good()) {
log_printf(connio, PLVL_DEBUG, "Client %s\n Invalid op=%x/%x Get\n",
peerName.c_str(), cmd, subcmd);
bev.reset();
return;
}
auto chan = op->chan.lock();
if(!chan)
throw std::logic_error("live op on dead channel");
if(op->state==ServerOp::Idle) {
// all set
if(!op->lastRequest)
op->lastRequest = subcmd&0x10;
std::unique_ptr<ServerGPRExec> ctrl{new ServerGPRExec(this, iface->server->internal_self, chan->name, val, op)};
op->subcmd = subcmd;
op->state = ServerOp::Executing;
log_printf(connsetup, PLVL_DEBUG, "CLient %s Get executing\n", peerName.c_str());
op->onExec(); // notify
try {
if(cmd==CMD_RPC && isput) {
if(chan->onRPC)
chan->onRPC(std::move(ctrl), std::move(val));
else
ctrl->error("RPC Not Implemented");
} else if(cmd==CMD_PUT && isput) {
if(op->onPut)
op->onPut(std::move(ctrl), std::move(val));
else
ctrl->error("PUT Not Implemented");
} else if(cmd!=CMD_RPC && !isput) {
if(op->onGet)
op->onGet(std::move(ctrl));
else
ctrl->error("GET Not Implemented");
} else {
log_printf(connsetup, PLVL_ERR, "Client %s Get exec in incorrect command %d\n",
peerName.c_str(), subcmd);
}
} catch(std::exception& e) {
log_printf(connsetup, PLVL_ERR, "Client %s Unhandled exception in onGet/Put/RPC %s : %s\n",
peerName.c_str(), typeid(e).name(), e.what());
if(ctrl)
ctrl->error(e.what());
}
} else {
log_printf(connsetup, PLVL_ERR, "CLient %s Get exec in incorrect state %d\n",
@@ -237,4 +430,19 @@ void ServerConn::handle_GET()
}
void ServerConn::handle_GET()
{
handle_GPR(CMD_GET);
}
void ServerConn::handle_PUT()
{
handle_GPR(CMD_PUT);
}
void ServerConn::handle_RPC()
{
handle_GPR(CMD_RPC);
}
}} // namespace pvxs::impl
+59 -35
View File
@@ -20,23 +20,54 @@ struct ServerIntrospect : public ServerOp
:ServerOp(chan, ioid)
{}
virtual ~ServerIntrospect() {}
void doReply(const FieldDesc* type, const Status& sts)
{
if(state != ServerOp::Executing)
return;
auto ch = chan.lock();
if(!ch)
return;
auto conn = ch->conn.lock();
if(!conn || !conn->bev)
return;
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, sts);
if(type)
to_wire(R, type);
}
conn->enqueueTxBody(CMD_GET_FIELD);
state = ServerOp::Dead;
conn->opByIOID.erase(ioid);
ch->opByIOID.erase(ioid);
}
};
struct ServerIntrospectControl : public server::Introspect
struct ServerIntrospectControl : public server::ConnectOp
{
ServerIntrospectControl(ServerConn* conn,
ServerIntrospectControl(ServerConn *conn, ServerChan *chan,
const std::weak_ptr<server::Server::Pvt>& server,
const std::string& name,
const std::weak_ptr<ServerIntrospect>& op)
:server::Introspect(conn->peerName, conn->iface->name, name)
,server(server)
:server(server)
,op(op)
{}
{
_op = Info;
_name = chan->name;
_peerName = conn->peerName;
_ifaceName = conn->iface->name;
}
virtual ~ServerIntrospectControl() {
error("Implict Cancel");
}
virtual void reply(const Value& prototype) override final
virtual void connect(const Value& prototype) override final
{
auto desc = Value::Helper::desc(prototype);
if(!desc)
@@ -58,34 +89,27 @@ struct ServerIntrospectControl : public server::Introspect
return; // soft fail if already completed, cancelled, disconnected, ....
serv->acceptor_loop.call([this, type, &sts](){
auto oper = op.lock();
if(!oper || oper->state != ServerOp::Executing)
return;
auto chan = oper->chan.lock();
if(!chan)
return;
auto conn = chan->conn.lock();
if(!conn || !conn->bev)
return;
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, uint32_t(oper->ioid));
to_wire(R, sts);
if(type)
to_wire(R, type);
}
conn->enqueueTxBody(CMD_GET_FIELD);
oper->state = ServerOp::Dead;
conn->opByIOID.erase(oper->ioid);
chan->opByIOID.erase(oper->ioid);
if(auto oper = op.lock())
oper->doReply(type, sts);
});
}
virtual void onClose(std::function<void(const std::string&)>&& fn) override final
{
auto serv = server.lock();
if(!serv)
return;
serv->acceptor_loop.call([this, &fn](){
if(auto oper = op.lock())
if(auto chan = oper->chan.lock())
chan->onClose = std::move(fn);
});
}
// we'll never use these, so no reason to store
virtual void onGet(std::function<void(std::unique_ptr<server::ExecOp>&& fn)>&& fn) override final {}
virtual void onPut(std::function<void(std::unique_ptr<server::ExecOp>&& fn, Value&&)>&& fn) override final {}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerIntrospect> op;
};
@@ -114,15 +138,15 @@ void ServerConn::handle_GET_FIELD()
}
std::shared_ptr<ServerIntrospect> op(new ServerIntrospect(chan, ioid));
std::unique_ptr<ServerIntrospectControl> ctrl(new ServerIntrospectControl(this, iface->server->internal_self, chan->name, op));
std::unique_ptr<ServerIntrospectControl> ctrl(new ServerIntrospectControl(this, chan.get(), iface->server->internal_self, op));
op->state = ServerOp::Executing; // this is a one-shot operation
opByIOID[ioid] = op;
chan->opByIOID[ioid] = op;
if(chan->handler)
chan->handler->onIntrospect(std::move(ctrl));
if(chan->onOp)
chan->onOp(std::move(ctrl));
}
}} // namespace pvxs::impl
+69 -42
View File
@@ -11,6 +11,8 @@
#include <atomic>
#include <epicsEvent.h>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsTime.h>
#include <pvxs/server.h>
@@ -18,6 +20,7 @@
#include <pvxs/nt.h>
#include <pvxs/log.h>
#include "utilpvt.h"
namespace {
using namespace pvxs;
@@ -25,50 +28,22 @@ using namespace pvxs::server;
DEFINE_LOGGER(dummy,"dummyserv");
struct DummyHandler : public Handler
struct DummySource : public Source
{
static const Value mytype;
static std::atomic<unsigned> count;
const std::string name;
epicsMutex lock;
Value current;
DummyHandler()
:current(mytype.cloneEmpty())
explicit DummySource(const std::string& name)
:name(name)
,current(nt::NTScalar{TypeCode::Int32}.build().create())
{
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
current["value"] = count.fetch_add(1);
current["value"] = 0u;
current["timeStamp.secondsPastEpoch"] = now.secPastEpoch+POSIX_TIME_AT_EPICS_EPOCH;
current["timeStamp.nanoseconds"] = now.nsec;
}
virtual ~DummyHandler() {}
virtual void onIntrospect(std::unique_ptr<Introspect> &&op) override final
{
log_printf(dummy, PLVL_INFO, "GET_FIELD\n");
op->reply(mytype);
}
virtual void onGet(std::unique_ptr<Get>&& op) override final
{
log_printf(dummy, PLVL_INFO, "Create GET\n");
std::shared_ptr<Get> sop(std::move(op));
sop->connect(current, [this, sop]()
{
log_printf(dummy, PLVL_INFO, "Execute GET\n");
// executing
sop->reply(current); // "current" never changes for us, so no locking
});
}
};
const Value DummyHandler::mytype = nt::NTScalar{TypeCode::Int32}.build().create();
std::atomic<unsigned> DummyHandler::count{};
struct DummySource : public Source
{
std::set<std::string> names;
virtual ~DummySource() {}
// Source interface
@@ -76,7 +51,7 @@ public:
virtual void onSearch(Search &search) override final
{
for(auto& op : search) {
if(names.find(op.name())!=names.end()) {
if(op.name()==name) {
log_printf(dummy, PLVL_INFO, "Claiming '%s'\n", op.name());
op.claim();
} else {
@@ -84,10 +59,63 @@ public:
}
}
}
virtual void onCreate(std::unique_ptr<ChannelControl>&& op) override final
virtual void onCreate(std::unique_ptr<ChannelControl>&& raw) override final
{
log_printf(dummy, PLVL_INFO, "Create '%s'\n", op->name.c_str());
op->setHandler(std::unique_ptr<Handler>{new DummyHandler});
if(raw->name()!=name)
return;
std::shared_ptr<ChannelControl> chan(std::move(raw));
log_printf(dummy, PLVL_INFO, "Create '%s'\n", chan->name().c_str());
// callback when client creating Get/Put
chan->onOp([this, chan](std::shared_ptr<ConnectOp>&& raw){
std::shared_ptr<ConnectOp> conn(std::move(raw));
log_printf(dummy, PLVL_INFO, "Begin Operation on '%s'\n", chan->name().c_str());
conn->onGet([this, chan](std::unique_ptr<ExecOp>&& raw) {
// client executing Get or Put
log_printf(dummy, PLVL_INFO, "Exec Get on '%s'\n", chan->name().c_str());
{
epicsGuard<epicsMutex> G(lock);
raw->reply(current);
}
});
conn->onPut([this, chan](std::unique_ptr<ExecOp>&& raw, Value&& top) {
log_printf(dummy, PLVL_INFO, "Exec Put on '%s'\n", chan->name().c_str());
{
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
epicsGuard<epicsMutex> G(lock);
current["value"] = top["value"].as<uint32_t>();
current["timeStamp.secondsPastEpoch"] = now.secPastEpoch+POSIX_TIME_AT_EPICS_EPOCH;
current["timeStamp.nanoseconds"] = now.nsec;
}
raw->reply(); // inform client that Put was successful
});
epicsGuard<epicsMutex> G(lock);
conn->connect(current); // only type is used
});
// callback when client executing RPC
chan->onRPC([this, chan](std::unique_ptr<ExecOp>&& raw, Value&& top) {
log_printf(dummy, PLVL_INFO, "Begin RPC on '%s' with %s\n", chan->name().c_str(),
std::string(SB()<<top).c_str());
auto ret = nt::NTScalar{TypeCode::String}.build().create();
ret["value"] = "RPC test";
raw->reply(ret);
});
}
};
@@ -97,11 +125,10 @@ int main(int argc, char *argv[])
{
int ret = 0;
try {
pvxs::logger_level_set("dummy", PLVL_INFO);
pvxs::logger_level_set("dummyserv", PLVL_INFO);
pvxs::logger_config_env();
std::shared_ptr<DummySource> src(new DummySource);
src->names.emplace("blah");
auto src = std::make_shared<DummySource>("blah");
auto serv = Server::Config::from_env()
.build()