diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 28f4f67..0cf8fae 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -354,4 +354,12 @@ bool EvInBuf::refill(size_t more) return true; } +void to_evbuf(evbuffer *buf, const Header& H, bool be) +{ + EvOutBuf M(be, buf, 8); + to_wire(M, H); + if(!M.good()) + throw std::bad_alloc(); +} + } // namespace pvxsimpl diff --git a/src/pvaproto.h b/src/pvaproto.h index f060305..f6193a3 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -225,37 +225,41 @@ struct Status { Fatal=3, } code; std::string msg; + std::string trace; }; template void to_wire(Buf& buf, const Status& sts) { - if(buf.err || buf.empty()) { - buf.err = true; + if(!buf.ensure(1)) { + buf.fault(); - } else if(sts.code==Status::Ok && sts.msg.empty()) { - *buf.pos++ = 255; + } else if(sts.code==Status::Ok && sts.msg.empty() && sts.trace.empty()) { + buf.push(255); } else { - *buf.pos++ = sts.code; + buf.push(sts.code); to_wire(buf, sts.msg.c_str()); + to_wire(buf, sts.trace.c_str()); } } template void from_wire(Buf& buf, Status& sts) { - if(buf.err || buf.empty()) { - buf.err = true; + if(!buf.ensure(1)) { + buf.fault(); - } else if(255==*buf.pos) { - buf.pos++; + } else if(255==buf[0]) { + buf._skip(1); sts.code = Status::Ok; sts.msg.clear(); + sts.trace.clear(); } else { - sts.code = *buf.pos++; + sts.code = buf.pop(); from_wire(buf, sts.msg); + from_wire(buf, sts.trace); } } @@ -451,11 +455,13 @@ void to_wire(Buf& buf, const Header& H) if(buf.be) buf[2] |= pva_flags::MSB; buf[3] = H.cmd; - buf.skip(4); + buf._skip(4); to_wire(buf, H.len); } } +void to_evbuf(evbuffer *buf, const Header& H, bool be); + } // namespace pvxsimpl #endif // PVAPROTO_H diff --git a/src/pvxs/server.h b/src/pvxs/server.h index 7d38c47..1f5ed95 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -105,17 +105,34 @@ public: PVXS_API static Config from_env(); Config() :tcp_port(5075), udp_port(5076), auto_beacon(true), guid{} {} + + PVXS_API Server build(); }; //! An empty/dummy Server Server(); //! Create/allocate, but do not start, a new server with the provided config. explicit Server(Config&&); + Server(Server&&) noexcept; + Server(const Server&) = delete; + Server& operator=(Server&&) noexcept; + Server& operator=(const Server&) = delete; ~Server(); + //! Begin serving. Does not block. Server& start(); + //! Stop server Server& stop(); + /** start() and then (maybe) stop() + * + * run() may be interupted by calling interrupt(), + * or by SIGINT SIGTERM (only one Server per process) + */ + Server& run(); + //! Queue a request to break run() + Server& interrupt(); + //! effective config const Config& config() const; @@ -123,11 +140,13 @@ public: const std::shared_ptr& src, int order =0); - std::unique_ptr removeSource(const std::string& name); + std::shared_ptr removeSource(const std::string& name, + int order =0); - std::unique_ptr getSource(const std::string& name); + std::shared_ptr getSource(const std::string& name, + int order =0); - void listSource(std::vector& names); + void listSource(std::vector >& names); explicit operator bool() const { return !!pvt; } @@ -136,6 +155,7 @@ private: std::unique_ptr pvt; }; + struct PVXS_API Source { virtual ~Source(); @@ -169,6 +189,8 @@ struct PVXS_API Source { virtual std::unique_ptr onCreate(const Create& op) =0; }; +struct Handler {}; + }} // namespace pvxs::server #endif // PVXS_SERVER_H diff --git a/src/server.cpp b/src/server.cpp index 6a32441..b2e17e6 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -9,8 +9,11 @@ #include #include #include +#include #include +#include + #include #include #include @@ -79,6 +82,11 @@ Server::Config Server::Config::from_env() } +Server Server::Config::build() +{ + Server ret(std::move(*this)); + return ret; +} Server::Server() {} @@ -86,8 +94,88 @@ Server::Server(Config&& conf) :pvt(new Pvt(std::move(conf))) {} +Server::Server(Server&& other) noexcept + :pvt(std::move(other.pvt)) +{} + +Server& Server::operator=(Server&& other) noexcept +{ + if(this!=&other) { + pvt = std::move(other.pvt); + } + return *this; +} + Server::~Server() {} +Server& Server::addSource(const std::string& name, + const std::shared_ptr& src, + int order) +{ + if(!pvt) + throw std::logic_error("NULL Server"); + if(!src) + throw std::logic_error(SB()<<"Attempt to add NULL Source "< G(pvt->sourcesLock.writer()); + + auto& ent = pvt->sources[std::make_pair(order, name)]; + if(ent) + throw std::runtime_error(SB()<<"Source already registered : ("< Server::removeSource(const std::string& name, int order) +{ + if(!pvt) + throw std::logic_error("NULL Server"); + + epicsGuard G(pvt->sourcesLock.writer()); + + std::shared_ptr ret; + auto it = pvt->sources.find(std::make_pair(order, name)); + if(it!=pvt->sources.end()) { + ret = it->second; + pvt->sources.erase(it); + } + + return ret; +} + +std::shared_ptr Server::getSource(const std::string& name, int order) +{ + if(!pvt) + throw std::logic_error("NULL Server"); + + epicsGuard G(pvt->sourcesLock.reader()); + + std::shared_ptr ret; + auto it = pvt->sources.find(std::make_pair(order, name)); + if(it!=pvt->sources.end()) { + ret = it->second; + } + + return ret; +} + +void Server::listSource(std::vector > &names) +{ + if(!pvt) + throw std::logic_error("NULL Server"); + + names.clear(); + + epicsGuard G(pvt->sourcesLock.reader()); + + names.reserve(pvt->sources.size()); + + for(auto& pair : pvt->sources) { + names.emplace_back(pair.first.second, pair.first.first); + } +} + const Server::Config& Server::config() const { if(!pvt) @@ -104,10 +192,75 @@ Server& Server::start() return *this; } +Server& Server::stop() +{ + if(!pvt) + throw std::logic_error("NULL Server"); + pvt->stop(); + return *this; +} + +static std::atomic sig_target{nullptr}; + +static void sig_handle(int sig) +{ + auto serv = sig_target.load(); + + if(serv) + serv->done.signal(); +} + +Server& Server::run() +{ + if(!pvt) + throw std::logic_error("NULL Server"); + + Server::Pvt* expect = nullptr; + + + std::function cleanup; + if(sig_target.compare_exchange_weak(expect, pvt.get())) { + auto prevINT = signal(SIGINT , &sig_handle); + auto prevTERM = signal(SIGTERM, &sig_handle); + + cleanup = [this, prevINT, prevTERM]() { + Server::Pvt* expect = pvt.get(); + if(sig_target.compare_exchange_weak(expect, nullptr)) { + signal(SIGINT , prevINT); + signal(SIGTERM, prevTERM); + } + }; + } + + try { + pvt->start(); + + pvt->done.wait(); + + pvt->stop(); + } catch(...) { + if(cleanup) + cleanup(); + throw; + } + if(cleanup) + cleanup(); + + return *this; +} + +Server& Server::interrupt() +{ + if(!pvt) + throw std::logic_error("NULL Server"); + pvt->done.signal(); + return *this; +} + Server::Pvt::Pvt(Config&& conf) :effective(std::move(conf)) ,beaconMsg(128) - ,acceptor_loop("PVXS Acceptor", epicsThreadPriorityCAServerLow-2) + ,acceptor_loop("PVXTCP", epicsThreadPriorityCAServerLow-2) ,beaconSender(AF_INET, SOCK_DGRAM, 0) ,beaconTimer(event_new(acceptor_loop.base, -1, EV_TIMEOUT, doBeaconsS, this)) ,searchReply(0x10000) @@ -151,7 +304,11 @@ Server::Pvt::Pvt(Config&& conf) pun.i[1] = 0xdeadbeef; // because... why not { ELLLIST bcasts = ELLLIST_INIT; - osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, nullptr); + osiSockAddr match; + match.ia.sin_family = AF_INET; + match.ia.sin_addr.s_addr = htonl(INADDR_ANY); + match.ia.sin_port = 0; + osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, &match); while(ELLNODE *cur = ellGet(&bcasts)) { osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node); @@ -345,7 +502,7 @@ void Server::Pvt::onSearch(const UDPManager::Search& msg) _to_wire<12>(M, effective.guid.data(), false); to_wire(M, msg.searchID); to_wire(M, SockAddr::any(AF_INET)); - to_wire(M, uint16_t(effective.udp_port)); + to_wire(M, uint16_t(effective.tcp_port)); to_wire(M, "tcp"); // "found" flag to_wire(M, {uint8_t(nreply!=0 ? 1 : 0)}); @@ -355,15 +512,16 @@ void Server::Pvt::onSearch(const UDPManager::Search& msg) if(searchOp._names[i]._claim) to_wire(M, uint32_t(msg.names[i].id)); } + auto pktlen = M.save()-searchReply.data(); // now going back to fill in header FixedBuf H(true, searchReply.data(), 8); - to_wire(H, Header{pva_app_msg::SearchReply, pva_flags::Server, uint32_t(M.size()-8)}); + to_wire(H, Header{pva_app_msg::SearchReply, pva_flags::Server, uint32_t(pktlen-8)}); if(!M.good() || !H.good()) { log_printf(serverio, PLVL_CRIT, "Logic error in Search buffer fill\n"); } else { - (void)msg.reply(searchReply.data(), M.size()); + (void)msg.reply(searchReply.data(), pktlen); } } @@ -383,14 +541,16 @@ void Server::Pvt::doBeacons(short evt) // "NULL" serverStatus to_wire(M, {0xff}); + auto pktlen = M.save()-searchReply.data(); + // now going back to fill in header FixedBuf H(true, searchReply.data(), 8); - to_wire(H, Header{pva_app_msg::Beacon, pva_flags::Server, uint32_t(M.size()-8)}); + to_wire(H, Header{pva_app_msg::Beacon, pva_flags::Server, uint32_t(pktlen-8)}); assert(M.good() && H.good()); for(const auto& dest : beaconDest) { - int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), beaconMsg.size(), 0, &dest->sa, dest.size()); + int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest->sa, dest.size()); if(ntx<0) { int err = evutil_socket_geterror(beaconSender.sock); @@ -416,4 +576,6 @@ void Server::Pvt::doBeaconsS(evutil_socket_t fd, short evt, void *raw) } } +Source::~Source() {} + }} // namespace pvxs::server diff --git a/src/serverconn.cpp b/src/serverconn.cpp index d0389a2..909c1a2 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -32,6 +32,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * ,expectSeg(false) ,segCmd(0xff) ,segBuf(evbuffer_new()) + ,txBody(evbuffer_new()) { bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this); // initially wait for at least a header @@ -55,6 +56,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * auto save = M.save(); M.skip(8); // placeholder for header + auto bstart = M.save(); // serverReceiveBufferSize, not used to_wire(M, uint32_t(0x10000)); @@ -63,13 +65,14 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * to_wire(M, Size{2}); to_wire(M, "anonymous"); to_wire(M, "ca"); + auto bend = M.save(); FixedBuf H(be, save, 8); - to_wire(H, Header{pva_app_msg::ConnValid, pva_flags::Server, uint32_t(M.size()-8)}); + to_wire(H, Header{pva_app_msg::ConnValid, pva_flags::Server, uint32_t(bend-bstart)}); assert(M.good() && H.good()); - if(evbuffer_add(tx, buf.data(), M.size())) + if(evbuffer_add(tx, buf.data(), M.save()-buf.data())) throw std::bad_alloc(); } @@ -89,13 +92,9 @@ void ServerConn::handle_Echo() uint32_t len = evbuffer_get_length(segBuf.get()); const bool be = EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG; - uint8_t header[8]; - FixedBuf M(be, header); - to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len}); - assert(M.good()); + to_evbuf(tx, Header{pva_app_msg::Echo, pva_flags::Server, len}, be); - auto err = evbuffer_add(tx, header, sizeof(header)); - err |= evbuffer_add_buffer(tx, segBuf.get()); + auto err = evbuffer_add_buffer(tx, segBuf.get()); assert(!err); // maybe help reduce latency @@ -105,14 +104,28 @@ void ServerConn::handle_Echo() static void auth_complete(ServerConn *self, const Status& sts) { - std::vector buf(8+5+sts.msg.size()); - //sbuf M(buf); - //M[0] = pva_app_msg::ConnValidated + const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + (void)evbuffer_drain(self->txBody.get(), evbuffer_get_length(self->txBody.get())); + + { + EvOutBuf M(be, self->txBody.get()); + to_wire(M, sts); + } + + auto tx = bufferevent_get_output(self->bev.get()); + to_evbuf(tx, Header{pva_app_msg::ConnValidated, + pva_flags::Server, + uint32_t(evbuffer_get_length(self->txBody.get()))}, + be); + auto err = evbuffer_add_buffer(tx, self->txBody.get()); + assert(!err); + + log_printf(connsetup, PLVL_DEBUG, "Auth complete with %d\n", sts.code); } void ServerConn::handle_ConnValid() { - // Client begins/restarts Auth handshake + // Client begins (restarts?) Auth handshake EvInBuf M(peerBE, segBuf.get(), 16); @@ -138,11 +151,14 @@ void ServerConn::handle_ConnValid() // remainder of segBuf is payload w/ credentials + // TODO actually check credentials auth_complete(this, Status{Status::Ok}); } void ServerConn::handle_AuthZ() -{} +{ + // ignored (so far no auth plugin actually uses) +} void ServerConn::handle_Search() {} @@ -180,15 +196,18 @@ void ServerConn::handle_Message() void ServerConn::cleanup() { + log_printf(connsetup, PLVL_DEBUG, "Cleanup TCP Connection\n"); + // remove myself from connections list decltype (iface->connections) trash; for (auto it = iface->connections.begin(), end = iface->connections.end(); it!=end; ++it) { if((&*it)==this) { - trash.splice(it, iface->connections); + trash.splice(trash.end(), iface->connections, it); break; } } assert(!trash.empty()); + // delete this } void ServerConn::bevEvent(short events) @@ -222,38 +241,24 @@ void ServerConn::bevRead() auto ret = evbuffer_copyout(rx, header, sizeof(header)); assert(ret==sizeof(header)); // previously verified - if(header[0]!=0xca || header[1]==0 || !(header[2]&pva_flags::Server)) { + if(header[0]!=0xca || header[1]==0 || (header[2]&pva_flags::Server)) { log_hex_printf(connio, PLVL_ERR, header, sizeof(header), "Protocol decode fault. Force disconnect.\n"); bev.reset(); break; } + log_hex_printf(connio, PLVL_DEBUG, header, sizeof(header), "Receive header\n"); if(header[2]&pva_flags::Control) { - switch (header[3]) { - case pva_ctrl_msg::SetEndian: - // while we don't enforce. This should be the very first message sent. - peerBE = header[2]&pva_flags::MSB; - break; - default: - // Set/AckMarker never used - break; - } + // Control messages are not actually useful evbuffer_drain(rx, 8); continue; - } // application message - const bool be = header[2]&pva_flags::MSB; - if(be!=peerBE) { - // wonderful PVA is redundant in communicating peer byte order. - // Which is included in every header _and_ the special SetEndian control message. - // While they really should be consistent, the original impl. only uses SetEndian - log_printf(connio, PLVL_CRIT, "Peer messages with inconsistent endian\n"); - } + peerBE = header[2]&pva_flags::MSB; // a bit verbose :P - FixedBuf L(peerBE, header); + FixedBuf L(peerBE, header+4, 4); uint32_t len = 0; from_wire(L, len); assert(L.good()); @@ -420,6 +425,7 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s } auto self = static_cast(raw); self->connections.emplace_back(self, sock, peer, socklen); + log_printf(connsetup, PLVL_DEBUG, "Setup TCP Connection\n"); }catch(std::exception& e){ log_printf(connio, PLVL_CRIT, "Unhandled error in accept callback: %s\n", e.what()); evutil_closesocket(sock); diff --git a/src/serverconn.h b/src/serverconn.h index ce6d9b6..cd07ad0 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -10,6 +10,8 @@ #include #include +#include + #include #include "evhelper.h" #include "utilpvt.h" @@ -43,7 +45,7 @@ struct ServerConn bool expectSeg; uint8_t segCmd; - evbuf segBuf; + evbuf segBuf, txBody; ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen); ~ServerConn(); @@ -106,6 +108,8 @@ struct Server::Pvt // "const" after ctor Config effective; + epicsEvent done; + std::vector beaconMsg; std::list > listeners; diff --git a/test/Makefile b/test/Makefile index a4cb72a..a804448 100644 --- a/test/Makefile +++ b/test/Makefile @@ -22,6 +22,10 @@ TESTPROD += testudp testudp_SRCS += testudp.cpp TESTS += testudp +TESTPROD += dummyserv +dummyserv_SRCS += dummyserv.cpp +# not a unittest + PROD_SYS_LIBS += event_core PROD_SYS_LIBS_DEFAULT += event_pthreads diff --git a/test/dummyserv.cpp b/test/dummyserv.cpp new file mode 100644 index 0000000..9423d04 --- /dev/null +++ b/test/dummyserv.cpp @@ -0,0 +1,78 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include +#include + +#include + +#include +#include + + +namespace { +using namespace pvxs::server; + +DEFINE_LOGGER(dummy,"dummyserv"); + +struct DummySource : public Source +{ + std::set names; + virtual ~DummySource() {} + + // Source interface +public: + virtual void onSearch(Search &op) override + { + for(auto& name : op) { + if(names.find(name.name())!=names.end()) { + log_printf(dummy, PLVL_INFO, "Claiming '%s'\n", name.name()); + name.claim(); + } else { + log_printf(dummy, PLVL_DEBUG, "Ignoring '%s'\n", name.name()); + } + } + } + virtual std::unique_ptr onCreate(const Create &op) override + { + return nullptr; + } +}; + +} // namespace + +int main(int argc, char *argv[]) +{ + int ret = 0; + try { + pvxs::logger_level_set("dummy", PLVL_INFO); + pvxs::logger_config_env(); + + std::shared_ptr src(new DummySource); + src->names.emplace("blah"); + + auto serv = std::move(Server::Config::from_env() + .build() + .addSource("dummy", src)); + + auto& conf = serv.config(); + + std::cout<<"Serving from :\n"; + for(auto& iface : conf.interfaces) { + std::cout<<" "< names) to_wire(M, name); } + auto pktlen = M.save()-msg.data(); + FixedBuf H(be, msg.data(), 8); - to_wire(H, Header{pva_app_msg::Search, 0, uint32_t(M.size()-8)}); + to_wire(H, Header{pva_app_msg::Search, 0, uint32_t(pktlen-8)}); testOk1(M.good() && H.good()); testOk1(M.save()>=msg.data()); testOk1(M.save()<=msg.data()+msg.size()); - testOk1(sendto(sock.sock, (char*)msg.data(), M.size(), 0, &listener->sa, listener.size())==int(M.size())); + testOk1(sendto(sock.sock, (char*)msg.data(), pktlen, 0, &listener->sa, listener.size())==int(pktlen)); manager.sync(); testOk1(!!rx.wait(30.0)); }