/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #define PVXS_ENABLE_EXPERT_API #include #include #include #include #include #include #include #include #include #include #include #include "evhelper.h" namespace { using namespace pvxs; struct Tester { Value initial; server::SharedPV mbox; server::Server serv; client::Context cli; Tester(int family=AF_INET) :initial(nt::NTScalar{TypeCode::Int32}.create()) ,mbox(server::SharedPV::buildReadonly()) ,serv(server::Config::isolated(family) .build() .addPV("mailbox", mbox)) ,cli(serv.clientConfig().build()) { testShow()<<"Server:\n"<1u) testAbort("Tester Context leak: %u", unsigned(cli.use_count())); } void testConnector() { testShow()<<__func__; mbox.open(initial); serv.start(); struct info { epicsEvent evt; std::atomic current{false}; std::atomic connd{0}, discd{0}; bool wait(bool state, double timeout) { while(current!=state) { if(!evt.wait(timeout)) return false; } return true; } } evt, evt2, evt3; auto setup = [this](info& i) -> std::shared_ptr { return cli.connect("mailbox") .onConnect([&i]() { i.current = true; i.connd++; testDiag("onConnect %p %zu", &i, i.connd.load()); i.evt.signal(); }) .onDisconnect([&i]() { i.current = false; i.discd++; testDiag("onDisconnect %p %zu", &i, i.discd.load()); i.evt.signal(); }) .exec(); }; auto ctor(setup(evt)); // ensure de-dup auto ctor2(setup(evt2)); testTrue(evt.wait(true, 5.0))<<" Wait for Connect 1"; testEq(evt.discd, 1u); // initially disconnected testEq(evt.connd, 1u); testTrue(evt2.wait(true, 5.0))<<" Wait for Connect 2"; // evt2 may not see the initial "fake" disconnected event if the channel has already connected testOk(evt2.discd<=1u, "second event #discd=%zu", evt2.discd.load()); // initially disconnected testEq(evt2.connd, 1u); // ensure de-dup of connected auto ctor3(setup(evt3)); testTrue(evt3.wait(true, 5.0))<<" Wait for Connect 3"; testEq(evt3.discd, 0u); // initially connected testEq(evt3.connd, 1u); testTrue(ctor->connected()); testTrue(ctor2->connected()); testTrue(ctor3->connected()); // generate some traffic on the channel (void)cli.get("mailbox").exec()->wait(1.0); auto sreport(serv.report()); auto creport(cli.report()); testDiag("Stop server"); serv.stop(); testTrue(evt.wait(false, 5.0))<<" Wait for Disconnect 1"; testEq(evt.discd, 2u); testEq(evt.connd, 1u); testFalse(ctor->connected()); auto checkReport = [](const impl::Report& report) { if(testEq(report.connections.size(), 1u)) { auto& conn = report.connections.front(); testNotEq(conn.tx, 0u); testNotEq(conn.rx, 0u); if(testEq(conn.channels.size(), 1u)) { auto& chan = conn.channels.front(); testNotEq(chan.tx, 0u); testNotEq(chan.rx, 0u); testEq(chan.name, "mailbox"); } } }; checkReport(sreport); checkReport(creport); } void testWaiter() { testShow()<<__func__; mbox.open(initial); serv.start(); std::atomic hadInit{false}; auto op = cli.get("mailbox") .onInit([&hadInit](const Value& prototype) { testShow()<<"onInit() << "<wait(5.0); testEq(result["value"].as(), 42); testTrue(hadInit.load()); } void testWait() { client::Result actual; epicsEvent done; auto op = cli.get("mailbox") .result([&actual, &done](client::Result&& result) { actual = std::move(result); done.signal(); }) .exec(); cli.hurryUp(); if(testOk1(done.wait(5.0))) { testEq(actual()["value"].as(), 42); } else { testSkip(1, "timeout"); } op.reset(); cli.cacheClear(); } void loopback() { testShow()<<__func__; mbox.open(initial); serv.start(); testWait(); } void lazy() { testShow()<<__func__; std::atomic onFC{false}, onLD{false}; epicsEvent done; mbox.onFirstConnect([this, &onFC](server::SharedPV&){ testShow()<<"In onFirstConnect()"; mbox.open(initial); onFC.store(true); }); mbox.onLastDisconnect([this, &onLD, &done](server::SharedPV&){ testShow()<<"In onLastDisconnect"; mbox.close(); onLD.store(true); done.signal(); }); serv.start(); testWait(); testOk1(done.wait(5.0)); serv.stop(); testOk1(!mbox.isOpen()); testOk1(!!onFC.load()); testOk1(!!onLD.load()); } void timeout() { testShow()<<__func__; client::Result actual; epicsEvent done; // server not started auto op = cli.info("mailbox") .result([&actual, &done](client::Result&& result) { actual = std::move(result); done.signal(); }) .exec(); cli.hurryUp(); testOk1(!done.wait(1.1)); } void cancel() { testShow()<<__func__; client::Result actual; epicsEvent done; serv.start(); // not storing Operation -> immediate cancel() cli.info("mailbox") .result([&actual, &done](client::Result&& result) { actual = std::move(result); done.signal(); }) .exec(); cli.hurryUp(); testOk1(!done.wait(2.1)); } void asyncCancel() { testShow()<<__func__; struct info_t { client::Result actual; epicsEvent done; }; auto info(std::make_shared()); serv.start(); // not storing Operation -> immediate cancel() cli.info("mailbox") .syncCancel(false) .result([info](client::Result&& result) { info->actual = std::move(result); info->done.signal(); }) .exec(); cli.hurryUp(); testOk1(!info->done.wait(2.1)); cli.close(); } void orphan() { testShow()<<__func__; auto op = cli.get("nonexistent").exec(); // clear Context to orphan in-progress operation cli = client::Context(); op.reset(); } void name() { testShow()<<__func__; auto op = cli.get("nonexistent").exec(); auto op_name = op->name(); testEq(op_name, "nonexistent"); } void manualExec() { testShow()<<__func__; epicsEvent initd; epicsEvent done; mbox.open(initial); serv.start(); auto op = cli.get("mailbox") .autoExec(false) .onInit([&initd](const Value& prototype) { testDiag("onInit()"); initd.signal(); }) .result([&initd](client::Result&& result) { testFail("result() unexpected error prior to onInit()"); initd.signal(); }) .exec(); testOk1(initd.wait(5.0)); testDiag("reExec() 1"); op->reExecGet([&done](client::Result&& result) { testTrue(!!result()); testDiag("result() 1"); done.signal(); }); testOk1(done.wait(5.0)); testDiag("reExec() 2"); op->reExecGet([&done](client::Result&& result) { testTrue(!!result()); testDiag("result() 2"); done.signal(); }); testOk1(done.wait(5.0)); serv.stop(); serv.start(); // TODO: should reExec* while disconnected be queued? testOk1(initd.wait(5.0)); testDiag("reExec() 3"); op->reExecGet([&done](client::Result&& result) { testTrue(!!result()); testDiag("result() 3"); done.signal(); }); testOk1(done.wait(5.0)); } void badRequest() { testShow()<<__func__; mbox.open(initial); serv.start(); auto op = cli.get("mailbox") .field("invalid") .exec(); testThrowsMatch("pvRequest must select at least one field", [&op]() { testShow()<wait(4.0); })<<" pvRequest selects no fields"; } void delayExec() { testShow()<<__func__; // ref'd by both put and timer functors auto done(std::make_shared()); // ref'd by put functor auto slowdown(std::make_shared()); mbox.onPut([this, done, slowdown](server::SharedPV& pv, std::unique_ptr&& rawop, Value&& rawval) { // on server worker std::shared_ptr op(std::move(rawop)); auto val(std::move(rawval)); testPass("In onPut"); *slowdown = op->timerOneShot(0.01, [](){ testFail("I should not run."); }); testTrue(slowdown->cancel()); *slowdown = op->timerOneShot(0.01, [this, done, op, val](){ testPass("I should run"); done->signal(); mbox.post(val); op->reply(); }); // op->reply() from timer }); mbox.open(initial); serv.start(); auto op = cli.put("mailbox") .set("value", 42) .exec()->wait(5.0); } void ordering() { testShow()<<__func__; auto src2(server::StaticSource::build()); auto mbox2(server::SharedPV::buildMailbox()); src2.add("mailbox", mbox2); serv.addSource("other", src2.source(), -50); auto other = initial["value"].as()+1; mbox.open(initial); mbox2.open(initial.cloneEmpty() .update("value", other)); serv.start(); auto val = cli.get("mailbox").exec()->wait(5.0); testEq(val["value"].as(), other); } }; struct ErrorSource : public server::Source { const bool phase = false; const Value type; explicit ErrorSource(bool phase) :phase(phase) ,type(nt::NTScalar{TypeCode::Int32}.create()) {} virtual void onSearch(Search &op) override final { for(auto& name : op) { name.claim(); } } virtual void onCreate(std::unique_ptr &&op) override final { auto chan = std::move(op); chan->onOp([this](std::unique_ptr&& op) { if(!phase) { op->error("haha"); return; } op->onGet([](std::unique_ptr&& op) { op->error("nice try"); }); op->connect(type); }); } }; void testError(bool phase) { testShow()<<__func__<<" phase="<(phase)) .start(); auto cli = serv.clientConfig().build(); client::Result actual; epicsEvent done; auto op = cli.get("mailbox") .result([&actual, &done](client::Result&& result) { actual = std::move(result); done.signal(); }) .exec(); cli.hurryUp(); if(testOk1(done.wait(5.0))) { testThrows([&actual]() { auto val = actual(); testShow()<<"unexpected result\n"< closecount{0u}; explicit DisconnSource() :type(nt::NTScalar{TypeCode::Int32}.create()) {} virtual void onSearch(Search &op) override final { for(auto& name : op) { name.claim(); } } virtual void onCreate(std::unique_ptr &&op) override final { std::shared_ptr chan(std::move(op)); chan->onOp([this, chan](std::unique_ptr&& op) { op->onGet([this, chan](std::unique_ptr&& op) { testDiag("Force close!"); closecount++; chan->close(); }); op->connect(type); }); } }; void testServerClose() { testShow()<<__func__; auto source(std::make_shared()); auto serv = server::Config::isolated() .build() .addSource("disconn", source) .start(); auto cli = serv.clientConfig().build(); epicsEvent econn, edisconn; auto connop = cli.connect("mailbox") .onConnect([&](){ testDiag("Connect"); econn.signal(); }) .onDisconnect([&](){ testDiag("Disconn"); edisconn.signal(); }) .exec(); testTrue(edisconn.wait(5.0))<<" Initially Disconnected"; testTrue(econn.wait(5.0))<<" Connected"; auto op = cli.get("mailbox") .exec(); testTrue(edisconn.wait(5.0))<<" Disconnected"; testEq(source->closecount, 1u); } } // namespace MAIN(testget) { testPlan(67); testSetup(); logger_config_env(); const bool canIPv6 = pvxs::impl::evsocket::canIPv6; Tester().testConnector(); Tester().testWaiter(); Tester(AF_INET).loopback(); if(canIPv6) { Tester(AF_INET6).loopback(); } else { testSkip(2, "No IPv6 Support"); } Tester().lazy(); Tester().timeout(); Tester().cancel(); Tester().asyncCancel(); Tester().orphan(); Tester().name(); Tester().manualExec(); Tester().badRequest(); Tester().delayExec(); Tester().ordering(); testError(false); testError(true); testServerClose(); cleanup_for_valgrind(); return testDone(); }