local multicast on the client side

This commit is contained in:
Matej Sekoranja
2014-11-13 08:37:06 +01:00
parent 4ec961e98b
commit b3c030d946
3 changed files with 129 additions and 1 deletions

View File

@@ -2775,6 +2775,82 @@ namespace epics {
}
};
class SearchHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
SearchHandler(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Search")
{
}
virtual ~SearchHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(4+1+3+16+2);
size_t startPosition = payloadBuffer->getPosition();
/*const int32 searchSequenceId =*/ payloadBuffer->getInt();
const int8 qosCode = payloadBuffer->getByte();
// reserved part
payloadBuffer->getByte();
payloadBuffer->getShort();
osiSockAddr responseAddress;
responseAddress.ia.sin_family = AF_INET;
// 128-bit IPv6 address
if (!decodeAsIPv6Address(payloadBuffer, &responseAddress)) return;
// accept given address if explicitly specified by sender
if (responseAddress.ia.sin_addr.s_addr == INADDR_ANY)
responseAddress.ia.sin_addr = responseFrom->ia.sin_addr;
// NOTE: htons might be a macro (e.g. vxWorks)
int16 port = payloadBuffer->getShort();
responseAddress.ia.sin_port = htons(port);
// we ignore the rest, since we care only about data relevant
// to do the local multicast
//
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
//
if ((qosCode & 0x80) == 0x80)
{
// TODO optimize
ClientContextImpl::shared_pointer context = _context.lock();
if (!context)
return;
BlockingUDPTransport::shared_pointer bt =
//context->getLocalMulticastTransport();
std::tr1::dynamic_pointer_cast<BlockingUDPTransport>(context->getSearchTransport());
if (bt)
{
// clear unicast flag
payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
// update response address
payloadBuffer->setPosition(startPosition+8);
encodeAsIPv6Address(payloadBuffer, &responseAddress);
payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip()
bt->send(payloadBuffer, context->getLocalBroadcastAddress());
return;
}
}
}
};
class BeaconResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
@@ -3013,7 +3089,7 @@ namespace epics {
m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */
m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */
m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */
m_handlerTable[CMD_SEARCH].reset(new NoopResponse(context, "Search")); /* 3 */
m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */
m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */
m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */
m_handlerTable[CMD_ACL_CHANGE].reset(new NoopResponse(context, "Access rights change")); /* 6 */
@@ -4226,6 +4302,11 @@ TODO
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext);
};
virtual const osiSockAddr& getLocalBroadcastAddress() const
{
return m_localBroadcastAddress;
}
private:
void loadConfiguration() {
@@ -4331,6 +4412,44 @@ TODO
return false;
m_searchTransport->setSendAddresses(broadcastAddresses.get());
// TODO do not use searchBroadcast in future
// setup local broadcasting
// TODO configurable local NIF, address
osiSockAddr loAddr;
getLoopbackNIF(loAddr, "", 0);
if (true)
{
try
{
//osiSockAddr group;
aToIPAddr("224.0.0.128", m_broadcastPort, &m_localBroadcastAddress.ia);
m_broadcastTransport->join(m_localBroadcastAddress, loAddr);
osiSockAddr anyAddress;
anyAddress.ia.sin_family = AF_INET;
anyAddress.ia.sin_port = htons(0);
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
// NOTE: this disables usage of multicast addresses in EPICS_PVA_ADDR_LIST
m_searchTransport->setMutlicastNIF(loAddr, true);
//InetAddrVector sendAddressList;
//sendAddressList.push_back(group);
//m_searchTransport->setSendAddresses(&sendAddressList);
LOG(logLevelDebug, "Local multicast enabled on %s using network interface %s.",
inetAddressToString(m_localBroadcastAddress).c_str(), inetAddressToString(loAddr, false).c_str());
}
catch (std::exception& ex)
{
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
}
}
else
{
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
}
// become active
m_broadcastTransport->start();
m_searchTransport->start();
@@ -4844,6 +4963,8 @@ TODO
TransportRegistry::transportVector_t m_flushTransports;
FlushStrategy m_flushStrategy;
osiSockAddr m_localBroadcastAddress;
};
ClientContextImpl::shared_pointer createClientContextImpl()