BlockingTCPTransport: added TransportRegistry, using osiSock.h functions wherever possible blockingUDP.h: removed "using namespace" blockingUDPConnector.cpp: using osiSock.h functions wherever possible blockingUDPTransport: using osiSock.h functions wherever possible, delete -> delete[] remote.h: removed "using namespace" arrayFIFO.h: fixed default ctor, delete -> delete[] growingCircularBuffer.h: delete -> delete[] testBlockingUDPClnt.cpp: using osiSock.h functions wherever possible testBlockingUDPCSrv.cpp: using osiSock.h functions wherever possible
308 lines
11 KiB
C++
308 lines
11 KiB
C++
/* blockingUDPTransport.cpp
|
|
*
|
|
* Created on: Dec 20, 2010
|
|
* Author: Miha Vitorovic
|
|
*/
|
|
|
|
/* pvAccess */
|
|
#include "blockingUDP.h"
|
|
|
|
#include "caConstants.h"
|
|
#include "inetAddressUtil.h"
|
|
|
|
/* pvData */
|
|
#include <byteBuffer.h>
|
|
#include <lock.h>
|
|
|
|
/* EPICSv3 */
|
|
#include <osdSock.h>
|
|
#include <osiSock.h>
|
|
#include <errlog.h>
|
|
#include <epicsThread.h>
|
|
|
|
/* standard */
|
|
#include <cstdio>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <poll.h>
|
|
#include <errno.h>
|
|
|
|
namespace epics {
|
|
namespace pvAccess {
|
|
|
|
using namespace epics::pvData;
|
|
|
|
BlockingUDPTransport::BlockingUDPTransport(
|
|
ResponseHandler* responseHandler, SOCKET channel,
|
|
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
|
|
short remoteTransportRevision) :
|
|
_closed(false), _responseHandler(responseHandler),
|
|
_channel(channel), _socketAddress(bindAddress),
|
|
_bindAddress(bindAddress), _sendAddresses(sendAddresses),
|
|
_ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer(
|
|
new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(
|
|
new ByteBuffer(MAX_UDP_RECV)),
|
|
_lastMessageStartPosition(0), _readBuffer(
|
|
new char[MAX_UDP_RECV]), _mutex(new Mutex()),
|
|
_threadId(NULL) {
|
|
}
|
|
|
|
BlockingUDPTransport::~BlockingUDPTransport() {
|
|
close(true); // close the socket and stop the thread.
|
|
delete _receiveBuffer;
|
|
delete _sendBuffer;
|
|
delete[] _readBuffer;
|
|
delete _mutex;
|
|
}
|
|
|
|
void BlockingUDPTransport::start() {
|
|
String threadName = "UDP-receive "+inetAddressToString(
|
|
_socketAddress);
|
|
|
|
errlogSevPrintf(errlogInfo, "Starting thread: %s",
|
|
threadName.c_str());
|
|
|
|
_threadId = epicsThreadCreate(threadName.c_str(),
|
|
epicsThreadPriorityMedium, epicsThreadGetStackSize(
|
|
epicsThreadStackMedium),
|
|
BlockingUDPTransport::threadRunner, this);
|
|
}
|
|
|
|
void BlockingUDPTransport::close(bool forced) {
|
|
if(_closed) return;
|
|
_closed = true;
|
|
|
|
if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo,
|
|
"UDP socket %s closed.",
|
|
inetAddressToString(_bindAddress).c_str());
|
|
|
|
epicsSocketDestroy(_channel);
|
|
}
|
|
|
|
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {
|
|
Lock lock(_mutex);
|
|
|
|
_sendTo = NULL;
|
|
_sendBuffer->clear();
|
|
sender->lock();
|
|
try {
|
|
sender->send(_sendBuffer, this);
|
|
sender->unlock();
|
|
endMessage();
|
|
send(_sendBuffer, _sendTo);
|
|
} catch(...) {
|
|
sender->unlock();
|
|
}
|
|
}
|
|
|
|
void BlockingUDPTransport::startMessage(int8 command,
|
|
int ensureCapacity) {
|
|
_lastMessageStartPosition = _sendBuffer->getPosition();
|
|
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
|
|
_sendBuffer->putByte(0); // data
|
|
_sendBuffer->putByte(command); // command
|
|
_sendBuffer->putInt(0); // temporary zero payload
|
|
}
|
|
|
|
void BlockingUDPTransport::endMessage() {
|
|
_sendBuffer->putInt(_lastMessageStartPosition+(sizeof(int16)+2),
|
|
_sendBuffer->getPosition()-_lastMessageStartPosition
|
|
-CA_MESSAGE_HEADER_SIZE);
|
|
|
|
}
|
|
|
|
void BlockingUDPTransport::processRead() {
|
|
// This function is always called from only one thread - this
|
|
// object's own thread.
|
|
|
|
pollfd pfd;
|
|
pfd.fd = _channel;
|
|
pfd.events = POLLIN;
|
|
|
|
osiSockAddr fromAddress;
|
|
|
|
try {
|
|
|
|
while(!_closed) {
|
|
// we poll to prevent blocking indefinitely
|
|
|
|
/* From 'accept' man page:
|
|
* In order to be notified of incoming connections on
|
|
* a socket, you can use select(2) or poll(2). A readable
|
|
* event will be delivered when a new connection is
|
|
* attempted and you may then call accept() to get a
|
|
* socket for that connection.
|
|
*/
|
|
|
|
int retval = poll(&pfd, 1, 100);
|
|
|
|
if(_closed) break; // if the dtor was called during wait
|
|
// none of the object properties are no longer valid.
|
|
|
|
if(retval>0) {
|
|
// activity on SOCKET
|
|
if(pfd.revents&POLLIN) {
|
|
// data ready to be read
|
|
_receiveBuffer->clear();
|
|
|
|
socklen_t addrStructSize = sizeof(sockaddr);
|
|
|
|
int bytesRead = recvfrom(_channel, _readBuffer,
|
|
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
|
|
&addrStructSize);
|
|
|
|
if(bytesRead>0) {
|
|
// successfully got datagram
|
|
bool ignore = false;
|
|
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
|
|
<_ignoredAddresses->size(); i++)
|
|
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
|
|
==fromAddress.ia.sin_addr.s_addr) {
|
|
ignore = true;
|
|
break;
|
|
}
|
|
|
|
if(!ignore) {
|
|
_receiveBuffer->put(
|
|
_readBuffer,
|
|
0,
|
|
bytesRead
|
|
<_receiveBuffer->getRemaining() ? bytesRead
|
|
: _receiveBuffer->getRemaining());
|
|
|
|
_receiveBuffer->flip();
|
|
|
|
processBuffer(&fromAddress, _receiveBuffer);
|
|
}
|
|
}
|
|
else {
|
|
// log a 'recvfrom' error
|
|
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
|
|
"Socket recv error: %s",
|
|
strerror(errno));
|
|
}
|
|
}
|
|
else {
|
|
// error (POLLERR, POLLHUP, or POLLNVAL)
|
|
if(pfd.revents&POLLERR) errlogSevPrintf(
|
|
errlogMajor, "Socket poll error (POLLERR)");
|
|
if(pfd.revents&POLLHUP) errlogSevPrintf(
|
|
errlogMinor, "Socket poll error (POLLHUP)");
|
|
if(pfd.revents&POLLNVAL) errlogSevPrintf(
|
|
errlogMajor,
|
|
"Socket poll error: server socket no longer bound.");
|
|
}
|
|
}
|
|
|
|
// retval == 0 : timeout
|
|
|
|
// retval < 0 : error
|
|
if(retval<0) errlogSevPrintf(errlogMajor,
|
|
"Socket poll error: %s", strerror(errno));
|
|
}
|
|
} catch(...) {
|
|
// TODO: catch all exceptions, and act accordingly
|
|
close(true);
|
|
}
|
|
|
|
char threadName[40];
|
|
epicsThreadGetName(_threadId, threadName, 40);
|
|
errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName);
|
|
}
|
|
|
|
bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress,
|
|
ByteBuffer* receiveBuffer) {
|
|
|
|
// handle response(s)
|
|
while(receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) {
|
|
//
|
|
// read header
|
|
//
|
|
|
|
// first byte is CA_MAGIC
|
|
// second byte version - major/minor nibble
|
|
// check magic and version at once
|
|
short magicAndVersion = receiveBuffer->getShort();
|
|
if((short)(magicAndVersion&0xFFF0)!=CA_MAGIC_AND_MAJOR_VERSION) return false;
|
|
|
|
// only data for UDP
|
|
receiveBuffer->getByte();
|
|
|
|
// command ID and paylaod
|
|
int8 command = receiveBuffer->getByte();
|
|
int payloadSize = receiveBuffer->getInt();
|
|
int nextRequestPosition = receiveBuffer->getPosition()
|
|
+payloadSize;
|
|
|
|
// payload size check
|
|
if(nextRequestPosition>receiveBuffer->getLimit()) return false;
|
|
|
|
// handle
|
|
_responseHandler->handleResponse(fromAddress, this,
|
|
(int8)(magicAndVersion&0xFF), command, payloadSize,
|
|
_receiveBuffer);
|
|
|
|
// set position (e.g. in case handler did not read all)
|
|
receiveBuffer->setPosition(nextRequestPosition);
|
|
}
|
|
|
|
//all ok
|
|
return true;
|
|
}
|
|
|
|
bool BlockingUDPTransport::send(ByteBuffer* buffer,
|
|
const osiSockAddr* address) {
|
|
if(address==NULL&&_sendAddresses==NULL) return false;
|
|
|
|
if(address!=NULL) {
|
|
buffer->flip();
|
|
int retval =
|
|
sendto(_channel, buffer->getArray(),
|
|
buffer->getLimit(), 0, &(address->sa),
|
|
sizeof(sockaddr));
|
|
if(retval<0) {
|
|
errlogSevPrintf(errlogMajor, "Socket sendto error: %s",
|
|
strerror(errno));
|
|
return false;
|
|
}
|
|
}
|
|
else {
|
|
for(size_t i = 0; i<_sendAddresses->size(); i++) {
|
|
buffer->flip();
|
|
int retval = sendto(_channel, buffer->getArray(),
|
|
buffer->getLimit(), 0,
|
|
&(_sendAddresses->at(i)->sa), sizeof(sockaddr));
|
|
{
|
|
if(retval<0) errlogSevPrintf(errlogMajor,
|
|
"Socket sendto error: %s", strerror(errno));
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
int BlockingUDPTransport::getSocketReceiveBufferSize() const {
|
|
// Get value of the SO_RCVBUF option for this DatagramSocket,
|
|
// that is the buffer size used by the platform for input on
|
|
// this DatagramSocket.
|
|
|
|
int sockBufSize;
|
|
socklen_t intLen = sizeof(int);
|
|
|
|
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,
|
|
&sockBufSize, &intLen);
|
|
if(retval<0) errlogSevPrintf(errlogMajor,
|
|
"Socket getsockopt SO_RCVBUF error: %s", strerror(errno));
|
|
|
|
return sockBufSize;
|
|
}
|
|
|
|
void BlockingUDPTransport::threadRunner(void* param) {
|
|
((BlockingUDPTransport*)param)->processRead();
|
|
}
|
|
|
|
}
|
|
}
|