From ba3b777978dbdfb40c59aa94742a41c08618e4b7 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 11:18:13 -0600 Subject: [PATCH 01/11] missed one cf 2f0bb7d44882a36ade2eb48947aeca7eef69f575 --- src/remote/codec.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 0311ecc..bb1d6cd 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -321,7 +321,7 @@ void AbstractCodec::processReadSegmented() { if (!notFirstSegment) { LOG(logLevelWarn, - "Not-a-first segmented message expected from the client at" + "Protocol Violation: Not-a-first segmented message expected from the client at" " %s:%d: %s, disconnecting...", __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str()); invalidDataStreamHandler(); From 6f39f02f3ae745c5681ef1cd47ebc934d8e187e6 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 14:56:48 -0600 Subject: [PATCH 02/11] more client locking violations As with #72 more use of m_pendingRequest w/o locking --- src/remoteClient/clientContextImpl.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index da79f38..271a75c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -470,7 +470,7 @@ public: control->startMessage((int8)CMD_PROCESS, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -609,7 +609,7 @@ public: control->startMessage((int8)CMD_GET, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (initStage) { @@ -797,7 +797,7 @@ public: control->startMessage((int8)CMD_PUT, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -1346,8 +1346,8 @@ public: control->startMessage((int8)CMD_RPC, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - if ((m_pendingRequest & QOS_INIT) == 0) - buffer->putByte((int8)m_pendingRequest); + if ((pendingRequest & QOS_INIT) == 0) + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -1535,7 +1535,7 @@ public: control->startMessage((int8)CMD_ARRAY, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { From b3fcffb18e7d9fc7051df155ed38c36c1d651e83 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 14:56:59 -0600 Subject: [PATCH 03/11] avoid magic number --- src/remoteClient/clientContextImpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 271a75c..63057c4 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -4786,7 +4786,7 @@ public: } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - control->startMessage((int8)17, 8); + control->startMessage((int8)CMD_GET_FIELD, 8); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); SerializeHelper::serializeString(m_subField, buffer, control); From 1d56ee72830d69602e4af4f897a924abfffa2ed6 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 15:34:57 -0600 Subject: [PATCH 04/11] pva client ignore CMD_DESTROY_REQUEST reply. Presently CMD_DESTROY_REQUEST and CMD_CANCEL_REQUEST aren't acknowledged. This allows ambiguity in when the request ioid goes out of scope. Currently, the client forgets about an ioid as soon as the destroy request is sent. So any in-flight response will be erroneously flagged as "Undecipherable message". Similarly, a cancel request can cross paths with a reply. This creates confusion in the event the client starts a new request immediately after canceling the first. Let's build some forward compatibility by ignoring replies to cancel and destroy requests. --- src/remoteClient/clientContextImpl.cpp | 34 ++++---------------------- 1 file changed, 5 insertions(+), 29 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 63057c4..1edad7d 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2448,30 +2448,6 @@ public: }; -class BadResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { -public: - BadResponse(ClientContextImpl::shared_pointer const & context) : - AbstractClientResponseHandler(context, "Bad response") - { - } - - virtual ~BadResponse() { - } - - virtual void handleResponse(osiSockAddr* responseFrom, - Transport::shared_pointer const & /*transport*/, int8 /*version*/, int8 command, - size_t /*payloadSize*/, epics::pvData::ByteBuffer* /*payloadBuffer*/) OVERRIDE FINAL - { - char ipAddrStr[48]; - ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - - LOG(logLevelInfo, - "Undecipherable message (bad response type %d) from %s.", - command, ipAddrStr); - } -}; - - class ResponseRequestHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: ResponseRequestHandler(ClientContextImpl::shared_pointer const & context) : @@ -2964,18 +2940,18 @@ public: ClientResponseHandler(ClientContextImpl::shared_pointer const & context) :ResponseHandler(context.get(), "ClientResponseHandler") { - ResponseHandler::shared_pointer badResponse(new BadResponse(context)); + ResponseHandler::shared_pointer ignoreResponse(new NoopResponse(context, "Ignore")); ResponseHandler::shared_pointer dataResponse(new ResponseRequestHandler(context)); m_handlerTable.resize(CMD_CANCEL_REQUEST+1); m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */ m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */ - m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */ + m_handlerTable[CMD_ECHO] = ignoreResponse; /* 2 */ m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */ m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */ m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */ - m_handlerTable[CMD_ACL_CHANGE].reset(new NoopResponse(context, "Access rights change")); /* 6 */ + m_handlerTable[CMD_ACL_CHANGE] = ignoreResponse; /* 6 */ m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */ m_handlerTable[CMD_DESTROY_CHANNEL].reset(new DestroyChannelHandler(context)); /* 8 */ m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */ @@ -2984,13 +2960,13 @@ public: m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */ m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */ m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */ - m_handlerTable[CMD_DESTROY_REQUEST] = badResponse; /* 15 - destroy request */ + m_handlerTable[CMD_DESTROY_REQUEST] = ignoreResponse; /* 15 - destroy request */ m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */ m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */ m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */ m_handlerTable[CMD_MULTIPLE_DATA].reset(new MultipleResponseRequestHandler(context)); /* 19 - grouped monitors */ m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */ - m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */ + m_handlerTable[CMD_CANCEL_REQUEST] = ignoreResponse; /* 21 - cancel request */ } virtual void handleResponse(osiSockAddr* responseFrom, From 00e01d5211bcfe6d95df1f8b3bdb7bc01fd0b966 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 16:16:03 -0600 Subject: [PATCH 05/11] tighten up client operation send state handling --- src/remoteClient/clientContextImpl.cpp | 103 ++++++++++++------------- 1 file changed, 49 insertions(+), 54 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 1edad7d..4031504 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -134,6 +134,8 @@ protected: pvAccessID m_ioid; + // holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or + // a mask of QOS_* int32 m_pendingRequest; Mutex m_mutex; @@ -221,22 +223,29 @@ protected: bool startRequest(int32 qos) { Lock guard(m_mutex); - // we allow pure destroy... - if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST) - return false; + if(qos==PURE_DESTROY_REQUEST) + {/* always allow destroy */} + else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST) + {/* cancel overrides all but destroy */} + else if(m_pendingRequest==NULL_REQUEST) + {/* anything whenidle */} + else + {return false; /* others not allowed */} m_pendingRequest = qos; return true; } - void stopRequest() { + int32 beginRequest() { Lock guard(m_mutex); + int32 ret = m_pendingRequest; m_pendingRequest = NULL_REQUEST; + return ret; } - int32 getPendingRequest() { + void abortRequest() { Lock guard(m_mutex); - return m_pendingRequest; + m_pendingRequest = NULL_REQUEST; } public: @@ -369,7 +378,7 @@ public: else if (status == Channel::DISCONNECTED) { m_subscribed.clear(); - stopRequest(); + abortRequest(); } // TODO notify? } @@ -384,10 +393,11 @@ public: void updateSubscription() {} - virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE { - int8 qos = getPendingRequest(); - if (qos == -1) + // sub-class send() calls me + void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) { + if (qos == NULL_REQUEST) { return; + } else if (qos == PURE_DESTROY_REQUEST) { control->startMessage((int8)CMD_DESTROY_REQUEST, 8); @@ -400,7 +410,6 @@ public: buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); } - stopRequest(); } }; @@ -460,10 +469,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -477,8 +486,6 @@ public: // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -513,7 +520,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr)); } } @@ -597,12 +604,12 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); bool initStage = ((pendingRequest & QOS_INIT) != 0); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -616,8 +623,6 @@ public: // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -678,7 +683,7 @@ public: try { m_channel->checkAndGetTransport()->flushSendQueue(); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr)); } return; @@ -693,7 +698,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -787,10 +792,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -815,8 +820,6 @@ public: m_structure->serialize(buffer, control, m_bitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -888,7 +891,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -933,7 +936,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr)); } } @@ -1030,10 +1033,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1062,8 +1065,6 @@ public: m_putData->serialize(buffer, control, m_putDataBitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1184,7 +1185,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1213,7 +1214,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1242,7 +1243,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1336,10 +1337,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1366,8 +1367,6 @@ public: m_structure.reset(); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1424,7 +1423,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); } } @@ -1525,10 +1524,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1570,8 +1569,6 @@ public: m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1660,7 +1657,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); } } @@ -1704,7 +1701,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr)); } } @@ -1737,7 +1734,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr)); } } @@ -1767,7 +1764,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0)); } } @@ -2231,10 +2228,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -2255,8 +2252,6 @@ public: buffer->putInt(m_queueSize); } } - - stopRequest(); } virtual void initResponse( @@ -2373,7 +2368,7 @@ public: m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } } @@ -2399,7 +2394,7 @@ public: m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } } From cc9c6667314315c77688354ad51d6f39acbfb4c5 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 16:44:14 -0600 Subject: [PATCH 06/11] Improve client pvRequest option parsing Give some notice of parse errors --- src/remoteClient/clientContextImpl.cpp | 83 +++++++++++++++++--------- testApp/remote/channelAccessIFTest.cpp | 2 +- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 4031504..1777b82 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -73,6 +73,9 @@ typedef std::map IOIDResponseRequestM catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }} +#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE) \ +do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG, MTYPE); }while(0) + /** * Base channel request. * @author Matej Sekoranja @@ -2154,18 +2157,25 @@ public: PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); if (pvOptions) { - PVStringPtr pvString = pvOptions->getSubField("queueSize"); - if (pvString) { - int32 size; - std::stringstream ss; - ss << pvString->get(); - ss >> size; - if (size > 1) - m_queueSize = size; + PVScalarPtr option(pvOptions->getSubField("queueSize")); + if (option) { + try { + m_queueSize = option->getAs(); + if(m_queueSize<2) + m_queueSize = 2; + }catch(std::runtime_error& e){ + SEND_MESSAGE(m_callback, cb, "Invalid queueSize=", warningMessage); + } + } + + option = pvOptions->getSubField("pipeline"); + if (option) { + try { + m_pipeline = option->getAs(); + }catch(std::runtime_error& e){ + SEND_MESSAGE(m_callback, cb, "Invalid pipeline=", warningMessage); + } } - pvString = pvOptions->getSubField("pipeline"); - if (pvString) - m_pipeline = (pvString->get() == "true"); // pipeline options if (m_pipeline) @@ -2173,23 +2183,40 @@ public: // defaults to queueSize/2 m_ackAny = m_queueSize/2; - pvString = pvOptions->getSubField("ackAny"); - if (pvString) { - int32 size; - string sval = pvString->get(); - string::size_type slen = sval.length(); - bool percentage = (slen > 0) && (sval[slen-1] == '%'); - if (percentage) - sval = sval.substr(0, slen-1); - std::stringstream ss; - ss << sval; - ss >> size; - if (percentage) - size = (m_queueSize * size) / 100; - if (size <= 0) + bool done = false; + int32 size; + + option = pvOptions->getSubField("ackAny"); + if (option) { + if(option->getScalar()->getScalarType()==pvString) { + std::string sval(option->getAs()); + + if(!sval.empty() && sval[sval.size()-1]=='%') { + try { + double percent = castUnsafe(sval.substr(0, sval.size()-1)); + size = (m_queueSize * percent) / 100.0; + done = true; + }catch(std::runtime_error&){ + SEND_MESSAGE(m_callback, cb, "ackAny= invalid precentage", warningMessage); + } + } + } + + if(!done) { + try { + size = option->getAs(); + done = true; + }catch(std::runtime_error&){ + SEND_MESSAGE(m_callback, cb, "ackAny= invalid value", warningMessage); + } + } + + if(!done) { + } else if (size <= 0) { m_ackAny = 1; - else + } else { m_ackAny = (m_ackAny <= m_queueSize) ? size : m_queueSize; + } } } } @@ -2221,9 +2248,7 @@ public: } } - virtual ~ChannelMonitorImpl() - { - } + virtual ~ChannelMonitorImpl() {} ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index 99a4210..c95c2a2 100644 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -1755,7 +1755,7 @@ void ChannelAccessIFTest::test_channelMonitorWithInvalidRequesterAndRequest() { void ChannelAccessIFTest::test_channelMonitor(int queueSize) { - testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + testDiag("BEGIN TEST %s: queueSize=%d", CURRENT_FUNCTION, queueSize); ostringstream ostream; ostream << queueSize; string request = "record[queueSize="; From 8e846b02bce3313666c00b2fe9071afde8c67f7a Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 16:55:45 -0600 Subject: [PATCH 07/11] tighten up validation --- src/remoteClient/clientContextImpl.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 1777b82..9e311d4 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2296,6 +2296,8 @@ public: dynamic_pointer_cast( transport->cachedDeserialize(payloadBuffer) ); + if(!structure) + throw std::runtime_error("initResponse() w/o Structure"); m_monitorStrategy->init(structure); bool restoreStartedState = m_started; From f45de1de68c58d4fdea05ef015c86bc12bd0c434 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 17:28:27 -0600 Subject: [PATCH 08/11] client minor locking --- src/remoteClient/clientContextImpl.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 9e311d4..0af7839 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -274,9 +274,8 @@ public: if (status.isSuccess()) { // once created set destroy flag - m_mutex.lock(); + Lock G(m_mutex); m_initialized = true; - m_mutex.unlock(); } initResponse(transport, version, payloadBuffer, qos, status); @@ -287,10 +286,9 @@ public: if (qos & QOS_DESTROY) { - m_mutex.lock(); + Lock G(m_mutex); m_initialized = false; destroyReq = true; - m_mutex.unlock(); } normalResponse(transport, version, payloadBuffer, qos, status); @@ -340,11 +338,13 @@ public: virtual void destroy(bool createRequestFailed) { + bool initd; { Lock guard(m_mutex); if (m_destroyed) return; m_destroyed = true; + initd = m_initialized; } // unregister response request @@ -352,7 +352,7 @@ public: m_channel->unregisterResponseRequest(m_ioid); // destroy remote instance - if (!createRequestFailed && m_initialized) + if (!createRequestFailed && initd) { try { @@ -2350,9 +2350,8 @@ public: status.deserialize(payloadBuffer, transport.get()); if (status.isSuccess()) { - m_mutex.lock(); + Lock G(m_mutex); m_initialized = true; - m_mutex.unlock(); } initResponse(transport, version, payloadBuffer, qos, status); } @@ -2361,9 +2360,10 @@ public: Status status; status.deserialize(payloadBuffer, transport.get()); - m_mutex.lock(); - m_initialized = false; - m_mutex.unlock(); + { + Lock G(m_mutex); + m_initialized = false; + } normalResponse(transport, version, payloadBuffer, qos, status); } From e626ca0aafcdfc8331a920a3a79e38bb13a0647f Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 17:32:00 -0600 Subject: [PATCH 09/11] client const-ify where possible --- src/remoteClient/clientContextImpl.cpp | 106 +++++++++++-------------- 1 file changed, 46 insertions(+), 60 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 0af7839..cbe6dd5 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -128,18 +128,21 @@ public: protected: - ClientChannelImpl::shared_pointer m_channel; + const ClientChannelImpl::shared_pointer m_channel; /* negative... */ static const int NULL_REQUEST = -1; static const int PURE_DESTROY_REQUEST = -2; static const int PURE_CANCEL_REQUEST = -3; + // const after activate() pvAccessID m_ioid; +private: // holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or // a mask of QOS_* int32 m_pendingRequest; +protected: Mutex m_mutex; @@ -439,15 +442,14 @@ class ChannelProcessRequestImpl : public ChannelProcess { public: - requester_type::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const requester_type::weak_pointer m_callback; + const PVStructure::shared_pointer m_pvRequest; ChannelProcessRequestImpl(ClientChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) : BaseRequestImpl(channel), m_callback(callback), m_pvRequest(pvRequest) - { - } + {} virtual void activate() OVERRIDE FINAL { @@ -465,9 +467,7 @@ public: } } - ~ChannelProcessRequestImpl() - { - } + virtual ~ChannelProcessRequestImpl() {} ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } @@ -561,9 +561,9 @@ class ChannelGetImpl : public ChannelGet { public: - ChannelGetRequester::weak_pointer m_callback; + const ChannelGetRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; @@ -749,9 +749,9 @@ class ChannelPutImpl : public ChannelPut { public: - ChannelPutRequester::weak_pointer m_callback; + const ChannelPutRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; @@ -987,9 +987,9 @@ class ChannelPutGetImpl : public ChannelPutGet { public: - ChannelPutGetRequester::weak_pointer m_callback; + const ChannelPutGetRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; // put data container PVStructure::shared_pointer m_putData; @@ -1297,9 +1297,9 @@ class ChannelRPCImpl : public ChannelRPC { public: - ChannelRPCRequester::weak_pointer m_callback; + const ChannelRPCRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; @@ -1475,9 +1475,9 @@ class ChannelArrayImpl : public ChannelArray { public: - ChannelArrayRequester::weak_pointer m_callback; + const ChannelArrayRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; // data container PVArray::shared_pointer m_arrayData; @@ -1828,14 +1828,14 @@ class MonitorStrategyQueue : { private: - int32 m_queueSize; + const int32 m_queueSize; StructureConstPtr m_lastStructure; FreeElementQueue m_freeQueue; MonitorElementQueue m_monitorQueue; - MonitorRequester::weak_pointer m_callback; + const MonitorRequester::weak_pointer m_callback; Mutex m_mutex; @@ -1853,11 +1853,11 @@ private: bool m_reportQueueStateInProgress; // TODO check for cyclic-ref - ClientChannelImpl::shared_pointer m_channel; - pvAccessID m_ioid; + const ClientChannelImpl::shared_pointer m_channel; + const pvAccessID m_ioid; - bool m_pipeline; - int32 m_ackAny; + const bool m_pipeline; + const int32 m_ackAny; bool m_unlisten; @@ -1887,9 +1887,7 @@ public: //m_monitorQueue.reserve(m_queueSize); } - virtual ~MonitorStrategyQueue() - { - } + virtual ~MonitorStrategyQueue() {} virtual void init(StructureConstPtr const & structure) OVERRIDE FINAL { Lock guard(m_mutex); @@ -2121,10 +2119,10 @@ class ChannelMonitorImpl : public Monitor { public: - MonitorRequester::weak_pointer m_callback; + const MonitorRequester::weak_pointer m_callback; bool m_started; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; std::tr1::shared_ptr m_monitorStrategy; @@ -2692,11 +2690,9 @@ class BeaconResponseHandler : public AbstractClientResponseHandler, private epic public: BeaconResponseHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Beacon") - { - } + {} - virtual ~BeaconResponseHandler() { - } + virtual ~BeaconResponseHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2761,11 +2757,9 @@ class ClientConnectionValidationHandler : public AbstractClientResponseHandler, public: ClientConnectionValidationHandler(ClientContextImpl::shared_pointer context) : AbstractClientResponseHandler(context, "Connection validation") - { - } + {} - virtual ~ClientConnectionValidationHandler() { - } + virtual ~ClientConnectionValidationHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2803,11 +2797,9 @@ class ClientConnectionValidatedHandler : public AbstractClientResponseHandler, p public: ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) : AbstractClientResponseHandler(context, "Connection validated") - { - } + {} - virtual ~ClientConnectionValidatedHandler() { - } + virtual ~ClientConnectionValidatedHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2826,11 +2818,9 @@ class MessageHandler : public AbstractClientResponseHandler, private epics::pvDa public: MessageHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Message") - { - } + {} - virtual ~MessageHandler() { - } + virtual ~MessageHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2863,11 +2853,9 @@ class CreateChannelHandler : public AbstractClientResponseHandler, private epics public: CreateChannelHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Create channel") - { - } + {} - virtual ~CreateChannelHandler() { - } + virtual ~CreateChannelHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2915,11 +2903,9 @@ class DestroyChannelHandler : public AbstractClientResponseHandler, private epic public: DestroyChannelHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Destroy channel") - { - } + {} - virtual ~DestroyChannelHandler() { - } + virtual ~DestroyChannelHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2953,8 +2939,7 @@ private: public: - virtual ~ClientResponseHandler() { - } + virtual ~ClientResponseHandler() {} /** * @param context @@ -3129,22 +3114,22 @@ public: /** * Context. */ - std::tr1::shared_ptr m_context; + const std::tr1::shared_ptr m_context; /** * Client channel ID. */ - pvAccessID m_channelID; + const pvAccessID m_channelID; /** * Channel name. */ - string m_name; + const string m_name; /** * Channel requester. */ - ChannelRequester::weak_pointer m_requester; + const ChannelRequester::weak_pointer m_requester; public: //! The in-progress GetField operation. @@ -3155,7 +3140,7 @@ public: /** * Process priority. */ - short m_priority; + const short m_priority; /** * List of fixed addresses, if name resolution will be used. @@ -4718,6 +4703,7 @@ public: const GetFieldRequester::weak_pointer m_callback; string m_subField; + // const after activate() pvAccessID m_ioid; Mutex m_mutex; From cf7827ceab414feb42200491129fe4c10e586388 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 18:45:18 -0600 Subject: [PATCH 10/11] quiet mingw include order warning #warning Please include winsock2.h before windows.h --- src/factory/ChannelAccessFactory.cpp | 1 + src/remoteClient/clientContextImpl.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/src/factory/ChannelAccessFactory.cpp b/src/factory/ChannelAccessFactory.cpp index 4b5ca16..0842c3f 100644 --- a/src/factory/ChannelAccessFactory.cpp +++ b/src/factory/ChannelAccessFactory.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index cbe6dd5..3d75691 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include From 3f51c74ba1318cc97a82b9fa84e2a97e8c624ffb Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Nov 2017 18:47:39 -0600 Subject: [PATCH 11/11] fix mingw build error --- src/remoteClient/clientContextImpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 3d75691..b8a215a 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2170,7 +2170,7 @@ public: option = pvOptions->getSubField("pipeline"); if (option) { try { - m_pipeline = option->getAs(); + m_pipeline = option->getAs(); }catch(std::runtime_error& e){ SEND_MESSAGE(m_callback, cb, "Invalid pipeline=", warningMessage); }