This commit is contained in:
mrkraimer
2017-11-20 08:46:58 -05:00
4 changed files with 177 additions and 191 deletions

View File

@@ -7,6 +7,7 @@
#include <map>
#include <vector>
#include <osiSock.h>
#include <epicsThread.h>
#include <pv/lock.h>

View File

@@ -321,7 +321,7 @@ void AbstractCodec::processReadSegmented() {
if (!notFirstSegment)
{
LOG(logLevelWarn,
"Not-a-first segmented message expected from the client at"
"Protocol Violation: Not-a-first segmented message expected from the client at"
" %s:%d: %s, disconnecting...",
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();

View File

@@ -10,6 +10,7 @@
#include <queue>
#include <stdexcept>
#include <osiSock.h>
#include <pv/lock.h>
#include <pv/timer.h>
#include <pv/bitSetUtil.h>
@@ -73,6 +74,9 @@ typedef std::map<pvAccessID, ResponseRequest::weak_pointer> IOIDResponseRequestM
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }}
#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE) \
do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG, MTYPE); }while(0)
/**
* Base channel request.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -125,16 +129,21 @@ public:
protected:
ClientChannelImpl::shared_pointer m_channel;
const ClientChannelImpl::shared_pointer m_channel;
/* negative... */
static const int NULL_REQUEST = -1;
static const int PURE_DESTROY_REQUEST = -2;
static const int PURE_CANCEL_REQUEST = -3;
// const after activate()
pvAccessID m_ioid;
private:
// holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or
// a mask of QOS_*
int32 m_pendingRequest;
protected:
Mutex m_mutex;
@@ -221,22 +230,29 @@ protected:
bool startRequest(int32 qos) {
Lock guard(m_mutex);
// we allow pure destroy...
if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST)
return false;
if(qos==PURE_DESTROY_REQUEST)
{/* always allow destroy */}
else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST)
{/* cancel overrides all but destroy */}
else if(m_pendingRequest==NULL_REQUEST)
{/* anything whenidle */}
else
{return false; /* others not allowed */}
m_pendingRequest = qos;
return true;
}
void stopRequest() {
int32 beginRequest() {
Lock guard(m_mutex);
int32 ret = m_pendingRequest;
m_pendingRequest = NULL_REQUEST;
return ret;
}
int32 getPendingRequest() {
void abortRequest() {
Lock guard(m_mutex);
return m_pendingRequest;
m_pendingRequest = NULL_REQUEST;
}
public:
@@ -262,9 +278,8 @@ public:
if (status.isSuccess())
{
// once created set destroy flag
m_mutex.lock();
Lock G(m_mutex);
m_initialized = true;
m_mutex.unlock();
}
initResponse(transport, version, payloadBuffer, qos, status);
@@ -275,10 +290,9 @@ public:
if (qos & QOS_DESTROY)
{
m_mutex.lock();
Lock G(m_mutex);
m_initialized = false;
destroyReq = true;
m_mutex.unlock();
}
normalResponse(transport, version, payloadBuffer, qos, status);
@@ -328,11 +342,13 @@ public:
virtual void destroy(bool createRequestFailed) {
bool initd;
{
Lock guard(m_mutex);
if (m_destroyed)
return;
m_destroyed = true;
initd = m_initialized;
}
// unregister response request
@@ -340,7 +356,7 @@ public:
m_channel->unregisterResponseRequest(m_ioid);
// destroy remote instance
if (!createRequestFailed && m_initialized)
if (!createRequestFailed && initd)
{
try
{
@@ -369,7 +385,7 @@ public:
else if (status == Channel::DISCONNECTED)
{
m_subscribed.clear();
stopRequest();
abortRequest();
}
// TODO notify?
}
@@ -384,10 +400,11 @@ public:
void updateSubscription() {}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE {
int8 qos = getPendingRequest();
if (qos == -1)
// sub-class send() calls me
void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) {
if (qos == NULL_REQUEST) {
return;
}
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)CMD_DESTROY_REQUEST, 8);
@@ -400,7 +417,6 @@ public:
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
stopRequest();
}
};
@@ -427,15 +443,14 @@ class ChannelProcessRequestImpl :
public ChannelProcess
{
public:
requester_type::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const requester_type::weak_pointer m_callback;
const PVStructure::shared_pointer m_pvRequest;
ChannelProcessRequestImpl(ClientChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel),
m_callback(callback),
m_pvRequest(pvRequest)
{
}
{}
virtual void activate() OVERRIDE FINAL
{
@@ -453,32 +468,28 @@ public:
}
}
~ChannelProcessRequestImpl()
{
}
virtual ~ChannelProcessRequestImpl() {}
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
control->startMessage((int8)CMD_PROCESS, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
buffer->putByte((int8)pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -513,7 +524,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr));
}
}
@@ -551,9 +562,9 @@ class ChannelGetImpl :
public ChannelGet
{
public:
ChannelGetRequester::weak_pointer m_callback;
const ChannelGetRequester::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
PVStructure::shared_pointer m_structure;
BitSet::shared_pointer m_bitSet;
@@ -597,27 +608,25 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
bool initStage = ((pendingRequest & QOS_INIT) != 0);
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
control->startMessage((int8)CMD_GET, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
buffer->putByte((int8)pendingRequest);
if (initStage)
{
// pvRequest
SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -678,7 +687,7 @@ public:
try {
m_channel->checkAndGetTransport()->flushSendQueue();
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr));
}
return;
@@ -693,7 +702,7 @@ public:
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelGetImpl>());
//TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender);
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -741,9 +750,9 @@ class ChannelPutImpl :
public ChannelPut
{
public:
ChannelPutRequester::weak_pointer m_callback;
const ChannelPutRequester::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
PVStructure::shared_pointer m_structure;
BitSet::shared_pointer m_bitSet;
@@ -787,17 +796,17 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
control->startMessage((int8)CMD_PUT, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
buffer->putByte((int8)pendingRequest);
if (pendingRequest & QOS_INIT)
{
@@ -815,8 +824,6 @@ public:
m_structure->serialize(buffer, control, m_bitSet.get());
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -888,7 +895,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -933,7 +940,7 @@ public:
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr));
}
}
@@ -981,9 +988,9 @@ class ChannelPutGetImpl :
public ChannelPutGet
{
public:
ChannelPutGetRequester::weak_pointer m_callback;
const ChannelPutGetRequester::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
// put data container
PVStructure::shared_pointer m_putData;
@@ -1030,10 +1037,10 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -1062,8 +1069,6 @@ public:
m_putData->serialize(buffer, control, m_putDataBitSet.get());
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1184,7 +1189,7 @@ public:
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1213,7 +1218,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1242,7 +1247,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
}
}
@@ -1293,9 +1298,9 @@ class ChannelRPCImpl :
public ChannelRPC
{
public:
ChannelRPCRequester::weak_pointer m_callback;
const ChannelRPCRequester::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
PVStructure::shared_pointer m_structure;
@@ -1336,18 +1341,18 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
control->startMessage((int8)CMD_RPC, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
if ((m_pendingRequest & QOS_INIT) == 0)
buffer->putByte((int8)m_pendingRequest);
if ((pendingRequest & QOS_INIT) == 0)
buffer->putByte((int8)pendingRequest);
if (pendingRequest & QOS_INIT)
{
@@ -1366,8 +1371,6 @@ public:
m_structure.reset();
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1424,7 +1427,7 @@ public:
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelRPCImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr()));
}
}
@@ -1473,9 +1476,9 @@ class ChannelArrayImpl :
public ChannelArray
{
public:
ChannelArrayRequester::weak_pointer m_callback;
const ChannelArrayRequester::weak_pointer m_callback;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
// data container
PVArray::shared_pointer m_arrayData;
@@ -1525,17 +1528,17 @@ public:
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
control->startMessage((int8)CMD_ARRAY, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
buffer->putByte((int8)pendingRequest);
if (pendingRequest & QOS_INIT)
{
@@ -1570,8 +1573,6 @@ public:
m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array
}
}
stopRequest();
}
virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
@@ -1660,7 +1661,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer()));
}
}
@@ -1704,7 +1705,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr));
}
}
@@ -1737,7 +1738,7 @@ public:
}
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr));
}
}
@@ -1767,7 +1768,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0));
}
}
@@ -1828,14 +1829,14 @@ class MonitorStrategyQueue :
{
private:
int32 m_queueSize;
const int32 m_queueSize;
StructureConstPtr m_lastStructure;
FreeElementQueue m_freeQueue;
MonitorElementQueue m_monitorQueue;
MonitorRequester::weak_pointer m_callback;
const MonitorRequester::weak_pointer m_callback;
Mutex m_mutex;
@@ -1853,11 +1854,11 @@ private:
bool m_reportQueueStateInProgress;
// TODO check for cyclic-ref
ClientChannelImpl::shared_pointer m_channel;
pvAccessID m_ioid;
const ClientChannelImpl::shared_pointer m_channel;
const pvAccessID m_ioid;
bool m_pipeline;
int32 m_ackAny;
const bool m_pipeline;
const int32 m_ackAny;
bool m_unlisten;
@@ -1887,9 +1888,7 @@ public:
//m_monitorQueue.reserve(m_queueSize);
}
virtual ~MonitorStrategyQueue()
{
}
virtual ~MonitorStrategyQueue() {}
virtual void init(StructureConstPtr const & structure) OVERRIDE FINAL {
Lock guard(m_mutex);
@@ -2121,10 +2120,10 @@ class ChannelMonitorImpl :
public Monitor
{
public:
MonitorRequester::weak_pointer m_callback;
const MonitorRequester::weak_pointer m_callback;
bool m_started;
PVStructure::shared_pointer m_pvRequest;
const PVStructure::shared_pointer m_pvRequest;
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
@@ -2157,18 +2156,25 @@ public:
PVStructurePtr pvOptions = m_pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1)
m_queueSize = size;
PVScalarPtr option(pvOptions->getSubField<PVScalar>("queueSize"));
if (option) {
try {
m_queueSize = option->getAs<int32>();
if(m_queueSize<2)
m_queueSize = 2;
}catch(std::runtime_error& e){
SEND_MESSAGE(m_callback, cb, "Invalid queueSize=", warningMessage);
}
}
option = pvOptions->getSubField<PVScalar>("pipeline");
if (option) {
try {
m_pipeline = option->getAs<epics::pvData::boolean>();
}catch(std::runtime_error& e){
SEND_MESSAGE(m_callback, cb, "Invalid pipeline=", warningMessage);
}
}
pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
// pipeline options
if (m_pipeline)
@@ -2176,23 +2182,40 @@ public:
// defaults to queueSize/2
m_ackAny = m_queueSize/2;
pvString = pvOptions->getSubField<PVString>("ackAny");
if (pvString) {
int32 size;
string sval = pvString->get();
string::size_type slen = sval.length();
bool percentage = (slen > 0) && (sval[slen-1] == '%');
if (percentage)
sval = sval.substr(0, slen-1);
std::stringstream ss;
ss << sval;
ss >> size;
if (percentage)
size = (m_queueSize * size) / 100;
if (size <= 0)
bool done = false;
int32 size;
option = pvOptions->getSubField<PVScalar>("ackAny");
if (option) {
if(option->getScalar()->getScalarType()==pvString) {
std::string sval(option->getAs<std::string>());
if(!sval.empty() && sval[sval.size()-1]=='%') {
try {
double percent = castUnsafe<double>(sval.substr(0, sval.size()-1));
size = (m_queueSize * percent) / 100.0;
done = true;
}catch(std::runtime_error&){
SEND_MESSAGE(m_callback, cb, "ackAny= invalid precentage", warningMessage);
}
}
}
if(!done) {
try {
size = option->getAs<int32>();
done = true;
}catch(std::runtime_error&){
SEND_MESSAGE(m_callback, cb, "ackAny= invalid value", warningMessage);
}
}
if(!done) {
} else if (size <= 0) {
m_ackAny = 1;
else
} else {
m_ackAny = (m_ackAny <= m_queueSize) ? size : m_queueSize;
}
}
}
}
@@ -2224,17 +2247,15 @@ public:
}
}
virtual ~ChannelMonitorImpl()
{
}
virtual ~ChannelMonitorImpl() {}
ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
int32 pendingRequest = getPendingRequest();
int32 pendingRequest = beginRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
base_send(buffer, control, pendingRequest);
return;
}
@@ -2255,8 +2276,6 @@ public:
buffer->putInt(m_queueSize);
}
}
stopRequest();
}
virtual void initResponse(
@@ -2276,6 +2295,8 @@ public:
dynamic_pointer_cast<const Structure>(
transport->cachedDeserialize(payloadBuffer)
);
if(!structure)
throw std::runtime_error("initResponse() w/o Structure");
m_monitorStrategy->init(structure);
bool restoreStartedState = m_started;
@@ -2328,9 +2349,8 @@ public:
status.deserialize(payloadBuffer, transport.get());
if (status.isSuccess())
{
m_mutex.lock();
Lock G(m_mutex);
m_initialized = true;
m_mutex.unlock();
}
initResponse(transport, version, payloadBuffer, qos, status);
}
@@ -2339,9 +2359,10 @@ public:
Status status;
status.deserialize(payloadBuffer, transport.get());
m_mutex.lock();
m_initialized = false;
m_mutex.unlock();
{
Lock G(m_mutex);
m_initialized = false;
}
normalResponse(transport, version, payloadBuffer, qos, status);
}
@@ -2373,7 +2394,7 @@ public:
m_started = true;
return Status::Ok;
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
return BaseRequestImpl::channelNotConnected;
}
}
@@ -2399,7 +2420,7 @@ public:
m_started = false;
return Status::Ok;
} catch (std::runtime_error &rte) {
stopRequest();
abortRequest();
return BaseRequestImpl::channelNotConnected;
}
}
@@ -2448,30 +2469,6 @@ public:
};
class BadResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
BadResponse(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Bad response")
{
}
virtual ~BadResponse() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & /*transport*/, int8 /*version*/, int8 command,
size_t /*payloadSize*/, epics::pvData::ByteBuffer* /*payloadBuffer*/) OVERRIDE FINAL
{
char ipAddrStr[48];
ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelInfo,
"Undecipherable message (bad response type %d) from %s.",
command, ipAddrStr);
}
};
class ResponseRequestHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
ResponseRequestHandler(ClientContextImpl::shared_pointer const & context) :
@@ -2694,11 +2691,9 @@ class BeaconResponseHandler : public AbstractClientResponseHandler, private epic
public:
BeaconResponseHandler(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Beacon")
{
}
{}
virtual ~BeaconResponseHandler() {
}
virtual ~BeaconResponseHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2763,11 +2758,9 @@ class ClientConnectionValidationHandler : public AbstractClientResponseHandler,
public:
ClientConnectionValidationHandler(ClientContextImpl::shared_pointer context) :
AbstractClientResponseHandler(context, "Connection validation")
{
}
{}
virtual ~ClientConnectionValidationHandler() {
}
virtual ~ClientConnectionValidationHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2805,11 +2798,9 @@ class ClientConnectionValidatedHandler : public AbstractClientResponseHandler, p
public:
ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) :
AbstractClientResponseHandler(context, "Connection validated")
{
}
{}
virtual ~ClientConnectionValidatedHandler() {
}
virtual ~ClientConnectionValidatedHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2828,11 +2819,9 @@ class MessageHandler : public AbstractClientResponseHandler, private epics::pvDa
public:
MessageHandler(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Message")
{
}
{}
virtual ~MessageHandler() {
}
virtual ~MessageHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2865,11 +2854,9 @@ class CreateChannelHandler : public AbstractClientResponseHandler, private epics
public:
CreateChannelHandler(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Create channel")
{
}
{}
virtual ~CreateChannelHandler() {
}
virtual ~CreateChannelHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2917,11 +2904,9 @@ class DestroyChannelHandler : public AbstractClientResponseHandler, private epic
public:
DestroyChannelHandler(ClientContextImpl::shared_pointer const & context) :
AbstractClientResponseHandler(context, "Destroy channel")
{
}
{}
virtual ~DestroyChannelHandler() {
}
virtual ~DestroyChannelHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
@@ -2955,8 +2940,7 @@ private:
public:
virtual ~ClientResponseHandler() {
}
virtual ~ClientResponseHandler() {}
/**
* @param context
@@ -2964,18 +2948,18 @@ public:
ClientResponseHandler(ClientContextImpl::shared_pointer const & context)
:ResponseHandler(context.get(), "ClientResponseHandler")
{
ResponseHandler::shared_pointer badResponse(new BadResponse(context));
ResponseHandler::shared_pointer ignoreResponse(new NoopResponse(context, "Ignore"));
ResponseHandler::shared_pointer dataResponse(new ResponseRequestHandler(context));
m_handlerTable.resize(CMD_CANCEL_REQUEST+1);
m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */
m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */
m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */
m_handlerTable[CMD_ECHO] = ignoreResponse; /* 2 */
m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */
m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */
m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */
m_handlerTable[CMD_ACL_CHANGE].reset(new NoopResponse(context, "Access rights change")); /* 6 */
m_handlerTable[CMD_ACL_CHANGE] = ignoreResponse; /* 6 */
m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */
m_handlerTable[CMD_DESTROY_CHANNEL].reset(new DestroyChannelHandler(context)); /* 8 */
m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */
@@ -2984,13 +2968,13 @@ public:
m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */
m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */
m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */
m_handlerTable[CMD_DESTROY_REQUEST] = badResponse; /* 15 - destroy request */
m_handlerTable[CMD_DESTROY_REQUEST] = ignoreResponse; /* 15 - destroy request */
m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */
m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */
m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */
m_handlerTable[CMD_MULTIPLE_DATA].reset(new MultipleResponseRequestHandler(context)); /* 19 - grouped monitors */
m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */
m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */
m_handlerTable[CMD_CANCEL_REQUEST] = ignoreResponse; /* 21 - cancel request */
}
virtual void handleResponse(osiSockAddr* responseFrom,
@@ -3131,22 +3115,22 @@ public:
/**
* Context.
*/
std::tr1::shared_ptr<InternalClientContextImpl> m_context;
const std::tr1::shared_ptr<InternalClientContextImpl> m_context;
/**
* Client channel ID.
*/
pvAccessID m_channelID;
const pvAccessID m_channelID;
/**
* Channel name.
*/
string m_name;
const string m_name;
/**
* Channel requester.
*/
ChannelRequester::weak_pointer m_requester;
const ChannelRequester::weak_pointer m_requester;
public:
//! The in-progress GetField operation.
@@ -3157,7 +3141,7 @@ public:
/**
* Process priority.
*/
short m_priority;
const short m_priority;
/**
* List of fixed addresses, if <code<0</code> name resolution will be used.
@@ -4720,6 +4704,7 @@ public:
const GetFieldRequester::weak_pointer m_callback;
string m_subField;
// const after activate()
pvAccessID m_ioid;
Mutex m_mutex;
@@ -4786,7 +4771,7 @@ public:
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
control->startMessage((int8)17, 8);
control->startMessage((int8)CMD_GET_FIELD, 8);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
SerializeHelper::serializeString(m_subField, buffer, control);

View File

@@ -1755,7 +1755,7 @@ void ChannelAccessIFTest::test_channelMonitorWithInvalidRequesterAndRequest() {
void ChannelAccessIFTest::test_channelMonitor(int queueSize) {
testDiag("BEGIN TEST %s:", CURRENT_FUNCTION);
testDiag("BEGIN TEST %s: queueSize=%d", CURRENT_FUNCTION, queueSize);
ostringstream ostream;
ostream << queueSize;
string request = "record[queueSize=";