ChannelProviderFactory, eget support for it

This commit is contained in:
Matej Sekoranja
2013-06-04 00:32:58 +02:00
parent d9eda7d908
commit a20dd07b85
17 changed files with 372 additions and 62 deletions

View File

@@ -10,6 +10,7 @@
using namespace epics::pvData;
using namespace epics::pvAccess;
using namespace epics::pvAccess::ca;
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \

View File

@@ -14,6 +14,7 @@
namespace epics {
namespace pvAccess {
namespace ca {
class CAChannel :
public Channel,
@@ -261,6 +262,6 @@ private:
Monitor::shared_pointer thisPointer;
};
}}
}}}
#endif /* CACHANNEL_H */

View File

@@ -10,14 +10,18 @@
/* for CA */
#include <cadef.h>
#include <epicsSignal.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
using namespace epics::pvAccess::ca;
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
String CAChannelProvider::PROVIDER_NAME = "ca";
CAChannelProvider::CAChannelProvider()
{
initialize();
@@ -29,7 +33,7 @@ CAChannelProvider::~CAChannelProvider()
epics::pvData::String CAChannelProvider::getProviderName()
{
return "ca";
return PROVIDER_NAME;
}
void CAChannelProvider::destroy()
@@ -99,12 +103,87 @@ void CAChannelProvider::initialize()
// TODO create a ca_poll thread, if ca_disable_preemptive_callback
}
namespace epics { namespace pvAccess {
ChannelProvider::shared_pointer createCAChannelProvider()
// TODO global static variable (de/initialization order not guaranteed)
static Mutex mutex;
static CAChannelProvider::shared_pointer sharedProvider;
class CAChannelProviderFactoryImpl : public ChannelProviderFactory
{
ChannelProvider::shared_pointer ptr(new CAChannelProvider());
return ptr;
public:
POINTER_DEFINITIONS(CAChannelProviderFactoryImpl);
virtual epics::pvData::String getFactoryName()
{
return CAChannelProvider::PROVIDER_NAME;
}
virtual ChannelProvider::shared_pointer sharedInstance()
{
Lock guard(mutex);
if (!sharedProvider.get())
{
try {
sharedProvider.reset(new CAChannelProvider());
} catch (std::exception &e) {
LOG(logLevelError, "Unhandled exception caught at %s:%d: %s", __FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError, "Unhandled exception caught at %s:%d.", __FILE__, __LINE__);
}
}
return sharedProvider;
}
virtual ChannelProvider::shared_pointer newInstance()
{
try {
return ChannelProvider::shared_pointer(new CAChannelProvider());
} catch (std::exception &e) {
LOG(logLevelError, "Unhandled exception caught at %s:%d: %s", __FILE__, __LINE__, e.what());
return ChannelProvider::shared_pointer();
} catch (...) {
LOG(logLevelError, "Unhandled exception caught at %s:%d.", __FILE__, __LINE__);
return ChannelProvider::shared_pointer();
}
}
void destroySharedInstance()
{
sharedProvider->destroy();
sharedProvider.reset();
}
};
static CAChannelProviderFactoryImpl::shared_pointer factory;
void CAClientFactory::start()
{
epicsSignalInstallSigAlarmIgnore();
epicsSignalInstallSigPipeIgnore();
Lock guard(mutex);
if (!factory.get())
factory.reset(new CAChannelProviderFactoryImpl());
registerChannelProviderFactory(factory);
}
}}
void CAClientFactory::stop()
{
Lock guard(mutex);
if (factory.get())
{
unregisterChannelProviderFactory(factory);
factory->destroySharedInstance();
}
}

View File

@@ -11,6 +11,7 @@
namespace epics {
namespace pvAccess {
namespace ca {
class CAChannelProvider :
public ChannelProvider,
@@ -18,6 +19,8 @@ class CAChannelProvider :
{
public:
static epics::pvData::String PROVIDER_NAME;
CAChannelProvider();
virtual ~CAChannelProvider();
@@ -50,9 +53,14 @@ private:
void initialize();
};
extern ChannelProvider::shared_pointer createCAChannelProvider();
class CAClientFactory
{
public:
static void start();
static void stop();
};
}}
}}}
#endif /* CAPROVIDER_H */

View File

@@ -723,6 +723,32 @@ namespace pvAccess {
};
/**
* <code>ChanneProvider</code> factory interface.
*/
class ChannelProviderFactory : private epics::pvData::NoDefaultMethods {
public:
POINTER_DEFINITIONS(ChannelProviderFactory);
/**
* Get factory name (i.e. name of the provider).
* @return the factory name.
*/
virtual epics::pvData::String getFactoryName() = 0;
/**
* Get a shared instance.
* @return a shared instance.
*/
virtual ChannelProvider::shared_pointer sharedInstance() = 0;
/**
* Create a new instance.
* @return a new instance.
*/
virtual ChannelProvider::shared_pointer newInstance() = 0;
};
/**
* Interface for locating channel providers.
*/
@@ -735,12 +761,19 @@ namespace pvAccess {
virtual ~ChannelAccess() {};
/**
* Get the provider with the specified name.
* Get a shared instance of the provider with the specified name.
* @param providerName The name of the provider.
* @return The interface for the provider or null if the provider is not known.
*/
virtual ChannelProvider::shared_pointer getProvider(epics::pvData::String const & providerName) = 0;
/**
* Creates a new instanceof the provider with the specified name.
* @param providerName The name of the provider.
* @return The interface for the provider or null if the provider is not known.
*/
virtual ChannelProvider::shared_pointer createProvider(epics::pvData::String const & providerName) = 0;
/**
* Get a array of the names of all the known providers.
* @return The names. Be sure to delete vector instance.
@@ -749,8 +782,8 @@ namespace pvAccess {
};
extern ChannelAccess::shared_pointer getChannelAccess();
extern void registerChannelProvider(ChannelProvider::shared_pointer const & channelProvider);
extern void unregisterChannelProvider(ChannelProvider::shared_pointer const & channelProvider);
extern void registerChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory);
extern void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory);
/**
* Interface for creating request structure.

View File

@@ -21,8 +21,8 @@ static ChannelAccess::shared_pointer channelAccess;
static Mutex channelProviderMutex;
typedef std::map<String, ChannelProvider::shared_pointer> ChannelProviderMap;
static ChannelProviderMap channelProviders;
typedef std::map<String, ChannelProviderFactory::shared_pointer> ChannelProviderFactoryMap;
static ChannelProviderFactoryMap channelProviders;
class ChannelAccessImpl : public ChannelAccess {
@@ -30,13 +30,26 @@ class ChannelAccessImpl : public ChannelAccess {
ChannelProvider::shared_pointer getProvider(String const & providerName) {
Lock guard(channelProviderMutex);
return channelProviders[providerName];
ChannelProviderFactoryMap::const_iterator iter = channelProviders.find(providerName);
if (iter != channelProviders.end())
return iter->second->sharedInstance();
else
return ChannelProvider::shared_pointer();
}
ChannelProvider::shared_pointer createProvider(String const & providerName) {
Lock guard(channelProviderMutex);
ChannelProviderFactoryMap::const_iterator iter = channelProviders.find(providerName);
if (iter != channelProviders.end())
return iter->second->newInstance();
else
return ChannelProvider::shared_pointer();
}
std::auto_ptr<stringVector_t> getProviderNames() {
Lock guard(channelProviderMutex);
std::auto_ptr<stringVector_t> providers(new stringVector_t());
for (ChannelProviderMap::const_iterator i = channelProviders.begin();
for (ChannelProviderFactoryMap::const_iterator i = channelProviders.begin();
i != channelProviders.end(); i++)
providers->push_back(i->first);
@@ -54,14 +67,14 @@ ChannelAccess::shared_pointer getChannelAccess() {
return channelAccess;
}
void registerChannelProvider(ChannelProvider::shared_pointer const & channelProvider) {
void registerChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory) {
Lock guard(channelProviderMutex);
channelProviders[channelProvider->getProviderName()] = channelProvider;
channelProviders[channelProviderFactory->getFactoryName()] = channelProviderFactory;
}
void unregisterChannelProvider(ChannelProvider::shared_pointer const & channelProvider) {
void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory) {
Lock guard(channelProviderMutex);
channelProviders.erase(channelProvider->getProviderName());
channelProviders.erase(channelProviderFactory->getFactoryName());
}
}}

View File

@@ -16,35 +16,85 @@ using namespace epics::pvData;
using namespace epics::pvAccess;
// TODO global static variable (de/initialization order not guaranteed)
static Mutex m_mutex;
static ClientContextImpl::shared_pointer m_context;
static Mutex mutex;
static ClientContextImpl::shared_pointer context;
class ChannelProviderFactoryImpl : public ChannelProviderFactory
{
public:
POINTER_DEFINITIONS(ChannelProviderFactoryImpl);
virtual epics::pvData::String getFactoryName()
{
return ClientContextImpl::PROVIDER_NAME;
}
virtual ChannelProvider::shared_pointer sharedInstance()
{
Lock guard(mutex);
if (!context.get())
{
try {
ClientContextImpl::shared_pointer lcontext = createClientContextImpl();
lcontext->initialize();
context = lcontext;
} catch (std::exception &e) {
LOG(logLevelError, "Unhandled exception caught at %s:%d: %s", __FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError, "Unhandled exception caught at %s:%d.", __FILE__, __LINE__);
}
}
return context->getProvider();
}
virtual ChannelProvider::shared_pointer newInstance()
{
Lock guard(mutex);
try {
ClientContextImpl::shared_pointer lcontext = createClientContextImpl();
lcontext->initialize();
return lcontext->getProvider();
} catch (std::exception &e) {
LOG(logLevelError, "Unhandled exception caught at %s:%d: %s", __FILE__, __LINE__, e.what());
return ChannelProvider::shared_pointer();
} catch (...) {
LOG(logLevelError, "Unhandled exception caught at %s:%d.", __FILE__, __LINE__);
return ChannelProvider::shared_pointer();
}
}
void destroySharedInstance()
{
Lock guard(mutex);
if (context.get())
{
context->dispose();
context.reset();
}
}
};
static ChannelProviderFactoryImpl::shared_pointer factory;
void ClientFactory::start()
{
epicsSignalInstallSigAlarmIgnore();
epicsSignalInstallSigPipeIgnore();
Lock guard(m_mutex);
if (m_context.get()) return;
try {
m_context = createClientContextImpl();
m_context->initialize();
registerChannelProvider(m_context->getProvider());
} catch (std::exception &e) {
LOG(logLevelError, "Unhandled exception caught at %s:%d: %s", __FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError, "Unhandled exception caught at %s:%d.", __FILE__, __LINE__);
}
Lock guard(mutex);
if (!factory.get())
factory.reset(new ChannelProviderFactoryImpl());
registerChannelProviderFactory(factory);
}
void ClientFactory::stop()
{
Lock guard(m_mutex);
if (!m_context.get()) return;
Lock guard(mutex);
unregisterChannelProvider(m_context->getProvider());
m_context->dispose();
m_context.reset();
if (factory.get())
{
unregisterChannelProviderFactory(factory);
factory->destroySharedInstance();
}
}

View File

@@ -44,6 +44,8 @@ using namespace epics::pvData;
namespace epics {
namespace pvAccess {
String ClientContextImpl::PROVIDER_NAME = "pvAccess"; // TODO to be renamed to "pva"
Status ChannelImpl::channelDestroyed = Status(Status::STATUSTYPE_WARNING, "channel destroyed");
Status ChannelImpl::channelDisconnected = Status(Status::STATUSTYPE_WARNING, "channel disconnected");
@@ -3046,7 +3048,7 @@ namespace epics {
virtual epics::pvData::String getProviderName()
{
return "pvAccess";
return PROVIDER_NAME;
}
virtual void destroy()

View File

@@ -53,6 +53,8 @@ namespace epics {
public:
POINTER_DEFINITIONS(ClientContextImpl);
static epics::pvData::String PROVIDER_NAME;
/**
* Get context implementation version.
* @return version of the context implementation.

View File

@@ -407,12 +407,46 @@ String RPCChannelProvider::PROVIDER_NAME("rpcService");
Status RPCChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel");
class RPCChannelProviderFactory : public ChannelProviderFactory
{
public:
POINTER_DEFINITIONS(RPCChannelProviderFactory);
RPCChannelProviderFactory() :
m_channelProviderImpl(new RPCChannelProvider())
{
}
virtual epics::pvData::String getFactoryName()
{
return RPCChannelProvider::PROVIDER_NAME;
}
virtual ChannelProvider::shared_pointer sharedInstance()
{
return m_channelProviderImpl;
}
virtual ChannelProvider::shared_pointer newInstance()
{
return ChannelProvider::shared_pointer(new RPCChannelProvider());
}
private:
RPCChannelProvider::shared_pointer m_channelProviderImpl;
};
RPCServer::RPCServer()
{
m_channelProviderImpl.reset(new RPCChannelProvider());
registerChannelProvider(m_channelProviderImpl);
// TODO factory is never deregistered, multiple RPCServer instances create multiple factories, etc.
m_channelProviderFactory.reset(new RPCChannelProviderFactory());
registerChannelProviderFactory(m_channelProviderFactory);
setenv("EPICS4_CAS_PROVIDER_NAMES", m_channelProviderImpl->getProviderName().c_str(), 1);
m_channelProviderImpl = m_channelProviderFactory->sharedInstance();
setenv("EPICS_PVAS_PROVIDER_NAMES", m_channelProviderImpl->getProviderName().c_str(), 1);
m_serverContext = ServerContextImpl::create();
m_serverContext->initialize(getChannelAccess());

View File

@@ -19,8 +19,9 @@ class RPCServer {
private:
ServerContextImpl::shared_pointer m_serverContext;
ChannelProviderFactory::shared_pointer m_channelProviderFactory;
ChannelProvider::shared_pointer m_channelProviderImpl;
// TODO no thread poll implementation
public:

View File

@@ -16,7 +16,6 @@ pvAccessApp/pva/pvaVersion.cpp
pvAccessApp/pva/pvaVersion.h
pvAccessApp/client/pvAccess.cpp
pvAccessApp/client/pvAccess.h
pvAccessApp/client/pvAccess.h.orig
pvAccessApp/factory/ChannelAccessFactory.cpp
pvAccessApp/factory/CreateRequestFactory.cpp
pvAccessApp/mb/pvAccessMB.cpp

View File

@@ -2,6 +2,8 @@
#include <pv/clientFactory.h>
#include <pv/pvAccess.h>
#include <caProvider.h>
#include <stdio.h>
#include <epicsStdlib.h>
#include <epicsGetopt.h>
@@ -1270,15 +1272,16 @@ int main (int argc, char *argv[])
if (!serviceRequest)
{
vector<string> pvs;
vector<string> providerNames;
if (validURI)
{
// standard get request
// for now only pva schema is supported, without authroity
// for now no only pva/ca schema is supported, without authority
// TODO
if (uri.protocol != "pva")
if (uri.protocol != "pva" && uri.protocol != "ca")
{
std::cerr << "invalid URI scheme '" << uri.protocol << "', only 'pva' is supported" << std::endl;
std::cerr << "invalid URI scheme '" << uri.protocol << "', only 'pva' and 'ca' is supported" << std::endl;
// TODO
return 1;
}
@@ -1294,12 +1297,47 @@ int main (int argc, char *argv[])
// skip trailing '/'
pvs.push_back(uri.path.substr(1));
providerNames.push_back(uri.protocol);
}
else
{
// TODO URI support
for (int n = 0; optind < argc; n++, optind++)
pvs.push_back(argv[optind]);
{
URI uri;
bool validURI = URI::parse(argv[optind], uri);
if (validURI)
{
// TODO this is copy&pase code from above, clean it up
// for now no only pva/ca schema is supported, without authority
// TODO
if (uri.protocol != "pva" && uri.protocol != "ca")
{
std::cerr << "invalid URI scheme '" << uri.protocol << "', only 'pva' and 'ca' is supported" << std::endl;
// TODO
return 1;
}
// authority = uri.host;
if (uri.path.length() <= 1)
{
std::cerr << "invalid URI, empty path" << std::endl;
// TODO
return 1;
}
// skip trailing '/'
pvs.push_back(uri.path.substr(1));
providerNames.push_back(uri.protocol);
}
else
{
// defaults to "pva"
pvs.push_back(argv[optind]);
providerNames.push_back("pva");
}
}
}
PVStructure::shared_pointer pvRequest =
@@ -1309,15 +1347,23 @@ int main (int argc, char *argv[])
return 1;
}
// register "pva" and "ca" providers
ClientFactory::start();
ChannelProvider::shared_pointer provider = getChannelAccess()->getProvider("pvAccess");
epics::pvAccess::ca::CAClientFactory::start();
// first connect to all, this allows resource (e.g. TCP connection) sharing
vector<Channel::shared_pointer> channels(nPvs);
for (int n = 0; n < nPvs; n++)
{
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
channels[n] = provider->createChannel(pvs[n], channelRequesterImpl);
// TODO to be removed
String providerName = providerNames[n];
if (providerName == "pva")
providerName = "pvAccess";
// TODO no privder check
channels[n] = getChannelAccess()->getProvider(providerName)->createChannel(pvs[n], channelRequesterImpl);
}
// TODO maybe unify for nPvs == 1?!
@@ -1590,6 +1636,7 @@ int main (int argc, char *argv[])
channel->destroy();
epics::pvAccess::ca::CAClientFactory::stop();
ClientFactory::stop();
}

View File

@@ -18,6 +18,8 @@
#include "pvutils.cpp"
#include <pv/caProvider.h>
using namespace std;
using namespace std::tr1;
using namespace epics::pvData;
@@ -426,7 +428,10 @@ int main (int argc, char *argv[])
ClientFactory::start();
ChannelProvider::shared_pointer provider = getChannelAccess()->getProvider("pvAccess");
//epics::pvAccess::ca::CAClientFactory::start();
//ChannelProvider::shared_pointer provider = getChannelAccess()->getProvider("ca");
// first connect to all, this allows resource (e.g. TCP connection) sharing
vector<Channel::shared_pointer> channels(nPvs);
for (int n = 0; n < nPvs; n++)

View File

@@ -19,6 +19,8 @@
#include "pvutils.cpp"
#include <pv/convert.h>
#include <pv/caProvider.h>
using namespace std;
using namespace std::tr1;
using namespace epics::pvData;
@@ -714,7 +716,7 @@ class ChannelPutRequesterImpl : public ChannelPutRequester
}
else
{
std::cerr << "[" << m_channelName << "] failed to get: " << status.toString() << std::endl;
std::cerr << "[" << m_channelName << "] failed to put: " << status.toString() << std::endl;
}
m_event->signal();
@@ -854,6 +856,9 @@ int main (int argc, char *argv[])
ClientFactory::start();
ChannelProvider::shared_pointer provider = getChannelAccess()->getProvider("pvAccess");
//epics::pvAccess::ca::CAClientFactory::start();
//ChannelProvider::shared_pointer provider = getChannelAccess()->getProvider("ca");
bool allOK = true;
try
@@ -863,7 +868,7 @@ int main (int argc, char *argv[])
// first connect
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
Channel::shared_pointer channel = provider->createChannel(pvName, channelRequesterImpl);
if (channelRequesterImpl->waitUntilConnected(timeOut))
{
shared_ptr<ChannelPutRequesterImpl> putRequesterImpl(new ChannelPutRequesterImpl(channel->getChannelName()));

View File

@@ -1874,6 +1874,7 @@ public:
typedef std::tr1::shared_ptr<MockServerChannelProvider> shared_pointer;
typedef std::tr1::shared_ptr<const MockServerChannelProvider> const_shared_pointer;
static String PROVIDER_NAME;
MockServerChannelProvider() :
m_mockChannelFind(),
@@ -1936,7 +1937,7 @@ public:
virtual epics::pvData::String getProviderName()
{
return "local";
return PROVIDER_NAME;
}
virtual void destroy()
@@ -2020,17 +2021,41 @@ private:
auto_ptr<Thread> m_imgThread;
};
String MockServerChannelProvider::PROVIDER_NAME = "local";
class MockChannelProviderFactory : public ChannelProviderFactory
{
public:
POINTER_DEFINITIONS(MockChannelProviderFactory);
virtual epics::pvData::String getFactoryName()
{
return MockServerChannelProvider::PROVIDER_NAME;
}
virtual ChannelProvider::shared_pointer sharedInstance()
{
// no shared instance support for mock...
return newInstance();
}
virtual ChannelProvider::shared_pointer newInstance()
{
MockServerChannelProvider::shared_pointer channelProvider(new MockServerChannelProvider());
channelProvider->initialize();
return channelProvider;
}
};
static ServerContextImpl::shared_pointer ctx;
void testServer(int timeToRun)
{
MockServerChannelProvider::shared_pointer channelProvider(new MockServerChannelProvider());
channelProvider->initialize();
ChannelProvider::shared_pointer ptr = channelProvider;
registerChannelProvider(ptr);
MockChannelProviderFactory::shared_pointer factory(new MockChannelProviderFactory());
registerChannelProviderFactory(factory);
//ServerContextImpl::shared_pointer ctx = ServerContextImpl::create();
ctx = ServerContextImpl::create();
@@ -2043,7 +2068,7 @@ void testServer(int timeToRun)
ctx->destroy();
unregisterChannelProvider(ptr);
unregisterChannelProviderFactory(factory);
structureChangedListeners.clear();
structureStore.clear();

View File

@@ -63,6 +63,11 @@ public:
return ChannelProvider::shared_pointer();
}
ChannelProvider::shared_pointer createProvider(epics::pvData::String const & providerName)
{
return getProvider(providerName);
}
std::auto_ptr<stringVector_t> getProviderNames()
{
std::auto_ptr<stringVector_t> pn(new stringVector_t());