Small refactor on ThreadObject and listener (#93)

* removed pointer, slight cleaning of loop

* removed semaphore, use getters

* removed redundant log msg

* removed comment

* added const

* removed comment

* changed header
This commit is contained in:
Erik Fröjdh 2020-04-21 09:45:29 +02:00 committed by GitHub
parent 68f76e5356
commit c1ae67ac46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 78 deletions

View File

@ -315,10 +315,8 @@ int ClientInterface::set_port(Interface &socket) {
LOG(logINFO) << "TCP port set to " << p_number << std::endl;
sls::ServerSocket new_server(p_number);
// auto new_server = sls::make_unique<sls::ServerSocket>(p_number);
new_server.setLockedBy(server.getLockedBy());
new_server.setLastClient(server.getThisClient());
// server = std::move(new_server);
server = std::move(new_server);
socket.sendResult(p_number);
return OK;

View File

@ -80,7 +80,7 @@ void DataProcessor::SetFifo(Fifo* f) {
}
void DataProcessor::ResetParametersforNewAcquisition(){
runningFlag = false;
StopRunning();
startedFlag = false;
numFramesCaught = 0;
firstIndex = 0;

View File

@ -44,7 +44,7 @@ void DataStreamer::SetFifo(Fifo* f) {
}
void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){
runningFlag = false;
StopRunning();
startedFlag = false;
firstIndex = 0;

View File

@ -44,13 +44,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
LOG(logDEBUG) << "Listener " << ind << " created";
}
Listener::~Listener() {
if (udpSocket){
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
Listener::~Listener() = default;
uint64_t Listener::GetPacketsCaught() const {
return numPacketsCaught;
@ -75,7 +69,7 @@ void Listener::SetFifo(Fifo* f) {
}
void Listener::ResetParametersforNewAcquisition() {
runningFlag = false;
StopRunning();
startedFlag = false;
numPacketsCaught = 0;
firstIndex = 0;
@ -143,7 +137,6 @@ void Listener::CreateUDPSockets() {
}
udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getBufferSize();
@ -156,12 +149,6 @@ void Listener::ShutDownUDPSocket() {
udpSocketAlive = false;
udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
fflush(stdout);
// wait only if the threads have started as it is the threads that
//give a post to semaphore(at stopListening)
if (runningFlag)
sem_wait(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
@ -269,10 +256,8 @@ void Listener::StopListening(char* buf) {
(*((uint32_t*)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
sem_post(&semaphore_socket);
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
}

View File

@ -238,9 +238,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
/** if the udp socket is connected */
std::atomic<bool> udpSocketAlive{false};
/** Semaphore to synchronize deleting udp socket */
sem_t semaphore_socket;
// for print progress during acquisition
/** number of packets for statistic */
uint32_t numPacketsStatistic{0};

View File

@ -3,36 +3,27 @@
* @short creates/destroys a thread
***********************************************/
#include "ThreadObject.h"
#include "container_utils.h"
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
ThreadObject::ThreadObject(int threadIndex, std::string threadType)
: index(threadIndex), type(threadType) {
LOG(logDEBUG) << type << " thread created: " << index;
sem_init(&semaphore,1,0);
try {
threadObject = sls::make_unique<std::thread>(&ThreadObject::RunningThread, this);
threadObject = std::thread(&ThreadObject::RunningThread, this);
} catch (...) {
throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index));
}
}
ThreadObject::~ThreadObject() {
killThread = true;
sem_post(&semaphore);
threadObject->join();
threadObject.join();
sem_destroy(&semaphore);
}
@ -50,20 +41,13 @@ void ThreadObject::StopRunning() {
void ThreadObject::RunningThread() {
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
LOG(logDEBUG) << type << " thread " << index << " created successfully.";
while(true) {
while(!killThread) {
while(IsRunning()) {
ThreadExecution();
}
//wait till the next acquisition
sem_wait(&semaphore);
if(killThread) {
break;
}
}
LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully.";
LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
}
@ -72,11 +56,10 @@ void ThreadObject::Continue() {
sem_post(&semaphore);
}
void ThreadObject::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, &param) == EPERM) {
if (pthread_setschedparam(threadObject.native_handle(), SCHED_FIFO, &param) == EPERM) {
if (index == 0) {
LOG(logWARNING) << "Could not prioritize " << type << " thread. "
"(No Root Privileges?)";

View File

@ -7,44 +7,40 @@
*@short creates/destroys a thread
*/
#include "sls_detector_defs.h"
#include "logger.h"
#include "sls_detector_defs.h"
#include <atomic>
#include <thread>
#include <semaphore.h>
#include <string>
#include <atomic>
#include <future>
class ThreadObject : private virtual slsDetectorDefs {
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
bool IsRunning() const;
void StartRunning();
void StopRunning();
void Continue();
void SetThreadPriority(int priority);
protected:
const int index{0};
protected:
virtual void ThreadExecution() = 0;
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
private:
/**
* Thread called: An infinite while loop in which,
* semaphore starts executing its contents as long RunningMask is satisfied
* Then it exits the thread on its own if killThread is true
*/
void RunningThread();
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
bool IsRunning() const;
void StartRunning();
void StopRunning();
void Continue();
void SetThreadPriority(int priority);
protected:
int index{0};
std::string type;
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::unique_ptr<std::thread> threadObject;
sem_t semaphore;
private:
virtual void ThreadExecution() = 0;
/**
* Thread called: An infinite while loop in which,
* semaphore starts executing its contents as long RunningMask is satisfied
* Then it exits the thread on its own if killThread is true
*/
void RunningThread();
};