Files
pvxs/test/testmonpipe.cpp
T
Michael Davidsaver 1e9a22f9a4 test ackAny
2026-02-27 08:16:23 +00:00

172 lines
4.8 KiB
C++

/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <algorithm>
#include <string.h>
#include <testMain.h>
#include <epicsUnitTest.h>
#include <epicsEvent.h>
#include <pvxs/unittest.h>
#include <pvxs/log.h>
#include <pvxs/client.h>
#include <pvxs/server.h>
#include <pvxs/sharedpv.h>
#include <pvxs/source.h>
#include <pvxs/nt.h>
namespace {
using namespace pvxs;
struct Spammer : public server::Source {
Value prototype;
Spammer()
:prototype(nt::NTScalar{TypeCode::UInt16}.create())
{}
virtual void onSearch(Search &op) override final {
for(auto& pv : op) {
if(strcmp(pv.name(), "spam")==0)
pv.claim();
}
}
virtual void onCreate(std::unique_ptr<server::ChannelControl> &&rop) override final {
if(rop->name()!="spam")
return;
auto op(std::move(rop));
auto ptype(prototype);
op->onSubscribe([ptype](std::unique_ptr<server::MonitorSetupOp>&& mop) {
uint32_t highMark = 0u;
mop->pvRequest()["record._options.highMark"].as(highMark);
uint16_t lastVal = 10u;
mop->pvRequest()["record._options.lastVal"].as(lastVal);
struct SpamCounter {
std::unique_ptr<server::MonitorControlOp> mctrl;
Value prototype;
uint16_t nextCnt = 0u;
uint16_t lastVal;
void push() {
testDiag("Wakeup");
// assume there is at least one free slot in the queue
while(nextCnt < lastVal) {
testDiag("Push %u", unsigned(nextCnt));
auto next(prototype.cloneEmpty());
next["value"] = nextCnt++;
if(mctrl->tryPost(next)) {
// There are more empty slots
} else {
// queue is now (over)full
break;
}
}
if(nextCnt == lastVal) {
mctrl->finish();
testDiag("finish()");
nextCnt++;
} else if(nextCnt > lastVal) {
testTrue(false)<<" Excessive wakeups "<<nextCnt<<" / "<<lastVal;
}
}
};
auto counter(std::make_shared<SpamCounter>());
counter->prototype = ptype;
counter->lastVal = lastVal;
counter->mctrl = mop->connect(ptype);
counter->mctrl->setWatermarks(0u, highMark);
counter->mctrl->onHighMark([counter](){ counter->push(); });
counter->push(); // initial fill
});
}
};
void testSpam(uint32_t nQueue, uint32_t highMark, uint16_t lastVal,
const std::string& ackAny="")
{
testShow()<<__func__<<" nQueue="<<nQueue<<" highMark="<<highMark<<" lastVal="<<lastVal;
auto src(std::make_shared<Spammer>());
auto srv(server::Config::isolated().build()
.addSource("dut", std::make_shared<Spammer>())
.start());
auto cli(srv.clientConfig().build());
epicsEvent wait;
std::shared_ptr<client::Subscription> mon;
{
auto b(cli.monitor("spam")
.record("highMark", highMark)
.record("queueSize", nQueue)
.record("lastVal", lastVal)
.record("pipeline", true)
.maskConnected(true)
.maskDisconnected(true)
.event([&wait](client::Subscription&){
wait.signal();
}));
if(!ackAny.empty()) {
b.record("ackAny", ackAny);
}
mon = b.exec();
}
uint16_t expected = 0u;
while(true) {
try {
if(auto val = mon->pop()) {
testEq(val["value"].as<uint16_t>(), expected++);
} else {
if(!wait.wait(5.0)) {
testFail("client timeout");
break;
}
}
}catch(client::Finished&){
testPass("Finished");
break;
}
}
testEq(expected, lastVal)<<" after Finish";
}
} // namespace
MAIN(testmonpipe)
{
    // Arguments for one testSpam() call
    struct Case {
        uint32_t nQueue;
        uint32_t highMark;
        uint16_t lastVal;
        const char *ackAny; // "" omits the option
    };
    // Executed in the order listed
    const Case cases[] = {
        {3u, 0u,  7u, ""},
        {2u, 0u,  5u, ""},
        {4u, 0u,  9u, ""},
        {4u, 0u, 10u, ""},
        {4u, 1u, 10u, ""},
        {4u, 2u, 10u, ""},
        {4u, 3u, 10u, ""},
        {4u, 4u, 10u, ""},
        {4u, 6u, 10u, ""},
        {4u, 6u, 10u, "50%"},
    };

    // A passing testSpam() reports lastVal+2 results:
    //   9 + 7 + 11 + 7*12 = 111
    testPlan(111);
    testSetup();
    logger_config_env();

    for(const Case& c : cases) {
        testSpam(c.nQueue, c.highMark, c.lastVal, c.ackAny);
    }

    // NOTE(review): this second call looks redundant with the one above -- confirm before removing
    logger_config_env();
    cleanup_for_valgrind();
    return testDone();
}