mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-04-23 06:50:02 +02:00
conflict merged fix, lockin in stop and startandreadallnowait, r_restreamstop moved to stop
This commit is contained in:
commit
6e876d79ee
@ -1581,6 +1581,7 @@ int multiSlsDetector::startAcquisition(){
|
||||
|
||||
|
||||
int multiSlsDetector::stopAcquisition(){
|
||||
pthread_mutex_lock(&mg); // locks due to processing thread using threadpool when in use
|
||||
int i=0;
|
||||
int ret=OK,ret1=OK;
|
||||
int posmin=0, posmax=thisMultiDetector->numberOfDetectors;
|
||||
@ -1625,7 +1626,7 @@ int multiSlsDetector::stopAcquisition(){
|
||||
}
|
||||
|
||||
*stoppedFlag=1;
|
||||
|
||||
pthread_mutex_unlock(&mg);
|
||||
return ret;
|
||||
};
|
||||
|
||||
@ -1874,7 +1875,7 @@ int* multiSlsDetector::startAndReadAll(){
|
||||
|
||||
|
||||
int multiSlsDetector::startAndReadAllNoWait(){
|
||||
|
||||
pthread_mutex_lock(&mg); // locks due to processing thread using threadpool when in use
|
||||
int i=0;
|
||||
int ret=OK;
|
||||
int posmin=0, posmax=thisMultiDetector->numberOfDetectors;
|
||||
@ -1919,7 +1920,7 @@ int multiSlsDetector::startAndReadAllNoWait(){
|
||||
ret=FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&mg);
|
||||
return ret;
|
||||
|
||||
}
|
||||
@ -6794,20 +6795,3 @@ bool multiSlsDetector::isAcquireReady() {
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int multiSlsDetector::restreamStopFromReceiver() {
|
||||
int ret=OK, ret1;
|
||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
|
||||
if (detectors[idet]) {
|
||||
ret1=detectors[idet]->restreamStopFromReceiver();
|
||||
if(detectors[idet]->getErrorMask())
|
||||
setErrorMask(getErrorMask()|(1<<idet));
|
||||
if (ret1!=OK)
|
||||
ret=FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1520,14 +1520,6 @@ class multiSlsDetector : public slsDetectorUtils {
|
||||
*/
|
||||
bool isAcquireReady();
|
||||
|
||||
/**
|
||||
If data streaming in receiver is enabled,
|
||||
restream the stop dummy packet from receiver
|
||||
Used usually for Moench,
|
||||
in case it is lost in network due to high data rate
|
||||
\returns OK if success else FAIL
|
||||
*/
|
||||
int restreamStopFromReceiver();
|
||||
|
||||
private:
|
||||
|
||||
|
@ -4196,6 +4196,9 @@ int slsDetector::startAcquisition(){
|
||||
};
|
||||
int slsDetector::stopAcquisition(){
|
||||
|
||||
runStatus s = getRunStatus();
|
||||
runStatus r = getReceiverStatus();
|
||||
|
||||
int fnum=F_STOP_ACQUISITION;
|
||||
int ret=FAIL;
|
||||
char mess[MAX_STR_LENGTH]="";
|
||||
@ -4217,6 +4220,11 @@ int slsDetector::stopAcquisition(){
|
||||
}
|
||||
}
|
||||
thisDetector->stoppedFlag=1;
|
||||
|
||||
if ((thisDetector->receiver_upstream) && (s == IDLE) && (r == IDLE)) {
|
||||
restreamStopFromReceiver();
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
|
||||
@ -9446,8 +9454,10 @@ int slsDetector::restreamStopFromReceiver(){
|
||||
}
|
||||
if(ret==FORCE_UPDATE)
|
||||
ret=updateReceiver();
|
||||
else if (ret == FAIL)
|
||||
else if (ret == FAIL) {
|
||||
setErrorMask((getErrorMask())|(RESTREAM_STOP_FROM_RECEIVER));
|
||||
std::cout << " Could not restream stop dummy packet from receiver" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -191,7 +191,7 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
|
||||
++i;
|
||||
|
||||
/*! \page acquisition
|
||||
- <b> status [s] </b> starts or stops acquisition in detector in non blocking mode. \c s: [\c start, \c stop]. \c Returns the detector status: [\c running, \c error, \c transmitting, \c finished, \c waiting, \c idle]. \c Returns \c (string)
|
||||
- <b> status [s] </b> starts or stops acquisition in detector in non blocking mode. When using stop acquisition and if acquisition is done, it will restream the stop packet from receiver (if data streaming in receiver is on). \c s: [\c start, \c stop]. \c Returns the detector status: [\c running, \c error, \c transmitting, \c finished, \c waiting, \c idle]. \c Returns \c (string)
|
||||
*/
|
||||
descrToFuncMap[i].m_pFuncName="status"; //
|
||||
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdStatus;
|
||||
@ -2069,15 +2069,6 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
|
||||
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdReceiver;
|
||||
++i;
|
||||
|
||||
/*! \page receiver
|
||||
- <b>r_restreamstop [i]</b> If data streaming in receiver is enabled, restreams the stop dummy packet via zmq. i can be any value. Only put! \cReturns 1 for success 0 for fail \c (int)
|
||||
*/
|
||||
descrToFuncMap[i].m_pFuncName="r_restreamstop"; //
|
||||
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdReceiver;
|
||||
++i;
|
||||
|
||||
|
||||
|
||||
/* pattern generator */
|
||||
|
||||
/*! \page ctb Chiptest board
|
||||
@ -2456,8 +2447,10 @@ string slsDetectorCommand::cmdStatus(int narg, char *args[], int action) {
|
||||
//myDet->setThreadedProcessing(0);
|
||||
if (string(args[1])=="start")
|
||||
myDet->startAcquisition();
|
||||
else if (string(args[1])=="stop")
|
||||
else if (string(args[1])=="stop") {
|
||||
myDet->setReceiverOnline(ONLINE_FLAG);
|
||||
myDet->stopAcquisition();
|
||||
}
|
||||
else
|
||||
return string("unknown action");
|
||||
}
|
||||
@ -2489,7 +2482,7 @@ string slsDetectorCommand::helpStatus(int narg, char *args[], int action) {
|
||||
os << string("busy \t gets the status of acquire- can be: 0 or 1. 0 for idle, 1 for running\n");
|
||||
}
|
||||
if (action==PUT_ACTION || action==HELP_ACTION) {
|
||||
os << string("status \t controls the detector acquisition - can be start or stop \n");
|
||||
os << string("status \t controls the detector acquisition - can be start or stop. When using stop acquisition and if acquisition is done, it will restream the stop packet from receiver (if data streaming in receiver is on). \n");
|
||||
os << string("busy i\t sets the status of acquire- can be: 0(idle) or 1(running).Command Acquire sets it to 1 at beignning of acquire and back to 0 at the end. Clear Flag for unexpected acquire terminations. \n");
|
||||
}
|
||||
return os.str();
|
||||
@ -6060,31 +6053,6 @@ string slsDetectorCommand::cmdReceiver(int narg, char *args[], int action) {
|
||||
}
|
||||
|
||||
|
||||
else if(cmd=="r_restreamstop"){
|
||||
if (action==GET_ACTION)
|
||||
return string("cannot get");
|
||||
else {
|
||||
runStatus s = myDet->getRunStatus();
|
||||
if (s != IDLE) {
|
||||
std::cout << "Could not restream stop from receiver.\n"
|
||||
"Detector Acquisition still in progress. Status: " << myDet->runStatusType(s) << std::endl;
|
||||
sprintf(answer,"%d",0);
|
||||
return string (answer);
|
||||
}
|
||||
s = myDet->getReceiverStatus();
|
||||
if (s != IDLE) {
|
||||
std::cout << "Could not restream stop from receiver.\n"
|
||||
"Receiver Acquisition still in progress. Status: " << myDet->runStatusType(s) << std::endl;
|
||||
sprintf(answer,"%d",0);
|
||||
return string (answer);
|
||||
}
|
||||
else
|
||||
sprintf(answer,"%d",(myDet->restreamStopFromReceiver() == OK) ? 1 : 0);
|
||||
}
|
||||
return string(answer);
|
||||
}
|
||||
|
||||
|
||||
return string("could not decode command");
|
||||
|
||||
}
|
||||
@ -6101,7 +6069,6 @@ string slsDetectorCommand::helpReceiver(int narg, char *args[], int action) {
|
||||
os << "tengiga \t sets system to be configure for 10Gbe if set to 1, else 1Gbe if set to 0" << std::endl;
|
||||
os << "rx_fifodepth [val]\t sets receiver fifo depth to val" << std::endl;
|
||||
os << "r_silent [i]\t sets receiver in silent mode, ie. it will not print anything during real time acquisition. 1 sets, 0 unsets." << std::endl;
|
||||
os << "r_restreamstop [i]\t If data streaming in receiver is enabled and receiver is idle, restreams the stop dummy packet via zmq. i can be any value." << std::endl;
|
||||
}
|
||||
if (action==GET_ACTION || action==HELP_ACTION){
|
||||
os << "receiver \t returns the status of receiver - can be running or idle" << std::endl;
|
||||
@ -6111,7 +6078,6 @@ string slsDetectorCommand::helpReceiver(int narg, char *args[], int action) {
|
||||
os << "tengiga \t returns 1 if the system is configured for 10Gbe else 0 for 1Gbe" << std::endl;
|
||||
os << "rx_fifodepth \t returns receiver fifo depth" << std::endl;
|
||||
os << "r_silent \t returns receiver silent mode enable. 1 is silent, 0 not silent." << std::endl;
|
||||
os << "r_restreamstop \t returns if restreaming the stop dummy packet from receiver via zmq was success(1) or fail(0)" << std:: endl;
|
||||
}
|
||||
return os.str();
|
||||
}
|
||||
|
@ -970,16 +970,6 @@ virtual int setReceiverSilentMode(int i = -1)=0;
|
||||
*/
|
||||
virtual bool isAcquireReady() = 0;
|
||||
|
||||
/**
|
||||
If data streaming in receiver is enabled,
|
||||
restream the stop dummy packet from receiver
|
||||
Used usually for Moench,
|
||||
in case it is lost in network due to high data rate
|
||||
\returns OK if success else FAIL
|
||||
*/
|
||||
virtual int restreamStopFromReceiver() = 0;
|
||||
|
||||
|
||||
|
||||
protected:
|
||||
|
||||
|
@ -418,9 +418,7 @@ int postProcessing::fillBadChannelMask() {
|
||||
|
||||
|
||||
void* postProcessing::processData(int delflag) {
|
||||
pthread_mutex_lock(&mg);
|
||||
if(setReceiverOnline()==OFFLINE_FLAG){
|
||||
pthread_mutex_unlock(&mg);
|
||||
|
||||
#ifdef VERBOSE
|
||||
std::cout<< " ??????????????????????????????????????????? processing data - threaded mode " << *threadedProcessing << endl;
|
||||
@ -505,8 +503,6 @@ void* postProcessing::processData(int delflag) {
|
||||
}
|
||||
} //receiver
|
||||
else{
|
||||
|
||||
pthread_mutex_unlock(&mg);
|
||||
//cprintf(RED,"In post processing threads\n");
|
||||
|
||||
if(dataReady) {
|
||||
@ -535,11 +531,11 @@ void* postProcessing::processData(int delflag) {
|
||||
|
||||
|
||||
//get progress
|
||||
pthread_mutex_lock(&mg);
|
||||
if(setReceiverOnline() == ONLINE_FLAG){
|
||||
pthread_mutex_lock(&mg);
|
||||
caught = getFramesCaughtByAnyReceiver();
|
||||
pthread_mutex_unlock(&mg);
|
||||
}
|
||||
pthread_mutex_unlock(&mg);
|
||||
|
||||
|
||||
//updating progress
|
||||
|
@ -7,7 +7,6 @@ ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){
|
||||
#endif
|
||||
m_tasks_loaded = false;
|
||||
thread_started = false;
|
||||
zmqthreadpool = false;
|
||||
current_thread_number = -1;
|
||||
number_of_ongoing_tasks = 0;
|
||||
number_of_total_tasks = 0;
|
||||
@ -117,7 +116,6 @@ void* ThreadPool::execute_thread(){
|
||||
/*cout << "Unlocking: " << pthread_self() << endl;*/
|
||||
m_task_mutex.unlock();
|
||||
|
||||
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore start address wait:"<<&semStart<<endl;
|
||||
sem_wait(&semStart);
|
||||
|
||||
//cout<<"***"<<ithread<<" checking out semaphore done address:"<<&semDone<<endl;
|
||||
@ -134,14 +132,10 @@ void* ThreadPool::execute_thread(){
|
||||
number_of_ongoing_tasks--;
|
||||
m_task_mutex.unlock();
|
||||
|
||||
//if(zmqthreadpool) cout<<ithread <<" task done: "<<number_of_ongoing_tasks<<endl;
|
||||
//last task and check m_tasks_loaded to ensure done only once
|
||||
if((!number_of_ongoing_tasks) && m_tasks_loaded){
|
||||
//if(zmqthreadpool) cout << ithread << " all tasks done."<<endl;
|
||||
m_tasks_loaded = false;
|
||||
}
|
||||
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore done address post:"<<&semDone<<endl;
|
||||
|
||||
|
||||
sem_post(&semDone);
|
||||
//removed deleteing task to earlier
|
||||
@ -155,6 +149,7 @@ int ThreadPool::add_task(Task* task){
|
||||
return 0;
|
||||
}
|
||||
m_task_mutex.lock();
|
||||
|
||||
// TODO: put a limit on how many tasks can be added at most
|
||||
m_tasks.push_back(task);
|
||||
number_of_ongoing_tasks++;
|
||||
@ -172,7 +167,6 @@ void ThreadPool::startExecuting(){
|
||||
m_tasks_loaded = true;
|
||||
|
||||
//giving all threads permission to start as all tasks have been added
|
||||
//if(zmqthreadpool) cout<<"*** semaphore start address post:"<<&semStart<<endl;
|
||||
for(int i=0;i<number_of_total_tasks;i++)
|
||||
sem_post(&semStart);
|
||||
|
||||
@ -181,14 +175,12 @@ void ThreadPool::startExecuting(){
|
||||
void ThreadPool::wait_for_tasks_to_complete(){
|
||||
if(m_pool_size == 1)
|
||||
return;
|
||||
//if(zmqthreadpool) cout<<"waiting for all tasks to be done "<<endl;
|
||||
//if(zmqthreadpool) cout<<"*** semaphore done address wait:"<<&semDone<<endl;
|
||||
for(int i=0;i<number_of_total_tasks;i++)
|
||||
|
||||
for(int i=0;i<number_of_total_tasks;i++) {
|
||||
//cprintf(MAGENTA,"waiting for %d to be done, total tasks:%d\n", i, number_of_total_tasks);
|
||||
sem_wait(&semDone);
|
||||
//cprintf(YELLOW,"done with waiting for %d, total tasks:%d\n", i, number_of_total_tasks);
|
||||
}
|
||||
number_of_total_tasks = 0;
|
||||
//if(zmqthreadpool) cout<<"complete"<<endl<<endl;
|
||||
}
|
||||
|
||||
void ThreadPool::setzeromqThread(){
|
||||
zmqthreadpool = true;
|
||||
}
|
||||
|
@ -28,7 +28,6 @@ public:
|
||||
int add_task(Task* task);
|
||||
void startExecuting();
|
||||
void wait_for_tasks_to_complete();
|
||||
void setzeromqThread();
|
||||
|
||||
private:
|
||||
int m_pool_size;
|
||||
@ -48,6 +47,5 @@ private:
|
||||
|
||||
sem_t semStart;
|
||||
sem_t semDone;
|
||||
bool zmqthreadpool;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user