From ca8a1c046abd8df501550f2b41646829147f39c1 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Mon, 31 Jan 2022 17:12:32 +0100 Subject: [PATCH] wip, thread to start arping --- RELEASE.txt | 2 +- slsDetectorSoftware/include/sls/Detector.h | 7 ++ slsDetectorSoftware/src/CmdProxy.h | 6 ++ slsDetectorSoftware/src/Detector.cpp | 8 ++ slsDetectorSoftware/src/Module.cpp | 8 ++ slsDetectorSoftware/src/Module.h | 2 + slsReceiverSoftware/CMakeLists.txt | 1 + slsReceiverSoftware/src/ClientInterface.cpp | 20 ++++- slsReceiverSoftware/src/ClientInterface.h | 2 + slsReceiverSoftware/src/Implementation.cpp | 19 +++++ slsReceiverSoftware/src/Implementation.h | 4 + slsReceiverSoftware/src/ThreadArping.cpp | 79 +++++++++++++++++++ slsReceiverSoftware/src/ThreadArping.h | 45 +++++++++++ .../include/sls/sls_detector_funcs.h | 4 + 14 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 slsReceiverSoftware/src/ThreadArping.cpp create mode 100644 slsReceiverSoftware/src/ThreadArping.h diff --git a/RELEASE.txt b/RELEASE.txt index 8a9e9d611..371bb4494 100755 --- a/RELEASE.txt +++ b/RELEASE.txt @@ -28,7 +28,7 @@ This document describes the differences between v6.1.0 and v6.0.0. - changed default vref of adc9257 to 2V for moench (from 1.33V) - moench and ctb - can set the starting frame number of next acquisition - mythen server kernel check incompatible (cet timezone) - +- rx_arping 2. Resolved Issues diff --git a/slsDetectorSoftware/include/sls/Detector.h b/slsDetectorSoftware/include/sls/Detector.h index bc62319e8..da0a37cb3 100644 --- a/slsDetectorSoftware/include/sls/Detector.h +++ b/slsDetectorSoftware/include/sls/Detector.h @@ -882,6 +882,13 @@ class Detector { * streamer yet or there is no second interface, it gives 0 in its place. */ Result> getRxThreadIds(Positions pos = {}) const; + + Result getRxArping(Positions pos = {}) const; + + /** Starts a thread in slsReceiver to ping the interface it is listening. + * Useful in 10G mode. */ + void setRxArping(bool value, Positions pos = {}); + ///@} /** @name File */ diff --git a/slsDetectorSoftware/src/CmdProxy.h b/slsDetectorSoftware/src/CmdProxy.h index 673fd5fd8..82a745cc8 100644 --- a/slsDetectorSoftware/src/CmdProxy.h +++ b/slsDetectorSoftware/src/CmdProxy.h @@ -903,6 +903,7 @@ class CmdProxy { {"rx_lock", &CmdProxy::rx_lock}, {"rx_lastclient", &CmdProxy::rx_lastclient}, {"rx_threads", &CmdProxy::rx_threads}, + {"rx_arping", &CmdProxy::rx_arping}, /* File */ {"fformat", &CmdProxy::fformat}, @@ -1746,6 +1747,11 @@ class CmdProxy { "streamer yet or there is no second interface, it gives 0 in its " "place."); + INTEGER_COMMAND_VEC_ID( + rx_arping, getRxArping, setRxArping, StringTo, + "[0, 1]\n\tStarts a thread in slsReceiver to ping the interface it is " + "listening to. Useful in 10G mode."); + /* File */ INTEGER_COMMAND_VEC_ID( diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index f9335a195..801193e78 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -1170,6 +1170,14 @@ Detector::getRxThreadIds(Positions pos) const { return pimpl->Parallel(&Module::getReceiverThreadIds, pos); } +Result Detector::getRxArping(Positions pos) const { + return pimpl->Parallel(&Module::getRxArping, pos); +} + +void Detector::setRxArping(bool value, Positions pos) { + pimpl->Parallel(&Module::setRxArping, pos, value); +} + // File Result Detector::getFileFormat(Positions pos) const { diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp index 79515b29a..0dc88b9ae 100644 --- a/slsDetectorSoftware/src/Module.cpp +++ b/slsDetectorSoftware/src/Module.cpp @@ -1318,6 +1318,14 @@ std::array Module::getReceiverThreadIds() const { F_GET_RECEIVER_THREAD_IDS); } +bool Module::getRxArping() const { + return sendToReceiver(F_GET_RECEIVER_ARPING); +} + +void Module::setRxArping(bool enable) { + sendToReceiver(F_SET_RECEIVER_ARPING, static_cast(enable), nullptr); +} + // File slsDetectorDefs::fileFormat Module::getFileFormat() const { return sendToReceiver(F_GET_RECEIVER_FILE_FORMAT); diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h index ed553378d..22ea7b030 100644 --- a/slsDetectorSoftware/src/Module.h +++ b/slsDetectorSoftware/src/Module.h @@ -283,6 +283,8 @@ class Module : public virtual slsDetectorDefs { void setReceiverLock(bool lock); sls::IpAddr getReceiverLastClientIP() const; std::array getReceiverThreadIds() const; + bool getRxArping() const; + void setRxArping(bool enable); /************************************************** * * diff --git a/slsReceiverSoftware/CMakeLists.txt b/slsReceiverSoftware/CMakeLists.txt index 14efab277..df7a3f175 100755 --- a/slsReceiverSoftware/CMakeLists.txt +++ b/slsReceiverSoftware/CMakeLists.txt @@ -12,6 +12,7 @@ set(SOURCES src/DataProcessor.cpp src/DataStreamer.cpp src/Fifo.cpp + src/ThreadArping.cpp ) set(PUBLICHEADERS diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index 5dbc4b3a9..ab4023a5b 100644 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -210,7 +210,8 @@ int ClientInterface::functionTable(){ flist[F_SET_RECEIVER_STREAMING_HWM] = &ClientInterface::set_streaming_hwm; flist[F_RECEIVER_SET_ALL_THRESHOLD] = &ClientInterface::set_all_threshold; flist[F_RECEIVER_SET_DATASTREAM] = &ClientInterface::set_detector_datastream; - + flist[F_GET_RECEIVER_ARPING] = &ClientInterface::get_arping; + flist[F_SET_RECEIVER_ARPING] = &ClientInterface::set_arping; for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) { LOG(logDEBUG1) << "function fnum: " << i << " (" << @@ -1697,3 +1698,20 @@ int ClientInterface::set_detector_datastream(Interface &socket) { impl()->setDetectorDataStream(port, enable); return socket.Send(OK); } + +int ClientInterface::get_arping(Interface &socket) { + auto retval = static_cast(impl()->getArping()); + LOG(logDEBUG1) << "arping thread status:" << retval; + return socket.sendResult(retval); +} + +int ClientInterface::set_arping(Interface &socket) { + auto value = socket.Receive(); + if (value < 0) { + throw RuntimeError("Invalid arping value: " + std::to_string(value)); + } + verifyIdle(socket); + LOG(logDEBUG1) << "Starting/ Killing arping thread:" << value; + impl()->setArping(value); + return socket.Send(OK); +} diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index 6fd30cb98..fb478cd59 100644 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -163,6 +163,8 @@ class ClientInterface : private virtual slsDetectorDefs { int set_streaming_hwm(sls::ServerInterface &socket); int set_all_threshold(sls::ServerInterface &socket); int set_detector_datastream(sls::ServerInterface &socket); + int get_arping(sls::ServerInterface &socket); + int set_arping(sls::ServerInterface &socket); Implementation *impl() { if (receiver != nullptr) { diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index 92c4cae00..99fe27f3a 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -7,6 +7,7 @@ #include "GeneralData.h" #include "Listener.h" #include "MasterAttributes.h" +#include "ThreadArping.h" #include "sls/ToString.h" #include "sls/ZmqSocket.h" //just for the zmq port define #include "sls/file_utils.h" @@ -107,6 +108,10 @@ void Implementation::SetupFifoStructure() { * ************************************************/ void Implementation::setDetectorType(const detectorType d) { + + // object to create threads to arping + threadArping = sls::make_unique(); + detType = d; switch (detType) { case GOTTHARD: @@ -323,6 +328,20 @@ std::array Implementation::getThreadIds() const { return retval; } +bool Implementation::getArping() const { return threadArping->IsRunning(); } + +void Implementation::setArping(const bool i) { + if (i != threadArping->IsRunning()) { + if (!i) { + threadArping->StopRunning(); + } else { + threadArping->ClearIpsAndInterfaces(); + threadArping->AddIpsAndInterfaces(eth[0], "10.0.0.1"); + threadArping->StartRunning(); + } + } +} + /************************************************** * * * File Parameters * diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index 4b80a0012..e4da56383 100644 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -11,6 +11,7 @@ class DataProcessor; class DataStreamer; class Fifo; class slsDetectorDefs; +class ThreadArping; #include #include @@ -49,6 +50,8 @@ class Implementation : private virtual slsDetectorDefs { void setFramePaddingEnable(const bool i); void setThreadIds(const pid_t parentTid, const pid_t tcpTid); std::array getThreadIds() const; + bool getArping() const; + void setArping(const bool i); /************************************************** * * @@ -379,6 +382,7 @@ class Implementation : private virtual slsDetectorDefs { std::vector> dataProcessor; std::vector> dataStreamer; std::vector> fifo; + std::unique_ptr threadArping; std::mutex hdf5Lib; }; diff --git a/slsReceiverSoftware/src/ThreadArping.cpp b/slsReceiverSoftware/src/ThreadArping.cpp new file mode 100644 index 000000000..17e6d8824 --- /dev/null +++ b/slsReceiverSoftware/src/ThreadArping.cpp @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: LGPL-3.0-or-other +// Copyright (C) 2021 Contributors to the SLS Detector Package + +#include "ThreadArping.h" +#include "sls/container_utils.h" +#include +#include +#include + +ThreadArping::ThreadArping() {} + +ThreadArping::~ThreadArping() { StopRunning(); } + +bool ThreadArping::IsRunning() const { return runningFlag; } + +void ThreadArping::StartRunning() { + if (!runningFlag) { + if (arpInterfaceIp.size() == 0) { + throw sls::RuntimeError("No Interface added to Arping"); + } + threads.clear(); + threadIds.clear(); + runningFlag = true; + + // create threadss + for (auto arp : arpInterfaceIp) { + try { + std::thread temp = + std::thread(&ThreadArping::RunningThread, this, + threads.size(), arp.first, arp.second); + threads.push_back(temp.native_handle()); + temp.detach(); + } catch (...) { + StopRunning(); + throw sls::RuntimeError("Could not create arping thread [" + + arp.first + ", " + arp.second + "]"); + } + } + } +} + +void ThreadArping::StopRunning() { + int i = 0; + for (auto t : threads) { + pthread_cancel(t); + LOG(logINFOBLUE) << "Killing [ Arping Thread " << i << ": (" + << arpInterfaceIp[i].first << ", " + << arpInterfaceIp[i].second << ")]"; + ++i; + } + threads.clear(); + runningFlag = false; +} + +void ThreadArping::ClearIpsAndInterfaces() { arpInterfaceIp.clear(); } + +void ThreadArping::AddIpsAndInterfaces(std::string interface, std::string ip) { + arpInterfaceIp.push_back(std::make_pair(interface, ip)); +} + +void ThreadArping::RunningThread(int index, std::string interface, + std::string ip) { + pid_t threadId = syscall(SYS_gettid); + LOG(logINFOBLUE) << "Created [ Arping Thread " << index << ": (" + << interface << ", " << ip << ") Tid: " << threadId << "]"; + { + std::lock_guard lock(&mutexIds); + threadIds.push_back(threadId); + } + + while (IsRunning()) { + LOG(logINFOBLUE) << "Going to sleep apring id " << threadId; + // wait for 60s + usleep(60 * 1000 * 1000); + } + + LOG(logINFOBLUE) << "Exiting [ Arping Thread " << index << ": (" + << interface << ", " << ip << ") Tid: " << threadId << "]"; +} \ No newline at end of file diff --git a/slsReceiverSoftware/src/ThreadArping.h b/slsReceiverSoftware/src/ThreadArping.h new file mode 100644 index 000000000..9441497c1 --- /dev/null +++ b/slsReceiverSoftware/src/ThreadArping.h @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: LGPL-3.0-or-other +// Copyright (C) 2021 Contributors to the SLS Detector Package +#pragma once +/** + *@short creates/destroys an ARPing thread to ping the interfaces slsReceiver is +listening to. + */ + +#include "sls/logger.h" +#include "sls/sls_detector_defs.h" + +#include +#include +#include +#include +#include // pair, make_pair + +class ThreadArping : private virtual slsDetectorDefs { + + private: + std::atomic killThread{false}; + std::atomic runningFlag{false}; + + std::vector threads; + std::vector> arpInterfaceIp; + std::vector threadIds; + std::mutex mutexIds; + + public: + ThreadArping(); + virtual ~ThreadArping(); + bool IsRunning() const; + void StartRunning(); + void StopRunning(); + void ClearIpsAndInterfaces(); + void AddIpsAndInterfaces(std::string interface, std::string ip); + + private: + /** + * Thread called: An infinite while loop that runs arping as long as + * RunningMask is satisfied Then it exits the thread on its own if + * killThread is true + */ + void RunningThread(int index, std::string interface, std::string ip); +}; diff --git a/slsSupportLib/include/sls/sls_detector_funcs.h b/slsSupportLib/include/sls/sls_detector_funcs.h index a0b3f71cb..ca7a19de2 100755 --- a/slsSupportLib/include/sls/sls_detector_funcs.h +++ b/slsSupportLib/include/sls/sls_detector_funcs.h @@ -361,6 +361,8 @@ enum detFuncs { F_SET_RECEIVER_STREAMING_HWM, F_RECEIVER_SET_ALL_THRESHOLD, F_RECEIVER_SET_DATASTREAM, + F_GET_RECEIVER_ARPING, + F_SET_RECEIVER_ARPING, NUM_REC_FUNCTIONS }; @@ -720,6 +722,8 @@ const char* getFunctionNameFromEnum(enum detFuncs func) { case F_SET_RECEIVER_STREAMING_HWM: return "F_SET_RECEIVER_STREAMING_HWM"; case F_RECEIVER_SET_ALL_THRESHOLD: return "F_RECEIVER_SET_ALL_THRESHOLD"; case F_RECEIVER_SET_DATASTREAM: return "F_RECEIVER_SET_DATASTREAM"; + case F_GET_RECEIVER_ARPING: return "F_GET_RECEIVER_ARPING"; + case F_SET_RECEIVER_ARPING: return "F_SET_RECEIVER_ARPING"; case NUM_REC_FUNCTIONS: return "NUM_REC_FUNCTIONS"; default: return "Unknown Function";