client add TCP search
This commit is contained in:
+81
-1
@@ -36,6 +36,8 @@ constexpr timeval channelCacheCleanInterval{10,0};
|
||||
|
||||
constexpr timeval beaconCleanInterval{2*180, 0};
|
||||
|
||||
constexpr timeval tcpNSCheckInterval{10, 0};
|
||||
|
||||
Disconnect::Disconnect()
|
||||
:std::runtime_error("Disconnected")
|
||||
,time(epicsTime::getCurrent())
|
||||
@@ -289,7 +291,9 @@ Subscription::~Subscription() {}
|
||||
|
||||
Context::Context(const Config& conf)
|
||||
:pvt(std::make_shared<Pvt>(conf))
|
||||
{}
|
||||
{
|
||||
pvt->impl->startNS();
|
||||
}
|
||||
|
||||
Context::~Context() {}
|
||||
|
||||
@@ -381,6 +385,7 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
,manager(UDPManager::instance())
|
||||
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this))
|
||||
,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::cacheCleanS, this))
|
||||
,nsChecker(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::onNSCheckS, this))
|
||||
{
|
||||
effective.expand();
|
||||
|
||||
@@ -429,6 +434,18 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
searchDest.emplace_back(saddr, isucast);
|
||||
}
|
||||
|
||||
for(auto& addr : effective.nameServers) {
|
||||
SockAddr saddr(AF_INET);
|
||||
try {
|
||||
saddr.setAddress(addr.c_str(), 5075);
|
||||
}catch(std::runtime_error& e) {
|
||||
log_err_printf(setup, "%s Ignoring...\n", e.what());
|
||||
}
|
||||
|
||||
log_info_printf(io, "Searching to TCP %s\n", saddr.tostring().c_str());
|
||||
nameServers.emplace_back(saddr, nullptr);
|
||||
}
|
||||
|
||||
for(auto& iface : effective.interfaces) {
|
||||
SockAddr addr(AF_INET, iface.c_str(), effective.udp_port);
|
||||
log_info_printf(io, "Listening for beacons on %s\n", addr.tostring().c_str());
|
||||
@@ -453,6 +470,25 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
|
||||
ContextImpl::~ContextImpl() {}
|
||||
|
||||
void ContextImpl::startNS()
|
||||
{
|
||||
if(nameServers.empty()) // vector size const after ctor, contents remain mutable
|
||||
return;
|
||||
|
||||
tcp_loop.call([this]() {
|
||||
// start connections to name servers
|
||||
for(auto& ns : nameServers) {
|
||||
const auto& serv = ns.first;
|
||||
connByAddr[serv] = ns.second = std::make_shared<Connection>(shared_from_this(), serv);
|
||||
ns.second->nameserver = true;
|
||||
log_debug_printf(io, "Connecting to nameserver %s\n", ns.second->peerName.c_str());
|
||||
}
|
||||
|
||||
if(event_add(nsChecker.get(), &tcpNSCheckInterval))
|
||||
log_err_printf(setup, "Error enabling TCP search reconnect timer\n%s", "");
|
||||
});
|
||||
}
|
||||
|
||||
void ContextImpl::close()
|
||||
{
|
||||
// terminate all active connections
|
||||
@@ -755,6 +791,7 @@ void ContextImpl::tickSearch()
|
||||
to_wire(M, uint32_t(0u));
|
||||
to_wire(M, uint32_t(0u));
|
||||
|
||||
auto pport = M.save();
|
||||
to_wire(M, uint16_t(searchRxPort));
|
||||
|
||||
to_wire(M, uint8_t(1u));
|
||||
@@ -849,6 +886,28 @@ void ContextImpl::tickSearch()
|
||||
pair.second ? "ucast" : "bcast");
|
||||
}
|
||||
}
|
||||
*pflags |= 0x80; // TCP search is always "unicast"
|
||||
// TCP search replies should always come back on the same connection,
|
||||
// so zero out the meaningless response port.
|
||||
pport[0] = pport[1] = 0;
|
||||
|
||||
for(auto& pair : nameServers) {
|
||||
auto& serv = pair.second;
|
||||
|
||||
if(!serv->ready || !serv->bev)
|
||||
continue;
|
||||
|
||||
auto tx = bufferevent_get_output(serv->bev.get());
|
||||
|
||||
// arbitrarily skip searching if TX buffer is too full
|
||||
// TODO: configure limit?
|
||||
if(evbuffer_get_length(tx) > 64*1024u)
|
||||
continue;
|
||||
|
||||
(void)evbuffer_add(tx, (char*)searchMsg.data(), consumed);
|
||||
// fail silently, will retry
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if(event_add(searchTimer.get(), &bucketInterval))
|
||||
@@ -896,6 +955,27 @@ void ContextImpl::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
|
||||
}
|
||||
}
|
||||
|
||||
void ContextImpl::onNSCheck()
|
||||
{
|
||||
for(auto& ns : nameServers) {
|
||||
if(ns.second && ns.second->bev) // connecting or connected
|
||||
continue;
|
||||
|
||||
connByAddr[ns.first] = ns.second = std::make_shared<Connection>(shared_from_this(), ns.first);
|
||||
ns.second->nameserver = true;
|
||||
log_debug_printf(io, "Reconnecting nameserver %s\n", ns.second->peerName.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw)
|
||||
{
|
||||
try {
|
||||
static_cast<ContextImpl*>(raw)->onNSCheck();
|
||||
}catch(std::exception& e){
|
||||
log_exc_printf(io, "Unhandled error in TCP nameserver timer callback: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void ContextImpl::cacheClean()
|
||||
{
|
||||
std::set<std::string> trash;
|
||||
|
||||
Reference in New Issue
Block a user