Collect ca_client_context operations

Each instance of the caContext class represents a separate CA context,
so each CAChannelProvider creates one and keeps a shared_ptr to it,
making that available to its channels and channel operations. These
also take their own shared_ptr to it as well so the context cannot be
destroyed while it might be needed.

A related caContext Attach object is intended to be short-lived, and
to be allocated on the stack. When created it saves the current CA
context for the thread, replacing it from the caContext given to its
constructor. CA operations will now use the attached context. When the
Attach destructor runs it detaches the thread from the current context
(checking still has the expected value) and re-attaches the thread to
any context that was saved by the constructor.
This commit is contained in:
Andrew Johnson
2020-10-08 00:13:24 -05:00
committed by mdavidsaver
parent 25dde0f4ba
commit f9c40e96cf
8 changed files with 175 additions and 91 deletions

View File

@ -11,9 +11,10 @@ LIB_SYS_LIBS_WIN32 += netapi32 ws2_32
INC += pv/caProvider.h
pvAccessCA_SRCS += notifierConveyor.cpp
pvAccessCA_SRCS += caProvider.cpp
pvAccessCA_SRCS += caContext.cpp
pvAccessCA_SRCS += caChannel.cpp
pvAccessCA_SRCS += dbdToPv.cpp
pvAccessCA_SRCS += notifierConveyor.cpp
include $(TOP)/configure/RULES

View File

@ -111,7 +111,8 @@ CAChannel::CAChannel(std::string const & channelName,
channelID(0),
channelCreated(false),
channelConnected(false),
connectNotification(new Notification())
connectNotification(new Notification()),
ca_context(channelProvider->caContext())
{
}
@ -120,17 +121,17 @@ void CAChannel::activate(short priority)
ChannelRequester::shared_pointer req(channelRequester.lock());
if (!req) return;
connectNotification->setClient(shared_from_this());
attachContext();
Attach to(ca_context);
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
this,
priority, // TODO mapping
&channelID);
ca_connection_handler, this,
priority, // TODO mapping
&channelID);
if (result == ECA_NORMAL) {
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) provider->addChannel(shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if (provider)
provider->addChannel(shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
}
else {
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
@ -162,7 +163,7 @@ void CAChannel::disconnectChannel()
}
monitorlist.resize(0);
/* Clear CA Channel */
attachContext();
Attach to(ca_context);
int result = ca_clear_channel(channelID);
if (result == ECA_NORMAL) return;
string mess("CAChannel::disconnectChannel() ");
@ -361,18 +362,6 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
/* ---------------------------------------------------------- */
void CAChannel::attachContext()
{
CAChannelProviderPtr provider(channelProvider.lock());
if (provider) {
provider->attachContext();
return;
}
string mess("CAChannel::attachContext provider does not exist ");
mess += getChannelName();
throw std::runtime_error(mess);
}
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
@ -389,7 +378,8 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
channelGetRequester(channelGetRequester),
pvRequest(pvRequest),
getStatus(Status::Ok),
getNotification(new Notification())
getNotification(new Notification()),
ca_context(channel->caContext())
{}
CAChannelGet::~CAChannelGet()
@ -445,8 +435,8 @@ void CAChannelGet::get()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if (!getRequester) return;
channel->attachContext();
bitSet->clear();
Attach to(ca_context);
int result = ca_array_get_callback(dbdToPv->getRequestType(),
0,
channel->getChannelID(), ca_get_handler, this);
@ -492,7 +482,8 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
isPut(false),
getStatus(Status::Ok),
putStatus(Status::Ok),
putNotification(new Notification())
putNotification(new Notification()),
ca_context(channel->caContext())
{}
CAChannelPut::~CAChannelPut()
@ -603,8 +594,8 @@ void CAChannelPut::get()
isPut = false;
}
channel->attachContext();
bitSet->clear();
Attach to(ca_context);
int result = ca_array_get_callback(dbdToPv->getRequestType(),
0,
channel->getChannelID(), ca_put_get_handler, this);
@ -737,7 +728,8 @@ CAChannelMonitor::CAChannelMonitor(
isStarted(false),
pevid(NULL),
eventMask(DBE_VALUE | DBE_ALARM),
eventNotification(new Notification())
eventNotification(new Notification()),
ca_context(channel->caContext())
{}
CAChannelMonitor::~CAChannelMonitor()
@ -833,7 +825,7 @@ Status CAChannelMonitor::start()
isStarted = true;
monitorQueue->start();
}
channel->attachContext();
Attach to(ca_context);
int result = ca_create_subscription(dbdToPv->getRequestType(),
0,
channel->getChannelID(), eventMask,
@ -859,6 +851,7 @@ Status CAChannelMonitor::stop()
isStarted = false;
}
monitorQueue->stop();
// Attach to(ca_context); -- Not required!
int result = ca_clear_subscription(pevid);
if (result==ECA_NORMAL)
return Status::Ok;

View File

@ -96,14 +96,16 @@ public:
epics::pvData::PVStructurePtr const & pvRequest);
virtual void printInfo(std::ostream& out);
void attachContext();
void disconnectChannel();
void connect(bool isConnected);
virtual void notifyClient();
void notifyResult(NotificationPtr const &notificationPtr);
virtual void notifyClient();
CAContextPtr caContext() {
return ca_context;
}
private:
virtual void destroy() {}
CAChannel(std::string const & channelName,
CAChannelProvider::shared_pointer const & channelProvider,
ChannelRequester::shared_pointer const & channelRequester);
@ -117,6 +119,7 @@ private:
bool channelCreated;
bool channelConnected;
NotificationPtr connectNotification;
CAContextPtr ca_context;
epics::pvData::Mutex requestsMutex;
std::queue<CAChannelGetFieldPtr> getFieldQueue;
@ -148,16 +151,17 @@ public:
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelGet(CAChannel::shared_pointer const & _channel,
ChannelGetRequester::shared_pointer const & _channelGetRequester,
epics::pvData::PVStructurePtr const & pvRequest);
CAChannelPtr channel;
ChannelGetRequester::weak_pointer channelGetRequester;
const epics::pvData::PVStructure::shared_pointer pvRequest;
epics::pvData::Status getStatus;
NotificationPtr getNotification;
CAContextPtr ca_context;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
@ -190,7 +194,6 @@ public:
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelPut(CAChannel::shared_pointer const & _channel,
ChannelPutRequester::shared_pointer const & _channelPutRequester,
epics::pvData::PVStructurePtr const & pvRequest);
@ -202,6 +205,8 @@ private:
epics::pvData::Status getStatus;
epics::pvData::Status putStatus;
NotificationPtr putNotification;
CAContextPtr ca_context;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
@ -233,9 +238,8 @@ public:
void activate();
virtual void notifyClient();
private:
virtual void destroy() {}
CAChannelMonitor(CAChannel::shared_pointer const & _channel,
MonitorRequester::shared_pointer const & _monitorRequester,
CAChannelMonitor(CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructurePtr const & pvRequest);
CAChannelPtr channel;
MonitorRequester::weak_pointer monitorRequester;
@ -244,6 +248,7 @@ private:
evid pevid;
unsigned long eventMask;
NotificationPtr eventNotification;
CAContextPtr ca_context;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;

65
src/ca/caContext.cpp Normal file
View File

@ -0,0 +1,65 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <cadef.h>
#define epicsExportSharedSymbols
#include "caContext.h"
namespace epics {
namespace pvAccess {
namespace ca {
CAContext::CAContext()
{
int result = ca_context_create(ca_enable_preemptive_callback);
if (result != ECA_NORMAL)
throw std::runtime_error("Can't create CA context");
ca_context = ca_current_context();
}
ca_client_context* CAContext::attach()
{
ca_client_context *thread_context = ca_current_context();
if (thread_context != ca_context) {
if (thread_context)
ca_detach_context();
int result = ca_attach_context(ca_context);
if (result != ECA_NORMAL)
throw std::runtime_error("Can't attach to CA context");
}
return thread_context;
}
void CAContext::detach(ca_client_context* restore) \
{
ca_client_context *thread_context = ca_current_context();
if (thread_context != ca_context)
std::cerr << "CA context was changed!" << std::endl;
ca_detach_context();
if (restore) {
int result = ca_attach_context(restore);
if (result != ECA_NORMAL)
std::cerr << "Can't re-attach to CA context" << std::endl;
}
}
CAContext::~CAContext()
{
ca_client_context *thread_context = attach();
ca_context_destroy();
if (thread_context != ca_context) {
int result = ca_attach_context(ca_context);
if (result != ECA_NORMAL)
std::cerr << "Can't re-attach to CA context" << std::endl;
}
}
}}}

51
src/ca/caContext.h Normal file
View File

@ -0,0 +1,51 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef INC_caContext_H
#define INC_caContext_H
#include <cadef.h>
#include <pv/pvAccess.h>
namespace epics {
namespace pvAccess {
namespace ca {
class Attach;
class CAContext;
typedef std::tr1::shared_ptr<CAContext> CAContextPtr;
class CAContext
{
public:
CAContext();
~CAContext();
private:
ca_client_context* ca_context;
private: // Internal API
friend class Attach;
ca_client_context* attach();
void detach(ca_client_context* restore);
};
class Attach
{
public:
Attach(const CAContextPtr & to) :
context(to), saved_context(to->attach()) {}
~Attach() {
context->detach(saved_context);
}
private:
CAContextPtr context;
ca_client_context* saved_context;
};
}}}
#endif // INC_caContext_H

View File

@ -20,18 +20,11 @@ namespace ca {
using namespace epics::pvData;
CAChannelProvider::CAChannelProvider()
: current_context(0)
{
initialize();
}
CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration> &)
: current_context(0)
: ca_context(CAContextPtr(new CAContext()))
{
connectNotifier.start();
resultNotifier.start();
initialize();
}
CAChannelProvider::~CAChannelProvider()
@ -52,7 +45,6 @@ CAChannelProvider::~CAChannelProvider()
channelQ.front()->disconnectChannel();
channelQ.pop();
}
ca_context_destroy();
}
std::string CAChannelProvider::getProviderName()
@ -137,33 +129,7 @@ void CAChannelProvider::poll()
{
}
void CAChannelProvider::attachContext()
{
ca_client_context *thread_context = ca_current_context();
if (thread_context == current_context)
return;
int result = ca_attach_context(current_context);
if (result == ECA_ISATTACHED)
return;
if (result != ECA_NORMAL)
{
std::string mess("CAChannelProvider::attachContext error calling ca_attach_context ");
mess += ca_message(result);
throw std::runtime_error(mess);
}
}
void CAChannelProvider::initialize()
{
int result = ca_context_create(ca_enable_preemptive_callback);
if (result != ECA_NORMAL)
{
std::string mess("CAChannelProvider::initialize error calling ca_context_create ");
mess += ca_message(result);
throw std::runtime_error(mess);
}
current_context = ca_current_context();
}
// ---------------- CAClientFactory ----------------
void CAClientFactory::start()
{

View File

@ -18,6 +18,7 @@
#include <pv/pvAccess.h>
#include <pv/caProvider.h>
#include "caContext.h"
#include "notifierConveyor.h"
@ -49,7 +50,6 @@ class CAChannelProvider :
{
public:
POINTER_DEFINITIONS(CAChannelProvider);
CAChannelProvider();
CAChannelProvider(const std::tr1::shared_ptr<Configuration>&);
virtual ~CAChannelProvider();
@ -79,9 +79,11 @@ public:
virtual void flush();
virtual void poll();
void attachContext();
void addChannel(const CAChannelPtr & channel);
CAContextPtr caContext() {
return ca_context;
}
void notifyConnection(NotificationPtr const &notificationPtr) {
connectNotifier.notifyClient(notificationPtr);
}
@ -89,8 +91,7 @@ public:
resultNotifier.notifyClient(notificationPtr);
}
private:
void initialize();
ca_client_context* current_context;
CAContextPtr ca_context;
epics::pvData::Mutex channelListMutex;
std::vector<CAChannelWPtr> caChannelList;

View File

@ -357,11 +357,10 @@ void DbdToPv::getChoices(CAChannelPtr const & caChannel)
{
if(caRequestType==DBR_ENUM||caRequestType==DBR_TIME_ENUM)
{
caChannel->attachContext();
chid channelID = caChannel->getChannelID();
int result = ca_array_get_callback(DBR_GR_ENUM,
1,
channelID, enumChoicesHandler, this);
Attach to(caChannel->caContext());
int result = ca_array_get_callback(DBR_GR_ENUM, 1,
channelID, enumChoicesHandler, this);
if (result == ECA_NORMAL) {
result = ca_flush_io();
choicesEvent.wait();
@ -969,18 +968,21 @@ Status DbdToPv::putToDBD(
}
Status status = Status::Ok;
int result = 0;
caChannel->attachContext();
if(block) {
Attach to(caChannel->caContext());
if (block) {
result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,userarg);
} else {
}
else {
result = ca_array_put(caValueType,count,channelID,pValue);
}
if(result==ECA_NORMAL) {
ca_flush_io();
} else {
status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
if (result == ECA_NORMAL) {
ca_flush_io();
}
if(ca_stringBuffer!=NULL) delete[] ca_stringBuffer;
else {
status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
}
if (ca_stringBuffer != NULL)
delete[] ca_stringBuffer;
return status;
}