From c84d41396d10a775ff904ddf419c4c3f2769d88b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 28 Jun 2017 15:47:38 +0200 Subject: [PATCH] pva client: reverse operation -> requester strong ref make this a weak ref as it is more natural that the initiator (requester) holds a strong ref to the operation. --- src/remote/pv/remote.h | 5 +- src/remoteClient/clientContextImpl.cpp | 316 +++++++++++++------------ 2 files changed, 168 insertions(+), 153 deletions(-) diff --git a/src/remote/pv/remote.h b/src/remote/pv/remote.h index 0940bde..888315d 100644 --- a/src/remote/pv/remote.h +++ b/src/remote/pv/remote.h @@ -564,8 +564,9 @@ public: virtual void reportStatus(Channel::ConnectionState status) = 0; /** - * Get request requester. - * @return request requester. + * used by MessageHandler and reportChannelStateChange(). + * + * May return NULL */ virtual std::tr1::shared_ptr getRequester() = 0; }; diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index c9e1b37..a4f8476 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -68,6 +68,10 @@ typedef std::map IOIDResponseRequestM catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } +#define EXCEPTION_GUARD3(WEAK, PTR, code) {requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) try { code; } \ + catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ + catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }} + /** * Base channel request. * @author Matej Sekoranja @@ -123,8 +127,6 @@ protected: ChannelImpl::shared_pointer m_channel; - ChannelBaseRequester::shared_pointer m_requester; - /* negative... */ static const int NULL_REQUEST = -1; static const int PURE_DESTROY_REQUEST = -2; @@ -189,9 +191,8 @@ protected: AtomicBoolean m_subscribed; - BaseRequestImpl(ChannelImpl::shared_pointer const & channel, ChannelBaseRequester::shared_pointer const & requester) : + BaseRequestImpl(ChannelImpl::shared_pointer const & channel) : m_channel(channel), - m_requester(requester), m_ioid(INVALID_IOID), m_pendingRequest(NULL_REQUEST), m_destroyed(false), @@ -232,11 +233,6 @@ protected: public: - // used to send message to this request - ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { - return m_requester; - } - pvAccessID getIOID() const OVERRIDE FINAL { return m_ioid; } @@ -422,11 +418,11 @@ class ChannelProcessRequestImpl : public ChannelProcess { public: - ChannelProcessRequester::shared_pointer m_callback; + requester_type::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; ChannelProcessRequestImpl(ChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, callback), + BaseRequestImpl(channel), m_callback(callback), m_pvRequest(pvRequest) { @@ -444,17 +440,18 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_callback->channelProcessConnect(channelDestroyed, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(channelDestroyed, external_from_this())); BaseRequestImpl::destroy(true); } } -public: ~ChannelProcessRequestImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelProcess); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -478,11 +475,11 @@ public: } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { - EXCEPTION_GUARD(m_callback->channelProcessConnect(status, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(status, external_from_this())); } virtual void normalResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { - EXCEPTION_GUARD(m_callback->processDone(status, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->processDone(status, external_from_this())); } virtual void process() OVERRIDE FINAL @@ -492,17 +489,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_callback->processDone(destroyedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->processDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_callback->processDone(notInitializedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->processDone(notInitializedStatus, thisPtr)); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_callback->processDone(otherRequestPendingStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->processDone(otherRequestPendingStatus, thisPtr)); return; } @@ -510,7 +507,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_callback->processDone(channelNotConnected, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr)); } } @@ -548,7 +545,7 @@ class ChannelGetImpl : public ChannelGet { public: - ChannelGetRequester::shared_pointer m_channelGetRequester; + ChannelGetRequester::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -557,9 +554,11 @@ public: Mutex m_structureMutex; - ChannelGetImpl(ChannelImpl::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, channelGetRequester), - m_channelGetRequester(channelGetRequester), + ChannelGetImpl(ChannelImpl::shared_pointer const & channel, + ChannelGetRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) : + BaseRequestImpl(channel), + m_callback(requester), m_pvRequest(pvRequest) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelGet); @@ -569,7 +568,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -581,17 +580,18 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelDestroyed, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } -public: virtual ~ChannelGetImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelGet); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); bool initStage = ((pendingRequest & QOS_INIT) != 0); @@ -623,7 +623,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -635,7 +635,7 @@ public: } // notify - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, external_from_this(), m_structure->getStructure())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(status, external_from_this(), m_structure->getStructure())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -644,7 +644,7 @@ public: if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(status, external_from_this(), PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, external_from_this(), PVStructurePtr(), BitSetPtr())); return; } @@ -657,7 +657,7 @@ public: MB_POINT(channelGet, 9, "client channelGet->deserialize (end), just before channelGet->getDone() is called"); - EXCEPTION_GUARD(m_channelGetRequester->getDone(status, external_from_this(), m_structure, m_bitSet)); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, external_from_this(), m_structure, m_bitSet)); } virtual void get() OVERRIDE FINAL { @@ -670,11 +670,11 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } } @@ -686,13 +686,13 @@ public: m_channel->checkAndGetTransport()->flushSendQueue(); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr)); } return; } */ if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -701,7 +701,7 @@ public: //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -753,7 +753,7 @@ class ChannelPutImpl : public ChannelPut { public: - ChannelPutRequester::shared_pointer m_channelPutRequester; + ChannelPutRequester::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -762,9 +762,11 @@ public: Mutex m_structureMutex; - ChannelPutImpl(ChannelImpl::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, channelPutRequester), - m_channelPutRequester(channelPutRequester), + ChannelPutImpl(ChannelImpl::shared_pointer const & channel, + ChannelPutRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) : + BaseRequestImpl(channel), + m_callback(requester), m_pvRequest(pvRequest) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPut); @@ -774,7 +776,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -786,17 +788,18 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelDestroyed, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } -public: virtual ~ChannelPutImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelPut); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -833,7 +836,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -845,7 +848,7 @@ public: } // notify - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, external_from_this(), m_structure->getStructure())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(status, external_from_this(), m_structure->getStructure())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) OVERRIDE FINAL { @@ -856,7 +859,7 @@ public: { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -866,11 +869,11 @@ public: m_structure->deserialize(payloadBuffer, transport.get(), m_bitSet.get()); } - EXCEPTION_GUARD(m_channelPutRequester->getDone(status, thisPtr, m_structure, m_bitSet)); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, thisPtr, m_structure, m_bitSet)); } else { - EXCEPTION_GUARD(m_channelPutRequester->putDone(status, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(status, thisPtr)); } } @@ -881,17 +884,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_GET | QOS_DESTROY : QOS_GET)) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -900,7 +903,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -911,29 +914,29 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - m_channelPutRequester->putDone(destroyedStatus, thisPtr); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(notInitializedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(notInitializedStatus, thisPtr)); return; } } if (!(*m_structure->getStructure() == *pvPutStructure->getStructure())) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(invalidPutStructureStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(invalidPutStructureStatus, thisPtr)); return; } if (pvPutBitSet->size() < m_bitSet->size()) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(invalidBitSetLengthStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(invalidBitSetLengthStatus, thisPtr)); return; } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - m_channelPutRequester->putDone(otherRequestPendingStatus, thisPtr); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(otherRequestPendingStatus, thisPtr)); return; } @@ -945,7 +948,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr)); } } @@ -994,7 +997,7 @@ class ChannelPutGetImpl : public ChannelPutGet { public: - ChannelPutGetRequester::shared_pointer m_channelPutGetRequester; + ChannelPutGetRequester::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -1008,9 +1011,11 @@ public: Mutex m_structureMutex; - ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, channelPutGetRequester), - m_channelPutGetRequester(channelPutGetRequester), + ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, + ChannelPutGetRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) : + BaseRequestImpl(channel), + m_callback(requester), m_pvRequest(pvRequest) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet); @@ -1020,7 +1025,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, external_from_this(), StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(pvRequestNull, external_from_this(), StructureConstPtr(), StructureConstPtr())); return; } @@ -1029,17 +1034,19 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelDestroyed, external_from_this(), StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(channelDestroyed, external_from_this(), StructureConstPtr(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } -public: + virtual ~ChannelPutGetImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelPutGet); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -1080,7 +1087,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, external_from_this(), StructureConstPtr(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(status, external_from_this(), StructureConstPtr(), StructureConstPtr())); return; } @@ -1093,7 +1100,7 @@ public: } // notify - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, external_from_this(), m_putData->getStructure(), m_getData->getStructure())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(status, external_from_this(), m_putData->getStructure(), m_getData->getStructure())); } @@ -1105,7 +1112,7 @@ public: { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1116,13 +1123,13 @@ public: m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status, thisPtr, m_getData, m_getDataBitSet)); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(status, thisPtr, m_getData, m_getDataBitSet)); } else if (qos & QOS_GET_PUT) { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1133,13 +1140,13 @@ public: m_putData->deserialize(payloadBuffer, transport.get(), m_putDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status, thisPtr, m_putData, m_putDataBitSet)); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(status, thisPtr, m_putData, m_putDataBitSet)); } else { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1150,7 +1157,7 @@ public: m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status, thisPtr, m_getData, m_getDataBitSet)); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(status, thisPtr, m_getData, m_getDataBitSet)); } } @@ -1162,29 +1169,29 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } } if (!(*m_putData->getStructure() == *pvPutStructure->getStructure())) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(invalidPutStructureStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(invalidPutStructureStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (bitSet->size() < m_putDataBitSet->size()) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(invalidBitSetLengthStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(invalidBitSetLengthStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1196,7 +1203,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1207,17 +1214,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1225,7 +1232,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1236,17 +1243,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - m_channelPutGetRequester->getPutDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { - m_channelPutGetRequester->getPutDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr())); return; } @@ -1254,7 +1261,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1306,7 +1313,7 @@ class ChannelRPCImpl : public ChannelRPC { public: - ChannelRPCRequester::shared_pointer m_channelRPCRequester; + ChannelRPCRequester::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -1314,9 +1321,11 @@ public: Mutex m_structureMutex; - ChannelRPCImpl(ChannelImpl::shared_pointer const & channel, ChannelRPCRequester::shared_pointer const & channelRPCRequester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, channelRPCRequester), - m_channelRPCRequester(channelRPCRequester), + ChannelRPCImpl(ChannelImpl::shared_pointer const & channel, + ChannelRPCRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) : + BaseRequestImpl(channel), + m_callback(requester), m_pvRequest(pvRequest) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelRPC); @@ -1326,7 +1335,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(pvRequestNull, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(pvRequestNull, external_from_this())); return; } @@ -1336,17 +1345,18 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelDestroyed, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(channelDestroyed, external_from_this())); BaseRequestImpl::destroy(true); } } -public: virtual ~ChannelRPCImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelRPC); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -1385,12 +1395,12 @@ public: virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this())); return; } // notify - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, external_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1399,13 +1409,13 @@ public: if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, thisPtr, PVStructurePtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(status, thisPtr, PVStructurePtr())); return; } PVStructure::shared_pointer response(SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get())); - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, thisPtr, response)); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(status, thisPtr, response)); } virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) OVERRIDE FINAL { @@ -1415,17 +1425,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, thisPtr, PVStructurePtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(destroyedStatus, thisPtr, PVStructurePtr())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(notInitializedStatus, thisPtr, PVStructurePtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(notInitializedStatus, thisPtr, PVStructurePtr())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, thisPtr, PVStructurePtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(otherRequestPendingStatus, thisPtr, PVStructurePtr())); return; } @@ -1437,7 +1447,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); } } @@ -1487,7 +1497,7 @@ class ChannelArrayImpl : public ChannelArray { public: - ChannelArrayRequester::shared_pointer m_channelArrayRequester; + ChannelArrayRequester::weak_pointer m_callback; PVStructure::shared_pointer m_pvRequest; @@ -1502,9 +1512,11 @@ public: Mutex m_structureMutex; - ChannelArrayImpl(ChannelImpl::shared_pointer const & channel, ChannelArrayRequester::shared_pointer const & channelArrayRequester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, channelArrayRequester), - m_channelArrayRequester(channelArrayRequester), + ChannelArrayImpl(ChannelImpl::shared_pointer const & channel, + ChannelArrayRequester::shared_pointer const & requester, + PVStructure::shared_pointer const & pvRequest) : + BaseRequestImpl(channel), + m_callback(requester), m_pvRequest(pvRequest), m_offset(0), m_count(0), m_length(0) @@ -1516,7 +1528,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, external_from_this(), Array::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(pvRequestNull, external_from_this(), Array::shared_pointer())); return; } @@ -1526,17 +1538,18 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelDestroyed, external_from_this(), Array::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(channelDestroyed, external_from_this(), Array::shared_pointer())); BaseRequestImpl::destroy(true); } } -public: virtual ~ChannelArrayImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelArray); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -1590,7 +1603,7 @@ public: virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, external_from_this(), Array::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this(), Array::shared_pointer())); return; } @@ -1602,7 +1615,7 @@ public: } // notify - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, external_from_this(), m_arrayData->getArray())); + EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this(), m_arrayData->getArray())); } virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) OVERRIDE FINAL { @@ -1613,7 +1626,7 @@ public: { if (!status.isSuccess()) { - m_channelArrayRequester->getArrayDone(status, thisPtr, PVArray::shared_pointer()); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, PVArray::shared_pointer())); return; } @@ -1622,21 +1635,21 @@ public: m_arrayData->deserialize(payloadBuffer, transport.get()); } - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status, thisPtr, m_arrayData)); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, m_arrayData)); } else if (qos & QOS_GET_PUT) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(status, thisPtr)); } else if (qos & QOS_PROCESS) { size_t length = SerializeHelper::readSize(payloadBuffer, transport.get()); - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(status, thisPtr, length)); + EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(status, thisPtr, length)); } else { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(status, thisPtr)); } } @@ -1650,17 +1663,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus, thisPtr, PVArray::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(destroyedStatus, thisPtr, PVArray::shared_pointer())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(notInitializedStatus, thisPtr, PVArray::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(notInitializedStatus, thisPtr, PVArray::shared_pointer())); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus, thisPtr, PVArray::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(otherRequestPendingStatus, thisPtr, PVArray::shared_pointer())); return; } @@ -1674,7 +1687,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); + EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); } } @@ -1687,23 +1700,23 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(notInitializedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(notInitializedStatus, thisPtr)); return; } } if (!(*m_arrayData->getArray() == *putArray->getArray())) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(invalidPutArrayStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(invalidPutArrayStatus, thisPtr)); return; } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(otherRequestPendingStatus, thisPtr)); return; } @@ -1718,7 +1731,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr)); } } @@ -1729,17 +1742,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(destroyedStatus, thisPtr)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(notInitializedStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(notInitializedStatus, thisPtr)); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(otherRequestPendingStatus, thisPtr)); return; } @@ -1751,7 +1764,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, thisPtr)); + EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr)); } } @@ -1763,17 +1776,17 @@ public: { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(destroyedStatus, thisPtr, 0)); + EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(destroyedStatus, thisPtr, 0)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(notInitializedStatus, thisPtr, 0)); + EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(notInitializedStatus, thisPtr, 0)); return; } } if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_PROCESS : QOS_PROCESS)) { - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(otherRequestPendingStatus, thisPtr, 0)); + EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(otherRequestPendingStatus, thisPtr, 0)); return; } @@ -1781,7 +1794,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, thisPtr, 0)); + EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0)); } } @@ -1848,7 +1861,7 @@ private: MonitorElementQueue m_monitorQueue; - MonitorRequester::shared_pointer m_callback; + MonitorRequester::weak_pointer m_callback; Mutex m_mutex; @@ -1877,7 +1890,7 @@ private: public: MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid, - MonitorRequester::shared_pointer const & callback, + MonitorRequester::weak_pointer const & callback, int32 queueSize, bool pipeline, int32 ackAny) : m_queueSize(queueSize), m_lastStructure(), @@ -1989,7 +2002,7 @@ public: if (!m_overrunInProgress) { - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->monitorEvent(shared_from_this())); } } @@ -2004,7 +2017,7 @@ public: if (notifyUnlisten) { - EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->unlisten(shared_from_this())); } } @@ -2016,7 +2029,7 @@ public: if (m_unlisten) { m_unlisten = false; guard.unlock(); - EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + EXCEPTION_GUARD3(m_callback, cb, cb->unlisten(shared_from_this())); } return m_nullMonitorElement; } @@ -2131,8 +2144,7 @@ class ChannelMonitorImpl : public Monitor { public: - typedef MonitorRequester requester_type; - MonitorRequester::shared_pointer m_monitorRequester; + MonitorRequester::weak_pointer m_callback; bool m_started; PVStructure::shared_pointer m_pvRequest; @@ -2145,11 +2157,11 @@ public: ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, - MonitorRequester::shared_pointer const & monitorRequester, + MonitorRequester::shared_pointer const & requester, PVStructure::shared_pointer const & pvRequest) : - BaseRequestImpl(channel, monitorRequester), - m_monitorRequester(monitorRequester), + BaseRequestImpl(channel), + m_callback(requester), m_started(false), m_pvRequest(pvRequest), m_queueSize(2), @@ -2163,7 +2175,7 @@ public: { if (!m_pvRequest) { - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(pvRequestNull, external_from_this(), StructureConstPtr())); return; } @@ -2212,7 +2224,7 @@ public: BaseRequestImpl::activate(); std::tr1::shared_ptr tp( - new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize, + new MonitorStrategyQueue(m_channel, m_ioid, m_callback, m_queueSize, m_pipeline, m_ackAny) ); m_monitorStrategy = tp; @@ -2221,7 +2233,7 @@ public: try { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelDestroyed, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(channelDestroyed, external_from_this(), StructureConstPtr())); BaseRequestImpl::destroy(true); } } @@ -2236,12 +2248,13 @@ public: } } -public: virtual ~ChannelMonitorImpl() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelMonitor); } + ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { int32 pendingRequest = getPendingRequest(); if (pendingRequest < 0) @@ -2280,7 +2293,7 @@ public: { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, external_from_this(), StructureConstPtr())); + EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(status, external_from_this(), StructureConstPtr())); return; } @@ -2293,7 +2306,7 @@ public: bool restoreStartedState = m_started; // notify - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, external_from_this(), structure)); + EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(status, external_from_this(), structure)); if (restoreStartedState) start(); @@ -4730,11 +4743,12 @@ class ChannelGetFieldRequestImpl : public std::tr1::enable_shared_from_this { public: + typedef GetFieldRequester requester_type; POINTER_DEFINITIONS(ChannelGetFieldRequestImpl); const InternalClientContextImpl::InternalChannelImpl::shared_pointer m_channel; - const GetFieldRequester::shared_pointer m_callback; + const GetFieldRequester::weak_pointer m_callback; string m_subField; pvAccessID m_ioid; @@ -4795,11 +4809,11 @@ public: return; m_notified = true; } - EXCEPTION_GUARD(m_callback->getDone(sts, field)); + EXCEPTION_GUARD3(m_callback, cb, cb->getDone(sts, field)); } ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { - return m_callback; + return m_callback.lock(); } pvAccessID getIOID() const OVERRIDE FINAL {