delete stopMonitorThread; add monitorEventThread

This commit is contained in:
mrkraimer
2018-06-19 13:11:01 -04:00
parent 43111a2195
commit 184e92b346
11 changed files with 328 additions and 250 deletions

View File

@@ -11,7 +11,7 @@
#include <pv/logger.h>
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
#include "stopMonitorThread.h"
#include "monitorEventThread.h"
#define epicsExportSharedSymbols
#include "caChannel.h"
@@ -70,7 +70,9 @@ void CAChannel::connected()
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
CAChannelMonitorPtr monitor(monitorQueue.front());
monitor->activate();
addMonitor(monitor);
monitorQueue.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
@@ -161,6 +163,14 @@ void CAChannel::disconnectChannel()
if(!channelCreated) return;
channelCreated = false;
}
std::vector<CAChannelMonitorWPtr>::iterator it;
for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
{
CAChannelMonitorPtr mon = (*it).lock();
if(!mon) continue;
mon->stop();
}
monitorlist.resize(0);
/* Clear CA Channel */
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) {
@@ -233,7 +243,7 @@ void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
}
AccessRights CAChannel::getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/)
AccessRights CAChannel::getAccessRights(PVField::shared_pointer const & /*pvField*/)
{
if (ca_write_access(channelID))
return readWrite;
@@ -246,7 +256,7 @@ AccessRights CAChannel::getAccessRights(epics::pvData::PVField::shared_pointer c
ChannelGet::shared_pointer CAChannel::createChannelGet(
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelGet " << channelName << endl;
@@ -267,7 +277,7 @@ ChannelGet::shared_pointer CAChannel::createChannelGet(
ChannelPut::shared_pointer CAChannel::createChannelPut(
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelPut " << channelName << endl;
@@ -288,7 +298,7 @@ ChannelPut::shared_pointer CAChannel::createChannelPut(
Monitor::shared_pointer CAChannel::createMonitor(
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createMonitor " << channelName << endl;
@@ -303,9 +313,22 @@ Monitor::shared_pointer CAChannel::createMonitor(
}
}
channelMonitor->activate();
addMonitor(channelMonitor);
return channelMonitor;
}
void CAChannel::addMonitor(CAChannelMonitorPtr const & monitor)
{
std::vector<CAChannelMonitorWPtr>::iterator it;
for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
{
CAChannelMonitorWPtr mon = *it;
if(mon.lock()) continue;
mon = monitor;
return;
}
monitorlist.push_back(monitor);
}
void CAChannel::printInfo(std::ostream& out)
{
@@ -348,7 +371,7 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
PVStructurePtr pvRequest(createRequest(""));
DbdToPvPtr dbdToPv = DbdToPv::create(caChannel,pvRequest,getIO);
PVStructurePtr pvStructure = dbdToPv->createPVStructure();
epics::pvData::Structure::const_shared_pointer structure(pvStructure->getStructure());
Structure::const_shared_pointer structure(pvStructure->getStructure());
Field::const_shared_pointer field =
subField.empty() ?
std::tr1::static_pointer_cast<const Field>(structure) :
@@ -385,7 +408,7 @@ size_t CAChannelGet::num_instances;
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGet::create " << channel->getChannelName() << endl;
@@ -395,7 +418,7 @@ CAChannelGetPtr CAChannelGet::create(
CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
:
channel(channel),
channelGetRequester(channelGetRequester),
@@ -490,7 +513,7 @@ size_t CAChannelPut::num_instances;
CAChannelPutPtr CAChannelPut::create(
CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelPut::create " << channel->getChannelName() << endl;
@@ -500,7 +523,7 @@ CAChannelPutPtr CAChannelPut::create(
CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
:
channel(channel),
channelPutRequester(channelPutRequester),
@@ -698,7 +721,7 @@ size_t CAChannelMonitor::num_instances;
CAChannelMonitorPtr CAChannelMonitor::create(
CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelMonitor::create " << channel->getChannelName() << endl;
@@ -715,7 +738,8 @@ CAChannelMonitor::CAChannelMonitor(
monitorRequester(monitorRequester),
pvRequest(pvRequest),
isStarted(false),
stopMonitorThread(StopMonitorThread::get())
monitorEventThread(MonitorEventThread::get()),
pevid(NULL)
{}
CAChannelMonitor::~CAChannelMonitor()
@@ -726,9 +750,7 @@ CAChannelMonitor::~CAChannelMonitor()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
if(isStarted) stop();
stopMonitorThread->addNoEventsCallback(&waitForNoEvents);
waitForNoEvents.wait();
stop();
}
void CAChannelMonitor::activate()
@@ -753,6 +775,9 @@ void CAChannelMonitor::activate()
if (size > 1) queueSize = size;
}
}
notifyRequester = NotifyRequesterPtr(new NotifyRequester());
notifyRequester->setChannelMonitor(shared_from_this());
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
@@ -766,7 +791,10 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
std::cout << "CAChannelMonitor::subscriptionEvent "
<< channel->getChannelName() << endl;
}
if(!isStarted) return;
{
Lock lock(mutex);
if(!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args);
@@ -778,9 +806,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
} else {
*(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
}
// call monitorRequester even if queue is full
requester->monitorEvent(shared_from_this());
monitorEventThread->event(notifyRequester);
}
else
{
@@ -791,24 +817,27 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
}
}
epics::pvData::Status CAChannelMonitor::start()
Status CAChannelMonitor::start()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::start " << channel->getChannelName() << endl;
}
Status status = Status::Ok;
if(isStarted) {
status = Status(Status::STATUSTYPE_WARNING,"already started");
return status;
{
Lock lock(mutex);
if(isStarted) {
status = Status(Status::STATUSTYPE_WARNING,"already started");
return status;
}
isStarted = true;
monitorQueue->start();
}
channel->attachContext();
monitorQueue->start();
isStarted = true;
int result = ca_create_subscription(dbdToPv->getRequestType(),
0,
channel->getChannelID(), DBE_VALUE,
ca_subscription_handler, this,
&eventID);
&pevid);
if (result == ECA_NORMAL)
{
result = ca_flush_io();
@@ -819,7 +848,7 @@ epics::pvData::Status CAChannelMonitor::start()
return Status(Status::STATUSTYPE_ERROR,message);
}
epics::pvData::Status CAChannelMonitor::stop()
Status CAChannelMonitor::stop()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::stop "
@@ -827,12 +856,26 @@ epics::pvData::Status CAChannelMonitor::stop()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
isStarted = false;
{
Lock lock(mutex);
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
isStarted = false;
}
monitorQueue->stop();
stopMonitorThread->callStop(eventID);
eventID = NULL;
return Status::Ok;
int result = ca_clear_subscription(pevid);
if(result==ECA_NORMAL) return Status::Ok;
return Status(Status::STATUSTYPE_ERROR,string(ca_message(result)));
}
void CAChannelMonitor::notifyClient()
{
{
Lock lock(mutex);
if(!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
requester->monitorEvent(shared_from_this());
}
@@ -841,6 +884,10 @@ MonitorElementPtr CAChannelMonitor::poll()
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::poll " << channel->getChannelName() << endl;
}
{
Lock lock(mutex);
if(!isStarted) return MonitorElementPtr();
}
return monitorQueue->poll();
}
@@ -853,7 +900,7 @@ void CAChannelMonitor::release(MonitorElementPtr const & monitorElement)
monitorQueue->release(monitorElement);
}
/* --------------- epics::pvData::ChannelRequest --------------- */
/* --------------- ChannelRequest --------------- */
void CAChannelMonitor::cancel()
{