#include "ThreadPool.h" #include ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){ #ifdef VERBOSE cout << "Constructed ThreadPool of size " << m_pool_size << endl; #endif m_tasks_loaded = false; thread_started = false; current_thread_number = -1; number_of_ongoing_tasks = 0; number_of_total_tasks = 0; } ThreadPool::~ThreadPool(){ // Release resources if (m_pool_state != STOPPED) { destroy_threadpool(); } } extern "C" void* start_thread(void* arg) { ThreadPool* tp = (ThreadPool*) arg; tp->execute_thread(); return NULL; } int ThreadPool::initialize_threadpool(){ if(m_pool_size == 1) return m_pool_size; m_pool_state = STARTED; int ret = -1; sem_init(&semStart,1,0); sem_init(&semDone,1,0); for (int i = 0; i < m_pool_size; i++) { pthread_t tid; thread_started = false; current_thread_number = i; ret = pthread_create(&tid, NULL, start_thread, (void*) this); if (ret != 0) { cerr << "pthread_create() failed: " << ret << endl; return 0; } m_threads.push_back(tid); while(!thread_started); } #ifdef VERBOSE cout << m_pool_size << " threads created by the thread pool" << endl; #endif return m_pool_size; } int ThreadPool::destroy_threadpool(){ if(m_pool_size == 1) return 0; /*cout << "in destroying threadpool" << endl;*/ // thread communication- modified m_pool_state may not show up //to other threads until its modified in a lock! m_task_mutex.lock(); m_pool_state = STOPPED; /*cout << "Broadcasting STOP signal to all threads..." << endl;*/ m_task_cond_var.broadcast(); // notify all threads we are shttung down m_task_mutex.unlock(); // int ret = -1; for (int i = 0; i < m_pool_size; i++) { sem_post(&semStart); sem_post(&semDone); void* result; pthread_join(m_threads[i], &result); /*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/ m_task_mutex.lock(); m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting m_task_mutex.unlock(); } sem_destroy(&semStart); sem_destroy(&semDone); number_of_ongoing_tasks = 0; number_of_total_tasks = 0; /* cout << m_pool_size << " threads exited from the thread pool" << endl;*/ return 0; } void* ThreadPool::execute_thread(){ //for debugging seting ithread value // int ithread = current_thread_number; thread_started = true; Task* task = NULL; m_tasks_loaded = false; /*cout << "Starting thread " << pthread_self() << endl;*/ while(true) { // Try to pick a task /*cout << "Locking: " << pthread_self() << endl;*/ m_task_mutex.lock(); while ((m_pool_state != STOPPED) && (m_tasks.empty())) { // Wait until there is a task in the queue // Unlock mutex while wait, then lock it back when signaled /* cout << "Unlocking and waiting: " << pthread_self() << endl;*/ m_task_cond_var.wait(m_task_mutex.get_mutex_ptr()); /* cout << "Signaled and locking: " << pthread_self() << endl;*/ } // If the thread was woken up to notify process shutdown, return from here if (m_pool_state == STOPPED) { /* cout << "Unlocking and exiting: " << pthread_self() << endl;*/ m_task_mutex.unlock(); pthread_exit(NULL); } task = m_tasks.front(); m_tasks.pop_front(); /*cout << "Unlocking: " << pthread_self() << endl;*/ m_task_mutex.unlock(); sem_wait(&semStart); //cout<<"***"<run(arg); /*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/ delete task; /*cout << ithread << " task deleted" << endl;*/ m_task_mutex.lock(); number_of_ongoing_tasks--; m_task_mutex.unlock(); //last task and check m_tasks_loaded to ensure done only once if((!number_of_ongoing_tasks) && m_tasks_loaded){ m_tasks_loaded = false; } sem_post(&semDone); //removed deleteing task to earlier } return NULL; } int ThreadPool::add_task(Task* task){ if(m_pool_size == 1){ (*task)(); return 0; } m_task_mutex.lock(); // TODO: put a limit on how many tasks can be added at most m_tasks.push_back(task); number_of_ongoing_tasks++; number_of_total_tasks++; m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available m_task_mutex.unlock(); return 0; } void ThreadPool::startExecuting(){ if(m_pool_size == 1) return; /*cout << "waiting for tasks: locked. gonna wait" << endl;*/ m_tasks_loaded = true; //giving all threads permission to start as all tasks have been added for(int i=0;i