Replaced 4 notification threads with 2 notifierConveyors

Channel connection notifications are now handled by connectNotifier,
getDone, putDone and monitor events handled by resultNotifier.
A notifierConveyor is generic, and contains a queue and a thread.
You pass a Notification pointing to a NotifierClient to a Conveyor's
notifyClient() method, and the thread will call the client's
notifyClient() method once it reaches the front of the queue.

The conveyor threads stop when the caProvider is destroyed.
The queue stores weak pointers, so queued notifications won't prevent
client objects from being destroyed.
This commit is contained in:
Andrew Johnson
2020-10-06 00:48:32 -05:00
committed by mdavidsaver
parent 2729903a10
commit 06c2fb579f
16 changed files with 213 additions and 833 deletions

View File

@@ -5,17 +5,10 @@
*/
#include <epicsVersion.h>
#include <pv/standardField.h>
#include <pv/logger.h>
#include <pv/pvAccess.h>
#define epicsExportSharedSymbols
#include "channelConnectThread.h"
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
#include "caChannel.h"
using namespace epics::pvData;
@@ -40,12 +33,14 @@ CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer co
return caChannel;
}
extern "C" {
static void ca_connection_handler(struct connection_handler_args args)
{
CAChannel *channel = static_cast<CAChannel*>(ca_puser(args.chid));
channel->connect(args.op == CA_OP_CONN_UP);
}
}
void CAChannel::connect(bool isConnected)
{
@@ -55,7 +50,13 @@ void CAChannel::connect(bool isConnected)
}
CAChannelProviderPtr provider(channelProvider.lock());
if (!provider) return;
provider->getChannelConnectThread().channelConnected(notifyChannelRequester);
provider->notifyConnection(connectNotification);
}
void CAChannel::notifyResult(NotificationPtr const &notificationPtr) {
CAChannelProviderPtr provider(channelProvider.lock());
if (!provider) return;
provider->notifyResult(notificationPtr);
}
void CAChannel::notifyClient()
@@ -109,7 +110,8 @@ CAChannel::CAChannel(std::string const & channelName,
channelRequester(channelRequester),
channelID(0),
channelCreated(false),
channelConnected(false)
channelConnected(false),
connectNotification(new Notification())
{
}
@@ -117,8 +119,7 @@ void CAChannel::activate(short priority)
{
ChannelRequester::shared_pointer req(channelRequester.lock());
if (!req) return;
notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester());
notifyChannelRequester->setChannel(shared_from_this());
connectNotification->setClient(shared_from_this());
attachContext();
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
@@ -161,10 +162,7 @@ void CAChannel::disconnectChannel()
}
monitorlist.resize(0);
/* Clear CA Channel */
CAChannelProviderPtr provider(channelProvider.lock());
if (provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
}
attachContext();
int result = ca_clear_channel(channelID);
if (result == ECA_NORMAL) return;
string mess("CAChannel::disconnectChannel() ");
@@ -367,7 +365,7 @@ void CAChannel::attachContext()
{
CAChannelProviderPtr provider(channelProvider.lock());
if (provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
provider->attachContext();
return;
}
string mess("CAChannel::attachContext provider does not exist ");
@@ -391,7 +389,7 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
channelGetRequester(channelGetRequester),
pvRequest(pvRequest),
getStatus(Status::Ok),
getDoneThread(GetDoneThread::get())
getNotification(new Notification())
{}
CAChannelGet::~CAChannelGet()
@@ -406,8 +404,7 @@ void CAChannelGet::activate()
dbdToPv->getChoices(channel);
pvStructure = dbdToPv->createPVStructure();
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
notifyGetRequester = NotifyGetRequesterPtr(new NotifyGetRequester());
notifyGetRequester->setChannelGet(shared_from_this());
getNotification->setClient(shared_from_this());
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -419,7 +416,7 @@ std::string CAChannelGet::getRequesterName()
return "CAChannelGet";
}
namespace {
extern "C" {
static void ca_get_handler(struct event_handler_args args)
{
@@ -427,14 +424,14 @@ static void ca_get_handler(struct event_handler_args args)
channelGet->getDone(args);
}
} // namespace
} // extern "C"
void CAChannelGet::getDone(struct event_handler_args &args)
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if (!getRequester) return;
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
getDoneThread->getDone(notifyGetRequester);
channel->notifyResult(getNotification);
}
void CAChannelGet::notifyClient()
@@ -495,7 +492,7 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
isPut(false),
getStatus(Status::Ok),
putStatus(Status::Ok),
putDoneThread(PutDoneThread::get())
putNotification(new Notification())
{}
CAChannelPut::~CAChannelPut()
@@ -517,8 +514,7 @@ void CAChannelPut::activate()
if (val.compare("true")==0)
block = true;
}
notifyPutRequester = NotifyPutRequesterPtr(new NotifyPutRequester());
notifyPutRequester->setChannelPut(shared_from_this());
putNotification->setClient(shared_from_this());
EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -531,7 +527,7 @@ std::string CAChannelPut::getRequesterName()
/* --------------- epics::pvAccess::ChannelPut --------------- */
namespace {
extern "C" {
static void ca_put_handler(struct event_handler_args args)
{
@@ -545,7 +541,7 @@ static void ca_put_get_handler(struct event_handler_args args)
channelPut->getDone(args);
}
} // namespace
} // extern "C"
void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
@@ -574,7 +570,7 @@ void CAChannelPut::putDone(struct event_handler_args &args)
else {
putStatus = Status::Ok;
}
putDoneThread->putDone(notifyPutRequester);
channel->notifyResult(putNotification);
}
void CAChannelPut::getDone(struct event_handler_args &args)
@@ -582,7 +578,7 @@ void CAChannelPut::getDone(struct event_handler_args &args)
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if (!putRequester) return;
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
putDoneThread->putDone(notifyPutRequester);
channel->notifyResult(putNotification);
}
void CAChannelPut::notifyClient()
@@ -739,9 +735,9 @@ CAChannelMonitor::CAChannelMonitor(
monitorRequester(monitorRequester),
pvRequest(pvRequest),
isStarted(false),
monitorEventThread(MonitorEventThread::get()),
pevid(NULL),
eventMask(DBE_VALUE | DBE_ALARM)
eventMask(DBE_VALUE | DBE_ALARM),
eventNotification(new Notification())
{}
CAChannelMonitor::~CAChannelMonitor()
@@ -778,8 +774,7 @@ void CAChannelMonitor::activate()
if (value.find("PROPERTY")!=std::string::npos) eventMask|=DBE_PROPERTY;
}
}
notifyMonitorRequester = NotifyMonitorRequesterPtr(new NotifyMonitorRequester());
notifyMonitorRequester->setChannelMonitor(shared_from_this());
eventNotification->setClient(shared_from_this());
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
@@ -807,7 +802,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
else {
*(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
}
monitorEventThread->event(notifyMonitorRequester);
channel->notifyResult(eventNotification);
}
else {
string mess("CAChannelMonitor::subscriptionEvent ");