From f063bd26f58ca55cce38a27e42451abe3e090e53 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 19 Nov 2019 09:52:01 -0800 Subject: [PATCH] tcp search --- src/pvxs/server.h | 5 +-- src/serverchan.cpp | 90 ++++++++++++++++++++++++++++++++++++++++++++++ src/serverconn.cpp | 3 -- 3 files changed, 93 insertions(+), 5 deletions(-) diff --git a/src/pvxs/server.h b/src/pvxs/server.h index af5d093..b7a7578 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -213,9 +213,10 @@ struct PVXS_API Source { //! An iteratable of names being sought struct Search { class Name { - const char* _name; - bool _claim; + const char* _name = nullptr; + bool _claim = false; friend struct Server::Pvt; + friend struct impl::ServerConn; public: //! The Channel name inline const char* name() const { return _name; } diff --git a/src/serverchan.cpp b/src/serverchan.cpp index 40322f9..53dcacd 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -17,6 +17,8 @@ DEFINE_LOGGER(connsetup, "tcp.setup"); // related to low level send/recv DEFINE_LOGGER(connio, "tcp.io"); +DEFINE_LOGGER(serversetup, "server.setup"); + ServerChan::ServerChan(const std::shared_ptr &conn, uint32_t sid, uint32_t cid, @@ -80,6 +82,94 @@ void ServerChannelControl::close() }); } +void ServerConn::handle_Search() +{ + const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t searchID=0; + uint8_t flags=0; + + from_wire(M, searchID); + from_wire(M, flags); + bool mustReply = flags&pva_search_flags::MustReply; + M.skip(3 + 16 + 2); // unused and replyAddr (we always and only reply to TCP peer) + + bool foundtcp = false; + Size nproto{0}; + from_wire(M, nproto); + for(size_t i=0; i> nameStorage(nchan); + op._names.resize(nchan); + + for(auto n : range(nchan)) { + from_wire(M, nameStorage[n].first); + from_wire(M, nameStorage[n].second); + op._names[n]._name = nameStorage[n].second.c_str(); + } + + if(!M.good()) + throw std::runtime_error("TCP Search decode error"); + + { + auto G(iface->server->sourcesLock.lockReader()); + for(const auto& pair : iface->server->sources) { + try { + pair.second->onSearch(op); + }catch(std::exception& e){ + log_printf(serversetup, PLVL_ERR, "Unhandled error in Source::onSearch for '%s' : %s\n", + pair.first.second.c_str(), e.what()); + } + } + } + + uint16_t nreply = 0; + for(const auto& name : op._names) { + if(name._claim) + nreply++; + } + + if(nreply==0 && !mustReply) + return; + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(be, txBody.get()); + + to_wire(M, searchID); + to_wire(M, iface->bind_addr); + to_wire(M, iface->bind_addr.port()); + to_wire(M, "tcp"); + // "found" flag + to_wire(M, {uint8_t(nreply!=0 ? 1 : 0)}); + + to_wire(M, uint16_t(nreply)); + for(auto i : range(op._names.size())) { + if(op._names[i]._claim) + to_wire(M, uint32_t(nameStorage[i].first)); + } + } + + auto tx = bufferevent_get_output(bev.get()); + to_evbuf(tx, Header{pva_app_msg::SearchReply, + pva_flags::Server, + uint32_t(evbuffer_get_length(txBody.get()))}, + be); + auto err = evbuffer_add_buffer(tx, txBody.get()); + assert(!err); +} + void ServerConn::handle_CreateChan() { const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 5cef626..d29c954 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -171,9 +171,6 @@ void ServerConn::handle_AuthZ() // ignored (so far no auth plugin actually uses) } -void ServerConn::handle_Search() -{} - void ServerConn::handle_GetOp() {}