diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 4abf875..88e6fb5 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -183,48 +183,6 @@ bool evbase::inLoop() return pvt->worker.isCurrentThread(); } - -evevent::evevent(struct event_base *base, evutil_socket_t sock, short mask, event_callback_fn fn, void *arg) - :ev(event_new(base, sock, mask, fn, arg)) -{ - if(!ev) - throw std::bad_alloc(); - log_printf(logerr, PLVL_DEBUG, "Create event %p on %p for %d (%x)\n", - ev, base, (int)sock, mask); -} - -evevent::~evevent() -{ - if(ev) { - log_printf(logerr, PLVL_DEBUG, "Destroy event %p\n", ev); - event_free(ev); - } -} - -evevent::evevent(evevent&& o) noexcept - :ev(o.ev) -{ - o.ev = nullptr; -} - -evevent& evevent::operator=(evevent&& o) noexcept -{ - if(this!=&o) { - if(ev) - event_free(ev); - ev = o.ev; - o.ev = nullptr; - } - return *this; -} - -void evevent::add(const struct timeval *tv) -{ - log_printf(logerr, PLVL_DEBUG, "Add event %p\n", ev); - if(event_add(ev, tv)) - throw std::runtime_error("event_add() fails"); -} - void to_wire(sbuf& buf, const SockAddr &val, bool be) { if(buf.err || buf.size()<16) { diff --git a/src/evhelper.h b/src/evhelper.h index 38cf0cf..08d36d2 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -28,11 +28,25 @@ template<> struct default_delete { inline void operator()(event* ev) { event_free(ev); } }; +template<> +struct default_delete { + inline void operator()(evconnlistener* ev) { evconnlistener_free(ev); } +}; } namespace pvxsimpl { using namespace pvxs; +template +struct owned_ptr : public std::unique_ptr +{ + constexpr owned_ptr() {} + explicit owned_ptr(T* ptr) : std::unique_ptr(ptr) { + if(!*this) + throw std::bad_alloc(); + } +}; + struct PVXS_API evbase { explicit evbase(const std::string& name, unsigned prio=0); ~evbase(); @@ -55,57 +69,9 @@ public: event_base* const base; }; -struct PVXS_API evevent { - event *ev; +typedef owned_ptr evevent; +typedef owned_ptr evlisten; - constexpr evevent() :ev(nullptr) {} - evevent(struct event_base *base, evutil_socket_t sock, short mask, event_callback_fn fn, void *arg); - ~evevent(); - evevent(evevent&&) noexcept; - evevent& operator=(evevent&&) noexcept; - evevent(const evevent&) = delete; - evevent& operator=(const evevent&) = delete; - - operator bool() const { return ev; } - operator event*() const { return ev; } - - void add(const timeval *tv=nullptr); -}; - -struct evlisten { - evconnlistener * lev; - - evlisten() :lev(nullptr) {} - evlisten(evlisten&& o) noexcept - :lev(o.lev) - { - o.lev = nullptr; - } - evlisten& operator=(evlisten&& o) noexcept - { - if(this!=&o) { - if(lev) - evconnlistener_free(lev); - lev = o.lev; - o.lev = nullptr; - } - return *this; - } - evlisten(const evlisten&) = delete; - evlisten& operator=(const evlisten&) = delete; - - template - evlisten(Args...args) - :lev(evconnlistener_new(std::forward(args)...)) - { - if(!lev) - throw std::bad_alloc(); - } - ~evlisten() { - if(lev) - evconnlistener_free(lev); - } -}; PVXS_API void to_wire(sbuf& buf, const SockAddr& val, bool be); diff --git a/src/pvaproto.h b/src/pvaproto.h index 46b510f..d3369c7 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -28,7 +28,7 @@ struct sbuf { T *pos, *limit; bool err; - sbuf(T *buf, T size) + sbuf(T *buf, size_t size) :pos(buf), limit(buf+size) ,err(false) {} diff --git a/src/pvxs/unittest.h b/src/pvxs/unittest.h index 288f31d..202f9a2 100644 --- a/src/pvxs/unittest.h +++ b/src/pvxs/unittest.h @@ -39,6 +39,8 @@ public: testCase& operator=(testCase&&) noexcept; ~testCase(); + explicit operator bool() const { return result==Pass; } + template inline testCase& operator<<(const T& v) { msg<acceptor_loop.base, onConnS, this, LEV_OPT_DISABLED, backlog, sock.sock); + listener = evlisten(evconnlistener_new(server->acceptor_loop.base, onConnS, this, LEV_OPT_DISABLED, backlog, sock.sock)); } void ServIface::onConn(evutil_socket_t sock, struct sockaddr *peer, int socklen) { + // TODO evutil_closesocket(sock); } diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index de99e81..1f7b429 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -37,7 +37,7 @@ struct UDPListener : public std::enable_shared_from_this std::function beaconCB; std::shared_ptr collector; const SockAddr dest; - UDPListener(UDPManager::Pvt *manager, const SockAddr& dest); + UDPListener(UDPManager::Pvt *manager, SockAddr& dest); ~UDPListener(); }; @@ -66,7 +66,6 @@ struct UDPCollector : public UDPManager::Search, // For Search messages, we use PV name strings in-place by adding nils. // Ensure one extra byte at the end of the buffer for a nil after the last PV name const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size()-1, 0, &src->sa, &alen); - log_printf(logio, PLVL_DEBUG, "recvfrom() -> %d\n", nrx); if(nrx<0) { int err = evutil_socket_geterror(sock.sock); @@ -97,7 +96,7 @@ struct UDPCollector : public UDPManager::Search, return true; } - log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx from %s", src.tostring().c_str()); + log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx %d from %s\n", nrx, src.tostring().c_str()); names.clear(); @@ -218,7 +217,7 @@ struct UDPCollector : public UDPManager::Search, } void handle(short ev) { - log_printf(logio, PLVL_DEBUG, "UDP %p event %x\n", rx.ev, ev); + log_printf(logio, PLVL_DEBUG, "UDP %p event %x\n", rx.get(), ev); if(!(ev&EV_READ)) return; @@ -262,7 +261,7 @@ UDPCollector::UDPCollector(const std::shared_ptr& manager, cons :manager(manager) ,bind_addr(bind_addr) ,sock(bind_addr.family(), SOCK_DGRAM, 0) - ,rx(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this) + ,rx(event_new(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this)) ,buf(0x10001) ,beaconMsg(src) { @@ -274,7 +273,8 @@ UDPCollector::UDPCollector(const std::shared_ptr& manager, cons log_printf(logsetup, PLVL_INFO, "Bound to %s\n", name.c_str()); - rx.add(); + if(event_add(rx.get(), nullptr)) + throw std::runtime_error("Unable to create collector Rx event"); } UDPCollector::~UDPCollector() @@ -353,9 +353,19 @@ std::unique_ptr UDPManager::onSearch(SockAddr& dest, return ret; } -UDPListener::UDPListener(UDPManager::Pvt *manager, const SockAddr &dest) +void UDPManager::sync() +{ + if(!pvt) + throw std::invalid_argument("UDPManager null"); + + pvt->loop.sync(); +} + +UDPListener::UDPListener(UDPManager::Pvt *manager, SockAddr &dest) :dest(dest) { + manager->loop.assertInLoop(); + if(dest.port()!=0) { auto it = manager->collectors.find(dest); if(it!=manager->collectors.end()) { @@ -369,6 +379,7 @@ UDPListener::UDPListener(UDPManager::Pvt *manager, const SockAddr &dest) if(!collector) { collector.reset(new UDPCollector(manager->shared_from_this(), dest)); + dest = collector->bind_addr; } collector->listeners.insert(this); @@ -392,6 +403,8 @@ UDPListener::~UDPListener() bool UDPCollector::reply(const void *msg, size_t msglen) const { + manager->loop.assertInLoop(); + int ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size()); if(ntx<0) { int err = evutil_socket_geterror(sock.sock); diff --git a/src/udp_collector.h b/src/udp_collector.h index 920c793..8c91492 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -58,6 +58,8 @@ struct PVXS_API UDPManager std::unique_ptr onSearch(SockAddr& dest, std::function&& cb); + void sync(); + explicit operator bool() const { return !!pvt; } UDPManager(); diff --git a/test/Makefile b/test/Makefile index 04752e1..a4cb72a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -18,6 +18,10 @@ TESTPROD += testev testev_SRCS += testev.cpp TESTS += testev +TESTPROD += testudp +testudp_SRCS += testudp.cpp +TESTS += testudp + PROD_SYS_LIBS += event_core PROD_SYS_LIBS_DEFAULT += event_pthreads diff --git a/test/testudp.cpp b/test/testudp.cpp new file mode 100644 index 0000000..ed09f5d --- /dev/null +++ b/test/testudp.cpp @@ -0,0 +1,163 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include "evhelper.h" +#include + +namespace { +using namespace pvxsimpl; + +void testBeacon(bool be) +{ + testDiag("In %s", __func__); + + SockAddr listener(SockAddr::loopback(AF_INET)); + SockAddr sender(SockAddr::loopback(AF_INET)); + + evsocket sock(AF_INET, SOCK_DGRAM, 0); + sock.bind(sender); + testDiag("Sending from %s", sender.tostring().c_str()); + + epicsEvent rx; + auto manager = UDPManager::instance(); + auto sub = manager.onBeacon(listener, + [&sender, &rx](const UDPManager::Beacon& msg) + { + testDiag("Beacon received"); + testEq(msg.src, sender); + testEq(msg.server, SockAddr::any(AF_INET, 0x1234)); + + uint8_t expect[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; + testOk1(msg.guid.size()==12 && std::equal(msg.guid.begin(), msg.guid.end(), expect)); + + rx.signal(); + }); + + testDiag("Listen on %s", listener.tostring().c_str()); + + uint8_t msg[46] = { + // header + 0xca, pva_version::server, 0, pva_app_msg::Beacon, + 0, 0, 0, 0, // length filled in later + // GUID + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, + // unused/ignored + 0, 0, 0, 0, + // Server address + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 0, 0, 0, 0, + 0, 0, // port filled in later + // protocol + 3, 't', 'c', 'p', + // anything further is ignored + }; + + if(be) { + msg[2] |= pva_flags::MSB; + msg[7] = sizeof(msg)-8; + msg[40] = 0x12; + msg[41] = 0x34; + } else { + msg[4] = sizeof(msg)-8; + msg[40] = 0x34; + msg[41] = 0x12; + } + + testOk1(sendto(sock.sock, (char*)msg, sizeof(msg), 0, &listener->sa, listener.size())==sizeof(msg)); + manager.sync(); + testOk1(!!rx.wait(30.0)); +} + +void testSearch(bool be, std::initializer_list names) +{ + testDiag("In %s", __func__); + + SockAddr listener(SockAddr::loopback(AF_INET)); + SockAddr sender(SockAddr::loopback(AF_INET)); + + evsocket sock(AF_INET, SOCK_DGRAM, 0); + sock.bind(sender); + testDiag("Sending from %s", sender.tostring().c_str()); + + epicsEvent rx; + auto manager = UDPManager::instance(); + auto sub = manager.onSearch(listener, [&rx, names](const UDPManager::Search& msg) + { + testDiag("Search received"); + for(auto name : msg.names) { + testDiag(" For %s", name); + } + if(testEq(msg.names.size(), names.size())) { + size_t i=0; + for(auto name : msg.names) { + testEq(msg.names[i++], name); + } + } + rx.signal(); + }); + + std::vector msg(1024, 0); + sbuf M(msg.data(), msg.size()); + M[0] = 0xca; + M[1] = pva_version::client; + M[2] = be ? pva_flags::MSB : 0; + M[3] = pva_app_msg::Search; + M+=4; + M+=4; //come back to this later + to_wire(M, uint32_t(0x12345678), be); + M+=4; + SockAddr reply(SockAddr::any(AF_INET, 0x1020)); + to_wire(M, reply, be); + to_wire(M, uint16_t(reply.port()), be); + // one protocol w/ 3 chars + M[0] = 1; + M[1] = 3; + M[2] = 't'; + M[3] = 'c'; + M[4] = 'p'; + M+=5; + to_wire(M, uint16_t(names.size()), be); + uint32_t i=1; + for(auto name : names) { + to_wire(M, i++, be); + M[0] = strlen(name); + memcpy((char*)M.pos+1, name, M[0]); + M+=1+M[0]; + } + testOk1(!M.err); + testDiag("Buffer pos %u of %u", unsigned(M.pos-msg.data()), unsigned(msg.size())); + + const size_t ntx = M.pos-msg.data(); + testOk1(sendto(sock.sock, (char*)msg.data(), ntx, 0, &listener->sa, listener.size())==int(ntx)); + manager.sync(); + testOk1(!!rx.wait(30.0)); +} + +} // namespace + +int main(int argc, char *argv[]) +{ + testPlan(32); + pvxs::logger_config_env(); + testBeacon(true); + testBeacon(false); + testSearch(true , {"hello"}); + testSearch(false, {"hello"}); + testSearch(true , {"one", "two"}); + testSearch(false, {"one", "two"}); + return testDone(); +}