diff --git a/configure/CONFIG_SITE b/configure/CONFIG_SITE index 79e8025..3fb5197 100644 --- a/configure/CONFIG_SITE +++ b/configure/CONFIG_SITE @@ -41,8 +41,8 @@ CHECK_RELEASE = YES -include $(TOP)/../CONFIG_SITE.local -include $(TOP)/configure/CONFIG_SITE.local -# MSVC - skip defining min()/max() macros -USR_CPPFLAGS_WIN32 += -DNOMINMAX +# MSVC - skip defining min()/max() macros and enable APIs +USR_CPPFLAGS_WIN32 += -DNOMINMAX -D_WIN32_WINNT=_WIN32_WINNT_VISTA ifneq ($(filter-out msvc,$(CMPLR_CLASS)),) USR_CXXFLAGS += -std=c++11 diff --git a/src/Makefile b/src/Makefile index bec8861..0ae273c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -32,6 +32,7 @@ LIB_SRCS += util.cpp LIB_SRCS += evhelper.cpp LIB_SRCS += udp_collector.cpp LIB_SRCS += server.cpp +LIB_SRCS += serverconn.cpp LIB_LIBS += Com diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 88e6fb5..7bd7932 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -114,7 +114,7 @@ void dispatch_action(evutil_socket_t _fd, short _ev, void *raw) std::unique_ptr > action(reinterpret_cast*>(raw)); (*action)(); }catch(std::exception& e){ - log_printf(logerr, PLVL_CRIT, "evhelper::call unhandled error: %s\n", e.what()); + log_printf(logerr, PLVL_CRIT, "evhelper::call unhandled error %s : %s\n", typeid(&e).name(), e.what()); } } } @@ -308,7 +308,7 @@ void evsocket::mcast_iface(const SockAddr& iface) const } -void from_wire(sbuf& buf, Size size, bool be) +void from_wire(sbuf& buf, Size& size, bool be) { if(buf.err || buf.empty()) { buf.err = true; @@ -317,16 +317,16 @@ void from_wire(sbuf& buf, Size size, bool be) uint8_t s=buf[0]; buf+=1; if(s<254) { - *size.size = s; + size.size = s; } else if(s==255) { // "null" size. not sure it is used. Replicate weirdness of pvDataCPP - *size.size = -1; + size.size = -1; } else if(s==254) { uint32_t ls = 0; from_wire(buf, ls, be); - *size.size = ls; + size.size = ls; } else { // unreachable buf.err = true; diff --git a/src/evhelper.h b/src/evhelper.h index 08d36d2..f3c1cc9 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -32,11 +32,20 @@ template<> struct default_delete { inline void operator()(evconnlistener* ev) { evconnlistener_free(ev); } }; +template<> +struct default_delete { + inline void operator()(bufferevent* ev) { bufferevent_free(ev); } +}; +template<> +struct default_delete { + inline void operator()(evbuffer* ev) { evbuffer_free(ev); } +}; } namespace pvxsimpl { using namespace pvxs; +//! unique_ptr which is never constructed with NULL template struct owned_ptr : public std::unique_ptr { @@ -71,7 +80,8 @@ public: typedef owned_ptr evevent; typedef owned_ptr evlisten; - +typedef owned_ptr evbufferevent; +typedef owned_ptr evbuf; PVXS_API void to_wire(sbuf& buf, const SockAddr& val, bool be); diff --git a/src/pvaproto.h b/src/pvaproto.h index d3369c7..4c617f4 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -9,11 +9,16 @@ #include #include +#include +#include +#include +#include #include #include +#include #include namespace pvxsimpl { @@ -28,6 +33,9 @@ struct sbuf { T *pos, *limit; bool err; + sbuf(std::vector& buf) + :sbuf(buf.data(), buf.size()) + {} sbuf(T *buf, size_t size) :pos(buf), limit(buf+size) ,err(false) @@ -45,6 +53,18 @@ struct sbuf { } return *this; } + + // partition owned sequence [0, size) at offset n + // return [0, n), retain [n, size) + sbuf split(size_t n) { + if(size() @@ -85,15 +105,21 @@ inline void from_wire(sbuf& buf, T& val, bool be) val = pun.v; } -//! ref. wrapper to disambiguate in case size_t and uint64_t are the same type -template +//! wrapper to disambiguate size_t from uint32_t or uint64_t. +//! +//! __Always__ initialize w/ zero for sane behavour on error. +//! @code +//! sbuf M; +//! Size blen{0}; +//! from_wire(M, blen, be); +//! for(auto n : range(blen)) { // well defined, even if M.err==true +//! @endcode struct Size { - T* size; - explicit Size(T& size) :size(&size) {} + size_t size; }; template -void from_wire(sbuf& buf, Size size, bool be) +void from_wire(sbuf& buf, Size& size, bool be) { if(buf.err || buf.empty()) { buf.err = true; @@ -102,22 +128,80 @@ void from_wire(sbuf& buf, Size size, bool be) uint8_t s=buf[0]; buf+=1; if(s<254) { - *size.size = s; + size.size = s; } else if(s==255) { - // "null" size. not sure it is used. Replicate weirdness of pvDataCPP - *size.size = -1; + // "null" size. not sure it is used. + // Replicate weirdness of pvDataCPP + // FIXME this is almost certainly a bug + size.size = -1; } else if(s==254) { uint32_t ls = 0; from_wire(buf, ls, be); - *size.size = ls; + size.size = ls; + } else { - // unreachable + // unreachable (64-bit size so far not used) buf.err = true; } } +struct Status { + enum type_t { + Ok =0, + Warn=1, + Error=2, + Fatal=3, + } code; + std::string msg; +}; + +template +void to_wire(sbuf& buf, const Status& sts, bool be) +{ + if(buf.err || buf.empty()) { + buf.err = true; + + } else if(sts.code==Status::Ok && sts.msg.empty()) { + *buf.pos++ = 255; + + } else { + *buf.pos++ = sts.code; + to_wire(buf, sts.msg.c_str(), be); + } +} + +template +void from_wire(sbuf& buf, Status& sts, bool be) +{ + if(buf.err || buf.empty()) { + buf.err = true; + + } else if(255==*buf.pos) { + buf.pos++; + sts.code = Status::Ok; + sts.msg.clear(); + + } else { + sts.code = *buf.pos++; + from_wire(buf, sts.msg, be); + } +} + +template +void from_wire(sbuf& buf, std::string& s, bool be) +{ + Size len{0}; + from_wire(buf, len, be); + if(buf.err || buf.size() inline void _to_wire(sbuf& buf, const uint8_t *mem, bool reverse) { @@ -144,8 +228,8 @@ inline void _to_wire(sbuf& buf, const uint8_t *mem, bool reverse) * @param val input variable * @param be true to encode buf in MSBF order, false in LSBF order */ -template::value, int>::type =0> -inline void to_wire(sbuf& buf, const T& val, bool be) +template::value, int>::type =0> +inline void to_wire(sbuf& buf, const T& val, bool be) { union { T v; @@ -155,8 +239,52 @@ inline void to_wire(sbuf& buf, const T& val, bool be) _to_wire(buf, pun.b, be ^ (EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)); } -PVXS_API -void to_wire(sbuf& buf, Size size, bool be); +template +void to_wire(sbuf& buf, const Size& size, bool be) +{ + if(buf.err || buf.empty()) { + buf.err = true; + + } else if(size.size<254) { + *buf.pos++ = size.size; + + } else if(size.size<=0xffffffff) { + *buf.pos++ = 254; + to_wire(buf, uint32_t(size.size), be); + + } else { + buf.err = true; + } +} + +template +void to_wire(sbuf& buf, const char *s, bool be) +{ + Size len{s ? strlen(s) : 0}; + to_wire(buf, len, be); + if(buf.err || buf.size() +void to_wire(sbuf& buf, std::initializer_list bytes, bool be) +{ + if(buf.err || buf.size() +void to_wire(sbuf& buf, const Header& H, bool be) +{ + if(buf.err || buf.size()<8) { + buf.err = true; + + } else { + buf[0] = 0xca; + buf[1] = (H.flags&pva_flags::Server) ? pva_version::server : pva_version::client; + buf[2] = H.flags; + if(be) + buf[2] |= pva_flags::MSB; + buf[3] = H.cmd; + buf += 4; + to_wire(buf, H.len, be); + } +} + } // namespace pvxsimpl #endif // PVAPROTO_H diff --git a/src/pvxs/server.h b/src/pvxs/server.h index c802d83..7d38c47 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -76,24 +77,34 @@ struct FallbackHandler class Attachment { }; */ + +struct Handler; +struct Source; + +/** PV Access protocol server instance + * + * Use a Server::Config to determine how this server will bind, listen, + * and announce itself. + */ class PVXS_API Server { public: struct Config { - //! List of network interface addresses to which this server will bind (list of TCP connections). + //! List of network interface addresses to which this server will bind. //! interfaces.empty() treated as an alias for "0.0.0.0", which may also be given explicitly. //! Port numbers are optional and unused (parsed and ignored) std::vector interfaces; //! Addresses to which (UDP) beacons message will be sent. //! May include broadcast and/or unicast addresses. - //! Special value "*" is expanded with all local interfaces broadcast addresses. std::vector beaconDestinations; unsigned short tcp_port; - unsigned short default_udp; + unsigned short udp_port; bool auto_beacon; + std::array guid; + PVXS_API static Config from_env(); - Config() :tcp_port(5075), default_udp(5076), auto_beacon(true) {} + Config() :tcp_port(5075), udp_port(5076), auto_beacon(true), guid{} {} }; //! An empty/dummy Server @@ -108,14 +119,15 @@ public: //! effective config const Config& config() const; - /* - Attachment attach(const std::string& name, - std::unique_ptr&& handler); + Server& addSource(const std::string& name, + const std::shared_ptr& src, + int order =0); - Attachment attachFallback(std::unique_ptr&& handler); + std::unique_ptr removeSource(const std::string& name); - void detach(Attachment&& attach); - */ + std::unique_ptr getSource(const std::string& name); + + void listSource(std::vector& names); explicit operator bool() const { return !!pvt; } @@ -124,6 +136,39 @@ private: std::unique_ptr pvt; }; +struct PVXS_API Source { + virtual ~Source(); + + struct Search { + class Name { + const char* _name; + bool _claim; + friend struct Server::Pvt; + public: + inline const char* name() const { return _name; } + inline void claim() { _claim = true; } + }; + private: + typedef std::vector _names_t; + _names_t _names; + SockAddr _src; + friend struct Server::Pvt; + public: + + _names_t::iterator begin() { return _names.begin(); } + _names_t::iterator end() { return _names.end(); } + const SockAddr& source() const { return _src; } + }; + virtual void onSearch(Search& op) =0; + + struct Create { + std::string name; + std::string src; + // credentials + }; + virtual std::unique_ptr onCreate(const Create& op) =0; +}; + }} // namespace pvxs::server #endif // PVXS_SERVER_H diff --git a/src/pvxs/util.h b/src/pvxs/util.h index 33f1cf8..e441094 100644 --- a/src/pvxs/util.h +++ b/src/pvxs/util.h @@ -66,7 +66,8 @@ private: public: explicit SockAddr(int af = AF_UNSPEC); - explicit SockAddr(int af, const char *address); + explicit SockAddr(int af, const char *address, unsigned short port=0); + explicit SockAddr(const sockaddr *addr, ev_socklen_t len); inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {} size_t size() const; @@ -75,8 +76,9 @@ public: unsigned short port() const; void setPort(unsigned short port); - void setAddress(const char *); + void setAddress(const char *, unsigned short port=0); + bool isAny() const; bool isLO() const; store_t* operator->() { return &store; } diff --git a/src/server.cpp b/src/server.cpp index 53a8fcc..ab6dd47 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -8,14 +8,19 @@ #include #include #include +#include +#include #include #include #include +#include +#include #include #include #include "evhelper.h" +#include "serverconn.h" #include "utilpvt.h" #include "udp_collector.h" @@ -42,7 +47,7 @@ void split_into(std::vector& out, const char *inp) Server::Config Server::Config::from_env() { Server::Config ret; - ret.default_udp = 5076; + ret.udp_port = 5076; if(const char *env = getenv("EPICS_PVAS_INTF_ADDR_LIST")) { split_into(ret.interfaces, env); @@ -54,6 +59,8 @@ Server::Config Server::Config::from_env() split_into(ret.beaconDestinations, env); } + // TODO resolve host->IP in interfaces and beaconDestinations + ret.tcp_port = 5075; if(const char *env = getenv("EPICS_PVAS_SERVER_PORT")) { ret.tcp_port = lexical_cast(env); @@ -61,93 +68,16 @@ Server::Config Server::Config::from_env() ret.tcp_port = lexical_cast(env); } - ret.default_udp = 5076; + ret.udp_port = 5076; if(const char *env = getenv("EPICS_PVAS_BROADCAST_PORT")) { - ret.default_udp = lexical_cast(env); + ret.udp_port = lexical_cast(env); } else if(const char *env = getenv("EPICS_PVA_BROADCAST_PORT")) { - ret.default_udp = lexical_cast(env); + ret.udp_port = lexical_cast(env); } return ret; } -namespace { - -struct ServIface -{ - Server::Pvt * const server; - - SockAddr bind_addr; - std::string name; - - evsocket sock; - evlisten listener; - - std::unique_ptr searchrx; - - ServIface(const std::string& addr, Server::Pvt *server); - - void onConn(evutil_socket_t sock, struct sockaddr *peer, int socklen); - static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw) - { - try { - if(peer->sa_family!=AF_INET) { - log_printf(serversetup, PLVL_CRIT, "Rejecting !ipv4 client\n"); - evutil_closesocket(sock); - return; - } - static_cast(raw)->onConn(sock, peer, socklen); - }catch(std::exception& e){ - log_printf(serverio, PLVL_CRIT, "Unhandled error in accept callback: %s\n", e.what()); - } - } -}; - -} // namespace - -struct Server::Pvt -{ - // "const" after ctor - Config effective; - - std::list interfaces; - - std::vector beaconDest; - - // handlers for active TCP connections, by priority. - // once added, these remain stable for the lifetime of the Server - std::map prio_loops; - - // handle server "background" tasks. - // accept new connections and send beacons - evbase acceptor_loop; - - evsocket beaconSender; - evevent beaconTimer; - - enum { - Stopped, - Starting, - Running, - Stopping, - } state; - - Pvt(Config&& conf); - ~Pvt(); - - void start(); - void stop(); - - void doBeacons(short evt); - static void doBeaconsS(evutil_socket_t fd, short evt, void *raw) - { - try { - static_cast(raw)->doBeacons(evt); - }catch(std::exception& e){ - log_printf(serverio, PLVL_CRIT, "Unhandled error in beacon timer callback: %s\n", e.what()); - } - } -}; Server::Server() {} @@ -176,9 +106,11 @@ Server& Server::start() Server::Pvt::Pvt(Config&& conf) :effective(std::move(conf)) + ,beaconMsg(128) ,acceptor_loop("PVXS Acceptor", epicsThreadPriorityCAServerLow-2) ,beaconSender(AF_INET, SOCK_DGRAM, 0) ,beaconTimer(event_new(acceptor_loop.base, -1, EV_TIMEOUT, doBeaconsS, this)) + ,searchReply(0x10000) ,state(Stopped) { // empty interface address list implies the wildcard @@ -187,15 +119,65 @@ Server::Pvt::Pvt(Config&& conf) effective.interfaces.push_back("0.0.0.0"); } - acceptor_loop.call([this](){ + auto manager = UDPManager::instance(); + + for(const auto& iface : effective.interfaces) { + SockAddr addr(AF_INET, iface.c_str()); + addr.setPort(effective.udp_port); + listeners.push_back(manager.onSearch(addr, + std::bind(&Pvt::onSearch, this, std::placeholders::_1) )); + // update to allow udp_port==0 + effective.udp_port = addr.port(); + } + + evsocket dummy(AF_INET, SOCK_DGRAM, 0); + + { + // choose new GUID. + // treat as 3x 32-bit unsigned. + union { + std::array i; + std::array b; + } pun; + static_assert (sizeof(pun)==12, ""); + + // i[0] time + epicsTimeStamp now; + epicsTimeGetCurrent(&now); + pun.i[0] = now.secPastEpoch ^ now.nsec; + + // i[1] host + // mix together all local bcast addresses + pun.i[1] = 0xdeadbeef; // because... why not + { + ELLLIST bcasts = ELLLIST_INIT; + osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, nullptr); + + while(ELLNODE *cur = ellGet(&bcasts)) { + osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node); + if(node->addr.sa.sa_family==AF_INET) + pun.i[1] ^= ntohl(node->addr.ia.sin_addr.s_addr); + free(cur); + } + } + + // i[2] random + pun.i[2] = (rand()/double(RAND_MAX))*0xffffffff; + + std::copy(pun.b.begin(), pun.b.end(), effective.guid.begin()); + } + + acceptor_loop.call([this, &dummy](){ // from acceptor worker for(const auto& addr : effective.interfaces) { - interfaces.emplace_back(addr, this); + interfaces.emplace_back(addr, effective.tcp_port, this); + if(effective.tcp_port==0) + effective.tcp_port = interfaces.back().bind_addr.port(); } for(const auto& addr : effective.beaconDestinations) { - beaconDest.emplace_back(AF_INET, addr); + beaconDest.emplace_back(AF_INET, addr.c_str(), effective.udp_port); } if(effective.auto_beacon) { @@ -204,8 +186,6 @@ Server::Pvt::Pvt(Config&& conf) ELLLIST bcasts = ELLLIST_INIT; try { - evsocket dummy(AF_INET, SOCK_DGRAM, 0); - for(const auto& iface : interfaces) { if(iface.bind_addr.family()!=AF_INET) continue; @@ -214,7 +194,7 @@ Server::Pvt::Pvt(Config&& conf) osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, &match); } - // do our best to avoid an bad_alloc during iteration + // do our best to avoid a bad_alloc during iteration beaconDest.reserve(beaconDest.size()+(size_t)ellCount(&bcasts)); while(ELLNODE *cur = ellGet(&bcasts)) { @@ -244,11 +224,16 @@ Server::Pvt::Pvt(Config&& conf) }); } -Server::Pvt::~Pvt() {} +Server::Pvt::~Pvt() +{ + stop(); +} void Server::Pvt::start() { log_printf(serversetup, PLVL_DEBUG, "Server Starting\n"); + + // begin accepting connections acceptor_loop.call([this]() { if(state!=Stopped) { @@ -265,7 +250,16 @@ void Server::Pvt::start() } log_printf(serversetup, PLVL_DEBUG, "Server enabled listener on %s\n", iface.name.c_str()); } + }); + // being processing Searches + for(auto& L : listeners) { + L->start(); + } + + // begin sending beacons + acceptor_loop.call([this]() + { // send first beacon immediately if(event_add(beaconTimer.get(), nullptr)) log_printf(serversetup, PLVL_ERR, "Error enabling beacon timer on\n"); @@ -273,16 +267,7 @@ void Server::Pvt::start() state = Running; }); - auto manager = UDPManager::instance(); - for(auto& iface : interfaces) { - auto addr = iface.bind_addr; - addr.setPort(effective.default_udp); - iface.searchrx = manager.onSearch(addr, [](const UDPManager::Search& msg) - { - // TODO handle search - }); - } } void Server::Pvt::stop() @@ -303,72 +288,133 @@ void Server::Pvt::stop() }); // stop processing Search requests - for(auto& iface : interfaces) { - iface.searchrx.reset(); + for(auto& L : listeners) { + L->stop(); } - // stop listening for new TCP connections + // stop accepting new TCP connections acceptor_loop.call([this]() { - for(auto& iface : interfaces) { if(evconnlistener_disable(iface.listener.get())) { log_printf(serversetup, PLVL_ERR, "Error disabling listener on %s\n", iface.name.c_str()); } log_printf(serversetup, PLVL_DEBUG, "Server disabled listener on %s\n", iface.name.c_str()); } - }); - // Close in-progress connections (and cancel Ops) - - acceptor_loop.call([this]() - { state = Stopped; }); } +void Server::Pvt::onSearch(const UDPManager::Search& msg) +{ + // on UDPManager worker + + searchOp._names.resize(msg.names.size()); + for(auto i : range(msg.names.size())) { + searchOp._names[i]._name = msg.names[i].name; + searchOp._names[i]._claim = false; + } + + { + epicsGuard G(sourcesLock.reader()); + for(const auto& pair : sources) { + try { + pair.second->onSearch(searchOp); + }catch(std::exception& e){ + log_printf(serversetup, PLVL_ERR, "Unhandled error in Source::onSearch for '%s' : %s\n", + pair.first.second.c_str(), e.what()); + } + } + } + + uint16_t nreply = 0; + for(const auto& name : searchOp._names) { + if(name._claim) + nreply++; + } + + // "pvlist" breaks unless we honor mustReply flag + if(nreply==0 && !msg.mustReply) + return; + + sbuf M(searchReply.data(), searchReply.size()); + + const bool be = true; + to_wire(M, {0xca, pva_version::server, pva_flags::MSB|pva_flags::Server, pva_app_msg::SearchReply}, be); + auto blen = M.split(4); + + _to_wire<12>(M, effective.guid.data(), false); + to_wire(M, msg.searchID, be); + to_wire(M, SockAddr::any(AF_INET), be); + to_wire(M, uint16_t(effective.udp_port), be); + to_wire(M, "tcp", be); + // "found" flag + to_wire(M, {uint8_t(nreply!=0 ? 1 : 0)}, be); + + to_wire(M, uint16_t(nreply), be); + for(auto i : range(msg.names.size())) { + if(searchOp._names[i]._claim) + to_wire(M, uint32_t(msg.names[i].id), be); + } + + uint32_t ntx = M.pos-searchReply.data(); + to_wire(blen, uint32_t(ntx-8), be); + + if(M.err || blen.err) { + log_printf(serverio, PLVL_CRIT, "Logic error in Search buffer fill\n"); + } else { + (void)msg.reply(searchReply.data(), ntx); + } +} + void Server::Pvt::doBeacons(short evt) { log_printf(serversetup, PLVL_DEBUG, "Server beacon timer expires\n"); - // TODO send beacons + sbuf M(beaconMsg.data(), beaconMsg.size()); + const bool be = true; + to_wire(M, {0xca, pva_version::server, pva_flags::MSB|pva_flags::Server, pva_app_msg::Beacon}, be); + auto lenfld = M.split(4); + + _to_wire<12>(M, effective.guid.data(), false); + M += 4; // ignored/unused + + to_wire(M, SockAddr::any(AF_INET), be); + to_wire(M, uint16_t(effective.tcp_port), be); + to_wire(M, "tcp", be); + // "NULL" serverStatus + to_wire(M, {0xff}, be); + + to_wire(lenfld, uint32_t(M.pos - beaconMsg.data()), be); + + assert(!M.err && !lenfld.err); + + for(const auto& dest : beaconDest) { + int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), beaconMsg.size(), 0, &dest->sa, dest.size()); + + if(ntx<0) { + int err = evutil_socket_geterror(beaconSender.sock); + log_printf(serverio, PLVL_WARN, "Beacon tx error (%d) %s\n", + err, evutil_socket_error_to_string(err)); + + } else if(unsigned(ntx)acceptor_loop.assertInLoop(); - - // try to bind to requested port, then fallback to a random port - while(true) { - try { - sock.bind(bind_addr); - } catch(std::system_error& e) { - if(e.code().value()==SOCK_EADDRINUSE && bind_addr.port()!=0) { - bind_addr.setPort(0); - continue; - } - throw; - } - break; + try { + static_cast(raw)->doBeacons(evt); + }catch(std::exception& e){ + log_printf(serverio, PLVL_CRIT, "Unhandled error in beacon timer callback: %s\n", e.what()); } - - name = bind_addr.tostring(); - - const int backlog = 4; - listener = evlisten(evconnlistener_new(server->acceptor_loop.base, onConnS, this, LEV_OPT_DISABLED, backlog, sock.sock)); -} - -void ServIface::onConn(evutil_socket_t sock, struct sockaddr *peer, int socklen) -{ - // TODO - evutil_closesocket(sock); } }} // namespace pvxs::server diff --git a/src/serverconn.cpp b/src/serverconn.cpp new file mode 100644 index 0000000..3517a83 --- /dev/null +++ b/src/serverconn.cpp @@ -0,0 +1,420 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include +#include + +#include +#include "serverconn.h" + +// Amount of following messages which we allow to be read while +// processing the current message. Avoids some extra recv() calls, +// at the price of maybe extra copying. +static const size_t tcp_readahead = 0x1000; + +namespace pvxsimpl { + +DEFINE_LOGGER(connsetup, "tcp.setup"); +DEFINE_LOGGER(connio, "tcp.io"); + +ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen) + :iface(iface) + ,peerAddr(peer, socklen) + ,peerName(peerAddr.tostring()) + ,bev(bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS)) + ,peerBE(true) // arbitrary choice, default should be overwritten before use + ,expectSeg(false) + ,segCmd(0xff) + ,segBuf(evbuffer_new()) +{ + bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this); + // initially wait for at least a header + bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); + + timeval timo = {30, 0}; + bufferevent_set_timeouts(bev.get(), &timo, &timo); + + auto tx = bufferevent_get_output(bev.get()); + + std::vector buf(128); + const bool be = EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG; + + // queue connection validation message + { + uint8_t flags = be ? pva_flags::MSB : 0; + flags |= pva_flags::Server; + + sbuf M(buf.data(), buf.size()); + to_wire(M, {0xca, pva_version::server, uint8_t(flags|pva_flags::Control), pva_ctrl_msg::SetEndian}, be); + to_wire(M, uint32_t(0), be); + + to_wire(M, {0xca, pva_version::server, flags, pva_app_msg::ConnValid}, be); + auto blen = M.split(4); + auto bstart = blen.pos; + + // serverReceiveBufferSize, not used + to_wire(M, uint32_t(0x10000), be); + // serverIntrospectionRegistryMaxSize, also not used + to_wire(M, uint16_t(0x7fff), be); + to_wire(M, Size{2}, be); + to_wire(M, "anonymous", be); + to_wire(M, "ca", be); + + to_wire(blen, uint32_t(M.pos-bstart), be); + + assert(!M.err && !blen.err); + + if(evbuffer_add(tx, buf.data(), M.pos-buf.data())) + throw std::bad_alloc(); + } + + if(bufferevent_enable(bev.get(), EV_READ|EV_WRITE)) + throw std::logic_error("Unable to enable BEV"); +} + +ServerConn::~ServerConn() +{} + + +void ServerConn::handle_Echo() +{ + // Client requests echo as a keep-alive check + + auto tx = bufferevent_get_output(bev.get()); + uint32_t len = evbuffer_get_length(segBuf.get()); + + const bool be = EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG; + uint8_t header[8]; + sbuf M(header, sizeof(header)); + to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len}, be); + assert(!M.err); + + auto err = evbuffer_add(tx, header, sizeof(header)); + err |= evbuffer_add_buffer(tx, segBuf.get()); + assert(!err); + + // maybe help reduce latency + bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); +} + +void ServerConn::handle_ConnValid() +{ + // Client begins/restarts Auth handshake + + // size to extract and process up to auth payload. + // client may only select from our advertised auth + // mechanisms. "anonymous" is the longest. + uint8_t buf[4+2+2+sizeof("anonymous")]; + + const auto n = evbuffer_copyout(segBuf.get(), buf, sizeof(buf)); + + sbuf M(buf, n); + + M += 6; // ignore unused buffer and introspection size + uint16_t qos; + from_wire(M, qos, peerBE); + std::string selected; + from_wire(M, selected, peerBE); + + (void)evbuffer_drain(segBuf.get(), M.pos-buf); + + if(M.err) { + log_hex_printf(connio, PLVL_ERR, buf, n, "Truncated/Invalid ConnValid from client"); + bev.reset(); + return; + } else if(selected!="ca" && selected!="anonymous") { + log_printf(connio, PLVL_DEBUG, "Client selects unadvertised auth \"%s\"", selected.c_str()); + } + + // remainder of segBuf is payload w/ credentials +} + +void ServerConn::handle_AuthZ() +{} + +void ServerConn::handle_Search() +{} + +void ServerConn::handle_CreateChan() +{} + +void ServerConn::handle_DestroyChan() +{} + +void ServerConn::handle_GetOp() +{} + +void ServerConn::handle_PutOp() +{} + +void ServerConn::handle_RPCOp() +{} + +void ServerConn::handle_PutGetOp() +{} + +void ServerConn::handle_CancelOp() +{} + +void ServerConn::handle_DestroyOp() +{} + +void ServerConn::handle_Introspect() +{} + +void ServerConn::handle_Message() +{} + + +void ServerConn::cleanup() +{ + // remove myself from connections list + decltype (iface->connections) trash; + for (auto it = iface->connections.begin(), end = iface->connections.end(); it!=end; ++it) { + if((&*it)==this) { + trash.splice(it, iface->connections); + break; + } + } + assert(!trash.empty()); +} + +void ServerConn::bevEvent(short events) +{ + if(events&(BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)) { + if(events&BEV_EVENT_ERROR) { + int err = EVUTIL_SOCKET_ERROR(); + const char *msg = evutil_socket_error_to_string(err); + log_printf(connio, PLVL_ERR, "Server connection closed with socket error %d : %s\n", err, msg); + } + if(events&BEV_EVENT_EOF) { + log_printf(connio, PLVL_DEBUG, "Server connection closed by peer\n"); + } + if(events&BEV_EVENT_TIMEOUT) { + log_printf(connio, PLVL_WARN, "Server connection timeout\n"); + } + bev.reset(); + } + + if(!bev) + cleanup(); +} + +void ServerConn::bevRead() +{ + auto rx = bufferevent_get_input(bev.get()); + + while(bev && evbuffer_get_length(rx)>=8) { + uint8_t header[8]; + + auto ret = evbuffer_copyout(rx, header, sizeof(header)); + assert(ret==sizeof(header)); // previously verified + + if(header[0]!=0xca || header[1]==0 || !(header[2]&pva_flags::Server)) { + log_hex_printf(connio, PLVL_ERR, header, sizeof(header), "Protocol decode fault. Force disconnect.\n"); + bev.reset(); + break; + } + + if(header[2]&pva_flags::Control) { + switch (header[3]) { + case pva_ctrl_msg::SetEndian: + // while we don't enforce. This should be the very first message sent. + peerBE = header[2]&pva_flags::MSB; + break; + default: + // Set/AckMarker never used + break; + } + evbuffer_drain(rx, 8); + continue; + + } + // application message + + const bool be = header[2]&pva_flags::MSB; + if(be!=peerBE) { + // wonderful PVA is redundant in communicating peer byte order. + // Which is included in every header _and_ the special SetEndian control message. + // While they really should be consistent, the original impl. only uses SetEndian + log_printf(connio, PLVL_CRIT, "Peer messages with inconsistent endian\n"); + } + + // a bit verbose :P + sbuf L(&header[4], 4); + uint32_t len = 0; + from_wire(L, len, peerBE); + assert(!L.err); + + if(evbuffer_get_length(rx)-8 < len) { + // wait for complete payload + // and some additional if available + size_t readahead = len; + if(readahead < std::numeric_limits::max()-tcp_readahead) + readahead += tcp_readahead; + bufferevent_setwatermark(bev.get(), EV_READ, len, readahead); + break; + } + + evbuffer_drain(rx, 8); + { + unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len); + assert(n==len); // we know rx buf contains the entire body + } + + // so far we do not use segmentation to support incremental processing + // of long messages. We instead accumulate all segments of a message + // prior to parsing. + + auto seg = header[2]&pva_flags::SegMask; + + bool continuation = seg&pva_flags::SegLast; // true for mid or last. false for none for first + if((continuation ^ expectSeg) || (continuation && header[3]!=segCmd)) { + log_printf(connio, PLVL_CRIT, "Peer segmentation violation %c%c 0x%02x==0x%02x\n", + expectSeg?'Y':'N', continuation?'Y':'N', + segCmd, header[3]); + bev.reset(); + break; + } + + if(!seg || seg==pva_flags::SegFirst) { + expectSeg = true; + segCmd = header[3]; + } + + if(!seg || seg==pva_flags::SegLast) { + expectSeg = false; + + // ready to process segBuf + switch(segCmd) { + default: + log_printf(connio, PLVL_DEBUG, "Ignore unexpected command 0x%02x\n", segCmd); + evbuffer_drain(segBuf.get(), evbuffer_get_length(segBuf.get())); + break; +#define CASE(Op) case pva_app_msg::Op: handle_##Op(); break + CASE(Echo); + CASE(ConnValid); + CASE(Search); + CASE(AuthZ); + + CASE(CreateChan); + CASE(DestroyChan); + + CASE(GetOp); + CASE(PutOp); + CASE(PutGetOp); + CASE(RPCOp); + CASE(CancelOp); + CASE(DestroyOp); + CASE(Introspect); + + CASE(Message); +#undef CASE + } + // handlers may be cleared bev to force disconnect + + // silently drain any unprocessed body (forward compatibility) + if(auto n = evbuffer_get_length(segBuf.get())) + evbuffer_drain(segBuf.get(), n); + } + } + + if(!bev) { + cleanup(); + + } else if(auto tx = bufferevent_get_output(bev.get())) { + if(evbuffer_get_length(tx)>=0x100000) { + // write buffer "full". stop reading until it drains + // TODO configure + (void)bufferevent_disable(bev.get(), EV_READ); + bufferevent_setwatermark(bev.get(), EV_WRITE, 0x100000/2, 0); + } + } +} + +void ServerConn::bevWrite() +{ + (void)bufferevent_enable(bev.get(), EV_READ); + bufferevent_setwatermark(bev.get(), EV_WRITE, 0, 0); +} + +void ServerConn::bevEventS(struct bufferevent *bev, short events, void *ptr) +{ + try { + static_cast(ptr)->bevEvent(events); + }catch(std::exception& e){ + log_printf(connio, PLVL_CRIT, "Unhandled error in bev event callback: %s\n", e.what()); + static_cast(ptr)->cleanup(); + } +} + +void ServerConn::bevReadS(struct bufferevent *bev, void *ptr) +{ + try { + static_cast(ptr)->bevRead(); + }catch(std::exception& e){ + log_printf(connio, PLVL_CRIT, "Unhandled error in bev read callback: %s\n", e.what()); + static_cast(ptr)->cleanup(); + } +} + +void ServerConn::bevWriteS(struct bufferevent *bev, void *ptr) +{ + try { + static_cast(ptr)->bevWrite(); + }catch(std::exception& e){ + log_printf(connio, PLVL_CRIT, "Unhandled error in bev write callback: %s\n", e.what()); + static_cast(ptr)->cleanup(); + } +} + +ServIface::ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server) + :server(server) + ,bind_addr(AF_INET, addr.c_str(), port) + ,sock(AF_INET, SOCK_STREAM, 0) +{ + server->acceptor_loop.assertInLoop(); + + // try to bind to requested port, then fallback to a random port + while(true) { + try { + sock.bind(bind_addr); + } catch(std::system_error& e) { + if(e.code().value()==SOCK_EADDRINUSE && bind_addr.port()!=0) { + bind_addr.setPort(0); + continue; + } + throw; + } + break; + } + + name = bind_addr.tostring(); + + const int backlog = 4; + listener = evlisten(evconnlistener_new(server->acceptor_loop.base, onConnS, this, LEV_OPT_DISABLED, backlog, sock.sock)); +} + +void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw) +{ + try { + if(peer->sa_family!=AF_INET) { + log_printf(connsetup, PLVL_CRIT, "Rejecting !ipv4 client\n"); + evutil_closesocket(sock); + return; + } + auto self = static_cast(raw); + self->connections.emplace_back(self, sock, peer, socklen); + }catch(std::exception& e){ + log_printf(connio, PLVL_CRIT, "Unhandled error in accept callback: %s\n", e.what()); + evutil_closesocket(sock); + } +} + +} // namespace pvxsimpl diff --git a/src/serverconn.h b/src/serverconn.h new file mode 100644 index 0000000..ce6d9b6 --- /dev/null +++ b/src/serverconn.h @@ -0,0 +1,155 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef SERVERCONN_H +#define SERVERCONN_H + +#include +#include + +#include +#include "evhelper.h" +#include "utilpvt.h" +#include "udp_collector.h" + +namespace pvxsimpl { + +struct ServIface; +struct ServerConn; + +struct ServerOp +{ + ServerConn* const conn; + + evbuf rx; + + ServerOp(); +}; + +struct ServerConn +{ + ServIface* const iface; + + SockAddr peerAddr; + std::string peerName; + evbufferevent bev; + + // credentials + + bool peerBE; + bool expectSeg; + + uint8_t segCmd; + evbuf segBuf; + + ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen); + ~ServerConn(); + +private: +#define CASE(Op) void handle_##Op(); + CASE(Echo); + CASE(ConnValid); + CASE(Search); + CASE(AuthZ); + + CASE(CreateChan); + CASE(DestroyChan); + + CASE(GetOp); + CASE(PutOp); + CASE(PutGetOp); + CASE(RPCOp); + CASE(CancelOp); + CASE(DestroyOp); + CASE(Introspect); + + CASE(Message); +#undef CASE + + void cleanup(); + void bevEvent(short events); + void bevRead(); + void bevWrite(); + static void bevEventS(struct bufferevent *bev, short events, void *ptr); + static void bevReadS(struct bufferevent *bev, void *ptr); + static void bevWriteS(struct bufferevent *bev, void *ptr); +}; + +struct ServIface +{ + server::Server::Pvt * const server; + + SockAddr bind_addr; + std::string name; + + evsocket sock; + evlisten listener; + + std::list connections; + + ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server); + + static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw); +}; + +} // namespace pvxsimpl + +namespace pvxs { +namespace server { +using namespace pvxsimpl; + +struct Server::Pvt +{ + // "const" after ctor + Config effective; + + std::vector beaconMsg; + + std::list > listeners; + std::list interfaces; + + std::vector beaconDest; + + // handlers for active TCP connections, by priority. + // once added, these remain stable for the lifetime of the Server + std::map prio_loops; + + // handle server "background" tasks. + // accept new connections and send beacons + evbase acceptor_loop; + + evsocket beaconSender; + evevent beaconTimer; + + std::vector searchReply; + + Source::Search searchOp; + + RWLock sourcesLock; + std::map, std::shared_ptr > sources; + + enum { + Stopped, + Starting, + Running, + Stopping, + } state; + + Pvt(Config&& conf); + ~Pvt(); + + void start(); + void stop(); + +private: + void onSearch(const UDPManager::Search& msg); + void doBeacons(short evt); + static void doBeaconsS(evutil_socket_t fd, short evt, void *raw); +}; + +}} // namespace pvxs::server + +#endif // SERVERCONN_H diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index 1f7b429..0464c46 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -31,16 +31,6 @@ namespace pvxsimpl { DEFINE_LOGGER(logio, "udp.io"); DEFINE_LOGGER(logsetup, "udp.setup"); -struct UDPListener : public std::enable_shared_from_this -{ - std::function searchCB; - std::function beaconCB; - std::shared_ptr collector; - const SockAddr dest; - UDPListener(UDPManager::Pvt *manager, SockAddr& dest); - ~UDPListener(); -}; - struct UDPCollector : public UDPManager::Search, public std::enable_shared_from_this { @@ -119,36 +109,35 @@ struct UDPCollector : public UDPManager::Search, switch(cmd) { case pva_app_msg::Search: { - uint32_t id; + uint8_t flags = 0; SockAddr replyAddr; + uint16_t port = 0; - from_wire(M, id, be); - M += 4; // flags and unused/reserved + from_wire(M, searchID, be); + from_wire(M, flags, be); + mustReply = flags&pva_search_flags::MustReply; + M += 3; // unused/reserved from_wire(M, replyAddr, be); - uint16_t port = 0; from_wire(M, port, be); + if(replyAddr.isAny()) { + replyAddr = src; + } replyAddr.setPort(port); // so far, only "tcp" transport has ever been seen. // however, we will consider and ignore any others which might appear bool foundtcp = false; - size_t nproto=0; - from_wire(M, Size(nproto), be); - for(size_t i=0; i(nchar), be); + Size nproto{0}; + from_wire(M, nproto, be); + for(size_t i=0; i=3 && nchar==3 && M[0]=='t' && M[1]=='c' && M[2]=='p') { - foundtcp = true; - M += 3; - break; - } - } - if(!foundtcp && !M.err) { - // so far, not something which should actually happen - log_printf(logio, PLVL_DEBUG, " Search w/o proto \"tcp\"\n"); - return true; + // shortcut to avoid allocating a std::string + // "tcp" is the only value we expect to see + foundtcp |= M.size()>=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p'; + M += nchar.size; } // one Search message can include many PV names. @@ -160,17 +149,17 @@ struct UDPCollector : public UDPManager::Search, for(size_t i=0; i(chlen), be); + from_wire(M, chlen, be); // inject nil for previous PV name *mundge = '\0'; - if(chlen<=M.size() && !M.err) { - names.push_back(reinterpret_cast(M.pos)); + if(foundtcp && chlen.size<=M.size() && !M.err) { + names.push_back(UDPManager::Search::Name{reinterpret_cast(M.pos), id}); } - M += chlen; + M += chlen.size; } if(!M.err) { @@ -194,11 +183,14 @@ struct UDPCollector : public UDPManager::Search, M += 4; // skip flags, seq, and change count. unused from_wire(M, beaconMsg.server, be); from_wire(M, port, be); + if(beaconMsg.server.isAny()) { + beaconMsg.server = src; + } beaconMsg.server.setPort(port); - size_t protolen=0; - from_wire(M, Size(protolen), be); - M += protolen; // ignore string + Size protolen{0}; + from_wire(M, protolen, be); + M += protolen.size; // ignore string // ignore remaining "server status" blob @@ -265,7 +257,7 @@ UDPCollector::UDPCollector(const std::shared_ptr& manager, cons ,buf(0x10001) ,beaconMsg(src) { - beaconMsg.guid.resize(12); + manager->loop.assertInLoop(); epicsSocketEnableAddressUseForDatagramFanout(sock.sock); sock.bind(this->bind_addr); @@ -279,6 +271,8 @@ UDPCollector::UDPCollector(const std::shared_ptr& manager, cons UDPCollector::~UDPCollector() { + manager->loop.assertInLoop(); + // we should only be destroyed after that last listener has removed itself assert(listeners.empty()); manager->loop.assertInLoop(); @@ -363,6 +357,7 @@ void UDPManager::sync() UDPListener::UDPListener(UDPManager::Pvt *manager, SockAddr &dest) :dest(dest) + ,active(false) { manager->loop.assertInLoop(); @@ -381,26 +376,36 @@ UDPListener::UDPListener(UDPManager::Pvt *manager, SockAddr &dest) collector.reset(new UDPCollector(manager->shared_from_this(), dest)); dest = collector->bind_addr; } - - collector->listeners.insert(this); } UDPListener::~UDPListener() { - if(!collector) - return; - auto manager = collector->manager; manager->loop.call([this](){ // from event loop worker - collector->listeners.erase(this); + if(active) + collector->listeners.erase(this); collector.reset(); // destroy UDPCollector from worker }); // UDPManager may be destroyed at this point, which joins its event loop worker } +void UDPListener::start(bool s) +{ + collector->manager->loop.call([this, s](){ + if(s && !active) { + collector->listeners.insert(this); + + } else if(!s && active) { + collector->listeners.erase(this); + } + + active = s; + }); +} + bool UDPCollector::reply(const void *msg, size_t msglen) const { manager->loop.assertInLoop(); @@ -422,9 +427,3 @@ bool UDPCollector::reply(const void *msg, size_t msglen) const UDPManager::Search::~Search() {} } // namespace pvxsimpl - -namespace std { -void default_delete::operator()(pvxsimpl::UDPListener* listener) { - delete listener; -}; -} // namespace std diff --git a/src/udp_collector.h b/src/udp_collector.h index 8c91492..6adc1a0 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -11,23 +11,13 @@ #include #include #include +#include #include #include "evhelper.h" namespace pvxsimpl { struct UDPListener; -} // namespace pvxsimpl - -namespace std { -template<> -struct default_delete { - PVXS_API void operator()(pvxsimpl::UDPListener*); -}; -} // namespace std - -namespace pvxsimpl { - struct UDPCollector; struct UDPManager; @@ -41,20 +31,34 @@ struct PVXS_API UDPManager struct Beacon { SockAddr& src; SockAddr server; - std::vector guid; + std::array guid; Beacon(SockAddr& src) :src(src) {} }; + //! Create subscription for Beacon messages. + //! Must call UDPListener::start() std::unique_ptr onBeacon(SockAddr& dest, std::function&& cb); struct PVXS_API Search { SockAddr src; SockAddr server; - std::vector names; + uint32_t searchID; + bool mustReply; + struct Name { + const char *name; + uint32_t id; + }; + + std::vector names; + + decltype (names)::const_iterator begin() const { return names.begin(); } + decltype (names)::const_iterator end() const { return names.end(); } virtual bool reply(const void *msg, size_t msglen) const =0; virtual ~Search(); }; + //! Create subscription for Search messages. + //! Must call UDPListener::start() std::unique_ptr onSearch(SockAddr& dest, std::function&& cb); @@ -72,6 +76,24 @@ private: friend struct UDPCollector; }; +class PVXS_API UDPListener +{ + std::function searchCB; + std::function beaconCB; + std::shared_ptr collector; + const SockAddr dest; + bool active; + friend struct UDPCollector; + friend struct UDPManager; + +public: + UDPListener(UDPManager::Pvt *manager, SockAddr& dest); + ~UDPListener(); + + void start(bool s=true); + inline void stop() { start(false); } +}; + } // namespace pvxsimpl #endif // UDP_COLLECTOR_H diff --git a/src/util.cpp b/src/util.cpp index 0a3d5c6..b2bc4da 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -79,10 +79,18 @@ SockAddr::SockAddr(int af) throw std::invalid_argument("Unsupported address family"); } -SockAddr::SockAddr(int af, const char *address) +SockAddr::SockAddr(int af, const char *address, unsigned short port) :SockAddr(af) { - setAddress(address); + setAddress(address, port); +} + +SockAddr::SockAddr(const sockaddr *addr, ev_socklen_t len) + :SockAddr(addr->sa_family) +{ + if(len<0 || len>ev_socklen_t(size())) + throw std::invalid_argument("Truncated Address"); + memcpy(&store, addr, len); } size_t SockAddr::size() const @@ -120,15 +128,28 @@ void SockAddr::setPort(unsigned short port) } } -void SockAddr::setAddress(const char *name) +void SockAddr::setAddress(const char *name, unsigned short port) { SockAddr temp; int templen = sizeof(temp.store); if(evutil_parse_sockaddr_port(name, &temp->sa, &templen)) throw std::runtime_error(std::string("Unable to parse as IP addresss: ")+name); + if(temp.port()==0) + temp.setPort(port); (*this) = temp; } +bool SockAddr::isAny() const +{ + switch(store.sa.sa_family) { + case AF_INET: return store.in.sin_addr.s_addr==htonl(INADDR_ANY); +#ifdef AF_INET6 + case AF_INET6: return IN6_IS_ADDR_UNSPECIFIED(&store.in6.sin6_addr); +#endif + default: return false; + } +} + bool SockAddr::isLO() const { switch(store.sa.sa_family) { diff --git a/src/utilpvt.h b/src/utilpvt.h index a57dd26..372d5eb 100644 --- a/src/utilpvt.h +++ b/src/utilpvt.h @@ -6,6 +6,13 @@ #ifndef UTILPVT_H #define UTILPVT_H + +#ifdef _WIN32 +# include +#else +# include +#endif + #include #include @@ -43,6 +50,82 @@ inline T lexical_cast(const std::string& s) return detail::as_str::op(s.c_str()); } + +namespace detail { +template +struct Range { + I a, b; + + struct iterator : std::iterator_traits { + I val; + explicit iterator(I val) :val(val) {} + inline I operator*() const { return val; } + inline iterator& operator++() { val++; return *this; } + inline iterator operator++(int) { return iterator{val++}; } + inline bool operator==(const iterator& o) const { return val==o.val; } + inline bool operator!=(const iterator& o) const { return val!=o.val; } + }; + + inline iterator begin() const { return iterator{a}; } + inline iterator cbegin() const { return begin(); } + inline iterator end() const { return iterator{b}; } + inline iterator cend() const { return end(); } +}; +} // namespace detail + +template +detail::Range range(I end) { return detail::Range{I(0), end}; } + +template +detail::Range range(I begin, I end) { return detail::Range{begin, end}; } + +class RWLock +{ +#ifdef _WIN32 + SRWLOCK lock; +public: + inline RWLock() :_reader(*this), _writer(*this) { InitializeSRWLock(&lock); } +#else + pthread_rwlock_t lock; +public: + inline RWLock() :_reader(*this), _writer(*this) { pthread_rwlock_init(&lock, nullptr); } + inline ~RWLock() { pthread_rwlock_destroy(&lock); } +#endif + + RWLock(const RWLock&) = delete; + RWLock(RWLock&&) = delete; + RWLock& operator=(const RWLock&) = delete; + RWLock& operator=(RWLock&&) = delete; + + class Reader { + RWLock& rw; + public: + Reader(RWLock& rw) : rw(rw) {} +#ifdef _WIN32 + inline void lock() { AcquireSRWLockShared(&rw.lock); } + inline void unlock() { ReleaseSRWLockShared(&rw.lock); } +#else + inline void lock() { pthread_rwlock_rdlock(&rw.lock); } + inline void unlock() { pthread_rwlock_unlock(&rw.lock); } +#endif + } _reader; + inline Reader& reader() { return _reader; } + + class Writer { + RWLock& rw; + public: + Writer(RWLock& rw) : rw(rw) {} +#ifdef _WIN32 + inline void lock() { AcquireSRWLockExclusive(&rw.lock); } + inline void unlock() { ReleaseSRWLockExclusive(&rw.lock); } +#else + inline void lock() { pthread_rwlock_wrlock(&rw.lock); } + inline void unlock() { pthread_rwlock_unlock(&rw.lock); } +#endif + } _writer; + inline Writer& writer() { return _writer; } +}; + } // namespace pvxsimpl #endif // UTILPVT_H diff --git a/test/testudp.cpp b/test/testudp.cpp index ed09f5d..3992568 100644 --- a/test/testudp.cpp +++ b/test/testudp.cpp @@ -40,13 +40,14 @@ void testBeacon(bool be) { testDiag("Beacon received"); testEq(msg.src, sender); - testEq(msg.server, SockAddr::any(AF_INET, 0x1234)); + testEq(msg.server, SockAddr::loopback(AF_INET, 0x1234)); uint8_t expect[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; testOk1(msg.guid.size()==12 && std::equal(msg.guid.begin(), msg.guid.end(), expect)); rx.signal(); }); + sub->start(); testDiag("Listen on %s", listener.tostring().c_str()); @@ -99,45 +100,40 @@ void testSearch(bool be, std::initializer_list names) { testDiag("Search received"); for(auto name : msg.names) { - testDiag(" For %s", name); + testDiag(" For %s", name.name); } if(testEq(msg.names.size(), names.size())) { size_t i=0; for(auto name : msg.names) { - testEq(msg.names[i++], name); + testEq(msg.names[i].id, i+1); + testEq(msg.names[i++].name, name.name); } } rx.signal(); }); + sub->start(); std::vector msg(1024, 0); sbuf M(msg.data(), msg.size()); - M[0] = 0xca; - M[1] = pva_version::client; - M[2] = be ? pva_flags::MSB : 0; - M[3] = pva_app_msg::Search; - M+=4; - M+=4; //come back to this later + to_wire(M, {0xca, pva_version::client, uint8_t(be ? pva_flags::MSB : 0), pva_app_msg::Search}, be); + auto blen = M.split(4); to_wire(M, uint32_t(0x12345678), be); M+=4; SockAddr reply(SockAddr::any(AF_INET, 0x1020)); to_wire(M, reply, be); to_wire(M, uint16_t(reply.port()), be); // one protocol w/ 3 chars - M[0] = 1; - M[1] = 3; - M[2] = 't'; - M[3] = 'c'; - M[4] = 'p'; - M+=5; + to_wire(M, {1}, be); + to_wire(M, "tcp", be); to_wire(M, uint16_t(names.size()), be); uint32_t i=1; for(auto name : names) { to_wire(M, i++, be); - M[0] = strlen(name); - memcpy((char*)M.pos+1, name, M[0]); - M+=1+M[0]; + to_wire(M, name, be); } + + to_wire(blen, uint32_t(M.pos-msg.data()-8), be); + testOk1(!M.err); testDiag("Buffer pos %u of %u", unsigned(M.pos-msg.data()), unsigned(msg.size())); @@ -151,7 +147,7 @@ void testSearch(bool be, std::initializer_list names) int main(int argc, char *argv[]) { - testPlan(32); + testPlan(38); pvxs::logger_config_env(); testBeacon(true); testBeacon(false); diff --git a/tools/pvxvct.cpp b/tools/pvxvct.cpp index d6e76c9..23bdb83 100644 --- a/tools/pvxvct.cpp +++ b/tools/pvxvct.cpp @@ -199,7 +199,7 @@ int main(int argc, char *argv[]) { log_printf(out, PLVL_INFO, "%s Searching for:\n", msg.src.tostring().c_str()); for(const auto pv : msg.names) { - log_printf(out, PLVL_INFO, " \"%s\"\n", pv); + log_printf(out, PLVL_INFO, " \"%s\"\n", pv.name); } }; @@ -213,13 +213,15 @@ int main(int argc, char *argv[]) }; - std::vector, std::unique_ptr>> listeners; + std::vector, std::unique_ptr>> listeners; listeners.reserve(bindaddrs.size()); for(auto& baddr : bindaddrs) { auto manager = pva::UDPManager::instance(); listeners.emplace_back(manager.onSearch(baddr, searchCB), manager.onBeacon(baddr, beaconCB)); + listeners.back().first->start(); + listeners.back().second->start(); log_printf(out, PLVL_DEBUG, "Bind: %s\n", baddr.tostring().c_str()); }