pipeline support

This commit is contained in:
Matej Sekoranja
2015-04-29 15:17:54 -04:00
parent c492406d57
commit 197f763452
14 changed files with 2569 additions and 20 deletions

View File

@@ -2061,16 +2061,17 @@ namespace epics {
class MonitorStrategy : public Monitor {
public:
virtual ~MonitorStrategy() {};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
class MonitorStrategyQueue :
public MonitorStrategy,
public TransportSender,
public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
{
private:
@@ -2096,16 +2097,26 @@ namespace epics {
PVStructure::shared_pointer m_up2datePVStructure;
public:
int32 m_releasedCount;
bool m_reportQueueStateInProgress;
MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) :
// TODO check for cyclic-ref
ChannelImpl::shared_pointer m_channel;
pvAccessID m_ioid;
public:
MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid,
MonitorRequester::shared_pointer const & callback, int32 queueSize) :
m_queueSize(queueSize), m_lastStructure(),
m_freeQueue(),
m_monitorQueue(),
m_callback(callback), m_mutex(),
m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
m_nullMonitorElement()
{
m_nullMonitorElement(),
m_releasedCount(0),
m_reportQueueStateInProgress(false),
m_channel(channel), m_ioid(ioid)
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
@@ -2121,6 +2132,9 @@ namespace epics {
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
m_releasedCount = 0;
m_reportQueueStateInProgress = false;
// reuse on reconnect
if (m_lastStructure.get() == 0 ||
*(m_lastStructure.get()) == *(structure.get()))
@@ -2318,6 +2332,8 @@ namespace epics {
// NOTE: a client must always call poll() after release() to check the presence of any new monitor elements
virtual void release(MonitorElement::shared_pointer const & monitorElement) {
bool sendAck = false;
{
Lock guard(m_mutex);
m_freeQueue.push_back(monitorElement);
@@ -2334,6 +2350,58 @@ namespace epics {
m_overrunElement.reset();
m_overrunInProgress = false;
}
m_releasedCount++;
// TODO limit reporting back?
if (!m_reportQueueStateInProgress)
{
sendAck = true;
m_reportQueueStateInProgress = true;
}
}
if (sendAck)
{
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if fails)
m_reportQueueStateInProgress = false;
}
}
}
virtual void reportRemoteQueueStatus(int32 /*freeElements*/)
{
// noop for the client
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage((int8)CMD_MONITOR, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)QOS_GET_PUT);
{
Lock guard(m_mutex);
buffer->putInt(m_releasedCount);
m_releasedCount = 0;
m_reportQueueStateInProgress = false;
}
// immediate send
control->flush(true);
}
virtual void lock()
{
// noop
}
virtual void unlock()
{
// noop
}
Status start() {
@@ -2377,6 +2445,9 @@ namespace epics {
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
int32 m_queueSize;
bool m_pipeline;
ChannelMonitorImpl(
ChannelImpl::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
@@ -2385,7 +2456,9 @@ namespace epics {
BaseRequestImpl(channel, monitorRequester),
m_monitorRequester(monitorRequester),
m_started(false),
m_pvRequest(pvRequest)
m_pvRequest(pvRequest),
m_queueSize(0),
m_pipeline(false)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor);
}
@@ -2399,7 +2472,7 @@ namespace epics {
return;
}
int queueSize = 2;
m_queueSize = 2;
PVFieldPtr pvField = m_pvRequest->getSubField("record._options");
if (pvField.get()) {
PVStructurePtr pvOptions = static_pointer_cast<PVStructure>(pvField);
@@ -2411,15 +2484,18 @@ namespace epics {
std::stringstream ss;
ss << pvString->get();
ss >> size;
queueSize = size;
m_queueSize = size;
}
}
PVStringPtr pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
}
BaseRequestImpl::activate();
if (queueSize<2) queueSize = 2;
std::tr1::shared_ptr<MonitorStrategyQueue> tp(new MonitorStrategyQueue(m_monitorRequester, queueSize));
BaseRequestImpl::activate();
if (m_queueSize < 2) m_queueSize = 2;
std::tr1::shared_ptr<MonitorStrategyQueue> tp(new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize));
m_monitorStrategy = tp;
// subscribe
@@ -2432,6 +2508,16 @@ namespace epics {
}
}
// override default impl. to provide pipeline QoS flag
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
if (transport.get() != 0 && !m_subscribed.get() &&
startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT))
{
m_subscribed.set();
transport->enqueueSendRequest(shared_from_this());
}
}
public:
static Monitor::shared_pointer create(
ChannelImpl::shared_pointer const & channel,
@@ -2471,6 +2557,12 @@ namespace epics {
{
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
// if streaming
if (pendingRequest & QOS_GET_PUT)
{
buffer->putInt(m_queueSize);
}
}
stopRequest();
@@ -2629,6 +2721,11 @@ namespace epics {
m_monitorStrategy->release(monitorElement);
}
virtual void reportRemoteQueueStatus(int32 freeElements)
{
m_monitorStrategy->reportRemoteQueueStatus(freeElements);
}
virtual void lock()
{
// noop