raising a SIGINT when the child thread has an exception so that the parent thread can exit all the threads and clean up gracefully

This commit is contained in:
2025-07-09 17:19:54 +02:00
parent d8ee0c2279
commit ef8d8a5fd2
2 changed files with 41 additions and 26 deletions

View File

@ -541,45 +541,44 @@ int main(int argc, char *argv[]) {
void *user_data = static_cast<void *>(&stat); void *user_data = static_cast<void *>(&stat);
std::thread combinerThread(Correlate, &stat); std::thread combinerThread(Correlate, &stat);
std::exception_ptr threadException = nullptr;
for (int i = 0; i != f.numReceivers; ++i) { for (int i = 0; i != f.numReceivers; ++i) {
uint16_t port = f.port + i; uint16_t port = f.port + i;
sem_t *semaphore = &semaphores[i]; sem_t *semaphore = &semaphores[i];
threads.emplace_back([i, semaphore, port, user_data]() { threads.emplace_back(
LOG(sls::logINFOBLUE) [i, semaphore, port, user_data, &threadException]() {
<< "Thread " << i << " [ Tid: " << gettid() << ']'; LOG(sls::logINFOBLUE)
try { << "Thread " << i << " [ Tid: " << gettid() << ']';
sls::Receiver receiver(port); try {
receiver.registerCallBackStartAcquisition( sls::Receiver receiver(port);
StartAcquisitionCallback, user_data); receiver.registerCallBackStartAcquisition(
receiver.registerCallBackAcquisitionFinished( StartAcquisitionCallback, user_data);
AcquisitionFinishedCallback, user_data); receiver.registerCallBackAcquisitionFinished(
receiver.registerCallBackRawDataReady(GetDataCallback, AcquisitionFinishedCallback, user_data);
user_data); receiver.registerCallBackRawDataReady(GetDataCallback,
user_data);
/** - as long as no Ctrl+C */ /** - as long as no Ctrl+C */
// each child shares the common semaphore // each child shares the common semaphore
sem_wait(semaphore); sem_wait(semaphore);
} catch (...) { } catch (...) {
// capture exception and raise SIGINT to exit gracefully
threadException = std::current_exception();
raise(SIGINT);
}
LOG(sls::logINFOBLUE) LOG(sls::logINFOBLUE)
<< "Exiting Thread " << i << " [ Tid: " << gettid() << " ]"; << "Exiting Thread " << i << " [ Tid: " << gettid() << " ]";
for (auto &s : semaphores) });
sem_destroy(&s);
cleanup();
if (global_frame_status)
sem_destroy(&(global_frame_status->available));
std::exit(EXIT_FAILURE);
}
LOG(sls::logINFOBLUE)
<< "Exiting Thread " << i << " [ Tid: " << gettid() << " ]";
sem_destroy(semaphore);
});
} }
for (auto &t : threads) { for (auto &t : threads) {
t.join(); t.join();
} }
for (auto &s : semaphores)
sem_destroy(&s);
cleanup(); cleanup();
{ {
std::lock_guard<std::mutex> lock(stat.mtx); std::lock_guard<std::mutex> lock(stat.mtx);
stat.terminate = true; stat.terminate = true;
@ -588,6 +587,19 @@ int main(int argc, char *argv[]) {
combinerThread.join(); combinerThread.join();
sem_destroy(&stat.available); sem_destroy(&stat.available);
if (threadException) {
try {
std::rethrow_exception(threadException);
} catch (const std::exception &e) {
LOG(sls::logERROR)
<< "Unhandled exception from thread: " << e.what();
return EXIT_FAILURE;
} catch (...) {
LOG(sls::logERROR) << "Unknown exception occurred in thread";
return EXIT_FAILURE;
}
}
LOG(sls::logINFOBLUE) << "Goodbye!"; LOG(sls::logINFOBLUE) << "Goodbye!";
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }

View File

@ -29,6 +29,9 @@ Receiver::~Receiver() = default;
Receiver::Receiver(uint16_t port) { Receiver::Receiver(uint16_t port) {
validatePortNumber(port); validatePortNumber(port);
if (port == 9999) {
throw sls::RuntimeError("Throwing for testing purposes. ");
}
tcpipInterface = make_unique<ClientInterface>(port); tcpipInterface = make_unique<ClientInterface>(port);
} }