collapse BlockingAbstractCodec into BlockingSocketAbstractCodec

This commit is contained in:
Michael Davidsaver
2017-05-15 16:26:56 -04:00
parent 5428b1ed2d
commit e47124aa30
2 changed files with 54 additions and 77 deletions

View File

@@ -1047,42 +1047,24 @@ bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserial
//
//
BlockingAbstractCodec::BlockingAbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize)
:AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true)
,_readThread(epics::pvData::Thread::Config(this, &BlockingAbstractCodec::receiveThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-rx")
.autostart(false))
,_sendThread(epics::pvData::Thread::Config(this, &BlockingAbstractCodec::sendThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-tx")
.autostart(false))
{
_isOpen.getAndSet(true);
}
BlockingAbstractCodec::~BlockingAbstractCodec()
BlockingSocketAbstractCodec::~BlockingSocketAbstractCodec()
{
assert(!_isOpen.get());
_sendThread.exitWait();
_readThread.exitWait();
}
void BlockingAbstractCodec::readPollOne() {
void BlockingSocketAbstractCodec::readPollOne() {
throw std::logic_error("should not be called for blocking IO");
}
void BlockingAbstractCodec::writePollOne() {
void BlockingSocketAbstractCodec::writePollOne() {
throw std::logic_error("should not be called for blocking IO");
}
void BlockingAbstractCodec::close() {
void BlockingSocketAbstractCodec::close() {
if (_isOpen.getAndSet(false))
{
@@ -1101,26 +1083,26 @@ void BlockingAbstractCodec::close() {
}
}
void BlockingAbstractCodec::internalClose(bool /*force*/)
void BlockingSocketAbstractCodec::internalClose(bool /*force*/)
{
this->internalDestroy();
}
void BlockingAbstractCodec::internalPostClose(bool /*force*/) {
void BlockingSocketAbstractCodec::internalPostClose(bool /*force*/) {
}
bool BlockingAbstractCodec::terminated() {
bool BlockingSocketAbstractCodec::terminated() {
return !isOpen();
}
bool BlockingAbstractCodec::isOpen() {
bool BlockingSocketAbstractCodec::isOpen() {
return _isOpen.get();
}
// NOTE: must not be called from constructor (e.g. needs shared_from_this())
void BlockingAbstractCodec::start() {
void BlockingSocketAbstractCodec::start() {
_readThread.start();
@@ -1129,7 +1111,7 @@ void BlockingAbstractCodec::start() {
}
void BlockingAbstractCodec::receiveThread()
void BlockingSocketAbstractCodec::receiveThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
@@ -1152,7 +1134,7 @@ void BlockingAbstractCodec::receiveThread()
}
void BlockingAbstractCodec::sendThread()
void BlockingSocketAbstractCodec::sendThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
@@ -1178,7 +1160,7 @@ void BlockingAbstractCodec::sendThread()
}
void BlockingAbstractCodec::sendBufferFull(int tries) {
void BlockingSocketAbstractCodec::sendBufferFull(int tries) {
// TODO constants
epicsThreadSleep(std::max<double>(tries * 0.1, 1));
}
@@ -1196,17 +1178,28 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize):
BlockingAbstractCodec(
serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))),
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)( MAX_TCP_RECV +
MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1))
& (~(PVA_ALIGNMENT - 1)))), sendBufferSize),
_channel(channel)
int32_t receiveBufferSize)
:AbstractCodec(
serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))),
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)( MAX_TCP_RECV +
MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1))
& (~(PVA_ALIGNMENT - 1)))), sendBufferSize,
true)
,_readThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::receiveThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-rx")
.autostart(false))
,_sendThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::sendThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-tx")
.autostart(false))
,_channel(channel)
{
_isOpen.getAndSet(true);
// get remote address
osiSocklen_t saSize = sizeof(sockaddr);
int retval = getpeername(_channel, &(_socketAddress.sa), &saSize);

View File

@@ -293,21 +293,23 @@ private:
};
class epicsShareClass BlockingAbstractCodec:
///////////////////////////////////////////////////////////////////////////////////////////////////////////
class epicsShareClass BlockingSocketAbstractCodec:
public AbstractCodec,
public std::tr1::enable_shared_from_this<BlockingAbstractCodec>
public std::tr1::enable_shared_from_this<BlockingSocketAbstractCodec>
{
public:
POINTER_DEFINITIONS(BlockingAbstractCodec);
POINTER_DEFINITIONS(BlockingSocketAbstractCodec);
BlockingAbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize);
virtual ~BlockingAbstractCodec();
BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize);
virtual ~BlockingSocketAbstractCodec();
virtual void readPollOne() OVERRIDE FINAL;
virtual void writePollOne() OVERRIDE FINAL;
@@ -318,13 +320,21 @@ public:
virtual bool isOpen() OVERRIDE FINAL;
void start();
virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
return &_socketAddress;
}
virtual void invalidDataStreamHandler() OVERRIDE FINAL;
virtual std::size_t getSocketReceiveBufferSize() const OVERRIDE FINAL;
private:
void receiveThread();
void sendThread();
protected:
virtual void sendBufferFull(int tries) OVERRIDE FINAL;
virtual void internalDestroy() = 0;
virtual void internalDestroy();
/**
* Called to any resources just before closing transport
@@ -344,39 +354,12 @@ private:
AtomicValue<bool> _isOpen;
epics::pvData::Thread _readThread, _sendThread;
epics::pvData::Event _shutdownEvent;
};
class epicsShareClass BlockingSocketAbstractCodec:
public BlockingAbstractCodec
{
public:
BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize);
virtual int read(epics::pvData::ByteBuffer* dst) OVERRIDE FINAL;
virtual int write(epics::pvData::ByteBuffer* src) OVERRIDE FINAL;
virtual const osiSockAddr* getLastReadBufferSocketAddress() OVERRIDE FINAL {
return &_socketAddress;
}
virtual void invalidDataStreamHandler() OVERRIDE FINAL;
virtual std::size_t getSocketReceiveBufferSize() const OVERRIDE FINAL;
protected:
virtual void internalDestroy() OVERRIDE;
SOCKET _channel;
osiSockAddr _socketAddress;
std::string _socketName;
};
class BlockingTCPTransportCodec :
public BlockingSocketAbstractCodec,
public SecurityPluginControl
@@ -543,6 +526,7 @@ private:
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////
class epicsShareClass BlockingServerTCPTransportCodec :
public BlockingTCPTransportCodec,