diff --git a/src/remote/blockingTCP.h b/src/remote/blockingTCP.h index cc14af3..1a314da 100644 --- a/src/remote/blockingTCP.h +++ b/src/remote/blockingTCP.h @@ -112,7 +112,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor { + class BlockingTCPAcceptor : public epicsThreadRunable { public: POINTER_DEFINITIONS(BlockingTCPAcceptor); @@ -128,8 +128,6 @@ namespace epics { virtual ~BlockingTCPAcceptor(); - void handleEvents(); - /** * Bind socket address. * @return bind socket address, null if not binded. @@ -144,6 +142,8 @@ namespace epics { void destroy(); private: + virtual void run(); + /** * Context instance. */ @@ -176,7 +176,7 @@ namespace epics { epics::pvData::Mutex _mutex; - epicsThreadId _threadId; + epicsThread _thread; /** * Initialize connection acception. @@ -189,8 +189,6 @@ namespace epics { * @return true on success. */ bool validateConnection(Transport::shared_pointer const & transport, const char* address); - - static void handleEventsRunner(void* param); }; } diff --git a/src/remote/blockingTCPAcceptor.cpp b/src/remote/blockingTCPAcceptor.cpp index 32ce611..eb6ea45 100644 --- a/src/remote/blockingTCPAcceptor.cpp +++ b/src/remote/blockingTCPAcceptor.cpp @@ -33,7 +33,10 @@ namespace pvAccess { _serverSocketChannel(INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), _destroyed(false), - _threadId(0) + _thread(*this, "TCP-acceptor", + epicsThreadGetStackSize( + epicsThreadStackMedium), + epicsThreadPriorityMedium) { initialize(port); } @@ -118,14 +121,7 @@ namespace pvAccess { THROW_BASE_EXCEPTION(temp.str().c_str()); } - _threadId - = epicsThreadCreate( - "TCP-acceptor", - epicsThreadPriorityMedium, - epicsThreadGetStackSize( - epicsThreadStackMedium), - BlockingTCPAcceptor::handleEventsRunner, - this); + _thread.start(); // all OK, return return ntohs(_bindAddress.ia.sin_port); @@ -139,7 +135,7 @@ namespace pvAccess { THROW_BASE_EXCEPTION(temp.str().c_str()); } - void BlockingTCPAcceptor::handleEvents() { + void BlockingTCPAcceptor::run() { // rise level if port is assigned dynamically char ipAddrStr[48]; ipAddrToDottedIP(&_bindAddress.ia, ipAddrStr, sizeof(ipAddrStr)); @@ -235,10 +231,6 @@ namespace pvAccess { } } - void BlockingTCPAcceptor::handleEventsRunner(void* param) { - ((BlockingTCPAcceptor*)param)->handleEvents(); - } - void BlockingTCPAcceptor::destroy() { Lock guard(_mutex); if(_destroyed) return; diff --git a/src/remote/blockingUDP.h b/src/remote/blockingUDP.h index e14cf98..d8ab0ea 100644 --- a/src/remote/blockingUDP.h +++ b/src/remote/blockingUDP.h @@ -40,7 +40,8 @@ namespace epics { class BlockingUDPTransport : public epics::pvData::NoDefaultMethods, public Transport, public TransportSendControl, - public std::tr1::enable_shared_from_this + public std::tr1::enable_shared_from_this, + public epicsThreadRunable { public: POINTER_DEFINITIONS(BlockingUDPTransport); @@ -305,11 +306,9 @@ namespace epics { */ const std::auto_ptr _responseHandler; - virtual void processRead(); + virtual void run(); private: - static void threadRunner(void* param); - bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer); void close(bool waitForThreadToComplete); @@ -374,7 +373,7 @@ namespace epics { /** * Thread ID */ - epicsThreadId _threadId; + std::auto_ptr _thread; epics::pvData::int8 _clientServerWithEndianFlag; diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 9723853..e735f3e 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -55,7 +55,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _receiveBuffer(new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), - _threadId(0), _clientServerWithEndianFlag( (serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)) { @@ -95,10 +94,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str()); } - _threadId = epicsThreadCreate(threadName.c_str(), - epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackSmall), - BlockingUDPTransport::threadRunner, this); + _thread.reset(new epicsThread(*this, threadName.c_str(), + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityMedium)); + _thread->start(); } void BlockingUDPTransport::close() { @@ -203,7 +202,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _sendBuffer->getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE); } - void BlockingUDPTransport::processRead() { + void BlockingUDPTransport::run() { // This function is always called from only one thread - this // object's own thread. @@ -420,10 +419,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so return (size_t)sockBufSize; } - void BlockingUDPTransport::threadRunner(void* param) { - ((BlockingUDPTransport*)param)->processRead(); - } - void BlockingUDPTransport::join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr) { diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 4bb1cb5..4b6fee0 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1027,6 +1027,22 @@ namespace epics { // // + BlockingAbstractCodec::BlockingAbstractCodec( + bool serverFlag, + std::tr1::shared_ptr const & receiveBuffer, + std::tr1::shared_ptr const & sendBuffer, + int32_t socketSendBufferSize) + :AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true) + ,_readThread(Thread::Config(this, &BlockingAbstractCodec::receiveThread) + .prio(epicsThreadPriorityCAServerLow) + .name("TCP-rx") + .autostart(false)) + ,_sendThread(Thread::Config(this, &BlockingAbstractCodec::sendThread) + .prio(epicsThreadPriorityCAServerLow) + .name("TCP-tx") + .autostart(false)) + { _isOpen.getAndSet(true);} + void BlockingAbstractCodec::readPollOne() { throw std::logic_error("should not be called for blocking IO"); } @@ -1076,35 +1092,21 @@ namespace epics { // NOTE: must not be called from constructor (e.g. needs shared_from_this()) void BlockingAbstractCodec::start() { - _readThread = epicsThreadCreate( - "BlockingAbstractCodec-readThread", - epicsThreadPriorityMedium, - epicsThreadGetStackSize( - epicsThreadStackMedium), - BlockingAbstractCodec::receiveThread, - this); + _readThread.start(); - _sendThread = epicsThreadCreate( - "BlockingAbstractCodec-_sendThread", - epicsThreadPriorityMedium, - epicsThreadGetStackSize( - epicsThreadStackMedium), - BlockingAbstractCodec::sendThread, - this); + _sendThread.start(); } - void BlockingAbstractCodec::receiveThread(void *param) + void BlockingAbstractCodec::receiveThread() { + Transport::shared_pointer ptr = this->shared_from_this(); - BlockingAbstractCodec *bac = static_cast(param); - Transport::shared_pointer ptr = bac->shared_from_this(); - - while (bac->isOpen()) + while (this->isOpen()) { try { - bac->processRead(); + this->processRead(); } catch (std::exception &e) { LOG(logLevelError, "an exception caught while in receiveThread at %s:%d: %s", @@ -1116,22 +1118,20 @@ namespace epics { } } - bac->_shutdownEvent.signal(); + this->_shutdownEvent.signal(); } - void BlockingAbstractCodec::sendThread(void *param) + void BlockingAbstractCodec::sendThread() { + Transport::shared_pointer ptr = this->shared_from_this(); - BlockingAbstractCodec *bac = static_cast(param); - Transport::shared_pointer ptr = bac->shared_from_this(); + this->setSenderThread(); - bac->setSenderThread(); - - while (bac->isOpen()) + while (this->isOpen()) { try { - bac->processWrite(); + this->processWrite(); } catch (connection_closed_exception &cce) { // noop } catch (std::exception &e) { @@ -1155,7 +1155,7 @@ namespace epics { */ // call internal destroy - bac->internalDestroy(); + this->internalDestroy(); } diff --git a/src/remote/codec.h b/src/remote/codec.h index c45174b..a6c88c8 100644 --- a/src/remote/codec.h +++ b/src/remote/codec.h @@ -395,9 +395,7 @@ namespace epics { bool serverFlag, std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, - int32_t socketSendBufferSize): - AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true), - _readThread(0), _sendThread(0) { _isOpen.getAndSet(true);} + int32_t socketSendBufferSize); void readPollOne(); void writePollOne(); @@ -408,8 +406,9 @@ namespace epics { bool isOpen(); void start(); - static void receiveThread(void* param); - static void sendThread(void* param); + private: + void receiveThread(); + void sendThread(); protected: void sendBufferFull(int tries); @@ -431,8 +430,7 @@ namespace epics { private: AtomicValue _isOpen; - volatile epicsThreadId _readThread; - volatile epicsThreadId _sendThread; + epics::pvData::Thread _readThread, _sendThread; epics::pvData::Event _shutdownEvent; };