From 54b4862e16e38a74716c3474139b4806184e78b4 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 27 Dec 2010 13:58:49 +0100 Subject: [PATCH 1/6] Finished the implementation of BlockingUDPTransport. Ready for testing and debugging. --- pvAccessApp/remote/blockingUDPTransport.cpp | 208 +++++++++++++++----- pvAccessApp/remote/blockingUDPTransport.h | 82 +++++--- pvAccessApp/remote/remote.h | 24 ++- 3 files changed, 238 insertions(+), 76 deletions(-) diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 095bfcc..764ae8d 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -1,5 +1,4 @@ -/* - * blockingUDPTransport.cpp +/* blockingUDPTransport.cpp * * Created on: Dec 20, 2010 * Author: Miha Vitorovic @@ -18,6 +17,7 @@ #include #include #include +#include /* standard */ #include @@ -30,67 +30,91 @@ namespace epics { using namespace epics::pvData; - BlockingUDPTransport::BlockingUDPTransport(SOCKET channel, + BlockingUDPTransport::BlockingUDPTransport( + ResponseHandler* responseHandler, SOCKET channel, osiSockAddr* bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision) { - this->channel = channel; - this->bindAddress = bindAddress; - this->sendAddresses = sendAddresses; + _responseHandler = responseHandler; + _channel = channel; + _bindAddress = bindAddress; + _sendAddresses = sendAddresses; - socketAddress = bindAddress; + _socketAddress = bindAddress; // allocate receive buffer - receiveBuffer = new ByteBuffer(MAX_UDP_RECV); + _receiveBuffer = new ByteBuffer(MAX_UDP_RECV); // allocate send buffer and non-reentrant lock - sendBuffer = new ByteBuffer(MAX_UDP_SEND); + _sendBuffer = new ByteBuffer(MAX_UDP_SEND); - ignoredAddresses = NULL; - sendTo = NULL; - closed = false; - lastMessageStartPosition = 0; - readBuffer = new char[MAX_UDP_RECV]; + _ignoredAddresses = NULL; + _sendTo = NULL; + _closed = false; + _lastMessageStartPosition = 0; + _readBuffer = new char[MAX_UDP_RECV]; } BlockingUDPTransport::~BlockingUDPTransport() { - delete receiveBuffer; - delete sendBuffer; - delete readBuffer; + delete _receiveBuffer; + delete _sendBuffer; + delete _readBuffer; } void BlockingUDPTransport::start() { - // TODO implement + String threadName = "UDP-receive "+inetAddressToString( + _socketAddress); + + errlogSevPrintf(errlogInfo, "Starting thread: %s", + threadName.c_str()); + + epicsThreadCreate(threadName.c_str(), epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + BlockingUDPTransport::threadRunner, this); } void BlockingUDPTransport::close(bool forced) { - if(closed) return; - closed = true; + if(_closed) return; + _closed = true; - if(bindAddress!=NULL) errlogSevPrintf(errlogInfo, + if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo, "UDP connection to %s closed.", inetAddressToString( - bindAddress).c_str()); + _bindAddress).c_str()); - // TODO: finish implementation + int retval = ::close(_channel); + if(retval<0) errlogSevPrintf(errlogMajor, "Socket close error: %s", + strerror(errno)); } void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) { - // TODO implement + // TODO: Java version uses synchronized. Why? + + _sendTo = NULL; + _sendBuffer->clear(); + sender->lock(); + try { + sender->send(_sendBuffer, this); + sender->unlock(); + endMessage(); + if(_sendTo!=NULL) send(_sendBuffer, _sendTo); + } catch(...) { + sender->unlock(); + } } void BlockingUDPTransport::startMessage(int8 command, int ensureCapacity) { - lastMessageStartPosition = sendBuffer->getPosition(); - sendBuffer->putShort(CA_MAGIC_AND_VERSION); - sendBuffer->putByte(0); // data - sendBuffer->putByte(command); // command - sendBuffer->putInt(0); // temporary zero payload + _lastMessageStartPosition = _sendBuffer->getPosition(); + _sendBuffer->putShort(CA_MAGIC_AND_VERSION); + _sendBuffer->putByte(0); // data + _sendBuffer->putByte(command); // command + _sendBuffer->putInt(0); // temporary zero payload } void BlockingUDPTransport::endMessage() { - int32 data = lastMessageStartPosition+(16/8+2); - sendBuffer->put((char*)&data, sendBuffer->getPosition() - -lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE, + int32 data = _lastMessageStartPosition+(16/8+2); + _sendBuffer->put((char*)&data, _sendBuffer->getPosition() + -_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE, sizeof(int32)); } @@ -99,14 +123,14 @@ namespace epics { // object's own thread. pollfd pfd; - pfd.fd = channel; + pfd.fd = _channel; pfd.events = POLLIN; osiSockAddr fromAddress; try { - while(!closed) { + while(!_closed) { // we poll to prevent blocking indefinitely /* From 'accept' man page: @@ -122,42 +146,43 @@ namespace epics { // activity on SOCKET if(pfd.revents&POLLIN) { // data ready to be read - receiveBuffer->clear(); + _receiveBuffer->clear(); socklen_t addrStructSize = sizeof(sockaddr); - int bytesRead = recvfrom(channel, readBuffer, + int bytesRead = recvfrom(_channel, _readBuffer, MAX_UDP_RECV, 0, (sockaddr*)&fromAddress, &addrStructSize); if(bytesRead>0) { // successfully got datagram bool ignore = false; - if(ignoredAddresses!=NULL) for(int i = 0; i - size(); i++) - if(ignoredAddresses->at(i)->ia.sin_addr.s_addr + if(_ignoredAddresses!=NULL) for(int i = 0; i + <_ignoredAddresses->size(); i++) + if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr ==fromAddress.ia.sin_addr.s_addr) { ignore = true; break; } if(!ignore) { - receiveBuffer->put( - readBuffer, + _receiveBuffer->put( + _readBuffer, 0, bytesRead - getRemaining() ? bytesRead - : receiveBuffer->getRemaining()); + <_receiveBuffer->getRemaining() ? bytesRead + : _receiveBuffer->getRemaining()); - receiveBuffer->flip(); + _receiveBuffer->flip(); - processBuffer(&fromAddress, receiveBuffer); + processBuffer(&fromAddress, _receiveBuffer); } } else { // log a 'recvfrom' error if(bytesRead==-1) errlogSevPrintf(errlogMajor, - "Socket recv error: %s", strerror(errno)); + "Socket recv error: %s", + strerror(errno)); } } else { @@ -186,9 +211,98 @@ namespace epics { bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress, ByteBuffer* receiveBuffer) { - // TODO: implement + + // handle response(s) + while(receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) { + // + // read header + // + + // first byte is CA_MAGIC + // second byte version - major/minor nibble + // check magic and version at once + short magicAndVersion = receiveBuffer->getShort(); + if((short)(magicAndVersion&0xFFF0)!=CA_MAGIC_AND_MAJOR_VERSION) return false; + + // only data for UDP + receiveBuffer->getByte(); + + // command ID and paylaod + int8 command = receiveBuffer->getByte(); + int payloadSize = receiveBuffer->getInt(); + int nextRequestPosition = receiveBuffer->getPosition() + +payloadSize; + + // payload size check + if(nextRequestPosition>receiveBuffer->getLimit()) return false; + + // handle + _responseHandler->handleResponse(fromAddress, this, + (int8)(magicAndVersion&0xFF), command, payloadSize, + _receiveBuffer); + + // set position (e.g. in case handler did not read all) + receiveBuffer->setPosition(nextRequestPosition); + } + + //all ok return true; } + bool BlockingUDPTransport::send(ByteBuffer* buffer, + const osiSockAddr* address) { + if(address==NULL||_sendAddresses==NULL) return false; + + if(address!=NULL) { + buffer->flip(); + int retval = + sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, &(address->sa), + sizeof(sockaddr)); + if(retval<0) { + errlogSevPrintf(errlogMajor, "Socket sendto error: %s", + strerror(errno)); + return false; + } + } + else { + for(int i = 0; i<_sendAddresses->size(); i++) { + buffer->flip(); + int retval = sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, + &(_sendAddresses->at(i)->sa), sizeof(sockaddr)); + { + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket sendto error: %s", strerror(errno)); + return false; + } + } + } + + return true; + } + + int BlockingUDPTransport::getSocketReceiveBufferSize() const { + // Get value of the SO_RCVBUF option for this DatagramSocket, + // that is the buffer size used by the platform for input on + // this DatagramSocket. + + int sockBufSize; + socklen_t intLen; + + intLen = sizeof(int); + + int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, + &sockBufSize, &intLen); + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket getsockopt SO_RCVBUF error: %s", strerror(errno)); + + return sockBufSize; + } + + void BlockingUDPTransport::threadRunner(void* param) { + ((BlockingUDPTransport*)param)->processRead(); + } + } } diff --git a/pvAccessApp/remote/blockingUDPTransport.h b/pvAccessApp/remote/blockingUDPTransport.h index 0b385e5..7b9b9ef 100644 --- a/pvAccessApp/remote/blockingUDPTransport.h +++ b/pvAccessApp/remote/blockingUDPTransport.h @@ -28,18 +28,19 @@ namespace epics { public Transport, public TransportSendControl { public: - BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress, + BlockingUDPTransport(ResponseHandler* responseHandler, + SOCKET channel, osiSockAddr* bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision); virtual ~BlockingUDPTransport(); virtual bool isClosed() const { - return closed; + return _closed; } virtual const osiSockAddr* getRemoteAddress() const { - return socketAddress; + return _socketAddress; } virtual const String getType() const { @@ -55,17 +56,10 @@ namespace epics { } virtual int getReceiveBufferSize() const { - return receiveBuffer->getSize(); + return _receiveBuffer->getSize(); } - virtual int getSocketReceiveBufferSize() const { - // Get value of the SO_RCVBUF option for this DatagramSocket, - // that is the buffer size used by the platform for input on - // this DatagramSocket. - - // TODO: real implementation - return MAX_UDP_RECV; - } + virtual int getSocketReceiveBufferSize() const; virtual int16 getPriority() const { return CA_DEFAULT_PRIORITY; @@ -108,7 +102,7 @@ namespace epics { virtual void close(bool forced); virtual void ensureData(int size) { - // TODO: implement + // TODO Auto-generated method stub } virtual void startMessage(int8 command, int ensureCapacity); @@ -119,7 +113,7 @@ namespace epics { } virtual void setRecipient(const osiSockAddr* sendTo) { - this->sendTo = sendTo; + _sendTo = sendTo; } virtual void flushSerializeBuffer() { @@ -135,7 +129,7 @@ namespace epics { * @param addresses list of ignored addresses. */ void setIgnoredAddresses(InetAddrVector* addresses) { - ignoredAddresses = addresses; + _ignoredAddresses = addresses; } /** @@ -143,15 +137,47 @@ namespace epics { * @return ignored addresses. */ InetAddrVector* getIgnoredAddresses() const { - return ignoredAddresses; + return _ignoredAddresses; + } + + bool send(ByteBuffer* buffer, const osiSockAddr* address = NULL); + + /** + * Get list of send addresses. + * @return send addresses. + */ + InetAddrVector* getSendAddresses() { + return _sendAddresses; + } + + /** + * Get bind address. + * @return bind address. + */ + osiSockAddr* getBindAddress() { + return _bindAddress; + } + + /** + * Set list of send addresses. + * @param addresses list of send addresses, non-null. + */ + void setBroadcastAddresses(InetAddrVector* addresses) { + _sendAddresses = addresses; } protected: - bool closed; + bool _closed; + + /** + * Response handler. + */ + ResponseHandler* _responseHandler; virtual void processRead(); - private: + static void threadRunner(void* param); + bool processBuffer(osiSockAddr* fromAddress, ByteBuffer* receiveBuffer); @@ -160,49 +186,49 @@ namespace epics { /** * Corresponding channel. */ - SOCKET channel; + SOCKET _channel; /** * Cached socket address. */ - osiSockAddr* socketAddress; + osiSockAddr* _socketAddress; /** * Bind address. */ - osiSockAddr* bindAddress; + osiSockAddr* _bindAddress; /** * Send addresses. */ - InetAddrVector* sendAddresses; + InetAddrVector* _sendAddresses; /** * Ignore addresses. */ - InetAddrVector* ignoredAddresses; + InetAddrVector* _ignoredAddresses; - const osiSockAddr* sendTo; + const osiSockAddr* _sendTo; /** * Receive buffer. */ - epics::pvData::ByteBuffer* receiveBuffer; + epics::pvData::ByteBuffer* _receiveBuffer; /** * Send buffer. */ - epics::pvData::ByteBuffer* sendBuffer; + epics::pvData::ByteBuffer* _sendBuffer; /** * Last message start position. */ - int lastMessageStartPosition; + int _lastMessageStartPosition; /** * Read buffer */ - char* readBuffer; + char* _readBuffer; }; diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 4681feb..57038c1 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -54,7 +54,7 @@ namespace epics { * NOTE: these limitations allows efficient implementation. */ virtual void - send(ByteBuffer* buffer, TransportSendControl* control) =0; + send(ByteBuffer* buffer, TransportSendControl* control) =0; virtual void lock() =0; virtual void unlock() =0; @@ -182,6 +182,28 @@ namespace epics { }; + /** + * Interface defining response handler. + * @author Matej Sekoranja + * @version $Id: ResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ResponseHandler { + public: + /** + * Handle response. + * @param[in] responseFrom remote address of the responder, null if unknown. + * @param[in] transport response source transport. + * @param[in] version message version. + * @param[in] payloadSize size of this message data available in the payloadBuffer. + * @param[in] payloadBuffer message payload data. + * Note that this might not be the only message in the buffer. + * Code must not manilupate buffer. + */ + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) =0; + }; + } } From 06842f78900e3f489afe8702b59a24f55dd930a1 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 28 Dec 2010 08:59:25 +0100 Subject: [PATCH 2/6] Makefile: renamed blockingUDP header, added blockingUDPConnector.cpp blockingUDPTransport.cpp: fixed ctor, addedmutex to 'enqueueSendRequest', started using 'sys/socket.h' remote.h: added TransportClient and Connector classes introspectionRegistry.h: organized #includes --- .cproject | 13 ++-- pvAccessApp/Makefile | 3 +- .../{blockingUDPTransport.h => blockingUDP.h} | 51 +++++++++++++- pvAccessApp/remote/blockingUDPConnector.cpp | 68 +++++++++++++++++++ pvAccessApp/remote/blockingUDPTransport.cpp | 37 ++++------ pvAccessApp/remote/remote.h | 54 +++++++++++++++ pvAccessApp/utils/introspectionRegistry.h | 22 +++--- 7 files changed, 208 insertions(+), 40 deletions(-) rename pvAccessApp/remote/{blockingUDPTransport.h => blockingUDP.h} (82%) create mode 100644 pvAccessApp/remote/blockingUDPConnector.cpp diff --git a/.cproject b/.cproject index fb31768..0e2a39f 100644 --- a/.cproject +++ b/.cproject @@ -26,7 +26,6 @@ @@ -45,7 +44,7 @@ - @@ -308,7 +307,6 @@ make - all true true @@ -316,12 +314,19 @@ make - clean true true false + + make + + uninstall + true + true + false + diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 01422d6..b093ac9 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -37,8 +37,9 @@ LIBSRCS += CreateRequestFactory.cpp SRC_DIRS += $(PVACCESS)/remote INC += remote.h -INC += blockingUDPTransport.h +INC += blockingUDP.h LIBSRCS += blockingUDPTransport.cpp +LIBSRCS += blockingUDPConnector.cpp LIBRARY = pvAccess diff --git a/pvAccessApp/remote/blockingUDPTransport.h b/pvAccessApp/remote/blockingUDP.h similarity index 82% rename from pvAccessApp/remote/blockingUDPTransport.h rename to pvAccessApp/remote/blockingUDP.h index 7b9b9ef..f9281c2 100644 --- a/pvAccessApp/remote/blockingUDPTransport.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -5,8 +5,8 @@ * Author: Miha Vitorovic */ -#ifndef BLOCKINGUDPTRANSPORT_H_ -#define BLOCKINGUDPTRANSPORT_H_ +#ifndef BLOCKINGUDP_H_ +#define BLOCKINGUDP_H_ /* pvAccess */ #include "remote.h" @@ -16,6 +16,7 @@ /* pvData */ #include #include +#include /* EPICSv3 */ #include @@ -230,9 +231,53 @@ namespace epics { */ char* _readBuffer; + /** + * Used for process sync. + */ + Mutex* _mutex; + + }; + + class BlockingUDPConnector : public Connector, NoDefaultMethods { + public: + + BlockingUDPConnector(bool reuseSocket, + InetAddrVector* sendAddresses, bool broadcast) : + _sendAddresses(sendAddresses), _reuseSocket(reuseSocket), + _broadcast(broadcast) { + } + + virtual ~BlockingUDPConnector() { + // TODO: delete _sendAddresses here? + } + + /** + * NOTE: transport client is ignored for broadcast (UDP). + */ + virtual Transport* connect(TransportClient* client, + ResponseHandler* responseHandler, osiSockAddr* bindAddress, + short transportRevision, short priority); + + private: + + /** + * Send address. + */ + InetAddrVector* _sendAddresses; + + /** + * Reuse socket flag. + */ + bool _reuseSocket; + + /** + * Broadcast flag. + */ + bool _broadcast; + }; } } -#endif /* BLOCKINGUDPTRANSPORT_H_ */ +#endif /* BLOCKINGUDP_H_ */ diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp new file mode 100644 index 0000000..f207fee --- /dev/null +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -0,0 +1,68 @@ +/* + * blockingUDPConnector.cpp + * + * Created on: Dec 27, 2010 + * Author: Miha Vitorovic + */ + +/* pvAccess */ +#include "blockingUDP.h" +#include "remote.h" + +/* pvData */ +#include + +/* EPICSv3 */ +#include + +/* standard */ +#include +#include + +namespace epics { + namespace pvAccess { + + Transport* BlockingUDPConnector::connect(TransportClient* client, + ResponseHandler* responseHandler, osiSockAddr* bindAddress, + short transportRevision, short priority) { + errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", + inetAddressToString(bindAddress).c_str()); + + SOCKET socket = ::socket(PF_INET, SOCK_DGRAM, 0); + + /* from MSDN: + * Note: If the setsockopt function is called before the bind + * function, TCP/IP options will not be checked by using TCP/IP + * until the bind occurs. In this case, the setsockopt function + * call will always succeed, but the bind function call can fail + * because of an early setsockopt call failing. + */ + + int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa), + sizeof(sockaddr)); + if(retval<0) { + errlogSevPrintf(errlogMajor, "Error binding socket: %s", + strerror(errno)); + THROW_BASE_EXCEPTION(strerror(errno)); + } + + // set the socket options + + int optval = 1; // true + + retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, + sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMajor, + "Error binding socket: %s", strerror(errno)); + + retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, + sizeof(optval)); + + // sockets are blocking by default + + return new BlockingUDPTransport(responseHandler, socket, + bindAddress, _sendAddresses, transportRevision); + } + + } +} diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 764ae8d..9a83973 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -5,13 +5,14 @@ */ /* pvAccess */ -#include "blockingUDPTransport.h" +#include "blockingUDP.h" #include "caConstants.h" #include "inetAddressUtil.h" /* pvData */ #include +#include /* EPICSv3 */ #include @@ -21,7 +22,8 @@ /* standard */ #include -#include +#include +#include #include #include @@ -33,31 +35,22 @@ namespace epics { BlockingUDPTransport::BlockingUDPTransport( ResponseHandler* responseHandler, SOCKET channel, osiSockAddr* bindAddress, InetAddrVector* sendAddresses, - short remoteTransportRevision) { - _responseHandler = responseHandler; - _channel = channel; - _bindAddress = bindAddress; - _sendAddresses = sendAddresses; - - _socketAddress = bindAddress; - - // allocate receive buffer - _receiveBuffer = new ByteBuffer(MAX_UDP_RECV); - - // allocate send buffer and non-reentrant lock - _sendBuffer = new ByteBuffer(MAX_UDP_SEND); - - _ignoredAddresses = NULL; - _sendTo = NULL; - _closed = false; - _lastMessageStartPosition = 0; - _readBuffer = new char[MAX_UDP_RECV]; + short remoteTransportRevision) : + _closed(false), _responseHandler(responseHandler), + _channel(channel), _socketAddress(bindAddress), + _bindAddress(bindAddress), _sendAddresses(sendAddresses), + _ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer( + new ByteBuffer(MAX_UDP_RECV)), _sendBuffer( + new ByteBuffer(MAX_UDP_RECV)), + _lastMessageStartPosition(0), _readBuffer( + new char[MAX_UDP_RECV]), _mutex(new Mutex()) { } BlockingUDPTransport::~BlockingUDPTransport() { delete _receiveBuffer; delete _sendBuffer; delete _readBuffer; + delete _mutex; } void BlockingUDPTransport::start() { @@ -87,7 +80,7 @@ namespace epics { } void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) { - // TODO: Java version uses synchronized. Why? + Lock lock(_mutex); _sendTo = NULL; _sendBuffer->clear(); diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 57038c1..e8b0dc2 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -204,6 +204,60 @@ namespace epics { int payloadSize, ByteBuffer* payloadBuffer) =0; }; + /** + * Client (user) of the transport. + * @author Matej Sekoranja + * @version $Id: TransportClient.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class TransportClient { + public: + /** + * Notification of unresponsive transport (e.g. no heartbeat detected) . + */ + virtual void transportUnresponsive() =0; + + /** + * Notification of responsive transport (e.g. heartbeat detected again), + * called to discard transportUnresponsive notification. + * @param transport responsive transport. + */ + virtual void transportResponsive(Transport* transport) =0; + + /** + * Notification of network change (server restarted). + */ + virtual void transportChanged() =0; + + /** + * Notification of forcefully closed transport. + */ + virtual void transportClosed() =0; + + }; + + + /** + * Interface defining socket connector (Connector-Transport pattern). + * @author Matej Sekoranja + * @version $Id: Connector.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class Connector { + public: + /** + * Connect. + * @param[in] client client requesting connection (transport). + * @param[in] address address of the server. + * @param[in] responseHandler reponse handler. + * @param[in] transportRevision transport revision to be used. + * @param[in] priority process priority. + * @return transport instance. + * @throws ConnectionException + */ + virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler, + osiSockAddr* address, short transportRevision, short priority) =0; + + }; + } } diff --git a/pvAccessApp/utils/introspectionRegistry.h b/pvAccessApp/utils/introspectionRegistry.h index dd32d1d..549fbdd 100644 --- a/pvAccessApp/utils/introspectionRegistry.h +++ b/pvAccessApp/utils/introspectionRegistry.h @@ -5,18 +5,20 @@ #ifndef INTROSPECTIONREGISTRY_H #define INTROSPECTIONREGISTRY_H -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include + #include -#include "lock.h" -#include "pvIntrospect.h" -#include "pvData.h" -#include "byteBuffer.h" -#include "serialize.h" -#include "serializeHelper.h" -#include "status.h" -#include "standardField.h" +#include +#include + using namespace epics::pvData; using namespace std; From 16542b9e3c3124fac3e58362a23932897c4b1546 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 28 Dec 2010 09:06:38 +0100 Subject: [PATCH 3/6] Set socket options based on Connector properties. --- pvAccessApp/remote/blockingUDPConnector.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index f207fee..442bef8 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -48,13 +48,13 @@ namespace epics { // set the socket options - int optval = 1; // true - + int optval = _reuseSocket ? 1 : 0; retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if(retval<0) errlogSevPrintf(errlogMajor, "Error binding socket: %s", strerror(errno)); + optval = _broadcast ? 1 : 0; retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)); From 58f03384c27554ae4e90cfb3208c219c588fc756 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 28 Dec 2010 10:09:37 +0100 Subject: [PATCH 4/6] UDP transport cleanup - closing the socket and stopping the thread; --- pvAccessApp/remote/blockingUDP.h | 6 ++++++ pvAccessApp/remote/blockingUDPTransport.cpp | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index f9281c2..55dce30 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -21,6 +21,7 @@ /* EPICSv3 */ #include #include +#include namespace epics { namespace pvAccess { @@ -236,6 +237,11 @@ namespace epics { */ Mutex* _mutex; + /** + * Thread ID + */ + epicsThreadId _threadId; + }; class BlockingUDPConnector : public Connector, NoDefaultMethods { diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 9a83973..1ea101c 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -43,10 +43,12 @@ namespace epics { new ByteBuffer(MAX_UDP_RECV)), _sendBuffer( new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), _readBuffer( - new char[MAX_UDP_RECV]), _mutex(new Mutex()) { + new char[MAX_UDP_RECV]), _mutex(new Mutex()), + _threadId(NULL) { } BlockingUDPTransport::~BlockingUDPTransport() { + close(true); // close the socket and stop the thread. delete _receiveBuffer; delete _sendBuffer; delete _readBuffer; @@ -60,8 +62,9 @@ namespace epics { errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); - epicsThreadCreate(threadName.c_str(), epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackMedium), + _threadId = epicsThreadCreate(threadName.c_str(), + epicsThreadPriorityMedium, epicsThreadGetStackSize( + epicsThreadStackMedium), BlockingUDPTransport::threadRunner, this); } @@ -70,7 +73,7 @@ namespace epics { _closed = true; if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo, - "UDP connection to %s closed.", inetAddressToString( + "UDP socket %s closed.", inetAddressToString( _bindAddress).c_str()); int retval = ::close(_channel); @@ -135,6 +138,10 @@ namespace epics { */ int retval = poll(&pfd, 1, 100); + + if(_closed) break; // if the dtor was called during wait + // none of the object properties are no longer valid. + if(retval>0) { // activity on SOCKET if(pfd.revents&POLLIN) { @@ -200,6 +207,10 @@ namespace epics { // TODO: catch all exceptions, and act accordingly close(true); } + + char threadName[40]; + epicsThreadGetName(_threadId, threadName, 40); + errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName); } bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress, From fe5ea9442c674eec8ce90496257b451a354ee6c8 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 28 Dec 2010 15:47:05 +0100 Subject: [PATCH 5/6] A working blockingUDPTransport with test. TODO: debug stopping listener thread. --- pvAccessApp/remote/blockingUDPTransport.cpp | 23 ++--- pvAccessApp/remote/remote.h | 2 +- pvAccessApp/utils/inetAddressUtil.cpp | 12 +-- testApp/Makefile | 1 + testApp/remote/Makefile | 17 ++++ testApp/remote/testBlockingUDPClnt.cpp | 98 +++++++++++++++++++++ testApp/remote/testBlockingUDPSrv.cpp | 95 ++++++++++++++++++++ 7 files changed, 232 insertions(+), 16 deletions(-) create mode 100644 testApp/remote/Makefile create mode 100644 testApp/remote/testBlockingUDPClnt.cpp create mode 100644 testApp/remote/testBlockingUDPSrv.cpp diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 1ea101c..b8131b4 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -73,8 +73,8 @@ namespace epics { _closed = true; if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo, - "UDP socket %s closed.", inetAddressToString( - _bindAddress).c_str()); + "UDP socket %s closed.", + inetAddressToString(_bindAddress).c_str()); int retval = ::close(_channel); @@ -92,7 +92,7 @@ namespace epics { sender->send(_sendBuffer, this); sender->unlock(); endMessage(); - if(_sendTo!=NULL) send(_sendBuffer, _sendTo); + send(_sendBuffer, _sendTo); } catch(...) { sender->unlock(); } @@ -108,10 +108,13 @@ namespace epics { } void BlockingUDPTransport::endMessage() { - int32 data = _lastMessageStartPosition+(16/8+2); - _sendBuffer->put((char*)&data, _sendBuffer->getPosition() - -_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE, - sizeof(int32)); + int oldPosition = _sendBuffer->getPosition(); + _sendBuffer->setPosition(_lastMessageStartPosition + +(sizeof(int16)+2)); + _sendBuffer->putInt(oldPosition-_lastMessageStartPosition + -CA_MESSAGE_HEADER_SIZE); + _sendBuffer->setPosition(oldPosition); + } void BlockingUDPTransport::processRead() { @@ -157,7 +160,7 @@ namespace epics { if(bytesRead>0) { // successfully got datagram bool ignore = false; - if(_ignoredAddresses!=NULL) for(int i = 0; i + if(_ignoredAddresses!=NULL) for(size_t i = 0; i <_ignoredAddresses->size(); i++) if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr ==fromAddress.ia.sin_addr.s_addr) { @@ -255,7 +258,7 @@ namespace epics { bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr* address) { - if(address==NULL||_sendAddresses==NULL) return false; + if(address==NULL&&_sendAddresses==NULL) return false; if(address!=NULL) { buffer->flip(); @@ -270,7 +273,7 @@ namespace epics { } } else { - for(int i = 0; i<_sendAddresses->size(); i++) { + for(size_t i = 0; i<_sendAddresses->size(); i++) { buffer->flip(); int retval = sendto(_channel, buffer->getArray(), buffer->getLimit(), 0, diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index e8b0dc2..46f3509 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -197,7 +197,7 @@ namespace epics { * @param[in] payloadSize size of this message data available in the payloadBuffer. * @param[in] payloadBuffer message payload data. * Note that this might not be the only message in the buffer. - * Code must not manilupate buffer. + * Code must not manipulate buffer. */ virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 8c9de71..e513380 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -214,11 +214,13 @@ namespace epics { bool displayHex) { stringstream saddr; - saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.'; - saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF); - if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port; + int ipa = ntohl(addr->ia.sin_addr.s_addr); + + saddr<<((int)(ipa>>24)&0xFF)<<'.'; + saddr<<((int)(ipa>>16)&0xFF)<<'.'; + saddr<<((int)(ipa>>8)&0xFF)<<'.'; + saddr<<((int)ipa&0xFF); + if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port); if(displayHex) saddr<<" ("<ia.sin_addr.s_addr))<<")"; diff --git a/testApp/Makefile b/testApp/Makefile index 700bd55..0b70107 100644 --- a/testApp/Makefile +++ b/testApp/Makefile @@ -2,4 +2,5 @@ TOP = .. include $(TOP)/configure/CONFIG DIRS += utils DIRS += client +DIRS += remote include $(TOP)/configure/RULES_DIRS diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile new file mode 100644 index 0000000..8e5c932 --- /dev/null +++ b/testApp/remote/Makefile @@ -0,0 +1,17 @@ +TOP=../.. + +include $(TOP)/configure/CONFIG + +PROD_HOST += testBlockingUDPSrv +testBlockingUDPSrv_SRCS += testBlockingUDPSrv.cpp +testBlockingUDPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingUDPClnt +testBlockingUDPClnt_SRCS += testBlockingUDPClnt.cpp +testBlockingUDPClnt_LIBS += pvData pvAccess Com + + +include $(TOP)/configure/RULES +#---------------------------------------- +# ADD RULES AFTER THIS LINE + diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp new file mode 100644 index 0000000..c874898 --- /dev/null +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -0,0 +1,98 @@ +/* + * testBlockingUDPClnt.cpp + * + * Created on: Dec 28, 2010 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "logger.h" +#include "inetAddressUtil.h" + +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +using std::cout; +using std::endl; +using std::sscanf; + +static osiSockAddr sendTo; + +class DummyResponseHandler : public ResponseHandler { +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + } +}; + +class DummyTransportSender : public TransportSender { +public: + DummyTransportSender() { + for(int i = 0; i<20; i++) + data[i] = (char)(i+1); + count = 0; + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + // set recipient + sendTo.ia.sin_family = AF_INET; + sendTo.ia.sin_port = htons(65000); + if(inet_aton("192.168.71.129",&sendTo.ia.sin_addr)==0) { + cout<<"error in inet_aton()"<setRecipient(&sendTo); + + // send the packet + count++; + control->startMessage((int8)(count+0x10), 0); + buffer->put(data, 0, count); + //control->endMessage(); + } + + virtual void lock() { + } + virtual void unlock() { + } +private: + char data[20]; + int count; +}; + +void testBlockingUDPSender() { + BlockingUDPConnector connector(false, NULL, true); + + DummyTransportSender dts; + DummyResponseHandler drh; + + osiSockAddr bindAddr; + + bindAddr.ia.sin_family = AF_INET; + bindAddr.ia.sin_port = htons(65001); + bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); + + Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + + cout<<"Sending 10 packets..."<enqueueSendRequest(&dts); + sleep(1); + } + +} + +int main(int argc, char *argv[]) { + createFileLogger("testBlockingUDPClnt.log"); + + testBlockingUDPSender(); + return (0); +} diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp new file mode 100644 index 0000000..9df1d59 --- /dev/null +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -0,0 +1,95 @@ +/* + * blockingUDPTest.cpp + * + * Created on: Dec 28, 2010 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "logger.h" +#include "inetAddressUtil.h" +#include "hexDump.h" + +#include +#include + +using namespace epics::pvAccess; +using std::cout; +using std::endl; +using std::hex; +using std::dec; + +class DummyResponseHandler : public ResponseHandler { +public: + DummyResponseHandler() : + packets(0) { + } + + int getPackets() { + return packets; + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer); +private: + int packets; +}; + +void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + + packets++; + + std::ostringstream os; + + cout<<"Received new UDP datagram["<get(payload, 0, dataCount); + os<<"Payload ("<start(); + + cout<<"Waiting for 10 packets..."< Date: Tue, 28 Dec 2010 16:29:15 +0100 Subject: [PATCH 6/6] Some changes to how transport is destroyed. Added virtual dtor to Transport class. In server the transport is now explicitly deleted. --- pvAccessApp/remote/remote.h | 3 +++ testApp/remote/testBlockingUDPSrv.cpp | 8 ++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 46f3509..5eb5813 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -67,6 +67,9 @@ namespace epics { */ class Transport : public DeserializableControl { public: + virtual ~Transport() { + } + /** * Get remote address. * @return remote address. diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 9df1d59..695d980 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -40,12 +40,9 @@ private: void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { - - packets++; - std::ostringstream os; - cout<<"Received new UDP datagram["<