From e0c45f4afd23a72d5d90ffb1692cd0b5efab4986 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Wed, 17 Aug 2016 10:29:47 +0200 Subject: [PATCH] solved paralel start bug. due to handshaking deficit in thread start --- .../multiSlsDetector/multiSlsDetector.cpp | 36 +-- .../threadFiles/ThreadPool.cpp | 254 +++++++++--------- slsDetectorSoftware/threadFiles/ThreadPool.h | 5 +- 3 files changed, 130 insertions(+), 165 deletions(-) diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp index 8fb32627c..f902e24d7 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp @@ -1608,36 +1608,7 @@ int* multiSlsDetector::startAndReadAll(){ int multiSlsDetector::startAndReadAllNoWait(){ -/* - int i=0; - int ret=OK, ret1=OK; - for (i=0; inumberOfDetectors; i++) { - if (i!=thisMultiDetector->masterPosition) - if (detectors[i]) { - ret=detectors[i]->startAndReadAllNoWait(); - if(detectors[i]->getErrorMask()) - setErrorMask(getErrorMask()|(1<masterPosition; - if (thisMultiDetector->masterPosition>=0) { - if (detectors[i]) { - ret=detectors[i]->startAndReadAllNoWait(); - if(detectors[i]->getErrorMask()) - setErrorMask(getErrorMask()|(1<numberOfDetectors; @@ -1653,10 +1624,9 @@ int multiSlsDetector::startAndReadAllNoWait(){ Task* task = new Task(new func0_t(&slsDetector::startAndReadAllNoWait, detectors[idet],iret[idet])); threadpool->add_task(task); - cprintf(GREEN,"task addeD\n"); } - }cout<<"waiting for tasks to copmlete"<wait_for_tasks_to_complete();cout<<"returned!"<wait_for_tasks_to_complete(); for(int idet=posmin; idetmasterPosition) && (detectors[idet])){ if(iret[idet] != NULL){ @@ -1673,7 +1643,7 @@ int multiSlsDetector::startAndReadAllNoWait(){ //master int ret1=OK; i=thisMultiDetector->masterPosition; - if (thisMultiDetector->masterPosition>=0) {cout<<"should never be here"<masterPosition>=0) { if (detectors[i]) { ret1=detectors[i]->startAndReadAllNoWait(); if(detectors[i]->getErrorMask()) diff --git a/slsDetectorSoftware/threadFiles/ThreadPool.cpp b/slsDetectorSoftware/threadFiles/ThreadPool.cpp index 7e6d72ab0..223983a73 100644 --- a/slsDetectorSoftware/threadFiles/ThreadPool.cpp +++ b/slsDetectorSoftware/threadFiles/ThreadPool.cpp @@ -1,178 +1,170 @@ #include "ThreadPool.h" -ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size) -{ +ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){ #ifdef VERBOSE - cout << "Constructed ThreadPool of size " << m_pool_size << endl; + cout << "Constructed ThreadPool of size " << m_pool_size << endl; #endif - m_tasks_loaded = false; - thread_started = false; - current_thread_number = -1; - number_of_ongoing_tasks = 0; + m_tasks_loaded = false; + thread_started = false; + current_thread_number = -1; + number_of_ongoing_tasks = 0; } -ThreadPool::~ThreadPool() -{ - // Release resources - if (m_pool_state != STOPPED) { - destroy_threadpool(); - } +ThreadPool::~ThreadPool(){ + // Release resources + if (m_pool_state != STOPPED) { + destroy_threadpool(); + } } -// We can't pass a member function to pthread_create. -// So created the wrapper function that calls the member function -// we want to run in the thread. + extern "C" void* start_thread(void* arg) { - ThreadPool* tp = (ThreadPool*) arg; - tp->execute_thread(); - return NULL; + ThreadPool* tp = (ThreadPool*) arg; + tp->execute_thread(); + return NULL; } -int ThreadPool::initialize_threadpool() -{ +int ThreadPool::initialize_threadpool(){ if(m_pool_size == 1) return m_pool_size; - // TODO: COnsider lazy loading threads instead of creating all at once - m_pool_state = STARTED; - int ret = -1; - for (int i = 0; i < m_pool_size; i++) { - pthread_t tid; - thread_started = false; - current_thread_number = i; - ret = pthread_create(&tid, NULL, start_thread, (void*) this); - if (ret != 0) { - cerr << "pthread_create() failed: " << ret << endl; - return 0; - } - m_threads.push_back(tid); - while(!thread_started); - } + m_pool_state = STARTED; + int ret = -1; + sem_init(&semStart,1,0); + for (int i = 0; i < m_pool_size; i++) { + pthread_t tid; + thread_started = false; + current_thread_number = i; + ret = pthread_create(&tid, NULL, start_thread, (void*) this); + if (ret != 0) { + cerr << "pthread_create() failed: " << ret << endl; + return 0; + } + m_threads.push_back(tid); + while(!thread_started); + } #ifdef VERBOSE - cout << m_pool_size << " threads created by the thread pool" << endl; + cout << m_pool_size << " threads created by the thread pool" << endl; #endif - return m_pool_size; + return m_pool_size; } -int ThreadPool::destroy_threadpool() -{ if(m_pool_size == 1) +int ThreadPool::destroy_threadpool(){ + if(m_pool_size == 1) return 0; - //cout << "in destroying threadpool" << endl; - // Note: this is not for synchronization, its for thread communication! - // destroy_threadpool() will only be called from the main thread, yet - // the modified m_pool_state may not show up to other threads until its - // modified in a lock! - m_task_mutex.lock(); - m_pool_state = STOPPED; - m_task_mutex.unlock(); - /*cout << "Broadcasting STOP signal to all threads..." << endl;*/ - m_task_cond_var.broadcast(); // notify all threads we are shttung down + /*cout << "in destroying threadpool" << endl;*/ + // thread communication- modified m_pool_state may not show up + //to other threads until its modified in a lock! + m_task_mutex.lock(); + m_pool_state = STOPPED; + m_task_mutex.unlock(); + /*cout << "Broadcasting STOP signal to all threads..." << endl;*/ + m_task_cond_var.broadcast(); // notify all threads we are shttung down - int ret = -1; - for (int i = 0; i < m_pool_size; i++) { - void* result; - ret = pthread_join(m_threads[i], &result); - /*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/ - m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting - } - number_of_ongoing_tasks = 0; - /* cout << m_pool_size << " threads exited from the thread pool" << endl;*/ - return 0; + int ret = -1; + for (int i = 0; i < m_pool_size; i++) { + void* result; + sem_post(&semStart); + ret = pthread_join(m_threads[i], &result); + /*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/ + m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting + } + sem_destroy(&semStart); + number_of_ongoing_tasks = 0; + /* cout << m_pool_size << " threads exited from the thread pool" << endl;*/ + return 0; } -void* ThreadPool::execute_thread() -{ +void* ThreadPool::execute_thread(){ int ithread = current_thread_number; thread_started = true; - Task* task = NULL; - m_tasks_loaded = false; - /*cout << "Starting thread " << pthread_self() << endl;*/ - while(true) { - // Try to pick a task - /*cout << "Locking: " << pthread_self() << endl;*/ - m_task_mutex.lock(); - - // We need to put pthread_cond_wait in a loop for two reasons: - // 1. There can be spurious wakeups (due to signal/ENITR) - // 2. When mutex is released for waiting, another thread can be waken up - // from a signal/broadcast and that thread can mess up the condition. - // So when the current thread wakes up the condition may no longer be - // actually true! - while ((m_pool_state != STOPPED) && (m_tasks.empty())) { - // Wait until there is a task in the queue - // Unlock mutex while wait, then lock it back when signaled - /* cout << "Unlocking and waiting: " << pthread_self() << endl;*/ - m_task_cond_var.wait(m_task_mutex.get_mutex_ptr()); - /* cout << "Signaled and locking: " << pthread_self() << endl;*/ - } + Task* task = NULL; + m_tasks_loaded = false; + /*cout << "Starting thread " << pthread_self() << endl;*/ + while(true) { + // Try to pick a task + /*cout << "Locking: " << pthread_self() << endl;*/ + m_task_mutex.lock(); - // If the thread was woken up to notify process shutdown, return from here - if (m_pool_state == STOPPED) { - /* cout << "Unlocking and exiting: " << pthread_self() << endl;*/ - m_task_mutex.unlock(); - pthread_exit(NULL); - } + while ((m_pool_state != STOPPED) && (m_tasks.empty())) { + // Wait until there is a task in the queue + // Unlock mutex while wait, then lock it back when signaled + /* cout << "Unlocking and waiting: " << pthread_self() << endl;*/ + m_task_cond_var.wait(m_task_mutex.get_mutex_ptr()); + /* cout << "Signaled and locking: " << pthread_self() << endl;*/ + } - task = m_tasks.front(); - m_tasks.pop_front(); - /*cout << "Unlocking: " << pthread_self() << endl;*/ - m_task_mutex.unlock(); + // If the thread was woken up to notify process shutdown, return from here + if (m_pool_state == STOPPED) { + /* cout << "Unlocking and exiting: " << pthread_self() << endl;*/ + m_task_mutex.unlock(); + pthread_exit(NULL); + } - cout << ithread <<" Executing thread " << pthread_self() << endl; - // execute the task - (*task)(); // could also do task->run(arg); - cout << ithread <<" Done executing thread " << pthread_self() << endl; + task = m_tasks.front(); + m_tasks.pop_front(); + /*cout << "Unlocking: " << pthread_self() << endl;*/ + m_task_mutex.unlock(); - m_all_tasks_mutex.lock(); - number_of_ongoing_tasks--; cout<run(arg); + /*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/ + m_all_tasks_mutex.lock(); + number_of_ongoing_tasks--; + m_all_tasks_mutex.unlock(); - - delete task;cout< using namespace std; @@ -44,5 +44,8 @@ private: //volatile uint64_t tasks_done_mask; volatile int number_of_ongoing_tasks; + + sem_t semStart; + };