diff --git a/src/factory/ChannelAccessFactory.cpp b/src/factory/ChannelAccessFactory.cpp index 4b5ca16..0842c3f 100644 --- a/src/factory/ChannelAccessFactory.cpp +++ b/src/factory/ChannelAccessFactory.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 0311ecc..bb1d6cd 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -321,7 +321,7 @@ void AbstractCodec::processReadSegmented() { if (!notFirstSegment) { LOG(logLevelWarn, - "Not-a-first segmented message expected from the client at" + "Protocol Violation: Not-a-first segmented message expected from the client at" " %s:%d: %s, disconnecting...", __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str()); invalidDataStreamHandler(); diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index da79f38..b8a215a 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -73,6 +74,9 @@ typedef std::map IOIDResponseRequestM catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }} +#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE) \ +do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG, MTYPE); }while(0) + /** * Base channel request. * @author Matej Sekoranja @@ -125,16 +129,21 @@ public: protected: - ClientChannelImpl::shared_pointer m_channel; + const ClientChannelImpl::shared_pointer m_channel; /* negative... */ static const int NULL_REQUEST = -1; static const int PURE_DESTROY_REQUEST = -2; static const int PURE_CANCEL_REQUEST = -3; + // const after activate() pvAccessID m_ioid; +private: + // holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or + // a mask of QOS_* int32 m_pendingRequest; +protected: Mutex m_mutex; @@ -221,22 +230,29 @@ protected: bool startRequest(int32 qos) { Lock guard(m_mutex); - // we allow pure destroy... - if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST) - return false; + if(qos==PURE_DESTROY_REQUEST) + {/* always allow destroy */} + else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST) + {/* cancel overrides all but destroy */} + else if(m_pendingRequest==NULL_REQUEST) + {/* anything whenidle */} + else + {return false; /* others not allowed */} m_pendingRequest = qos; return true; } - void stopRequest() { + int32 beginRequest() { Lock guard(m_mutex); + int32 ret = m_pendingRequest; m_pendingRequest = NULL_REQUEST; + return ret; } - int32 getPendingRequest() { + void abortRequest() { Lock guard(m_mutex); - return m_pendingRequest; + m_pendingRequest = NULL_REQUEST; } public: @@ -262,9 +278,8 @@ public: if (status.isSuccess()) { // once created set destroy flag - m_mutex.lock(); + Lock G(m_mutex); m_initialized = true; - m_mutex.unlock(); } initResponse(transport, version, payloadBuffer, qos, status); @@ -275,10 +290,9 @@ public: if (qos & QOS_DESTROY) { - m_mutex.lock(); + Lock G(m_mutex); m_initialized = false; destroyReq = true; - m_mutex.unlock(); } normalResponse(transport, version, payloadBuffer, qos, status); @@ -328,11 +342,13 @@ public: virtual void destroy(bool createRequestFailed) { + bool initd; { Lock guard(m_mutex); if (m_destroyed) return; m_destroyed = true; + initd = m_initialized; } // unregister response request @@ -340,7 +356,7 @@ public: m_channel->unregisterResponseRequest(m_ioid); // destroy remote instance - if (!createRequestFailed && m_initialized) + if (!createRequestFailed && initd) { try { @@ -369,7 +385,7 @@ public: else if (status == Channel::DISCONNECTED) { m_subscribed.clear(); - stopRequest(); + abortRequest(); } // TODO notify? } @@ -384,10 +400,11 @@ public: void updateSubscription() {} - virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE { - int8 qos = getPendingRequest(); - if (qos == -1) + // sub-class send() calls me + void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) { + if (qos == NULL_REQUEST) { return; + } else if (qos == PURE_DESTROY_REQUEST) { control->startMessage((int8)CMD_DESTROY_REQUEST, 8); @@ -400,7 +417,6 @@ public: buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); } - stopRequest(); } }; @@ -427,15 +443,14 @@ class ChannelProcessRequestImpl : public ChannelProcess { public: - requester_type::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const requester_type::weak_pointer m_callback; + const PVStructure::shared_pointer m_pvRequest; ChannelProcessRequestImpl(ClientChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) : BaseRequestImpl(channel), m_callback(callback), m_pvRequest(pvRequest) - { - } + {} virtual void activate() OVERRIDE FINAL { @@ -453,32 +468,28 @@ public: } } - ~ChannelProcessRequestImpl() - { - } + virtual ~ChannelProcessRequestImpl() {} ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } control->startMessage((int8)CMD_PROCESS, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -513,7 +524,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr)); } } @@ -551,9 +562,9 @@ class ChannelGetImpl : public ChannelGet { public: - ChannelGetRequester::weak_pointer m_callback; + const ChannelGetRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; @@ -597,27 +608,25 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); bool initStage = ((pendingRequest & QOS_INIT) != 0); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } control->startMessage((int8)CMD_GET, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (initStage) { // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -678,7 +687,7 @@ public: try { m_channel->checkAndGetTransport()->flushSendQueue(); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr)); } return; @@ -693,7 +702,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -741,9 +750,9 @@ class ChannelPutImpl : public ChannelPut { public: - ChannelPutRequester::weak_pointer m_callback; + const ChannelPutRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; @@ -787,17 +796,17 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } control->startMessage((int8)CMD_PUT, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -815,8 +824,6 @@ public: m_structure->serialize(buffer, control, m_bitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -888,7 +895,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -933,7 +940,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr)); } } @@ -981,9 +988,9 @@ class ChannelPutGetImpl : public ChannelPutGet { public: - ChannelPutGetRequester::weak_pointer m_callback; + const ChannelPutGetRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; // put data container PVStructure::shared_pointer m_putData; @@ -1030,10 +1037,10 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -1062,8 +1069,6 @@ public: m_putData->serialize(buffer, control, m_putDataBitSet.get()); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1184,7 +1189,7 @@ public: unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1213,7 +1218,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1242,7 +1247,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr())); } } @@ -1293,9 +1298,9 @@ class ChannelRPCImpl : public ChannelRPC { public: - ChannelRPCRequester::weak_pointer m_callback; + const ChannelRPCRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_structure; @@ -1336,18 +1341,18 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } control->startMessage((int8)CMD_RPC, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - if ((m_pendingRequest & QOS_INIT) == 0) - buffer->putByte((int8)m_pendingRequest); + if ((pendingRequest & QOS_INIT) == 0) + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -1366,8 +1371,6 @@ public: m_structure.reset(); } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1424,7 +1427,7 @@ public: m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr())); } } @@ -1473,9 +1476,9 @@ class ChannelArrayImpl : public ChannelArray { public: - ChannelArrayRequester::weak_pointer m_callback; + const ChannelArrayRequester::weak_pointer m_callback; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; // data container PVArray::shared_pointer m_arrayData; @@ -1525,17 +1528,17 @@ public: ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } control->startMessage((int8)CMD_ARRAY, 9); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + buffer->putByte((int8)pendingRequest); if (pendingRequest & QOS_INIT) { @@ -1570,8 +1573,6 @@ public: m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array } } - - stopRequest(); } virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL { @@ -1660,7 +1661,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer())); } } @@ -1704,7 +1705,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr)); } } @@ -1737,7 +1738,7 @@ public: } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr)); } } @@ -1767,7 +1768,7 @@ public: try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0)); } } @@ -1828,14 +1829,14 @@ class MonitorStrategyQueue : { private: - int32 m_queueSize; + const int32 m_queueSize; StructureConstPtr m_lastStructure; FreeElementQueue m_freeQueue; MonitorElementQueue m_monitorQueue; - MonitorRequester::weak_pointer m_callback; + const MonitorRequester::weak_pointer m_callback; Mutex m_mutex; @@ -1853,11 +1854,11 @@ private: bool m_reportQueueStateInProgress; // TODO check for cyclic-ref - ClientChannelImpl::shared_pointer m_channel; - pvAccessID m_ioid; + const ClientChannelImpl::shared_pointer m_channel; + const pvAccessID m_ioid; - bool m_pipeline; - int32 m_ackAny; + const bool m_pipeline; + const int32 m_ackAny; bool m_unlisten; @@ -1887,9 +1888,7 @@ public: //m_monitorQueue.reserve(m_queueSize); } - virtual ~MonitorStrategyQueue() - { - } + virtual ~MonitorStrategyQueue() {} virtual void init(StructureConstPtr const & structure) OVERRIDE FINAL { Lock guard(m_mutex); @@ -2121,10 +2120,10 @@ class ChannelMonitorImpl : public Monitor { public: - MonitorRequester::weak_pointer m_callback; + const MonitorRequester::weak_pointer m_callback; bool m_started; - PVStructure::shared_pointer m_pvRequest; + const PVStructure::shared_pointer m_pvRequest; std::tr1::shared_ptr m_monitorStrategy; @@ -2157,18 +2156,25 @@ public: PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); if (pvOptions) { - PVStringPtr pvString = pvOptions->getSubField("queueSize"); - if (pvString) { - int32 size; - std::stringstream ss; - ss << pvString->get(); - ss >> size; - if (size > 1) - m_queueSize = size; + PVScalarPtr option(pvOptions->getSubField("queueSize")); + if (option) { + try { + m_queueSize = option->getAs(); + if(m_queueSize<2) + m_queueSize = 2; + }catch(std::runtime_error& e){ + SEND_MESSAGE(m_callback, cb, "Invalid queueSize=", warningMessage); + } + } + + option = pvOptions->getSubField("pipeline"); + if (option) { + try { + m_pipeline = option->getAs(); + }catch(std::runtime_error& e){ + SEND_MESSAGE(m_callback, cb, "Invalid pipeline=", warningMessage); + } } - pvString = pvOptions->getSubField("pipeline"); - if (pvString) - m_pipeline = (pvString->get() == "true"); // pipeline options if (m_pipeline) @@ -2176,23 +2182,40 @@ public: // defaults to queueSize/2 m_ackAny = m_queueSize/2; - pvString = pvOptions->getSubField("ackAny"); - if (pvString) { - int32 size; - string sval = pvString->get(); - string::size_type slen = sval.length(); - bool percentage = (slen > 0) && (sval[slen-1] == '%'); - if (percentage) - sval = sval.substr(0, slen-1); - std::stringstream ss; - ss << sval; - ss >> size; - if (percentage) - size = (m_queueSize * size) / 100; - if (size <= 0) + bool done = false; + int32 size; + + option = pvOptions->getSubField("ackAny"); + if (option) { + if(option->getScalar()->getScalarType()==pvString) { + std::string sval(option->getAs()); + + if(!sval.empty() && sval[sval.size()-1]=='%') { + try { + double percent = castUnsafe(sval.substr(0, sval.size()-1)); + size = (m_queueSize * percent) / 100.0; + done = true; + }catch(std::runtime_error&){ + SEND_MESSAGE(m_callback, cb, "ackAny= invalid precentage", warningMessage); + } + } + } + + if(!done) { + try { + size = option->getAs(); + done = true; + }catch(std::runtime_error&){ + SEND_MESSAGE(m_callback, cb, "ackAny= invalid value", warningMessage); + } + } + + if(!done) { + } else if (size <= 0) { m_ackAny = 1; - else + } else { m_ackAny = (m_ackAny <= m_queueSize) ? size : m_queueSize; + } } } } @@ -2224,17 +2247,15 @@ public: } } - virtual ~ChannelMonitorImpl() - { - } + virtual ~ChannelMonitorImpl() {} ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - int32 pendingRequest = getPendingRequest(); + int32 pendingRequest = beginRequest(); if (pendingRequest < 0) { - BaseRequestImpl::send(buffer, control); + base_send(buffer, control, pendingRequest); return; } @@ -2255,8 +2276,6 @@ public: buffer->putInt(m_queueSize); } } - - stopRequest(); } virtual void initResponse( @@ -2276,6 +2295,8 @@ public: dynamic_pointer_cast( transport->cachedDeserialize(payloadBuffer) ); + if(!structure) + throw std::runtime_error("initResponse() w/o Structure"); m_monitorStrategy->init(structure); bool restoreStartedState = m_started; @@ -2328,9 +2349,8 @@ public: status.deserialize(payloadBuffer, transport.get()); if (status.isSuccess()) { - m_mutex.lock(); + Lock G(m_mutex); m_initialized = true; - m_mutex.unlock(); } initResponse(transport, version, payloadBuffer, qos, status); } @@ -2339,9 +2359,10 @@ public: Status status; status.deserialize(payloadBuffer, transport.get()); - m_mutex.lock(); - m_initialized = false; - m_mutex.unlock(); + { + Lock G(m_mutex); + m_initialized = false; + } normalResponse(transport, version, payloadBuffer, qos, status); } @@ -2373,7 +2394,7 @@ public: m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } } @@ -2399,7 +2420,7 @@ public: m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { - stopRequest(); + abortRequest(); return BaseRequestImpl::channelNotConnected; } } @@ -2448,30 +2469,6 @@ public: }; -class BadResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { -public: - BadResponse(ClientContextImpl::shared_pointer const & context) : - AbstractClientResponseHandler(context, "Bad response") - { - } - - virtual ~BadResponse() { - } - - virtual void handleResponse(osiSockAddr* responseFrom, - Transport::shared_pointer const & /*transport*/, int8 /*version*/, int8 command, - size_t /*payloadSize*/, epics::pvData::ByteBuffer* /*payloadBuffer*/) OVERRIDE FINAL - { - char ipAddrStr[48]; - ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - - LOG(logLevelInfo, - "Undecipherable message (bad response type %d) from %s.", - command, ipAddrStr); - } -}; - - class ResponseRequestHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: ResponseRequestHandler(ClientContextImpl::shared_pointer const & context) : @@ -2694,11 +2691,9 @@ class BeaconResponseHandler : public AbstractClientResponseHandler, private epic public: BeaconResponseHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Beacon") - { - } + {} - virtual ~BeaconResponseHandler() { - } + virtual ~BeaconResponseHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2763,11 +2758,9 @@ class ClientConnectionValidationHandler : public AbstractClientResponseHandler, public: ClientConnectionValidationHandler(ClientContextImpl::shared_pointer context) : AbstractClientResponseHandler(context, "Connection validation") - { - } + {} - virtual ~ClientConnectionValidationHandler() { - } + virtual ~ClientConnectionValidationHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2805,11 +2798,9 @@ class ClientConnectionValidatedHandler : public AbstractClientResponseHandler, p public: ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) : AbstractClientResponseHandler(context, "Connection validated") - { - } + {} - virtual ~ClientConnectionValidatedHandler() { - } + virtual ~ClientConnectionValidatedHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2828,11 +2819,9 @@ class MessageHandler : public AbstractClientResponseHandler, private epics::pvDa public: MessageHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Message") - { - } + {} - virtual ~MessageHandler() { - } + virtual ~MessageHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2865,11 +2854,9 @@ class CreateChannelHandler : public AbstractClientResponseHandler, private epics public: CreateChannelHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Create channel") - { - } + {} - virtual ~CreateChannelHandler() { - } + virtual ~CreateChannelHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2917,11 +2904,9 @@ class DestroyChannelHandler : public AbstractClientResponseHandler, private epic public: DestroyChannelHandler(ClientContextImpl::shared_pointer const & context) : AbstractClientResponseHandler(context, "Destroy channel") - { - } + {} - virtual ~DestroyChannelHandler() { - } + virtual ~DestroyChannelHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, @@ -2955,8 +2940,7 @@ private: public: - virtual ~ClientResponseHandler() { - } + virtual ~ClientResponseHandler() {} /** * @param context @@ -2964,18 +2948,18 @@ public: ClientResponseHandler(ClientContextImpl::shared_pointer const & context) :ResponseHandler(context.get(), "ClientResponseHandler") { - ResponseHandler::shared_pointer badResponse(new BadResponse(context)); + ResponseHandler::shared_pointer ignoreResponse(new NoopResponse(context, "Ignore")); ResponseHandler::shared_pointer dataResponse(new ResponseRequestHandler(context)); m_handlerTable.resize(CMD_CANCEL_REQUEST+1); m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */ m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */ - m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */ + m_handlerTable[CMD_ECHO] = ignoreResponse; /* 2 */ m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */ m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */ m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */ - m_handlerTable[CMD_ACL_CHANGE].reset(new NoopResponse(context, "Access rights change")); /* 6 */ + m_handlerTable[CMD_ACL_CHANGE] = ignoreResponse; /* 6 */ m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */ m_handlerTable[CMD_DESTROY_CHANNEL].reset(new DestroyChannelHandler(context)); /* 8 */ m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */ @@ -2984,13 +2968,13 @@ public: m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */ m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */ m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */ - m_handlerTable[CMD_DESTROY_REQUEST] = badResponse; /* 15 - destroy request */ + m_handlerTable[CMD_DESTROY_REQUEST] = ignoreResponse; /* 15 - destroy request */ m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */ m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */ m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */ m_handlerTable[CMD_MULTIPLE_DATA].reset(new MultipleResponseRequestHandler(context)); /* 19 - grouped monitors */ m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */ - m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */ + m_handlerTable[CMD_CANCEL_REQUEST] = ignoreResponse; /* 21 - cancel request */ } virtual void handleResponse(osiSockAddr* responseFrom, @@ -3131,22 +3115,22 @@ public: /** * Context. */ - std::tr1::shared_ptr m_context; + const std::tr1::shared_ptr m_context; /** * Client channel ID. */ - pvAccessID m_channelID; + const pvAccessID m_channelID; /** * Channel name. */ - string m_name; + const string m_name; /** * Channel requester. */ - ChannelRequester::weak_pointer m_requester; + const ChannelRequester::weak_pointer m_requester; public: //! The in-progress GetField operation. @@ -3157,7 +3141,7 @@ public: /** * Process priority. */ - short m_priority; + const short m_priority; /** * List of fixed addresses, if name resolution will be used. @@ -4720,6 +4704,7 @@ public: const GetFieldRequester::weak_pointer m_callback; string m_subField; + // const after activate() pvAccessID m_ioid; Mutex m_mutex; @@ -4786,7 +4771,7 @@ public: } virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL { - control->startMessage((int8)17, 8); + control->startMessage((int8)CMD_GET_FIELD, 8); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); SerializeHelper::serializeString(m_subField, buffer, control); diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index 99a4210..c95c2a2 100644 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -1755,7 +1755,7 @@ void ChannelAccessIFTest::test_channelMonitorWithInvalidRequesterAndRequest() { void ChannelAccessIFTest::test_channelMonitor(int queueSize) { - testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + testDiag("BEGIN TEST %s: queueSize=%d", CURRENT_FUNCTION, queueSize); ostringstream ostream; ostream << queueSize; string request = "record[queueSize=";