receiver complete change

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@687 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2013-11-14 12:34:31 +00:00
parent 18fce607a6
commit cd88aff756
2 changed files with 558 additions and 217 deletions

View File

@ -3,17 +3,22 @@
* @short single photon filter using trees
***********************************************/
#include "singlePhotonFilter.h"
#include <errno.h>
#define BUF_SIZE (16*1024*1024) //16mb
singlePhotonFilter::singlePhotonFilter(int nx, int ny,
int fmask, int pmask, int foffset, int poffset, int pperf, int iValue,
vector <vector<int16_t> > m, vector <vector<int16_t> > s, int d):
int16_t *m, int16_t *s, CircularFifo<char>* f, int d):
#ifdef MYROOT
myTree(NULL),
myFile(NULL),
#else
myFile(NULL),
nHitsPerFrame(0),
nHitsPerFile(0),
nTotalHits(0),
#endif
nChannelsX(nx),
nChannelsY(ny),
@ -36,38 +41,128 @@ singlePhotonFilter::singlePhotonFilter(int nx, int ny,
packet_index_offset(poffset),
packets_per_frame(pperf),
incrementValue(iValue),
enable(false),
firstTime(true),
ret(0),
pIndex(0),
fIndex(0){
fIndex(0),
thread_started(0),
threads_mask(0x0),
currentThread(-1),
thisThreadIndex(-1),
fileIndex(0),
fifo(f){
#ifndef MYROOT
photonHitList = new single_photon_hit[nChannelsX*nChannelsY];
#endif
//cluster
if (nChannelsX)
nClusterX = 1;
sqrtCluster = sqrt(nClusterX*nClusterY);
deltaX = nClusterX/2;// 0 or 1
clusterCenterPixel = (deltaX * nClusterY) + 1;
stat.resize(nChannelsX);
for(int i=0; i<nChannelsX; i++)
stat[i].resize(nChannelsY);
//struct
myPhotonHit.data.resize(nClusterX);
for(int i=0; i<nClusterX; i++)
myPhotonHit.data[i].resize(nClusterY);
myPhotonHit.x = 0;
myPhotonHit.y = 0;
myPhotonHit.rms = 0;
myPhotonHit.ped = 0;
myPhotonHit.iframe = -1;
stat = new movingStat[nChannelsX*nChannelsY];
nHitStat = new movingStat();
myPhotonHit = new single_photon_hit;
myPhotonHit->data = new double[nClusterX*nClusterY];
myPhotonHit->x = 0;
myPhotonHit->y = 0;
myPhotonHit->rms = 0;
myPhotonHit->ped = 0;
myPhotonHit->iframe = -1;
for(int i=0;i < NUM_THREADS; i++){
/*smp[i] = NULL;*/
mem0[i]=NULL;
}
numFramesAlloted = new int[NUM_THREADS];
strcpy(savefilename,"");
strcpy(filePath,"");
strcpy(fileName,"run");
pthread_mutex_init(&write_mutex,NULL);
pthread_mutex_init(&running_mutex,NULL);
}
singlePhotonFilter::~singlePhotonFilter(){
enableCompression(false);
if(numFramesAlloted) delete [] numFramesAlloted;
writeToFile();
closeFile();
if(myFile) delete myFile;
if(mask) delete mask;
if(stat) delete stat;
if(nHitStat) delete nHitStat;
/*if(smp) delete []smp;*/
if(mem0) delete [] mem0;
if(fifo) delete fifo;
}
void singlePhotonFilter::initTree(char *outfname){
int singlePhotonFilter::enableCompression(bool enable){
//#ifdef VERBOSE
cout << "Compression set to " << enable << endl;
//#endif
if(enable){
threads_mask = 0x0;
currentThread = -1;
for(int i=0; i<NUM_THREADS; ++i){
//initialize semaphore
sem_init(&smp[i],0,0);
//create threads
thread_started = 0;
thisThreadIndex = i;
if(pthread_create(&find_hits_thread[i], NULL,createThreads, (void*) this)){
cout << "Could not create thread with index " << i << endl;
return FAIL;
}
while(!thread_started);
}
}else{
if(thread_started){
for(int i=0; i<NUM_THREADS; ++i){
//cancel threads
while(pthread_cancel(find_hits_thread[i])!=0)
cout << "Unable to cancel Thread " << index << endl;/*pthread_join(find_hits_thread[i],NULL);*/
pthread_mutex_lock(&write_mutex);
closeFile();
pthread_mutex_unlock(&write_mutex);
//semaphore destroy
sem_post(&smp[i]);
sem_destroy(&smp[i]);
}
}
}
return OK;
}
void* singlePhotonFilter::createThreads(void *this_pointer){
((singlePhotonFilter*)this_pointer)->findHits();
return this_pointer;
}
int singlePhotonFilter::initTree(){
#ifdef MYROOT
if(myFile) {
writeToFile();
closeFile();
}
outfname = string(outfname).replace(".raw",".root");
//fName.replace(".raw",".png");
//sprintf(outfname, "%s/%s.root", outdir, fname);
@ -81,163 +176,89 @@ void singlePhotonFilter::initTree(char *outfname){
myFile = new TFile(outfname, "RECREATE"); /** later return error if it exists */
//tree
myTree = new TTree(fname, fname);
myTree->Branch("x",&myPhotonHit.x,"x/I");
myTree->Branch("y",&myPhotonHit.y,"y/I");
myTree->Branch("data",myPhotonHit.data,cdata);
myTree->Branch("pedestal",&myPhotonHit.ped,"pedestal/D");
myTree->Branch("rms",&myPhotonHit.rms,"rms/D");
myTree->Branch("iframe",&myPhotonHit->iframe,"iframe/I");
myTree->Branch("x",&myPhotonHit->x,"x/I");
myTree->Branch("y",&myPhotonHit->y,"y/I");
myTree->Branch("data",myPhotonHit->data,cdata);
myTree->Branch("pedestal",&myPhotonHit->ped,"pedestal/D");
myTree->Branch("rms",&myPhotonHit->rms,"rms/D");
#else
;/*myFile = fopen(outfname, "w");*/
writeToFile();
closeFile();
sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,nTotalHits,fileIndex);
myFile = fopen(savefilename, "w");
setvbuf(myFile,NULL,_IOFBF,BUF_SIZE);
cout<<"File created: "<<savefilename<<endl;
nHitsPerFile = 0;
#endif
//initialize
for (int ir=0; ir<nChannelsX; ir++){
for (int ic=0; ic<nChannelsY; ic++){
stat[ir][ic].Clear();
stat[ir][ic].SetN(nbackground);
}
}
return OK;
}
int singlePhotonFilter::writeToFile(){
#ifdef MYROOT
if((myTree) && (myFile)){
if((myTree) && (myFile))
myTree->Write();
myFile = myTree->GetCurrentFile();
myFile->Close();
delete myFile;
}
#else
;
/*if(myFile){ //&& (number of structs?)
fwrite((void*)(&myPhotonHit), 1, sizeof(myPhotonHit), myFile);
fclose(myFile);
delete myFile;
return OK;
}*/
if(nHitsPerFrame){
if(myFile){
/*cout<<"writing "<< nHitsPerFrame << " hits to file" << endl;*/
fwrite((void*)(photonHitList), 1, sizeof(single_photon_hit)*nHitsPerFrame, myFile);
nHitsPerFrame = 0;
//cout<<"Exiting writeToFile"<<endl;
return OK;
}else
cout << "ERROR: Could not write to " << nHitsPerFrame <<" hits to file as myfile doesnt exist" << endl;
}
#endif
return FAIL;
}
int singlePhotonFilter::findHits(int16_t *myData, int myDataSize){
int nHits = 0;
int hits[nChannelsX][nChannelsY];
int ir,ic; // for indexing row, column
int dum;
double tot; // total value of pixel
//initialize to 0
for (ir=0; ir<nChannelsX; ir++)
for (ic=0; ic<nChannelsY;ic++)
hits[ir][ic]=0;
//for each pixel
for (ir=0; ir<160; ir++){
for (ic=0; ic<160;ic++){
//validate mapping
if ((map[ir][ic] < 0) || (map[ir][ic] >= (myDataSize))){
cout << "Bad Channel Mapping index: " << map[ir][ic] << endl;
continue;
}
//if frame within pedestal number
if (myPhotonHit.iframe < nped)
stat[ir][ic].Calc((double)(mask[ir][ic] ^ myData[map[ir][ic]]));
// frame outside pedestal number
else{
//if hit wasnt registered
if (hits[ir][ic] == 0){
myPhotonHit.rms = stat[ir][ic].StandardDeviation();
myPhotonHit.ped = stat[ir][ic].Mean();
dum = 1;
tot = 0;
//for 1d and 2d
int c = 1;
int d = 1;
if (nChannelsX == 1)
c = 0;
//center pixel
myPhotonHit.data[c][d] = ((double)(mask[ir][ic] ^ myData[map[ir][ic]])) - myPhotonHit.ped;
//check if neighbours are involved
for (int ih = -c; ih <= c ; ih++){
for (int iv = -d; iv <= d; iv++){
//validate neighbouring pixels (not really needed)
if (((ir+ih) >= 0) && ((ir+ih) < nChannelsX) && ((ic+iv) >= 0) && ((ic+iv) < nChannelsY)) {
//validate mapping
if ((map[ir+ih][ic+iv] < 0) || (map[ir+ih][ic+iv] >= myDataSize)){
cout << "Bad Channel Mapping index: " << map[ir][ic] << endl;
continue;
}
myPhotonHit.data[iv+d][ih+c] = (double)(mask[ir][ic] ^ myData[map[ir+ih][ic+iv]])-stat[ir+ih][ic+iv].Mean();
tot += myPhotonHit.data[iv+c][ih+d];
if (myPhotonHit.data[iv+c][ih+d] > myPhotonHit.data[c][d])
dum = 2;
}
}
}
if (tot < CLUSTER_SIZE * nsigma * myPhotonHit.rms)
dum = 0;
if (myPhotonHit.data[c][d] < nsigma * myPhotonHit.rms && dum != 0)
dum = 3;
if (myPhotonHit.data[c][d] > -nsigma * myPhotonHit.rms &&
myPhotonHit.data[c][d] < nsigma * myPhotonHit.rms &&
dum == 0){
//Appriximated running average
stat[ir][ic].Calc((double)(mask[ir][ic]^myData[map[ir][ic]]));
}
else if (dum == 1){
myPhotonHit.x = ic;
myPhotonHit.y = ir;
int singlePhotonFilter::closeFile(){
#ifdef MYROOT
myTree->Fill();
#endif
hits[ir][ic] = 1;
nHits++;
}
}
}
if(myTree){
if (myFile){
myFile = myTree->GetCurrentFile();
myFile->Close();
delete myFile;
}
delete myTree;
}
if (myPhotonHit.iframe%1000 == 0)
cout << "Frame: " << myPhotonHit.iframe << " Hits: " << nHits << endl;
return nHits;
#else
if(myFile)
fclose(myFile);
myFile = NULL;
#endif
return OK;
}
void singlePhotonFilter::setupAcquisitionParameters(){
fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true;
void singlePhotonFilter::setupAcquisitionParameters(char *outfpath, char* outfname, int outfIndex){
fileIndex = outfIndex;
strcpy(filePath,outfpath);
strcpy(fileName,outfname);
fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true; currentThread = -1;
//initialize
for (int ir=0; ir<nChannelsX; ir++){
for (int ic=0; ic<nChannelsY; ic++){
stat[ir*nChannelsY+ic].Clear();
stat[ir*nChannelsY+ic].SetN(nbackground);
}
}
nHitStat->Clear();
nHitStat->SetN(nbackground);
#ifndef MYROOT
nTotalHits = 0;
#endif
}
@ -260,7 +281,7 @@ int singlePhotonFilter::verifyFrame(char *inData){
if (pIndex == 0)
pIndex = packets_per_frame;
#ifdef VERYVERBOSE
cout<<"fi:"<<hex<<fIndex<< " pi:"<< pIndex << endl;
cout<<"fi:"<<fIndex<< " pi:"<< pIndex << endl;
#endif
//firsttime
if (firstTime){
@ -275,15 +296,28 @@ int singlePhotonFilter::verifyFrame(char *inData){
if (fIndex != fnum){
/*cout << "**Frame number doesnt match:Missing Packet! " << fnum << " "
"Expected f " << fnum << " p " << pnum + 1 << " received f " << fIndex << " p " << pIndex << endl;*/
if (ptot == 0) {
if (pIndex == 1)//so that its not moved to next line.
ret = 0;
else
ret = -1; //moved to next line
}
else
ret = -2;//so remember and moved to next line and copy
if ((pnum+1 == packets_per_frame) && (pIndex == packets_per_frame)) //so remember and moved to next line and copy and again move to next line
ret = -3;
fnum = fIndex;
pnum = pIndex;
ptot = 1;
ret = -2; //dont return here.. if is the end of packets, for gotthard uve toreturn -1
//ret = -2; dont return here.. if is the end of packets, uve toreturn -1 so that remaining ones are FFFFd and moved to new line
}
//if missing a packet, discard
else if (pIndex != pnum + 1){/**else */
/* cout << "**packet number doesnt match:Missing Packet! " << fnum << " "
/*cout << "**packet number doesnt match:Missing Packet! " << fnum << " "
"Expected f" << fnum << " p " << pnum + 1 << " received f " << fnum << " p " << pIndex << endl;*/
pnum = pIndex;
ptot++;
@ -299,9 +333,7 @@ int singlePhotonFilter::verifyFrame(char *inData){
if (pIndex == packets_per_frame){
//got all packets
if (ptot == packets_per_frame){
/*myPhotonHit.iframe = fnum - f0;//??
if (enable)
;*//*findHits(myData,inDataSize * ptot);*/
/*myPhotonHit.iframe = fnum - f0;*/
fnum = fIndex + 1;
ptot = 0;
pnum = 0;
@ -311,7 +343,8 @@ int singlePhotonFilter::verifyFrame(char *inData){
ptot = 0;
pnum = 0;
fnum = fIndex + 1;
ret = -1;
if(!ret)
ret = -1;
}
}
@ -330,3 +363,224 @@ int singlePhotonFilter::verifyFrame(char *inData){
void singlePhotonFilter::findHits(){
int ir,ic,r,c,i;
int currentIndex;
int pixelIndex;
int clusterIndex;
double* clusterData = new double[nClusterX*nClusterY];
double sigmarms;
double clusterrms;
double clusterped;
uint32_t clusteriframe;
int dum;
double tot; // total value of pixel
char* isData;
int16_t* myData;
int index = thisThreadIndex;
//thread created
pthread_mutex_lock(&running_mutex);
thread_started = 1;
pthread_mutex_unlock(&running_mutex);
while(1){
//wait for job
while((dum = sem_wait(&smp[index]))!=0)
cout<<"semwait:["<<index<<"]:"<<dum<<endl;
//proceed to get details for job
if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL)!=0)
cout << "Could not set Thread " << index <<" cancel state to be disabled" << endl;
isData = mem0[index];
pthread_mutex_lock(&running_mutex);
threads_mask|=(1<<index);
pthread_mutex_unlock(&running_mutex);
//wait for acknowledgement to start
while((dum = sem_wait(&smp[index]))!=0)
cout<<"got data semwait:["<<index<<"]:"<<dum<<endl;
myData = (int16_t*)isData;
for (i=0; i < numFramesAlloted[index]; ++i){
/*cout<<"mydata:"<<(void*)isData<<endl;*/
clusteriframe = (uint32_t)(*((uint32_t*)(isData)));
if(clusteriframe != 0xFFFFFFFF){
clusteriframe = ((clusteriframe & frame_index_mask) >>frame_index_offset) - f0;
//for each pixel
for (ir=0; ir<nChannelsX; ++ir){
for (ic=0; ic<nChannelsY;++ic){
currentIndex = (ir * nChannelsY) + ic;
//validate mapping
if ((map[currentIndex] < 0) || (map[currentIndex] >= (dataSize))){
cout << "Bad Channel Mapping index: " << map[currentIndex] << endl;
continue;
}
//if frame within pedestal number
if (clusteriframe < nped){
stat[currentIndex].Calc((double)(mask[currentIndex] ^ myData[map[currentIndex]]));
// frame outside pedestal number
}else{
dum = 1;
tot = 0;
clusterrms = stat[currentIndex].StandardDeviation();//-1
clusterped = stat[currentIndex].Mean();//0
sigmarms = clusterrms * nsigma;
clusterData[clusterCenterPixel] = ((double)(mask[currentIndex] ^ myData[map[currentIndex]])) - clusterped;
for (r=-deltaX; r <= deltaX; ++r ){
if (((ir+r) < 0) || ((ir+r) >= nChannelsX))
continue;
for(c=-1; c <= 1; ++c){
if (((ic+c) < 0) || ((ic+c) >= nChannelsY))
continue;
pixelIndex = currentIndex + (r*nChannelsY+c);
if ((map[pixelIndex] < 0) || (map[pixelIndex] >= dataSize)){
cout << "Bad Channel Mapping index: " << map[pixelIndex] << endl;
continue;
}
clusterIndex = pixelIndex-(currentIndex - deltaX * nChannelsY - 1);
clusterData[clusterIndex] = ((double)(mask[pixelIndex] ^ myData[map[pixelIndex]])) - stat[pixelIndex].Mean();
tot += clusterData[clusterIndex];
//discard negative events
if (clusterData[clusterIndex] > clusterData[clusterCenterPixel])
dum = 2;
}
}
if (tot < sqrtCluster * sigmarms)
dum = 0;
//discard events (for pedestal) where sum of the neighbours is too large.
if (clusterData[clusterCenterPixel] < sigmarms && dum != 0)
dum = 3;
//Appriximated running average
if (clusterData[clusterCenterPixel] > -sigmarms &&
clusterData[clusterCenterPixel] < sigmarms &&
dum == 0){
stat[currentIndex].Calc((double)(mask[currentIndex]^myData[map[currentIndex]]));
}
// this is an event and we are in the center
else if (dum == 1){
pthread_mutex_lock(&write_mutex);
#ifdef MYROOT
myTree->Fill();
#else
photonHitList[nHitsPerFrame].data = clusterData;
photonHitList[nHitsPerFrame].x = ic;
photonHitList[nHitsPerFrame].y = ir;
photonHitList[nHitsPerFrame].rms = clusterrms;
photonHitList[nHitsPerFrame].ped = clusterped;
photonHitList[nHitsPerFrame].iframe = clusteriframe;
nHitsPerFrame++;
nHitsPerFile++;
nTotalHits++;
if(nHitsPerFile >= MAX_HITS_PER_FILE-1)
initTree();
#endif
pthread_mutex_unlock(&write_mutex);
}
}
}
}
}//else cout<<"***did not get whoel frame in single photon filter"<<endl;
nHitStat->Calc((double)nHitsPerFrame);
pthread_mutex_lock(&write_mutex);
writeToFile();
fifo->push(isData);
pthread_mutex_unlock(&write_mutex);
isData += 4096;
myData += 2048;
/*
if ((clusteriframe%1000 == 0) && (clusteriframe != 0) ){
cout << dec << "Frame: " << clusteriframe << " Hit Avg over last frames: " <<
nHitStat->Mean() << " .. "<<nHitStat->StandardDeviation() << endl;
cout<<"writing "<< nHitsPerFrame << " hits to file" << endl;
}
*/
}
pthread_mutex_lock(&running_mutex);
threads_mask^=(1<<index);
pthread_mutex_unlock(&running_mutex);
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)!=0)
cout << "Could not set Thread " << index <<" cancel state to be enabled" << endl;
}
}
void singlePhotonFilter::assignJobsForThread(char *theData, int numThisData){
//cout << "1 Entering assignJobsForThread" << endl;
while(1){
++currentThread;
if(currentThread == NUM_THREADS)
currentThread=0;
if(!((1<<currentThread)&threads_mask)){
mem0[currentThread] = theData;
/*cout<<"thedata:"<<((theData-listmem0)/4096)<<" numFramesAlloted:"<<numThisData<<endl;
if(((theData-listmem0)/4096)+numThisData>=25000) {
cout<<"*****************problem: "<<((theData-listmem0)/4096)<<" :"<<numThisData<<endl;
}*/
numFramesAlloted[currentThread] = numThisData;
sem_post(&smp[currentThread]);
while(!((1<<currentThread)&threads_mask));
//usleep(10000);
sem_post(&smp[currentThread]);
break;
}
}
//cout << "4 Exiting assignJobsForThread" << endl;
}
int singlePhotonFilter::checkIfJobsDone(){
//cout<<"Checking if jobs are done"<<endl;
if(threads_mask){
//cout<<"Not done!"<<threads_mask<<endl;
return 0;
}
pthread_mutex_lock(&write_mutex);
writeToFile();
closeFile();
pthread_mutex_unlock(&write_mutex);
cout<<"All done!"<<endl;
return 1;
}

View File

@ -26,8 +26,10 @@
#include <list>
#include <queue>
#include <fstream>
#include <pthread.h>
#include <semaphore.h>
#include "circularFifo.h"
#include "runningStat.h"
#include "movingStat.h"
using namespace std;
@ -36,7 +38,7 @@ using namespace std;
typedef double double32_t;
typedef float float32_t;
typedef int int32_t;
#define MAX_STR_LENGTH 1000
/**
return values
@ -49,7 +51,7 @@ enum {
@short structure for a single photon hit
*/
typedef struct{
vector < vector<double> > data; /**< data size */
double* data; /**< data size */
int x; /**< x-coordinate of the center of hit */
int y; /**< x-coordinate of the center of hit */
double rms; /**< noise of central pixel */
@ -58,8 +60,6 @@ typedef struct{
}single_photon_hit;
/**
@short class handling trees and its data file
*/
@ -78,10 +78,9 @@ public:
* @param iValue increment value (only for gotthard to increment index to have matching frame number)
* @param m Map to data without headers
* @param s mask as to which adcs are inverted
* @param f circular fifo buffer, which needs to be freed
* @param d Size of data with the headers
*/
/** why is the datasize -1, you need to know the datasize with headers so that you dont go over the limits */
singlePhotonFilter(
int nx,
int ny,
@ -91,55 +90,13 @@ public:
int poffset,
int pperf,
int iValue,
vector <vector<int16_t> > m,
vector <vector<int16_t> > s,
int d = -1);
int16_t *m,
int16_t *s,
CircularFifo<char>* f,
int d);
/** virtual destructor */
virtual ~singlePhotonFilter(){};
/**
* Construct a tree, populate struct for the single photon hit and provide all the masks and offsets
* @param outdir Output file directory/Output file name
*/
void initTree(char *outfname);
/**
* Reset Indices before starting acquisition
*/
void setupAcquisitionParameters();
/** reconstruct the frame with all the right packets
* @param inData the data from socket to be verified
* */
int verifyFrame(char *inData);
/**
* Writes tree/struct to file
* returns OK if successful, else FAIL
*/
int writeToFile();
/**
* Find Hits frame by frame
* @param myData data for one frame
* @param myDataSize data size for one frame with headers
* returns number of hits
*/
int findHits(int16_t *myData, int myDataSize);
/**
* Enable Filter, This makes sure findHits() is called
*/
void enableFilter(bool r){enable = r;};
/**
* Returns packets per frame
*/
int getPacketsPerFrame(){ return packets_per_frame;};
virtual ~singlePhotonFilter();
#ifdef MYROOT
/**
@ -148,10 +105,15 @@ public:
TTree *getTree(){ return myTree; };
#endif
/**
* Returns packets per frame
*/
int getPacketsPerFrame(){ return packets_per_frame;};
/**
* returns struct
*/
single_photon_hit getStructure(){ return myPhotonHit; };
single_photon_hit* getStructure(){ return myPhotonHit; };
/** Set number of frames to calculate pedestal at beginning */
void setNPed(int n){ nped = n; };
@ -177,6 +139,65 @@ public:
/** Get correction */
double getOutCorr(){return outcorr;};
/**
* Construct a tree, populate struct for the single photon hit and provide all the masks and offsets
* @param outdir Output file directory/Output file name
* returns OK if successful, else FAIL
*/
int initTree();
/**
* Writes tree/struct to file
* returns OK if successful, else FAIL
*/
int writeToFile();
/**
* Closes file
* returns OK if successful, else FAIL
*/
int closeFile();
/**
* Reset Indices before starting acquisition
*/
void setupAcquisitionParameters(char *outfpath, char* outfname, int outfIndex);
/** reconstruct the frame with all the right packets
* @param inData the data from socket to be verified
* returns 0 if still waiting for next packet of same frame,
* 1 if end of complete frame, -1 if end of incomplete frame,
* -2 first packet of next frame, so push previous one; -3 last packet of current frame, push both frames
* */
int verifyFrame(char *inData);
/**
* Find Hits frame by frame and save it in file/tree
*/
void findHits();
/** Enable or disable compression
* @param enable true to enable compression and false to disable
* returns OK for success or FAIL for failure, incase threads fail to start
* */
int enableCompression(bool enable);
/** create threads for compression
* @param this_pointer obejct of this class
* */
static void* createThreads(void *this_pointer);
/** assignjobs to each thread
* @param thisData a bunch of frames
* @param numThisData number of frames
* */
void assignJobsForThread(char *thisData, int numThisData);
/** Checks if all the threads are done processing
* @param returns 1 for jobs done and 0 for jobs not done
* */
int checkIfJobsDone();
@ -190,8 +211,23 @@ private:
TFile *myFile;
#else
FILE *myFile;
/** pointer to array of structs when only using files */
single_photon_hit* photonHitList;
/** Number of Hits per frame*/
int nHitsPerFrame;
/** Number of Hits per file */
int nHitsPerFile;
/** Total Number of Hits Per Acquisition */
int nTotalHits;
#endif
/** Maximum Number of hits written to file */
const static int MAX_HITS_PER_FILE = 200000;
/** Number of Channels in X direction */
int nChannelsX;
@ -205,19 +241,21 @@ private:
int nClusterY;
/** map to the data without headers */
vector <vector<int16_t> > map;
int16_t *map;
/** Size of data with headers */
int dataSize;
/** mask as to which adcs are inverted */
vector <vector<int16_t> > mask;
int16_t *mask;
/** movingStat object */
vector <vector<movingStat> > stat;
movingStat *stat;
movingStat *nHitStat;
/** single Photon Hit structure */
single_photon_hit myPhotonHit;
single_photon_hit* myPhotonHit;
/** Cluster size */
const static int CLUSTER_SIZE = 3;
@ -276,9 +314,6 @@ private:
/** increment value for index for gotthard */
int incrementValue;
/** filter enable */
bool enable;
/** first packet */
bool firstTime;
@ -291,6 +326,58 @@ private:
/** current frame index */
int fIndex;
/** thread related variables */
static const int NUM_THREADS = 15;
pthread_t find_hits_thread[NUM_THREADS];
volatile int thread_started;
volatile int threads_mask;
pthread_mutex_t write_mutex;
pthread_mutex_t running_mutex;
/** current thread the job being allotted to */
int currentThread;
/** current index alloted for each thread */
int thisThreadIndex;
/** semaphore to synchronize between different jobs on same thread */
sem_t smp[NUM_THREADS];
/** starting memory of data for different threads */
char* mem0[NUM_THREADS];
/** number of frames alloted for each thread to process */
int* numFramesAlloted;
/** final file name */
char savefilename[MAX_STR_LENGTH];
/** file path */
char filePath[MAX_STR_LENGTH];
/** file prefix */
char fileName[MAX_STR_LENGTH];
/** file acquisition index */
int fileIndex;
/** 0 for 1d and 1 for 2d */
int deltaX;
/** index of center of cluster for 1d and for 2d*/
int clusterCenterPixel;
/** squareroot of cluster */
double sqrtCluster;
/** circular fifo buffer to be freed */
CircularFifo<char>* fifo;
};
#endif