put some locking in stop acquisition and startandreadallnowait (bugs threadpool with more evernts than normal due to main and processing thread with no locks for detector releated stuff, zmqthreadpool not required, r_restreamstop moved to status stop when idle

This commit is contained in:
Dhanya Maliakal
2017-12-08 11:40:07 +01:00
parent bf9905ad2a
commit 40a7b3983a
8 changed files with 28 additions and 89 deletions

View File

@ -7,7 +7,6 @@ ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){
#endif
m_tasks_loaded = false;
thread_started = false;
zmqthreadpool = false;
current_thread_number = -1;
number_of_ongoing_tasks = 0;
number_of_total_tasks = 0;
@ -117,7 +116,6 @@ void* ThreadPool::execute_thread(){
/*cout << "Unlocking: " << pthread_self() << endl;*/
m_task_mutex.unlock();
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore start address wait:"<<&semStart<<endl;
sem_wait(&semStart);
//cout<<"***"<<ithread<<" checking out semaphore done address:"<<&semDone<<endl;
@ -134,14 +132,10 @@ void* ThreadPool::execute_thread(){
number_of_ongoing_tasks--;
m_task_mutex.unlock();
//if(zmqthreadpool) cout<<ithread <<" task done: "<<number_of_ongoing_tasks<<endl;
//last task and check m_tasks_loaded to ensure done only once
if((!number_of_ongoing_tasks) && m_tasks_loaded){
//if(zmqthreadpool) cout << ithread << " all tasks done."<<endl;
m_tasks_loaded = false;
}
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore done address post:"<<&semDone<<endl;
sem_post(&semDone);
//removed deleteing task to earlier
@ -155,6 +149,7 @@ int ThreadPool::add_task(Task* task){
return 0;
}
m_task_mutex.lock();
// TODO: put a limit on how many tasks can be added at most
m_tasks.push_back(task);
number_of_ongoing_tasks++;
@ -172,7 +167,6 @@ void ThreadPool::startExecuting(){
m_tasks_loaded = true;
//giving all threads permission to start as all tasks have been added
//if(zmqthreadpool) cout<<"*** semaphore start address post:"<<&semStart<<endl;
for(int i=0;i<number_of_total_tasks;i++)
sem_post(&semStart);
@ -181,14 +175,12 @@ void ThreadPool::startExecuting(){
void ThreadPool::wait_for_tasks_to_complete(){
if(m_pool_size == 1)
return;
//if(zmqthreadpool) cout<<"waiting for all tasks to be done "<<endl;
//if(zmqthreadpool) cout<<"*** semaphore done address wait:"<<&semDone<<endl;
for(int i=0;i<number_of_total_tasks;i++)
for(int i=0;i<number_of_total_tasks;i++) {
//cprintf(MAGENTA,"waiting for %d to be done, total tasks:%d\n", i, number_of_total_tasks);
sem_wait(&semDone);
//cprintf(YELLOW,"done with waiting for %d, total tasks:%d\n", i, number_of_total_tasks);
}
number_of_total_tasks = 0;
//if(zmqthreadpool) cout<<"complete"<<endl<<endl;
}
void ThreadPool::setzeromqThread(){
zmqthreadpool = true;
}