From bd1a4e2634a5b0a1a37f80b43135abdbf54497c6 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 4 Jan 2011 11:58:00 +0100 Subject: [PATCH] Makefile: included all the new sources blockingClientTCPTransport.cpp: implementation blockingTCP.h: * class BlockingTCPTransport: - added Context to ctor - added 'virtual' declaration to overrides - 'priority' is now 'int16' * added class 'BlockingClientTCPTransport' blockingTCPConnector.cpp: implementation blockingTCPTransport.cpp: * removed 'transportRegistry' added 'context' blockingUDP.h: * added missing override 'getIntrospectionRegistry' * 'BlockingUDPConnector::_priority' is now 'int16' instead of 'short' blockingUDPConnector.cpp: * 'connect' parameter priority is now 'int16' instead of 'short' * fixed and added error logging remote.h: * added 'TransportRegistry' forward declaration * added 'Transport::getIntrospectionRegistry' prototype * changed 'Connector::connect' prototype parameter 'priority': 'short'->'int16' * added 'Context' interface * added 'ReferenceCountingTransport' interface transportRegistry.h: * added 'Transport' forward declaration testRemoteClientImpl.cpp: * added '#include ' to fix compile error * lots of auto-format changes transportRegistryTest.cpp: * --- pvAccessApp/Makefile | 4 + .../remote/blockingClientTCPTransport.cpp | 216 +++++++++++++++ pvAccessApp/remote/blockingTCP.h | 192 ++++++++++++- pvAccessApp/remote/blockingTCPConnector.cpp | 174 ++++++++++++ pvAccessApp/remote/blockingTCPTransport.cpp | 13 +- pvAccessApp/remote/blockingUDP.h | 9 +- pvAccessApp/remote/blockingUDPConnector.cpp | 6 +- pvAccessApp/remote/remote.h | 55 +++- pvAccessApp/utils/transportRegistry.h | 1 + testApp/remote/testRemoteClientImpl.cpp | 261 +++++++++--------- testApp/utils/transportRegistryTest.cpp | 2 + 11 files changed, 775 insertions(+), 158 deletions(-) create mode 100644 pvAccessApp/remote/blockingClientTCPTransport.cpp create mode 100644 pvAccessApp/remote/blockingTCPConnector.cpp diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 368e322..8cd62cc 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -43,11 +43,15 @@ INC += blockingUDP.h INC += beaconEmitter.h INC += beaconServerStatusProvider.h INC += beaconHandler.h +INC += blockingTCP.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp LIBSRCS += beaconEmitter.cpp LIBSRCS += beaconServerStatusProvider.cpp LIBSRCS += beaconHandler.cpp +LIBSRCS += blockingTCPTransport.cpp +LIBSRCS += blockingClientTCPTransport.cpp +LIBSRCS += blockingTCPConnector.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp new file mode 100644 index 0000000..e380b58 --- /dev/null +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -0,0 +1,216 @@ +/* + * BlockingClientTCPTransport.cpp + * + * Created on: Jan 3, 2011 + * Author: Miha Vitorovic + */ + +/* pvAccess */ +#include "blockingTCP.h" + +#include "introspectionRegistry.h" + +/* pvData */ +#include + +/* EPICSv3 */ +#include + +/* standard */ +#include +#include +#include + +using std::set; +using namespace epics::pvData; + +namespace epics { + namespace pvAccess { + + BlockingClientTCPTransport::BlockingClientTCPTransport( + Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize, + TransportClient* client, short remoteTransportRevision, + float beaconInterval, int16 priority) : + BlockingTCPTransport(context, channel, responseHandler, + receiveBufferSize, priority), _introspectionRegistry( + new IntrospectionRegistry(false)), _owners(new set< + TransportClient*> ()), _connectionTimeout(beaconInterval + *1000), _unresponsiveTransport(false), _timerNode( + new TimerNode(this)), _mutex(new Mutex()), _ownersMutex( + new Mutex()), _verifyOrEcho(true) { + + // initialize owners list, send queue + acquire(client); + + // use immediate for clients + setSendQueueFlushStrategy(IMMEDIATE); + + // setup connection timeout timer (watchdog) + epicsTimeGetCurrent(const_cast (&_aliveTimestamp)); + + context->getTimer()->schedulePeriodic(_timerNode, beaconInterval, + beaconInterval); + + start(); + + } + + BlockingClientTCPTransport::~BlockingClientTCPTransport() { + delete _introspectionRegistry; + delete _owners; + delete _timerNode; + delete _mutex; + delete _ownersMutex; + } + + void BlockingClientTCPTransport::callback() { + epicsTimeStamp currentTime; + epicsTimeGetCurrent(¤tTime); + + double diff = epicsTimeDiffInSeconds(¤tTime, + const_cast (&_aliveTimestamp)); + if(diff>2*_connectionTimeout) { + unresponsiveTransport(); + } + else if(diff>_connectionTimeout) { + // send echo + enqueueSendRequest(this); + } + } + + void BlockingClientTCPTransport::unresponsiveTransport() { + if(!_unresponsiveTransport) { + _unresponsiveTransport = true; + + Lock lock(_ownersMutex); + set::iterator it = _owners->begin(); + for(; it!=_owners->end(); it++) + (*it)->transportUnresponsive(); + } + } + + bool BlockingClientTCPTransport::acquire(TransportClient* client) { + Lock lock(_mutex); + + if(_closed) return false; + + char ipAddrStr[48]; + ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); + + _ownersMutex->lock(); + if(_closed) return false; + + _owners->insert(client); + _ownersMutex->unlock(); + + return true; + } + + void BlockingClientTCPTransport::internalClose(bool forced) { + BlockingTCPTransport::internalClose(forced); + + _timerNode->cancel(); + + closedNotifyClients(); + } + + /** + * Notifies clients about disconnect. + */ + void BlockingClientTCPTransport::closedNotifyClients() { + Lock lock(_ownersMutex); + + // check if still acquired + int refs = _owners->size(); + if(refs>0) { + ostringstream temp; + char ipAddrStr[48]; + ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + temp<<"Transport to "<::iterator it = _owners->begin(); + for(; it!=_owners->end(); it++) + (*it)->transportClosed(); + } + + _owners->clear(); + } + + void BlockingClientTCPTransport::release(TransportClient* client) { + if(_closed) return; + + char ipAddrStr[48]; + ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + + errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); + + Lock lock(_ownersMutex); + _owners->erase(client); + + // not used anymore + // TODO consider delayed destruction (can improve performance!!!) + if(_owners->size()==0) close(false); + } + + void BlockingClientTCPTransport::aliveNotification() { + epicsTimeGetCurrent(const_cast (&_aliveTimestamp)); + if(_unresponsiveTransport) responsiveTransport(); + } + + void BlockingClientTCPTransport::responsiveTransport() { + if(_unresponsiveTransport) { + _unresponsiveTransport = false; + Lock lock(_ownersMutex); + + set::iterator it = _owners->begin(); + for(; it!=_owners->end(); it++) + (*it)->transportResponsive(this); + } + } + + void BlockingClientTCPTransport::changedTransport() { + _introspectionRegistry->reset(); + Lock lock(_ownersMutex); + + set::iterator it = _owners->begin(); + for(; it!=_owners->end(); it++) + (*it)->transportChanged(); + } + + void BlockingClientTCPTransport::send(ByteBuffer* buffer, + TransportSendControl* control) { + if(_verifyOrEcho) { + /* + * send verification response message + */ + + control->startMessage(1, 2*sizeof(int32)+sizeof(int16)); + + // receive buffer size + buffer->putInt(getReceiveBufferSize()); + + // socket receive buffer size + buffer->putInt(getSocketReceiveBufferSize()); + + // connection priority + buffer->putShort(getPriority()); + + // send immediately + control->flush(true); + + _verifyOrEcho = false; + } + else { + control->startMessage(2, 0); + // send immediately + control->flush(true); + } + + } + + } +} diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 35220c2..8bb1c8c 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -13,16 +13,22 @@ #include "remote.h" #include "growingCircularBuffer.h" #include "transportRegistry.h" +#include "introspectionRegistry.h" /* pvData */ #include #include #include #include +#include /* EPICSv3 */ #include #include +#include + +/* standard */ +#include namespace epics { namespace pvAccess { @@ -40,27 +46,27 @@ namespace epics { class BlockingTCPTransport : public Transport, public TransportSendControl { public: - BlockingTCPTransport(SOCKET channel, + BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, - short priority, TransportRegistry* transportRegistry); + int16 priority); - ~BlockingTCPTransport(); + virtual ~BlockingTCPTransport(); - bool isClosed() const { + virtual bool isClosed() const { return _closed; } - void setRemoteMinorRevision(int minorRevision) { + virtual void setRemoteMinorRevision(int8 minorRevision) { _remoteTransportRevision = minorRevision; } - void setRemoteTransportReceiveBufferSize( + virtual void setRemoteTransportReceiveBufferSize( int remoteTransportReceiveBufferSize) { _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize; } - void setRemoteTransportSocketReceiveBufferSize( + virtual void setRemoteTransportSocketReceiveBufferSize( int socketReceiveBufferSize) { _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize; @@ -195,9 +201,10 @@ namespace epics { /** * Priority. - * NOTE: Priority cannot just be changed, since it is registered in transport registry with given priority. + * NOTE: Priority cannot just be changed, since it is registered + * in transport registry with given priority. */ - short _priority; + int16 _priority; // TODO to be implemeneted /** @@ -326,7 +333,7 @@ namespace epics { MonitorSender* _monitorSender; - TransportRegistry* _transportRegistry; + Context* _context; /** * Internal method that clears and releases buffer. @@ -352,6 +359,171 @@ namespace epics { TransportSender* extractFromSendQueue(); }; + class BlockingClientTCPTransport : public BlockingTCPTransport, + public TransportSender, + public epics::pvData::TimerCallback, + public ReferenceCountingTransport { + + public: + BlockingClientTCPTransport(Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize, + TransportClient* client, short remoteTransportRevision, + float beaconInterval, int16 priority); + + virtual ~BlockingClientTCPTransport(); + + virtual void timerStopped() { + // noop + } + + virtual void callback(); + + /** + * Acquires transport. + * @param client client (channel) acquiring the transport + * @return true if transport was granted, false otherwise. + */ + virtual bool acquire(TransportClient* client); + + virtual IntrospectionRegistry* getIntrospectionRegistry() { + return _introspectionRegistry; + } + + /** + * Releases transport. + * @param client client (channel) releasing the transport + */ + virtual void release(TransportClient* client); + + /** + * Alive notification. + * This method needs to be called (by newly received data or beacon) + * at least once in this period, if not echo will be issued + * and if there is not response to it, transport will be considered as unresponsive. + */ + virtual void aliveNotification(); + + /** + * Changed transport (server restared) notify. + */ + virtual void changedTransport(); + + virtual void lock() { + // noop + } + + virtual void unlock() { + // noop + } + + virtual void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control); + + protected: + /** + * Introspection registry. + */ + IntrospectionRegistry* _introspectionRegistry; + + virtual void internalClose(bool force); + + private: + + /** + * Owners (users) of the transport. + */ + std::set* _owners; + + /** + * Connection timeout (no-traffic) flag. + */ + double _connectionTimeout; + + /** + * Unresponsive transport flag. + */ + volatile bool _unresponsiveTransport; + + /** + * Timer task node. + */ + TimerNode* _timerNode; + + /** + * Timestamp of last "live" event on this transport. + */ + volatile epicsTimeStamp _aliveTimestamp; + + epics::pvData::Mutex* _mutex; + epics::pvData::Mutex* _ownersMutex; + + bool _verifyOrEcho; + + void unresponsiveTransport(); + + /** + * Notifies clients about disconnect. + */ + void closedNotifyClients(); + + /** + * Responsive transport notify. + */ + void responsiveTransport(); + }; + + /** + * Channel Access TCP connector. + * @author Matej Sekoranja + * @version $Id: BlockingTCPConnector.java,v 1.1 2010/05/03 14:45:47 mrkraimer Exp $ + */ + class BlockingTCPConnector : public Connector { + public: + BlockingTCPConnector(Context* context, int receiveBufferSize, + float beaconInterval); + + virtual ~BlockingTCPConnector(); + + virtual Transport* connect(TransportClient* client, + ResponseHandler* responseHandler, osiSockAddr* address, + short transportRevision, int16 priority); + private: + /** + * Lock timeout + */ + static const int LOCK_TIMEOUT = 20*1000; // 20s + + /** + * Context instance. + */ + Context* _context; + + /** + * Context instance. + */ + //NamedLockPattern* _namedLocker; + + /** + * Receive buffer size. + */ + int _receiveBufferSize; + + /** + * Beacon interval. + */ + float _beaconInterval; + + /** + * Tries to connect to the given address. + * @param[in] address + * @param[in] tries + * @return the SOCKET + * @throws IOException + */ + SOCKET tryConnect(osiSockAddr* address, int tries); + + }; + } } diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp new file mode 100644 index 0000000..232cebc --- /dev/null +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -0,0 +1,174 @@ +/* + * blockingTCPConnector.cpp + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +#include "blockingTCP.h" +#include "remote.h" + +#include +#include +#include + +#include +#include +#include + +namespace epics { + namespace pvAccess { + + BlockingTCPConnector::BlockingTCPConnector(Context* context, + int receiveBufferSize, float beaconInterval) : + _context(context), _receiveBufferSize(receiveBufferSize), + _beaconInterval(beaconInterval) + //TODO , _namedLocker(new NamedLockPattern()) + { + } + + BlockingTCPConnector::~BlockingTCPConnector() { + // TODO delete _namedLocker; + } + + SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) { + for(int tryCount = 0; tryCount0) epicsThreadSleep(0.1); + + char strBuffer[64]; + ipAddrToA(&address->ia, strBuffer, sizeof(strBuffer)); + + errlogSevPrintf(errlogInfo, + "Opening socket to CA server %s, attempt %d.", + strBuffer, tryCount+1); + + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, + IPPROTO_TCP); + if(socket==INVALID_SOCKET) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, "Socket create error: %s", + strBuffer); + } + else { + if(::connect(socket, &address->sa, sizeof(sockaddr))==0) + return socket; + else { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "Socket connect error: %s", strBuffer); + } + } + } + return INVALID_SOCKET; + } + + Transport* BlockingTCPConnector::connect(TransportClient* client, + ResponseHandler* responseHandler, osiSockAddr* address, + short transportRevision, int16 priority) { + + SOCKET socket = INVALID_SOCKET; + + char ipAddrStr[64]; + ipAddrToA(&address->ia, ipAddrStr, sizeof(ipAddrStr)); + + // first try to check cache w/o named lock... + BlockingClientTCPTransport + * transport = + (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( + "TCP", address, priority)); + if(transport!=NULL) { + errlogSevPrintf(errlogInfo, + "Reusing existing connection to CA server: %s", + ipAddrStr); + if(transport->acquire(client)) return transport; + } + + bool lockAcquired = true; + // TODO comment out + //bool lockAcquired = _namedLocker->acquireSynchronizationObject( + // address, LOCK_TIMEOUT); + if(lockAcquired) { + try { + // ... transport created during waiting in lock + transport + = (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( + "TCP", address, priority)); + if(transport!=NULL) { + errlogSevPrintf(errlogInfo, + "Reusing existing connection to CA server: %s", + ipAddrStr); + if(transport->acquire(client)) return transport; + } + + errlogSevPrintf(errlogInfo, "Connecting to CA server: %s", + ipAddrStr); + + socket = tryConnect(address, 3); + + // use blocking channel + // socket is blocking bya default + //socket.configureBlocking(true); + + // enable TCP_NODELAY (disable Nagle's algorithm) + int optval = 1; // true + int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, + &optval, sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMajor, + "Error setting TCP_NODELAY: %s", strerror(errno)); + + // enable TCP_KEEPALIVE + retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, + &optval, sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMinor, + "Error setting SO_KEEPALIVE: %s", strerror(errno)); + + // TODO tune buffer sizes?! Win32 defaults are 8k, which is OK + //socket.socket().setReceiveBufferSize(); + //socket.socket().setSendBufferSize(); + + // create transport + transport = new BlockingClientTCPTransport(_context, socket, + responseHandler, _receiveBufferSize, client, + transportRevision, _beaconInterval, priority); + + // verify + if(!transport->waitUntilVerified(3.0)) { + transport->close(true); + errlogSevPrintf( + errlogInfo, + "Connection to CA client %s failed to be validated, closing it.", + ipAddrStr); + ostringstream temp; + temp<<"Failed to verify TCP connection to '"<* _monitorSendQueue; }; - BlockingTCPTransport::BlockingTCPTransport(SOCKET channel, - ResponseHandler* responseHandler, int receiveBufferSize, - short priority, TransportRegistry* transportRegistry) : + BlockingTCPTransport::BlockingTCPTransport(Context* context, + SOCKET channel, ResponseHandler* responseHandler, + int receiveBufferSize, int16 priority) : _closed(false), _channel(channel), _remoteTransportRevision(0), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), @@ -86,8 +86,7 @@ namespace epics { _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), _monitorSender(new MonitorSender(_monitorMutex, - _monitorSendQueue)), _transportRegistry( - transportRegistry) { + _monitorSendQueue)), _context(context) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); @@ -124,7 +123,7 @@ namespace epics { clearAndReleaseBuffer(); // add to registry - _transportRegistry->put(this); + _context->getTransportRegistry()->put(this); } BlockingTCPTransport::~BlockingTCPTransport() { @@ -195,7 +194,7 @@ namespace epics { _closed = true; // remove from registry - _transportRegistry->remove(this); + _context->getTransportRegistry()->remove(this); // clean resources internalClose(force); diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 0aed8fc..785f327 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -17,6 +17,7 @@ #include #include #include +#include /* EPICSv3 */ #include @@ -160,6 +161,10 @@ namespace epics { _sendAddresses = addresses; } + virtual IntrospectionRegistry* getIntrospectionRegistry() { + THROW_BASE_EXCEPTION("not supported by UDP transport"); + } + protected: bool volatile _closed; @@ -237,7 +242,7 @@ namespace epics { }; class BlockingUDPConnector : public Connector, - epics::pvData::NoDefaultMethods { + public epics::pvData::NoDefaultMethods { public: BlockingUDPConnector(bool reuseSocket, @@ -255,7 +260,7 @@ namespace epics { */ virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler, osiSockAddr* bindAddress, - short transportRevision, short priority); + short transportRevision, int16 priority); private: diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 1868d23..002220b 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -25,7 +25,7 @@ namespace epics { Transport* BlockingUDPConnector::connect(TransportClient* client, ResponseHandler* responseHandler, osiSockAddr* bindAddress, - short transportRevision, short priority) { + short transportRevision, int16 priority) { errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", inetAddressToString(bindAddress).c_str()); @@ -59,11 +59,13 @@ namespace epics { retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); if(retval<0) errlogSevPrintf(errlogMajor, - "Error binding socket: %s", strerror(errno)); + "Error setting SO_REUSEADDR: %s", strerror(errno)); optval = _broadcast ? 1 : 0; retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMajor, + "Error setting SO_BROADCAST: %s", strerror(errno)); // sockets are blocking by default diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index d411f38..91232a2 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -9,10 +9,13 @@ #define REMOTE_H_ #include "caConstants.h" +#include "transportRegistry.h" +#include "introspectionRegistry.h" #include #include #include +#include #include #include @@ -20,6 +23,8 @@ namespace epics { namespace pvAccess { + class TransportRegistry; + enum ProtocolType { TCP, UDP, SSL }; @@ -54,7 +59,8 @@ namespace epics { * NOTE: these limitations allows efficient implementation. */ virtual void - send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) =0; + send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) =0; virtual void lock() =0; virtual void unlock() =0; @@ -156,7 +162,7 @@ namespace epics { * Get introspection registry for transport. * @return IntrospectionRegistry instance. */ - //virtual IntrospectionRegistry getIntrospectionRegistry() =0; + virtual IntrospectionRegistry* getIntrospectionRegistry() =0; /** * Close transport. @@ -206,9 +212,10 @@ namespace epics { * Note that this might not be the only message in the buffer. * Code must not manipulate buffer. */ - virtual void handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0; + virtual void + handleResponse(osiSockAddr* responseFrom, Transport* transport, + int8 version, int8 command, int payloadSize, + epics::pvData::ByteBuffer* payloadBuffer) =0; }; /** @@ -261,10 +268,46 @@ namespace epics { */ virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler, osiSockAddr* address, - short transportRevision, short priority) =0; + short transportRevision, int16 priority) =0; }; + class Context { + public: + /** + * Get timer. + * @return timer. + */ + virtual Timer* getTimer() =0; + + /** + * Get transport (virtual circuit) registry. + * @return transport (virtual circuit) registry. + */ + virtual TransportRegistry* getTransportRegistry() =0; + }; + + /** + * Interface defining reference counting transport IF. + * @author Matej Sekoranja + * @version $Id: ReferenceCountingTransport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ReferenceCountingTransport { + public: + /** + * Acquires transport. + * @param client client (channel) acquiring the transport + * @return true if transport was granted, false otherwise. + */ + virtual bool acquire(TransportClient* client) =0; + + /** + * Releases transport. + * @param client client (channel) releasing the transport + */ + virtual void release(TransportClient* client) =0; + }; + } } diff --git a/pvAccessApp/utils/transportRegistry.h b/pvAccessApp/utils/transportRegistry.h index f4ed110..15235fd 100644 --- a/pvAccessApp/utils/transportRegistry.h +++ b/pvAccessApp/utils/transportRegistry.h @@ -24,6 +24,7 @@ using namespace std; namespace epics { namespace pvAccess { +class Transport; typedef std::map prioritiesMap_t; typedef std::map transportsMap_t; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 237e0f1..7db24eb 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -1,7 +1,7 @@ /* testRemoteClientImpl.cpp */ /* Author: Matej Sekoranja Date: 2011.1.1 */ - +#include #include #include #include @@ -25,7 +25,7 @@ class ChannelImplProcess : public ChannelProcess ChannelProcessRequester* m_channelProcessRequester; PVStructure* m_pvStructure; PVScalar* m_valueField; - + private: ~ChannelImplProcess() { @@ -44,29 +44,29 @@ class ChannelImplProcess : public ChannelProcess Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); delete noValueFieldStatus; - + // NOTE client must destroy this instance... // do not access any fields and return ASAP return; } - + if (field->getField()->getType() != scalar) { Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); delete notAScalarStatus; - + // NOTE client must destroy this instance…. // do not access any fields and return ASAP return; } - + m_valueField = static_cast(field); - - // TODO pvRequest + + // TODO pvRequest m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this); } - + virtual void process(bool lastRequest) { switch (m_valueField->getScalar()->getScalarType()) @@ -138,19 +138,19 @@ class ChannelImplProcess : public ChannelProcess default: // noop break; - - } + + } m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK()); - + if (lastRequest) destroy(); } - + virtual void destroy() { delete this; } - + }; @@ -167,7 +167,7 @@ class ChannelImplGet : public ChannelGet PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplGet() { @@ -181,10 +181,10 @@ class ChannelImplGet : public ChannelGet { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet); - // TODO pvRequest + // TODO pvRequest m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void get(bool lastRequest) { m_channelGetRequester->getDone(getStatusCreate()->getStatusOK()); @@ -193,17 +193,17 @@ class ChannelImplGet : public ChannelGet m_first = false; m_bitSet->set(0); // TODO } - + if (lastRequest) destroy(); } - + virtual void destroy() { delete m_bitSet; delete this; } - + }; @@ -222,7 +222,7 @@ class ChannelImplPut : public ChannelPut PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplPut() { @@ -236,17 +236,17 @@ class ChannelImplPut : public ChannelPut { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - // TODO pvRequest + // TODO pvRequest m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void put(bool lastRequest) { m_channelPutRequester->putDone(getStatusCreate()->getStatusOK()); if (lastRequest) destroy(); } - + virtual void get() { m_channelPutRequester->getDone(getStatusCreate()->getStatusOK()); @@ -257,7 +257,7 @@ class ChannelImplPut : public ChannelPut delete m_bitSet; delete this; } - + }; @@ -278,7 +278,7 @@ class MockMonitor : public Monitor, public MonitorElement volatile bool m_first; Mutex* m_lock; volatile int m_count; - + private: ~MockMonitor() { @@ -297,16 +297,16 @@ class MockMonitor : public Monitor, public MonitorElement PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); m_changedBitSet->set(0); - - // TODO pvRequest + + // TODO pvRequest m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast(m_pvStructure->getStructure())); } - + virtual Status* start() { // fist monitor m_monitorRequester->monitorEvent(this); - + // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); } @@ -337,24 +337,24 @@ class MockMonitor : public Monitor, public MonitorElement if (m_count) m_count--; } - + virtual void destroy() { delete stop(); - + delete m_lock; delete m_overrunBitSet; delete m_changedBitSet; delete this; } - + // ============ MonitorElement ============ - + virtual PVStructure* getPVStructure() { return m_pvStructure; } - + virtual BitSet* getChangedBitSet() { return m_changedBitSet; @@ -364,8 +364,8 @@ class MockMonitor : public Monitor, public MonitorElement { return m_overrunBitSet; } - - + + }; @@ -381,17 +381,17 @@ class ChannelImpl : public Channel { ChannelRequester* m_requester; String m_name; String m_remoteAddress; - + PVStructure* m_pvStructure; - + private: ~ChannelImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannel); } - + public: - + ChannelImpl( ChannelProvider* provider, ChannelRequester* requester, @@ -403,8 +403,8 @@ class ChannelImpl : public Channel { m_remoteAddress(remoteAddress) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannel); - - + + ScalarType stype = pvDouble; String allProperties("alarm,timeStamp,display,control,valueAlarm"); @@ -413,11 +413,11 @@ class ChannelImpl : public Channel { PVDouble *pvField = m_pvStructure->getDoubleField(String("value")); pvField->put(1.123); - + // already connected, report state m_requester->channelStateChange(this, CONNECTED); } - + virtual void destroy() { delete m_pvStructure; @@ -428,13 +428,13 @@ class ChannelImpl : public Channel { { return getChannelName(); }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - virtual ChannelProvider* getProvider() + virtual ChannelProvider* getProvider() { return m_provider; } @@ -502,7 +502,7 @@ class ChannelImpl : public Channel { // TODO return 0; } - + virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, epics::pvData::PVStructure *pvRequest) { @@ -539,24 +539,24 @@ class ChannelImplFind : public ChannelFind { // one instance for all, do not delete at all } - + virtual ChannelProvider* getChannelProvider() { return m_provider; }; - + virtual void cancelChannelFind() { throw std::runtime_error("not supported"); } - + private: - + // only to be destroyed by it friend class ChannelProviderImpl; virtual ~ChannelImplFind() {} - - ChannelProvider* m_provider; + + ChannelProvider* m_provider; }; class ChannelProviderImpl : public ChannelProvider { @@ -569,13 +569,13 @@ class ChannelProviderImpl : public ChannelProvider { { return "ChannelProviderImpl"; } - + virtual void destroy() { delete m_mockChannelFind; delete this; } - + virtual ChannelFind* channelFind( epics::pvData::String channelName, ChannelFindRequester *channelFindRequester) @@ -605,23 +605,22 @@ class ChannelProviderImpl : public ChannelProvider { return channel; } else - { + { Status* errorStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "only local supported", 0); channelRequester->channelCreated(errorStatus, 0); delete errorStatus; // TODO guard from CB return 0; } } - + private: ~ChannelProviderImpl() {}; - + ChannelImplFind* m_mockChannelFind; - + }; -class TransportRegistry; class ChannelSearchManager; class BlockingTCPConnector; class NamedLockPattern; @@ -631,8 +630,8 @@ class BeaconHandlerImpl; class ClientContextImpl : public ClientContext { public: - - ClientContextImpl() : + + ClientContextImpl() : m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), @@ -641,7 +640,7 @@ class ClientContextImpl : public ClientContext { initialize(); } - + virtual Version* getVersion() { return m_version; } @@ -649,33 +648,33 @@ class ClientContextImpl : public ClientContext virtual ChannelProvider* getProvider() { return m_provider; } - + virtual void initialize() { m_provider = new ChannelProviderImpl(); } - + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { out->append(m_version->getVersionString()); } - + virtual void destroy() { m_provider->destroy(); delete m_version; delete this; } - + virtual void dispose() { destroy(); - } - + } + private: ~ClientContextImpl() {}; @@ -684,9 +683,9 @@ class ClientContextImpl : public ClientContext * Each address must be of the form: ip.number:port or host.name:port */ String m_addressList; - + /** - * Define whether or not the network interfaces should be discovered at runtime. + * Define whether or not the network interfaces should be discovered at runtime. */ bool m_autoAddressList; @@ -697,22 +696,22 @@ class ClientContextImpl : public ClientContext * the server is no longer present on the network and disconnect. */ float m_connectionTimeout; - + /** * Period in second between two beacon signals. */ float m_beaconPeriod; - + /** * Broadcast (beacon, search) port number to listen to. */ int m_broadcastPort; - + /** * Receive buffer size (max size of payload). */ int m_receiveBufferSize; - + /** * Timer. */ @@ -722,7 +721,7 @@ class ClientContextImpl : public ClientContext * Broadcast transport needed to listen for broadcasts. */ BlockingUDPTransport* m_broadcastTransport; - + /** * UDP transport needed for channel searches. */ @@ -735,7 +734,7 @@ class ClientContextImpl : public ClientContext /** * CA transport (virtual circuit) registry. - * This registry contains all active transports - connections to CA servers. + * This registry contains all active transports - connections to CA servers. */ TransportRegistry* m_transportRegistry; @@ -758,7 +757,7 @@ class ClientContextImpl : public ClientContext typedef int pvAccessID; /** - * Last CID cache. + * Last CID cache. */ pvAccessID m_lastCID; @@ -770,7 +769,7 @@ typedef int pvAccessID; IntResponseRequestMap m_pendingResponseRequests; /** - * Last IOID cache. + * Last IOID cache. */ pvAccessID m_lastIOID; @@ -786,7 +785,7 @@ typedef int pvAccessID; // TODO consider std::unordered_map typedef std::map AddressBeaconHandlerMap; AddressBeaconHandlerMap m_beaconHandlers; - + /** * Version. */ @@ -804,7 +803,7 @@ class ChannelFindRequesterImpl : public ChannelFindRequester virtual void channelFindResult(epics::pvData::Status *status,ChannelFind *channelFind,bool wasFound) { std::cout << "[ChannelFindRequesterImpl] channelFindResult(" - << status->toString() << ", ..., " << wasFound << ")" << std::endl; + << status->toString() << ", ..., " << wasFound << ")" << std::endl; } }; @@ -814,10 +813,10 @@ class ChannelRequesterImpl : public ChannelRequester { return "ChannelRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelCreated(epics::pvData::Status* status, Channel *channel) @@ -825,7 +824,7 @@ class ChannelRequesterImpl : public ChannelRequester std::cout << "channelCreated(" << status->toString() << ", " << (channel ? channel->getChannelName() : "(null)") << ")" << std::endl; } - + virtual void channelStateChange(Channel *c, ConnectionState connectionState) { std::cout << "channelStateChange(" << c->getChannelName() << ", " << ConnectionStateNames[connectionState] << ")" << std::endl; @@ -838,10 +837,10 @@ class GetFieldRequesterImpl : public GetFieldRequester { return "GetFieldRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void getDone(epics::pvData::Status *status,epics::pvData::FieldConstPtr field) @@ -864,22 +863,22 @@ class ChannelGetRequesterImpl : public ChannelGetRequester ChannelGet *m_channelGet; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelGetRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelGetConnect(epics::pvData::Status *status,ChannelGet *channelGet, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelGetConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelGet = channelGet; m_pvStructure = pvStructure; @@ -901,22 +900,22 @@ class ChannelPutRequesterImpl : public ChannelPutRequester ChannelPut *m_channelPut; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelPutRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelPutConnect(epics::pvData::Status *status,ChannelPut *channelPut, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelPutConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelPut = channelPut; m_pvStructure = pvStructure; @@ -942,20 +941,20 @@ class ChannelPutRequesterImpl : public ChannelPutRequester } }; - - + + class MonitorRequesterImpl : public MonitorRequester { virtual String getRequesterName() { return "MonitorRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - + virtual void monitorConnect(Status* status, Monitor* monitor, Structure* structure) { std::cout << "monitorConnect(" << status->toString() << ")" << std::endl; @@ -966,13 +965,13 @@ class MonitorRequesterImpl : public MonitorRequester std::cout << str << std::endl; } } - + virtual void monitorEvent(Monitor* monitor) { std::cout << "monitorEvent" << std::endl; MonitorElement* element = monitor->poll(); - + String str("changed/overrun "); element->getChangedBitSet()->toString(&str); str += '/'; @@ -980,35 +979,35 @@ class MonitorRequesterImpl : public MonitorRequester str += '\n'; element->getPVStructure()->toString(&str); std::cout << str << std::endl; - + monitor->release(element); } - + virtual void unlisten(Monitor* monitor) { std::cout << "unlisten" << std::endl; } -}; +}; class ChannelProcessRequesterImpl : public ChannelProcessRequester { ChannelProcess *m_channelProcess; - + virtual String getRequesterName() { return "ProcessRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelProcessConnect(epics::pvData::Status *status,ChannelProcess *channelProcess) { std::cout << "channelProcessConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelProcess = channelProcess; } @@ -1024,13 +1023,13 @@ int main(int argc,char *argv[]) { ClientContextImpl* context = new ClientContextImpl(); context->printInfo(); - - + + ChannelFindRequesterImpl findRequester; context->getProvider()->channelFind("something", &findRequester); - + ChannelRequesterImpl channelRequester; - //Channel* noChannel + //Channel* noChannel context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow"); Channel* channel = context->getProvider()->createChannel("test", &channelRequester); @@ -1038,19 +1037,19 @@ int main(int argc,char *argv[]) /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); - + ChannelGetRequesterImpl channelGetRequesterImpl; ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0); channelGet->get(false); channelGet->destroy(); - + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); channelPut->get(); channelPut->put(false); channelPut->destroy(); - - + + MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -1063,19 +1062,19 @@ int main(int argc,char *argv[]) ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); channelProcess->process(false); channelProcess->destroy(); - + status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; delete status; - - + + monitor->destroy(); */ channel->destroy(); - + context->destroy(); - + std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout); return(0); diff --git a/testApp/utils/transportRegistryTest.cpp b/testApp/utils/transportRegistryTest.cpp index 872c42f..836794c 100644 --- a/testApp/utils/transportRegistryTest.cpp +++ b/testApp/utils/transportRegistryTest.cpp @@ -4,6 +4,7 @@ */ #include "transportRegistry.h" +#include "introspectionRegistry.h" #include "showConstructDestruct.h" #include @@ -39,6 +40,7 @@ namespace epics { virtual void verified(){}; virtual void enqueueSendRequest(TransportSender* sender){}; virtual void ensureData(int) {}; + virtual IntrospectionRegistry* getIntrospectionRegistry() {return NULL;}; private: string _type; int16 _priority;