Adapt multithreaded cluster finder to storage cell data - continued (non-functional)

This commit is contained in:
2025-02-06 20:42:36 +01:00
parent 3a1945116a
commit fcbb87b71a
5 changed files with 221 additions and 95 deletions

View File

@ -341,9 +341,9 @@ class threadedAnalogDetector {
class multiThreadedAnalogDetector {
public:
multiThreadedAnalogDetector(analogDetector<uint16_t> *d, int n,
int fs = 1000)
: stop(0), nThreads(n), ithread(0) {
multiThreadedAnalogDetector(analogDetector<uint16_t> *d, int num_threads,
int fs = 1000, int num_sc = 1)
: stop(0), nThreads(num_threads), nSC(num_sc) {
dd[0] = d;
if (nThreads == 1)
dd[0]->setId(100);
@ -359,6 +359,18 @@ class multiThreadedAnalogDetector {
dets[i] = new threadedAnalogDetector(dd[i], fs);
}
// Pre-assign threads to storage cells in a round-robin manner
int threads_per_sc = nThreads / nSC;
sc_to_threads.clear();
for (int s = 0; s < nSC; ++s) {
for (int t = 0; t < threads_per_sc; ++t) {
int assigned_thread = s * threads_per_sc + t;
sc_to_threads[s].push_back(assigned_thread);
}
}
// Set all thread counter to zero for each storage cell
thread_counters_by_sc.resize(nSC,0);
image = nullptr;
ff = NULL;
ped = NULL;
@ -522,82 +534,133 @@ class multiThreadedAnalogDetector {
return ret;
}
virtual bool pushData(char *&ptr, int SC_value) {
/*
virtual std::vector<int> getThreadsForSc(int sc) {
return sc_to_threads[sc];
}
*/
virtual bool pushData(char *&ptr, int sc=0) {
//Additional logic implemented to accommodate storage cells
std::unique_lock<std::mutex> lock(map_mutex);
// Get assigned threads for this storage cell
std::vector<int>& assigned_threads = sc_to_threads[SC_value];
auto& assigned_threads = sc_to_threads[sc];
auto& counter = thread_counters_by_sc[sc];
// Distribute workload among threads using round-robin
int selected_thread = assigned_threads[counter % assigned_threads.size()];
return dets[assigned_thread]->pushData(ptr);
return dets[selected_thread]->pushData(ptr);
}
virtual bool popFree(char *&ptr) {
// cout << ithread << endl;
return dets[ithread]->popFree(ptr);
virtual bool popFree(char *&ptr, int sc=0) {
//Additional logic implemented to accommodate storage cells
std::unique_lock<std::mutex> lock(map_mutex);
// Get assigned threads for this storage cell
auto& assigned_threads = sc_to_threads[sc];
auto& counter = thread_counters_by_sc[sc];
// Distribute workload among threads using round-robin
int selected_thread = assigned_threads[counter % assigned_threads.size()];
return dets[selected_thread]->popFree(ptr);
}
virtual int nextThread() {
ithread++;
if (ithread == nThreads)
ithread = 0;
return ithread;
virtual int nextThread(int sc=0) {
//Additional logic implemented to accommodate storage cells
auto& counter = thread_counters_by_sc[sc];
//counter++;
if (++counter == nThreads/nSC)
counter = 0;
return counter;
}
virtual double *getPedestal() {
// Storage cell sensitive
virtual double *getPedestal(int sc = 0) {
int nx, ny;
dets[0]->getDetectorSize(nx, ny);
if (ped)
delete[] ped;
ped = new double[nx * ny];
if (sc_pedestals.count(sc) && sc_pedestals[sc]) {
delete[] sc_pedestals[sc];
sc_pedestals[sc] = nullptr;
}
// allocate memory and initialize all values to zero
sc_pedestals[sc] = new double[nx * ny](); // parentheses initialize elements to zero
//std::fill(sc_pedestals[sc], sc_pedestals[sc] + (nx * ny), 0.0); // explicit zero initialization
double *p0 = new double[nx * ny];
for (int i = 0; i < nThreads; i++) {
// Get the threads assigned to this storage cell
auto const& assigned_threads = sc_to_threads[sc];
int num_threads = assigned_threads.size();
// Only iterate over threads assigned to this storage cell
for ( int thread_id : assigned_threads ) {
// inte=(slsInterpolation*)dets[i]->getInterpolation(nb,emi,ema);
// cout << i << endl;
p0 = dets[i]->getPedestal(p0);
if (p0) {
if (i == 0) {
//p0 = dets[thread_id]->getPedestal(p0);
dets[thread_id]->getPedestal(p0);
if (p0) { /*
if (i == 0) {
// If first thread, initialize ped with first thread's values
for (int ib = 0; ib < nx * ny; ib++) {
ped[ib] = p0[ib] / ((double)nThreads);
// cout << p0[ib] << " ";
}
} else {
for (int ib = 0; ib < nx * ny; ib++) {
ped[ib] += p0[ib] / ((double)nThreads);
// cout << p0[ib] << " ";
}
*/
// For subsequent threads, accumulate pedestal values
// if ( i == 0 ) becomes superfluous if we zero-initialize earlier
for (int ib = 0; ib < nx * ny; ib++) {
sc_pedestals[sc][ib] += p0[ib] / ((double)num_threads);
// cout << p0[ib] << " ";
}
//}
}
}
delete[] p0;
return ped;
return sc_pedestals[sc];
};
virtual double *getPedestalRMS() {
// Storage cell sensitive
virtual double *getPedestalRMS(int sc = 0) {
int nx, ny;
dets[0]->getDetectorSize(nx, ny);
// if (ped) delete [] ped;
double *rms = new double[nx * ny];
if (sc_pedestals_rms.count(sc) && sc_pedestals_rms[sc]) {
delete[] sc_pedestals_rms[sc];
sc_pedestals_rms = nullptr;
}
// allocate memory and initialize all values to zero
sc_pedestals_rms[sc] = new double[nx * ny](); // Zero-initialize
//std::fill(sc_pedestals_rms[sc], sc_pedestals_rms[sc] + (nx * ny), 0.0); // explicit zero initialization
//double *rms = sc_pedestals_rms[sc];
double *p0 = new double[nx * ny];
for (int i = 0; i < nThreads; i++) {
// Get the threads assigned to this storage cell
auto const& assigned_threads = sc_to_threads[sc];
int num_threads = assigned_threads.size();
// Only iterate over threads assigned to this storage cell
for (int thread_id : assigned_threads) {
// inte=(slsInterpolation*)dets[i]->getInterpolation(nb,emi,ema);
// cout << i << endl;
p0 = dets[i]->getPedestalRMS(p0);
if (p0) {
if (i == 0) {
//p0 = dets[thread_id]->getPedestalRMS(p0);
dets[thread_id]->getPedestalRMS(p0);
for (int ib = 0; ib < nx * ny; ib++) {
rms[ib] = p0[ib] * p0[ib] / ((double)nThreads);
if (p0) {
for (int ib = 0; ib < nx * ny; ib++) {
sc_pedestals_rms[sc][ib] += (p0[ib] * p0[ib]) / ((double)num_threads);
// cout << p0[ib] << " ";
}
} else {
for (int ib = 0; ib < nx * ny; ib++) {
rms[ib] += p0[ib] * p0[ib] / ((double)nThreads);
// cout << p0[ib] << " ";
}
}
}
}
delete[] p0;
@ -608,9 +671,10 @@ class multiThreadedAnalogDetector {
/* rms[ib]=0; */
/* } */
return rms;
return sc_pedestals_rms[sc];
};
// Does not differentiate for storage cells!
virtual double *setPedestal(double *h = NULL) {
// int nb=0;
@ -625,44 +689,79 @@ class multiThreadedAnalogDetector {
return NULL;
};
virtual void *writePedestal(const char *imgname) {
// Storage cell sensitive
virtual void *writePedestal(char const* base_imgname) {
int nx, ny;
dets[0]->getDetectorSize(nx, ny);
getPedestal();
float *gm = new float[nx * ny];
if (gm) {
for (int ix = 0; ix < nx * ny; ix++) {
gm[ix] = ped[ix];
}
WriteToTiff(gm, imgname, nx, ny);
delete[] gm;
} else
std::cout << "Could not allocate float image " << std::endl;
//float *gm = new float[nx * ny];
if(ped)
delete[] ped; //Memory cleanup
// Loop over each storage cell
for ( auto const& entry : sc_to_threads ) {
int sc = entry.first;
std::string imgname = std::string(base_imgname);
if (nSC>1)
imgname += "_SC" + std::to_string(sc);
imgname += ".tiff";
getPedestal(sc); // Compute pedestal for this storage cell
std::vector<float> gm(nx * ny);
// Copy pedestal data into the float array
/*
for (int ix = 0; ix < nx * ny; ix++) {
gm[ix] = sc_pedestals[sc][ix];
}
*/
std::copy(sc_pedestals[sc], sc_pedestals[sc] + (nx * ny), gm.data());
WriteToTiff(gm.data(), imgname.c_str(), nx, ny);
// Clean up memory
if(sc_pedestals[sc]) {
delete[] sc_pedestals[sc];
sc_pedestals[sc] = nullptr;
}
}
return NULL;
};
virtual void *writePedestalRMS(const char *imgname) {
// Storage cell sensitive
virtual void *writePedestalRMS(char const* base_imgname) {
int nx, ny;
dets[0]->getDetectorSize(nx, ny);
double *rms = getPedestalRMS();
float *gm = new float[nx * ny];
if (gm) {
//float *gm = new float[nx * ny];
// Loop over each stoarge cell
for ( auto const& entry : sc_to_threads ) {
int sc = entry.first;
std::string imgname = std::string(base_imgname);
if (nSC>1)
imgname += "_SC" + std::to_string(sc);
imgname += ".tiff";
double *rms = getPedestalRMS(sc); // Compute pedestal RMS for this storage cell
std::vector<float> gm(nx * ny);
// Copy rms data into the float array
/*
for (int ix = 0; ix < nx * ny; ix++) {
gm[ix] = rms[ix];
gm[ix] = rms[ix];
}
WriteToTiff(gm, imgname, nx, ny);
delete[] gm;
*/
std::copy(rms, rms + (nx * ny), gm.data());
WriteToTiff(gm.data(), imgname.c_str(), nx, ny);
// Clean up memory
delete[] rms;
} else
std::cout << "Could not allocate float image " << std::endl;
if(sc_pedestals_rms[sc]) {
delete[] sc_pedestals_rms[sc];
sc_pedestals_rms[sc] = nullptr;
}
}
return NULL;
};
@ -713,13 +812,19 @@ class multiThreadedAnalogDetector {
const int nThreads;
threadedAnalogDetector *dets[MAXTHREADS];
analogDetector<uint16_t> *dd[MAXTHREADS];
int ithread;
//int ithread{0}; // Thread index
std::vector<int> thread_counters_by_sc{}; // Round-robin counters for threads for each storage cell
int* image;
int* ff;
double* ped;
//pthread_mutex_t fmutex; //unused
std::unordered_map< int, std::vector<int> > sc_to_threads; // Maps storage cell -> assigned threads
std::mutex map_mutex; // Ensure thread safe access to the map
std::unordered_map< int, std::vector<int> > sc_to_threads; // Maps storage cell -> vector of assigned thread ids
std::mutex map_mutex; // Ensure thread-safe access to the map
int nSC{1}; // Number of storage cells
std::unordered_map<int, double*> sc_pedestals; // Store pedestal arrays per storage cell
std::unordered_map<int, double*> sc_pedestals_rms; // Store pedestal RMS arrays per storage cell
// at the moment, these maps could be avoided, but this implementation is more robust in allowing future changes
};
#endif