client: add onInit hook

This commit is contained in:
Michael Davidsaver
2020-08-31 15:29:34 -07:00
parent 564b9ec2cc
commit 2972bd8205
4 changed files with 39 additions and 9 deletions
+19 -7
View File
@@ -111,6 +111,7 @@ struct GPROp : public OperationBase
{
std::function<Value(Value&&)> builder;
std::function<void(Result&&)> done;
std::function<void (const Value&)> onInit;
Value pvRequest;
Value rpcarg;
Result result;
@@ -135,10 +136,11 @@ struct GPROp : public OperationBase
_cancel(true);
}
void setDone(decltype (done)&& cb)
void setDone(decltype (done)&& donecb, decltype (onInit)&& initcb)
{
if(cb) {
done = std::move(cb);
onInit = std::move(initcb);
if(donecb) {
done = std::move(donecb);
} else {
auto waiter = this->waiter = std::make_shared<ResultWaiter>();
done = [waiter](Result&& result) {
@@ -166,10 +168,12 @@ struct GPROp : public OperationBase
{
auto context = chan->context;
decltype (done) junk;
decltype (onInit) junkI;
bool ret;
context->tcp_loop.call([this, &junk, &ret](){
context->tcp_loop.call([this, &junk, &junkI, &ret](){
ret = _cancel(false);
junk = std::move(done);
junkI = std::move(onInit);
// leave opByIOID for GC
});
return ret;
@@ -369,6 +373,14 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
} else if(gpr->state==GPROp::Creating) {
try {
if(gpr->onInit)
gpr->onInit(data);
} catch(std::exception& e) {
gpr->result = Result(std::current_exception());
gpr->state = GPROp::Done;
}
if(cmd==CMD_PUT && gpr->getOput) {
gpr->state = GPROp::GetOPut;
@@ -493,7 +505,7 @@ std::shared_ptr<Operation> GetBuilder::_exec_get()
auto chan = Channel::build(ctx->shared_from_this(), _name);
auto op = std::make_shared<GPROp>(Operation::Get, chan);
op->setDone(std::move(_result));
op->setDone(std::move(_result), std::move(_onInit));
op->pvRequest = _buildReq();
chan->pending.push_back(op);
@@ -520,7 +532,7 @@ std::shared_ptr<Operation> PutBuilder::exec()
auto chan = Channel::build(ctx->shared_from_this(), _name);
auto op = std::make_shared<GPROp>(Operation::Put, chan);
op->setDone(std::move(_result));
op->setDone(std::move(_result), std::move(_onInit));
if(_builder) {
op->builder = std::move(_builder);
@@ -561,7 +573,7 @@ std::shared_ptr<Operation> RPCBuilder::exec()
auto chan = Channel::build(ctx->shared_from_this(), _name);
auto op = std::make_shared<GPROp>(Operation::RPC, chan);
op->setDone(std::move(_result));
op->setDone(std::move(_result), std::move(_onInit));
if(_argument) {
op->rpcarg = std::move(_argument);
} else if(_args) {
+5
View File
@@ -39,6 +39,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
evevent ackTick;
// const after exec()
std::function<void (const Value&)> onInit;
std::function<void(Subscription&)> event;
Value pvRequest;
bool pipeline = false;
@@ -447,6 +448,9 @@ void Connection::handle_MONITOR()
peerName.c_str(),
mon->chan->name.c_str());
if(mon->onInit)
mon->onInit(info->prototype);
mon->state = SubscriptionImpl::Idle;
if(mon->autostart)
@@ -547,6 +551,7 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
auto op = std::make_shared<SubscriptionImpl>(Operation::Monitor, chan);
op->event = std::move(_event);
op->onInit = std::move(_onInit);
op->pvRequest = _buildReq();
op->maskConn = _maskConn;
op->maskDiscon = _maskDisconn;
+6
View File
@@ -496,6 +496,7 @@ protected:
template<typename SubBuilder, typename Base>
class CommonBuilder : public Base {
protected:
std::function<void (const Value&)> _onInit;
CommonBuilder() = default;
constexpr CommonBuilder(const std::shared_ptr<Context::Pvt>& ctx, const std::string& name) : Base(ctx, name) {}
inline SubBuilder& _sb() { return static_cast<SubBuilder&>(*this); }
@@ -537,6 +538,11 @@ public:
SubBuilder& priority(int p) { this->_prio = p; return _sb(); }
SubBuilder& server(const std::string& s) { this->_server = s; return _sb(); }
// Expert API
// called during operation INIT phase for Get/Put/Monitor when remote type
// description is available.
SubBuilder& onInit(std::function<void (const Value&)>&& cb) { this->_onInit = std::move(cb); return _sb(); }
};
} // namespace detail
+9 -2
View File
@@ -101,14 +101,21 @@ struct Tester {
mbox.open(initial);
serv.start();
std::atomic<bool> hadInit{false};
auto op = cli.get("mailbox").exec();
auto op = cli.get("mailbox")
.onInit([&hadInit](const Value& prototype) {
testShow()<<"onInit() << "<<prototype;
hadInit.store(prototype["value"].valid());
})
.exec();
cli.hurryUp();
auto result = op->wait(5.0);
testEq(result["value"].as<int32_t>(), 42);
testTrue(hadInit.load());
}
void testWait()
@@ -302,7 +309,7 @@ void testError(bool phase)
MAIN(testget)
{
testPlan(23);
testPlan(24);
testSetup();
logger_config_env();
Tester().testConnector();