diff --git a/src/client.cpp b/src/client.cpp index 1bb3cd7..31bc702 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -36,6 +36,8 @@ constexpr timeval channelCacheCleanInterval{10,0}; constexpr timeval beaconCleanInterval{2*180, 0}; +constexpr timeval tcpNSCheckInterval{10, 0}; + Disconnect::Disconnect() :std::runtime_error("Disconnected") ,time(epicsTime::getCurrent()) @@ -289,7 +291,9 @@ Subscription::~Subscription() {} Context::Context(const Config& conf) :pvt(std::make_shared(conf)) -{} +{ + pvt->impl->startNS(); +} Context::~Context() {} @@ -381,6 +385,7 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) ,manager(UDPManager::instance()) ,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this)) ,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::cacheCleanS, this)) + ,nsChecker(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::onNSCheckS, this)) { effective.expand(); @@ -429,6 +434,18 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) searchDest.emplace_back(saddr, isucast); } + for(auto& addr : effective.nameServers) { + SockAddr saddr(AF_INET); + try { + saddr.setAddress(addr.c_str(), 5075); + }catch(std::runtime_error& e) { + log_err_printf(setup, "%s Ignoring...\n", e.what()); + } + + log_info_printf(io, "Searching to TCP %s\n", saddr.tostring().c_str()); + nameServers.emplace_back(saddr, nullptr); + } + for(auto& iface : effective.interfaces) { SockAddr addr(AF_INET, iface.c_str(), effective.udp_port); log_info_printf(io, "Listening for beacons on %s\n", addr.tostring().c_str()); @@ -453,6 +470,25 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) ContextImpl::~ContextImpl() {} +void ContextImpl::startNS() +{ + if(nameServers.empty()) // vector size const after ctor, contents remain mutable + return; + + tcp_loop.call([this]() { + // start connections to name servers + for(auto& ns : nameServers) { + const auto& serv = ns.first; + connByAddr[serv] = ns.second = std::make_shared(shared_from_this(), serv); + ns.second->nameserver = true; + log_debug_printf(io, "Connecting to nameserver %s\n", ns.second->peerName.c_str()); + } + + if(event_add(nsChecker.get(), &tcpNSCheckInterval)) + log_err_printf(setup, "Error enabling TCP search reconnect timer\n%s", ""); + }); +} + void ContextImpl::close() { // terminate all active connections @@ -755,6 +791,7 @@ void ContextImpl::tickSearch() to_wire(M, uint32_t(0u)); to_wire(M, uint32_t(0u)); + auto pport = M.save(); to_wire(M, uint16_t(searchRxPort)); to_wire(M, uint8_t(1u)); @@ -849,6 +886,28 @@ void ContextImpl::tickSearch() pair.second ? "ucast" : "bcast"); } } + *pflags |= 0x80; // TCP search is always "unicast" + // TCP search replies should always come back on the same connection, + // so zero out the meaningless response port. + pport[0] = pport[1] = 0; + + for(auto& pair : nameServers) { + auto& serv = pair.second; + + if(!serv->ready || !serv->bev) + continue; + + auto tx = bufferevent_get_output(serv->bev.get()); + + // arbitrarily skip searching if TX buffer is too full + // TODO: configure limit? + if(evbuffer_get_length(tx) > 64*1024u) + continue; + + (void)evbuffer_add(tx, (char*)searchMsg.data(), consumed); + // fail silently, will retry + } + } if(event_add(searchTimer.get(), &bucketInterval)) @@ -896,6 +955,27 @@ void ContextImpl::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw) } } +void ContextImpl::onNSCheck() +{ + for(auto& ns : nameServers) { + if(ns.second && ns.second->bev) // connecting or connected + continue; + + connByAddr[ns.first] = ns.second = std::make_shared(shared_from_this(), ns.first); + ns.second->nameserver = true; + log_debug_printf(io, "Reconnecting nameserver %s\n", ns.second->peerName.c_str()); + } +} + +void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw) +{ + try { + static_cast(raw)->onNSCheck(); + }catch(std::exception& e){ + log_exc_printf(io, "Unhandled error in TCP nameserver timer callback: %s\n", e.what()); + } +} + void ContextImpl::cacheClean() { std::set trash; diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 723701a..6022246 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -92,6 +92,7 @@ void Connection::sendDestroyRequest(uint32_t sid, uint32_t ioid) void Connection::bevEvent(short events) { ConnBase::bevEvent(events); + // called Connection::cleanup() if(bev && (events&BEV_EVENT_CONNECTED)) { log_debug_printf(io, "Connected to %s\n", peerName.c_str()); @@ -118,6 +119,8 @@ void Connection::cleanup() // (maybe) keep myself alive std::shared_ptr self; + ready = false; + context->connByAddr.erase(peerAddr); if(bev) @@ -274,6 +277,11 @@ void Connection::handle_CONNECTION_VALIDATED() ready = true; createChannels(); + + if(nameserver) { + log_info_printf(io, "(re)connected to nameserver %s\n", peerName.c_str()); + context->poke(true); + } } void Connection::handle_CREATE_CHANNEL() diff --git a/src/clientimpl.h b/src/clientimpl.h index a6be356..0030c2c 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -78,6 +78,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this> pending; @@ -236,6 +237,8 @@ struct ContextImpl : public std::enable_shared_from_this std::map> connByAddr; + std::vector>> nameServers; + evbase tcp_loop; const evevent searchRx; const evevent searchTimer; @@ -246,12 +249,15 @@ struct ContextImpl : public std::enable_shared_from_this const evevent beaconCleaner; const evevent cacheCleaner; + const evevent nsChecker; INST_COUNTER(ClientContextImpl); ContextImpl(const Config& conf, const evbase &tcp_loop); ~ContextImpl(); + void startNS(); + void close(); void poke(bool force); @@ -266,6 +272,8 @@ struct ContextImpl : public std::enable_shared_from_this static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); void cacheClean(); static void cacheCleanS(evutil_socket_t fd, short evt, void *raw); + void onNSCheck(); + static void onNSCheckS(evutil_socket_t fd, short evt, void *raw); }; struct Context::Pvt { diff --git a/src/config.cpp b/src/config.cpp index 03dfdc2..424d3f3 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -360,6 +360,10 @@ void _fromDefs(Config& self, const std::map& defs, boo split_addr_into(pickone.name.c_str(), self.addressList, pickone.val, self.udp_port); } + if(pickone({"EPICS_PVA_NAME_SERVERS"})) { + split_addr_into(pickone.name.c_str(), self.nameServers, pickone.val, 5075); + } + if(pickone({"EPICS_PVA_AUTO_ADDR_LIST"})) { parse_bool(self.autoAddrList, pickone.name, pickone.val); } diff --git a/src/pvxs/client.h b/src/pvxs/client.h index b2e8334..7555d17 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -822,6 +822,11 @@ struct PVXS_API Config { //! Empty implies wildcard 0.0.0.0 std::vector interfaces; + //! List of TCP name servers. + //! Client context will maintain connections, and send search requests, to these servers. + //! @since UNRELEASED + std::vector nameServers; + //! UDP port to bind. Default is 5076. May be zero, cf. Server::config() to find allocated port. unsigned short udp_port = 5076; //! Whether to extend the addressList with local interface broadcast addresses. (recommended) diff --git a/src/serverchan.cpp b/src/serverchan.cpp index 65d16fd..e81a30a 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -237,7 +237,7 @@ void ServerConn::handle_SEARCH() EvOutBuf R(hostBE, txBody.get()); - _to_wire<12>(M, iface->server->effective.guid.data(), false); + _to_wire<12>(R, iface->server->effective.guid.data(), false); to_wire(R, searchID); to_wire(R, SockAddr::any(AF_INET)); to_wire(R, iface->bind_addr.port());