diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 7bd7932..28f4f67 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -183,25 +183,6 @@ bool evbase::inLoop() return pvt->worker.isCurrentThread(); } -void to_wire(sbuf& buf, const SockAddr &val, bool be) -{ - if(buf.err || buf.size()<16) { - buf.err = true; - - } else if(val.family()==AF_INET) { - for(unsigned i=0; i<10; i++) - buf[i]=0; - buf[10] = buf[11] = 0xff; - - memcpy(buf.pos+12, &val->in.sin_addr.s_addr, 4); - - } else if(val.family()==AF_INET6) { - static_assert (sizeof(val->in6.sin6_addr)==16, ""); - memcpy(buf.pos, &val->in6.sin6_addr, 16); - } - buf += 16; -} - evsocket::evsocket(evutil_socket_t sock) :sock(sock) { @@ -307,30 +288,70 @@ void evsocket::mcast_iface(const SockAddr& iface) const // IPV6_MULTICAST_IF } +bool VectorOutBuf::refill(size_t more) { + assert(pos <= limit); + assert(pos >= backing.data()); -void from_wire(sbuf& buf, Size& size, bool be) + if(err) return false; + + more = ((more-1)|0xff)+1; // round up to multiple of 256 + size_t idx = pos - backing.data(); // save current offset + try{ + backing.resize(backing.size()+more); + }catch(std::bad_alloc& e) { + return false; + } + pos = backing.data()+idx; + limit = backing.data()+backing.size(); + return true; +} + +bool EvOutBuf::refill(size_t more) { - if(buf.err || buf.empty()) { - buf.err = true; - return; - } - uint8_t s=buf[0]; - buf+=1; - if(s<254) { - size.size = s; + if(err) return false; - } else if(s==255) { - // "null" size. not sure it is used. Replicate weirdness of pvDataCPP - size.size = -1; + evbuffer_iovec vec; + vec.iov_base = base; + vec.iov_len = pos-base; - } else if(s==254) { - uint32_t ls = 0; - from_wire(buf, ls, be); - size.size = ls; - } else { - // unreachable - buf.err = true; + if(base && evbuffer_commit_space(backing, &vec, 1)) + throw std::bad_alloc(); // leak? + + limit = base = pos = nullptr; + + if(more) { + auto n = evbuffer_reserve_space(backing, more, &vec, 1); + if(n!=1) { + return false; + } + + base = pos = (uint8_t*)vec.iov_base; + limit = base+vec.iov_len; } + return true; +} + +bool EvInBuf::refill(size_t more) +{ + if(err) return false; + + if(base && evbuffer_drain(backing, pos-base)) + throw std::bad_alloc(); + + limit = base = pos = nullptr; + + if(more) { + evbuffer_iovec vec; + + auto n = evbuffer_peek(backing, -1, nullptr, &vec, 1); + if(n<=0) { // current (2.1) impl never returns negative + return false; + } + + base = pos = (uint8_t*)vec.iov_base; + limit = base+vec.iov_len; + } + return true; } } // namespace pvxsimpl diff --git a/src/evhelper.h b/src/evhelper.h index f3c1cc9..17b2992 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -83,14 +83,32 @@ typedef owned_ptr evlisten; typedef owned_ptr evbufferevent; typedef owned_ptr evbuf; -PVXS_API -void to_wire(sbuf& buf, const SockAddr& val, bool be); - -template -void from_wire(sbuf &buf, SockAddr& val, bool be) +template +void to_wire(Buf& buf, const SockAddr& val) { - if(buf.err || buf.size()<16) { - buf.err = true; + if(!buf.ensure(16)) { + buf.fault(); + return; + + } else if(val.family()==AF_INET) { + for(unsigned i=0; i<10; i++) + buf[i]=0; + buf[10] = buf[11] = 0xff; + + memcpy(buf.save()+12, &val->in.sin_addr.s_addr, 4); + + } else if(val.family()==AF_INET6) { + static_assert (sizeof(val->in6.sin6_addr)==16, ""); + memcpy(buf.save(), &val->in6.sin6_addr, 16); + } + buf._skip(16); +} + +template +void from_wire(Buf &buf, SockAddr& val) +{ + if(!buf.ensure(16)) { + buf.fault(); return; } @@ -104,16 +122,16 @@ void from_wire(sbuf &buf, SockAddr& val, bool be) if(ismapped) { val->in = {}; val->in.sin_family = AF_INET; - memcpy(&val->in.sin_addr.s_addr, buf.pos+12, 4); + memcpy(&val->in.sin_addr.s_addr, buf.save()+12, 4); } else { val->in6 = {}; val->in6.sin6_family = AF_INET6; static_assert (sizeof(val->in6.sin6_addr)==16, ""); - memcpy(&val->in6.sin6_addr, buf.pos, 16); + memcpy(&val->in6.sin6_addr, buf.save(), 16); } - buf += 16; + buf._skip(16); } struct PVXS_API evsocket diff --git a/src/log.cpp b/src/log.cpp index 4387260..bb3de87 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -23,6 +23,7 @@ #include #include "evhelper.h" +#include "utilpvt.h" typedef epicsGuard Guard; @@ -226,3 +227,18 @@ void logger_config_env() } } // namespace pvxs + +namespace pvxsimpl { + +void logger_shutdown() +{ + epicsThreadOnce(&logger_once, &logger_prepare, nullptr); + + errlogFlush(); + + delete logger_gbl; + logger_gbl = nullptr; + // no resetting logger_once +} + +} // namespace pvxsimpl diff --git a/src/pvaproto.h b/src/pvaproto.h index 4c617f4..f060305 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -23,55 +24,125 @@ namespace pvxsimpl { -//! Hold a bounded slice of some other array. -//! like std::span (added in c++20) -//! blending in error state tracking like std::iostream -template -struct sbuf { - typedef T value_type; +//! view of a slice of a buffer. +//! Don't use directly. cf. FixedBuf +template +struct BufCommon { + typedef E value_type; + typedef void is_buffer; - T *pos, *limit; +protected: + // valid range to read/write is [pos, limit) + E *pos, *limit; bool err; - sbuf(std::vector& buf) - :sbuf(buf.data(), buf.size()) - {} - sbuf(T *buf, size_t size) - :pos(buf), limit(buf+size) - ,err(false) - {} + constexpr BufCommon(bool be, E* buf, size_t n) :pos(buf), limit(buf+n), err(false), be(be) {} +public: + const bool be; - inline bool empty() const { return limit==pos; } - inline size_t size() const { return limit-pos; } - inline T& operator[](size_t i) const { return pos[i]; } + // all sub-classes define + // bool refill(size_t more) - sbuf& operator+=(size_t n) { - if(size()=i + inline bool ensure(size_t i) { + return !err && (i<=size() || static_cast(this)->refill(i)); + } + inline void skip(size_t i) { + do { + if(i<=size()) { + pos += i; + return; + } + pos = limit; + i -= size(); + } while(static_cast(this)->refill(i)); } - // partition owned sequence [0, size) at offset n - // return [0, n), retain [n, size) - sbuf split(size_t n) { - if(size()=i + EPICS_ALWAYS_INLINE void _skip(size_t i) { pos+=i; } + + E* save() const { return this->pos; } }; -template -inline void _from_wire(sbuf& buf, uint8_t *mem, bool reverse) +//! (de)serialization to/from buffers which are fixed size and contigious +template +struct FixedBuf : public BufCommon > { - if(buf.err || buf.size() base_type; + EPICS_ALWAYS_INLINE bool refill(size_t more) { return false; } + + template + constexpr FixedBuf(bool be, E(&buf)[N]) :base_type(be, buf, N) {} + constexpr FixedBuf(bool be, E* buf, size_t n) :base_type(be, buf, n) {} + FixedBuf(bool be, std::vector& buf) :base_type(be, buf.data(), buf.size()) {} +}; + +//! serialize into a vector, resizing as necessary +class VectorOutBuf : public BufCommon +{ + typedef BufCommon base_type; + std::vector& backing; +public: + // note: vector::data() is not constexpr in c++11 + VectorOutBuf(bool be, std::vector& b) + :base_type(be, b.data(), b.size()) + ,backing(b) + {} + PVXS_API bool refill(size_t more); +}; + +//! serialize into an evbuffer, resizing as necessary +class EvOutBuf : public BufCommon +{ + typedef BufCommon base_type; + evbuffer * const backing; + uint8_t* base; // original pos +public: + + EvOutBuf(bool be, evbuffer *b, size_t isize=0) + :base_type(be, nullptr, 0) + ,backing(b) + ,base(nullptr) + {refill(isize);} + ~EvOutBuf() + {refill(0);} + PVXS_API bool refill(size_t more); +}; + +//! deserialize from an evbuffer, possibly segmented +class EvInBuf : public BufCommon +{ + typedef BufCommon base_type; + evbuffer * const backing; + uint8_t* base; // original pos after ctor or refill() +public: + + EvInBuf(bool be, evbuffer *b, size_t ifill=0) + :base_type(be, nullptr, 0) + ,backing(b) + ,base(nullptr) + {refill(ifill);} + ~EvInBuf() + {refill(0);} + + PVXS_API bool refill(size_t more); +}; + +template +inline void _from_wire(Buf& buf, uint8_t *mem, bool reverse) +{ + if(!buf.ensure(N)) { + buf.fault(); return; } else if(reverse) { @@ -84,7 +155,7 @@ inline void _from_wire(sbuf& buf, uint8_t *mem, bool reverse) mem[i] = buf[i]; } } - buf += N; + buf._skip(N); } /** Read sizeof(T) bytes from buf and store in val @@ -93,15 +164,15 @@ inline void _from_wire(sbuf& buf, uint8_t *mem, bool reverse) * @param val output variable * @param be true if value encoded in buf is in MSBF order, false if in LSBF order */ -template::value, int>::type =0> -inline void from_wire(sbuf& buf, T& val, bool be) +template::value, int>::type =0> +inline void from_wire(Buf& buf, T& val) { union { T v; uint8_t b[sizeof(T)]; } pun; - _from_wire(buf, pun.b, be ^ (EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)); - if(!buf.err) + _from_wire(buf, pun.b, buf.be ^ (EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)); + if(buf.good()) val = pun.v; } @@ -118,15 +189,14 @@ struct Size { size_t size; }; -template -void from_wire(sbuf& buf, Size& size, bool be) +template +void from_wire(Buf& buf, Size& size) { - if(buf.err || buf.empty()) { - buf.err = true; + if(!buf.ensure(1)) { + buf.fault(); return; } - uint8_t s=buf[0]; - buf+=1; + uint8_t s=buf.pop(); if(s<254) { size.size = s; @@ -138,12 +208,12 @@ void from_wire(sbuf& buf, Size& size, bool be) } else if(s==254) { uint32_t ls = 0; - from_wire(buf, ls, be); + from_wire(buf, ls); size.size = ls; } else { // unreachable (64-bit size so far not used) - buf.err = true; + buf.fault(); } } @@ -157,8 +227,8 @@ struct Status { std::string msg; }; -template -void to_wire(sbuf& buf, const Status& sts, bool be) +template +void to_wire(Buf& buf, const Status& sts) { if(buf.err || buf.empty()) { buf.err = true; @@ -168,12 +238,12 @@ void to_wire(sbuf& buf, const Status& sts, bool be) } else { *buf.pos++ = sts.code; - to_wire(buf, sts.msg.c_str(), be); + to_wire(buf, sts.msg.c_str()); } } -template -void from_wire(sbuf& buf, Status& sts, bool be) +template +void from_wire(Buf& buf, Status& sts) { if(buf.err || buf.empty()) { buf.err = true; @@ -185,28 +255,30 @@ void from_wire(sbuf& buf, Status& sts, bool be) } else { sts.code = *buf.pos++; - from_wire(buf, sts.msg, be); + from_wire(buf, sts.msg); } } -template -void from_wire(sbuf& buf, std::string& s, bool be) +template +void from_wire(Buf& buf, std::string& s) { Size len{0}; - from_wire(buf, len, be); - if(buf.err || buf.size() -inline void _to_wire(sbuf& buf, const uint8_t *mem, bool reverse) +// assumes prior buf.ensure(M) where M>=N +template +inline void _to_wire(Buf& buf, const uint8_t *mem, bool reverse) { - if(buf.err || buf.size()& buf, const uint8_t *mem, bool reverse) buf[i] = mem[i]; } } - buf += N; + buf._skip(N); } /** Write sizeof(T) bytes from buf from val * * @param buf output buffer. buf[0] through buf[sizeof(T)-1] must be valid. * @param val input variable - * @param be true to encode buf in MSBF order, false in LSBF order */ -template::value, int>::type =0> -inline void to_wire(sbuf& buf, const T& val, bool be) +template::value, int>::type =0> +inline void to_wire(Buf& buf, const T& val) { union { T v; uint8_t b[sizeof(T)]; } pun; pun.v = val; - _to_wire(buf, pun.b, be ^ (EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)); + _to_wire(buf, pun.b, buf.be ^ (EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)); } -template -void to_wire(sbuf& buf, const Size& size, bool be) +template +void to_wire(Buf& buf, const Size& size) { - if(buf.err || buf.empty()) { - buf.err = true; + if(!buf.ensure(1)) { + buf.fault(); } else if(size.size<254) { - *buf.pos++ = size.size; + buf.push(size.size); } else if(size.size<=0xffffffff) { - *buf.pos++ = 254; - to_wire(buf, uint32_t(size.size), be); + buf.push(254); + to_wire(buf, uint32_t(size.size)); } else { - buf.err = true; + buf.fault(); } } -template -void to_wire(sbuf& buf, const char *s, bool be) +template +void to_wire(Buf& buf, const char *s) { Size len{s ? strlen(s) : 0}; - to_wire(buf, len, be); - if(buf.err || buf.size() -void to_wire(sbuf& buf, std::initializer_list bytes, bool be) +template +void to_wire(Buf& buf, std::initializer_list bytes) { - if(buf.err || buf.size() -void to_wire(sbuf& buf, const Header& H, bool be) +template +void to_wire(Buf& buf, const Header& H) { - if(buf.err || buf.size()<8) { - buf.err = true; + if(!buf.ensure(8)) { + buf.fault(); } else { buf[0] = 0xca; buf[1] = (H.flags&pva_flags::Server) ? pva_version::server : pva_version::client; buf[2] = H.flags; - if(be) + if(buf.be) buf[2] |= pva_flags::MSB; buf[3] = H.cmd; - buf += 4; - to_wire(buf, H.len, be); + buf.skip(4); + to_wire(buf, H.len); } } diff --git a/src/pvxs/version.h b/src/pvxs/version.h index 4ae7b63..d1bc68b 100644 --- a/src/pvxs/version.h +++ b/src/pvxs/version.h @@ -46,6 +46,18 @@ const char *version_str(); PVXS_API unsigned long version_int(); +/** Free some internal global allocations to avoid false positives in + * valgrind (or similar) tools looking for memory leaks. + * + * @warning Use with caution! + * + * @pre Release all resources explicitly allocated through PVXS. + * @post Invalidates internal state. + * Use of __any__ API functions afterwards is undefined! + */ +PVXS_API +void cleanup_for_valgrind(); + } #endif // PVXS_VERSION_H diff --git a/src/server.cpp b/src/server.cpp index ab6dd47..6a32441 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -338,33 +338,32 @@ void Server::Pvt::onSearch(const UDPManager::Search& msg) if(nreply==0 && !msg.mustReply) return; - sbuf M(searchReply.data(), searchReply.size()); + VectorOutBuf M(true, searchReply); - const bool be = true; - to_wire(M, {0xca, pva_version::server, pva_flags::MSB|pva_flags::Server, pva_app_msg::SearchReply}, be); - auto blen = M.split(4); + M.skip(8); // fill in header after body length known _to_wire<12>(M, effective.guid.data(), false); - to_wire(M, msg.searchID, be); - to_wire(M, SockAddr::any(AF_INET), be); - to_wire(M, uint16_t(effective.udp_port), be); - to_wire(M, "tcp", be); + to_wire(M, msg.searchID); + to_wire(M, SockAddr::any(AF_INET)); + to_wire(M, uint16_t(effective.udp_port)); + to_wire(M, "tcp"); // "found" flag - to_wire(M, {uint8_t(nreply!=0 ? 1 : 0)}, be); + to_wire(M, {uint8_t(nreply!=0 ? 1 : 0)}); - to_wire(M, uint16_t(nreply), be); + to_wire(M, uint16_t(nreply)); for(auto i : range(msg.names.size())) { if(searchOp._names[i]._claim) - to_wire(M, uint32_t(msg.names[i].id), be); + to_wire(M, uint32_t(msg.names[i].id)); } - uint32_t ntx = M.pos-searchReply.data(); - to_wire(blen, uint32_t(ntx-8), be); + // now going back to fill in header + FixedBuf H(true, searchReply.data(), 8); + to_wire(H, Header{pva_app_msg::SearchReply, pva_flags::Server, uint32_t(M.size()-8)}); - if(M.err || blen.err) { + if(!M.good() || !H.good()) { log_printf(serverio, PLVL_CRIT, "Logic error in Search buffer fill\n"); } else { - (void)msg.reply(searchReply.data(), ntx); + (void)msg.reply(searchReply.data(), M.size()); } } @@ -372,23 +371,23 @@ void Server::Pvt::doBeacons(short evt) { log_printf(serversetup, PLVL_DEBUG, "Server beacon timer expires\n"); - sbuf M(beaconMsg.data(), beaconMsg.size()); - const bool be = true; - to_wire(M, {0xca, pva_version::server, pva_flags::MSB|pva_flags::Server, pva_app_msg::Beacon}, be); - auto lenfld = M.split(4); + VectorOutBuf M(true, beaconMsg); + M.skip(8); // fill in header after body length known _to_wire<12>(M, effective.guid.data(), false); - M += 4; // ignored/unused + M.skip(4); // ignored/unused - to_wire(M, SockAddr::any(AF_INET), be); - to_wire(M, uint16_t(effective.tcp_port), be); - to_wire(M, "tcp", be); + to_wire(M, SockAddr::any(AF_INET)); + to_wire(M, uint16_t(effective.tcp_port)); + to_wire(M, "tcp"); // "NULL" serverStatus - to_wire(M, {0xff}, be); + to_wire(M, {0xff}); - to_wire(lenfld, uint32_t(M.pos - beaconMsg.data()), be); + // now going back to fill in header + FixedBuf H(true, searchReply.data(), 8); + to_wire(H, Header{pva_app_msg::Beacon, pva_flags::Server, uint32_t(M.size()-8)}); - assert(!M.err && !lenfld.err); + assert(M.good() && H.good()); for(const auto& dest : beaconDest) { int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), beaconMsg.size(), 0, &dest->sa, dest.size()); diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 3517a83..d0389a2 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -50,27 +50,26 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * uint8_t flags = be ? pva_flags::MSB : 0; flags |= pva_flags::Server; - sbuf M(buf.data(), buf.size()); - to_wire(M, {0xca, pva_version::server, uint8_t(flags|pva_flags::Control), pva_ctrl_msg::SetEndian}, be); - to_wire(M, uint32_t(0), be); + VectorOutBuf M(be, buf); + to_wire(M, Header{pva_ctrl_msg::SetEndian, pva_flags::Control|pva_flags::Server, 0}); - to_wire(M, {0xca, pva_version::server, flags, pva_app_msg::ConnValid}, be); - auto blen = M.split(4); - auto bstart = blen.pos; + auto save = M.save(); + M.skip(8); // placeholder for header // serverReceiveBufferSize, not used - to_wire(M, uint32_t(0x10000), be); + to_wire(M, uint32_t(0x10000)); // serverIntrospectionRegistryMaxSize, also not used - to_wire(M, uint16_t(0x7fff), be); - to_wire(M, Size{2}, be); - to_wire(M, "anonymous", be); - to_wire(M, "ca", be); + to_wire(M, uint16_t(0x7fff)); + to_wire(M, Size{2}); + to_wire(M, "anonymous"); + to_wire(M, "ca"); - to_wire(blen, uint32_t(M.pos-bstart), be); + FixedBuf H(be, save, 8); + to_wire(H, Header{pva_app_msg::ConnValid, pva_flags::Server, uint32_t(M.size()-8)}); - assert(!M.err && !blen.err); + assert(M.good() && H.good()); - if(evbuffer_add(tx, buf.data(), M.pos-buf.data())) + if(evbuffer_add(tx, buf.data(), M.size())) throw std::bad_alloc(); } @@ -91,9 +90,9 @@ void ServerConn::handle_Echo() const bool be = EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG; uint8_t header[8]; - sbuf M(header, sizeof(header)); - to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len}, be); - assert(!M.err); + FixedBuf M(be, header); + to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len}); + assert(M.good()); auto err = evbuffer_add(tx, header, sizeof(header)); err |= evbuffer_add_buffer(tx, segBuf.get()); @@ -103,36 +102,43 @@ void ServerConn::handle_Echo() bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); } +static +void auth_complete(ServerConn *self, const Status& sts) +{ + std::vector buf(8+5+sts.msg.size()); + //sbuf M(buf); + //M[0] = pva_app_msg::ConnValidated +} + void ServerConn::handle_ConnValid() { // Client begins/restarts Auth handshake - // size to extract and process up to auth payload. - // client may only select from our advertised auth - // mechanisms. "anonymous" is the longest. - uint8_t buf[4+2+2+sizeof("anonymous")]; + EvInBuf M(peerBE, segBuf.get(), 16); - const auto n = evbuffer_copyout(segBuf.get(), buf, sizeof(buf)); - - sbuf M(buf, n); - - M += 6; // ignore unused buffer and introspection size - uint16_t qos; - from_wire(M, qos, peerBE); std::string selected; - from_wire(M, selected, peerBE); + { + M.skip(6); // ignore unused buffer and introspection size + uint16_t qos; + from_wire(M, qos); + from_wire(M, selected); - (void)evbuffer_drain(segBuf.get(), M.pos-buf); + if(!M.good()) { + log_printf(connio, PLVL_ERR, "Truncated/Invalid ConnValid from client"); + bev.reset(); + return; - if(M.err) { - log_hex_printf(connio, PLVL_ERR, buf, n, "Truncated/Invalid ConnValid from client"); - bev.reset(); - return; - } else if(selected!="ca" && selected!="anonymous") { + } + } + + if(selected!="ca" && selected!="anonymous") { log_printf(connio, PLVL_DEBUG, "Client selects unadvertised auth \"%s\"", selected.c_str()); + auth_complete(this, Status{Status::Error, "Client selects unadvertised auth"}); } // remainder of segBuf is payload w/ credentials + + auth_complete(this, Status{Status::Ok}); } void ServerConn::handle_AuthZ() @@ -247,10 +253,10 @@ void ServerConn::bevRead() } // a bit verbose :P - sbuf L(&header[4], 4); + FixedBuf L(peerBE, header); uint32_t len = 0; - from_wire(L, len, peerBE); - assert(!L.err); + from_wire(L, len); + assert(L.good()); if(evbuffer_get_length(rx)-8 < len) { // wait for complete payload @@ -322,6 +328,9 @@ void ServerConn::bevRead() // silently drain any unprocessed body (forward compatibility) if(auto n = evbuffer_get_length(segBuf.get())) evbuffer_drain(segBuf.get(), n); + + // wait for next header + bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); } } diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index 0464c46..4b87318 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -90,16 +90,17 @@ struct UDPCollector : public UDPManager::Search, names.clear(); - sbuf M(&buf[0], size_t(nrx)); + bool be = buf[2]&pva_flags::MSB; + + FixedBuf M(be, buf.data(), nrx); uint8_t cmd = M[3]; - bool be = M[2]&pva_flags::MSB; - M += 4; + M.skip(4); uint32_t len=0; - from_wire(M, len, be); + from_wire(M, len); - if(len > M.size() && !M.err) { + if(len > M.size() && M.good()) { log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n", unsigned(M.size()), M[0], M[1], M[2], M[3], name.c_str()); @@ -113,13 +114,13 @@ struct UDPCollector : public UDPManager::Search, SockAddr replyAddr; uint16_t port = 0; - from_wire(M, searchID, be); - from_wire(M, flags, be); + from_wire(M, searchID); + from_wire(M, flags); mustReply = flags&pva_search_flags::MustReply; - M += 3; // unused/reserved + M.skip(3); // unused/reserved - from_wire(M, replyAddr, be); - from_wire(M, port, be); + from_wire(M, replyAddr); + from_wire(M, port); if(replyAddr.isAny()) { replyAddr = src; } @@ -129,42 +130,42 @@ struct UDPCollector : public UDPManager::Search, // however, we will consider and ignore any others which might appear bool foundtcp = false; Size nproto{0}; - from_wire(M, nproto, be); - for(size_t i=0; i=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p'; - M += nchar.size; + M.skip(nchar.size); } // one Search message can include many PV names. uint16_t nchan=0; - from_wire(M, nchan, be); + from_wire(M, nchan); names.clear(); names.reserve(nchan); - for(size_t i=0; i(M.pos), id}); + if(foundtcp && chlen.size<=M.size() && M.good()) { + names.push_back(UDPManager::Search::Name{reinterpret_cast(M.save()), id}); } - M += chlen.size; + M.skip(chlen.size); } - if(!M.err) { + if(M.good()) { // ensure nil for final PV name - *M.pos = '\0'; + *M.save() = '\0'; for(auto L : listeners) { if(L->searchCB) { @@ -180,21 +181,21 @@ struct UDPCollector : public UDPManager::Search, uint16_t port = 0; _from_wire<12>(M, &beaconMsg.guid[0], false); - M += 4; // skip flags, seq, and change count. unused - from_wire(M, beaconMsg.server, be); - from_wire(M, port, be); + M.skip(4); // skip flags, seq, and change count. unused + from_wire(M, beaconMsg.server); + from_wire(M, port); if(beaconMsg.server.isAny()) { beaconMsg.server = src; } beaconMsg.server.setPort(port); Size protolen{0}; - from_wire(M, protolen, be); - M += protolen.size; // ignore string + from_wire(M, protolen); + M.skip(protolen.size); // ignore string // ignore remaining "server status" blob - if(!M.err) { + if(M.good()) { for(auto L : listeners) { if(L->beaconCB) { (L->beaconCB)(beaconMsg); @@ -311,6 +312,12 @@ UDPManager UDPManager::instance() return UDPManager(ret); } +void UDPManager::cleanup() +{ + delete udp_gbl; + udp_gbl = nullptr; +} + std::unique_ptr UDPManager::onBeacon(SockAddr& dest, std::function&& cb) { diff --git a/src/udp_collector.h b/src/udp_collector.h index 6adc1a0..0fcfb83 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -26,6 +26,7 @@ struct PVXS_API UDPManager { //! get process-wide singleton. static UDPManager instance(); + static void cleanup(); ~UDPManager(); struct Beacon { diff --git a/src/util.cpp b/src/util.cpp index b2bc4da..9f741c4 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -15,6 +15,7 @@ #include #include "utilpvt.h" +#include "udp_collector.h" namespace pvxs { @@ -30,6 +31,12 @@ unsigned long version_int() return PVXS_VERSION; } +void cleanup_for_valgrind() +{ + pvxsimpl::logger_shutdown(); + pvxsimpl::UDPManager::cleanup(); +} + namespace detail { std::ostream& operator<<(std::ostream& strm, const Escaper& esc) diff --git a/src/utilpvt.h b/src/utilpvt.h index 372d5eb..9bb8630 100644 --- a/src/utilpvt.h +++ b/src/utilpvt.h @@ -126,6 +126,9 @@ public: inline Writer& writer() { return _writer; } }; + +void logger_shutdown(); + } // namespace pvxsimpl #endif // UTILPVT_H diff --git a/test/testev.cpp b/test/testev.cpp index bff2fc8..95bcf4b 100644 --- a/test/testev.cpp +++ b/test/testev.cpp @@ -9,10 +9,11 @@ #include #include +#include #include -namespace { using namespace pvxsimpl; +namespace { struct my_special_error : public std::runtime_error { @@ -63,11 +64,58 @@ void test_call() } +void test_fill_evbuf() +{ + testDiag("%s", __func__); + + evbuf buf(evbuffer_new()); + + { + EvOutBuf M(true, buf.get()); + testEq(M.size(), 0u); + + for(uint32_t i : range(1024)) { + to_wire(M, i); + } + testDiag("Extra %u", unsigned(M.size())); + testOk1(!!M.good()); + + // ~EvOutBuf flushes to backing buf + } + + testEq(evbuffer_get_length(buf.get()), 4*1024u); + + { + EvInBuf M(true, buf.get()); + testEq(M.size(), 0u); + + bool match = true; + for(uint32_t expect : range(1024)) { + uint32_t actual=0; + from_wire(M, actual); + if(actual!=expect) { + testDiag("%08x == %08x", unsigned(expect), unsigned(actual)); + match = false; + break; // only show first failure + } + } + testOk1(!!match); + testOk1(!!M.good()); + testEq(M.size(), 0u); + testOk1(!M.refill(42)); // should be completely empty + } + + testEq(evbuffer_get_length(buf.get()), 0u); +} + } // namespace MAIN(testev) { - testPlan(5); + testPlan(14); test_call(); + test_fill_evbuf(); + libevent_global_shutdown(); + cleanup_for_valgrind(); return testDone(); } diff --git a/test/testsock.cpp b/test/testsock.cpp index 630f441..d1bbf01 100644 --- a/test/testsock.cpp +++ b/test/testsock.cpp @@ -104,44 +104,44 @@ void test_from_wire() { uint32_t val; - const uint8_t buf[] = {0x12, 0x34, 0x56, 0x78, 0xff, 0xff, 0xff, 0xff}; - sbuf pkt(buf, 4); + const uint8_t buf[] = {0x12, 0x34, 0x56, 0x78, 0xff, 0xff}; + FixedBuf pkt(true, buf); - from_wire(pkt, val, true); - testOk1(pkt.empty()); - testOk1(!pkt.err); + from_wire(pkt, val); + testEq(pkt.size(), 2u); + testOk1(pkt.good()); testOk(val==0x12345678, "0x%08x == 0x12345678", (unsigned)val); } { uint32_t val; - const uint8_t buf[] = {0x78, 0x56, 0x34, 0x12, 0xff, 0xff, 0xff, 0xff}; - sbuf pkt(buf, 4); + const uint8_t buf[] = {0x78, 0x56, 0x34, 0x12, 0xff, 0xff}; + FixedBuf pkt(false, buf); - from_wire(pkt, val, false); - testOk1(pkt.empty()); - testOk1(!pkt.err); + from_wire(pkt, val); + testEq(pkt.size(), 2u); + testOk1(pkt.good()); testOk(val==0x12345678, "0x%08x == 0x12345678", (unsigned)val); } { uint32_t val = 0; const uint8_t buf[] = {0x12, 0x34, 0x56, 0x78, 0xff, 0xff, 0xff, 0xff}; - sbuf pkt(buf, 2); + FixedBuf pkt(true, buf, 2); - from_wire(pkt, val, true); - testOk1(pkt.size()==2); - testOk1(pkt.err); + from_wire(pkt, val); + testEq(pkt.size(), 2u); + testOk1(!pkt.good()); testOk(val==0, "0x%08x == 0", (unsigned)val); } { SockAddr val; const uint8_t buf[] = {0,0,0,0, 0,0,0,0, 0,0,0xff,0xff, 0x7f,0,0,1, 0xde, 0xad, 0xbe, 0xef}; - sbuf pkt(buf, 16); + FixedBuf pkt(true, buf); - from_wire(pkt, val, true); - testOk1(pkt.empty()); + from_wire(pkt, val); + testEq(pkt.size(), 4u); testOk1(val.family()==AF_INET); testOk(val->in.sin_addr.s_addr==htonl(INADDR_LOOPBACK), "%08x == 0x7f000001", (unsigned)ntohl(val->in.sin_addr.s_addr)); @@ -150,14 +150,16 @@ void test_from_wire() void test_to_wire() { + testDiag("Enter %s", __func__); + { const uint32_t val = 0xdeadbeef; uint8_t buf[8]; - sbuf pkt(buf, 4); + FixedBuf pkt(true, buf); - to_wire(pkt, val, true); - testOk1(pkt.empty()); - testOk1(!pkt.err); + to_wire(pkt, val); + testEq(pkt.size(), 4u); + testOk1(pkt.good()); testOk(buf[0]==0xde && buf[1]==0xad && buf[2]==0xbe && buf[3]==0xef, "0x%02x%02x%02x%02x == 0xdeadbeef", buf[0], buf[1], buf[2], buf[3]); } @@ -165,23 +167,23 @@ void test_to_wire() { const uint32_t val = 0xdeadbeef; uint8_t buf[8]; - sbuf pkt(buf, 4); + FixedBuf pkt(false, buf); - to_wire(pkt, val, false); - testOk1(pkt.empty()); - testOk1(!pkt.err); + to_wire(pkt, val); + testEq(pkt.size(), 4u); + testOk1(pkt.good()); testOk(buf[0]==0xef && buf[1]==0xbe && buf[2]==0xad && buf[3]==0xde, - "0x%02x%02x%02x%02x == 0xdeadbeef", buf[0], buf[1], buf[2], buf[3]); + "0x%02x%02x%02x%02x == 0xefbeadde", buf[0], buf[1], buf[2], buf[3]); } { const SockAddr val(SockAddr::loopback(AF_INET)); uint8_t buf[16+4]; - sbuf pkt(buf, 16); + FixedBuf pkt(true, buf); - to_wire(pkt, val, true); - testOk1(pkt.empty()); - testOk1(!pkt.err); + to_wire(pkt, val); + testEq(pkt.size(), 4u); + testOk1(pkt.good()); const uint8_t expect[16] = {0,0,0,0, 0,0,0,0, 0,0,0xff,0xff, 0x7f,0,0,1}; testOk1(std::memcmp(buf, expect, 16)==0); @@ -190,11 +192,11 @@ void test_to_wire() { const uint32_t val = 0xdeadbeef; uint8_t buf[8] = {0,0,0,0,0,0,0,0}; - sbuf pkt(buf, 2); + FixedBuf pkt(true, buf, 2); - to_wire(pkt, val, true); - testOk1(pkt.size()==2); - testOk1(pkt.err); + to_wire(pkt, val); + testEq(pkt.size(), 2u); + testOk1(!pkt.good()); testOk(buf[0]==0 && buf[1]==0 && buf[2]==0 && buf[3]==0, "0x%02x%02x%02x%02x == 0", buf[0], buf[1], buf[2], buf[3]); } @@ -210,5 +212,7 @@ MAIN(testsock) test_from_wire(); test_to_wire(); testDiag("Done"); + libevent_global_shutdown(); + cleanup_for_valgrind(); return testDone(); } diff --git a/test/testudp.cpp b/test/testudp.cpp index 3992568..4fbabf8 100644 --- a/test/testudp.cpp +++ b/test/testudp.cpp @@ -114,31 +114,32 @@ void testSearch(bool be, std::initializer_list names) sub->start(); std::vector msg(1024, 0); - sbuf M(msg.data(), msg.size()); - to_wire(M, {0xca, pva_version::client, uint8_t(be ? pva_flags::MSB : 0), pva_app_msg::Search}, be); - auto blen = M.split(4); - to_wire(M, uint32_t(0x12345678), be); - M+=4; + VectorOutBuf M(be, msg); + + M.skip(8); // placeholder for header + to_wire(M, uint32_t(0x12345678)); + M.skip(4); SockAddr reply(SockAddr::any(AF_INET, 0x1020)); - to_wire(M, reply, be); - to_wire(M, uint16_t(reply.port()), be); + to_wire(M, reply); + to_wire(M, uint16_t(reply.port())); // one protocol w/ 3 chars - to_wire(M, {1}, be); - to_wire(M, "tcp", be); - to_wire(M, uint16_t(names.size()), be); + to_wire(M, Size{1}); + to_wire(M, "tcp"); + to_wire(M, uint16_t(names.size())); uint32_t i=1; for(auto name : names) { - to_wire(M, i++, be); - to_wire(M, name, be); + to_wire(M, i++); + to_wire(M, name); } - to_wire(blen, uint32_t(M.pos-msg.data()-8), be); + FixedBuf H(be, msg.data(), 8); + to_wire(H, Header{pva_app_msg::Search, 0, uint32_t(M.size()-8)}); - testOk1(!M.err); - testDiag("Buffer pos %u of %u", unsigned(M.pos-msg.data()), unsigned(msg.size())); + testOk1(M.good() && H.good()); + testOk1(M.save()>=msg.data()); + testOk1(M.save()<=msg.data()+msg.size()); - const size_t ntx = M.pos-msg.data(); - testOk1(sendto(sock.sock, (char*)msg.data(), ntx, 0, &listener->sa, listener.size())==int(ntx)); + testOk1(sendto(sock.sock, (char*)msg.data(), M.size(), 0, &listener->sa, listener.size())==int(M.size())); manager.sync(); testOk1(!!rx.wait(30.0)); } @@ -147,7 +148,7 @@ void testSearch(bool be, std::initializer_list names) int main(int argc, char *argv[]) { - testPlan(38); + testPlan(46); pvxs::logger_config_env(); testBeacon(true); testBeacon(false); @@ -155,5 +156,7 @@ int main(int argc, char *argv[]) testSearch(false, {"hello"}); testSearch(true , {"one", "two"}); testSearch(false, {"one", "two"}); + libevent_global_shutdown(); + cleanup_for_valgrind(); return testDone(); }