executor: revise run and shutdown

Instead of a flag use a special marker Command inserted into
the queue.
This commit is contained in:
Michael Davidsaver
2011-02-02 18:47:12 -05:00
parent 294684636d
commit 624dff139d
3 changed files with 45 additions and 22 deletions

View File

@@ -21,6 +21,15 @@
namespace epics { namespace pvData {
// special instance to stop the executor thread
static
class ExecutorShutdown : public Command {
virtual void command(){};
} executorShutdown;
static
Command *shutdown=&executorShutdown;
PVDATA_REFCOUNT_MONITOR_DEFINE(executor);
typedef LinkedListNode<ExecutorNode> ExecutorListNode;
@@ -54,7 +63,6 @@ private:
Event moreWork;
Event stopped;
Mutex mutex;
volatile bool alive;
Thread thread;
};
@@ -64,21 +72,21 @@ ExecutorPvt::ExecutorPvt(String threadName,ThreadPriority priority)
moreWork(),
stopped(),
mutex(),
alive(true),
thread(threadName,priority,this)
{}
ExecutorPvt::~ExecutorPvt()
{
{
Lock xx(&mutex);
alive = false;
}
moreWork.signal();
{
Lock xx(&mutex);
stopped.wait();
}
ExecutorNode shutdownNode(shutdown);
execute(&shutdownNode);
stopped.wait();
// The thread signals 'stopped' while still holding
// the lock. By taking it we wait for the run() function
// to actually return
Lock xx(mutex);
ExecutorListNode *node;
while((node=executorList.removeHead())!=0) {
delete node->getObject();
@@ -87,19 +95,34 @@ ExecutorPvt::~ExecutorPvt()
void ExecutorPvt::run()
{
while(alive) {
unique_lock<Mutex> xx(mutex);
while(true) {
ExecutorListNode * executorListNode = 0;
while(alive && runList.isEmpty()) {
while(runList.isEmpty()) {
xx.unlock();
moreWork.wait();
xx.lock();
}
if(alive) {
Lock xx(&mutex);
executorListNode = runList.removeHead();
}
if(alive && executorListNode!=0) {
executorListNode->getObject()->command->command();
executorListNode = runList.removeHead();
if(!executorListNode) continue;
Command *cmd=executorListNode->getObject()->command;
if(cmd==shutdown) break;
xx.unlock();
try {
executorListNode->getObject()->command->command();
}catch(std::exception& e){
//TODO: feed into logging mechanism
fprintf(stderr, "Executor: Unhandled exception: %s",e.what());
}catch(...){
fprintf(stderr, "Executor: Unhandled exception");
}
xx.lock();
}
stopped.signal();
}
@@ -114,7 +137,7 @@ ExecutorNode * ExecutorPvt::createNode(Command *command)
void ExecutorPvt::execute(ExecutorNode *node)
{
Lock xx(&mutex);
if(!alive || node->runNode.isOnList()) return;
if(node->runNode.isOnList()) return;
bool isEmpty = runList.isEmpty();
runList.addTail(&node->runNode);
if(isEmpty) moreWork.signal();