From c1ae67ac46281b53c358cc9147267ebde7edc9e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Fr=C3=B6jdh?= Date: Tue, 21 Apr 2020 09:45:29 +0200 Subject: [PATCH] Small refactor on ThreadObject and listener (#93) * removed pointer, slight cleaning of loop * removed semaphore, use getters * removed redundant log msg * removed comment * added const * removed comment * changed header --- slsReceiverSoftware/src/ClientInterface.cpp | 2 - slsReceiverSoftware/src/DataProcessor.cpp | 2 +- slsReceiverSoftware/src/DataStreamer.cpp | 2 +- slsReceiverSoftware/src/Listener.cpp | 23 ++------ slsReceiverSoftware/src/Listener.h | 3 -- slsReceiverSoftware/src/ThreadObject.cpp | 25 ++------- slsReceiverSoftware/src/ThreadObject.h | 58 ++++++++++----------- 7 files changed, 37 insertions(+), 78 deletions(-) diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index 83c751a16..a786072af 100755 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -315,10 +315,8 @@ int ClientInterface::set_port(Interface &socket) { LOG(logINFO) << "TCP port set to " << p_number << std::endl; sls::ServerSocket new_server(p_number); - // auto new_server = sls::make_unique(p_number); new_server.setLockedBy(server.getLockedBy()); new_server.setLastClient(server.getThisClient()); - // server = std::move(new_server); server = std::move(new_server); socket.sendResult(p_number); return OK; diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index e45a29a13..a3fe3e8d5 100755 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -80,7 +80,7 @@ void DataProcessor::SetFifo(Fifo* f) { } void DataProcessor::ResetParametersforNewAcquisition(){ - runningFlag = false; + StopRunning(); startedFlag = false; numFramesCaught = 0; firstIndex = 0; diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 9c9b5f1a1..ccec6db9f 100755 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -44,7 +44,7 @@ void DataStreamer::SetFifo(Fifo* f) { } void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){ - runningFlag = false; + StopRunning(); startedFlag = false; firstIndex = 0; diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 48b9e133d..ca9761ca3 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -44,13 +44,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic* LOG(logDEBUG) << "Listener " << ind << " created"; } - -Listener::~Listener() { - if (udpSocket){ - sem_post(&semaphore_socket); - sem_destroy(&semaphore_socket); - } -} +Listener::~Listener() = default; uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; @@ -75,7 +69,7 @@ void Listener::SetFifo(Fifo* f) { } void Listener::ResetParametersforNewAcquisition() { - runningFlag = false; + StopRunning(); startedFlag = false; numPacketsCaught = 0; firstIndex = 0; @@ -143,7 +137,6 @@ void Listener::CreateUDPSockets() { } udpSocketAlive = true; - sem_init(&semaphore_socket,1,0); // doubled due to kernel bookkeeping (could also be less due to permissions) *actualUDPSocketBufferSize = udpSocket->getBufferSize(); @@ -156,12 +149,6 @@ void Listener::ShutDownUDPSocket() { udpSocketAlive = false; udpSocket->Shutdown(); LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; - fflush(stdout); - // wait only if the threads have started as it is the threads that - //give a post to semaphore(at stopListening) - if (runningFlag) - sem_wait(&semaphore_socket); - sem_destroy(&semaphore_socket); } } @@ -269,10 +256,8 @@ void Listener::StopListening(char* buf) { (*((uint32_t*)buf)) = DUMMY_PACKET_VALUE; fifo->PushAddress(buf); StopRunning(); - - sem_post(&semaphore_socket); - LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught; - LOG(logDEBUG1) << index << ": Listening Completed"; + LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught; + LOG(logDEBUG1) << index << ": Listening Completed"; } diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 38fdd072e..3011909a3 100755 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -238,9 +238,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { /** if the udp socket is connected */ std::atomic udpSocketAlive{false}; - /** Semaphore to synchronize deleting udp socket */ - sem_t semaphore_socket; - // for print progress during acquisition /** number of packets for statistic */ uint32_t numPacketsStatistic{0}; diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index 08d2a5a67..591ad7e5c 100755 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -3,36 +3,27 @@ * @short creates/destroys a thread ***********************************************/ - - #include "ThreadObject.h" #include "container_utils.h" #include #include #include - - ThreadObject::ThreadObject(int threadIndex, std::string threadType) : index(threadIndex), type(threadType) { LOG(logDEBUG) << type << " thread created: " << index; - sem_init(&semaphore,1,0); - try { - threadObject = sls::make_unique(&ThreadObject::RunningThread, this); + threadObject = std::thread(&ThreadObject::RunningThread, this); } catch (...) { throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index)); } } - ThreadObject::~ThreadObject() { killThread = true; sem_post(&semaphore); - - threadObject->join(); - + threadObject.join(); sem_destroy(&semaphore); } @@ -50,20 +41,13 @@ void ThreadObject::StopRunning() { void ThreadObject::RunningThread() { LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; - LOG(logDEBUG) << type << " thread " << index << " created successfully."; - - while(true) { + while(!killThread) { while(IsRunning()) { ThreadExecution(); } //wait till the next acquisition sem_wait(&semaphore); - if(killThread) { - break; - } } - - LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully."; LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; } @@ -72,11 +56,10 @@ void ThreadObject::Continue() { sem_post(&semaphore); } - void ThreadObject::SetThreadPriority(int priority) { struct sched_param param; param.sched_priority = priority; - if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, ¶m) == EPERM) { + if (pthread_setschedparam(threadObject.native_handle(), SCHED_FIFO, ¶m) == EPERM) { if (index == 0) { LOG(logWARNING) << "Could not prioritize " << type << " thread. " "(No Root Privileges?)"; diff --git a/slsReceiverSoftware/src/ThreadObject.h b/slsReceiverSoftware/src/ThreadObject.h index 624dec1cf..08e044a7f 100755 --- a/slsReceiverSoftware/src/ThreadObject.h +++ b/slsReceiverSoftware/src/ThreadObject.h @@ -7,44 +7,40 @@ *@short creates/destroys a thread */ -#include "sls_detector_defs.h" #include "logger.h" +#include "sls_detector_defs.h" - +#include +#include #include #include -#include -#include class ThreadObject : private virtual slsDetectorDefs { - - public: - ThreadObject(int threadIndex, std::string threadType); - virtual ~ThreadObject(); - bool IsRunning() const; - void StartRunning(); - void StopRunning(); - void Continue(); - void SetThreadPriority(int priority); + protected: + const int index{0}; - protected: - virtual void ThreadExecution() = 0; + private: + std::atomic killThread{false}; + std::atomic runningFlag{false}; + std::thread threadObject; + sem_t semaphore; + const std::string type; - private: - /** - * Thread called: An infinite while loop in which, - * semaphore starts executing its contents as long RunningMask is satisfied - * Then it exits the thread on its own if killThread is true - */ - void RunningThread(); + public: + ThreadObject(int threadIndex, std::string threadType); + virtual ~ThreadObject(); + bool IsRunning() const; + void StartRunning(); + void StopRunning(); + void Continue(); + void SetThreadPriority(int priority); - - protected: - int index{0}; - std::string type; - std::atomic killThread{false}; - std::atomic runningFlag{false}; - std::unique_ptr threadObject; - sem_t semaphore; + private: + virtual void ThreadExecution() = 0; + /** + * Thread called: An infinite while loop in which, + * semaphore starts executing its contents as long RunningMask is satisfied + * Then it exits the thread on its own if killThread is true + */ + void RunningThread(); }; -