client: relax exec() synchronization

dispatch() instead of call().
This commit is contained in:
Michael Davidsaver
2020-12-29 12:14:02 -08:00
parent 346b79d3b7
commit 89f9c54d62
5 changed files with 198 additions and 234 deletions
+83 -85
View File
@@ -33,8 +33,8 @@ struct Entry {
struct SubscriptionImpl : public OperationBase, public Subscription
{
// for use in log messages, event after cancel()
const std::string channelName;
// for use in log messages, even after cancel()
std::string channelName;
evevent ackTick;
@@ -48,7 +48,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
bool maskConn = false, maskDiscon = true;
uint32_t queueSize = 4u, ackAt=0u;
// only access from tcp_loop
// only access from loop
enum state_t : uint8_t {
Connecting, // waiting for an active Channel
@@ -69,13 +69,12 @@ struct SubscriptionImpl : public OperationBase, public Subscription
INST_COUNTER(SubscriptionImpl);
SubscriptionImpl(operation_t op, const std::shared_ptr<Channel>& chan)
:OperationBase (op, chan)
,channelName(chan->name)
,ackTick(event_new(chan->context->tcp_loop.base, -1, EV_TIMEOUT, &tickAckS, this))
SubscriptionImpl(const evbase& loop)
:OperationBase (Operation::Monitor, loop)
,ackTick(event_new(loop.base, -1, EV_TIMEOUT, &tickAckS, this))
{}
virtual ~SubscriptionImpl() {
chan->context->tcp_loop.assertInLoop();
loop.assertInLoop();
_cancel(true);
}
@@ -102,7 +101,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
virtual void pause(bool p) override final
{
chan->context->tcp_loop.call([this, p](){
loop.call([this, p](){
log_info_printf(io, "Server %s channel %s monitor %s\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
chan->name.c_str(),
@@ -179,10 +178,9 @@ struct SubscriptionImpl : public OperationBase, public Subscription
}
virtual bool cancel() override final {
auto context = chan->context;
decltype (event) junk;
bool ret;
context->tcp_loop.call([this, &junk, &ret](){
loop.call([this, &junk, &ret](){
ret = _cancel(false);
junk = std::move(event);
// leave opByIOID for GC
@@ -551,84 +549,84 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
if(!ctx)
throw std::logic_error("NULL Builder");
std::shared_ptr<Subscription> ret;
auto op(std::make_shared<SubscriptionImpl>(ctx->tcp_loop));
op->self = op;
op->event = std::move(_event);
op->onInit = std::move(_onInit);
op->pvRequest = _buildReq();
op->maskConn = _maskConn;
op->maskDiscon = _maskDisconn;
ctx->tcp_loop.call([&ret, this]() {
auto chan = Channel::build(ctx->shared_from_this(), _name);
auto options = op->pvRequest["record._options"];
auto op = std::make_shared<SubscriptionImpl>(Operation::Monitor, chan);
op->self = op;
op->event = std::move(_event);
op->onInit = std::move(_onInit);
op->pvRequest = _buildReq();
op->maskConn = _maskConn;
op->maskDiscon = _maskDisconn;
auto options = op->pvRequest["record._options"];
options["queueSize"].as<uint32_t>([&op](uint32_t Q) {
if(Q>1)
op->queueSize = Q;
});
(void)options["pipeline"].as(op->pipeline);
auto ackAny = options["ackAny"];
if(ackAny.type()==TypeCode::String) {
auto sval = ackAny.as<std::string>();
if(sval.size()>1 && sval.back()=='%') {
try {
auto percent = parseTo<double>(sval);
if(percent>0.0 && percent<=100.0) {
op->ackAt = uint32_t(percent * op->queueSize);
} else {
throw std::invalid_argument("not in range (0%, 100%]");
}
}catch(std::exception&){
log_warn_printf(monevt, "Error parsing as percent ackAny: \"%s\"\n", sval.c_str());
}
}
}
if(op->ackAt==0u){
uint32_t count=0u;
if(ackAny.as(count)) {
op->ackAt = count;
}
}
if(op->ackAt==0u){
op->ackAt = op->queueSize/2u;
}
op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize));
chan->pending.push_back(op);
chan->createOperations();
auto loop(op->chan->context->tcp_loop);
ret.reset(op.get(), [op, loop](Subscription*) mutable {
// on user thread
auto temp(std::move(op));
auto L(std::move(loop));
L.call([&temp]() {
// on worker
try {
temp->_cancel(true);
}catch(std::exception& e){
log_exc_printf(monevt, "Channel %s error in monitor cancel(): %s",
temp->channelName.c_str(), e.what());
}
// ensure dtor on worker
temp.reset();
});
});
options["queueSize"].as<uint32_t>([&op](uint32_t Q) {
if(Q>1)
op->queueSize = Q;
});
return ret;
(void)options["pipeline"].as(op->pipeline);
auto ackAny = options["ackAny"];
if(ackAny.type()==TypeCode::String) {
auto sval = ackAny.as<std::string>();
if(sval.size()>1 && sval.back()=='%') {
try {
auto percent = parseTo<double>(sval);
if(percent>0.0 && percent<=100.0) {
op->ackAt = uint32_t(percent * op->queueSize);
} else {
throw std::invalid_argument("not in range (0%, 100%]");
}
}catch(std::exception&){
log_warn_printf(monevt, "Error parsing as percent ackAny: \"%s\"\n", sval.c_str());
}
}
}
if(op->ackAt==0u){
uint32_t count=0u;
if(ackAny.as(count)) {
op->ackAt = count;
}
}
if(op->ackAt==0u){
op->ackAt = op->queueSize/2u;
}
op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize));
std::shared_ptr<SubscriptionImpl> external(op.get(), [op](SubscriptionImpl*) mutable {
// from user thread
auto loop(op->loop);
// std::bind for lack of c++14 generalized capture
// to move internal ref to worker for dtor
loop.call(std::bind([](std::shared_ptr<SubscriptionImpl>& op) {
// on worker
// ordering of dispatch()/call() ensures creation before destruction
assert(op->chan);
op->_cancel(true);
}, std::move(op)));
assert(!op);
op.reset();
});
auto name(std::move(_name));
auto context(ctx->shared_from_this());
context->tcp_loop.dispatch([op, context, name]() {
// on worker
op->chan = Channel::build(context, name);
op->chan->pending.push_back(op);
op->chan->createOperations();
});
return external;
}
} // namespace client