fix monitor pipeline and finish()

This commit is contained in:
Michael Davidsaver
2023-05-10 13:32:24 -07:00
parent a4e974def9
commit a36dd2a9cc
5 changed files with 258 additions and 38 deletions
+17 -19
View File
@@ -170,10 +170,7 @@ struct SubscriptionImpl final : public OperationBase, public Subscription
if(pipeline) {
timeval tick{}; // immediate ACK
// schedule delayed ack while below threshold.
// avoid overhead of re-scheduling when unack in range [1, ackAt)
if(unack==0u && ackAt!=1u)
tick = timeval{1,0};
unack++;
if(!ackPending && unack>=ackAt) {
if(event_add(ackTick.get(), &tick)) {
@@ -184,13 +181,12 @@ struct SubscriptionImpl final : public OperationBase, public Subscription
ackPending = true;
}
}
unack++;
}
log_info_printf(monevt, "channel '%s' monitor pop() %s %u,%u\n",
channelName.c_str(),
ent.exc ? "exception" : ent.val ? "data" : "null!",
unsigned(window), unsigned(unack));
log_printf(monevt, ent.exc || ent.val ? Level::Info : Level::Err,
"channel '%s' monitor pop() %s %u,%u\n",
channelName.c_str(),
ent.exc ? "exception" : ent.val ? "data" : "null!",
unsigned(window), unsigned(unack));
if(ent.exc)
std::rethrow_exception(ent.exc);
@@ -722,20 +718,22 @@ void Connection::handle_MONITOR()
notify = mon->queue.empty();
if(update.exc || (mon->queue.size() < mon->queueSize) || mon->queue.back().exc) {
assert(mon->queueSize >= 1u);
if(update.val && mon->queue.size() >= mon->queueSize && mon->queue.back().val && !mon->pipeline) {
log_debug_printf(io, "Server %s channel %s monitor Squash\n",
peerName.c_str(),
mon->chan->name.c_str());
mon->queue.back().val.assign(update.val);
mon->nCliSquash++;
} else if(update.exc || update.val) {
log_debug_printf(io, "Server %s channel %s monitor PUSH\n",
peerName.c_str(),
mon->chan->name.c_str());
mon->queue.emplace_back(std::move(update));
} else if(update.val) {
log_debug_printf(io, "Server %s channel %s monitor Squash\n",
peerName.c_str(),
mon->chan->name.c_str());
mon->queue.back().val.assign(update.val);
mon->nCliSquash++;
}
if(final && !update.exc) {
@@ -812,7 +810,7 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
auto sval = ackAny.as<std::string>();
if(sval.size()>1 && sval.back()=='%') {
try {
auto percent = parseTo<double>(sval);
auto percent = parseTo<double>(sval.substr(0, sval.size()-1u));
if(percent>0.0 && percent<=100.0) {
op->ackAt = uint32_t(percent * op->queueSize);
} else {