diff --git a/src/Makefile b/src/Makefile index d58343a..284c63f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -64,6 +64,7 @@ LIB_SRCS += evhelper.cpp LIB_SRCS += udp_collector.cpp LIB_SRCS += config.cpp +LIB_SRCS += conn.cpp LIB_SRCS += server.cpp LIB_SRCS += serverconn.cpp diff --git a/src/conn.cpp b/src/conn.cpp new file mode 100644 index 0000000..d9da174 --- /dev/null +++ b/src/conn.cpp @@ -0,0 +1,250 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include "conn.h" + +DEFINE_LOGGER(connsetup, "pvxs.tcp.setup"); +DEFINE_LOGGER(connio, "pvxs.tcp.io"); + +namespace pvxs { +namespace impl { + +ConnBase::ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr) + :peerAddr(peerAddr) + ,peerName(peerAddr.tostring()) + ,bev(bev) + ,isClient(isClient) + ,peerBE(true) // arbitrary choice, default should be overwritten before use + ,expectSeg(false) + ,segCmd(0xff) + ,segBuf(evbuffer_new()) + ,txBody(evbuffer_new()) +{ + // initially wait for at least a header + bufferevent_setwatermark(this->bev.get(), EV_READ, 8, tcp_readahead); +} + +ConnBase::~ConnBase() {} + +const char* ConnBase::peerLabel() const +{ + return isClient ? "Server" : "Client"; +} + +void ConnBase::enqueueTxBody(pva_app_msg_t cmd) +{ + auto tx = bufferevent_get_output(bev.get()); + to_evbuf(tx, Header{cmd, + uint8_t(isClient ? 0u : pva_flags::Server), + uint32_t(evbuffer_get_length(txBody.get()))}, + hostBE); + auto err = evbuffer_add_buffer(tx, txBody.get()); + assert(!err); +} + +#define CASE(Op) void ConnBase::handle_##Op() {} + CASE(ECHO); + CASE(CONNECTION_VALIDATION); + CASE(CONNECTION_VALIDATED); + CASE(SEARCH); + CASE(AUTHNZ); + + CASE(CREATE_CHANNEL); + CASE(DESTROY_CHANNEL); + + CASE(GET); + CASE(PUT); + CASE(PUT_GET); + CASE(MONITOR); + CASE(RPC); + CASE(CANCEL_REQUEST); + CASE(DESTROY_REQUEST); + CASE(GET_FIELD); + + CASE(MESSAGE); +#undef CASE + +void ConnBase::bevEvent(short events) +{ + if(events&(BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)) { + if(events&BEV_EVENT_ERROR) { + int err = EVUTIL_SOCKET_ERROR(); + const char *msg = evutil_socket_error_to_string(err); + log_err_printf(connio, "%s %s connection closed with socket error %d : %s\n", peerLabel(), peerName.c_str(), err, msg); + } + if(events&BEV_EVENT_EOF) { + log_debug_printf(connio, "%s %s connection closed by peer\n", peerLabel(), peerName.c_str()); + } + if(events&BEV_EVENT_TIMEOUT) { + log_warn_printf(connio, "%s %s connection timeout\n", peerLabel(), peerName.c_str()); + } + bev.reset(); + } + + if(!bev) + cleanup(); +} + +void ConnBase::bevRead() +{ + auto rx = bufferevent_get_input(bev.get()); + + while(bev && evbuffer_get_length(rx)>=8) { + uint8_t header[8]; + + auto ret = evbuffer_copyout(rx, header, sizeof(header)); + assert(ret==sizeof(header)); // previously verified + + if(header[0]!=0xca || header[1]==0 + || (isClient ^ !!(header[2]&pva_flags::Server))) { + log_hex_printf(connio, Level::Err, header, sizeof(header), + "%s %s Protocol decode fault. Force disconnect.\n", peerLabel(), peerName.c_str()); + bev.reset(); + break; + } + log_hex_printf(connio, Level::Debug, header, sizeof(header), + "%s %s Receive header\n", peerLabel(), peerName.c_str()); + + if(header[2]&pva_flags::Control) { + // Control messages are not actually useful + evbuffer_drain(rx, 8); + continue; + } + // application message + + peerBE = header[2]&pva_flags::MSB; + + // a bit verbose :P + FixedBuf L(peerBE, header+4, 4); + uint32_t len = 0; + from_wire(L, len); + assert(L.good()); + + if(evbuffer_get_length(rx)-8 < len) { + // wait for complete payload + // and some additional if available + size_t readahead = len; + if(readahead < std::numeric_limits::max()-tcp_readahead) + readahead += tcp_readahead; + bufferevent_setwatermark(bev.get(), EV_READ, len, readahead); + break; + } + + evbuffer_drain(rx, 8); + { + unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len); + assert(n==len); // we know rx buf contains the entire body + } + + // so far we do not use segmentation to support incremental processing + // of long messages. We instead accumulate all segments of a message + // prior to parsing. + + auto seg = header[2]&pva_flags::SegMask; + + bool continuation = seg&pva_flags::SegLast; // true for mid or last. false for none or first + if((continuation ^ expectSeg) || (continuation && header[3]!=segCmd)) { + log_crit_printf(connio, "%s %s Peer segmentation violation %c%c 0x%02x==0x%02x\n", peerLabel(), peerName.c_str(), + expectSeg?'Y':'N', continuation?'Y':'N', + segCmd, header[3]); + bev.reset(); + break; + } + + if(!seg || seg==pva_flags::SegFirst) { + expectSeg = true; + segCmd = header[3]; + } + + if(!seg || seg==pva_flags::SegLast) { + expectSeg = false; + + // ready to process segBuf + switch(segCmd) { + default: + log_debug_printf(connio, "%s %s Ignore unexpected command 0x%02x\n", peerLabel(), peerName.c_str(), segCmd); + evbuffer_drain(segBuf.get(), evbuffer_get_length(segBuf.get())); + break; +#define CASE(OP) case CMD_##OP: handle_##OP(); break + CASE(ECHO); + CASE(CONNECTION_VALIDATION); + CASE(CONNECTION_VALIDATED); + CASE(SEARCH); + CASE(AUTHNZ); + + CASE(CREATE_CHANNEL); + CASE(DESTROY_CHANNEL); + + CASE(GET); + CASE(PUT); + CASE(PUT_GET); + CASE(MONITOR); + CASE(RPC); + CASE(CANCEL_REQUEST); + CASE(DESTROY_REQUEST); + CASE(GET_FIELD); + + CASE(MESSAGE); +#undef CASE + } + // handlers may have cleared bev to force disconnect + if(!bev) + break; + + // silently drain any unprocessed body (forward compatibility) + if(auto n = evbuffer_get_length(segBuf.get())) + evbuffer_drain(segBuf.get(), n); + + // wait for next header + bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); + } + } + + if(!bev) { + cleanup(); + } +} + +void ConnBase::bevWrite() {} + +void ConnBase::bevEventS(struct bufferevent *bev, short events, void *ptr) +{ + auto conn = static_cast(ptr); + try { + conn->bevEvent(events); + }catch(std::exception& e){ + log_crit_printf(connsetup, "%s %s Unhandled error in bev event callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what()); + static_cast(ptr)->cleanup(); + } +} + +void ConnBase::bevReadS(struct bufferevent *bev, void *ptr) +{ + auto conn = static_cast(ptr); + try { + conn->bevRead(); + }catch(std::exception& e){ + log_crit_printf(connsetup, "%s %s Unhandled error in bev read callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what()); + static_cast(ptr)->cleanup(); + } +} + +void ConnBase::bevWriteS(struct bufferevent *bev, void *ptr) +{ + auto conn = static_cast(ptr); + try { + conn->bevWrite(); + }catch(std::exception& e){ + log_crit_printf(connsetup, "%s %s Unhandled error in bev write callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what()); + static_cast(ptr)->cleanup(); + } +} + +} // namespace impl +} // namespace pvxs diff --git a/src/conn.h b/src/conn.h new file mode 100644 index 0000000..8b6ade9 --- /dev/null +++ b/src/conn.h @@ -0,0 +1,79 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +#ifndef CONN_H +#define CONN_H + +#include "evhelper.h" +#include "dataimpl.h" +#include "utilpvt.h" + +namespace pvxs { +namespace impl { + +// Amount of following messages which we allow to be read while +// processing the current message. Avoids some extra recv() calls, +// at the price of maybe extra copying. +constexpr size_t tcp_readahead = 0x1000u; + +struct ConnBase +{ + SockAddr peerAddr; + std::string peerName; + evbufferevent bev; + TypeStore rxRegistry; + + const bool isClient; + bool peerBE; + bool expectSeg; + + uint8_t segCmd; + evbuf segBuf, txBody; + + ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr); + ConnBase(const ConnBase&) = delete; + ConnBase& operator=(const ConnBase&) = delete; + virtual ~ConnBase(); + + const char* peerLabel() const; + + void enqueueTxBody(pva_app_msg_t cmd); + +protected: +#define CASE(Op) virtual void handle_##Op(); + CASE(ECHO); + CASE(CONNECTION_VALIDATION); + CASE(CONNECTION_VALIDATED); + CASE(SEARCH); + CASE(AUTHNZ); + + CASE(CREATE_CHANNEL); + CASE(DESTROY_CHANNEL); + + CASE(GET); + CASE(PUT); + CASE(PUT_GET); + CASE(MONITOR); + CASE(RPC); + CASE(CANCEL_REQUEST); + CASE(DESTROY_REQUEST); + CASE(GET_FIELD); + + CASE(MESSAGE); +#undef CASE + + virtual void cleanup() =0; + virtual void bevEvent(short events); + virtual void bevRead(); + virtual void bevWrite(); + static void bevEventS(struct bufferevent *bev, short events, void *ptr); + static void bevReadS(struct bufferevent *bev, void *ptr); + static void bevWriteS(struct bufferevent *bev, void *ptr); +}; + +} // namespace impl +} // namespace pvxs + +#endif // CONN_H diff --git a/src/serverconn.cpp b/src/serverconn.cpp index fbeb820..fa43aa9 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -15,10 +15,8 @@ #include #include "serverconn.h" -// Amount of following messages which we allow to be read while -// processing the current message. Avoids some extra recv() calls, -// at the price of maybe extra copying. -static const size_t tcp_readahead = 0x1000; +// limit on size of TX buffer above which we suspend RX +static constexpr size_t tcp_tx_limit = 0x100000; namespace pvxs {namespace impl { @@ -30,22 +28,15 @@ DEFINE_LOGGER(connio, "pvxs.tcp.io"); DEFINE_LOGGER(remote, "pvxs.remote.log"); ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen) - :iface(iface) - ,peerAddr(peer, socklen) - ,peerName(peerAddr.tostring()) - ,bev(bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS)) - ,peerBE(true) // arbitrary choice, default should be overwritten before use - ,expectSeg(false) - ,segCmd(0xff) - ,segBuf(evbuffer_new()) - ,txBody(evbuffer_new()) + :ConnBase(false, + bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), + SockAddr(peer, socklen)) + ,iface(iface) ,nextSID(0) { log_debug_printf(connio, "Client %s connects\n", peerName.c_str()); bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this); - // initially wait for at least a header - bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); timeval timo = {30, 0}; bufferevent_set_timeouts(bev.get(), &timo, &timo); @@ -99,17 +90,6 @@ const std::shared_ptr& ServerConn::lookupSID(uint32_t sid) return it->second; } -void ServerConn::enqueueTxBody(pva_app_msg_t cmd) -{ - auto tx = bufferevent_get_output(bev.get()); - to_evbuf(tx, Header{cmd, - pva_flags::Server, - uint32_t(evbuffer_get_length(txBody.get()))}, - hostBE); - auto err = evbuffer_add_buffer(tx, txBody.get()); - assert(!err); -} - void ServerConn::handle_ECHO() { // Client requests echo as a keep-alive check @@ -149,9 +129,7 @@ void ServerConn::handle_CONNECTION_VALIDATION() std::string selected; { - M.skip(6); // ignore unused buffer and introspection size - uint16_t qos; - from_wire(M, qos); + M.skip(4+2+2); // ignore unused buffer, introspection size, and QoS from_wire(M, selected); Value auth; @@ -176,7 +154,7 @@ void ServerConn::handle_CONNECTION_VALIDATION() return; } else { - log_debug_printf(connsetup, "Client %s selects auth \"%s\"", peerName.c_str(), selected.c_str()); + log_debug_printf(connsetup, "Client %s selects auth \"%s\"\n", peerName.c_str(), selected.c_str()); } // remainder of segBuf is payload w/ credentials @@ -316,147 +294,18 @@ void ServerConn::cleanup() } } -void ServerConn::bevEvent(short events) -{ - if(events&(BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)) { - if(events&BEV_EVENT_ERROR) { - int err = EVUTIL_SOCKET_ERROR(); - const char *msg = evutil_socket_error_to_string(err); - log_err_printf(connio, "Client %s connection closed with socket error %d : %s\n", peerName.c_str(), err, msg); - } - if(events&BEV_EVENT_EOF) { - log_debug_printf(connio, "Client %s connection closed by peer\n", peerName.c_str()); - } - if(events&BEV_EVENT_TIMEOUT) { - log_warn_printf(connio, "Client %s connection timeout\n", peerName.c_str()); - } - bev.reset(); - } - - if(!bev) - cleanup(); -} - void ServerConn::bevRead() { - auto rx = bufferevent_get_input(bev.get()); - - while(bev && evbuffer_get_length(rx)>=8) { - uint8_t header[8]; - - auto ret = evbuffer_copyout(rx, header, sizeof(header)); - assert(ret==sizeof(header)); // previously verified - - if(header[0]!=0xca || header[1]==0 || (header[2]&pva_flags::Server)) { - log_hex_printf(connio, Level::Err, header, sizeof(header), "Client %s Protocol decode fault. Force disconnect.\n", peerName.c_str()); - bev.reset(); - break; - } - log_hex_printf(connio, Level::Debug, header, sizeof(header), "Client %s Receive header\n", peerName.c_str()); - - if(header[2]&pva_flags::Control) { - // Control messages are not actually useful - evbuffer_drain(rx, 8); - continue; - } - // application message - - peerBE = header[2]&pva_flags::MSB; - - // a bit verbose :P - FixedBuf L(peerBE, header+4, 4); - uint32_t len = 0; - from_wire(L, len); - assert(L.good()); - - if(evbuffer_get_length(rx)-8 < len) { - // wait for complete payload - // and some additional if available - size_t readahead = len; - if(readahead < std::numeric_limits::max()-tcp_readahead) - readahead += tcp_readahead; - bufferevent_setwatermark(bev.get(), EV_READ, len, readahead); - break; - } - - evbuffer_drain(rx, 8); - { - unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len); - assert(n==len); // we know rx buf contains the entire body - } - - // so far we do not use segmentation to support incremental processing - // of long messages. We instead accumulate all segments of a message - // prior to parsing. - - auto seg = header[2]&pva_flags::SegMask; - - bool continuation = seg&pva_flags::SegLast; // true for mid or last. false for none for first - if((continuation ^ expectSeg) || (continuation && header[3]!=segCmd)) { - log_crit_printf(connio, "Client %s Peer segmentation violation %c%c 0x%02x==0x%02x\n", peerName.c_str(), - expectSeg?'Y':'N', continuation?'Y':'N', - segCmd, header[3]); - bev.reset(); - break; - } - - if(!seg || seg==pva_flags::SegFirst) { - expectSeg = true; - segCmd = header[3]; - } - - if(!seg || seg==pva_flags::SegLast) { - expectSeg = false; - - // ready to process segBuf - switch(segCmd) { - default: - log_debug_printf(connio, "Client %s Ignore unexpected command 0x%02x\n", peerName.c_str(), segCmd); - evbuffer_drain(segBuf.get(), evbuffer_get_length(segBuf.get())); - break; -#define CASE(OP) case CMD_##OP: handle_##OP(); break - CASE(ECHO); - CASE(CONNECTION_VALIDATION); - CASE(SEARCH); - CASE(AUTHNZ); - - CASE(CREATE_CHANNEL); - CASE(DESTROY_CHANNEL); - - CASE(GET); - CASE(PUT); - CASE(PUT_GET); - CASE(MONITOR); - CASE(RPC); - CASE(CANCEL_REQUEST); - CASE(DESTROY_REQUEST); - CASE(GET_FIELD); - - CASE(MESSAGE); -#undef CASE - } - // handlers may have cleared bev to force disconnect - if(!bev) - break; - - // silently drain any unprocessed body (forward compatibility) - if(auto n = evbuffer_get_length(segBuf.get())) - evbuffer_drain(segBuf.get(), n); - - // wait for next header - bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); - } - } + ConnBase::bevRead(); if(!bev) { - cleanup(); } else if(auto tx = bufferevent_get_output(bev.get())) { - if(evbuffer_get_length(tx)>=0x100000) { + if(evbuffer_get_length(tx)>=tcp_tx_limit) { // write buffer "full". stop reading until it drains // TODO configure (void)bufferevent_disable(bev.get(), EV_READ); - bufferevent_setwatermark(bev.get(), EV_WRITE, 0x100000/2, 0); + bufferevent_setwatermark(bev.get(), EV_WRITE, tcp_tx_limit/2, 0); log_debug_printf(connio, "%s suspend READ\n", peerName.c_str()); } } @@ -469,7 +318,7 @@ void ServerConn::bevWrite() auto tx = bufferevent_get_output(bev.get()); // handle pending monitors - while(!backlog.empty() && evbuffer_get_length(tx)<0x100000) { + while(!backlog.empty() && evbuffer_get_length(tx)(ptr); - try { - conn->bevEvent(events); - }catch(std::exception& e){ - log_crit_printf(connsetup, "Client %s Unhandled error in bev event callback: %s\n", conn->peerName.c_str(), e.what()); - static_cast(ptr)->cleanup(); - } -} - -void ServerConn::bevReadS(struct bufferevent *bev, void *ptr) -{ - auto conn = static_cast(ptr); - try { - conn->bevRead(); - }catch(std::exception& e){ - log_crit_printf(connsetup, "Client %s Unhandled error in bev read callback: %s\n", conn->peerName.c_str(), e.what()); - static_cast(ptr)->cleanup(); - } -} - -void ServerConn::bevWriteS(struct bufferevent *bev, void *ptr) -{ - auto conn = static_cast(ptr); - try { - conn->bevWrite(); - }catch(std::exception& e){ - log_crit_printf(connsetup, "Client %s Unhandled error in bev write callback: %s\n", conn->peerName.c_str(), e.what()); - static_cast(ptr)->cleanup(); - } -} ServIface::ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback) :server(server) diff --git a/src/serverconn.h b/src/serverconn.h index f967976..48a3580 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -20,6 +20,7 @@ #include "utilpvt.h" #include "dataimpl.h" #include "udp_collector.h" +#include "conn.h" namespace pvxs {namespace impl { @@ -93,23 +94,12 @@ struct ServerChan ~ServerChan(); }; -struct ServerConn : public std::enable_shared_from_this +struct ServerConn : public ConnBase, public std::enable_shared_from_this { ServIface* const iface; - SockAddr peerAddr; - std::string peerName; - evbufferevent bev; - TypeStore rxRegistry; - // credentials - bool peerBE; - bool expectSeg; - - uint8_t segCmd; - evbuf segBuf, txBody; - uint32_t nextSID; std::map > chanBySID; std::map > opByIOID; @@ -123,10 +113,8 @@ struct ServerConn : public std::enable_shared_from_this const std::shared_ptr& lookupSID(uint32_t sid); - void enqueueTxBody(pva_app_msg_t cmd); - private: -#define CASE(Op) void handle_##Op(); +#define CASE(Op) virtual void handle_##Op() override final; CASE(ECHO); CASE(CONNECTION_VALIDATION); CASE(SEARCH); @@ -149,13 +137,10 @@ private: void handle_GPR(pva_app_msg_t cmd); - void cleanup(); - void bevEvent(short events); - void bevRead(); - void bevWrite(); - static void bevEventS(struct bufferevent *bev, short events, void *ptr); - static void bevReadS(struct bufferevent *bev, void *ptr); - static void bevWriteS(struct bufferevent *bev, void *ptr); + virtual void cleanup() override final; + //void bevEvent(short events); + virtual void bevRead() override final; + virtual void bevWrite() override final; }; struct ServIface