client reExec() for Get

This commit is contained in:
Michael Davidsaver
2020-09-14 13:42:07 -07:00
parent 2972bd8205
commit fb9f356457
5 changed files with 197 additions and 81 deletions
+141 -80
View File
@@ -113,13 +113,15 @@ struct GPROp : public OperationBase
std::function<void(Result&&)> done;
std::function<void (const Value&)> onInit;
Value pvRequest;
Value rpcarg;
Value arg;
Result result;
bool getOput = false;
bool autoExec = true;
enum state_t : uint8_t {
Connecting, // waiting for an active Channel
Creating, // waiting for reply to INIT
Idle, // waiting for explicit exec request
GetOPut, // waiting for reply to GET (CMD_PUT only)
BuildPut, // waiting for PUT builder callback
Exec, // waiting for reply to EXEC
@@ -185,10 +187,10 @@ struct GPROp : public OperationBase
log_info_printf(setup, "implied cancel of op%x on channel '%s'\n",
op, chan ? chan->name.c_str() : "");
}
if(state==GetOPut || state==Exec) {
if(state==Idle || state==GetOPut || state==Exec) {
chan->conn->sendDestroyRequest(chan->sid, ioid);
}
if(state==Creating || state==GetOPut || state==Exec) {
if(state==Creating || state==Idle || state==GetOPut || state==Exec) {
// This opens up a race with an in-flight reply.
chan->conn->opByIOID.erase(ioid);
chan->opByIOID.erase(ioid);
@@ -198,6 +200,109 @@ struct GPROp : public OperationBase
return ret;
}
virtual void reExec(const Value& arg, std::function<void(client::Result&&)>&& resultcb) override final
{
if(op!=RPC && arg)
throw std::invalid_argument("Only RPC may reExec() with Value");
auto a(arg);
auto cb(std::move(resultcb));
auto context = chan->context;
context->tcp_loop.dispatch([this, a, cb]() mutable {
if(autoExec) {
client::Result ret(std::make_exception_ptr(std::invalid_argument("reExec() requires Operation creation with .autoExec(false)")));
cb(std::move(ret));
return;
}
if(state!=Idle)
return;
this->arg = std::move(a);
this->done = std::move(cb);
_reExec();
});
}
void _reExec()
{
if(op==Put && getOput) {
state = GPROp::GetOPut;
} else if(op==Put && !getOput) {
state = GPROp::BuildPut;
} else {
state = GPROp::Exec;
}
sendReply();
}
void sendReply()
{
Value temp;
// transient state (because builder callback is synchronous)
if(state==GPROp::BuildPut) {
temp = arg.clone();
try {
temp = builder(std::move(temp));
state = GPROp::Exec;
} catch(std::exception& e) {
result = Result(std::current_exception());
state = GPROp::Done;
}
}
// act on new operation state
{
(void)evbuffer_drain(chan->conn->txBody.get(), evbuffer_get_length(chan->conn->txBody.get()));
EvOutBuf R(hostBE, chan->conn->txBody.get());
to_wire(R, chan->sid);
to_wire(R, ioid);
if(state==GPROp::GetOPut) {
to_wire(R, uint8_t(0x40));
} else if(state==GPROp::Exec) {
to_wire(R, uint8_t(0x00));
if(op==Put) {
to_wire_valid(R, temp);
} else if(op==RPC) {
to_wire(R, Value::Helper::desc(arg));
if(arg)
to_wire_full(R, arg);
}
} else if(state==GPROp::Done) {
// we're actually building CMD_DESTROY_REQUEST
// nothing more needed
} else {
throw std::logic_error("Invalid state in GPR sendReply()");
}
}
chan->conn->enqueueTxBody(state==GPROp::Done ? CMD_DESTROY_REQUEST : (pva_app_msg_t)op);
if(state==GPROp::Done) {
// CMD_DESTROY_REQUEST is not acknowledged (sigh...)
// but at this point a server should not send further GET/PUT/RPC w/ this IOID
// so we can ~safely forget about it.
// we might get CMD_MESSAGE, but these could be ignored with no ill effects.
chan->conn->opByIOID.erase(ioid);
chan->opByIOID.erase(ioid);
notify();
}
}
virtual void createOp() override final
{
if(state!=Connecting) {
@@ -230,19 +335,19 @@ struct GPROp : public OperationBase
if(state==Connecting || state==Done) {
// noop
} else if(state==Creating || state==GetOPut || (state==Exec && op==Get)) {
// return to pending
chan->pending.push_back(self);
state = Connecting;
} else if(state==Exec) {
} else if(state==Exec && op!=Get && !autoExec) {
// can't restart as server side-effects may occur
state = Done;
result = Result(std::make_exception_ptr(Disconnect()));
notify();
} else if(state==Creating || state==Idle || state==GetOPut || state==Exec) {
// return to pending
chan->pending.push_back(self);
state = Connecting;
} else {
state = Done;
result = Result(std::make_exception_ptr(std::logic_error("GPR Disconnect in unexpected state")));
@@ -369,10 +474,14 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
if(!sts.isSuccess()) {
gpr->result = Result(std::make_exception_ptr(RemoteError(sts.msg)));
gpr->state = GPROp::Done;
gpr->state = gpr->state==GPROp::Creating || gpr->autoExec ? GPROp::Done : GPROp::Idle;
} else if(gpr->state==GPROp::Creating) {
gpr->state = GPROp::Idle;
if(cmd==CMD_PUT || cmd==CMD_GET)
gpr->arg = data; // save for later use in sendReply()
try {
if(gpr->onInit)
gpr->onInit(data);
@@ -381,15 +490,10 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
gpr->state = GPROp::Done;
}
if(cmd==CMD_PUT && gpr->getOput) {
gpr->state = GPROp::GetOPut;
} else if(cmd==CMD_PUT && !gpr->getOput) {
gpr->state = GPROp::BuildPut;
} else {
gpr->state = GPROp::Exec;
}
if(gpr->state==GPROp::Idle && gpr->autoExec)
gpr->_reExec();
// reply may now be sent, or deferred
return;
} else if(gpr->state==GPROp::GetOPut) {
gpr->state = GPROp::BuildPut;
@@ -397,73 +501,25 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
info->prototype.assign(data);
} else if(gpr->state==GPROp::Exec) {
gpr->state = GPROp::Done;
// data always empty for CMD_PUT
gpr->result = Result(std::move(data), peerName);
if(!gpr->autoExec) {
gpr->state = GPROp::Idle;
gpr->notify();
return;
}
gpr->state = GPROp::Done;
} else {
// should be avoided above
throw std::logic_error("GPR advance state inconsistent");
}
// transient state (because builder callback is synchronous)
if(gpr->state==GPROp::BuildPut) {
Value arg(info->prototype.clone());
try {
info->prototype = gpr->builder(std::move(arg));
gpr->state = GPROp::Exec;
} catch(std::exception& e) {
gpr->result = Result(std::current_exception());
gpr->state = GPROp::Done;
}
}
log_debug_printf(io, "Server %s channel %s op%02x state %d -> %d\n",
peerName.c_str(), op->chan->name.c_str(), cmd, prev, gpr->state);
peerName.c_str(), gpr->chan->name.c_str(), cmd, prev, gpr->state);
// act on new operation state
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvOutBuf R(hostBE, txBody.get());
to_wire(R, op->chan->sid);
to_wire(R, ioid);
if(gpr->state==GPROp::GetOPut) {
to_wire(R, uint8_t(0x40));
} else if(gpr->state==GPROp::Exec) {
to_wire(R, uint8_t(0x00));
if(cmd==CMD_PUT) {
to_wire_valid(R, info->prototype);
} else if(cmd==CMD_RPC) {
to_wire(R, Value::Helper::desc(gpr->rpcarg));
if(gpr->rpcarg)
to_wire_full(R, gpr->rpcarg);
}
} else if(gpr->state==GPROp::Done) {
// we're actually building CMD_DESTROY_REQUEST
// nothing more needed
}
}
enqueueTxBody(gpr->state==GPROp::Done ? CMD_DESTROY_REQUEST : cmd);
if(gpr->state==GPROp::Done) {
// CMD_DESTROY_REQUEST is not acknowledged (sigh...)
// but at this point a server should not send further GET/PUT/RPC w/ this IOID
// so we can ~safely forget about it.
// we might get CMD_MESSAGE, but these could be ignored with no ill effects.
opByIOID.erase(ioid);
gpr->chan->opByIOID.erase(ioid);
gpr->notify();
}
gpr->sendReply();
}
void Connection::handle_GET() { handle_GPR(CMD_GET); }
@@ -506,6 +562,7 @@ std::shared_ptr<Operation> GetBuilder::_exec_get()
auto op = std::make_shared<GPROp>(Operation::Get, chan);
op->setDone(std::move(_result), std::move(_onInit));
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
chan->pending.push_back(op);
@@ -548,6 +605,7 @@ std::shared_ptr<Operation> PutBuilder::exec()
// handled above
}
op->getOput = _doGet;
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
chan->pending.push_back(op);
@@ -575,11 +633,14 @@ std::shared_ptr<Operation> RPCBuilder::exec()
auto op = std::make_shared<GPROp>(Operation::RPC, chan);
op->setDone(std::move(_result), std::move(_onInit));
if(_argument) {
op->rpcarg = std::move(_argument);
if(!_autoexec)
throw std::invalid_argument("Pass RPC argument during reExec()");
op->arg = std::move(_argument);
} else if(_args) {
op->rpcarg = _args->uriArgs();
op->rpcarg["path"] = _name;
op->arg = _args->uriArgs();
op->arg["path"] = _name;
}
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
chan->pending.push_back(op);
+2
View File
@@ -69,6 +69,8 @@ struct InfoOp : public OperationBase
return ret;
}
void reExec(const Value& arg, std::function<void(client::Result&&)>&& resultcb) override final {}
virtual void createOp() override final
{
if(state!=Connecting) {
+2
View File
@@ -210,6 +210,8 @@ struct SubscriptionImpl : public OperationBase, public Subscription
return ret;
}
void reExec(const Value& arg, std::function<void(client::Result&&)>&& resultcb) override final {}
virtual void createOp() override final
{
if(state!=Connecting) {
+10
View File
@@ -139,6 +139,10 @@ struct PVXS_API Operation {
//! Queue an interruption of a wait() or wait(double) call.
virtual void interrupt() =0;
// Expert API
// (Re)issue request built with autoExec(false)
virtual void reExec(const Value& arg, std::function<void(client::Result&&)>&& resultcb) =0;
};
//! Handle for monitor subscription
@@ -464,6 +468,7 @@ protected:
struct Req;
std::shared_ptr<Req> req;
unsigned _prio = 0u;
bool _autoexec = true;
CommonBase() = default;
CommonBase(const std::shared_ptr<Context::Pvt>& ctx, const std::string& name) : ctx(ctx), _name(name) {}
@@ -543,6 +548,11 @@ public:
// called during operation INIT phase for Get/Put/Monitor when remote type
// description is available.
SubBuilder& onInit(std::function<void (const Value&)>&& cb) { this->_onInit = std::move(cb); return _sb(); }
// Expert API
// control whether operations automatically proceed from INIT to EXEC
// cf. reExec()
SubBuilder& autoExec(bool b) { this->_autoexec = b; return _sb(); }
};
} // namespace detail
+42 -1
View File
@@ -237,6 +237,46 @@ struct Tester {
cli = client::Context();
op.reset();
}
void manualExec()
{
testShow()<<__func__;
epicsEvent initd;
epicsEvent done;
mbox.open(initial);
serv.start();
auto op = cli.get("mailbox")
.autoExec(false)
.onInit([&initd](const Value& prototype) {
testDiag("onInit()");
initd.signal();
})
.result([&initd](client::Result&& result) {
testFail("result() unexpected error prior to onInit()");
initd.signal();
})
.exec();
testOk1(initd.wait(5.0));
testDiag("reExec() 1");
op->reExec(Value(), [&done](client::Result&& result) {
testTrue(!!result());
testDiag("result() 1");
done.signal();
});
testOk1(done.wait(5.0));
testDiag("reExec() 2");
op->reExec(Value(), [&done](client::Result&& result) {
testTrue(!!result());
testDiag("result() 2");
done.signal();
});
testOk1(done.wait(5.0));
}
};
struct ErrorSource : public server::Source
@@ -309,7 +349,7 @@ void testError(bool phase)
MAIN(testget)
{
testPlan(24);
testPlan(29);
testSetup();
logger_config_env();
Tester().testConnector();
@@ -319,6 +359,7 @@ MAIN(testget)
Tester().timeout();
Tester().cancel();
Tester().orphan();
Tester().manualExec();
testError(false);
testError(true);
cleanup_for_valgrind();