diff --git a/.cproject b/.cproject index 0e2a39f..e91f269 100644 --- a/.cproject +++ b/.cproject @@ -307,13 +307,15 @@ make + all true true - true + false make + clean true true @@ -321,7 +323,6 @@ make - uninstall true true diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 75f15d4..a18f7a5 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -39,6 +39,7 @@ namespace epics { *1000), _unresponsiveTransport(false), _timerNode( new TimerNode(this)), _mutex(new Mutex()), _ownersMutex( new Mutex()), _verifyOrEcho(true) { + _autoDelete = false; // initialize owners list, send queue acquire(client); @@ -188,7 +189,7 @@ namespace epics { * send verification response message */ - control->startMessage(1, 2*sizeof(int32)+sizeof(int16)); + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16)); // receive buffer size buffer->putInt(getReceiveBufferSize()); @@ -205,7 +206,7 @@ namespace epics { _verifyOrEcho = false; } else { - control->startMessage(2, 0); + control->startMessage(CMD_ECHO, 0); // send immediately control->flush(true); } diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 32bb1db..b051e2c 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()), _notifyOnClose(NULL) { + new Mutex()) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,7 +68,6 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); - if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8e82224..d7302e3 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,7 +37,6 @@ namespace epics { namespace pvAccess { class MonitorSender; - class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -47,19 +46,6 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; - class TransportCloseNotification { - public: - virtual ~TransportCloseNotification() { - } - - /** - * When transport closes, the owner will be notified through this - * callback - */ - virtual void - transportClosed(BlockingServerTCPTransport* transport) =0; - }; - class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -67,8 +53,6 @@ namespace epics { ResponseHandler* responseHandler, int receiveBufferSize, int16 priority); - virtual ~BlockingTCPTransport(); - virtual bool isClosed() const { return _closed; } @@ -133,7 +117,7 @@ namespace epics { _verified = true; } - virtual void setRecipient(const osiSockAddr* sendTo) { + virtual void setRecipient(const osiSockAddr& sendTo) { // noop } @@ -253,6 +237,8 @@ namespace epics { volatile int64 _remoteBufferFreeSpace; + volatile bool _autoDelete; + virtual void processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer); @@ -271,6 +257,8 @@ namespace epics { */ virtual bool send(epics::pvData::ByteBuffer* buffer); + virtual ~BlockingTCPTransport(); + private: /** * Default marker period. @@ -352,6 +340,8 @@ namespace epics { Context* _context; + volatile bool _sendThreadRunning; + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. @@ -387,8 +377,6 @@ namespace epics { TransportClient* client, short remoteTransportRevision, float beaconInterval, int16 priority); - virtual ~BlockingClientTCPTransport(); - virtual void timerStopped() { // noop } @@ -444,6 +432,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingClientTCPTransport(); + private: /** @@ -502,7 +492,7 @@ namespace epics { virtual ~BlockingTCPConnector(); virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority); private: /** @@ -538,7 +528,7 @@ namespace epics { * @return the SOCKET * @throws IOException */ - SOCKET tryConnect(osiSockAddr* address, int tries); + SOCKET tryConnect(osiSockAddr& address, int tries); }; @@ -549,8 +539,6 @@ namespace epics { BlockingServerTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize); - virtual ~BlockingServerTCPTransport(); - virtual IntrospectionRegistry* getIntrospectionRegistry() { return _introspectionRegistry; } @@ -638,10 +626,6 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); - void addCloseNotification(TransportCloseNotification* notifyTarget) { - _notifyOnClose = notifyTarget; - } - protected: /** * Introspection registry. @@ -650,6 +634,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingServerTCPTransport(); + private: /** * Last SID cache. @@ -663,8 +649,6 @@ namespace epics { Mutex* _channelsMutex; - TransportCloseNotification* _notifyOnClose; - /** * Destroy all channels. */ @@ -676,7 +660,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor : public TransportCloseNotification { + class BlockingTCPAcceptor { public: /** @@ -705,8 +689,6 @@ namespace epics { */ void destroy(); - virtual void transportClosed(BlockingServerTCPTransport* transport); - private: /** * Context instance. @@ -735,10 +717,6 @@ namespace epics { epicsThreadId _threadId; - std::set* _connectedClients; - - Mutex* _connectedClientsMutex; - /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index ab4c21e..c5bfcb7 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,7 +24,6 @@ #include using std::ostringstream; -using std::set; namespace epics { namespace pvAccess { @@ -33,9 +32,7 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL), _connectedClients( - new set ()), - _connectedClientsMutex(new Mutex()) { + _destroyed(false), _threadId(NULL) { initialize(port); } @@ -43,22 +40,6 @@ namespace epics { destroy(); if(_bindAddress!=NULL) delete _bindAddress; - - _connectedClientsMutex->lock(); - // go through all the connected clients, close them, and destroy - set::iterator it = - _connectedClients->begin(); - while(it!=_connectedClients->end()) { - BlockingServerTCPTransport* client = *it; - it++; - client->close(true); - delete client; - } - _connectedClients->clear(); - delete _connectedClients; - _connectedClientsMutex->unlock(); - - delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -254,16 +235,9 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); - delete transport; return; } - // store the new connected client - _connectedClientsMutex->lock(); - _connectedClients->insert(transport); - transport->addCloseNotification(this); - _connectedClientsMutex->unlock(); - errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -307,16 +281,5 @@ namespace epics { } } - void BlockingTCPAcceptor::transportClosed( - BlockingServerTCPTransport* transport) { - Lock lock(_connectedClientsMutex); - - // remove the closed client from the list of connected clients - _connectedClients->erase(transport); - - // release the memory - delete transport; - } - } } diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 0fd7687..4a066b6 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -32,13 +32,13 @@ namespace epics { delete _namedLocker; } - SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) { + SOCKET BlockingTCPConnector::tryConnect(osiSockAddr& address, int tries) { for(int tryCount = 0; tryCount0) epicsThreadSleep(0.1); char strBuffer[64]; - ipAddrToA(&address->ia, strBuffer, sizeof(strBuffer)); + ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer)); errlogSevPrintf(errlogInfo, "Opening socket to CA server %s, attempt %d.", @@ -53,7 +53,7 @@ namespace epics { strBuffer); } else { - if(::connect(socket, &address->sa, sizeof(sockaddr))==0) + if(::connect(socket, &address.sa, sizeof(sockaddr))==0) return socket; else { epicsSocketConvertErrnoToString(strBuffer, @@ -67,19 +67,19 @@ namespace epics { } Transport* BlockingTCPConnector::connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority) { SOCKET socket = INVALID_SOCKET; char ipAddrStr[64]; - ipAddrToA(&address->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); // first try to check cache w/o named lock... BlockingClientTCPTransport * transport = (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( - "TCP", address, priority)); + "TCP", &address, priority)); if(transport!=NULL) { errlogSevPrintf(errlogInfo, "Reusing existing connection to CA server: %s", @@ -88,13 +88,13 @@ namespace epics { } bool lockAcquired = _namedLocker->acquireSynchronizationObject( - address, LOCK_TIMEOUT); + &address, LOCK_TIMEOUT); if(lockAcquired) { try { // ... transport created during waiting in lock transport = (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( - "TCP", address, priority)); + "TCP", &address, priority)); if(transport!=NULL) { errlogSevPrintf(errlogInfo, "Reusing existing connection to CA server: %s", @@ -156,10 +156,10 @@ namespace epics { } catch(...) { // close socket, if open if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket); - _namedLocker->releaseSynchronizationObject(address); + _namedLocker->releaseSynchronizationObject(&address); throw; } - _namedLocker->releaseSynchronizationObject(address); + _namedLocker->releaseSynchronizationObject(&address); } else { ostringstream temp; diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 836a055..113cd48 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -72,22 +72,24 @@ namespace epics { _priority(priority), _responseHandler(responseHandler), _totalBytesReceived(0), _totalBytesSent(0), _markerToSend(0), _verified(false), _remoteBufferFreeSpace( - LONG_LONG_MAX), _markerPeriodBytes(MARKER_PERIOD), - _nextMarkerPosition(_markerPeriodBytes), - _sendPending(false), _lastMessageStartPosition(0), _mutex( - new Mutex()), _sendQueueMutex(new Mutex()), - _verifiedMutex(new Mutex()), _monitorMutex(new Mutex()), - _stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0), - _lastSegmentedMessageCommand(0), _storedPayloadSize(0), - _storedPosition(0), _storedLimit(0), _magicAndVersion(0), - _packetType(0), _command(0), _payloadSize(0), - _flushRequested(false), _sendBufferSentPosition(0), - _flushStrategy(DELAYED), _sendQueue( + LONG_LONG_MAX), _autoDelete(true), + _markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition( + _markerPeriodBytes), _sendPending(false), + _lastMessageStartPosition(0), _mutex(new Mutex()), + _sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()), + _monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET), + _lastSegmentedMessageType(0), _lastSegmentedMessageCommand( + 0), _storedPayloadSize(0), _storedPosition(0), + _storedLimit(0), _magicAndVersion(0), _packetType(0), + _command(0), _payloadSize(0), _flushRequested(false), + _sendBufferSentPosition(0), _flushStrategy(DELAYED), + _sendQueue( new GrowingCircularBuffer (100)), _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), _monitorSender(new MonitorSender(_monitorMutex, - _monitorSendQueue)), _context(context) { + _monitorSendQueue)), _context(context), + _sendThreadRunning(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); @@ -141,6 +143,7 @@ namespace epics { } void BlockingTCPTransport::start() { + _sendThreadRunning = true; String threadName = "TCP-receive "+inetAddressToString( _socketAddress); @@ -210,7 +213,10 @@ namespace epics { void BlockingTCPTransport::internalClose(bool force) { // close the socket - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } int BlockingTCPTransport::getSocketReceiveBufferSize() const { @@ -462,11 +468,11 @@ namespace epics { maxToRead, 0); _socketBuffer->put(readBuffer, 0, bytesRead); - if(bytesRead<0) { + if(bytesRead<=0) { // error (disconnect, end-of-stream) detected close(true); - if(nestedCall) THROW_BASE_EXCEPTION( + if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION( "bytesRead < 0"); return; @@ -834,12 +840,23 @@ namespace epics { errlogSevPrintf(errlogInfo, "Connection to %s closed.", inetAddressToString(_socketAddress).c_str()); - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } void BlockingTCPTransport::rcvThreadRunner(void* param) { - ((BlockingTCPTransport*)param)->processReadCached(false, NONE, - CA_MESSAGE_HEADER_SIZE, false); + BlockingTCPTransport* obj = (BlockingTCPTransport*)param; + + obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); + + if(obj->_autoDelete) { + while(obj->_sendThreadRunning) + epicsThreadSleep(0.1); + + delete obj; + } } void BlockingTCPTransport::sendThreadRunner(void* param) { @@ -848,15 +865,19 @@ namespace epics { obj->processSendQueue(); obj->freeConnectionResorces(); + + obj->_sendThreadRunning = false; } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { + if(_closed) return; Lock lock(_sendQueueMutex); _sendQueue->insert(sender); } void BlockingTCPTransport::enqueueMonitorSendRequest( TransportSender* sender) { + if(_closed) return; Lock lock(_monitorMutex); _monitorSendQueue->insert(sender); if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 785f327..43b1b6f 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -32,7 +32,7 @@ namespace epics { public TransportSendControl { public: BlockingUDPTransport(ResponseHandler* responseHandler, - SOCKET channel, osiSockAddr* bindAddress, + SOCKET channel, osiSockAddr& bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision); @@ -107,8 +107,10 @@ namespace epics { // noop since all UDP requests are sent immediately } - virtual void setRecipient(const osiSockAddr* sendTo) { - _sendTo = sendTo; + virtual void setRecipient(const osiSockAddr& sendTo) { + if(_sendTo!=NULL) delete _sendTo; + _sendTo = new osiSockAddr; + memcpy(_sendTo, &sendTo, sizeof(osiSockAddr)); } virtual void flushSerializeBuffer() { @@ -135,7 +137,9 @@ namespace epics { return _ignoredAddresses; } - bool send(ByteBuffer* buffer, const osiSockAddr* address = NULL); + bool send(ByteBuffer* buffer, const osiSockAddr& address); + + bool send(ByteBuffer* buffer); /** * Get list of send addresses. @@ -149,7 +153,7 @@ namespace epics { * Get bind address. * @return bind address. */ - osiSockAddr* getBindAddress() { + const osiSockAddr* getBindAddress() const { return _bindAddress; } @@ -177,7 +181,7 @@ namespace epics { private: static void threadRunner(void* param); - bool processBuffer(osiSockAddr* fromAddress, + bool processBuffer(osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer); // Context only used for logging in this class @@ -207,7 +211,7 @@ namespace epics { */ InetAddrVector* _ignoredAddresses; - const osiSockAddr* _sendTo; + osiSockAddr* _sendTo; /** * Receive buffer. @@ -259,7 +263,7 @@ namespace epics { * NOTE: transport client is ignored for broadcast (UDP). */ virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* bindAddress, + ResponseHandler* responseHandler, osiSockAddr& bindAddress, short transportRevision, int16 priority); private: diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 002220b..3e97a93 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -24,10 +24,10 @@ namespace epics { namespace pvAccess { Transport* BlockingUDPConnector::connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* bindAddress, + ResponseHandler* responseHandler, osiSockAddr& bindAddress, short transportRevision, int16 priority) { errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", - inetAddressToString(bindAddress).c_str()); + inetAddressToString(&bindAddress).c_str()); SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(socket==INVALID_SOCKET) { @@ -45,7 +45,7 @@ namespace epics { * because of an early setsockopt call failing. */ - int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa), + int retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), sizeof(sockaddr)); if(retval<0) { errlogSevPrintf(errlogMajor, "Error binding socket: %s", diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 9a24c71..3f53a8a 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -34,21 +34,28 @@ namespace epics { BlockingUDPTransport::BlockingUDPTransport( ResponseHandler* responseHandler, SOCKET channel, - osiSockAddr* bindAddress, InetAddrVector* sendAddresses, + osiSockAddr& bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision) : _closed(false), _responseHandler(responseHandler), - _channel(channel), _socketAddress(bindAddress), - _bindAddress(bindAddress), _sendAddresses(sendAddresses), + _channel(channel), _sendAddresses(sendAddresses), _ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer( new ByteBuffer(MAX_UDP_RECV)), _sendBuffer( new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), _readBuffer( new char[MAX_UDP_RECV]), _mutex(new Mutex()), _threadId(NULL) { + _socketAddress = new osiSockAddr; + memcpy(_socketAddress, &bindAddress, sizeof(osiSockAddr)); + _bindAddress = _socketAddress; + } BlockingUDPTransport::~BlockingUDPTransport() { close(true); // close the socket and stop the thread. + if(_sendTo!=NULL) delete _sendTo; + delete _socketAddress; + // _bindAddress equals _socketAddress + delete _receiveBuffer; delete _sendBuffer; delete[] _readBuffer; @@ -89,7 +96,10 @@ namespace epics { sender->send(_sendBuffer, this); sender->unlock(); endMessage(); - send(_sendBuffer, _sendTo); + if(_sendTo==NULL) + send(_sendBuffer); + else + send(_sendBuffer, *_sendTo); } catch(...) { sender->unlock(); } @@ -172,7 +182,7 @@ namespace epics { _receiveBuffer->flip(); - processBuffer(&fromAddress, _receiveBuffer); + processBuffer(fromAddress, _receiveBuffer); } } else { @@ -210,7 +220,7 @@ namespace epics { errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName); } - bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress, + bool BlockingUDPTransport::processBuffer(osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { // handle response(s) @@ -238,7 +248,7 @@ namespace epics { if(nextRequestPosition>receiveBuffer->getLimit()) return false; // handle - _responseHandler->handleResponse(fromAddress, this, + _responseHandler->handleResponse(&fromAddress, this, (int8)(magicAndVersion&0xFF), command, payloadSize, _receiveBuffer); @@ -251,32 +261,32 @@ namespace epics { } bool BlockingUDPTransport::send(ByteBuffer* buffer, - const osiSockAddr* address) { - if(address==NULL&&_sendAddresses==NULL) return false; + const osiSockAddr& address) { - if(address!=NULL) { - buffer->flip(); - int retval = - sendto(_channel, buffer->getArray(), - buffer->getLimit(), 0, &(address->sa), - sizeof(sockaddr)); - if(retval<0) { - errlogSevPrintf(errlogMajor, "Socket sendto error: %s", - strerror(errno)); - return false; - } + buffer->flip(); + int retval = sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr)); + if(retval<0) { + errlogSevPrintf(errlogMajor, "Socket sendto error: %s", + strerror(errno)); + return false; } - else { - for(size_t i = 0; i<_sendAddresses->size(); i++) { - buffer->flip(); - int retval = sendto(_channel, buffer->getArray(), - buffer->getLimit(), 0, - &(_sendAddresses->at(i)->sa), sizeof(sockaddr)); - { - if(retval<0) errlogSevPrintf(errlogMajor, - "Socket sendto error: %s", strerror(errno)); - return false; - } + + return true; + } + + bool BlockingUDPTransport::send(ByteBuffer* buffer) { + if(_sendAddresses==NULL) return false; + + for(size_t i = 0; i<_sendAddresses->size(); i++) { + buffer->flip(); + int retval = sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, &(_sendAddresses->at(i)->sa), + sizeof(sockaddr)); + { + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket sendto error: %s", strerror(errno)); + return false; } } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index b180e71..cb5f3c3 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -418,7 +418,7 @@ class MockTransportSendControl: public TransportSendControl public: void endMessage() {} void flush(bool lastMessageCompleted) {} - void setRecipient(const osiSockAddr* sendTo) {} + void setRecipient(const osiSockAddr& sendTo) {} void startMessage(int8 command, int32 ensureCapacity) {} void ensureBuffer(int32 size) {} void flushSerializeBuffer() {} diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 4b55a0b..46f7a12 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -54,7 +54,7 @@ namespace epics { virtual void flush(bool lastMessageCompleted) =0; - virtual void setRecipient(const osiSockAddr* sendTo) =0; + virtual void setRecipient(const osiSockAddr& sendTo) =0; }; /** @@ -326,7 +326,7 @@ namespace epics { * @throws ConnectionException */ virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority) =0; }; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index f69c5c7..e3bbad5 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -32,8 +32,8 @@ namespace epics { ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; - prologue<<"Message ["<getArray(), @@ -63,9 +63,9 @@ namespace epics { _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed - _handlerTable[0] = badResponse; + _handlerTable[0] = new NoopResponse(_context, "Beacon"); _handlerTable[1] = new ConnectionValidationHandler(_context); - _handlerTable[2] = badResponse; + _handlerTable[2] = new EchoHandler(_context); _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; _handlerTable[5] = badResponse; @@ -96,6 +96,8 @@ namespace epics { ServerResponseHandler::~ServerResponseHandler() { delete _handlerTable[0]; delete _handlerTable[1]; + delete _handlerTable[2]; + delete _handlerTable[27]; delete[] _handlerTable; } @@ -137,5 +139,42 @@ namespace epics { //transport.setPriority(payloadBuffer.getShort()); } + class EchoTransportSender : public TransportSender { + public: + EchoTransportSender(osiSockAddr* echoFrom) { + memcpy(&_echoFrom, echoFrom, sizeof(osiSockAddr)); + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage(CMD_ECHO, 0); + control->setRecipient(_echoFrom); + } + + virtual void lock() { + } + + virtual void unlock() { + delete this; + } + private: + osiSockAddr _echoFrom; + + virtual ~EchoTransportSender() { + } + }; + + void EchoHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + EchoTransportSender* echoReply = new EchoTransportSender( + responseFrom); + + // send back + transport->enqueueSendRequest(echoReply); + } + } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index f08c399..afbcdba 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -103,6 +103,41 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + /** + * NOOP response. + * @author Matej Sekoranja + * @version $Id: NoopResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class NoopResponse : public AbstractServerResponseHandler { + public: + /** + * @param context + * @param description + */ + NoopResponse(ServerContextImpl* context, String description) : + AbstractServerResponseHandler(context, description) { + } + }; + + /** + * Echo request handler. + * @author Matej Sekoranja + * @version $Id: EchoHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class EchoHandler : public AbstractServerResponseHandler { + public: + /** + * @param context + */ + EchoHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Echo request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + } } diff --git a/pvAccessApp/utils/hexDump.cpp b/pvAccessApp/utils/hexDump.cpp index 050c502..088a9c5 100644 --- a/pvAccessApp/utils/hexDump.cpp +++ b/pvAccessApp/utils/hexDump.cpp @@ -11,6 +11,9 @@ #include using namespace epics::pvData; +using std::stringstream; +using std::endl; +using std::cout; namespace epics { namespace pvAccess { @@ -29,9 +32,9 @@ namespace epics { void hexDump(const String prologue, const String name, const int8 *bs, int start, int len) { - std::stringstream header; + stringstream header; - header<ia.sin_port); - if(displayHex) saddr<<" ("<ia.sin_addr.s_addr)<<")"; + if(displayPort) saddr<<":"<ia.sin_port); + if(displayHex) saddr<<" ("<ia.sin_addr.s_addr) + <<")"; return saddr.str(); } diff --git a/pvAccessApp/utils/inetAddressUtil.h b/pvAccessApp/utils/inetAddressUtil.h index 6910279..0450839 100644 --- a/pvAccessApp/utils/inetAddressUtil.h +++ b/pvAccessApp/utils/inetAddressUtil.h @@ -71,7 +71,7 @@ namespace epics { const InetAddrVector* appendList = NULL); const String inetAddressToString(const osiSockAddr *addr, - bool displayHex = false); + bool displayPort = true, bool displayHex = false); /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 620b69c..77979b5 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -26,6 +26,15 @@ PROD_HOST += testChannelSearchManager testChannelSearchManager_SRCS += testChannelSearchManager.cpp testChannelSearchManager_LIBS += pvData pvAccess Com +PROD_HOST += testBlockingTCPSrv +testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp +testBlockingTCPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingTCPClnt +testBlockingTCPClnt_SRCS += testBlockingTCPClnt.cpp +testBlockingTCPClnt_LIBS += pvData pvAccess Com + + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 9ddd828..0d3c904 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -53,7 +53,7 @@ void testBeaconEmitter() bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5066); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); cout<<"Sending beacons"<getRemoteAddress()); diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 30a51e2..0ac231c 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -112,7 +112,7 @@ void testBeaconHandler() bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5067); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &brh, bindAddr, 1, 50); (static_cast(transport))->start(); while(1) sleep(1); diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index 32c7c29..3f69392 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -29,14 +29,19 @@ using std::sscanf; class ContextImpl : public Context { public: ContextImpl() : - _tr(new TransportRegistry()), - _timer(new Timer("client thread", lowPriority)) {} + _tr(new TransportRegistry()), _timer(new Timer("client thread", + lowPriority)) { + } virtual ~ContextImpl() { delete _tr; delete _timer; } - virtual Timer* getTimer() { return _timer; } - virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } private: TransportRegistry* _tr; Timer* _timer; @@ -47,6 +52,8 @@ public: virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { + + if(command==CMD_CONNECTION_VALIDATION) transport->verified(); } }; @@ -105,21 +112,23 @@ void testBlockingTCPSender() { osiSockAddr srvAddr; - srvAddr.ia.sin_family = AF_INET; - //srvAddr.ia.sin_port = htons(CA_SERVER_PORT); + //srvAddr.ia.sin_family = AF_INET; if(aToIPAddr("192.168.71.132", CA_SERVER_PORT, &srvAddr.ia)<0) { cout<<"error in aToIPAddr(...)"<enqueueSendRequest(&dts); + if(!transport->isClosed()) + transport->enqueueSendRequest(&dts); + else + break; sleep(1); } diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index cefed59..06443aa 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -15,6 +15,8 @@ #include #include +#define SRV_IP "192.168.71.132" + using namespace epics::pvAccess; using namespace epics::pvData; @@ -41,15 +43,13 @@ public: } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - // set recipient - sendTo.ia.sin_family = AF_INET; - sendTo.ia.sin_port = htons(65000); - if(aToIPAddr("192.168.71.129", 65000, &sendTo.ia)<0) { + // SRV_IP defined at the top of the this file + if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) { cout<<"error in aToIPAddr(...)"<setRecipient(&sendTo); + control->setRecipient(sendTo); // send the packet count++; @@ -79,7 +79,7 @@ void testBlockingUDPSender() { bindAddr.ia.sin_port = htons(65001); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); cout<<"Sending 10 packets..."<start(); diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index d6a55d4..ce77ae4 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -63,7 +63,7 @@ int main(int argc, char *argv[]) { assert(addr->ia.sin_port==htons(6789)); assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0)); assert(inetAddressToString(addr)=="172.16.55.160:6789"); -cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); @@ -100,13 +100,13 @@ cout<<'\t'<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="127.0.0.1"); + assert(inetAddressToString(addr)=="127.0.0.1:0"); cout<<'\t'<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="10.10.12.11"); + assert(inetAddressToString(addr)=="10.10.12.11:0"); cout<<'\t'<