From f2ee8de75854c97fafea5614160f7637e7f408cc Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Sun, 9 Mar 2014 01:36:44 +0100 Subject: [PATCH] codec: BlockingTCPClientTransportCodec --- pvAccessApp/remote/codec.cpp | 279 +++++++++++++++++++++++++++++++++++ pvAccessApp/remote/codec.h | 160 +++++++++++++++++--- 2 files changed, 420 insertions(+), 19 deletions(-) diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index 87abd57..081e42a 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -1602,4 +1602,283 @@ namespace epics { control->flush(true); } } + + + + // TODO + /* + + void BlockingServerTCPTransportCodec::destroyAllChannels() { + Lock lock(_channelsMutex); + if(_channels.size()==0) return; + + char ipAddrStr[64]; + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); + + LOG( + logLevelDebug, + "Transport to %s still has %u channel(s) active and closing...", + ipAddrStr, (unsigned int)_channels.size()); + + map::iterator it = _channels.begin(); + for(; it!=_channels.end(); it++) + it->second->destroy(); + + _channels.clear(); + } + + void BlockingServerTCPTransportCodec::internalClose(bool force) { + Transport::shared_pointer thisSharedPtr = shared_from_this(); + BlockingTCPTransport::internalClose(force); + destroyAllChannels(); + } + + void BlockingServerTCPTransportCodec::internalPostClose(bool forced) { + BlockingTCPTransport::internalPostClose(forced); + } +*/ + + + BlockingClientTCPTransportCodec::BlockingClientTCPTransportCodec( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize, + TransportClient::shared_pointer const & client, + epics::pvData::int8 remoteTransportRevision, + float beaconInterval, + int16_t priority ) : + BlockingTCPTransportCodec(context, channel, responseHandler, + sendBufferSize, receiveBufferSize, priority), + _connectionTimeout(beaconInterval*1000), + _unresponsiveTransport(false), + _verifyOrEcho(true), + _verified(false) + { + // initialize owners list, send queue + acquire(client); + + // use immediate for clients + //setFlushStrategy(DELAYED); + + // setup connection timeout timer (watchdog) - moved to start() method + epicsTimeGetCurrent(&_aliveTimestamp); + } + + void BlockingClientTCPTransportCodec::start() + { + TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast(shared_from_this()); + _context->getTimer()->schedulePeriodic(tcb, _connectionTimeout, _connectionTimeout); + BlockingTCPTransportCodec::start(); + } + + BlockingClientTCPTransportCodec::~BlockingClientTCPTransportCodec() { + } + + + + + + + + + + void BlockingClientTCPTransportCodec::callback() { + epicsTimeStamp currentTime; + epicsTimeGetCurrent(¤tTime); + + _mutex.lock(); + // no exception expected here + double diff = epicsTimeDiffInSeconds(¤tTime, &_aliveTimestamp); + _mutex.unlock(); + + if(diff>2*_connectionTimeout) { + unresponsiveTransport(); + } + // use some k (3/4) to handle "jitter" + else if(diff>=((3*_connectionTimeout)/4)) { + // send echo + TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); + enqueueSendRequest(transportSender); + } + } + +#define EXCEPTION_GUARD(code) try { code; } \ + catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ + catch (...) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); } + + void BlockingClientTCPTransportCodec::unresponsiveTransport() { + Lock lock(_mutex); + if(!_unresponsiveTransport) { + _unresponsiveTransport = true; + + TransportClientMap_t::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) { + TransportClient::shared_pointer client = it->second.lock(); + if (client) + { + EXCEPTION_GUARD(client->transportUnresponsive()); + } + } + } + } + + bool BlockingClientTCPTransportCodec::acquire(TransportClient::shared_pointer const & client) { + Lock lock(_mutex); + if(isClosed()) return false; + + char ipAddrStr[48]; + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); + LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr); + + _owners[client->getID()] = TransportClient::weak_pointer(client); + //_owners.insert(TransportClient::weak_pointer(client)); + + return true; + } + + // _mutex is held when this method is called + void BlockingClientTCPTransportCodec::internalClose(bool forced) { +// TODO !!! BlockingTCPTransportCodec::internalClose(forced); + + TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast(shared_from_this()); + _context->getTimer()->cancel(tcb); + } + + void BlockingClientTCPTransportCodec::internalPostClose(bool forced) { +// TODO !!! BlockingTCPTransportCodec::internalPostClose(forced); + + // _owners cannot change when transport is closed + closedNotifyClients(); + } + + /** + * Notifies clients about disconnect. + */ + void BlockingClientTCPTransportCodec::closedNotifyClients() { + + // check if still acquired + size_t refs = _owners.size(); + if(refs>0) { + char ipAddrStr[48]; + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); + LOG( + logLevelDebug, + "Transport to %s still has %d client(s) active and closing...", + ipAddrStr, refs); + + TransportClientMap_t::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) { + TransportClient::shared_pointer client = it->second.lock(); + if (client) + { + EXCEPTION_GUARD(client->transportClosed()); + } + } + + } + + _owners.clear(); + } + + //void BlockingClientTCPTransportCodec::release(TransportClient::shared_pointer const & client) { + void BlockingClientTCPTransportCodec::release(pvAccessID clientID) { + Lock lock(_mutex); + if(isClosed()) return; + + char ipAddrStr[48]; + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); + + LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr); + + _owners.erase(clientID); + //_owners.erase(TransportClient::weak_pointer(client)); + + // not used anymore, close it + // TODO consider delayed destruction (can improve performance!!!) + if(_owners.size()==0) close(); // TODO close(false) + } + + void BlockingClientTCPTransportCodec::aliveNotification() { + Lock guard(_mutex); + epicsTimeGetCurrent(&_aliveTimestamp); + if(_unresponsiveTransport) responsiveTransport(); + } + + bool BlockingClientTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) { + return _verifiedEvent.wait(timeoutMs/1000.0); + } + + void BlockingClientTCPTransportCodec::verified() { + epics::pvData::Lock lock(_verifiedMutex); + _verified = true; + _verifiedEvent.signal(); + } + + void BlockingClientTCPTransportCodec::responsiveTransport() { + Lock lock(_mutex); + if(_unresponsiveTransport) { + _unresponsiveTransport = false; + + Transport::shared_pointer thisSharedPtr = shared_from_this(); + TransportClientMap_t::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) { + TransportClient::shared_pointer client = it->second.lock(); + if (client) + { + EXCEPTION_GUARD(client->transportResponsive(thisSharedPtr)); + } + } + } + } + + void BlockingClientTCPTransportCodec::changedTransport() { + _outgoingIR.reset(); + + Lock lock(_mutex); + TransportClientMap_t::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) { + TransportClient::shared_pointer client = it->second.lock(); + if (client) + { + EXCEPTION_GUARD(client->transportChanged()); + } + } + } + + void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer, + TransportSendControl* control) { + if(_verifyOrEcho) { + /* + * send verification response message + */ + + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16)); + + // receive buffer size + buffer->putInt(static_cast(getReceiveBufferSize())); + + // socket receive buffer size + buffer->putInt(static_cast(getSocketReceiveBufferSize())); + + // connection priority + buffer->putShort(getPriority()); + + // send immediately + control->flush(true); + + _verifyOrEcho = false; + } + else { + control->startMessage(CMD_ECHO, 0); + // send immediately + control->flush(true); + } + + } + + + + } diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index ba4a777..21146f9 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -648,7 +648,7 @@ namespace epics { ): BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize), _context(context), _responseHandler(responseHandler), - _verified(false), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), + _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportRevision(0), _priority(priority) { LOG(logLevelTrace, @@ -656,22 +656,18 @@ namespace epics { epicsThreadGetIdSelf()); } - - private: - Context::shared_pointer _context; - std::auto_ptr _responseHandler; - bool _verified; - size_t _remoteTransportReceiveBufferSize; - epics::pvData::int8 _remoteTransportRevision; - epics::pvData::int16 _priority; osiSockAddr _socketAddress; - epics::pvData::Mutex _verifiedMutex; - epics::pvData::Event _verifiedEvent; IntrospectionRegistry _incomingIR; IntrospectionRegistry _outgoingIR; + private: + + std::auto_ptr _responseHandler; + size_t _remoteTransportReceiveBufferSize; + epics::pvData::int8 _remoteTransportRevision; + epics::pvData::int16 _priority; }; @@ -757,14 +753,6 @@ namespace epics { // noop } - void acquire() { - // noop, since does not make sence on itself - } - - void release() { - // noop, since does not make sence on itself - } - bool verify(epics::pvData::int32 timeoutMs) { LOG(logLevelTrace, @@ -806,6 +794,140 @@ namespace epics { epics::pvData::Mutex _channelsMutex; }; + + class BlockingClientTCPTransportCodec : + public BlockingTCPTransportCodec, + public TransportSender, + public epics::pvData::TimerCallback { + + public: + POINTER_DEFINITIONS(BlockingClientTCPTransportCodec); + + protected: + BlockingClientTCPTransportCodec( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize, + TransportClient::shared_pointer const & client, + epics::pvData::int8 remoteTransportRevision, + float beaconInterval, + int16_t priority ); + + public: + static shared_pointer create( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize, + TransportClient::shared_pointer const & client, + int8_t remoteTransportRevision, + float beaconInterval, + int16_t priority ) + { + shared_pointer thisPointer( + new BlockingClientTCPTransportCodec( + context, channel, responseHandler, + sendBufferSize, receiveBufferSize, + client, remoteTransportRevision, + beaconInterval, priority) + ); + thisPointer->activate(); + return thisPointer; + } + + public: + + void start(); + + virtual ~BlockingClientTCPTransportCodec(); + + virtual void timerStopped() { + // noop + } + + virtual void callback(); + + bool acquire(TransportClient::shared_pointer const & client); + + void release(pvAccessID clientId); + + void changedTransport(); + + void lock() { + // noop + } + + void unlock() { + // noop + } + + bool verify(epics::pvData::int32 timeoutMs); + + void verified(); + + void aliveNotification(); + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control); + + protected: + + virtual void internalClose(bool force); + virtual void internalPostClose(bool force); + + private: + + /** + * Owners (users) of the transport. + */ + // TODO consider using TR1 hash map + typedef std::map TransportClientMap_t; + TransportClientMap_t _owners; + + /** + * Connection timeout (no-traffic) flag. + */ + double _connectionTimeout; + + /** + * Unresponsive transport flag. + */ + bool _unresponsiveTransport; + + /** + * Timestamp of last "live" event on this transport. + */ + epicsTimeStamp _aliveTimestamp; + + bool _verifyOrEcho; + + /** + * Unresponsive transport notify. + */ + void unresponsiveTransport(); + + /** + * Notifies clients about disconnect. + */ + void closedNotifyClients(); + + /** + * Responsive transport notify. + */ + void responsiveTransport(); + + + epics::pvData::Mutex _mutex; + + bool _verified; + epics::pvData::Mutex _verifiedMutex; + epics::pvData::Event _verifiedEvent; + + }; + } }