separate basic TCP RX handling

This commit is contained in:
Michael Davidsaver
2020-02-20 11:41:09 -08:00
parent 8c646cd65a
commit 6504466bbf
5 changed files with 350 additions and 218 deletions
+1
View File
@@ -64,6 +64,7 @@ LIB_SRCS += evhelper.cpp
LIB_SRCS += udp_collector.cpp
LIB_SRCS += config.cpp
LIB_SRCS += conn.cpp
LIB_SRCS += server.cpp
LIB_SRCS += serverconn.cpp
+250
View File
@@ -0,0 +1,250 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <epicsAssert.h>
#include <pvxs/log.h>
#include "conn.h"
DEFINE_LOGGER(connsetup, "pvxs.tcp.setup");
DEFINE_LOGGER(connio, "pvxs.tcp.io");
namespace pvxs {
namespace impl {
ConnBase::ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr)
:peerAddr(peerAddr)
,peerName(peerAddr.tostring())
,bev(bev)
,isClient(isClient)
,peerBE(true) // arbitrary choice, default should be overwritten before use
,expectSeg(false)
,segCmd(0xff)
,segBuf(evbuffer_new())
,txBody(evbuffer_new())
{
// initially wait for at least a header
bufferevent_setwatermark(this->bev.get(), EV_READ, 8, tcp_readahead);
}
ConnBase::~ConnBase() {}
const char* ConnBase::peerLabel() const
{
return isClient ? "Server" : "Client";
}
void ConnBase::enqueueTxBody(pva_app_msg_t cmd)
{
auto tx = bufferevent_get_output(bev.get());
to_evbuf(tx, Header{cmd,
uint8_t(isClient ? 0u : pva_flags::Server),
uint32_t(evbuffer_get_length(txBody.get()))},
hostBE);
auto err = evbuffer_add_buffer(tx, txBody.get());
assert(!err);
}
#define CASE(Op) void ConnBase::handle_##Op() {}
CASE(ECHO);
CASE(CONNECTION_VALIDATION);
CASE(CONNECTION_VALIDATED);
CASE(SEARCH);
CASE(AUTHNZ);
CASE(CREATE_CHANNEL);
CASE(DESTROY_CHANNEL);
CASE(GET);
CASE(PUT);
CASE(PUT_GET);
CASE(MONITOR);
CASE(RPC);
CASE(CANCEL_REQUEST);
CASE(DESTROY_REQUEST);
CASE(GET_FIELD);
CASE(MESSAGE);
#undef CASE
void ConnBase::bevEvent(short events)
{
if(events&(BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)) {
if(events&BEV_EVENT_ERROR) {
int err = EVUTIL_SOCKET_ERROR();
const char *msg = evutil_socket_error_to_string(err);
log_err_printf(connio, "%s %s connection closed with socket error %d : %s\n", peerLabel(), peerName.c_str(), err, msg);
}
if(events&BEV_EVENT_EOF) {
log_debug_printf(connio, "%s %s connection closed by peer\n", peerLabel(), peerName.c_str());
}
if(events&BEV_EVENT_TIMEOUT) {
log_warn_printf(connio, "%s %s connection timeout\n", peerLabel(), peerName.c_str());
}
bev.reset();
}
if(!bev)
cleanup();
}
void ConnBase::bevRead()
{
auto rx = bufferevent_get_input(bev.get());
while(bev && evbuffer_get_length(rx)>=8) {
uint8_t header[8];
auto ret = evbuffer_copyout(rx, header, sizeof(header));
assert(ret==sizeof(header)); // previously verified
if(header[0]!=0xca || header[1]==0
|| (isClient ^ !!(header[2]&pva_flags::Server))) {
log_hex_printf(connio, Level::Err, header, sizeof(header),
"%s %s Protocol decode fault. Force disconnect.\n", peerLabel(), peerName.c_str());
bev.reset();
break;
}
log_hex_printf(connio, Level::Debug, header, sizeof(header),
"%s %s Receive header\n", peerLabel(), peerName.c_str());
if(header[2]&pva_flags::Control) {
// Control messages are not actually useful
evbuffer_drain(rx, 8);
continue;
}
// application message
peerBE = header[2]&pva_flags::MSB;
// a bit verbose :P
FixedBuf L(peerBE, header+4, 4);
uint32_t len = 0;
from_wire(L, len);
assert(L.good());
if(evbuffer_get_length(rx)-8 < len) {
// wait for complete payload
// and some additional if available
size_t readahead = len;
if(readahead < std::numeric_limits<size_t>::max()-tcp_readahead)
readahead += tcp_readahead;
bufferevent_setwatermark(bev.get(), EV_READ, len, readahead);
break;
}
evbuffer_drain(rx, 8);
{
unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len);
assert(n==len); // we know rx buf contains the entire body
}
// so far we do not use segmentation to support incremental processing
// of long messages. We instead accumulate all segments of a message
// prior to parsing.
auto seg = header[2]&pva_flags::SegMask;
bool continuation = seg&pva_flags::SegLast; // true for mid or last. false for none or first
if((continuation ^ expectSeg) || (continuation && header[3]!=segCmd)) {
log_crit_printf(connio, "%s %s Peer segmentation violation %c%c 0x%02x==0x%02x\n", peerLabel(), peerName.c_str(),
expectSeg?'Y':'N', continuation?'Y':'N',
segCmd, header[3]);
bev.reset();
break;
}
if(!seg || seg==pva_flags::SegFirst) {
expectSeg = true;
segCmd = header[3];
}
if(!seg || seg==pva_flags::SegLast) {
expectSeg = false;
// ready to process segBuf
switch(segCmd) {
default:
log_debug_printf(connio, "%s %s Ignore unexpected command 0x%02x\n", peerLabel(), peerName.c_str(), segCmd);
evbuffer_drain(segBuf.get(), evbuffer_get_length(segBuf.get()));
break;
#define CASE(OP) case CMD_##OP: handle_##OP(); break
CASE(ECHO);
CASE(CONNECTION_VALIDATION);
CASE(CONNECTION_VALIDATED);
CASE(SEARCH);
CASE(AUTHNZ);
CASE(CREATE_CHANNEL);
CASE(DESTROY_CHANNEL);
CASE(GET);
CASE(PUT);
CASE(PUT_GET);
CASE(MONITOR);
CASE(RPC);
CASE(CANCEL_REQUEST);
CASE(DESTROY_REQUEST);
CASE(GET_FIELD);
CASE(MESSAGE);
#undef CASE
}
// handlers may have cleared bev to force disconnect
if(!bev)
break;
// silently drain any unprocessed body (forward compatibility)
if(auto n = evbuffer_get_length(segBuf.get()))
evbuffer_drain(segBuf.get(), n);
// wait for next header
bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead);
}
}
if(!bev) {
cleanup();
}
}
void ConnBase::bevWrite() {}
void ConnBase::bevEventS(struct bufferevent *bev, short events, void *ptr)
{
auto conn = static_cast<ConnBase*>(ptr);
try {
conn->bevEvent(events);
}catch(std::exception& e){
log_crit_printf(connsetup, "%s %s Unhandled error in bev event callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what());
static_cast<ConnBase*>(ptr)->cleanup();
}
}
void ConnBase::bevReadS(struct bufferevent *bev, void *ptr)
{
auto conn = static_cast<ConnBase*>(ptr);
try {
conn->bevRead();
}catch(std::exception& e){
log_crit_printf(connsetup, "%s %s Unhandled error in bev read callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what());
static_cast<ConnBase*>(ptr)->cleanup();
}
}
void ConnBase::bevWriteS(struct bufferevent *bev, void *ptr)
{
auto conn = static_cast<ConnBase*>(ptr);
try {
conn->bevWrite();
}catch(std::exception& e){
log_crit_printf(connsetup, "%s %s Unhandled error in bev write callback: %s\n", conn->peerLabel(), conn->peerName.c_str(), e.what());
static_cast<ConnBase*>(ptr)->cleanup();
}
}
} // namespace impl
} // namespace pvxs
+79
View File
@@ -0,0 +1,79 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef CONN_H
#define CONN_H
#include "evhelper.h"
#include "dataimpl.h"
#include "utilpvt.h"
namespace pvxs {
namespace impl {
// Amount of following messages which we allow to be read while
// processing the current message. Avoids some extra recv() calls,
// at the price of maybe extra copying.
constexpr size_t tcp_readahead = 0x1000u;
struct ConnBase
{
SockAddr peerAddr;
std::string peerName;
evbufferevent bev;
TypeStore rxRegistry;
const bool isClient;
bool peerBE;
bool expectSeg;
uint8_t segCmd;
evbuf segBuf, txBody;
ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr);
ConnBase(const ConnBase&) = delete;
ConnBase& operator=(const ConnBase&) = delete;
virtual ~ConnBase();
const char* peerLabel() const;
void enqueueTxBody(pva_app_msg_t cmd);
protected:
#define CASE(Op) virtual void handle_##Op();
CASE(ECHO);
CASE(CONNECTION_VALIDATION);
CASE(CONNECTION_VALIDATED);
CASE(SEARCH);
CASE(AUTHNZ);
CASE(CREATE_CHANNEL);
CASE(DESTROY_CHANNEL);
CASE(GET);
CASE(PUT);
CASE(PUT_GET);
CASE(MONITOR);
CASE(RPC);
CASE(CANCEL_REQUEST);
CASE(DESTROY_REQUEST);
CASE(GET_FIELD);
CASE(MESSAGE);
#undef CASE
virtual void cleanup() =0;
virtual void bevEvent(short events);
virtual void bevRead();
virtual void bevWrite();
static void bevEventS(struct bufferevent *bev, short events, void *ptr);
static void bevReadS(struct bufferevent *bev, void *ptr);
static void bevWriteS(struct bufferevent *bev, void *ptr);
};
} // namespace impl
} // namespace pvxs
#endif // CONN_H
+13 -196
View File
@@ -15,10 +15,8 @@
#include <pvxs/log.h>
#include "serverconn.h"
// Amount of following messages which we allow to be read while
// processing the current message. Avoids some extra recv() calls,
// at the price of maybe extra copying.
static const size_t tcp_readahead = 0x1000;
// limit on size of TX buffer above which we suspend RX
static constexpr size_t tcp_tx_limit = 0x100000;
namespace pvxs {namespace impl {
@@ -30,22 +28,15 @@ DEFINE_LOGGER(connio, "pvxs.tcp.io");
DEFINE_LOGGER(remote, "pvxs.remote.log");
ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen)
:iface(iface)
,peerAddr(peer, socklen)
,peerName(peerAddr.tostring())
,bev(bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS))
,peerBE(true) // arbitrary choice, default should be overwritten before use
,expectSeg(false)
,segCmd(0xff)
,segBuf(evbuffer_new())
,txBody(evbuffer_new())
:ConnBase(false,
bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS),
SockAddr(peer, socklen))
,iface(iface)
,nextSID(0)
{
log_debug_printf(connio, "Client %s connects\n", peerName.c_str());
bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this);
// initially wait for at least a header
bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead);
timeval timo = {30, 0};
bufferevent_set_timeouts(bev.get(), &timo, &timo);
@@ -99,17 +90,6 @@ const std::shared_ptr<ServerChan>& ServerConn::lookupSID(uint32_t sid)
return it->second;
}
void ServerConn::enqueueTxBody(pva_app_msg_t cmd)
{
auto tx = bufferevent_get_output(bev.get());
to_evbuf(tx, Header{cmd,
pva_flags::Server,
uint32_t(evbuffer_get_length(txBody.get()))},
hostBE);
auto err = evbuffer_add_buffer(tx, txBody.get());
assert(!err);
}
void ServerConn::handle_ECHO()
{
// Client requests echo as a keep-alive check
@@ -149,9 +129,7 @@ void ServerConn::handle_CONNECTION_VALIDATION()
std::string selected;
{
M.skip(6); // ignore unused buffer and introspection size
uint16_t qos;
from_wire(M, qos);
M.skip(4+2+2); // ignore unused buffer, introspection size, and QoS
from_wire(M, selected);
Value auth;
@@ -176,7 +154,7 @@ void ServerConn::handle_CONNECTION_VALIDATION()
return;
} else {
log_debug_printf(connsetup, "Client %s selects auth \"%s\"", peerName.c_str(), selected.c_str());
log_debug_printf(connsetup, "Client %s selects auth \"%s\"\n", peerName.c_str(), selected.c_str());
}
// remainder of segBuf is payload w/ credentials
@@ -316,147 +294,18 @@ void ServerConn::cleanup()
}
}
void ServerConn::bevEvent(short events)
{
if(events&(BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)) {
if(events&BEV_EVENT_ERROR) {
int err = EVUTIL_SOCKET_ERROR();
const char *msg = evutil_socket_error_to_string(err);
log_err_printf(connio, "Client %s connection closed with socket error %d : %s\n", peerName.c_str(), err, msg);
}
if(events&BEV_EVENT_EOF) {
log_debug_printf(connio, "Client %s connection closed by peer\n", peerName.c_str());
}
if(events&BEV_EVENT_TIMEOUT) {
log_warn_printf(connio, "Client %s connection timeout\n", peerName.c_str());
}
bev.reset();
}
if(!bev)
cleanup();
}
void ServerConn::bevRead()
{
auto rx = bufferevent_get_input(bev.get());
while(bev && evbuffer_get_length(rx)>=8) {
uint8_t header[8];
auto ret = evbuffer_copyout(rx, header, sizeof(header));
assert(ret==sizeof(header)); // previously verified
if(header[0]!=0xca || header[1]==0 || (header[2]&pva_flags::Server)) {
log_hex_printf(connio, Level::Err, header, sizeof(header), "Client %s Protocol decode fault. Force disconnect.\n", peerName.c_str());
bev.reset();
break;
}
log_hex_printf(connio, Level::Debug, header, sizeof(header), "Client %s Receive header\n", peerName.c_str());
if(header[2]&pva_flags::Control) {
// Control messages are not actually useful
evbuffer_drain(rx, 8);
continue;
}
// application message
peerBE = header[2]&pva_flags::MSB;
// a bit verbose :P
FixedBuf L(peerBE, header+4, 4);
uint32_t len = 0;
from_wire(L, len);
assert(L.good());
if(evbuffer_get_length(rx)-8 < len) {
// wait for complete payload
// and some additional if available
size_t readahead = len;
if(readahead < std::numeric_limits<size_t>::max()-tcp_readahead)
readahead += tcp_readahead;
bufferevent_setwatermark(bev.get(), EV_READ, len, readahead);
break;
}
evbuffer_drain(rx, 8);
{
unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len);
assert(n==len); // we know rx buf contains the entire body
}
// so far we do not use segmentation to support incremental processing
// of long messages. We instead accumulate all segments of a message
// prior to parsing.
auto seg = header[2]&pva_flags::SegMask;
bool continuation = seg&pva_flags::SegLast; // true for mid or last. false for none for first
if((continuation ^ expectSeg) || (continuation && header[3]!=segCmd)) {
log_crit_printf(connio, "Client %s Peer segmentation violation %c%c 0x%02x==0x%02x\n", peerName.c_str(),
expectSeg?'Y':'N', continuation?'Y':'N',
segCmd, header[3]);
bev.reset();
break;
}
if(!seg || seg==pva_flags::SegFirst) {
expectSeg = true;
segCmd = header[3];
}
if(!seg || seg==pva_flags::SegLast) {
expectSeg = false;
// ready to process segBuf
switch(segCmd) {
default:
log_debug_printf(connio, "Client %s Ignore unexpected command 0x%02x\n", peerName.c_str(), segCmd);
evbuffer_drain(segBuf.get(), evbuffer_get_length(segBuf.get()));
break;
#define CASE(OP) case CMD_##OP: handle_##OP(); break
CASE(ECHO);
CASE(CONNECTION_VALIDATION);
CASE(SEARCH);
CASE(AUTHNZ);
CASE(CREATE_CHANNEL);
CASE(DESTROY_CHANNEL);
CASE(GET);
CASE(PUT);
CASE(PUT_GET);
CASE(MONITOR);
CASE(RPC);
CASE(CANCEL_REQUEST);
CASE(DESTROY_REQUEST);
CASE(GET_FIELD);
CASE(MESSAGE);
#undef CASE
}
// handlers may have cleared bev to force disconnect
if(!bev)
break;
// silently drain any unprocessed body (forward compatibility)
if(auto n = evbuffer_get_length(segBuf.get()))
evbuffer_drain(segBuf.get(), n);
// wait for next header
bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead);
}
}
ConnBase::bevRead();
if(!bev) {
cleanup();
} else if(auto tx = bufferevent_get_output(bev.get())) {
if(evbuffer_get_length(tx)>=0x100000) {
if(evbuffer_get_length(tx)>=tcp_tx_limit) {
// write buffer "full". stop reading until it drains
// TODO configure
(void)bufferevent_disable(bev.get(), EV_READ);
bufferevent_setwatermark(bev.get(), EV_WRITE, 0x100000/2, 0);
bufferevent_setwatermark(bev.get(), EV_WRITE, tcp_tx_limit/2, 0);
log_debug_printf(connio, "%s suspend READ\n", peerName.c_str());
}
}
@@ -469,7 +318,7 @@ void ServerConn::bevWrite()
auto tx = bufferevent_get_output(bev.get());
// handle pending monitors
while(!backlog.empty() && evbuffer_get_length(tx)<0x100000) {
while(!backlog.empty() && evbuffer_get_length(tx)<tcp_tx_limit) {
auto fn = std::move(backlog.front());
backlog.pop_front();
@@ -477,45 +326,13 @@ void ServerConn::bevWrite()
}
// TODO configure
if(evbuffer_get_length(tx)<0x100000) {
if(evbuffer_get_length(tx)<tcp_tx_limit) {
(void)bufferevent_enable(bev.get(), EV_READ);
bufferevent_setwatermark(bev.get(), EV_WRITE, 0, 0);
log_debug_printf(connio, "%s resume READ\n", peerName.c_str());
}
}
void ServerConn::bevEventS(struct bufferevent *bev, short events, void *ptr)
{
auto conn = static_cast<ServerConn*>(ptr);
try {
conn->bevEvent(events);
}catch(std::exception& e){
log_crit_printf(connsetup, "Client %s Unhandled error in bev event callback: %s\n", conn->peerName.c_str(), e.what());
static_cast<ServerConn*>(ptr)->cleanup();
}
}
void ServerConn::bevReadS(struct bufferevent *bev, void *ptr)
{
auto conn = static_cast<ServerConn*>(ptr);
try {
conn->bevRead();
}catch(std::exception& e){
log_crit_printf(connsetup, "Client %s Unhandled error in bev read callback: %s\n", conn->peerName.c_str(), e.what());
static_cast<ServerConn*>(ptr)->cleanup();
}
}
void ServerConn::bevWriteS(struct bufferevent *bev, void *ptr)
{
auto conn = static_cast<ServerConn*>(ptr);
try {
conn->bevWrite();
}catch(std::exception& e){
log_crit_printf(connsetup, "Client %s Unhandled error in bev write callback: %s\n", conn->peerName.c_str(), e.what());
static_cast<ServerConn*>(ptr)->cleanup();
}
}
ServIface::ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback)
:server(server)
+7 -22
View File
@@ -20,6 +20,7 @@
#include "utilpvt.h"
#include "dataimpl.h"
#include "udp_collector.h"
#include "conn.h"
namespace pvxs {namespace impl {
@@ -93,23 +94,12 @@ struct ServerChan
~ServerChan();
};
struct ServerConn : public std::enable_shared_from_this<ServerConn>
struct ServerConn : public ConnBase, public std::enable_shared_from_this<ServerConn>
{
ServIface* const iface;
SockAddr peerAddr;
std::string peerName;
evbufferevent bev;
TypeStore rxRegistry;
// credentials
bool peerBE;
bool expectSeg;
uint8_t segCmd;
evbuf segBuf, txBody;
uint32_t nextSID;
std::map<uint32_t, std::shared_ptr<ServerChan> > chanBySID;
std::map<uint32_t, std::shared_ptr<ServerOp> > opByIOID;
@@ -123,10 +113,8 @@ struct ServerConn : public std::enable_shared_from_this<ServerConn>
const std::shared_ptr<ServerChan>& lookupSID(uint32_t sid);
void enqueueTxBody(pva_app_msg_t cmd);
private:
#define CASE(Op) void handle_##Op();
#define CASE(Op) virtual void handle_##Op() override final;
CASE(ECHO);
CASE(CONNECTION_VALIDATION);
CASE(SEARCH);
@@ -149,13 +137,10 @@ private:
void handle_GPR(pva_app_msg_t cmd);
void cleanup();
void bevEvent(short events);
void bevRead();
void bevWrite();
static void bevEventS(struct bufferevent *bev, short events, void *ptr);
static void bevReadS(struct bufferevent *bev, void *ptr);
static void bevWriteS(struct bufferevent *bev, void *ptr);
virtual void cleanup() override final;
//void bevEvent(short events);
virtual void bevRead() override final;
virtual void bevWrite() override final;
};
struct ServIface