let pvaClientMonitor do what pvaMonitor was doing

This commit is contained in:
mrkraimer
2017-06-23 14:45:44 -04:00
parent f0efef68ea
commit 094336b5e0
5 changed files with 171 additions and 365 deletions

View File

@@ -25,6 +25,8 @@ using namespace std;
namespace epics { namespace pvaClient {
ExecutorPtr PvaClientMonitor::executor(new Executor("pvaClientMonitor",middlePriority));
class MonitorRequesterImpl : public MonitorRequester
{
PvaClientMonitor::weak_pointer pvaClientMonitor;
@@ -80,21 +82,67 @@ public:
PvaClientMonitorPtr PvaClientMonitor::create(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
{
PvaClientMonitorPtr epv(new PvaClientMonitor(pvaClient,channel,pvRequest));
epv->monitorRequester = MonitorRequesterImplPtr(
new MonitorRequesterImpl(epv,pvaClient));
return epv;
if(PvaClient::getDebug()) {
cout<< "PvaClientMonitor::create(pvaClient,pvaClientChannel,pvRequest)\n"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
PvaClientMonitorPtr clientMonitor(new PvaClientMonitor(pvaClient,pvaClientChannel,pvRequest));
clientMonitor->monitorRequester = MonitorRequesterImplPtr(
new MonitorRequesterImpl(clientMonitor,pvaClient));
return clientMonitor;
}
PvaClientMonitorPtr PvaClientMonitor::create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
std::string const & providerName,
std::string const & request,
PvaClientChannelStateChangeRequesterPtr const & stateChangeRequester,
PvaClientMonitorRequesterPtr const & monitorRequester)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientMonitor::create(pvaClient,channelName,providerName,request,stateChangeRequester,monitorRequester)\n"
<< " channelName " << channelName
<< " providerName " << providerName
<< " request " << request
<< endl;
}
CreateRequest::shared_pointer createRequest(CreateRequest::create());
PVStructurePtr pvRequest(createRequest->createRequest(request));
if(!pvRequest) throw std::runtime_error(createRequest->getMessage());
cout << "calling pvaClient->createChannel\n";
PvaClientChannelPtr pvaClientChannel = pvaClient->createChannel(channelName,providerName);
cout << "calling createMonitor\n";
PvaClientMonitorPtr clientMonitor(new PvaClientMonitor(pvaClient,pvaClientChannel,pvRequest));
cout << "after calling createMonitor\n";
clientMonitor->monitorRequester = MonitorRequesterImplPtr(
new MonitorRequesterImpl(clientMonitor,pvaClient));
if(stateChangeRequester) clientMonitor->pvaClientChannelStateChangeRequester = stateChangeRequester;
if(monitorRequester) clientMonitor->pvaClientMonitorRequester = monitorRequester;
cout << "calling init\n";
clientMonitor->init();
cout << "after calling init\n";
return clientMonitor;
}
void PvaClientMonitor::init()
{
cout << "init calling setStateChangeRequester\n";
pvaClientChannel->setStateChangeRequester(shared_from_this());
cout << "init calling issueConnect\n";
pvaClientChannel->issueConnect();
}
PvaClientMonitor::PvaClientMonitor(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
: pvaClient(pvaClient),
channel(channel),
pvaClientChannel(pvaClientChannel),
pvRequest(pvRequest),
isStarted(false),
connectState(connectIdle),
@@ -102,9 +150,7 @@ PvaClientMonitor::PvaClientMonitor(
userWait(false)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientMonitor::PvaClientMonitor()"
<< " channelName " << channel->getChannelName()
<< endl;
cout<< "PvaClientMonitor::PvaClientMonitor()" << endl;
}
}
@@ -112,11 +158,8 @@ PvaClientMonitor::~PvaClientMonitor()
{
if(PvaClient::getDebug()) cout<< "PvaClientMonitor::~PvaClientMonitor\n";
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout<< "PvaClientMonitor::~PvaClientMonitor"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(monitor) {
@@ -125,14 +168,43 @@ PvaClientMonitor::~PvaClientMonitor()
}
}
void PvaClientMonitor::channelStateChange(PvaClientChannelPtr const & channel, bool isConnected)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientMonitor::channelStateChange"
<< " channelName " << channel->getChannelName()
<< " isConnected " << (isConnected ? "true" : "false")
<< endl;
}
if(isConnected&&!monitor)
{
if(PvaClient::getDebug()) cout<< "PvaClientMonitor::channelStateChange calling executor.execute\n";
executor->execute(shared_from_this());
PvaClientChannelStateChangeRequesterPtr req(pvaClientChannelStateChangeRequester.lock());
if(req) {
req->channelStateChange(channel,isConnected);
}
}
}
void PvaClientMonitor::event(PvaClientMonitorPtr const & monitor)
{
PvaClientMonitorRequesterPtr req(pvaClientMonitorRequester.lock());
if(req) req->event(monitor);
}
void PvaClientMonitor::command()
{
if(PvaClient::getDebug()) cout<< "PvaClientMonitor::command\n";
connect();
if(connectState==connected && !isStarted) start();
}
void PvaClientMonitor::checkMonitorState()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::checkMonitorState"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " connectState " << connectState
<< endl;
}
@@ -159,13 +231,9 @@ void PvaClientMonitor::monitorConnect(
Monitor::shared_pointer const & monitor,
StructureConstPtr const & structure)
{
Channel::shared_pointer chan(channel.lock());
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::monitorConnect"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
@@ -174,19 +242,16 @@ void PvaClientMonitor::monitorConnect(
this->monitor = monitor;
if(isStarted) {
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::monitorConnect"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " is already started "
<< endl;
}
return;
}
if(status.isOK() && chan) {
if(status.isOK()) {
pvaClientData = PvaClientMonitorData::create(structure);
pvaClientData->setMessagePrefix(chan->getChannelName());
pvaClientData->setMessagePrefix(pvaClientChannel->getChannel()->getChannelName());
}
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::monitorConnect calling waitForConnect.signal\n";
@@ -198,11 +263,8 @@ void PvaClientMonitor::monitorConnect(
void PvaClientMonitor::monitorEvent(MonitorPtr const & monitor)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::monitorEvent"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
PvaClientMonitorRequesterPtr req = pvaClientMonitorRequester.lock();
@@ -218,10 +280,7 @@ void PvaClientMonitor::unlisten(MonitorPtr const & monitor)
req->unlisten();
return;
}
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cerr << channelName + "pvaClientMonitor::unlisten called but no PvaClientMonitorRequester\n";
cerr << pvaClientChannel->getChannel()->getChannelName() + "pvaClientMonitor::unlisten called but no PvaClientMonitorRequester\n";
}
@@ -231,11 +290,8 @@ void PvaClientMonitor::connect()
issueConnect();
Status status = waitConnect();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::connect "
+ status.getMessage();
throw std::runtime_error(message);
@@ -244,36 +300,30 @@ void PvaClientMonitor::connect()
void PvaClientMonitor::issueConnect()
{
if(PvaClient::getDebug()) cout << "PvaClientMonitor::issueConnect\n";
Channel::shared_pointer chan(channel.lock());
if(connectState!=connectIdle) {
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " pvaClientMonitor already connected ";
throw std::runtime_error(message);
}
if(chan) {
connectState = connectActive;
monitor = chan->createMonitor(monitorRequester,pvRequest);
return;
}
throw std::runtime_error("PvaClientMonitor::issueConnect() but channel disconnected");
connectState = connectActive;
monitor = pvaClientChannel->getChannel()->createMonitor(monitorRequester,pvRequest);
}
Status PvaClientMonitor::waitConnect()
{
if(PvaClient::getDebug()) cout << "PvaClientMonitor::waitConnect\n";
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::waitConnect"
<< pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connected) {
if(!connectStatus.isOK()) connectState = connectIdle;
return connectStatus;
}
if(connectState!=connectActive) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
@@ -292,11 +342,8 @@ Status PvaClientMonitor::waitConnect()
void PvaClientMonitor::setRequester(PvaClientMonitorRequesterPtr const & pvaClientMonitorRequester)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::setRequester"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
this->pvaClientMonitorRequester = pvaClientMonitorRequester;
@@ -305,30 +352,21 @@ void PvaClientMonitor::setRequester(PvaClientMonitorRequesterPtr const & pvaClie
void PvaClientMonitor::start()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::start"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " connectState " << connectState
<< endl;
}
if(isStarted) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cerr << "PvaClientMonitor::start"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " why is this called twice "
<< endl;
return;
}
if(connectState==connectIdle) connect();
if(connectState!=connected) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::start illegal state ";
throw std::runtime_error(message);
}
@@ -336,15 +374,47 @@ void PvaClientMonitor::start()
monitor->start();
}
void PvaClientMonitor::start(string const & request)
{
if(PvaClient::getDebug()) {
cout<< "PvaMonitor::start(request)"
<< " request " << request
<< endl;
}
PvaClientPtr client(pvaClient.lock());
if(!client) throw std::runtime_error("pvaClient was deleted");
if(!pvaClientChannel->getChannel()->isConnected()) {
client->message(
"PvaClientMonitor::start(request) but not connected",
MessageType::errorMessage);
return;
}
CreateRequest::shared_pointer createRequest(CreateRequest::create());
PVStructurePtr pvr(createRequest->createRequest(request));
if(!pvr) throw std::runtime_error(createRequest->getMessage());
if(monitor) {
if(isStarted) monitor->stop();
monitor->destroy();
}
monitorRequester.reset();
monitor.reset();
isStarted = false;
connectState = connectIdle;
userPoll = false;
userWait = false;
monitorRequester = MonitorRequesterImplPtr(
new MonitorRequesterImpl(shared_from_this(),client));
pvRequest = pvr;
connect();
start();
}
void PvaClientMonitor::stop()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::stop"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(!isStarted) return;
@@ -355,27 +425,18 @@ void PvaClientMonitor::stop()
bool PvaClientMonitor::poll()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::poll"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
checkMonitorState();
if(!isStarted) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::poll illegal state ";
throw std::runtime_error(message);
}
if(userPoll) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::poll did not release last";
throw std::runtime_error(message);
}
@@ -389,18 +450,12 @@ bool PvaClientMonitor::poll()
bool PvaClientMonitor::waitEvent(double secondsToWait)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::waitEvent"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(!isStarted) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::waitEvent illegal state ";
throw std::runtime_error(message);
}
@@ -418,26 +473,17 @@ bool PvaClientMonitor::waitEvent(double secondsToWait)
void PvaClientMonitor::releaseEvent()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::releaseEvent"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(!isStarted) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::releaseEvent monitor not started ";
throw std::runtime_error(message);
}
if(!userPoll) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::releaseEvent did not call poll";
throw std::runtime_error(message);
}
@@ -448,11 +494,8 @@ void PvaClientMonitor::releaseEvent()
PvaClientMonitorDataPtr PvaClientMonitor::getData()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientMonitor::getData"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
checkMonitorState();