server context implementation

This commit is contained in:
Gasper Jansa
2011-01-31 22:52:25 +01:00
parent 5ee480beee
commit 327efbcf80
5 changed files with 977 additions and 10 deletions

View File

@@ -41,6 +41,7 @@ SRC_DIRS += $(PVACCESS)/server
INC += serverContext.h
INC += responseHandlers.h
LIBSRCS += responseHandlers.cpp
LIBSRCS += serverContext.cpp
SRC_DIRS += $(PVACCESS)/factory

View File

@@ -0,0 +1,510 @@
/*
* serverContext.cpp
*/
#include "serverContext.h"
#include "responseHandlers.h"
namespace epics { namespace pvAccess {
const char* ServerContextImpl::StateNames[] = { "NOT_INITIALIZED", "INITIALIZED", "RUNNING", "SHUTDOWN", "DESTROYED"};
const int32 ServerContextImpl::VERSION_MAJOR = 2;
const int32 ServerContextImpl::VERSION_MINOR = 0;
const int32 ServerContextImpl::VERSION_MAINTENANCE = 0;
const int32 ServerContextImpl::VERSION_DEVELOPMENT = 0;
const Version* ServerContextImpl::VERSION = new Version("Channel Access Server in C++", "C++",
ServerContextImpl::VERSION_MAJOR,
ServerContextImpl::VERSION_MINOR,
ServerContextImpl::VERSION_MAINTENANCE,
ServerContextImpl::VERSION_DEVELOPMENT);
ServerContextImpl::ServerContextImpl():
_state(NOT_INITIALIZED),
_beaconAddressList(""),
_ignoreAddressList(""),
_autoBeaconAddressList(true),
_beaconPeriod(15.0),
_broadcastPort(CA_BROADCAST_PORT),
_serverPort(CA_SERVER_PORT),
_receiveBufferSize(MAX_TCP_RECV),
_timer(NULL),
_broadcastTransport(NULL),
_beaconEmitter(NULL),
_acceptor(NULL),
_transportRegistry(NULL),
_channelAccess(NULL),
//TODO CAJ_DEFAULT_PROVIDER is not defined
_channelProviderName("local"),
//_channelProviderName(CAJ_DEFAULT_PROVIDER),
_channelProvider(NULL),
_beaconServerStatusProvider(NULL)
{
initializeLogger();
loadConfiguration();
}
ServerContextImpl::~ServerContextImpl()
{
}
const Version* ServerContextImpl::getVersion()
{
return ServerContextImpl::VERSION;
}
void ServerContextImpl::initializeLogger()
{
createFileLogger("serverContextImpl.log");
}
Configuration* ServerContextImpl::getConfiguration()
{
ConfigurationProvider* configurationProvider = ConfigurationFactory::getProvider();
Configuration* config = configurationProvider->getConfiguration("pvAccess-server");
if (config == NULL)
{
config = configurationProvider->getConfiguration("system");
}
return config;
}
/**
* Load configuration.
*/
void ServerContextImpl::loadConfiguration()
{
Configuration* config = getConfiguration();
_beaconAddressList = config->getPropertyAsString("EPICS4_CA_ADDR_LIST", _beaconAddressList);
_beaconAddressList = config->getPropertyAsString("EPICS4_CAS_BEACON_ADDR_LIST", _beaconAddressList);
_autoBeaconAddressList = config->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", _autoBeaconAddressList);
_autoBeaconAddressList = config->getPropertyAsBoolean("EPICS4_CAS_AUTO_BEACON_ADDR_LIST", _autoBeaconAddressList);
_beaconPeriod = config->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", _beaconPeriod);
_beaconPeriod = config->getPropertyAsFloat("EPICS4_CAS_BEACON_PERIOD", _beaconPeriod);
_serverPort = config->getPropertyAsInteger("EPICS4_CA_SERVER_PORT", _serverPort);
_serverPort = config->getPropertyAsInteger("EPICS4_CAS_SERVER_PORT", _serverPort);
_broadcastPort = config->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", _broadcastPort);
_broadcastPort = config->getPropertyAsInteger("EPICS4_CAS_BROADCAST_PORT", _broadcastPort);
_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", _receiveBufferSize);
_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CAS_MAX_ARRAY_BYTES", _receiveBufferSize);
_channelProviderName = config->getPropertyAsString("EPICS4_CA_PROVIDER_NAME", _channelProviderName);
_channelProviderName = config->getPropertyAsString("EPICS4_CAS_PROVIDER_NAME", _channelProviderName);
}
void ServerContextImpl::initialize(ChannelAccess* channelAccess)
{
//TODO
/*Lock guard(&_mutex);
if (channelAccess == NULL)
{
THROW_BASE_EXCEPTION("non null channelAccess expected");
}
if (_state == DESTROYED)
{
THROW_BASE_EXCEPTION("Context destroyed.");
}
else if (_state != NOT_INITIALIZED)
{
THROW_BASE_EXCEPTION("Context already initialized.");
}
_channelAccess = channelAccess;
_channelProvider = _channelAccess->getProvider(_channelProviderName);
if (_channelProvider == NULL)
{
std::string msg = "Channel provider with name '" + _channelProviderName + "' not available.";
THROW_BASE_EXCEPTION(msg.c_str());
}*/
internalInitialize();
_state = INITIALIZED;
}
void ServerContextImpl::internalInitialize()
{
_timer = new Timer("pvAccess-server timer",lowerPriority);
_transportRegistry = new TransportRegistry();
// setup broadcast UDP transport
initializeBroadcastTransport();
_acceptor = new BlockingTCPAcceptor(this, _serverPort, _receiveBufferSize);
_serverPort = _acceptor->getBindAddress()->ia.sin_port;
_beaconEmitter = new BeaconEmitter(_broadcastTransport, this);
}
void ServerContextImpl::initializeBroadcastTransport()
{
// setup UDP transport
try
{
// where to bind (listen) address
osiSockAddr listenLocalAddress;
listenLocalAddress.ia.sin_family = AF_INET;
listenLocalAddress.ia.sin_port = htons(_broadcastPort);
listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
// where to send address
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
InetAddrVector* broadcasts = getBroadcastAddresses(socket,_broadcastPort);
epicsSocketDestroy(socket);
BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, true);
_broadcastTransport = static_cast<BlockingUDPTransport*>(broadcastConnector->connect(
NULL, new ServerResponseHandler(this),
listenLocalAddress, CA_MINOR_PROTOCOL_REVISION,
CA_DEFAULT_PRIORITY));
_broadcastTransport->setBroadcastAddresses(broadcasts);
// set ignore address list
if (_ignoreAddressList.length() > 0)
{
// we do not care about the port
InetAddrVector* list = getSocketAddressList(_ignoreAddressList, 0, NULL);
if (list != NULL && list->size() > 0)
{
_broadcastTransport->setIgnoredAddresses(list);
}
}
// set broadcast address list
if (_beaconAddressList.length() > 0)
{
// if auto is true, add it to specified list
InetAddrVector* appendList = NULL;
if (_autoBeaconAddressList == true)
{
appendList = _broadcastTransport->getSendAddresses();
}
InetAddrVector* list = getSocketAddressList(_beaconAddressList, _broadcastPort, appendList);
if (list != NULL && list->size() > 0)
{
_broadcastTransport->setBroadcastAddresses(list);
}
}
_broadcastTransport->start();
}
catch (...)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
}
}
void ServerContextImpl::run(int32 seconds)
{
if (seconds < 0)
{
THROW_BASE_EXCEPTION("seconds cannot be negative.");
}
{
Lock guard(&_mutex);
if (_state == NOT_INITIALIZED)
{
THROW_BASE_EXCEPTION("Context not initialized.");
}
else if (_state == DESTROYED)
{
THROW_BASE_EXCEPTION("Context destroyed.");
}
else if (_state == RUNNING)
{
THROW_BASE_EXCEPTION("Context is already running.");
}
else if (_state == SHUTDOWN)
{
THROW_BASE_EXCEPTION("Context was shutdown.");
}
_state = RUNNING;
}
// run...
_beaconEmitter->start();
//TODO review how is with guards
if(seconds == 0)
{
_runEvent.wait();
}
else
{
_runEvent.wait(seconds);
}
{
Lock guard(&_mutex);
_state = SHUTDOWN;
}
}
void ServerContextImpl::shutdown()
{
Lock guard(&_mutex);
if(_state == DESTROYED)
{
THROW_BASE_EXCEPTION("Context already destroyed.");
}
// notify to stop running...
_runEvent.signal();
}
void ServerContextImpl::destroy()
{
Lock guard(&_mutex);
if (_state == DESTROYED)
{
THROW_BASE_EXCEPTION("Context already destroyed.");
}
// shutdown if not already
shutdown();
// go into destroyed state ASAP
_state = DESTROYED;
internalDestroy();
}
void ServerContextImpl::internalDestroy()
{
// stop responding to search requests
if (_broadcastTransport != NULL)
{
_broadcastTransport->close(true);
}
// stop accepting connections
if (_acceptor != NULL)
{
_acceptor->destroy();
}
// stop emitting beacons
if (_beaconEmitter != NULL)
{
_beaconEmitter->destroy();
}
// stop timer
if (_timer != NULL)
{
//TODO there is no stop in Timer
// _timer->stop();
}
// this will also destroy all channels
destroyAllTransports();
}
void ServerContextImpl::destroyAllTransports()
{
// not initialized yet
if (_transportRegistry == NULL)
{
return;
}
int32 size;
Transport** transports = _transportRegistry->toArray(size);
if (size == 0)
{
return;
}
errlogSevPrintf(errlogMajor, "Server context still has %d transport(s) active and closing...", size);
for (int i = 0; i < size; i++)
{
Transport* transport = transports[i];
try
{
transport->close(true);
}
catch (std::exception &e)
{
// do all exception safe, log in case of an error
errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what());
}
catch (...)
{
// do all exception safe, log in case of an error
errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__);
}
}
delete[] transports;
}
void ServerContextImpl::printInfo()
{
printInfo(cout);
}
void ServerContextImpl::printInfo(ostream& str)
{
Lock guard(&_mutex);
str << "VERSION : " << getVersion()->getVersionString() << endl \
<< "CHANNEL PROVIDER : " << _channelProviderName << endl \
<< "BEACON_ADDR_LIST : " << _beaconAddressList << endl \
<< "AUTO_BEACON_ADDR_LIST : " << _autoBeaconAddressList << endl \
<< "BEACON_PERIOD : " << _beaconPeriod << endl \
<< "BROADCAST_PORT : " << _broadcastPort << endl \
<< "SERVER_PORT : " << _serverPort << endl \
<< "RCV_BUFFER_SIZE : " << _receiveBufferSize << endl \
<< "IGNORE_ADDR_LIST: " << _ignoreAddressList << endl \
<< "STATE : " << ServerContextImpl::StateNames[_state] << endl;
}
void ServerContextImpl::dispose()
{
try
{
destroy();
}
catch(...)
{
// noop
}
}
void ServerContextImpl::setBeaconServerStatusProvider(BeaconServerStatusProvider* beaconServerStatusProvider)
{
_beaconServerStatusProvider = beaconServerStatusProvider;
}
bool ServerContextImpl::isInitialized()
{
Lock guard(&_mutex);
return _state == INITIALIZED || _state == RUNNING || _state == SHUTDOWN;
}
bool ServerContextImpl::isDestroyed()
{
Lock guard(&_mutex);
return _state == DESTROYED;
}
std::string ServerContextImpl::getBeaconAddressList()
{
return _beaconAddressList;
}
bool ServerContextImpl::isAutoBeaconAddressList()
{
return _autoBeaconAddressList;
}
float ServerContextImpl::getBeaconPeriod()
{
return _beaconPeriod;
}
int32 ServerContextImpl::getReceiveBufferSize()
{
return _receiveBufferSize;
}
int32 ServerContextImpl::getServerPort()
{
return _serverPort;
}
void ServerContextImpl::setServerPort(int32 port)
{
_serverPort = port;
}
int32 ServerContextImpl::getBroadcastPort()
{
return _broadcastPort;
}
std::string ServerContextImpl::getIgnoreAddressList()
{
return _ignoreAddressList;
}
BeaconServerStatusProvider* ServerContextImpl::getBeaconServerStatusProvider()
{
return _beaconServerStatusProvider;
}
osiSockAddr* ServerContextImpl::getServerInetAddress()
{
if(_acceptor != NULL)
{
return _acceptor->getBindAddress();
}
return NULL;
}
BlockingUDPTransport* ServerContextImpl::getBroadcastTransport()
{
return _broadcastTransport;
}
ChannelAccess* ServerContextImpl::getChannelAccess()
{
return _channelAccess;
}
std::string ServerContextImpl::getChannelProviderName()
{
return _channelProviderName;
}
ChannelProvider* ServerContextImpl::getChannelProvider()
{
return _channelProvider;
}
Timer* ServerContextImpl::getTimer()
{
return _timer;
}
TransportRegistry* ServerContextImpl::getTransportRegistry()
{
return _transportRegistry;
}
//TODO what with this?
Channel* ServerContextImpl::getChannel(pvAccessID id)
{
return NULL;
}
//TODO what with this?
Transport* ServerContextImpl::getSearchTransport()
{
return NULL;
}
}
}

View File

@@ -1,23 +1,447 @@
/*
* serverContext.h
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
#ifndef SERVERCONTEXT_H_
#define SERVERCONTEXT_H_
#include "remote.h"
#include "beaconServerStatusProvider.h"
#include "caConstants.h"
#include "version.h"
#include "pvAccess.h"
#include "logger.h"
#include "blockingUDP.h"
#include "blockingTCP.h"
#include "beaconEmitter.h"
#include "logger.h"
#include <errlog.h>
namespace epics {
namespace pvAccess {
namespace pvAccess {
class ServerContextImpl : public Context {
/**
* The class representing a CA Server context.
*/
class ServerContext
{
public:
/**
* Destructor
*/
virtual ~ServerContext() {};
/**
* Get context implementation version.
* @return version of the context implementation.
*/
virtual const Version* getVersion() = 0;
};
/**
* Set <code>ChannelAccess</code> implementation and initialize server.
* @param channelAccess implementation of channel access to be served.
*/
virtual void initialize(ChannelAccess* channelAccess) = 0;
}
/**
* Run server (process events).
* @param seconds time in seconds the server will process events (method will block), if <code>0</code>
* the method would block until <code>destroy()</code> is called.
* @throws BaseException if server is already destroyed.
*/
virtual void run(int32 seconds) = 0;
/**
* Shutdown (stop executing run() method) of this context.
* After shutdown Context cannot be rerun again, destroy() has to be called to clear all used resources.
* @throws BaseException if the context has been destroyed.
*/
virtual void shutdown() = 0;
/**
* Clear all resources attached to this context.
* @throws BaseException if the context has been destroyed.
*/
virtual void destroy() = 0;
/**
* Prints detailed information about the context to the standard output stream.
*/
virtual void printInfo() = 0;
/**
* Prints detailed information about the context to the specified output stream.
* @param str stream to which to print the info
*/
virtual void printInfo(ostream& str) = 0;
/**
* Dispose (destroy) server context.
* This calls <code>destroy()</code> and silently handles all exceptions.
*/
virtual void dispose() = 0;
// ************************************************************************** //
// **************************** [ Plugins ] ********************************* //
// ************************************************************************** //
/**
* Set beacon server status provider.
* @param beaconServerStatusProvider <code>BeaconServerStatusProvider</code> implementation to set.
*/
virtual void setBeaconServerStatusProvider(BeaconServerStatusProvider* beaconServerStatusProvider) = 0;
};
class BeaconEmitter;
class ServerContextImpl : public ServerContext, public Context
{
public:
/**
* Constructor.
*/
ServerContextImpl();
virtual ~ServerContextImpl();
//**************** derived from ServerContext ****************//
const Version* getVersion();
void initialize(ChannelAccess* channelAccess);
void run(int32 seconds);
void shutdown();
void destroy();
void printInfo();
void printInfo(ostream& str);
void dispose();
void setBeaconServerStatusProvider(BeaconServerStatusProvider* beaconServerStatusProvider);
//**************** derived from Context ****************//
/**
* Get timer.
* @return timer.
*/
Timer* getTimer();
Channel* getChannel(pvAccessID id);
Transport* getSearchTransport();
Configuration* getConfiguration();
/**
* Get CA transport (virtual circuit) registry.
* @return CA transport (virtual circuit) registry.
*/
TransportRegistry* getTransportRegistry();
/**
* Version.
*/
static const Version* VERSION;
/**
* Server state enum.
*/
enum State {
/**
* State value of non-initialized context.
*/
NOT_INITIALIZED,
/**
* State value of initialized context.
*/
INITIALIZED,
/**
* State value of running context.
*/
RUNNING,
/**
* State value of shutdown (once running) context.
*/
SHUTDOWN,
/**
* State value of destroyed context.
*/
DESTROYED
};
/**
* Names of the enum <code>State</code>
*/
static const char* StateNames[];
/**
* Get initialization status.
* @return initialization status.
*/
bool isInitialized();
/**
* Get destruction status.
* @return destruction status.
*/
bool isDestroyed();
/**
* Get beacon address list.
* @return beacon address list.
*/
std::string getBeaconAddressList();
/**
* Get beacon address list auto flag.
* @return beacon address list auto flag.
*/
bool isAutoBeaconAddressList();
/**
* Get beacon period (in seconds).
* @return beacon period (in seconds).
*/
float getBeaconPeriod();
/**
* Get receiver buffer (payload) size.
* @return max payload size.
*/
int32 getReceiveBufferSize();
/**
* Get server port.
* @return server port.
*/
int32 getServerPort();
/**
* Set server port number.
* @param port new server port number.
*/
void setServerPort(int32 port);
/**
* Get broadcast port.
* @return broadcast port.
*/
int32 getBroadcastPort();
/**
* Get ignore search address list.
* @return ignore search address list.
*/
std::string getIgnoreAddressList();
/**
* Get registered beacon server status provider.
* @return registered beacon server status provider.
*/
BeaconServerStatusProvider* getBeaconServerStatusProvider();
/**
* Get server newtwork (IP) address.
* @return server network (IP) address, <code>NULL</code> if not bounded.
*/
osiSockAddr* getServerInetAddress();
/**
* Broadcast transport.
* @return broadcast transport.
*/
BlockingUDPTransport* getBroadcastTransport();
/**
* Get channel access implementation.
* @return channel access implementation.
*/
ChannelAccess* getChannelAccess();
/**
* Get channel provider name.
* @return channel provider name.
*/
std::string getChannelProviderName();
/**
* Get channel provider.
* @return channel provider.
*/
ChannelProvider* getChannelProvider();
private:
//TODO check protected members in java
/**
* Major version.
*/
static const int32 VERSION_MAJOR;
/**
* Minor version.
*/
static const int32 VERSION_MINOR;
/**
* Maintenance version.
*/
static const int32 VERSION_MAINTENANCE;
/**
* Development version.
*/
static const int32 VERSION_DEVELOPMENT;
/**
* Initialization status.
*/
volatile State _state;
/**
* A space-separated list of broadcast address which to send beacons.
* Each address must be of the form: ip.number:port or host.name:port
*/
std::string _beaconAddressList;
/**
* A space-separated list of address from which to ignore name resolution requests.
* Each address must be of the form: ip.number:port or host.name:port
*/
std::string _ignoreAddressList;
/**
* Define whether or not the network interfaces should be discovered at runtime.
*/
bool _autoBeaconAddressList;
/**
* Period in second between two beacon signals.
*/
float _beaconPeriod;
/**
* Broadcast port number to listen to.
*/
int32 _broadcastPort;
/**
* Port number for the server to listen to.
*/
int32 _serverPort;
/**
* Length in bytes of the maximum buffer (payload) size that may pass through CA.
*/
int32 _receiveBufferSize;
/**
* Timer.
*/
Timer* _timer;
/**
* Reactor.
*/
//Reactor _reactor;
/**
* Leader/followers thread pool.
*/
//LeaderFollowersThreadPool _leaderFollowersThreadPool;
/**
* Broadcast transport needed for channel searches.
*/
BlockingUDPTransport* _broadcastTransport;
/**
* Beacon emitter.
*/
BeaconEmitter* _beaconEmitter;
/**
* CAS acceptor (accepts CA virtual circuit).
*/
BlockingTCPAcceptor* _acceptor;
/**
* CA transport (virtual circuit) registry.
* This registry contains all active transports - connections to CA servers.
*/
TransportRegistry* _transportRegistry;
/**
* Channel access.
*/
ChannelAccess* _channelAccess;
/**
* Channel provider name.
*/
std::string _channelProviderName;
/**
* Channel provider.
*/
ChannelProvider* _channelProvider;
/**
* Run mutex.
*/
Mutex _mutex;
/**
* Run event.
*/
Event _runEvent;
/**
* Beacon server status provider interface (optional).
*/
BeaconServerStatusProvider* _beaconServerStatusProvider;
/**
* Initialize logger.
*/
void initializeLogger();
/**
* Load configuration.
*/
void loadConfiguration();
/**
* Check context state.
* @throws BaseExeption if state is <code>DESTROYED</code>
*/
inline void checkState()
{
if (_state == DESTROYED)
{
THROW_BASE_EXCEPTION("Context destroyed.");
}
}
/**
* Internal initialization.
*/
void internalInitialize();
/**
* Initialize broadcast DP transport (broadcast socket and repeater connection).
*/
void initializeBroadcastTransport();
/**
* Internal destroy.
*/
void internalDestroy();
/**
* Destroy all transports.
*/
void destroyAllTransports();
};
}
}

View File

@@ -10,9 +10,9 @@ PROD_HOST += testBlockingUDPClnt
testBlockingUDPClnt_SRCS += testBlockingUDPClnt.cpp
testBlockingUDPClnt_LIBS += pvData pvAccess Com
PROD_HOST += testRemoteClientImpl
testRemoteClientImpl_SRCS += testRemoteClientImpl.cpp
testRemoteClientImpl_LIBS += pvData pvAccess Com
#PROD_HOST += testRemoteClientImpl
#testRemoteClientImpl_SRCS += testRemoteClientImpl.cpp
#testRemoteClientImpl_LIBS += pvData pvAccess Com
PROD_HOST += testBeaconEmitter
testBeaconEmitter_SRCS += testBeaconEmitter.cpp
@@ -34,6 +34,9 @@ PROD_HOST += testBlockingTCPClnt
testBlockingTCPClnt_SRCS += testBlockingTCPClnt.cpp
testBlockingTCPClnt_LIBS += pvData pvAccess Com
PROD_HOST += testServerContext
testServerContext_SRCS += testServerContext.cpp
testServerContext_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------

View File

@@ -0,0 +1,29 @@
/*
* testServerContext.cpp
*/
#include "serverContext.h"
using namespace epics::pvAccess;
using namespace epics::pvData;
using namespace std;
void testServerContext()
{
ServerContextImpl ctx;
ctx.initialize(NULL);
ctx.printInfo();
ctx.run(1);
ctx.destroy();
}
int main(int argc, char *argv[])
{
testServerContext();
return (0);
}