reExecGet/Put()

This commit is contained in:
Michael Davidsaver
2020-12-31 11:12:19 -08:00
parent 708fbc8062
commit cd990fb459
7 changed files with 161 additions and 30 deletions
+61 -20
View File
@@ -109,6 +109,8 @@ namespace {
struct GPROp : public OperationBase
{
std::weak_ptr<GPROp> internal_self;
std::function<Value(Value&&)> builder;
std::function<void(Result&&)> done;
std::function<void (const Value&)> onInit;
@@ -199,36 +201,61 @@ struct GPROp : public OperationBase
return ret;
}
virtual void reExec(const Value& arg, std::function<void(client::Result&&)>&& resultcb) override final
void _reExecImpl(bool put, const Value& arg, std::function<void(client::Result&&)>&& resultcb)
{
if(op!=RPC && arg)
throw std::invalid_argument("Only RPC may reExec() with Value");
auto a(arg);
auto cb(std::move(resultcb));
std::shared_ptr<GPROp> self(internal_self);
loop.dispatch([this, a, cb]() mutable {
if(autoExec) {
loop.dispatch([self, a, cb, put]() mutable {
if(self->autoExec) {
client::Result ret(std::make_exception_ptr(std::invalid_argument("reExec() requires Operation creation with .autoExec(false)")));
cb(std::move(ret));
return;
}
if(state!=Idle)
if(self->state!=Idle)
return;
this->arg = std::move(a);
this->done = std::move(cb);
if(self->op==RPC) {
self->arg = std::move(a);
_reExec();
} else if(put && self->op==Put) {
self->builder = [a](Value&&) -> Value {
// caller should be passing a Value of the correct prototype
// given through onInit().
return a;
};
}
self->done = std::move(cb);
self->_reExec(put);
});
}
void _reExec()
void reExecGet(std::function<void(client::Result&&)>&& resultcb) override final
{
if(op==Put && getOput) {
if(op!=Get && op!=Put)
throw std::logic_error("reExecGet() only meaningful for .get() and .put()");
_reExecImpl(false, Value(), std::move(resultcb));
}
void reExecPut(const Value& arg, std::function<void(client::Result&&)>&& resultcb) override final
{
if(op!=Get && op!=Put) {
throw std::logic_error("reExecPut() only meaningful for .put()");
} else if(!arg) {
throw std::invalid_argument("reExecPut() Put requires Value");
}
_reExecImpl(true, arg, std::move(resultcb));
}
void _reExec(bool put)
{
if(op==Put && !put) {
state = GPROp::GetOPut;
} else if(op==Put && !getOput) {
} else if(op==Put && put) {
state = GPROp::BuildPut;
} else {
@@ -492,12 +519,22 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
}
if(gpr->state==GPROp::Idle && gpr->autoExec)
gpr->_reExec();
gpr->_reExec(!gpr->getOput);
// reply may now be sent, or deferred
return;
} else if(gpr->state==GPROp::GetOPut) {
gpr->state = GPROp::BuildPut;
if(gpr->autoExec) {
// proceed to execute put
gpr->state = GPROp::BuildPut;
} else {
// deliver get result
gpr->state = GPROp::Idle;
gpr->result = Result(std::move(data), peerName);
gpr->notify();
return;
}
info->prototype.assign(data);
@@ -530,10 +567,12 @@ void Connection::handle_RPC() { handle_GPR(CMD_RPC); }
static
std::shared_ptr<Operation> gpr_setup(const std::shared_ptr<ContextImpl>& context,
std::string name, // need to capture by value
const std::shared_ptr<GPROp>& op,
std::shared_ptr<GPROp>&& op,
bool syncCancel)
{
auto internal(op);
auto internal(std::move(op));
internal->internal_self = internal;
std::shared_ptr<GPROp> external(internal.get(), [internal, syncCancel](GPROp*) mutable {
// (maybe) user thread
auto temp(std::move(internal));
@@ -574,7 +613,7 @@ std::shared_ptr<Operation> GetBuilder::_exec_get()
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(context, _name, op, _syncCancel);
return gpr_setup(context, _name, std::move(op), _syncCancel);
}
std::shared_ptr<Operation> PutBuilder::exec()
@@ -604,13 +643,15 @@ std::shared_ptr<Operation> PutBuilder::exec()
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(context, _name, op, _syncCancel);
return gpr_setup(context, _name, std::move(op), _syncCancel);
}
std::shared_ptr<Operation> RPCBuilder::exec()
{
if(!ctx)
throw std::logic_error("NULL Builder");
if(!_autoexec)
throw std::logic_error("autoExec(false) not possible for rpc()");
auto context(ctx->impl->shared_from_this());
@@ -627,7 +668,7 @@ std::shared_ptr<Operation> RPCBuilder::exec()
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(context, _name, op, _syncCancel);
return gpr_setup(context, _name, std::move(op), _syncCancel);
}
} // namespace client