merge fix

This commit is contained in:
2022-02-03 13:03:40 +01:00
39 changed files with 395 additions and 360 deletions

View File

@ -0,0 +1,115 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#include "Arping.h"
#include <chrono>
#include <sys/syscall.h>
#include <unistd.h>
void Arping::SetInterfacesAndIps(const int index, const std::string &interface,
const std::string &ip) {
if (interface.empty() || ip.empty()) {
throw sls::RuntimeError("Could not arping. Interface name and ip not "
"set up for interface " +
std::to_string(index));
}
// create commands to arping
std::ostringstream os;
os << "arping -c 1 -U -I " << interface << " " << ip;
// to read error messages
os << " 2>&1";
std::string cmd = os.str();
commands[index] = cmd;
}
pid_t Arping::GetThreadId() const { return threadId; }
bool Arping::IsRunning() const { return runningFlag; }
void Arping::StartThread() {
TestCommands();
try {
t = std::thread(&Arping::ThreadExecution, this);
} catch (...) {
throw sls::RuntimeError("Could not start arping thread");
}
runningFlag = true;
}
void Arping::StopThread() {
runningFlag = false;
t.join();
}
void Arping::ThreadExecution() {
threadId = syscall(SYS_gettid);
LOG(logINFOBLUE) << "Created [ Arping Thread, Tid: " << threadId << " ]";
while (runningFlag) {
std::string error = ExecuteCommands();
// just print (was already tested at thread start)
if (!error.empty()) {
LOG(logERROR) << error;
}
// wait for 60s as long as thread not killed
int nsecs = 0;
while (runningFlag && nsecs != 60) {
std::this_thread::sleep_for(std::chrono::seconds(1));
++nsecs;
}
}
LOG(logINFOBLUE) << "Exiting [ Arping Thread, Tid: " << threadId << " ]";
threadId = 0;
}
void Arping::TestCommands() {
// atleast one interface must be set up
if (commands[0].empty()) {
throw sls::RuntimeError(
"Could not arping. Interface not set up in apring thread");
}
// test if arping commands throw an error
std::string error = ExecuteCommands();
if (!error.empty()) {
throw sls::RuntimeError(error);
}
}
std::string Arping::ExecuteCommands() {
for (auto cmd : commands) {
// empty if 2nd interface not enabled
if (cmd.empty())
continue;
LOG(logDEBUG) << "Executing Arping Command: " << cmd;
// execute command
FILE *sysFile = popen(cmd.c_str(), "r");
if (sysFile == NULL) {
std::ostringstream os;
os << "Could not Arping [" << cmd << " ] : Popen fail";
return os.str();
}
// copy output
char output[MAX_STR_LENGTH] = {0};
fgets(output, sizeof(output), sysFile);
output[sizeof(output) - 1] = '\0';
// check exit status of command
if (pclose(sysFile)) {
std::ostringstream os;
os << "Could not arping[" << cmd << "] : " << output;
return os.str();
} else {
LOG(logDEBUG) << output;
}
}
return std::string();
}

View File

@ -0,0 +1,35 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#pragma once
/**
*@short creates/destroys an ARPing thread to arping the interfaces slsReceiver
is listening to.
*/
#include "receiver_defs.h"
#include "sls/logger.h"
#include <atomic>
#include <thread>
class Arping : private virtual slsDetectorDefs {
public:
void SetInterfacesAndIps(const int index, const std::string &interface,
const std::string &ip);
pid_t GetThreadId() const;
bool IsRunning() const;
void StartThread();
void StopThread();
private:
void TestCommands();
std::string ExecuteCommands();
void ThreadExecution();
std::vector<std::string> commands =
std::vector<std::string>(MAX_NUMBER_OF_LISTENING_THREADS);
std::atomic<bool> runningFlag{false};
std::thread t;
pid_t threadId{0};
};

View File

@ -1401,8 +1401,7 @@ sls::MacAddr ClientInterface::setUdpIp(sls::IpAddr arg) {
}
// update locally to use for arping
udpips.clear();
udpips.push_back(arg.str());
udpips[0] = arg.str();
// get mac address
auto retval = sls::InterfaceNameToMac(eth);
@ -1437,7 +1436,7 @@ sls::MacAddr ClientInterface::setUdpIp2(sls::IpAddr arg) {
impl()->setEthernetInterface2(eth);
// update locally to use for arping
udpips.push_back(arg.str());
udpips[1] = arg.str();
// get mac address
auto retval = sls::InterfaceNameToMac(eth);

View File

@ -192,5 +192,6 @@ class ClientInterface : private virtual slsDetectorDefs {
pid_t parentThreadId{0};
pid_t tcpThreadId{0};
std::vector<std::string> udpips;
std::vector<std::string> udpips =
std::vector<std::string>(MAX_NUMBER_OF_LISTENING_THREADS);
};

View File

@ -1,13 +1,13 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#include "Implementation.h"
#include "Arping.h"
#include "DataProcessor.h"
#include "DataStreamer.h"
#include "Fifo.h"
#include "GeneralData.h"
#include "Listener.h"
#include "MasterAttributes.h"
#include "ThreadArping.h"
#include "sls/ToString.h"
#include "sls/ZmqSocket.h" //just for the zmq port define
#include "sls/file_utils.h"
@ -109,8 +109,8 @@ void Implementation::SetupFifoStructure() {
void Implementation::setDetectorType(const detectorType d) {
// object to create threads to arping
threadArping = sls::make_unique<ThreadArping>();
// object to create thread for arping
arping = sls::make_unique<Arping>();
detType = d;
switch (detType) {
@ -325,30 +325,31 @@ std::array<pid_t, NUM_RX_THREAD_IDS> Implementation::getThreadIds() const {
retval[id++] = 0;
}
}
if (threadArping->IsRunning()) {
retval[NUM_RX_THREAD_IDS - 1] = threadArping->GetThreadId();
}
retval[NUM_RX_THREAD_IDS - 1] = arping->GetThreadId();
return retval;
}
bool Implementation::getArping() const { return threadArping->IsRunning(); }
bool Implementation::getArping() const { return arping->IsRunning(); }
pid_t Implementation::getArpingThreadId() const {
return threadArping->GetThreadId();
return arping->GetThreadId();
}
void Implementation::setArping(const bool i,
const std::vector<std::string> ips) {
if (i != threadArping->IsRunning()) {
if (i != arping->IsRunning()) {
if (!i) {
threadArping->StopRunning();
arping->StopThread();
} else {
threadArping->ClearIpsAndInterfaces();
threadArping->AddIpsAndInterfaces(eth[0], ips[0]);
if (numUDPInterfaces == 2 && detType != EIGER) {
threadArping->AddIpsAndInterfaces(eth[1], ips[1]);
// setup interface
for (int i = 0; i != numUDPInterfaces; ++i) {
// ignore eiger with 2 interfaces (only udp port)
if (i == 1 && (numUDPInterfaces == 1 || detType == EIGER)) {
break;
}
arping->SetInterfacesAndIps(i, eth[i], ips[i]);
}
threadArping->StartRunning();
arping->StartThread();
}
}
}

View File

@ -11,7 +11,7 @@ class DataProcessor;
class DataStreamer;
class Fifo;
class slsDetectorDefs;
class ThreadArping;
class Arping;
#include <atomic>
#include <chrono>
@ -383,7 +383,7 @@ class Implementation : private virtual slsDetectorDefs {
std::vector<std::unique_ptr<DataProcessor>> dataProcessor;
std::vector<std::unique_ptr<DataStreamer>> dataStreamer;
std::vector<std::unique_ptr<Fifo>> fifo;
std::unique_ptr<ThreadArping> threadArping;
std::unique_ptr<Arping> arping;
std::mutex hdf5Lib;
};

View File

@ -55,9 +55,11 @@ int64_t Listener::GetNumMissingPacket(bool stoppedFlag,
bool Listener::GetStartedFlag() { return startedFlag; }
uint64_t Listener::GetCurrentFrameIndex() { return currentFrameIndex; }
uint64_t Listener::GetCurrentFrameIndex() { return lastCaughtFrameIndex; }
uint64_t Listener::GetListenedIndex() { return currentFrameIndex - firstIndex; }
uint64_t Listener::GetListenedIndex() {
return lastCaughtFrameIndex - firstIndex;
}
void Listener::SetFifo(Fifo *f) { fifo = f; }
@ -88,6 +90,7 @@ void Listener::ResetParametersforNewAcquisition() {
void Listener::RecordFirstIndex(uint64_t fnum) {
// listen to this fnum, later +1
currentFrameIndex = fnum;
lastCaughtFrameIndex = fnum;
startedFlag = true;
firstIndex = fnum;

View File

@ -1,95 +0,0 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#include "ThreadArping.h"
#include "sls/container_utils.h"
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
ThreadArping::ThreadArping() {}
ThreadArping::~ThreadArping() { StopRunning(); }
pid_t ThreadArping::GetThreadId() const { return threadId; }
bool ThreadArping::IsRunning() const { return runningFlag; }
void ThreadArping::StartRunning() {
if (!runningFlag) {
if (arpInterfaceIp.size() == 0) {
throw sls::RuntimeError("No Interface added to Arping");
}
runningFlag = true;
// create thread
try {
std::thread temp = std::thread(&ThreadArping::RunningThread, this);
threadObject = temp.native_handle();
temp.detach();
} catch (...) {
throw sls::RuntimeError("Could not create arping thread");
}
}
}
void ThreadArping::StopRunning() {
pthread_cancel(threadObject);
LOG(logINFOBLUE) << "Killing [ Arping Thread, Tid: " << threadId << " ]";
runningFlag = false;
}
void ThreadArping::ClearIpsAndInterfaces() { arpInterfaceIp.clear(); }
void ThreadArping::AddIpsAndInterfaces(std::string interface, std::string ip) {
arpInterfaceIp.push_back(std::make_pair(interface, ip));
}
void ThreadArping::RunningThread() {
threadId = syscall(SYS_gettid);
{
std::ostringstream os;
os << "Created [ Arping Thread, Tid: " << threadId << " ] for ";
for (auto ethip : arpInterfaceIp) {
os << "\n\t[ " << ethip.first << ", " << ethip.second << " ]";
}
LOG(logINFOBLUE) << os.str();
}
// create the commands to ping necessary interfaces
std::vector<std::string> commands;
for (auto ethip : arpInterfaceIp) {
std::ostringstream os;
os << "arping -c 1 -U -I " << ethip.first << " " << ethip.second;
// to read error messages
os << " 2>&1";
std::string cmd = os.str();
commands.push_back(cmd);
}
while (IsRunning()) {
// arping
for (auto cmd : commands) {
LOG(logDEBUG) << "Executing Arping Command: " << cmd;
// execute command and check for errors
FILE *sysFile = popen(cmd.c_str(), "r");
char output[MAX_STR_LENGTH] = {0};
fgets(output, sizeof(output), sysFile);
output[sizeof(output) - 1] = '\0';
if (pclose(sysFile)) {
LOG(logERROR) << "Executing cmd[" << cmd
<< "]\n\tError Message : " << output;
} else {
LOG(logDEBUG) << output;
}
}
// wait for 60s
usleep(60 * 1000 * 1000);
}
LOG(logINFOBLUE) << "Exiting [ Arping Thread, Tid: " << threadId << " ]";
}

View File

@ -1,45 +0,0 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#pragma once
/**
*@short creates/destroys an ARPing thread to ping the interfaces slsReceiver is
listening to.
*/
#include "sls/logger.h"
#include "sls/sls_detector_defs.h"
#include <atomic>
#include <string>
#include <thread>
#include <utility> // pair, make_pair
class ThreadArping : private virtual slsDetectorDefs {
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
pthread_t threadObject;
std::vector<std::pair<std::string, std::string>> arpInterfaceIp;
std::vector<std::string> commands;
pid_t threadId;
public:
ThreadArping();
virtual ~ThreadArping();
pid_t GetThreadId() const;
bool IsRunning() const;
void StartRunning();
void StopRunning();
void ClearIpsAndInterfaces();
void AddIpsAndInterfaces(std::string interface, std::string ip);
private:
/**
* Thread called: An infinite while loop that runs arping as long as
* RunningMask is satisfied Then it exits the thread on its own if
* killThread is true
*/
void RunningThread();
};

View File

@ -21,14 +21,6 @@ class ThreadObject : private virtual slsDetectorDefs {
protected:
const int index{0};
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
pid_t threadId{0};
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
@ -47,4 +39,11 @@ class ThreadObject : private virtual slsDetectorDefs {
* Then it exits the thread on its own if killThread is true
*/
void RunningThread();
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
pid_t threadId{0};
};