start client beacon rx

This commit is contained in:
Michael Davidsaver
2020-02-22 09:28:47 -08:00
parent 187def97f9
commit acfba6469e
5 changed files with 103 additions and 8 deletions
+74 -5
View File
@@ -26,6 +26,8 @@ constexpr size_t nBuckets = 30u;
constexpr size_t maxSearchPayload = 0x4000;
constexpr timeval beaconCleanInterval{2*180, 0};
Disconnect::Disconnect()
:std::runtime_error("Disconnected")
{}
@@ -180,14 +182,12 @@ Context::Pvt::Pvt(const Config& conf)
,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow)
,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this))
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this))
,beaconCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickBeaconCleanS, this))
{
effective.expand();
searchBuckets.resize(nBuckets);
if(effective.udp_port==0)
throw std::runtime_error("Client can't use UDP random port");
std::set<std::string> bcasts;
{
ELLLIST list = ELLLIST_INIT;
@@ -242,14 +242,26 @@ Context::Pvt::Pvt(const Config& conf)
searchDest.emplace_back(saddr, isucast);
}
// TODO: receive beacons
//auto manager = UDPManager::instance();
auto manager = UDPManager::instance();
for(auto& iface : effective.interfaces) {
SockAddr addr(AF_INET, iface.c_str(), effective.udp_port);
log_info_printf(io, "Listening for beacons on %s\n", addr.tostring().c_str());
beaconRx.push_back(manager.onBeacon(addr, [this](const UDPManager::Beacon& msg) {
onBeacon(msg);
}));
}
for(auto& listener : beaconRx) {
listener->start();
}
if(event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error enabling search timer\n%s", "");
if(event_add(searchRx.get(), nullptr))
log_err_printf(setup, "Error enabling search RX\n%s", "");
if(event_add(searchTimer.get(), &beaconCleanInterval))
log_err_printf(setup, "Error enabling beacon clean timer on\n%s", "");
}
Context::Pvt::~Pvt() {}
@@ -291,6 +303,29 @@ void Context::Pvt::poke()
throw std::runtime_error("Unable to schedule searchTimer");
}
void Context::Pvt::onBeacon(const UDPManager::Beacon& msg)
{
const auto& guid = msg.guid;
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
auto it = beaconSenders.find(msg.src);
if(it!=beaconSenders.end() && msg.guid==it->second.guid) {
it->second.lastRx = now;
return;
}
beaconSenders.emplace(msg.src, BTrack{msg.guid, now});
log_debug_printf(io, "%s New server %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n",
msg.src.tostring().c_str(),
guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11],
msg.server.tostring().c_str());
poke();
}
bool Context::Pvt::onSearch()
{
searchMsg.resize(0x10000);
@@ -560,6 +595,40 @@ void Context::Pvt::tickSearchS(evutil_socket_t fd, short evt, void *raw)
}
}
void Context::Pvt::tickBeaconClean()
{
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
auto it = beaconSenders.begin();
while(it!=beaconSenders.end()) {
auto cur = it++;
double age = epicsTimeDiffInSeconds(&now, &cur->second.lastRx);
if(age < -15.0 || age > 2.1*180.0) {
auto& guid = cur->second.guid;
log_debug_printf(io, "Lost server %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n",
guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11],
cur->first.tostring().c_str());
beaconSenders.erase(cur);
}
}
if(event_add(searchTimer.get(), &beaconCleanInterval))
log_err_printf(setup, "Error re-enabling beacon clean timer on\n%s", "");
}
void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Pvt*>(raw)->tickBeaconClean();
}catch(std::exception& e){
log_crit_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
}
}
} // namespace client
} // namespace pvxs