diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 341e6c1..4baf98c 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -808,7 +808,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, const bool lastRequest = (QOS_DESTROY & qosCode) != 0; ServerChannelGetRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); - if (request == NULL) + if (!request) { BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; @@ -822,9 +822,10 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, MB_POINT(channelGet, 4, "server channelGet->deserialize request (end)"); + ChannelGet::shared_pointer channelGet = request->getChannelGet(); if (lastRequest) - request->getChannelGet()->lastRequest(); - request->getChannelGet()->get(); + channelGet->lastRequest(); + channelGet->get(); } } @@ -920,16 +921,18 @@ void ServerChannelGetRequesterImpl::destroy() // destroyed prematurely shared_pointer self(shared_from_this()); + // hold a reference to channelGet so that _channelGet.reset() + // does not call ~ChannelGet (external code) while we are holding a lock + ChannelGet::shared_pointer channelGet = _channelGet; { Lock guard(_mutex); _channel->unregisterRequest(_ioid); - if (_channelGet != NULL) + if (_channelGet) { _channelGet->destroy(); - } + _channelGet.reset(); + } } - // TODO not competely safe for when callig getChannelGet() now - _channelGet.reset(); } ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet() @@ -947,10 +950,20 @@ void ServerChannelGetRequesterImpl::unlock() // noop } +// TODO get rid of all these mutex-es void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { const int32 request = getPendingRequest(); + ChannelGet::shared_pointer channelGet; + { + Lock guard(_mutex); + channelGet = _channelGet; + // we must respond to QOS_INIT (e.g. creation error) + if (!channelGet && !(request & QOS_INIT)) + return; + } + control->startMessage((int8)CMD_GET, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->put((int8)request); @@ -977,8 +990,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro { MB_POINT(channelGet, 6, "server channelGet->serialize response (start)"); { - // we locked _mutex above, so _channelGet is valid - ScopedLock lock(_channelGet); + ScopedLock lock(channelGet); _bitSet->serialize(buffer, control); _pvStructure->serialize(buffer, control, _bitSet.get()); @@ -1168,16 +1180,18 @@ void ServerChannelPutRequesterImpl::destroy() // destroyed prematurely shared_pointer self(shared_from_this()); - { + // hold a reference to channelGet so that _channelPut.reset() + // does not call ~ChannelPut (external code) while we are holding a lock + ChannelPut::shared_pointer channelPut = _channelPut; + { Lock guard(_mutex); _channel->unregisterRequest(_ioid); - if (_channelPut != NULL) + if (_channelPut) { _channelPut->destroy(); - } + _channelPut.reset(); + } } - // TODO - _channelPut.reset(); } ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut() @@ -1202,7 +1216,16 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro { const int32 request = getPendingRequest(); - control->startMessage((int32)CMD_PUT, sizeof(int32)/sizeof(int8) + 1); + ChannelPut::shared_pointer channelPut; + { + Lock guard(_mutex); + channelPut = _channelPut; + // we must respond to QOS_INIT (e.g. creation error) + if (!channelPut && !(request & QOS_INIT)) + return; + } + + control->startMessage((int32)CMD_PUT, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->putByte((int8)request); { @@ -1219,7 +1242,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro } else if ((QOS_GET & request) != 0) { - ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above + ScopedLock lock(channelPut); _bitSet->serialize(buffer, control); _pvStructure->serialize(buffer, control, _bitSet.get()); } @@ -1251,7 +1274,7 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom, const int8 qosCode = payloadBuffer->getByte(); ServerChannelImpl::shared_pointer channel = static_pointer_cast(casTransport->getChannel(sid)); - if (channel == NULL) + if (channel == NULL) { BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); return; @@ -1423,16 +1446,18 @@ void ServerChannelPutGetRequesterImpl::destroy() // destroyed prematurely shared_pointer self(shared_from_this()); - { + // hold a reference to channelPutGet so that _channelPutGet.reset() + // does not call ~ChannelPutGet (external code) while we are holding a lock + ChannelPutGet::shared_pointer channelPutGet = _channelPutGet; + { Lock guard(_mutex); _channel->unregisterRequest(_ioid); - if (_channelPutGet != NULL) + if (_channelPutGet) { _channelPutGet->destroy(); - } + _channelPutGet.reset(); + } } - // TODO - _channelPutGet.reset(); } ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet() @@ -1457,7 +1482,16 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon { const int32 request = getPendingRequest(); - control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1); + ChannelPutGet::shared_pointer channelPutGet; + { + Lock guard(_mutex); + channelPutGet = _channelPutGet; + // we must respond to QOS_INIT (e.g. creation error) + if (!channelPutGet && !(request & QOS_INIT)) + return; + } + + control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->putByte((int8)request); { @@ -1481,14 +1515,14 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon } else if ((QOS_GET_PUT & request) != 0) { - ScopedLock lock(_channelPutGet); // valid due to _mutex lock above + ScopedLock lock(channelPutGet); //Lock guard(_mutex); _pvPutBitSet->serialize(buffer, control); _pvPutStructure->serialize(buffer, control, _pvPutBitSet.get()); } else { - ScopedLock lock(_channelPutGet); // valid due to _mutex lock above + ScopedLock lock(channelPutGet); //Lock guard(_mutex); _pvGetBitSet->serialize(buffer, control); _pvGetStructure->serialize(buffer, control, _pvGetBitSet.get()); @@ -1657,16 +1691,18 @@ void ServerMonitorRequesterImpl::destroy() // destroyed prematurely shared_pointer self(shared_from_this()); - { + // hold a reference to channelMonitor so that _channelMonitor.reset() + // does not call ~Monitor (external code) while we are holding a lock + Monitor::shared_pointer monitor = _channelMonitor; + { Lock guard(_mutex); _channel->unregisterRequest(_ioid); - if (_channelMonitor != NULL) + if (_channelMonitor) { _channelMonitor->destroy(); - } + _channelMonitor.reset(); + } } - // TODO - _channelMonitor.reset(); } Monitor::shared_pointer ServerMonitorRequesterImpl::getChannelMonitor() @@ -1701,10 +1737,9 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* else { Monitor::shared_pointer monitor = _channelMonitor; -if(monitor==NULL) { -std::cout << "ServerMonitorRequesterImpl::send monitor is NULL" << std::endl; -return; -} + if (!monitor) + return; + MonitorElement::shared_pointer element = monitor->poll(); if (element != NULL) { @@ -1714,7 +1749,7 @@ return; // changedBitSet and data, if not notify only (i.e. queueSize == -1) BitSet::shared_pointer changedBitSet = element->changedBitSet; - if (changedBitSet != NULL) + if (changedBitSet) { changedBitSet->serialize(buffer, control); element->pvStructurePtr->serialize(buffer, control, changedBitSet.get()); @@ -1936,16 +1971,18 @@ void ServerChannelArrayRequesterImpl::destroy() // destroyed prematurely shared_pointer self(shared_from_this()); - { + // hold a reference to channelArray so that _channelArray.reset() + // does not call ~ChannelArray (external code) while we are holding a lock + ChannelArray::shared_pointer channelArray = _channelArray; + { Lock guard(_mutex); _channel->unregisterRequest(_ioid); - if (_channelArray != NULL) + if (_channelArray) { _channelArray->destroy(); - } + _channelArray.reset(); + } } - // TODO - _channelArray.reset(); } ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray() @@ -1964,7 +2001,16 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont { const int32 request = getPendingRequest(); - control->startMessage((int32)CMD_ARRAY, sizeof(int32)/sizeof(int8) + 1); + ChannelArray::shared_pointer channelArray; + { + Lock guard(_mutex); + channelArray = _channelArray; + // we must respond to QOS_INIT (e.g. creation error) + if (!channelArray && !(request & QOS_INIT)) + return; + } + + control->startMessage((int32)CMD_ARRAY, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->putByte((int8)request); { @@ -1977,7 +2023,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont if ((QOS_GET & request) != 0) { //Lock guard(_mutex); - ScopedLock lock(_channelArray); // valid due to _mutex lock above + ScopedLock lock(channelArray); _pvArray->serialize(buffer, control, 0, _pvArray->getLength()); _pvArray.reset(); }