local multicast reimplemented

This commit is contained in:
Matej Sekoranja
2016-03-01 12:11:25 +01:00
parent 19031af095
commit 1ca3918afa
8 changed files with 260 additions and 146 deletions

View File

@@ -38,6 +38,9 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
}
#endif
// reserve some space for CMD_ORIGIN_TAG message
#define RECEIVE_BUFFER_PRE_RESERVE PVA_MESSAGE_HEADER_SIZE + 16
PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport);
BlockingUDPTransport::BlockingUDPTransport(bool serverFlag,
@@ -50,9 +53,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_bindAddress(bindAddress),
_sendAddresses(0),
_ignoredAddresses(0),
_tappedNIF(0),
_sendToEnabled(false),
_localMulticastAddressEnabled(false),
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV)),
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV+RECEIVE_BUFFER_PRE_RESERVE)),
_sendBuffer(new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0),
_clientServerWithEndianFlag(
@@ -218,15 +222,20 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
try {
char* recvfrom_buffer_start = (char*)(_receiveBuffer->getArray()+RECEIVE_BUFFER_PRE_RESERVE);
size_t recvfrom_buffer_len =_receiveBuffer->getSize()-RECEIVE_BUFFER_PRE_RESERVE;
while(!_closed.get())
{
// we poll to prevent blocking indefinitely
// data ready to be read
_receiveBuffer->clear();
// reserve some space for CMD_ORIGIN_TAG
_receiveBuffer->setPosition(RECEIVE_BUFFER_PRE_RESERVE);
int bytesRead = recvfrom(_channel, (char*)_receiveBuffer->getArray(),
_receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress,
int bytesRead = recvfrom(_channel,
recvfrom_buffer_start, recvfrom_buffer_len,
0, (sockaddr*)&fromAddress,
&addrStructSize);
if(likely(bytesRead>0)) {
@@ -245,9 +254,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
}
if(likely(!ignore)) {
_receiveBuffer->setPosition(bytesRead);
_receiveBuffer->flip();
_receiveBuffer->setPosition(RECEIVE_BUFFER_PRE_RESERVE);
_receiveBuffer->setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
try{
processBuffer(replyTo, fromAddress, _receiveBuffer.get());
@@ -322,11 +330,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
// second byte version
int8 version = receiveBuffer->getByte();
// only data for UDP
int8 flags = receiveBuffer->getByte();
if (flags < 0)
if (flags & 0x80)
{
// 7-bit set
// 7th bit set
receiveBuffer->setEndianess(EPICS_ENDIAN_BIG);
}
else
@@ -338,21 +345,80 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
int8 command = receiveBuffer->getByte();
// TODO check this cast (size_t must be 32-bit)
size_t payloadSize = receiveBuffer->getInt();
// control message check (skip message)
if (flags & 0x01)
continue;
size_t nextRequestPosition = receiveBuffer->getPosition() + payloadSize;
// payload size check
if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false;
// handle
_responseHandler->handleResponse(&fromAddress, replyTransport,
version, command, payloadSize,
_receiveBuffer.get());
// CMD_ORIGIN_TAG filtering
// NOTE: from design point of view this is not a right place to process application message here
if (unlikely((command == CMD_ORIGIN_TAG) && _tappedNIF))
{
// 128-bit IPv6 address
osiSockAddr originNIFAddress;
if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress))
{
originNIFAddress.ia.sin_family = AF_INET;
/*
LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.",
inetAddressToString(fromAddress, true).c_str(),
inetAddressToString(originNIFAddress, false).c_str());
*/
// filter
if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY))
{
bool accept = false;
for(size_t i = 0; i < _tappedNIF->size(); i++)
{
if((*_tappedNIF)[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr)
{
accept = true;
break;
}
}
// ignore messages from non-tapped NIFs
if (!accept)
return false;
}
}
}
else
{
// handle
_responseHandler->handleResponse(&fromAddress, replyTransport,
version, command, payloadSize,
_receiveBuffer.get());
}
// set position (e.g. in case handler did not read all)
receiveBuffer->setPosition(nextRequestPosition);
}
//all ok
// all ok
return true;
}
bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSockAddr& address)
{
int retval = sendto(_channel, buffer,
length, 0, &(address.sa), sizeof(sockaddr));
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
LOG(logLevelDebug, "Socket sendto to %s error: %s.",
inetAddressToString(address).c_str(), errStr);
return false;
}
return true;
}