create threads and destroy threads should work now

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@741 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2014-02-20 14:16:06 +00:00
parent 506ca8d6b0
commit 792e0f3845
2 changed files with 119 additions and 62 deletions

View File

@ -114,8 +114,8 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
receiverdata[i] = NULL; receiverdata[i] = NULL;
#ifdef MYROOT1 #ifdef MYROOT1
myTree[i] = (NULL); myTree[i] = (NULL);
myFile[i] = (NULL); myFile[i] = (NULL);
#endif #endif
} }
@ -133,7 +133,6 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner /** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000 net.core.netdev_max_backlog = 250000
@ -142,13 +141,24 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000 sysctl -w net.core.netdev_max_backlog=250000
*/ */
/*
if(createThreads() == FAIL){
//threads
pthread_mutex_lock(&status_mutex);
status = IDLE;
pthread_mutex_unlock(&(status_mutex));
if(createListeningThreads() == FAIL){
cout << "ERROR: Could not create listening thread" << endl;
exit (-1);
}
if(createWriterThreads() == FAIL){
cout << "ERROR: Could not create writer threads" << endl; cout << "ERROR: Could not create writer threads" << endl;
exit (-1); exit (-1);
} }
*/
setThreadPriorities();
} }
@ -161,7 +171,8 @@ slsReceiverFunctionList::~slsReceiverFunctionList(){
if(receiverdata[i]) if(receiverdata[i])
delete receiverdata[i]; delete receiverdata[i];
} }
createThreads(true); createListeningThreads(true);
createWriterThreads(true);
if(udpSocket) delete udpSocket; if(udpSocket) delete udpSocket;
if(eth) delete [] eth; if(eth) delete [] eth;
@ -308,21 +319,32 @@ int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){
/******************* need to look at exit strategy **************************/ /******************* need to look at exit strategy **************************/
void slsReceiverFunctionList::enableDataCompression(bool enable){ void slsReceiverFunctionList::enableDataCompression(bool enable){
cout << "Data compression ";
if(enable)
cout << "enabled" << endl;
else
cout << "disabled" << endl;
//delete filter for the current number of threads //delete filter for the current number of threads
deleteFilter(); deleteFilter();
dataCompression = enable; dataCompression = enable;
pthread_mutex_lock(&status_mutex); pthread_mutex_lock(&status_mutex);
listening_thread_running = false;
writerthreads_mask = 0x0; writerthreads_mask = 0x0;
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_unlock(&(status_mutex));
/*createThreads(true); need to make sure number of threads is correct when deleting old stuff*/ createWriterThreads(true);
if(enable) if(enable)
numWriterThreads = MAX_NUM_WRITER_THREADS; numWriterThreads = MAX_NUM_WRITER_THREADS;
else else
numWriterThreads = 1; numWriterThreads = 1;
createThreads();
if(createWriterThreads() == FAIL){
cout << "ERROR: Could not create writer threads" << endl;
exit (-1);
}
setThreadPriorities();
if(enable) if(enable)
setupFilter(); setupFilter();
@ -506,8 +528,10 @@ void slsReceiverFunctionList::setupFifoStructure(){
//allocate memory //allocate memory
mem0=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); mem0=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize);
/** shud let the client know about this */ /** shud let the client know about this */
if (mem0==NULL) if (mem0==NULL){
cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl;
exit(-1);
}
buffer=mem0; buffer=mem0;
//push the addresses into freed fifoFree and writingFifoFree //push the addresses into freed fifoFree and writingFifoFree
while (buffer<(mem0+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { while (buffer<(mem0+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) {
@ -558,33 +582,56 @@ int slsReceiverFunctionList::createUDPSocket(){
int slsReceiverFunctionList::createListeningThreads(bool destroy){
int slsReceiverFunctionList::createThreads(bool destroy){
int i;
if(!destroy){ if(!destroy){
pthread_mutex_lock(&status_mutex); pthread_mutex_lock(&status_mutex);
status = IDLE;
listening_thread_running = 0; listening_thread_running = 0;
writerthreads_mask = 0x0;
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_unlock(&(status_mutex));
//listening thread //listening thread
cout << "Creating Listening Thread" << endl;
sem_init(&listensmp,0,0); sem_init(&listensmp,0,0);
if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){ if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){
cout << "Could not create listening thread" << endl; cout << "Could not create listening thread" << endl;
return FAIL; return FAIL;
} }
//#ifdef VERBOSE #ifdef VERBOSE
cout << "Listening thread created successfully." << endl; cout << "Listening thread created successfully." << endl;
//#endif #endif
}else{
cout<<"Destroying Listening Thread"<<endl;
sem_post(&listensmp);
sem_destroy(&listensmp);
cout << "Threads destroyed" << endl;
if(pthread_cancel(listening_thread)!=0)
cout << "Unable to cancel listening Thread " << endl;
}
return OK;
}
int slsReceiverFunctionList::createWriterThreads(bool destroy){
int i;
if(!destroy){
pthread_mutex_lock(&status_mutex);
writerthreads_mask = 0x0;
pthread_mutex_unlock(&(status_mutex));
//start writer threads //start writer threads
cout << "Creating Writer Threads"; cout << "Creating Writer Threads";
currentWriterThreadIndex = -1; currentWriterThreadIndex = -1;
for(i = 0; i < numWriterThreads; ++i){ for(i = 0; i < numWriterThreads; ++i){
sem_init(&writersmp[i],0,0); sem_init(&writersmp[i],0,0);
thread_started = 0; thread_started = 0;
@ -597,52 +644,20 @@ int slsReceiverFunctionList::createThreads(bool destroy){
cout << "."; cout << ".";
cout << flush; cout << flush;
} }
//#ifdef VERBOSE #ifdef VERBOSE
cout << endl << "Writer threads created successfully." << endl; cout << endl << "Writer threads created successfully." << endl;
//#endif #else
cout << endl;
#endif
//assign priorities }else{
struct sched_param tcp_param, listen_param, write_param; cout << "Destroying Writer Thread" << endl;
int policy= SCHED_RR;
bool rights = true;
tcp_param.sched_priority = 50;
listen_param.sched_priority = 99;
write_param.sched_priority = 90;
if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM)
rights = false;
for(i = 0; i < numWriterThreads; ++i)
if(rights)
if (pthread_setschedparam(writing_thread[i], policy, &write_param) == EPERM){
rights = false;
break;
}
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM)
rights = false;
if(!rights)
cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl;
}
else{cout<<"DESTROYNG THREADS"<<endl;
//cancel threads
for(i = 0; i < numWriterThreads; ++i){ for(i = 0; i < numWriterThreads; ++i){
if(pthread_cancel(writing_thread[i])!=0)
cout << "Unable to cancel Thread of index" << i << endl;
sem_post(&writersmp[i]); sem_post(&writersmp[i]);
sem_destroy(&writersmp[i]); sem_destroy(&writersmp[i]);
if(pthread_cancel(writing_thread[i])!=0)
cout << "Unable to cancel Thread of index" << i << endl;
} }
//semaphore destroy
if(pthread_cancel(listening_thread)!=0)
cout << "Unable to cancel listening Thread " << endl;
sem_post(&listensmp);
sem_destroy(&listensmp);
cout << "Threads destroyed" << endl;
} }
return OK; return OK;
@ -656,6 +671,37 @@ int slsReceiverFunctionList::createThreads(bool destroy){
void slsReceiverFunctionList::setThreadPriorities(){
//assign priorities
struct sched_param tcp_param, listen_param, write_param;
int policy= SCHED_RR;
bool rights = true;
tcp_param.sched_priority = 50;
listen_param.sched_priority = 99;
write_param.sched_priority = 90;
if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM)
rights = false;
for(int i = 0; i < numWriterThreads; ++i)
if(rights)
if (pthread_setschedparam(writing_thread[i], policy, &write_param) == EPERM){
rights = false;
break;
}
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM)
rights = false;
if(!rights)
cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl;
}
int slsReceiverFunctionList::setupWriter(){ int slsReceiverFunctionList::setupWriter(){

View File

@ -236,10 +236,21 @@ private:
int createUDPSocket(); int createUDPSocket();
/** /**
* create listening thread and many writer threads at class construction * create listening thread
* @param destroy is true to kill all threads and start again * @param destroy is true to kill all threads and start again
*/ */
int createThreads(bool destroy = false); int createListeningThreads(bool destroy = false);
/**
* create writer threads
* @param destroy is true to kill all threads and start again
*/
int createWriterThreads(bool destroy = false);
/**
* set thread priorities
*/
void setThreadPriorities();
/** /**
* initializes variables and creates the first file * initializes variables and creates the first file