diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..609e229 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +bin +lib +include +dbd diff --git a/pvtoolsSrc/eget.cpp b/pvtoolsSrc/eget.cpp index fc5ae1d..53a6770 100644 --- a/pvtoolsSrc/eget.cpp +++ b/pvtoolsSrc/eget.cpp @@ -1422,7 +1422,9 @@ class MonitorRequesterImpl : public MonitorRequester virtual void unlisten(Monitor::shared_pointer const & /*monitor*/) { - std::cerr << "unlisten" << std::endl; + //std::cerr << "unlisten" << std::endl; + // TODO + epicsExit(0); } }; @@ -1747,7 +1749,7 @@ int main (int argc, char *argv[]) fprintf(stderr, "failed to parse request string\n"); return 1; } - + // register "pva" and "ca" providers ClientFactory::start(); epics::pvAccess::ca::CAClientFactory::start(); diff --git a/src/pipelineService/pipelineServer.cpp b/src/pipelineService/pipelineServer.cpp index 94caf92..ca88a29 100644 --- a/src/pipelineService/pipelineServer.cpp +++ b/src/pipelineService/pipelineServer.cpp @@ -40,11 +40,17 @@ class ChannelPipelineMonitorImpl : Mutex m_freeQueueLock; Mutex m_monitorQueueLock; - AtomicBoolean m_active; + bool m_active; MonitorElement::shared_pointer m_nullMonitorElement; size_t m_requestedCount; + bool m_pipeline; + + bool m_done; + + bool m_unlistenReported; + public: ChannelPipelineMonitorImpl( Channel::shared_pointer const & channel, @@ -56,28 +62,30 @@ class ChannelPipelineMonitorImpl : m_queueSize(2), m_freeQueueLock(), m_monitorQueueLock(), - m_active(), - m_requestedCount(0) + m_active(false), + m_requestedCount(0), + m_pipeline(false), + m_done(false), + m_unlistenReported(false) { m_pipelineSession = pipelineService->createPipeline(pvRequest); - // extract queueSize parameter - PVFieldPtr pvField = pvRequest->getSubField("record._options"); - if (pvField.get()) { - PVStructurePtr pvOptions = static_pointer_cast(pvField); - pvField = pvOptions->getSubField("queueSize"); - if (pvField.get()) { - PVStringPtr pvString = pvOptions->getStringField("queueSize"); - if(pvString) { - int32 size; - std::stringstream ss; - ss << pvString->get(); - ss >> size; - if (size < 2) size = 2; + // extract queueSize and pipeline parameter + PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) m_queueSize = static_cast(size); - } } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); } // server queue size must be >= client queue size @@ -106,6 +114,10 @@ class ChannelPipelineMonitorImpl : return m_pipelineSession; } + bool isPipelineEnabled() const { + return m_pipeline; + } + virtual ~ChannelPipelineMonitorImpl() { destroy(); @@ -113,15 +125,17 @@ class ChannelPipelineMonitorImpl : virtual Status start() { - // already started - if (m_active.get()) - return Status::Ok; + bool notify = false; + { + Lock guard(m_monitorQueueLock); - m_active.set(); + // already started + if (m_active) + return Status::Ok; + m_active = true; - m_monitorQueueLock.lock(); - bool notify = (m_monitorQueue.size() != 0); - m_monitorQueueLock.unlock(); + notify = (m_monitorQueue.size() != 0); + } if (notify) { @@ -134,7 +148,8 @@ class ChannelPipelineMonitorImpl : virtual Status stop() { - m_active.clear(); + Lock guard(m_monitorQueueLock); + m_active = false; return Status::Ok; } @@ -145,8 +160,19 @@ class ChannelPipelineMonitorImpl : // do not give send more elements than m_requestedCount // even if m_monitorQueue is not empty - if (m_monitorQueue.empty() || m_requestedCount == 0) + bool emptyQueue = m_monitorQueue.empty(); + if (emptyQueue || m_requestedCount == 0 || !m_active) + { + // report "unlisten" event if queue empty and done, release lock first + if (!m_unlistenReported && m_done && emptyQueue) + { + m_unlistenReported = true; + guard.unlock(); + m_monitorRequester->unlisten(shared_from_this()); + } + return m_nullMonitorElement; + } MonitorElement::shared_pointer element = m_monitorQueue.front(); m_monitorQueue.pop(); @@ -173,7 +199,7 @@ class ChannelPipelineMonitorImpl : { Lock guard(m_monitorQueueLock); m_requestedCount += count; - notify = (m_monitorQueue.size() != 0); + notify = m_active && (m_monitorQueue.size() != 0); } // notify @@ -189,7 +215,17 @@ class ChannelPipelineMonitorImpl : virtual void destroy() { - stop(); + bool notifyCancel = false; + + { + Lock guard(m_monitorQueueLock); + m_active = false; + notifyCancel = !m_done; + m_done = true; + } + + if (notifyCancel) + m_pipelineSession->cancel(); } virtual void lock() @@ -229,6 +265,10 @@ class ChannelPipelineMonitorImpl : bool notify = false; { Lock guard(m_monitorQueueLock); + if (m_done) + return; + // throw std::logic_error("putElement called after done"); + m_monitorQueue.push(element); // TODO there is way to much of notification, per each putElement notify = (m_requestedCount != 0); @@ -243,7 +283,17 @@ class ChannelPipelineMonitorImpl : } virtual void done() { - // TODO + Lock guard(m_monitorQueueLock); + m_done = true; + + bool report = !m_unlistenReported && m_monitorQueue.empty(); + if (report) + m_unlistenReported = true; + + guard.unlock(); + + if (report) + m_monitorRequester->unlisten(shared_from_this()); } }; @@ -398,9 +448,21 @@ public: std::tr1::shared_ptr tp( new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService) ); - Monitor::shared_pointer ChannelPipelineMonitorImpl = tp; - monitorRequester->monitorConnect(Status::Ok, ChannelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); - return ChannelPipelineMonitorImpl; + Monitor::shared_pointer channelPipelineMonitorImpl = tp; + + if (tp->isPipelineEnabled()) + { + monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); + return channelPipelineMonitorImpl; + } + else + { + Monitor::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining"); + monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure); + return nullPtr; + } } virtual ChannelArray::shared_pointer createChannelArray( diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 651d514..bc3cf5b 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2063,6 +2063,7 @@ namespace epics { virtual ~MonitorStrategy() {}; virtual void init(StructureConstPtr const & structure) = 0; virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; + virtual void unlisten() = 0; }; typedef vector FreeElementQueue; @@ -2103,10 +2104,18 @@ namespace epics { // TODO check for cyclic-ref ChannelImpl::shared_pointer m_channel; pvAccessID m_ioid; + + bool m_pipeline; + int32 m_ackAny; + + bool m_unlisten; + public: MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid, - MonitorRequester::shared_pointer const & callback, int32 queueSize) : + MonitorRequester::shared_pointer const & callback, + int32 queueSize, + bool pipeline, int32 ackAny) : m_queueSize(queueSize), m_lastStructure(), m_freeQueue(), m_monitorQueue(), @@ -2115,7 +2124,9 @@ namespace epics { m_nullMonitorElement(), m_releasedCount(0), m_reportQueueStateInProgress(false), - m_channel(channel), m_ioid(ioid) + m_channel(channel), m_ioid(ioid), + m_pipeline(pipeline), m_ackAny(ackAny), + m_unlisten(false) { if (queueSize <= 1) throw std::invalid_argument("queueSize <= 1"); @@ -2318,12 +2329,33 @@ namespace epics { } } + virtual void unlisten() + { + bool notifyUnlisten = false; + { + Lock guard(m_mutex); + notifyUnlisten = m_monitorQueue.empty(); + m_unlisten = !notifyUnlisten; + } + + if (notifyUnlisten) + { + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } + } virtual MonitorElement::shared_pointer poll() { Lock guard(m_mutex); - if (m_monitorQueue.empty()) + if (m_monitorQueue.empty()) { + + if (m_unlisten) { + m_unlisten = false; + guard.unlock(); + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } return m_nullMonitorElement; + } MonitorElement::shared_pointer retVal = m_monitorQueue.front(); m_monitorQueue.pop(); @@ -2351,25 +2383,27 @@ namespace epics { m_overrunInProgress = false; } - m_releasedCount++; - // TODO limit reporting back? - if (!m_reportQueueStateInProgress) + if (m_pipeline) { - sendAck = true; - m_reportQueueStateInProgress = true; - } - } - - if (sendAck) - { - try - { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); - } catch (...) { - // noop (do not complain if fails) - m_reportQueueStateInProgress = false; + m_releasedCount++; + if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny) + { + sendAck = true; + m_reportQueueStateInProgress = true; + } } - } + + if (sendAck) + { + try + { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (...) { + // noop (do not complain if fails) + m_reportQueueStateInProgress = false; + } + } + } } virtual void reportRemoteQueueStatus(int32 /*freeElements*/) @@ -2447,6 +2481,7 @@ namespace epics { int32 m_queueSize; bool m_pipeline; + int32 m_ackAny; ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, @@ -2457,8 +2492,9 @@ namespace epics { m_monitorRequester(monitorRequester), m_started(false), m_pvRequest(pvRequest), - m_queueSize(0), - m_pipeline(false) + m_queueSize(2), + m_pipeline(false), + m_ackAny(0) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); } @@ -2472,30 +2508,45 @@ namespace epics { return; } - m_queueSize = 2; - PVFieldPtr pvField = m_pvRequest->getSubField("record._options"); - if (pvField.get()) { - PVStructurePtr pvOptions = static_pointer_cast(pvField); - pvField = pvOptions->getSubField("queueSize"); - if (pvField.get()) { - PVStringPtr pvString = pvOptions->getStringField("queueSize"); - if(pvString) { + PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) + m_queueSize = size; + } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); + + // pipeline options + if (m_pipeline) + { + // defaults to queueSize/2 + m_ackAny = m_queueSize/2; + + pvString = pvOptions->getSubField("ackAny"); + if (pvString) { int32 size; std::stringstream ss; ss << pvString->get(); ss >> size; - m_queueSize = size; + if (size > 0) + m_ackAny = size; } } - PVStringPtr pvString = pvOptions->getSubField("pipeline"); - if (pvString) - m_pipeline = (pvString->get() == "true"); } BaseRequestImpl::activate(); - if (m_queueSize < 2) m_queueSize = 2; - std::tr1::shared_ptr tp(new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize)); + std::tr1::shared_ptr tp( + new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize, + m_pipeline, m_ackAny) + ); m_monitorStrategy = tp; // subscribe @@ -2609,6 +2660,16 @@ namespace epics { { // TODO not supported by IF yet... } + else if (qos & QOS_DESTROY) + { + // TODO for now status is ignored + + if (payloadBuffer->getRemaining()) + m_monitorStrategy->response(transport, payloadBuffer); + + // unlisten will be called when all the elements in the queue gets processed + m_monitorStrategy->unlisten(); + } else { m_monitorStrategy->response(transport, payloadBuffer); diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index f5a758e..8773b5e 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1948,7 +1948,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, ServerMonitorRequesterImpl::ServerMonitorRequesterImpl( ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure() + BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false) { } @@ -1992,7 +1992,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/) { - //TODO + { + Lock guard(_mutex); + _unlisten = true; + } + TransportSender::shared_pointer thisSender = shared_from_this(); + _transport->enqueueSendRequest(thisSender); } void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/) @@ -2110,6 +2115,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } + else + { + // TODO CAS + bool unlisten; + Lock guard(_mutex); + unlisten = _unlisten; + _unlisten = false; + guard.unlock(); + + if (unlisten) + { + control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)QOS_DESTROY); + Status::Ok.serialize(buffer, control); + } + } } } diff --git a/src/server/responseHandlers.h b/src/server/responseHandlers.h index 9309670..fa8eef7 100644 --- a/src/server/responseHandlers.h +++ b/src/server/responseHandlers.h @@ -542,6 +542,7 @@ namespace pvAccess { epics::pvData::Monitor::shared_pointer _channelMonitor; epics::pvData::StructureConstPtr _structure; epics::pvData::Status _status; + bool _unlisten; }; diff --git a/testApp/remote/pipelineServiceExample.cpp b/testApp/remote/pipelineServiceExample.cpp index 1fcfe21..547b24a 100644 --- a/testApp/remote/pipelineServiceExample.cpp +++ b/testApp/remote/pipelineServiceExample.cpp @@ -21,10 +21,21 @@ class PipelineSessionImpl : public: PipelineSessionImpl( - epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/ + epics::pvData::PVStructure::shared_pointer const & pvRequest ) : - m_counter(0) + m_counter(0), + m_max(0) { + PVStructure::shared_pointer pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVString::shared_pointer pvString = pvOptions->getSubField("limit"); + if (pvString) + { + // note: this throws an exception if conversion fails + m_max = pvString->getAs(); + } + } + } size_t getMinQueueSize() const { @@ -44,6 +55,13 @@ public: MonitorElement::shared_pointer element = control->getFreeElement(); element->pvStructurePtr->getSubField(1 /*"count"*/)->put(m_counter++); control->putElement(element); + + // we reached the limit, no more data + if (m_max != 0 && m_counter == m_max) + { + control->done(); + break; + } } } @@ -54,6 +72,7 @@ public: private: // NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter int32 m_counter; + int32 m_max; }; class PipelineServiceImpl :