diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 226cce1..d08f4bb 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -298,7 +298,7 @@ public: try { startRequest(PURE_CANCEL_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::exception& e) { // noop (do not complain if fails) LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what()); @@ -337,7 +337,7 @@ public: try { startRequest(PURE_DESTROY_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::tr1::bad_weak_ptr& e) { // noop (do not complain if fails) } catch (std::exception& e) { @@ -368,7 +368,7 @@ public: if (transport.get() != 0 && !m_subscribed.get() && startRequest(QOS_INIT)) { m_subscribed.set(); - transport->enqueueSendRequest(external_from_this()); + transport->enqueueSendRequest(internal_from_this()); } } @@ -503,7 +503,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_callback->processDone(channelNotConnected, thisPtr)); @@ -693,7 +693,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { stopRequest(); @@ -893,7 +893,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -938,7 +938,7 @@ public: *m_bitSet = *pvPutBitSet; m_structure->copyUnchecked(*pvPutStructure, *m_bitSet); unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected, thisPtr)); @@ -1196,7 +1196,7 @@ public: *m_putDataBitSet = *bitSet; m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet); unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1225,7 +1225,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1254,7 +1254,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1437,7 +1437,7 @@ public: m_structure = pvArgument; m_structureMutex.unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); @@ -1674,7 +1674,7 @@ public: m_count = count; m_stride = stride; } - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); @@ -1718,7 +1718,7 @@ public: m_count = count; m_stride = stride; } - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, thisPtr)); @@ -1751,7 +1751,7 @@ public: Lock lock(m_structureMutex); m_length = length; } - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, thisPtr)); @@ -1781,7 +1781,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, thisPtr, 0)); @@ -2237,7 +2237,7 @@ public: startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT)) { m_subscribed.set(); - transport->enqueueSendRequest(external_from_this()); + transport->enqueueSendRequest(internal_from_this()); } } @@ -2386,7 +2386,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { @@ -2412,7 +2412,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { @@ -3132,13 +3132,21 @@ public: */ class InternalChannelImpl : public ChannelImpl, - public std::tr1::enable_shared_from_this, public TimerCallback { public: POINTER_DEFINITIONS(InternalChannelImpl); private: + const weak_pointer m_external_this, m_internal_this; + + shared_pointer external_from_this() { + return shared_pointer(m_external_this); + } + shared_pointer internal_from_this() { + return shared_pointer(m_internal_this); + } + /** * Context. */ @@ -3267,8 +3275,7 @@ private: void activate() { // register before issuing search request - ChannelImpl::shared_pointer thisPointer = shared_from_this(); - m_context->registerChannel(thisPointer); + m_context->registerChannel(internal_from_this()); // connect connect(); @@ -3283,11 +3290,13 @@ private: short priority, auto_ptr& addresses) { - std::tr1::shared_ptr tp( - new InternalChannelImpl(context, channelID, name, requester, priority, addresses)); - ChannelImpl::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + std::tr1::shared_ptr internal( + new InternalChannelImpl(context, channelID, name, requester, priority, addresses)), + external(internal.get(), Destroyable::cleaner(internal)); + const_cast(internal->m_internal_this) = internal; + const_cast(internal->m_external_this) = external; + internal->activate(); + return external; } virtual ~InternalChannelImpl() @@ -3451,7 +3460,7 @@ public: } m_transport = transport; - m_transport->enqueueSendRequest(shared_from_this()); + m_transport->enqueueSendRequest(internal_from_this()); } virtual void cancel() { @@ -3539,8 +3548,7 @@ public: //throw std::runtime_error("Channel already destroyed."); } - // do destruction via context - m_context->destroyChannel(shared_from_this(), force); + destroyChannel(force); } @@ -3558,8 +3566,10 @@ public: if (m_connectionState == DESTROYED) throw std::runtime_error("Channel already destroyed."); + m_getfield.reset(); + // stop searching... - SearchInstance::shared_pointer thisChannelPointer = shared_from_this(); + shared_pointer thisChannelPointer = internal_from_this(); m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); cancel(); @@ -3580,7 +3590,7 @@ public: setConnectionState(DESTROYED); // unregister - m_context->unregisterChannel(shared_from_this()); + m_context->unregisterChannel(thisChannelPointer); } // should be called without any lock hold @@ -3603,7 +3613,7 @@ public: if (!initiateSearch) { // stop searching... - m_context->getChannelSearchManager()->unregisterSearchInstance(shared_from_this()); + m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this()); cancel(); } setConnectionState(DISCONNECTED); @@ -3615,7 +3625,7 @@ public: { if (remoteDestroy) { m_issueCreateMessage = false; - m_transport->enqueueSendRequest(shared_from_this()); + m_transport->enqueueSendRequest(internal_from_this()); } m_transport->release(getID()); @@ -3651,12 +3661,11 @@ public: if (!m_addresses.get()) { - m_context->getChannelSearchManager()->registerSearchInstance(shared_from_this(), penalize); + m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize); } else if (!m_addresses->empty()) { - TimerCallback::shared_pointer tc = std::tr1::dynamic_pointer_cast(shared_from_this()); - m_context->getTimer()->scheduleAfterDelay(tc, + m_context->getTimer()->scheduleAfterDelay(internal_from_this(), (m_addressIndex / m_addresses->size())*STATIC_SEARCH_BASE_DELAY_SEC); } } @@ -3698,7 +3707,7 @@ public: } // NOTE: this creates a new or acquires an existing transport (implies increases usage count) - transport = m_context->getTransport(shared_from_this(), serverAddress, minorRevision, m_priority); + transport = m_context->getTransport(internal_from_this(), serverAddress, minorRevision, m_priority); if (!transport.get()) { createChannelFailed(); @@ -3813,7 +3822,7 @@ public: void reportChannelStateChange() { - Channel::shared_pointer self(shared_from_this()); + shared_pointer self(external_from_this()); while (true) { @@ -3978,49 +3987,49 @@ public: ChannelProcessRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual ChannelGet::shared_pointer createChannelGet( ChannelGetRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual ChannelPut::shared_pointer createChannelPut( ChannelPutRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual ChannelPutGet::shared_pointer createChannelPutGet( ChannelPutGetRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual ChannelRPC::shared_pointer createChannelRPC( ChannelRPCRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual Monitor::shared_pointer createMonitor( MonitorRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } virtual ChannelArray::shared_pointer createChannelArray( ChannelArrayRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return BaseRequestImpl::build(shared_from_this(), requester, pvRequest); + return BaseRequestImpl::build(external_from_this(), requester, pvRequest); } @@ -4188,8 +4197,10 @@ private: osiSockAttach(); m_timer.reset(new Timer("pvAccess-client timer", lowPriority)); Context::shared_pointer thisPointer = shared_from_this(); + // stores weak_ptr m_connector.reset(new BlockingTCPConnector(thisPointer, m_receiveBufferSize, m_connectionTimeout)); + // stores many weak_ptr m_responseHandler.reset(new ClientResponseHandler(shared_from_this())); // preinitialize security plugins @@ -4456,6 +4467,7 @@ private: BeaconHandler::shared_pointer handler; if (it == m_beaconHandlers.end()) { + // stores weak_ptr handler.reset(new BeaconHandler(shared_from_this(), protocol, responseFrom)); m_beaconHandlers[*responseFrom] = handler; } @@ -4524,35 +4536,6 @@ private: } } - /** - * Destroy channel. - * @param channel - * @param force - * @throws PVAException - * @throws std::runtime_error - */ - void destroyChannel(ChannelImpl::shared_pointer const & channel, bool force) { - - string name = channel->getChannelName(); - bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); - if (lockAcquired) - { - try - { - channel->destroyChannel(force); - } - catch(...) { - // TODO - } - // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); - } - else - { - // TODO is this OK? - throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); - } - } - virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) { // remove? if (m_transportRegistry.numberOfActiveTransports() > 0) @@ -4905,17 +4888,16 @@ public: void InternalClientContextImpl::InternalChannelImpl::getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField) { - ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(shared_from_this(), requester, subField)); + ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(internal_from_this(), requester, subField)); self->activate(); // activate() stores self in channel } ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf) { - InternalClientContextImpl::shared_pointer internal(new InternalClientContextImpl(conf)), - external(internal.get(), Destroyable::cleaner(internal)); - internal->initialize(); - return external; + InternalClientContextImpl::shared_pointer prov(new InternalClientContextImpl(conf)); + prov->initialize(); + return prov; } } diff --git a/src/remoteClient/pv/clientContextImpl.h b/src/remoteClient/pv/clientContextImpl.h index e8a802c..45ed1f2 100644 --- a/src/remoteClient/pv/clientContextImpl.h +++ b/src/remoteClient/pv/clientContextImpl.h @@ -108,8 +108,10 @@ public: virtual void registerChannel(ChannelImpl::shared_pointer const & channel) = 0; virtual void unregisterChannel(ChannelImpl::shared_pointer const & channel) = 0; - virtual void destroyChannel(ChannelImpl::shared_pointer const & channel, bool force) = 0; - virtual ChannelImpl::shared_pointer createChannelInternal(std::string const &name, ChannelRequester::shared_pointer const & requester, short priority, std::auto_ptr& addresses) = 0; + virtual ChannelImpl::shared_pointer createChannelInternal(std::string const &name, + ChannelRequester::shared_pointer const & requester, + short priority, + std::auto_ptr& addresses) = 0; virtual ResponseRequest::shared_pointer getResponseRequest(pvAccessID ioid) = 0; virtual pvAccessID registerResponseRequest(ResponseRequest::shared_pointer const & request) = 0;