pva client break ref loop with ChannelRequester

pvutils: remove ChannelRequesterImpl
This commit is contained in:
Michael Davidsaver
2017-07-07 18:41:31 +02:00
parent e5a4fcba38
commit 006224b4d1
6 changed files with 75 additions and 191 deletions

View File

@@ -1340,6 +1340,13 @@ public:
}
}
virtual void channelDisconnect(bool destroy)
{
if(!destroy)
std::cerr << std::setw(30) << std::left << m_channelName
<< ' ' << "*** disconnected" << std::endl;
}
virtual void monitorEvent(Monitor::shared_pointer const & monitor)
{
@@ -1772,12 +1779,8 @@ int main (int argc, char *argv[])
for (int n = 0; n < nPvs; n++)
{
if(!providers[n]) continue;
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl(quiet));
if (pvsAddress[n].empty())
channels[n] = providers[n]->createChannel(pvs[n], channelRequesterImpl);
else
channels[n] = providers[n]->createChannel(pvs[n], channelRequesterImpl,
ChannelProvider::PRIORITY_DEFAULT, pvsAddress[n]);
channels[n] = providers[n]->createChannel(pvs[n], DefaultChannelRequester::build(),
ChannelProvider::PRIORITY_DEFAULT, pvsAddress[n]);
if(!channels[n]) {
std::cerr<<"No such channel '"<<pvs[n]<<"'\n";
@@ -1808,93 +1811,63 @@ int main (int argc, char *argv[])
if (monitor)
{
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl = TR1::dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
channelRequesterImpl->showDisconnectMessage();
// TODO remove this line, when CA provider will allow creation of monitors
// when channels is yet not connected
if (channelRequesterImpl->waitUntilConnected(timeOut))
{
TR1::shared_ptr<MonitorRequesterImpl> monitorRequesterImpl(new MonitorRequesterImpl(channel->getChannelName()));
operations.push_back(channel->createMonitor(monitorRequesterImpl, pvRequest));
}
else
{
allOK = false;
channel->destroy();
std::cerr << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
}
TR1::shared_ptr<MonitorRequesterImpl> monitorRequesterImpl(new MonitorRequesterImpl(channel->getChannelName()));
operations.push_back(channel->createMonitor(monitorRequesterImpl, pvRequest));
}
else
{
/*
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
Channel::shared_pointer channel = provider->createChannel(pvs[n], channelRequesterImpl);
*/
TR1::shared_ptr<GetFieldRequesterImpl> getFieldRequesterImpl;
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl = TR1::dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
if (channelRequesterImpl->waitUntilConnected(timeOut))
// probe for value field
// but only if there is only one PV request (otherwise mode change makes a mess)
if (mode == ValueOnlyMode && nPvs == 1)
{
TR1::shared_ptr<GetFieldRequesterImpl> getFieldRequesterImpl;
getFieldRequesterImpl.reset(new GetFieldRequesterImpl(channel));
// get all to be immune to bad clients not supporting selective getField request
channel->getField(getFieldRequesterImpl, "");
}
// probe for value field
// but only if there is only one PV request (otherwise mode change makes a mess)
if (mode == ValueOnlyMode && nPvs == 1)
if (getFieldRequesterImpl.get() == 0 ||
getFieldRequesterImpl->waitUntilFieldGet(timeOut))
{
// check probe
if (getFieldRequesterImpl.get())
{
getFieldRequesterImpl.reset(new GetFieldRequesterImpl(channel));
// get all to be immune to bad clients not supporting selective getField request
channel->getField(getFieldRequesterImpl, "");
}
if (getFieldRequesterImpl.get() == 0 ||
getFieldRequesterImpl->waitUntilFieldGet(timeOut))
{
// check probe
if (getFieldRequesterImpl.get())
Structure::const_shared_pointer structure =
TR1::dynamic_pointer_cast<const Structure>(getFieldRequesterImpl->getField());
if (structure.get() == 0 || structure->getField("value").get() == 0)
{
Structure::const_shared_pointer structure =
TR1::dynamic_pointer_cast<const Structure>(getFieldRequesterImpl->getField());
if (structure.get() == 0 || structure->getField("value").get() == 0)
{
// fallback to structure
mode = StructureMode;
pvRequest = CreateRequest::create()->createRequest("field()");
}
}
TR1::shared_ptr<ChannelGetRequesterImpl> getRequesterImpl(
new ChannelGetRequesterImpl(channel->getChannelName(), false)
);
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
bool ok = getRequesterImpl->waitUntilGet(timeOut);
allOK &= ok;
if (ok)
{
if (collectValues)
{
collectedValues.push_back(getRequesterImpl->getPVStructure());
collectedNames.push_back(channel->getChannelName());
}
else
{
// print immediately
printValue(channel->getChannelName(), getRequesterImpl->getPVStructure(), fromStream);
}
// fallback to structure
mode = StructureMode;
pvRequest = CreateRequest::create()->createRequest("field()");
}
}
else
TR1::shared_ptr<ChannelGetRequesterImpl> getRequesterImpl(
new ChannelGetRequesterImpl(channel->getChannelName(), false)
);
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
bool ok = getRequesterImpl->waitUntilGet(timeOut);
allOK &= ok;
if (ok)
{
allOK = false;
channel->destroy();
std::cerr << "[" << channel->getChannelName() << "] failed to get channel introspection data" << std::endl;
if (collectValues)
{
collectedValues.push_back(getRequesterImpl->getPVStructure());
collectedNames.push_back(channel->getChannelName());
}
else
{
// print immediately
printValue(channel->getChannelName(), getRequesterImpl->getPVStructure(), fromStream);
}
}
}
else
{
allOK = false;
channel->destroy();
std::cerr << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
std::cerr << "[" << channel->getChannelName() << "] failed to get channel introspection data" << std::endl;
}
}
}
@@ -2053,50 +2026,39 @@ int main (int argc, char *argv[])
ChannelProvider::shared_pointer provider = ChannelProviderRegistry::clients()->getProvider("pva");
assert(provider);
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl(quiet));
Channel::shared_pointer channel =
authority.empty() ?
provider->createChannel(service, channelRequesterImpl) :
provider->createChannel(service, channelRequesterImpl,
provider->createChannel(service, DefaultChannelRequester::build(),
ChannelProvider::PRIORITY_DEFAULT, authority);
if (channelRequesterImpl->waitUntilConnected(timeOut))
{
TR1::shared_ptr<ChannelRPCRequesterImpl> rpcRequesterImpl(new ChannelRPCRequesterImpl(channel->getChannelName()));
ChannelRPC::shared_pointer channelRPC = channel->createChannelRPC(rpcRequesterImpl, pvRequest);
TR1::shared_ptr<ChannelRPCRequesterImpl> rpcRequesterImpl(new ChannelRPCRequesterImpl(channel->getChannelName()));
ChannelRPC::shared_pointer channelRPC = channel->createChannelRPC(rpcRequesterImpl, pvRequest);
if (rpcRequesterImpl->waitUntilConnected(timeOut))
if (rpcRequesterImpl->waitUntilConnected(timeOut))
{
channelRPC->lastRequest();
channelRPC->request(arg);
allOK &= rpcRequesterImpl->waitUntilRPC(timeOut);
if (allOK)
{
channelRPC->lastRequest();
channelRPC->request(arg);
allOK &= rpcRequesterImpl->waitUntilRPC(timeOut);
if (allOK)
if (dumpStructure)
{
if (dumpStructure)
{
if (rpcRequesterImpl->getLastResponse().get() == 0)
std::cout << "(null)" << std::endl;
else
{
//std::cout << *(rpcRequesterImpl->getLastResponse().get()) << std::endl;
pvutil_ostream myos(std::cout.rdbuf());
myos << *(rpcRequesterImpl->getLastResponse().get()) << std::endl;
}
}
if (rpcRequesterImpl->getLastResponse().get() == 0)
std::cout << "(null)" << std::endl;
else
formatNT(std::cout, rpcRequesterImpl->getLastResponse());
std::cout << std::endl;
{
//std::cout << *(rpcRequesterImpl->getLastResponse().get()) << std::endl;
pvutil_ostream myos(std::cout.rdbuf());
myos << *(rpcRequesterImpl->getLastResponse().get()) << std::endl;
}
}
}
else
{
allOK = false;
else
formatNT(std::cout, rpcRequesterImpl->getLastResponse());
std::cout << std::endl;
}
}
else
{
allOK = false;
std::cerr << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
}
channel->destroy();

View File

@@ -604,8 +604,6 @@ int main (int argc, char *argv[])
URI uri;
bool validURI = URI::parse(pv, uri);
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl(quiet));
string providerName(defaultProvider);
string pvName(pv);
string address(noAddress);

View File

@@ -437,64 +437,6 @@ char *url_encode(const char *str) {
return buf;
}
ChannelRequesterImpl::ChannelRequesterImpl(bool _printOnlyErrors) :
showDisconnectMsg(false)
{
}
string ChannelRequesterImpl::getRequesterName()
{
return "ChannelRequesterImpl";
}
void ChannelRequesterImpl::channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel)
{
if (status.isSuccess())
{
// show warning
if (!status.isOK())
{
std::cerr << "[" << channel->getChannelName() << "] channel create: " << dump_stack_only_on_debug(status) << std::endl;
}
}
else
{
std::cerr << "[" << channel->getChannelName() << "] failed to create a channel: " << dump_stack_only_on_debug(status) << std::endl;
}
}
void ChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & channel, Channel::ConnectionState connectionState)
{
if (connectionState == Channel::CONNECTED)
{
m_event.signal();
}
else if (showDisconnectMsg && connectionState == Channel::DISCONNECTED)
{
std::cerr << std::setw(30) << std::left << channel->getChannelName()
<< ' ' << "*** disconnected" << std::endl;
}
/*
else if (connectionState != Channel::DESTROYED)
{
std::cerr << "[" << channel->getChannelName() << "] channel state change: " << Channel::ConnectionStateNames[connectionState] << std::endl;
}
*/
}
bool ChannelRequesterImpl::waitUntilConnected(double timeOut)
{
return m_event.wait(timeOut);
}
void ChannelRequesterImpl::showDisconnectMessage(bool show)
{
showDisconnectMsg = show;
}
GetFieldRequesterImpl::GetFieldRequesterImpl(epics::pvAccess::Channel::shared_pointer channel) :
m_channel(channel)
{

View File

@@ -62,26 +62,6 @@ private:
std::string m_requesterName;
};
class ChannelRequesterImpl :
public epics::pvAccess::ChannelRequester
{
private:
epics::pvData::Event m_event;
bool showDisconnectMsg;
public:
ChannelRequesterImpl(bool printOnlyErrors = false);
virtual std::string getRequesterName();
virtual void channelCreated(const epics::pvData::Status& status, epics::pvAccess::Channel::shared_pointer const & channel);
virtual void channelStateChange(epics::pvAccess::Channel::shared_pointer const & channel, epics::pvAccess::Channel::ConnectionState connectionState);
bool waitUntilConnected(double timeOut);
void showDisconnectMessage(bool show = true);
};
class GetFieldRequesterImpl :
public epics::pvAccess::GetFieldRequester
{

View File

@@ -865,6 +865,8 @@ public:
/**
* The ChannelRequester passed to ChannelProvider::createChannel()
*
* @throws std::tr1::bad_weak_ptr
*/
virtual std::tr1::shared_ptr<ChannelRequester> getChannelRequester() = 0;

View File

@@ -3172,7 +3172,7 @@ public:
/**
* Channel requester.
*/
ChannelRequester::shared_pointer m_requester;
ChannelRequester::weak_pointer m_requester;
public:
//! The in-progress GetField operation.
@@ -3353,7 +3353,7 @@ private:
virtual ChannelRequester::shared_pointer getChannelRequester() OVERRIDE FINAL
{
return m_requester;
return ChannelRequester::shared_pointer(m_requester);
}
virtual ConnectionState getConnectionState() OVERRIDE FINAL
@@ -3695,7 +3695,7 @@ public:
if (!sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress) &&
!std::equal(guid.value, guid.value + 12, m_guid.value))
{
EXCEPTION_GUARD(m_requester->message("More than one channel with name '" + m_name +
EXCEPTION_GUARD3(m_requester, req, req->message("More than one channel with name '" + m_name +
"' detected, connected to: " + inetAddressToString(*transport->getRemoteAddress()) + ", ignored: " + inetAddressToString(*serverAddress), warningMessage));
}
@@ -3851,7 +3851,7 @@ public:
}
}
EXCEPTION_GUARD(m_requester->channelStateChange(self, connectionState));
EXCEPTION_GUARD3(m_requester, req, req->channelStateChange(self, connectionState));
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
for(size_t i=0, N=ops.size(); i<N; i++) {