diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index d7b7552..095bfcc 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -21,6 +21,9 @@ /* standard */ #include +#include +#include +#include namespace epics { namespace pvAccess { @@ -28,7 +31,7 @@ namespace epics { using namespace epics::pvData; BlockingUDPTransport::BlockingUDPTransport(SOCKET channel, - osiSockAddr* bindAddress, osiSockAddr* sendAddresses, + osiSockAddr* bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision) { this->channel = channel; this->bindAddress = bindAddress; @@ -46,11 +49,13 @@ namespace epics { sendTo = NULL; closed = false; lastMessageStartPosition = 0; + readBuffer = new char[MAX_UDP_RECV]; } BlockingUDPTransport::~BlockingUDPTransport() { delete receiveBuffer; delete sendBuffer; + delete readBuffer; } void BlockingUDPTransport::start() { @@ -89,5 +94,101 @@ namespace epics { sizeof(int32)); } + void BlockingUDPTransport::processRead() { + // This function is always called from only one thread - this + // object's own thread. + + pollfd pfd; + pfd.fd = channel; + pfd.events = POLLIN; + + osiSockAddr fromAddress; + + try { + + while(!closed) { + // we poll to prevent blocking indefinitely + + /* From 'accept' man page: + * In order to be notified of incoming connections on + * a socket, you can use select(2) or poll(2). A readable + * event will be delivered when a new connection is + * attempted and you may then call accept() to get a + * socket for that connection. + */ + + int retval = poll(&pfd, 1, 100); + if(retval>0) { + // activity on SOCKET + if(pfd.revents&POLLIN) { + // data ready to be read + receiveBuffer->clear(); + + socklen_t addrStructSize = sizeof(sockaddr); + + int bytesRead = recvfrom(channel, readBuffer, + MAX_UDP_RECV, 0, (sockaddr*)&fromAddress, + &addrStructSize); + + if(bytesRead>0) { + // successfully got datagram + bool ignore = false; + if(ignoredAddresses!=NULL) for(int i = 0; i + size(); i++) + if(ignoredAddresses->at(i)->ia.sin_addr.s_addr + ==fromAddress.ia.sin_addr.s_addr) { + ignore = true; + break; + } + + if(!ignore) { + receiveBuffer->put( + readBuffer, + 0, + bytesRead + getRemaining() ? bytesRead + : receiveBuffer->getRemaining()); + + receiveBuffer->flip(); + + processBuffer(&fromAddress, receiveBuffer); + } + } + else { + // log a 'recvfrom' error + if(bytesRead==-1) errlogSevPrintf(errlogMajor, + "Socket recv error: %s", strerror(errno)); + } + } + else { + // error (POLLERR, POLLHUP, or POLLNVAL) + if(pfd.revents&POLLERR) errlogSevPrintf( + errlogMajor, "Socket poll error (POLLERR)"); + if(pfd.revents&POLLHUP) errlogSevPrintf( + errlogMinor, "Socket poll error (POLLHUP)"); + if(pfd.revents&POLLNVAL) errlogSevPrintf( + errlogMajor, + "Socket poll error: server socket no longer bound."); + } + } + + // retval == 0 : timeout + + // retval < 0 : error + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket poll error: %s", strerror(errno)); + } + } catch(...) { + // TODO: catch all exceptions, and act accordingly + close(true); + } + } + + bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress, + ByteBuffer* receiveBuffer) { + // TODO: implement + return true; + } + } } diff --git a/pvAccessApp/remote/blockingUDPTransport.h b/pvAccessApp/remote/blockingUDPTransport.h index 9b5fe68..0b385e5 100644 --- a/pvAccessApp/remote/blockingUDPTransport.h +++ b/pvAccessApp/remote/blockingUDPTransport.h @@ -8,12 +8,16 @@ #ifndef BLOCKINGUDPTRANSPORT_H_ #define BLOCKINGUDPTRANSPORT_H_ +/* pvAccess */ #include "remote.h" #include "caConstants.h" +#include "inetAddressUtil.h" +/* pvData */ #include #include +/* EPICSv3 */ #include #include @@ -25,7 +29,8 @@ namespace epics { public TransportSendControl { public: BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress, - osiSockAddr* sendAddresses, short remoteTransportRevision); + InetAddrVector* sendAddresses, + short remoteTransportRevision); virtual ~BlockingUDPTransport(); @@ -125,6 +130,22 @@ namespace epics { // noop } + /** + * Set ignore list. + * @param addresses list of ignored addresses. + */ + void setIgnoredAddresses(InetAddrVector* addresses) { + ignoredAddresses = addresses; + } + + /** + * Get list of ignored addresses. + * @return ignored addresses. + */ + InetAddrVector* getIgnoredAddresses() const { + return ignoredAddresses; + } + protected: bool closed; @@ -154,12 +175,12 @@ namespace epics { /** * Send addresses. */ - osiSockAddr* sendAddresses; + InetAddrVector* sendAddresses; /** * Ignore addresses. */ - osiSockAddr* ignoredAddresses; + InetAddrVector* ignoredAddresses; const osiSockAddr* sendTo; @@ -178,6 +199,11 @@ namespace epics { */ int lastMessageStartPosition; + /** + * Read buffer + */ + char* readBuffer; + }; }