diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 1edad7d..4031504 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -134,6 +134,8 @@ protected: pvAccessID m_ioid; + // holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or + // a mask of QOS_* int32 m_pendingRequest; Mutex m_mutex; @@ -221,22 +223,29 @@ protected: bool startRequest(int32 qos) { Lock guard(m_mutex); - // we allow pure destroy... - if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST) - return false; + if(qos==PURE_DESTROY_REQUEST) + {/* always allow destroy */} + else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST) + {/* cancel overrides all but destroy */} + else if(m_pendingRequest==NULL_REQUEST) + {/* anything whenidle */} + else + {return false; /* others not allowed */} m_pendingRequest = qos; return true; } - void stopRequest() { + int32 beginRequest() { Lock guard(m_mutex); + int32 ret = m_pendingRequest; m_pendingRequest = NULL_REQUEST; + return ret; } - int32 getPendingRequest() { + void abortRequest() { Lock guard(m_mutex); - return m_pendingRequest; + m_pendingRequest = NULL_REQUEST; } public: @@ -369,7 +378,7 @@ public: else if (status == Channel::DISCONNECTED) { m_subscribed.clear(); - stopRequest(); + abortRequest(); } // TODO notify? } @@ -384,10 +393,11 @@ public: void updateSubscription() {} - virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE { - int8 qos = getPendingRequest(); - if (qos == -1) + // sub-class send() calls me + void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) { + if (qos == NULL_REQUEST) { return; + } else if (qos == PURE_DESTROY_REQUEST) { control->startMessage((int8)CMD_DESTROY_REQUEST, 8); @@ -400,7 +410,6 @@ public: buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); } - stopRequest(); } }; @@ -460,10 +469,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -477,8 +486,6 @@ public: // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -513,7 +520,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr)); } } @@ -597,12 +604,12 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); bool initStage = ((pendingRequest & QOS_INIT) != 0); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -616,8 +623,6 @@ public: // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -678,7 +683,7 @@ public: try { m_channel->checkAndGetTransport()->flushSendQueue(); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr)); } return; @@ -693,7 +698,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -787,10 +792,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -815,8 +820,6 @@ public: m_structure->serialize(buffer, control, m_bitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -888,7 +891,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -933,7 +936,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr)); } } @@ -1030,10 +1033,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1062,8 +1065,6 @@ public: m_putData->serialize(buffer, control, m_putDataBitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1184,7 +1185,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1213,7 +1214,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1242,7 +1243,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1336,10 +1337,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1366,8 +1367,6 @@ public: m_structure.reset(); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1424,7 +1423,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); } } @@ -1525,10 +1524,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1570,8 +1569,6 @@ public: m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1660,7 +1657,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); } } @@ -1704,7 +1701,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr)); } } @@ -1737,7 +1734,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr)); } } @@ -1767,7 +1764,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0)); } } @@ -2231,10 +2228,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -2255,8 +2252,6 @@ public: buffer->putInt(m_queueSize); } } - - stopRequest(); } virtual void initResponse( @@ -2373,7 +2368,7 @@ public: m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } } @@ -2399,7 +2394,7 @@ public: m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } }