/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvAccessCPP is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #include #include #include #include #define epicsExportSharedSymbols #include #include using namespace epics::pvData; using namespace std; namespace { using namespace epics::pvAccess; class ChannelPipelineMonitorImpl : public PipelineMonitor, public PipelineControl, public std::tr1::enable_shared_from_this { private: typedef vector FreeElementQueue; typedef queue MonitorElementQueue; Channel::shared_pointer m_channel; MonitorRequester::shared_pointer m_monitorRequester; PipelineSession::shared_pointer m_pipelineSession; size_t m_queueSize; FreeElementQueue m_freeQueue; MonitorElementQueue m_monitorQueue; Mutex m_freeQueueLock; Mutex m_monitorQueueLock; bool m_active; MonitorElement::shared_pointer m_nullMonitorElement; size_t m_requestedCount; bool m_pipeline; bool m_done; bool m_unlistenReported; public: ChannelPipelineMonitorImpl( Channel::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest, PipelineService::shared_pointer const & pipelineService) : m_channel(channel), m_monitorRequester(monitorRequester), m_queueSize(2), m_freeQueueLock(), m_monitorQueueLock(), m_active(false), m_requestedCount(0), m_pipeline(false), m_done(false), m_unlistenReported(false) { m_pipelineSession = pipelineService->createPipeline(pvRequest); // extract queueSize and pipeline parameter PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); if (pvOptions) { PVStringPtr pvString = pvOptions->getSubField("queueSize"); if (pvString) { int32 size; std::stringstream ss; ss << pvString->get(); ss >> size; if (size > 1) m_queueSize = static_cast(size); } pvString = pvOptions->getSubField("pipeline"); if (pvString) m_pipeline = (pvString->get() == "true"); } // server queue size must be >= client queue size size_t minQueueSize = m_pipelineSession->getMinQueueSize(); if (m_queueSize < minQueueSize) m_queueSize = minQueueSize; Structure::const_shared_pointer structure = m_pipelineSession->getStructure(); // create free elements { Lock guard(m_freeQueueLock); m_freeQueue.reserve(m_queueSize); for (size_t i = 0; i < m_queueSize; i++) { PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure); MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure)); // we always send all monitorElement->changedBitSet->set(0); m_freeQueue.push_back(monitorElement); } } } PipelineSession::shared_pointer getPipelineSession() const { return m_pipelineSession; } bool isPipelineEnabled() const { return m_pipeline; } virtual ~ChannelPipelineMonitorImpl() { destroy(); } virtual Status start() { bool notify = false; { Lock guard(m_monitorQueueLock); // already started if (m_active) return Status::Ok; m_active = true; notify = (m_monitorQueue.size() != 0); } if (notify) { Monitor::shared_pointer thisPtr = shared_from_this(); m_monitorRequester->monitorEvent(thisPtr); } return Status::Ok; } virtual Status stop() { Lock guard(m_monitorQueueLock); m_active = false; return Status::Ok; } // get next free element virtual MonitorElement::shared_pointer poll() { Lock guard(m_monitorQueueLock); // do not give send more elements than m_requestedCount // even if m_monitorQueue is not empty bool emptyQueue = m_monitorQueue.empty(); if (emptyQueue || m_requestedCount == 0 || !m_active) { // report "unlisten" event if queue empty and done, release lock first if (!m_unlistenReported && m_done && emptyQueue) { m_unlistenReported = true; guard.unlock(); m_monitorRequester->unlisten(shared_from_this()); } return m_nullMonitorElement; } MonitorElement::shared_pointer element = m_monitorQueue.front(); m_monitorQueue.pop(); m_requestedCount--; return element; } virtual void release(MonitorElement::shared_pointer const & monitorElement) { Lock guard(m_freeQueueLock); m_freeQueue.push_back(monitorElement); } virtual void reportRemoteQueueStatus(int32 freeElements) { // TODO check size_t count = static_cast(freeElements); //std::cout << "reportRemoteQueueStatus(" << count << ')' << std::endl; bool notify = false; { Lock guard(m_monitorQueueLock); m_requestedCount += count; notify = m_active && (m_monitorQueue.size() != 0); } // notify // TODO too many notify calls? if (notify) { Monitor::shared_pointer thisPtr = shared_from_this(); m_monitorRequester->monitorEvent(thisPtr); } m_pipelineSession->request(shared_from_this(), count); } virtual void destroy() { bool notifyCancel = false; { Lock guard(m_monitorQueueLock); m_active = false; notifyCancel = !m_done; m_done = true; } if (notifyCancel) m_pipelineSession->cancel(); } virtual size_t getFreeElementCount() { Lock guard(m_freeQueueLock); return m_freeQueue.size(); } virtual size_t getRequestedCount() { // TODO consider using atomic ops Lock guard(m_monitorQueueLock); return m_requestedCount; } virtual MonitorElement::shared_pointer getFreeElement() { Lock guard(m_freeQueueLock); if (m_freeQueue.empty()) return m_nullMonitorElement; MonitorElement::shared_pointer freeElement = m_freeQueue.back(); m_freeQueue.pop_back(); return freeElement; } virtual void putElement(MonitorElement::shared_pointer const & element) { bool notify = false; { Lock guard(m_monitorQueueLock); if (m_done) return; // throw std::logic_error("putElement called after done"); m_monitorQueue.push(element); // TODO there is way to much of notification, per each putElement notify = (m_requestedCount != 0); } // notify if (notify) { Monitor::shared_pointer thisPtr = shared_from_this(); m_monitorRequester->monitorEvent(thisPtr); } } virtual void done() { Lock guard(m_monitorQueueLock); m_done = true; bool report = !m_unlistenReported && m_monitorQueue.empty(); if (report) m_unlistenReported = true; guard.unlock(); if (report) m_monitorRequester->unlisten(shared_from_this()); } }; class PipelineChannel : public Channel, public std::tr1::enable_shared_from_this { private: static Status notSupportedStatus; static Status destroyedStatus; AtomicBoolean m_destroyed; ChannelProvider::shared_pointer m_provider; string m_channelName; ChannelRequester::shared_pointer m_channelRequester; PipelineService::shared_pointer m_pipelineService; public: POINTER_DEFINITIONS(PipelineChannel); PipelineChannel( ChannelProvider::shared_pointer const & provider, string const & channelName, ChannelRequester::shared_pointer const & channelRequester, PipelineService::shared_pointer const & pipelineService) : m_provider(provider), m_channelName(channelName), m_channelRequester(channelRequester), m_pipelineService(pipelineService) { } virtual ~PipelineChannel() { destroy(); } virtual std::tr1::shared_ptr getProvider() { return m_provider; } virtual std::string getRemoteAddress() { // local return getChannelName(); } virtual ConnectionState getConnectionState() { return m_destroyed.get() ? Channel::DESTROYED : Channel::CONNECTED; } virtual std::string getChannelName() { return m_channelName; } virtual std::tr1::shared_ptr getChannelRequester() { return m_channelRequester; } virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/) { return none; } virtual Monitor::shared_pointer createMonitor( MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { if (!pvRequest) throw std::invalid_argument("pvRequest == null"); if (m_destroyed.get()) { Monitor::shared_pointer nullPtr; epics::pvData::Structure::const_shared_pointer nullStructure; monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure); return nullPtr; } // TODO use std::make_shared std::tr1::shared_ptr tp( new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService) ); Monitor::shared_pointer channelPipelineMonitorImpl = tp; if (tp->isPipelineEnabled()) { monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); return channelPipelineMonitorImpl; } else { Monitor::shared_pointer nullPtr; epics::pvData::Structure::const_shared_pointer nullStructure; Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining"); monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure); return nullPtr; } } virtual ChannelArray::shared_pointer createChannelArray( ChannelArrayRequester::shared_pointer const & channelArrayRequester, epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) { ChannelArray::shared_pointer nullPtr; channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer()); return nullPtr; } virtual void printInfo(std::ostream& out) { out << "PipelineChannel: "; out << getChannelName(); out << " ["; out << Channel::ConnectionStateNames[getConnectionState()]; out << "]"; } virtual string getRequesterName() { return getChannelName(); } virtual void destroy() { m_destroyed.set(); } }; Status PipelineChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only monitor (aka pipeline) requests are supported by this channel"); Status PipelineChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"); } // namespace namespace epics { namespace pvAccess { Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider, std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, PipelineService::shared_pointer const & pipelineService) { // TODO use std::make_shared std::tr1::shared_ptr tp( new PipelineChannel(provider, channelName, channelRequester, pipelineService) ); Channel::shared_pointer channel = tp; return channel; } }} // epics::pvAccess namespace { class PipelineChannelProvider : public virtual ChannelProvider, public virtual ChannelFind, public std::tr1::enable_shared_from_this { public: POINTER_DEFINITIONS(PipelineChannelProvider); static string PROVIDER_NAME; static Status noSuchChannelStatus; // TODO thread pool support PipelineChannelProvider() { } virtual string getProviderName() { return PROVIDER_NAME; } virtual std::tr1::shared_ptr getChannelProvider() { return shared_from_this(); } virtual void cancel() {} virtual void destroy() {} virtual ChannelFind::shared_pointer channelFind(std::string const & channelName, ChannelFindRequester::shared_pointer const & channelFindRequester) { bool found; { Lock guard(m_mutex); found = (m_services.find(channelName) != m_services.end()) || findWildService(channelName); } ChannelFind::shared_pointer thisPtr(shared_from_this()); channelFindRequester->channelFindResult(Status::Ok, thisPtr, found); return thisPtr; } virtual ChannelFind::shared_pointer channelList( ChannelListRequester::shared_pointer const & channelListRequester) { if (!channelListRequester.get()) throw std::runtime_error("null requester"); PVStringArray::svector channelNames; { Lock guard(m_mutex); channelNames.reserve(m_services.size()); for (PipelineServiceMap::const_iterator iter = m_services.begin(); iter != m_services.end(); iter++) channelNames.push_back(iter->first); } ChannelFind::shared_pointer thisPtr(shared_from_this()); channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false); return thisPtr; } virtual Channel::shared_pointer createChannel( std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, short /*priority*/) { PipelineService::shared_pointer service; PipelineServiceMap::const_iterator iter; { Lock guard(m_mutex); iter = m_services.find(channelName); } if (iter != m_services.end()) service = iter->second; // check for wild services if (!service) service = findWildService(channelName); if (!service) { Channel::shared_pointer nullChannel; channelRequester->channelCreated(noSuchChannelStatus, nullChannel); return nullChannel; } // TODO use std::make_shared std::tr1::shared_ptr tp( new PipelineChannel( shared_from_this(), channelName, channelRequester, service)); Channel::shared_pointer pipelineChannel = tp; channelRequester->channelCreated(Status::Ok, pipelineChannel); return pipelineChannel; } virtual Channel::shared_pointer createChannel( std::string const & /*channelName*/, ChannelRequester::shared_pointer const & /*channelRequester*/, short /*priority*/, std::string const & /*address*/) { // this will never get called by the pvAccess server throw std::runtime_error("not supported"); } void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) { Lock guard(m_mutex); m_services[serviceName] = service; if (isWildcardPattern(serviceName)) m_wildServices.push_back(std::make_pair(serviceName, service)); } void unregisterService(std::string const & serviceName) { Lock guard(m_mutex); m_services.erase(serviceName); if (isWildcardPattern(serviceName)) { for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); iter != m_wildServices.end(); iter++) if (iter->first == serviceName) { m_wildServices.erase(iter); break; } } } private: // assumes sync on services PipelineService::shared_pointer findWildService(string const & wildcard) { if (!m_wildServices.empty()) for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); iter != m_wildServices.end(); iter++) if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str())) return iter->second; return PipelineService::shared_pointer(); } // (too) simple check bool isWildcardPattern(string const & pattern) { return (pattern.find('*') != string::npos || pattern.find('?') != string::npos || (pattern.find('[') != string::npos && pattern.find(']') != string::npos)); } typedef std::map PipelineServiceMap; PipelineServiceMap m_services; typedef std::vector > PipelineWildServiceList; PipelineWildServiceList m_wildServices; epics::pvData::Mutex m_mutex; }; string PipelineChannelProvider::PROVIDER_NAME("PipelineService"); Status PipelineChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel"); } //namespace namespace epics { namespace pvAccess { PipelineServer::PipelineServer() { ChannelProvider::shared_pointer prov(new PipelineChannelProvider); m_serverContext = ServerContext::create(ServerContext::Config() .provider(m_channelProviderImpl)); } PipelineServer::~PipelineServer() { // multiple destroy call is OK destroy(); } void PipelineServer::printInfo() { std::cout << m_serverContext->getVersion().getVersionString() << std::endl; m_serverContext->printInfo(); } void PipelineServer::run(int seconds) { m_serverContext->run(seconds); } /// Method requires usage of std::tr1::shared_ptr. This instance must be /// owned by a shared_ptr instance. void PipelineServer::runInNewThread(int seconds) { if(seconds!=0) std::cerr<<"PipelineServer::runInNewThread() only suppose seconds=0\n"; } void PipelineServer::destroy() { m_serverContext->shutdown(); } void PipelineServer::registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) { std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); } void PipelineServer::unregisterService(std::string const & serviceName) { std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->unregisterService(serviceName); } } }