something

This commit is contained in:
Dhanya Maliakal 2016-03-10 17:25:47 +01:00
parent c36db579b1
commit d3a7d3bad3
18 changed files with 863 additions and 18 deletions

View File

@ -10,12 +10,12 @@ CFLAGS= -g -DC_ONLY -fPIC
DFLAGS= -g -DDACS_INT
INCLUDES?= -IcommonFiles -IslsDetector -I../slsReceiverSoftware/MySocketTCP -IusersFunctions -ImultiSlsDetector -IslsDetectorUtils -IslsDetectorCommand -IslsDetectorAnalysis -IslsReceiverInterface -I../slsReceiverSoftware/include -I$(ASM)
INCLUDES?= -IcommonFiles -IslsDetector -I../slsReceiverSoftware/MySocketTCP -IusersFunctions -ImultiSlsDetector -IslsDetectorUtils -IslsDetectorCommand -IslsDetectorAnalysis -IslsReceiverInterface -I../slsReceiverSoftware/include -IthreadFiles -I$(ASM)
#EPICSFLAGS=-D EPICS -I/usr/local/epics/base/include/ -I /usr/local/epics/base/include/os/Linux/ -L /usr/local/epics/base/lib/$(EPICS_HOST_ARCH) -Wl,-R/usr/local/epics/base/lib/$(EPICS_HOST_ARCH) -lca -lCom
SRC_CLNT=slsDetectorAnalysis/fileIO.cpp usersFunctions/usersFunctions.cpp slsDetector/slsDetectorUtils.cpp slsDetector/slsDetectorCommand.cpp slsDetectorAnalysis/angularConversion.cpp slsDetectorAnalysis/angularConversionStatic.cpp slsDetectorAnalysis/energyConversion.cpp slsDetector/slsDetectorActions.cpp slsDetectorAnalysis/postProcessing.cpp slsDetector/slsDetector.cpp multiSlsDetector/multiSlsDetector.cpp slsDetectorAnalysis/postProcessingFuncs.cpp slsReceiverInterface/receiverInterface.cpp slsDetector/slsDetectorUsers.cpp #../slsReceiverSoftware/MySocketTCP/MySocketTCP.cpp
SRC_CLNT=slsDetectorAnalysis/fileIO.cpp usersFunctions/usersFunctions.cpp slsDetector/slsDetectorUtils.cpp slsDetector/slsDetectorCommand.cpp slsDetectorAnalysis/angularConversion.cpp slsDetectorAnalysis/angularConversionStatic.cpp slsDetectorAnalysis/energyConversion.cpp slsDetector/slsDetectorActions.cpp slsDetectorAnalysis/postProcessing.cpp slsDetector/slsDetector.cpp multiSlsDetector/multiSlsDetector.cpp slsDetectorAnalysis/postProcessingFuncs.cpp slsReceiverInterface/receiverInterface.cpp slsDetector/slsDetectorUsers.cpp threadFiles/CondVar.cpp threadFiles/Mutex.cpp threadFiles/ThreadPool.cpp #../slsReceiverSoftware/MySocketTCP/MySocketTCP.cpp
$(info )

View File

@ -16,6 +16,8 @@ ID: $Id$
#include "multiSlsDetectorClient.h"
#include "postProcessingFuncs.h"
#include "usersFunctions.h"
#include "ThreadPool.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
@ -265,13 +267,50 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods();
getMaxMods();
if(createThreadPool() == FAIL)
exit(-1);
}
multiSlsDetector::~multiSlsDetector() {
//removeSlsDetector();
destroyThreadPool();
}
int multiSlsDetector::createThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
}
if(thisMultiDetector->numberOfDetectors < 1){
cout << "No detectors attached to create threadpool" << endl;
return OK;
}
threadpool = new ThreadPool(thisMultiDetector->numberOfDetectors);
switch(threadpool->initialize_threadpool()){
case 0:
cerr << "Failed to initialize thread pool!" << endl;
return FAIL;
case 1:
cout << "Not initializing threads, only one detector" << endl;
break;
default:
cout << "Initialized Threadpool" << endl;
break;
}
return OK;
}
void multiSlsDetector::destroyThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
cout<<"Destroyed Threadpool"<<endl;
}
}
int multiSlsDetector::addSlsDetector(int id, int pos) {
int j=thisMultiDetector->numberOfDetectors;
@ -1088,7 +1127,7 @@ int multiSlsDetector::setThresholdEnergy(int e_eV, int pos, detectorSettings ise
slsDetectorDefs::detectorSettings multiSlsDetector::getSettings(int pos) {
int i, posmin, posmax;
detectorSettings ret1=GET_SETTINGS, ret;
int ret1=-100, ret=-1;
if (pos<0) {
posmin=0;
@ -1098,21 +1137,49 @@ slsDetectorDefs::detectorSettings multiSlsDetector::getSettings(int pos) {
posmax=pos+1;
}
for (i=posmin; i<posmax; i++) {
if (detectors[i]) {
ret=detectors[i]->getSettings();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (ret1==GET_SETTINGS)
ret1=ret;
else if (ret!=ret1)
ret1=GET_SETTINGS;
/*
if(!threadpool){cout << "Error in creating threadpool. Exiting" << endl;return GET_SETTINGS;}
else{
//return storage values
int* iret[posmax-posmin];
for(int idet=posmin; idet<posmax; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func_t<detectorSettings,slsDetector,int,int>(&slsDetector::getSettings,
detectors[idet],-1,iret[idet]));
threadpool->add_task(task);
}
}
threadpool->wait_for_tasks_to_complete();
for(int idet=posmin; idet<posmax; idet++){cout<<"final iret:"<<*iret[idet]<<endl;
if(detectors[idet]){
if(iret[idet] != NULL){
ret1 = *iret[idet];
delete iret[idet];
}
if (ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=GET_SETTINGS;
}
}
}
*/
}
for (i=posmin; i<posmax; i++) {
if (detectors[i]) {
ret1=detectors[i]->getSettings();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (ret==GET_SETTINGS)
ret=ret;
else if (ret!=ret1)
ret=GET_SETTINGS;
}
}
thisMultiDetector->currentSettings=ret1;
return ret1;
thisMultiDetector->currentSettings=(detectorSettings)ret;
return (detectorSettings)ret;
}
slsDetectorDefs::detectorSettings multiSlsDetector::setSettings(detectorSettings isettings, int pos) {

View File

@ -16,6 +16,7 @@ ID: $Id$
#include "slsDetectorUtils.h"
class slsDetector;
class ThreadPool;
//#include "sls_detector_defs.h"
@ -240,6 +241,14 @@ class multiSlsDetector : public slsDetectorUtils {
/** destructor */
virtual ~multiSlsDetector();
/**
* Creates all the threads in the threadpool
\returns OK or FAIL
*/
int createThreadPool();
/** destroys all the threads in the threadpool */
void destroyThreadPool();
/** frees the shared memory occpied by the sharedMultiSlsDetector structure */
int freeSharedMemory() ;
@ -1362,6 +1371,8 @@ class multiSlsDetector : public slsDetectorUtils {
/** Shared memory structure */
sharedMultiSlsDetector *thisMultiDetector;
private:
ThreadPool* threadpool;

View File

@ -0,0 +1,17 @@
#include "CondVar.h"
CondVar::CondVar() {
pthread_cond_init(&m_cond_var, NULL);
}
CondVar::~CondVar() {
pthread_cond_destroy(&m_cond_var);
}
void CondVar::wait(pthread_mutex_t* mutex) {
pthread_cond_wait(&m_cond_var, mutex);
}
void CondVar::signal() {
pthread_cond_signal(&m_cond_var);
}
void CondVar::broadcast() {
pthread_cond_broadcast(&m_cond_var);
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <vector>
#include "Global.h"
using namespace std;
class CondVar {
public:
CondVar();
~CondVar();
void wait(pthread_mutex_t* mutex);
void signal();
void broadcast();
private:
pthread_cond_t m_cond_var;
};

View File

@ -0,0 +1,6 @@
#pragma once
const int DEFAULT_POOL_SIZE = 10;
const int MAX_SINGLES = 5;
const int STARTED = 0;
const int STOPPED = 1;

View File

@ -0,0 +1,21 @@
OBJPATH=bin/obj
EXAMPLEPATH=bin/example
all:
g++ CondVar.cpp -lpthread -c -g -o $(OBJPATH)/CondVar.o
g++ Mutex.cpp -lpthread -c -g -o $(OBJPATH)/Mutex.o
#g++ Task.cpp -lpthread -c -g -o $(OBJPATH)/Task.o
g++ ThreadPool.cpp -lpthread -c -g -o $(OBJPATH)/ThreadPool.o
g++ Multi.cpp -lpthread -c -g -o $(OBJPATH)/Multi.o
#g++ $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/Task.o $(OBJPATH)/ThreadPool.o threadpool_test.cpp Single.cpp Multi.cpp -lpthread -I . -g -o $(EXAMPLEPATH)threadpool_test
g++ $(OBJPATH)/CondVar.o $(OBJPATH)/Mutex.o $(OBJPATH)/ThreadPool.o threadpool_test.cpp Single.cpp Multi.cpp -lpthread -I . -g -o $(EXAMPLEPATH)threadpool_test
#all:
# g++ threadpool.cpp -lpthread -fpic -c -o bin/obj/threadpool.o
# g++ -L./bin bin/obj/threadpool.o -lpthread threadpool_test.cpp -o bin/example/threadpool_test
#threadpool:
# g++ threadpool.cpp -lpthread -fpic -c -o bin/obj/threadpool.o
# g++ -shared -fPIC bin/obj/threadpool.o -o bin/lib/libthreadpool.so
#example:
# g++ -L./bin/lib -lthreadpool threadpool_test.cpp -o threadpool_test

View File

@ -0,0 +1,197 @@
#include "Multi.h"
#include "Single.h"
#include "ThreadPool.h"
#include <iostream>
#include <cstring>
#include <stdio.h>
#include <stdlib.h>
using namespace std;
char ans[1000];
int threadflag = 1;
Multi::Multi() {
numSingles = 1;
threadpool = 0;
for(int i=0;i<numSingles;i++)
singles[i] = new Single(i);
if(createThreadPool()== 0)
exit(-1);
}
Multi::~Multi() {
destroyThreadPool();
}
string Multi::executeCommand(int argc,char* argv[]){
if(!strcmp(argv[1],"printnum")){
int ival;
char answer[100];
if (!sscanf(argv[2],"%d",&ival))
return string("Could not scan input ")+string(argv[2]);
sprintf(answer,"%d",printNumber(ival));
return string(answer);
}
else if(!strcmp(argv[1],"printstring")){
return printString(argv[2]);
}
else if(!strcmp(argv[1],"printchararr")){
return string(printCharArray(argv[2]));
}
else return string("unrecognized command");
}
int Multi::createThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
}
if(numSingles > 0)
threadpool = new ThreadPool(numSingles);
switch(threadpool->initialize_threadpool()){
case -1:
cerr << "Failed to initialize thread pool!" << endl;
return 0;
case 0:
cout << "Not initializing threads, only one detector" << endl;
break;
default:
cout << "Initialized Threadpool" << endl;
break;
}
return 1;
}
int Multi::destroyThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
cout<<"Destroyed Threadpool"<<endl;
}
return 1;
}
int Multi::printNumber(int inum){
int ret=-100, ret1=-1;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}
else{
int* iret[numSingles];
for(int i=0;i<numSingles;i++){
iret[i]= new int(-1);
//func_t <int,Single,int, int>* binder =
// new func_t<int, Single,int, int>(&Single::printNumber,singles[i],inum,iret[i]);
Task* task = new Task(new func_t<int, Single,int, int>(&Single::printNumber,singles[i],inum,iret[i]));
threadpool->add_task(task);
}
threadpool->wait_for_tasks_to_complete();
for(int i=0;i<numSingles;i++){
if(iret[i] != NULL){
ret1 = *iret[i];
delete iret[i];
}
if(ret==-100)
ret = ret1;
else if (ret != ret1)
ret = -1;
}
}
return ret;
}
string Multi::printString(string s){
string ret="error", ret1="sss";
if(numSingles>1){
string* sret[numSingles];
for(int i=0;i<numSingles;i++){
sret[i]= new string("sss");
func_t <string,Single,string,string>* binder =
new func_t<string,Single,string,string>(&Single::printString,singles[i],s,sret[i]);
Task* task = new Task(binder);
threadpool->add_task(task);
}
threadpool->wait_for_tasks_to_complete();
for(int i=0;i<numSingles;i++){
if(sret[i] != NULL){
ret1 = *sret[i];
delete sret[i];
}
if(ret=="error")
ret = ret1;
else if (ret != ret1)
ret = "sss";
}
}
else{
for(int i=0;i<numSingles;i++){
ret1=singles[i]->printString(s);
if(ret=="error")
ret = ret1;
else if (ret != ret1)
ret = "sss";
}
}
return ret;
}
char* Multi::printCharArray(char a[]){
string ret="error", ret1="sss";
if(numSingles>1){
string* sret[numSingles];
for(int i=0;i<numSingles;i++){
sret[i]= new string("sss");
//std::fill_n(cret[i],1000,0);
func_t <char*,Single,char*,string>* binder =
new func_t <char*,Single,char*,string>(&Single::printCharArray,singles[i],a,sret[i]);
Task* task = new Task(binder);
threadpool->add_task(task);
}
threadpool->wait_for_tasks_to_complete();
for(int i=0;i<numSingles;i++){
if(sret[i] != NULL){
ret1 = *sret[i];
delete sret[i];
}
if(ret=="error")
ret = ret1;
else if (ret != ret1)
ret = "sss";
}
}
else{
for(int i=0;i<numSingles;i++){
ret1=singles[i]->printCharArray(a);
if(ret=="error")
ret = ret1;
else if (ret != ret1)
ret = "sss";
}
}
strcpy(ans,ret.c_str());
return ans;
}

View File

@ -0,0 +1,36 @@
#pragma once
#include "Global.h"
#include <string>
using namespace std;
class Single;
class ThreadPool;
class Multi {
public:
Multi();
~Multi();
string executeCommand(int argc,char* argv[]);
int printNumber(int inum);
string printString(string s);
char* printCharArray(char a[]);
int createThreadPool();
int destroyThreadPool();
protected:
Single* singles[MAX_SINGLES];
int numSingles;
ThreadPool* threadpool;
};

View File

@ -0,0 +1,26 @@
#include "Mutex.h"
Mutex::Mutex() {
pthread_mutex_init(&m_lock, NULL);
is_locked = false;
}
Mutex::~Mutex() {
while(is_locked);
unlock(); // Unlock Mutex after shared resource is safe
pthread_mutex_destroy(&m_lock);
}
void Mutex::lock() {
pthread_mutex_lock(&m_lock);
is_locked = true;
}
void Mutex::unlock() {
is_locked = false; // do it BEFORE unlocking to avoid race condition
pthread_mutex_unlock(&m_lock);
}
pthread_mutex_t* Mutex::get_mutex_ptr(){
return &m_lock;
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <vector>
#include <errno.h>
#include <string.h>
#include "Global.h"
using namespace std;
class Mutex
{
public:
Mutex();
~Mutex();
void lock();
void unlock();
pthread_mutex_t* get_mutex_ptr();
private:
pthread_mutex_t m_lock;
volatile bool is_locked;
};

View File

@ -0,0 +1,37 @@
#include "Single.h"
#include <iostream>
#include <cstring>
using namespace std;
char local_ans[1000];
Single::Single(int id) {
detID = id;
// cout<<"detID:"<<detID<<endl;
}
Single::~Single() {
}
int Single::getID(){
return detID;
}
int Single::printNumber(int i){
cout << detID << " - Number:"<< i << endl;
//usleep(detID*10000);
//usleep(100000 - detID* 10000);
return (i+1);
}
string Single::printString(string s){
cout << detID << " - String:"<< s << endl;
return string("string done");
}
char* Single::printCharArray(char a[]){
cout << detID << " - Char array:"<< a << endl;
strcpy(local_ans,"char done");
return local_ans;
}

View File

@ -0,0 +1,24 @@
#pragma once
#include "Multi.h"
#include <string>
using namespace std;
class Single {
public:
Single(int id);
~Single();
int getID();
int printNumber(int i);
string printString(string s);
char* printCharArray(char a[]);
private:
int detID;
int* someValue;
};

View File

@ -0,0 +1,68 @@
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <vector>
#include <errno.h>
#include <string.h>
#include "Global.h"
#include "sls_detector_defs.h"
#include <iostream>
using namespace std;
class slsDetector;
template<typename _Ret, typename _Class,typename _Arg1, typename _Store>
class func_t{
public:
func_t(_Ret (_Class::*fn)(_Arg1),_Class* ptr,_Arg1 arg1, _Store* sto):m_fn(fn),m_ptr(ptr),m_arg1(arg1),m_store(sto){}
~func_t() {}
void operator()() const {*m_store = ((m_ptr->*m_fn)(m_arg1));}
private:
_Class* m_ptr;
_Ret (_Class::*m_fn)(_Arg1);
_Arg1 m_arg1;
_Store* m_store;
};
class Task: public virtual slsDetectorDefs{
public:
Task(func_t <int,slsDetector,int,int>* t):m_int1(t),m_string1(0),m_chararr1(0),
m_settings(0){};
Task(func_t <string,slsDetector,string,string>* t): m_int1(0),m_string1(t),m_chararr1(0),
m_settings(0){};
Task(func_t <char*,slsDetector,char*,string>* t):m_int1(0),m_string1(0),m_chararr1(t),
m_settings(0){};
//settings
Task(func_t <slsDetectorDefs::detectorSettings,slsDetector,int,int>* t):
m_int1(0),m_string1(0),m_chararr1(0),
m_settings(t)
{};
~Task(){}
void operator()(){
if(m_int1) (*m_int1)();
else if(m_string1) (*m_string1)();
else if(m_chararr1) (*m_chararr1)();
else if(m_settings) (*m_settings)();
}
private:
func_t <int,slsDetector,int,int>* m_int1;
func_t <string,slsDetector,string,string>* m_string1;
func_t <char*,slsDetector,char*,string>* m_chararr1;
func_t <slsDetectorDefs::detectorSettings,slsDetector,int,int>* m_settings;
};

View File

@ -0,0 +1,175 @@
#include "ThreadPool.h"
ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size)
{
cout << "Constructed ThreadPool of size " << m_pool_size << endl;
m_tasks_loaded = false;
thread_started = false;
current_thread_number = -1;
number_of_ongoing_tasks = 0;
}
ThreadPool::~ThreadPool()
{
// Release resources
if (m_pool_state != STOPPED) {
destroy_threadpool();
}
}
// We can't pass a member function to pthread_create.
// So created the wrapper function that calls the member function
// we want to run in the thread.
extern "C"
void* start_thread(void* arg)
{
ThreadPool* tp = (ThreadPool*) arg;
tp->execute_thread();
return NULL;
}
int ThreadPool::initialize_threadpool()
{
if(m_pool_size == 1)
return m_pool_size;
// TODO: COnsider lazy loading threads instead of creating all at once
m_pool_state = STARTED;
int ret = -1;
for (int i = 0; i < m_pool_size; i++) {
pthread_t tid;
thread_started = false;
current_thread_number = i;
ret = pthread_create(&tid, NULL, start_thread, (void*) this);
if (ret != 0) {
cerr << "pthread_create() failed: " << ret << endl;
return 0;
}
m_threads.push_back(tid);
while(!thread_started);
}
cout << m_pool_size << " threads created by the thread pool" << endl;
return m_pool_size;
}
int ThreadPool::destroy_threadpool()
{ if(m_pool_size == 1)
return 0;
//cout << "in destroying threadpool" << endl;
// Note: this is not for synchronization, its for thread communication!
// destroy_threadpool() will only be called from the main thread, yet
// the modified m_pool_state may not show up to other threads until its
// modified in a lock!
m_task_mutex.lock();
m_pool_state = STOPPED;
m_task_mutex.unlock();
/*cout << "Broadcasting STOP signal to all threads..." << endl;*/
m_task_cond_var.broadcast(); // notify all threads we are shttung down
int ret = -1;
for (int i = 0; i < m_pool_size; i++) {
void* result;
ret = pthread_join(m_threads[i], &result);
/*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/
m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting
}
number_of_ongoing_tasks = 0;
/* cout << m_pool_size << " threads exited from the thread pool" << endl;*/
return 0;
}
void* ThreadPool::execute_thread()
{
int ithread = current_thread_number;
thread_started = true;
Task* task = NULL;
m_tasks_loaded = false;
/*cout << "Starting thread " << pthread_self() << endl;*/
while(true) {
// Try to pick a task
/*cout << "Locking: " << pthread_self() << endl;*/
m_task_mutex.lock();
// We need to put pthread_cond_wait in a loop for two reasons:
// 1. There can be spurious wakeups (due to signal/ENITR)
// 2. When mutex is released for waiting, another thread can be waken up
// from a signal/broadcast and that thread can mess up the condition.
// So when the current thread wakes up the condition may no longer be
// actually true!
while ((m_pool_state != STOPPED) && (m_tasks.empty())) {
// Wait until there is a task in the queue
// Unlock mutex while wait, then lock it back when signaled
/* cout << "Unlocking and waiting: " << pthread_self() << endl;*/
m_task_cond_var.wait(m_task_mutex.get_mutex_ptr());
/* cout << "Signaled and locking: " << pthread_self() << endl;*/
}
// If the thread was woken up to notify process shutdown, return from here
if (m_pool_state == STOPPED) {
/* cout << "Unlocking and exiting: " << pthread_self() << endl;*/
m_task_mutex.unlock();
pthread_exit(NULL);
}
task = m_tasks.front();
m_tasks.pop_front();
/*cout << "Unlocking: " << pthread_self() << endl;*/
m_task_mutex.unlock();
//cout << "Executing thread " << pthread_self() << endl;
// execute the task
(*task)(); // could also do task->run(arg);
//cout << "Done executing thread " << pthread_self() << endl;
m_all_tasks_mutex.lock();
number_of_ongoing_tasks--; // cout<<"number_of_ongoing_tasks:"<<number_of_ongoing_tasks<<endl;
m_all_tasks_mutex.unlock();
//if all required tasks done
if(!ithread && m_tasks_loaded && (m_tasks.empty())){
while(number_of_ongoing_tasks)
usleep(5000);
m_all_tasks_mutex.lock();
m_tasks_loaded = false; //cout<<"all tasks done"<<endl;
m_all_tasks_cond_var.signal(); // wake up thread that is waiting for all tasks to be complete
m_all_tasks_mutex.unlock();
}
delete task;
}
return NULL;
}
int ThreadPool::add_task(Task* task)
{
if(m_pool_size == 1){
(*task)();
return 0;
}
m_task_mutex.lock();
// TODO: put a limit on how many tasks can be added at most
m_tasks.push_back(task);
number_of_ongoing_tasks++;// cout<<"number_of_ongoing_tasks:"<<number_of_ongoing_tasks<<endl;
m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available
m_task_mutex.unlock();
return 0;
}
void ThreadPool::wait_for_tasks_to_complete(){
if(m_pool_size == 1)
return;
m_all_tasks_mutex.lock();
m_tasks_loaded = true;
while ((m_pool_state != STOPPED) && m_tasks_loaded) {
m_all_tasks_cond_var.wait(m_all_tasks_mutex.get_mutex_ptr());
}
m_all_tasks_mutex.unlock();
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <vector>
#include <errno.h>
#include <string.h>
#include <stdint.h>
#include "Mutex.h"
#include "Task.h"
#include "CondVar.h"
#include "Global.h"
using namespace std;
class ThreadPool
{
public:
ThreadPool(int pool_size);
~ThreadPool();
int initialize_threadpool();
int destroy_threadpool();
void* execute_thread();
int add_task(Task* task);
void wait_for_tasks_to_complete();
private:
int m_pool_size;
Mutex m_task_mutex;
CondVar m_task_cond_var;
std::vector<pthread_t> m_threads; // storage for threads
std::deque<Task*> m_tasks;
volatile int m_pool_state;
Mutex m_all_tasks_mutex;
CondVar m_all_tasks_cond_var;
bool m_tasks_loaded;
bool thread_started;
int current_thread_number;
//volatile uint64_t tasks_done_mask;
volatile int number_of_ongoing_tasks;
};

Binary file not shown.

View File

@ -0,0 +1,61 @@
//#include "ThreadPool.h"
//#include "threadpool.h"
#include <iostream>
#include <string.h>
#include "Multi.h"
using namespace std;
//const int MAX_TASKS = 4;
/*
void hello(void* arg)
{
string* x = (string*) arg;
cout << "Hello: " << *x << endl;
// cout << "\n";
}
*/
int main(int argc, char* argv[])
{
Multi* m = new Multi();
cout<<"Answer:"<< m->executeCommand(argc,argv) << endl;
delete m;
/*
ThreadPool tp(2);
int ret = tp.initialize_threadpool();
if (ret == -1) {
cerr << "Failed to initialize thread pool!" << endl;
return 0;
}
*/
/*
for (int i = 0; i < MAX_TASKS; i++) {
cout<<"adding task:" <<argv[1]<<":"<< i<< endl;
string *x;
if(!strcmp(argv[1],"print"))
x = new string(argv[2]);
else x = new string("foo");
Task* t = new Task(&hello, (void*) x);
// cout << "Adding to pool, task " << i+1 << endl;
tp.add_task(t);
// cout << "Added to pool, task " << i+1 << endl;
}
sleep(2);
tp.destroy_threadpool();
*/
// TODO: delete worker objects
cout << "Exiting app..." << endl;
return 0;
}