Fix rx arping socket bind (#646)

* first tries with a process intead of thread for rx_arping

* Moving delete pointer of udp socket to stopReciever,so rx_arping can only be set when udp socket is closed

* refactoring and formatting

* unused variable processId

* ignore sigchild to prevent zombie from child processes being killed
This commit is contained in:
Dhanya Thattil 2023-02-03 17:18:09 +01:00 committed by GitHub
parent e172df79f3
commit ebb6f53b21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 75 additions and 52 deletions

View File

@ -267,7 +267,7 @@ class Detector(CppDetectorApi):
@element
def rx_threads(self):
"""
Get thread ids from the receiver in order of [parent, tcp, listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1, arping].
Get kernel thread ids from the receiver in order of [parent, tcp, listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1, arping].
Note
-----

View File

@ -943,9 +943,10 @@ class Detector {
/** Client IP Address that last communicated with the receiver */
Result<IpAddr> getRxLastClientIP(Positions pos = {}) const;
/** Get thread ids from the receiver in order of [parent, tcp, listener 0,
* processor 0, streamer 0, listener 1, processor 1, streamer 1, arping]. If
* no streamer yet or there is no second interface, it gives 0 in its place.
/** Get kernel thread ids from the receiver in order of [parent, tcp,
* listener 0, processor 0, streamer 0, listener 1, processor 1, streamer 1,
* arping]. If no streamer yet or there is no second interface, it gives 0
* in its place.
*/
Result<std::array<pid_t, NUM_RX_THREAD_IDS>>
getRxThreadIds(Positions pos = {}) const;

View File

@ -1767,11 +1767,12 @@ class CmdProxy {
rx_lastclient, getRxLastClientIP,
"\n\tClient IP Address that last communicated with the receiver.");
GET_COMMAND(rx_threads, getRxThreadIds,
"\n\tGet thread ids from the receiver in order of [parent, "
"tcp, listener 0, processor 0, streamer 0, listener 1, "
"processor 1, streamer 1, arping]. If no streamer yet or there "
"is no second interface, it gives 0 in its place.");
GET_COMMAND(
rx_threads, getRxThreadIds,
"\n\tGet kernel thread ids from the receiver in order of [parent, "
"tcp, listener 0, processor 0, streamer 0, listener 1, "
"processor 1, streamer 1, arping]. If no streamer yet or there "
"is no second interface, it gives 0 in its place.");
INTEGER_COMMAND_VEC_ID(rx_arping, getRxArping, setRxArping, StringTo<int>,
"[0, 1]\n\tStarts a thread in slsReceiver to arping "

View File

@ -4,6 +4,8 @@
#include "Arping.h"
#include <chrono>
#include <signal.h>
#include <thread>
#include <unistd.h>
namespace sls {
@ -18,7 +20,7 @@ Arping::Arping() {}
Arping::~Arping() {
if (IsRunning()) {
StopThread();
StopProcess();
}
}
@ -33,59 +35,66 @@ void Arping::SetInterfacesAndIps(const int index, const std::string &interface,
// create commands to arping
std::ostringstream os;
os << "arping -c 1 -U -I " << interface << " " << ip;
// to read error messages
os << " 2>&1";
std::string cmd = os.str();
commands[index] = cmd;
}
pid_t Arping::GetThreadId() const { return threadId; }
pid_t Arping::GetProcessId() const { return childPid; }
bool Arping::IsRunning() const { return runningFlag; }
void Arping::StartThread() {
void Arping::StartProcess() {
TestCommands();
try {
t = std::thread(&Arping::ThreadExecution, this);
} catch (...) {
throw RuntimeError("Could not start arping thread");
// to prevent zombies from child processes being killed
signal(SIGCHLD, SIG_IGN);
// Needs to be a fork and udp socket deleted after Listening threads
// done running to prevent udp socket cannot bind because of popen
// that forks
childPid = fork();
// child process
if (childPid == 0) {
LOG(logINFOBLUE) << "Created [ Arping Process, Tid: " << gettid()
<< " ]";
ProcessExecution();
}
// parent process
else if (childPid > 0) {
runningFlag = true;
}
// error
else {
throw RuntimeError("Could not start arping Process");
}
runningFlag = true;
}
void Arping::StopThread() {
void Arping::StopProcess() {
LOG(logINFOBLUE) << "Exiting [ Arping Process ]";
if (kill(childPid, SIGTERM)) {
throw RuntimeError("Could not kill the arping Process");
}
runningFlag = false;
t.join();
}
void Arping::ThreadExecution() {
threadId = gettid();
LOG(logINFOBLUE) << "Created [ Arping Thread, Tid: " << threadId << " ]";
while (runningFlag) {
void Arping::ProcessExecution() {
while (true) {
std::string error = ExecuteCommands();
// just print (was already tested at thread start)
// just print (was already tested at Process start)
if (!error.empty()) {
LOG(logERROR) << error;
}
// wait for 60s as long as thread not killed
int nsecs = 0;
while (runningFlag && nsecs != 60) {
std::this_thread::sleep_for(std::chrono::seconds(1));
++nsecs;
}
std::this_thread::sleep_for(std::chrono::seconds(timeIntervalSeconds));
}
LOG(logINFOBLUE) << "Exiting [ Arping Thread, Tid: " << threadId << " ]";
threadId = 0;
}
void Arping::TestCommands() {
// atleast one interface must be set up
if (commands[0].empty()) {
throw RuntimeError(
"Could not arping. Interface not set up in apring thread");
"Could not arping. Interface not set up in arping Process");
}
// test if arping commands throw an error
std::string error = ExecuteCommands();
@ -101,7 +110,7 @@ std::string Arping::ExecuteCommands() {
if (cmd.empty())
continue;
LOG(logDEBUG) << "Executing Arping Command: " << cmd;
LOG(logDEBUG1) << "Executing Arping Command: " << cmd;
// execute command
FILE *sysFile = popen(cmd.c_str(), "r");

View File

@ -2,15 +2,15 @@
// Copyright (C) 2021 Contributors to the SLS Detector Package
#pragma once
/**
*@short creates/destroys an ARPing thread to arping the interfaces slsReceiver
is listening to.
*@short creates/destroys an ARPing child process to arping the interfaces
slsReceiver is listening to.
*/
#include "receiver_defs.h"
#include "sls/logger.h"
#include <atomic>
#include <thread>
#include <unistd.h>
namespace sls {
@ -22,21 +22,21 @@ class Arping {
void SetInterfacesAndIps(const int index, const std::string &interface,
const std::string &ip);
pid_t GetThreadId() const;
pid_t GetProcessId() const;
bool IsRunning() const;
void StartThread();
void StopThread();
void StartProcess();
void StopProcess();
private:
void TestCommands();
std::string ExecuteCommands();
void ThreadExecution();
void ProcessExecution();
std::vector<std::string> commands =
std::vector<std::string>(MAX_NUMBER_OF_LISTENING_THREADS);
std::atomic<bool> runningFlag{false};
std::thread t;
std::atomic<pid_t> threadId{0};
std::atomic<pid_t> childPid{0};
static const int timeIntervalSeconds = 60;
};
} // namespace sls

View File

@ -351,19 +351,19 @@ std::array<pid_t, NUM_RX_THREAD_IDS> Implementation::getThreadIds() const {
retval[id++] = 0;
}
}
retval[NUM_RX_THREAD_IDS - 1] = arping.GetThreadId();
retval[NUM_RX_THREAD_IDS - 1] = arping.GetProcessId();
return retval;
}
bool Implementation::getArping() const { return arping.IsRunning(); }
pid_t Implementation::getArpingThreadId() const { return arping.GetThreadId(); }
pid_t Implementation::getArpingProcessId() const { return arping.GetProcessId(); }
void Implementation::setArping(const bool i,
const std::vector<std::string> ips) {
if (i != arping.IsRunning()) {
if (!i) {
arping.StopThread();
arping.StopProcess();
} else {
// setup interface
for (int i = 0; i != generalData->numUDPInterfaces; ++i) {
@ -374,7 +374,7 @@ void Implementation::setArping(const bool i,
}
arping.SetInterfacesAndIps(i, eth[i], ips[i]);
}
arping.StartThread();
arping.StartProcess();
}
}
}
@ -713,6 +713,10 @@ void Implementation::stopReceiver() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
// delete the udp sockets
for (const auto &it : listener)
it->DeleteUDPSocket();
if (fileWriteEnable && modulePos == 0) {
// master and virtual file (hdf5)
StartMasterWriter();

View File

@ -54,7 +54,7 @@ class Implementation : private virtual slsDetectorDefs {
void setThreadIds(const pid_t parentTid, const pid_t tcpTid);
std::array<pid_t, NUM_RX_THREAD_IDS> getThreadIds() const;
bool getArping() const;
pid_t getArpingThreadId() const;
pid_t getArpingProcessId() const;
void setArping(const bool i, const std::vector<std::string> ips);
ROI getReceiverROI() const;
void setReceiverROI(const ROI arg);

View File

@ -184,6 +184,13 @@ void Listener::ShutDownUDPSocket() {
}
}
void Listener::DeleteUDPSocket() {
if (udpSocket) {
udpSocket.reset();
LOG(logINFO) << "Closed UDP port " << udpPortNumber;
}
}
void Listener::CreateDummySocketForUDPSocketBufferSize(int s, int &actualSize) {
// custom setup (s != 0)
// default setup at startup (s = 0)

View File

@ -50,6 +50,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void ResetParametersforNewAcquisition();
void CreateUDPSocket(int &actualSize);
void ShutDownUDPSocket();
void DeleteUDPSocket();
/** to set & get actual buffer size */
void CreateDummySocketForUDPSocketBufferSize(int s, int &actualSize);