changes for new-pva-api; improve pvaClientMonitor

This commit is contained in:
mrkraimer
2017-06-28 10:36:23 -04:00
parent 91e6392e77
commit d8ab89e96a
3 changed files with 49 additions and 107 deletions

View File

@@ -28,7 +28,6 @@
#include <pv/standardField.h>
#include <pv/standardPVField.h>
#include <pv/createRequest.h>
#include <pv/executor.h>
#include <pv/nt.h>
#ifdef pvaClientEpicsExportSharedSymbols
@@ -192,10 +191,6 @@ typedef std::tr1::shared_ptr<PvaClientGetCache> PvaClientGetCachePtr;
class PvaClientPutCache;
typedef std::tr1::shared_ptr<PvaClientPutCache> PvaClientPutCachePtr;
// NOTE: must use separate class that implements ChannelRequester,
// because pvAccess holds a shared_ptr to ChannelRequester instead of weak_pointer
class ChannelRequesterImpl;
typedef std::tr1::shared_ptr<ChannelRequesterImpl> ChannelRequesterImplPtr;
/**
* @brief A callback for change in connection status.
@@ -225,6 +220,7 @@ public:
*/
class epicsShareClass PvaClientChannel :
public epics::pvAccess::ChannelRequester,
public std::tr1::enable_shared_from_this<PvaClientChannel>
{
public:
@@ -431,18 +427,6 @@ public:
*/
void destroy() EPICS_DEPRECATED {}
private:
//ChannelRequester methods
void channelCreated(
const epics::pvData::Status& status,
epics::pvAccess::Channel::shared_pointer const & channel);
void channelStateChange(
epics::pvAccess::Channel::shared_pointer const & channel,
epics::pvAccess::Channel::ConnectionState connectionState);
std::string getRequesterName();
void message(
std::string const & message,
epics::pvData::MessageType messageType);
static PvaClientChannelPtr create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
@@ -466,11 +450,18 @@ private:
epics::pvData::Mutex mutex;
epics::pvData::Event waitForConnect;
epics::pvAccess::Channel::shared_pointer channel;
epics::pvAccess::ChannelProvider::shared_pointer channelProvider;
PvaClientChannelStateChangeRequesterWPtr stateChangeRequester;
ChannelRequesterImplPtr channelRequester;
public:
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
virtual void channelCreated(
const epics::pvData::Status& status,
epics::pvAccess::Channel::shared_pointer const & channel);
virtual void channelStateChange(
epics::pvAccess::Channel::shared_pointer const & channel,
epics::pvAccess::Channel::ConnectionState connectionState);
friend class PvaClient;
friend class ChannelRequesterImpl;
};
/**
@@ -1406,7 +1397,6 @@ typedef std::tr1::shared_ptr<MonitorRequesterImpl> MonitorRequesterImplPtr;
class epicsShareClass PvaClientMonitor :
public PvaClientChannelStateChangeRequester,
public PvaClientMonitorRequester,
public epics::pvData::Command,
public std::tr1::enable_shared_from_this<PvaClientMonitor>
{
public:
@@ -1499,7 +1489,6 @@ public:
*/
void destroy() EPICS_DEPRECATED {}
private:
static epics::pvData::ExecutorPtr executor;
std::string getRequesterName();
void message(std::string const & message,epics::pvData::MessageType messageType);
void monitorConnect(
@@ -1515,7 +1504,7 @@ private:
epics::pvData::PVStructurePtr const &pvRequest);
void checkMonitorState();
enum MonitorConnectState {connectIdle,connectActive,connected};
enum MonitorConnectState {connectIdle,connectWait,connectActive,connected};
PvaClient::weak_pointer pvaClient;
PvaClientChannelPtr pvaClientChannel;
@@ -1541,7 +1530,6 @@ private:
public:
void channelStateChange(PvaClientChannelPtr const & channel, bool isConnected);
void event(PvaClientMonitorPtr const & monitor);
void command();
friend class MonitorRequesterImpl;
};

View File

@@ -122,60 +122,12 @@ size_t PvaClientPutCache::cacheSize()
}
class epicsShareClass ChannelRequesterImpl : public ChannelRequester
{
PvaClientChannel::weak_pointer pvaClientChannel;
PvaClient::weak_pointer pvaClient;
public:
ChannelRequesterImpl(
PvaClientChannelPtr const & pvaClientChannel,
PvaClientPtr const &pvaClient)
: pvaClientChannel(pvaClientChannel),
pvaClient(pvaClient)
{}
virtual ~ChannelRequesterImpl() {
if(PvaClient::getDebug()) std::cout << "~ChannelRequesterImpl" << std::endl;
}
virtual std::string getRequesterName() {
PvaClientChannelPtr clientChannel(pvaClientChannel.lock());
if(!clientChannel) return string("clientChannel is null");
return clientChannel->getRequesterName();
}
virtual void message(std::string const & message, epics::pvData::MessageType messageType) {
PvaClientChannelPtr clientChannel(pvaClientChannel.lock());
if(!clientChannel) return;
clientChannel->message(message,messageType);
}
virtual void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel)
{
PvaClientChannelPtr clientChannel(pvaClientChannel.lock());
if(!clientChannel) return;
clientChannel->channelCreated(status,channel);
}
virtual void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
PvaClientChannelPtr clientChannel(pvaClientChannel.lock());
if(!clientChannel) return;
clientChannel->channelStateChange(channel,connectionState);
}
};
PvaClientChannelPtr PvaClientChannel::create(
PvaClientPtr const &pvaClient,
string const & channelName,
string const & providerName)
{
PvaClientChannelPtr channel(new PvaClientChannel(pvaClient,channelName,providerName));
channel->channelRequester = ChannelRequesterImplPtr(
new ChannelRequesterImpl(channel,pvaClient));
return channel;
}
@@ -209,6 +161,7 @@ PvaClientChannel::~PvaClientChannel()
void PvaClientChannel::channelCreated(const Status& status, Channel::shared_pointer const & channel)
{
cout << "PvaClientChannel::channelCreated channel\n" << channel.get() << endl;
if(PvaClient::getDebug()) {
cout << "PvaClientChannel::channelCreated"
<< " channelName " << channelName
@@ -240,6 +193,7 @@ void PvaClientChannel::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
cout << "PvaClientChannel::channelStateChange channel\n" << channel.get() << endl;
if(PvaClient::getDebug()) {
cout << " PvaClientChannel::channelStateChange "
<< " channelName " << channelName
@@ -333,12 +287,14 @@ void PvaClientChannel::issueConnect()
connectState = connectActive;
}
ChannelProviderRegistry::shared_pointer reg(getChannelProviderRegistry());;
ChannelProvider::shared_pointer provider = reg->getProvider(providerName);
if(!provider) {
channelProvider = reg->getProvider(providerName);
if(!channelProvider) {
throw std::runtime_error(channelName + " provider " + providerName + " not registered");
}
if(PvaClient::getDebug()) cout << "PvaClientChannel::issueConnect calling provider->createChannel\n";
channel = provider->createChannel(channelName,channelRequester,ChannelProvider::PRIORITY_DEFAULT);
cout << "before provider->createChannel channel " << channel.get() << endl;
channel = channelProvider->createChannel(channelName,shared_from_this(),ChannelProvider::PRIORITY_DEFAULT);
cout << "after provider->createChannel channel " << channel.get() << endl;
if(!channel) {
throw std::runtime_error(channelName + " channelCreate failed ");
}

View File

@@ -25,8 +25,6 @@ using namespace std;
namespace epics { namespace pvaClient {
ExecutorPtr PvaClientMonitor::executor(new Executor("pvaClientMonitor",middlePriority));
class MonitorRequesterImpl : public MonitorRequester
{
PvaClientMonitor::weak_pointer pvaClientMonitor;
@@ -114,26 +112,19 @@ PvaClientMonitorPtr PvaClientMonitor::create(
CreateRequest::shared_pointer createRequest(CreateRequest::create());
PVStructurePtr pvRequest(createRequest->createRequest(request));
if(!pvRequest) throw std::runtime_error(createRequest->getMessage());
cout << "calling pvaClient->createChannel\n";
PvaClientChannelPtr pvaClientChannel = pvaClient->createChannel(channelName,providerName);
cout << "calling createMonitor\n";
PvaClientMonitorPtr clientMonitor(new PvaClientMonitor(pvaClient,pvaClientChannel,pvRequest));
cout << "after calling createMonitor\n";
clientMonitor->monitorRequester = MonitorRequesterImplPtr(
new MonitorRequesterImpl(clientMonitor,pvaClient));
if(stateChangeRequester) clientMonitor->pvaClientChannelStateChangeRequester = stateChangeRequester;
if(monitorRequester) clientMonitor->pvaClientMonitorRequester = monitorRequester;
cout << "calling init\n";
clientMonitor->init();
cout << "after calling init\n";
return clientMonitor;
}
void PvaClientMonitor::init()
{
cout << "init calling setStateChangeRequester\n";
pvaClientChannel->setStateChangeRequester(shared_from_this());
cout << "init calling issueConnect\n";
pvaClientChannel->issueConnect();
}
@@ -178,12 +169,12 @@ void PvaClientMonitor::channelStateChange(PvaClientChannelPtr const & channel, b
}
if(isConnected&&!monitor)
{
if(PvaClient::getDebug()) cout<< "PvaClientMonitor::channelStateChange calling executor.execute\n";
executor->execute(shared_from_this());
PvaClientChannelStateChangeRequesterPtr req(pvaClientChannelStateChangeRequester.lock());
if(req) {
req->channelStateChange(channel,isConnected);
}
connectState = connectActive;
monitor = pvaClientChannel->getChannel()->createMonitor(monitorRequester,pvRequest);
}
PvaClientChannelStateChangeRequesterPtr req(pvaClientChannelStateChangeRequester.lock());
if(req) {
req->channelStateChange(channel,isConnected);
}
}
@@ -193,13 +184,6 @@ void PvaClientMonitor::event(PvaClientMonitorPtr const & monitor)
if(req) req->event(monitor);
}
void PvaClientMonitor::command()
{
if(PvaClient::getDebug()) cout<< "PvaClientMonitor::command\n";
connect();
if(connectState==connected && !isStarted) start();
}
void PvaClientMonitor::checkMonitorState()
{
if(PvaClient::getDebug()) {
@@ -237,9 +221,18 @@ void PvaClientMonitor::monitorConnect(
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
if(!status.isOK()) {
this->monitor.reset();
connectState = connectIdle;
string message = string("PvaClientMonitor::monitorConnect channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " "
+ status.getMessage();
throw std::runtime_error(message);
}
bool signal = (connectState==connectWait) ? true : false;
connectStatus = status;
connectState = connected;
this->monitor = monitor;
if(isStarted) {
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::monitorConnect"
@@ -249,14 +242,19 @@ void PvaClientMonitor::monitorConnect(
}
return;
}
if(status.isOK()) {
pvaClientData = PvaClientMonitorData::create(structure);
pvaClientData->setMessagePrefix(pvaClientChannel->getChannel()->getChannelName());
pvaClientData = PvaClientMonitorData::create(structure);
pvaClientData->setMessagePrefix(pvaClientChannel->getChannel()->getChannelName());
if(signal) {
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::monitorConnect calling waitForConnect.signal\n";
}
waitForConnect.signal();
} else {
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::monitorConnect calling start\n";
}
start();
}
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::monitorConnect calling waitForConnect.signal\n";
}
waitForConnect.signal();
}
@@ -306,7 +304,7 @@ void PvaClientMonitor::issueConnect()
+ " pvaClientMonitor already connected ";
throw std::runtime_error(message);
}
connectState = connectActive;
connectState = connectWait;
monitor = pvaClientChannel->getChannel()->createMonitor(monitorRequester,pvRequest);
}
@@ -321,7 +319,7 @@ Status PvaClientMonitor::waitConnect()
if(!connectStatus.isOK()) connectState = connectIdle;
return connectStatus;
}
if(connectState!=connectActive) {
if(connectState!=connectWait) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::waitConnect illegal connect state ";