redo packet build/parse

now with evbuffer
This commit is contained in:
Michael Davidsaver
2019-11-07 11:48:58 -08:00
parent 06e780872b
commit bab82affb8
14 changed files with 514 additions and 295 deletions
+47 -38
View File
@@ -50,27 +50,26 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
uint8_t flags = be ? pva_flags::MSB : 0;
flags |= pva_flags::Server;
sbuf<uint8_t> M(buf.data(), buf.size());
to_wire(M, {0xca, pva_version::server, uint8_t(flags|pva_flags::Control), pva_ctrl_msg::SetEndian}, be);
to_wire(M, uint32_t(0), be);
VectorOutBuf M(be, buf);
to_wire(M, Header{pva_ctrl_msg::SetEndian, pva_flags::Control|pva_flags::Server, 0});
to_wire(M, {0xca, pva_version::server, flags, pva_app_msg::ConnValid}, be);
auto blen = M.split(4);
auto bstart = blen.pos;
auto save = M.save();
M.skip(8); // placeholder for header
// serverReceiveBufferSize, not used
to_wire(M, uint32_t(0x10000), be);
to_wire(M, uint32_t(0x10000));
// serverIntrospectionRegistryMaxSize, also not used
to_wire(M, uint16_t(0x7fff), be);
to_wire(M, Size{2}, be);
to_wire(M, "anonymous", be);
to_wire(M, "ca", be);
to_wire(M, uint16_t(0x7fff));
to_wire(M, Size{2});
to_wire(M, "anonymous");
to_wire(M, "ca");
to_wire(blen, uint32_t(M.pos-bstart), be);
FixedBuf<uint8_t> H(be, save, 8);
to_wire(H, Header{pva_app_msg::ConnValid, pva_flags::Server, uint32_t(M.size()-8)});
assert(!M.err && !blen.err);
assert(M.good() && H.good());
if(evbuffer_add(tx, buf.data(), M.pos-buf.data()))
if(evbuffer_add(tx, buf.data(), M.size()))
throw std::bad_alloc();
}
@@ -91,9 +90,9 @@ void ServerConn::handle_Echo()
const bool be = EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG;
uint8_t header[8];
sbuf<uint8_t> M(header, sizeof(header));
to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len}, be);
assert(!M.err);
FixedBuf<uint8_t> M(be, header);
to_wire(M, Header{pva_app_msg::Echo, pva_flags::Server, len});
assert(M.good());
auto err = evbuffer_add(tx, header, sizeof(header));
err |= evbuffer_add_buffer(tx, segBuf.get());
@@ -103,36 +102,43 @@ void ServerConn::handle_Echo()
bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH);
}
static
void auth_complete(ServerConn *self, const Status& sts)
{
std::vector<uint8_t> buf(8+5+sts.msg.size());
//sbuf<uint8_t> M(buf);
//M[0] = pva_app_msg::ConnValidated
}
void ServerConn::handle_ConnValid()
{
// Client begins/restarts Auth handshake
// size to extract and process up to auth payload.
// client may only select from our advertised auth
// mechanisms. "anonymous" is the longest.
uint8_t buf[4+2+2+sizeof("anonymous")];
EvInBuf M(peerBE, segBuf.get(), 16);
const auto n = evbuffer_copyout(segBuf.get(), buf, sizeof(buf));
sbuf<uint8_t> M(buf, n);
M += 6; // ignore unused buffer and introspection size
uint16_t qos;
from_wire(M, qos, peerBE);
std::string selected;
from_wire(M, selected, peerBE);
{
M.skip(6); // ignore unused buffer and introspection size
uint16_t qos;
from_wire(M, qos);
from_wire(M, selected);
(void)evbuffer_drain(segBuf.get(), M.pos-buf);
if(!M.good()) {
log_printf(connio, PLVL_ERR, "Truncated/Invalid ConnValid from client");
bev.reset();
return;
if(M.err) {
log_hex_printf(connio, PLVL_ERR, buf, n, "Truncated/Invalid ConnValid from client");
bev.reset();
return;
} else if(selected!="ca" && selected!="anonymous") {
}
}
if(selected!="ca" && selected!="anonymous") {
log_printf(connio, PLVL_DEBUG, "Client selects unadvertised auth \"%s\"", selected.c_str());
auth_complete(this, Status{Status::Error, "Client selects unadvertised auth"});
}
// remainder of segBuf is payload w/ credentials
auth_complete(this, Status{Status::Ok});
}
void ServerConn::handle_AuthZ()
@@ -247,10 +253,10 @@ void ServerConn::bevRead()
}
// a bit verbose :P
sbuf<uint8_t> L(&header[4], 4);
FixedBuf<uint8_t> L(peerBE, header);
uint32_t len = 0;
from_wire(L, len, peerBE);
assert(!L.err);
from_wire(L, len);
assert(L.good());
if(evbuffer_get_length(rx)-8 < len) {
// wait for complete payload
@@ -322,6 +328,9 @@ void ServerConn::bevRead()
// silently drain any unprocessed body (forward compatibility)
if(auto n = evbuffer_get_length(segBuf.get()))
evbuffer_drain(segBuf.get(), n);
// wait for next header
bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead);
}
}