diff --git a/src/client/pv/monitor.h b/src/client/pv/monitor.h index 39b16dd..1b83d50 100644 --- a/src/client/pv/monitor.h +++ b/src/client/pv/monitor.h @@ -54,7 +54,7 @@ class epicsShareClass MonitorElement { * This is used by pvAccess to implement monitors. * @author mrk */ -class epicsShareClass Monitor : public epics::pvData::Destroyable{ +class epicsShareClass Monitor : public virtual epics::pvData::Destroyable{ public: POINTER_DEFINITIONS(Monitor); virtual ~Monitor(){} diff --git a/src/client/pv/pvAccess.h b/src/client/pv/pvAccess.h index 9463b8f..ee788c6 100644 --- a/src/client/pv/pvAccess.h +++ b/src/client/pv/pvAccess.h @@ -138,7 +138,7 @@ struct epicsShareClass ChannelBaseRequester : virtual public epics::pvData::Requ /** * Base interface for all channel requests (aka. Operations). */ -class epicsShareClass ChannelRequest : public epics::pvData::Destroyable, public Lockable, private epics::pvData::NoDefaultMethods { +class epicsShareClass ChannelRequest : public virtual epics::pvData::Destroyable, public Lockable, private epics::pvData::NoDefaultMethods { public: POINTER_DEFINITIONS(ChannelRequest); diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 63e2b6e..a4a401c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -47,6 +47,8 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { +class ChannelGetFieldRequestImpl; + Status ChannelImpl::channelDestroyed( Status::STATUSTYPE_WARNING, "channel destroyed"); Status ChannelImpl::channelDisconnected( @@ -62,27 +64,6 @@ typedef std::map IOIDResponseRequestM catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } -struct delayed_destroyable_deleter -{ - template void operator()(T * p) - { - try - { - // new owner, this also allows to use shared_from_this() in destroy() method - std::tr1::shared_ptr ptr(p); - ptr->destroy(); - } - catch(std::exception &ex) - { - printf("delayed_destroyable_deleter: unhandled exception: %s", ex.what()); - } - catch(...) - { - printf("delayed_destroyable_deleter: unhandled exception"); - } - } -}; - /** * Base channel request. * @author Matej Sekoranja @@ -91,12 +72,10 @@ class BaseRequestImpl : public DataResponse, public SubscriptionRequest, public TransportSender, - public Destroyable, - public std::tr1::enable_shared_from_this + public virtual Destroyable { public: - typedef std::tr1::shared_ptr shared_pointer; - typedef std::tr1::shared_ptr const_shared_pointer; + POINTER_DEFINITIONS(BaseRequestImpl); static PVDataCreatePtr pvDataCreate; @@ -153,8 +132,51 @@ protected: Mutex m_mutex; - // used to hold ownership until create is called (to support complete async usage) - ResponseRequest::shared_pointer m_thisPointer; + /* ownership here is a bit complicated... + * + * each instance maintains two shared_ptr/weak_ptr + * 1. internal - calls 'delete' when ref count reaches zero + * 2. external - wraps 'internal' ref. calls ->destroy() and releases internal ref. when ref count reaches zero + * + * Any internal ref. loops must be broken by destroy() + * + * Only external refs. are returned by Channel::create*() or passed to *Requester methods. + * + * Internal refs. are held by internal relations which need to ensure memory is not + * prematurely free'd, but should not keep the channel/operation "alive". + * eg. A Channel holds an internal ref to ChannelGet + */ + const BaseRequestImpl::weak_pointer m_this_internal, + m_this_external; + + template + std::tr1::shared_ptr internal_from_this() { + ResponseRequest::shared_pointer P(m_this_internal); + return std::tr1::static_pointer_cast(P); + } + template + std::tr1::shared_ptr external_from_this() { + ResponseRequest::shared_pointer P(m_this_external); + return std::tr1::static_pointer_cast(P); + } + + template + static + typename std::tr1::shared_ptr + build(ChannelImpl::shared_pointer const & channel, + const typename subklass::requester_type::shared_pointer& requester, + const epics::pvData::PVStructure::shared_pointer& pvRequest) + { + std::tr1::shared_ptr internal(new subklass(channel, requester, pvRequest)), + external(internal.get(), + Destroyable::cleaner(internal)); + // only we get to set these, but since this isn't the ctor, we aren't able to + // follow the rules. + const_cast(internal->m_this_internal) = internal; + const_cast(internal->m_this_external) = external; + internal->activate(); + return external; + } bool m_destroyed; bool m_initialized; @@ -163,8 +185,6 @@ protected: AtomicBoolean m_subscribed; - virtual ~BaseRequestImpl() {}; - BaseRequestImpl(ChannelImpl::shared_pointer const & channel, ChannelBaseRequester::shared_pointer const & requester) : m_channel(channel), m_requester(requester), @@ -173,15 +193,16 @@ protected: m_destroyed(false), m_initialized(false), m_subscribed() - { - } + {} - void activate() { + virtual ~BaseRequestImpl() {} + + virtual void activate() { // register response request // ResponseRequest::shared_pointer to this instance must already exist - m_thisPointer = shared_from_this(); - m_ioid = m_channel->getContext()->registerResponseRequest(m_thisPointer); - m_channel->registerResponseRequest(m_thisPointer); + shared_pointer self(m_this_internal); + m_ioid = m_channel->getContext()->registerResponseRequest(self); + m_channel->registerResponseRequest(self); } bool startRequest(int32 qos) { @@ -238,10 +259,6 @@ public: m_mutex.unlock(); } - // we are initialized now, release pointer - // this is safe since at least caller owns it - m_thisPointer.reset(); - initResponse(transport, version, payloadBuffer, qos, status); } else @@ -281,7 +298,7 @@ public: try { startRequest(PURE_CANCEL_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::exception& e) { // noop (do not complain if fails) LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what()); @@ -320,16 +337,13 @@ public: try { startRequest(PURE_DESTROY_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::exception& e) { // noop (do not complain if fails) LOG(logLevelWarn, "Ignore exception during BaseRequestImpl::destroy: %s", e.what()); } } - - // in case this instance is destroyed uninitialized - m_thisPointer.reset(); } virtual void timeout() { @@ -353,7 +367,7 @@ public: if (transport.get() != 0 && !m_subscribed.get() && startRequest(QOS_INIT)) { m_subscribed.set(); - transport->enqueueSendRequest(shared_from_this()); + transport->enqueueSendRequest(external_from_this()); } } @@ -402,7 +416,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { -private: +public: ChannelProcessRequester::shared_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -414,7 +428,7 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); } - void activate() + virtual void activate() { BaseRequestImpl::activate(); @@ -425,23 +439,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelProcess::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_callback->channelProcessConnect(channelDestroyed, thisPointer)); + EXCEPTION_GUARD(m_callback->channelProcessConnect(channelDestroyed, external_from_this())); BaseRequestImpl::destroy(true); } } public: - static ChannelProcess::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) + static ChannelProcess::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelProcessRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelProcessRequestImpl(channel, callback, pvRequest), - delayed_destroyable_deleter() - ); - ChannelProcess::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelProcessRequestImpl() @@ -472,18 +480,16 @@ public: } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { - ChannelProcess::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_callback->channelProcessConnect(status, thisPtr)); + EXCEPTION_GUARD(m_callback->channelProcessConnect(status, external_from_this())); } virtual void normalResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { - ChannelProcess::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_callback->processDone(status, thisPtr)); + EXCEPTION_GUARD(m_callback->processDone(status, external_from_this())); } virtual void process() { - ChannelProcess::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelProcess::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -503,7 +509,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_callback->processDone(channelNotConnected, thisPtr)); @@ -543,7 +549,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet { -private: +public: ChannelGetRequester::shared_pointer m_channelGetRequester; PVStructure::shared_pointer m_pvRequest; @@ -561,12 +567,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelGet); } - void activate() + virtual void activate() { if (!m_pvRequest) { - ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -578,22 +583,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelDestroyed, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } public: - static ChannelGet::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, PVStructure::shared_pointer const & pvRequest) + static ChannelGet::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelGetRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelGetImpl(channel, channelGetRequester, pvRequest), - delayed_destroyable_deleter()); - ChannelGet::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelGetImpl() @@ -632,8 +632,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -645,19 +644,16 @@ public: } // notify - ChannelGet::shared_pointer thisChannelGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisChannelGet, m_structure->getStructure())); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, external_from_this(), m_structure->getStructure())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { MB_POINT(channelGet, 8, "client channelGet->deserialize (start)"); - ChannelGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); - if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD(m_channelGetRequester->getDone(status, external_from_this(), PVStructurePtr(), BitSetPtr())); return; } @@ -670,7 +666,7 @@ public: MB_POINT(channelGet, 9, "client channelGet->deserialize (end), just before channelGet->getDone() is called"); - EXCEPTION_GUARD(m_channelGetRequester->getDone(status, thisPtr, m_structure, m_bitSet)); + EXCEPTION_GUARD(m_channelGetRequester->getDone(status, external_from_this(), m_structure, m_bitSet)); } virtual void get() { @@ -678,7 +674,7 @@ public: MB_INC_AUTO_ID(channelGet); MB_POINT(channelGet, 0, "client channelGet->get()"); - ChannelGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelGet::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -710,7 +706,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { stopRequest(); @@ -765,7 +761,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut { -private: +public: ChannelPutRequester::shared_pointer m_channelPutRequester; PVStructure::shared_pointer m_pvRequest; @@ -783,12 +779,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPut); } - void activate() + virtual void activate() { if (!m_pvRequest) { - ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -800,22 +795,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelDestroyed, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } public: - static ChannelPut::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, PVStructure::shared_pointer const & pvRequest) + static ChannelPut::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelPutRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelPutImpl(channel, channelPutRequester, pvRequest), - delayed_destroyable_deleter()); - ChannelPut::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelPutImpl() @@ -859,8 +849,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -872,13 +861,12 @@ public: } // notify - ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, m_structure->getStructure())); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, external_from_this(), m_structure->getStructure())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - ChannelPut::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPut::shared_pointer thisPtr(external_from_this()); if (qos & QOS_GET) { @@ -904,7 +892,7 @@ public: virtual void get() { - ChannelPut::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPut::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -925,7 +913,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -934,7 +922,7 @@ public: virtual void put(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & pvPutBitSet) { - ChannelPut::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPut::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -970,7 +958,7 @@ public: *m_bitSet = *pvPutBitSet; m_structure->copyUnchecked(*pvPutStructure, *m_bitSet); unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected, thisPtr)); @@ -1021,7 +1009,7 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet { -private: +public: ChannelPutGetRequester::shared_pointer m_channelPutGetRequester; PVStructure::shared_pointer m_pvRequest; @@ -1044,12 +1032,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet); } - void activate() + virtual void activate() { if (!m_pvRequest) { - ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, thisPointer, StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, external_from_this(), StructureConstPtr(), StructureConstPtr())); return; } @@ -1058,22 +1045,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelDestroyed, thisPointer, StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelDestroyed, external_from_this(), StructureConstPtr(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } public: - static ChannelPutGet::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) + static ChannelPutGet::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelPutGetRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelPutGetImpl(channel, channelPutGetRequester, pvRequest), - delayed_destroyable_deleter()); - ChannelPutGet::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelPutGetImpl() @@ -1121,8 +1103,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, external_from_this(), StructureConstPtr(), StructureConstPtr())); return; } @@ -1135,14 +1116,13 @@ public: } // notify - ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, m_putData->getStructure(), m_getData->getStructure())); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, external_from_this(), m_putData->getStructure(), m_getData->getStructure())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - ChannelPutGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPutGet::shared_pointer thisPtr(external_from_this()); if (qos & QOS_GET) { @@ -1200,7 +1180,7 @@ public: virtual void putGet(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & bitSet) { - ChannelPutGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPutGet::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -1236,7 +1216,7 @@ public: *m_putDataBitSet = *bitSet; m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet); unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1245,7 +1225,7 @@ public: virtual void getGet() { - ChannelPutGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPutGet::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -1265,7 +1245,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1274,7 +1254,7 @@ public: virtual void getPut() { - ChannelPutGet::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelPutGet::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -1294,7 +1274,7 @@ public: } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); @@ -1348,7 +1328,7 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC { -private: +public: ChannelRPCRequester::shared_pointer m_channelRPCRequester; PVStructure::shared_pointer m_pvRequest; @@ -1365,12 +1345,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelRPC); } - void activate() + virtual void activate() { if (!m_pvRequest) { - ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(pvRequestNull, thisPointer)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(pvRequestNull, external_from_this())); return; } @@ -1380,22 +1359,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelDestroyed, thisPointer)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelDestroyed, external_from_this())); BaseRequestImpl::destroy(true); } } public: - static ChannelRPC::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelRPCRequester::shared_pointer const & channelRPCRequester, PVStructure::shared_pointer const & pvRequest) + static ChannelRPC::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelRPCRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelRPCImpl(channel, channelRPCRequester, pvRequest), - delayed_destroyable_deleter()); - ChannelRPC::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelRPCImpl() @@ -1441,19 +1415,17 @@ public: virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, external_from_this())); return; } // notify - ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, external_from_this())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { - ChannelRPC::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelRPC::shared_pointer thisPtr(external_from_this()); if (!status.isSuccess()) { @@ -1468,7 +1440,7 @@ public: virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { - ChannelRPC::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); + ChannelRPC::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); @@ -1492,7 +1464,7 @@ public: m_structure = pvArgument; m_structureMutex.unlock(); - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); @@ -1544,7 +1516,7 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray { -private: +public: ChannelArrayRequester::shared_pointer m_channelArrayRequester; PVStructure::shared_pointer m_pvRequest; @@ -1570,12 +1542,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelArray); } - void activate() + virtual void activate() { if (!m_pvRequest) { - ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, thisPointer, Array::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, external_from_this(), Array::shared_pointer())); return; } @@ -1585,22 +1556,17 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelDestroyed, thisPointer, Array::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelDestroyed, external_from_this(), Array::shared_pointer())); BaseRequestImpl::destroy(true); } } public: - static ChannelArray::shared_pointer create(ChannelImpl::shared_pointer const & channel, ChannelArrayRequester::shared_pointer const & channelArrayRequester, PVStructure::shared_pointer const & pvRequest) + static ChannelArray::shared_pointer create(ChannelImpl::shared_pointer const & channel, + ChannelArrayRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelArrayImpl(channel, channelArrayRequester, pvRequest), - delayed_destroyable_deleter()); - ChannelArray::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelArrayImpl() @@ -1661,8 +1627,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, Array::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, external_from_this(), Array::shared_pointer())); return; } @@ -1674,19 +1639,18 @@ public: } // notify - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, m_arrayData->getArray())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, external_from_this(), m_arrayData->getArray())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + ChannelArray::shared_pointer thisPtr(external_from_this()); if (qos & QOS_GET) { if (!status.isSuccess()) { - m_channelArrayRequester->getArrayDone(status, thisChannelArray, PVArray::shared_pointer()); + m_channelArrayRequester->getArrayDone(status, thisPtr, PVArray::shared_pointer()); return; } @@ -1695,21 +1659,21 @@ public: m_arrayData->deserialize(payloadBuffer, transport.get()); } - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status, thisChannelArray, m_arrayData)); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status, thisPtr, m_arrayData)); } else if (qos & QOS_GET_PUT) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status, thisPtr)); } else if (qos & QOS_PROCESS) { size_t length = SerializeHelper::readSize(payloadBuffer, transport.get()); - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(status, thisChannelArray, length)); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(status, thisPtr, length)); } else { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status, thisPtr)); } } @@ -1718,22 +1682,22 @@ public: // TODO stride == 0 check - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + ChannelArray::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus, thisChannelArray, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus, thisPtr, PVArray::shared_pointer())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(notInitializedStatus, thisChannelArray, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(notInitializedStatus, thisPtr, PVArray::shared_pointer())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus, thisChannelArray, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus, thisPtr, PVArray::shared_pointer())); return; } @@ -1744,10 +1708,10 @@ public: m_count = count; m_stride = stride; } - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisChannelArray, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); } } @@ -1755,28 +1719,28 @@ public: // TODO stride == 0 check - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + ChannelArray::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(notInitializedStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(notInitializedStatus, thisPtr)); return; } } if (!(*m_arrayData->getArray() == *putArray->getArray())) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(invalidPutArrayStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(invalidPutArrayStatus, thisPtr)); return; } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus, thisPtr)); return; } @@ -1788,31 +1752,31 @@ public: m_count = count; m_stride = stride; } - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, thisPtr)); } } virtual void setLength(size_t length) { - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + ChannelArray::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(notInitializedStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(notInitializedStatus, thisPtr)); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus, thisPtr)); return; } @@ -1821,40 +1785,40 @@ public: Lock lock(m_structureMutex); m_length = length; } - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, thisChannelArray)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, thisPtr)); } } virtual void getLength() { - ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + ChannelArray::shared_pointer thisPtr(external_from_this()); { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(destroyedStatus, thisChannelArray, 0)); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(destroyedStatus, thisPtr, 0)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(notInitializedStatus, thisChannelArray, 0)); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(notInitializedStatus, thisPtr, 0)); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_PROCESS : QOS_PROCESS)) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(otherRequestPendingStatus, thisChannelArray, 0)); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(otherRequestPendingStatus, thisPtr, 0)); return; } try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, thisChannelArray, 0)); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, thisPtr, 0)); } } @@ -1895,147 +1859,6 @@ public: - - -PVACCESS_REFCOUNT_MONITOR_DEFINE(channelGetField); - -// NOTE: this instance is not returned as Request, so it must self-destruct -class ChannelGetFieldRequestImpl : - public DataResponse, - public TransportSender, - public std::tr1::enable_shared_from_this -{ -public: - typedef std::tr1::shared_ptr shared_pointer; - typedef std::tr1::shared_ptr const_shared_pointer; - -private: - ChannelImpl::shared_pointer m_channel; - - GetFieldRequester::shared_pointer m_callback; - string m_subField; - - pvAccessID m_ioid; - - Mutex m_mutex; - bool m_destroyed; - - ResponseRequest::shared_pointer m_thisPointer; - - ChannelGetFieldRequestImpl(ChannelImpl::shared_pointer const & channel, GetFieldRequester::shared_pointer const & callback, string const & subField) : - m_channel(channel), - m_callback(callback), - m_subField(subField), - m_ioid(INVALID_IOID), - m_destroyed(false) - { - PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); - } - - void activate() - { - // register response request - m_thisPointer = shared_from_this(); - m_ioid = m_channel->getContext()->registerResponseRequest(m_thisPointer); - m_channel->registerResponseRequest(m_thisPointer); - - // enqueue send request - try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); - } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_callback->getDone(BaseRequestImpl::channelNotConnected, FieldConstPtr())); - } - } - -public: - static shared_pointer create(ChannelImpl::shared_pointer const & channel, GetFieldRequester::shared_pointer const & callback, string const & subField) - { - shared_pointer thisPointer(new ChannelGetFieldRequestImpl(channel, callback, subField), delayed_destroyable_deleter()); - thisPointer->activate(); - return thisPointer; - } - - ~ChannelGetFieldRequestImpl() - { - PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelGetField); - } - - ChannelBaseRequester::shared_pointer getRequester() { - return m_callback; - } - - pvAccessID getIOID() const { - return m_ioid; - } - - virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage((int8)17, 8); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - SerializeHelper::serializeString(m_subField, buffer, control); - } - - - virtual Channel::shared_pointer getChannel() - { - return m_channel; - } - - virtual void cancel() { - // TODO - // noop - } - - virtual void timeout() { - cancel(); - } - - void reportStatus(Channel::ConnectionState status) { - // destroy, since channel (parent) was destroyed - if (status == Channel::DESTROYED) - destroy(); - // TODO notify? - } - - virtual void destroy() - { - { - Lock guard(m_mutex); - if (m_destroyed) - return; - m_destroyed = true; - } - - // unregister response request - m_channel->getContext()->unregisterResponseRequest(m_ioid); - m_channel->unregisterResponseRequest(m_ioid); - - m_thisPointer.reset(); - } - - virtual void response(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer) { - - Status status; - status.deserialize(payloadBuffer, transport.get()); - if (status.isSuccess()) - { - // deserialize Field... - FieldConstPtr field = transport->cachedDeserialize(payloadBuffer); - EXCEPTION_GUARD(m_callback->getDone(status, field)); - } - else - { - EXCEPTION_GUARD(m_callback->getDone(status, FieldConstPtr())); - } - - destroy(); - } - - -}; - - - class MonitorStrategy : public Monitor { public: virtual ~MonitorStrategy() {}; @@ -2346,7 +2169,8 @@ class ChannelMonitorImpl : public BaseRequestImpl, public Monitor { -private: +public: + typedef MonitorRequester requester_type; MonitorRequester::shared_pointer m_monitorRequester; bool m_started; @@ -2374,12 +2198,11 @@ private: PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); } - void activate() + virtual void activate() { if (!m_pvRequest) { - Monitor::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -2437,8 +2260,7 @@ private: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - Monitor::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelDestroyed, thisPointer, StructureConstPtr())); + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } @@ -2449,25 +2271,17 @@ private: startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT)) { m_subscribed.set(); - transport->enqueueSendRequest(shared_from_this()); + transport->enqueueSendRequest(external_from_this()); } } public: static Monitor::shared_pointer create( ChannelImpl::shared_pointer const & channel, - MonitorRequester::shared_pointer const & monitorRequester, + MonitorRequester::shared_pointer const & requester, PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new ChannelMonitorImpl( - channel, monitorRequester, - pvRequest), - delayed_destroyable_deleter()); - Monitor::shared_pointer thisPointer = tp; - static_cast(thisPointer.get())->activate(); - return thisPointer; + return build(channel, requester, pvRequest); } ~ChannelMonitorImpl() @@ -2513,8 +2327,7 @@ public: { if (!status.isSuccess()) { - Monitor::shared_pointer thisChannelMonitor = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, thisChannelMonitor, StructureConstPtr())); + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -2527,8 +2340,7 @@ public: bool restoreStartedState = m_started; // notify - Monitor::shared_pointer thisChannelMonitor = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, thisChannelMonitor, structure)); + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, external_from_this(), structure)); if (restoreStartedState) start(); @@ -2616,7 +2428,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { @@ -2642,7 +2454,7 @@ public: try { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this()); m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { @@ -3356,7 +3168,7 @@ public: // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors } -private: +public: /** * Implementation of Channel. */ @@ -3365,6 +3177,8 @@ private: public std::tr1::enable_shared_from_this, public TimerCallback { + public: + POINTER_DEFINITIONS(InternalChannelImpl); private: /** @@ -3387,6 +3201,12 @@ private: */ ChannelRequester::shared_pointer m_requester; + public: + //! The in-progress GetField operation. + //! held here as the present API doesn't support cancellation + std::tr1::shared_ptr m_getfield; + private: + /** * Process priority. */ @@ -3437,12 +3257,12 @@ private: * Server channel ID. */ pvAccessID m_serverChannelID; - +public: /** * Context sync. mutex. */ Mutex m_channelMutex; - +private: /** * Flag indicting what message to send. */ @@ -3505,12 +3325,14 @@ private: short priority, auto_ptr& addresses) { - // TODO use std::make_shared std::tr1::shared_ptr tp( - new InternalChannelImpl(context, channelID, name, requester, priority, addresses), - delayed_destroyable_deleter()); + new InternalChannelImpl(context, channelID, name, requester, priority, addresses)); ChannelImpl::shared_pointer thisPointer = tp; static_cast(thisPointer.get())->activate(); + { + ChannelImpl::shared_pointer wrap(thisPointer.get(), Destroyable::cleaner(thisPointer)); + thisPointer.swap(wrap); + } return thisPointer; } @@ -3534,6 +3356,8 @@ private: std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } + private: + int32_t& getUserValue() { return m_userValue; } @@ -3589,7 +3413,7 @@ private: pvAccessID getChannelID() { return m_channelID; } - +public: virtual ClientContextImpl::shared_pointer getContext() { return m_context; } @@ -4193,10 +4017,7 @@ private: } } - virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField) - { - ChannelGetFieldRequestImpl::create(shared_from_this(), requester, subField); - } + virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField); virtual ChannelProcess::shared_pointer createChannelProcess( ChannelProcessRequester::shared_pointer const & channelProcessRequester, @@ -4974,11 +4795,179 @@ private: FlushStrategy m_flushStrategy; }; +PVACCESS_REFCOUNT_MONITOR_DEFINE(channelGetField); + +class ChannelGetFieldRequestImpl : + public DataResponse, + public TransportSender, + public Destroyable, + public std::tr1::enable_shared_from_this +{ +public: + POINTER_DEFINITIONS(ChannelGetFieldRequestImpl); + + const InternalClientContextImpl::InternalChannelImpl::shared_pointer m_channel; + + const GetFieldRequester::shared_pointer m_callback; + string m_subField; + + pvAccessID m_ioid; + + Mutex m_mutex; + bool m_destroyed; + bool m_notified; + + ChannelGetFieldRequestImpl(InternalClientContextImpl::InternalChannelImpl::shared_pointer const & channel, + GetFieldRequester::shared_pointer const & callback, + std::string const & subField) : + m_channel(channel), + m_callback(callback), + m_subField(subField), + m_ioid(INVALID_IOID), + m_destroyed(false), + m_notified(false) + { + PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); + } + + void activate() + { + { + // register response request + ChannelGetFieldRequestImpl::shared_pointer self(shared_from_this()); + m_ioid = m_channel->getContext()->registerResponseRequest(self); + m_channel->registerResponseRequest(self); + { + Lock L(m_channel->m_channelMutex); + m_channel->m_getfield.swap(self); + } + // self goes out of scope, may call GetFieldRequester::getDone() from dtor + } + + // enqueue send request + try { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (std::runtime_error &rte) { + notify(BaseRequestImpl::channelNotConnected, FieldConstPtr()); + } + } + +public: + static void create(InternalClientContextImpl::InternalChannelImpl::shared_pointer const & channel, + GetFieldRequester::shared_pointer const & requester, + std::string const & subField) + { + ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(channel, requester, subField)); + self->activate(); + // activate() stores self in channel + } + + virtual ~ChannelGetFieldRequestImpl() + { + destroy(); + notify(BaseRequestImpl::channelDestroyed, FieldConstPtr()); + + PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelGetField); + } + + void notify(const Status& sts, const FieldConstPtr& field) + { + { + Lock G(m_mutex); + if(m_notified) + return; + m_notified = true; + } + EXCEPTION_GUARD(m_callback->getDone(sts, field)); + } + + ChannelBaseRequester::shared_pointer getRequester() { + return m_callback; + } + + pvAccessID getIOID() const { + return m_ioid; + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)17, 8); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + SerializeHelper::serializeString(m_subField, buffer, control); + } + + + virtual Channel::shared_pointer getChannel() + { + return m_channel; + } + + virtual void cancel() { + // TODO + // noop + } + + virtual void timeout() { + cancel(); + } + + void reportStatus(Channel::ConnectionState status) { + // destroy, since channel (parent) was destroyed + if (status == Channel::DESTROYED) + destroy(); + // TODO notify? + } + + virtual void destroy() + { + { + Lock guard(m_mutex); + if (m_destroyed) + return; + m_destroyed = true; + } + + { + Lock L(m_channel->m_channelMutex); + if(m_channel->m_getfield.get()==this) + m_channel->m_getfield.reset(); + } + + // unregister response request + m_channel->getContext()->unregisterResponseRequest(m_ioid); + m_channel->unregisterResponseRequest(m_ioid); + } + + virtual void response(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer) { + + Status status; + FieldConstPtr field; + status.deserialize(payloadBuffer, transport.get()); + if (status.isSuccess()) + { + // deserialize Field... + field = transport->cachedDeserialize(payloadBuffer); + } + notify(status, field); + + destroy(); + } + + +}; + + +void InternalClientContextImpl::InternalChannelImpl::getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField) +{ + ChannelGetFieldRequestImpl::create(shared_from_this(), requester, subField); +} + ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf) { - InternalClientContextImpl::shared_pointer t(new InternalClientContextImpl(conf), delayed_destroyable_deleter()); - t->initialize(); - return t; + InternalClientContextImpl::shared_pointer internal(new InternalClientContextImpl(conf)), + external(internal.get(), Destroyable::cleaner(internal)); + internal->initialize(); + return external; } } diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index e35d731..bf14c0b 100644 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -181,9 +181,9 @@ SyncChannelGetRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateChann TR1::shared_ptr channelGetReq(new SyncChannelGetRequesterImpl(channel->getChannelName(), debug)); - PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(request); + PVStructure::shared_pointer pvRequest = createRequest(request); - channel->createChannelGet(channelGetReq,pvRequest); + ChannelGet::shared_pointer op(channel->createChannelGet(channelGetReq,pvRequest)); bool succStatus = channelGetReq->waitUntilGetDone(getTimeoutSec()); if (!succStatus) { std::cerr << "[" << channel->getChannelName() << "] failed to get. " << std::endl; @@ -201,9 +201,9 @@ SyncChannelPutRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateChann channelPutReq(new SyncChannelPutRequesterImpl(channel->getChannelName(), debug)); - PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(request); + PVStructure::shared_pointer pvRequest = createRequest(request); - channel->createChannelPut(channelPutReq,pvRequest); + ChannelPut::shared_pointer op(channel->createChannelPut(channelPutReq,pvRequest)); bool succStatus = channelPutReq->waitUntilConnected(getTimeoutSec()); if (!succStatus) { @@ -221,9 +221,9 @@ SyncChannelPutGetRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateCh TR1::shared_ptr channelPutGetReq(new SyncChannelPutGetRequesterImpl(debug)); - PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(request); + PVStructure::shared_pointer pvRequest = createRequest(request); - channel->createChannelPutGet(channelPutGetReq,pvRequest); + ChannelPutGet::shared_pointer op(channel->createChannelPutGet(channelPutGetReq,pvRequest)); bool succStatus = channelPutGetReq->waitUntilConnected(getTimeoutSec()); if (!succStatus) { @@ -243,7 +243,7 @@ SyncChannelRPCRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateChann PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(string()); - channel->createChannelRPC(channelRPCReq, pvRequest); + ChannelRPC::shared_pointer op(channel->createChannelRPC(channelRPCReq, pvRequest)); bool succStatus = channelRPCReq->waitUntilConnected(getTimeoutSec()); if (!succStatus) { @@ -259,9 +259,9 @@ SyncMonitorRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateChannelM { TR1::shared_ptr monitorReq(new SyncMonitorRequesterImpl(debug)); - PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(request); + PVStructure::shared_pointer pvRequest = createRequest(request); - channel->createMonitor(monitorReq, pvRequest); + Monitor::shared_pointer op(channel->createMonitor(monitorReq, pvRequest)); bool succStatus = monitorReq->waitUntilConnected(getTimeoutSec()); if (!succStatus) { @@ -277,7 +277,7 @@ SyncChannelArrayRequesterImpl::shared_pointer ChannelAccessIFTest::syncCreateCha { TR1::shared_ptr arrayReq(new SyncChannelArrayRequesterImpl(debug)); - channel->createChannelArray(arrayReq, pvRequest); + ChannelArray::shared_pointer op(channel->createChannelArray(arrayReq, pvRequest)); bool succStatus = arrayReq->waitUntilConnected(getTimeoutSec()); if (!succStatus) { @@ -519,6 +519,7 @@ void ChannelAccessIFTest::test_channelGetNoProcess() { return; } + testDiag("start Get"); SyncChannelGetRequesterImpl::shared_pointer channelGetReq = syncCreateChannelGet(channel,request); if (!channelGetReq.get()) {