internalDestroy problem solved + memory leak fixed in abstract codec

This commit is contained in:
damjankumar
2014-02-13 15:52:02 +01:00
parent 5f0f7b9fde
commit 5ab2ead581
5 changed files with 78 additions and 42 deletions

View File

@@ -39,8 +39,8 @@ namespace epics {
const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
AbstractCodec::AbstractCodec(
ByteBuffer *receiveBuffer,
ByteBuffer *sendBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> sendBuffer,
int32_t socketSendBufferSize,
bool blockingProcessQueue):
//PROTECTED
@@ -49,6 +49,8 @@ namespace epics {
_blockingProcessQueue(false), _senderThread(0),
_writeMode(PROCESS_SEND_QUEUE),
_writeOpReady(false),_lowLatency(false),
_socketBuffer(receiveBuffer),
_sendBuffer(sendBuffer),
//PRIVATE
_storedPayloadSize(0), _storedPosition(0), _startPosition(0),
_maxSendPayloadSize(0),
@@ -76,9 +78,6 @@ namespace epics {
throw std::invalid_argument(
"sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0");
_socketBuffer.reset(receiveBuffer);
_sendBuffer.reset(sendBuffer);
// initialize to be empty
_socketBuffer->setPosition(_socketBuffer->getLimit());
_startPosition = _socketBuffer->getPosition();
@@ -369,6 +368,11 @@ namespace epics {
if (bytesRead < 0)
{
LOG(logLevelTrace,
"AbstractCodec::before close (threadId: %u)",
epicsThreadGetIdSelf());
close();
throw connection_closed_exception("bytesRead < 0");
}
@@ -1153,6 +1157,7 @@ namespace epics {
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
Transport::shared_pointer ptr (bac->shared_from_this());
while (bac->isOpen())
{
@@ -1181,6 +1186,8 @@ namespace epics {
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
Transport::shared_pointer ptr (bac->shared_from_this());
bac->setSenderThread();
while (bac->isOpen())
@@ -1209,7 +1216,7 @@ namespace epics {
LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)",
epicsThreadGetIdSelf());
//bac->internalDestroy();
bac->internalDestroy();
LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY"
"YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)",
epicsThreadGetIdSelf());
@@ -1246,12 +1253,12 @@ namespace epics {
int32_t sendBufferSize,
int32_t receiveBufferSize):
BlockingAbstractCodec(
new ByteBuffer((std::max<std::size_t>((std::size_t)(
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1))),
new ByteBuffer((std::max<std::size_t>((std::size_t)( MAX_TCP_RECV +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))),
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)( MAX_TCP_RECV +
MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1))
& (~(PVA_ALIGNMENT - 1))), sendBufferSize),
& (~(PVA_ALIGNMENT - 1)))), sendBufferSize),
_channel(channel)
{
@@ -1325,10 +1332,22 @@ namespace epics {
std::size_t remaining;
while((remaining=src->getRemaining()) > 0) {
LOG(logLevelTrace,
"BlockingSocketAbstractCodec::write before send"
" (threadId: %u)",
epicsThreadGetIdSelf());
int bytesSent = ::send(_channel,
&src->getArray()[src->getPosition()],
remaining, 0);
LOG(logLevelTrace,
"BlockingSocketAbstractCodec::write afer send, read:%d"
" (threadId: %u)",
bytesSent, epicsThreadGetIdSelf());
if(unlikely(bytesSent<0)) {
int socketError = SOCKERRNO;
@@ -1390,9 +1409,21 @@ namespace epics {
// read
std::size_t pos = dst->getPosition();
LOG(logLevelTrace,
"BlockingSocketAbstractCodec::read before recv"
" (threadId: %u)",
epicsThreadGetIdSelf());
int bytesRead = recv(_channel,
(char*)(dst->getArray()+pos), remaining, 0);
LOG(logLevelTrace,
"BlockingSocketAbstractCodec::read after recv, read: %d",
bytesRead," (threadId: %u)",
epicsThreadGetIdSelf());
if (IS_LOGGABLE(logLevelTrace)) {
hexDump(std::string("READ"),
@@ -1414,6 +1445,7 @@ namespace epics {
return -1; // 0 means connection loss for blocking transport, notify codec by returning -1
}
dst->setPosition(dst->getPosition() + bytesRead);
return bytesRead;
}