client: multiple NIF support
This commit is contained in:
@@ -22,7 +22,7 @@ namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom,
|
||||
Transport::shared_pointer const & /*transport*/, int8 version, int8 command,
|
||||
Transport::shared_pointer const & transport, int8 version, int8 command,
|
||||
size_t payloadSize, ByteBuffer* payloadBuffer) {
|
||||
if(_debugLevel >= 3) { // TODO make a constant of sth (0 - off, 1 - debug, 2 - more/trace, 3 - messages)
|
||||
char ipAddrStr[48];
|
||||
@@ -30,7 +30,7 @@ namespace epics {
|
||||
|
||||
ostringstream prologue;
|
||||
prologue<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
|
||||
prologue<<(int)version<<"] received from "<<ipAddrStr;
|
||||
prologue<<(int)version<<"] received from "<<ipAddrStr<<" on "<<transport->getRemoteName();
|
||||
|
||||
hexDump(prologue.str(), _description,
|
||||
(const int8*)payloadBuffer->getArray(),
|
||||
|
||||
@@ -390,8 +390,11 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
(target == inetAddressType_broadcast_multicast && _isSendAddressUnicast[i]))
|
||||
continue;
|
||||
|
||||
LOG(logLevelDebug, "Sending to %d bytes to %s.",
|
||||
buffer->getRemaining(), inetAddressToString((*_sendAddresses)[i]).c_str());
|
||||
if (IS_LOGGABLE(logLevelDebug))
|
||||
{
|
||||
LOG(logLevelDebug, "Sending %d bytes to %s.",
|
||||
buffer->getRemaining(), inetAddressToString((*_sendAddresses)[i]).c_str());
|
||||
}
|
||||
|
||||
int retval = sendto(_channel, buffer->getArray(),
|
||||
buffer->getLimit(), 0, &((*_sendAddresses)[i].sa),
|
||||
|
||||
@@ -2914,7 +2914,7 @@ namespace epics {
|
||||
// to do the local multicast
|
||||
|
||||
//
|
||||
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
|
||||
// locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP
|
||||
//
|
||||
if ((qosCode & 0x80) == 0x80)
|
||||
{
|
||||
@@ -2923,11 +2923,8 @@ namespace epics {
|
||||
if (!context)
|
||||
return;
|
||||
|
||||
BlockingUDPTransport::shared_pointer bt =
|
||||
//context->getLocalMulticastTransport();
|
||||
std::tr1::dynamic_pointer_cast<BlockingUDPTransport>(context->getSearchTransport());
|
||||
|
||||
if (bt)
|
||||
BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast<BlockingUDPTransport>(transport);
|
||||
if (bt && bt->hasLocalMulticastAddress())
|
||||
{
|
||||
// clear unicast flag
|
||||
payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
|
||||
@@ -2938,7 +2935,7 @@ namespace epics {
|
||||
|
||||
payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip()
|
||||
|
||||
bt->send(payloadBuffer, context->getLocalBroadcastAddress());
|
||||
bt->send(payloadBuffer, bt->getLocalMulticastAddress());
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -4417,11 +4414,6 @@ namespace epics {
|
||||
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext);
|
||||
};
|
||||
|
||||
virtual const osiSockAddr& getLocalBroadcastAddress() const
|
||||
{
|
||||
return m_localBroadcastAddress;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
void loadConfiguration() {
|
||||
@@ -4468,119 +4460,228 @@ namespace epics {
|
||||
*/
|
||||
bool initializeUDPTransport() {
|
||||
|
||||
// where to bind (listen) address
|
||||
osiSockAddr listenLocalAddress;
|
||||
listenLocalAddress.ia.sin_family = AF_INET;
|
||||
listenLocalAddress.ia.sin_port = htons(m_broadcastPort);
|
||||
listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
// quary broadcast addresses of all IFs
|
||||
// query broadcast addresses of all IFs
|
||||
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
|
||||
if (socket == INVALID_SOCKET) return false;
|
||||
auto_ptr<InetAddrVector> broadcastAddresses(getBroadcastAddresses(socket, m_broadcastPort));
|
||||
|
||||
int ifIndex = discoverInterfaceIndex(socket, &listenLocalAddress);
|
||||
if (ifIndex == -1)
|
||||
if (socket == INVALID_SOCKET)
|
||||
{
|
||||
LOG(logLevelWarn, "Unable to find interface index for %s.", inetAddressToString(listenLocalAddress, false).c_str());
|
||||
// TODO fallback
|
||||
LOG(logLevelError, "Failed to initialize broadcast UDP transport.");
|
||||
return false;
|
||||
}
|
||||
|
||||
IfaceNodeVector ifaceList;
|
||||
if (discoverInterfaces(ifaceList, socket, 0) || ifaceList.size() == 0)
|
||||
{
|
||||
LOG(logLevelError, "Failed to initialize broadcast UDP transport, no interfaces available.");
|
||||
return false;
|
||||
}
|
||||
|
||||
epicsSocketDestroy (socket);
|
||||
|
||||
// set broadcast address list
|
||||
if (!m_addressList.empty())
|
||||
{
|
||||
// if auto is true, add it to specified list
|
||||
InetAddrVector* appendList = 0;
|
||||
if (m_autoAddressList)
|
||||
appendList = broadcastAddresses.get();
|
||||
|
||||
auto_ptr<InetAddrVector> list(getSocketAddressList(m_addressList, m_broadcastPort, appendList));
|
||||
if (list.get() && list->size()) {
|
||||
// delete old list and take ownership of a new one
|
||||
broadcastAddresses = list;
|
||||
}
|
||||
}
|
||||
|
||||
if (!broadcastAddresses.get() || !broadcastAddresses->size())
|
||||
LOG(logLevelWarn,
|
||||
"No broadcast addresses found or specified!");
|
||||
else
|
||||
for (size_t i = 0; i < broadcastAddresses->size(); i++)
|
||||
LOG(logLevelDebug,
|
||||
"Broadcast address #%d: %s.", i, inetAddressToString((*broadcastAddresses)[i]).c_str());
|
||||
|
||||
ClientContextImpl::shared_pointer thisPointer = shared_from_this();
|
||||
|
||||
//ClientContextImpl::shared_pointer thisPointer = shared_from_this();
|
||||
TransportClient::shared_pointer nullTransportClient;
|
||||
|
||||
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(false, true, true));
|
||||
m_broadcastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
|
||||
nullTransportClient, m_responseHandler,
|
||||
listenLocalAddress, PVA_PROTOCOL_REVISION,
|
||||
PVA_DEFAULT_PRIORITY));
|
||||
if (!m_broadcastTransport.get())
|
||||
return false;
|
||||
m_broadcastTransport->setSendAddresses(broadcastAddresses.get());
|
||||
|
||||
// undefined address
|
||||
osiSockAddr undefinedAddress;
|
||||
undefinedAddress.ia.sin_family = AF_INET;
|
||||
undefinedAddress.ia.sin_port = htons(0);
|
||||
undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
osiSockAddr anyAddress;
|
||||
anyAddress.ia.sin_family = AF_INET;
|
||||
anyAddress.ia.sin_port = htons(0);
|
||||
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
auto_ptr<BlockingUDPConnector> searchConnector(new BlockingUDPConnector(false, false, true));
|
||||
m_searchTransport = static_pointer_cast<BlockingUDPTransport>(searchConnector->connect(
|
||||
m_searchTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
|
||||
nullTransportClient, m_responseHandler,
|
||||
undefinedAddress, PVA_PROTOCOL_REVISION,
|
||||
anyAddress, PVA_PROTOCOL_REVISION,
|
||||
PVA_DEFAULT_PRIORITY));
|
||||
if (!m_searchTransport.get())
|
||||
return false;
|
||||
m_searchTransport->setSendAddresses(broadcastAddresses.get());
|
||||
|
||||
// TODO do not use searchBroadcast in future
|
||||
// setup local broadcasting
|
||||
// TODO configurable local NIF, address
|
||||
osiSockAddr loAddr;
|
||||
getLoopbackNIF(loAddr, "", 0);
|
||||
if (true)
|
||||
|
||||
|
||||
|
||||
InetAddrVector autoBCastAddr;
|
||||
for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
|
||||
{
|
||||
try
|
||||
ifaceNode node = *iter;
|
||||
|
||||
if (node.ifaceBCast.ia.sin_family != AF_UNSPEC)
|
||||
{
|
||||
int lastAddr = 128 + ifIndex;
|
||||
std::ostringstream o;
|
||||
// TODO configurable
|
||||
o << "224.0.0." << lastAddr;
|
||||
|
||||
//osiSockAddr group;
|
||||
aToIPAddr(o.str().c_str(), m_broadcastPort, &m_localBroadcastAddress.ia);
|
||||
m_broadcastTransport->join(m_localBroadcastAddress, loAddr);
|
||||
|
||||
// NOTE: this disables usage of multicast addresses in EPICS_PVA_ADDR_LIST
|
||||
m_searchTransport->setMutlicastNIF(loAddr, true);
|
||||
|
||||
//InetAddrVector sendAddressList;
|
||||
//sendAddressList.push_back(group);
|
||||
//m_searchTransport->setSendAddresses(&sendAddressList);
|
||||
|
||||
LOG(logLevelDebug, "Local multicast enabled on %s using network interface %s.",
|
||||
inetAddressToString(m_localBroadcastAddress).c_str(), inetAddressToString(loAddr, false).c_str());
|
||||
node.ifaceBCast.ia.sin_port = htons(m_broadcastPort);
|
||||
autoBCastAddr.push_back(node.ifaceBCast);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
}
|
||||
|
||||
//
|
||||
// set beacon (broadcast) address list
|
||||
//
|
||||
|
||||
|
||||
if (!m_addressList.empty())
|
||||
{
|
||||
// if auto is true, add it to specified list
|
||||
if (!m_autoAddressList)
|
||||
autoBCastAddr.clear();
|
||||
|
||||
auto_ptr<InetAddrVector> list(getSocketAddressList(m_addressList, m_broadcastPort, &autoBCastAddr));
|
||||
if (list.get() && list->size())
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
|
||||
m_searchTransport->setSendAddresses(list.get());
|
||||
}
|
||||
else // TODO or no fallback at all
|
||||
{
|
||||
// fallback
|
||||
// set default (auto) address list
|
||||
m_searchTransport->setSendAddresses(&autoBCastAddr);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
|
||||
// fallback
|
||||
// set default (auto) address list
|
||||
m_searchTransport->setSendAddresses(&autoBCastAddr);
|
||||
}
|
||||
|
||||
// become active
|
||||
m_broadcastTransport->start();
|
||||
m_searchTransport->start();
|
||||
|
||||
// debug output for broadcast addresses
|
||||
InetAddrVector* blist = m_searchTransport->getSendAddresses();
|
||||
if (!blist || !blist->size())
|
||||
LOG(logLevelWarn,
|
||||
"No broadcast addresses found or specified!");
|
||||
else
|
||||
for (size_t i = 0; i < blist->size(); i++)
|
||||
LOG(logLevelDebug,
|
||||
"Broadcast address #%d: %s.", i, inetAddressToString((*blist)[i]).c_str());
|
||||
|
||||
|
||||
// TODO configurable local NIF, address
|
||||
osiSockAddr loAddr;
|
||||
getLoopbackNIF(loAddr, "", 0);
|
||||
|
||||
for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
|
||||
{
|
||||
ifaceNode node = *iter;
|
||||
|
||||
LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s, index %d.",
|
||||
inetAddressToString(node.ifaceAddr, false).c_str(),
|
||||
inetAddressToString(node.ifaceBCast, false).c_str(),
|
||||
node.ifaceIndex);
|
||||
try
|
||||
{
|
||||
// where to bind (listen) address
|
||||
// TODO opt copy
|
||||
osiSockAddr listenLocalAddress;
|
||||
listenLocalAddress.ia.sin_family = AF_INET;
|
||||
listenLocalAddress.ia.sin_port = htons(m_broadcastPort);
|
||||
listenLocalAddress.ia.sin_addr.s_addr = node.ifaceAddr.ia.sin_addr.s_addr;
|
||||
|
||||
BlockingUDPTransport::shared_pointer transport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
|
||||
nullTransportClient, m_responseHandler,
|
||||
listenLocalAddress, PVA_PROTOCOL_REVISION,
|
||||
PVA_DEFAULT_PRIORITY));
|
||||
listenLocalAddress = *transport->getRemoteAddress();
|
||||
|
||||
BlockingUDPTransport::shared_pointer transport2;
|
||||
|
||||
if(node.ifaceBCast.ia.sin_family == AF_UNSPEC ||
|
||||
node.ifaceBCast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
|
||||
LOG(logLevelWarn, "Unable to find broadcast address of interface %s.", inetAddressToString(node.ifaceBCast, false).c_str());
|
||||
}
|
||||
#if !defined(_WIN32)
|
||||
else
|
||||
{
|
||||
/* An oddness of BSD sockets (not winsock) is that binding to
|
||||
* INADDR_ANY will receive unicast and broadcast, but binding to
|
||||
* a specific interface address receives only unicast. The trick
|
||||
* is to bind a second socket to the interface broadcast address,
|
||||
* which will then receive only broadcasts.
|
||||
*/
|
||||
|
||||
// TODO opt copy
|
||||
osiSockAddr bcastAddress;
|
||||
bcastAddress.ia.sin_family = AF_INET;
|
||||
bcastAddress.ia.sin_port = htons(m_broadcastPort);
|
||||
bcastAddress.ia.sin_addr.s_addr = node.ifaceBCast.ia.sin_addr.s_addr;
|
||||
|
||||
transport2 = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
|
||||
nullTransportClient, m_responseHandler,
|
||||
bcastAddress, PVA_PROTOCOL_REVISION,
|
||||
PVA_DEFAULT_PRIORITY));
|
||||
/* The other wrinkle is that nothing should be sent from this second
|
||||
* socket. So replies are made through the unicast socket.
|
||||
*/
|
||||
transport2->setReplyTransport(transport);
|
||||
}
|
||||
#endif
|
||||
|
||||
// TODO set ignore list on transport and transport2
|
||||
|
||||
|
||||
//
|
||||
// Setup local broadcasting
|
||||
//
|
||||
// Each network interface gets its own multicast group on a local interface.
|
||||
// Multicast address is determined by prefix 224.0.0.128 + NIF index
|
||||
//
|
||||
|
||||
osiSockAddr group;
|
||||
int lastAddr = 128 + node.ifaceIndex;
|
||||
std::ostringstream o;
|
||||
// TODO configurable prefix and base
|
||||
o << "224.0.0." << lastAddr;
|
||||
aToIPAddr(o.str().c_str(), m_broadcastPort, &group.ia);
|
||||
|
||||
transport->setMutlicastNIF(loAddr, true);
|
||||
transport->setLocalMulticastAddress(group);
|
||||
|
||||
BlockingUDPTransport::shared_pointer localMulticastTransport;
|
||||
|
||||
if (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
// NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
|
||||
localMulticastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
|
||||
nullTransportClient, m_responseHandler,
|
||||
group, PVA_PROTOCOL_REVISION,
|
||||
PVA_DEFAULT_PRIORITY));
|
||||
localMulticastTransport->join(group, loAddr);
|
||||
|
||||
LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.",
|
||||
inetAddressToString(listenLocalAddress, false).c_str(),
|
||||
inetAddressToString(loAddr, false).c_str(),
|
||||
inetAddressToString(group).c_str());
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
|
||||
}
|
||||
|
||||
transport->start();
|
||||
if(transport2)
|
||||
transport2->start();
|
||||
if (localMulticastTransport)
|
||||
localMulticastTransport->start();
|
||||
|
||||
m_udpTransports.push_back(transport);
|
||||
m_udpTransports.push_back(transport2);
|
||||
m_udpTransports.push_back(localMulticastTransport);
|
||||
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -4594,8 +4695,14 @@ namespace epics {
|
||||
destroyAllChannels();
|
||||
|
||||
// stop UDPs
|
||||
m_searchTransport->close();
|
||||
m_broadcastTransport->close();
|
||||
for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin();
|
||||
iter != m_udpTransports.end(); iter++)
|
||||
(*iter)->close();
|
||||
m_udpTransports.clear();
|
||||
|
||||
// stop UDPs
|
||||
if (m_searchTransport)
|
||||
m_searchTransport->close();
|
||||
|
||||
// wait for all transports to cleanly exit
|
||||
int tries = 40;
|
||||
@@ -4990,9 +5097,10 @@ namespace epics {
|
||||
Timer::shared_pointer m_timer;
|
||||
|
||||
/**
|
||||
* Broadcast transport needed to listen for broadcasts.
|
||||
* UDP transports needed to receive channel searches.
|
||||
*/
|
||||
BlockingUDPTransport::shared_pointer m_broadcastTransport;
|
||||
typedef std::vector<BlockingUDPTransport::shared_pointer> BlockingUDPTransportVector;
|
||||
BlockingUDPTransportVector m_udpTransports;
|
||||
|
||||
/**
|
||||
* UDP transport needed for channel searches.
|
||||
@@ -5105,8 +5213,6 @@ namespace epics {
|
||||
TransportRegistry::transportVector_t m_flushTransports;
|
||||
|
||||
FlushStrategy m_flushStrategy;
|
||||
|
||||
osiSockAddr m_localBroadcastAddress;
|
||||
};
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -129,8 +129,6 @@ namespace epics {
|
||||
virtual void poll() = 0;
|
||||
|
||||
virtual void destroy() = 0;
|
||||
|
||||
virtual const osiSockAddr& getLocalBroadcastAddress() const = 0;
|
||||
};
|
||||
|
||||
epicsShareExtern ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf);
|
||||
|
||||
Reference in New Issue
Block a user