From fcbb87b71a3e3507a306ca556909cbb6f4d95cbe Mon Sep 17 00:00:00 2001 From: vhinger Date: Thu, 6 Feb 2025 20:42:36 +0100 Subject: [PATCH] Adapt multithreaded cluster finder to storage cell data - continued (non-functional) --- .../dataStructures/HDF5File.cpp | 4 +- .../dataStructures/HDF5File.h | 2 +- .../jungfrauRawDataProcess_filetxtH5_SC.cpp | 65 +++-- .../multiThreadedAnalogDetector.h | 241 +++++++++++++----- .../multiThreadedCountingDetector.h | 4 +- 5 files changed, 221 insertions(+), 95 deletions(-) diff --git a/slsDetectorCalibration/dataStructures/HDF5File.cpp b/slsDetectorCalibration/dataStructures/HDF5File.cpp index 0d63bb330..0437e4689 100644 --- a/slsDetectorCalibration/dataStructures/HDF5File.cpp +++ b/slsDetectorCalibration/dataStructures/HDF5File.cpp @@ -46,8 +46,10 @@ void printDatatypeSize(hid_t dataset) { } -/* + +/* ********************** * Class member functions + * ********************** */ // Default constructor diff --git a/slsDetectorCalibration/dataStructures/HDF5File.h b/slsDetectorCalibration/dataStructures/HDF5File.h index 0629861b8..bf8858bc7 100644 --- a/slsDetectorCalibration/dataStructures/HDF5File.h +++ b/slsDetectorCalibration/dataStructures/HDF5File.h @@ -82,7 +82,7 @@ public: /** * Read an image into current_image, * increment Z-offset (frame) and (if rank==4) storage cell - * @returns frame number read, + * @returns frame number read from file, */ int ReadImage (uint16_t* image, std::vector& offset); diff --git a/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5_SC.cpp b/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5_SC.cpp index ca71b193d..a6dca0878 100644 --- a/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5_SC.cpp +++ b/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5_SC.cpp @@ -46,13 +46,19 @@ std::string getRootString( std::string const& filepath ) { * \param fext file extension (e.g. "raw") */ std::string createFileName( std::string const& dir, std::string const& fprefix="run", - std::string const& fsuffix="", std::string const& fext="raw", int const outfilecounter=-1 ) { - if (outfilecounter >= 0) - return fmt::format("{:s}/{:s}_{:s}_f{:05d}.{:s}", dir, fprefix, fsuffix, outfilecounter, fext); - else if (fsuffix.length()!=0) - return fmt::format("{:s}/{:s}_{:s}.{:s}", dir, fprefix, fsuffix, fext); - else - return fmt::format("{:s}/{:s}.{:s}", dir, fprefix, fext); + std::string const& fsuffix="", std::string const& fext="raw", int const outfilecounter=-1 ) { + std::string return_string; + if (outfilecounter >= 0) + return_string = fmt::format("{:s}/{:s}_{:s}_f{:05d}", dir, fprefix, fsuffix, outfilecounter); + else if (fsuffix.length()!=0) + return_string = fmt::format("{:s}/{:s}_{:s}", dir, fprefix, fsuffix); + else + return_string = fmt::format("{:s}/{:s}", dir, fprefix); + + if (fext.length()!=0) + return_string += "." + fext; + + return return_string; } /* Adjusts number of threads to be a multiple of number of storage cells @@ -208,6 +214,7 @@ int main(int argc, char *argv[]) { std::cout << std::ctime(&end_time) << std::endl; int nThreads = nthreads; + int nSC = 1; // Validate number of threads for number of storage cells (if applicable) // Determine the dimensions of the dataset from the first datafile @@ -219,7 +226,8 @@ int main(int argc, char *argv[]) { // Validate number of threads if( firstfileh5->GetRank() == 4 ) { auto h5dims = firstfileh5->GetDatasetDimensions(); - auto nThreads = adjustThreads(nthreads,h5dims[1]); + nSC = h5dims[1]; + nThreads = adjustThreads(nthreads,nSC); } firstfileh5->CloseResources(); @@ -229,7 +237,7 @@ int main(int argc, char *argv[]) { } multiThreadedCountingDetector* mt = - new multiThreadedCountingDetector(filter, nThreads, fifosize); + new multiThreadedCountingDetector(filter, nThreads, fifosize, nSC); //auto mt = std::make_unique(filter.get(), nthreads, fifosize); mt->setClusterSize(csize, csize); mt->newDataSet(); //Initialize new dataset for each thread @@ -242,7 +250,7 @@ int main(int argc, char *argv[]) { int ifr = 0; //frame counter of while loop int framenumber = 0; //framenumber as read from file (detector) - std::vector h5offset; //frame counter internal to HDF5File::ReadImage (provided for sanity check/debugging) + std::vector h5offset; //hyperslab offset internal to HDF5File::ReadImage hsize_t h5rank; if (pedfilename.length()>1) { @@ -279,20 +287,31 @@ int main(int argc, char *argv[]) { } int storageCell = 0; - if (h5rank == 4) + hsize_t n_storageCells = 1; + if (h5rank == 4) { storageCell = h5offset[1]; + n_storageCells = pedefile->GetDatasetDimensions()[1]; + } - mt->pushData(buff, storageCell); //HERE!!! - mt->nextThread(); - mt->popFree(buff); - ++ifr; - if (ifr % 100 == 0) { - std::cout << " ****" << ifr << " " << framenumber << " " << h5offset[0] - << std::endl; - } // else + // push buff into fifoData for a thread corresponding to the active storage cell + mt->pushData(buff, storageCell); + // increment (round-robin) the internal thread counter for that storage cell + mt->nextThread(storageCell); + // get a free memory address from fifoFree of the active storage cell for the next read operation + mt->popFree(buff,storageCell); + /* NOTE: the buff that was popped free from the current thread, will be (likely) pushed into + * the fifoData of a different thread in the next iteration of the loop! */ + + ++ifr; + if (ifr % 100 == 0) { + std::cout << " ****" << ifr << " " << framenumber << " " << h5offset[0]; + if (n_storageCells>1) + std::cout << " sc " << storageCell; + std::cout << "\n"; + } // else - if (ifr >= 1000) - break; + if (ifr >= 1000*n_storageCells) + break; //framenumber = 0; } @@ -302,9 +321,9 @@ int main(int argc, char *argv[]) { } std::cout << "froot " << froot << std::endl; - auto imgfname = createFileName( outdir, froot, "ped", "tiff"); + auto imgfname = createFileName( outdir, froot, "ped", "" ); mt->writePedestal(imgfname.c_str()); - imgfname = createFileName( outdir, froot, "rms", "tiff"); + imgfname = createFileName( outdir, froot, "rms", ""); mt->writePedestalRMS(imgfname.c_str()); } else diff --git a/slsDetectorCalibration/multiThreadedAnalogDetector.h b/slsDetectorCalibration/multiThreadedAnalogDetector.h index 9df893bb4..26d7d0107 100644 --- a/slsDetectorCalibration/multiThreadedAnalogDetector.h +++ b/slsDetectorCalibration/multiThreadedAnalogDetector.h @@ -341,9 +341,9 @@ class threadedAnalogDetector { class multiThreadedAnalogDetector { public: - multiThreadedAnalogDetector(analogDetector *d, int n, - int fs = 1000) - : stop(0), nThreads(n), ithread(0) { + multiThreadedAnalogDetector(analogDetector *d, int num_threads, + int fs = 1000, int num_sc = 1) + : stop(0), nThreads(num_threads), nSC(num_sc) { dd[0] = d; if (nThreads == 1) dd[0]->setId(100); @@ -359,6 +359,18 @@ class multiThreadedAnalogDetector { dets[i] = new threadedAnalogDetector(dd[i], fs); } + // Pre-assign threads to storage cells in a round-robin manner + int threads_per_sc = nThreads / nSC; + sc_to_threads.clear(); + for (int s = 0; s < nSC; ++s) { + for (int t = 0; t < threads_per_sc; ++t) { + int assigned_thread = s * threads_per_sc + t; + sc_to_threads[s].push_back(assigned_thread); + } + } + // Set all thread counter to zero for each storage cell + thread_counters_by_sc.resize(nSC,0); + image = nullptr; ff = NULL; ped = NULL; @@ -522,82 +534,133 @@ class multiThreadedAnalogDetector { return ret; } - virtual bool pushData(char *&ptr, int SC_value) { +/* + virtual std::vector getThreadsForSc(int sc) { + return sc_to_threads[sc]; + } +*/ + virtual bool pushData(char *&ptr, int sc=0) { + //Additional logic implemented to accommodate storage cells + + std::unique_lock lock(map_mutex); // Get assigned threads for this storage cell - std::vector& assigned_threads = sc_to_threads[SC_value]; + auto& assigned_threads = sc_to_threads[sc]; + auto& counter = thread_counters_by_sc[sc]; + + // Distribute workload among threads using round-robin + int selected_thread = assigned_threads[counter % assigned_threads.size()]; - return dets[assigned_thread]->pushData(ptr); + return dets[selected_thread]->pushData(ptr); } - virtual bool popFree(char *&ptr) { - // cout << ithread << endl; - return dets[ithread]->popFree(ptr); + virtual bool popFree(char *&ptr, int sc=0) { + //Additional logic implemented to accommodate storage cells + + std::unique_lock lock(map_mutex); + + // Get assigned threads for this storage cell + auto& assigned_threads = sc_to_threads[sc]; + auto& counter = thread_counters_by_sc[sc]; + + // Distribute workload among threads using round-robin + int selected_thread = assigned_threads[counter % assigned_threads.size()]; + + return dets[selected_thread]->popFree(ptr); } - virtual int nextThread() { - ithread++; - if (ithread == nThreads) - ithread = 0; - return ithread; + virtual int nextThread(int sc=0) { + //Additional logic implemented to accommodate storage cells + + auto& counter = thread_counters_by_sc[sc]; + //counter++; + if (++counter == nThreads/nSC) + counter = 0; + return counter; } - virtual double *getPedestal() { + // Storage cell sensitive + virtual double *getPedestal(int sc = 0) { int nx, ny; dets[0]->getDetectorSize(nx, ny); - if (ped) - delete[] ped; - ped = new double[nx * ny]; + + if (sc_pedestals.count(sc) && sc_pedestals[sc]) { + delete[] sc_pedestals[sc]; + sc_pedestals[sc] = nullptr; + } + + // allocate memory and initialize all values to zero + sc_pedestals[sc] = new double[nx * ny](); // parentheses initialize elements to zero + //std::fill(sc_pedestals[sc], sc_pedestals[sc] + (nx * ny), 0.0); // explicit zero initialization double *p0 = new double[nx * ny]; - for (int i = 0; i < nThreads; i++) { + // Get the threads assigned to this storage cell + auto const& assigned_threads = sc_to_threads[sc]; + int num_threads = assigned_threads.size(); + + // Only iterate over threads assigned to this storage cell + for ( int thread_id : assigned_threads ) { // inte=(slsInterpolation*)dets[i]->getInterpolation(nb,emi,ema); // cout << i << endl; - p0 = dets[i]->getPedestal(p0); - if (p0) { - if (i == 0) { + //p0 = dets[thread_id]->getPedestal(p0); + dets[thread_id]->getPedestal(p0); + if (p0) { /* + if (i == 0) { + // If first thread, initialize ped with first thread's values for (int ib = 0; ib < nx * ny; ib++) { ped[ib] = p0[ib] / ((double)nThreads); // cout << p0[ib] << " "; } } else { - for (int ib = 0; ib < nx * ny; ib++) { - ped[ib] += p0[ib] / ((double)nThreads); - // cout << p0[ib] << " "; - } + */ + // For subsequent threads, accumulate pedestal values + // if ( i == 0 ) becomes superfluous if we zero-initialize earlier + for (int ib = 0; ib < nx * ny; ib++) { + sc_pedestals[sc][ib] += p0[ib] / ((double)num_threads); + // cout << p0[ib] << " "; } + //} } } delete[] p0; - return ped; + return sc_pedestals[sc]; }; - virtual double *getPedestalRMS() { + // Storage cell sensitive + virtual double *getPedestalRMS(int sc = 0) { int nx, ny; dets[0]->getDetectorSize(nx, ny); - // if (ped) delete [] ped; - double *rms = new double[nx * ny]; + + if (sc_pedestals_rms.count(sc) && sc_pedestals_rms[sc]) { + delete[] sc_pedestals_rms[sc]; + sc_pedestals_rms = nullptr; + } + + // allocate memory and initialize all values to zero + sc_pedestals_rms[sc] = new double[nx * ny](); // Zero-initialize + //std::fill(sc_pedestals_rms[sc], sc_pedestals_rms[sc] + (nx * ny), 0.0); // explicit zero initialization + //double *rms = sc_pedestals_rms[sc]; double *p0 = new double[nx * ny]; - for (int i = 0; i < nThreads; i++) { + // Get the threads assigned to this storage cell + auto const& assigned_threads = sc_to_threads[sc]; + int num_threads = assigned_threads.size(); + + // Only iterate over threads assigned to this storage cell + for (int thread_id : assigned_threads) { // inte=(slsInterpolation*)dets[i]->getInterpolation(nb,emi,ema); // cout << i << endl; - p0 = dets[i]->getPedestalRMS(p0); - if (p0) { - if (i == 0) { + //p0 = dets[thread_id]->getPedestalRMS(p0); + dets[thread_id]->getPedestalRMS(p0); - for (int ib = 0; ib < nx * ny; ib++) { - rms[ib] = p0[ib] * p0[ib] / ((double)nThreads); + if (p0) { + for (int ib = 0; ib < nx * ny; ib++) { + sc_pedestals_rms[sc][ib] += (p0[ib] * p0[ib]) / ((double)num_threads); // cout << p0[ib] << " "; - } - } else { - for (int ib = 0; ib < nx * ny; ib++) { - rms[ib] += p0[ib] * p0[ib] / ((double)nThreads); - // cout << p0[ib] << " "; - } } } + } delete[] p0; @@ -608,9 +671,10 @@ class multiThreadedAnalogDetector { /* rms[ib]=0; */ /* } */ - return rms; + return sc_pedestals_rms[sc]; }; + // Does not differentiate for storage cells! virtual double *setPedestal(double *h = NULL) { // int nb=0; @@ -625,44 +689,79 @@ class multiThreadedAnalogDetector { return NULL; }; - virtual void *writePedestal(const char *imgname) { + // Storage cell sensitive + virtual void *writePedestal(char const* base_imgname) { int nx, ny; dets[0]->getDetectorSize(nx, ny); - getPedestal(); - float *gm = new float[nx * ny]; - if (gm) { - for (int ix = 0; ix < nx * ny; ix++) { - gm[ix] = ped[ix]; - } - WriteToTiff(gm, imgname, nx, ny); - delete[] gm; - } else - std::cout << "Could not allocate float image " << std::endl; + //float *gm = new float[nx * ny]; - if(ped) - delete[] ped; //Memory cleanup + // Loop over each storage cell + for ( auto const& entry : sc_to_threads ) { + int sc = entry.first; + std::string imgname = std::string(base_imgname); + if (nSC>1) + imgname += "_SC" + std::to_string(sc); + imgname += ".tiff"; + + getPedestal(sc); // Compute pedestal for this storage cell + std::vector gm(nx * ny); + + // Copy pedestal data into the float array + /* + for (int ix = 0; ix < nx * ny; ix++) { + gm[ix] = sc_pedestals[sc][ix]; + } + */ + std::copy(sc_pedestals[sc], sc_pedestals[sc] + (nx * ny), gm.data()); + WriteToTiff(gm.data(), imgname.c_str(), nx, ny); + + // Clean up memory + if(sc_pedestals[sc]) { + delete[] sc_pedestals[sc]; + sc_pedestals[sc] = nullptr; + } + } return NULL; }; - virtual void *writePedestalRMS(const char *imgname) { + // Storage cell sensitive + virtual void *writePedestalRMS(char const* base_imgname) { int nx, ny; dets[0]->getDetectorSize(nx, ny); - double *rms = getPedestalRMS(); - float *gm = new float[nx * ny]; - if (gm) { + //float *gm = new float[nx * ny]; + + // Loop over each stoarge cell + for ( auto const& entry : sc_to_threads ) { + int sc = entry.first; + std::string imgname = std::string(base_imgname); + if (nSC>1) + imgname += "_SC" + std::to_string(sc); + imgname += ".tiff"; + + double *rms = getPedestalRMS(sc); // Compute pedestal RMS for this storage cell + std::vector gm(nx * ny); + + // Copy rms data into the float array + /* for (int ix = 0; ix < nx * ny; ix++) { - gm[ix] = rms[ix]; + gm[ix] = rms[ix]; } - WriteToTiff(gm, imgname, nx, ny); - delete[] gm; + */ + std::copy(rms, rms + (nx * ny), gm.data()); + WriteToTiff(gm.data(), imgname.c_str(), nx, ny); + + // Clean up memory delete[] rms; - } else - std::cout << "Could not allocate float image " << std::endl; + if(sc_pedestals_rms[sc]) { + delete[] sc_pedestals_rms[sc]; + sc_pedestals_rms[sc] = nullptr; + } + } return NULL; }; @@ -713,13 +812,19 @@ class multiThreadedAnalogDetector { const int nThreads; threadedAnalogDetector *dets[MAXTHREADS]; analogDetector *dd[MAXTHREADS]; - int ithread; + //int ithread{0}; // Thread index + std::vector thread_counters_by_sc{}; // Round-robin counters for threads for each storage cell int* image; int* ff; double* ped; //pthread_mutex_t fmutex; //unused - std::unordered_map< int, std::vector > sc_to_threads; // Maps storage cell -> assigned threads - std::mutex map_mutex; // Ensure thread safe access to the map + std::unordered_map< int, std::vector > sc_to_threads; // Maps storage cell -> vector of assigned thread ids + std::mutex map_mutex; // Ensure thread-safe access to the map + int nSC{1}; // Number of storage cells + + std::unordered_map sc_pedestals; // Store pedestal arrays per storage cell + std::unordered_map sc_pedestals_rms; // Store pedestal RMS arrays per storage cell + // at the moment, these maps could be avoided, but this implementation is more robust in allowing future changes }; #endif diff --git a/slsDetectorCalibration/multiThreadedCountingDetector.h b/slsDetectorCalibration/multiThreadedCountingDetector.h index 2fde56f2d..588e6a765 100644 --- a/slsDetectorCalibration/multiThreadedCountingDetector.h +++ b/slsDetectorCalibration/multiThreadedCountingDetector.h @@ -19,8 +19,8 @@ using namespace std; class multiThreadedCountingDetector : public multiThreadedAnalogDetector { public: - multiThreadedCountingDetector(singlePhotonDetector *d, int n, int fs = 1000) - : multiThreadedAnalogDetector(d, n, fs){}; + multiThreadedCountingDetector(singlePhotonDetector *d, int num_threads, int fs = 1000, int num_sc = 1) + : multiThreadedAnalogDetector(d, num_threads, fs, num_sc){}; // virtual // ~multiThreadedCountingDetector{multiThreadedAnalogDetector::~multiThreadedAnalogDetector();}; virtual double setNSigma(double n) {