use epicsThread and Thread::Config

Catches errant c++ exceptions and is joinable
This commit is contained in:
Michael Davidsaver
2015-09-08 12:25:19 -04:00
parent b4048c3bfd
commit 6254525cba
6 changed files with 53 additions and 71 deletions
+4 -6
View File
@@ -112,7 +112,7 @@ namespace epics {
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $
*/
class BlockingTCPAcceptor {
class BlockingTCPAcceptor : public epicsThreadRunable {
public:
POINTER_DEFINITIONS(BlockingTCPAcceptor);
@@ -128,8 +128,6 @@ namespace epics {
virtual ~BlockingTCPAcceptor();
void handleEvents();
/**
* Bind socket address.
* @return bind socket address, <code>null</code> if not binded.
@@ -144,6 +142,8 @@ namespace epics {
void destroy();
private:
virtual void run();
/**
* Context instance.
*/
@@ -176,7 +176,7 @@ namespace epics {
epics::pvData::Mutex _mutex;
epicsThreadId _threadId;
epicsThread _thread;
/**
* Initialize connection acception.
@@ -189,8 +189,6 @@ namespace epics {
* @return <code>true</code> on success.
*/
bool validateConnection(Transport::shared_pointer const & transport, const char* address);
static void handleEventsRunner(void* param);
};
}
+6 -14
View File
@@ -33,7 +33,10 @@ namespace pvAccess {
_serverSocketChannel(INVALID_SOCKET),
_receiveBufferSize(receiveBufferSize),
_destroyed(false),
_threadId(0)
_thread(*this, "TCP-acceptor",
epicsThreadGetStackSize(
epicsThreadStackMedium),
epicsThreadPriorityMedium)
{
initialize(port);
}
@@ -118,14 +121,7 @@ namespace pvAccess {
THROW_BASE_EXCEPTION(temp.str().c_str());
}
_threadId
= epicsThreadCreate(
"TCP-acceptor",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(
epicsThreadStackMedium),
BlockingTCPAcceptor::handleEventsRunner,
this);
_thread.start();
// all OK, return
return ntohs(_bindAddress.ia.sin_port);
@@ -139,7 +135,7 @@ namespace pvAccess {
THROW_BASE_EXCEPTION(temp.str().c_str());
}
void BlockingTCPAcceptor::handleEvents() {
void BlockingTCPAcceptor::run() {
// rise level if port is assigned dynamically
char ipAddrStr[48];
ipAddrToDottedIP(&_bindAddress.ia, ipAddrStr, sizeof(ipAddrStr));
@@ -235,10 +231,6 @@ namespace pvAccess {
}
}
void BlockingTCPAcceptor::handleEventsRunner(void* param) {
((BlockingTCPAcceptor*)param)->handleEvents();
}
void BlockingTCPAcceptor::destroy() {
Lock guard(_mutex);
if(_destroyed) return;
+4 -5
View File
@@ -40,7 +40,8 @@ namespace epics {
class BlockingUDPTransport : public epics::pvData::NoDefaultMethods,
public Transport,
public TransportSendControl,
public std::tr1::enable_shared_from_this<BlockingUDPTransport>
public std::tr1::enable_shared_from_this<BlockingUDPTransport>,
public epicsThreadRunable
{
public:
POINTER_DEFINITIONS(BlockingUDPTransport);
@@ -305,11 +306,9 @@ namespace epics {
*/
const std::auto_ptr<ResponseHandler> _responseHandler;
virtual void processRead();
virtual void run();
private:
static void threadRunner(void* param);
bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer);
void close(bool waitForThreadToComplete);
@@ -374,7 +373,7 @@ namespace epics {
/**
* Thread ID
*/
epicsThreadId _threadId;
std::auto_ptr<epicsThread> _thread;
epics::pvData::int8 _clientServerWithEndianFlag;
+5 -10
View File
@@ -55,7 +55,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV)),
_sendBuffer(new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0),
_threadId(0),
_clientServerWithEndianFlag(
(serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00))
{
@@ -95,10 +94,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str());
}
_threadId = epicsThreadCreate(threadName.c_str(),
epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackSmall),
BlockingUDPTransport::threadRunner, this);
_thread.reset(new epicsThread(*this, threadName.c_str(),
epicsThreadGetStackSize(epicsThreadStackSmall),
epicsThreadPriorityMedium));
_thread->start();
}
void BlockingUDPTransport::close() {
@@ -203,7 +202,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_sendBuffer->getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE);
}
void BlockingUDPTransport::processRead() {
void BlockingUDPTransport::run() {
// This function is always called from only one thread - this
// object's own thread.
@@ -420,10 +419,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
return (size_t)sockBufSize;
}
void BlockingUDPTransport::threadRunner(void* param) {
((BlockingUDPTransport*)param)->processRead();
}
void BlockingUDPTransport::join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr)
{
+29 -29
View File
@@ -1027,6 +1027,22 @@ namespace epics {
//
//
BlockingAbstractCodec::BlockingAbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize)
:AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true)
,_readThread(Thread::Config(this, &BlockingAbstractCodec::receiveThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-rx")
.autostart(false))
,_sendThread(Thread::Config(this, &BlockingAbstractCodec::sendThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-tx")
.autostart(false))
{ _isOpen.getAndSet(true);}
void BlockingAbstractCodec::readPollOne() {
throw std::logic_error("should not be called for blocking IO");
}
@@ -1076,35 +1092,21 @@ namespace epics {
// NOTE: must not be called from constructor (e.g. needs shared_from_this())
void BlockingAbstractCodec::start() {
_readThread = epicsThreadCreate(
"BlockingAbstractCodec-readThread",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(
epicsThreadStackMedium),
BlockingAbstractCodec::receiveThread,
this);
_readThread.start();
_sendThread = epicsThreadCreate(
"BlockingAbstractCodec-_sendThread",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(
epicsThreadStackMedium),
BlockingAbstractCodec::sendThread,
this);
_sendThread.start();
}
void BlockingAbstractCodec::receiveThread(void *param)
void BlockingAbstractCodec::receiveThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
Transport::shared_pointer ptr = bac->shared_from_this();
while (bac->isOpen())
while (this->isOpen())
{
try {
bac->processRead();
this->processRead();
} catch (std::exception &e) {
LOG(logLevelError,
"an exception caught while in receiveThread at %s:%d: %s",
@@ -1116,22 +1118,20 @@ namespace epics {
}
}
bac->_shutdownEvent.signal();
this->_shutdownEvent.signal();
}
void BlockingAbstractCodec::sendThread(void *param)
void BlockingAbstractCodec::sendThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
Transport::shared_pointer ptr = bac->shared_from_this();
this->setSenderThread();
bac->setSenderThread();
while (bac->isOpen())
while (this->isOpen())
{
try {
bac->processWrite();
this->processWrite();
} catch (connection_closed_exception &cce) {
// noop
} catch (std::exception &e) {
@@ -1155,7 +1155,7 @@ namespace epics {
*/
// call internal destroy
bac->internalDestroy();
this->internalDestroy();
}
+5 -7
View File
@@ -395,9 +395,7 @@ namespace epics {
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize):
AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true),
_readThread(0), _sendThread(0) { _isOpen.getAndSet(true);}
int32_t socketSendBufferSize);
void readPollOne();
void writePollOne();
@@ -408,8 +406,9 @@ namespace epics {
bool isOpen();
void start();
static void receiveThread(void* param);
static void sendThread(void* param);
private:
void receiveThread();
void sendThread();
protected:
void sendBufferFull(int tries);
@@ -431,8 +430,7 @@ namespace epics {
private:
AtomicValue<bool> _isOpen;
volatile epicsThreadId _readThread;
volatile epicsThreadId _sendThread;
epics::pvData::Thread _readThread, _sendThread;
epics::pvData::Event _shutdownEvent;
};