wip, thread to start arping

This commit is contained in:
2022-01-31 17:12:32 +01:00
parent a4cd4fd14a
commit ca8a1c046a
14 changed files with 205 additions and 2 deletions

View File

@ -210,7 +210,8 @@ int ClientInterface::functionTable(){
flist[F_SET_RECEIVER_STREAMING_HWM] = &ClientInterface::set_streaming_hwm;
flist[F_RECEIVER_SET_ALL_THRESHOLD] = &ClientInterface::set_all_threshold;
flist[F_RECEIVER_SET_DATASTREAM] = &ClientInterface::set_detector_datastream;
flist[F_GET_RECEIVER_ARPING] = &ClientInterface::get_arping;
flist[F_SET_RECEIVER_ARPING] = &ClientInterface::set_arping;
for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) {
LOG(logDEBUG1) << "function fnum: " << i << " (" <<
@ -1697,3 +1698,20 @@ int ClientInterface::set_detector_datastream(Interface &socket) {
impl()->setDetectorDataStream(port, enable);
return socket.Send(OK);
}
int ClientInterface::get_arping(Interface &socket) {
auto retval = static_cast<int>(impl()->getArping());
LOG(logDEBUG1) << "arping thread status:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_arping(Interface &socket) {
auto value = socket.Receive<int>();
if (value < 0) {
throw RuntimeError("Invalid arping value: " + std::to_string(value));
}
verifyIdle(socket);
LOG(logDEBUG1) << "Starting/ Killing arping thread:" << value;
impl()->setArping(value);
return socket.Send(OK);
}

View File

@ -163,6 +163,8 @@ class ClientInterface : private virtual slsDetectorDefs {
int set_streaming_hwm(sls::ServerInterface &socket);
int set_all_threshold(sls::ServerInterface &socket);
int set_detector_datastream(sls::ServerInterface &socket);
int get_arping(sls::ServerInterface &socket);
int set_arping(sls::ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {

View File

@ -7,6 +7,7 @@
#include "GeneralData.h"
#include "Listener.h"
#include "MasterAttributes.h"
#include "ThreadArping.h"
#include "sls/ToString.h"
#include "sls/ZmqSocket.h" //just for the zmq port define
#include "sls/file_utils.h"
@ -107,6 +108,10 @@ void Implementation::SetupFifoStructure() {
* ************************************************/
void Implementation::setDetectorType(const detectorType d) {
// object to create threads to arping
threadArping = sls::make_unique<ThreadArping>();
detType = d;
switch (detType) {
case GOTTHARD:
@ -323,6 +328,20 @@ std::array<pid_t, NUM_RX_THREAD_IDS> Implementation::getThreadIds() const {
return retval;
}
bool Implementation::getArping() const { return threadArping->IsRunning(); }
void Implementation::setArping(const bool i) {
if (i != threadArping->IsRunning()) {
if (!i) {
threadArping->StopRunning();
} else {
threadArping->ClearIpsAndInterfaces();
threadArping->AddIpsAndInterfaces(eth[0], "10.0.0.1");
threadArping->StartRunning();
}
}
}
/**************************************************
* *
* File Parameters *

View File

@ -11,6 +11,7 @@ class DataProcessor;
class DataStreamer;
class Fifo;
class slsDetectorDefs;
class ThreadArping;
#include <atomic>
#include <chrono>
@ -49,6 +50,8 @@ class Implementation : private virtual slsDetectorDefs {
void setFramePaddingEnable(const bool i);
void setThreadIds(const pid_t parentTid, const pid_t tcpTid);
std::array<pid_t, NUM_RX_THREAD_IDS> getThreadIds() const;
bool getArping() const;
void setArping(const bool i);
/**************************************************
* *
@ -379,6 +382,7 @@ class Implementation : private virtual slsDetectorDefs {
std::vector<std::unique_ptr<DataProcessor>> dataProcessor;
std::vector<std::unique_ptr<DataStreamer>> dataStreamer;
std::vector<std::unique_ptr<Fifo>> fifo;
std::unique_ptr<ThreadArping> threadArping;
std::mutex hdf5Lib;
};

View File

@ -0,0 +1,79 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#include "ThreadArping.h"
#include "sls/container_utils.h"
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
ThreadArping::ThreadArping() {}
ThreadArping::~ThreadArping() { StopRunning(); }
bool ThreadArping::IsRunning() const { return runningFlag; }
void ThreadArping::StartRunning() {
if (!runningFlag) {
if (arpInterfaceIp.size() == 0) {
throw sls::RuntimeError("No Interface added to Arping");
}
threads.clear();
threadIds.clear();
runningFlag = true;
// create threadss
for (auto arp : arpInterfaceIp) {
try {
std::thread temp =
std::thread(&ThreadArping::RunningThread, this,
threads.size(), arp.first, arp.second);
threads.push_back(temp.native_handle());
temp.detach();
} catch (...) {
StopRunning();
throw sls::RuntimeError("Could not create arping thread [" +
arp.first + ", " + arp.second + "]");
}
}
}
}
void ThreadArping::StopRunning() {
int i = 0;
for (auto t : threads) {
pthread_cancel(t);
LOG(logINFOBLUE) << "Killing [ Arping Thread " << i << ": ("
<< arpInterfaceIp[i].first << ", "
<< arpInterfaceIp[i].second << ")]";
++i;
}
threads.clear();
runningFlag = false;
}
void ThreadArping::ClearIpsAndInterfaces() { arpInterfaceIp.clear(); }
void ThreadArping::AddIpsAndInterfaces(std::string interface, std::string ip) {
arpInterfaceIp.push_back(std::make_pair(interface, ip));
}
void ThreadArping::RunningThread(int index, std::string interface,
std::string ip) {
pid_t threadId = syscall(SYS_gettid);
LOG(logINFOBLUE) << "Created [ Arping Thread " << index << ": ("
<< interface << ", " << ip << ") Tid: " << threadId << "]";
{
std::lock_guard<std::mutex> lock(&mutexIds);
threadIds.push_back(threadId);
}
while (IsRunning()) {
LOG(logINFOBLUE) << "Going to sleep apring id " << threadId;
// wait for 60s
usleep(60 * 1000 * 1000);
}
LOG(logINFOBLUE) << "Exiting [ Arping Thread " << index << ": ("
<< interface << ", " << ip << ") Tid: " << threadId << "]";
}

View File

@ -0,0 +1,45 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
#pragma once
/**
*@short creates/destroys an ARPing thread to ping the interfaces slsReceiver is
listening to.
*/
#include "sls/logger.h"
#include "sls/sls_detector_defs.h"
#include <atomic>
#include <mutex>
#include <string>
#include <thread>
#include <utility> // pair, make_pair
class ThreadArping : private virtual slsDetectorDefs {
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::vector<pthread_t> threads;
std::vector<std::pair<std::string, std::string>> arpInterfaceIp;
std::vector<pid_t> threadIds;
std::mutex mutexIds;
public:
ThreadArping();
virtual ~ThreadArping();
bool IsRunning() const;
void StartRunning();
void StopRunning();
void ClearIpsAndInterfaces();
void AddIpsAndInterfaces(std::string interface, std::string ip);
private:
/**
* Thread called: An infinite while loop that runs arping as long as
* RunningMask is satisfied Then it exits the thread on its own if
* killThread is true
*/
void RunningThread(int index, std::string interface, std::string ip);
};