add putDoneThread and getDoneThread; fix issue 114

This commit is contained in:
mrkraimer
2018-07-14 11:46:26 -04:00
parent f83c6c87d6
commit df45c70149
15 changed files with 615 additions and 164 deletions

View File

@@ -1,6 +1,6 @@
# pvAccessCPP: ca provider
2018.06.25
2018.07.09
Editors:
@@ -610,26 +610,26 @@ Any code that uses **ca** must call **CAClientFactory::start()** before making a
ca_context_create is called for the thread that calls CAClientFactory::start().
Client code can create an Auxillary Thread by calling:
```
ca_client_context* current_context = CAClientFactory::get_ca_client_context();
int result = ca_attach_context(current_context);
```
If the client creates auxillary threads the make pvAccess client requests then the auxillary threads will automatically become
a **ca** auxilary thread.
[Deadlock in ca_clear_subscription()](https://bugs.launchpad.net/epics-base/7.0/+bug/1751380)
Shows a problem with monitor callbacks.
A test was created that shows that the same problem can occur with a combination of rapid get, put and monitor events.
In order to prevent this problem **ca** creates a monitorEventThread.
All calls to the requester's **monitorEvent** method are made from the monitorEventThread.
In order to prevent this problem **ca** creates the following threads:
**getEventThread**, **putEventThread**, and **monitorEventThread**.
**Note** the monitorEventThread does not call **ca_attach_context**.
This means that no **ca_xxx** function can be called from
the requester's **monitorEvent** method.
All client callbacks are made via one of these threads.
For example a call to the requester's **monitorEvent** method is made from the monitorEventThread.
**Notes**
* These threads do not call **ca_attach_context**.
* No **ca_xxx** function should be called from the requester's callback method.

View File

@@ -12,6 +12,8 @@ LIB_SYS_LIBS_WIN32 += ws2_32
INC += pv/caProvider.h
pvAccessCA_SRCS += monitorEventThread.cpp
pvAccessCA_SRCS += getDoneThread.cpp
pvAccessCA_SRCS += putDoneThread.cpp
pvAccessCA_SRCS += caProvider.cpp
pvAccessCA_SRCS += caChannel.cpp
pvAccessCA_SRCS += dbdToPv.cpp

View File

@@ -10,8 +10,9 @@
#include <pv/standardField.h>
#include <pv/logger.h>
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
#define epicsExportSharedSymbols
#include "caChannel.h"
@@ -95,8 +96,6 @@ void CAChannel::disconnected()
}
}
size_t CAChannel::num_instances;
CAChannel::CAChannel(std::string const & channelName,
CAChannelProvider::shared_pointer const & channelProvider,
ChannelRequester::shared_pointer const & channelRequester) :
@@ -402,9 +401,6 @@ void CAChannel::attachContext()
throw std::runtime_error(mess);
}
size_t CAChannelGet::num_instances;
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
@@ -422,7 +418,9 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
:
channel(channel),
channelGetRequester(channelGetRequester),
pvRequest(pvRequest)
pvRequest(pvRequest),
getStatus(Status::Ok),
getDoneThread(GetDoneThread::get())
{}
CAChannelGet::~CAChannelGet()
@@ -442,10 +440,14 @@ void CAChannelGet::activate()
dbdToPv = DbdToPv::create(channel,pvRequest,getIO);
pvStructure = dbdToPv->createPVStructure();
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
notifyGetRequester = NotifyGetRequesterPtr(new NotifyGetRequester());
notifyGetRequester->setChannelGet(shared_from_this());
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
std::string CAChannelGet::getRequesterName() { return "CAChannelGet";}
namespace {
@@ -467,8 +469,15 @@ void CAChannelGet::getDone(struct event_handler_args &args)
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
Status status = dbdToPv->getFromDBD(pvStructure,bitSet,args);
EXCEPTION_GUARD(getRequester->getDone(status, shared_from_this(), pvStructure, bitSet));
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
getDoneThread->getDone(notifyGetRequester);
}
void CAChannelGet::notifyClient()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
}
void CAChannelGet::get()
@@ -491,7 +500,8 @@ void CAChannelGet::get()
{
string mess("CAChannelGet::get ");
mess += channel->getChannelName() + " message " + ca_message(result);
throw std::runtime_error(mess);
getStatus = Status(Status::STATUSTYPE_ERROR,mess);
notifyClient();
}
}
@@ -508,8 +518,6 @@ void CAChannelGet::lastRequest()
{
}
size_t CAChannelPut::num_instances;
CAChannelPutPtr CAChannelPut::create(
CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
@@ -528,7 +536,11 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
channel(channel),
channelPutRequester(channelPutRequester),
pvRequest(pvRequest),
block(false)
block(false),
isPut(false),
getStatus(Status::Ok),
putStatus(Status::Ok),
putDoneThread(PutDoneThread::get())
{}
CAChannelPut::~CAChannelPut()
@@ -554,6 +566,8 @@ void CAChannelPut::activate()
std::string val = pvString->get();
if(val.compare("true")==0) block = true;
}
notifyPutRequester = NotifyPutRequesterPtr(new NotifyPutRequester());
notifyPutRequester->setChannelPut(shared_from_this());
EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -565,6 +579,12 @@ std::string CAChannelPut::getRequesterName() { return "CAChannelPut";}
namespace {
static void ca_put_handler(struct event_handler_args args)
{
CAChannelPut *channelPut = static_cast<CAChannelPut*>(args.usr);
channelPut->putDone(args);
}
static void ca_put_get_handler(struct event_handler_args args)
{
CAChannelPut *channelPut = static_cast<CAChannelPut*>(args.usr);
@@ -582,11 +602,33 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
Status status = dbdToPv->putToDBD(channel,pvPutStructure,block);
EXCEPTION_GUARD(putRequester->putDone(status, shared_from_this()));
{
Lock lock(mutex);
isPut = true;
}
putStatus = dbdToPv->putToDBD(channel,pvPutStructure,block,&ca_put_handler,this);
if(!block || !putStatus.isOK()) {
EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
}
}
void CAChannelPut::putDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
cout << "CAChannelPut::putDone " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(args.status!=ECA_NORMAL)
{
putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
} else {
putStatus = Status::Ok;
}
putDoneThread->putDone(notifyPutRequester);
}
void CAChannelPut::getDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
@@ -595,8 +637,19 @@ void CAChannelPut::getDone(struct event_handler_args &args)
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
Status status = dbdToPv->getFromDBD(pvStructure,bitSet,args);
EXCEPTION_GUARD(putRequester->getDone(status, shared_from_this(), pvStructure, bitSet));
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
putDoneThread->putDone(notifyPutRequester);
}
void CAChannelPut::notifyClient()
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(isPut) {
EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
} else {
EXCEPTION_GUARD(putRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
}
}
@@ -607,6 +660,11 @@ void CAChannelPut::get()
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
{
Lock lock(mutex);
isPut = false;
}
channel->attachContext();
bitSet->clear();
int result = ca_array_get_callback(dbdToPv->getRequestType(),
@@ -620,7 +678,8 @@ void CAChannelPut::get()
{
string mess("CAChannelPut::get ");
mess += channel->getChannelName() + " message " +ca_message(result);
throw std::runtime_error(mess);
Status status(Status::STATUSTYPE_ERROR,mess);
EXCEPTION_GUARD(putRequester->getDone(status, shared_from_this(), pvStructure, bitSet));
}
}
@@ -716,8 +775,6 @@ public:
}
};
size_t CAChannelMonitor::num_instances;
CAChannelMonitorPtr CAChannelMonitor::create(
CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
@@ -775,9 +832,8 @@ void CAChannelMonitor::activate()
if (size > 1) queueSize = size;
}
}
notifyRequester = NotifyRequesterPtr(new NotifyRequester());
notifyRequester->setChannelMonitor(shared_from_this());
notifyMonitorRequester = NotifyMonitorRequesterPtr(new NotifyMonitorRequester());
notifyMonitorRequester->setChannelMonitor(shared_from_this());
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
@@ -806,7 +862,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
} else {
*(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
}
monitorEventThread->event(notifyRequester);
monitorEventThread->event(notifyMonitorRequester);
}
else
{
@@ -817,6 +873,18 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
}
}
void CAChannelMonitor::notifyClient()
{
{
Lock lock(mutex);
if(!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
requester->monitorEvent(shared_from_this());
}
Status CAChannelMonitor::start()
{
if(DEBUG_LEVEL>0) {
@@ -867,17 +935,6 @@ Status CAChannelMonitor::stop()
return Status(Status::STATUSTYPE_ERROR,string(ca_message(result)));
}
void CAChannelMonitor::notifyClient()
{
{
Lock lock(mutex);
if(!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
requester->monitorEvent(shared_from_this());
}
MonitorElementPtr CAChannelMonitor::poll()
{

View File

@@ -4,6 +4,11 @@
* in file LICENSE that is included with this distribution.
*/
/**
* @author msekoranja, mrk
* @date 2018.07
*/
#ifndef CACHANNEL_H
#define CACHANNEL_H
@@ -23,12 +28,22 @@ namespace epics {
namespace pvAccess {
namespace ca {
class NotifyRequester;
typedef std::tr1::shared_ptr<NotifyRequester> NotifyRequesterPtr;
class NotifyMonitorRequester;
typedef std::tr1::shared_ptr<NotifyMonitorRequester> NotifyMonitorRequesterPtr;
class MonitorEventThread;
typedef std::tr1::shared_ptr<MonitorEventThread> MonitorEventThreadPtr;
class NotifyGetRequester;
typedef std::tr1::shared_ptr<NotifyGetRequester> NotifyGetRequesterPtr;
typedef std::tr1::weak_ptr<NotifyGetRequester> NotifyGetRequesterWPtr;
class GetDoneThread;
typedef std::tr1::shared_ptr<GetDoneThread> GetDoneThreadPtr;
class NotifyPutRequester;
typedef std::tr1::shared_ptr<NotifyPutRequester> NotifyPutRequesterPtr;
typedef std::tr1::weak_ptr<NotifyPutRequester> NotifyPutRequesterWPtr;
class PutDoneThread;
typedef std::tr1::shared_ptr<PutDoneThread> PutDoneThreadPtr;
class CAChannelGetField;
typedef std::tr1::shared_ptr<CAChannelGetField> CAChannelGetFieldPtr;
@@ -62,7 +77,6 @@ class CAChannel :
{
public:
POINTER_DEFINITIONS(CAChannel);
static size_t num_instances;
static CAChannelPtr create(
CAChannelProvider::shared_pointer const & channelProvider,
std::string const & channelName,
@@ -123,7 +137,6 @@ class CAChannelGet :
{
public:
POINTER_DEFINITIONS(CAChannelGet);
static size_t num_instances;
static CAChannelGet::shared_pointer create(CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructurePtr const & pvRequest);
@@ -136,7 +149,7 @@ public:
virtual std::string getRequesterName();
void activate();
void notifyClient();
private:
virtual void destroy() {}
CAChannelGet(CAChannel::shared_pointer const & _channel,
@@ -146,7 +159,11 @@ private:
CAChannelPtr channel;
ChannelGetRequester::weak_pointer channelGetRequester;
epics::pvData::PVStructurePtr const & pvRequest;
epics::pvData::Status getStatus;
GetDoneThreadPtr getDoneThread;
NotifyGetRequesterPtr notifyGetRequester;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
epics::pvData::BitSet::shared_pointer bitSet;
};
@@ -158,11 +175,11 @@ class CAChannelPut :
public:
POINTER_DEFINITIONS(CAChannelPut);
static size_t num_instances;
static CAChannelPut::shared_pointer create(CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructurePtr const & pvRequest);
virtual ~CAChannelPut();
void putDone(struct event_handler_args &args);
void getDone(struct event_handler_args &args);
virtual void put(
epics::pvData::PVStructure::shared_pointer const & pvPutStructure,
@@ -175,6 +192,7 @@ public:
virtual std::string getRequesterName();
void activate();
void notifyClient();
private:
virtual void destroy() {}
CAChannelPut(CAChannel::shared_pointer const & _channel,
@@ -184,7 +202,13 @@ private:
ChannelPutRequester::weak_pointer channelPutRequester;
const epics::pvData::PVStructure::shared_pointer pvRequest;
bool block;
bool isPut;
epics::pvData::Status getStatus;
epics::pvData::Status putStatus;
PutDoneThreadPtr putDoneThread;
NotifyPutRequesterPtr notifyPutRequester;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
epics::pvData::PVStructure::shared_pointer pvStructure;
epics::pvData::BitSet::shared_pointer bitSet;
};
@@ -199,7 +223,6 @@ class CAChannelMonitor :
public:
POINTER_DEFINITIONS(CAChannelMonitor);
static size_t num_instances;
static CAChannelMonitor::shared_pointer create(CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructurePtr const & pvRequest);
@@ -223,9 +246,9 @@ private:
MonitorRequester::weak_pointer monitorRequester;
const epics::pvData::PVStructure::shared_pointer pvRequest;
bool isStarted;
NotifyRequesterPtr notifyRequester;
MonitorEventThreadPtr monitorEventThread;
evid pevid;
NotifyMonitorRequesterPtr notifyMonitorRequester;
DbdToPvPtr dbdToPv;
epics::pvData::Mutex mutex;
@@ -235,8 +258,6 @@ private:
CACMonitorQueuePtr monitorQueue;
};
}
}
}
}}}
#endif /* CACHANNEL_H */

View File

@@ -4,18 +4,16 @@
* in file LICENSE that is included with this distribution.
*/
#include <algorithm>
#include <cadef.h>
#include <epicsSignal.h>
#include <epicsThread.h>
#include <epicsExit.h>
#include <pv/logger.h>
#include <pv/configuration.h>
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
#define epicsExportSharedSymbols
#include <pv/caProvider.h>
@@ -33,8 +31,6 @@ using namespace epics::pvData;
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
size_t CAChannelProvider::num_instances;
CAChannelProvider::CAChannelProvider()
: current_context(0)
{
@@ -43,7 +39,9 @@ CAChannelProvider::CAChannelProvider()
CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration>&)
: current_context(0),
monitorEventThread(MonitorEventThread::get())
monitorEventThread(MonitorEventThread::get()),
getDoneThread(GetDoneThread::get()),
putDoneThread(PutDoneThread::get())
{
if(DEBUG_LEVEL>0) {
std::cout<< "CAChannelProvider::CAChannelProvider\n";
@@ -78,11 +76,13 @@ CAChannelProvider::~CAChannelProvider()
channelQ.pop();
}
monitorEventThread->stop();
getDoneThread->stop();
putDoneThread->stop();
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n";
}
ca_context_destroy();
std::cout << "CAChannelProvider::~CAChannelProvider() returning\n";
//std::cout << "CAChannelProvider::~CAChannelProvider() returning\n";
}
std::string CAChannelProvider::getProviderName()
@@ -197,11 +197,6 @@ void CAChannelProvider::initialize()
current_context = ca_current_context();
}
ca_client_context * CAChannelProvider::get_ca_client_context()
{
return current_context;
}
void CAClientFactory::start()
{
if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n";
@@ -210,29 +205,12 @@ void CAClientFactory::start()
}
epicsSignalInstallSigAlarmIgnore();
epicsSignalInstallSigPipeIgnore();
registerRefCounter("CAChannelProvider", &CAChannelProvider::num_instances);
registerRefCounter("CAChannel", &CAChannel::num_instances);
registerRefCounter("CAChannelGet", &CAChannelGet::num_instances);
registerRefCounter("CAChannelPut", &CAChannelPut::num_instances);
registerRefCounter("CAChannelMonitor", &CAChannelMonitor::num_instances);
if(!ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", true))
{
throw std::runtime_error("CAClientFactory::start failed");
}
}
ca_client_context * CAClientFactory::get_ca_client_context()
{
if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::get_ca_client_context\n";
ChannelProvider::shared_pointer channelProvider(
ChannelProviderRegistry::clients()->getProvider("ca"));
if(!channelProvider) throw std::runtime_error("CAClientFactory::start() was not called");
CAChannelProviderPtr cacChannelProvider
= std::tr1::static_pointer_cast<CAChannelProvider>(channelProvider);
return cacChannelProvider->get_ca_client_context();
}
void CAClientFactory::stop()
{
// unregister now done with exit hook

View File

@@ -4,6 +4,11 @@
* in file LICENSE that is included with this distribution.
*/
/**
* @author msekoranja, mrk
* @date 2018.07
*/
#ifndef CAPROVIDERPVT_H
#define CAPROVIDERPVT_H
@@ -22,6 +27,11 @@ namespace ca {
class MonitorEventThread;
typedef std::tr1::shared_ptr<MonitorEventThread> MonitorEventThreadPtr;
class GetDoneThread;
typedef std::tr1::shared_ptr<GetDoneThread> GetDoneThreadPtr;
class PutDoneThread;
typedef std::tr1::shared_ptr<PutDoneThread> PutDoneThreadPtr;
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
@@ -37,9 +47,6 @@ class CAChannelProvider :
{
public:
POINTER_DEFINITIONS(CAChannelProvider);
static size_t num_instances;
CAChannelProvider();
CAChannelProvider(const std::tr1::shared_ptr<Configuration>&);
virtual ~CAChannelProvider();
@@ -72,7 +79,6 @@ public:
void attachContext();
void addChannel(const CAChannelPtr & channel);
ca_client_context* get_ca_client_context();
private:
virtual void destroy() EPICS_DEPRECATED {}
@@ -81,6 +87,8 @@ private:
epics::pvData::Mutex channelListMutex;
std::vector<CAChannelWPtr> caChannelList;
MonitorEventThreadPtr monitorEventThread;
GetDoneThreadPtr getDoneThread;
PutDoneThreadPtr putDoneThread;
};
}}}

View File

@@ -48,12 +48,6 @@ static void descriptionHandler(struct event_handler_args args)
dbdToPv->getDescriptionDone(args);
}
static void putHandler(struct event_handler_args args)
{
DbdToPv *dbdToPv = static_cast<DbdToPv*>(args.usr);
dbdToPv->putDone(args);
}
DbdToPvPtr DbdToPv::create(
CAChannelPtr const & caChannel,
PVStructurePtr const & pvRequest,
@@ -74,6 +68,8 @@ DbdToPv::DbdToPv(IOType ioType)
valueAlarmRequested(false),
isArray(false),
firstTime(true),
choicesValid(false),
waitForChoicesValid(false),
caValueType(-1),
caRequestType(-1),
maxElements(0)
@@ -292,6 +288,13 @@ void DbdToPv::getChoicesDone(struct event_handler_args &args)
size_t num = dbr_enum_p->no_str;
choices.reserve(num);
for(size_t i=0; i<num; ++i) choices.push_back(string(&dbr_enum_p->strs[i][0]));
bool signal = false;
{
Lock lock(choicesMutex);
choicesValid = true;
if(waitForChoicesValid) signal = true;
}
if(signal) choicesEvent.signal();
}
chtype DbdToPv::getRequestType()
@@ -685,7 +688,9 @@ const void * put_DBRScalarArray(unsigned long*count, PVScalarArray::shared_point
Status DbdToPv::putToDBD(
CAChannelPtr const & caChannel,
PVStructurePtr const & pvStructure,
bool block)
bool block,
caCallbackFunc putHandler,
void * userarg)
{
chid channelID = caChannel->getChannelID();
const void *pValue = NULL;
@@ -745,6 +750,23 @@ Status DbdToPv::putToDBD(
switch(caValueType) {
case DBR_ENUM:
{
bool wait = false;
{
Lock lock(choicesMutex);
if(!choicesValid) {
wait = true;
waitForChoicesValid = true;
}
}
bool result = true;
if(wait) {
result = choicesEvent.wait(5.0);
}
if(!result) {
Status errorStatus(
Status::STATUSTYPE_ERROR, string("DbdToPv::getFromDBD "));
return errorStatus;
}
dbr_enum_t indexvalue = pvStructure->getSubField<PVInt>("value.index")->get();
pValue = &indexvalue;
break;
@@ -761,40 +783,21 @@ Status DbdToPv::putToDBD(
return errorStatus;
}
}
Status status = Status::Ok;
int result = 0;
caChannel->attachContext();
if(block) {
caChannel->attachContext();
result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,this);
if(result==ECA_NORMAL) {
ca_flush_io();
if(!waitForCallback.wait(2.0)) {
throw std::runtime_error("DbDToPv::putToDBD waitForCallback timeout");
}
return putStatus;
}
result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,userarg);
} else {
caChannel->attachContext();
result = ca_array_put(caValueType,count,channelID,pValue);
ca_flush_io();
}
if(result==ECA_NORMAL) {
ca_flush_io();
} else {
status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
}
if(ca_stringBuffer!=NULL) delete[] ca_stringBuffer;
if(result==ECA_NORMAL) return Status::Ok;
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
return errorStatus;
}
void DbdToPv::putDone(struct event_handler_args &args)
{
if(args.status!=ECA_NORMAL)
{
string message("DbdToPv::putDone ca_message ");
message += ca_message(args.status);
putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
} else {
putStatus = Status::Ok;
}
waitForCallback.signal();
}
return status;
}
}}}

View File

@@ -35,40 +35,46 @@ typedef std::tr1::shared_ptr<ValueAlarmDbd> ValueAlarmDbdPtr;
struct CaAlarm
{
CaAlarm() : status(0), severity(0) {}
dbr_short_t status;
dbr_short_t severity;
CaAlarm() : status(0), severity(0) {}
};
struct CaDisplay
{
CaDisplay() : lower_disp_limit(0),upper_disp_limit(0) {}
double lower_disp_limit;
double upper_disp_limit;
std::string units;
std::string format;
CaDisplay() : lower_disp_limit(0),upper_disp_limit(0) {}
};
struct CaControl
{
CaControl() : upper_ctrl_limit(0),lower_ctrl_limit(0) {}
double upper_ctrl_limit;
double lower_ctrl_limit;
CaControl() : upper_ctrl_limit(0),lower_ctrl_limit(0) {}
};
struct CaValueAlarm
{
CaValueAlarm() : upper_alarm_limit(0),upper_warning_limit(0),lower_warning_limit(0),lower_alarm_limit(0)
{}
double upper_alarm_limit;
double upper_warning_limit;
double lower_warning_limit;
double lower_alarm_limit;
CaValueAlarm() :
upper_alarm_limit(0),
upper_warning_limit(0),
lower_warning_limit(0),
lower_alarm_limit(0)
{}
};
class DbdToPv;
typedef std::tr1::shared_ptr<DbdToPv> DbdToPvPtr;
typedef void ( caCallbackFunc ) (struct event_handler_args);
/**
* @brief DbdToPv converts between DBD data and pvData.
*
@@ -93,12 +99,13 @@ public:
epics::pvData::Status putToDBD(
CAChannelPtr const & caChannel,
epics::pvData::PVStructurePtr const & pvStructure,
bool block
bool block,
caCallbackFunc putHandler,
void *userArg
);
void getChoicesDone(struct event_handler_args &args);
void descriptionConnected(struct connection_handler_args args);
void getDescriptionDone(struct event_handler_args &args);
void putDone(struct event_handler_args &args);
private:
DbdToPv(IOType ioType);
void activate(
@@ -114,17 +121,19 @@ private:
bool valueAlarmRequested;
bool isArray;
bool firstTime;
bool choicesValid;
bool waitForChoicesValid;
chtype caValueType;
chtype caRequestType;
unsigned long maxElements;
epics::pvData::Mutex choicesMutex;
epics::pvData::Event choicesEvent;
epicsTimeStamp caTimeStamp;
CaAlarm caAlarm;
CaDisplay caDisplay;
CaControl caControl;
CaValueAlarm caValueAlarm;
std::string description;
epics::pvData::Event waitForCallback;
epics::pvData::Status putStatus;
epics::pvData::Structure::const_shared_pointer structure;
std::vector<std::string> choices;
};

114
src/ca/getDoneThread.cpp Normal file
View File

@@ -0,0 +1,114 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#include "caChannel.h"
#include <epicsExit.h>
#define epicsExportSharedSymbols
#include "getDoneThread.h"
using namespace epics::pvData;
using namespace std;
namespace epics {
namespace pvAccess {
namespace ca {
GetDoneThreadPtr GetDoneThread::get()
{
static GetDoneThreadPtr master;
static Mutex mutex;
Lock xx(mutex);
if(!master) {
master = GetDoneThreadPtr(new GetDoneThread());
master->start();
}
return master;
}
GetDoneThread::GetDoneThread()
: isStop(false)
{
}
GetDoneThread::~GetDoneThread()
{
//std::cout << "GetDoneThread::~GetDoneThread()\n";
}
void GetDoneThread::start()
{
thread = std::tr1::shared_ptr<epicsThread>(new epicsThread(
*this,
"getDoneThread",
epicsThreadGetStackSize(epicsThreadStackSmall),
epicsThreadPriorityLow));
thread->start();
}
void GetDoneThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void GetDoneThread::getDone(NotifyGetRequesterPtr const &notifyGetRequester)
{
{
Lock lock(mutex);
if(notifyGetRequester->isOnQueue) return;
notifyGetRequester->isOnQueue = true;
notifyGetQueue.push(notifyGetRequester);
}
waitForCommand.signal();
}
void GetDoneThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyGetRequester* notifyGetRequester(NULL);
{
Lock lock(mutex);
if(!notifyGetQueue.empty())
{
more = true;
NotifyGetRequesterWPtr req(notifyGetQueue.front());
notifyGetQueue.pop();
NotifyGetRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyGetRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyGetRequester!=NULL)
{
CAChannelGetPtr channelGet(notifyGetRequester->channelGet.lock());
if(channelGet) channelGet->notifyClient();
}
}
if(isStop) {
waitForStop.signal();
break;
}
}
}
}}}

71
src/ca/getDoneThread.h Normal file
View File

@@ -0,0 +1,71 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#ifndef GetDoneThread_H
#define GetDoneThread_H
#include <queue>
#include <cadef.h>
#include <shareLib.h>
#include <epicsThread.h>
#include <pv/event.h>
#include <pv/lock.h>
namespace epics {
namespace pvAccess {
namespace ca {
class NotifyGetRequester;
typedef std::tr1::shared_ptr<NotifyGetRequester> NotifyGetRequesterPtr;
typedef std::tr1::weak_ptr<NotifyGetRequester> NotifyGetRequesterWPtr;
class GetDoneThread;
typedef std::tr1::shared_ptr<GetDoneThread> GetDoneThreadPtr;
class CAChannelGet;
typedef std::tr1::shared_ptr<CAChannelGet> CAChannelGetPtr;
typedef std::tr1::weak_ptr<CAChannelGet> CAChannelGetWPtr;
class NotifyGetRequester
{
public:
ChannelGetRequester::weak_pointer channelGetRequester;
CAChannelGetWPtr channelGet;
bool isOnQueue;
NotifyGetRequester() : isOnQueue(false) {}
void setChannelGet(CAChannelGetPtr const &channelGet)
{ this->channelGet = channelGet;}
};
class GetDoneThread :
public epicsThreadRunable
{
public:
static GetDoneThreadPtr get();
~GetDoneThread();
virtual void run();
void start();
void stop();
void getDone(NotifyGetRequesterPtr const &notifyGetRequester);
private:
GetDoneThread();
bool isStop;
std::tr1::shared_ptr<epicsThread> thread;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForCommand;
epics::pvData::Event waitForStop;
std::queue<NotifyGetRequesterWPtr> notifyGetQueue;
};
}}}
#endif /* GetDoneThread_H */

View File

@@ -39,7 +39,7 @@ MonitorEventThread::MonitorEventThread()
MonitorEventThread::~MonitorEventThread()
{
std::cout << "MonitorEventThread::~MonitorEventThread()\n";
//std::cout << "MonitorEventThread::~MonitorEventThread()\n";
}
void MonitorEventThread::start()
@@ -63,13 +63,13 @@ void MonitorEventThread::stop()
}
void MonitorEventThread::event(NotifyRequesterPtr const &stopMonitor)
void MonitorEventThread::event(NotifyMonitorRequesterPtr const &notifyMonitorRequester)
{
{
Lock lock(mutex);
if(stopMonitor->isOnQueue) return;
stopMonitor->isOnQueue = true;
notifyMonitorQueue.push(stopMonitor);
if(notifyMonitorRequester->isOnQueue) return;
notifyMonitorRequester->isOnQueue = true;
notifyMonitorQueue.push(notifyMonitorRequester);
}
waitForCommand.signal();
}
@@ -81,25 +81,25 @@ void MonitorEventThread::run()
waitForCommand.wait();
while(true) {
bool more = false;
NotifyRequester* notifyRequester(NULL);
NotifyMonitorRequester* notifyMonitorRequester(NULL);
{
Lock lock(mutex);
if(!notifyMonitorQueue.empty())
{
more = true;
NotifyRequesterWPtr req(notifyMonitorQueue.front());
NotifyMonitorRequesterWPtr req(notifyMonitorQueue.front());
notifyMonitorQueue.pop();
NotifyRequesterPtr reqPtr(req.lock());
NotifyMonitorRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyRequester = reqPtr.get();
notifyMonitorRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyRequester!=NULL)
if(notifyMonitorRequester!=NULL)
{
CAChannelMonitorPtr channelMonitor(notifyRequester->channelMonitor.lock());
CAChannelMonitorPtr channelMonitor(notifyMonitorRequester->channelMonitor.lock());
if(channelMonitor) channelMonitor->notifyClient();
}
}

View File

@@ -20,9 +20,9 @@ namespace epics {
namespace pvAccess {
namespace ca {
class NotifyRequester;
typedef std::tr1::shared_ptr<NotifyRequester> NotifyRequesterPtr;
typedef std::tr1::weak_ptr<NotifyRequester> NotifyRequesterWPtr;
class NotifyMonitorRequester;
typedef std::tr1::shared_ptr<NotifyMonitorRequester> NotifyMonitorRequesterPtr;
typedef std::tr1::weak_ptr<NotifyMonitorRequester> NotifyMonitorRequesterWPtr;
class MonitorEventThread;
@@ -32,13 +32,13 @@ class CAChannelMonitor;
typedef std::tr1::shared_ptr<CAChannelMonitor> CAChannelMonitorPtr;
typedef std::tr1::weak_ptr<CAChannelMonitor> CAChannelMonitorWPtr;
class NotifyRequester
class NotifyMonitorRequester
{
public:
MonitorRequester::weak_pointer monitorRequester;
CAChannelMonitorWPtr channelMonitor;
bool isOnQueue;
NotifyRequester() : isOnQueue(false) {}
NotifyMonitorRequester() : isOnQueue(false) {}
void setChannelMonitor(CAChannelMonitorPtr const &channelMonitor)
{ this->channelMonitor = channelMonitor;}
};
@@ -53,7 +53,7 @@ public:
virtual void run();
void start();
void stop();
void event(NotifyRequesterPtr const &notifyRequester);
void event(NotifyMonitorRequesterPtr const &notifyMonitorRequester);
private:
MonitorEventThread();
@@ -62,7 +62,7 @@ private:
epics::pvData::Mutex mutex;
epics::pvData::Event waitForCommand;
epics::pvData::Event waitForStop;
std::queue<NotifyRequesterWPtr> notifyMonitorQueue;
std::queue<NotifyMonitorRequesterWPtr> notifyMonitorQueue;
};

114
src/ca/putDoneThread.cpp Normal file
View File

@@ -0,0 +1,114 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#include "caChannel.h"
#include <epicsExit.h>
#define epicsExportSharedSymbols
#include "putDoneThread.h"
using namespace epics::pvData;
using namespace std;
namespace epics {
namespace pvAccess {
namespace ca {
PutDoneThreadPtr PutDoneThread::get()
{
static PutDoneThreadPtr master;
static Mutex mutex;
Lock xx(mutex);
if(!master) {
master = PutDoneThreadPtr(new PutDoneThread());
master->start();
}
return master;
}
PutDoneThread::PutDoneThread()
: isStop(false)
{
}
PutDoneThread::~PutDoneThread()
{
//std::cout << "PutDoneThread::~PutDoneThread()\n";
}
void PutDoneThread::start()
{
thread = std::tr1::shared_ptr<epicsThread>(new epicsThread(
*this,
"putDoneThread",
epicsThreadGetStackSize(epicsThreadStackSmall),
epicsThreadPriorityLow));
thread->start();
}
void PutDoneThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void PutDoneThread::putDone(NotifyPutRequesterPtr const &notifyPutRequester)
{
{
Lock lock(mutex);
if(notifyPutRequester->isOnQueue) return;
notifyPutRequester->isOnQueue = true;
notifyPutQueue.push(notifyPutRequester);
}
waitForCommand.signal();
}
void PutDoneThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyPutRequester* notifyPutRequester(NULL);
{
Lock lock(mutex);
if(!notifyPutQueue.empty())
{
more = true;
NotifyPutRequesterWPtr req(notifyPutQueue.front());
notifyPutQueue.pop();
NotifyPutRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyPutRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyPutRequester!=NULL)
{
CAChannelPutPtr channelPut(notifyPutRequester->channelPut.lock());
if(channelPut) channelPut->notifyClient();
}
}
if(isStop) {
waitForStop.signal();
break;
}
}
}
}}}

70
src/ca/putDoneThread.h Normal file
View File

@@ -0,0 +1,70 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#ifndef PutDoneThread_H
#define PutDoneThread_H
#include <queue>
#include <cadef.h>
#include <shareLib.h>
#include <epicsThread.h>
#include <pv/event.h>
#include <pv/lock.h>
namespace epics {
namespace pvAccess {
namespace ca {
class NotifyPutRequester;
typedef std::tr1::shared_ptr<NotifyPutRequester> NotifyPutRequesterPtr;
typedef std::tr1::weak_ptr<NotifyPutRequester> NotifyPutRequesterWPtr;
class PutDoneThread;
typedef std::tr1::shared_ptr<PutDoneThread> PutDoneThreadPtr;
class CAChannelPut;
typedef std::tr1::shared_ptr<CAChannelPut> CAChannelPutPtr;
typedef std::tr1::weak_ptr<CAChannelPut> CAChannelPutWPtr;
class NotifyPutRequester
{
public:
ChannelPutRequester::weak_pointer channelPutRequester;
CAChannelPutWPtr channelPut;
bool isOnQueue;
NotifyPutRequester() : isOnQueue(false) {}
void setChannelPut(CAChannelPutPtr const &channelPut)
{ this->channelPut = channelPut;}
};
class PutDoneThread :
public epicsThreadRunable
{
public:
static PutDoneThreadPtr get();
~PutDoneThread();
virtual void run();
void start();
void stop();
void putDone(NotifyPutRequesterPtr const &notifyPutRequester);
private:
PutDoneThread();
bool isStop;
std::tr1::shared_ptr<epicsThread> thread;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForCommand;
epics::pvData::Event waitForStop;
std::queue<NotifyPutRequesterWPtr> notifyPutQueue;
};
}}}
#endif /* PutDoneThread_H */

View File

@@ -3,6 +3,9 @@
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author msekoranja
*/
#ifndef CAPROVIDER_H
#define CAPROVIDER_H
@@ -22,10 +25,17 @@ namespace ca {
* A single instance is created the first time CAClientFactory::start is called.
* epicsAtExit is used to destroy the instance.
*
* The single instance calls:
* ca_context_create(ca_enable_preemptive_callback);
*
* The thread that calls start, or a ca auxillary thread, are the only threads
* that can call the ca_* functions.
*
* Note the monitor callbacks are made from a separate thread that must NOT call any ca_* function.
* NOTE: callbacks for monitor, get, and put are made from a separate thread.
* This is done to prevent a deadly embrace that can occur
* when rapid gets, puts, and monitor events are happening.
* The callbacks should not call any pvAccess method.
* If any such call is made the separate thread becomes a ca auxillary thread.
*
*/
class epicsShareClass CAClientFactory
@@ -35,12 +45,6 @@ public:
*
*/
static void start();
/** @brief get the ca_client_context
*
* This can be called by an application specific auxiliary thread.
* See ca documentation. Not for casual use.
*/
static struct ca_client_context * get_ca_client_context();
/** @brief stop provider ca
*
* This does nothing since epicsAtExit is used to destroy the instance.