From 89f9c54d6282e92e704c7bd2d3c23359ed9bdbc2 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 29 Dec 2020 12:14:02 -0800 Subject: [PATCH] client: relax exec() synchronization dispatch() instead of call(). --- src/client.cpp | 4 +- src/clientget.cpp | 174 ++++++++++++++++----------------------- src/clientimpl.h | 6 +- src/clientintrospect.cpp | 80 +++++++++--------- src/clientmon.cpp | 168 +++++++++++++++++++------------------ 5 files changed, 198 insertions(+), 234 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index b616fdc..e1d02ec 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -213,9 +213,9 @@ void ResultWaiter::complete(Result&& result, bool interrupt) notify.signal(); } -OperationBase::OperationBase(operation_t op, const std::shared_ptr& chan) +OperationBase::OperationBase(operation_t op, const evbase& loop) :Operation(op) - ,chan(chan) + ,loop(loop) {} OperationBase::~OperationBase() {} diff --git a/src/clientget.cpp b/src/clientget.cpp index 677c3a4..26797ea 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -1,4 +1,4 @@ -/** +/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. @@ -130,11 +130,11 @@ struct GPROp : public OperationBase INST_COUNTER(GPROp); - GPROp(operation_t op, const std::shared_ptr& chan) - :OperationBase (op, chan) + GPROp(operation_t op, const evbase& loop) + :OperationBase (op, loop) {} ~GPROp() { - chan->context->tcp_loop.assertInLoop(); + loop.assertInLoop(); _cancel(true); } @@ -168,11 +168,10 @@ struct GPROp : public OperationBase virtual bool cancel() override final { - auto context = chan->context; decltype (done) junk; decltype (onInit) junkI; bool ret; - context->tcp_loop.call([this, &junk, &junkI, &ret](){ + loop.call([this, &junk, &junkI, &ret](){ ret = _cancel(false); junk = std::move(done); junkI = std::move(onInit); @@ -208,8 +207,7 @@ struct GPROp : public OperationBase auto a(arg); auto cb(std::move(resultcb)); - auto context = chan->context; - context->tcp_loop.dispatch([this, a, cb]() mutable { + loop.dispatch([this, a, cb]() mutable { if(autoExec) { client::Result ret(std::make_exception_ptr(std::invalid_argument("reExec() requires Operation creation with .autoExec(false)"))); cb(std::move(ret)); @@ -527,52 +525,50 @@ void Connection::handle_PUT() { handle_GPR(CMD_PUT); } void Connection::handle_RPC() { handle_GPR(CMD_RPC); } static -void gpr_cleanup(std::shared_ptr& ret, std::shared_ptr&& op) +std::shared_ptr gpr_setup(const std::shared_ptr& context, + std::string name, // need to capture by value + const std::shared_ptr& op) { - auto cap(std::move(op)); - auto loop(cap->chan->context->tcp_loop); - ret.reset(cap.get(), [cap, loop](Operation*) mutable { - auto L(std::move(loop)); - // from use thread - L.call([&cap]() { - auto temp(std::move(cap)); - // on worker - try { - temp->_cancel(true); - }catch(std::exception& e){ - log_exc_printf(setup, "Channel %s error in get cancel(): %s", - temp->chan->name.c_str(), e.what()); - } - // ensure dtor on worker - temp.reset(); - }); + auto internal(op); + std::shared_ptr external(internal.get(), [internal](GPROp*) mutable { + // (maybe) user thread + auto loop(internal->loop); + // std::bind for lack of c++14 generalized capture + // to move internal ref to worker for dtor + loop.call(std::bind([](std::shared_ptr& op) { + // on worker + + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); }, std::move(internal))); + assert(!internal); + internal.reset(); }); + + context->tcp_loop.dispatch([internal, context, name]() { + // on worker + + internal->chan = Channel::build(context, name); + + internal->chan->pending.push_back(internal); + internal->chan->createOperations(); + }); + + return external; } std::shared_ptr GetBuilder::_exec_get() { + assert(_get); if(!ctx) throw std::logic_error("NULL Builder"); - std::shared_ptr ret; - assert(_get); + auto op(std::make_shared(Operation::Get, ctx->tcp_loop)); + op->setDone(std::move(_result), std::move(_onInit)); + op->autoExec = _autoexec; + op->pvRequest = _buildReq(); - ctx->tcp_loop.call([&ret, this]() { - auto chan = Channel::build(ctx->shared_from_this(), _name); - - auto op = std::make_shared(Operation::Get, chan); - op->setDone(std::move(_result), std::move(_onInit)); - op->autoExec = _autoexec; - op->pvRequest = _buildReq(); - - chan->pending.push_back(op); - chan->createOperations(); - - gpr_cleanup(ret, std::move(op)); - assert(ret); - }); - - return ret; + return gpr_setup(ctx->shared_from_this(), _name, op); } std::shared_ptr PutBuilder::exec() @@ -580,41 +576,27 @@ std::shared_ptr PutBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - std::shared_ptr ret; + auto op(std::make_shared(Operation::Put, ctx->tcp_loop)); + op->setDone(std::move(_result), std::move(_onInit)); - if(!_builder && !_args) - throw std::logic_error("put() needs either a .build() or at least one .set()"); + if(_builder) { + op->builder = std::move(_builder); + } else if(_args) { + // PRBase builder doesn't use current value + _doGet = false; - ctx->tcp_loop.call([&ret, this]() { - auto chan = Channel::build(ctx->shared_from_this(), _name); + auto build = std::move(_args); + op->builder = [build](Value&& prototype) -> Value { + return build->build(std::move(prototype)); + }; + } else { + // handled above + } + op->getOput = _doGet; + op->autoExec = _autoexec; + op->pvRequest = _buildReq(); - auto op = std::make_shared(Operation::Put, chan); - op->setDone(std::move(_result), std::move(_onInit)); - - if(_builder) { - op->builder = std::move(_builder); - } else if(_args) { - // PRBase builder doesn't use current value - _doGet = false; - - auto build = std::move(_args); - op->builder = [build](Value&& prototype) -> Value { - return build->build(std::move(prototype)); - }; - } else { - // handled above - } - op->getOput = _doGet; - op->autoExec = _autoexec; - op->pvRequest = _buildReq(); - - chan->pending.push_back(op); - chan->createOperations(); - - gpr_cleanup(ret, std::move(op)); - }); - - return ret; + return gpr_setup(ctx->shared_from_this(), _name, op); } std::shared_ptr RPCBuilder::exec() @@ -622,34 +604,20 @@ std::shared_ptr RPCBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - std::shared_ptr ret; + auto op(std::make_shared(Operation::RPC, ctx->tcp_loop)); + op->setDone(std::move(_result), std::move(_onInit)); + if(_argument) { + if(!_autoexec) + throw std::invalid_argument("Pass RPC argument during reExec()"); + op->arg = std::move(_argument); + } else if(_args) { + op->arg = _args->uriArgs(); + op->arg["path"] = _name; + } + op->autoExec = _autoexec; + op->pvRequest = _buildReq(); - if(_args && _argument) - throw std::logic_error("Use of rpc() with argument and builder .arg() are mutually exclusive"); - - ctx->tcp_loop.call([&ret, this]() { - auto chan = Channel::build(ctx->shared_from_this(), _name); - - auto op = std::make_shared(Operation::RPC, chan); - op->setDone(std::move(_result), std::move(_onInit)); - if(_argument) { - if(!_autoexec) - throw std::invalid_argument("Pass RPC argument during reExec()"); - op->arg = std::move(_argument); - } else if(_args) { - op->arg = _args->uriArgs(); - op->arg["path"] = _name; - } - op->autoExec = _autoexec; - op->pvRequest = _buildReq(); - - chan->pending.push_back(op); - chan->createOperations(); - - gpr_cleanup(ret, std::move(op)); - }); - - return ret; + return gpr_setup(ctx->shared_from_this(), _name, op); } } // namespace client diff --git a/src/clientimpl.h b/src/clientimpl.h index de542e5..eff8d6a 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -42,13 +42,15 @@ struct ResultWaiter { // internal actions on an Operation struct OperationBase : public Operation { - const std::shared_ptr chan; + const evbase loop; + // remaining members only accessibly from loop worker + std::shared_ptr chan; uint32_t ioid = 0; Value result; bool done = false; std::shared_ptr waiter; - OperationBase(operation_t op, const std::shared_ptr& chan); + OperationBase(operation_t op, const evbase& loop); virtual ~OperationBase(); virtual void createOp() =0; diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 81a4c48..ee61cd5 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -30,21 +30,20 @@ struct InfoOp : public OperationBase INST_COUNTER(InfoOp); - explicit InfoOp(const std::shared_ptr& chan) - :OperationBase(Info, chan) + explicit InfoOp(const evbase& loop) + :OperationBase(Info, loop) {} virtual ~InfoOp() { - chan->context->tcp_loop.assertInLoop(); + loop.assertInLoop(); _cancel(true); } virtual bool cancel() override final { - auto context = chan->context; decltype (done) junk; bool ret = false; - context->tcp_loop.call([this, &junk, &ret](){ + loop.call([this, &junk, &ret](){ ret = _cancel(false); junk = std::move(done); // leave opByIOID for GC @@ -178,47 +177,44 @@ std::shared_ptr GetBuilder::_exec_info() if(!ctx) throw std::logic_error("NULL Builder"); - std::shared_ptr ret; + auto op(std::make_shared(ctx->tcp_loop)); + if(_result) { + op->done = std::move(_result); + } else { + auto waiter = op->waiter = std::make_shared(); + op->done = [waiter](Result&& result) { + waiter->complete(std::move(result), false); + }; + } - assert(!_get); + std::shared_ptr external(op.get(), [op](InfoOp*) mutable { + // from user thread + auto loop(op->loop); + // std::bind for lack of c++14 generalized capture + // to move internal ref to worker for dtor + loop.call(std::bind([](std::shared_ptr& op) { + // on worker - ctx->tcp_loop.call([&ret, this]() { - auto chan = Channel::build(ctx->shared_from_this(), _name); - - auto op = std::make_shared(chan); - - if(_result) { - op->done = std::move(_result); - } else { - auto waiter = op->waiter = std::make_shared(); - op->done = [waiter](Result&& result) { - waiter->complete(std::move(result), false); - }; - } - - chan->pending.push_back(op); - chan->createOperations(); - - auto loop(op->chan->context->tcp_loop); - ret.reset(op.get(), [op, loop](Operation*) mutable { - // on user thread - auto temp(std::move(op)); - auto L(std::move(loop)); - L.call([&temp]() { - // on worker - try { - temp->_cancel(true); - }catch(std::exception& e){ - log_exc_printf(setup, "Channel %s error in info cancel(): %s", - temp->chan->name.c_str(), e.what()); - } - // ensure dtor on worker - temp.reset(); - }); - }); + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); + }, std::move(op))); + assert(!op); + op.reset(); }); - return ret; + auto name(std::move(_name)); + auto context(ctx->shared_from_this()); + context->tcp_loop.dispatch([op, context, name]() { + // on worker + + op->chan = Channel::build(context, name); + + op->chan->pending.push_back(op); + op->chan->createOperations(); + }); + + return external; } } // namespace client diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 2e6b2f1..0730ac7 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -33,8 +33,8 @@ struct Entry { struct SubscriptionImpl : public OperationBase, public Subscription { - // for use in log messages, event after cancel() - const std::string channelName; + // for use in log messages, even after cancel() + std::string channelName; evevent ackTick; @@ -48,7 +48,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription bool maskConn = false, maskDiscon = true; uint32_t queueSize = 4u, ackAt=0u; - // only access from tcp_loop + // only access from loop enum state_t : uint8_t { Connecting, // waiting for an active Channel @@ -69,13 +69,12 @@ struct SubscriptionImpl : public OperationBase, public Subscription INST_COUNTER(SubscriptionImpl); - SubscriptionImpl(operation_t op, const std::shared_ptr& chan) - :OperationBase (op, chan) - ,channelName(chan->name) - ,ackTick(event_new(chan->context->tcp_loop.base, -1, EV_TIMEOUT, &tickAckS, this)) + SubscriptionImpl(const evbase& loop) + :OperationBase (Operation::Monitor, loop) + ,ackTick(event_new(loop.base, -1, EV_TIMEOUT, &tickAckS, this)) {} virtual ~SubscriptionImpl() { - chan->context->tcp_loop.assertInLoop(); + loop.assertInLoop(); _cancel(true); } @@ -102,7 +101,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription virtual void pause(bool p) override final { - chan->context->tcp_loop.call([this, p](){ + loop.call([this, p](){ log_info_printf(io, "Server %s channel %s monitor %s\n", chan->conn ? chan->conn->peerName.c_str() : "", chan->name.c_str(), @@ -179,10 +178,9 @@ struct SubscriptionImpl : public OperationBase, public Subscription } virtual bool cancel() override final { - auto context = chan->context; decltype (event) junk; bool ret; - context->tcp_loop.call([this, &junk, &ret](){ + loop.call([this, &junk, &ret](){ ret = _cancel(false); junk = std::move(event); // leave opByIOID for GC @@ -551,84 +549,84 @@ std::shared_ptr MonitorBuilder::exec() if(!ctx) throw std::logic_error("NULL Builder"); - std::shared_ptr ret; + auto op(std::make_shared(ctx->tcp_loop)); + op->self = op; + op->event = std::move(_event); + op->onInit = std::move(_onInit); + op->pvRequest = _buildReq(); + op->maskConn = _maskConn; + op->maskDiscon = _maskDisconn; - ctx->tcp_loop.call([&ret, this]() { - auto chan = Channel::build(ctx->shared_from_this(), _name); + auto options = op->pvRequest["record._options"]; - auto op = std::make_shared(Operation::Monitor, chan); - op->self = op; - op->event = std::move(_event); - op->onInit = std::move(_onInit); - op->pvRequest = _buildReq(); - op->maskConn = _maskConn; - op->maskDiscon = _maskDisconn; - - auto options = op->pvRequest["record._options"]; - - options["queueSize"].as([&op](uint32_t Q) { - if(Q>1) - op->queueSize = Q; - }); - - (void)options["pipeline"].as(op->pipeline); - - auto ackAny = options["ackAny"]; - - if(ackAny.type()==TypeCode::String) { - auto sval = ackAny.as(); - if(sval.size()>1 && sval.back()=='%') { - try { - auto percent = parseTo(sval); - if(percent>0.0 && percent<=100.0) { - op->ackAt = uint32_t(percent * op->queueSize); - } else { - throw std::invalid_argument("not in range (0%, 100%]"); - } - }catch(std::exception&){ - log_warn_printf(monevt, "Error parsing as percent ackAny: \"%s\"\n", sval.c_str()); - } - } - - } - - if(op->ackAt==0u){ - uint32_t count=0u; - - if(ackAny.as(count)) { - op->ackAt = count; - } - } - - if(op->ackAt==0u){ - op->ackAt = op->queueSize/2u; - } - - op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize)); - - chan->pending.push_back(op); - chan->createOperations(); - - auto loop(op->chan->context->tcp_loop); - ret.reset(op.get(), [op, loop](Subscription*) mutable { - // on user thread - auto temp(std::move(op)); - auto L(std::move(loop)); - L.call([&temp]() { - // on worker - try { - temp->_cancel(true); - }catch(std::exception& e){ - log_exc_printf(monevt, "Channel %s error in monitor cancel(): %s", - temp->channelName.c_str(), e.what()); - } - // ensure dtor on worker - temp.reset(); - }); - }); + options["queueSize"].as([&op](uint32_t Q) { + if(Q>1) + op->queueSize = Q; }); - return ret; + (void)options["pipeline"].as(op->pipeline); + + auto ackAny = options["ackAny"]; + + if(ackAny.type()==TypeCode::String) { + auto sval = ackAny.as(); + if(sval.size()>1 && sval.back()=='%') { + try { + auto percent = parseTo(sval); + if(percent>0.0 && percent<=100.0) { + op->ackAt = uint32_t(percent * op->queueSize); + } else { + throw std::invalid_argument("not in range (0%, 100%]"); + } + }catch(std::exception&){ + log_warn_printf(monevt, "Error parsing as percent ackAny: \"%s\"\n", sval.c_str()); + } + } + + } + + if(op->ackAt==0u){ + uint32_t count=0u; + + if(ackAny.as(count)) { + op->ackAt = count; + } + } + + if(op->ackAt==0u){ + op->ackAt = op->queueSize/2u; + } + + op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize)); + + std::shared_ptr external(op.get(), [op](SubscriptionImpl*) mutable { + // from user thread + auto loop(op->loop); + // std::bind for lack of c++14 generalized capture + // to move internal ref to worker for dtor + loop.call(std::bind([](std::shared_ptr& op) { + // on worker + + // ordering of dispatch()/call() ensures creation before destruction + assert(op->chan); + op->_cancel(true); + }, std::move(op))); + assert(!op); + op.reset(); + }); + + auto name(std::move(_name)); + auto context(ctx->shared_from_this()); + context->tcp_loop.dispatch([op, context, name]() { + // on worker + + op->chan = Channel::build(context, name); + + op->chan->pending.push_back(op); + op->chan->createOperations(); + }); + + return external; } } // namespace client