From c0126d9a1ca78cae2c4b1938aba811f10f682776 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 23 May 2018 15:29:32 -0700 Subject: [PATCH] server: pipeline=true windowing Implement flow control windows in the server itself, so that this applies to all Monitor. Then the reportRemoteQueueStatus() callback is only needed to deliver the initial window update. This is done by not calling Monitor::release() immediately when pipeline=true. Rather, hold onto MonitorElement s until an ack is received. Also addresses a race with clearing _channelMonitor vs. having send() queued at/after destroy(). --- src/server/pv/responseHandlers.h | 13 +++- src/server/responseHandlers.cpp | 123 ++++++++++++++++++++++++------- 2 files changed, 110 insertions(+), 26 deletions(-) diff --git a/src/server/pv/responseHandlers.h b/src/server/pv/responseHandlers.h index a302cc7..291ccc2 100644 --- a/src/server/pv/responseHandlers.h +++ b/src/server/pv/responseHandlers.h @@ -7,6 +7,8 @@ #ifndef RESPONSEHANDLERS_H_ #define RESPONSEHANDLERS_H_ +#include + #include #include @@ -470,7 +472,7 @@ protected: Transport::shared_pointer const & transport); void activate(epics::pvData::PVStructure::shared_pointer const & pvRequest); public: - static MonitorRequester::shared_pointer create(ServerContextImpl::shared_pointer const & context, + static shared_pointer create(ServerContextImpl::shared_pointer const & context, std::tr1::shared_ptr const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerMonitorRequesterImpl() {} @@ -484,12 +486,21 @@ public: Monitor::shared_pointer getChannelMonitor(); virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL; + void ack(size_t cnt); private: // Note: this forms a reference loop, which is broken in destroy() Monitor::shared_pointer _channelMonitor; epics::pvData::StructureConstPtr _structure; epics::pvData::Status _status; + // when _pipeline==true + // _window_open + _window_closed.size() are together the congestion control window. + // _window_open are the number of elements which we can send w/o further ack's + size_t _window_open; + // The elements we have sent, but have not been acknowledged + typedef std::list window_t; + window_t _window_closed; bool _unlisten; + bool _pipeline; // const after activate() }; diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index d3c5549..1b7358f 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1889,7 +1889,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, } // create... - ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest); + ServerMonitorRequesterImpl::shared_pointer request(ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest)); // pipelining monitor (i.e. w/ flow control) const bool ack = (QOS_GET_PUT & qosCode) != 0; @@ -1897,9 +1897,8 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, { transport->ensureData(4); int32 nfree = payloadBuffer->getInt(); - ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + request->ack(nfree); } } @@ -1921,7 +1920,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, { transport->ensureData(4); int32 nfree = payloadBuffer->getInt(); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + request->ack(nfree); return; // note: not possible to ack and destroy } @@ -1964,25 +1963,36 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, } ServerMonitorRequesterImpl::ServerMonitorRequesterImpl( - ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, - const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false) -{ -} + ServerContextImpl::shared_pointer const & context, + ServerChannel::shared_pointer const & channel, + const pvAccessID ioid, Transport::shared_pointer const & transport) + :BaseChannelRequester(context, channel, ioid, transport) + ,_window_open(0u) + ,_unlisten(false) + ,_pipeline(false) +{} -MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create( +ServerMonitorRequesterImpl::shared_pointer ServerMonitorRequesterImpl::create( ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared std::tr1::shared_ptr tp(new ServerMonitorRequesterImpl(context, channel, ioid, transport)); - MonitorRequester::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(pvRequest); - return thisPointer; + tp->activate(pvRequest); + return tp; } void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest) { + epics::pvData::PVScalar::const_shared_pointer O(pvRequest->getSubField("record._options.pipeline")); + if(O) { + try{ + _pipeline = O->getAs(); + }catch(std::exception& e){ + std::ostringstream strm; + strm<<"Ignoring invalid pipeline= : "<unregisterRequest(_ioid); @@ -2053,10 +2064,11 @@ void ServerMonitorRequesterImpl::destroy() // asCheck _channel->getChannelSecuritySession()->release(_ioid); - if (_channelMonitor) { - _channelMonitor.reset(); - } + window.swap(_window_closed); + + monitor.swap(_channelMonitor); } + window.clear(); if(monitor) { monitor->destroy(); } @@ -2093,7 +2105,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* } else { - Monitor::shared_pointer monitor(_channelMonitor); + Monitor::shared_pointer monitor(getChannelMonitor()); if (!monitor) return; @@ -2107,7 +2119,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* buffer->putByte((int8)request); // changedBitSet and data, if not notify only (i.e. queueSize == -1) - BitSet::shared_pointer changedBitSet = element->changedBitSet; + const BitSet::shared_pointer& changedBitSet = element->changedBitSet; if (changedBitSet) { changedBitSet->serialize(buffer, control); @@ -2117,7 +2129,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* element->overrunBitSet->serialize(buffer, control); } - element.reset(); + { + Lock guard(_mutex); + if(!_pipeline) { + } else if(_window_open==0) { + if(_pipeline) { + // Our logic has a problem, but try to continue. + message("Monitor Logic Error: send outside of window", epics::pvData::warningMessage); + LOG(logLevelError, "Monitor Logic Error: send outside of window %zu", _window_closed.size()); + } + + } else { + _window_closed.push_back(element.letGo()); + _window_open--; + } + } + + element.reset(); // calls Monitor::release() if not swap()'d // TODO if we try to proces several monitors at once, then fairness suffers // TODO compbine several monitors into one message (reduces payload) @@ -2128,10 +2156,18 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* { // TODO CAS bool unlisten; - Lock guard(_mutex); - unlisten = _unlisten; - _unlisten = false; - guard.unlock(); + window_t window; + { + Lock guard(_mutex); + unlisten = _unlisten; + _unlisten = false; + if(unlisten) { + window.swap(_window_closed); + _window_open = 0u; + } + } + + window.clear(); // calls Monitor::release() if (unlisten) { @@ -2145,6 +2181,43 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* } } +void ServerMonitorRequesterImpl::ack(size_t cnt) +{ + typedef std::vector acking_t; + acking_t acking; + Monitor::shared_pointer mon; + { + Lock guard(_mutex); + + // cnt will be larger if this is the initial window update, + // or if the window is being enlarged. + size_t nack = std::min(cnt, _window_closed.size()); + + _window_open += cnt; + + window_t::iterator it, end(_window_closed.begin()); + std::advance(end, nack); + + acking.resize(nack); + + size_t i; + for(i=0, it=_window_closed.begin(); irelease(*it); + } + + mon->reportRemoteQueueStatus(cnt); +} + /****************************************************************************************/ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command,