cleaning up properly , semaphore leaks, child process/thread throwing handled

This commit is contained in:
2025-07-08 17:25:23 +02:00
parent fb4a25ecee
commit 2926904cf7
6 changed files with 52 additions and 40 deletions

View File

@ -28,7 +28,8 @@ ParsedOptions CommandLineOptions::parse(int argc, char *argv[]) {
MultiReceiverOptions multi; MultiReceiverOptions multi;
FrameSyncOptions frame; FrameSyncOptions frame;
auto optString = buildOptString(); // leading '+' to stop parsing at first non-option argument (to allow for deprecated options)
auto optString = "+" + buildOptString();
auto longOptions = buildOptionList(); auto longOptions = buildOptionList();
optind = 0; // reset getopt optind = 0; // reset getopt
@ -52,7 +53,7 @@ ParsedOptions CommandLineOptions::parse(int argc, char *argv[]) {
handleAppSpecificOption(opt, optarg, base, multi, frame); handleAppSpecificOption(opt, optarg, base, multi, frame);
break; break;
default: default:
throw sls::RuntimeError("Invalid arguments." + getHelpMessage()); throw sls::RuntimeError("Invalidddd arguments." + getHelpMessage());
} }
} }
@ -237,7 +238,7 @@ void CommandLineOptions::handleAppSpecificOption(int opt, const char *optarg,
} }
case 't': case 't':
LOG(sls::logWARNING) << "Deprecated option 't' and '--rx_tcport'. Use " LOG(sls::logWARNING) << "Deprecated option '-t' and '--rx_tcport'. Use "
"'p' or '--port' instead."; "'p' or '--port' instead.";
base.port = parsePort(optarg); base.port = parsePort(optarg);
break; break;

View File

@ -503,16 +503,15 @@ void sigInterruptHandler(int p) {
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
CommandLineOptions cli(AppType::SingleReceiver); CommandLineOptions cli(AppType::FrameSynchronizer);
ParsedOptions opts; ParsedOptions opts;
try { try {
opts = cli.parse(argc, argv); opts = cli.parse(argc, argv);
} catch (sls::RuntimeError &e) { } catch (sls::RuntimeError &e) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
auto &o = std::get<CommonOptions>(opts);
auto &f = std::get<FrameSyncOptions>(opts); auto &f = std::get<FrameSyncOptions>(opts);
if (o.versionRequested || o.helpRequested) { if (f.versionRequested || f.helpRequested) {
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
@ -525,7 +524,7 @@ int main(int argc, char *argv[]) {
semaphores.resize(f.numReceivers); semaphores.resize(f.numReceivers);
for (auto &s : semaphores) { for (auto &s : semaphores) {
sem_init(&s, 1, 0); sem_init(&s, 0, 0);
} }
FrameStatus stat{true, false, f.numReceivers}; FrameStatus stat{true, false, f.numReceivers};
@ -537,24 +536,35 @@ int main(int argc, char *argv[]) {
std::thread combinerThread(Correlate, &stat); std::thread combinerThread(Correlate, &stat);
for (int i = 0; i != f.numReceivers; ++i) { for (int i = 0; i != f.numReceivers; ++i) {
uint16_t port = o.port + i; uint16_t port = f.port + i;
sem_t *semaphore = &semaphores[i]; sem_t *semaphore = &semaphores[i];
threads.emplace_back([i, semaphore, port, user_data]() { threads.emplace_back([i, semaphore, port, user_data]() {
sls::Receiver receiver(port); LOG(sls::logINFOBLUE)
receiver.registerCallBackStartAcquisition(StartAcquisitionCallback, << "Thread " << i << " [ Tid: " << gettid() << ']';
user_data); try {
receiver.registerCallBackAcquisitionFinished( sls::Receiver receiver(port);
AcquisitionFinishedCallback, user_data); receiver.registerCallBackStartAcquisition(StartAcquisitionCallback,
receiver.registerCallBackRawDataReady(GetDataCallback, user_data); user_data);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinishedCallback, user_data);
receiver.registerCallBackRawDataReady(GetDataCallback, user_data);
/** - as long as no Ctrl+C */ /** - as long as no Ctrl+C */
// each child shares the common semaphore // each child shares the common semaphore
sem_wait(semaphore); sem_wait(semaphore);
sem_destroy(semaphore); } catch (...) {
LOG(sls::logINFOBLUE)
// clean up frames << "Exiting Thread " << i << " [ Tid: " << gettid() << " ]";
if (i == 0) for (auto &s : semaphores)
sem_destroy(&s);
cleanup(); cleanup();
if (global_frame_status)
sem_destroy(&(global_frame_status->available));
std::exit(EXIT_FAILURE);
}
LOG(sls::logINFOBLUE)
<< "Exiting Thread " << i << " [ Tid: " << gettid() << " ]";
sem_destroy(semaphore);
}); });
} }
@ -562,6 +572,7 @@ int main(int argc, char *argv[]) {
t.join(); t.join();
} }
cleanup();
{ {
std::lock_guard<std::mutex> lock(stat.mtx); std::lock_guard<std::mutex> lock(stat.mtx);
stat.terminate = true; stat.terminate = true;

View File

@ -143,16 +143,15 @@ void sigInterruptHandler(int signal) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
CommandLineOptions cli(AppType::SingleReceiver); CommandLineOptions cli(AppType::MultiReceiver);
ParsedOptions opts; ParsedOptions opts;
try { try {
opts = cli.parse(argc, argv); opts = cli.parse(argc, argv);
} catch (sls::RuntimeError &e) { } catch (sls::RuntimeError &e) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
auto& o = std::get<CommonOptions>(opts);
auto &m = std::get<MultiReceiverOptions>(opts); auto &m = std::get<MultiReceiverOptions>(opts);
if (o.versionRequested || o.helpRequested) { if (m.versionRequested || m.helpRequested) {
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
@ -185,7 +184,7 @@ int main(int argc, char *argv[]) {
<< "Child process " << i << " [ Tid: " << gettid() << ']'; << "Child process " << i << " [ Tid: " << gettid() << ']';
try { try {
uint16_t port = o.port + i; uint16_t port = m.port + i;
sls::Receiver receiver(port); sls::Receiver receiver(port);
/** - register callbacks. remember to set file write enable /** - register callbacks. remember to set file write enable
@ -215,6 +214,7 @@ int main(int argc, char *argv[]) {
sem_destroy(&semaphore); sem_destroy(&semaphore);
} catch (...) { } catch (...) {
sem_destroy(&semaphore);
LOG(sls::logINFOBLUE) LOG(sls::logINFOBLUE)
<< "Exiting Child Process [ Tid: " << gettid() << " ]"; << "Exiting Child Process [ Tid: " << gettid() << " ]";
throw; throw;
@ -250,10 +250,6 @@ int main(int argc, char *argv[]) {
break; break;
} }
} }
// child closed
LOG(sls::logINFOBLUE)
<< "Exiting Child Process [ Tid: " << childPid << ']';
} }
std::cout << "Goodbye!\n"; std::cout << "Goodbye!\n";

View File

@ -29,8 +29,10 @@ Receiver::~Receiver() = default;
Receiver::Receiver(uint16_t port) { Receiver::Receiver(uint16_t port) {
validatePortNumber(port); validatePortNumber(port);
//#ifdef SLS_USE_TESTS
if (port == 9999) throw RuntimeError("throwing for 9999 test");
//#endif
tcpipInterface = make_unique<ClientInterface>(port); tcpipInterface = make_unique<ClientInterface>(port);
if (port == 1957) throw RuntimeError("throwing for 1957");
} }
std::string Receiver::getReceiverVersion() { std::string Receiver::getReceiverVersion() {

View File

@ -58,7 +58,9 @@ int main(int argc, char *argv[]) {
sem_wait(&semaphore); sem_wait(&semaphore);
sem_destroy(&semaphore); sem_destroy(&semaphore);
} catch (...) { } catch (...) {
// pass sem_destroy(&semaphore);
LOG(sls::logINFOBLUE) << "Exiting [ Tid: " << gettid() << " ]";
throw;
} }
LOG(sls::logINFOBLUE) << "Exiting [ Tid: " << gettid() << " ]"; LOG(sls::logINFOBLUE) << "Exiting [ Tid: " << gettid() << " ]";
LOG(sls::logINFO) << "Exiting Receiver"; LOG(sls::logINFO) << "Exiting Receiver";

View File

@ -200,7 +200,7 @@ namespace sls {
REQUIRE_THROWS(s.parse({"", "110", "1954"}));// mix order REQUIRE_THROWS(s.parse({"", "110", "1954"}));// mix order
REQUIRE_THROWS(s.parse({"", "1023", "10"}));// privileged port REQUIRE_THROWS(s.parse({"", "1023", "10"}));// privileged port
REQUIRE_THROWS(s.parse({"", "2000", "0"}));// invalid num receivers REQUIRE_THROWS(s.parse({"", "2000", "0"}));// invalid num receivers
REQUIRE_THROWS(s.parse({"", "2000", "1000"}));// invalid num receivers REQUIRE_THROWS(s.parse({"", "2000", "1001"}));// invalid num receivers
REQUIRE_THROWS(s.parse({"", "1954", "1", "2"}));// invalid 3rd opt REQUIRE_THROWS(s.parse({"", "1954", "1", "2"}));// invalid 3rd opt
REQUIRE_NOTHROW(s.parse({""})); REQUIRE_NOTHROW(s.parse({""}));
@ -217,13 +217,13 @@ namespace sls {
REQUIRE(o.callbackEnabled == false); REQUIRE(o.callbackEnabled == false);
} else if constexpr (is_type<FrameSyncOptions>(o)) { } else if constexpr (is_type<FrameSyncOptions>(o)) {
REQUIRE(o.port == 1958); REQUIRE(o.port == 1954);
REQUIRE(o.numReceivers == 10); REQUIRE(o.numReceivers == 1);
REQUIRE(o.printHeaders == false); // default REQUIRE(o.printHeaders == false); // default
} }
}, opts); }, opts);
auto opts = s.parse({"", "1958", "10"}); opts = s.parse({"", "1958", "10"});
std::visit([&](const auto& o) { std::visit([&](const auto& o) {
if constexpr (is_type<MultiReceiverOptions>(o)) { if constexpr (is_type<MultiReceiverOptions>(o)) {
REQUIRE(o.port == 1958); REQUIRE(o.port == 1958);
@ -237,7 +237,7 @@ namespace sls {
} }
}, opts); }, opts);
auto opts = s.parse({"", "1958", "1", "1"}); opts = s.parse({"", "1958", "10", "1"});
std::visit([&](const auto& o) { std::visit([&](const auto& o) {
if constexpr (is_type<MultiReceiverOptions>(o)) { if constexpr (is_type<MultiReceiverOptions>(o)) {
REQUIRE(o.port == 1958); REQUIRE(o.port == 1958);
@ -264,13 +264,13 @@ namespace sls {
REQUIRE(n == 1); REQUIRE(n == 1);
REQUIRE(o == false); REQUIRE(o == false);
auto [p, n, o] = CommandLineOptions::ParseDeprecated({"", "1955", "6"}); std::tie(p, n, o) = CommandLineOptions::ParseDeprecated({"", "1955", "6"});
REQUIRE(p == 1954); REQUIRE(p == 1955);
REQUIRE(n == 6); REQUIRE(n == 6);
REQUIRE(o == false); REQUIRE(o == false);
auto [p, n, o] = CommandLineOptions::ParseDeprecated({"", "1955", "6", "1"}); std::tie(p, n, o) = CommandLineOptions::ParseDeprecated({"", "1955", "6", "1"});
REQUIRE(p == 1954); REQUIRE(p == 1955);
REQUIRE(n == 6); REQUIRE(n == 6);
REQUIRE(o == true); REQUIRE(o == true);
} }