Move caProvider worker threads shutdown to static destructors

This commit is contained in:
Andrew Johnson
2020-10-02 18:33:50 -05:00
committed by mdavidsaver
parent 44fda51935
commit 601280836d
5 changed files with 141 additions and 157 deletions

View File

@ -8,10 +8,10 @@
* @date 2018.07
*/
#include "caChannel.h"
#include <epicsExit.h>
#define epicsExportSharedSymbols
#include "channelConnectThread.h"
#include "caChannel.h"
using namespace epics::pvData;
using namespace std;
@ -39,6 +39,12 @@ ChannelConnectThread::ChannelConnectThread()
ChannelConnectThread::~ChannelConnectThread()
{
{
Lock the(mutex);
isStop = true;
}
workToDo.signal();
thread->exitWait();
}
@ -53,63 +59,51 @@ void ChannelConnectThread::start()
thread->start();
}
void ChannelConnectThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void ChannelConnectThread::channelConnected(
NotifyChannelRequesterPtr const &notifyChannelRequester)
{
{
Lock lock(mutex);
if(notifyChannelRequester->isOnQueue) return;
Lock the(mutex);
if (notifyChannelRequester->isOnQueue) return;
notifyChannelRequester->isOnQueue = true;
notifyChannelQueue.push(notifyChannelRequester);
}
waitForCommand.signal();
workToDo.signal();
}
void ChannelConnectThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyChannelRequester* notifyChannelRequester(NULL);
{
Lock lock(mutex);
if(!notifyChannelQueue.empty())
{
more = true;
NotifyChannelRequesterWPtr req(notifyChannelQueue.front());
notifyChannelQueue.pop();
NotifyChannelRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyChannelRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyChannelRequester!=NULL)
{
CAChannelPtr channel(notifyChannelRequester->channel.lock());
if(channel) channel->notifyClient();
}
}
if(stopping()) {
waitForStop.signal();
break;
}
}
do {
workToDo.wait();
while (true) {
bool more = false;
NotifyChannelRequester* notifyChannelRequester(NULL);
{
Lock the(mutex);
if (!notifyChannelQueue.empty())
{
more = true;
NotifyChannelRequesterWPtr req(notifyChannelQueue.front());
notifyChannelQueue.pop();
NotifyChannelRequesterPtr reqPtr(req.lock());
if (reqPtr) {
notifyChannelRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if (!more) break;
if (notifyChannelRequester)
{
CAChannelPtr channel(notifyChannelRequester->channel.lock());
if (channel) channel->notifyClient();
}
}
} while (!stopping());
}
}}}

View File

@ -24,7 +24,6 @@ class NotifyChannelRequester;
typedef std::tr1::shared_ptr<NotifyChannelRequester> NotifyChannelRequesterPtr;
typedef std::tr1::weak_ptr<NotifyChannelRequester> NotifyChannelRequesterWPtr;
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;
@ -64,8 +63,7 @@ private:
bool isStop;
std::tr1::shared_ptr<epicsThread> thread;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForCommand;
epics::pvData::Event waitForStop;
epics::pvData::Event workToDo;
std::queue<NotifyChannelRequesterWPtr> notifyChannelQueue;
};

View File

@ -39,6 +39,12 @@ GetDoneThread::GetDoneThread()
GetDoneThread::~GetDoneThread()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
@ -55,12 +61,6 @@ void GetDoneThread::start()
void GetDoneThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void GetDoneThread::getDone(NotifyGetRequesterPtr const &notifyGetRequester)
@ -76,37 +76,34 @@ void GetDoneThread::getDone(NotifyGetRequesterPtr const &notifyGetRequester)
void GetDoneThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyGetRequester* notifyGetRequester(NULL);
{
Lock lock(mutex);
if(!notifyGetQueue.empty())
{
more = true;
NotifyGetRequesterWPtr req(notifyGetQueue.front());
notifyGetQueue.pop();
NotifyGetRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyGetRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyGetRequester!=NULL)
{
CAChannelGetPtr channelGet(notifyGetRequester->channelGet.lock());
if(channelGet) channelGet->notifyClient();
}
}
if(stopping()) {
waitForStop.signal();
break;
}
while (true) {
waitForCommand.wait();
while (true) {
bool more = false;
NotifyGetRequester* notifyGetRequester(NULL);
{
Lock lock(mutex);
if (!notifyGetQueue.empty()) {
more = true;
NotifyGetRequesterWPtr req(notifyGetQueue.front());
notifyGetQueue.pop();
NotifyGetRequesterPtr reqPtr(req.lock());
if (reqPtr) {
notifyGetRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if (!more) break;
if (notifyGetRequester!=NULL) {
CAChannelGetPtr channelGet(notifyGetRequester->channelGet.lock());
if (channelGet) channelGet->notifyClient();
}
}
if (stopping()) {
waitForStop.signal();
break;
}
}
}

View File

@ -39,6 +39,12 @@ MonitorEventThread::MonitorEventThread()
MonitorEventThread::~MonitorEventThread()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void MonitorEventThread::start()
@ -53,12 +59,6 @@ void MonitorEventThread::start()
void MonitorEventThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
@ -75,37 +75,35 @@ void MonitorEventThread::event(NotifyMonitorRequesterPtr const &notifyMonitorReq
void MonitorEventThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyMonitorRequester* notifyMonitorRequester(NULL);
{
Lock lock(mutex);
if(!notifyMonitorQueue.empty())
{
more = true;
NotifyMonitorRequesterWPtr req(notifyMonitorQueue.front());
notifyMonitorQueue.pop();
NotifyMonitorRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyMonitorRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyMonitorRequester!=NULL)
{
CAChannelMonitorPtr channelMonitor(notifyMonitorRequester->channelMonitor.lock());
if(channelMonitor) channelMonitor->notifyClient();
}
}
if(stopping()) {
waitForStop.signal();
break;
}
while (true) {
waitForCommand.wait();
while (true) {
bool more = false;
NotifyMonitorRequester* notifyMonitorRequester(NULL);
{
Lock lock(mutex);
if (!notifyMonitorQueue.empty())
{
more = true;
NotifyMonitorRequesterWPtr req(notifyMonitorQueue.front());
notifyMonitorQueue.pop();
NotifyMonitorRequesterPtr reqPtr(req.lock());
if (reqPtr) {
notifyMonitorRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if (!more) break;
if (notifyMonitorRequester!=NULL) {
CAChannelMonitorPtr channelMonitor(notifyMonitorRequester->channelMonitor.lock());
if (channelMonitor) channelMonitor->notifyClient();
}
}
if (stopping()) {
waitForStop.signal();
break;
}
}
}

View File

@ -39,6 +39,12 @@ PutDoneThread::PutDoneThread()
PutDoneThread::~PutDoneThread()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
@ -55,12 +61,6 @@ void PutDoneThread::start()
void PutDoneThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void PutDoneThread::putDone(NotifyPutRequesterPtr const &notifyPutRequester)
@ -76,37 +76,34 @@ void PutDoneThread::putDone(NotifyPutRequesterPtr const &notifyPutRequester)
void PutDoneThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyPutRequester* notifyPutRequester(NULL);
{
Lock lock(mutex);
if(!notifyPutQueue.empty())
{
more = true;
NotifyPutRequesterWPtr req(notifyPutQueue.front());
notifyPutQueue.pop();
NotifyPutRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyPutRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyPutRequester!=NULL)
{
CAChannelPutPtr channelPut(notifyPutRequester->channelPut.lock());
if(channelPut) channelPut->notifyClient();
}
}
if(stopping()) {
waitForStop.signal();
break;
}
while (true) {
waitForCommand.wait();
while (true) {
bool more = false;
NotifyPutRequester* notifyPutRequester(NULL);
{
Lock lock(mutex);
if (!notifyPutQueue.empty()) {
more = true;
NotifyPutRequesterWPtr req(notifyPutQueue.front());
notifyPutQueue.pop();
NotifyPutRequesterPtr reqPtr(req.lock());
if (reqPtr) {
notifyPutRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if (!more) break;
if (notifyPutRequester!=NULL) {
CAChannelPutPtr channelPut(notifyPutRequester->channelPut.lock());
if (channelPut) channelPut->notifyClient();
}
}
if (stopping()) {
waitForStop.signal();
break;
}
}
}