diff --git a/setup.py b/setup.py index 2f1178a..926a000 100755 --- a/setup.py +++ b/setup.py @@ -542,6 +542,7 @@ def define_DSOS(self): 'clientintrospect.cpp', 'clientget.cpp', 'clientmon.cpp', + 'clientdiscover.cpp', ] src_pvxs = [os.path.join('src', src) for src in src_pvxs] diff --git a/src/Makefile b/src/Makefile index a53d14f..4a885d5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -106,6 +106,7 @@ LIB_SRCS += clientconn.cpp LIB_SRCS += clientintrospect.cpp LIB_SRCS += clientget.cpp LIB_SRCS += clientmon.cpp +LIB_SRCS += clientdiscover.cpp LIB_LIBS += Com diff --git a/src/client.cpp b/src/client.cpp index 117c8a0..d404769 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -34,10 +34,14 @@ constexpr size_t maxSearchPayload = 1400; constexpr timeval channelCacheCleanInterval{10,0}; -constexpr timeval beaconCleanInterval{2*180, 0}; +constexpr timeval beaconCleanInterval{180, 0}; constexpr timeval tcpNSCheckInterval{10, 0}; +// searchSequenceID in CMD_SEARCH is redundant. +// So we use a static value and instead rely on IDs for individual PVs +constexpr uint32_t search_seq{0x66696e64}; // "find" + Disconnect::Disconnect() :std::runtime_error("Disconnected") ,time(epicsTime::getCurrent()) @@ -645,21 +649,23 @@ void ContextImpl::onBeacon(const UDPManager::Beacon& msg) epicsTimeStamp now; epicsTimeGetCurrent(&now); + bool newserv; { Guard G(pokeLock); - auto it = beaconSenders.find(msg.src); - if(it!=beaconSenders.end() && msg.guid==it->second.guid) { - it->second.lastRx = now; - return; - } - beaconSenders.emplace(msg.src, BTrack{msg.guid, now}); + auto& lastRx(beaconTrack[msg.guid][std::make_pair(msg.proto, msg.server)]); + if((newserv = !lastRx)) { + serverEvent(Discovered{Discovered::Online, msg.src.tostring(), msg.proto, msg.server.tostring(), msg.guid, now}); + } + lastRx.time = now; } - log_debug_printf(io, "%s\n", - std::string(SB()<(M, &guid[0], false, __FILE__, __LINE__); // searchSequenceID - // we don't use this and instead rely on ID for individual PVs - M.skip(4u, __FILE__, __LINE__); + // we don't use this for normal search and instead rely on ID for individual PVs + from_wire(M, seq); from_wire(M, serv); if(serv.isAny()) @@ -683,13 +690,9 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist port = src.port(); serv.setPort(port); - if(M.size()<4u || M[0]!=3u || M[1]!='t' || M[2]!='c' || M[3]!='p') - return; - M.skip(4u, __FILE__, __LINE__); - + std::string proto; + from_wire(M, proto); from_wire(M, found); - if(!found) - return; uint16_t nSearch = 0u; from_wire(M, nSearch); @@ -704,6 +707,21 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist } } + if(M.good() && !istcp && seq==search_seq && nSearch==0u && !found && !self.discoverers.empty()) { + // a discovery pong, process this like a beacon + log_debug_printf(io, "Discover reply for %s\n", src.tostring().c_str()); + + UDPManager::Beacon fakebeacon{src}; + fakebeacon.proto = proto; + fakebeacon.server = serv; + fakebeacon.guid = guid; + + self.onBeacon(fakebeacon); + } + + if(!found || proto!="tcp") + return; + for(auto n : range(nSearch)) { (void)n; @@ -836,34 +854,42 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw) } } -void ContextImpl::tickSearch() +void ContextImpl::tickSearch(bool discover) { + // If !discover, then this is a discovery ping. + // these are really empty searches with must-reply set. + // So if !discover, then we should not be modifying any internal state { Guard G(pokeLock); poked = false; } auto idx = currentBucket; - currentBucket = (currentBucket+1u)%searchBuckets.size(); + if(!discover) + currentBucket = (currentBucket+1u)%searchBuckets.size(); log_debug_printf(io, "Search tick %zu\n", idx); decltype (searchBuckets)::value_type bucket; - searchBuckets[idx].swap(bucket); + if(!discover) + searchBuckets[idx].swap(bucket); + + while(!bucket.empty() || discover) { + // when 'discover' we only loop once - while(!bucket.empty()) { searchMsg.resize(0x10000); FixedBuf M(true, searchMsg.data(), searchMsg.size()); M.skip(8, __FILE__, __LINE__); // fill in header after body length known // searchSequenceID - // we don't use this and instead rely on IDs for individual PVs - to_wire(M, uint32_t(0x66696e64)); + to_wire(M, search_seq); // flags and reserved. // initially flags[7] is cleared (bcast) auto pflags = M.save(); - to_wire(M, uint32_t(0u)); + to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search + to_wire(M, uint8_t(0u)); + to_wire(M, uint16_t(0u)); // IN6ADDR_ANY_INIT to_wire(M, uint32_t(0u)); @@ -874,8 +900,13 @@ void ContextImpl::tickSearch() auto pport = M.save(); to_wire(M, uint16_t(searchRxPort)); - to_wire(M, uint8_t(1u)); - to_wire(M, "tcp"); + if(discover) { + to_wire(M, uint8_t(0u)); + + } else { + to_wire(M, uint8_t(1u)); + to_wire(M, "tcp"); + } // placeholder for channel count; auto pcount = M.save(); @@ -884,6 +915,8 @@ void ContextImpl::tickSearch() bool payload = false; while(!bucket.empty()) { + assert(!discover); + auto chan = bucket.front().lock(); if(!chan || chan->state!=Channel::Searching) { bucket.pop_front(); @@ -932,7 +965,7 @@ void ContextImpl::tickSearch() } assert(M.good()); - if(!payload) + if(!payload && !discover) break; { @@ -945,7 +978,10 @@ void ContextImpl::tickSearch() to_wire(H, Header{CMD_SEARCH, 0, uint32_t(consumed-8u)}); } for(auto& pair : searchDest) { - *pflags = pair.second ? 0x80 : 0x00; + if(pair.second) + *pflags |= pva_search_flags::Unicast; + else + *pflags &= ~pva_search_flags::Unicast; int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size()); @@ -990,6 +1026,8 @@ void ContextImpl::tickSearch() // fail silently, will retry } + if(discover) + break; } if(event_add(searchTimer.get(), &bucketInterval)) @@ -999,7 +1037,7 @@ void ContextImpl::tickSearch() void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickSearch(); + static_cast(raw)->tickSearch(false); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what()); } @@ -1012,18 +1050,27 @@ void ContextImpl::tickBeaconClean() Guard G(pokeLock); - auto it = beaconSenders.begin(); - while(it!=beaconSenders.end()) { - auto cur = it++; + auto it = beaconTrack.begin(); + while(it!=beaconTrack.end()) { + auto cur = it++; // [GUID, {[proto, EP], lastRx}] - double age = epicsTimeDiffInSeconds(&now, &cur->second.lastRx); + auto it2 = cur->second.begin(); + while(it2!=cur->second.end()) { + auto cur2 = it2++; // [[proto, EP], lastRx] + double age = epicsTimeDiffInSeconds(&now, &cur2->second.time); - if(age < -15.0 || age > 2.1*180.0) { - auto& guid = cur->second.guid; - log_debug_printf(io, "%s\n", - std::string(SB()<<" Lost server "<first).c_str()); + if(age < -15.0 || age > 2*beaconCleanInterval.tv_sec) { + log_debug_printf(io, "%s\n", + std::string(SB()<<" Lost server "<first<<' '<first).c_str()); - beaconSenders.erase(cur); + serverEvent(Discovered{Discovered::Timeout, "", cur2->first.first, cur2->first.second.tostring(), cur->first, now}); + + cur->second.erase(cur2); + } + } + + if(cur->second.empty()) { + beaconTrack.erase(cur); } } } diff --git a/src/clientdiscover.cpp b/src/clientdiscover.cpp new file mode 100644 index 0000000..647c4b6 --- /dev/null +++ b/src/clientdiscover.cpp @@ -0,0 +1,138 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include "utilpvt.h" +#include "clientimpl.h" + +DEFINE_LOGGER(setup, "pvxs.client.setup"); +DEFINE_LOGGER(io, "pvxs.client.io"); + +namespace pvxs { +namespace client { + +Discovery::Discovery(const std::shared_ptr &context) + :OperationBase (Operation::Discover, context->tcp_loop) + ,context(context) +{} + +Discovery::~Discovery() { + if(loop.assertInRunningLoop()) + _cancel(true); +} + +bool Discovery::cancel() +{ + decltype (notify) junk; + bool ret; + loop.call([this, &junk, &ret](){ + ret = _cancel(false); + junk = std::move(notify); + // leave opByIOID for GC + }); + return ret; +} + +bool Discovery::_cancel(bool implicit) { + bool active = running; + + if(active) { + context->discoverers.erase(this); + running = false; + } + return active; +} + +// unused for this special case +void Discovery::_reExecGet(std::function &&resultcb) {} +void Discovery::_reExecPut(const Value &arg, std::function &&resultcb) {} +void Discovery::createOp() {} +void Discovery::disconnected(const std::shared_ptr &self) {} + +std::shared_ptr DiscoverBuilder::exec() +{ + if(!ctx) + throw std::logic_error("NULL Builder"); + if(!_fn) + throw std::logic_error("Callback required"); + + auto context(ctx->impl->shared_from_this()); + auto ping(_ping); + + auto op(std::make_shared(context)); + op->notify = std::move(_fn); + + auto syncCancel(_syncCancel); + std::shared_ptr external(op.get(), [op, syncCancel](Discovery*) mutable { + // (maybe) user thread + auto loop(op->context->tcp_loop); + auto temp(std::move(op)); + loop.tryInvoke(syncCancel, std::bind([](std::shared_ptr& op){ + // on worker + op->context->discoverers.erase(op.get()); + + }, std::move(temp))); + }); + + // setup timer to send discovery + + context->tcp_loop.dispatch([op, context, ping]() { + + bool first = context->discoverers.empty(); + + context->discoverers[op.get()] = op; + op->running = true; + + if(first && ping) { + log_debug_printf(setup, "Starting Discover%s", "\n"); + + context->tickSearch(true); + } + }); + + return external; +} + +void ContextImpl::serverEvent(const Discovered &evt) +{ + for(auto& pair : discoverers) { + if(auto dis = pair.second.lock()) { + try { + dis->notify(evt); + } catch(std::exception& e) { + log_exc_printf(io, "Unhandled exception during Discovery callback : %s\n", e.what()); + } + } + } +} + +std::ostream& operator<<(std::ostream& strm, const Discovered& serv) +{ + char prefix[64]; + + serv.time.strftime(prefix, sizeof(prefix), "%Y-%m-%dT%H:%M:%S.%9f"); + strm< context; + std::function notify; + bool running = false; + + Discovery(const std::shared_ptr& context); + ~Discovery(); + + virtual bool cancel() override final; +private: + bool _cancel(bool implicit); + + // unused for this special case + virtual void _reExecGet(std::function &&resultcb) override final; + virtual void _reExecPut(const Value &arg, std::function &&resultcb) override final; + virtual void createOp() override final; + virtual void disconnected(const std::shared_ptr &self) override final; +}; + struct ContextImpl : public std::enable_shared_from_this { SockAttach attach; @@ -225,11 +245,13 @@ struct ContextImpl : public std::enable_shared_from_this epicsTimeStamp lastPoke{}; bool poked = false; - struct BTrack { - ServerGUID guid; - epicsTimeStamp lastRx; + // map: GUID -> proto+endpoint -> last beacon time + struct LastTime { + epicsTimeStamp time{}; + explicit operator bool() const { return time.secPastEpoch || time.nsec; } }; - std::map beaconSenders; + + std::map, LastTime>> beaconTrack; std::vector searchMsg; @@ -259,6 +281,8 @@ struct ContextImpl : public std::enable_shared_from_this // we keep a ref here as long as beaconCleaner is in use UDPManager manager; + std::map> discoverers; + const evevent beaconCleaner; const evevent cacheCleaner; const evevent nsChecker; @@ -274,11 +298,13 @@ struct ContextImpl : public std::enable_shared_from_this void poke(bool force); + void serverEvent(const Discovered &evt); + void onBeacon(const UDPManager::Beacon& msg); bool onSearch(); static void onSearchS(evutil_socket_t fd, short evt, void *raw); - void tickSearch(); + void tickSearch(bool discover); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index e031ba2..a996f2c 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -107,6 +107,7 @@ struct PVXS_API Operation { Put = 11, // CMD_PUT RPC = 20, // CMD_RPC Monitor = 13, // CMD_MONITOR + Discover = 3, // CMD_SEARCH } op; explicit constexpr Operation(operation_t op) :op(op) {} @@ -235,6 +236,8 @@ class RPCBuilder; class MonitorBuilder; class RequestBuilder; class ConnectBuilder; +struct Discovered; +class DiscoverBuilder; /** An independent PVA protocol client instance * @@ -463,6 +466,30 @@ public: static inline RequestBuilder request(); + /** Discover the presence or absence of Servers. + * + * Combines information from periodic Server Beacon messages, and optionally + * Discover pings, to provide notice when PVA servers appear or disappear + * from attached networks. + * + * Note that a discover() Operation will never complete with a Value, + * and so can only end with a timeout or cancellation. + * + * @code + * Context ctxt(...); + * auto op = ctxt.discover([](const Discovered& evt) { + * std::cout<wait(10.0); // wait 10 seconds, will always timeout. + * @endcode + * + * @since UNRELEASED + */ + inline + DiscoverBuilder discover(std::function && fn); + /** Request prompt search of any disconnected channels. * * This method is recommended for use when executing a batch of operations. @@ -863,6 +890,55 @@ public: }; ConnectBuilder Context::connect(const std::string& pvname) { return ConnectBuilder{pvt, pvname}; } +//! Change of state event associated with a Context::discover() +struct Discovered { + //! What sort of event is this? + enum event_t { + Online=1, //!< Beacon from new server GUID + Timeout=2, //!< Beacon timeout for previous server + } event; + std::string peer; //!< source of Beacon + std::string proto; //!< Advertised protocol. eg. "tcp" + std::string server;//!< Server protocol endpoint. + ServerGUID guid; //!< Server provided ID + epicsTime time; +}; +PVXS_API +std::ostream& operator<<(std::ostream& strm, const Discovered& evt); +//! Prepare a Context::discover() operation +//! @since UNRELEASED +class DiscoverBuilder +{ + std::shared_ptr ctx; + std::function _fn; + bool _syncCancel = true; + bool _ping = false; +public: + DiscoverBuilder(const std::shared_ptr& ctx, std::function&& fn) + :ctx(ctx) + ,_fn(fn) + {} + + /** Controls whether client will actively seek to immediately discover all servers. + * + * If false, then client will only wait for servers to periodically announce themselves. + */ + DiscoverBuilder& pingAll(bool b) { this->_ping = b; return *this; } + + /** Controls whether Operation::cancel() synchronizes. + * + * When true (the default) explicit or implicit cancel blocks until any + * in progress callback has completed. This makes safe some use of + * references in callbacks. + */ + DiscoverBuilder& syncCancel(bool b) { this->_syncCancel = b; return *this; } + + //! Execute. The returned Operation will never complete. + PVXS_API + std::shared_ptr exec(); +}; +DiscoverBuilder Context::discover(std::function && fn) { return DiscoverBuilder(pvt, std::move(fn)); } + struct PVXS_API Config { //! List of unicast and broadcast addresses std::vector addressList; diff --git a/src/server.cpp b/src/server.cpp index 48428ba..50f77d0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -40,6 +40,11 @@ DEFINE_LOGGER(serversetup, "pvxs.server.setup"); DEFINE_LOGGER(serverio, "pvxs.server.io"); DEFINE_LOGGER(serversearch, "pvxs.server.search"); +// mimic pvAccessCPP server (almost) +// send a "burst" of beacons, then fallback to a longer interval +static constexpr timeval beaconIntervalShort{15, 0}; +static constexpr timeval beaconIntervalLong{180, 0}; + Server Server::fromEnv() { return Config::fromEnv().build(); @@ -722,9 +727,9 @@ void Server::Pvt::doBeacons(short evt) // mimic pvAccessCPP server (almost) // send a "burst" of beacons, then fallback to a longer interval - timeval interval{180, 0}; + timeval interval(beaconIntervalLong); if(beaconCnt<10u) { - interval = {15, 0}; + interval = beaconIntervalShort; beaconCnt++; } if(event_add(beaconTimer.get(), &interval)) diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index ddd0119..166c580 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -383,12 +383,11 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t } beaconMsg.server.setPort(port); - std::string proto; - from_wire(M, proto); + from_wire(M, beaconMsg.proto); // ignore remaining "server status" blob - if(M.good() && proto=="tcp") { + if(M.good()) { for(auto L : listeners) { if(L->beaconCB) { (L->beaconCB)(beaconMsg); diff --git a/src/udp_collector.h b/src/udp_collector.h index 1802967..f830ca5 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -35,10 +35,11 @@ struct PVXS_API UDPManager evbase& loop(); struct Beacon { - SockAddr& src; + const SockAddr& src; + std::string proto; SockAddr server; ServerGUID guid; - Beacon(SockAddr& src) :src(src) {} + Beacon(const SockAddr& src) :src(src) {} }; //! Create subscription for Beacon messages. //! Must call UDPListener::start() diff --git a/tools/Makefile b/tools/Makefile index 52e0903..2bb5a03 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -32,6 +32,9 @@ pvxput_SRCS += put.cpp PROD += pvxcall pvxcall_SRCS += call.cpp +PROD += pvxlist +pvxlist_SRCS += list.cpp + #=========================== include $(TOP)/configure/RULES diff --git a/tools/list.cpp b/tools/list.cpp new file mode 100644 index 0000000..388488b --- /dev/null +++ b/tools/list.cpp @@ -0,0 +1,213 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include "utilpvt.h" +#include "evhelper.h" + +using namespace pvxs; + +namespace { + +void usage(const char* argv0) +{ + std::cerr<< + "Usage:\n" + " Discover Servers:\n" + " "<\n" + "\n" + " Server Info:\n" + " "<\n" + "\n" + "Examples:\n" + " Monitor server beacons to detect servers coming online, and going offline.\n" + " "< Operation timeout in seconds. Default 5 sec. '0' disables timeout,\n" + " useful in combination with '-v'.\n" + ; +} + +} // namespace + +int main(int argc, char *argv[]) +{ + try { + logger_config_env(); // from $PVXS_LOG + double timeout = 5.0; + bool verbose = false; + bool info = false; + bool active = true; + + { + int opt; + while ((opt = getopt(argc, argv, "hVApivdw:")) != -1) { + switch(opt) { + case 'h': + usage(argv[0]); + return 0; + case 'V': + std::cout<(optarg); + break; + default: + usage(argv[0]); + std::cerr<<"\nUnknown argument: "<> ops; + ops.reserve(argc-optind); + + if(optind==argc) { // discover mode, search of all servers + std::set> servprotos; + + ops.push_back(ctxt.discover([servprotos, verbose](const client::Discovered& serv) mutable { + if(verbose) { // print all events and info + std::cout< remaining{argc-optind}; + + for(auto n : range(optind, argc)) { + ops.push_back(ctxt.rpc("server") + .server(argv[n]) + .arg("op", info ? "info" : "channels") + .result([argv, n, info, verbose, &remaining, &done](client::Result&& r) + { + try { + auto top(r()); + + if(info) { + std::cout<>()); + for(auto& name : channels) { + std::cout<0.0) + done.wait(timeout); + else + done.wait(); + + return 0; + + }catch(std::exception& e){ + std::cerr<<"Error: "<