/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #include #include #include #include #include #include #include #include #include #include #include #include namespace { using namespace pvxs; struct Spammer : public server::Source { Value prototype; Spammer() :prototype(nt::NTScalar{TypeCode::UInt16}.create()) {} virtual void onSearch(Search &op) override final { for(auto& pv : op) { if(strcmp(pv.name(), "spam")==0) pv.claim(); } } virtual void onCreate(std::unique_ptr &&rop) override final { if(rop->name()!="spam") return; auto op(std::move(rop)); auto ptype(prototype); op->onSubscribe([ptype](std::unique_ptr&& mop) { uint32_t highMark = 0u; mop->pvRequest()["record._options.highMark"].as(highMark); uint16_t lastVal = 10u; mop->pvRequest()["record._options.lastVal"].as(lastVal); struct SpamCounter { std::unique_ptr mctrl; Value prototype; uint16_t nextCnt = 0u; uint16_t lastVal; void push() { testDiag("Wakeup"); // assume there is at least one free slot in the queue while(nextCnt < lastVal) { testDiag("Push %u", unsigned(nextCnt)); auto next(prototype.cloneEmpty()); next["value"] = nextCnt++; if(mctrl->tryPost(next)) { // There are more empty slots } else { // queue is now (over)full break; } } if(nextCnt == lastVal) { mctrl->finish(); testDiag("finish()"); nextCnt++; } else if(nextCnt > lastVal) { testTrue(false)<<" Excessive wakeups "<()); counter->prototype = ptype; counter->lastVal = lastVal; counter->mctrl = mop->connect(ptype); counter->mctrl->setWatermarks(0u, highMark); counter->mctrl->onHighMark([counter](){ counter->push(); }); counter->push(); // initial fill }); } }; void testSpam(uint32_t nQueue, uint32_t highMark, uint16_t lastVal) { testShow()<<__func__<<" nQueue="<()) .start()); auto cli(srv.clientConfig().build()); epicsEvent wait; auto mon(cli.monitor("spam") .record("highMark", highMark) .record("queueSize", nQueue) .record("lastVal", lastVal) .record("pipeline", true) .maskConnected(true) .maskDisconnected(true) .event([&wait](client::Subscription&){ wait.signal(); }) .exec()); uint16_t expected = 0u; while(true) { try { if(auto val = mon->pop()) { testEq(val["value"].as(), expected++); } else { if(!wait.wait(5.0)) { testFail("client timeout"); break; } } }catch(client::Finished&){ testPass("Finished"); break; } } testEq(expected, lastVal)<<" after Finish"; } } // namespace MAIN(testmonpipe) { testPlan(99); testSetup(); logger_config_env(); testSpam(3u, 0u, 7u); testSpam(2u, 0u, 5u); testSpam(4u, 0u, 9u); testSpam(4u, 0u, 10u); testSpam(4u, 1u, 10u); testSpam(4u, 2u, 10u); testSpam(4u, 3u, 10u); testSpam(4u, 4u, 10u); testSpam(4u, 6u, 10u); logger_config_env(); cleanup_for_valgrind(); return testDone(); }