From ebb6f53b21dbee19747162145d5a61f148f07d8c Mon Sep 17 00:00:00 2001 From: Dhanya Thattil <33750417+thattil@users.noreply.github.com> Date: Fri, 3 Feb 2023 17:18:09 +0100 Subject: [PATCH] Fix rx arping socket bind (#646) * first tries with a process instead of thread for rx_arping * Moving delete pointer of udp socket to stopReceiver, so rx_arping can only be set when udp socket is closed * refactoring and formatting * unused variable processId * ignore SIGCHLD to prevent zombie from child processes being killed --- python/slsdet/detector.py | 2 +- slsDetectorSoftware/include/sls/Detector.h | 7 ++- slsDetectorSoftware/src/CmdProxy.h | 11 ++-- slsReceiverSoftware/src/Arping.cpp | 67 ++++++++++++---------- slsReceiverSoftware/src/Arping.h | 18 +++--- slsReceiverSoftware/src/Implementation.cpp | 12 ++-- slsReceiverSoftware/src/Implementation.h | 2 +- slsReceiverSoftware/src/Listener.cpp | 7 +++ slsReceiverSoftware/src/Listener.h | 1 + 9 files changed, 75 insertions(+), 52 deletions(-) diff --git a/python/slsdet/detector.py b/python/slsdet/detector.py index 1692e3148..3345f729a 100755 --- a/python/slsdet/detector.py +++ b/python/slsdet/detector.py @@ -267,7 +267,7 @@ class Detector(CppDetectorApi): @element def rx_threads(self): """ - Get thread ids from the receiver in order of [parent, tcp, listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1, arping]. + Get kernel thread ids from the receiver in order of [parent, tcp, listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1, arping]. 
Note ----- diff --git a/slsDetectorSoftware/include/sls/Detector.h b/slsDetectorSoftware/include/sls/Detector.h index ea17f9c78..ca33a92f7 100644 --- a/slsDetectorSoftware/include/sls/Detector.h +++ b/slsDetectorSoftware/include/sls/Detector.h @@ -943,9 +943,10 @@ class Detector { /** Client IP Address that last communicated with the receiver */ Result getRxLastClientIP(Positions pos = {}) const; - /** Get thread ids from the receiver in order of [parent, tcp, listener 0, - * processor 0, streamer 0, listener 1, processor 1, streamer 1, arping]. If - * no streamer yet or there is no second interface, it gives 0 in its place. + /** Get kernel thread ids from the receiver in order of [parent, tcp, + * listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1, + * arping]. If no streamer yet or there is no second interface, it gives 0 + * in its place. */ Result> getRxThreadIds(Positions pos = {}) const; diff --git a/slsDetectorSoftware/src/CmdProxy.h b/slsDetectorSoftware/src/CmdProxy.h index dd3b69449..ddc8c056b 100644 --- a/slsDetectorSoftware/src/CmdProxy.h +++ b/slsDetectorSoftware/src/CmdProxy.h @@ -1767,11 +1767,12 @@ class CmdProxy { rx_lastclient, getRxLastClientIP, "\n\tClient IP Address that last communicated with the receiver."); - GET_COMMAND(rx_threads, getRxThreadIds, - "\n\tGet thread ids from the receiver in order of [parent, " - "tcp, listener 0, processor 0, streamer 0, listener 1, " - "processor 1, streamer 1, arping]. If no streamer yet or there " - "is no second interface, it gives 0 in its place."); + GET_COMMAND( + rx_threads, getRxThreadIds, + "\n\tGet kernel thread ids from the receiver in order of [parent, " + "tcp, listener 0, processor 0, streamer 0, listener 1, " + "processor 1, streamer 1, arping]. 
If no streamer yet or there " + "is no second interface, it gives 0 in its place."); INTEGER_COMMAND_VEC_ID(rx_arping, getRxArping, setRxArping, StringTo, "[0, 1]\n\tStarts a thread in slsReceiver to arping " diff --git a/slsReceiverSoftware/src/Arping.cpp b/slsReceiverSoftware/src/Arping.cpp index 3431737f5..0f8fca2ba 100644 --- a/slsReceiverSoftware/src/Arping.cpp +++ b/slsReceiverSoftware/src/Arping.cpp @@ -4,6 +4,8 @@ #include "Arping.h" #include +#include +#include #include namespace sls { @@ -18,7 +20,7 @@ Arping::Arping() {} Arping::~Arping() { if (IsRunning()) { - StopThread(); + StopProcess(); } } @@ -33,59 +35,66 @@ void Arping::SetInterfacesAndIps(const int index, const std::string &interface, // create commands to arping std::ostringstream os; os << "arping -c 1 -U -I " << interface << " " << ip; - // to read error messages - os << " 2>&1"; std::string cmd = os.str(); commands[index] = cmd; } -pid_t Arping::GetThreadId() const { return threadId; } +pid_t Arping::GetProcessId() const { return childPid; } bool Arping::IsRunning() const { return runningFlag; } -void Arping::StartThread() { +void Arping::StartProcess() { TestCommands(); - try { - t = std::thread(&Arping::ThreadExecution, this); - } catch (...) 
{ - throw RuntimeError("Could not start arping thread"); + + // to prevent zombies from child processes being killed + signal(SIGCHLD, SIG_IGN); + + // Needs to be a fork and udp socket deleted after Listening threads + // done running to prevent udp socket cannot bind because of popen + // that forks + childPid = fork(); + // child process + if (childPid == 0) { + LOG(logINFOBLUE) << "Created [ Arping Process, Tid: " << gettid() + << " ]"; + ProcessExecution(); + } + // parent process + else if (childPid > 0) { + runningFlag = true; + } + // error + else { + throw RuntimeError("Could not start arping Process"); } - runningFlag = true; } -void Arping::StopThread() { +void Arping::StopProcess() { + LOG(logINFOBLUE) << "Exiting [ Arping Process ]"; + + if (kill(childPid, SIGTERM)) { + throw RuntimeError("Could not kill the arping Process"); + } runningFlag = false; - t.join(); } -void Arping::ThreadExecution() { - threadId = gettid(); - LOG(logINFOBLUE) << "Created [ Arping Thread, Tid: " << threadId << " ]"; - - while (runningFlag) { +void Arping::ProcessExecution() { + while (true) { std::string error = ExecuteCommands(); - // just print (was already tested at thread start) + // just print (was already tested at Process start) if (!error.empty()) { LOG(logERROR) << error; } - // wait for 60s as long as thread not killed - int nsecs = 0; - while (runningFlag && nsecs != 60) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - ++nsecs; - } + std::this_thread::sleep_for(std::chrono::seconds(timeIntervalSeconds)); } - - LOG(logINFOBLUE) << "Exiting [ Arping Thread, Tid: " << threadId << " ]"; - threadId = 0; } void Arping::TestCommands() { // atleast one interface must be set up if (commands[0].empty()) { throw RuntimeError( - "Could not arping. Interface not set up in apring thread"); + "Could not arping. 
Interface not set up in arping Process"); } // test if arping commands throw an error std::string error = ExecuteCommands(); @@ -101,7 +110,7 @@ std::string Arping::ExecuteCommands() { if (cmd.empty()) continue; - LOG(logDEBUG) << "Executing Arping Command: " << cmd; + LOG(logDEBUG1) << "Executing Arping Command: " << cmd; // execute command FILE *sysFile = popen(cmd.c_str(), "r"); diff --git a/slsReceiverSoftware/src/Arping.h b/slsReceiverSoftware/src/Arping.h index 4a4571605..e9f454dab 100644 --- a/slsReceiverSoftware/src/Arping.h +++ b/slsReceiverSoftware/src/Arping.h @@ -2,15 +2,15 @@ // Copyright (C) 2021 Contributors to the SLS Detector Package #pragma once /** - *@short creates/destroys an ARPing thread to arping the interfaces slsReceiver -is listening to. + *@short creates/destroys an ARPing child process to arping the interfaces +slsReceiver is listening to. */ #include "receiver_defs.h" #include "sls/logger.h" #include -#include +#include namespace sls { @@ -22,21 +22,21 @@ class Arping { void SetInterfacesAndIps(const int index, const std::string &interface, const std::string &ip); - pid_t GetThreadId() const; + pid_t GetProcessId() const; bool IsRunning() const; - void StartThread(); - void StopThread(); + void StartProcess(); + void StopProcess(); private: void TestCommands(); std::string ExecuteCommands(); - void ThreadExecution(); + void ProcessExecution(); std::vector commands = std::vector(MAX_NUMBER_OF_LISTENING_THREADS); std::atomic runningFlag{false}; - std::thread t; - std::atomic threadId{0}; + std::atomic childPid{0}; + static const int timeIntervalSeconds = 60; }; } // namespace sls diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index d67fd0cee..7951e922f 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -351,19 +351,19 @@ std::array Implementation::getThreadIds() const { retval[id++] = 0; } } - retval[NUM_RX_THREAD_IDS - 1] = 
arping.GetThreadId(); + retval[NUM_RX_THREAD_IDS - 1] = arping.GetProcessId(); return retval; } bool Implementation::getArping() const { return arping.IsRunning(); } -pid_t Implementation::getArpingThreadId() const { return arping.GetThreadId(); } +pid_t Implementation::getArpingProcessId() const { return arping.GetProcessId(); } void Implementation::setArping(const bool i, const std::vector ips) { if (i != arping.IsRunning()) { if (!i) { - arping.StopThread(); + arping.StopProcess(); } else { // setup interface for (int i = 0; i != generalData->numUDPInterfaces; ++i) { @@ -374,7 +374,7 @@ void Implementation::setArping(const bool i, } arping.SetInterfacesAndIps(i, eth[i], ips[i]); } - arping.StartThread(); + arping.StartProcess(); } } } @@ -713,6 +713,10 @@ void Implementation::stopReceiver() { std::this_thread::sleep_for(std::chrono::milliseconds(5)); } + // delete the udp sockets + for (const auto &it : listener) + it->DeleteUDPSocket(); + if (fileWriteEnable && modulePos == 0) { // master and virtual file (hdf5) StartMasterWriter(); diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index a9ebd660f..f4f455b56 100644 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -54,7 +54,7 @@ class Implementation : private virtual slsDetectorDefs { void setThreadIds(const pid_t parentTid, const pid_t tcpTid); std::array getThreadIds() const; bool getArping() const; - pid_t getArpingThreadId() const; + pid_t getArpingProcessId() const; void setArping(const bool i, const std::vector ips); ROI getReceiverROI() const; void setReceiverROI(const ROI arg); diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 68ac3ca50..87d0c7d35 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -184,6 +184,13 @@ void Listener::ShutDownUDPSocket() { } } +void Listener::DeleteUDPSocket() { + if (udpSocket) { + 
udpSocket.reset(); + LOG(logINFO) << "Closed UDP port " << udpPortNumber; + } +} + void Listener::CreateDummySocketForUDPSocketBufferSize(int s, int &actualSize) { // custom setup (s != 0) // default setup at startup (s = 0) diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 8ebeee35f..5b50e53f6 100644 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -50,6 +50,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { void ResetParametersforNewAcquisition(); void CreateUDPSocket(int &actualSize); void ShutDownUDPSocket(); + void DeleteUDPSocket(); /** to set & get actual buffer size */ void CreateDummySocketForUDPSocketBufferSize(int s, int &actualSize);