From f3173682fb0e10f66eb445ea7d6b4fc8c94db6bb Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 8 Apr 2018 10:42:13 -0700 Subject: [PATCH] add weak_ptr threaded work queue --- pdbApp/Makefile | 1 + pdbApp/tpool.cpp | 133 +++++++++++++++++++++++++++++++++++++++++++++++ pdbApp/tpool.h | 53 +++++++++++++++++++ 3 files changed, 187 insertions(+) create mode 100644 pdbApp/tpool.cpp create mode 100644 pdbApp/tpool.h diff --git a/pdbApp/Makefile b/pdbApp/Makefile index 37844d6..ae2d591 100644 --- a/pdbApp/Makefile +++ b/pdbApp/Makefile @@ -16,6 +16,7 @@ qsrv_SRCS += qsrv.cpp qsrv_SRCS += pdb.cpp qsrv_SRCS += pdbsingle.cpp #qsrv_SRCS += pvalink.cpp +qsrv_SRCS += tpool.cpp qsrv_SRCS += demo.cpp qsrv_SRCS += imagedemo.c diff --git a/pdbApp/tpool.cpp b/pdbApp/tpool.cpp new file mode 100644 index 0000000..ef37f66 --- /dev/null +++ b/pdbApp/tpool.cpp @@ -0,0 +1,133 @@ + +#include +#include + +#include +#include + +#include + +#define epicsExportSharedSymbols +#include "helper.h" +#include "tpool.h" + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +WorkQueue::WorkQueue(const std::string& name) + :name(name) + ,state(Idle) +{} + +WorkQueue::~WorkQueue() { close(); } + +void WorkQueue::start(unsigned nworkers, unsigned prio) +{ + Guard G(mutex); + + if(state!=Idle) + throw std::logic_error("Already started"); + + try { + state = Active; + + for(unsigned i=0; i worker(new epicsThread(*this, name.c_str(), + epicsThreadGetStackSize(epicsThreadStackSmall), + prio)); + + worker->start(); + + workers.push_back(worker.get()); + worker.release(); + } + }catch(...){ + UnGuard U(G); // unlock as close() blocks to join any workers which were started + close(); + throw; + } +} + +void WorkQueue::close() +{ + workers_t temp; + + { + Guard G(mutex); + if(state!=Active) + return; + + temp.swap(workers); + state = Stopping; + } + + wakeup.signal(); + + for(workers_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it) + { + (*it)->exitWait(); + } + + { + Guard G(mutex); + state = Idle; + } +} + +void WorkQueue::add(const value_type& work) +{ + bool empty; + + { + Guard G(mutex); + if(state!=Active) + return; + + empty = queue.empty(); + + queue.push_back(work); + } + + if(empty) { + wakeup.signal(); + } +} + +void WorkQueue::run() +{ + Guard G(mutex); + + std::tr1::shared_ptr work; + + while(state==Active) { + + if(!queue.empty()) { + work = queue.front().lock(); + queue.pop_front(); + } + + bool last = queue.empty(); + + { + UnGuard U(G); + + if(work) { + try { + work->run(); + work.reset(); + }catch(std::exception& e){ + errlogPrintf("%s Unhandled exception from %s: %s\n", + name.c_str(), typeid(work.get()).name(), e.what()); + work.reset(); + } + } + + if(last) { + wakeup.wait(); + } + } + } + + // pass along the close() signal to next worker + wakeup.signal(); +} diff --git a/pdbApp/tpool.h b/pdbApp/tpool.h new file mode 100644 index 0000000..b50a244 --- /dev/null +++ b/pdbApp/tpool.h @@ -0,0 +1,53 @@ +#ifndef TPOOL_H +#define TPOOL_H + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +struct WorkQueue : private epicsThreadRunable +{ + typedef std::tr1::weak_ptr value_type; + +private: + const std::string name; + + epicsMutex mutex; + + enum state_t { + Idle, + Active, + Stopping, + } state; + + typedef std::deque queue_t; + queue_t queue; + + epicsEvent wakeup; + + typedef std::vector workers_t; + workers_t workers; + +public: + WorkQueue(const std::string& name); + virtual ~WorkQueue(); + + void start(unsigned nworkers=1, unsigned prio = epicsThreadPriorityLow); + void close(); + + void add(const value_type& work); + +private: + virtual void run(); +}; + +#endif // TPOOL_H