tighten up client operation send state handling

This commit is contained in:
Michael Davidsaver
2017-11-16 16:16:03 -06:00
parent 1d56ee7283
commit 00e01d5211

View File

@@ -134,6 +134,8 @@ protected:
pvAccessID m_ioid;
// holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or
// a mask of QOS_*
int32 m_pendingRequest;
Mutex m_mutex;
@@ -221,22 +223,29 @@ protected:
bool startRequest(int32 qos) {
Lock guard(m_mutex);
// we allow pure destroy...
if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST)
return false;
if(qos==PURE_DESTROY_REQUEST)
{/* always allow destroy */}
else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST)
{/* cancel overrides all but destroy */}
else if(m_pendingRequest==NULL_REQUEST)
{/* anything whenidle */}
else
{return false; /* others not allowed */}
m_pendingRequest = qos;
return true;
}
void stopRequest() {
int32 beginRequest() {
Lock guard(m_mutex);
int32 ret = m_pendingRequest;
m_pendingRequest = NULL_REQUEST;
return ret;
}
int32 getPendingRequest() {
void abortRequest() {
Lock guard(m_mutex);
return m_pendingRequest;
m_pendingRequest = NULL_REQUEST;
}
public:
@@ -369,7 +378,7 @@ public:
else if (status == Channel::DISCONNECTED)
{
m_subscribed.clear();
stopRequest();
abortRequest();
}
// TODO notify?
}
@@ -384,10 +393,11 @@ public:
void updateSubscription() {}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE {
int8 qos = getPendingRequest();
if (qos == -1)
// sub-class send() calls me
void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) {
if (qos == NULL_REQUEST) {
return;
}
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)CMD_DESTROY_REQUEST, 8);
@@ -400,7 +410,6 @@ public:
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
stopRequest();
}
};
@@ -460,10 +469,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -477,8 +486,6 @@ public:
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -513,7 +520,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr));
}
}
@@ -597,12 +604,12 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
bool initStage = ((pendingRequest & QOS_INIT) != 0);
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -616,8 +623,6 @@ public:
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -678,7 +683,7 @@ public:
try {
m_channel->checkAndGetTransport()->flushSendQueue();
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr));
}
return;
@@ -693,7 +698,7 @@ public:
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelGetImpl>());
//TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender);
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -787,10 +792,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -815,8 +820,6 @@ public:
m_structure->serialize(buffer, control, m_bitSet.get());
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -888,7 +891,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -933,7 +936,7 @@ public:
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr));
}
}
@@ -1030,10 +1033,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -1062,8 +1065,6 @@ public:
m_putData->serialize(buffer, control, m_putDataBitSet.get());
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1184,7 +1185,7 @@ public:
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1213,7 +1214,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1242,7 +1243,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1336,10 +1337,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -1366,8 +1367,6 @@ public:
m_structure.reset();
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1424,7 +1423,7 @@ public:
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelRPCImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr()));
}
}
@@ -1525,10 +1524,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -1570,8 +1569,6 @@ public:
m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1660,7 +1657,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer()));
}
}
@@ -1704,7 +1701,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr));
}
}
@@ -1737,7 +1734,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr));
}
}
@@ -1767,7 +1764,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0));
}
}
@@ -2231,10 +2228,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -2255,8 +2252,6 @@ public:
buffer->putInt(m_queueSize);
}
}
stopRequest();
}
virtual void initResponse(
@@ -2373,7 +2368,7 @@ public:
m_started = true;
return Status::Ok;
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
return BaseRequestImpl::channelNotConnected;
}
}
@@ -2399,7 +2394,7 @@ public:
m_started = false;
return Status::Ok;
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
return BaseRequestImpl::channelNotConnected;
}
}