/**
 * Copyright - See the COPYRIGHT that is included with this distribution.
 * pvxs is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 */
// Internal declarations for the server side: per-operation, per-channel and
// per-connection bookkeeping, listening interfaces, the built-in "server"
// source, and the private implementation struct of server::Server.
//
// NOTE(review): in this copy the targets of the first eight #include
// directives and the contents of most template argument lists (e.g.
// "std::weak_ptr chan", "std::map > opByIOID") are absent.  Presumably they
// were lost when the text was extracted -- confirm against the repository
// copy before relying on the exact types below.
#ifndef SERVERCONN_H
#define SERVERCONN_H

#include
#include
#include
#include
#include
#include
#include
#include
#include "evhelper.h"
#include "utilpvt.h"
#include "dataimpl.h"
#include "udp_collector.h"
#include "conn.h"

namespace pvxs {namespace impl {

// Forward declarations; the types below refer to one another.
struct ServIface;
struct ServerConn;
struct ServerChan;

// base for tracking in-progress operations. cf. ServerConn::opByIOID and ServerChan::opByIOID
struct ServerOp
{
    // Weak reference fixed at construction (presumably to the channel this
    // operation belongs to -- template argument not visible here).
    const std::weak_ptr chan;

    // Operation identifier, fixed at construction.
    const uint32_t ioid;

    // Stored callbacks.  Their signatures, and who invokes them, are not
    // visible in this header.
    std::function onClose;
    std::function onCancel;

    enum state_t {
        Creating,
        Idle,
        Executing,
        Dead,
    } state;

    // Note: every operation starts in Idle, not Creating.
    ServerOp(const std::weak_ptr& chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {}
    // Not copyable.
    ServerOp(const ServerOp&) = delete;
    ServerOp& operator=(const ServerOp&) = delete;
    // Pure virtual destructor: ServerOp can only be used as a base class.
    // Its definition is not in this header.
    virtual ~ServerOp() =0;

    // Write a description of this operation to strm (implemented by subclasses).
    virtual void show(std::ostream& strm) const =0;
};

// Implementation of the server::ChannelControl interface.
// Every override below is marked final.
struct ServerChannelControl : public server::ChannelControl
{
    ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan);
    virtual ~ServerChannelControl();

    // Handler registration.  Each takes its callback by rvalue reference.
    virtual void onOp(std::function&&)>&& fn) override final;
    virtual void onRPC(std::function&&, Value&&)>&& fn) override final;
    virtual void onSubscribe(std::function&&)>&& fn) override final;

    virtual void onClose(std::function&& fn) override final;
    virtual void close() override final;

    virtual void _updateInfo(const std::shared_ptr& info) override final;

    // Weak references, immutable after construction.
    const std::weak_ptr server;
    const std::weak_ptr chan;

    INST_COUNTER(ServerChannelControl);
};

// State of one channel on a connection.
struct ServerChan
{
    // Weak reference fixed at construction (presumably the owning ServerConn
    // -- TODO confirm, template argument not visible here).
    const std::weak_ptr conn;

    // Channel identifiers, fixed at construction.  Presumably server-assigned
    // and client-assigned IDs respectively -- confirm against the protocol code.
    const uint32_t sid, cid;
    const std::string name;

    enum {
        Creating, // CREATE_CHANNEL request received, reply not sent
        Active,   // reply sent
        Destroy,  // DESTROY_CHANNEL request received and/or reply sent
    } state;

    // Counters, value-initialized to zero.  Presumably transmit/receive
    // statistics; units are not visible here.
    size_t statTx{}, statRx{};
    std::shared_ptr reportInfo;

    // Stored handlers, same names as the ServerChannelControl registration methods.
    std::function&&)> onOp;
    std::function&&, Value&&)> onRPC;
    std::function&&)> onSubscribe;
    std::function onClose;

    std::map > opByIOID; // our subset of ServerConn::opByIOID

    INST_COUNTER(ServerChan);

    ServerChan(const std::shared_ptr& conn, uint32_t sid, uint32_t cid, const std::string& name);
    // Not copyable.
    ServerChan(const ServerChan&) = delete;
    ServerChan& operator=(const ServerChan&) = delete;
    ~ServerChan();
};

// One connection, built on ConnBase (declared outside this file).
struct ServerConn : public ConnBase, public std::enable_shared_from_this
{
    // Interface this connection was created with; the pointer itself is const.
    ServIface* const iface;

    std::shared_ptr cred;

    // Starting value of the SID counter.  How it is advanced is not visible here.
    uint32_t nextSID=0x07050301;
    // Lookup tables keyed, per their names, by SID and by IOID.
    std::map > chanBySID;
    std::map > opByIOID;

    std::list> backlog;

    INST_COUNTER(ServerConn);

    ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen);
    // Not copyable.
    ServerConn(const ServerConn&) = delete;
    ServerConn& operator=(const ServerConn&) = delete;
    ~ServerConn();

    // Returns a const reference to a shared_ptr for the given sid.
    // Behaviour for an unknown sid is not visible in this header.
    const std::shared_ptr& lookupSID(uint32_t sid);

private:
    // Declares one final override named handle_<Op>() per message type listed
    // below.  The macro is local to this list and is removed right after it.
#define CASE(Op) virtual void handle_##Op() override final;
    CASE(ECHO);
    CASE(CONNECTION_VALIDATION);
    CASE(SEARCH);
    CASE(AUTHNZ);
    CASE(CREATE_CHANNEL);
    CASE(DESTROY_CHANNEL);
    CASE(GET);
    CASE(PUT);
    CASE(PUT_GET);
    CASE(MONITOR);
    CASE(RPC);
    CASE(CANCEL_REQUEST);
    CASE(DESTROY_REQUEST);
    CASE(GET_FIELD);
    CASE(MESSAGE);
#undef CASE

    // Handler taking the command code as an argument.  Presumably shared by
    // the GET/PUT/RPC handlers, going by the name -- confirm in the .cpp.
    void handle_GPR(pva_app_msg_t cmd);

    virtual std::shared_ptr self_from_this() override final;
public:
    virtual void cleanup() override final;
private:
    //void bevEvent(short events);
    virtual void bevRead() override final;
    virtual void bevWrite() override final;
};

// One listening interface of the server.
struct ServIface
{
    // Back-pointer to the server implementation; the pointer itself is const.
    server::Server::Pvt * const server;

    SockAddr bind_addr;
    std::string name;

    evsocket sock;
    evlisten listener;

    ServIface(const SockAddr &addr, server::Server::Pvt *server, bool fallback);

    // Static callback with a C-style signature.  Presumably registered with
    // the evconnlistener, with `raw` carrying the ServIface -- confirm in the .cpp.
    static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw);
};

//! Home of the magic "server" PV used by "pvinfo"
struct ServerSource : public server::Source
{
    const std::string name;
    server::Server::Pvt* const serv;

    const Value info;

    INST_COUNTER(ServerSource);

    ServerSource(server::Server::Pvt* serv);

    // server::Source overrides.
    virtual void onSearch(Search &op) override final;

    virtual void onCreate(std::unique_ptr &&op) override final;
};

} // namespace impl

namespace server {
using namespace impl;

// Private implementation of server::Server.
struct Server::Pvt
{
    SockAttach attach;

    std::weak_ptr internal_self;

    // "const" after ctor
    Config effective;

    epicsEvent done;

    // Beacon state.  Both counters start at zero.
    std::vector beaconMsg;
    uint8_t beaconSeq = 0u;
    uint8_t beaconCnt = 0u;
    std::atomic beaconChange{0u};

    // handle server "background" tasks.
    // accept new connections and send beacons
    evbase acceptor_loop;

    std::list > listeners;
    std::vector beaconDest;
    std::vector ignoreList;

    std::list interfaces;
    std::map > connections;

    evsocket beaconSender4, beaconSender6;
    evevent beaconTimer;

    std::vector searchReply;
    // properly a local of Pvt::onSearch() on the UDP worker.
    // made a member to avoid re-alloc of _names vector.
    Source::Search searchOp;

    StaticSource builtinsrc;

    // Presumably guards `sources` -- confirm at the points of use.
    RWLock sourcesLock;
    std::map, std::shared_ptr > sources;

    // Lifecycle state.  No initializer is given here, so the initial value
    // must be set by the constructor, which is defined elsewhere.
    enum state_t {
        Stopped,
        Starting,
        Running,
        Stopping,
    } state;

    INST_COUNTER(ServerPvt);

    Pvt(const Config& conf);
    ~Pvt();

    void start();
    void stop();

private:
    void onSearch(const UDPManager::Search& msg);
    void doBeacons(short evt);
    // Static callback with a C-style signature.  Presumably forwards to
    // doBeacons() via `raw` -- confirm in the .cpp.
    static void doBeaconsS(evutil_socket_t fd, short evt, void *raw);
};

}} // namespace pvxs::server

#endif // SERVERCONN_H