diff --git a/slsDetectorCalibration/circularFifo.h b/slsDetectorCalibration/circularFifo.h index c92199315..45eecf471 100644 --- a/slsDetectorCalibration/circularFifo.h +++ b/slsDetectorCalibration/circularFifo.h @@ -50,6 +50,8 @@ template class CircularFifo { mutable sem_t data_mutex; mutable sem_t free_mutex; unsigned int increment(unsigned int idx_) const; + int id_; + int thread_id_; }; template int CircularFifo::getDataValue() const { @@ -74,14 +76,18 @@ template int CircularFifo::getFreeValue() const { template bool CircularFifo::push(Element *&item_, bool no_block) { // check for fifo full - if (no_block && isFull()) - return false; + if (no_block && isFull()) { + //std::cout << "Full Fifo at push. Returning." << std::endl; + return false; // No space, return immediately + } - sem_wait(&free_mutex); - array[tail] = item_; - tail = increment(tail); - sem_post(&data_mutex); - return true; + //std::cout << "Thread " << thread_id_ <<" Push Fifo " << id_ << " item " << static_cast(item_) << std::endl; + + sem_wait(&free_mutex); // Wait for space + array[tail] = item_; // Add item to the buffer + tail = increment(tail); // Move the tail pointer + sem_post(&data_mutex); // Signal that there is new data + return true; // Success } /** Consumer only: Removes and returns item from the queue @@ -94,14 +100,18 @@ bool CircularFifo::push(Element *&item_, bool no_block) { template bool CircularFifo::pop(Element *&item_, bool no_block) { // check for fifo empty - if (no_block && isEmpty()) - return false; + if (no_block && isEmpty()) { + //std::cout << "Empty Fifo at pop. Returning." << std::endl; + return false; // No data in fifo, return immediately + } - sem_wait(&data_mutex); - item_ = array[head]; - head = increment(head); - sem_post(&free_mutex); - return true; + //std::cout << "Thread " << thread_id_ << " Pop Fifo " << id_ << " item " << static_cast(item_) << std::endl; + + sem_wait(&data_mutex); // Wait for data + item_ = array[head]; // Retreive item from the current head of the buffer + head = increment(head); // Move the head pointer (to point to the next item) + sem_post(&free_mutex); // Signal that there is new free space available + return true; //Success } /** Useful for testinng and Consumer check of status diff --git a/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5.cpp b/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5.cpp index 9fbdf559e..9e3263676 100644 --- a/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5.cpp +++ b/slsDetectorCalibration/jungfrauExecutables/jungfrauRawDataProcess_filetxtH5.cpp @@ -78,7 +78,7 @@ int main(int argc, char *argv[]) { return 1; } - int const fifosize = 1000; + int const fifosize = 100; //1000; int const nthreads = 10; int const csize = 3; // 3 int const nsigma = 5; @@ -330,7 +330,7 @@ int main(int argc, char *argv[]) { } //mt->setFilePointer(of); //std::cout << "file pointer set " << std::endl; - std::cout << "Here! " << framenumber << " "; + //std::cout << "Here! " << framenumber << " "; } else { std::cout << "Could not open " << cfname << " for writing " << std::endl; @@ -356,7 +356,9 @@ int main(int argc, char *argv[]) { // // //pop mt->nextThread(); - mt->popFree(buff); + mt->popFree(buff); /* In the last execution of the loop, + * this leaves buff outside of the Fifo! + * Free explicitely at the end! */ ++ifr; if (ifr % 1000 == 0) @@ -372,15 +374,16 @@ int main(int argc, char *argv[]) { framenumber = -1; } - std::cout << "aa --" << std::endl; + + //std::cout << "aa --" << std::endl; fileh5->CloseResources(); - std::cout << "bb --" << std::endl; + //std::cout << "bb --" << std::endl; while (mt->isBusy()) { ; } - std::cout << "cc --" << std::endl; + //std::cout << "cc --" << std::endl; if (nframes >= 0) { if (nframes > 0) imgfname = createFileName( outdir, fprefix, fsuffix, "tiff", ioutfile ); @@ -401,18 +404,21 @@ int main(int argc, char *argv[]) { << std::endl; } if (nframes < 0) { - std::string fprefix( getRootString(filenames[0]) ); //This might by a non-ideal name choice for that file + std::string fprefix( getRootString(filenames[0]) ); //Possibly, non-ideal name choice for file std::string imgfname( createFileName( outdir, fprefix, "sum", "tiff" ) ); std::cout << "Writing tiff to " << imgfname << " " << thr1 << std::endl; mt->writeImage(imgfname.c_str(), thr1); } - //delete decoder - //delete filter; - delete mt; - delete filter; + + //std::cout << "Calling delete..." << std::endl; + /* Info: Previously, 'delete mt' caused crash + (double calls of StopThread() in both destructors of + multiThreadedAnalogDetector and threadedAnalogDetector) + Now fixed! */ + delete mt; // triggers cleanup of all threads and singlePhotonDetector instances (delete filter is obsolete) delete decoder; - delete buff; + free(buff); // Free explicitly as it gets popped out of the Fifo at termination of while(readNextFrame) return 0; } diff --git a/slsDetectorCalibration/multiThreadedAnalogDetector.h b/slsDetectorCalibration/multiThreadedAnalogDetector.h index eb9b0215c..aa420a5b9 100644 --- a/slsDetectorCalibration/multiThreadedAnalogDetector.h +++ b/slsDetectorCalibration/multiThreadedAnalogDetector.h @@ -47,12 +47,20 @@ class threadedAnalogDetector { if (mm) { // memset(mm,0, det->getDataSize()); + /* + if (i == 0) { // debug + first_mm = mm; + } + */ + fifoFree->push(mm); + //std::cout << "Allocated memory at: " << static_cast(mm) << " (fifoslot " << i << ")" << std::endl; } else break; } + if (i < fs) - cout << "Could allocate only " << i << " frames"; + std::cout << "Could allocate only " << i << " frames"; busy = 0; stop = 1; @@ -102,24 +110,45 @@ class threadedAnalogDetector { }; virtual ~threadedAnalogDetector() { + StopThread(); + delete fifoFree; delete fifoData; + delete det; // Call destructor for singlePhotonDetector } /** Returns true if the thread was successfully started, false if there was * an error starting the thread */ virtual bool StartThread() { stop = 0; - cout << "Detector number " << det->getId() << endl; - cout << "common mode is " << det->getCommonModeSubtraction() << endl; - cout << "ghos summation is " << det->getGhostSummation() << endl; + std::cout << "Detector number " << det->getId() << std::endl; + std::cout << "common mode is " << det->getCommonModeSubtraction() << std::endl; + std::cout << "ghos summation is " << det->getGhostSummation() << std::endl; + return (pthread_create(&_thread, NULL, processData, this) == 0); } virtual void StopThread() { stop = 1; - (void)pthread_join(_thread, NULL); + //std::cout << "Attempting to stop thread..." << std::endl; + + // Free all remaining allocated memory in fifoFree + char *mm = nullptr; + while (fifoFree->pop(mm, true)) { // Use no_block to avoid waiting + //std::cout << "fifo Free: Freeing memory at: " << static_cast(mm) << std::endl; + free(mm); // Free the allocated memory + } + + if (_thread) { + //(void)pthread_join(_thread, NULL); + //std::cout << "Calling pthread_join for thread: " << det->getId() << std::endl; + pthread_join(_thread, NULL); + _thread = 0; + std::cout << "Thread " << det->getId() << " stopped and joined." << std::endl; + } else { + std::cout << "No thread to join." << std::endl; + } } virtual bool pushData(char *&ptr) { return fifoData->push(ptr); } @@ -266,6 +295,8 @@ class threadedAnalogDetector { char *data; int *ff; + //char* first_mm = nullptr; // For debug; to track first allocated block + static void *processData(void *ptr) { threadedAnalogDetector *This = ((threadedAnalogDetector *)ptr); return This->processData(); @@ -278,18 +309,32 @@ class threadedAnalogDetector { usleep(100); if (fifoData->isEmpty()) { busy = 0; - } else + } else { busy = 1; - } else + } + } else { busy = 1; + } if (busy == 1) { + // Check stop flag before making a blocking call + //if (stop) { + // break; + //} + + // Blocking call fifoData->pop(data); // blocking! + + // Process data if not stopping + //if (!stop) { det->processData(data); fifoFree->push(data); + + //} // busy=0; } } + return NULL; } }; @@ -314,19 +359,25 @@ class multiThreadedAnalogDetector { dets[i] = new threadedAnalogDetector(dd[i], fs); } - image = NULL; + image = nullptr; ff = NULL; ped = NULL; - cout << "Ithread is " << ithread << endl; + std::cout << "Ithread is " << ithread << std::endl; } virtual ~multiThreadedAnalogDetector() { - StopThreads(); - for (int i = 0; i < nThreads; i++) - delete dets[i]; - /* for (int i=1; i= 0; --i) { + //std::cout << "Deleting dets[" << i << "]" << std::endl; + delete dets[i]; //StopThread() called by each ~threadedAnalogDetector() + } + } virtual int setFrameMode(int fm) { @@ -359,31 +410,31 @@ class multiThreadedAnalogDetector { dets[i]->newDataSet(); }; - virtual int *getImage(int &nnx, int &nny, int &ns, int &nsy) { - int *img; + virtual long long int *getImage(int &nnx, int &nny, int &ns, int &nsy) { + //int *img; // int nnx, nny, ns; // int nnx, nny, ns; int nn = dets[0]->getImageSize(nnx, nny, ns, nsy); if (image) { delete[] image; - image = NULL; + image = nullptr; } - image = new int[nn]; + image = new long long int[nn]; // int nn=dets[0]->getImageSize(nnx, nny, ns); // for (i=0; igetImage(); + int* tmp_img = dets[ii]->getImage(); for (int i = 0; i < nn; i++) { if (ii == 0) // if (img[i]>0) - image[i] = img[i]; + image[i] = static_cast(tmp_img[i]); // else // image[i]=0; else // if (img[i]>0) - image[i] += img[i]; + image[i] += static_cast(tmp_img[i]); // if (img[i]) cout << "det " << ii << " pix " << i << " val // " << img[i] << " " << image[i] << endl; } @@ -428,7 +479,13 @@ class multiThreadedAnalogDetector { WriteToTiff(gm, imgname, nnx, nny); delete[] gm; } else - cout << "Could not allocate float image " << endl; + std::cout << "Could not allocate float image " << std::endl; + + if(image) { + delete[] image; // Memory cleanup (VH) + image = nullptr; + } + return NULL; } @@ -439,6 +496,7 @@ class multiThreadedAnalogDetector { } virtual void StopThreads() { + std::cout << "Stopping all threads ..." << std::endl; for (int i = 0; i < nThreads; i++) dets[i]->StopThread(); } @@ -564,7 +622,10 @@ class multiThreadedAnalogDetector { WriteToTiff(gm, imgname, nx, ny); delete[] gm; } else - cout << "Could not allocate float image " << endl; + std::cout << "Could not allocate float image " << std::endl; + + if(ped) + delete[] ped; //Memory cleanup return NULL; }; @@ -584,7 +645,7 @@ class multiThreadedAnalogDetector { delete[] gm; delete[] rms; } else - cout << "Could not allocate float image " << endl; + std::cout << "Could not allocate float image " << std::endl; return NULL; }; @@ -636,10 +697,10 @@ class multiThreadedAnalogDetector { threadedAnalogDetector *dets[MAXTHREADS]; analogDetector *dd[MAXTHREADS]; int ithread; - int *image; + long long int *image; int *ff; double *ped; - pthread_mutex_t fmutex; + //pthread_mutex_t fmutex; //unused }; #endif diff --git a/slsDetectorCalibration/singlePhotonDetector.h b/slsDetectorCalibration/singlePhotonDetector.h index 61df4bbcd..25854f3fd 100644 --- a/slsDetectorCalibration/singlePhotonDetector.h +++ b/slsDetectorCalibration/singlePhotonDetector.h @@ -60,10 +60,10 @@ class singlePhotonDetector : public analogDetector { : analogDetector(d, sign, cm, nped, nnx, nny, gm, gs), nDark(nd), eventMask(NULL), nSigma(nsigma), eMin(-1), eMax(-1), clusterSize(csize), clusterSizeY(csize), c2(1), c3(1), clusters(NULL), - quad(UNDEFINED_QUADRANT), tot(0), quadTot(0) { + quad(UNDEFINED_QUADRANT), tot(0), quadTot(0), ownsMutex(true) { // The original object owns the mutex { fm = new pthread_mutex_t; - pthread_mutex_init(fm, NULL); + pthread_mutex_init(fm, NULL); eventMask = new eventType *[ny]; // val=new double*[ny]; @@ -86,14 +86,21 @@ class singlePhotonDetector : public analogDetector { nphFrame = 0; }; /** - destructor. Deletes the cluster structure, the pdestalSubtraction and the - image array + Destructor. Deletes the cluster structure, event mask, and destroys the mutex. */ virtual ~singlePhotonDetector() { delete[] clusters; for (int i = 0; i < ny; i++) delete[] eventMask[i]; delete[] eventMask; + if (ownsMutex) { + if (fm) { + pthread_mutex_destroy(fm); // Destroy the mutex + delete fm; // Free the memory allocated for the mutex + fm = nullptr; // Set the pointer to nullptr to avoid dangling pointer + } + } + }; /** @@ -103,7 +110,7 @@ class singlePhotonDetector : public analogDetector { */ singlePhotonDetector(singlePhotonDetector *orig) - : analogDetector(orig) { + : analogDetector(orig), fm(orig->fm), ownsMutex(false) { nDark = orig->nDark; myFile = orig->myFile; @@ -130,7 +137,7 @@ class singlePhotonDetector : public analogDetector { // cluster=clusters; setClusterSize(clusterSize); - fm = orig->fm; + //fm = orig->fm; quad = UNDEFINED_QUADRANT; tot = 0; @@ -381,7 +388,7 @@ class singlePhotonDetector : public analogDetector { // int ir, ic; eventType ee; - double max = 0, tl = 0, tr = 0, bl = 0, br = 0, *v; + double max = 0, tl = 0, tr = 0, bl = 0, br = 0, v = 0;//, *v; int cm = 0; int good = 1; int ir, ic; @@ -403,7 +410,8 @@ class singlePhotonDetector : public analogDetector { cm = 1; } - double *val = new double[ny * nx]; + //double *val = new double[ny * nx]; + std::vector val( ny * nx ); for (int iy = ymin; iy < ymax; ++iy) { for (int ix = xmin; ix < xmax; ++ix) { @@ -435,26 +443,34 @@ class singlePhotonDetector : public analogDetector { (ix + ic) >= 0 && (ix + ic) < nx) { - if ((iy + ir) > iy && (ix + ic) > ix ) { + if ((iy + ir) > iy && (ix + ic) > ix ) { - val[(iy + ir) * nx + ix + ic] = - subtractPedestal(data, ix + ic, iy + ir, cm); + val[(iy + ir) * nx + ix + ic] = + subtractPedestal(data, ix + ic, iy + ir, cm); - } - v = &(val[(iy + ir) * nx + ix + ic]); - tot += *v; - if (ir <= 0 && ic <= 0) - bl += *v; - if (ir <= 0 && ic >= 0) - br += *v; - if (ir >= 0 && ic <= 0) - tl += *v; - if (ir >= 0 && ic >= 0) - tr += *v; - if (*v > max) //{ - max = *v; - //} + } + //v = &(val[(iy + ir) * nx + ix + ic]); + v = val[(iy + ir) * nx + ix + ic]; + //tot += *v; + tot += v; + if (ir <= 0 && ic <= 0) + bl += v; + //bl += *v; + if (ir <= 0 && ic >= 0) + br += v; + //br += *v; + if (ir >= 0 && ic <= 0) + tl += v; + //tl += *v; + if (ir >= 0 && ic >= 0) + tr += v; + //tr += *v; + //if (*v > max) //{ + //max = *v; + if (v > max) + max = v; + //} } } } @@ -523,19 +539,19 @@ class singlePhotonDetector : public analogDetector { ic < (clusterSize / 2) + 1; ic++) { if ((iy + ir) >= 0 && (iy + ir) < ny && (ix + ic) >= 0 && (ix + ic) < nx) { - (clusters + nph) + (clusters + nph) ->set_data(val[(iy + ir) * nx + ix + ic], ic, ir); - if (val[(iy + ir) * nx + ix + ic]>max) - good=0; - } + if (val[(iy + ir) * nx + ix + ic]>max) + good=0; + } } } - if (good==0) { - (clusters + nph)->print(); - cout << max << " " << val[iy * nx + ix] << endl; - } - //else (clusters + nph)->print(); + if (good==0) { + (clusters + nph)->print(); + cout << max << " " << val[iy * nx + ix] << endl; + } + //else (clusters + nph)->print(); good = 1; if (eMin > 0 && tot < eMin) good = 0; @@ -561,7 +577,7 @@ class singlePhotonDetector : public analogDetector { // cout <getFrameNumber(data) << " " << nphFrame << endl; writeClusters(det->getFrameNumber(data)); - delete[] val; + //delete[] val; return image; }; @@ -733,7 +749,8 @@ class singlePhotonDetector : public analogDetector { int nphFrame; // double **val; - pthread_mutex_t *fm; + pthread_mutex_t* fm; // Pointer to the shared mutex + bool ownsMutex; // Flag to indicate ownership }; #endif