Merge branch 'msekoranja-default'

Matej Sekoranja
2015-10-15 21:22:57 +02:00
14 changed files with 1359 additions and 32 deletions

View File

@@ -1422,7 +1422,9 @@ class MonitorRequesterImpl : public MonitorRequester
virtual void unlisten(Monitor::shared_pointer const & /*monitor*/)
{
std::cerr << "unlisten" << std::endl;
//std::cerr << "unlisten" << std::endl;
// TODO
epicsExit(0);
}
};
@@ -1747,7 +1749,7 @@ int main (int argc, char *argv[])
fprintf(stderr, "failed to parse request string\n");
return 1;
}
// register "pva" and "ca" providers
ClientFactory::start();
epics::pvAccess::ca::CAClientFactory::start();

View File

@@ -17,6 +17,7 @@ include $(PVACCESS_SRC)/remoteClient/Makefile
include $(PVACCESS_SRC)/server/Makefile
include $(PVACCESS_SRC)/rpcService/Makefile
include $(PVACCESS_SRC)/rpcClient/Makefile
include $(PVACCESS_SRC)/pipelineService/Makefile
include $(PVACCESS_SRC)/ca/Makefile
include $(PVACCESS_SRC)/mb/Makefile
include $(PVACCESS_SRC)/ioc/Makefile

View File

@@ -1438,8 +1438,6 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito
// noop
}
/* --------------- epics::pvData::ChannelRequest --------------- */
void CAChannelMonitor::cancel()

View File

@@ -941,6 +941,23 @@ namespace pvAccess {
epicsShareExtern void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory);
/**
* @brief Pipeline (streaming) support API (optional).
* This is used by pvAccess to implement pipeline (streaming) monitors.
*/
class epicsShareClass PipelineMonitor : public virtual epics::pvData::Monitor {
public:
POINTER_DEFINITIONS(PipelineMonitor);
virtual ~PipelineMonitor(){}
/**
* Report remote queue status.
* @param freeElements number of free elements.
*/
virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) = 0;
};
}}
#endif /* PVACCESS_H */
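For orientation, pipelining is opted into per monitor through pvRequest options (the parsing added later in this commit reads queueSize, pipeline and ackAny). A hedged client-side sketch, not part of this commit, assuming CreateRequest from pvDataCPP; createPipelinedMonitor and the option values are illustrative:

#include <stdexcept>
#include <pv/createRequest.h>
#include <pv/pvAccess.h>

// hedged sketch: create a pipelined monitor on an already-connected channel;
// the pvRequest option names match the parsing added later in this commit
epics::pvData::Monitor::shared_pointer createPipelinedMonitor(
    epics::pvAccess::Channel::shared_pointer const & channel,
    epics::pvData::MonitorRequester::shared_pointer const & requester)
{
    epics::pvData::PVStructure::shared_pointer pvRequest =
        epics::pvData::CreateRequest::create()->createRequest(
            "record[queueSize=16,pipeline=true]field(value)");
    if (!pvRequest)
        throw std::runtime_error("failed to parse request string");
    return channel->createMonitor(requester, pvRequest);
}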

View File

@@ -0,0 +1,9 @@
# This is a Makefile fragment, see ../Makefile
SRC_DIRS += $(PVACCESS_SRC)/pipelineService
INC += pipelineService.h
INC += pipelineServer.h
LIBSRCS += pipelineService.cpp
LIBSRCS += pipelineServer.cpp

View File

@@ -0,0 +1,814 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <stdexcept>
#include <vector>
#include <queue>
#include <utility>
#define epicsExportSharedSymbols
#include <pv/pipelineServer.h>
#include <pv/wildcard.h>
using namespace epics::pvData;
using namespace std;
namespace epics { namespace pvAccess {
class ChannelPipelineMonitorImpl :
public PipelineMonitor,
public PipelineControl,
public std::tr1::enable_shared_from_this<ChannelPipelineMonitorImpl>
{
private:
typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
Channel::shared_pointer m_channel;
MonitorRequester::shared_pointer m_monitorRequester;
PipelineSession::shared_pointer m_pipelineSession;
size_t m_queueSize;
FreeElementQueue m_freeQueue;
MonitorElementQueue m_monitorQueue;
Mutex m_freeQueueLock;
Mutex m_monitorQueueLock;
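// note: free elements and ready-to-send elements live on separate queues,
// each guarded by its own lock, presumably so that release() (called from
// the network sending side) does not contend with poll()/putElement()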
bool m_active;
MonitorElement::shared_pointer m_nullMonitorElement;
size_t m_requestedCount;
bool m_pipeline;
bool m_done;
bool m_unlistenReported;
public:
ChannelPipelineMonitorImpl(
Channel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest,
PipelineService::shared_pointer const & pipelineService) :
m_channel(channel),
m_monitorRequester(monitorRequester),
m_queueSize(2),
m_freeQueueLock(),
m_monitorQueueLock(),
m_active(false),
m_requestedCount(0),
m_pipeline(false),
m_done(false),
m_unlistenReported(false)
{
m_pipelineSession = pipelineService->createPipeline(pvRequest);
// extract the queueSize and pipeline parameters
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1)
m_queueSize = static_cast<size_t>(size);
}
pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
}
// server queue size must be >= client queue size
size_t minQueueSize = m_pipelineSession->getMinQueueSize();
if (m_queueSize < minQueueSize)
m_queueSize = minQueueSize;
Structure::const_shared_pointer structure = m_pipelineSession->getStructure();
// create free elements
{
Lock guard(m_freeQueueLock);
m_freeQueue.reserve(m_queueSize);
for (size_t i = 0; i < m_queueSize; i++)
{
PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));
// we always send all
monitorElement->changedBitSet->set(0);
m_freeQueue.push_back(monitorElement);
}
}
}
PipelineSession::shared_pointer getPipelineSession() const {
return m_pipelineSession;
}
bool isPipelineEnabled() const {
return m_pipeline;
}
virtual ~ChannelPipelineMonitorImpl()
{
destroy();
}
virtual Status start()
{
bool notify = false;
{
Lock guard(m_monitorQueueLock);
// already started
if (m_active)
return Status::Ok;
m_active = true;
notify = (m_monitorQueue.size() != 0);
}
if (notify)
{
Monitor::shared_pointer thisPtr = shared_from_this();
m_monitorRequester->monitorEvent(thisPtr);
}
return Status::Ok;
}
virtual Status stop()
{
Lock guard(m_monitorQueueLock);
m_active = false;
return Status::Ok;
}
// get next monitor element to be sent
virtual MonitorElement::shared_pointer poll()
{
Lock guard(m_monitorQueueLock);
// do not send more elements than m_requestedCount,
// even if m_monitorQueue is not empty
bool emptyQueue = m_monitorQueue.empty();
if (emptyQueue || m_requestedCount == 0 || !m_active)
{
// report "unlisten" event if queue empty and done, release lock first
if (!m_unlistenReported && m_done && emptyQueue)
{
m_unlistenReported = true;
guard.unlock();
m_monitorRequester->unlisten(shared_from_this());
}
return m_nullMonitorElement;
}
MonitorElement::shared_pointer element = m_monitorQueue.front();
m_monitorQueue.pop();
m_requestedCount--;
return element;
}
virtual void release(MonitorElement::shared_pointer const & monitorElement)
{
Lock guard(m_freeQueueLock);
m_freeQueue.push_back(monitorElement);
}
virtual void reportRemoteQueueStatus(int32 freeElements)
{
// TODO check
size_t count = static_cast<size_t>(freeElements);
//std::cout << "reportRemoteQueueStatus(" << count << ')' << std::endl;
bool notify = false;
{
Lock guard(m_monitorQueueLock);
m_requestedCount += count;
notify = m_active && (m_monitorQueue.size() != 0);
}
// notify
// TODO too many notify calls?
if (notify)
{
Monitor::shared_pointer thisPtr = shared_from_this();
m_monitorRequester->monitorEvent(thisPtr);
}
m_pipelineSession->request(shared_from_this(), count);
}
virtual void destroy()
{
bool notifyCancel = false;
{
Lock guard(m_monitorQueueLock);
m_active = false;
notifyCancel = !m_done;
m_done = true;
}
if (notifyCancel)
m_pipelineSession->cancel();
}
virtual void lock()
{
// noop
}
virtual void unlock()
{
// noop
}
virtual size_t getFreeElementCount() {
Lock guard(m_freeQueueLock);
return m_freeQueue.size();
}
virtual size_t getRequestedCount() {
// TODO consider using atomic ops
Lock guard(m_monitorQueueLock);
return m_requestedCount;
}
virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() {
Lock guard(m_freeQueueLock);
if (m_freeQueue.empty())
return m_nullMonitorElement;
MonitorElement::shared_pointer freeElement = m_freeQueue.back();
m_freeQueue.pop_back();
return freeElement;
}
virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) {
bool notify = false;
{
Lock guard(m_monitorQueueLock);
if (m_done)
return;
// throw std::logic_error("putElement called after done");
m_monitorQueue.push(element);
// TODO there is way too much notification, one call per putElement
notify = (m_requestedCount != 0);
}
// notify
if (notify)
{
Monitor::shared_pointer thisPtr = shared_from_this();
m_monitorRequester->monitorEvent(thisPtr);
}
}
virtual void done() {
Lock guard(m_monitorQueueLock);
m_done = true;
bool report = !m_unlistenReported && m_monitorQueue.empty();
if (report)
m_unlistenReported = true;
guard.unlock();
if (report)
m_monitorRequester->unlisten(shared_from_this());
}
};
class PipelineChannel :
public Channel,
public std::tr1::enable_shared_from_this<PipelineChannel>
{
private:
static Status notSupportedStatus;
static Status destroyedStatus;
AtomicBoolean m_destroyed;
ChannelProvider::shared_pointer m_provider;
string m_channelName;
ChannelRequester::shared_pointer m_channelRequester;
PipelineService::shared_pointer m_pipelineService;
public:
POINTER_DEFINITIONS(PipelineChannel);
PipelineChannel(
ChannelProvider::shared_pointer const & provider,
string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
PipelineService::shared_pointer const & pipelineService) :
m_provider(provider),
m_channelName(channelName),
m_channelRequester(channelRequester),
m_pipelineService(pipelineService)
{
}
virtual ~PipelineChannel()
{
destroy();
}
virtual std::tr1::shared_ptr<ChannelProvider> getProvider()
{
return m_provider;
}
virtual std::string getRemoteAddress()
{
// local
return getChannelName();
}
virtual ConnectionState getConnectionState()
{
return isConnected() ?
Channel::CONNECTED :
Channel::DESTROYED;
}
virtual std::string getChannelName()
{
return m_channelName;
}
virtual std::tr1::shared_ptr<ChannelRequester> getChannelRequester()
{
return m_channelRequester;
}
virtual bool isConnected()
{
return !m_destroyed.get();
}
virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/)
{
return none;
}
virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & /*subField*/)
{
requester->getDone(notSupportedStatus, epics::pvData::Field::shared_pointer());
}
virtual ChannelProcess::shared_pointer createChannelProcess(
ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelProcess::shared_pointer nullPtr;
channelProcessRequester->channelProcessConnect(notSupportedStatus, nullPtr);
return nullPtr;
}
virtual ChannelGet::shared_pointer createChannelGet(
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelGet::shared_pointer nullPtr;
channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr,
epics::pvData::Structure::const_shared_pointer());
return nullPtr;
}
virtual ChannelPut::shared_pointer createChannelPut(
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelPut::shared_pointer nullPtr;
channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr,
epics::pvData::Structure::const_shared_pointer());
return nullPtr;
}
virtual ChannelPutGet::shared_pointer createChannelPutGet(
ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelPutGet::shared_pointer nullPtr;
epics::pvData::Structure::const_shared_pointer nullStructure;
channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure);
return nullPtr;
}
virtual ChannelRPC::shared_pointer createChannelRPC(
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelRPC::shared_pointer nullPtr;
channelRPCRequester->channelRPCConnect(notSupportedStatus, nullPtr);
return nullPtr;
}
virtual epics::pvData::Monitor::shared_pointer createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
if (!pvRequest)
throw std::invalid_argument("pvRequest == null");
if (m_destroyed.get())
{
Monitor::shared_pointer nullPtr;
epics::pvData::Structure::const_shared_pointer nullStructure;
monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure);
return nullPtr;
}
// TODO use std::make_shared
std::tr1::shared_ptr<ChannelPipelineMonitorImpl> tp(
new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService)
);
Monitor::shared_pointer channelPipelineMonitorImpl = tp;
if (tp->isPipelineEnabled())
{
monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure());
return channelPipelineMonitorImpl;
}
else
{
Monitor::shared_pointer nullPtr;
epics::pvData::Structure::const_shared_pointer nullStructure;
Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining");
monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure);
return nullPtr;
}
}
virtual ChannelArray::shared_pointer createChannelArray(
ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelArray::shared_pointer nullPtr;
channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer());
return nullPtr;
}
virtual void printInfo()
{
printInfo(std::cout);
}
virtual void printInfo(std::ostream& out)
{
out << "PipelineChannel: ";
out << getChannelName();
out << " [";
out << Channel::ConnectionStateNames[getConnectionState()];
out << "]";
}
virtual string getRequesterName()
{
return getChannelName();
}
virtual void message(std::string const & message,MessageType messageType)
{
// just delegate
m_channelRequester->message(message, messageType);
}
virtual void destroy()
{
m_destroyed.set();
}
};
Status PipelineChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only monitor (aka pipeline) requests are supported by this channel");
Status PipelineChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed");
Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider,
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
PipelineService::shared_pointer const & pipelineService)
{
// TODO use std::make_shared
std::tr1::shared_ptr<PipelineChannel> tp(
new PipelineChannel(provider, channelName, channelRequester, pipelineService)
);
Channel::shared_pointer channel = tp;
return channel;
}
class PipelineChannelProvider :
public virtual ChannelProvider,
public virtual ChannelFind,
public std::tr1::enable_shared_from_this<PipelineChannelProvider> {
public:
POINTER_DEFINITIONS(PipelineChannelProvider);
static string PROVIDER_NAME;
static Status noSuchChannelStatus;
// TODO thread pool support
PipelineChannelProvider() {
}
virtual string getProviderName() {
return PROVIDER_NAME;
}
virtual std::tr1::shared_ptr<ChannelProvider> getChannelProvider()
{
return shared_from_this();
}
virtual void cancel() {}
virtual void destroy() {}
virtual ChannelFind::shared_pointer channelFind(std::string const & channelName,
ChannelFindRequester::shared_pointer const & channelFindRequester)
{
bool found;
{
Lock guard(m_mutex);
found = (m_services.find(channelName) != m_services.end()) ||
findWildService(channelName);
}
ChannelFind::shared_pointer thisPtr(shared_from_this());
channelFindRequester->channelFindResult(Status::Ok, thisPtr, found);
return thisPtr;
}
virtual ChannelFind::shared_pointer channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("null requester");
PVStringArray::svector channelNames;
{
Lock guard(m_mutex);
channelNames.reserve(m_services.size());
for (PipelineServiceMap::const_iterator iter = m_services.begin();
iter != m_services.end();
iter++)
channelNames.push_back(iter->first);
}
ChannelFind::shared_pointer thisPtr(shared_from_this());
channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false);
return thisPtr;
}
virtual Channel::shared_pointer createChannel(
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
short /*priority*/)
{
PipelineService::shared_pointer service;
PipelineServiceMap::const_iterator iter;
{
Lock guard(m_mutex);
iter = m_services.find(channelName);
}
if (iter != m_services.end())
service = iter->second;
// check for wild services
if (!service)
service = findWildService(channelName);
if (!service)
{
Channel::shared_pointer nullChannel;
channelRequester->channelCreated(noSuchChannelStatus, nullChannel);
return nullChannel;
}
// TODO use std::make_shared
std::tr1::shared_ptr<PipelineChannel> tp(
new PipelineChannel(
shared_from_this(),
channelName,
channelRequester,
service));
Channel::shared_pointer pipelineChannel = tp;
channelRequester->channelCreated(Status::Ok, pipelineChannel);
return pipelineChannel;
}
virtual Channel::shared_pointer createChannel(
std::string const & /*channelName*/,
ChannelRequester::shared_pointer const & /*channelRequester*/,
short /*priority*/,
std::string const & /*address*/)
{
// this will never get called by the pvAccess server
throw std::runtime_error("not supported");
}
void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service)
{
Lock guard(m_mutex);
m_services[serviceName] = service;
if (isWildcardPattern(serviceName))
m_wildServices.push_back(std::make_pair(serviceName, service));
}
void unregisterService(std::string const & serviceName)
{
Lock guard(m_mutex);
m_services.erase(serviceName);
if (isWildcardPattern(serviceName))
{
for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
iter != m_wildServices.end();
iter++)
if (iter->first == serviceName)
{
m_wildServices.erase(iter);
break;
}
}
}
private:
// assumes sync on services
PipelineService::shared_pointer findWildService(string const & wildcard)
{
if (!m_wildServices.empty())
for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
iter != m_wildServices.end();
iter++)
if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
return iter->second;
return PipelineService::shared_pointer();
}
// (too) simple check
bool isWildcardPattern(string const & pattern)
{
return
(pattern.find('*') != string::npos ||
pattern.find('?') != string::npos ||
(pattern.find('[') != string::npos && pattern.find(']') != string::npos));
}
typedef std::map<string, PipelineService::shared_pointer> PipelineServiceMap;
PipelineServiceMap m_services;
typedef std::vector<std::pair<string, PipelineService::shared_pointer> > PipelineWildServiceList;
PipelineWildServiceList m_wildServices;
epics::pvData::Mutex m_mutex;
};
string PipelineChannelProvider::PROVIDER_NAME("PipelineService");
Status PipelineChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel");
class PipelineChannelProviderFactory : public ChannelProviderFactory
{
public:
POINTER_DEFINITIONS(PipelineChannelProviderFactory);
PipelineChannelProviderFactory() :
m_channelProviderImpl(new PipelineChannelProvider())
{
}
virtual std::string getFactoryName()
{
return PipelineChannelProvider::PROVIDER_NAME;
}
virtual ChannelProvider::shared_pointer sharedInstance()
{
return m_channelProviderImpl;
}
virtual ChannelProvider::shared_pointer newInstance()
{
// TODO use std::make_shared
std::tr1::shared_ptr<PipelineChannelProvider> tp(new PipelineChannelProvider());
ChannelProvider::shared_pointer channelProvider = tp;
return channelProvider;
}
private:
PipelineChannelProvider::shared_pointer m_channelProviderImpl;
};
PipelineServer::PipelineServer()
{
// TODO factory is never deregistered, multiple PipelineServer instances create multiple factories, etc.
m_channelProviderFactory.reset(new PipelineChannelProviderFactory());
registerChannelProviderFactory(m_channelProviderFactory);
m_channelProviderImpl = m_channelProviderFactory->sharedInstance();
m_serverContext = ServerContextImpl::create();
m_serverContext->setChannelProviderName(m_channelProviderImpl->getProviderName());
m_serverContext->initialize(getChannelProviderRegistry());
}
PipelineServer::~PipelineServer()
{
// multiple destroy calls are OK
destroy();
}
void PipelineServer::printInfo()
{
std::cout << m_serverContext->getVersion().getVersionString() << std::endl;
m_serverContext->printInfo();
}
void PipelineServer::run(int seconds)
{
m_serverContext->run(seconds);
}
struct ThreadRunnerParam {
PipelineServer::shared_pointer server;
int timeToRun;
};
static void threadRunner(void* usr)
{
ThreadRunnerParam* pusr = static_cast<ThreadRunnerParam*>(usr);
ThreadRunnerParam param = *pusr;
delete pusr;
param.server->run(param.timeToRun);
}
/// This method requires the PipelineServer instance to be owned by a
/// std::tr1::shared_ptr<PipelineServer> instance.
void PipelineServer::runInNewThread(int seconds)
{
std::auto_ptr<ThreadRunnerParam> param(new ThreadRunnerParam());
param->server = shared_from_this();
param->timeToRun = seconds;
epicsThreadCreate("PipelineServer thread",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackSmall),
threadRunner, param.get());
// let the thread delete 'param'
param.release();
}
void PipelineServer::destroy()
{
m_serverContext->destroy();
}
void PipelineServer::registerService(std::string const & serviceName, PipelineService::shared_pointer const & service)
{
std::tr1::dynamic_pointer_cast<PipelineChannelProvider>(m_channelProviderImpl)->registerService(serviceName, service);
}
void PipelineServer::unregisterService(std::string const & serviceName)
{
std::tr1::dynamic_pointer_cast<PipelineChannelProvider>(m_channelProviderImpl)->unregisterService(serviceName);
}
}}

View File

@@ -0,0 +1,74 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef PIPELINESERVER_H
#define PIPELINESERVER_H
#ifdef epicsExportSharedSymbols
# define pipelineServerEpicsExportSharedSymbols
# undef epicsExportSharedSymbols
#endif
#include <pv/sharedPtr.h>
#ifdef pipelineServerEpicsExportSharedSymbols
# define epicsExportSharedSymbols
# undef pipelineServerEpicsExportSharedSymbols
#endif
#include <pv/pvAccess.h>
#include <pv/pipelineService.h>
#include <pv/serverContext.h>
#include <shareLib.h>
namespace epics { namespace pvAccess {
class epicsShareClass PipelineServer :
public std::tr1::enable_shared_from_this<PipelineServer>
{
private:
ServerContextImpl::shared_pointer m_serverContext;
ChannelProviderFactory::shared_pointer m_channelProviderFactory;
ChannelProvider::shared_pointer m_channelProviderImpl;
// TODO no thread pool implementation
public:
POINTER_DEFINITIONS(PipelineServer);
PipelineServer();
virtual ~PipelineServer();
void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service);
void unregisterService(std::string const & serviceName);
void run(int seconds = 0);
/// This method requires the PipelineServer instance to be owned by a
/// std::tr1::shared_ptr<PipelineServer> instance.
void runInNewThread(int seconds = 0);
void destroy();
/**
* Display basic information about the context.
*/
void printInfo();
};
epicsShareExtern Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider,
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
PipelineService::shared_pointer const & pipelineService);
}}
#endif /* PIPELINESERVER_H */
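Below the header, a hedged usage sketch, not part of this commit; serveExample and the channel name examplePipe are illustrative, and 'service' is any PipelineService implementation (see pipelineService.h):

#include <pv/pipelineServer.h>

// hedged sketch: serve a pipeline service until run() returns
void serveExample(epics::pvAccess::PipelineService::shared_pointer const & service)
{
    // runInNewThread() requires shared_ptr ownership (see comment above),
    // so the server is held via shared_pointer rather than on the stack
    epics::pvAccess::PipelineServer::shared_pointer server(
        new epics::pvAccess::PipelineServer());
    server->registerService("examplePipe", service);
    server->printInfo();
    server->run();   // blocks; run(seconds) limits the runtime instead
}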

View File

@@ -0,0 +1,8 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#define epicsExportSharedSymbols
#include <pv/pipelineService.h>

View File

@@ -0,0 +1,101 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef PIPELINESERVICE_H
#define PIPELINESERVICE_H
#include <stdexcept>
#ifdef epicsExportSharedSymbols
# define pipelineServiceEpicsExportSharedSymbols
# undef epicsExportSharedSymbols
#endif
#include <pv/sharedPtr.h>
#include <pv/status.h>
#ifdef pipelineServiceEpicsExportSharedSymbols
# define epicsExportSharedSymbols
# undef pipelineServiceEpicsExportSharedSymbols
#endif
#include <pv/pvAccess.h>
#include <shareLib.h>
namespace epics { namespace pvAccess {
class epicsShareClass PipelineControl
{
public:
POINTER_DEFINITIONS(PipelineControl);
virtual ~PipelineControl() {};
/// Number of free elements in the local queue.
/// A service can (and should) fill up the entire queue.
virtual size_t getFreeElementCount() = 0;
/// Total count of requested elements.
/// This is the minimum element count that a service should provide.
virtual size_t getRequestedCount() = 0;
/// Grab the next free element.
/// A service should take this element, populate it with data
/// and return it by calling putElement().
virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() = 0;
/// Put element on the local queue (an element to be sent to a client).
virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) = 0;
/// Call to notify that there is no more data to be pipelined.
/// This call destroys the corresponding pipeline session.
virtual void done() = 0;
};
class epicsShareClass PipelineSession
{
public:
POINTER_DEFINITIONS(PipelineSession);
virtual ~PipelineSession() {};
/// Returns (minimum) local queue size.
/// Actual local queue size = max( getMinQueueSize(), client queue size );
virtual size_t getMinQueueSize() const = 0;
/// Description of the structure used by this session.
virtual epics::pvData::Structure::const_shared_pointer getStructure() const = 0;
/// Request for elementCount additional (not total) elements.
/// The service should eventually call PipelineControl::getFreeElement() and PipelineControl::putElement()
/// to provide between PipelineControl::getRequestedCount() and PipelineControl::getFreeElementCount() elements.
virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) = 0;
/// Cancel the session (called by the client).
virtual void cancel() = 0;
};
class epicsShareClass PipelineService
{
public:
POINTER_DEFINITIONS(PipelineService);
virtual ~PipelineService() {};
virtual PipelineSession::shared_pointer createPipeline(
epics::pvData::PVStructure::shared_pointer const & pvRequest
) = 0;
};
}}
#endif /* PIPELINESERVICE_H */
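The doc comments above pin down a contract: request() may publish at most getFreeElementCount() elements and should, if the data source can keep up, publish at least getRequestedCount(). A hedged helper illustrating that loop, not part of this commit; 'produce' is a hypothetical callback that fills one structure:

#include <pv/pipelineService.h>

// hedged sketch of the request() contract: publish up to the free-element
// count; a slower source could stop after getRequestedCount() elements
template <typename ProduceFn>
void publishAvailable(epics::pvAccess::PipelineControl::shared_pointer const & control,
                      ProduceFn produce)
{
    size_t upperBound = control->getFreeElementCount();
    for (size_t i = 0; i < upperBound; i++)
    {
        epics::pvData::MonitorElement::shared_pointer element = control->getFreeElement();
        if (!element)
            break;                        // queue drained concurrently
        produce(element->pvStructurePtr); // fill in the data
        control->putElement(element);
    }
    // call control->done() once the source is exhausted
}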

View File

@@ -2061,16 +2061,18 @@ namespace epics {
class MonitorStrategy : public Monitor {
public:
virtual ~MonitorStrategy() {};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
virtual void unlisten() = 0;
};
typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
class MonitorStrategyQueue :
public MonitorStrategy,
public TransportSender,
public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
{
private:
@@ -2096,16 +2098,36 @@ namespace epics {
PVStructure::shared_pointer m_up2datePVStructure;
public:
int32 m_releasedCount;
bool m_reportQueueStateInProgress;
MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) :
// TODO check for cyclic-ref
ChannelImpl::shared_pointer m_channel;
pvAccessID m_ioid;
bool m_pipeline;
int32 m_ackAny;
bool m_unlisten;
public:
MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid,
MonitorRequester::shared_pointer const & callback,
int32 queueSize,
bool pipeline, int32 ackAny) :
m_queueSize(queueSize), m_lastStructure(),
m_freeQueue(),
m_monitorQueue(),
m_callback(callback), m_mutex(),
m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
m_nullMonitorElement()
{
m_nullMonitorElement(),
m_releasedCount(0),
m_reportQueueStateInProgress(false),
m_channel(channel), m_ioid(ioid),
m_pipeline(pipeline), m_ackAny(ackAny),
m_unlisten(false)
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
@@ -2121,6 +2143,9 @@ namespace epics {
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
m_releasedCount = 0;
m_reportQueueStateInProgress = false;
// reuse on reconnect
if (m_lastStructure.get() == 0 ||
*(m_lastStructure.get()) == *(structure.get()))
@@ -2304,12 +2329,33 @@ namespace epics {
}
}
virtual void unlisten()
{
bool notifyUnlisten = false;
{
Lock guard(m_mutex);
notifyUnlisten = m_monitorQueue.empty();
m_unlisten = !notifyUnlisten;
}
if (notifyUnlisten)
{
EXCEPTION_GUARD(m_callback->unlisten(shared_from_this()));
}
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (m_monitorQueue.empty())
if (m_monitorQueue.empty()) {
if (m_unlisten) {
m_unlisten = false;
guard.unlock();
EXCEPTION_GUARD(m_callback->unlisten(shared_from_this()));
}
return m_nullMonitorElement;
}
MonitorElement::shared_pointer retVal = m_monitorQueue.front();
m_monitorQueue.pop();
@@ -2318,6 +2364,8 @@ namespace epics {
// NOTE: a client must always call poll() after release() to check for the presence of any new monitor elements
virtual void release(MonitorElement::shared_pointer const & monitorElement) {
bool sendAck = false;
{
Lock guard(m_mutex);
m_freeQueue.push_back(monitorElement);
@@ -2334,6 +2382,55 @@ namespace epics {
m_overrunElement.reset();
m_overrunInProgress = false;
}
if (m_pipeline)
{
m_releasedCount++;
if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny)
{
sendAck = true;
m_reportQueueStateInProgress = true;
}
}
if (sendAck)
{
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if this fails)
m_reportQueueStateInProgress = false;
}
}
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage((int8)CMD_MONITOR, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)QOS_GET_PUT);
{
Lock guard(m_mutex);
buffer->putInt(m_releasedCount);
m_releasedCount = 0;
m_reportQueueStateInProgress = false;
}
// immediate send
control->flush(true);
}
virtual void lock()
{
// noop
}
virtual void unlock()
{
// noop
}
Status start() {
@@ -2377,6 +2474,10 @@ namespace epics {
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
int32 m_queueSize;
bool m_pipeline;
int32 m_ackAny;
ChannelMonitorImpl(
ChannelImpl::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
@@ -2385,7 +2486,10 @@ namespace epics {
BaseRequestImpl(channel, monitorRequester),
m_monitorRequester(monitorRequester),
m_started(false),
m_pvRequest(pvRequest)
m_pvRequest(pvRequest),
m_queueSize(2),
m_pipeline(false),
m_ackAny(0)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor);
}
@@ -2399,25 +2503,45 @@ namespace epics {
return;
}
int queueSize = 2;
{
PVStructurePtr pvOptions = m_pvRequest->getSubField<PVStructure>("record._options");
if(pvOptions.get()) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if(pvString.get()) {
PVStructurePtr pvOptions = m_pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1)
m_queueSize = size;
}
pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
// pipeline options
if (m_pipeline)
{
// defaults to queueSize/2
m_ackAny = m_queueSize/2;
pvString = pvOptions->getSubField<PVString>("ackAny");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
queueSize = size;
if (size > 0)
m_ackAny = size;
}
}
}
BaseRequestImpl::activate();
if (queueSize<2) queueSize = 2;
std::tr1::shared_ptr<MonitorStrategyQueue> tp(new MonitorStrategyQueue(m_monitorRequester, queueSize));
BaseRequestImpl::activate();
std::tr1::shared_ptr<MonitorStrategyQueue> tp(
new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize,
m_pipeline, m_ackAny)
);
m_monitorStrategy = tp;
// subscribe
@@ -2430,6 +2554,16 @@ namespace epics {
}
}
// override default impl. to provide pipeline QoS flag
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
if (transport.get() != 0 && !m_subscribed.get() &&
startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT))
{
m_subscribed.set();
transport->enqueueSendRequest(shared_from_this());
}
}
public:
static Monitor::shared_pointer create(
ChannelImpl::shared_pointer const & channel,
@@ -2469,6 +2603,12 @@ namespace epics {
{
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
// if streaming
if (pendingRequest & QOS_GET_PUT)
{
buffer->putInt(m_queueSize);
}
}
stopRequest();
@@ -2515,6 +2655,16 @@ namespace epics {
{
// TODO not supported by IF yet...
}
else if (qos & QOS_DESTROY)
{
// TODO for now status is ignored
if (payloadBuffer->getRemaining())
m_monitorStrategy->response(transport, payloadBuffer);
// unlisten will be called when all the elements in the queue get processed
m_monitorStrategy->unlisten();
}
else
{
m_monitorStrategy->response(transport, payloadBuffer);
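For reference, a hedged pvRequest exercising all three options parsed above (values illustrative; queueSize must be greater than 1, and ackAny falls back to queueSize/2 when omitted):

record[queueSize=16,pipeline=true,ackAny=4]field(value)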

View File

@@ -1875,20 +1875,46 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
// create...
ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
// pipelining monitor (i.e. w/ flow control)
const bool ack = (QOS_GET_PUT & qosCode) != 0;
if (ack)
{
int32 nfree = payloadBuffer->getInt();
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
Monitor::shared_pointer mp = request->getChannelMonitor();
PipelineMonitor* pmp = dynamic_cast<PipelineMonitor*>(mp.get());
if (pmp)
pmp->reportRemoteQueueStatus(nfree);
}
}
else
{
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
const bool get = (QOS_GET & qosCode) != 0;
const bool process = (QOS_PROCESS & qosCode) != 0;
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
const bool get = (QOS_GET & qosCode) != 0;
const bool process = (QOS_PROCESS & qosCode) != 0;
const bool ack = (QOS_GET_PUT & qosCode) != 0;
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
if (!request.get())
{
BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
return;
}
if (ack)
{
int32 nfree = payloadBuffer->getInt();
Monitor::shared_pointer mp = request->getChannelMonitor();
PipelineMonitor* pmp = dynamic_cast<PipelineMonitor*>(mp.get());
if (pmp)
pmp->reportRemoteQueueStatus(nfree);
// note: not possible to ack and destroy
return;
}
/*
if (!request->startRequest(qosCode))
{
@@ -1929,7 +1955,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
ServerMonitorRequesterImpl::ServerMonitorRequesterImpl(
ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure()
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false)
{
}
@@ -1946,7 +1972,7 @@ MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create(
void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
{
startRequest(QOS_INIT);
startRequest(QOS_INIT);
MonitorRequester::shared_pointer thisPointer = shared_from_this();
Destroyable::shared_pointer thisDestroyable = shared_from_this();
_channel->registerRequest(_ioid, thisDestroyable);
@@ -1973,7 +1999,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s
void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/)
{
//TODO
{
Lock guard(_mutex);
_unlisten = true;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/)
@@ -2091,6 +2122,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
else
{
// TODO CAS
bool unlisten;
Lock guard(_mutex);
unlisten = _unlisten;
_unlisten = false;
guard.unlock();
if (unlisten)
{
control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)QOS_DESTROY);
Status::Ok.serialize(buffer, control);
}
}
}
}
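Read together with the client hunk earlier in this commit, the end-of-stream handshake added here appears to be (a hedged summary):

service calls PipelineControl::done()
  -> server-side monitor reports unlisten() once its queue drains
  -> server sends CMD_MONITOR with QOS_DESTROY and Status::Ok
  -> client MonitorStrategyQueue::unlisten() flags m_unlisten
  -> client requester sees unlisten() after poll() drains the local queue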

View File

@@ -542,6 +542,7 @@ namespace pvAccess {
epics::pvData::Monitor::shared_pointer _channelMonitor;
epics::pvData::StructureConstPtr _structure;
epics::pvData::Status _status;
bool _unlisten;
};

View File

@@ -44,3 +44,7 @@ rpcWildServiceExample_SRCS += rpcWildServiceExample.cpp
PROD_HOST += rpcClientExample
rpcClientExample_SRCS += rpcClientExample.cpp
PROD_HOST += pipelineServiceExample
pipelineServiceExample_SRCS += pipelineServiceExample.cpp

View File

@@ -0,0 +1,100 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <pv/pvData.h>
#include <pv/pipelineServer.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
static Structure::const_shared_pointer dataStructure =
getFieldCreate()->createFieldBuilder()->
add("count", pvInt)->
createStructure();
class PipelineSessionImpl :
public PipelineSession
{
public:
PipelineSessionImpl(
epics::pvData::PVStructure::shared_pointer const & pvRequest
) :
m_counter(0),
m_max(0)
{
PVStructure::shared_pointer pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVString::shared_pointer pvString = pvOptions->getSubField<PVString>("limit");
if (pvString)
{
// note: this throws an exception if conversion fails
m_max = pvString->getAs<int32>();
}
}
}
size_t getMinQueueSize() const {
return 16; //1024;
}
Structure::const_shared_pointer getStructure() const {
return dataStructure;
}
virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) {
// blocking in this call is not a good thing,
// but generating simple counter data is fast;
// we will generate as many elements as we can
size_t count = control->getFreeElementCount();
for (size_t i = 0; i < count; i++) {
MonitorElement::shared_pointer element = control->getFreeElement();
element->pvStructurePtr->getSubField<PVInt>(1 /*"count"*/)->put(m_counter++);
control->putElement(element);
// we reached the limit, no more data
if (m_max != 0 && m_counter == m_max)
{
control->done();
break;
}
}
}
virtual void cancel() {
// noop, no need to clean up any data-source resources
}
private:
// NOTE: all the request calls will be made from the same thread, so we do not need to synchronize m_counter
int32 m_counter;
int32 m_max;
};
class PipelineServiceImpl :
public PipelineService
{
PipelineSession::shared_pointer createPipeline(
epics::pvData::PVStructure::shared_pointer const & pvRequest
)
{
return PipelineSession::shared_pointer(new PipelineSessionImpl(pvRequest));
}
};
int main()
{
PipelineServer server;
server.registerService("counterPipe", PipelineService::shared_pointer(new PipelineServiceImpl()));
// you can register as many services as you want here ...
server.printInfo();
server.run();
return 0;
}
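A hedged way to exercise this example from the command line, assuming the pvget tool from this tree (-m monitors, -r passes the pvRequest):

pvget -m -r "record[queueSize=16,pipeline=true]field(count)" counterPipe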