/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #ifndef CLIENTIMPL_H #define CLIENTIMPL_H #include #include #include #include #include #include "evhelper.h" #include "dataimpl.h" #include "utilpvt.h" #include "udp_collector.h" #include "conn.h" namespace pvxs { namespace client { struct Channel; struct ContextImpl; struct ResultWaiter { epicsMutex lock; epicsEvent notify; Result result; enum { Busy, Done, Abort, } outcome = Busy; Value wait(double timeout=-1.0); void complete(Result&& result, bool interrupt); }; // internal actions on an Operation struct OperationBase : public Operation { const evbase loop; // remaining members only accessibly from loop worker std::shared_ptr chan; uint32_t ioid = 0; Value result; bool done = false; std::shared_ptr waiter; OperationBase(operation_t op, const evbase& loop); virtual ~OperationBase(); virtual void createOp() =0; virtual void disconnected(const std::shared_ptr& self) =0; virtual const std::string& name() override final; virtual Value wait(double timeout=-1.0) override final; virtual void interrupt() override final; }; struct RequestFL { const size_t limit; epicsMutex lock; std::vector unused; explicit RequestFL(size_t limit) :limit(limit) {} }; struct RequestInfo { const uint32_t sid, ioid; const Operation::operation_t op; const std::weak_ptr handle; Value prototype; std::shared_ptr fl; RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr& handle); }; struct Connection final : public ConnBase, public std::enable_shared_from_this { const std::shared_ptr context; // While HoldOff, the time until re-connection // While Connected, periodic Echo const evevent echoTimer; bool ready = false; bool nameserver = false; // channels to be created on this Connection in state==Connecting std::map> pending; std::map> creatingByCID, // in state==Creating chanBySID; // in state==Active // entries always have matching entry in a Channel::opByIOID std::map opByIOID; uint32_t nextIOID = 0x10002000u; INST_COUNTER(Connection); Connection(const std::shared_ptr& context, const SockAddr &peerAddr, bool reconn); virtual ~Connection(); static std::shared_ptr build(const std::shared_ptr& context, const SockAddr& serv, bool reconn=false); private: void startConnecting(); public: void createChannels(); void sendDestroyRequest(uint32_t sid, uint32_t ioid); virtual void bevEvent(short events) override final; virtual std::shared_ptr self_from_this() override final; virtual void cleanup() override final; #define CASE(Op) virtual void handle_##Op() override final; CASE(CONNECTION_VALIDATION); CASE(CONNECTION_VALIDATED); CASE(SEARCH_RESPONSE); CASE(CREATE_CHANNEL); CASE(DESTROY_CHANNEL); CASE(GET); CASE(PUT); //CASE(PUT_GET); CASE(MONITOR); CASE(RPC); CASE(GET_FIELD); CASE(MESSAGE); #undef CASE void handle_GPR(pva_app_msg_t cmd); protected: void tickEcho(); static void tickEchoS(evutil_socket_t fd, short evt, void *raw); }; struct ConnectImpl final : public Connect { const evbase loop; std::shared_ptr chan; const std::string _name; std::atomic _connected; std::function _onConn; std::function _onDis; ConnectImpl(const evbase& loop, const std::string& name) :loop(loop) ,_name(name) ,_connected{false} {} virtual ~ConnectImpl(); virtual const std::string &name() const override final; virtual bool connected() const override final; }; struct Channel { const std::shared_ptr context; const std::string name; // Our chosen ID for this channel. // used as persistent CID and searchID const uint32_t cid; enum state_t { Searching, // waiting for a server to claim Connecting, // waiting for Connection to become ready Creating, // waiting for reply to CREATE_CHANNEL Active, } state = Searching; bool garbage = false; std::shared_ptr conn; uint32_t sid = 0u; // channel created with .server() to bypass normal search process SockAddr forcedServer; // when state==Searching, number of repetitions size_t nSearch = 0u; // GUID of last positive reply when state!=Searching ServerGUID guid{}; SockAddr replyAddr; std::list> pending; // points to storage of Connection::opByIOID std::map opByIOID; std::list connectors; size_t statTx{}, statRx{}; INST_COUNTER(Channel); Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); ~Channel(); void createOperations(); void disconnect(const std::shared_ptr& self); static std::shared_ptr build(const std::shared_ptr& context, const std::string& name, const std::string& server); }; struct Discovery final : public OperationBase { const std::shared_ptr context; std::function notify; bool running = false; Discovery(const std::shared_ptr& context); ~Discovery(); virtual bool cancel() override final; private: bool _cancel(bool implicit); // unused for this special case virtual void _reExecGet(std::function &&resultcb) override final; virtual void _reExecPut(const Value &arg, std::function &&resultcb) override final; virtual void createOp() override final; virtual void disconnected(const std::shared_ptr &self) override final; }; struct ContextImpl : public std::enable_shared_from_this { SockAttach attach; IfaceMap& ifmap; enum state_t { Init, Running, Stopped, } state = Init; const Config effective; const Value caMethod; uint32_t nextCID=0x12345678; uint32_t prevndrop = 0u; evsocket searchTx4, searchTx6; uint16_t searchRxPort; std::vector ignoreServerGUIDs; // poked and beaconSenders from both TCP and UDP workers epicsMutex pokeLock; epicsTimeStamp lastPoke{}; size_t nPoked = 0u; // unlike `poke`, `scheduleInitialSearch` is only ever called from the // tcp_loop so this does not need to be guarded by a mutex bool initialSearchScheduled = false; // map: endpoint+proto -> Beaconer typedef std::pair BeaconServer; struct BeaconInfo { SockAddr sender; ServerGUID guid{}; uint8_t peerVersion{}; epicsTimeStamp time{}; }; std::map beaconTrack; std::vector searchMsg; // search destination address and whether to set the unicast flag std::vector> searchDest; size_t currentBucket = 0u; // Channels where we have yet to send out an initial search request std::list> initialSearchBucket; // Channels where we are waiting for a search response std::vector>> searchBuckets; std::list > beaconRx; std::map> chanByCID; // strong ref. loop through Channel::context // explicitly broken by Context::close(), Context::cacheClear(), or ContextImpl::cacheClean() // chanByName key'd by (pv, forceServer) std::map, std::shared_ptr> chanByName; std::map> connByAddr; std::vector>> nameServers; evbase tcp_loop; const evevent searchRx4, searchRx6; const evevent searchTimer; const evevent initialSearcher; // beacon handling done on UDP worker. // we keep a ref here as long as beaconCleaner is in use UDPManager manager; std::map> discoverers; const evevent beaconCleaner; const evevent cacheCleaner; const evevent nsChecker; INST_COUNTER(ClientContextImpl); ContextImpl(const Config& conf, const evbase &tcp_loop); ~ContextImpl(); void startNS(); void close(); void poke(); void serverEvent(const Discovered &evt); void onBeacon(const UDPManager::Beacon& msg); void scheduleInitialSearch(); bool onSearch(evutil_socket_t fd); static void onSearchS(evutil_socket_t fd, short evt, void *raw); enum class SearchKind { discover, initial, check }; void tickSearch(SearchKind kind, bool poked); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); static void initialSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); void cacheClean(const std::string &name, Context::cacheAction force); static void cacheCleanS(evutil_socket_t fd, short evt, void *raw); void onNSCheck(); static void onNSCheckS(evutil_socket_t fd, short evt, void *raw); }; struct Context::Pvt { // external ref to running loop. // impl directly, and indirectly, contains internal refs private: evbase loop; public: const std::shared_ptr impl; INST_COUNTER(ClientPvt); Pvt(const Config& conf); ~Pvt(); // I call ContextImpl::close() }; } // namespace client } // namespace pvxs #endif // CLIENTIMPL_H