cleanup phase II

This commit is contained in:
Matej Sekoranja
2012-07-26 21:38:02 +02:00
parent 4f15626281
commit 835ce323f1
6 changed files with 105 additions and 143 deletions

View File

@@ -39,7 +39,7 @@ namespace pvAccess {
* - 8(UDP) - some reserve (the MTU of Ethernet is currently independent
* of its speed variant)
*/
const epics::pvData::int32 MAX_UDP_SEND = 1440;
const epics::pvData::int32 MAX_UDP_UNFRAGMENTED_SEND = 1440;
/**
* UDP maximum receive message size.

View File

@@ -77,6 +77,7 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, TimeStamp* timest
void BeaconHandler::changedTransport()
{
// TODO why only TCP, actually TCP does not need this
auto_ptr<TransportRegistry::transportVector_t> transports =
_context.lock()->getTransportRegistry()->get("TCP", &_responseFrom);
if (!transports.get())

View File

@@ -77,10 +77,10 @@ class ChannelSearchManager {
virtual void searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress) = 0;
/**
* Beacon anomaly detected.
* New server detected.
* Boost searching of all channels.
*/
virtual void beaconAnomalyNotify() = 0;
virtual void newServerDetected() = 0;
/**
* Cancel.

View File

@@ -48,7 +48,7 @@ SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_p
m_context(context),
m_canceled(),
m_sequenceNumber(0),
m_sendBuffer(MAX_UDP_SEND),
m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND),
m_channels(),
m_lastTimeSent(),
m_mockTransportSendControl(),
@@ -150,7 +150,7 @@ void SimpleChannelSearchManagerImpl::searchResponse(pvAccessID cid, int32_t seqN
}
}
void SimpleChannelSearchManagerImpl::beaconAnomalyNotify()
void SimpleChannelSearchManagerImpl::newServerDetected()
{
boost();
callback();

View File

@@ -27,8 +27,7 @@ public:
void ensureBuffer(std::size_t) {}
void alignBuffer(std::size_t alignment) {}
void flushSerializeBuffer() {}
void cachedSerialize(
const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer)
void cachedSerialize(const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer)
{
// no cache
field->serialize(buffer, this);
@@ -82,10 +81,10 @@ class SimpleChannelSearchManagerImpl :
*/
void searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress);
/**
* Beacon anomaly detected.
* New server detected.
* Boost searching of all channels.
*/
void beaconAnomalyNotify();
void newServerDetected();
/// Timer callback.
void callback();

View File

@@ -98,6 +98,9 @@ namespace epics {
static Status otherRequestPendingStatus;
static Status pvRequestNull;
static PVStructure::shared_pointer nullPVStructure;
static BitSet::shared_pointer nullBitSet;
protected:
ChannelImpl::shared_pointer m_channel;
@@ -122,7 +125,7 @@ namespace epics {
virtual ~BaseRequestImpl() {};
BaseRequestImpl(ChannelImpl::shared_pointer const & channel, Requester::shared_pointer requester) :
BaseRequestImpl(ChannelImpl::shared_pointer const & channel, Requester::shared_pointer const & requester) :
m_channel(channel),
m_requester(requester),
m_ioid(INVALID_IOID),
@@ -245,8 +248,7 @@ namespace epics {
try
{
startRequest(PURE_DESTROY_REQUEST);
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if fails)
}
@@ -264,6 +266,7 @@ namespace epics {
void reportStatus(const Status& status) {
// destroy, since channel (parent) was destroyed
// NOTE: by-ref compare, not nice
if (&status == &ChannelImpl::channelDestroyed)
destroy();
else if (&status == &ChannelImpl::channelDisconnected)
@@ -300,8 +303,8 @@ namespace epics {
Status BaseRequestImpl::otherRequestPendingStatus = Status(Status::STATUSTYPE_ERROR, "other request pending");
Status BaseRequestImpl::pvRequestNull = Status(Status::STATUSTYPE_ERROR, "pvRequest == 0");
PVStructure::shared_pointer BaseRequestImpl::nullPVStructure;
BitSet::shared_pointer BaseRequestImpl::nullBitSet;
@@ -317,7 +320,7 @@ namespace epics {
PVStructure::shared_pointer m_pvRequest;
ChannelProcessRequestImpl(ChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(callback)),
BaseRequestImpl(channel, callback),
m_callback(callback),
m_pvRequest(pvRequest)
{
@@ -333,8 +336,7 @@ namespace epics {
// TODO best-effort support
try {
Transport::shared_pointer tp = m_channel->checkAndGetTransport();
resubscribeSubscription(tp);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelProcess::shared_pointer thisPointer = dynamic_pointer_cast<ChannelProcess>(shared_from_this());
EXCEPTION_GUARD(m_callback->channelProcessConnect(channelNotConnected, thisPointer));
@@ -415,8 +417,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_callback->processDone(channelNotConnected));
@@ -425,8 +426,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -466,35 +466,31 @@ namespace epics {
Mutex m_structureMutex;
ChannelGetImpl(ChannelImpl::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(channelGetRequester)),
m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest)
BaseRequestImpl(channel, channelGetRequester),
m_channelGetRequester(channelGetRequester),
m_pvRequest(pvRequest)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelGet);
}
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast<ChannelGet>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, thisPointer, nullPVStructure, nullBitSet));
return;
}
BaseRequestImpl::activate();
// TODO immediate get, i.e. get data with init message
// TODO one-time get, i.e. immediate get + lastRequest
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast<ChannelGet>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelNotConnected, thisPointer, nullPVStructure, nullBitSet));
}
}
@@ -545,8 +541,6 @@ namespace epics {
if (!status.isSuccess())
{
ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast<ChannelGet>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisPointer, nullPVStructure, nullBitSet));
return true;
}
@@ -614,8 +608,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
//TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender);
} catch (std::runtime_error &rte) {
stopRequest();
@@ -625,8 +618,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -657,7 +649,9 @@ namespace epics {
PVACCESS_REFCOUNT_MONITOR_DEFINE(channelPut);
class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
class ChannelPutImpl :
public BaseRequestImpl,
public ChannelPut
{
private:
ChannelPutRequester::shared_pointer m_channelPutRequester;
@@ -670,35 +664,31 @@ namespace epics {
Mutex m_structureMutex;
ChannelPutImpl(ChannelImpl::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(channelPutRequester)),
m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest)
BaseRequestImpl(channel, channelPutRequester),
m_channelPutRequester(channelPutRequester),
m_pvRequest(pvRequest)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPut);
}
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast<ChannelPut>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, thisPointer, nullPVStructure, nullBitSet));
return;
}
BaseRequestImpl::activate();
// TODO low-overhead put
// TODO best-effort put
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast<ChannelPut>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelNotConnected, thisPointer, nullPVStructure, nullBitSet));
}
}
@@ -758,8 +748,6 @@ namespace epics {
if (!status.isSuccess())
{
ChannelPut::shared_pointer nullChannelPut;
PVStructure::shared_pointer nullPVStructure;
BitSet::shared_pointer nullBitSet;
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, nullChannelPut, nullPVStructure, nullBitSet));
return true;
}
@@ -822,8 +810,7 @@ namespace epics {
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected));
@@ -850,8 +837,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected));
@@ -860,8 +846,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -889,7 +874,9 @@ namespace epics {
PVACCESS_REFCOUNT_MONITOR_DEFINE(channelPutGet);
class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
class ChannelPutGetImpl :
public BaseRequestImpl,
public ChannelPutGet
{
private:
ChannelPutGetRequester::shared_pointer m_channelPutGetRequester;
@@ -902,30 +889,28 @@ namespace epics {
Mutex m_structureMutex;
ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(channelPutGetRequester)),
m_channelPutGetRequester(channelPutGetRequester), m_pvRequest(pvRequest)
BaseRequestImpl(channel, channelPutGetRequester),
m_channelPutGetRequester(channelPutGetRequester),
m_pvRequest(pvRequest)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet);
}
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast<ChannelPutGet>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, thisPointer, nullPVStructure, nullPVStructure));
return;
}
BaseRequestImpl::activate();
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast<ChannelPutGet>(shared_from_this());
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, thisPointer, nullPVStructure, nullPVStructure));
}
}
@@ -989,7 +974,6 @@ namespace epics {
if (!status.isSuccess())
{
ChannelPutGet::shared_pointer nullChannelPutGet;
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, nullChannelPutGet, nullPVStructure, nullPVStructure));
return true;
}
@@ -1081,8 +1065,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected));
@@ -1108,8 +1091,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected));
@@ -1135,8 +1117,7 @@ namespace epics {
}
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected));
@@ -1145,8 +1126,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -1177,7 +1157,9 @@ namespace epics {
PVACCESS_REFCOUNT_MONITOR_DEFINE(channelRPC);
class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
class ChannelRPCImpl :
public BaseRequestImpl,
public ChannelRPC
{
private:
ChannelRPCRequester::shared_pointer m_channelRPCRequester;
@@ -1189,16 +1171,15 @@ namespace epics {
Mutex m_structureMutex;
ChannelRPCImpl(ChannelImpl::shared_pointer const & channel, ChannelRPCRequester::shared_pointer const & channelRPCRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(channelRPCRequester)),
m_channelRPCRequester(channelRPCRequester), m_pvRequest(pvRequest)
BaseRequestImpl(channel, channelRPCRequester),
m_channelRPCRequester(channelRPCRequester),
m_pvRequest(pvRequest)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelRPC);
}
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast<ChannelRPC>(shared_from_this());
@@ -1206,10 +1187,11 @@ namespace epics {
return;
}
BaseRequestImpl::activate();
// subscribe
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast<ChannelRPC>(shared_from_this());
EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelNotConnected, thisPointer));
@@ -1287,7 +1269,6 @@ namespace epics {
virtual bool normalResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) {
if (!status.isSuccess())
{
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, nullPVStructure));
return true;
}
@@ -1303,19 +1284,16 @@ namespace epics {
{
Lock guard(m_mutex);
if (m_destroyed) {
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, nullPVStructure));
return;
}
if (!m_initialized) {
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(notInitializedStatus, nullPVStructure));
return;
}
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, nullPVStructure));
return;
}
@@ -1325,19 +1303,16 @@ namespace epics {
m_structure = pvArgument;
m_structureMutex.unlock();
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
PVStructure::shared_pointer nullPVStructure;
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, nullPVStructure));
}
}
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -1366,7 +1341,9 @@ namespace epics {
PVACCESS_REFCOUNT_MONITOR_DEFINE(channelArray);
class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
class ChannelArrayImpl :
public BaseRequestImpl,
public ChannelArray
{
private:
ChannelArrayRequester::shared_pointer m_channelArrayRequester;
@@ -1375,6 +1352,7 @@ namespace epics {
PVArray::shared_pointer m_structure;
// TODO revise int32 !!!
int32 m_offset;
int32 m_count;
@@ -1384,33 +1362,32 @@ namespace epics {
Mutex m_structureMutex;
ChannelArrayImpl(ChannelImpl::shared_pointer const & channel, ChannelArrayRequester::shared_pointer const & channelArrayRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(channelArrayRequester)),
m_channelArrayRequester(channelArrayRequester), m_pvRequest(pvRequest),
m_offset(0), m_count(0), m_length(-1), m_capacity(-1)
BaseRequestImpl(channel, channelArrayRequester),
m_channelArrayRequester(channelArrayRequester),
m_pvRequest(pvRequest),
m_offset(0), m_count(0),
m_length(-1), m_capacity(-1)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelArray);
}
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast<ChannelArray>(shared_from_this());
PVArray::shared_pointer nullPVArray;
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, thisPointer, nullPVArray));
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, thisPointer, PVArray::shared_pointer()));
return;
}
BaseRequestImpl::activate();
// subscribe
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast<ChannelArray>(shared_from_this());
PVArray::shared_pointer nullPVArray;
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelNotConnected, thisPointer, nullPVArray));
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelNotConnected, thisPointer, PVArray::shared_pointer()));
}
}
@@ -1480,8 +1457,7 @@ namespace epics {
if (!status.isSuccess())
{
ChannelArray::shared_pointer nullChannelArray;
PVArray::shared_pointer nullPVArray;
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, nullChannelArray, nullPVArray));
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, nullChannelArray, PVArray::shared_pointer()));
return true;
}
@@ -1550,8 +1526,7 @@ namespace epics {
try {
m_offset = offset;
m_count = count;
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected));
@@ -1580,8 +1555,7 @@ namespace epics {
try {
m_offset = offset;
m_count = count;
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected));
@@ -1610,8 +1584,7 @@ namespace epics {
try {
m_length = length;
m_capacity = capacity;
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected));
@@ -1621,8 +1594,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
virtual void destroy()
@@ -1693,11 +1665,9 @@ namespace epics {
// enqueue send request
try {
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
FieldConstPtr nullField;
EXCEPTION_GUARD(m_callback->getDone(BaseRequestImpl::channelNotConnected, nullField));
EXCEPTION_GUARD(m_callback->getDone(BaseRequestImpl::channelNotConnected, FieldConstPtr()));
}
}
@@ -1782,12 +1752,10 @@ namespace epics {
}
else
{
FieldConstPtr nullField;
EXCEPTION_GUARD(m_callback->getDone(status, nullField));
EXCEPTION_GUARD(m_callback->getDone(status, FieldConstPtr()));
}
cancel();
}
@@ -1825,7 +1793,8 @@ namespace epics {
public:
MonitorStrategyNotify(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(), m_monitorElement(new MonitorElement())
m_callback(callback), m_gotMonitor(false),
m_mutex(), m_monitorElement(new MonitorElement())
{
}
@@ -1888,7 +1857,8 @@ namespace epics {
public:
MonitorStrategyEntire(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(), m_monitorElement(new MonitorElement())
m_callback(callback), m_gotMonitor(false),
m_mutex(), m_monitorElement(new MonitorElement())
{
}
@@ -1966,7 +1936,7 @@ namespace epics {
public:
MonitorStrategySingle(MonitorRequester::shared_pointer callback) :
MonitorStrategySingle(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(),
m_needToCompress(false), m_monitorElement(new MonitorElement())
{
@@ -2078,7 +2048,7 @@ namespace epics {
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
ChannelMonitorImpl(ChannelImpl::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, static_pointer_cast<Requester>(monitorRequester)),
BaseRequestImpl(channel, monitorRequester),
m_monitorRequester(monitorRequester),
m_started(false),
m_pvRequest(pvRequest)
@@ -2088,17 +2058,14 @@ namespace epics {
void activate()
{
BaseRequestImpl::activate();
if (m_pvRequest == 0)
{
Monitor::shared_pointer thisPointer = dynamic_pointer_cast<Monitor>(shared_from_this());
StructureConstPtr nullPVStructure;
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, thisPointer, nullPVStructure));
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, thisPointer, StructureConstPtr()));
return;
}
int queueSize = 2;
int queueSize = 2;
PVFieldPtr pvField = m_pvRequest->getSubField("record.queueSize");
if (pvField.get()) {
PVStringPtr pvString = dynamic_pointer_cast<PVString>(pvField);
@@ -2112,13 +2079,14 @@ namespace epics {
{
Status failedToConvert(Status::STATUSTYPE_ERROR, "queueSize type is not a valid integer");
Monitor::shared_pointer thisPointer = dynamic_pointer_cast<Monitor>(shared_from_this());
StructureConstPtr nullPVStructure;
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, thisPointer, nullPVStructure));
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, thisPointer, StructureConstPtr()));
return;
}
}
}
BaseRequestImpl::activate();
if (queueSize == -1)
m_monitorStrategy.reset(new MonitorStrategyNotify(m_monitorRequester));
else if (queueSize == 0) // 0 means all (old v3 style), some sending optimization can be done (not to send bit-sets)
@@ -2132,12 +2100,10 @@ namespace epics {
// subscribe
try {
Transport::shared_pointer transport = m_channel->checkAndGetTransport();
resubscribeSubscription(transport);
resubscribeSubscription(m_channel->checkAndGetTransport());
} catch (std::runtime_error &rte) {
Monitor::shared_pointer thisPointer = dynamic_pointer_cast<Monitor>(shared_from_this());
StructureConstPtr nullPVStructure;
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelNotConnected, thisPointer, nullPVStructure));
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelNotConnected, thisPointer, StructureConstPtr()));
}
}
@@ -2186,8 +2152,7 @@ namespace epics {
if (!status.isSuccess())
{
Monitor::shared_pointer nullChannelMonitor;
StructureConstPtr nullPVStructure;
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, nullChannelMonitor, nullPVStructure));
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, nullChannelMonitor, StructureConstPtr()));
return true;
}
@@ -2221,8 +2186,7 @@ namespace epics {
virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
startRequest(QOS_INIT);
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
transport->enqueueSendRequest(shared_from_this());
}
// override, since we optimize status
@@ -2278,8 +2242,7 @@ namespace epics {
try
{
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
m_started = true;
return Status::Ok;
} catch (std::runtime_error &rte) {
@@ -2305,8 +2268,7 @@ namespace epics {
try
{
TransportSender::shared_pointer thisSender = shared_from_this();
m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
m_started = false;
return Status::Ok;
} catch (std::runtime_error &rte) {
@@ -4144,7 +4106,7 @@ TODO
virtual void newServerDetected()
{
if (m_channelSearchManager)
m_channelSearchManager->beaconAnomalyNotify();
m_channelSearchManager->newServerDetected();
}
/**