more client ref. loop breaking

This commit is contained in:
Michael Davidsaver
2017-06-19 16:24:11 +02:00
parent c5c6510a13
commit bb31417d4c
2 changed files with 65 additions and 81 deletions

View File

@@ -298,7 +298,7 @@ public:
try
{
startRequest(PURE_CANCEL_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<BaseRequestImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::exception& e) {
// noop (do not complain if fails)
LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what());
@@ -337,7 +337,7 @@ public:
try
{
startRequest(PURE_DESTROY_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<BaseRequestImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::tr1::bad_weak_ptr& e) {
// noop (do not complain if fails)
} catch (std::exception& e) {
@@ -368,7 +368,7 @@ public:
if (transport.get() != 0 && !m_subscribed.get() && startRequest(QOS_INIT))
{
m_subscribed.set();
transport->enqueueSendRequest(external_from_this<BaseRequestImpl>());
transport->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
}
}
@@ -503,7 +503,7 @@ public:
}
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<BaseRequestImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_callback->processDone(channelNotConnected, thisPtr));
@@ -693,7 +693,7 @@ public:
}
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelGetImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelGetImpl>());
//TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender);
} catch (std::runtime_error &rte) {
stopRequest();
@@ -893,7 +893,7 @@ public:
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelPutImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
@@ -938,7 +938,7 @@ public:
*m_bitSet = *pvPutBitSet;
m_structure->copyUnchecked(*pvPutStructure, *m_bitSet);
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelPutImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected, thisPtr));
@@ -1196,7 +1196,7 @@ public:
*m_putDataBitSet = *bitSet;
m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet);
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelPutGetImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
@@ -1225,7 +1225,7 @@ public:
}
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelPutGetImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
@@ -1254,7 +1254,7 @@ public:
}
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelPutGetImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
@@ -1437,7 +1437,7 @@ public:
m_structure = pvArgument;
m_structureMutex.unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelRPCImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelRPCImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, thisPtr, PVStructurePtr()));
@@ -1674,7 +1674,7 @@ public:
m_count = count;
m_stride = stride;
}
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelArrayImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer()));
@@ -1718,7 +1718,7 @@ public:
m_count = count;
m_stride = stride;
}
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelArrayImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, thisPtr));
@@ -1751,7 +1751,7 @@ public:
Lock lock(m_structureMutex);
m_length = length;
}
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelArrayImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, thisPtr));
@@ -1781,7 +1781,7 @@ public:
}
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelArrayImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
} catch (std::runtime_error &rte) {
stopRequest();
EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, thisPtr, 0));
@@ -2237,7 +2237,7 @@ public:
startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT))
{
m_subscribed.set();
transport->enqueueSendRequest(external_from_this<ChannelMonitorImpl>());
transport->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
}
}
@@ -2386,7 +2386,7 @@ public:
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelMonitorImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
m_started = true;
return Status::Ok;
} catch (std::runtime_error &rte) {
@@ -2412,7 +2412,7 @@ public:
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(external_from_this<ChannelMonitorImpl>());
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
m_started = false;
return Status::Ok;
} catch (std::runtime_error &rte) {
@@ -3132,13 +3132,21 @@ public:
*/
class InternalChannelImpl :
public ChannelImpl,
public std::tr1::enable_shared_from_this<InternalChannelImpl>,
public TimerCallback
{
public:
POINTER_DEFINITIONS(InternalChannelImpl);
private:
const weak_pointer m_external_this, m_internal_this;
shared_pointer external_from_this() {
return shared_pointer(m_external_this);
}
shared_pointer internal_from_this() {
return shared_pointer(m_internal_this);
}
/**
* Context.
*/
@@ -3267,8 +3275,7 @@ private:
void activate()
{
// register before issuing search request
ChannelImpl::shared_pointer thisPointer = shared_from_this();
m_context->registerChannel(thisPointer);
m_context->registerChannel(internal_from_this());
// connect
connect();
@@ -3283,11 +3290,13 @@ private:
short priority,
auto_ptr<InetAddrVector>& addresses)
{
std::tr1::shared_ptr<InternalChannelImpl> tp(
new InternalChannelImpl(context, channelID, name, requester, priority, addresses));
ChannelImpl::shared_pointer thisPointer = tp;
static_cast<InternalChannelImpl*>(thisPointer.get())->activate();
return thisPointer;
std::tr1::shared_ptr<InternalChannelImpl> internal(
new InternalChannelImpl(context, channelID, name, requester, priority, addresses)),
external(internal.get(), Destroyable::cleaner(internal));
const_cast<weak_pointer&>(internal->m_internal_this) = internal;
const_cast<weak_pointer&>(internal->m_external_this) = external;
internal->activate();
return external;
}
virtual ~InternalChannelImpl()
@@ -3451,7 +3460,7 @@ public:
}
m_transport = transport;
m_transport->enqueueSendRequest(shared_from_this());
m_transport->enqueueSendRequest(internal_from_this());
}
virtual void cancel() {
@@ -3539,8 +3548,7 @@ public:
//throw std::runtime_error("Channel already destroyed.");
}
// do destruction via context
m_context->destroyChannel(shared_from_this(), force);
destroyChannel(force);
}
@@ -3558,8 +3566,10 @@ public:
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
m_getfield.reset();
// stop searching...
SearchInstance::shared_pointer thisChannelPointer = shared_from_this();
shared_pointer thisChannelPointer = internal_from_this();
m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer);
cancel();
@@ -3580,7 +3590,7 @@ public:
setConnectionState(DESTROYED);
// unregister
m_context->unregisterChannel(shared_from_this());
m_context->unregisterChannel(thisChannelPointer);
}
// should be called without any lock hold
@@ -3603,7 +3613,7 @@ public:
if (!initiateSearch) {
// stop searching...
m_context->getChannelSearchManager()->unregisterSearchInstance(shared_from_this());
m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this());
cancel();
}
setConnectionState(DISCONNECTED);
@@ -3615,7 +3625,7 @@ public:
{
if (remoteDestroy) {
m_issueCreateMessage = false;
m_transport->enqueueSendRequest(shared_from_this());
m_transport->enqueueSendRequest(internal_from_this());
}
m_transport->release(getID());
@@ -3651,12 +3661,11 @@ public:
if (!m_addresses.get())
{
m_context->getChannelSearchManager()->registerSearchInstance(shared_from_this(), penalize);
m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize);
}
else if (!m_addresses->empty())
{
TimerCallback::shared_pointer tc = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
m_context->getTimer()->scheduleAfterDelay(tc,
m_context->getTimer()->scheduleAfterDelay(internal_from_this(),
(m_addressIndex / m_addresses->size())*STATIC_SEARCH_BASE_DELAY_SEC);
}
}
@@ -3698,7 +3707,7 @@ public:
}
// NOTE: this creates a new or acquires an existing transport (implies increases usage count)
transport = m_context->getTransport(shared_from_this(), serverAddress, minorRevision, m_priority);
transport = m_context->getTransport(internal_from_this(), serverAddress, minorRevision, m_priority);
if (!transport.get())
{
createChannelFailed();
@@ -3813,7 +3822,7 @@ public:
void reportChannelStateChange()
{
Channel::shared_pointer self(shared_from_this());
shared_pointer self(external_from_this());
while (true)
{
@@ -3978,49 +3987,49 @@ public:
ChannelProcessRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelProcessRequestImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelProcessRequestImpl>(external_from_this(), requester, pvRequest);
}
virtual ChannelGet::shared_pointer createChannelGet(
ChannelGetRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelGetImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelGetImpl>(external_from_this(), requester, pvRequest);
}
virtual ChannelPut::shared_pointer createChannelPut(
ChannelPutRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelPutImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelPutImpl>(external_from_this(), requester, pvRequest);
}
virtual ChannelPutGet::shared_pointer createChannelPutGet(
ChannelPutGetRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelPutGetImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelPutGetImpl>(external_from_this(), requester, pvRequest);
}
virtual ChannelRPC::shared_pointer createChannelRPC(
ChannelRPCRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelRPCImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelRPCImpl>(external_from_this(), requester, pvRequest);
}
virtual Monitor::shared_pointer createMonitor(
MonitorRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelMonitorImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelMonitorImpl>(external_from_this(), requester, pvRequest);
}
virtual ChannelArray::shared_pointer createChannelArray(
ChannelArrayRequester::shared_pointer const & requester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return BaseRequestImpl::build<ChannelArrayImpl>(shared_from_this(), requester, pvRequest);
return BaseRequestImpl::build<ChannelArrayImpl>(external_from_this(), requester, pvRequest);
}
@@ -4188,8 +4197,10 @@ private:
osiSockAttach();
m_timer.reset(new Timer("pvAccess-client timer", lowPriority));
Context::shared_pointer thisPointer = shared_from_this();
// stores weak_ptr
m_connector.reset(new BlockingTCPConnector(thisPointer, m_receiveBufferSize, m_connectionTimeout));
// stores many weak_ptr
m_responseHandler.reset(new ClientResponseHandler(shared_from_this()));
// preinitialize security plugins
@@ -4456,6 +4467,7 @@ private:
BeaconHandler::shared_pointer handler;
if (it == m_beaconHandlers.end())
{
// stores weak_ptr
handler.reset(new BeaconHandler(shared_from_this(), protocol, responseFrom));
m_beaconHandlers[*responseFrom] = handler;
}
@@ -4524,35 +4536,6 @@ private:
}
}
/**
* Destroy channel.
* @param channel
* @param force
* @throws PVAException
* @throws std::runtime_error
*/
void destroyChannel(ChannelImpl::shared_pointer const & channel, bool force) {
string name = channel->getChannelName();
bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT);
if (lockAcquired)
{
try
{
channel->destroyChannel(force);
}
catch(...) {
// TODO
}
// TODO namedLocker->releaseSynchronizationObject(channel.getChannelName());
}
else
{
// TODO is this OK?
throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock.");
}
}
virtual void configure(epics::pvData::PVStructure::shared_pointer configuration)
{ // remove?
if (m_transportRegistry.numberOfActiveTransports() > 0)
@@ -4905,17 +4888,16 @@ public:
void InternalClientContextImpl::InternalChannelImpl::getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(shared_from_this(), requester, subField));
ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(internal_from_this(), requester, subField));
self->activate();
// activate() stores self in channel
}
ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf)
{
InternalClientContextImpl::shared_pointer internal(new InternalClientContextImpl(conf)),
external(internal.get(), Destroyable::cleaner(internal));
internal->initialize();
return external;
InternalClientContextImpl::shared_pointer prov(new InternalClientContextImpl(conf));
prov->initialize();
return prov;
}
}