solved paralel start bug. due to handshaking deficit in thread start

This commit is contained in:
Dhanya Maliakal
2016-08-17 10:29:47 +02:00
parent fb8f49985b
commit e0c45f4afd
3 changed files with 130 additions and 165 deletions

View File

@ -1608,36 +1608,7 @@ int* multiSlsDetector::startAndReadAll(){
int multiSlsDetector::startAndReadAllNoWait(){ int multiSlsDetector::startAndReadAllNoWait(){
/*
int i=0;
int ret=OK, ret1=OK;
for (i=0; i<thisMultiDetector->numberOfDetectors; i++) {
if (i!=thisMultiDetector->masterPosition)
if (detectors[i]) {
ret=detectors[i]->startAndReadAllNoWait();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (ret!=OK)
ret1=FAIL;
}
}
i=thisMultiDetector->masterPosition;
if (thisMultiDetector->masterPosition>=0) {
if (detectors[i]) {
ret=detectors[i]->startAndReadAllNoWait();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (ret!=OK)
ret1=FAIL;
}
}
return ret1;
*/
// hanging randomly around 4000-5000 frames at 1sec exptime (threads dont return)
int i=0; int i=0;
int ret=OK; int ret=OK;
int posmin=0, posmax=thisMultiDetector->numberOfDetectors; int posmin=0, posmax=thisMultiDetector->numberOfDetectors;
@ -1653,10 +1624,9 @@ int multiSlsDetector::startAndReadAllNoWait(){
Task* task = new Task(new func0_t<int,slsDetector,int>(&slsDetector::startAndReadAllNoWait, Task* task = new Task(new func0_t<int,slsDetector,int>(&slsDetector::startAndReadAllNoWait,
detectors[idet],iret[idet])); detectors[idet],iret[idet]));
threadpool->add_task(task); threadpool->add_task(task);
cprintf(GREEN,"task addeD\n");
} }
}cout<<"waiting for tasks to copmlete"<<endl; }
threadpool->wait_for_tasks_to_complete();cout<<"returned!"<<endl; threadpool->wait_for_tasks_to_complete();
for(int idet=posmin; idet<posmax; idet++){ for(int idet=posmin; idet<posmax; idet++){
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){ if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
if(iret[idet] != NULL){ if(iret[idet] != NULL){
@ -1673,7 +1643,7 @@ int multiSlsDetector::startAndReadAllNoWait(){
//master //master
int ret1=OK; int ret1=OK;
i=thisMultiDetector->masterPosition; i=thisMultiDetector->masterPosition;
if (thisMultiDetector->masterPosition>=0) {cout<<"should never be here"<<endl; if (thisMultiDetector->masterPosition>=0) {
if (detectors[i]) { if (detectors[i]) {
ret1=detectors[i]->startAndReadAllNoWait(); ret1=detectors[i]->startAndReadAllNoWait();
if(detectors[i]->getErrorMask()) if(detectors[i]->getErrorMask())

View File

@ -1,8 +1,7 @@
#include "ThreadPool.h" #include "ThreadPool.h"
ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size) ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){
{
#ifdef VERBOSE #ifdef VERBOSE
cout << "Constructed ThreadPool of size " << m_pool_size << endl; cout << "Constructed ThreadPool of size " << m_pool_size << endl;
#endif #endif
@ -12,17 +11,14 @@ ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size)
number_of_ongoing_tasks = 0; number_of_ongoing_tasks = 0;
} }
ThreadPool::~ThreadPool() ThreadPool::~ThreadPool(){
{
// Release resources // Release resources
if (m_pool_state != STOPPED) { if (m_pool_state != STOPPED) {
destroy_threadpool(); destroy_threadpool();
} }
} }
// We can't pass a member function to pthread_create.
// So created the wrapper function that calls the member function
// we want to run in the thread.
extern "C" extern "C"
void* start_thread(void* arg) void* start_thread(void* arg)
{ {
@ -31,14 +27,13 @@ void* start_thread(void* arg)
return NULL; return NULL;
} }
int ThreadPool::initialize_threadpool() int ThreadPool::initialize_threadpool(){
{
if(m_pool_size == 1) if(m_pool_size == 1)
return m_pool_size; return m_pool_size;
// TODO: COnsider lazy loading threads instead of creating all at once
m_pool_state = STARTED; m_pool_state = STARTED;
int ret = -1; int ret = -1;
sem_init(&semStart,1,0);
for (int i = 0; i < m_pool_size; i++) { for (int i = 0; i < m_pool_size; i++) {
pthread_t tid; pthread_t tid;
thread_started = false; thread_started = false;
@ -57,14 +52,12 @@ int ThreadPool::initialize_threadpool()
return m_pool_size; return m_pool_size;
} }
int ThreadPool::destroy_threadpool() int ThreadPool::destroy_threadpool(){
{ if(m_pool_size == 1) if(m_pool_size == 1)
return 0; return 0;
//cout << "in destroying threadpool" << endl; /*cout << "in destroying threadpool" << endl;*/
// Note: this is not for synchronization, its for thread communication! // thread communication- modified m_pool_state may not show up
// destroy_threadpool() will only be called from the main thread, yet //to other threads until its modified in a lock!
// the modified m_pool_state may not show up to other threads until its
// modified in a lock!
m_task_mutex.lock(); m_task_mutex.lock();
m_pool_state = STOPPED; m_pool_state = STOPPED;
m_task_mutex.unlock(); m_task_mutex.unlock();
@ -74,17 +67,18 @@ int ThreadPool::destroy_threadpool()
int ret = -1; int ret = -1;
for (int i = 0; i < m_pool_size; i++) { for (int i = 0; i < m_pool_size; i++) {
void* result; void* result;
sem_post(&semStart);
ret = pthread_join(m_threads[i], &result); ret = pthread_join(m_threads[i], &result);
/*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/ /*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/
m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting
} }
sem_destroy(&semStart);
number_of_ongoing_tasks = 0; number_of_ongoing_tasks = 0;
/* cout << m_pool_size << " threads exited from the thread pool" << endl;*/ /* cout << m_pool_size << " threads exited from the thread pool" << endl;*/
return 0; return 0;
} }
void* ThreadPool::execute_thread() void* ThreadPool::execute_thread(){
{
int ithread = current_thread_number; int ithread = current_thread_number;
thread_started = true; thread_started = true;
Task* task = NULL; Task* task = NULL;
@ -95,12 +89,6 @@ void* ThreadPool::execute_thread()
/*cout << "Locking: " << pthread_self() << endl;*/ /*cout << "Locking: " << pthread_self() << endl;*/
m_task_mutex.lock(); m_task_mutex.lock();
// We need to put pthread_cond_wait in a loop for two reasons:
// 1. There can be spurious wakeups (due to signal/ENITR)
// 2. When mutex is released for waiting, another thread can be waken up
// from a signal/broadcast and that thread can mess up the condition.
// So when the current thread wakes up the condition may no longer be
// actually true!
while ((m_pool_state != STOPPED) && (m_tasks.empty())) { while ((m_pool_state != STOPPED) && (m_tasks.empty())) {
// Wait until there is a task in the queue // Wait until there is a task in the queue
// Unlock mutex while wait, then lock it back when signaled // Unlock mutex while wait, then lock it back when signaled
@ -121,35 +109,32 @@ void* ThreadPool::execute_thread()
/*cout << "Unlocking: " << pthread_self() << endl;*/ /*cout << "Unlocking: " << pthread_self() << endl;*/
m_task_mutex.unlock(); m_task_mutex.unlock();
cout << ithread <<" Executing thread " << pthread_self() << endl; sem_wait(&semStart);
/*cout << ithread <<" Executing thread " << pthread_self() << endl;*/
// execute the task // execute the task
(*task)(); // could also do task->run(arg); (*task)(); // could also do task->run(arg);
cout << ithread <<" Done executing thread " << pthread_self() << endl; /*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/
m_all_tasks_mutex.lock(); m_all_tasks_mutex.lock();
number_of_ongoing_tasks--; cout<<ithread <<" number_of_ongoing_tasks:"<<number_of_ongoing_tasks<<endl; number_of_ongoing_tasks--;
m_all_tasks_mutex.unlock(); m_all_tasks_mutex.unlock();
//if all required tasks done //last task and check m_tasks_loaded to ensure done only once
if(!ithread && m_tasks_loaded && (m_tasks.empty())){cout<<ithread <<" waiting for all tasks to be done"<<endl; if((!number_of_ongoing_tasks) && m_tasks_loaded){
while(number_of_ongoing_tasks) /*cout << ithread << " all tasks done."<<endl;*/
usleep(5000); m_all_tasks_mutex.lock();
cout<<ithread <<" all tasks done. gonna lock"<<endl; m_tasks_loaded = false;
m_all_tasks_mutex.lock(); cout<<ithread <<" all tasks done"<<endl; m_all_tasks_cond_var.signal();// wake up thread that is waiting for all tasks to be complete
m_tasks_loaded = false; cout<<ithread <<" task loaded set to false"<<endl; m_all_tasks_mutex.unlock();
m_all_tasks_cond_var.signal(); cout<<ithread <<" wake up signal sent"<<endl;// wake up thread that is waiting for all tasks to be complete
m_all_tasks_mutex.unlock();cout<<ithread <<" unlocked"<<endl;
} }
delete task;
/*cout << ithread << " task deleted" << endl;*/
delete task;cout<<ithread <<" task deleted"<<endl;
} }
return NULL; return NULL;
} }
int ThreadPool::add_task(Task* task) int ThreadPool::add_task(Task* task){
{
if(m_pool_size == 1){ if(m_pool_size == 1){
(*task)(); (*task)();
return 0; return 0;
@ -157,7 +142,7 @@ int ThreadPool::add_task(Task* task)
m_task_mutex.lock(); m_task_mutex.lock();
// TODO: put a limit on how many tasks can be added at most // TODO: put a limit on how many tasks can be added at most
m_tasks.push_back(task); m_tasks.push_back(task);
number_of_ongoing_tasks++;// cout<<"number_of_ongoing_tasks:"<<number_of_ongoing_tasks<<endl; number_of_ongoing_tasks++;
m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available
m_task_mutex.unlock(); m_task_mutex.unlock();
return 0; return 0;
@ -166,13 +151,20 @@ int ThreadPool::add_task(Task* task)
void ThreadPool::wait_for_tasks_to_complete(){ void ThreadPool::wait_for_tasks_to_complete(){
if(m_pool_size == 1) if(m_pool_size == 1)
return; return;
m_all_tasks_mutex.lock();
m_all_tasks_mutex.lock();cout<<"waiting for tasks: locked. gonna wait" <<endl; /*cout << "waiting for tasks: locked. gonna wait" << endl;*/
m_tasks_loaded = true; m_tasks_loaded = true;
while ((m_pool_state != STOPPED) && m_tasks_loaded) {cout<<"waiting for tasks: checking should be locked most likely, m_tasks_loaded:"<<m_tasks_loaded <<endl;
m_all_tasks_cond_var.wait(m_all_tasks_mutex.get_mutex_ptr());cout<<"waiting for tasks: out of the wait loop, m_tasks_loaded:"<< m_tasks_loaded<<endl;
}cout<<"waiting for tasks:totall out, must be locked again" <<endl;
m_all_tasks_mutex.unlock();cout<<"waiting for tasks: unlocked and done" <<endl;
//giving all threads permission to start as all tasks have been added
//using a different variable as number_of_ongoing_tasks is likely to change during the loop
int totalnumtasks = number_of_ongoing_tasks;
for(int i=0;i<totalnumtasks;i++)
sem_post(&semStart);
while ((m_pool_state != STOPPED) && m_tasks_loaded) {
m_all_tasks_cond_var.wait(m_all_tasks_mutex.get_mutex_ptr());
}
/*cout << "waiting for tasks:totall out, must be locked again" << endl;*/
m_all_tasks_mutex.unlock();
} }

View File

@ -13,7 +13,7 @@
#include "Task.h" #include "Task.h"
#include "CondVar.h" #include "CondVar.h"
#include "Global.h" #include "Global.h"
#include <semaphore.h>
using namespace std; using namespace std;
@ -44,5 +44,8 @@ private:
//volatile uint64_t tasks_done_mask; //volatile uint64_t tasks_done_mask;
volatile int number_of_ongoing_tasks; volatile int number_of_ongoing_tasks;
sem_t semStart;
}; };