request destroy() while the request is in sendQueue fix

This commit is contained in:
Matej Sekoranja
2014-08-04 13:29:37 +02:00
parent 621a9a927f
commit 219e69ceed

View File

@@ -808,7 +808,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
ServerChannelGetRequesterImpl::shared_pointer request = static_pointer_cast<ServerChannelGetRequesterImpl>(channel->getRequest(ioid));
if (request == NULL)
if (!request)
{
BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
return;
@@ -822,9 +822,10 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
MB_POINT(channelGet, 4, "server channelGet->deserialize request (end)");
ChannelGet::shared_pointer channelGet = request->getChannelGet();
if (lastRequest)
request->getChannelGet()->lastRequest();
request->getChannelGet()->get();
channelGet->lastRequest();
channelGet->get();
}
}
@@ -920,16 +921,18 @@ void ServerChannelGetRequesterImpl::destroy()
// destroyed prematurely
shared_pointer self(shared_from_this());
// hold a reference to channelGet so that _channelGet.reset()
// does not call ~ChannelGet (external code) while we are holding a lock
ChannelGet::shared_pointer channelGet = _channelGet;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
if (_channelGet != NULL)
if (_channelGet)
{
_channelGet->destroy();
}
_channelGet.reset();
}
}
// TODO not competely safe for when callig getChannelGet() now
_channelGet.reset();
}
ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet()
@@ -947,10 +950,20 @@ void ServerChannelGetRequesterImpl::unlock()
// noop
}
// TODO get rid of all these mutex-es
void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
{
const int32 request = getPendingRequest();
ChannelGet::shared_pointer channelGet;
{
Lock guard(_mutex);
channelGet = _channelGet;
// we must respond to QOS_INIT (e.g. creation error)
if (!channelGet && !(request & QOS_INIT))
return;
}
control->startMessage((int8)CMD_GET, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->put((int8)request);
@@ -977,8 +990,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
{
MB_POINT(channelGet, 6, "server channelGet->serialize response (start)");
{
// we locked _mutex above, so _channelGet is valid
ScopedLock lock(_channelGet);
ScopedLock lock(channelGet);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
@@ -1168,16 +1180,18 @@ void ServerChannelPutRequesterImpl::destroy()
// destroyed prematurely
shared_pointer self(shared_from_this());
{
// hold a reference to channelGet so that _channelPut.reset()
// does not call ~ChannelPut (external code) while we are holding a lock
ChannelPut::shared_pointer channelPut = _channelPut;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
if (_channelPut != NULL)
if (_channelPut)
{
_channelPut->destroy();
}
_channelPut.reset();
}
}
// TODO
_channelPut.reset();
}
ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut()
@@ -1202,7 +1216,16 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
{
const int32 request = getPendingRequest();
control->startMessage((int32)CMD_PUT, sizeof(int32)/sizeof(int8) + 1);
ChannelPut::shared_pointer channelPut;
{
Lock guard(_mutex);
channelPut = _channelPut;
// we must respond to QOS_INIT (e.g. creation error)
if (!channelPut && !(request & QOS_INIT))
return;
}
control->startMessage((int32)CMD_PUT, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)request);
{
@@ -1219,7 +1242,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
}
else if ((QOS_GET & request) != 0)
{
ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above
ScopedLock lock(channelPut);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
}
@@ -1251,7 +1274,7 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom,
const int8 qosCode = payloadBuffer->getByte();
ServerChannelImpl::shared_pointer channel = static_pointer_cast<ServerChannelImpl>(casTransport->getChannel(sid));
if (channel == NULL)
if (channel == NULL)
{
BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
return;
@@ -1423,16 +1446,18 @@ void ServerChannelPutGetRequesterImpl::destroy()
// destroyed prematurely
shared_pointer self(shared_from_this());
{
// hold a reference to channelPutGet so that _channelPutGet.reset()
// does not call ~ChannelPutGet (external code) while we are holding a lock
ChannelPutGet::shared_pointer channelPutGet = _channelPutGet;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
if (_channelPutGet != NULL)
if (_channelPutGet)
{
_channelPutGet->destroy();
}
_channelPutGet.reset();
}
}
// TODO
_channelPutGet.reset();
}
ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet()
@@ -1457,7 +1482,16 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
{
const int32 request = getPendingRequest();
control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1);
ChannelPutGet::shared_pointer channelPutGet;
{
Lock guard(_mutex);
channelPutGet = _channelPutGet;
// we must respond to QOS_INIT (e.g. creation error)
if (!channelPutGet && !(request & QOS_INIT))
return;
}
control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)request);
{
@@ -1481,14 +1515,14 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
}
else if ((QOS_GET_PUT & request) != 0)
{
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
ScopedLock lock(channelPutGet);
//Lock guard(_mutex);
_pvPutBitSet->serialize(buffer, control);
_pvPutStructure->serialize(buffer, control, _pvPutBitSet.get());
}
else
{
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
ScopedLock lock(channelPutGet);
//Lock guard(_mutex);
_pvGetBitSet->serialize(buffer, control);
_pvGetStructure->serialize(buffer, control, _pvGetBitSet.get());
@@ -1657,16 +1691,18 @@ void ServerMonitorRequesterImpl::destroy()
// destroyed prematurely
shared_pointer self(shared_from_this());
{
// hold a reference to channelMonitor so that _channelMonitor.reset()
// does not call ~Monitor (external code) while we are holding a lock
Monitor::shared_pointer monitor = _channelMonitor;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
if (_channelMonitor != NULL)
if (_channelMonitor)
{
_channelMonitor->destroy();
}
_channelMonitor.reset();
}
}
// TODO
_channelMonitor.reset();
}
Monitor::shared_pointer ServerMonitorRequesterImpl::getChannelMonitor()
@@ -1701,10 +1737,9 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
else
{
Monitor::shared_pointer monitor = _channelMonitor;
if(monitor==NULL) {
std::cout << "ServerMonitorRequesterImpl::send monitor is NULL" << std::endl;
return;
}
if (!monitor)
return;
MonitorElement::shared_pointer element = monitor->poll();
if (element != NULL)
{
@@ -1714,7 +1749,7 @@ return;
// changedBitSet and data, if not notify only (i.e. queueSize == -1)
BitSet::shared_pointer changedBitSet = element->changedBitSet;
if (changedBitSet != NULL)
if (changedBitSet)
{
changedBitSet->serialize(buffer, control);
element->pvStructurePtr->serialize(buffer, control, changedBitSet.get());
@@ -1936,16 +1971,18 @@ void ServerChannelArrayRequesterImpl::destroy()
// destroyed prematurely
shared_pointer self(shared_from_this());
{
// hold a reference to channelArray so that _channelArray.reset()
// does not call ~ChannelArray (external code) while we are holding a lock
ChannelArray::shared_pointer channelArray = _channelArray;
{
Lock guard(_mutex);
_channel->unregisterRequest(_ioid);
if (_channelArray != NULL)
if (_channelArray)
{
_channelArray->destroy();
}
_channelArray.reset();
}
}
// TODO
_channelArray.reset();
}
ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray()
@@ -1964,7 +2001,16 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
{
const int32 request = getPendingRequest();
control->startMessage((int32)CMD_ARRAY, sizeof(int32)/sizeof(int8) + 1);
ChannelArray::shared_pointer channelArray;
{
Lock guard(_mutex);
channelArray = _channelArray;
// we must respond to QOS_INIT (e.g. creation error)
if (!channelArray && !(request & QOS_INIT))
return;
}
control->startMessage((int32)CMD_ARRAY, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)request);
{
@@ -1977,7 +2023,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
if ((QOS_GET & request) != 0)
{
//Lock guard(_mutex);
ScopedLock lock(_channelArray); // valid due to _mutex lock above
ScopedLock lock(channelArray);
_pvArray->serialize(buffer, control, 0, _pvArray->getLength());
_pvArray.reset();
}