pvlist improvements

This commit is contained in:
Matej Sekoranja
2014-11-20 22:13:29 +01:00
parent 2bde091ac3
commit 976fe49d60

View File

@ -105,12 +105,13 @@ struct ServerEntry {
typedef map<string, ServerEntry> ServerMap;
static ServerMap serverMap;
void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer)
// return true if new server response is recevived
bool processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer)
{
// first byte is PVA_MAGIC
int8 magic = receiveBuffer.getByte();
if(magic != PVA_MAGIC)
return;
return false;
// second byte version
int8 version = receiveBuffer.getByte();
@ -130,11 +131,11 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
// command ID and paylaod
int8 command = receiveBuffer.getByte();
if (command != (int8)0x04)
return;
return false;
size_t payloadSize = receiveBuffer.getInt();
if (payloadSize < (12+4+16+2))
return;
return false;
epics::pvAccess::GUID guid;
@ -146,7 +147,8 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
serverAddress.ia.sin_family = AF_INET;
// 128-bit IPv6 address
if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress)) return;
if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress))
return false;
// accept given address if explicitly specified by sender
if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
@ -178,7 +180,12 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
}
if (!found)
{
vec.push_back(serverAddress);
return true;
}
else
return false;
}
else
{
@ -189,9 +196,9 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
serverEntry.version = version;
serverMap[guidString] = serverEntry;
}
return;
return true;
}
}
bool discoverServers(double timeOut)
@ -267,6 +274,22 @@ bool discoverServers(double timeOut)
return false;
}
// set timeout
struct timeval timeout;
memset(&timeout, 0, sizeof(struct timeval));
timeout.tv_sec = 0;
timeout.tv_usec = 250000;
status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO,
(char*)&timeout, sizeof(timeout));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr);
return false;
}
osiSockAddr responseAddress;
osiSocklen_t sockLen = sizeof(sockaddr);
// read the actual socket info
@ -319,28 +342,14 @@ bool discoverServers(double timeOut)
return false;
// set timeout in case message is not sent
struct timeval timeout;
memset(&timeout, 0, sizeof(struct timeval));
timeout.tv_sec = 1;
timeout.tv_usec = 0;
status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO,
(char*)&timeout, sizeof(timeout));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr);
return false;
}
char rxbuff[1024];
ByteBuffer receiveBuffer(rxbuff, sizeof(rxbuff)/sizeof(char));
osiSockAddr fromAddress;
osiSocklen_t addrStructSize = sizeof(sockaddr);
int sendCount = 0;
while (true)
{
receiveBuffer.clear();
@ -349,6 +358,7 @@ bool discoverServers(double timeOut)
int bytesRead = ::recvfrom(socket, (char*)receiveBuffer.getArray(),
receiveBuffer.getRemaining(), 0,
(sockaddr*)&fromAddress, &addrStructSize);
if (bytesRead > 0)
{
receiveBuffer.setPosition(bytesRead);
@ -357,9 +367,9 @@ bool discoverServers(double timeOut)
processSearchResponse(fromAddress, receiveBuffer);
}
else if (status <= 0)
else
{
if (status == -1)
if (bytesRead == -1)
{
int socketError = SOCKERRNO;
@ -369,17 +379,50 @@ bool discoverServers(double timeOut)
// windows times out with this
//socketError == SOCK_ETIMEDOUT ||
socketError == SOCK_EWOULDBLOCK)
continue;
if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
{
// OK
}
else if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
socketError == SOCK_ECONNRESET) // or ECONNRESET in Windows
continue;
{
// OK
}
else
{
// unexpected error
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Socket recv error: %s\n", errStr);
break;
}
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Socket recv error: %s\n", errStr);
}
break;
if (++sendCount < 3)
{
// TODO duplicate code
bool oneOK = false;
for (size_t i = 0; i < broadcastAddresses->size(); i++)
{
// send the packet
status = ::sendto(socket, sendBuffer.getArray(), sendBuffer.getPosition(), 0,
&((*broadcastAddresses)[i].sa), sizeof(sockaddr));
if (status < 0)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Send error: %s\n", errStr);
}
else
oneOK = true;
}
if (!oneOK)
return false;
}
else
break;
}
}