Fix segmentation fault caused by double delete of threads and resolve memory leaks

This commit is contained in:
hinger_v 2024-10-11 17:14:55 +02:00
parent 3150276bfe
commit f0fc547c6f
4 changed files with 183 additions and 89 deletions

View File

@ -50,6 +50,8 @@ template <typename Element> class CircularFifo {
mutable sem_t data_mutex; mutable sem_t data_mutex;
mutable sem_t free_mutex; mutable sem_t free_mutex;
unsigned int increment(unsigned int idx_) const; unsigned int increment(unsigned int idx_) const;
int id_;
int thread_id_;
}; };
template <typename Element> int CircularFifo<Element>::getDataValue() const { template <typename Element> int CircularFifo<Element>::getDataValue() const {
@ -74,14 +76,18 @@ template <typename Element> int CircularFifo<Element>::getFreeValue() const {
template <typename Element> template <typename Element>
bool CircularFifo<Element>::push(Element *&item_, bool no_block) { bool CircularFifo<Element>::push(Element *&item_, bool no_block) {
// check for fifo full // check for fifo full
if (no_block && isFull()) if (no_block && isFull()) {
return false; //std::cout << "Full Fifo at push. Returning." << std::endl;
return false; // No space, return immediately
}
sem_wait(&free_mutex); //std::cout << "Thread " << thread_id_ <<" Push Fifo " << id_ << " item " << static_cast<void*>(item_) << std::endl;
array[tail] = item_;
tail = increment(tail); sem_wait(&free_mutex); // Wait for space
sem_post(&data_mutex); array[tail] = item_; // Add item to the buffer
return true; tail = increment(tail); // Move the tail pointer
sem_post(&data_mutex); // Signal that there is new data
return true; // Success
} }
/** Consumer only: Removes and returns item from the queue /** Consumer only: Removes and returns item from the queue
@ -94,14 +100,18 @@ bool CircularFifo<Element>::push(Element *&item_, bool no_block) {
template <typename Element> template <typename Element>
bool CircularFifo<Element>::pop(Element *&item_, bool no_block) { bool CircularFifo<Element>::pop(Element *&item_, bool no_block) {
// check for fifo empty // check for fifo empty
if (no_block && isEmpty()) if (no_block && isEmpty()) {
return false; //std::cout << "Empty Fifo at pop. Returning." << std::endl;
return false; // No data in fifo, return immediately
}
sem_wait(&data_mutex); //std::cout << "Thread " << thread_id_ << " Pop Fifo " << id_ << " item " << static_cast<void*>(item_) << std::endl;
item_ = array[head];
head = increment(head); sem_wait(&data_mutex); // Wait for data
sem_post(&free_mutex); item_ = array[head]; // Retreive item from the current head of the buffer
return true; head = increment(head); // Move the head pointer (to point to the next item)
sem_post(&free_mutex); // Signal that there is new free space available
return true; //Success
} }
/** Useful for testinng and Consumer check of status /** Useful for testinng and Consumer check of status

View File

@ -78,7 +78,7 @@ int main(int argc, char *argv[]) {
return 1; return 1;
} }
int const fifosize = 1000; int const fifosize = 100; //1000;
int const nthreads = 10; int const nthreads = 10;
int const csize = 3; // 3 int const csize = 3; // 3
int const nsigma = 5; int const nsigma = 5;
@ -330,7 +330,7 @@ int main(int argc, char *argv[]) {
} }
//mt->setFilePointer(of); //mt->setFilePointer(of);
//std::cout << "file pointer set " << std::endl; //std::cout << "file pointer set " << std::endl;
std::cout << "Here! " << framenumber << " "; //std::cout << "Here! " << framenumber << " ";
} else { } else {
std::cout << "Could not open " << cfname std::cout << "Could not open " << cfname
<< " for writing " << std::endl; << " for writing " << std::endl;
@ -356,7 +356,9 @@ int main(int argc, char *argv[]) {
// // //pop // // //pop
mt->nextThread(); mt->nextThread();
mt->popFree(buff); mt->popFree(buff); /* In the last execution of the loop,
* this leaves buff outside of the Fifo!
* Free explicitely at the end! */
++ifr; ++ifr;
if (ifr % 1000 == 0) if (ifr % 1000 == 0)
@ -372,15 +374,16 @@ int main(int argc, char *argv[]) {
framenumber = -1; framenumber = -1;
} }
std::cout << "aa --" << std::endl;
//std::cout << "aa --" << std::endl;
fileh5->CloseResources(); fileh5->CloseResources();
std::cout << "bb --" << std::endl; //std::cout << "bb --" << std::endl;
while (mt->isBusy()) { while (mt->isBusy()) {
; ;
} }
std::cout << "cc --" << std::endl; //std::cout << "cc --" << std::endl;
if (nframes >= 0) { if (nframes >= 0) {
if (nframes > 0) if (nframes > 0)
imgfname = createFileName( outdir, fprefix, fsuffix, "tiff", ioutfile ); imgfname = createFileName( outdir, fprefix, fsuffix, "tiff", ioutfile );
@ -401,18 +404,21 @@ int main(int argc, char *argv[]) {
<< std::endl; << std::endl;
} }
if (nframes < 0) { if (nframes < 0) {
std::string fprefix( getRootString(filenames[0]) ); //This might by a non-ideal name choice for that file std::string fprefix( getRootString(filenames[0]) ); //Possibly, non-ideal name choice for file
std::string imgfname( createFileName( outdir, fprefix, "sum", "tiff" ) ); std::string imgfname( createFileName( outdir, fprefix, "sum", "tiff" ) );
std::cout << "Writing tiff to " << imgfname << " " << thr1 << std::endl; std::cout << "Writing tiff to " << imgfname << " " << thr1 << std::endl;
mt->writeImage(imgfname.c_str(), thr1); mt->writeImage(imgfname.c_str(), thr1);
} }
//delete decoder
//delete filter; //std::cout << "Calling delete..." << std::endl;
delete mt; /* Info: Previously, 'delete mt' caused crash
delete filter; (double calls of StopThread() in both destructors of
multiThreadedAnalogDetector and threadedAnalogDetector)
Now fixed! */
delete mt; // triggers cleanup of all threads and singlePhotonDetector instances (delete filter is obsolete)
delete decoder; delete decoder;
delete buff; free(buff); // Free explicitly as it gets popped out of the Fifo at termination of while(readNextFrame)
return 0; return 0;
} }

View File

@ -47,12 +47,20 @@ class threadedAnalogDetector {
if (mm) { if (mm) {
// memset(mm,0, det->getDataSize()); // memset(mm,0, det->getDataSize());
/*
if (i == 0) { // debug
first_mm = mm;
}
*/
fifoFree->push(mm); fifoFree->push(mm);
//std::cout << "Allocated memory at: " << static_cast<void*>(mm) << " (fifoslot " << i << ")" << std::endl;
} else } else
break; break;
} }
if (i < fs) if (i < fs)
cout << "Could allocate only " << i << " frames"; std::cout << "Could allocate only " << i << " frames";
busy = 0; busy = 0;
stop = 1; stop = 1;
@ -102,24 +110,45 @@ class threadedAnalogDetector {
}; };
virtual ~threadedAnalogDetector() { virtual ~threadedAnalogDetector() {
StopThread(); StopThread();
delete fifoFree; delete fifoFree;
delete fifoData; delete fifoData;
delete det; // Call destructor for singlePhotonDetector
} }
/** Returns true if the thread was successfully started, false if there was /** Returns true if the thread was successfully started, false if there was
* an error starting the thread */ * an error starting the thread */
virtual bool StartThread() { virtual bool StartThread() {
stop = 0; stop = 0;
cout << "Detector number " << det->getId() << endl; std::cout << "Detector number " << det->getId() << std::endl;
cout << "common mode is " << det->getCommonModeSubtraction() << endl; std::cout << "common mode is " << det->getCommonModeSubtraction() << std::endl;
cout << "ghos summation is " << det->getGhostSummation() << endl; std::cout << "ghos summation is " << det->getGhostSummation() << std::endl;
return (pthread_create(&_thread, NULL, processData, this) == 0); return (pthread_create(&_thread, NULL, processData, this) == 0);
} }
virtual void StopThread() { virtual void StopThread() {
stop = 1; stop = 1;
(void)pthread_join(_thread, NULL); //std::cout << "Attempting to stop thread..." << std::endl;
// Free all remaining allocated memory in fifoFree
char *mm = nullptr;
while (fifoFree->pop(mm, true)) { // Use no_block to avoid waiting
//std::cout << "fifo Free: Freeing memory at: " << static_cast<void*>(mm) << std::endl;
free(mm); // Free the allocated memory
}
if (_thread) {
//(void)pthread_join(_thread, NULL);
//std::cout << "Calling pthread_join for thread: " << det->getId() << std::endl;
pthread_join(_thread, NULL);
_thread = 0;
std::cout << "Thread " << det->getId() << " stopped and joined." << std::endl;
} else {
std::cout << "No thread to join." << std::endl;
}
} }
virtual bool pushData(char *&ptr) { return fifoData->push(ptr); } virtual bool pushData(char *&ptr) { return fifoData->push(ptr); }
@ -266,6 +295,8 @@ class threadedAnalogDetector {
char *data; char *data;
int *ff; int *ff;
//char* first_mm = nullptr; // For debug; to track first allocated block
static void *processData(void *ptr) { static void *processData(void *ptr) {
threadedAnalogDetector *This = ((threadedAnalogDetector *)ptr); threadedAnalogDetector *This = ((threadedAnalogDetector *)ptr);
return This->processData(); return This->processData();
@ -278,18 +309,32 @@ class threadedAnalogDetector {
usleep(100); usleep(100);
if (fifoData->isEmpty()) { if (fifoData->isEmpty()) {
busy = 0; busy = 0;
} else } else {
busy = 1; busy = 1;
} else }
} else {
busy = 1; busy = 1;
}
if (busy == 1) { if (busy == 1) {
// Check stop flag before making a blocking call
//if (stop) {
// break;
//}
// Blocking call
fifoData->pop(data); // blocking! fifoData->pop(data); // blocking!
// Process data if not stopping
//if (!stop) {
det->processData(data); det->processData(data);
fifoFree->push(data); fifoFree->push(data);
//}
// busy=0; // busy=0;
} }
} }
return NULL; return NULL;
} }
}; };
@ -314,19 +359,25 @@ class multiThreadedAnalogDetector {
dets[i] = new threadedAnalogDetector(dd[i], fs); dets[i] = new threadedAnalogDetector(dd[i], fs);
} }
image = NULL; image = nullptr;
ff = NULL; ff = NULL;
ped = NULL; ped = NULL;
cout << "Ithread is " << ithread << endl; std::cout << "Ithread is " << ithread << std::endl;
} }
virtual ~multiThreadedAnalogDetector() { virtual ~multiThreadedAnalogDetector() {
StopThreads(); //std::cout << "Destructing multiThreadedAnalogDetector..." << std::endl;
for (int i = 0; i < nThreads; i++) //StopThreads(); // Superfluous, leads to double delete
delete dets[i];
/* for (int i=1; i<nThreads; i++) */ /* Reverse loop for destruction.
/* delete dd[i]; */ * Deletes clones first, then root object, which owns the mutex
// delete [] image; * (ensure shared mutex is deleted last).
* Optional solution: reference counting (safer but more complex) */
for (int i = nThreads - 1; i >= 0; --i) {
//std::cout << "Deleting dets[" << i << "]" << std::endl;
delete dets[i]; //StopThread() called by each ~threadedAnalogDetector()
}
} }
virtual int setFrameMode(int fm) { virtual int setFrameMode(int fm) {
@ -359,31 +410,31 @@ class multiThreadedAnalogDetector {
dets[i]->newDataSet(); dets[i]->newDataSet();
}; };
virtual int *getImage(int &nnx, int &nny, int &ns, int &nsy) { virtual long long int *getImage(int &nnx, int &nny, int &ns, int &nsy) {
int *img; //int *img;
// int nnx, nny, ns; // int nnx, nny, ns;
// int nnx, nny, ns; // int nnx, nny, ns;
int nn = dets[0]->getImageSize(nnx, nny, ns, nsy); int nn = dets[0]->getImageSize(nnx, nny, ns, nsy);
if (image) { if (image) {
delete[] image; delete[] image;
image = NULL; image = nullptr;
} }
image = new int[nn]; image = new long long int[nn];
// int nn=dets[0]->getImageSize(nnx, nny, ns); // int nn=dets[0]->getImageSize(nnx, nny, ns);
// for (i=0; i<nn; i++) image[i]=0; // for (i=0; i<nn; i++) image[i]=0;
for (int ii = 0; ii < nThreads; ii++) { for (int ii = 0; ii < nThreads; ii++) {
// cout << ii << " " << nn << " " << nnx << " " << nny << " " << ns // cout << ii << " " << nn << " " << nnx << " " << nny << " " << ns
// << endl; // << endl;
img = dets[ii]->getImage(); int* tmp_img = dets[ii]->getImage();
for (int i = 0; i < nn; i++) { for (int i = 0; i < nn; i++) {
if (ii == 0) if (ii == 0)
// if (img[i]>0) // if (img[i]>0)
image[i] = img[i]; image[i] = static_cast<long long int>(tmp_img[i]);
// else // else
// image[i]=0; // image[i]=0;
else // if (img[i]>0) else // if (img[i]>0)
image[i] += img[i]; image[i] += static_cast<long long int>(tmp_img[i]);
// if (img[i]) cout << "det " << ii << " pix " << i << " val // if (img[i]) cout << "det " << ii << " pix " << i << " val
// " << img[i] << " " << image[i] << endl; // " << img[i] << " " << image[i] << endl;
} }
@ -428,7 +479,13 @@ class multiThreadedAnalogDetector {
WriteToTiff(gm, imgname, nnx, nny); WriteToTiff(gm, imgname, nnx, nny);
delete[] gm; delete[] gm;
} else } else
cout << "Could not allocate float image " << endl; std::cout << "Could not allocate float image " << std::endl;
if(image) {
delete[] image; // Memory cleanup (VH)
image = nullptr;
}
return NULL; return NULL;
} }
@ -439,6 +496,7 @@ class multiThreadedAnalogDetector {
} }
virtual void StopThreads() { virtual void StopThreads() {
std::cout << "Stopping all threads ..." << std::endl;
for (int i = 0; i < nThreads; i++) for (int i = 0; i < nThreads; i++)
dets[i]->StopThread(); dets[i]->StopThread();
} }
@ -564,7 +622,10 @@ class multiThreadedAnalogDetector {
WriteToTiff(gm, imgname, nx, ny); WriteToTiff(gm, imgname, nx, ny);
delete[] gm; delete[] gm;
} else } else
cout << "Could not allocate float image " << endl; std::cout << "Could not allocate float image " << std::endl;
if(ped)
delete[] ped; //Memory cleanup
return NULL; return NULL;
}; };
@ -584,7 +645,7 @@ class multiThreadedAnalogDetector {
delete[] gm; delete[] gm;
delete[] rms; delete[] rms;
} else } else
cout << "Could not allocate float image " << endl; std::cout << "Could not allocate float image " << std::endl;
return NULL; return NULL;
}; };
@ -636,10 +697,10 @@ class multiThreadedAnalogDetector {
threadedAnalogDetector *dets[MAXTHREADS]; threadedAnalogDetector *dets[MAXTHREADS];
analogDetector<uint16_t> *dd[MAXTHREADS]; analogDetector<uint16_t> *dd[MAXTHREADS];
int ithread; int ithread;
int *image; long long int *image;
int *ff; int *ff;
double *ped; double *ped;
pthread_mutex_t fmutex; //pthread_mutex_t fmutex; //unused
}; };
#endif #endif

View File

@ -60,7 +60,7 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
: analogDetector<uint16_t>(d, sign, cm, nped, nnx, nny, gm, gs), : analogDetector<uint16_t>(d, sign, cm, nped, nnx, nny, gm, gs),
nDark(nd), eventMask(NULL), nSigma(nsigma), eMin(-1), eMax(-1), nDark(nd), eventMask(NULL), nSigma(nsigma), eMin(-1), eMax(-1),
clusterSize(csize), clusterSizeY(csize), c2(1), c3(1), clusters(NULL), clusterSize(csize), clusterSizeY(csize), c2(1), c3(1), clusters(NULL),
quad(UNDEFINED_QUADRANT), tot(0), quadTot(0) { quad(UNDEFINED_QUADRANT), tot(0), quadTot(0), ownsMutex(true) { // The original object owns the mutex {
fm = new pthread_mutex_t; fm = new pthread_mutex_t;
pthread_mutex_init(fm, NULL); pthread_mutex_init(fm, NULL);
@ -86,14 +86,21 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
nphFrame = 0; nphFrame = 0;
}; };
/** /**
destructor. Deletes the cluster structure, the pdestalSubtraction and the Destructor. Deletes the cluster structure, event mask, and destroys the mutex.
image array
*/ */
virtual ~singlePhotonDetector() { virtual ~singlePhotonDetector() {
delete[] clusters; delete[] clusters;
for (int i = 0; i < ny; i++) for (int i = 0; i < ny; i++)
delete[] eventMask[i]; delete[] eventMask[i];
delete[] eventMask; delete[] eventMask;
if (ownsMutex) {
if (fm) {
pthread_mutex_destroy(fm); // Destroy the mutex
delete fm; // Free the memory allocated for the mutex
fm = nullptr; // Set the pointer to nullptr to avoid dangling pointer
}
}
}; };
/** /**
@ -103,7 +110,7 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
*/ */
singlePhotonDetector(singlePhotonDetector *orig) singlePhotonDetector(singlePhotonDetector *orig)
: analogDetector<uint16_t>(orig) { : analogDetector<uint16_t>(orig), fm(orig->fm), ownsMutex(false) {
nDark = orig->nDark; nDark = orig->nDark;
myFile = orig->myFile; myFile = orig->myFile;
@ -130,7 +137,7 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
// cluster=clusters; // cluster=clusters;
setClusterSize(clusterSize); setClusterSize(clusterSize);
fm = orig->fm; //fm = orig->fm;
quad = UNDEFINED_QUADRANT; quad = UNDEFINED_QUADRANT;
tot = 0; tot = 0;
@ -381,7 +388,7 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
// int ir, ic; // int ir, ic;
eventType ee; eventType ee;
double max = 0, tl = 0, tr = 0, bl = 0, br = 0, *v; double max = 0, tl = 0, tr = 0, bl = 0, br = 0, v = 0;//, *v;
int cm = 0; int cm = 0;
int good = 1; int good = 1;
int ir, ic; int ir, ic;
@ -403,7 +410,8 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
cm = 1; cm = 1;
} }
double *val = new double[ny * nx]; //double *val = new double[ny * nx];
std::vector<double> val( ny * nx );
for (int iy = ymin; iy < ymax; ++iy) { for (int iy = ymin; iy < ymax; ++iy) {
for (int ix = xmin; ix < xmax; ++ix) { for (int ix = xmin; ix < xmax; ++ix) {
@ -442,18 +450,26 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
} }
v = &(val[(iy + ir) * nx + ix + ic]); //v = &(val[(iy + ir) * nx + ix + ic]);
tot += *v; v = val[(iy + ir) * nx + ix + ic];
//tot += *v;
tot += v;
if (ir <= 0 && ic <= 0) if (ir <= 0 && ic <= 0)
bl += *v; bl += v;
//bl += *v;
if (ir <= 0 && ic >= 0) if (ir <= 0 && ic >= 0)
br += *v; br += v;
//br += *v;
if (ir >= 0 && ic <= 0) if (ir >= 0 && ic <= 0)
tl += *v; tl += v;
//tl += *v;
if (ir >= 0 && ic >= 0) if (ir >= 0 && ic >= 0)
tr += *v; tr += v;
if (*v > max) //{ //tr += *v;
max = *v; //if (*v > max) //{
//max = *v;
if (v > max)
max = v;
//} //}
} }
} }
@ -561,7 +577,7 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
// cout <<id << " **********************************"<< iframe << " " << // cout <<id << " **********************************"<< iframe << " " <<
// det->getFrameNumber(data) << " " << nphFrame << endl; // det->getFrameNumber(data) << " " << nphFrame << endl;
writeClusters(det->getFrameNumber(data)); writeClusters(det->getFrameNumber(data));
delete[] val; //delete[] val;
return image; return image;
}; };
@ -733,7 +749,8 @@ class singlePhotonDetector : public analogDetector<uint16_t> {
int nphFrame; int nphFrame;
// double **val; // double **val;
pthread_mutex_t *fm; pthread_mutex_t* fm; // Pointer to the shared mutex
bool ownsMutex; // Flag to indicate ownership
}; };
#endif #endif