using std::mutex and std::thread in multi detector

This commit is contained in:
Erik Frojdh
2018-10-17 16:56:01 +02:00
parent 5c2ff84c0e
commit 1d65063088
2 changed files with 40 additions and 151 deletions

View File

@ -44,14 +44,7 @@ multiSlsDetector::multiSlsDetector(int id, bool verify, bool update)
dataReady(0), dataReady(0),
pCallbackArg(0) pCallbackArg(0)
{ {
pthread_mutex_t mp1 = PTHREAD_MUTEX_INITIALIZER;
mp=mp1;
pthread_mutex_init(&mp, NULL);
mg=mp1;
pthread_mutex_init(&mg, NULL);
setupMultiDetector(verify, update); setupMultiDetector(verify, update);
} }
@ -93,20 +86,6 @@ std::vector<RT> multiSlsDetector::parallelCall(RT (slsDetector::*somefunc)(CT...
return result; return result;
} }
// std::string multiSlsDetector::concatResultOrPos(std::string (slsDetector::*somefunc)(int), int pos) {
// if (pos >= 0 && pos < (int)detectors.size()) {
// return (detectors[pos].get()->*somefunc)(pos);
// } else {
// std::string s;
// for (auto& d : detectors) {
// s += (d.get()->*somefunc)(pos) + "+";
// }
// return s;
// }
// }
int multiSlsDetector::decodeNChannel(int offsetX, int offsetY, int& channelX, int& channelY) { int multiSlsDetector::decodeNChannel(int offsetX, int offsetY, int& channelX, int& channelY) {
channelX = -1; channelX = -1;
channelY = -1; channelY = -1;
@ -138,7 +117,6 @@ int multiSlsDetector::decodeNChannel(int offsetX, int offsetY, int& channelX, in
std::string multiSlsDetector::getErrorMessage(int& critical, int detPos) { std::string multiSlsDetector::getErrorMessage(int& critical, int detPos) {
int64_t multiMask = 0, slsMask = 0; int64_t multiMask = 0, slsMask = 0;
std::string retval = ""; std::string retval = "";
// char sNumber[100];
critical = 0; critical = 0;
size_t posmin = 0, posmax = detectors.size(); size_t posmin = 0, posmax = detectors.size();
@ -842,24 +820,18 @@ int multiSlsDetector::setOnline(int off, int detPos) {
std::string multiSlsDetector::checkOnline(int detPos) { std::string multiSlsDetector::checkOnline(int detPos) {
// single if (detPos >= 0)
if (detPos >= 0) {
return detectors[detPos]->checkOnline(); return detectors[detPos]->checkOnline();
}
// multi
auto r = parallelCall(&slsDetector::checkOnline); auto r = parallelCall(&slsDetector::checkOnline);
return sls::concatenateNonEmptyStrings(r); return sls::concatenateNonEmptyStrings(r);
} }
int multiSlsDetector::setPort(portType t, int num, int detPos) { int multiSlsDetector::setPort(portType t, int num, int detPos) {
// single if (detPos >= 0)
if (detPos >= 0) {
return detectors[detPos]->setPort(t, num); return detectors[detPos]->setPort(t, num);
}
// multi
auto r = serialCall(&slsDetector::setPort, t, num); auto r = serialCall(&slsDetector::setPort, t, num);
return sls::minusOneIfDifferent(r); return sls::minusOneIfDifferent(r);
} }
@ -1194,33 +1166,20 @@ int multiSlsDetector::startAcquisition(int detPos) {
int multiSlsDetector::stopAcquisition(int detPos) { int multiSlsDetector::stopAcquisition(int detPos) {
int ret = OK;
// locks to synchronize using client->receiver simultaneously (processing thread) // locks to synchronize using client->receiver simultaneously (processing thread)
pthread_mutex_lock(&mg); std::lock_guard<std::mutex> lock(mg);
// single
if (detPos >= 0) { if (detPos >= 0) {
// if only 1 detector, set flag to stop current acquisition // if only 1 detector, set flag to stop current acquisition
if (detectors.size() == 1) if (detectors.size() == 1)
thisMultiDetector->stoppedFlag = 1; thisMultiDetector->stoppedFlag = 1;
ret = detectors[detPos]->stopAcquisition(); return detectors[detPos]->stopAcquisition();
} }
// multi
else { else {
// set flag to stop current acquisition
thisMultiDetector->stoppedFlag = 1; thisMultiDetector->stoppedFlag = 1;
auto r = parallelCall(&slsDetector::stopAcquisition); auto r = parallelCall(&slsDetector::stopAcquisition);
ret = sls::allEqualTo(r, static_cast<int>(OK)) ? OK : FAIL; return sls::allEqualTo(r, static_cast<int>(OK)) ? OK : FAIL;
} }
pthread_mutex_unlock(&mg);
return ret;
} }
@ -3678,43 +3637,26 @@ int multiSlsDetector::setTotalProgress() {
double multiSlsDetector::getCurrentProgress() { double multiSlsDetector::getCurrentProgress() {
pthread_mutex_lock(&mp); std::lock_guard<std::mutex> lock(mp);
#ifdef VERBOSE return 100.*((double)progressIndex)/((double)totalProgress);
std::cout << progressIndex << " / " << totalProgress << std::endl;
#endif
double p=100.*((double)progressIndex)/((double)totalProgress);
pthread_mutex_unlock(&mp);
return p;
} }
void multiSlsDetector::incrementProgress() { void multiSlsDetector::incrementProgress() {
pthread_mutex_lock(&mp); std::lock_guard<std::mutex> lock(mp);
progressIndex++; progressIndex++;
std::cout << std::fixed << std::setprecision(2) << std::setw (6) std::cout << std::fixed << std::setprecision(2) << std::setw (6)
<< 100.*((double)progressIndex)/((double)totalProgress) << " \%"; << 100.*((double)progressIndex)/((double)totalProgress) << " \%";
pthread_mutex_unlock(&mp); std::cout << '\r' << std::flush;
#ifdef VERBOSE
std::cout << std::endl;
#else
std::cout << "\r" << std::flush;
#endif
} }
void multiSlsDetector::setCurrentProgress(int i){ void multiSlsDetector::setCurrentProgress(int i){
pthread_mutex_lock(&mp); std::lock_guard<std::mutex> lock(mp);
progressIndex=i; progressIndex=i;
std::cout << std::fixed << std::setprecision(2) << std::setw (6) std::cout << std::fixed << std::setprecision(2) << std::setw (6)
<< 100.*((double)progressIndex)/((double)totalProgress) << " \%"; << 100.*((double)progressIndex)/((double)totalProgress) << " \%";
pthread_mutex_unlock(&mp); std::cout << '\r' << std::flush;
#ifdef VERBOSE
std::cout << std::endl;
#else
std::cout << "\r" << std::flush;
#endif
} }
@ -3748,45 +3690,37 @@ int multiSlsDetector::acquire(){
// verify receiver is idle // verify receiver is idle
if(receiver){ if(receiver){
pthread_mutex_lock(&mg); std::lock_guard<std::mutex> lock(mg);
if(getReceiverStatus()!=IDLE) if(getReceiverStatus()!=IDLE)
if(stopReceiver() == FAIL) if(stopReceiver() == FAIL)
thisMultiDetector->stoppedFlag=1; thisMultiDetector->stoppedFlag=1;
pthread_mutex_unlock(&mg);
} }
// start processing thread // start processing thread
if (thisMultiDetector->threadedProcessing) if (thisMultiDetector->threadedProcessing)
startProcessingThread(); startProcessingThread();
//resets frames caught in receiver //resets frames caught in receiver
if(receiver){ if(receiver){
pthread_mutex_lock(&mg); std::lock_guard<std::mutex> lock(mg);
if (resetFramesCaught() == FAIL) if (resetFramesCaught() == FAIL)
thisMultiDetector->stoppedFlag=1; thisMultiDetector->stoppedFlag=1;
pthread_mutex_unlock(&mg);
} }
// loop through measurements // loop through measurements
for(int im=0;im<nm;++im) { for(int im=0;im<nm;++im) {
if (thisMultiDetector->stoppedFlag) if (thisMultiDetector->stoppedFlag)
break; break;
// start receiver // start receiver
if(receiver){ if(receiver){
std::lock_guard<std::mutex> lock(mg);
pthread_mutex_lock(&mg);
if(startReceiver() == FAIL) { if(startReceiver() == FAIL) {
std::cout << "Start receiver failed " << std::endl; std::cout << "Start receiver failed " << std::endl;
stopReceiver(); stopReceiver();
thisMultiDetector->stoppedFlag=1; thisMultiDetector->stoppedFlag=1;
pthread_mutex_unlock(&mg);
break; break;
} }
pthread_mutex_unlock(&mg);
//let processing thread listen to these packets //let processing thread listen to these packets
sem_post(&sem_newRTAcquisition); sem_post(&sem_newRTAcquisition);
} }
@ -3798,31 +3732,22 @@ int multiSlsDetector::acquire(){
processData(); processData();
} }
// stop receiver // stop receiver
std::lock_guard<std::mutex> lock(mg);
if(receiver){ if(receiver){
pthread_mutex_lock(&mg);
if (stopReceiver() == FAIL) { if (stopReceiver() == FAIL) {
thisMultiDetector->stoppedFlag = 1; thisMultiDetector->stoppedFlag = 1;
pthread_mutex_unlock(&mg);
} else { } else {
pthread_mutex_unlock(&mg);
if (thisMultiDetector->threadedProcessing && dataReady) if (thisMultiDetector->threadedProcessing && dataReady)
sem_wait(&sem_endRTAcquisition); // waits for receiver's external process to be done sending data to gui sem_wait(&sem_endRTAcquisition); // waits for receiver's external process to be done sending data to gui
} }
} }
int findex = 0; int findex = 0;
pthread_mutex_lock(&mg);
findex = incrementFileIndex(); findex = incrementFileIndex();
pthread_mutex_unlock(&mg);
if (measurement_finished){ if (measurement_finished){
pthread_mutex_lock(&mg);
measurement_finished(im,findex,measFinished_p); measurement_finished(im,findex,measFinished_p);
pthread_mutex_unlock(&mg);
} }
if (thisMultiDetector->stoppedFlag) { if (thisMultiDetector->stoppedFlag) {
break; break;
} }
@ -3837,7 +3762,8 @@ int multiSlsDetector::acquire(){
//let processing thread continue and checkjointhread //let processing thread continue and checkjointhread
sem_post(&sem_newRTAcquisition); sem_post(&sem_newRTAcquisition);
pthread_join(dataProcessingThread, &status); // pthread_join(dataProcessingThread, &status);
dataProcessingThread.join();
} }
@ -3873,55 +3799,27 @@ int multiSlsDetector::setThreadedProcessing(int enable) {
void multiSlsDetector::startProcessingThread() { void multiSlsDetector::startProcessingThread() {
setTotalProgress(); setTotalProgress();
#ifdef VERBOSE dataProcessingThread = std::thread(&multiSlsDetector::processData, this);
std::cout << "start thread stuff" << std::endl;
#endif
pthread_attr_t tattr;
int ret;
sched_param param, mparam;
int policy= SCHED_OTHER;
// set the priority; others are unchanged
//newprio = 30;
mparam.sched_priority =1;
param.sched_priority =1;
/* Initialize and set thread detached attribute */
pthread_attr_init(&tattr);
pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_JOINABLE);
pthread_setschedparam(pthread_self(), policy, &mparam);
ret = pthread_create(&dataProcessingThread, &tattr,startProcessData, (void*)this);
if (ret)
printf("ret %d\n", ret);
pthread_attr_destroy(&tattr);
// scheduling parameters of target thread
pthread_setschedparam(dataProcessingThread, policy, &param);
} }
void* multiSlsDetector::startProcessData(void *n) { // void* multiSlsDetector::startProcessData(void *n) {
((multiSlsDetector*)n)->processData(); // ((multiSlsDetector*)n)->processData();
return n; // return n;
} // }
void* multiSlsDetector::processData() { void* multiSlsDetector::processData() {
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG){
return 0; return 0;
} //receiver }
else{ else{
//cprintf(RED,"In post processing threads\n");
if(dataReady) { if(dataReady) {
readFrameFromReceiver(); readFrameFromReceiver();
} }
//only update progress //only update progress
else{ else{
int caught = -1; int caught = -1;
char c;
int ifp;
while(true){ while(true){
// set only in startThread // set only in startThread
@ -3929,9 +3827,8 @@ void* multiSlsDetector::processData() {
setTotalProgress(); setTotalProgress();
// to exit acquire by typing q // to exit acquire by typing q
ifp=kbhit(); if (kbhit()!=0){
if (ifp!=0){ char c = fgetc(stdin);
c=fgetc(stdin);
if (c=='q') { if (c=='q') {
std::cout<<"Caught the command to stop acquisition"<<std::endl; std::cout<<"Caught the command to stop acquisition"<<std::endl;
stopAcquisition(); stopAcquisition();
@ -3941,18 +3838,13 @@ void* multiSlsDetector::processData() {
//get progress //get progress
if(setReceiverOnline() == ONLINE_FLAG){ if(setReceiverOnline() == ONLINE_FLAG){
pthread_mutex_lock(&mg); std::lock_guard<std::mutex> lock(mg);
caught = getFramesCaughtByReceiver(0); caught = getFramesCaughtByReceiver(0);
pthread_mutex_unlock(&mg);
} }
//updating progress //updating progress
if(caught!= -1){ if(caught!= -1){
setCurrentProgress(caught); setCurrentProgress(caught);
#ifdef VERY_VERY_DEBUG
std::cout << "caught:" << caught << std::endl;
#endif
} }
// exiting loop // exiting loop
@ -3961,28 +3853,22 @@ void* multiSlsDetector::processData() {
if (checkJoinThread()){ if (checkJoinThread()){
break; break;
} }
usleep(100 * 1000); //20ms need this else connecting error to receiver (too fast) usleep(100 * 1000); //20ms need this else connecting error to receiver (too fast)
} }
} }
} }
return 0; return 0;
} }
int multiSlsDetector::checkJoinThread() { int multiSlsDetector::checkJoinThread() {
int retval; std::lock_guard<std::mutex> lock(mp);
pthread_mutex_lock(&mp); return jointhread;
retval=jointhread;
pthread_mutex_unlock(&mp);
return retval;
} }
void multiSlsDetector::setJoinThread( int v) { void multiSlsDetector::setJoinThread( int v) {
pthread_mutex_lock(&mp); std::lock_guard<std::mutex> lock(mp);
jointhread=v; jointhread=v;
pthread_mutex_unlock(&mp);
} }

View File

@ -18,6 +18,8 @@ class detectorData;
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <string> #include <string>
#include <thread>
#include <mutex>
#include <semaphore.h> #include <semaphore.h>
@ -1662,10 +1664,10 @@ private:
*/ */
void startProcessingThread(); void startProcessingThread();
/** // /**
* Static function to call processing thread // * Static function to call processing thread
*/ // */
static void* startProcessData(void *n); // static void* startProcessData(void *n);
/** /**
* Check if processing thread is ready to join main thread * Check if processing thread is ready to join main thread
@ -1718,10 +1720,10 @@ private:
int progressIndex; int progressIndex;
/** mutex to synchronize main and data processing threads */ /** mutex to synchronize main and data processing threads */
pthread_mutex_t mp; std::mutex mp;
/** mutex to synchronizedata processing and plotting threads */ /** mutex to synchronizedata processing and plotting threads */
pthread_mutex_t mg; std::mutex mg;
/** sets when the acquisition is finished */ /** sets when the acquisition is finished */
int jointhread; int jointhread;
@ -1730,7 +1732,8 @@ private:
int acquiringDone; int acquiringDone;
/** the data processing thread */ /** the data processing thread */
pthread_t dataProcessingThread; // pthread_t dataProcessingThread;
std::thread dataProcessingThread;
/** gui data */ /** gui data */
double *fdata; double *fdata;