add weak_ptr threaded work queue

This commit is contained in:
Michael Davidsaver
2018-04-08 10:42:13 -07:00
parent 266f2f9d2d
commit f3173682fb
3 changed files with 187 additions and 0 deletions

View File

@ -16,6 +16,7 @@ qsrv_SRCS += qsrv.cpp
qsrv_SRCS += pdb.cpp
qsrv_SRCS += pdbsingle.cpp
#qsrv_SRCS += pvalink.cpp
qsrv_SRCS += tpool.cpp
qsrv_SRCS += demo.cpp
qsrv_SRCS += imagedemo.c

133
pdbApp/tpool.cpp Normal file
View File

@ -0,0 +1,133 @@
#include <typeinfo>
#include <stdexcept>
#include <epicsGuard.h>
#include <errlog.h>
#include <pv/sharedPtr.h>
#define epicsExportSharedSymbols
#include "helper.h"
#include "tpool.h"
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
WorkQueue::WorkQueue(const std::string& name)
:name(name)
,state(Idle)
{}
WorkQueue::~WorkQueue() { close(); }
void WorkQueue::start(unsigned nworkers, unsigned prio)
{
Guard G(mutex);
if(state!=Idle)
throw std::logic_error("Already started");
try {
state = Active;
for(unsigned i=0; i<nworkers; i++) {
p2p::auto_ptr<epicsThread> worker(new epicsThread(*this, name.c_str(),
epicsThreadGetStackSize(epicsThreadStackSmall),
prio));
worker->start();
workers.push_back(worker.get());
worker.release();
}
}catch(...){
UnGuard U(G); // unlock as close() blocks to join any workers which were started
close();
throw;
}
}
void WorkQueue::close()
{
workers_t temp;
{
Guard G(mutex);
if(state!=Active)
return;
temp.swap(workers);
state = Stopping;
}
wakeup.signal();
for(workers_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it)
{
(*it)->exitWait();
}
{
Guard G(mutex);
state = Idle;
}
}
void WorkQueue::add(const value_type& work)
{
bool empty;
{
Guard G(mutex);
if(state!=Active)
return;
empty = queue.empty();
queue.push_back(work);
}
if(empty) {
wakeup.signal();
}
}
void WorkQueue::run()
{
Guard G(mutex);
std::tr1::shared_ptr<epicsThreadRunable> work;
while(state==Active) {
if(!queue.empty()) {
work = queue.front().lock();
queue.pop_front();
}
bool last = queue.empty();
{
UnGuard U(G);
if(work) {
try {
work->run();
work.reset();
}catch(std::exception& e){
errlogPrintf("%s Unhandled exception from %s: %s\n",
name.c_str(), typeid(work.get()).name(), e.what());
work.reset();
}
}
if(last) {
wakeup.wait();
}
}
}
// pass along the close() signal to next worker
wakeup.signal();
}

53
pdbApp/tpool.h Normal file
View File

@ -0,0 +1,53 @@
#ifndef TPOOL_H
#define TPOOL_H
#include <stdexcept>
#include <deque>
#include <vector>
#include <errlog.h>
#include <epicsThread.h>
#include <epicsMutex.h>
#include <epicsEvent.h>
#include <pv/sharedPtr.h>
#include <shareLib.h>
struct WorkQueue : private epicsThreadRunable
{
typedef std::tr1::weak_ptr<epicsThreadRunable> value_type;
private:
const std::string name;
epicsMutex mutex;
enum state_t {
Idle,
Active,
Stopping,
} state;
typedef std::deque<value_type> queue_t;
queue_t queue;
epicsEvent wakeup;
typedef std::vector<epicsThread*> workers_t;
workers_t workers;
public:
WorkQueue(const std::string& name);
virtual ~WorkQueue();
void start(unsigned nworkers=1, unsigned prio = epicsThreadPriorityLow);
void close();
void add(const value_type& work);
private:
virtual void run();
};
#endif // TPOOL_H