removed external gui flag, acquiring flag is now not reset in stop acquisition, slsdet does not have acquiring flag anymore, busy accomodating acquiringflag, acquire cleanup, in acquire made lock for receiver more local, moved setacquiringflag false to bottom of acquire

This commit is contained in:
Dhanya Maliakal
2017-11-30 16:25:23 +01:00
parent 712c9a4524
commit 2227265357
7 changed files with 380 additions and 479 deletions

View File

@ -42,475 +42,419 @@ slsDetectorUtils::slsDetectorUtils() {
int slsDetectorUtils::acquire(int delflag){
//ensure acquire isnt started multiple times by same client
if(getAcquiringFlag() == false)
setAcquiringFlag(true);
else{
std::cout << "Error: Acquire has already been started." << std::endl;
return FAIL;
}
//ensure acquire isnt started multiple times by same client
if (isAcquireReady() == FAIL)
return FAIL;
#ifdef VERBOSE
struct timespec begin,end;
clock_gettime(CLOCK_REALTIME, &begin);
#endif
//not in the loop for real time acqusition yet,
//in the real time acquisition loop, processing thread will wait for a post each time
sem_init(&sem_newRTAcquisition,1,0);
bool receiver = (setReceiverOnline()==ONLINE_FLAG);
if(!receiver){
setDetectorIndex(-1);
}
bool receiver = (setReceiverOnline()==ONLINE_FLAG);
if(!receiver){
setDetectorIndex(-1);
}
int nc=setTimer(CYCLES_NUMBER,-1);
int nf=setTimer(FRAME_NUMBER,-1);
if (nc==0) nc=1;
if (nf==0) nf=1;
int nc=setTimer(CYCLES_NUMBER,-1);
int nf=setTimer(FRAME_NUMBER,-1);
if (nc==0) nc=1;
if (nf==0) nf=1;
int multiframe = nc*nf;
int multiframe = nc*nf;
progressIndex=0;
*stoppedFlag=0;
// setTotalProgress();
//moved these 2 here for measurement change
progressIndex=0;
*stoppedFlag=0;
angCalLogClass *aclog=NULL;
enCalLogClass *eclog=NULL;
// int lastindex=startindex, nowindex=startindex;
int connectChannels=0;
angCalLogClass *aclog=NULL;
enCalLogClass *eclog=NULL;
int connectChannels=0;
#ifdef VERBOSE
cout << "Acquire function "<< delflag << endl;
cout << "Stopped flag is "<< stoppedFlag << delflag << endl;
cout << "Acquire function "<< delflag << endl;
cout << "Stopped flag is "<< stoppedFlag << delflag << endl;
#endif
void *status;
void *status;
if ((*correctionMask&(1<< ANGULAR_CONVERSION)) || (*correctionMask&(1<< I0_NORMALIZATION)) || getActionMode(angCalLog) || (getScanMode(0)==positionScan)|| (getScanMode(1)==positionScan)) {
if (connectChannels==0)
if (connect_channels) {
connect_channels(CCarg);
connectChannels=1;
}
}
if (getActionMode(angCalLog)) {
aclog=new angCalLogClass(this);
}
if (getActionMode(enCalLog)) {
eclog=new enCalLogClass(this);
}
if ((*correctionMask&(1<< ANGULAR_CONVERSION)) || (*correctionMask&(1<< I0_NORMALIZATION)) || getActionMode(angCalLog) || (getScanMode(0)==positionScan)|| (getScanMode(1)==positionScan)) {
if (connectChannels==0)
if (connect_channels) {
connect_channels(CCarg);
connectChannels=1;
}
}
if (getActionMode(angCalLog)) {
aclog=new angCalLogClass(this);
}
if (getActionMode(enCalLog)) {
eclog=new enCalLogClass(this);
}
setJoinThread(0);
positionFinished(0);
setJoinThread(0);
positionFinished(0);
int nm=timerValue[MEASUREMENTS_NUMBER];
if (nm<1)
nm=1;
int np=getNumberOfPositions();
if (np<1)
np=1;
int nm=timerValue[MEASUREMENTS_NUMBER];
if (nm<1)
nm=1;
int ns0=1;
if (*actionMask & (1 << MAX_ACTIONS)) {
ns0=getScanSteps(0);
if (ns0<1)
ns0=1;
}
int np=getNumberOfPositions();
if (np<1)
np=1;
int ns0=1;
if (*actionMask & (1 << MAX_ACTIONS)) {
ns0=getScanSteps(0);
if (ns0<1)
ns0=1;
}
int ns1=1;
if (*actionMask & (1 << (MAX_ACTIONS+1))) {
ns1=getScanSteps(1);
if (ns1<1)
ns1=1;
}
int ns1=1;
if (*actionMask & (1 << (MAX_ACTIONS+1))) {
ns1=getScanSteps(1);
if (ns1<1)
ns1=1;
}
if(receiver){
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
if(getReceiverStatus()!=IDLE)
if(stopReceiver() == FAIL)
*stoppedFlag=1;
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
}
// verify receiver is idle
if(receiver){
pthread_mutex_lock(&mg);
if(getReceiverStatus()!=IDLE)
if(stopReceiver() == FAIL)
*stoppedFlag=1;
pthread_mutex_unlock(&mg);
}
if (*threadedProcessing)
startThread(delflag);
if (*threadedProcessing)
startThread(delflag);
#ifdef VERBOSE
cout << " starting thread " << endl;
cout << " starting thread " << endl;
#endif
//resets frames caught in receiver
if(receiver){
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
if (resetFramesCaught() == FAIL)
*stoppedFlag=1;
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
}
//resets frames caught in receiver
if(receiver){
pthread_mutex_lock(&mg);
if (resetFramesCaught() == FAIL)
*stoppedFlag=1;
pthread_mutex_unlock(&mg);
}
for(int im=0;im<nm;++im) {
for(int im=0;im<nm;++im) {
#ifdef VERBOSE
cout << " starting measurement "<< im << " of " << nm << endl;
cout << " starting measurement "<< im << " of " << nm << endl;
#endif
// start script
if (*stoppedFlag==0) {
executeAction(startScript);
}
for (int is0=0; is0<ns0; ++is0) {
if (*stoppedFlag==0) {
executeScan(0,is0);
} else
break;
for (int is1=0; is1<ns1; ++is1) {
if (*stoppedFlag==0) {
executeScan(1,is1);
} else
break;
if (*stoppedFlag==0) {
executeAction(scriptBefore);
} else
break;
ResetPositionIndex();
for (int ip=0; ip<np; ++ip) {
if (*stoppedFlag==0) {
if (getNumberOfPositions()>0) {
moveDetector(detPositions[ip]);
IncrementPositionIndex();
#ifdef VERBOSE
std::cout<< "moving to position" << std::endl;
#endif
}
} else
break;
pthread_mutex_lock(&mp);
createFileName();
pthread_mutex_unlock(&mp);
// script before
if (*stoppedFlag==0) {
executeAction(scriptBefore);
} else
break;
// header before
if (*stoppedFlag==0) {
executeAction(headerBefore);
if (*correctionMask&(1<< ANGULAR_CONVERSION) || aclog){
positionFinished(0);
setCurrentPosition(getDetectorPosition());
}
if (aclog)
aclog->addStep(getCurrentPosition(), getCurrentFileName());
if (eclog)
eclog->addStep(setDAC(-1,THRESHOLD,0), getCurrentFileName());
if (*correctionMask&(1<< I0_NORMALIZATION)) {
if (get_i0)
get_i0(0, IOarg);
}
setCurrentFrameIndex(0);
if(receiver)
pthread_mutex_lock(&mg);
if (multiframe>1)
setFrameIndex(0);
else
setFrameIndex(-1);
// file name and start receiver
if(receiver){
pthread_mutex_unlock(&mg);
pthread_mutex_lock(&mp);
createFileName();
pthread_mutex_unlock(&mp);
//send receiver file name
pthread_mutex_lock(&mg);
setFileName(fileIO::getFileName());
if(startReceiver() == FAIL) {
cout << "Start receiver failed " << endl;
stopReceiver();
*stoppedFlag=1;
pthread_mutex_unlock(&mg);
break;
}
#ifdef VERBOSE
cout << "Receiver started " << endl;
#endif
pthread_mutex_unlock(&mg);
//let processing thread listen to these packets
sem_post(&sem_newRTAcquisition);
}
#ifdef VERBOSE
cout << "Acquiring " << endl;
#endif
startAndReadAll();
#ifdef VERBOSE
cout << "detector finished" << endl;
cout << "returned! " << endl;
#endif
//cout << "data thread started " << endl;
//loop measurements
// pthread_mutex_lock(&mp);
// setStartIndex(*fileIndex);
// pthread_mutex_unlock(&mp);
//cout << "action at start" << endl;
if (*stoppedFlag==0) {
executeAction(startScript);
}
for (int is0=0; is0<ns0; ++is0) {
// cout << "scan0 loop" << endl;
if (*stoppedFlag==0) {
executeScan(0,is0);
} else
break;
for (int is1=0; is1<ns1; ++is1) {
// cout << "scan1 loop" << endl;
if (*stoppedFlag==0) {
executeScan(1,is1);
} else
break;
if (*stoppedFlag==0) {
executeAction(scriptBefore);
} else
break;
ResetPositionIndex();
for (int ip=0; ip<np; ++ip) {
// cout << "positions " << endl;
if (*stoppedFlag==0) {
if (getNumberOfPositions()>0) {
moveDetector(detPositions[ip]);
IncrementPositionIndex();
if (*correctionMask&(1<< I0_NORMALIZATION)) {
if (get_i0)
currentI0=get_i0(1,IOarg);
}
#ifdef VERBOSE
std::cout<< "moving to position" << std::endl;
#endif
}
} else
break;
pthread_mutex_lock(&mp);
createFileName();
pthread_mutex_unlock(&mp);
if (*stoppedFlag==0) {
executeAction(scriptBefore);
} else
break;
if (*stoppedFlag==0) {
executeAction(headerBefore);
if (*correctionMask&(1<< ANGULAR_CONVERSION) || aclog){// || eclog) {
positionFinished(0);
setCurrentPosition(getDetectorPosition());
}
if (aclog)
aclog->addStep(getCurrentPosition(), getCurrentFileName());
if (eclog)
eclog->addStep(setDAC(-1,THRESHOLD,0), getCurrentFileName());
if (*correctionMask&(1<< I0_NORMALIZATION)) {
if (get_i0)
get_i0(0, IOarg);
}
setCurrentFrameIndex(0);
if(receiver)
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
if (multiframe>1)
setFrameIndex(0);
else
setFrameIndex(-1);
if(receiver){
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
pthread_mutex_lock(&mp);
createFileName();
pthread_mutex_unlock(&mp);
//send receiver file name
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
setFileName(fileIO::getFileName());
//start receiver
if(startReceiver() == FAIL) {
cout << "Start receiver failed " << endl;
stopReceiver();
*stoppedFlag=1;
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
break;
}
#ifdef VERBOSE
cout << "Receiver started " << endl;
#endif
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
//let processing thread listen to these packets
sem_post(&sem_newRTAcquisition);
}
#ifdef VERBOSE
cout << "Acquiring " << endl;
#endif
startAndReadAll();
#ifdef VERBOSE
cout << "detector finished " << endl;
#endif
#ifdef VERBOSE
cout << "returned! " << endl;
#endif
if (*correctionMask&(1<< I0_NORMALIZATION)) {
if (get_i0)
currentI0=get_i0(1,IOarg); // this is the correct i0!!!!!
}
#ifdef VERBOSE
cout << "pos finished? " << endl;
cout << "pos finished? " << endl;
#endif
positionFinished(1);
#ifdef VERBOSE
cout << "done! " << endl;
#endif
positionFinished(1);
if (*threadedProcessing==0){
#ifdef VERBOSE
cout << "start unthreaded process data " << endl;
cout << "done! " << endl;
#endif
processData(delflag);
}
} else
break;
while (dataQueueSize()) usleep(100000);
// cout << "mglock " << endl;;
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
// cout << "done " << endl;;
//offline
if(!receiver){
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU)|| (getDetectorsType()==JUNGFRAUCTB) ){
if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile();
}
}
//online
else{
if (stopReceiver() == FAIL)
*stoppedFlag = 1;
// cout<<"***********receiver stopped"<<endl;
}
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
pthread_mutex_lock(&mp);
if (*stoppedFlag==0) {
executeAction(headerAfter);
pthread_mutex_unlock(&mp);
// setLastIndex(*fileIndex);
} else {
// setLastIndex(*fileIndex);
pthread_mutex_unlock(&mp);
break;
}
if (*stoppedFlag) {
if (*threadedProcessing==0){
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
cout << "start unthreaded process data " << endl;
#endif
break;
} else if (ip<(np-1)) {
// pthread_mutex_lock(&mp);
// *fileIndex=setStartIndex();
// pthread_mutex_unlock(&mp);
}
} // loop on position finished
processData(delflag);
}
//script after
if (*stoppedFlag==0) {
executeAction(scriptAfter);
} else
break;
if (*stoppedFlag) {
} else
break;
while (dataQueueSize()) usleep(100000);
// close file
if(!receiver){
detectorType type = getDetectorsType();
if ((type==GOTTHARD) || (type==MOENCH) || (type==JUNGFRAUCTB) ){
if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile();
}
}
// stop receiver
else{
pthread_mutex_lock(&mg);
if (stopReceiver() == FAIL)
*stoppedFlag = 1;
pthread_mutex_unlock(&mg);
}
// header after
if (*stoppedFlag==0) {
pthread_mutex_lock(&mp);
executeAction(headerAfter);
pthread_mutex_unlock(&mp);
}
if (*stoppedFlag) {
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
std::cout<< "exiting since the detector has been stopped" << std::endl;
#endif
break;
}
}//end position loop ip
//script after
if (*stoppedFlag==0) {
executeAction(scriptAfter);
}
if (*stoppedFlag) {
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
#endif
break;
}
}//end scan1 loop is1
if (*stoppedFlag) {
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
#endif
break;
}
}//end scan0 loop is0
if (*stoppedFlag==0) {
executeAction(stopScript);
}
if (*stoppedFlag) {
#ifdef VERBOSE
cout << "findex incremented " << endl;
#endif
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
pthread_mutex_lock(&mg);
setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
#endif
break;
}
#ifdef VERBOSE
cout << "findex incremented " << endl;
#endif
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
if (measurement_finished){
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
measurement_finished(im,*fileIndex,measFinished_p);
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
}
if (*stoppedFlag) {
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
#endif
break;
}
}//end measurements loop im
// waiting for the data processing thread to finish!
if (*threadedProcessing) {
#ifdef VERBOSE
cout << "wait for data processing thread" << endl;
#endif
setJoinThread(1);
//let processing thread continue and checkjointhread
sem_post(&sem_newRTAcquisition);
pthread_join(dataProcessingThread, &status);
#ifdef VERBOSE
cout << "data processing thread joined" << endl;
#endif
break;
} else if (is1<(ns1-1)) {
// pthread_mutex_lock(&mp);
// *fileIndex=setStartIndex();
// pthread_mutex_unlock(&mp);
}
}
//end scan1 loop is1
if (*stoppedFlag) {
if(progress_call)
progress_call(getCurrentProgress(),pProgressCallArg);
if (connectChannels) {
if (disconnect_channels)
disconnect_channels(DCarg);
}
if (aclog)
delete aclog;
if (eclog)
delete eclog;
#ifdef VERBOSE
std::cout<< "exiting since the detector has been stopped" << std::endl;
cout << "acquisition finished callback " << endl;
#endif
break;
} else if (is0<(ns0-1)) {
// pthread_mutex_lock(&mp);
// *fileIndex=setStartIndex();
// pthread_mutex_unlock(&mp);
}
} //end scan0 loop is0
// pthread_mutex_lock(&mp);
// *fileIndex=setLastIndex();
// pthread_mutex_unlock(&mp);
if (*stoppedFlag==0) {
executeAction(stopScript);
} else{
if (acquisition_finished)
acquisition_finished(getCurrentProgress(),getDetectorStatus(),acqFinished_p);
#ifdef VERBOSE
cout << "findex incremented " << endl;
#endif
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
break;
}
#ifdef VERBOSE
cout << "findex incremented " << endl;
#endif
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
if (measurement_finished){
pthread_mutex_lock(&mg); //cout << "lock"<< endl;
measurement_finished(im,*fileIndex,measFinished_p);
pthread_mutex_unlock(&mg);//cout << "unlock"<< endl;
}
if (*stoppedFlag) {
break;
}
// loop measurements
}
// waiting for the data processing thread to finish!
if (*threadedProcessing) {
#ifdef VERBOSE
cout << "wait for data processing thread" << endl;
#endif
setJoinThread(1);
//let processing thread continue and checkjointhread
sem_post(&sem_newRTAcquisition);
pthread_join(dataProcessingThread, &status);
#ifdef VERBOSE
cout << "data processing thread joined" << endl;
#endif
}
if(progress_call)
progress_call(getCurrentProgress(),pProgressCallArg);
if (connectChannels) {
if (disconnect_channels)
disconnect_channels(DCarg);
}
if (aclog)
delete aclog;
if (eclog)
delete eclog;
#ifdef VERBOSE
cout << "acquisition finished callback " << endl;
#endif
if (acquisition_finished)
acquisition_finished(getCurrentProgress(),getDetectorStatus(),acqFinished_p);
#ifdef VERBOSE
cout << "acquisition finished callback done " << endl;
cout << "acquisition finished callback done " << endl;
#endif
setAcquiringFlag(false);
sem_destroy(&sem_newRTAcquisition);
sem_destroy(&sem_newRTAcquisition);
#ifdef VERBOSE
clock_gettime(CLOCK_REALTIME, &end);
cout << "Elapsed time for acquisition:" << (( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0) << " seconds" << endl;
clock_gettime(CLOCK_REALTIME, &end);
cout << "Elapsed time for acquisition:" << (( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0) << " seconds" << endl;
#endif
return OK;
setAcquiringFlag(false);
return OK;
}