blockingUDPTransport - working on processRead implementation.

This commit is contained in:
miha_vitorovic
2010-12-24 15:00:26 +01:00
parent 3d8b354705
commit bf5810ff61
2 changed files with 131 additions and 4 deletions

View File

@@ -21,6 +21,9 @@
/* standard */
#include <cstdio>
#include <unistd.h>
#include <poll.h>
#include <errno.h>
namespace epics {
namespace pvAccess {
@@ -28,7 +31,7 @@ namespace epics {
using namespace epics::pvData;
BlockingUDPTransport::BlockingUDPTransport(SOCKET channel,
osiSockAddr* bindAddress, osiSockAddr* sendAddresses,
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) {
this->channel = channel;
this->bindAddress = bindAddress;
@@ -46,11 +49,13 @@ namespace epics {
sendTo = NULL;
closed = false;
lastMessageStartPosition = 0;
readBuffer = new char[MAX_UDP_RECV];
}
BlockingUDPTransport::~BlockingUDPTransport() {
delete receiveBuffer;
delete sendBuffer;
delete readBuffer;
}
void BlockingUDPTransport::start() {
@@ -89,5 +94,101 @@ namespace epics {
sizeof(int32));
}
void BlockingUDPTransport::processRead() {
// This function is always called from only one thread - this
// object's own thread.
pollfd pfd;
pfd.fd = channel;
pfd.events = POLLIN;
osiSockAddr fromAddress;
try {
while(!closed) {
// we poll to prevent blocking indefinitely
/* From 'accept' man page:
* In order to be notified of incoming connections on
* a socket, you can use select(2) or poll(2). A readable
* event will be delivered when a new connection is
* attempted and you may then call accept() to get a
* socket for that connection.
*/
int retval = poll(&pfd, 1, 100);
if(retval>0) {
// activity on SOCKET
if(pfd.revents&POLLIN) {
// data ready to be read
receiveBuffer->clear();
socklen_t addrStructSize = sizeof(sockaddr);
int bytesRead = recvfrom(channel, readBuffer,
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(ignoredAddresses!=NULL) for(int i = 0; i
<ignoredAddresses->size(); i++)
if(ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
if(!ignore) {
receiveBuffer->put(
readBuffer,
0,
bytesRead
<receiveBuffer->getRemaining() ? bytesRead
: receiveBuffer->getRemaining());
receiveBuffer->flip();
processBuffer(&fromAddress, receiveBuffer);
}
}
else {
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s", strerror(errno));
}
}
else {
// error (POLLERR, POLLHUP, or POLLNVAL)
if(pfd.revents&POLLERR) errlogSevPrintf(
errlogMajor, "Socket poll error (POLLERR)");
if(pfd.revents&POLLHUP) errlogSevPrintf(
errlogMinor, "Socket poll error (POLLHUP)");
if(pfd.revents&POLLNVAL) errlogSevPrintf(
errlogMajor,
"Socket poll error: server socket no longer bound.");
}
}
// retval == 0 : timeout
// retval < 0 : error
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket poll error: %s", strerror(errno));
}
} catch(...) {
// TODO: catch all exceptions, and act accordingly
close(true);
}
}
bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer) {
// TODO: implement
return true;
}
}
}

View File

@@ -8,12 +8,16 @@
#ifndef BLOCKINGUDPTRANSPORT_H_
#define BLOCKINGUDPTRANSPORT_H_
/* pvAccess */
#include "remote.h"
#include "caConstants.h"
#include "inetAddressUtil.h"
/* pvData */
#include <noDefaultMethods.h>
#include <byteBuffer.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
@@ -25,7 +29,8 @@ namespace epics {
public TransportSendControl {
public:
BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress,
osiSockAddr* sendAddresses, short remoteTransportRevision);
InetAddrVector* sendAddresses,
short remoteTransportRevision);
virtual ~BlockingUDPTransport();
@@ -125,6 +130,22 @@ namespace epics {
// noop
}
/**
* Set ignore list.
* @param addresses list of ignored addresses.
*/
void setIgnoredAddresses(InetAddrVector* addresses) {
ignoredAddresses = addresses;
}
/**
* Get list of ignored addresses.
* @return ignored addresses.
*/
InetAddrVector* getIgnoredAddresses() const {
return ignoredAddresses;
}
protected:
bool closed;
@@ -154,12 +175,12 @@ namespace epics {
/**
* Send addresses.
*/
osiSockAddr* sendAddresses;
InetAddrVector* sendAddresses;
/**
* Ignore addresses.
*/
osiSockAddr* ignoredAddresses;
InetAddrVector* ignoredAddresses;
const osiSockAddr* sendTo;
@@ -178,6 +199,11 @@ namespace epics {
*/
int lastMessageStartPosition;
/**
* Read buffer
*/
char* readBuffer;
};
}