From cd88aff756fd16542ff5deac3676eadef2be03d7 Mon Sep 17 00:00:00 2001 From: l_maliakal_d Date: Thu, 14 Nov 2013 12:34:31 +0000 Subject: [PATCH] receiver complete change git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@687 951219d9-93cf-4727-9268-0efd64621fa3 --- .../singlePhotonFilter.cpp | 564 +++++++++++++----- .../slsDetectorAnalysis/singlePhotonFilter.h | 211 +++++-- 2 files changed, 558 insertions(+), 217 deletions(-) diff --git a/slsDetectorSoftware/slsDetectorAnalysis/singlePhotonFilter.cpp b/slsDetectorSoftware/slsDetectorAnalysis/singlePhotonFilter.cpp index e349761d3..cf77aee6b 100644 --- a/slsDetectorSoftware/slsDetectorAnalysis/singlePhotonFilter.cpp +++ b/slsDetectorSoftware/slsDetectorAnalysis/singlePhotonFilter.cpp @@ -3,17 +3,22 @@ * @short single photon filter using trees ***********************************************/ #include "singlePhotonFilter.h" +#include +#define BUF_SIZE (16*1024*1024) //16mb singlePhotonFilter::singlePhotonFilter(int nx, int ny, int fmask, int pmask, int foffset, int poffset, int pperf, int iValue, - vector > m, vector > s, int d): + int16_t *m, int16_t *s, CircularFifo* f, int d): #ifdef MYROOT myTree(NULL), myFile(NULL), #else myFile(NULL), + nHitsPerFrame(0), + nHitsPerFile(0), + nTotalHits(0), #endif nChannelsX(nx), nChannelsY(ny), @@ -36,38 +41,128 @@ singlePhotonFilter::singlePhotonFilter(int nx, int ny, packet_index_offset(poffset), packets_per_frame(pperf), incrementValue(iValue), - enable(false), firstTime(true), ret(0), pIndex(0), - fIndex(0){ - + fIndex(0), + thread_started(0), + threads_mask(0x0), + currentThread(-1), + thisThreadIndex(-1), + fileIndex(0), + fifo(f){ +#ifndef MYROOT + photonHitList = new single_photon_hit[nChannelsX*nChannelsY]; +#endif + //cluster if (nChannelsX) nClusterX = 1; + sqrtCluster = sqrt(nClusterX*nClusterY); + deltaX = nClusterX/2;// 0 or 1 + clusterCenterPixel = (deltaX * nClusterY) + 1; - stat.resize(nChannelsX); - for(int i=0; idata = new double[nClusterX*nClusterY]; + myPhotonHit->x = 0; + myPhotonHit->y = 0; + myPhotonHit->rms = 0; + myPhotonHit->ped = 0; + myPhotonHit->iframe = -1; + + for(int i=0;i < NUM_THREADS; i++){ + /*smp[i] = NULL;*/ + mem0[i]=NULL; + } + numFramesAlloted = new int[NUM_THREADS]; + + strcpy(savefilename,""); + strcpy(filePath,""); + strcpy(fileName,"run"); + + pthread_mutex_init(&write_mutex,NULL); + pthread_mutex_init(&running_mutex,NULL); + + + +} + + +singlePhotonFilter::~singlePhotonFilter(){ + enableCompression(false); + if(numFramesAlloted) delete [] numFramesAlloted; + writeToFile(); + closeFile(); + if(myFile) delete myFile; + if(mask) delete mask; + if(stat) delete stat; + if(nHitStat) delete nHitStat; + /*if(smp) delete []smp;*/ + if(mem0) delete [] mem0; + if(fifo) delete fifo; } -void singlePhotonFilter::initTree(char *outfname){ +int singlePhotonFilter::enableCompression(bool enable){ +//#ifdef VERBOSE + cout << "Compression set to " << enable << endl; +//#endif + if(enable){ + threads_mask = 0x0; + currentThread = -1; + + for(int i=0; ifindHits(); + return this_pointer; +} + + + +int singlePhotonFilter::initTree(){ #ifdef MYROOT + if(myFile) { + writeToFile(); + closeFile(); + } outfname = string(outfname).replace(".raw",".root"); //fName.replace(".raw",".png"); //sprintf(outfname, "%s/%s.root", outdir, fname); @@ -81,163 +176,89 @@ void singlePhotonFilter::initTree(char *outfname){ myFile = new TFile(outfname, "RECREATE"); /** later return error if it exists */ //tree myTree = new TTree(fname, fname); - myTree->Branch("x",&myPhotonHit.x,"x/I"); - myTree->Branch("y",&myPhotonHit.y,"y/I"); - myTree->Branch("data",myPhotonHit.data,cdata); - myTree->Branch("pedestal",&myPhotonHit.ped,"pedestal/D"); - myTree->Branch("rms",&myPhotonHit.rms,"rms/D"); + myTree->Branch("iframe",&myPhotonHit->iframe,"iframe/I"); + myTree->Branch("x",&myPhotonHit->x,"x/I"); + myTree->Branch("y",&myPhotonHit->y,"y/I"); + myTree->Branch("data",myPhotonHit->data,cdata); + myTree->Branch("pedestal",&myPhotonHit->ped,"pedestal/D"); + myTree->Branch("rms",&myPhotonHit->rms,"rms/D"); #else - ;/*myFile = fopen(outfname, "w");*/ + + writeToFile(); + closeFile(); + sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,nTotalHits,fileIndex); + myFile = fopen(savefilename, "w"); + setvbuf(myFile,NULL,_IOFBF,BUF_SIZE); + cout<<"File created: "<Write(); - - myFile = myTree->GetCurrentFile(); - myFile->Close(); - delete myFile; - } #else - ; - /*if(myFile){ //&& (number of structs?) - fwrite((void*)(&myPhotonHit), 1, sizeof(myPhotonHit), myFile); - fclose(myFile); - delete myFile; - return OK; - }*/ + if(nHitsPerFrame){ + if(myFile){ + /*cout<<"writing "<< nHitsPerFrame << " hits to file" << endl;*/ + fwrite((void*)(photonHitList), 1, sizeof(single_photon_hit)*nHitsPerFrame, myFile); + nHitsPerFrame = 0; + //cout<<"Exiting writeToFile"<= (myDataSize))){ - cout << "Bad Channel Mapping index: " << map[ir][ic] << endl; - continue; - } - - - //if frame within pedestal number - if (myPhotonHit.iframe < nped) - stat[ir][ic].Calc((double)(mask[ir][ic] ^ myData[map[ir][ic]])); - - // frame outside pedestal number - else{ - - //if hit wasnt registered - if (hits[ir][ic] == 0){ - - myPhotonHit.rms = stat[ir][ic].StandardDeviation(); - myPhotonHit.ped = stat[ir][ic].Mean(); - - dum = 1; - tot = 0; - - //for 1d and 2d - int c = 1; - int d = 1; - if (nChannelsX == 1) - c = 0; - - //center pixel - myPhotonHit.data[c][d] = ((double)(mask[ir][ic] ^ myData[map[ir][ic]])) - myPhotonHit.ped; - - //check if neighbours are involved - for (int ih = -c; ih <= c ; ih++){ - for (int iv = -d; iv <= d; iv++){ - //validate neighbouring pixels (not really needed) - if (((ir+ih) >= 0) && ((ir+ih) < nChannelsX) && ((ic+iv) >= 0) && ((ic+iv) < nChannelsY)) { - //validate mapping - if ((map[ir+ih][ic+iv] < 0) || (map[ir+ih][ic+iv] >= myDataSize)){ - cout << "Bad Channel Mapping index: " << map[ir][ic] << endl; - continue; - } - - myPhotonHit.data[iv+d][ih+c] = (double)(mask[ir][ic] ^ myData[map[ir+ih][ic+iv]])-stat[ir+ih][ic+iv].Mean(); - tot += myPhotonHit.data[iv+c][ih+d]; - if (myPhotonHit.data[iv+c][ih+d] > myPhotonHit.data[c][d]) - dum = 2; - } - } - } - - - if (tot < CLUSTER_SIZE * nsigma * myPhotonHit.rms) - dum = 0; - - if (myPhotonHit.data[c][d] < nsigma * myPhotonHit.rms && dum != 0) - dum = 3; - - if (myPhotonHit.data[c][d] > -nsigma * myPhotonHit.rms && - myPhotonHit.data[c][d] < nsigma * myPhotonHit.rms && - dum == 0){ - //Appriximated running average - stat[ir][ic].Calc((double)(mask[ir][ic]^myData[map[ir][ic]])); - } - else if (dum == 1){ - myPhotonHit.x = ic; - myPhotonHit.y = ir; +int singlePhotonFilter::closeFile(){ #ifdef MYROOT - myTree->Fill(); -#endif - hits[ir][ic] = 1; - nHits++; - } - - } - } + if(myTree){ + if (myFile){ + myFile = myTree->GetCurrentFile(); + myFile->Close(); + delete myFile; } + delete myTree; } - - if (myPhotonHit.iframe%1000 == 0) - cout << "Frame: " << myPhotonHit.iframe << " Hits: " << nHits << endl; - - return nHits; +#else + if(myFile) + fclose(myFile); + myFile = NULL; +#endif + return OK; } -void singlePhotonFilter::setupAcquisitionParameters(){ - fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true; +void singlePhotonFilter::setupAcquisitionParameters(char *outfpath, char* outfname, int outfIndex){ + fileIndex = outfIndex; + strcpy(filePath,outfpath); + strcpy(fileName,outfname); + fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true; currentThread = -1; + //initialize + for (int ir=0; irClear(); + nHitStat->SetN(nbackground); +#ifndef MYROOT + nTotalHits = 0; +#endif } @@ -260,7 +281,7 @@ int singlePhotonFilter::verifyFrame(char *inData){ if (pIndex == 0) pIndex = packets_per_frame; #ifdef VERYVERBOSE - cout<<"fi:"<>frame_index_offset) - f0; + + + //for each pixel + for (ir=0; ir= (dataSize))){ + cout << "Bad Channel Mapping index: " << map[currentIndex] << endl; + continue; + } + + //if frame within pedestal number + if (clusteriframe < nped){ + stat[currentIndex].Calc((double)(mask[currentIndex] ^ myData[map[currentIndex]])); + // frame outside pedestal number + }else{ + + dum = 1; + tot = 0; + clusterrms = stat[currentIndex].StandardDeviation();//-1 + clusterped = stat[currentIndex].Mean();//0 + sigmarms = clusterrms * nsigma; + + + clusterData[clusterCenterPixel] = ((double)(mask[currentIndex] ^ myData[map[currentIndex]])) - clusterped; + for (r=-deltaX; r <= deltaX; ++r ){ + if (((ir+r) < 0) || ((ir+r) >= nChannelsX)) + continue; + for(c=-1; c <= 1; ++c){ + if (((ic+c) < 0) || ((ic+c) >= nChannelsY)) + continue; + + + pixelIndex = currentIndex + (r*nChannelsY+c); + + if ((map[pixelIndex] < 0) || (map[pixelIndex] >= dataSize)){ + cout << "Bad Channel Mapping index: " << map[pixelIndex] << endl; + continue; + } + + clusterIndex = pixelIndex-(currentIndex - deltaX * nChannelsY - 1); + clusterData[clusterIndex] = ((double)(mask[pixelIndex] ^ myData[map[pixelIndex]])) - stat[pixelIndex].Mean(); + tot += clusterData[clusterIndex]; + //discard negative events + if (clusterData[clusterIndex] > clusterData[clusterCenterPixel]) + dum = 2; + + } + + } + + + if (tot < sqrtCluster * sigmarms) + dum = 0; + //discard events (for pedestal) where sum of the neighbours is too large. + if (clusterData[clusterCenterPixel] < sigmarms && dum != 0) + dum = 3; + //Appriximated running average + if (clusterData[clusterCenterPixel] > -sigmarms && + clusterData[clusterCenterPixel] < sigmarms && + dum == 0){ + stat[currentIndex].Calc((double)(mask[currentIndex]^myData[map[currentIndex]])); + } + // this is an event and we are in the center + else if (dum == 1){ + pthread_mutex_lock(&write_mutex); +#ifdef MYROOT + myTree->Fill(); +#else + photonHitList[nHitsPerFrame].data = clusterData; + photonHitList[nHitsPerFrame].x = ic; + photonHitList[nHitsPerFrame].y = ir; + photonHitList[nHitsPerFrame].rms = clusterrms; + photonHitList[nHitsPerFrame].ped = clusterped; + photonHitList[nHitsPerFrame].iframe = clusteriframe; + + nHitsPerFrame++; + nHitsPerFile++; + nTotalHits++; + if(nHitsPerFile >= MAX_HITS_PER_FILE-1) + initTree(); +#endif + pthread_mutex_unlock(&write_mutex); + } + + } + } + } + + + }//else cout<<"***did not get whoel frame in single photon filter"<Calc((double)nHitsPerFrame); + + pthread_mutex_lock(&write_mutex); + writeToFile(); + fifo->push(isData); + pthread_mutex_unlock(&write_mutex); + + isData += 4096; + myData += 2048; + +/* + if ((clusteriframe%1000 == 0) && (clusteriframe != 0) ){ + cout << dec << "Frame: " << clusteriframe << " Hit Avg over last frames: " << + nHitStat->Mean() << " .. "<StandardDeviation() << endl; + cout<<"writing "<< nHitsPerFrame << " hits to file" << endl; + } +*/ + } + + pthread_mutex_lock(&running_mutex); + threads_mask^=(1<=25000) { + cout<<"*****************problem: "<<((theData-listmem0)/4096)<<" :"< #include #include +#include +#include - +#include "circularFifo.h" #include "runningStat.h" #include "movingStat.h" using namespace std; @@ -36,7 +38,7 @@ using namespace std; typedef double double32_t; typedef float float32_t; typedef int int32_t; - +#define MAX_STR_LENGTH 1000 /** return values @@ -49,7 +51,7 @@ enum { @short structure for a single photon hit */ typedef struct{ - vector < vector > data; /**< data size */ + double* data; /**< data size */ int x; /**< x-coordinate of the center of hit */ int y; /**< x-coordinate of the center of hit */ double rms; /**< noise of central pixel */ @@ -58,8 +60,6 @@ typedef struct{ }single_photon_hit; - - /** @short class handling trees and its data file */ @@ -78,10 +78,9 @@ public: * @param iValue increment value (only for gotthard to increment index to have matching frame number) * @param m Map to data without headers * @param s mask as to which adcs are inverted + * @param f circular fifo buffer, which needs to be freed * @param d Size of data with the headers */ - - /** why is the datasize -1, you need to know the datasize with headers so that you dont go over the limits */ singlePhotonFilter( int nx, int ny, @@ -91,55 +90,13 @@ public: int poffset, int pperf, int iValue, - vector > m, - vector > s, - int d = -1); + int16_t *m, + int16_t *s, + CircularFifo* f, + int d); /** virtual destructor */ - virtual ~singlePhotonFilter(){}; - - /** - * Construct a tree, populate struct for the single photon hit and provide all the masks and offsets - * @param outdir Output file directory/Output file name - - */ - void initTree(char *outfname); - - /** - * Reset Indices before starting acquisition - */ - void setupAcquisitionParameters(); - - /** reconstruct the frame with all the right packets - * @param inData the data from socket to be verified - * */ - int verifyFrame(char *inData); - - /** - * Writes tree/struct to file - * returns OK if successful, else FAIL - */ - int writeToFile(); - - /** - * Find Hits frame by frame - * @param myData data for one frame - * @param myDataSize data size for one frame with headers - * returns number of hits - */ - int findHits(int16_t *myData, int myDataSize); - - /** - * Enable Filter, This makes sure findHits() is called - */ - void enableFilter(bool r){enable = r;}; - - /** - * Returns packets per frame - */ - int getPacketsPerFrame(){ return packets_per_frame;}; - - + virtual ~singlePhotonFilter(); #ifdef MYROOT /** @@ -148,10 +105,15 @@ public: TTree *getTree(){ return myTree; }; #endif + /** + * Returns packets per frame + */ + int getPacketsPerFrame(){ return packets_per_frame;}; + /** * returns struct */ - single_photon_hit getStructure(){ return myPhotonHit; }; + single_photon_hit* getStructure(){ return myPhotonHit; }; /** Set number of frames to calculate pedestal at beginning */ void setNPed(int n){ nped = n; }; @@ -177,6 +139,65 @@ public: /** Get correction */ double getOutCorr(){return outcorr;}; + /** + * Construct a tree, populate struct for the single photon hit and provide all the masks and offsets + * @param outdir Output file directory/Output file name + * returns OK if successful, else FAIL + + */ + int initTree(); + + /** + * Writes tree/struct to file + * returns OK if successful, else FAIL + */ + int writeToFile(); + + /** + * Closes file + * returns OK if successful, else FAIL + */ + int closeFile(); + + /** + * Reset Indices before starting acquisition + */ + void setupAcquisitionParameters(char *outfpath, char* outfname, int outfIndex); + + /** reconstruct the frame with all the right packets + * @param inData the data from socket to be verified + * returns 0 if still waiting for next packet of same frame, + * 1 if end of complete frame, -1 if end of incomplete frame, + * -2 first packet of next frame, so push previous one; -3 last packet of current frame, push both frames + * */ + int verifyFrame(char *inData); + + /** + * Find Hits frame by frame and save it in file/tree + */ + void findHits(); + + /** Enable or disable compression + * @param enable true to enable compression and false to disable + * returns OK for success or FAIL for failure, incase threads fail to start + * */ + int enableCompression(bool enable); + + /** create threads for compression + * @param this_pointer obejct of this class + * */ + static void* createThreads(void *this_pointer); + + /** assignjobs to each thread + * @param thisData a bunch of frames + * @param numThisData number of frames + * */ + void assignJobsForThread(char *thisData, int numThisData); + + /** Checks if all the threads are done processing + * @param returns 1 for jobs done and 0 for jobs not done + * */ + int checkIfJobsDone(); @@ -190,8 +211,23 @@ private: TFile *myFile; #else FILE *myFile; + + /** pointer to array of structs when only using files */ + single_photon_hit* photonHitList; + + /** Number of Hits per frame*/ + int nHitsPerFrame; + + /** Number of Hits per file */ + int nHitsPerFile; + + /** Total Number of Hits Per Acquisition */ + int nTotalHits; #endif + /** Maximum Number of hits written to file */ + const static int MAX_HITS_PER_FILE = 200000; + /** Number of Channels in X direction */ int nChannelsX; @@ -205,19 +241,21 @@ private: int nClusterY; /** map to the data without headers */ - vector > map; + int16_t *map; /** Size of data with headers */ int dataSize; /** mask as to which adcs are inverted */ - vector > mask; + int16_t *mask; /** movingStat object */ - vector > stat; + movingStat *stat; + + movingStat *nHitStat; /** single Photon Hit structure */ - single_photon_hit myPhotonHit; + single_photon_hit* myPhotonHit; /** Cluster size */ const static int CLUSTER_SIZE = 3; @@ -276,9 +314,6 @@ private: /** increment value for index for gotthard */ int incrementValue; - /** filter enable */ - bool enable; - /** first packet */ bool firstTime; @@ -291,6 +326,58 @@ private: /** current frame index */ int fIndex; + /** thread related variables */ + static const int NUM_THREADS = 15; + pthread_t find_hits_thread[NUM_THREADS]; + volatile int thread_started; + volatile int threads_mask; + pthread_mutex_t write_mutex; + pthread_mutex_t running_mutex; + + /** current thread the job being allotted to */ + int currentThread; + + /** current index alloted for each thread */ + int thisThreadIndex; + + /** semaphore to synchronize between different jobs on same thread */ + sem_t smp[NUM_THREADS]; + + /** starting memory of data for different threads */ + char* mem0[NUM_THREADS]; + + /** number of frames alloted for each thread to process */ + int* numFramesAlloted; + + + + /** final file name */ + char savefilename[MAX_STR_LENGTH]; + + /** file path */ + char filePath[MAX_STR_LENGTH]; + + /** file prefix */ + char fileName[MAX_STR_LENGTH]; + + /** file acquisition index */ + int fileIndex; + + + /** 0 for 1d and 1 for 2d */ + int deltaX; + + /** index of center of cluster for 1d and for 2d*/ + int clusterCenterPixel; + + /** squareroot of cluster */ + double sqrtCluster; + + /** circular fifo buffer to be freed */ + CircularFifo* fifo; }; + + + #endif