multiple providers per server initial support

This commit is contained in:
Matej Sekoranja
2011-06-05 23:41:59 +02:00
parent 0985466573
commit f19ceb7793
5 changed files with 93 additions and 33 deletions

View File

@@ -46,6 +46,10 @@ namespace epics {
return Transport::shared_pointer();
}
/*
IPv4 multicast addresses are defined by the leading address bits of 1110, originating from the classful network design of the early Internet when this group of addresses was designated as Class D. The Classless Inter-Domain Routing (CIDR) prefix of this group is 224.0.0.0/4. The group includes the addresses from 224.0.0.0 to 239.255.255.255. Address assignments from within this range are specified in RFC 5771, an Internet Engineering Task Force (IETF) Best Current Practice document (BCP 51).*/
// set SO_REUSEADDR or SO_REUSEPORT, OS dependant
if (_reuseSocket)
epicsSocketEnableAddressUseForDatagramFanout(socket);

View File

@@ -137,7 +137,7 @@ void ServerIntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom,
/****************************************************************************************/
ServerSearchHandler::ServerSearchHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Search request"), _provider(context->getChannelProvider())
AbstractServerResponseHandler(context, "Search request"), _providers(context->getChannelProviders())
{
}
@@ -166,28 +166,37 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
// no name check here...
// TODO object pool!!!
ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context);
pr->set(searchSequenceId, cid, responseFrom, responseRequired);
int providerCount = _providers.size();
ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context, providerCount);
pr->set(name, searchSequenceId, cid, responseFrom, responseRequired);
ChannelFindRequester::shared_pointer spr(pr);
_provider->channelFind(name, spr);
for (int i = 0; i < providerCount; i++)
_providers[i]->channelFind(name, spr);
}
}
ServerChannelFindRequesterImpl::ServerChannelFindRequesterImpl(ServerContextImpl::shared_pointer const & context) :
ServerChannelFindRequesterImpl::ServerChannelFindRequesterImpl(ServerContextImpl::shared_pointer const & context,
int32 expectedResponseCount) :
_sendTo(NULL),
_context(context)
_wasFound(false),
_context(context),
_expectedResponseCount(expectedResponseCount),
_responseCount(0)
{}
void ServerChannelFindRequesterImpl::clear()
{
Lock guard(_mutex);
_sendTo = NULL;
_wasFound = false;
_responseCount = 0;
}
ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, bool responseRequired)
ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(String name, int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, bool responseRequired)
{
Lock guard(_mutex);
_name = name;
_searchSequenceId = searchSequenceId;
_cid = cid;
_sendTo = sendTo;
@@ -195,12 +204,36 @@ ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(int32 search
return this;
}
std::map<String, std::tr1::weak_ptr<ChannelProvider> > ServerSearchHandler::s_channelNameToProvider;
void ServerChannelFindRequesterImpl::channelFindResult(const Status& status, ChannelFind::shared_pointer const & channelFind, bool wasFound)
{
// TODO status
Lock guard(_mutex);
if (wasFound || _responseRequired)
_responseCount++;
if (_responseCount > _expectedResponseCount)
{
if ((_responseCount+1) == _expectedResponseCount)
{
errlogSevPrintf(errlogMinor,"[ServerChannelFindRequesterImpl::channelFindResult] More responses received than expected fpr channel '%s'!", _name.c_str());
}
return;
}
if (wasFound && _wasFound)
{
errlogSevPrintf(errlogMinor,"[ServerChannelFindRequesterImpl::channelFindResult] Channel '%s' is hosted by different channel providers!", _name.c_str());
return;
}
if (wasFound || (_responseRequired && (_responseCount == _expectedResponseCount)))
{
if (wasFound && _expectedResponseCount > 1)
{
ServerSearchHandler::s_channelNameToProvider[_name] = channelFind->getChannelProvider();
}
_wasFound = wasFound;
TransportSender::shared_pointer thisSender = shared_from_this();
_context->getBroadcastTransport()->enqueueSendRequest(thisSender);
@@ -271,7 +304,14 @@ void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
ServerChannelRequesterImpl::create(_provider, transport, channelName, cid);
// TODO !!!
//ServerChannelRequesterImpl::create(_providers.at(0), transport, channelName, cid);
if (_providers.size() == 1)
ServerChannelRequesterImpl::create(_providers.at(0), transport, channelName, cid);
else
ServerChannelRequesterImpl::create(ServerSearchHandler::s_channelNameToProvider[channelName].lock(), transport, channelName, cid); // TODO !!!!
}
void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & transport)

View File

@@ -151,6 +151,9 @@ namespace epics {
class ServerSearchHandler : public AbstractServerResponseHandler
{
public:
// TODO
static std::map<String, std::tr1::weak_ptr<ChannelProvider> > s_channelNameToProvider;
ServerSearchHandler(ServerContextImpl::shared_pointer const & context);
virtual ~ServerSearchHandler();
@@ -159,7 +162,7 @@ namespace epics {
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
ChannelProvider::shared_pointer _provider;
std::vector<ChannelProvider::shared_pointer> _providers;
};
@@ -169,21 +172,24 @@ namespace epics {
public std::tr1::enable_shared_from_this<ServerChannelFindRequesterImpl>
{
public:
ServerChannelFindRequesterImpl(ServerContextImpl::shared_pointer const & context);
ServerChannelFindRequesterImpl(ServerContextImpl::shared_pointer const & context, int32 expectedResponseCount);
void clear();
ServerChannelFindRequesterImpl* set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired);
ServerChannelFindRequesterImpl* set(String _name, int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired);
void channelFindResult(const epics::pvData::Status& status, ChannelFind::shared_pointer const & channelFind, boolean wasFound);
void lock();
void unlock();
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
String _name;
int32 _searchSequenceId;
int32 _cid;
osiSockAddr* _sendTo;
boolean _responseRequired;
boolean _wasFound;
bool _responseRequired;
bool _wasFound;
ServerContextImpl::shared_pointer _context;
epics::pvData::Mutex _mutex;
int32 _expectedResponseCount;
int32 _responseCount;
};
/****************************************************************************************/
@@ -195,7 +201,7 @@ namespace epics {
public:
ServerCreateChannelHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Create channel request") {
_provider = context->getChannelProvider();
_providers = context->getChannelProviders();
}
virtual void handleResponse(osiSockAddr* responseFrom,
@@ -204,7 +210,7 @@ namespace epics {
private:
void disconnect(Transport::shared_pointer const & transport);
ChannelProvider::shared_pointer _provider;
std::vector<ChannelProvider::shared_pointer> _providers;
};
class ServerChannelRequesterImpl :

View File

@@ -37,8 +37,8 @@ ServerContextImpl::ServerContextImpl():
_acceptor(),
_transportRegistry(),
_channelAccess(),
_channelProviderName(PVACCESS_DEFAULT_PROVIDER),
_channelProvider(),
_channelProviderNames(PVACCESS_DEFAULT_PROVIDER),
_channelProviders(),
_beaconServerStatusProvider()
{
@@ -111,8 +111,8 @@ void ServerContextImpl::loadConfiguration()
_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", _receiveBufferSize);
_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CAS_MAX_ARRAY_BYTES", _receiveBufferSize);
_channelProviderName = config->getPropertyAsString("EPICS4_CA_PROVIDER_NAME", _channelProviderName);
_channelProviderName = config->getPropertyAsString("EPICS4_CAS_PROVIDER_NAME", _channelProviderName);
_channelProviderNames = config->getPropertyAsString("EPICS4_CA_PROVIDER_NAMES", _channelProviderNames);
_channelProviderNames = config->getPropertyAsString("EPICS4_CAS_PROVIDER_NAMES", _channelProviderNames);
}
void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channelAccess)
@@ -134,10 +134,20 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel
_channelAccess = channelAccess;
_channelProvider = _channelAccess->getProvider(_channelProviderName);
if (_channelProvider == NULL)
// split comma separated names
std::stringstream ss(_channelProviderNames);
std::string providerName;
while (std::getline(ss, providerName, ',')) {
ChannelProvider::shared_pointer channelProvider = _channelAccess->getProvider(providerName);
if (channelProvider)
_channelProviders.push_back(channelProvider);
}
//_channelProvider = _channelAccess->getProvider(_channelProviderNames);
if (_channelProviders.size() == 0)
{
std::string msg = "Channel provider with name '" + _channelProviderName + "' not available.";
std::string msg = "None of the specified channel providers are available: " + _channelProviderNames + ".";
THROW_BASE_EXCEPTION(msg.c_str());
}
@@ -397,7 +407,7 @@ void ServerContextImpl::printInfo(ostream& str)
{
Lock guard(_mutex);
str << "VERSION : " << getVersion().getVersionString() << endl \
<< "CHANNEL PROVIDER : " << _channelProviderName << endl \
<< "PROVIDER_NAMES : " << _channelProviderNames << endl \
<< "BEACON_ADDR_LIST : " << _beaconAddressList << endl \
<< "AUTO_BEACON_ADDR_LIST : " << _autoBeaconAddressList << endl \
<< "BEACON_PERIOD : " << _beaconPeriod << endl \
@@ -503,19 +513,19 @@ ChannelAccess::shared_pointer ServerContextImpl::getChannelAccess()
std::string ServerContextImpl::getChannelProviderName()
{
return _channelProviderName;
return _channelProviderNames;
}
void ServerContextImpl::setChannelProviderName(std::string channelProviderName)
{
if (_state != NOT_INITIALIZED)
throw std::logic_error("must be called before initialize");
_channelProviderName = channelProviderName;
_channelProviderNames = channelProviderName;
}
ChannelProvider::shared_pointer ServerContextImpl::getChannelProvider()
std::vector<ChannelProvider::shared_pointer> ServerContextImpl::getChannelProviders()
{
return _channelProvider;
return _channelProviders;
}
Timer::shared_pointer ServerContextImpl::getTimer()

View File

@@ -269,10 +269,10 @@ public:
void setChannelProviderName(std::string providerName);
/**
* Get channel provider.
* @return channel provider.
* Get channel providers.
* @return channel providers.
*/
ChannelProvider::shared_pointer getChannelProvider();
std::vector<ChannelProvider::shared_pointer> getChannelProviders();
private:
/**
@@ -371,12 +371,12 @@ private:
/**
* Channel provider name.
*/
std::string _channelProviderName;
std::string _channelProviderNames;
/**
* Channel provider.
*/
ChannelProvider::shared_pointer _channelProvider;
std::vector<ChannelProvider::shared_pointer> _channelProviders;
/**
* Run mutex.