shared_pointer cycles

This commit is contained in:
Matej Sekoranja
2011-05-17 23:32:39 +02:00
parent 9c1eda3655
commit 484da96da6
9 changed files with 176 additions and 180 deletions

View File

@@ -281,11 +281,12 @@ void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & tr
ServerChannelRequesterImpl::ServerChannelRequesterImpl(Transport::shared_pointer const & transport,
const String channelName, const pvAccessID cid) :
_serverChannel(),
_transport(transport),
_channelName(channelName),
_cid(cid),
_status(),
_channel()
_mutex()
{
}
@@ -301,22 +302,81 @@ ChannelRequester::shared_pointer ServerChannelRequesterImpl::create(
void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::shared_pointer const & channel)
{
Lock guard(_mutex);
_status = status;
_channel = channel;
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
if(Transport::shared_pointer transport = _transport.lock())
{
ServerChannel::shared_pointer serverChannel;
try
{
if (status.isSuccess())
{
// NOTE: we do not explicitly check if transport OK
ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast<ChannelHostingTransport>(transport);
if (!casTransport.get())
THROW_BASE_EXCEPTION("transport is unable to host channels");
//
// create a new channel instance
//
pvAccessID sid = casTransport->preallocateChannelSID();
try
{
epics::pvData::PVField::shared_pointer securityToken = casTransport->getSecurityToken();
serverChannel.reset(new ServerChannelImpl(channel, _cid, sid, securityToken));
// ack allocation and register
casTransport->registerChannel(sid, serverChannel);
} catch (...)
{
// depreallocate and rethrow
casTransport->depreallocateChannelSID(sid);
throw;
}
}
{
Lock guard(_mutex);
_status = status;
_serverChannel = serverChannel;
}
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
}
catch (std::exception& e)
{
errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str());
{
Lock guard(_mutex);
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what());
}
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
// TODO make sure that serverChannel gets destroyed
}
catch (...)
{
errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str());
{
Lock guard(_mutex);
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel");
}
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
// TODO make sure that serverChannel gets destroyed
}
}
}
void ServerChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & c, const Channel::ConnectionState isConnected)
{
//noop
// TODO should we notify remote side?
}
String ServerChannelRequesterImpl::getRequesterName()
{
stringstream name;
name << typeid(*_transport).name() << "/" << _cid;
name << "ServerChannelRequesterImpl/" << _channelName << "[" << _cid << "]";
return name.str();
}
@@ -337,77 +397,40 @@ void ServerChannelRequesterImpl::unlock()
void ServerChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
{
Channel::shared_pointer channel;
ServerChannel::shared_pointer serverChannel;
Status status;
{
Lock guard(_mutex);
channel = _channel;
serverChannel = _serverChannel.lock();
status = _status;
// TODO
_channel.reset();
}
// error response
if (channel.get() == NULL)
if (serverChannel.get() == NULL)
{
createChannelFailedResponse(buffer, control, status);
}
// OK
else
else if (Transport::shared_pointer transport = _transport.lock())
{
ServerChannel::shared_pointer serverChannel;
try
{
// NOTE: we do not explicitly check if transport OK
ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast<ChannelHostingTransport>(_transport);
//
// create a new channel instance
//
pvAccessID sid = casTransport->preallocateChannelSID();
try
{
epics::pvData::PVField::shared_pointer securityToken = casTransport->getSecurityToken();
serverChannel.reset(new ServerChannelImpl(channel, _cid, sid, securityToken));
// ack allocation and register
casTransport->registerChannel(sid, serverChannel);
} catch (...)
{
// depreallocate and rethrow
casTransport->depreallocateChannelSID(sid);
throw;
}
control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(sid);
_transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status);
}
catch (std::exception& e)
{
errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str());
createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what()));
// TODO make sure that serverChannel gets destroyed
}
catch (...)
{
errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str());
createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel"));
// TODO make sure that serverChannel gets destroyed
}
ServerChannelImpl::shared_pointer serverChannelImpl = dynamic_pointer_cast<ServerChannelImpl>(serverChannel);
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(serverChannelImpl->getCID());
buffer->putInt(serverChannelImpl->getSID());
transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status);
}
}
void ServerChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status)
{
control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(-1);
_transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status);
if (Transport::shared_pointer transport = _transport.lock())
{
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(-1);
transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status);
}
}
/****************************************************************************************/
@@ -473,7 +496,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
ServerChannelImpl::shared_pointer channel = static_pointer_cast<ServerChannelImpl>(casTransport->getChannel(sid));
if (channel.get() == NULL)
{
BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
return;
}
@@ -584,6 +607,8 @@ void ServerChannelGetRequesterImpl::destroy()
_channelGet->destroy();
}
}
// TODO not competely safe for when callig getChannelGet() now
_channelGet.reset();
}
ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet()
@@ -605,7 +630,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
{
const int32 request = getPendingRequest();
control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1);
control->startMessage((int8)CMD_GET, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->put((int8)request);
IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry();
@@ -788,6 +813,8 @@ void ServerChannelPutRequesterImpl::destroy()
_channelPut->destroy();
}
}
// TODO
_channelPut.reset();
}
ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut()
@@ -1006,6 +1033,8 @@ void ServerChannelPutGetRequesterImpl::destroy()
_channelPutGet->destroy();
}
}
// TODO
_channelPutGet.reset();
}
ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet()
@@ -1224,6 +1253,8 @@ void ServerMonitorRequesterImpl::destroy()
_channelMonitor->destroy();
}
}
// TODO
_channelMonitor.reset();
}
Monitor::shared_pointer ServerMonitorRequesterImpl::getChannelMonitor()
@@ -1450,6 +1481,8 @@ void ServerChannelArrayRequesterImpl::destroy()
_channelArray->destroy();
}
}
// TODO
_channelArray.reset();
}
ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray()
@@ -1664,6 +1697,8 @@ void ServerChannelProcessRequesterImpl::destroy()
_channelProcess->destroy();
}
}
// TODO
_channelProcess.reset();
}
ChannelProcess::shared_pointer ServerChannelProcessRequesterImpl::getChannelProcess()
@@ -1909,6 +1944,8 @@ void ServerChannelRPCRequesterImpl::destroy()
_channelRPC->destroy();
}
}
// TODO
_channelRPC.reset();
}
ChannelRPC::shared_pointer ServerChannelRPCRequesterImpl::getChannelRPC()