Merge branch 'developer' into separateRxr

This commit is contained in:
maliakal_d 2020-04-21 11:30:23 +02:00
commit c408f9807a
7 changed files with 37 additions and 78 deletions

View File

@ -315,10 +315,8 @@ int ClientInterface::set_port(Interface &socket) {
LOG(logINFO) << "TCP port set to " << p_number << std::endl; LOG(logINFO) << "TCP port set to " << p_number << std::endl;
sls::ServerSocket new_server(p_number); sls::ServerSocket new_server(p_number);
// auto new_server = sls::make_unique<sls::ServerSocket>(p_number);
new_server.setLockedBy(server.getLockedBy()); new_server.setLockedBy(server.getLockedBy());
new_server.setLastClient(server.getThisClient()); new_server.setLastClient(server.getThisClient());
// server = std::move(new_server);
server = std::move(new_server); server = std::move(new_server);
socket.sendResult(p_number); socket.sendResult(p_number);
return OK; return OK;

View File

@ -80,7 +80,7 @@ void DataProcessor::SetFifo(Fifo* f) {
} }
void DataProcessor::ResetParametersforNewAcquisition(){ void DataProcessor::ResetParametersforNewAcquisition(){
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
numFramesCaught = 0; numFramesCaught = 0;
firstIndex = 0; firstIndex = 0;

View File

@ -44,7 +44,7 @@ void DataStreamer::SetFifo(Fifo* f) {
} }
void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){ void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
firstIndex = 0; firstIndex = 0;

View File

@ -44,13 +44,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
LOG(logDEBUG) << "Listener " << ind << " created"; LOG(logDEBUG) << "Listener " << ind << " created";
} }
Listener::~Listener() = default;
Listener::~Listener() {
if (udpSocket){
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
uint64_t Listener::GetPacketsCaught() const { uint64_t Listener::GetPacketsCaught() const {
return numPacketsCaught; return numPacketsCaught;
@ -75,7 +69,7 @@ void Listener::SetFifo(Fifo* f) {
} }
void Listener::ResetParametersforNewAcquisition() { void Listener::ResetParametersforNewAcquisition() {
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
numPacketsCaught = 0; numPacketsCaught = 0;
firstIndex = 0; firstIndex = 0;
@ -143,7 +137,6 @@ void Listener::CreateUDPSockets() {
} }
udpSocketAlive = true; udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions) // doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getBufferSize(); *actualUDPSocketBufferSize = udpSocket->getBufferSize();
@ -156,12 +149,6 @@ void Listener::ShutDownUDPSocket() {
udpSocketAlive = false; udpSocketAlive = false;
udpSocket->Shutdown(); udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
fflush(stdout);
// wait only if the threads have started as it is the threads that
//give a post to semaphore(at stopListening)
if (runningFlag)
sem_wait(&semaphore_socket);
sem_destroy(&semaphore_socket);
} }
} }
@ -269,8 +256,6 @@ void Listener::StopListening(char* buf) {
(*((uint32_t*)buf)) = DUMMY_PACKET_VALUE; (*((uint32_t*)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf); fifo->PushAddress(buf);
StopRunning(); StopRunning();
sem_post(&semaphore_socket);
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught; LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed"; LOG(logDEBUG1) << index << ": Listening Completed";
} }

View File

@ -238,9 +238,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
/** if the udp socket is connected */ /** if the udp socket is connected */
std::atomic<bool> udpSocketAlive{false}; std::atomic<bool> udpSocketAlive{false};
/** Semaphore to synchronize deleting udp socket */
sem_t semaphore_socket;
// for print progress during acquisition // for print progress during acquisition
/** number of packets for statistic */ /** number of packets for statistic */
uint32_t numPacketsStatistic{0}; uint32_t numPacketsStatistic{0};

View File

@ -3,36 +3,27 @@
* @short creates/destroys a thread * @short creates/destroys a thread
***********************************************/ ***********************************************/
#include "ThreadObject.h" #include "ThreadObject.h"
#include "container_utils.h" #include "container_utils.h"
#include <iostream> #include <iostream>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
ThreadObject::ThreadObject(int threadIndex, std::string threadType) ThreadObject::ThreadObject(int threadIndex, std::string threadType)
: index(threadIndex), type(threadType) { : index(threadIndex), type(threadType) {
LOG(logDEBUG) << type << " thread created: " << index; LOG(logDEBUG) << type << " thread created: " << index;
sem_init(&semaphore,1,0); sem_init(&semaphore,1,0);
try { try {
threadObject = sls::make_unique<std::thread>(&ThreadObject::RunningThread, this); threadObject = std::thread(&ThreadObject::RunningThread, this);
} catch (...) { } catch (...) {
throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index)); throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index));
} }
} }
ThreadObject::~ThreadObject() { ThreadObject::~ThreadObject() {
killThread = true; killThread = true;
sem_post(&semaphore); sem_post(&semaphore);
threadObject.join();
threadObject->join();
sem_destroy(&semaphore); sem_destroy(&semaphore);
} }
@ -50,20 +41,13 @@ void ThreadObject::StopRunning() {
void ThreadObject::RunningThread() { void ThreadObject::RunningThread() {
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
LOG(logDEBUG) << type << " thread " << index << " created successfully."; while(!killThread) {
while(true) {
while(IsRunning()) { while(IsRunning()) {
ThreadExecution(); ThreadExecution();
} }
//wait till the next acquisition //wait till the next acquisition
sem_wait(&semaphore); sem_wait(&semaphore);
if(killThread) {
break;
} }
}
LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully.";
LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
} }
@ -72,11 +56,10 @@ void ThreadObject::Continue() {
sem_post(&semaphore); sem_post(&semaphore);
} }
void ThreadObject::SetThreadPriority(int priority) { void ThreadObject::SetThreadPriority(int priority) {
struct sched_param param; struct sched_param param;
param.sched_priority = priority; param.sched_priority = priority;
if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, &param) == EPERM) { if (pthread_setschedparam(threadObject.native_handle(), SCHED_FIFO, &param) == EPERM) {
if (index == 0) { if (index == 0) {
LOG(logWARNING) << "Could not prioritize " << type << " thread. " LOG(logWARNING) << "Could not prioritize " << type << " thread. "
"(No Root Privileges?)"; "(No Root Privileges?)";

View File

@ -7,16 +7,24 @@
*@short creates/destroys a thread *@short creates/destroys a thread
*/ */
#include "sls_detector_defs.h"
#include "logger.h" #include "logger.h"
#include "sls_detector_defs.h"
#include <atomic>
#include <thread>
#include <semaphore.h> #include <semaphore.h>
#include <string> #include <string>
#include <atomic>
#include <future>
class ThreadObject : private virtual slsDetectorDefs { class ThreadObject : private virtual slsDetectorDefs {
protected:
const int index{0};
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
public: public:
ThreadObject(int threadIndex, std::string threadType); ThreadObject(int threadIndex, std::string threadType);
@ -27,24 +35,12 @@ class ThreadObject : private virtual slsDetectorDefs {
void Continue(); void Continue();
void SetThreadPriority(int priority); void SetThreadPriority(int priority);
protected:
virtual void ThreadExecution() = 0;
private: private:
virtual void ThreadExecution() = 0;
/** /**
* Thread called: An infinite while loop in which, * Thread called: An infinite while loop in which,
* semaphore starts executing its contents as long RunningMask is satisfied * semaphore starts executing its contents as long RunningMask is satisfied
* Then it exits the thread on its own if killThread is true * Then it exits the thread on its own if killThread is true
*/ */
void RunningThread(); void RunningThread();
protected:
int index{0};
std::string type;
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::unique_ptr<std::thread> threadObject;
sem_t semaphore;
}; };