server: pipeline=true windowing

Implement flow control windows in the server itself,
so that this applies to all Monitor.

Then the reportRemoteQueueStatus() callback is only needed
to deliver the initial window update.

This is done by not calling Monitor::release() immediately
when pipeline=true.  Rather, hold onto MonitorElement s
until an ack is received.

Also addresses a race with clearing _channelMonitor
vs. having send() queued at/after destroy().
This commit is contained in:
Michael Davidsaver
2018-05-23 15:29:32 -07:00
parent a413f8f9db
commit c0126d9a1c
2 changed files with 110 additions and 26 deletions
+12 -1
View File
@@ -7,6 +7,8 @@
#ifndef RESPONSEHANDLERS_H_
#define RESPONSEHANDLERS_H_
#include <list>
#include <pv/timer.h>
#include <pv/serverContext.h>
@@ -470,7 +472,7 @@ protected:
Transport::shared_pointer const & transport);
void activate(epics::pvData::PVStructure::shared_pointer const & pvRequest);
public:
static MonitorRequester::shared_pointer create(ServerContextImpl::shared_pointer const & context,
static shared_pointer create(ServerContextImpl::shared_pointer const & context,
std::tr1::shared_ptr<ServerChannel> const & channel, const pvAccessID ioid,
Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual ~ServerMonitorRequesterImpl() {}
@@ -484,12 +486,21 @@ public:
Monitor::shared_pointer getChannelMonitor();
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL;
void ack(size_t cnt);
private:
// Note: this forms a reference loop, which is broken in destroy()
Monitor::shared_pointer _channelMonitor;
epics::pvData::StructureConstPtr _structure;
epics::pvData::Status _status;
// when _pipeline==true
// _window_open + _window_closed.size() are together the congestion control window.
// _window_open are the number of elements which we can send w/o further ack's
size_t _window_open;
// The elements we have sent, but have not been acknowledged
typedef std::list<epics::pvData::MonitorElementPtr> window_t;
window_t _window_closed;
bool _unlisten;
bool _pipeline; // const after activate()
};
+98 -25
View File
@@ -1889,7 +1889,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
}
// create...
ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
ServerMonitorRequesterImpl::shared_pointer request(ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest));
// pipelining monitor (i.e. w/ flow control)
const bool ack = (QOS_GET_PUT & qosCode) != 0;
@@ -1897,9 +1897,8 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
{
transport->ensureData(4);
int32 nfree = payloadBuffer->getInt();
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
request->getChannelMonitor()->reportRemoteQueueStatus(nfree);
request->ack(nfree);
}
}
@@ -1921,7 +1920,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
{
transport->ensureData(4);
int32 nfree = payloadBuffer->getInt();
request->getChannelMonitor()->reportRemoteQueueStatus(nfree);
request->ack(nfree);
return;
// note: not possible to ack and destroy
}
@@ -1964,25 +1963,36 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
}
ServerMonitorRequesterImpl::ServerMonitorRequesterImpl(
ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false)
{
}
ServerContextImpl::shared_pointer const & context,
ServerChannel::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport)
:BaseChannelRequester(context, channel, ioid, transport)
,_window_open(0u)
,_unlisten(false)
,_pipeline(false)
{}
MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create(
ServerMonitorRequesterImpl::shared_pointer ServerMonitorRequesterImpl::create(
ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest)
{
// TODO use std::make_shared
std::tr1::shared_ptr<ServerMonitorRequesterImpl> tp(new ServerMonitorRequesterImpl(context, channel, ioid, transport));
MonitorRequester::shared_pointer thisPointer = tp;
static_cast<ServerMonitorRequesterImpl*>(thisPointer.get())->activate(pvRequest);
return thisPointer;
tp->activate(pvRequest);
return tp;
}
void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
{
epics::pvData::PVScalar::const_shared_pointer O(pvRequest->getSubField<epics::pvData::PVScalar>("record._options.pipeline"));
if(O) {
try{
_pipeline = O->getAs<epics::pvData::boolean>();
}catch(std::exception& e){
std::ostringstream strm;
strm<<"Ignoring invalid pipeline= : "<<e.what();
message(strm.str(), epics::pvData::errorMessage);
}
}
startRequest(QOS_INIT);
MonitorRequester::shared_pointer thisPointer = shared_from_this();
Destroyable::shared_pointer thisDestroyable = shared_from_this();
@@ -2045,7 +2055,8 @@ void ServerMonitorRequesterImpl::destroy()
// hold a reference to channelMonitor so that _channelMonitor.reset()
// does not call ~Monitor (external code) while we are holding a lock
Monitor::shared_pointer monitor(_channelMonitor);
Monitor::shared_pointer monitor;
window_t window;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
@@ -2053,10 +2064,11 @@ void ServerMonitorRequesterImpl::destroy()
// asCheck
_channel->getChannelSecuritySession()->release(_ioid);
if (_channelMonitor) {
_channelMonitor.reset();
}
window.swap(_window_closed);
monitor.swap(_channelMonitor);
}
window.clear();
if(monitor) {
monitor->destroy();
}
@@ -2093,7 +2105,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
}
else
{
Monitor::shared_pointer monitor(_channelMonitor);
Monitor::shared_pointer monitor(getChannelMonitor());
if (!monitor)
return;
@@ -2107,7 +2119,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
buffer->putByte((int8)request);
// changedBitSet and data, if not notify only (i.e. queueSize == -1)
BitSet::shared_pointer changedBitSet = element->changedBitSet;
const BitSet::shared_pointer& changedBitSet = element->changedBitSet;
if (changedBitSet)
{
changedBitSet->serialize(buffer, control);
@@ -2117,7 +2129,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
element->overrunBitSet->serialize(buffer, control);
}
element.reset();
{
Lock guard(_mutex);
if(!_pipeline) {
} else if(_window_open==0) {
if(_pipeline) {
// Our logic has a problem, but try to continue.
message("Monitor Logic Error: send outside of window", epics::pvData::warningMessage);
LOG(logLevelError, "Monitor Logic Error: send outside of window %zu", _window_closed.size());
}
} else {
_window_closed.push_back(element.letGo());
_window_open--;
}
}
element.reset(); // calls Monitor::release() if not swap()'d
// TODO if we try to proces several monitors at once, then fairness suffers
// TODO compbine several monitors into one message (reduces payload)
@@ -2128,10 +2156,18 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
{
// TODO CAS
bool unlisten;
Lock guard(_mutex);
unlisten = _unlisten;
_unlisten = false;
guard.unlock();
window_t window;
{
Lock guard(_mutex);
unlisten = _unlisten;
_unlisten = false;
if(unlisten) {
window.swap(_window_closed);
_window_open = 0u;
}
}
window.clear(); // calls Monitor::release()
if (unlisten)
{
@@ -2145,6 +2181,43 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
}
}
void ServerMonitorRequesterImpl::ack(size_t cnt)
{
typedef std::vector<MonitorElementPtr> acking_t;
acking_t acking;
Monitor::shared_pointer mon;
{
Lock guard(_mutex);
// cnt will be larger if this is the initial window update,
// or if the window is being enlarged.
size_t nack = std::min(cnt, _window_closed.size());
_window_open += cnt;
window_t::iterator it, end(_window_closed.begin());
std::advance(end, nack);
acking.resize(nack);
size_t i;
for(i=0, it=_window_closed.begin(); i<nack; i++, ++it)
{
acking[i].swap(*it);
}
_window_closed.erase(_window_closed.begin(), end);
mon = _channelMonitor;
}
for(acking_t::iterator it(acking.begin()), end(acking.end()); it!=end; ++it) {
mon->release(*it);
}
mon->reportRemoteQueueStatus(cnt);
}
/****************************************************************************************/
void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,