postprocessing based on external functions - can work with f90 interface

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@285 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
bergamaschi
2012-10-08 09:34:14 +00:00
parent e5c5b76236
commit 05181fa618
27 changed files with 1291 additions and 864 deletions

View File

@ -1,5 +1,9 @@
#include "postProcessing.h"
#include "postProcessingFuncs.h"
#include "angleConversionConstant.h"
#ifdef VERBOSE
#include "usersFunctions.h"
#endif
@ -12,63 +16,21 @@ postProcessing::postProcessing(){
//cout << "reg callback "<< endl;
dataReady = 0;
pCallbackArg = 0;
#ifdef VERBOSE
registerDataCallback(&defaultDataReadyFunc, NULL);
#endif
//cout << "done "<< endl;
rawDataReady = 0;
pRawDataArg = 0;
ppFun=new postProcessingFuncs();
}
int postProcessing::flatFieldCorrect(double datain, double errin, double &dataout, double &errout, double ffcoefficient, double fferr){
double e;
dataout=datain*ffcoefficient;
if (errin==0 && datain>=0)
e=sqrt(datain);
else
e=errin;
if (dataout>0)
errout=sqrt(e*ffcoefficient*e*ffcoefficient+datain*fferr*datain*fferr);
else
errout=1.0;
return 0;
};
int postProcessing::rateCorrect(double datain, double errin, double &dataout, double &errout, double tau, double t){
// double data;
double e;
dataout=(datain*exp(tau*datain/t));
if (errin==0 && datain>=0)
e=sqrt(datain);
else
e=errin;
if (dataout>0)
errout=e*dataout*sqrt((1/(datain*datain)+tau*tau/(t*t)));
else
errout=1.;
return 0;
};
postProcessing::~postProcessing(){delete ppFun;};
@ -82,47 +44,62 @@ void postProcessing::processFrame(int *myData, int delflag) {
string fname;
// double *fdata=NULL;
incrementProgress();
#ifdef VERBOSE
cout << "start processing"<< endl;
#endif
incrementProgress();
#ifdef VERBOSE
cout << "prog incremented"<< endl;
#endif
/** decode data */
fdata=decodeData(myData, fdata);
#ifdef VERBOSE
cout << "decode"<< endl;
#endif
fname=createFileName();
//Checking for write flag
if(*correctionMask&(1<<WRITE_FILE))
{
fname=createFileName();
//Checking for write flag
if(*correctionMask&(1<<WRITE_FILE)) {
#ifdef VERBOSE
cout << "writing raw data " << endl;
cout << "writing raw data " << endl;
#endif
//uses static function?!?!?!?
writeDataFile (fname+string(".raw"),fdata, NULL, NULL, 'i');
//uses static function?!?!?!?
writeDataFile (fname+string(".raw"),fdata, NULL, NULL, 'i');
#ifdef VERBOSE
cout << "done " << endl;
cout << "done " << endl;
#endif
}
}
if (rawDataReady)
rawDataReady(fdata,pRawDataArg);
if (*correctionMask & ~(1<<WRITE_FILE))
doProcessing(fdata,delflag, fname);
doProcessing(fdata,delflag, fname);
delete [] myData;
myData=NULL;
fdata=NULL;
delete [] myData;
delete [] fdata;
myData=NULL;
fdata=NULL;
#ifdef VERBOSE
cout << "Pop data queue " << *fileIndex << endl;
#endif
pthread_mutex_lock(&mp);
dataQueue.pop(); //remove the data from the queue
queuesize=dataQueue.size();
pthread_mutex_unlock(&mp);
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
popDataQueue(); //remove the data from the queue
queuesize=dataQueueSize();
}
@ -132,210 +109,53 @@ if(*correctionMask&(1<<WRITE_FILE))
void postProcessing::doProcessing(double *lfdata, int delflag, string fname) {
double *ang=new double[arraySize];
double *val=new double[arraySize];
double *err=new double[arraySize];
int np;
detectorData *thisData;
// /** write raw data file */
// if (*correctionMask==0 && delflag==1) {
// // delete [] fdata;
// ;
// } else {
int npos=getNumberOfPositions();
double *rcdata=NULL, *rcerr=NULL;
double *ffcdata=NULL, *ffcerr=NULL;
double *ang=NULL;
// int imod;
int np;
//string fname;
detectorData *thisData;
string ext=".dat";
// fname=createFileName();
/** rate correction */
if (*correctionMask&(1<<RATE_CORRECTION)) {
string ext=".dat";
double t=(*expTime)*1E-9;
if (GetCurrentPositionIndex()<=1) {
#ifdef VERBOSE
cout << "rate correcting " << endl;
cout << "init dataset" << endl;
#endif
rcdata=new double[getTotalNumberOfChannels()];
rcerr=new double[getTotalNumberOfChannels()];
rateCorrect(lfdata,NULL,rcdata,rcerr);
delete [] lfdata;
#ifdef VERBOSE
cout << "done " << endl;
#endif
} else {
rcdata=lfdata;
ppFun->initDataset();
}
lfdata=NULL;
#ifdef VERBOSE
cout << "add frame" << endl;
#endif
ppFun->addFrame(lfdata, &currentPosition, &currentI0, &t, (fname+ext).c_str(), NULL);
if ((GetCurrentPositionIndex()>=npos && positionFinished() && dataQueueSize()) || npos==0) {
#ifdef VERBOSE
cout << "finalize dataset" << endl;
#endif
ppFun->finalizeDataset(ang, val, err, &np);
IncrementPositionIndex();
/** flat field correction */
if (*correctionMask&(1<<FLAT_FIELD_CORRECTION)) {
#ifdef VERBOSE
cout << "ff correcting " << endl;
#endif
ffcdata=new double[getTotalNumberOfChannels()];
ffcerr=new double[getTotalNumberOfChannels()];
flatFieldCorrect(rcdata,rcerr,ffcdata,ffcerr);
delete [] rcdata;
rcdata=NULL;
if (rcerr)
delete [] rcerr;
rcerr=NULL;
#ifdef VERBOSE
cout << "done " << endl;
#endif
} else {
ffcdata=rcdata;
ffcerr=rcerr;
rcdata=NULL;
rcerr=NULL;
}
// writes angualr converted files
if (*correctionMask!=0) {
if (*correctionMask&(1<< ANGULAR_CONVERSION)) {
#ifdef VERBOSE
cout << "ang conv " << endl;
#endif
ang=convertAngles();
#ifdef VERBOSE
cout << "done - write position data file " << endl;
#endif
}
writeDataFile (fname+ext, ffcdata, ffcerr,ang);
#ifdef VERBOSE
cout << "done " << endl;
#endif
}
// if (*correctionMask&(1<< ANGULAR_CONVERSION) && getNumberOfPositions()>0) {
if (*correctionMask&(1<< ANGULAR_CONVERSION)) {
#ifdef VERBOSE
cout << "**************Current position index is " << getCurrentPositionIndex() << endl;
#endif
// if (*numberOfPositions>0) {
if (getCurrentPositionIndex()<=1) {
#ifdef VERBOSE
cout << "reset merging " << endl;
#endif
resetMerging();
}
#ifdef VERBOSE
cout << "add to merging "<< getCurrentPositionIndex() << endl;
#endif
addToMerging(ang, ffcdata, ffcerr, badChannelMask );
#ifdef VERBOSE
cout << getCurrentPositionIndex() << " " << getNumberOfPositions() << endl;
#endif
// cout << "lock 1" << endl;
pthread_mutex_lock(&mp);
if ((getCurrentPositionIndex()>=getNumberOfPositions() && posfinished==1 && queuesize==1)) {
#ifdef VERBOSE
cout << "finalize merging " << getCurrentPositionIndex()<< endl;
#endif
np=finalizeMerging();
/** file writing */
incrementPositionIndex();
// cout << "unlock 1" << endl;
pthread_mutex_lock(&mp);
fname=createFileName();
pthread_mutex_unlock(&mp);
fname=createFileName();
#ifdef VERBOSE
cout << "writing merged data file" << endl;
#endif
writeDataFile (fname+ext,np,getMergedCounts(), getMergedErrors(), getMergedPositions(),'f');
#ifdef VERBOSE
cout << " done" << endl;
#endif
// if (delflag) {
// deleteMerging();
// } else {
thisData=new detectorData(getMergedCounts(),getMergedErrors(),getMergedPositions(),getCurrentProgress(),(fname+ext).c_str(),np);
// // cout << "lock 2" << endl;
// pthread_mutex_lock(&mg);
// finalDataQueue.push(thisData);
// // cout << "unlock 2" << endl;
// pthread_mutex_unlock(&mg);
if (dataReady) {
dataReady(thisData, pCallbackArg);
delete thisData;
}
// }
// cout << "lock 3" << endl;
pthread_mutex_lock(&mp);
}
// cout << "unlock 3" << endl;
pthread_mutex_unlock(&mp);
if (ffcdata)
delete [] ffcdata;
ffcdata=NULL;
if (ffcerr)
delete [] ffcerr;
ffcerr=NULL;
if (ang)
delete [] ang;
ang=NULL;
} else {
// if (delflag) {
// if (ffcdata)
// delete [] ffcdata;
// if (ffcerr)
// delete [] ffcerr;
// if ( ang)
// delete [] ang;
// } else {
thisData=new detectorData(ffcdata,ffcerr,NULL,getCurrentProgress(),(fname+ext).c_str(),getTotalNumberOfChannels());
if(*correctionMask&(1<<WRITE_FILE))
writeDataFile (fname+ext,np,ang, val, err,'f');
thisData=new detectorData(ang,val,err,getCurrentProgress(),(fname+ext).c_str(),np);
if (dataReady) {
dataReady(thisData, pCallbackArg);
delete thisData;
}
// pthread_mutex_lock(&mg);
// finalDataQueue.push(thisData);
// pthread_mutex_unlock(&mg);
// }
delete thisData;
}
//}
incrementFileIndex();
#ifdef VERBOSE
cout << "fdata is " << fdata << endl;
#endif
}
@ -422,70 +242,89 @@ void* postProcessing::processData(int delflag) {
#endif
setTotalProgress();
pthread_mutex_lock(&mp);
queuesize=dataQueue.size();
pthread_mutex_unlock(&mp);
queuesize=dataQueueSize();
int *myData;
int dum=1;
fdata=NULL;
while(dum | *threadedProcessing) { // ????????????????????????
/* IF THERE ARE DATA PROCESS THEM*/
pthread_mutex_lock(&mp);
while((queuesize=dataQueue.size())>0) {
while((queuesize=dataQueueSize())>0) {
/** Pop data queue */
myData=dataQueue.front(); // get the data from the queue
pthread_mutex_unlock(&mp);
#ifdef VERBOSE
cout << "data found"<< endl;
#endif
myData=dataQueueFront(); // get the data from the queue
#ifdef VERBOSE
cout << "got them"<< endl;
#endif
if (myData) {
processFrame(myData,delflag);
//usleep(1000);
}
pthread_mutex_lock(&mp);
}
pthread_mutex_unlock(&mp);
/* IF THERE ARE NO DATA look if acquisition is finished */
pthread_mutex_lock(&mp);
if (jointhread) {
if (dataQueue.size()==0) {
pthread_mutex_unlock(&mp);
if (checkJoinThread()) {
if (dataQueueSize()==0) {
break;
}
pthread_mutex_unlock(&mp);
} else {
pthread_mutex_unlock(&mp);
}
}
dum=0;
}
if (fdata) {
#ifdef VERBOSE
cout << "delete fdata" << endl;
#endif
delete [] fdata;
#ifdef VERBOSE
cout << "done " << endl;
#endif
}
return 0;
}
int postProcessing::checkJoinThread() {
int retval;
pthread_mutex_lock(&mp);
retval=jointhread;
pthread_mutex_unlock(&mp);
return retval;
}
void postProcessing::setJoinThread( int v) {
pthread_mutex_lock(&mp);
jointhread=v;
pthread_mutex_unlock(&mp);
}
int* postProcessing::dataQueueFront() {
int *retval=NULL;
pthread_mutex_lock(&mp);
if( !dataQueue.empty() ) {
retval=dataQueue.front();
}
pthread_mutex_unlock(&mp);
return retval;
}
int postProcessing::dataQueueSize() {
int retval;
pthread_mutex_lock(&mp);
retval=dataQueue.size();
pthread_mutex_unlock(&mp);
return retval;
}
int* postProcessing::popDataQueue() {
int *retval=NULL;
pthread_mutex_lock(&mp);
if( !dataQueue.empty() ) {
retval=dataQueue.front();
dataQueue.pop();
}
pthread_mutex_unlock(&mp);
return retval;
}
@ -502,11 +341,13 @@ detectorData* postProcessing::popFinalDataQueue() {
void postProcessing::resetDataQueue() {
int *retval=NULL;
pthread_mutex_lock(&mp);
while( !dataQueue.empty() ) {
retval=dataQueue.front();
dataQueue.pop();
delete [] retval;
}
pthread_mutex_unlock(&mp);
}
@ -523,12 +364,125 @@ void postProcessing::resetFinalDataQueue() {
void postProcessing::startThread(int delflag) {
/////////////////////////////////// Initialize dataset
//resetDataQueue();
setTotalProgress();
int nmod=getNMods();
int *chPM=new int[nmod];
int *mM=new int[nmod];
int totch=0;
#ifdef VERBOSE
cout << "init dataset stuff" << endl;
#endif
for (int im=0; im<nmod; im++) {
#ifdef VERBOSE
cout << "MODULE " << im << endl;
#endif
chPM[im]=getChansPerMod(im);
#ifdef VERBOSE
cout << "chanspermod" << endl;
#endif
mM[im]=getMoveFlag(im);
#ifdef VERBOSE
cout << "moveflag" << endl;
#endif
totch+=chPM[im];
}
#ifdef VERBOSE
cout << "total channels is " << totch << endl;
#endif
double *ffcoeff=NULL, *fferr=NULL;
if (*correctionMask&(1<<FLAT_FIELD_CORRECTION)) {
#ifdef VERBOSE
cout << "get ff " << endl;
#endif
ffcoeff=new double[totch];
fferr=new double[totch];
getFlatFieldCorrection(ffcoeff,fferr);
}
double tdead;
if (*correctionMask&(1<<RATE_CORRECTION)) {
#ifdef VERBOSE
cout << "get tau " << endl;
#endif
tdead=getRateCorrectionTau();
} else
tdead=0;
int angdir=getAngularDirection();
double to=0;
double bs=0;
double sx=0, sy=0;
double *angRad=NULL;
double *angOff=NULL;
double *angCenter=NULL;
angleConversionConstant *p=NULL;
if (*correctionMask&(1<< ANGULAR_CONVERSION)) {
#ifdef VERBOSE
cout << "get angconv " << endl;
#endif
bs=getBinSize();
to=getGlobalOffset()+getFineOffset();
angRad=new double[nmod];
angOff=new double[nmod];
angCenter=new double[nmod];
for (int im=0; im<nmod; im++) {
p=getAngularConversionPointer(im);
angRad[im]=p->r_conversion;
angOff[im]=p->offset;
angCenter[im]=p->center;
}
sx=getAngularConversionParameter(SAMPLE_X);
sy=getAngularConversionParameter(SAMPLE_Y);
}
#ifdef VERBOSE
cout << "init dataset" << endl;
#endif
ppFun->initDataset(&nmod,chPM,mM,badChannelMask, ffcoeff, fferr, &tdead, &angdir, angRad, angOff, angCenter, &to, &bs, &sx, &sy);
#ifdef VERBOSE
cout << "done" << endl;
#endif
if (*correctionMask&(1<< ANGULAR_CONVERSION)) {
arraySize=getNumberOfAngularBins();
if (arraySize<=0)
arraySize=totch;
} else {
arraySize=totch;
}
queuesize=dataQueueSize();
resetFinalDataQueue();
resetDataQueue();
/////////////////////////////////// Start thread ////////////////////////////////////////////////////////
#ifdef VERBOSE
cout << "start thread stuff" << endl;
#endif
pthread_attr_t tattr;
int ret;
sched_param param, mparam;
int policy= SCHED_OTHER;
// set the priority; others are unchanged
//newprio = 30;
mparam.sched_priority =1;
@ -553,7 +507,8 @@ void postProcessing::startThread(int delflag) {
ret = pthread_create(&dataProcessingThread, &tattr,startProcessDataNoDelete, (void*)this);
pthread_attr_destroy(&tattr);
// scheduling parameters of target thread
// scheduling parameters of target thread
ret = pthread_setschedparam(dataProcessingThread, policy, &param);
}