diff --git a/pvtoolsSrc/eget.cpp b/pvtoolsSrc/eget.cpp index fd96c1a..511c5a3 100644 --- a/pvtoolsSrc/eget.cpp +++ b/pvtoolsSrc/eget.cpp @@ -1422,7 +1422,9 @@ class MonitorRequesterImpl : public MonitorRequester virtual void unlisten(Monitor::shared_pointer const & /*monitor*/) { - std::cerr << "unlisten" << std::endl; + //std::cerr << "unlisten" << std::endl; + // TODO + epicsExit(0); } }; @@ -1747,7 +1749,7 @@ int main (int argc, char *argv[]) fprintf(stderr, "failed to parse request string\n"); return 1; } - + // register "pva" and "ca" providers ClientFactory::start(); epics::pvAccess::ca::CAClientFactory::start(); diff --git a/src/Makefile b/src/Makefile index a456b70..6d6b9d7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -17,6 +17,7 @@ include $(PVACCESS_SRC)/remoteClient/Makefile include $(PVACCESS_SRC)/server/Makefile include $(PVACCESS_SRC)/rpcService/Makefile include $(PVACCESS_SRC)/rpcClient/Makefile +include $(PVACCESS_SRC)/pipelineService/Makefile include $(PVACCESS_SRC)/ca/Makefile include $(PVACCESS_SRC)/mb/Makefile include $(PVACCESS_SRC)/ioc/Makefile diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index fa385a8..6c2e132 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -1438,8 +1438,6 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito // noop } - - /* --------------- epics::pvData::ChannelRequest --------------- */ void CAChannelMonitor::cancel() diff --git a/src/client/pvAccess.h b/src/client/pvAccess.h index 8c3eeac..c8afdf0 100644 --- a/src/client/pvAccess.h +++ b/src/client/pvAccess.h @@ -941,6 +941,23 @@ namespace pvAccess { epicsShareExtern void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory); + /** + * @brief Pipeline (streaming) support API (optional). + * This is used by pvAccess to implement pipeline (streaming) monitors. + */ + class epicsShareClass PipelineMonitor : public virtual epics::pvData::Monitor { + public: + POINTER_DEFINITIONS(PipelineMonitor); + virtual ~PipelineMonitor(){} + + /** + * Report remote queue status. + * @param freeElements number of free elements. + */ + virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) = 0; + }; + + }} #endif /* PVACCESS_H */ diff --git a/src/pipelineService/Makefile b/src/pipelineService/Makefile new file mode 100644 index 0000000..005179c --- /dev/null +++ b/src/pipelineService/Makefile @@ -0,0 +1,9 @@ +# This is a Makefile fragment, see ../Makefile + +SRC_DIRS += $(PVACCESS_SRC)/pipelineService + +INC += pipelineService.h +INC += pipelineServer.h + +LIBSRCS += pipelineService.cpp +LIBSRCS += pipelineServer.cpp diff --git a/src/pipelineService/pipelineServer.cpp b/src/pipelineService/pipelineServer.cpp new file mode 100644 index 0000000..3e25649 --- /dev/null +++ b/src/pipelineService/pipelineServer.cpp @@ -0,0 +1,814 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include +#include + +#define epicsExportSharedSymbols +#include +#include + +using namespace epics::pvData; +using namespace std; + +namespace epics { namespace pvAccess { + +class ChannelPipelineMonitorImpl : + public PipelineMonitor, + public PipelineControl, + public std::tr1::enable_shared_from_this +{ + private: + + typedef vector FreeElementQueue; + typedef queue MonitorElementQueue; + + Channel::shared_pointer m_channel; + MonitorRequester::shared_pointer m_monitorRequester; + PipelineSession::shared_pointer m_pipelineSession; + + size_t m_queueSize; + + FreeElementQueue m_freeQueue; + MonitorElementQueue m_monitorQueue; + + Mutex m_freeQueueLock; + Mutex m_monitorQueueLock; + + bool m_active; + MonitorElement::shared_pointer m_nullMonitorElement; + + size_t m_requestedCount; + + bool m_pipeline; + + bool m_done; + + bool m_unlistenReported; + + public: + ChannelPipelineMonitorImpl( + Channel::shared_pointer const & channel, + MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest, + PipelineService::shared_pointer const & pipelineService) : + m_channel(channel), + m_monitorRequester(monitorRequester), + m_queueSize(2), + m_freeQueueLock(), + m_monitorQueueLock(), + m_active(false), + m_requestedCount(0), + m_pipeline(false), + m_done(false), + m_unlistenReported(false) + { + + m_pipelineSession = pipelineService->createPipeline(pvRequest); + + // extract queueSize and pipeline parameter + PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) + m_queueSize = static_cast(size); + } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); + } + + // server queue size must be >= client queue size + size_t minQueueSize = m_pipelineSession->getMinQueueSize(); + if (m_queueSize < minQueueSize) + m_queueSize = minQueueSize; + + Structure::const_shared_pointer structure = m_pipelineSession->getStructure(); + + // create free elements + { + Lock guard(m_freeQueueLock); + m_freeQueue.reserve(m_queueSize); + for (int32 i = 0; i < m_queueSize; i++) + { + PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure); + MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure)); + // we always send all + monitorElement->changedBitSet->set(0); + m_freeQueue.push_back(monitorElement); + } + } + } + + PipelineSession::shared_pointer getPipelineSession() const { + return m_pipelineSession; + } + + bool isPipelineEnabled() const { + return m_pipeline; + } + + virtual ~ChannelPipelineMonitorImpl() + { + destroy(); + } + + virtual Status start() + { + bool notify = false; + { + Lock guard(m_monitorQueueLock); + + // already started + if (m_active) + return Status::Ok; + m_active = true; + + notify = (m_monitorQueue.size() != 0); + } + + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + + return Status::Ok; + } + + virtual Status stop() + { + Lock guard(m_monitorQueueLock); + m_active = false; + return Status::Ok; + } + + // get next free element + virtual MonitorElement::shared_pointer poll() + { + Lock guard(m_monitorQueueLock); + + // do not give send more elements than m_requestedCount + // even if m_monitorQueue is not empty + bool emptyQueue = m_monitorQueue.empty(); + if (emptyQueue || m_requestedCount == 0 || !m_active) + { + // report "unlisten" event if queue empty and done, release lock first + if (!m_unlistenReported && m_done && emptyQueue) + { + m_unlistenReported = true; + guard.unlock(); + m_monitorRequester->unlisten(shared_from_this()); + } + + return m_nullMonitorElement; + } + + MonitorElement::shared_pointer element = m_monitorQueue.front(); + m_monitorQueue.pop(); + + m_requestedCount--; + + return element; + } + + virtual void release(MonitorElement::shared_pointer const & monitorElement) + { + Lock guard(m_freeQueueLock); + m_freeQueue.push_back(monitorElement); + } + + virtual void reportRemoteQueueStatus(int32 freeElements) + { + // TODO check + size_t count = static_cast(freeElements); + + //std::cout << "reportRemoteQueueStatus(" << count << ')' << std::endl; + + bool notify = false; + { + Lock guard(m_monitorQueueLock); + m_requestedCount += count; + notify = m_active && (m_monitorQueue.size() != 0); + } + + // notify + // TODO too many notify calls? + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + + m_pipelineSession->request(shared_from_this(), count); + } + + virtual void destroy() + { + bool notifyCancel = false; + + { + Lock guard(m_monitorQueueLock); + m_active = false; + notifyCancel = !m_done; + m_done = true; + } + + if (notifyCancel) + m_pipelineSession->cancel(); + } + + virtual void lock() + { + // noop + } + + virtual void unlock() + { + // noop + } + + virtual size_t getFreeElementCount() { + Lock guard(m_freeQueueLock); + return m_freeQueue.size(); + } + + virtual size_t getRequestedCount() { + // TODO consider using atomic ops + Lock guard(m_monitorQueueLock); + return m_requestedCount; + } + + virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() { + Lock guard(m_freeQueueLock); + if (m_freeQueue.empty()) + return m_nullMonitorElement; + + MonitorElement::shared_pointer freeElement = m_freeQueue.back(); + m_freeQueue.pop_back(); + + return freeElement; + } + + virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) { + + bool notify = false; + { + Lock guard(m_monitorQueueLock); + if (m_done) + return; + // throw std::logic_error("putElement called after done"); + + m_monitorQueue.push(element); + // TODO there is way to much of notification, per each putElement + notify = (m_requestedCount != 0); + } + + // notify + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + } + + virtual void done() { + Lock guard(m_monitorQueueLock); + m_done = true; + + bool report = !m_unlistenReported && m_monitorQueue.empty(); + if (report) + m_unlistenReported = true; + + guard.unlock(); + + if (report) + m_monitorRequester->unlisten(shared_from_this()); + } + +}; + + +class PipelineChannel : + public Channel, + public std::tr1::enable_shared_from_this +{ +private: + + static Status notSupportedStatus; + static Status destroyedStatus; + + AtomicBoolean m_destroyed; + + ChannelProvider::shared_pointer m_provider; + string m_channelName; + ChannelRequester::shared_pointer m_channelRequester; + + PipelineService::shared_pointer m_pipelineService; + +public: + POINTER_DEFINITIONS(PipelineChannel); + + PipelineChannel( + ChannelProvider::shared_pointer const & provider, + string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService) : + m_provider(provider), + m_channelName(channelName), + m_channelRequester(channelRequester), + m_pipelineService(pipelineService) + { + } + + virtual ~PipelineChannel() + { + destroy(); + } + + virtual std::tr1::shared_ptr getProvider() + { + return m_provider; + } + + virtual std::string getRemoteAddress() + { + // local + return getChannelName(); + } + + virtual ConnectionState getConnectionState() + { + return isConnected() ? + Channel::CONNECTED : + Channel::DESTROYED; + } + + virtual std::string getChannelName() + { + return m_channelName; + } + + virtual std::tr1::shared_ptr getChannelRequester() + { + return m_channelRequester; + } + + virtual bool isConnected() + { + return !m_destroyed.get(); + } + + + virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/) + { + return none; + } + + virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & /*subField*/) + { + requester->getDone(notSupportedStatus, epics::pvData::Field::shared_pointer()); + } + + virtual ChannelProcess::shared_pointer createChannelProcess( + ChannelProcessRequester::shared_pointer const & channelProcessRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelProcess::shared_pointer nullPtr; + channelProcessRequester->channelProcessConnect(notSupportedStatus, nullPtr); + return nullPtr; + } + + virtual ChannelGet::shared_pointer createChannelGet( + ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelGet::shared_pointer nullPtr; + channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr, + epics::pvData::Structure::const_shared_pointer()); + return nullPtr; + } + + virtual ChannelPut::shared_pointer createChannelPut( + ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelPut::shared_pointer nullPtr; + channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr, + epics::pvData::Structure::const_shared_pointer()); + return nullPtr; + } + + + virtual ChannelPutGet::shared_pointer createChannelPutGet( + ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelPutGet::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure); + return nullPtr; + } + + virtual ChannelRPC::shared_pointer createChannelRPC( + ChannelRPCRequester::shared_pointer const & channelRPCRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelRPC::shared_pointer nullPtr; + channelRPCRequester->channelRPCConnect(notSupportedStatus, nullPtr); + return nullPtr; + } + + virtual epics::pvData::Monitor::shared_pointer createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + if (!pvRequest) + throw std::invalid_argument("pvRequest == null"); + + if (m_destroyed.get()) + { + Monitor::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure); + return nullPtr; + } + + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService) + ); + Monitor::shared_pointer channelPipelineMonitorImpl = tp; + + if (tp->isPipelineEnabled()) + { + monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); + return channelPipelineMonitorImpl; + } + else + { + Monitor::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining"); + monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure); + return nullPtr; + } + } + + virtual ChannelArray::shared_pointer createChannelArray( + ChannelArrayRequester::shared_pointer const & channelArrayRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelArray::shared_pointer nullPtr; + channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer()); + return nullPtr; + } + + + virtual void printInfo() + { + printInfo(std::cout); + } + + virtual void printInfo(std::ostream& out) + { + out << "PipelineChannel: "; + out << getChannelName(); + out << " ["; + out << Channel::ConnectionStateNames[getConnectionState()]; + out << "]"; + } + + virtual string getRequesterName() + { + return getChannelName(); + } + + virtual void message(std::string const & message,MessageType messageType) + { + // just delegate + m_channelRequester->message(message, messageType); + } + + virtual void destroy() + { + m_destroyed.set(); + } +}; + +Status PipelineChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only monitor (aka pipeline) requests are supported by this channel"); +Status PipelineChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"); + +Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider, + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService) +{ + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new PipelineChannel(provider, channelName, channelRequester, pipelineService) + ); + Channel::shared_pointer channel = tp; + return channel; +} + + +class PipelineChannelProvider : + public virtual ChannelProvider, + public virtual ChannelFind, + public std::tr1::enable_shared_from_this { + +public: + POINTER_DEFINITIONS(PipelineChannelProvider); + + static string PROVIDER_NAME; + + static Status noSuchChannelStatus; + + // TODO thread pool support + + PipelineChannelProvider() { + } + + virtual string getProviderName() { + return PROVIDER_NAME; + } + + virtual std::tr1::shared_ptr getChannelProvider() + { + return shared_from_this(); + } + + virtual void cancel() {} + + virtual void destroy() {} + + virtual ChannelFind::shared_pointer channelFind(std::string const & channelName, + ChannelFindRequester::shared_pointer const & channelFindRequester) + { + bool found; + { + Lock guard(m_mutex); + found = (m_services.find(channelName) != m_services.end()) || + findWildService(channelName); + } + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelFindRequester->channelFindResult(Status::Ok, thisPtr, found); + return thisPtr; + } + + + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester) + { + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + PVStringArray::svector channelNames; + { + Lock guard(m_mutex); + channelNames.reserve(m_services.size()); + for (PipelineServiceMap::const_iterator iter = m_services.begin(); + iter != m_services.end(); + iter++) + channelNames.push_back(iter->first); + } + + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false); + return thisPtr; + } + + virtual Channel::shared_pointer createChannel( + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + short /*priority*/) + { + PipelineService::shared_pointer service; + + PipelineServiceMap::const_iterator iter; + { + Lock guard(m_mutex); + iter = m_services.find(channelName); + } + if (iter != m_services.end()) + service = iter->second; + + // check for wild services + if (!service) + service = findWildService(channelName); + + if (!service) + { + Channel::shared_pointer nullChannel; + channelRequester->channelCreated(noSuchChannelStatus, nullChannel); + return nullChannel; + } + + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new PipelineChannel( + shared_from_this(), + channelName, + channelRequester, + service)); + Channel::shared_pointer pipelineChannel = tp; + channelRequester->channelCreated(Status::Ok, pipelineChannel); + return pipelineChannel; + } + + virtual Channel::shared_pointer createChannel( + std::string const & /*channelName*/, + ChannelRequester::shared_pointer const & /*channelRequester*/, + short /*priority*/, + std::string const & /*address*/) + { + // this will never get called by the pvAccess server + throw std::runtime_error("not supported"); + } + + void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) + { + Lock guard(m_mutex); + m_services[serviceName] = service; + + if (isWildcardPattern(serviceName)) + m_wildServices.push_back(std::make_pair(serviceName, service)); + } + + void unregisterService(std::string const & serviceName) + { + Lock guard(m_mutex); + m_services.erase(serviceName); + + if (isWildcardPattern(serviceName)) + { + for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); + iter != m_wildServices.end(); + iter++) + if (iter->first == serviceName) + { + m_wildServices.erase(iter); + break; + } + } + } + +private: + // assumes sync on services + PipelineService::shared_pointer findWildService(string const & wildcard) + { + if (!m_wildServices.empty()) + for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); + iter != m_wildServices.end(); + iter++) + if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str())) + return iter->second; + + return PipelineService::shared_pointer(); + } + + // (too) simple check + bool isWildcardPattern(string const & pattern) + { + return + (pattern.find('*') != string::npos || + pattern.find('?') != string::npos || + (pattern.find('[') != string::npos && pattern.find(']') != string::npos)); + } + + typedef std::map PipelineServiceMap; + PipelineServiceMap m_services; + + typedef std::vector > PipelineWildServiceList; + PipelineWildServiceList m_wildServices; + + epics::pvData::Mutex m_mutex; +}; + +string PipelineChannelProvider::PROVIDER_NAME("PipelineService"); +Status PipelineChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel"); + + + +class PipelineChannelProviderFactory : public ChannelProviderFactory +{ +public: + POINTER_DEFINITIONS(PipelineChannelProviderFactory); + + PipelineChannelProviderFactory() : + m_channelProviderImpl(new PipelineChannelProvider()) + { + } + + virtual std::string getFactoryName() + { + return PipelineChannelProvider::PROVIDER_NAME; + } + + virtual ChannelProvider::shared_pointer sharedInstance() + { + return m_channelProviderImpl; + } + + virtual ChannelProvider::shared_pointer newInstance() + { + // TODO use std::make_shared + std::tr1::shared_ptr tp(new PipelineChannelProvider()); + ChannelProvider::shared_pointer channelProvider = tp; + return channelProvider; + } + +private: + PipelineChannelProvider::shared_pointer m_channelProviderImpl; +}; + + +PipelineServer::PipelineServer() +{ + // TODO factory is never deregistered, multiple PipelineServer instances create multiple factories, etc. + m_channelProviderFactory.reset(new PipelineChannelProviderFactory()); + registerChannelProviderFactory(m_channelProviderFactory); + + m_channelProviderImpl = m_channelProviderFactory->sharedInstance(); + + m_serverContext = ServerContextImpl::create(); + m_serverContext->setChannelProviderName(m_channelProviderImpl->getProviderName()); + + m_serverContext->initialize(getChannelProviderRegistry()); +} + +PipelineServer::~PipelineServer() +{ + // multiple destroy call is OK + destroy(); +} + +void PipelineServer::printInfo() +{ + std::cout << m_serverContext->getVersion().getVersionString() << std::endl; + m_serverContext->printInfo(); +} + +void PipelineServer::run(int seconds) +{ + m_serverContext->run(seconds); +} + +struct ThreadRunnerParam { + PipelineServer::shared_pointer server; + int timeToRun; +}; + +static void threadRunner(void* usr) +{ + ThreadRunnerParam* pusr = static_cast(usr); + ThreadRunnerParam param = *pusr; + delete pusr; + + param.server->run(param.timeToRun); +} + +/// Method requires usage of std::tr1::shared_ptr. This instance must be +/// owned by a shared_ptr instance. +void PipelineServer::runInNewThread(int seconds) +{ + std::auto_ptr param(new ThreadRunnerParam()); + param->server = shared_from_this(); + param->timeToRun = seconds; + + epicsThreadCreate("PipelineServer thread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackSmall), + threadRunner, param.get()); + + // let the thread delete 'param' + param.release(); +} + +void PipelineServer::destroy() +{ + m_serverContext->destroy(); +} + +void PipelineServer::registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); +} + +void PipelineServer::unregisterService(std::string const & serviceName) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->unregisterService(serviceName); +} + +}} diff --git a/src/pipelineService/pipelineServer.h b/src/pipelineService/pipelineServer.h new file mode 100644 index 0000000..222afea --- /dev/null +++ b/src/pipelineService/pipelineServer.h @@ -0,0 +1,74 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PIPELINESERVER_H +#define PIPELINESERVER_H + +#ifdef epicsExportSharedSymbols +# define pipelineServerEpicsExportSharedSymbols +# undef epicsExportSharedSymbols +#endif + +#include + +#ifdef pipelineServerEpicsExportSharedSymbols +# define epicsExportSharedSymbols +# undef pipelineServerEpicsExportSharedSymbols +#endif + +#include +#include +#include + +#include + +namespace epics { namespace pvAccess { + +class epicsShareClass PipelineServer : + public std::tr1::enable_shared_from_this +{ + private: + + ServerContextImpl::shared_pointer m_serverContext; + ChannelProviderFactory::shared_pointer m_channelProviderFactory; + ChannelProvider::shared_pointer m_channelProviderImpl; + + // TODO no thread poll implementation + + public: + POINTER_DEFINITIONS(PipelineServer); + + PipelineServer(); + + virtual ~PipelineServer(); + + void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service); + + void unregisterService(std::string const & serviceName); + + void run(int seconds = 0); + + /// Method requires usage of std::tr1::shared_ptr. This instance must be + /// owned by a shared_ptr instance. + void runInNewThread(int seconds = 0); + + void destroy(); + + /** + * Display basic information about the context. + */ + void printInfo(); + +}; + +epicsShareExtern Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider, + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService); + +}} + +#endif /* PIPELINESERVER_H */ diff --git a/src/pipelineService/pipelineService.cpp b/src/pipelineService/pipelineService.cpp new file mode 100644 index 0000000..3ae6938 --- /dev/null +++ b/src/pipelineService/pipelineService.cpp @@ -0,0 +1,8 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#define epicsExportSharedSymbols +#include diff --git a/src/pipelineService/pipelineService.h b/src/pipelineService/pipelineService.h new file mode 100644 index 0000000..6e4a394 --- /dev/null +++ b/src/pipelineService/pipelineService.h @@ -0,0 +1,101 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PIPELINESERVICE_H +#define PIPELINESERVICE_H + +#include + +#ifdef epicsExportSharedSymbols +# define pipelineServiceEpicsExportSharedSymbols +# undef epicsExportSharedSymbols +#endif + +#include +#include + +#ifdef pipelineServiceEpicsExportSharedSymbols +# define epicsExportSharedSymbols +# undef pipelineServiceEpicsExportSharedSymbols +#endif + +#include + +#include + +namespace epics { namespace pvAccess { + +class epicsShareClass PipelineControl +{ +public: + POINTER_DEFINITIONS(PipelineControl); + + virtual ~PipelineControl() {}; + + /// Number of free elements in the local queue. + /// A service can (should) full up the entire queue. + virtual size_t getFreeElementCount() = 0; + + /// Total count of requested elements. + /// This is the minimum element count that a service should provide. + virtual size_t getRequestedCount() = 0; + + /// Grab next free element. + /// A service should take this element, populate it with the data + /// and return it back by calling putElement(). + virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() = 0; + + /// Put element on the local queue (an element to be sent to a client). + virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) = 0; + + /// Call to notify that there is no more data to pipelined. + /// This call destroyes corresponding pipeline session. + virtual void done() = 0; + +}; + + +class epicsShareClass PipelineSession +{ +public: + POINTER_DEFINITIONS(PipelineSession); + + virtual ~PipelineSession() {}; + + /// Returns (minimum) local queue size. + /// Actual local queue size = max( getMinQueueSize(), client queue size ); + virtual size_t getMinQueueSize() const = 0; + + /// Description of the structure used by this session. + virtual epics::pvData::Structure::const_shared_pointer getStructure() const = 0; + + /// Request for additional (!) elementCount elements. + /// The service should eventually call PipelineControl.getFreeElement() and PipelineControl.putElement() + /// to provide [PipelineControl.getRequestedCount(), PipelineControl.getFreeElementCount()] elements. + virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) = 0; + + /// Cancel the session (called by the client). + virtual void cancel() = 0; +}; + + +class epicsShareClass PipelineService +{ +public: + POINTER_DEFINITIONS(PipelineService); + + virtual ~PipelineService() {}; + + virtual PipelineSession::shared_pointer createPipeline( + epics::pvData::PVStructure::shared_pointer const & pvRequest + ) = 0; + +}; + + +}} + +#endif /* PIPELINESERVICE_H */ diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 8ead845..fc6e92c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2061,16 +2061,18 @@ namespace epics { class MonitorStrategy : public Monitor { public: virtual ~MonitorStrategy() {}; - virtual void init(StructureConstPtr const & structure) = 0; - virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; - }; - + virtual void init(StructureConstPtr const & structure) = 0; + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; + virtual void unlisten() = 0; + }; + typedef vector FreeElementQueue; typedef queue MonitorElementQueue; class MonitorStrategyQueue : public MonitorStrategy, + public TransportSender, public std::tr1::enable_shared_from_this { private: @@ -2096,16 +2098,36 @@ namespace epics { PVStructure::shared_pointer m_up2datePVStructure; - public: + int32 m_releasedCount; + bool m_reportQueueStateInProgress; - MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) : + // TODO check for cyclic-ref + ChannelImpl::shared_pointer m_channel; + pvAccessID m_ioid; + + bool m_pipeline; + int32 m_ackAny; + + bool m_unlisten; + + public: + + MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid, + MonitorRequester::shared_pointer const & callback, + int32 queueSize, + bool pipeline, int32 ackAny) : m_queueSize(queueSize), m_lastStructure(), m_freeQueue(), m_monitorQueue(), m_callback(callback), m_mutex(), m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), - m_nullMonitorElement() - { + m_nullMonitorElement(), + m_releasedCount(0), + m_reportQueueStateInProgress(false), + m_channel(channel), m_ioid(ioid), + m_pipeline(pipeline), m_ackAny(ackAny), + m_unlisten(false) + { if (queueSize <= 1) throw std::invalid_argument("queueSize <= 1"); @@ -2121,6 +2143,9 @@ namespace epics { virtual void init(StructureConstPtr const & structure) { Lock guard(m_mutex); + m_releasedCount = 0; + m_reportQueueStateInProgress = false; + // reuse on reconnect if (m_lastStructure.get() == 0 || *(m_lastStructure.get()) == *(structure.get())) @@ -2304,12 +2329,33 @@ namespace epics { } } + virtual void unlisten() + { + bool notifyUnlisten = false; + { + Lock guard(m_mutex); + notifyUnlisten = m_monitorQueue.empty(); + m_unlisten = !notifyUnlisten; + } + + if (notifyUnlisten) + { + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } + } virtual MonitorElement::shared_pointer poll() { Lock guard(m_mutex); - if (m_monitorQueue.empty()) + if (m_monitorQueue.empty()) { + + if (m_unlisten) { + m_unlisten = false; + guard.unlock(); + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } return m_nullMonitorElement; + } MonitorElement::shared_pointer retVal = m_monitorQueue.front(); m_monitorQueue.pop(); @@ -2318,6 +2364,8 @@ namespace epics { // NOTE: a client must always call poll() after release() to check the presence of any new monitor elements virtual void release(MonitorElement::shared_pointer const & monitorElement) { + bool sendAck = false; + { Lock guard(m_mutex); m_freeQueue.push_back(monitorElement); @@ -2334,6 +2382,55 @@ namespace epics { m_overrunElement.reset(); m_overrunInProgress = false; } + + if (m_pipeline) + { + m_releasedCount++; + if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny) + { + sendAck = true; + m_reportQueueStateInProgress = true; + } + } + + if (sendAck) + { + try + { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (...) { + // noop (do not complain if fails) + m_reportQueueStateInProgress = false; + } + } + } + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)CMD_MONITOR, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)QOS_GET_PUT); + + { + Lock guard(m_mutex); + buffer->putInt(m_releasedCount); + m_releasedCount = 0; + m_reportQueueStateInProgress = false; + } + + // immediate send + control->flush(true); + } + + virtual void lock() + { + // noop + } + + virtual void unlock() + { + // noop } Status start() { @@ -2377,6 +2474,10 @@ namespace epics { std::tr1::shared_ptr m_monitorStrategy; + int32 m_queueSize; + bool m_pipeline; + int32 m_ackAny; + ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, @@ -2385,7 +2486,10 @@ namespace epics { BaseRequestImpl(channel, monitorRequester), m_monitorRequester(monitorRequester), m_started(false), - m_pvRequest(pvRequest) + m_pvRequest(pvRequest), + m_queueSize(2), + m_pipeline(false), + m_ackAny(0) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); } @@ -2399,25 +2503,45 @@ namespace epics { return; } - int queueSize = 2; - { - PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); - if(pvOptions.get()) { - PVStringPtr pvString = pvOptions->getSubField("queueSize"); - if(pvString.get()) { + PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) + m_queueSize = size; + } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); + + // pipeline options + if (m_pipeline) + { + // defaults to queueSize/2 + m_ackAny = m_queueSize/2; + + pvString = pvOptions->getSubField("ackAny"); + if (pvString) { int32 size; std::stringstream ss; ss << pvString->get(); ss >> size; - queueSize = size; + if (size > 0) + m_ackAny = size; } } } - - BaseRequestImpl::activate(); - if (queueSize<2) queueSize = 2; - std::tr1::shared_ptr tp(new MonitorStrategyQueue(m_monitorRequester, queueSize)); + BaseRequestImpl::activate(); + + std::tr1::shared_ptr tp( + new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize, + m_pipeline, m_ackAny) + ); m_monitorStrategy = tp; // subscribe @@ -2430,6 +2554,16 @@ namespace epics { } } + // override default impl. to provide pipeline QoS flag + virtual void resubscribeSubscription(Transport::shared_pointer const & transport) { + if (transport.get() != 0 && !m_subscribed.get() && + startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT)) + { + m_subscribed.set(); + transport->enqueueSendRequest(shared_from_this()); + } + } + public: static Monitor::shared_pointer create( ChannelImpl::shared_pointer const & channel, @@ -2469,6 +2603,12 @@ namespace epics { { // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); + + // if streaming + if (pendingRequest & QOS_GET_PUT) + { + buffer->putInt(m_queueSize); + } } stopRequest(); @@ -2515,6 +2655,16 @@ namespace epics { { // TODO not supported by IF yet... } + else if (qos & QOS_DESTROY) + { + // TODO for now status is ignored + + if (payloadBuffer->getRemaining()) + m_monitorStrategy->response(transport, payloadBuffer); + + // unlisten will be called when all the elements in the queue gets processed + m_monitorStrategy->unlisten(); + } else { m_monitorStrategy->response(transport, payloadBuffer); diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 41484a8..0a78d33 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1875,20 +1875,46 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, // create... ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest); + + // pipelining monitor (i.e. w/ flow control) + const bool ack = (QOS_GET_PUT & qosCode) != 0; + if (ack) + { + int32 nfree = payloadBuffer->getInt(); + ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); + + Monitor::shared_pointer mp = request->getChannelMonitor(); + PipelineMonitor* pmp = dynamic_cast(mp.get()); + if (pmp) + pmp->reportRemoteQueueStatus(nfree); + } + } else { - const bool lastRequest = (QOS_DESTROY & qosCode) != 0; - const bool get = (QOS_GET & qosCode) != 0; - const bool process = (QOS_PROCESS & qosCode) != 0; + const bool lastRequest = (QOS_DESTROY & qosCode) != 0; + const bool get = (QOS_GET & qosCode) != 0; + const bool process = (QOS_PROCESS & qosCode) != 0; + const bool ack = (QOS_GET_PUT & qosCode) != 0; - ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); + ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); if (!request.get()) { BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + if (ack) + { + int32 nfree = payloadBuffer->getInt(); + Monitor::shared_pointer mp = request->getChannelMonitor(); + PipelineMonitor* pmp = dynamic_cast(mp.get()); + if (pmp) + pmp->reportRemoteQueueStatus(nfree); + return; + // note: not possible to ack and destroy + } + /* if (!request->startRequest(qosCode)) { @@ -1929,7 +1955,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, ServerMonitorRequesterImpl::ServerMonitorRequesterImpl( ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure() + BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false) { } @@ -1946,7 +1972,7 @@ MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create( void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest) { - startRequest(QOS_INIT); + startRequest(QOS_INIT); MonitorRequester::shared_pointer thisPointer = shared_from_this(); Destroyable::shared_pointer thisDestroyable = shared_from_this(); _channel->registerRequest(_ioid, thisDestroyable); @@ -1973,7 +1999,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/) { - //TODO + { + Lock guard(_mutex); + _unlisten = true; + } + TransportSender::shared_pointer thisSender = shared_from_this(); + _transport->enqueueSendRequest(thisSender); } void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/) @@ -2091,6 +2122,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } + else + { + // TODO CAS + bool unlisten; + Lock guard(_mutex); + unlisten = _unlisten; + _unlisten = false; + guard.unlock(); + + if (unlisten) + { + control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)QOS_DESTROY); + Status::Ok.serialize(buffer, control); + } + } } } diff --git a/src/server/responseHandlers.h b/src/server/responseHandlers.h index 9309670..fa8eef7 100644 --- a/src/server/responseHandlers.h +++ b/src/server/responseHandlers.h @@ -542,6 +542,7 @@ namespace pvAccess { epics::pvData::Monitor::shared_pointer _channelMonitor; epics::pvData::StructureConstPtr _structure; epics::pvData::Status _status; + bool _unlisten; }; diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 9ef227e..a161fab 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -44,3 +44,7 @@ rpcWildServiceExample_SRCS += rpcWildServiceExample.cpp PROD_HOST += rpcClientExample rpcClientExample_SRCS += rpcClientExample.cpp + +PROD_HOST += pipelineServiceExample +pipelineServiceExample_SRCS += pipelineServiceExample.cpp + diff --git a/testApp/remote/pipelineServiceExample.cpp b/testApp/remote/pipelineServiceExample.cpp new file mode 100644 index 0000000..547b24a --- /dev/null +++ b/testApp/remote/pipelineServiceExample.cpp @@ -0,0 +1,100 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + +static Structure::const_shared_pointer dataStructure = + getFieldCreate()->createFieldBuilder()-> + add("count", pvInt)-> + createStructure(); + +class PipelineSessionImpl : + public PipelineSession +{ +public: + + PipelineSessionImpl( + epics::pvData::PVStructure::shared_pointer const & pvRequest + ) : + m_counter(0), + m_max(0) + { + PVStructure::shared_pointer pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVString::shared_pointer pvString = pvOptions->getSubField("limit"); + if (pvString) + { + // note: this throws an exception if conversion fails + m_max = pvString->getAs(); + } + } + + } + + size_t getMinQueueSize() const { + return 16; //1024; + } + + Structure::const_shared_pointer getStructure() const { + return dataStructure; + } + + virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) { + // blocking in this call is not a good thing + // but generating a simple counter data is fast + // we will generate as much elements as we can + size_t count = control->getFreeElementCount(); + for (size_t i = 0; i < count; i++) { + MonitorElement::shared_pointer element = control->getFreeElement(); + element->pvStructurePtr->getSubField(1 /*"count"*/)->put(m_counter++); + control->putElement(element); + + // we reached the limit, no more data + if (m_max != 0 && m_counter == m_max) + { + control->done(); + break; + } + } + } + + virtual void cancel() { + // noop, no need to clean any data-source resources + } + +private: + // NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter + int32 m_counter; + int32 m_max; +}; + +class PipelineServiceImpl : + public PipelineService +{ + PipelineSession::shared_pointer createPipeline( + epics::pvData::PVStructure::shared_pointer const & pvRequest + ) + { + return PipelineSession::shared_pointer(new PipelineSessionImpl(pvRequest)); + } +}; + +int main() +{ + PipelineServer server; + + server.registerService("counterPipe", PipelineService::shared_pointer(new PipelineServiceImpl())); + // you can register as many services as you want here ... + + server.printInfo(); + server.run(); + + return 0; +}