diff --git a/src/client.cpp b/src/client.cpp index e1d02ec..110ea01 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -88,7 +88,7 @@ Channel::~Channel() to_wire(R, sid); to_wire(R, cid); } - conn->enqueueTxBody(CMD_DESTROY_CHANNEL); + statTx += conn->enqueueTxBody(CMD_DESTROY_CHANNEL); } } diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 35dabd4..ae22791 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -63,7 +63,7 @@ void Connection::createChannels() to_wire(R, chan->cid); to_wire(R, chan->name); } - enqueueTxBody(CMD_CREATE_CHANNEL); + chan->statTx += enqueueTxBody(CMD_CREATE_CHANNEL); creatingByCID[chan->cid] = chan; chan->state = Channel::Creating; @@ -278,6 +278,7 @@ void Connection::handle_CONNECTION_VALIDATED() void Connection::handle_CREATE_CHANNEL() { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t cid, sid; @@ -320,6 +321,7 @@ void Connection::handle_CREATE_CHANNEL() } creatingByCID.erase(it); } + chan->statRx += rxlen; if(!sts.isSuccess()) { // server refuses to create a channel, but presumably responded positivly to search @@ -409,6 +411,8 @@ void Connection::tickEcho() // maybe help reduce latency bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); + + statTx += 8; } void Connection::tickEchoS(evutil_socket_t fd, short evt, void *raw) diff --git a/src/clientget.cpp b/src/clientget.cpp index 26797ea..41c3ebb 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -287,7 +287,7 @@ struct GPROp : public OperationBase throw std::logic_error("Invalid state in GPR sendReply()"); } } - chan->conn->enqueueTxBody(state==GPROp::Done ? CMD_DESTROY_REQUEST : (pva_app_msg_t)op); + chan->statTx += chan->conn->enqueueTxBody(state==GPROp::Done ? CMD_DESTROY_REQUEST : (pva_app_msg_t)op); if(state==GPROp::Done) { // CMD_DESTROY_REQUEST is not acknowledged (sigh...) @@ -320,7 +320,7 @@ struct GPROp : public OperationBase to_wire(R, Value::Helper::desc(pvRequest)); to_wire_full(R, pvRequest); } - conn->enqueueTxBody(pva_app_msg_t(uint8_t(op))); + chan->statTx += conn->enqueueTxBody(pva_app_msg_t(uint8_t(op))); log_debug_printf(io, "Server %s channel '%s' op%02x INIT\n", conn->peerName.c_str(), chan->name.c_str(), op); @@ -359,6 +359,7 @@ struct GPROp : public OperationBase void Connection::handle_GPR(pva_app_msg_t cmd) { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t ioid; @@ -466,6 +467,8 @@ void Connection::handle_GPR(pva_app_msg_t cmd) return; } + gpr->chan->statRx += rxlen; + // advance operation state decltype (gpr->state) prev = gpr->state; diff --git a/src/clientimpl.h b/src/clientimpl.h index eff8d6a..b0faad5 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -176,6 +176,8 @@ struct Channel { std::list connectors; + size_t statTx{}, statRx{}; + INST_COUNTER(Channel); Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index ee61cd5..5d46c12 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -88,7 +88,7 @@ struct InfoOp : public OperationBase // sub-field, which no one knows how to use... to_wire(R, ""); } - conn->enqueueTxBody(CMD_GET_FIELD); + chan->statTx += conn->enqueueTxBody(CMD_GET_FIELD); log_debug_printf(io, "Server %s channel '%s' GET_INFO\n", conn->peerName.c_str(), chan->name.c_str()); @@ -111,6 +111,7 @@ struct InfoOp : public OperationBase void Connection::handle_GET_FIELD() { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t ioid=0u; @@ -144,6 +145,8 @@ void Connection::handle_GET_FIELD() info->chan->opByIOID.erase(ioid); } + info->chan->statRx += rxlen; + if(info->state!=InfoOp::Waiting) { log_warn_printf(io, "Server %s ignore second reply to GET_FIELD\n", peerName.c_str()); return; diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 0730ac7..c30f50f 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -121,7 +121,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription to_wire(R, ioid); to_wire(R, subcmd); } - conn->enqueueTxBody(CMD_MONITOR); + chan->statTx += conn->enqueueTxBody(CMD_MONITOR); state = p ? Idle : Running; } @@ -240,7 +240,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription if(pipeline) to_wire(R, queueSize); } - conn->enqueueTxBody(CMD_MONITOR); + chan->statTx += conn->enqueueTxBody(CMD_MONITOR); log_debug_printf(io, "Server %s channel '%s' monitor INIT%s\n", conn->peerName.c_str(), chan->name.c_str(), pipeline?" pipeline":""); @@ -325,7 +325,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription to_wire(R, uint8_t(0x80)); to_wire(R, uint32_t(unack)); } - conn->enqueueTxBody(CMD_MONITOR); + chan->statTx += conn->enqueueTxBody(CMD_MONITOR); window += unack; unack = 0u; @@ -345,6 +345,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription void Connection::handle_MONITOR() { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t ioid=0; @@ -442,6 +443,8 @@ void Connection::handle_MONITOR() return; } + mon->chan->statRx += rxlen; + Entry update; if(!sts.isSuccess()) { diff --git a/src/conn.cpp b/src/conn.cpp index 8435e86..5ca5189 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -39,15 +39,18 @@ const char* ConnBase::peerLabel() const return isClient ? "Server" : "Client"; } -void ConnBase::enqueueTxBody(pva_app_msg_t cmd) +size_t ConnBase::enqueueTxBody(pva_app_msg_t cmd) { + auto blen = evbuffer_get_length(txBody.get()); auto tx = bufferevent_get_output(bev.get()); to_evbuf(tx, Header{cmd, uint8_t(isClient ? 0u : pva_flags::Server), - uint32_t(evbuffer_get_length(txBody.get()))}, + uint32_t(blen)}, hostBE); auto err = evbuffer_add_buffer(tx, txBody.get()); assert(!err); + statTx += 8u + blen; + return 8u + blen; } #define CASE(Op) void ConnBase::handle_##Op() {} @@ -119,6 +122,7 @@ void ConnBase::bevRead() if(header[2]&pva_flags::Control) { // Control messages are not actually useful evbuffer_drain(rx, 8); + statRx += 8u; continue; } // application message @@ -147,6 +151,7 @@ void ConnBase::bevRead() unsigned n = evbuffer_remove_buffer(rx, segBuf.get(), len); assert(n==len); // we know rx buf contains the entire body } + statRx += 8u + len; // so far we do not use segmentation to support incremental processing // of long messages. We instead accumulate all segments of a message diff --git a/src/conn.h b/src/conn.h index 0007b50..85dce56 100644 --- a/src/conn.h +++ b/src/conn.h @@ -33,6 +33,8 @@ struct ConnBase uint8_t segCmd; evbuf segBuf, txBody; + size_t statTx{}, statRx{}; + ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr); ConnBase(const ConnBase&) = delete; ConnBase& operator=(const ConnBase&) = delete; @@ -40,7 +42,7 @@ struct ConnBase const char* peerLabel() const; - void enqueueTxBody(pva_app_msg_t cmd); + size_t enqueueTxBody(pva_app_msg_t cmd); protected: #define CASE(Op) virtual void handle_##Op(); diff --git a/src/serverchan.cpp b/src/serverchan.cpp index ed4197c..b05ad60 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -165,6 +165,8 @@ void ServerChannelControl::close() to_wire(R, Header{CMD_DESTROY_CHANNEL, pva_flags::Server, 8}); to_wire(R, ch->sid); to_wire(R, ch->cid); + conn->statTx += 16u; + ch->statTx += 16u; } ServerChannel_shutdown(ch); }); @@ -389,6 +391,9 @@ void ServerConn::handle_DESTROY_CHANNEL() if(!R.good()) bev.reset(); + + statTx += 16u; + // don't bother to increment for channel } } diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 95b8f49..d6b1ce3 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -94,6 +94,8 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * if(evbuffer_add(tx, buf.data(), M.save()-buf.data())) throw std::bad_alloc(); + + statTx += M.save()-buf.data(); } if(bufferevent_enable(bev.get(), EV_READ|EV_WRITE)) @@ -128,6 +130,8 @@ void ServerConn::handle_ECHO() // maybe help reduce latency bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); + + statTx += 8u + len; } static diff --git a/src/serverconn.h b/src/serverconn.h index 74977b3..6bfd66d 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -85,6 +85,8 @@ struct ServerChan Destroy, // DESTROY_CHANNEL request received and/or reply sent } state; + size_t statTx{}, statRx{}; + std::function&&)> onOp; std::function&&, Value&&)> onRPC; std::function&&)> onSubscribe; diff --git a/src/serverget.cpp b/src/serverget.cpp index 0bda837..eff493b 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -105,7 +105,7 @@ struct ServerGPR : public ServerOp assert(R.good()); } - conn->enqueueTxBody(cmd); + ch->statTx += conn->enqueueTxBody(cmd); if(state == ServerOp::Dead) { ch->opByIOID.erase(ioid); @@ -330,6 +330,7 @@ struct ServerGPRExec : public server::ExecOp void ServerConn::handle_GPR(pva_app_msg_t cmd) { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t sid = -1, ioid = -1; @@ -366,6 +367,7 @@ void ServerConn::handle_GPR(pva_app_msg_t cmd) bev.reset(); return; } + chan->statRx += rxlen; auto op(std::make_shared(chan, ioid)); op->cmd = cmd; @@ -442,6 +444,8 @@ void ServerConn::handle_GPR(pva_app_msg_t cmd) if(!chan) throw std::logic_error("live op on dead channel"); + chan->statRx += rxlen; + if(op->state==ServerOp::Idle) { // all set diff --git a/src/serverintrospect.cpp b/src/serverintrospect.cpp index b2ddb72..98a758e 100644 --- a/src/serverintrospect.cpp +++ b/src/serverintrospect.cpp @@ -42,7 +42,7 @@ struct ServerIntrospect : public ServerOp to_wire(R, type); } - conn->enqueueTxBody(CMD_GET_FIELD); + ch->statTx += conn->enqueueTxBody(CMD_GET_FIELD); state = ServerOp::Dead; conn->opByIOID.erase(ioid); @@ -126,6 +126,7 @@ void ServerConn::handle_GET_FIELD() { // aka. GetField + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t sid = -1, ioid = -1; @@ -144,6 +145,7 @@ void ServerConn::handle_GET_FIELD() peerName.c_str(), unsigned(sid), unsigned(ioid)); return; } + chan->statRx += rxlen; auto op(std::make_shared(chan, ioid)); std::unique_ptr ctrl(new ServerIntrospectControl(this, chan.get(), iface->server->internal_self, op)); diff --git a/src/servermon.cpp b/src/servermon.cpp index c35f4a3..059480a 100644 --- a/src/servermon.cpp +++ b/src/servermon.cpp @@ -147,7 +147,7 @@ struct MonitorOp : public ServerOp, } } - conn->enqueueTxBody(pva_app_msg_t::CMD_MONITOR); + ch->statTx += conn->enqueueTxBody(pva_app_msg_t::CMD_MONITOR); if(state == ServerOp::Dead) { ch->opByIOID.erase(ioid); @@ -417,6 +417,7 @@ ServerMonitorControl::ServerMonitorControl(ServerMonitorSetup* setup, void ServerConn::handle_MONITOR() { + auto rxlen = 8u + evbuffer_get_length(segBuf.get()); EvInBuf M(peerBE, segBuf.get(), 16); uint32_t sid = -1, ioid = -1; @@ -451,6 +452,7 @@ void ServerConn::handle_MONITOR() bev.reset(); return; } + chan->statRx += rxlen; auto op(std::make_shared(chan, ioid)); op->window = nack; @@ -521,6 +523,7 @@ void ServerConn::handle_MONITOR() bev.reset(); return; } + chan->statRx += rxlen; // pvAccessCPP won't accept ack and start/stop in the same message, // although it will accept destroy in any !INIT message.