client: add discover() and pvxlist

This commit is contained in:
Michael Davidsaver
2021-04-17 10:57:25 -07:00
parent d77ef29b53
commit 7a65a85c99
11 changed files with 561 additions and 51 deletions
+86 -39
View File
@@ -34,10 +34,14 @@ constexpr size_t maxSearchPayload = 1400;
constexpr timeval channelCacheCleanInterval{10,0};
constexpr timeval beaconCleanInterval{2*180, 0};
constexpr timeval beaconCleanInterval{180, 0};
constexpr timeval tcpNSCheckInterval{10, 0};
// searchSequenceID in CMD_SEARCH is redundant.
// So we use a static value and instead rely on IDs for individual PVs
constexpr uint32_t search_seq{0x66696e64}; // "find"
Disconnect::Disconnect()
:std::runtime_error("Disconnected")
,time(epicsTime::getCurrent())
@@ -645,21 +649,23 @@ void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
bool newserv;
{
Guard G(pokeLock);
auto it = beaconSenders.find(msg.src);
if(it!=beaconSenders.end() && msg.guid==it->second.guid) {
it->second.lastRx = now;
return;
}
beaconSenders.emplace(msg.src, BTrack{msg.guid, now});
auto& lastRx(beaconTrack[msg.guid][std::make_pair(msg.proto, msg.server)]);
if((newserv = !lastRx)) {
serverEvent(Discovered{Discovered::Online, msg.src.tostring(), msg.proto, msg.server.tostring(), msg.guid, now});
}
lastRx.time = now;
}
log_debug_printf(io, "%s\n",
std::string(SB()<<msg.src<<" New server "<<guid<<' '<<msg.server).c_str());
if(newserv) {
log_debug_printf(io, "%s\n",
std::string(SB()<<msg.src<<" New server "<<guid<<' '<<msg.server).c_str());
poke(false);
poke(false);
}
}
static
@@ -669,11 +675,12 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
SockAddr serv(AF_INET);
uint16_t port = 0;
uint8_t found = 0u;
uint32_t seq = 0u;
_from_wire<12>(M, &guid[0], false, __FILE__, __LINE__);
// searchSequenceID
// we don't use this and instead rely on ID for individual PVs
M.skip(4u, __FILE__, __LINE__);
// we don't use this for normal search and instead rely on ID for individual PVs
from_wire(M, seq);
from_wire(M, serv);
if(serv.isAny())
@@ -683,13 +690,9 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
port = src.port();
serv.setPort(port);
if(M.size()<4u || M[0]!=3u || M[1]!='t' || M[2]!='c' || M[3]!='p')
return;
M.skip(4u, __FILE__, __LINE__);
std::string proto;
from_wire(M, proto);
from_wire(M, found);
if(!found)
return;
uint16_t nSearch = 0u;
from_wire(M, nSearch);
@@ -704,6 +707,21 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
}
}
if(M.good() && !istcp && seq==search_seq && nSearch==0u && !found && !self.discoverers.empty()) {
// a discovery pong, process this like a beacon
log_debug_printf(io, "Discover reply for %s\n", src.tostring().c_str());
UDPManager::Beacon fakebeacon{src};
fakebeacon.proto = proto;
fakebeacon.server = serv;
fakebeacon.guid = guid;
self.onBeacon(fakebeacon);
}
if(!found || proto!="tcp")
return;
for(auto n : range(nSearch)) {
(void)n;
@@ -836,34 +854,42 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
}
}
void ContextImpl::tickSearch()
void ContextImpl::tickSearch(bool discover)
{
// If !discover, then this is a discovery ping.
// these are really empty searches with must-reply set.
// So if !discover, then we should not be modifying any internal state
{
Guard G(pokeLock);
poked = false;
}
auto idx = currentBucket;
currentBucket = (currentBucket+1u)%searchBuckets.size();
if(!discover)
currentBucket = (currentBucket+1u)%searchBuckets.size();
log_debug_printf(io, "Search tick %zu\n", idx);
decltype (searchBuckets)::value_type bucket;
searchBuckets[idx].swap(bucket);
if(!discover)
searchBuckets[idx].swap(bucket);
while(!bucket.empty() || discover) {
// when 'discover' we only loop once
while(!bucket.empty()) {
searchMsg.resize(0x10000);
FixedBuf M(true, searchMsg.data(), searchMsg.size());
M.skip(8, __FILE__, __LINE__); // fill in header after body length known
// searchSequenceID
// we don't use this and instead rely on IDs for individual PVs
to_wire(M, uint32_t(0x66696e64));
to_wire(M, search_seq);
// flags and reserved.
// initially flags[7] is cleared (bcast)
auto pflags = M.save();
to_wire(M, uint32_t(0u));
to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(0u));
to_wire(M, uint16_t(0u));
// IN6ADDR_ANY_INIT
to_wire(M, uint32_t(0u));
@@ -874,8 +900,13 @@ void ContextImpl::tickSearch()
auto pport = M.save();
to_wire(M, uint16_t(searchRxPort));
to_wire(M, uint8_t(1u));
to_wire(M, "tcp");
if(discover) {
to_wire(M, uint8_t(0u));
} else {
to_wire(M, uint8_t(1u));
to_wire(M, "tcp");
}
// placeholder for channel count;
auto pcount = M.save();
@@ -884,6 +915,8 @@ void ContextImpl::tickSearch()
bool payload = false;
while(!bucket.empty()) {
assert(!discover);
auto chan = bucket.front().lock();
if(!chan || chan->state!=Channel::Searching) {
bucket.pop_front();
@@ -932,7 +965,7 @@ void ContextImpl::tickSearch()
}
assert(M.good());
if(!payload)
if(!payload && !discover)
break;
{
@@ -945,7 +978,10 @@ void ContextImpl::tickSearch()
to_wire(H, Header{CMD_SEARCH, 0, uint32_t(consumed-8u)});
}
for(auto& pair : searchDest) {
*pflags = pair.second ? 0x80 : 0x00;
if(pair.second)
*pflags |= pva_search_flags::Unicast;
else
*pflags &= ~pva_search_flags::Unicast;
int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size());
@@ -990,6 +1026,8 @@ void ContextImpl::tickSearch()
// fail silently, will retry
}
if(discover)
break;
}
if(event_add(searchTimer.get(), &bucketInterval))
@@ -999,7 +1037,7 @@ void ContextImpl::tickSearch()
void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->tickSearch();
static_cast<ContextImpl*>(raw)->tickSearch(false);
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
@@ -1012,18 +1050,27 @@ void ContextImpl::tickBeaconClean()
Guard G(pokeLock);
auto it = beaconSenders.begin();
while(it!=beaconSenders.end()) {
auto cur = it++;
auto it = beaconTrack.begin();
while(it!=beaconTrack.end()) {
auto cur = it++; // [GUID, {[proto, EP], lastRx}]
double age = epicsTimeDiffInSeconds(&now, &cur->second.lastRx);
auto it2 = cur->second.begin();
while(it2!=cur->second.end()) {
auto cur2 = it2++; // [[proto, EP], lastRx]
double age = epicsTimeDiffInSeconds(&now, &cur2->second.time);
if(age < -15.0 || age > 2.1*180.0) {
auto& guid = cur->second.guid;
log_debug_printf(io, "%s\n",
std::string(SB()<<" Lost server "<<guid<<' '<<cur->first).c_str());
if(age < -15.0 || age > 2*beaconCleanInterval.tv_sec) {
log_debug_printf(io, "%s\n",
std::string(SB()<<" Lost server "<<cur->first<<' '<<cur->first).c_str());
beaconSenders.erase(cur);
serverEvent(Discovered{Discovered::Timeout, "", cur2->first.first, cur2->first.second.tostring(), cur->first, now});
cur->second.erase(cur2);
}
}
if(cur->second.empty()) {
beaconTrack.erase(cur);
}
}
}