diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 6d4b3eb..fd70ed8 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1047,24 +1047,24 @@ bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserial // // -BlockingSocketAbstractCodec::~BlockingSocketAbstractCodec() +BlockingTCPTransportCodec::~BlockingTCPTransportCodec() { assert(!_isOpen.get()); _sendThread.exitWait(); _readThread.exitWait(); } -void BlockingSocketAbstractCodec::readPollOne() { +void BlockingTCPTransportCodec::readPollOne() { throw std::logic_error("should not be called for blocking IO"); } -void BlockingSocketAbstractCodec::writePollOne() { +void BlockingTCPTransportCodec::writePollOne() { throw std::logic_error("should not be called for blocking IO"); } -void BlockingSocketAbstractCodec::close() { +void BlockingTCPTransportCodec::close() { if (_isOpen.getAndSet(false)) { @@ -1083,26 +1083,37 @@ void BlockingSocketAbstractCodec::close() { } } -void BlockingSocketAbstractCodec::internalClose(bool /*force*/) +void BlockingTCPTransportCodec::internalClose(bool /*force*/) { this->internalDestroy(); + + // TODO sync + if (_securitySession) + _securitySession->close(); + + if (IS_LOGGABLE(logLevelDebug)) + { + LOG(logLevelDebug, + "TCP socket to %s is to be closed.", + inetAddressToString(_socketAddress).c_str()); + } } -void BlockingSocketAbstractCodec::internalPostClose(bool /*force*/) { +void BlockingTCPTransportCodec::internalPostClose(bool /*force*/) { } -bool BlockingSocketAbstractCodec::terminated() { +bool BlockingTCPTransportCodec::terminated() { return !isOpen(); } -bool BlockingSocketAbstractCodec::isOpen() { +bool BlockingTCPTransportCodec::isOpen() { return _isOpen.get(); } // NOTE: must not be called from constructor (e.g. needs shared_from_this()) -void BlockingSocketAbstractCodec::start() { +void BlockingTCPTransportCodec::start() { _readThread.start(); @@ -1111,7 +1122,7 @@ void BlockingSocketAbstractCodec::start() { } -void BlockingSocketAbstractCodec::receiveThread() +void BlockingTCPTransportCodec::receiveThread() { Transport::shared_pointer ptr = this->shared_from_this(); @@ -1134,7 +1145,7 @@ void BlockingSocketAbstractCodec::receiveThread() } -void BlockingSocketAbstractCodec::sendThread() +void BlockingTCPTransportCodec::sendThread() { Transport::shared_pointer ptr = this->shared_from_this(); @@ -1160,7 +1171,7 @@ void BlockingSocketAbstractCodec::sendThread() } -void BlockingSocketAbstractCodec::sendBufferFull(int tries) { +void BlockingTCPTransportCodec::sendBufferFull(int tries) { // TODO constants epicsThreadSleep(std::max(tries * 0.1, 1)); } @@ -1168,17 +1179,16 @@ void BlockingSocketAbstractCodec::sendBufferFull(int tries) { // // -// BlockingSocketAbstractCodec +// BlockingTCPTransportCodec // // // -BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( - bool serverFlag, - SOCKET channel, +BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Context::shared_pointer &context, + SOCKET channel, const ResponseHandler::shared_pointer &responseHandler, int32_t sendBufferSize, - int32_t receiveBufferSize) + int32_t receiveBufferSize, int16 priority) :AbstractCodec( serverFlag, std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( @@ -1188,15 +1198,19 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), sendBufferSize, true) - ,_readThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::receiveThread) + ,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread) .prio(epicsThreadPriorityCAServerLow) .name("TCP-rx") .autostart(false)) - ,_sendThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::sendThread) + ,_sendThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::sendThread) .prio(epicsThreadPriorityCAServerLow) .name("TCP-tx") .autostart(false)) ,_channel(channel) + ,_context(context), _responseHandler(responseHandler) + ,_remoteTransportReceiveBufferSize(MAX_TCP_RECV) + ,_remoteTransportRevision(0), _priority(priority) + ,_verified(false) { _isOpen.getAndSet(true); @@ -1219,7 +1233,7 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( } // must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists) -void BlockingSocketAbstractCodec::internalDestroy() { +void BlockingTCPTransportCodec::internalDestroy() { if(_channel != INVALID_SOCKET) { @@ -1255,15 +1269,17 @@ void BlockingSocketAbstractCodec::internalDestroy() { _channel = INVALID_SOCKET; //TODO: mutex to guard _channel } + Transport::shared_pointer thisSharedPtr = this->shared_from_this(); + _context->getTransportRegistry()->remove(thisSharedPtr); } -void BlockingSocketAbstractCodec::invalidDataStreamHandler() { +void BlockingTCPTransportCodec::invalidDataStreamHandler() { close(); } -int BlockingSocketAbstractCodec::write( +int BlockingTCPTransportCodec::write( epics::pvData::ByteBuffer *src) { std::size_t remaining; @@ -1300,7 +1316,7 @@ int BlockingSocketAbstractCodec::write( } -std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize() +std::size_t BlockingTCPTransportCodec::getSocketReceiveBufferSize() const { osiSocklen_t intLen = sizeof(int); @@ -1321,7 +1337,7 @@ const { } -int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) { +int BlockingTCPTransportCodec::read(epics::pvData::ByteBuffer* dst) { std::size_t remaining; while((remaining=dst->getRemaining()) > 0) { @@ -1366,22 +1382,6 @@ int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) { } -void BlockingTCPTransportCodec::internalClose(bool force) { - BlockingSocketAbstractCodec::internalClose(force); - - // TODO sync - if (_securitySession) - _securitySession->close(); - - if (IS_LOGGABLE(logLevelDebug)) - { - LOG(logLevelDebug, - "TCP socket to %s is to be closed.", - inetAddressToString(_socketAddress).c_str()); - } -} - - bool BlockingTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) { return _verifiedEvent.wait(timeoutMs/1000.0) && _verified; } diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index 6c69b63..94c113a 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -293,23 +293,25 @@ private: }; -/////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class epicsShareClass BlockingSocketAbstractCodec: +class epicsShareClass BlockingTCPTransportCodec: public AbstractCodec, - public std::tr1::enable_shared_from_this + public SecurityPluginControl, + public std::tr1::enable_shared_from_this { public: - POINTER_DEFINITIONS(BlockingSocketAbstractCodec); + POINTER_DEFINITIONS(BlockingTCPTransportCodec); - BlockingSocketAbstractCodec( + BlockingTCPTransportCodec( bool serverFlag, + Context::shared_pointer const & context, SOCKET channel, + ResponseHandler::shared_pointer const & responseHandler, int32_t sendBufferSize, - int32_t receiveBufferSize); - virtual ~BlockingSocketAbstractCodec(); + int32_t receiveBufferSize, + epics::pvData::int16 priority); + virtual ~BlockingTCPTransportCodec(); virtual void readPollOne() OVERRIDE FINAL; virtual void writePollOne() OVERRIDE FINAL; @@ -328,56 +330,12 @@ public: virtual void invalidDataStreamHandler() OVERRIDE FINAL; virtual std::size_t getSocketReceiveBufferSize() const OVERRIDE FINAL; -private: - void receiveThread(); - void sendThread(); - -protected: - virtual void sendBufferFull(int tries) OVERRIDE FINAL; - virtual void internalDestroy(); - - /** - * Called to any resources just before closing transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required - */ - virtual void internalClose(bool force); - - /** - * Called to any resources just after closing transport and without any locks held on transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required - */ - virtual void internalPostClose(bool force); - -private: - AtomicValue _isOpen; - epics::pvData::Thread _readThread, _sendThread; - epics::pvData::Event _shutdownEvent; -protected: - SOCKET _channel; - osiSockAddr _socketAddress; - std::string _socketName; -}; - -class BlockingTCPTransportCodec : - public BlockingSocketAbstractCodec, - public SecurityPluginControl - -{ - -public: - virtual std::string getType() const OVERRIDE FINAL { return std::string("tcp"); } - virtual void internalDestroy() OVERRIDE FINAL { - BlockingSocketAbstractCodec::internalDestroy(); - Transport::shared_pointer thisSharedPtr = this->shared_from_this(); - _context->getTransportRegistry()->remove(thisSharedPtr); - } + void internalDestroy(); virtual void changedTransport() OVERRIDE {} @@ -453,7 +411,7 @@ public: } - virtual void flushSendQueue() OVERRIDE FINAL { }; + virtual void flushSendQueue() OVERRIDE FINAL { } virtual bool isClosed() OVERRIDE FINAL { @@ -485,27 +443,36 @@ public: virtual void sendSecurityPluginMessage(epics::pvData::PVField::shared_pointer const & data) OVERRIDE FINAL; +private: + void receiveThread(); + void sendThread(); + protected: + virtual void sendBufferFull(int tries) OVERRIDE FINAL; - BlockingTCPTransportCodec( - bool serverFlag, - Context::shared_pointer const & context, - SOCKET channel, - ResponseHandler::shared_pointer const & responseHandler, - int32_t sendBufferSize, - int32_t receiveBufferSize, - epics::pvData::int16 priority - ): - BlockingSocketAbstractCodec(serverFlag, channel, sendBufferSize, receiveBufferSize), - _context(context), _responseHandler(responseHandler), - _remoteTransportReceiveBufferSize(MAX_TCP_RECV), - _remoteTransportRevision(0), _priority(priority), - _verified(false) - { - } + /** + * Called to any resources just before closing transport + * @param[in] force flag indicating if forced (e.g. forced + * disconnect) is required + */ + virtual void internalClose(bool force); - virtual void internalClose(bool force) OVERRIDE; + /** + * Called to any resources just after closing transport and without any locks held on transport + * @param[in] force flag indicating if forced (e.g. forced + * disconnect) is required + */ + virtual void internalPostClose(bool force); +private: + AtomicValue _isOpen; + epics::pvData::Thread _readThread, _sendThread; + epics::pvData::Event _shutdownEvent; +protected: + SOCKET _channel; + osiSockAddr _socketAddress; + std::string _socketName; +protected: Context::shared_pointer _context; IntrospectionRegistry _incomingIR; @@ -523,11 +490,8 @@ private: bool _verified; epics::pvData::Mutex _verifiedMutex; epics::pvData::Event _verifiedEvent; - }; -/////////////////////////////////////////////////////////////////////////////////////////////////////////// - class epicsShareClass BlockingServerTCPTransportCodec : public BlockingTCPTransportCodec, public ChannelHostingTransport,