From 484da96da67abe9daadaad8228df2539c51005bf Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 17 May 2011 23:32:39 +0200 Subject: [PATCH] shared_pointer cycles --- .../remote/blockingServerTCPTransport.cpp | 1 + pvAccessApp/remote/blockingTCPTransport.cpp | 2 +- pvAccessApp/remote/remote.h | 4 + .../remoteClient/clientContextImpl.cpp | 173 ++++++------------ pvAccessApp/server/responseHandlers.cpp | 163 ++++++++++------- pvAccessApp/server/responseHandlers.h | 4 +- pvAccessApp/server/serverChannelImpl.h | 2 + pvAccessApp/utils/introspectionRegistry.cpp | 5 + testApp/remote/testServer.cpp | 2 +- 9 files changed, 176 insertions(+), 180 deletions(-) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 3983ab4..b029ac1 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -62,6 +62,7 @@ namespace epics { } void BlockingServerTCPTransport::internalClose(bool force) { + Transport::shared_pointer thisSharedPtr = shared_from_this(); BlockingTCPTransport::internalClose(force); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 9446a6e..815b542 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -224,7 +224,7 @@ namespace epics { // remove from registry Transport::shared_pointer thisSharedPtr = shared_from_this(); - _context->getTransportRegistry()->remove(thisSharedPtr); + _context->getTransportRegistry()->remove(thisSharedPtr).get(); // clean resources internalClose(force); diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 47c9692..0cdad70 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -142,6 +142,8 @@ namespace epics { public: typedef std::tr1::shared_ptr shared_pointer; typedef std::tr1::shared_ptr const_shared_pointer; + typedef std::tr1::weak_ptr weak_pointer; + typedef std::tr1::weak_ptr const_weak_pointer; virtual ~Transport() {} @@ -432,6 +434,8 @@ namespace epics { public: typedef std::tr1::shared_ptr shared_pointer; typedef std::tr1::shared_ptr const_shared_pointer; + typedef std::tr1::weak_ptr weak_pointer; + typedef std::tr1::weak_ptr const_weak_pointer; virtual ~ServerChannel() {} /** diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 50cd084..4e15124 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -1681,7 +1681,29 @@ namespace epics { + class MonitorElementImpl : public MonitorElement + { + public: + + PVStructure::shared_pointer m_pvStructure; + BitSet::shared_pointer m_changedBitSet; + BitSet::shared_pointer m_overrunBitSet; + + virtual PVStructure::shared_pointer const & getPVStructure() + { + return m_pvStructure; + } + virtual BitSet::shared_pointer const & getChangedBitSet() + { + return m_changedBitSet; + } + + virtual BitSet::shared_pointer const & getOverrunBitSet() + { + return m_overrunBitSet; + } + }; class MonitorStrategy : public Monitor { @@ -1693,12 +1715,11 @@ namespace epics { class MonitorStrategyNotify : public MonitorStrategy, - public MonitorElement, public std::tr1::enable_shared_from_this { private: - MonitorRequester::shared_pointer m_callback; + MonitorRequester::shared_pointer m_callback; bool m_gotMonitor; Mutex m_mutex; @@ -1706,12 +1727,12 @@ namespace epics { PVStructure::shared_pointer nullPVStructure; MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_thisPtr; + MonitorElement::shared_pointer m_monitorElement; public: MonitorStrategyNotify(MonitorRequester::shared_pointer const & callback) : - m_callback(callback), m_gotMonitor(false), m_mutex() + m_callback(callback), m_gotMonitor(false), m_mutex(), m_monitorElement(new MonitorElementImpl()) { } @@ -1736,17 +1757,12 @@ namespace epics { if (m_gotMonitor) return m_nullMonitorElement; else - { - // TODO this is not OK!!! requires destroy() call to clean-up - m_thisPtr = shared_from_this(); - return m_thisPtr; - } + return m_monitorElement; } virtual void release(MonitorElement::shared_pointer const & monitorElement) { Lock guard(m_mutex); m_gotMonitor = false; - m_thisPtr.reset(); } Status start() { @@ -1758,50 +1774,29 @@ namespace epics { } void destroy() { - m_thisPtr.reset(); } - // ============ MonitorElement ============ - - virtual PVStructure::shared_pointer const & getPVStructure() - { - return nullPVStructure; - } - - virtual BitSet::shared_pointer const & getChangedBitSet() - { - return nullBitSet; - } - - virtual BitSet::shared_pointer const & getOverrunBitSet() - { - return nullBitSet; - } }; class MonitorStrategyEntire : public MonitorStrategy, - public MonitorElement, public std::tr1::enable_shared_from_this { private: - MonitorRequester::shared_pointer m_callback; + MonitorRequester::shared_pointer m_callback; bool m_gotMonitor; Mutex m_mutex; - PVStructure::shared_pointer m_monitorElementStructure; - BitSet::shared_pointer m_monitorElementChangeBitSet; - BitSet::shared_pointer m_monitorElementOverrunBitSet; - MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_thisPtr; + MonitorElement::shared_pointer m_monitorElement; + std::tr1::shared_ptr m_monitorElementImpl; public: MonitorStrategyEntire(MonitorRequester::shared_pointer const & callback) : - m_callback(callback), m_gotMonitor(false), m_mutex() + m_callback(callback), m_gotMonitor(false), m_mutex(), m_monitorElement(new MonitorElementImpl()), m_monitorElementImpl(static_pointer_cast(m_monitorElement)) { } @@ -1812,18 +1807,18 @@ namespace epics { virtual void init(StructureConstPtr const & structure) { Lock guard(m_mutex); - m_monitorElementStructure.reset(getPVDataCreate()->createPVStructure(0, structure)); - int numberFields = m_monitorElementStructure->getNumberFields(); - m_monitorElementChangeBitSet.reset(new BitSet(numberFields)); - m_monitorElementOverrunBitSet.reset(new BitSet(numberFields)); + m_monitorElementImpl->m_pvStructure.reset(getPVDataCreate()->createPVStructure(0, structure)); + int numberFields = m_monitorElementImpl->m_pvStructure->getNumberFields(); + m_monitorElementImpl->m_changedBitSet.reset(new BitSet(numberFields)); + m_monitorElementImpl->m_overrunBitSet.reset(new BitSet(numberFields)); } virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { Lock guard(m_mutex); // simply deserialize and notify - m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElementStructure->deserialize(payloadBuffer, transport.get(), m_monitorElementChangeBitSet.get()); - m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport.get()); + m_monitorElementImpl->m_changedBitSet->deserialize(payloadBuffer, transport.get()); + m_monitorElementImpl->m_pvStructure->deserialize(payloadBuffer, transport.get(), m_monitorElementImpl->m_changedBitSet.get()); + m_monitorElementImpl->m_overrunBitSet->deserialize(payloadBuffer, transport.get()); m_gotMonitor = true; Monitor::shared_pointer thisMonitor = shared_from_this(); m_callback->monitorEvent(thisMonitor); @@ -1834,23 +1829,17 @@ namespace epics { if (m_gotMonitor) return m_nullMonitorElement; else - { - // TODO this is not OK!!! requires destroy() call to clean-up - m_thisPtr = shared_from_this(); - return m_thisPtr; - } + return m_monitorElement; } virtual void release(MonitorElement::shared_pointer const & monitorElement) { Lock guard(m_mutex); m_gotMonitor = false; - m_thisPtr.reset(); } Status start() { Lock guard(m_mutex); m_gotMonitor = false; - m_thisPtr.reset(); return Status::OK; } @@ -1859,31 +1848,13 @@ namespace epics { } void destroy() { - m_thisPtr.reset(); } - // ============ MonitorElement ============ - - virtual PVStructure::shared_pointer const & getPVStructure() - { - return m_monitorElementStructure; - } - - virtual BitSet::shared_pointer const & getChangedBitSet() - { - return m_monitorElementChangeBitSet; - } - - virtual BitSet::shared_pointer const & getOverrunBitSet() - { - return m_monitorElementOverrunBitSet; - } }; class MonitorStrategySingle : public MonitorStrategy, - public MonitorElement, public std::tr1::enable_shared_from_this { private: @@ -1893,23 +1864,20 @@ namespace epics { bool m_gotMonitor; Mutex m_mutex; - MonitorElement::shared_pointer nullMonitorElement; - PVStructure::shared_pointer m_monitorElementStructure; - BitSet::shared_pointer m_monitorElementChangeBitSet; - BitSet::shared_pointer m_monitorElementOverrunBitSet; - BitSet::shared_pointer m_dataChangeBitSet; BitSet::shared_pointer m_dataOverrunBitSet; bool m_needToCompress; - MonitorElement::shared_pointer thisMonitorElement; + MonitorElement::shared_pointer m_nullMonitorElement; + MonitorElement::shared_pointer m_monitorElement; + std::tr1::shared_ptr m_monitorElementImpl; public: MonitorStrategySingle(MonitorRequester::shared_pointer callback) : m_callback(callback), m_gotMonitor(false), m_mutex(), - m_needToCompress(false) + m_needToCompress(false), m_monitorElement(new MonitorElementImpl()), m_monitorElementImpl(static_pointer_cast(m_monitorElement)) { } @@ -1920,10 +1888,10 @@ namespace epics { virtual void init(StructureConstPtr const & structure) { Lock guard(m_mutex); - m_monitorElementStructure.reset(getPVDataCreate()->createPVStructure(0, structure)); - int numberFields = m_monitorElementStructure->getNumberFields(); - m_monitorElementChangeBitSet.reset(new BitSet(numberFields)); - m_monitorElementOverrunBitSet.reset(new BitSet(numberFields)); + m_monitorElementImpl->m_pvStructure.reset(getPVDataCreate()->createPVStructure(0, structure)); + int numberFields = m_monitorElementImpl->m_pvStructure->getNumberFields(); + m_monitorElementImpl->m_changedBitSet.reset(new BitSet(numberFields)); + m_monitorElementImpl->m_overrunBitSet.reset(new BitSet(numberFields)); m_dataChangeBitSet.reset(new BitSet(numberFields)); m_dataOverrunBitSet.reset(new BitSet(numberFields)); @@ -1936,9 +1904,9 @@ namespace epics { if (!m_gotMonitor) { // simply deserialize and notify - m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElementStructure->deserialize(payloadBuffer, transport.get(), m_monitorElementChangeBitSet.get()); - m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport.get()); + m_monitorElementImpl->m_changedBitSet->deserialize(payloadBuffer, transport.get()); + m_monitorElementImpl->m_pvStructure->deserialize(payloadBuffer, transport.get(), m_monitorElementImpl->m_changedBitSet.get()); + m_monitorElementImpl->m_overrunBitSet->deserialize(payloadBuffer, transport.get()); m_gotMonitor = true; Monitor::shared_pointer thisMonitor = shared_from_this(); m_callback->monitorEvent(thisMonitor); @@ -1947,52 +1915,48 @@ namespace epics { { // deserialize first m_dataChangeBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElementStructure->deserialize(payloadBuffer, transport.get(), m_dataChangeBitSet.get()); + m_monitorElementImpl->m_pvStructure->deserialize(payloadBuffer, transport.get(), m_dataChangeBitSet.get()); m_dataOverrunBitSet->deserialize(payloadBuffer, transport.get()); // OR local overrun // TODO should work only on uncompressed - m_monitorElementOverrunBitSet->or_and(*m_dataChangeBitSet.get(), *m_monitorElementChangeBitSet.get()); + m_monitorElementImpl->m_overrunBitSet->or_and(*m_dataChangeBitSet.get(), *m_monitorElementImpl->m_changedBitSet.get()); // OR new changes - *m_monitorElementChangeBitSet |= *m_dataChangeBitSet.get(); + *(m_monitorElementImpl->m_changedBitSet) |= *m_dataChangeBitSet.get(); // OR remote overrun - *m_monitorElementOverrunBitSet |= *m_dataOverrunBitSet.get(); + *(m_monitorElementImpl->m_overrunBitSet) |= *m_dataOverrunBitSet.get(); } } virtual MonitorElement::shared_pointer const & poll() { Lock guard(m_mutex); - if (!m_gotMonitor) return nullMonitorElement; + if (!m_gotMonitor) return m_nullMonitorElement; // compress if needed if (m_needToCompress) { - BitSetUtil::compress(m_monitorElementChangeBitSet.get(), m_monitorElementStructure.get()); - BitSetUtil::compress(m_monitorElementOverrunBitSet.get(), m_monitorElementStructure.get()); + BitSetUtil::compress(m_monitorElementImpl->m_changedBitSet.get(), m_monitorElementImpl->m_pvStructure.get()); + BitSetUtil::compress(m_monitorElementImpl->m_overrunBitSet.get(), m_monitorElementImpl->m_pvStructure.get()); m_needToCompress = false; } - // TODO fix this - thisMonitorElement = shared_from_this(); - return thisMonitorElement; + return m_monitorElement; } virtual void release(MonitorElement::shared_pointer const & monitorElement) { Lock guard(m_mutex); m_gotMonitor = false; - thisMonitorElement.reset(); } Status start() { Lock guard(m_mutex); - if (!m_monitorElementChangeBitSet) + if (!m_monitorElementImpl->m_changedBitSet.get()) return Status(Status::STATUSTYPE_ERROR, "Monitor not connected."); m_gotMonitor = false; - thisMonitorElement.reset(); - m_monitorElementChangeBitSet->clear(); - m_monitorElementOverrunBitSet->clear(); + m_monitorElementImpl->m_changedBitSet->clear(); + m_monitorElementImpl->m_overrunBitSet->clear(); return Status::OK; } @@ -2001,25 +1965,8 @@ namespace epics { } void destroy() { - thisMonitorElement.reset(); } - // ============ MonitorElement ============ - - virtual PVStructure::shared_pointer const & getPVStructure() - { - return m_monitorElementStructure; - } - - virtual BitSet::shared_pointer const & getChangedBitSet() - { - return m_monitorElementChangeBitSet; - } - - virtual BitSet::shared_pointer const & getOverrunBitSet() - { - return m_monitorElementOverrunBitSet; - } }; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 9f4fb0a..feededb 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -281,11 +281,12 @@ void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & tr ServerChannelRequesterImpl::ServerChannelRequesterImpl(Transport::shared_pointer const & transport, const String channelName, const pvAccessID cid) : + _serverChannel(), _transport(transport), _channelName(channelName), _cid(cid), _status(), - _channel() + _mutex() { } @@ -301,22 +302,81 @@ ChannelRequester::shared_pointer ServerChannelRequesterImpl::create( void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::shared_pointer const & channel) { - Lock guard(_mutex); - _status = status; - _channel = channel; - TransportSender::shared_pointer thisSender = shared_from_this(); - _transport->enqueueSendRequest(thisSender); + if(Transport::shared_pointer transport = _transport.lock()) + { + ServerChannel::shared_pointer serverChannel; + try + { + if (status.isSuccess()) + { + // NOTE: we do not explicitly check if transport OK + ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast(transport); + if (!casTransport.get()) + THROW_BASE_EXCEPTION("transport is unable to host channels"); + + // + // create a new channel instance + // + pvAccessID sid = casTransport->preallocateChannelSID(); + try + { + epics::pvData::PVField::shared_pointer securityToken = casTransport->getSecurityToken(); + serverChannel.reset(new ServerChannelImpl(channel, _cid, sid, securityToken)); + + // ack allocation and register + casTransport->registerChannel(sid, serverChannel); + + } catch (...) + { + // depreallocate and rethrow + casTransport->depreallocateChannelSID(sid); + throw; + } + } + + { + Lock guard(_mutex); + _status = status; + _serverChannel = serverChannel; + } + + TransportSender::shared_pointer thisSender = shared_from_this(); + transport->enqueueSendRequest(thisSender); + } + catch (std::exception& e) + { + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + { + Lock guard(_mutex); + _status = Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what()); + } + TransportSender::shared_pointer thisSender = shared_from_this(); + transport->enqueueSendRequest(thisSender); + // TODO make sure that serverChannel gets destroyed + } + catch (...) + { + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + { + Lock guard(_mutex); + _status = Status(Status::STATUSTYPE_FATAL, "failed to create channel"); + } + TransportSender::shared_pointer thisSender = shared_from_this(); + transport->enqueueSendRequest(thisSender); + // TODO make sure that serverChannel gets destroyed + } + } } void ServerChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & c, const Channel::ConnectionState isConnected) { - //noop + // TODO should we notify remote side? } String ServerChannelRequesterImpl::getRequesterName() { stringstream name; - name << typeid(*_transport).name() << "/" << _cid; + name << "ServerChannelRequesterImpl/" << _channelName << "[" << _cid << "]"; return name.str(); } @@ -337,77 +397,40 @@ void ServerChannelRequesterImpl::unlock() void ServerChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { - Channel::shared_pointer channel; + ServerChannel::shared_pointer serverChannel; Status status; { Lock guard(_mutex); - channel = _channel; + serverChannel = _serverChannel.lock(); status = _status; - - // TODO - _channel.reset(); } // error response - if (channel.get() == NULL) + if (serverChannel.get() == NULL) { createChannelFailedResponse(buffer, control, status); } // OK - else + else if (Transport::shared_pointer transport = _transport.lock()) { - ServerChannel::shared_pointer serverChannel; - try - { - // NOTE: we do not explicitly check if transport OK - ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast(_transport); - - // - // create a new channel instance - // - pvAccessID sid = casTransport->preallocateChannelSID(); - try - { - epics::pvData::PVField::shared_pointer securityToken = casTransport->getSecurityToken(); - serverChannel.reset(new ServerChannelImpl(channel, _cid, sid, securityToken)); - - // ack allocation and register - casTransport->registerChannel(sid, serverChannel); - - } catch (...) - { - // depreallocate and rethrow - casTransport->depreallocateChannelSID(sid); - throw; - } - - control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); - buffer->putInt(_cid); - buffer->putInt(sid); - _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); - } - catch (std::exception& e) - { - errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what())); - // TODO make sure that serverChannel gets destroyed - } - catch (...) - { - errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel")); - // TODO make sure that serverChannel gets destroyed - } + ServerChannelImpl::shared_pointer serverChannelImpl = dynamic_pointer_cast(serverChannel); + control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(serverChannelImpl->getCID()); + buffer->putInt(serverChannelImpl->getSID()); + transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); } } void ServerChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status) { - control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); - buffer->putInt(_cid); - buffer->putInt(-1); - _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); + if (Transport::shared_pointer transport = _transport.lock()) + { + control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(_cid); + buffer->putInt(-1); + transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); + } } /****************************************************************************************/ @@ -473,7 +496,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, ServerChannelImpl::shared_pointer channel = static_pointer_cast(casTransport->getChannel(sid)); if (channel.get() == NULL) { - BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); return; } @@ -584,6 +607,8 @@ void ServerChannelGetRequesterImpl::destroy() _channelGet->destroy(); } } + // TODO not competely safe for when callig getChannelGet() now + _channelGet.reset(); } ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet() @@ -605,7 +630,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro { const int32 request = getPendingRequest(); - control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1); + control->startMessage((int8)CMD_GET, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->put((int8)request); IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); @@ -788,6 +813,8 @@ void ServerChannelPutRequesterImpl::destroy() _channelPut->destroy(); } } + // TODO + _channelPut.reset(); } ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut() @@ -1006,6 +1033,8 @@ void ServerChannelPutGetRequesterImpl::destroy() _channelPutGet->destroy(); } } + // TODO + _channelPutGet.reset(); } ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet() @@ -1224,6 +1253,8 @@ void ServerMonitorRequesterImpl::destroy() _channelMonitor->destroy(); } } + // TODO + _channelMonitor.reset(); } Monitor::shared_pointer ServerMonitorRequesterImpl::getChannelMonitor() @@ -1450,6 +1481,8 @@ void ServerChannelArrayRequesterImpl::destroy() _channelArray->destroy(); } } + // TODO + _channelArray.reset(); } ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray() @@ -1664,6 +1697,8 @@ void ServerChannelProcessRequesterImpl::destroy() _channelProcess->destroy(); } } + // TODO + _channelProcess.reset(); } ChannelProcess::shared_pointer ServerChannelProcessRequesterImpl::getChannelProcess() @@ -1909,6 +1944,8 @@ void ServerChannelRPCRequesterImpl::destroy() _channelRPC->destroy(); } } + // TODO + _channelRPC.reset(); } ChannelRPC::shared_pointer ServerChannelRPCRequesterImpl::getChannelRPC() diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 4c86a67..5708127 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -227,11 +227,11 @@ namespace epics { void unlock(); void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: - Transport::shared_pointer _transport; + ServerChannel::weak_pointer _serverChannel; + Transport::weak_pointer _transport; const String _channelName; const pvAccessID _cid; epics::pvData::Status _status; - Channel::shared_pointer _channel; epics::pvData::Mutex _mutex; void createChannelFailedResponse(epics::pvData::ByteBuffer* buffer, TransportSendControl* control, const epics::pvData::Status& status); }; diff --git a/pvAccessApp/server/serverChannelImpl.h b/pvAccessApp/server/serverChannelImpl.h index a81ac38..dac429e 100644 --- a/pvAccessApp/server/serverChannelImpl.h +++ b/pvAccessApp/server/serverChannelImpl.h @@ -18,6 +18,8 @@ class ServerChannelImpl : public ServerChannel public: typedef std::tr1::shared_ptr shared_pointer; typedef std::tr1::shared_ptr const_shared_pointer; + typedef std::tr1::weak_ptr weak_pointer; + typedef std::tr1::weak_ptr const_weak_pointer; /** * Create server channel for given process variable. * @param channel local channel. diff --git a/pvAccessApp/utils/introspectionRegistry.cpp b/pvAccessApp/utils/introspectionRegistry.cpp index b401114..a909d54 100644 --- a/pvAccessApp/utils/introspectionRegistry.cpp +++ b/pvAccessApp/utils/introspectionRegistry.cpp @@ -293,7 +293,12 @@ StructureConstPtr IntrospectionRegistry::deserializeStructureField(ByteBuffer* b fields = new FieldConstPtr[size]; for(int i = 0; i < size; i++) { + try { fields[i] = deserialize(buffer, control, registry); + } catch (...) { + delete[] fields; + throw; + } } } diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index 851c5af..d281281 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -1098,7 +1098,7 @@ void testServer() ctx->printInfo(); - ctx->run(100); + ctx->run(30); ctx->destroy();