diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 6d6d4ec..dd0f782 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -170,10 +170,7 @@ struct SubscriptionImpl final : public OperationBase, public Subscription if(pipeline) { timeval tick{}; // immediate ACK - // schedule delayed ack while below threshold. - // avoid overhead of re-scheduling when unack in range [1, ackAt) - if(unack==0u && ackAt!=1u) - tick = timeval{1,0}; + unack++; if(!ackPending && unack>=ackAt) { if(event_add(ackTick.get(), &tick)) { @@ -184,13 +181,12 @@ struct SubscriptionImpl final : public OperationBase, public Subscription ackPending = true; } } - - unack++; } - log_info_printf(monevt, "channel '%s' monitor pop() %s %u,%u\n", - channelName.c_str(), - ent.exc ? "exception" : ent.val ? "data" : "null!", - unsigned(window), unsigned(unack)); + log_printf(monevt, ent.exc || ent.val ? Level::Info : Level::Err, + "channel '%s' monitor pop() %s %u,%u\n", + channelName.c_str(), + ent.exc ? "exception" : ent.val ? "data" : "null!", + unsigned(window), unsigned(unack)); if(ent.exc) std::rethrow_exception(ent.exc); @@ -722,20 +718,22 @@ void Connection::handle_MONITOR() notify = mon->queue.empty(); - if(update.exc || (mon->queue.size() < mon->queueSize) || mon->queue.back().exc) { + assert(mon->queueSize >= 1u); + if(update.val && mon->queue.size() >= mon->queueSize && mon->queue.back().val && !mon->pipeline) { + log_debug_printf(io, "Server %s channel %s monitor Squash\n", + peerName.c_str(), + mon->chan->name.c_str()); + + mon->queue.back().val.assign(update.val); + mon->nCliSquash++; + + } else if(update.exc || update.val) { log_debug_printf(io, "Server %s channel %s monitor PUSH\n", peerName.c_str(), mon->chan->name.c_str()); mon->queue.emplace_back(std::move(update)); - } else if(update.val) { - log_debug_printf(io, "Server %s channel %s monitor Squash\n", - peerName.c_str(), - mon->chan->name.c_str()); - - mon->queue.back().val.assign(update.val); - mon->nCliSquash++; } if(final && !update.exc) { @@ -812,7 +810,7 @@ std::shared_ptr MonitorBuilder::exec() auto sval = ackAny.as(); if(sval.size()>1 && sval.back()=='%') { try { - auto percent = parseTo(sval); + auto percent = parseTo(sval.substr(0, sval.size()-1u)); if(percent>0.0 && percent<=100.0) { op->ackAt = uint32_t(percent * op->queueSize); } else { diff --git a/src/pvxs/source.h b/src/pvxs/source.h index 197214e..06f09e2 100644 --- a/src/pvxs/source.h +++ b/src/pvxs/source.h @@ -106,7 +106,7 @@ public: //! Signal to subscriber that this subscription will not yield any further events. //! This is not an error. Client should not retry. void finish() { - doPost(Value(), false, false); + doPost(Value(), false, true); } //! Poll information and statistics for this subscription. diff --git a/src/servermon.cpp b/src/servermon.cpp index ed1eae4..7a7c522 100644 --- a/src/servermon.cpp +++ b/src/servermon.cpp @@ -61,9 +61,11 @@ struct MonitorOp : public ServerOp, // is doReply() scheduled to run bool scheduled=false; bool pipeline=false; + // finish() called bool finished=false; size_t window=0u, limit=4u; size_t low=0u, high=0u; + size_t ackAt=1u; size_t maxQueue=0u; std::deque queue; @@ -96,6 +98,8 @@ struct MonitorOp : public ServerOp, }); op->scheduled = true; + } else { + log_debug_printf(connio, "Skip reply%s", "\n"); } } @@ -111,6 +115,8 @@ struct MonitorOp : public ServerOp, Guard G(lock); scheduled = false; + log_debug_printf(connio, "%s state=%d\n", __func__, state); + if(state==Dead) return; @@ -120,13 +126,16 @@ struct MonitorOp : public ServerOp, state = type ? Idle : Dead; } else if(state==Executing) { - if(queue.empty() || (pipeline && !window)) { + if(queue.empty() || (pipeline && !window && !finished)) { + log_debug_printf(connio, "Client %s IOID %u done reply\n", + conn->peerName.c_str(), unsigned(ioid)); return; // nothing to do } else if(!queue.front()) { - finished = true; subcmd = 0x10; state = Dead; + log_debug_printf(connio, "Client %s IOID %u finishes\n", + conn->peerName.c_str(), unsigned(ioid)); } } @@ -169,22 +178,26 @@ struct MonitorOp : public ServerOp, auto self(shared_from_this()); - if(state==Executing && pipeline) { - assert(window); // previously tested + if(state==Executing && pipeline && window) { window--; if(!lowMarkPending && window <= low && onLowMark) { lowMarkPending = true; conn->iface->server->acceptor_loop.dispatch([self]() { - self->lowMarkPending = false; - if(self->onLowMark) - self->onLowMark(); + decltype (self->onLowMark) fn; + { + Guard G(self->lock); + self->lowMarkPending = false; + fn = self->onLowMark; + } + if(fn) + fn(); }); } } - if(state==Executing && !queue.empty() && (!pipeline || window)) { + if(state==Executing && !queue.empty() && (!pipeline || window || finished)) { // reschedule myself assert(!scheduled); // we've been holding the lock, so this should not have changed @@ -227,9 +240,14 @@ struct ServerMonitorControl : public server::MonitorControlOp bool real = testmask(val, mon->pvMask); Guard G(mon->lock); - if(real) { + if(mon->finished) + throw std::logic_error("Already finish()'d"); // TODO fail soft + + if(real || !val) { if((mon->queue.size() < mon->limit) || force || !val) { + + mon->finished = !val; mon->queue.push_back(val); if(mon->maxQueue < mon->queue.size()) @@ -285,8 +303,9 @@ struct ServerMonitorControl : public server::MonitorControlOp serv->acceptor_loop.call([this, low, high](){ if(auto oper = op.lock()) { Guard G(oper->lock); - oper->low = low; - oper->high = high; + oper->low = std::min(low, oper->ackAt-1u); + oper->high = std::min(high, oper->ackAt-1u); + log_debug_printf(connsetup, "setWatermarks(%zu, %zu)", oper->low, oper->high); // TODO handle change of levels after start } }); @@ -468,6 +487,34 @@ void ServerConn::handle_MONITOR() if(op->limit < op->window) op->limit = op->window; + auto ackAny = pvRequest["record._options.ackAny"]; + if(ackAny.type()==TypeCode::String) { + auto sval = ackAny.as(); + if(sval.size()>1 && sval.back()=='%') { + try { + auto percent = parseTo(sval.substr(0, sval.size()-1u)); + op->ackAt = std::max(0.0, std::min(percent, 100.0)) * op->limit; + }catch(std::exception&){ + log_warn_printf(connio, "Error parsing as percent ackAny: \"%s\"\n", sval.c_str()); + } + } + + } + + if(op->ackAt==0u){ + uint32_t count=0u; + + if(ackAny.as(count)) { + op->ackAt = count; + } + } + + if(op->ackAt==0u){ + op->ackAt = op->limit/2u; + } + + op->ackAt = std::max(1u, std::min(op->ackAt, op->limit)); + std::unique_ptr ctrl(new ServerMonitorSetup(this, iface->server->internal_self, chan->name, pvRequest, op)); op->state = ServerOp::Creating; @@ -531,22 +578,30 @@ void ServerConn::handle_MONITOR() // pvAccessCPP won't accept ack and start/stop in the same message, // although it will accept destroy in any !INIT message. // We do accept ack+start/stop as there is no reason not to. - if(subcmd&0x80 && op->pipeline) { // ack + if((subcmd&0x80) && op->pipeline) { // ack Guard G(op->lock); - log_debug_printf(connio, "Client %s IOID %u acks %u, %u/%u\n", + log_debug_printf(connio, "Client %s IOID %u acks %u, %u/%u%s%s\n", peerName.c_str(), unsigned(ioid), unsigned(nack), - unsigned(op->window), unsigned(op->high)); + unsigned(op->window), unsigned(op->high), + op->highMarkPending ? " pend" : "", + op->finished ? " fin" : ""); op->window += nack; - if(!op->highMarkPending && op->window > op->high && op->onHighMark) { + if(!op->highMarkPending && op->window > op->high && op->onHighMark && !op->finished) { op->highMarkPending = true; iface->server->acceptor_loop.dispatch([op](){ - op->highMarkPending = false; - if(op->onHighMark) - op->onHighMark(); + decltype(op->onHighMark) fn; + { + Guard G(op->lock); + op->highMarkPending = false; + if(!op->finished) + fn = op->onHighMark; + } + if(fn) + fn(); }); } } diff --git a/test/Makefile b/test/Makefile index b728add..5190cab 100644 --- a/test/Makefile +++ b/test/Makefile @@ -82,6 +82,10 @@ TESTPROD_HOST += testmon testmon_SRCS += testmon.cpp TESTS += testmon +TESTPROD_HOST += testmonpipe +testmonpipe_SRCS += testmonpipe.cpp +TESTS += testmonpipe + TESTPROD_HOST += testput testput_SRCS += testput.cpp TESTS += testput diff --git a/test/testmonpipe.cpp b/test/testmonpipe.cpp new file mode 100644 index 0000000..7bf493c --- /dev/null +++ b/test/testmonpipe.cpp @@ -0,0 +1,163 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace { +using namespace pvxs; + +struct Spammer : public server::Source { + Value prototype; + + Spammer() + :prototype(nt::NTScalar{TypeCode::UInt16}.create()) + {} + + virtual void onSearch(Search &op) override final { + for(auto& pv : op) { + if(strcmp(pv.name(), "spam")==0) + pv.claim(); + } + } + + virtual void onCreate(std::unique_ptr &&rop) override final { + if(rop->name()!="spam") + return; + + auto op(std::move(rop)); + + auto ptype(prototype); + op->onSubscribe([ptype](std::unique_ptr&& mop) { + + uint32_t highMark = 0u; + mop->pvRequest()["record._options.highMark"].as(highMark); + + uint16_t lastVal = 10u; + mop->pvRequest()["record._options.lastVal"].as(lastVal); + + struct SpamCounter { + std::unique_ptr mctrl; + Value prototype; + uint16_t nextCnt = 0u; + uint16_t lastVal; + + void push() { + testDiag("Wakeup"); + // assume there is at least one free slot in the queue + while(nextCnt < lastVal) { + testDiag("Push %u", unsigned(nextCnt)); + auto next(prototype.cloneEmpty()); + next["value"] = nextCnt++; + if(mctrl->tryPost(next)) { + // There are more empty slots + } else { + // queue is now (over)full + break; + } + } + if(nextCnt == lastVal) { + mctrl->finish(); + testDiag("finish()"); + nextCnt++; + } else if(nextCnt > lastVal) { + testTrue(false)<<" Excessive wakeups "<()); + + counter->prototype = ptype; + counter->lastVal = lastVal; + counter->mctrl = mop->connect(ptype); + counter->mctrl->setWatermarks(0u, highMark); + + counter->mctrl->onHighMark([counter](){ counter->push(); }); + + counter->push(); // initial fill + }); + } +}; + +void testSpam(uint32_t nQueue, uint32_t highMark, uint16_t lastVal) +{ + testShow()<<__func__<<" nQueue="<()) + .start()); + + auto cli(srv.clientConfig().build()); + + epicsEvent wait; + auto mon(cli.monitor("spam") + .record("highMark", highMark) + .record("queueSize", nQueue) + .record("lastVal", lastVal) + .record("pipeline", true) + .maskConnected(true) + .maskDisconnected(true) + .event([&wait](client::Subscription&){ + wait.signal(); + }) + .exec()); + + uint16_t expected = 0u; + while(true) { + try { + if(auto val = mon->pop()) { + testEq(val["value"].as(), expected++); + } else { + if(!wait.wait(5.0)) { + testFail("client timeout"); + break; + } + } + }catch(client::Finished&){ + testPass("Finished"); + break; + } + } + testEq(expected, lastVal)<<" after Finish"; +} + +} // namespace + +MAIN(testmonpipe) +{ + testPlan(99); + testSetup(); + logger_config_env(); + testSpam(3u, 0u, 7u); + testSpam(2u, 0u, 5u); + testSpam(4u, 0u, 9u); + testSpam(4u, 0u, 10u); + testSpam(4u, 1u, 10u); + testSpam(4u, 2u, 10u); + testSpam(4u, 3u, 10u); + testSpam(4u, 4u, 10u); + testSpam(4u, 6u, 10u); + logger_config_env(); + cleanup_for_valgrind(); + return testDone(); +}