diff --git a/src/client/pv/pvAccess.h b/src/client/pv/pvAccess.h index 888a4e5..1903612 100644 --- a/src/client/pv/pvAccess.h +++ b/src/client/pv/pvAccess.h @@ -148,6 +148,31 @@ class ChannelPutRequester; class ChannelPutGetRequester; class ChannelRPCRequester; +/** @brief Expose statistics related to network transport + * + * Various sub-classes of ChannelBaseRequester (for servers) or ChannelRequest (for clients) + * may by dynamic_cast<>able to NetStats. + */ +struct epicsShareClass NetStats { + struct Counter { + size_t tx, rx; + + inline Counter() :tx(0u), rx(0u) {} + }; + struct Stats { + std::string transportPeer; + Counter transportBytes; + Counter operationBytes; + bool populated; + + inline Stats() :populated(false) {} + }; + + virtual ~NetStats(); + //! Query current counter values + virtual void stats(Stats& s) const =0; +}; + //! Base for all Requesters (callbacks to client) struct epicsShareClass ChannelBaseRequester : virtual public epics::pvData::Requester { diff --git a/src/client/pvAccess.cpp b/src/client/pvAccess.cpp index 5d2bef3..5e35af6 100644 --- a/src/client/pvAccess.cpp +++ b/src/client/pvAccess.cpp @@ -384,6 +384,8 @@ ChannelProvider::~ChannelProvider() REFTRACE_DECREMENT(num_instances); } +NetStats::~NetStats() {} + size_t ChannelBaseRequester::num_instances; ChannelBaseRequester::ChannelBaseRequester() diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 6e9c2de..fe53193 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -240,6 +241,7 @@ void BlockingUDPTransport::run() { if(likely(bytesRead>=0)) { // successfully got datagram + atomic::add(_totalBytesRecv, bytesRead); bool ignore = false; for(size_t i = 0; i <_ignoredAddresses.size(); i++) { @@ -446,6 +448,7 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock inetAddressToString(address).c_str(), errStr); return false; } + atomic::add(_totalBytesSent, length); return true; } @@ -470,6 +473,7 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) inetAddressToString(address).c_str(), errStr); return false; } + atomic::add(_totalBytesSent, buffer->getLimit()); // all sent buffer->setPosition(buffer->getLimit()); @@ -508,6 +512,7 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) { inetAddressToString(_sendAddresses[i]).c_str(), errStr); allOK = false; } + atomic::add(_totalBytesSent, buffer->getLimit()); } // all sent diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 057fee3..c01a338 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -60,6 +61,8 @@ namespace pvAccess { size_t Transport::num_instances; Transport::Transport() + :_totalBytesSent(0u) + ,_totalBytesRecv(0u) { REFTRACE_INCREMENT(num_instances); } @@ -92,7 +95,7 @@ AbstractCodec::AbstractCodec( bool blockingProcessQueue): //PROTECTED _readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0), - _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _totalBytesSent(0), + _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _senderThread(0), _writeMode(PROCESS_SEND_QUEUE), _writeOpReady(false), @@ -405,6 +408,8 @@ bool AbstractCodec::readToBuffer( return false; } } + + atomic::add(_totalBytesRecv, bytesRead); } // set pointers (aka flip) @@ -808,7 +813,7 @@ void AbstractCodec::send(ByteBuffer *buffer) continue; } - _totalBytesSent += bytesSent; + atomic::add(_totalBytesSent, bytesSent); // readjust limit if (bytesToSend == maxBytesToSend) @@ -887,10 +892,16 @@ void AbstractCodec::processSender( try { _lastMessageStartPosition = _sendBuffer.getPosition(); + size_t before = atomic::get(_totalBytesSent) + _sendBuffer.getPosition(); + sender->send(&_sendBuffer, this); // automatic end (to set payload size) endMessage(false); + + size_t after = atomic::get(_totalBytesSent) + _sendBuffer.getPosition(); + + atomic::add(sender->bytesTX, after - before); } catch (connection_closed_exception & ) { throw; diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index a634c1c..39dd099 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -260,7 +260,6 @@ protected: int8_t _command; int32_t _payloadSize; // TODO why not size_t? epics::pvData::int32 _remoteTransportSocketReceiveBufferSize; - int64_t _totalBytesSent; //TODO initialize union osiSockAddr _sendTo; epicsThreadId _senderThread; diff --git a/src/remote/pv/remote.h b/src/remote/pv/remote.h index 6521677..3fb6013 100644 --- a/src/remote/pv/remote.h +++ b/src/remote/pv/remote.h @@ -138,6 +138,7 @@ class TransportSender : public Lockable, public fair_queue::ent public: POINTER_DEFINITIONS(TransportSender); + TransportSender() :bytesTX(0u), bytesRX(0u) {} virtual ~TransportSender() {} /** @@ -149,6 +150,9 @@ public: * NOTE: these limitations allow efficient implementation. */ virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) = 0; + + size_t bytesTX; + size_t bytesRX; }; class ClientChannelImpl; @@ -264,6 +268,9 @@ public: * @param data the data (any data), can be null. */ virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) = 0; + + size_t _totalBytesSent; + size_t _totalBytesRecv; }; class Channel; @@ -341,7 +348,7 @@ protected: * A request that expects an response. * Responses identified by its I/O ID. */ -class ResponseRequest { +class ResponseRequest : public TransportSender { public: POINTER_DEFINITIONS(ResponseRequest); diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 13f2f5a..9763239 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -80,7 +81,7 @@ do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG */ class BaseRequestImpl : public ResponseRequest, - public TransportSender, + public NetStats, public virtual epics::pvAccess::Destroyable { public: @@ -410,6 +411,18 @@ public: } } + virtual void stats(Stats& s) const OVERRIDE FINAL + { + s.populated = true; + s.operationBytes.tx = epics::atomic::get(bytesTX); + s.operationBytes.rx = epics::atomic::get(bytesRX); + Transport::shared_pointer T(m_channel->getTransport()); + if(T) { // must be connected + s.transportPeer = T->getRemoteName(); + s.transportBytes.tx = epics::atomic::get(T->_totalBytesSent); + s.transportBytes.rx = epics::atomic::get(T->_totalBytesRecv); + } + } }; size_t BaseRequestImpl::num_instances; @@ -2500,6 +2513,7 @@ public: ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(payloadBuffer->getInt()); if (rr) { + epics::atomic::add(rr->bytesRX, payloadSize); rr->response(transport, version, payloadBuffer); } else { // oh no, we can't complete parsing this message! @@ -2540,6 +2554,7 @@ public: ResponseRequest::shared_pointer rr = context->getResponseRequest(ioid); if (rr) { + epics::atomic::add(rr->bytesRX, payloadSize); rr->response(transport, version, payloadBuffer); } else @@ -2853,6 +2868,7 @@ public: ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(ioid); if (rr) { + epics::atomic::add(rr->bytesRX, payloadSize); Requester::shared_pointer requester = rr->getRequester(); if (requester) { requester->message(message, type); @@ -4560,7 +4576,6 @@ size_t InternalClientContextImpl::InternalChannelImpl::num_active; class ChannelGetFieldRequestImpl : public ResponseRequest, - public TransportSender, public epics::pvAccess::Destroyable, public std::tr1::enable_shared_from_this { diff --git a/src/server/baseChannelRequester.cpp b/src/server/baseChannelRequester.cpp index cfe3b43..2e2b8b1 100644 --- a/src/server/baseChannelRequester.cpp +++ b/src/server/baseChannelRequester.cpp @@ -4,6 +4,8 @@ * in file LICENSE that is included with this distribution. */ +#include + #define epicsExportSharedSymbols #include @@ -85,6 +87,16 @@ void BaseChannelRequester::sendFailureMessage(const int8 command, Transport::sha transport->enqueueSendRequest(sender); } +void BaseChannelRequester::stats(Stats &s) const +{ + s.populated = true; + s.operationBytes.tx = atomic::get(bytesTX); + s.operationBytes.rx = atomic::get(bytesRX); + s.transportBytes.tx = atomic::get(_transport->_totalBytesSent); + s.transportBytes.rx = atomic::get(_transport->_totalBytesRecv); + s.transportPeer = _transport->getRemoteName(); +} + BaseChannelRequesterMessageTransportSender::BaseChannelRequesterMessageTransportSender(const pvAccessID ioid, const string message,const epics::pvData::MessageType messageType): _ioid(ioid), _message(message), diff --git a/src/server/pv/baseChannelRequester.h b/src/server/pv/baseChannelRequester.h index 607c54c..0c2bf90 100644 --- a/src/server/pv/baseChannelRequester.h +++ b/src/server/pv/baseChannelRequester.h @@ -18,7 +18,11 @@ namespace pvAccess { class ServerChannel; class ChannelRequest; -class BaseChannelRequester : virtual public epics::pvData::Requester, public Destroyable +class BaseChannelRequester : + virtual public epics::pvData::Requester, + public TransportSender, + public NetStats, + public Destroyable { public: POINTER_DEFINITIONS(BaseChannelRequester); @@ -36,6 +40,8 @@ public: static void message(Transport::shared_pointer const & transport, const pvAccessID ioid, const std::string message, const epics::pvData::MessageType messageType); static void sendFailureMessage(const epics::pvData::int8 command, Transport::shared_pointer const & transport, const pvAccessID ioid, const epics::pvData::int8 qos, const epics::pvData::Status status); + virtual void stats(Stats &s) const OVERRIDE FINAL; + static const epics::pvData::Status okStatus; static const epics::pvData::Status badCIDStatus; static const epics::pvData::Status badIOIDStatus; diff --git a/src/server/pv/responseHandlers.h b/src/server/pv/responseHandlers.h index 652dd21..fae600d 100644 --- a/src/server/pv/responseHandlers.h +++ b/src/server/pv/responseHandlers.h @@ -286,7 +286,6 @@ public: class ServerChannelGetRequesterImpl : public BaseChannelRequester, public ChannelGetRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -343,7 +342,6 @@ public: class ServerChannelPutRequesterImpl : public BaseChannelRequester, public ChannelPutRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -402,7 +400,6 @@ public: class ServerChannelPutGetRequesterImpl : public BaseChannelRequester, public ChannelPutGetRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -472,7 +469,6 @@ public: class ServerMonitorRequesterImpl : public BaseChannelRequester, public MonitorRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -538,7 +534,6 @@ public: class ServerChannelArrayRequesterImpl : public BaseChannelRequester, public ChannelArrayRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -647,7 +642,6 @@ public: class ServerChannelProcessRequesterImpl : public BaseChannelRequester, public ChannelProcessRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -701,7 +695,6 @@ private: class ServerGetFieldRequesterImpl : public BaseChannelRequester, public GetFieldRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: @@ -766,7 +759,6 @@ public: class ServerChannelRPCRequesterImpl : public BaseChannelRequester, public ChannelRPCRequester, - public TransportSender, public std::tr1::enable_shared_from_this { public: diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index e115504..4479d60 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -1048,6 +1049,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) { @@ -1284,6 +1286,7 @@ void ServerPutHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_PUT, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) { @@ -1523,6 +1526,7 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) { @@ -1817,6 +1821,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (ack) { @@ -2150,6 +2155,7 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_ARRAY, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) { @@ -2411,6 +2417,7 @@ void ServerDestroyRequestHandler::handleResponse(osiSockAddr* responseFrom, failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus); return; } + // atomic::add(request->bytesRX, payloadSize); // destroy request->destroy(); @@ -2451,6 +2458,7 @@ void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom, failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus); return; } + //atomic::add(request->bytesRX, payloadSize); ChannelRequest::shared_pointer cr = dynamic_pointer_cast(request->getOperation()); if (!cr) @@ -2510,6 +2518,7 @@ void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_PROCESS, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) { @@ -2750,6 +2759,7 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom, BaseChannelRequester::sendFailureMessage((int8)CMD_RPC, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + atomic::add(request->bytesRX, payloadSize); if (!request->startRequest(qosCode)) {