diff --git a/src/client.cpp b/src/client.cpp index e0884af..f89eb7c 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -483,7 +483,11 @@ Value buildCAMethod() ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) :ifmap(IfaceMap::instance()) - ,effective(conf) + ,effective([conf]() -> Config{ + Config eff(conf); + eff.expand(); + return eff; + }()) ,caMethod(buildCAMethod()) ,searchTx4(AF_INET, SOCK_DGRAM, 0) ,searchTx6(AF_INET6, SOCK_DGRAM, 0) @@ -494,7 +498,7 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) event_new(tcp_loop.base, searchTx6.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this)) ,searchTimer(__FILE__, __LINE__, event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this)) - ,manager(UDPManager::instance()) + ,manager(UDPManager::instance(effective.shareUDP())) ,beaconCleaner(__FILE__, __LINE__, event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this)) ,cacheCleaner(__FILE__, __LINE__, @@ -502,8 +506,6 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) ,nsChecker(__FILE__, __LINE__, event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::onNSCheckS, this)) { - effective.expand(); - searchBuckets.resize(nBuckets); std::set bcasts; diff --git a/src/clientimpl.h b/src/clientimpl.h index 5f4d4a7..802fb99 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -254,8 +254,7 @@ struct ContextImpl : public std::enable_shared_from_this Stopped, } state = Init; - // "const" after ctor - Config effective; + const Config effective; const Value caMethod; diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 981333b..c449fac 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -1033,6 +1033,7 @@ struct PVXS_API Config { private: bool BE = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + bool UDP = true; public: // compat @@ -1073,6 +1074,8 @@ public: // for protocol compatibility testing inline Config& overrideSendBE(bool be) { BE = be; return *this; } inline bool sendBE() const { return BE; } + inline Config& overrideShareUDP(bool share) { UDP = share; return *this; } + inline bool shareUDP() const { return UDP; } #endif }; diff --git a/src/pvxs/server.h b/src/pvxs/server.h index bd20ec8..c113b70 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -176,6 +176,7 @@ struct PVXS_API Config { private: bool BE = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + bool UDP = true; public: // compat @@ -220,6 +221,8 @@ public: // for protocol compatibility testing inline Config& overrideSendBE(bool be) { BE = be; return *this; } inline bool sendBE() const { return BE; } + inline Config& overrideShareUDP(bool share) { UDP = share; return *this; } + inline bool shareUDP() const { return UDP; } #endif }; diff --git a/src/server.cpp b/src/server.cpp index 436863a..57bda08 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -390,7 +390,7 @@ Server::Pvt::Pvt(const Config &conf) beaconSender4.set_broadcast(true); - auto manager = UDPManager::instance(); + auto manager = UDPManager::instance(effective.shareUDP()); evsocket dummy(AF_INET, SOCK_DGRAM, 0); diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index 3f8b0d4..f0fb331 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -524,18 +524,22 @@ void collector_init(void *unused) } } // namespace -UDPManager UDPManager::instance() +UDPManager UDPManager::instance(bool share) { threadOnce(&collector_once, &collector_init, nullptr); assert(udp_gbl); Guard G(udp_gbl->lock); - auto ret = udp_gbl->inst.lock(); + std::shared_ptr ret; + + if(share) + ret = udp_gbl->inst.lock(); if(!ret) { ret.reset(new UDPManager::Pvt); - udp_gbl->inst = ret; + if(share) + udp_gbl->inst = ret; } return UDPManager(ret); diff --git a/src/udp_collector.h b/src/udp_collector.h index 8958e47..16c0d35 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -28,7 +28,7 @@ struct PVXS_API UDPManager SockAttach attach; //! get process-wide singleton. - static UDPManager instance(); + static UDPManager instance(bool share=true); static void cleanup(); ~UDPManager(); @@ -81,7 +81,7 @@ struct PVXS_API UDPManager explicit operator bool() const { return !!pvt; } - UDPManager(); + UDPManager() = default; struct Pvt; private: diff --git a/test/Makefile b/test/Makefile index 94eedb6..b802dc1 100644 --- a/test/Makefile +++ b/test/Makefile @@ -98,6 +98,10 @@ TESTPROD_HOST += testendian testendian_SRCS += testendian.cpp TESTS += testendian +TESTPROD_HOST += testudpfwd +testudpfwd_SRCS += testudpfwd.cpp +TESTS += testudpfwd + ifdef BASE_7_0 TESTPROD_HOST += benchdata diff --git a/test/testudpfwd.cpp b/test/testudpfwd.cpp new file mode 100644 index 0000000..7f49425 --- /dev/null +++ b/test/testudpfwd.cpp @@ -0,0 +1,145 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#define PVXS_ENABLE_EXPERT_API + +#include +#include +#include + +#include +#include +#include +#include +#include +#include "evhelper.h" + + +using namespace pvxs; + +namespace { + +bool testFwdVia(const server::Config& base, const SockAddr& ifaddr) +{ + bool ok = true; + testDiag("In %s(%s)", __func__, ifaddr.tostring().c_str()); + + auto pv(server::SharedPV::buildMailbox()); + pv.open(nt::NTScalar{TypeCode::UInt32}.create().update("value", 42u)); + + server::Server srv1, srv2, srv3; + { + auto sconf = base; + sconf.overrideShareUDP(false); + // unicast through one interface + sconf.tcp_port = sconf.udp_port = 0; + if(ifaddr.family()!=AF_UNSPEC) + sconf.interfaces.push_back(ifaddr.tostring()); + sconf.auto_beacon = false; + + srv1 = sconf.build(); + + sconf = srv1.config(); + sconf.overrideShareUDP(false); + + srv2 = sconf.build(); + + // bind to wildcard + sconf.interfaces.clear(); + /* BSD and MS IP stacks allow two TCP sockets bound to the same port + * one to wildcard and one to an interface address. This leaves one + * of the two sometimes unreachable. + * Explicitly select random port as workaround. + */ + sconf.tcp_port = 0; + srv3 = sconf.build(); + } + + auto tcp1 = srv1.config().tcp_port; + auto tcp2 = srv2.config().tcp_port; + auto tcp3 = srv3.config().tcp_port; + if(tcp1==tcp2 || tcp1==tcp3 || tcp2==tcp3) { + testFail("Server bind() conflict %d, %d, %d", tcp1, tcp2, tcp3); + } + + srv1.addPV("testpv1", pv); + srv2.addPV("testpv2", pv); + srv3.addPV("testpv3", pv); + + srv1.start(); + srv2.start(); + srv3.start(); + + auto cli(srv1.clientConfig().build()); + /* There are now 4x UDP sockets listening. Only one will receive unicast search. + * Which one is OS dependent. With Linux the last (cli), with Windows the first (srv1). + */ + + const auto doGet = [&cli](const char* pvname) -> bool { + try { + auto result = cli.get(pvname).exec()->wait(5.0); + testDiag("Success %s %u", pvname, (unsigned)result["value"].as()); + return true; + } catch (client::Timeout&) { + testDiag("Timeout %s", pvname); + return false; + } + }; + + ok &= doGet("testpv1"); + ok &= doGet("testpv2"); + ok &= doGet("testpv3"); + return ok; +} + +void testFwdIface() +{ + testDiag("In %s", __func__); + + std::vector ifaddrs; + { + auto& ifs(IfaceMap::instance()); + + epicsGuard G(ifs.lock); + + for(auto it : ifs.byIndex) { + auto& iface = it.second; + if(iface.isLO) + continue; + + for(auto it2 : iface.addrs) { + if(it2.first.family()!=AF_INET) + continue; // TODO: ipv6 link local addresses don't have scope set + ifaddrs.emplace_back(it2.first); + } + } + } + + bool ok = false; + for(auto& ifaddr : ifaddrs) { + ok |= testFwdVia(server::Config{}, ifaddr); + } + +#if defined(__rtems__) || defined(vxWorks) + testSkip(1, "local mcast unnecessary with a single OS process"); +#else + testOk(!!ok, "Succeeded via at least one interface"); +#endif +} + + +} // namespace + +MAIN(testudpfwd) +{ + SockAttach attach; + testPlan(1); + testSetup(); + pvxs::logger_config_env(); + testFwdIface(); + cleanup_for_valgrind(); + return testDone(); +}