diff --git a/src/server/pv/responseHandlers.h b/src/server/pv/responseHandlers.h index a302cc7..291ccc2 100644 --- a/src/server/pv/responseHandlers.h +++ b/src/server/pv/responseHandlers.h @@ -7,6 +7,8 @@ #ifndef RESPONSEHANDLERS_H_ #define RESPONSEHANDLERS_H_ +#include + #include #include @@ -470,7 +472,7 @@ protected: Transport::shared_pointer const & transport); void activate(epics::pvData::PVStructure::shared_pointer const & pvRequest); public: - static MonitorRequester::shared_pointer create(ServerContextImpl::shared_pointer const & context, + static shared_pointer create(ServerContextImpl::shared_pointer const & context, std::tr1::shared_ptr const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerMonitorRequesterImpl() {} @@ -484,12 +486,21 @@ public: Monitor::shared_pointer getChannelMonitor(); virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL; + void ack(size_t cnt); private: // Note: this forms a reference loop, which is broken in destroy() Monitor::shared_pointer _channelMonitor; epics::pvData::StructureConstPtr _structure; epics::pvData::Status _status; + // when _pipeline==true + // _window_open + _window_closed.size() are together the congestion control window. + // _window_open are the number of elements which we can send w/o further ack's + size_t _window_open; + // The elements we have sent, but have not been acknowledged + typedef std::list window_t; + window_t _window_closed; bool _unlisten; + bool _pipeline; // const after activate() }; diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index d3c5549..1b7358f 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1889,7 +1889,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, } // create... - ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest); + ServerMonitorRequesterImpl::shared_pointer request(ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest)); // pipelining monitor (i.e. w/ flow control) const bool ack = (QOS_GET_PUT & qosCode) != 0; @@ -1897,9 +1897,8 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, { transport->ensureData(4); int32 nfree = payloadBuffer->getInt(); - ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + request->ack(nfree); } } @@ -1921,7 +1920,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, { transport->ensureData(4); int32 nfree = payloadBuffer->getInt(); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + request->ack(nfree); return; // note: not possible to ack and destroy } @@ -1964,25 +1963,36 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, } ServerMonitorRequesterImpl::ServerMonitorRequesterImpl( - ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, - const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false) -{ -} + ServerContextImpl::shared_pointer const & context, + ServerChannel::shared_pointer const & channel, + const pvAccessID ioid, Transport::shared_pointer const & transport) + :BaseChannelRequester(context, channel, ioid, transport) + ,_window_open(0u) + ,_unlisten(false) + ,_pipeline(false) +{} -MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create( +ServerMonitorRequesterImpl::shared_pointer ServerMonitorRequesterImpl::create( ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared std::tr1::shared_ptr tp(new ServerMonitorRequesterImpl(context, channel, ioid, transport)); - MonitorRequester::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(pvRequest); - return thisPointer; + tp->activate(pvRequest); + return tp; } void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest) { + epics::pvData::PVScalar::const_shared_pointer O(pvRequest->getSubField("record._options.pipeline")); + if(O) { + try{ + _pipeline = O->getAs(); + }catch(std::exception& e){ + std::ostringstream strm; + strm<<"Ignoring invalid pipeline= : "<unregisterRequest(_ioid); @@ -2053,10 +2064,11 @@ void ServerMonitorRequesterImpl::destroy() // asCheck _channel->getChannelSecuritySession()->release(_ioid); - if (_channelMonitor) { - _channelMonitor.reset(); - } + window.swap(_window_closed); + + monitor.swap(_channelMonitor); } + window.clear(); if(monitor) { monitor->destroy(); } @@ -2093,7 +2105,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* } else { - Monitor::shared_pointer monitor(_channelMonitor); + Monitor::shared_pointer monitor(getChannelMonitor()); if (!monitor) return; @@ -2107,7 +2119,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* buffer->putByte((int8)request); // changedBitSet and data, if not notify only (i.e. queueSize == -1) - BitSet::shared_pointer changedBitSet = element->changedBitSet; + const BitSet::shared_pointer& changedBitSet = element->changedBitSet; if (changedBitSet) { changedBitSet->serialize(buffer, control); @@ -2117,7 +2129,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* element->overrunBitSet->serialize(buffer, control); } - element.reset(); + { + Lock guard(_mutex); + if(!_pipeline) { + } else if(_window_open==0) { + if(_pipeline) { + // Our logic has a problem, but try to continue. + message("Monitor Logic Error: send outside of window", epics::pvData::warningMessage); + LOG(logLevelError, "Monitor Logic Error: send outside of window %zu", _window_closed.size()); + } + + } else { + _window_closed.push_back(element.letGo()); + _window_open--; + } + } + + element.reset(); // calls Monitor::release() if not swap()'d // TODO if we try to proces several monitors at once, then fairness suffers // TODO compbine several monitors into one message (reduces payload) @@ -2128,10 +2156,18 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* { // TODO CAS bool unlisten; - Lock guard(_mutex); - unlisten = _unlisten; - _unlisten = false; - guard.unlock(); + window_t window; + { + Lock guard(_mutex); + unlisten = _unlisten; + _unlisten = false; + if(unlisten) { + window.swap(_window_closed); + _window_open = 0u; + } + } + + window.clear(); // calls Monitor::release() if (unlisten) { @@ -2145,6 +2181,43 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* } } +void ServerMonitorRequesterImpl::ack(size_t cnt) +{ + typedef std::vector acking_t; + acking_t acking; + Monitor::shared_pointer mon; + { + Lock guard(_mutex); + + // cnt will be larger if this is the initial window update, + // or if the window is being enlarged. + size_t nack = std::min(cnt, _window_closed.size()); + + _window_open += cnt; + + window_t::iterator it, end(_window_closed.begin()); + std::advance(end, nack); + + acking.resize(nack); + + size_t i; + for(i=0, it=_window_closed.begin(); irelease(*it); + } + + mon->reportRemoteQueueStatus(cnt); +} + /****************************************************************************************/ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command,