From 1ca3918afa3b2421ec47aeffa8fcfc81074a7d60 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 1 Mar 2016 12:11:25 +0100 Subject: [PATCH] local multicast reimplemented --- src/remote/blockingUDP.h | 33 ++++++- src/remote/blockingUDPTransport.cpp | 94 ++++++++++++++++--- src/remote/remote.h | 3 +- src/remoteClient/clientContextImpl.cpp | 124 ++++++++++++++----------- src/server/responseHandlers.cpp | 18 +++- src/server/serverContext.cpp | 114 ++++++++++++----------- src/utils/inetAddressUtil.cpp | 19 ---- src/utils/inetAddressUtil.h | 1 - 8 files changed, 260 insertions(+), 146 deletions(-) diff --git a/src/remote/blockingUDP.h b/src/remote/blockingUDP.h index be9745a..e0b2c5b 100644 --- a/src/remote/blockingUDP.h +++ b/src/remote/blockingUDP.h @@ -243,7 +243,7 @@ namespace epics { /** * Set ignore list. - * @param addresses list of ignored addresses. + * @param address list of ignored addresses. */ void setIgnoredAddresses(InetAddrVector* addresses) { if (addresses) @@ -265,6 +265,32 @@ namespace epics { return _ignoredAddresses; } + /** + * Set tapped NIF list. + * @param NIF address list to tap. + */ + void setTappedNIF(InetAddrVector* addresses) { + if (addresses) + { + if (!_tappedNIF) _tappedNIF = new InetAddrVector; + *_tappedNIF = *addresses; + } + else + { + if (_tappedNIF) { delete _tappedNIF; _tappedNIF = 0; } + } + } + + /** + * Get list of tapped NIF addresses. + * @return tapped NIF addresses. + */ + InetAddrVector* getTappedNIF() const { + return _tappedNIF; + } + + bool send(const char* buffer, size_t length, const osiSockAddr& address); + bool send(epics::pvData::ByteBuffer* buffer, const osiSockAddr& address); bool send(epics::pvData::ByteBuffer* buffer, InetAddressType target = inetAddressType_all); @@ -373,6 +399,11 @@ namespace epics { */ InetAddrVector* _ignoredAddresses; + /** + * Tapped NIF addresses. + */ + InetAddrVector* _tappedNIF; + /** * Send address. */ diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 77e5813..6ce492e 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -38,6 +38,9 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so } #endif +// reserve some space for CMD_ORIGIN_TAG message +#define RECEIVE_BUFFER_PRE_RESERVE PVA_MESSAGE_HEADER_SIZE + 16 + PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport); BlockingUDPTransport::BlockingUDPTransport(bool serverFlag, @@ -50,9 +53,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _bindAddress(bindAddress), _sendAddresses(0), _ignoredAddresses(0), + _tappedNIF(0), _sendToEnabled(false), _localMulticastAddressEnabled(false), - _receiveBuffer(new ByteBuffer(MAX_UDP_RECV)), + _receiveBuffer(new ByteBuffer(MAX_UDP_RECV+RECEIVE_BUFFER_PRE_RESERVE)), _sendBuffer(new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), _clientServerWithEndianFlag( @@ -218,15 +222,20 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so try { + char* recvfrom_buffer_start = (char*)(_receiveBuffer->getArray()+RECEIVE_BUFFER_PRE_RESERVE); + size_t recvfrom_buffer_len =_receiveBuffer->getSize()-RECEIVE_BUFFER_PRE_RESERVE; while(!_closed.get()) { // we poll to prevent blocking indefinitely // data ready to be read _receiveBuffer->clear(); + // reserve some space for CMD_ORIGIN_TAG + _receiveBuffer->setPosition(RECEIVE_BUFFER_PRE_RESERVE); - int bytesRead = recvfrom(_channel, (char*)_receiveBuffer->getArray(), - _receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress, + int bytesRead = recvfrom(_channel, + recvfrom_buffer_start, recvfrom_buffer_len, + 0, (sockaddr*)&fromAddress, &addrStructSize); if(likely(bytesRead>0)) { @@ -245,9 +254,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so } if(likely(!ignore)) { - _receiveBuffer->setPosition(bytesRead); - - _receiveBuffer->flip(); + _receiveBuffer->setPosition(RECEIVE_BUFFER_PRE_RESERVE); + _receiveBuffer->setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead); try{ processBuffer(replyTo, fromAddress, _receiveBuffer.get()); @@ -322,11 +330,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so // second byte version int8 version = receiveBuffer->getByte(); - // only data for UDP int8 flags = receiveBuffer->getByte(); - if (flags < 0) + if (flags & 0x80) { - // 7-bit set + // 7th bit set receiveBuffer->setEndianess(EPICS_ENDIAN_BIG); } else @@ -338,21 +345,80 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so int8 command = receiveBuffer->getByte(); // TODO check this cast (size_t must be 32-bit) size_t payloadSize = receiveBuffer->getInt(); + + // control message check (skip message) + if (flags & 0x01) + continue; + size_t nextRequestPosition = receiveBuffer->getPosition() + payloadSize; // payload size check if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false; - // handle - _responseHandler->handleResponse(&fromAddress, replyTransport, - version, command, payloadSize, - _receiveBuffer.get()); + // CMD_ORIGIN_TAG filtering + // NOTE: from design point of view this is not a right place to process application message here + if (unlikely((command == CMD_ORIGIN_TAG) && _tappedNIF)) + { + // 128-bit IPv6 address + osiSockAddr originNIFAddress; + if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress)) + { + originNIFAddress.ia.sin_family = AF_INET; + + /* + LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.", + inetAddressToString(fromAddress, true).c_str(), + inetAddressToString(originNIFAddress, false).c_str()); + */ + + // filter + if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY)) + { + bool accept = false; + for(size_t i = 0; i < _tappedNIF->size(); i++) + { + if((*_tappedNIF)[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr) + { + accept = true; + break; + } + } + + // ignore messages from non-tapped NIFs + if (!accept) + return false; + } + } + } + else + { + // handle + _responseHandler->handleResponse(&fromAddress, replyTransport, + version, command, payloadSize, + _receiveBuffer.get()); + } // set position (e.g. in case handler did not read all) receiveBuffer->setPosition(nextRequestPosition); } - //all ok + // all ok + return true; + } + + bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSockAddr& address) + { + int retval = sendto(_channel, buffer, + length, 0, &(address.sa), sizeof(sockaddr)); + if(unlikely(retval<0)) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + LOG(logLevelDebug, "Socket sendto to %s error: %s.", + inetAddressToString(address).c_str(), errStr); + return false; + } + return true; } diff --git a/src/remote/remote.h b/src/remote/remote.h index acb3a9c..a865ad7 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -114,7 +114,8 @@ namespace epics { CMD_MESSAGE = 18, CMD_MULTIPLE_DATA = 19, CMD_RPC = 20, - CMD_CANCEL_REQUEST = 21 + CMD_CANCEL_REQUEST = 21, + CMD_ORIGIN_TAG = 22 }; enum ControlCommands { diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 3945250..6f64393 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2926,6 +2926,17 @@ namespace epics { BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast(transport); if (bt && bt->hasLocalMulticastAddress()) { + // RECEIVE_BUFFER_PRE_RESERVE allows to pre-fix message + size_t newStartPos = (startPosition-PVA_MESSAGE_HEADER_SIZE)-PVA_MESSAGE_HEADER_SIZE-16; + payloadBuffer->setPosition(newStartPos); + + // copy part of a header, and add: command, payloadSize, NIF address + payloadBuffer->put(payloadBuffer->getArray(), startPosition-PVA_MESSAGE_HEADER_SIZE, PVA_MESSAGE_HEADER_SIZE-5); + payloadBuffer->putByte(CMD_ORIGIN_TAG); + payloadBuffer->putInt(16); + // encode this socket bind address + encodeAsIPv6Address(payloadBuffer, bt->getBindAddress()); + // clear unicast flag payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80)); @@ -2933,9 +2944,12 @@ namespace epics { payloadBuffer->setPosition(startPosition+8); encodeAsIPv6Address(payloadBuffer, &responseAddress); - payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip() + // set to end of a message + payloadBuffer->setPosition(payloadBuffer->getLimit()); + + bt->send(payloadBuffer->getArray()+newStartPos, payloadBuffer->getPosition()-newStartPos, + bt->getLocalMulticastAddress()); - bt->send(payloadBuffer, bt->getLocalMulticastAddress()); return; } } @@ -4558,14 +4572,59 @@ namespace epics { osiSockAddr loAddr; getLoopbackNIF(loAddr, "", 0); + // + // Setup local multicasting + // + + osiSockAddr group; + // TODO configurable local multicast address + aToIPAddr("224.0.0.128", m_broadcastPort, &group.ia); + + BlockingUDPTransport::shared_pointer localMulticastTransport; + + if (true) + { + try + { + // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address + localMulticastTransport = static_pointer_cast(broadcastConnector->connect( + nullTransportClient, m_responseHandler, + group, PVA_PROTOCOL_REVISION, + PVA_DEFAULT_PRIORITY)); + localMulticastTransport->join(group, loAddr); + + if (localMulticastTransport) + { + localMulticastTransport->start(); + m_udpTransports.push_back(localMulticastTransport); + } + + LOG(logLevelDebug, "Local multicast enabled on %s/%s.", + inetAddressToString(loAddr, false).c_str(), + inetAddressToString(group).c_str()); + } + catch (std::exception& ex) + { + LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); + } + } + else + { + LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); + } + + + + + + for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++) { ifaceNode node = *iter; - LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s, index %d.", + LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s.", inetAddressToString(node.ifaceAddr, false).c_str(), - inetAddressToString(node.ifaceBCast, false).c_str(), - node.ifaceIndex); + inetAddressToString(node.ifaceBCast, false).c_str()); try { // where to bind (listen) address @@ -4614,62 +4673,17 @@ namespace epics { } #endif - // - // Setup local broadcasting - // - // Each network interface gets its own multicast group on a local interface. - // Multicast address is determined by prefix 224.0.0.128 + NIF index - // - - osiSockAddr group; - int lastAddr = 128 + node.ifaceIndex; - std::ostringstream o; - // TODO configurable prefix and base - o << "224.0.0." << lastAddr; - aToIPAddr(o.str().c_str(), m_broadcastPort, &group.ia); - transport->setMutlicastNIF(loAddr, true); transport->setLocalMulticastAddress(group); - BlockingUDPTransport::shared_pointer localMulticastTransport; - - if (true) - { - try - { - // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address - localMulticastTransport = static_pointer_cast(broadcastConnector->connect( - nullTransportClient, m_responseHandler, - group, PVA_PROTOCOL_REVISION, - PVA_DEFAULT_PRIORITY)); - localMulticastTransport->join(group, loAddr); - - LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.", - inetAddressToString(listenLocalAddress, false).c_str(), - inetAddressToString(loAddr, false).c_str(), - inetAddressToString(group).c_str()); - } - catch (std::exception& ex) - { - LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); - } - } - else - { - LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); - } - transport->start(); - if(transport2) - transport2->start(); - if (localMulticastTransport) - localMulticastTransport->start(); - m_udpTransports.push_back(transport); - if (transport2) - m_udpTransports.push_back(transport2); - m_udpTransports.push_back(localMulticastTransport); + if (transport2) + { + transport2->start(); + m_udpTransports.push_back(transport2); + } } catch (std::exception& e) { diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 8a6980f..3804cc9 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -266,6 +266,17 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast(transport); if (bt && bt->hasLocalMulticastAddress()) { + // RECEIVE_BUFFER_PRE_RESERVE allows to pre-fix message + size_t newStartPos = (startPosition-PVA_MESSAGE_HEADER_SIZE)-PVA_MESSAGE_HEADER_SIZE-16; + payloadBuffer->setPosition(newStartPos); + + // copy part of a header, and add: command, payloadSize, NIF address + payloadBuffer->put(payloadBuffer->getArray(), startPosition-PVA_MESSAGE_HEADER_SIZE, PVA_MESSAGE_HEADER_SIZE-5); + payloadBuffer->putByte(CMD_ORIGIN_TAG); + payloadBuffer->putInt(16); + // encode this socket bind address + encodeAsIPv6Address(payloadBuffer, bt->getBindAddress()); + // clear unicast flag payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80)); @@ -273,9 +284,12 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, payloadBuffer->setPosition(startPosition+8); encodeAsIPv6Address(payloadBuffer, &responseAddress); - payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip() + // set to end of a message + payloadBuffer->setPosition(payloadBuffer->getLimit()); + + bt->send(payloadBuffer->getArray()+newStartPos, payloadBuffer->getPosition()-newStartPos, + bt->getLocalMulticastAddress()); - bt->send(payloadBuffer, bt->getLocalMulticastAddress()); return; } } diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 5ed0ea9..5dfa0a2 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -361,14 +361,60 @@ void ServerContextImpl::initializeBroadcastTransport() osiSockAddr loAddr; getLoopbackNIF(loAddr, "", 0); + + // + // Setup local multicasting + // + + osiSockAddr group; + // TODO configurable local multicast address + aToIPAddr("224.0.0.128", _broadcastPort, &group.ia); + + BlockingUDPTransport::shared_pointer localMulticastTransport; + + if (true) + { + try + { + // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address + localMulticastTransport = static_pointer_cast(broadcastConnector->connect( + nullTransportClient, _responseHandler, + group, PVA_PROTOCOL_REVISION, + PVA_DEFAULT_PRIORITY)); + localMulticastTransport->join(group, loAddr); + + if (localMulticastTransport) + { + localMulticastTransport->start(); + _udpTransports.push_back(localMulticastTransport); + } + + LOG(logLevelDebug, "Local multicast enabled on %s/%s.", + inetAddressToString(loAddr, false).c_str(), + inetAddressToString(group).c_str()); + } + catch (std::exception& ex) + { + LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); + } + } + else + { + LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); + } + + + + + InetAddrVector tappedNIF; + for (IfaceNodeVector::const_iterator iter = _ifaceList.begin(); iter != _ifaceList.end(); iter++) { ifaceNode node = *iter; - LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s, index %d.", + LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s.", inetAddressToString(node.ifaceAddr, false).c_str(), - inetAddressToString(node.ifaceBCast, false).c_str(), - node.ifaceIndex); + inetAddressToString(node.ifaceBCast, false).c_str()); try { // where to bind (listen) address @@ -387,6 +433,8 @@ void ServerContextImpl::initializeBroadcastTransport() if (ignoreAddressList.get() && ignoreAddressList->size()) transport->setIgnoredAddresses(ignoreAddressList.get()); + tappedNIF.push_back(listenLocalAddress); + BlockingUDPTransport::shared_pointer transport2; @@ -421,65 +469,22 @@ void ServerContextImpl::initializeBroadcastTransport() if (ignoreAddressList.get() && ignoreAddressList->size()) transport2->setIgnoredAddresses(ignoreAddressList.get()); + + tappedNIF.push_back(bcastAddress); } #endif - // - // Setup local broadcasting - // - // Each network interface gets its own multicast group on a local interface. - // Multicast address is determined by prefix 224.0.0.128 + NIF index - // - - osiSockAddr group; - int lastAddr = 128 + node.ifaceIndex; - std::ostringstream o; - // TODO configurable prefix and base - o << "224.0.0." << lastAddr; - aToIPAddr(o.str().c_str(), _broadcastPort, &group.ia); - transport->setMutlicastNIF(loAddr, true); transport->setLocalMulticastAddress(group); - BlockingUDPTransport::shared_pointer localMulticastTransport; - - if (true) - { - try - { - // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address - localMulticastTransport = static_pointer_cast(broadcastConnector->connect( - nullTransportClient, _responseHandler, - group, PVA_PROTOCOL_REVISION, - PVA_DEFAULT_PRIORITY)); - localMulticastTransport->join(group, loAddr); - - LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.", - inetAddressToString(listenLocalAddress, false).c_str(), - inetAddressToString(loAddr, false).c_str(), - inetAddressToString(group).c_str()); - } - catch (std::exception& ex) - { - LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); - } - } - else - { - LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); - } - transport->start(); - if(transport2) - transport2->start(); - if (localMulticastTransport) - localMulticastTransport->start(); - _udpTransports.push_back(transport); - if(transport2) - _udpTransports.push_back(transport2); - _udpTransports.push_back(localMulticastTransport); + if (transport2) + { + transport2->start(); + _udpTransports.push_back(transport2); + } } catch (std::exception& e) { @@ -490,6 +495,9 @@ void ServerContextImpl::initializeBroadcastTransport() THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport"); } } + + if (localMulticastTransport) + localMulticastTransport->setTappedNIF(&tappedNIF); } void ServerContextImpl::run(int32 seconds) diff --git a/src/utils/inetAddressUtil.cpp b/src/utils/inetAddressUtil.cpp index 6eca68f..819ca21 100644 --- a/src/utils/inetAddressUtil.cpp +++ b/src/utils/inetAddressUtil.cpp @@ -396,25 +396,6 @@ int discoverInterfaces(IfaceNodeVector &list, SOCKET socket, const osiSockAddr * } } - - unsigned int index = if_nametoindex(pIfreqList->ifr_name); - if ( !index ) { - errlogPrintf ("discoverInterfaces(): net intf index fetch for \"%s\" failed\n", pIfreqList->ifr_name); - continue; - } - - node.ifaceIndex = index; - - /* - status = socket_ioctl ( socket, SIOCGIFINDEX, pIfreqList ); - if ( status ) { - errlogPrintf ("discoverInterfaces(): net intf index fetch for \"%s\" failed\n", pIfreqList->ifr_name); - continue; - } - - node.ifaceIndex = pIfreqList->ifr_ifindex; - */ - /*ifDepenDebugPrintf ( ("discoverInterfaces(): net intf \"%s\" found\n", pIfreqList->ifr_name) );*/ list.push_back(node); diff --git a/src/utils/inetAddressUtil.h b/src/utils/inetAddressUtil.h index 7c83983..7972a23 100644 --- a/src/utils/inetAddressUtil.h +++ b/src/utils/inetAddressUtil.h @@ -43,7 +43,6 @@ namespace pvAccess { epicsShareFunc InetAddrVector* getBroadcastAddresses(SOCKET sock, unsigned short defaultPort); struct ifaceNode { - int ifaceIndex; osiSockAddr ifaceAddr, ifaceBCast; }; typedef std::vector IfaceNodeVector;