From e47124aa3046bfaa632420188efece4f0aa0e610 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 15 May 2017 16:26:56 -0400 Subject: [PATCH] collapse BlockingAbstractCodec into BlockingSocketAbstractCodec --- src/remote/codec.cpp | 73 +++++++++++++++++++------------------------ src/remote/pv/codec.h | 58 +++++++++++++--------------------- 2 files changed, 54 insertions(+), 77 deletions(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 15f6c2e..6d4b3eb 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1047,42 +1047,24 @@ bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserial // // -BlockingAbstractCodec::BlockingAbstractCodec( - bool serverFlag, - std::tr1::shared_ptr const & receiveBuffer, - std::tr1::shared_ptr const & sendBuffer, - int32_t socketSendBufferSize) - :AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true) - ,_readThread(epics::pvData::Thread::Config(this, &BlockingAbstractCodec::receiveThread) - .prio(epicsThreadPriorityCAServerLow) - .name("TCP-rx") - .autostart(false)) - ,_sendThread(epics::pvData::Thread::Config(this, &BlockingAbstractCodec::sendThread) - .prio(epicsThreadPriorityCAServerLow) - .name("TCP-tx") - .autostart(false)) -{ - _isOpen.getAndSet(true); -} - -BlockingAbstractCodec::~BlockingAbstractCodec() +BlockingSocketAbstractCodec::~BlockingSocketAbstractCodec() { assert(!_isOpen.get()); _sendThread.exitWait(); _readThread.exitWait(); } -void BlockingAbstractCodec::readPollOne() { +void BlockingSocketAbstractCodec::readPollOne() { throw std::logic_error("should not be called for blocking IO"); } -void BlockingAbstractCodec::writePollOne() { +void BlockingSocketAbstractCodec::writePollOne() { throw std::logic_error("should not be called for blocking IO"); } -void BlockingAbstractCodec::close() { +void BlockingSocketAbstractCodec::close() { if (_isOpen.getAndSet(false)) { @@ -1101,26 +1083,26 @@ void BlockingAbstractCodec::close() { } } -void BlockingAbstractCodec::internalClose(bool /*force*/) +void BlockingSocketAbstractCodec::internalClose(bool /*force*/) { this->internalDestroy(); } -void BlockingAbstractCodec::internalPostClose(bool /*force*/) { +void BlockingSocketAbstractCodec::internalPostClose(bool /*force*/) { } -bool BlockingAbstractCodec::terminated() { +bool BlockingSocketAbstractCodec::terminated() { return !isOpen(); } -bool BlockingAbstractCodec::isOpen() { +bool BlockingSocketAbstractCodec::isOpen() { return _isOpen.get(); } // NOTE: must not be called from constructor (e.g. needs shared_from_this()) -void BlockingAbstractCodec::start() { +void BlockingSocketAbstractCodec::start() { _readThread.start(); @@ -1129,7 +1111,7 @@ void BlockingAbstractCodec::start() { } -void BlockingAbstractCodec::receiveThread() +void BlockingSocketAbstractCodec::receiveThread() { Transport::shared_pointer ptr = this->shared_from_this(); @@ -1152,7 +1134,7 @@ void BlockingAbstractCodec::receiveThread() } -void BlockingAbstractCodec::sendThread() +void BlockingSocketAbstractCodec::sendThread() { Transport::shared_pointer ptr = this->shared_from_this(); @@ -1178,7 +1160,7 @@ void BlockingAbstractCodec::sendThread() } -void BlockingAbstractCodec::sendBufferFull(int tries) { +void BlockingSocketAbstractCodec::sendBufferFull(int tries) { // TODO constants epicsThreadSleep(std::max(tries * 0.1, 1)); } @@ -1196,17 +1178,28 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( bool serverFlag, SOCKET channel, int32_t sendBufferSize, - int32_t receiveBufferSize): - BlockingAbstractCodec( - serverFlag, - std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( - MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + - (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), - std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + - MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) - & (~(PVA_ALIGNMENT - 1)))), sendBufferSize), - _channel(channel) + int32_t receiveBufferSize) + :AbstractCodec( + serverFlag, + std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( + MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), + std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) + & (~(PVA_ALIGNMENT - 1)))), sendBufferSize, + true) + ,_readThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::receiveThread) + .prio(epicsThreadPriorityCAServerLow) + .name("TCP-rx") + .autostart(false)) + ,_sendThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::sendThread) + .prio(epicsThreadPriorityCAServerLow) + .name("TCP-tx") + .autostart(false)) + ,_channel(channel) { + _isOpen.getAndSet(true); + // get remote address osiSocklen_t saSize = sizeof(sockaddr); int retval = getpeername(_channel, &(_socketAddress.sa), &saSize); diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index 13ac168..6c69b63 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -293,21 +293,23 @@ private: }; -class epicsShareClass BlockingAbstractCodec: +/////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class epicsShareClass BlockingSocketAbstractCodec: public AbstractCodec, - public std::tr1::enable_shared_from_this + public std::tr1::enable_shared_from_this { public: - POINTER_DEFINITIONS(BlockingAbstractCodec); + POINTER_DEFINITIONS(BlockingSocketAbstractCodec); - BlockingAbstractCodec( - bool serverFlag, - std::tr1::shared_ptr const & receiveBuffer, - std::tr1::shared_ptr const & sendBuffer, - int32_t socketSendBufferSize); - virtual ~BlockingAbstractCodec(); + BlockingSocketAbstractCodec( + bool serverFlag, + SOCKET channel, + int32_t sendBufferSize, + int32_t receiveBufferSize); + virtual ~BlockingSocketAbstractCodec(); virtual void readPollOne() OVERRIDE FINAL; virtual void writePollOne() OVERRIDE FINAL; @@ -318,13 +320,21 @@ public: virtual bool isOpen() OVERRIDE FINAL; void start(); + virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL; + virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL; + virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL { + return &_socketAddress; + } + virtual void invalidDataStreamHandler() OVERRIDE FINAL; + virtual std::size_t getSocketReceiveBufferSize() const OVERRIDE FINAL; + private: void receiveThread(); void sendThread(); protected: virtual void sendBufferFull(int tries) OVERRIDE FINAL; - virtual void internalDestroy() = 0; + virtual void internalDestroy(); /** * Called to any resources just before closing transport @@ -344,39 +354,12 @@ private: AtomicValue _isOpen; epics::pvData::Thread _readThread, _sendThread; epics::pvData::Event _shutdownEvent; -}; - - -class epicsShareClass BlockingSocketAbstractCodec: - public BlockingAbstractCodec -{ - -public: - - BlockingSocketAbstractCodec( - bool serverFlag, - SOCKET channel, - int32_t sendBufferSize, - int32_t receiveBufferSize); - - virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL; - virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL; - virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL { - return &_socketAddress; - } - virtual void invalidDataStreamHandler() OVERRIDE FINAL; - virtual std::size_t getSocketReceiveBufferSize() const OVERRIDE FINAL; - protected: - - virtual void internalDestroy() OVERRIDE; - SOCKET _channel; osiSockAddr _socketAddress; std::string _socketName; }; - class BlockingTCPTransportCodec : public BlockingSocketAbstractCodec, public SecurityPluginControl @@ -543,6 +526,7 @@ private: }; +/////////////////////////////////////////////////////////////////////////////////////////////////////////// class epicsShareClass BlockingServerTCPTransportCodec : public BlockingTCPTransportCodec,