From df45c70149ec8f87bdea37f637b9ca4eea95100c Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Sat, 14 Jul 2018 11:46:26 -0400 Subject: [PATCH] add putDoneThread and getDoneThread; fix issue 114 --- caProvider.md | 26 +++---- src/ca/Makefile | 2 + src/ca/caChannel.cpp | 127 ++++++++++++++++++++++++---------- src/ca/caChannel.h | 45 ++++++++---- src/ca/caProvider.cpp | 38 +++------- src/ca/caProviderPvt.h | 16 +++-- src/ca/dbdToPv.cpp | 75 ++++++++++---------- src/ca/dbdToPv.h | 27 +++++--- src/ca/getDoneThread.cpp | 114 ++++++++++++++++++++++++++++++ src/ca/getDoneThread.h | 71 +++++++++++++++++++ src/ca/monitorEventThread.cpp | 22 +++--- src/ca/monitorEventThread.h | 14 ++-- src/ca/putDoneThread.cpp | 114 ++++++++++++++++++++++++++++++ src/ca/putDoneThread.h | 70 +++++++++++++++++++ src/ca/pv/caProvider.h | 18 +++-- 15 files changed, 615 insertions(+), 164 deletions(-) create mode 100644 src/ca/getDoneThread.cpp create mode 100644 src/ca/getDoneThread.h create mode 100644 src/ca/putDoneThread.cpp create mode 100644 src/ca/putDoneThread.h diff --git a/caProvider.md b/caProvider.md index 7f6c536..6a479aa 100644 --- a/caProvider.md +++ b/caProvider.md @@ -1,6 +1,6 @@ # pvAccessCPP: ca provider -2018.06.25 +2018.07.09 Editors: @@ -610,26 +610,26 @@ Any code that uses **ca** must call **CAClientFactory::start()** before making a ca_context_create is called for the thread that calls CAClientFactory::start(). -Client code can create an Auxillary Thread by calling: - -``` -ca_client_context* current_context = CAClientFactory::get_ca_client_context(); -int result = ca_attach_context(current_context); - -``` +If the client creates auxillary threads the make pvAccess client requests then the auxillary threads will automatically become +a **ca** auxilary thread. [Deadlock in ca_clear_subscription()](https://bugs.launchpad.net/epics-base/7.0/+bug/1751380) Shows a problem with monitor callbacks. +A test was created that shows that the same problem can occur with a combination of rapid get, put and monitor events. -In order to prevent this problem **ca** creates a monitorEventThread. -All calls to the requester's **monitorEvent** method are made from the monitorEventThread. +In order to prevent this problem **ca** creates the following threads: +**getEventThread**, **putEventThread**, and **monitorEventThread**. -**Note** the monitorEventThread does not call **ca_attach_context**. -This means that no **ca_xxx** function can be called from -the requester's **monitorEvent** method. +All client callbacks are made via one of these threads. +For example a call to the requester's **monitorEvent** method is made from the monitorEventThread. + +**Notes** + +* These threads do not call **ca_attach_context**. +* No **ca_xxx** function should be called from the requester's callback method. diff --git a/src/ca/Makefile b/src/ca/Makefile index 7aa4247..93e6b27 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -12,6 +12,8 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h pvAccessCA_SRCS += monitorEventThread.cpp +pvAccessCA_SRCS += getDoneThread.cpp +pvAccessCA_SRCS += putDoneThread.cpp pvAccessCA_SRCS += caProvider.cpp pvAccessCA_SRCS += caChannel.cpp pvAccessCA_SRCS += dbdToPv.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index a4294a3..b19349b 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -10,8 +10,9 @@ #include #include #include -#include #include "monitorEventThread.h" +#include "getDoneThread.h" +#include "putDoneThread.h" #define epicsExportSharedSymbols #include "caChannel.h" @@ -95,8 +96,6 @@ void CAChannel::disconnected() } } -size_t CAChannel::num_instances; - CAChannel::CAChannel(std::string const & channelName, CAChannelProvider::shared_pointer const & channelProvider, ChannelRequester::shared_pointer const & channelRequester) : @@ -402,9 +401,6 @@ void CAChannel::attachContext() throw std::runtime_error(mess); } - -size_t CAChannelGet::num_instances; - CAChannelGetPtr CAChannelGet::create( CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, @@ -422,7 +418,9 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, : channel(channel), channelGetRequester(channelGetRequester), - pvRequest(pvRequest) + pvRequest(pvRequest), + getStatus(Status::Ok), + getDoneThread(GetDoneThread::get()) {} CAChannelGet::~CAChannelGet() @@ -442,10 +440,14 @@ void CAChannelGet::activate() dbdToPv = DbdToPv::create(channel,pvRequest,getIO); pvStructure = dbdToPv->createPVStructure(); bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); + notifyGetRequester = NotifyGetRequesterPtr(new NotifyGetRequester()); + notifyGetRequester->setChannelGet(shared_from_this()); EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } + + std::string CAChannelGet::getRequesterName() { return "CAChannelGet";} namespace { @@ -467,8 +469,15 @@ void CAChannelGet::getDone(struct event_handler_args &args) ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if(!getRequester) return; - Status status = dbdToPv->getFromDBD(pvStructure,bitSet,args); - EXCEPTION_GUARD(getRequester->getDone(status, shared_from_this(), pvStructure, bitSet)); + getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args); + getDoneThread->getDone(notifyGetRequester); +} + +void CAChannelGet::notifyClient() +{ + ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); + if(!getRequester) return; + EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet)); } void CAChannelGet::get() @@ -491,7 +500,8 @@ void CAChannelGet::get() { string mess("CAChannelGet::get "); mess += channel->getChannelName() + " message " + ca_message(result); - throw std::runtime_error(mess); + getStatus = Status(Status::STATUSTYPE_ERROR,mess); + notifyClient(); } } @@ -508,8 +518,6 @@ void CAChannelGet::lastRequest() { } -size_t CAChannelPut::num_instances; - CAChannelPutPtr CAChannelPut::create( CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, @@ -528,7 +536,11 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel, channel(channel), channelPutRequester(channelPutRequester), pvRequest(pvRequest), - block(false) + block(false), + isPut(false), + getStatus(Status::Ok), + putStatus(Status::Ok), + putDoneThread(PutDoneThread::get()) {} CAChannelPut::~CAChannelPut() @@ -554,6 +566,8 @@ void CAChannelPut::activate() std::string val = pvString->get(); if(val.compare("true")==0) block = true; } + notifyPutRequester = NotifyPutRequesterPtr(new NotifyPutRequester()); + notifyPutRequester->setChannelPut(shared_from_this()); EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } @@ -565,6 +579,12 @@ std::string CAChannelPut::getRequesterName() { return "CAChannelPut";} namespace { +static void ca_put_handler(struct event_handler_args args) +{ + CAChannelPut *channelPut = static_cast(args.usr); + channelPut->putDone(args); +} + static void ca_put_get_handler(struct event_handler_args args) { CAChannelPut *channelPut = static_cast(args.usr); @@ -582,11 +602,33 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure, } ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); if(!putRequester) return; - Status status = dbdToPv->putToDBD(channel,pvPutStructure,block); - EXCEPTION_GUARD(putRequester->putDone(status, shared_from_this())); + { + Lock lock(mutex); + isPut = true; + } + putStatus = dbdToPv->putToDBD(channel,pvPutStructure,block,&ca_put_handler,this); + if(!block || !putStatus.isOK()) { + EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this())); + } } +void CAChannelPut::putDone(struct event_handler_args &args) +{ + if(DEBUG_LEVEL>1) { + cout << "CAChannelPut::putDone " << channel->getChannelName() << endl; + } + ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); + if(!putRequester) return; + if(args.status!=ECA_NORMAL) + { + putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status))); + } else { + putStatus = Status::Ok; + } + putDoneThread->putDone(notifyPutRequester); +} + void CAChannelPut::getDone(struct event_handler_args &args) { if(DEBUG_LEVEL>1) { @@ -595,8 +637,19 @@ void CAChannelPut::getDone(struct event_handler_args &args) ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); if(!putRequester) return; - Status status = dbdToPv->getFromDBD(pvStructure,bitSet,args); - EXCEPTION_GUARD(putRequester->getDone(status, shared_from_this(), pvStructure, bitSet)); + getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args); + putDoneThread->putDone(notifyPutRequester); +} + +void CAChannelPut::notifyClient() +{ + ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); + if(!putRequester) return; + if(isPut) { + EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this())); + } else { + EXCEPTION_GUARD(putRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet)); + } } @@ -607,6 +660,11 @@ void CAChannelPut::get() } ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); if(!putRequester) return; + { + Lock lock(mutex); + isPut = false; + } + channel->attachContext(); bitSet->clear(); int result = ca_array_get_callback(dbdToPv->getRequestType(), @@ -620,7 +678,8 @@ void CAChannelPut::get() { string mess("CAChannelPut::get "); mess += channel->getChannelName() + " message " +ca_message(result); - throw std::runtime_error(mess); + Status status(Status::STATUSTYPE_ERROR,mess); + EXCEPTION_GUARD(putRequester->getDone(status, shared_from_this(), pvStructure, bitSet)); } } @@ -716,8 +775,6 @@ public: } }; -size_t CAChannelMonitor::num_instances; - CAChannelMonitorPtr CAChannelMonitor::create( CAChannel::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, @@ -775,9 +832,8 @@ void CAChannelMonitor::activate() if (size > 1) queueSize = size; } } - notifyRequester = NotifyRequesterPtr(new NotifyRequester()); - - notifyRequester->setChannelMonitor(shared_from_this()); + notifyMonitorRequester = NotifyMonitorRequesterPtr(new NotifyMonitorRequester()); + notifyMonitorRequester->setChannelMonitor(shared_from_this()); monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize)); EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); @@ -806,7 +862,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) } else { *(activeElement->overrunBitSet) |= *(activeElement->changedBitSet); } - monitorEventThread->event(notifyRequester); + monitorEventThread->event(notifyMonitorRequester); } else { @@ -817,6 +873,18 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) } } + +void CAChannelMonitor::notifyClient() +{ + { + Lock lock(mutex); + if(!isStarted) return; + } + MonitorRequester::shared_pointer requester(monitorRequester.lock()); + if(!requester) return; + requester->monitorEvent(shared_from_this()); +} + Status CAChannelMonitor::start() { if(DEBUG_LEVEL>0) { @@ -867,17 +935,6 @@ Status CAChannelMonitor::stop() return Status(Status::STATUSTYPE_ERROR,string(ca_message(result))); } -void CAChannelMonitor::notifyClient() -{ - { - Lock lock(mutex); - if(!isStarted) return; - } - MonitorRequester::shared_pointer requester(monitorRequester.lock()); - if(!requester) return; - requester->monitorEvent(shared_from_this()); -} - MonitorElementPtr CAChannelMonitor::poll() { diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index ae641c0..8228627 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -4,6 +4,11 @@ * in file LICENSE that is included with this distribution. */ +/** + * @author msekoranja, mrk + * @date 2018.07 + */ + #ifndef CACHANNEL_H #define CACHANNEL_H @@ -23,12 +28,22 @@ namespace epics { namespace pvAccess { namespace ca { -class NotifyRequester; -typedef std::tr1::shared_ptr NotifyRequesterPtr; - +class NotifyMonitorRequester; +typedef std::tr1::shared_ptr NotifyMonitorRequesterPtr; class MonitorEventThread; typedef std::tr1::shared_ptr MonitorEventThreadPtr; +class NotifyGetRequester; +typedef std::tr1::shared_ptr NotifyGetRequesterPtr; +typedef std::tr1::weak_ptr NotifyGetRequesterWPtr; +class GetDoneThread; +typedef std::tr1::shared_ptr GetDoneThreadPtr; + +class NotifyPutRequester; +typedef std::tr1::shared_ptr NotifyPutRequesterPtr; +typedef std::tr1::weak_ptr NotifyPutRequesterWPtr; +class PutDoneThread; +typedef std::tr1::shared_ptr PutDoneThreadPtr; class CAChannelGetField; typedef std::tr1::shared_ptr CAChannelGetFieldPtr; @@ -62,7 +77,6 @@ class CAChannel : { public: POINTER_DEFINITIONS(CAChannel); - static size_t num_instances; static CAChannelPtr create( CAChannelProvider::shared_pointer const & channelProvider, std::string const & channelName, @@ -123,7 +137,6 @@ class CAChannelGet : { public: POINTER_DEFINITIONS(CAChannelGet); - static size_t num_instances; static CAChannelGet::shared_pointer create(CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, epics::pvData::PVStructurePtr const & pvRequest); @@ -136,7 +149,7 @@ public: virtual std::string getRequesterName(); void activate(); - + void notifyClient(); private: virtual void destroy() {} CAChannelGet(CAChannel::shared_pointer const & _channel, @@ -146,7 +159,11 @@ private: CAChannelPtr channel; ChannelGetRequester::weak_pointer channelGetRequester; epics::pvData::PVStructurePtr const & pvRequest; + epics::pvData::Status getStatus; + GetDoneThreadPtr getDoneThread; + NotifyGetRequesterPtr notifyGetRequester; DbdToPvPtr dbdToPv; + epics::pvData::Mutex mutex; epics::pvData::PVStructure::shared_pointer pvStructure; epics::pvData::BitSet::shared_pointer bitSet; }; @@ -158,11 +175,11 @@ class CAChannelPut : public: POINTER_DEFINITIONS(CAChannelPut); - static size_t num_instances; static CAChannelPut::shared_pointer create(CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, epics::pvData::PVStructurePtr const & pvRequest); virtual ~CAChannelPut(); + void putDone(struct event_handler_args &args); void getDone(struct event_handler_args &args); virtual void put( epics::pvData::PVStructure::shared_pointer const & pvPutStructure, @@ -175,6 +192,7 @@ public: virtual std::string getRequesterName(); void activate(); + void notifyClient(); private: virtual void destroy() {} CAChannelPut(CAChannel::shared_pointer const & _channel, @@ -184,7 +202,13 @@ private: ChannelPutRequester::weak_pointer channelPutRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; bool block; + bool isPut; + epics::pvData::Status getStatus; + epics::pvData::Status putStatus; + PutDoneThreadPtr putDoneThread; + NotifyPutRequesterPtr notifyPutRequester; DbdToPvPtr dbdToPv; + epics::pvData::Mutex mutex; epics::pvData::PVStructure::shared_pointer pvStructure; epics::pvData::BitSet::shared_pointer bitSet; }; @@ -199,7 +223,6 @@ class CAChannelMonitor : public: POINTER_DEFINITIONS(CAChannelMonitor); - static size_t num_instances; static CAChannelMonitor::shared_pointer create(CAChannel::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructurePtr const & pvRequest); @@ -223,9 +246,9 @@ private: MonitorRequester::weak_pointer monitorRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; bool isStarted; - NotifyRequesterPtr notifyRequester; MonitorEventThreadPtr monitorEventThread; evid pevid; + NotifyMonitorRequesterPtr notifyMonitorRequester; DbdToPvPtr dbdToPv; epics::pvData::Mutex mutex; @@ -235,8 +258,6 @@ private: CACMonitorQueuePtr monitorQueue; }; -} -} -} +}}} #endif /* CACHANNEL_H */ diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 27386e2..5c5b22c 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -4,18 +4,16 @@ * in file LICENSE that is included with this distribution. */ -#include - #include #include #include #include #include -#include #include -#include #include "monitorEventThread.h" +#include "getDoneThread.h" +#include "putDoneThread.h" #define epicsExportSharedSymbols #include @@ -33,8 +31,6 @@ using namespace epics::pvData; catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } -size_t CAChannelProvider::num_instances; - CAChannelProvider::CAChannelProvider() : current_context(0) { @@ -43,7 +39,9 @@ CAChannelProvider::CAChannelProvider() CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) : current_context(0), - monitorEventThread(MonitorEventThread::get()) + monitorEventThread(MonitorEventThread::get()), + getDoneThread(GetDoneThread::get()), + putDoneThread(PutDoneThread::get()) { if(DEBUG_LEVEL>0) { std::cout<< "CAChannelProvider::CAChannelProvider\n"; @@ -78,11 +76,13 @@ CAChannelProvider::~CAChannelProvider() channelQ.pop(); } monitorEventThread->stop(); + getDoneThread->stop(); + putDoneThread->stop(); if(DEBUG_LEVEL>0) { std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; } ca_context_destroy(); -std::cout << "CAChannelProvider::~CAChannelProvider() returning\n"; +//std::cout << "CAChannelProvider::~CAChannelProvider() returning\n"; } std::string CAChannelProvider::getProviderName() @@ -197,11 +197,6 @@ void CAChannelProvider::initialize() current_context = ca_current_context(); } -ca_client_context * CAChannelProvider::get_ca_client_context() -{ - return current_context; -} - void CAClientFactory::start() { if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n"; @@ -210,29 +205,12 @@ void CAClientFactory::start() } epicsSignalInstallSigAlarmIgnore(); epicsSignalInstallSigPipeIgnore(); - registerRefCounter("CAChannelProvider", &CAChannelProvider::num_instances); - registerRefCounter("CAChannel", &CAChannel::num_instances); - registerRefCounter("CAChannelGet", &CAChannelGet::num_instances); - registerRefCounter("CAChannelPut", &CAChannelPut::num_instances); - registerRefCounter("CAChannelMonitor", &CAChannelMonitor::num_instances); - if(!ChannelProviderRegistry::clients()->add("ca", true)) { throw std::runtime_error("CAClientFactory::start failed"); } } -ca_client_context * CAClientFactory::get_ca_client_context() -{ - if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::get_ca_client_context\n"; - ChannelProvider::shared_pointer channelProvider( - ChannelProviderRegistry::clients()->getProvider("ca")); - if(!channelProvider) throw std::runtime_error("CAClientFactory::start() was not called"); - CAChannelProviderPtr cacChannelProvider - = std::tr1::static_pointer_cast(channelProvider); - return cacChannelProvider->get_ca_client_context(); -} - void CAClientFactory::stop() { // unregister now done with exit hook diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index af0c4d9..0d48271 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -4,6 +4,11 @@ * in file LICENSE that is included with this distribution. */ +/** + * @author msekoranja, mrk + * @date 2018.07 + */ + #ifndef CAPROVIDERPVT_H #define CAPROVIDERPVT_H @@ -22,6 +27,11 @@ namespace ca { class MonitorEventThread; typedef std::tr1::shared_ptr MonitorEventThreadPtr; +class GetDoneThread; +typedef std::tr1::shared_ptr GetDoneThreadPtr; + +class PutDoneThread; +typedef std::tr1::shared_ptr PutDoneThreadPtr; class CAChannel; typedef std::tr1::shared_ptr CAChannelPtr; @@ -37,9 +47,6 @@ class CAChannelProvider : { public: POINTER_DEFINITIONS(CAChannelProvider); - - static size_t num_instances; - CAChannelProvider(); CAChannelProvider(const std::tr1::shared_ptr&); virtual ~CAChannelProvider(); @@ -72,7 +79,6 @@ public: void attachContext(); void addChannel(const CAChannelPtr & channel); - ca_client_context* get_ca_client_context(); private: virtual void destroy() EPICS_DEPRECATED {} @@ -81,6 +87,8 @@ private: epics::pvData::Mutex channelListMutex; std::vector caChannelList; MonitorEventThreadPtr monitorEventThread; + GetDoneThreadPtr getDoneThread; + PutDoneThreadPtr putDoneThread; }; }}} diff --git a/src/ca/dbdToPv.cpp b/src/ca/dbdToPv.cpp index f8fe8ca..636dc5e 100644 --- a/src/ca/dbdToPv.cpp +++ b/src/ca/dbdToPv.cpp @@ -48,12 +48,6 @@ static void descriptionHandler(struct event_handler_args args) dbdToPv->getDescriptionDone(args); } -static void putHandler(struct event_handler_args args) -{ - DbdToPv *dbdToPv = static_cast(args.usr); - dbdToPv->putDone(args); -} - DbdToPvPtr DbdToPv::create( CAChannelPtr const & caChannel, PVStructurePtr const & pvRequest, @@ -74,6 +68,8 @@ DbdToPv::DbdToPv(IOType ioType) valueAlarmRequested(false), isArray(false), firstTime(true), + choicesValid(false), + waitForChoicesValid(false), caValueType(-1), caRequestType(-1), maxElements(0) @@ -292,6 +288,13 @@ void DbdToPv::getChoicesDone(struct event_handler_args &args) size_t num = dbr_enum_p->no_str; choices.reserve(num); for(size_t i=0; istrs[i][0])); + bool signal = false; + { + Lock lock(choicesMutex); + choicesValid = true; + if(waitForChoicesValid) signal = true; + } + if(signal) choicesEvent.signal(); } chtype DbdToPv::getRequestType() @@ -685,7 +688,9 @@ const void * put_DBRScalarArray(unsigned long*count, PVScalarArray::shared_point Status DbdToPv::putToDBD( CAChannelPtr const & caChannel, PVStructurePtr const & pvStructure, - bool block) + bool block, + caCallbackFunc putHandler, + void * userarg) { chid channelID = caChannel->getChannelID(); const void *pValue = NULL; @@ -745,6 +750,23 @@ Status DbdToPv::putToDBD( switch(caValueType) { case DBR_ENUM: { + bool wait = false; + { + Lock lock(choicesMutex); + if(!choicesValid) { + wait = true; + waitForChoicesValid = true; + } + } + bool result = true; + if(wait) { + result = choicesEvent.wait(5.0); + } + if(!result) { + Status errorStatus( + Status::STATUSTYPE_ERROR, string("DbdToPv::getFromDBD ")); + return errorStatus; + } dbr_enum_t indexvalue = pvStructure->getSubField("value.index")->get(); pValue = &indexvalue; break; @@ -761,40 +783,21 @@ Status DbdToPv::putToDBD( return errorStatus; } } + Status status = Status::Ok; int result = 0; + caChannel->attachContext(); if(block) { - caChannel->attachContext(); - result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,this); - if(result==ECA_NORMAL) { - ca_flush_io(); - if(!waitForCallback.wait(2.0)) { - throw std::runtime_error("DbDToPv::putToDBD waitForCallback timeout"); - } - return putStatus; - } + result = ca_array_put_callback(caValueType,count,channelID,pValue,putHandler,userarg); } else { - caChannel->attachContext(); result = ca_array_put(caValueType,count,channelID,pValue); - ca_flush_io(); + } + if(result==ECA_NORMAL) { + ca_flush_io(); + } else { + status = Status(Status::STATUSTYPE_ERROR, string(ca_message(result))); } if(ca_stringBuffer!=NULL) delete[] ca_stringBuffer; - if(result==ECA_NORMAL) return Status::Ok; - Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); - return errorStatus; -} - -void DbdToPv::putDone(struct event_handler_args &args) -{ - if(args.status!=ECA_NORMAL) - { - string message("DbdToPv::putDone ca_message "); - message += ca_message(args.status); - putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status))); - } else { - putStatus = Status::Ok; - } - waitForCallback.signal(); -} - + return status; +} }}} diff --git a/src/ca/dbdToPv.h b/src/ca/dbdToPv.h index dea4a4b..5ee77b5 100644 --- a/src/ca/dbdToPv.h +++ b/src/ca/dbdToPv.h @@ -35,40 +35,46 @@ typedef std::tr1::shared_ptr ValueAlarmDbdPtr; struct CaAlarm { - CaAlarm() : status(0), severity(0) {} dbr_short_t status; dbr_short_t severity; + CaAlarm() : status(0), severity(0) {} }; struct CaDisplay { - CaDisplay() : lower_disp_limit(0),upper_disp_limit(0) {} double lower_disp_limit; double upper_disp_limit; std::string units; std::string format; + CaDisplay() : lower_disp_limit(0),upper_disp_limit(0) {} }; struct CaControl { - CaControl() : upper_ctrl_limit(0),lower_ctrl_limit(0) {} double upper_ctrl_limit; double lower_ctrl_limit; + CaControl() : upper_ctrl_limit(0),lower_ctrl_limit(0) {} }; struct CaValueAlarm { - CaValueAlarm() : upper_alarm_limit(0),upper_warning_limit(0),lower_warning_limit(0),lower_alarm_limit(0) - {} double upper_alarm_limit; double upper_warning_limit; double lower_warning_limit; double lower_alarm_limit; + CaValueAlarm() : + upper_alarm_limit(0), + upper_warning_limit(0), + lower_warning_limit(0), + lower_alarm_limit(0) + {} }; class DbdToPv; typedef std::tr1::shared_ptr DbdToPvPtr; +typedef void ( caCallbackFunc ) (struct event_handler_args); + /** * @brief DbdToPv converts between DBD data and pvData. * @@ -93,12 +99,13 @@ public: epics::pvData::Status putToDBD( CAChannelPtr const & caChannel, epics::pvData::PVStructurePtr const & pvStructure, - bool block + bool block, + caCallbackFunc putHandler, + void *userArg ); void getChoicesDone(struct event_handler_args &args); void descriptionConnected(struct connection_handler_args args); void getDescriptionDone(struct event_handler_args &args); - void putDone(struct event_handler_args &args); private: DbdToPv(IOType ioType); void activate( @@ -114,17 +121,19 @@ private: bool valueAlarmRequested; bool isArray; bool firstTime; + bool choicesValid; + bool waitForChoicesValid; chtype caValueType; chtype caRequestType; unsigned long maxElements; + epics::pvData::Mutex choicesMutex; + epics::pvData::Event choicesEvent; epicsTimeStamp caTimeStamp; CaAlarm caAlarm; CaDisplay caDisplay; CaControl caControl; CaValueAlarm caValueAlarm; std::string description; - epics::pvData::Event waitForCallback; - epics::pvData::Status putStatus; epics::pvData::Structure::const_shared_pointer structure; std::vector choices; }; diff --git a/src/ca/getDoneThread.cpp b/src/ca/getDoneThread.cpp new file mode 100644 index 0000000..ae01f1a --- /dev/null +++ b/src/ca/getDoneThread.cpp @@ -0,0 +1,114 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "getDoneThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +GetDoneThreadPtr GetDoneThread::get() +{ + static GetDoneThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = GetDoneThreadPtr(new GetDoneThread()); + master->start(); + } + return master; +} + +GetDoneThread::GetDoneThread() +: isStop(false) +{ +} + +GetDoneThread::~GetDoneThread() +{ +//std::cout << "GetDoneThread::~GetDoneThread()\n"; +} + + +void GetDoneThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "getDoneThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + + +void GetDoneThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void GetDoneThread::getDone(NotifyGetRequesterPtr const ¬ifyGetRequester) +{ + { + Lock lock(mutex); + if(notifyGetRequester->isOnQueue) return; + notifyGetRequester->isOnQueue = true; + notifyGetQueue.push(notifyGetRequester); + } + waitForCommand.signal(); +} + +void GetDoneThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyGetRequester* notifyGetRequester(NULL); + { + Lock lock(mutex); + if(!notifyGetQueue.empty()) + { + more = true; + NotifyGetRequesterWPtr req(notifyGetQueue.front()); + notifyGetQueue.pop(); + NotifyGetRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyGetRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyGetRequester!=NULL) + { + CAChannelGetPtr channelGet(notifyGetRequester->channelGet.lock()); + if(channelGet) channelGet->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/getDoneThread.h b/src/ca/getDoneThread.h new file mode 100644 index 0000000..6522f0a --- /dev/null +++ b/src/ca/getDoneThread.h @@ -0,0 +1,71 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ +#ifndef GetDoneThread_H +#define GetDoneThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyGetRequester; +typedef std::tr1::shared_ptr NotifyGetRequesterPtr; +typedef std::tr1::weak_ptr NotifyGetRequesterWPtr; + + +class GetDoneThread; +typedef std::tr1::shared_ptr GetDoneThreadPtr; + +class CAChannelGet; +typedef std::tr1::shared_ptr CAChannelGetPtr; +typedef std::tr1::weak_ptr CAChannelGetWPtr; + +class NotifyGetRequester +{ +public: + ChannelGetRequester::weak_pointer channelGetRequester; + CAChannelGetWPtr channelGet; + bool isOnQueue; + NotifyGetRequester() : isOnQueue(false) {} + void setChannelGet(CAChannelGetPtr const &channelGet) + { this->channelGet = channelGet;} +}; + + +class GetDoneThread : + public epicsThreadRunable +{ +public: + static GetDoneThreadPtr get(); + ~GetDoneThread(); + virtual void run(); + void start(); + void stop(); + void getDone(NotifyGetRequesterPtr const ¬ifyGetRequester); +private: + GetDoneThread(); + + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyGetQueue; +}; + + +}}} + +#endif /* GetDoneThread_H */ diff --git a/src/ca/monitorEventThread.cpp b/src/ca/monitorEventThread.cpp index e95d101..d74cbd1 100644 --- a/src/ca/monitorEventThread.cpp +++ b/src/ca/monitorEventThread.cpp @@ -39,7 +39,7 @@ MonitorEventThread::MonitorEventThread() MonitorEventThread::~MonitorEventThread() { -std::cout << "MonitorEventThread::~MonitorEventThread()\n"; +//std::cout << "MonitorEventThread::~MonitorEventThread()\n"; } void MonitorEventThread::start() @@ -63,13 +63,13 @@ void MonitorEventThread::stop() } -void MonitorEventThread::event(NotifyRequesterPtr const &stopMonitor) +void MonitorEventThread::event(NotifyMonitorRequesterPtr const ¬ifyMonitorRequester) { { Lock lock(mutex); - if(stopMonitor->isOnQueue) return; - stopMonitor->isOnQueue = true; - notifyMonitorQueue.push(stopMonitor); + if(notifyMonitorRequester->isOnQueue) return; + notifyMonitorRequester->isOnQueue = true; + notifyMonitorQueue.push(notifyMonitorRequester); } waitForCommand.signal(); } @@ -81,25 +81,25 @@ void MonitorEventThread::run() waitForCommand.wait(); while(true) { bool more = false; - NotifyRequester* notifyRequester(NULL); + NotifyMonitorRequester* notifyMonitorRequester(NULL); { Lock lock(mutex); if(!notifyMonitorQueue.empty()) { more = true; - NotifyRequesterWPtr req(notifyMonitorQueue.front()); + NotifyMonitorRequesterWPtr req(notifyMonitorQueue.front()); notifyMonitorQueue.pop(); - NotifyRequesterPtr reqPtr(req.lock()); + NotifyMonitorRequesterPtr reqPtr(req.lock()); if(reqPtr) { - notifyRequester = reqPtr.get(); + notifyMonitorRequester = reqPtr.get(); reqPtr->isOnQueue = false; } } } if(!more) break; - if(notifyRequester!=NULL) + if(notifyMonitorRequester!=NULL) { - CAChannelMonitorPtr channelMonitor(notifyRequester->channelMonitor.lock()); + CAChannelMonitorPtr channelMonitor(notifyMonitorRequester->channelMonitor.lock()); if(channelMonitor) channelMonitor->notifyClient(); } } diff --git a/src/ca/monitorEventThread.h b/src/ca/monitorEventThread.h index 0784d07..0a03ee7 100644 --- a/src/ca/monitorEventThread.h +++ b/src/ca/monitorEventThread.h @@ -20,9 +20,9 @@ namespace epics { namespace pvAccess { namespace ca { -class NotifyRequester; -typedef std::tr1::shared_ptr NotifyRequesterPtr; -typedef std::tr1::weak_ptr NotifyRequesterWPtr; +class NotifyMonitorRequester; +typedef std::tr1::shared_ptr NotifyMonitorRequesterPtr; +typedef std::tr1::weak_ptr NotifyMonitorRequesterWPtr; class MonitorEventThread; @@ -32,13 +32,13 @@ class CAChannelMonitor; typedef std::tr1::shared_ptr CAChannelMonitorPtr; typedef std::tr1::weak_ptr CAChannelMonitorWPtr; -class NotifyRequester +class NotifyMonitorRequester { public: MonitorRequester::weak_pointer monitorRequester; CAChannelMonitorWPtr channelMonitor; bool isOnQueue; - NotifyRequester() : isOnQueue(false) {} + NotifyMonitorRequester() : isOnQueue(false) {} void setChannelMonitor(CAChannelMonitorPtr const &channelMonitor) { this->channelMonitor = channelMonitor;} }; @@ -53,7 +53,7 @@ public: virtual void run(); void start(); void stop(); - void event(NotifyRequesterPtr const ¬ifyRequester); + void event(NotifyMonitorRequesterPtr const ¬ifyMonitorRequester); private: MonitorEventThread(); @@ -62,7 +62,7 @@ private: epics::pvData::Mutex mutex; epics::pvData::Event waitForCommand; epics::pvData::Event waitForStop; - std::queue notifyMonitorQueue; + std::queue notifyMonitorQueue; }; diff --git a/src/ca/putDoneThread.cpp b/src/ca/putDoneThread.cpp new file mode 100644 index 0000000..79eb6e9 --- /dev/null +++ b/src/ca/putDoneThread.cpp @@ -0,0 +1,114 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "putDoneThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +PutDoneThreadPtr PutDoneThread::get() +{ + static PutDoneThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = PutDoneThreadPtr(new PutDoneThread()); + master->start(); + } + return master; +} + +PutDoneThread::PutDoneThread() +: isStop(false) +{ +} + +PutDoneThread::~PutDoneThread() +{ +//std::cout << "PutDoneThread::~PutDoneThread()\n"; +} + + +void PutDoneThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "putDoneThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + + +void PutDoneThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void PutDoneThread::putDone(NotifyPutRequesterPtr const ¬ifyPutRequester) +{ + { + Lock lock(mutex); + if(notifyPutRequester->isOnQueue) return; + notifyPutRequester->isOnQueue = true; + notifyPutQueue.push(notifyPutRequester); + } + waitForCommand.signal(); +} + +void PutDoneThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyPutRequester* notifyPutRequester(NULL); + { + Lock lock(mutex); + if(!notifyPutQueue.empty()) + { + more = true; + NotifyPutRequesterWPtr req(notifyPutQueue.front()); + notifyPutQueue.pop(); + NotifyPutRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyPutRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyPutRequester!=NULL) + { + CAChannelPutPtr channelPut(notifyPutRequester->channelPut.lock()); + if(channelPut) channelPut->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/putDoneThread.h b/src/ca/putDoneThread.h new file mode 100644 index 0000000..bab0f21 --- /dev/null +++ b/src/ca/putDoneThread.h @@ -0,0 +1,70 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ +#ifndef PutDoneThread_H +#define PutDoneThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyPutRequester; +typedef std::tr1::shared_ptr NotifyPutRequesterPtr; +typedef std::tr1::weak_ptr NotifyPutRequesterWPtr; + + +class PutDoneThread; +typedef std::tr1::shared_ptr PutDoneThreadPtr; + +class CAChannelPut; +typedef std::tr1::shared_ptr CAChannelPutPtr; +typedef std::tr1::weak_ptr CAChannelPutWPtr; + +class NotifyPutRequester +{ +public: + ChannelPutRequester::weak_pointer channelPutRequester; + CAChannelPutWPtr channelPut; + bool isOnQueue; + NotifyPutRequester() : isOnQueue(false) {} + void setChannelPut(CAChannelPutPtr const &channelPut) + { this->channelPut = channelPut;} +}; + + +class PutDoneThread : + public epicsThreadRunable +{ +public: + static PutDoneThreadPtr get(); + ~PutDoneThread(); + virtual void run(); + void start(); + void stop(); + void putDone(NotifyPutRequesterPtr const ¬ifyPutRequester); +private: + PutDoneThread(); + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyPutQueue; +}; + + +}}} + +#endif /* PutDoneThread_H */ diff --git a/src/ca/pv/caProvider.h b/src/ca/pv/caProvider.h index 2a18acd..90b880e 100644 --- a/src/ca/pv/caProvider.h +++ b/src/ca/pv/caProvider.h @@ -3,6 +3,9 @@ * pvAccessCPP is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ +/** + * @author msekoranja + */ #ifndef CAPROVIDER_H #define CAPROVIDER_H @@ -22,10 +25,17 @@ namespace ca { * A single instance is created the first time CAClientFactory::start is called. * epicsAtExit is used to destroy the instance. * + * The single instance calls: + * ca_context_create(ca_enable_preemptive_callback); + * * The thread that calls start, or a ca auxillary thread, are the only threads * that can call the ca_* functions. * - * Note the monitor callbacks are made from a separate thread that must NOT call any ca_* function. + * NOTE: callbacks for monitor, get, and put are made from a separate thread. + * This is done to prevent a deadly embrace that can occur + * when rapid gets, puts, and monitor events are happening. + * The callbacks should not call any pvAccess method. + * If any such call is made the separate thread becomes a ca auxillary thread. * */ class epicsShareClass CAClientFactory @@ -35,12 +45,6 @@ public: * */ static void start(); - /** @brief get the ca_client_context - * - * This can be called by an application specific auxiliary thread. - * See ca documentation. Not for casual use. - */ - static struct ca_client_context * get_ca_client_context(); /** @brief stop provider ca * * This does nothing since epicsAtExit is used to destroy the instance.