collapse BlockingSocketAbstractCodec -> BlockingTCPTransportCodec

This commit is contained in:
Michael Davidsaver
2017-05-15 16:50:26 -04:00
parent e47124aa30
commit ab4f0b7e3a
2 changed files with 78 additions and 114 deletions

View File

@@ -1047,24 +1047,24 @@ bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserial
//
//
BlockingSocketAbstractCodec::~BlockingSocketAbstractCodec()
BlockingTCPTransportCodec::~BlockingTCPTransportCodec()
{
assert(!_isOpen.get());
_sendThread.exitWait();
_readThread.exitWait();
}
void BlockingSocketAbstractCodec::readPollOne() {
void BlockingTCPTransportCodec::readPollOne() {
throw std::logic_error("should not be called for blocking IO");
}
void BlockingSocketAbstractCodec::writePollOne() {
void BlockingTCPTransportCodec::writePollOne() {
throw std::logic_error("should not be called for blocking IO");
}
void BlockingSocketAbstractCodec::close() {
void BlockingTCPTransportCodec::close() {
if (_isOpen.getAndSet(false))
{
@@ -1083,26 +1083,37 @@ void BlockingSocketAbstractCodec::close() {
}
}
void BlockingSocketAbstractCodec::internalClose(bool /*force*/)
void BlockingTCPTransportCodec::internalClose(bool /*force*/)
{
this->internalDestroy();
// TODO sync
if (_securitySession)
_securitySession->close();
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug,
"TCP socket to %s is to be closed.",
inetAddressToString(_socketAddress).c_str());
}
}
void BlockingSocketAbstractCodec::internalPostClose(bool /*force*/) {
void BlockingTCPTransportCodec::internalPostClose(bool /*force*/) {
}
bool BlockingSocketAbstractCodec::terminated() {
bool BlockingTCPTransportCodec::terminated() {
return !isOpen();
}
bool BlockingSocketAbstractCodec::isOpen() {
bool BlockingTCPTransportCodec::isOpen() {
return _isOpen.get();
}
// NOTE: must not be called from constructor (e.g. needs shared_from_this())
void BlockingSocketAbstractCodec::start() {
void BlockingTCPTransportCodec::start() {
_readThread.start();
@@ -1111,7 +1122,7 @@ void BlockingSocketAbstractCodec::start() {
}
void BlockingSocketAbstractCodec::receiveThread()
void BlockingTCPTransportCodec::receiveThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
@@ -1134,7 +1145,7 @@ void BlockingSocketAbstractCodec::receiveThread()
}
void BlockingSocketAbstractCodec::sendThread()
void BlockingTCPTransportCodec::sendThread()
{
Transport::shared_pointer ptr = this->shared_from_this();
@@ -1160,7 +1171,7 @@ void BlockingSocketAbstractCodec::sendThread()
}
void BlockingSocketAbstractCodec::sendBufferFull(int tries) {
void BlockingTCPTransportCodec::sendBufferFull(int tries) {
// TODO constants
epicsThreadSleep(std::max<double>(tries * 0.1, 1));
}
@@ -1168,17 +1179,16 @@ void BlockingSocketAbstractCodec::sendBufferFull(int tries) {
//
//
// BlockingSocketAbstractCodec
// BlockingTCPTransportCodec
//
//
//
BlockingSocketAbstractCodec::BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Context::shared_pointer &context,
SOCKET channel, const ResponseHandler::shared_pointer &responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize)
int32_t receiveBufferSize, int16 priority)
:AbstractCodec(
serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
@@ -1188,15 +1198,19 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec(
MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1))
& (~(PVA_ALIGNMENT - 1)))), sendBufferSize,
true)
,_readThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::receiveThread)
,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-rx")
.autostart(false))
,_sendThread(epics::pvData::Thread::Config(this, &BlockingSocketAbstractCodec::sendThread)
,_sendThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::sendThread)
.prio(epicsThreadPriorityCAServerLow)
.name("TCP-tx")
.autostart(false))
,_channel(channel)
,_context(context), _responseHandler(responseHandler)
,_remoteTransportReceiveBufferSize(MAX_TCP_RECV)
,_remoteTransportRevision(0), _priority(priority)
,_verified(false)
{
_isOpen.getAndSet(true);
@@ -1219,7 +1233,7 @@ BlockingSocketAbstractCodec::BlockingSocketAbstractCodec(
}
// must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists)
void BlockingSocketAbstractCodec::internalDestroy() {
void BlockingTCPTransportCodec::internalDestroy() {
if(_channel != INVALID_SOCKET) {
@@ -1255,15 +1269,17 @@ void BlockingSocketAbstractCodec::internalDestroy() {
_channel = INVALID_SOCKET; //TODO: mutex to guard _channel
}
Transport::shared_pointer thisSharedPtr = this->shared_from_this();
_context->getTransportRegistry()->remove(thisSharedPtr);
}
void BlockingSocketAbstractCodec::invalidDataStreamHandler() {
void BlockingTCPTransportCodec::invalidDataStreamHandler() {
close();
}
int BlockingSocketAbstractCodec::write(
int BlockingTCPTransportCodec::write(
epics::pvData::ByteBuffer *src) {
std::size_t remaining;
@@ -1300,7 +1316,7 @@ int BlockingSocketAbstractCodec::write(
}
std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize()
std::size_t BlockingTCPTransportCodec::getSocketReceiveBufferSize()
const {
osiSocklen_t intLen = sizeof(int);
@@ -1321,7 +1337,7 @@ const {
}
int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) {
int BlockingTCPTransportCodec::read(epics::pvData::ByteBuffer* dst) {
std::size_t remaining;
while((remaining=dst->getRemaining()) > 0) {
@@ -1366,22 +1382,6 @@ int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) {
}
void BlockingTCPTransportCodec::internalClose(bool force) {
BlockingSocketAbstractCodec::internalClose(force);
// TODO sync
if (_securitySession)
_securitySession->close();
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug,
"TCP socket to %s is to be closed.",
inetAddressToString(_socketAddress).c_str());
}
}
bool BlockingTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0) && _verified;
}