rxr: switched to threads for threadObject (from pthread), moved priorities to threadObject, mutex

This commit is contained in:
2019-11-29 16:19:02 +01:00
parent 16bec25b0c
commit 1a8337540a
11 changed files with 75 additions and 285 deletions

View File

@ -30,7 +30,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
bool* fp, bool* act, bool* depaden, bool* sm, bool* qe,
std::vector <int> * cdl, int* cdo, int* cad) :
ThreadObject(ind),
ThreadObject(ind, TypeName),
runningFlag(false),
generalData(nullptr),
fifo(f),
@ -62,7 +62,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
rawDataModifyReadyCallBack(nullptr),
pRawDataReady(nullptr)
{
ThreadObject::CreateThread();
FILE_LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void*)&timerBegin, 0, sizeof(timespec));
}
@ -71,13 +70,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
DataProcessor::~DataProcessor() {
delete file;
delete [] tempBuffer;
ThreadObject::DestroyThread();
}
/** getters */
std::string DataProcessor::GetType(){
return TypeName;
}
bool DataProcessor::IsRunning() {
return runningFlag;
@ -155,20 +150,6 @@ void DataProcessor::SetGeneralData(GeneralData* g) {
}
void DataProcessor::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(thread, SCHED_FIFO, &param) == EPERM) {
if (!index) {
FILE_LOG(logWARNING) << "Could not prioritize dataprocessing thread. "
"(No Root Privileges?)";
}
} else {
FILE_LOG(logINFO) << "Priorities set - DataProcessor: " << priority;
}
}
void DataProcessor::SetFileFormat(const fileFormat f) {
if ((file != nullptr) && file->GetFileType() != f) {
//remember the pointer values before they are destroyed

View File

@ -18,7 +18,7 @@ const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
uint64_t* fi, int fd, std::string* ajh, int* nd, bool* gpEnable, bool* qe) :
ThreadObject(ind),
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
@ -38,7 +38,6 @@ DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
numDet[0] = nd[0];
numDet[1] = nd[1];
ThreadObject::CreateThread();
FILE_LOG(logDEBUG) << "DataStreamer " << ind << " created";
}
@ -46,13 +45,9 @@ DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
DataStreamer::~DataStreamer() {
CloseZmqSocket();
delete [] completeBuffer;
ThreadObject::DestroyThread();
}
/** getters */
std::string DataStreamer::GetType(){
return TypeName;
}
bool DataStreamer::IsRunning() {
return runningFlag;
@ -104,19 +99,6 @@ void DataStreamer::SetGeneralData(GeneralData* g) {
generalData->Print();
}
void DataStreamer::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(thread, SCHED_FIFO, &param) == EPERM) {
if (!index) {
FILE_LOG(logWARNING) << "Could not prioritize datastreaming thread. "
"(No Root Privileges?)";
}
} else {
FILE_LOG(logINFO) << "Priorities set - DataStreamer: " << priority;
}
}
void DataStreamer::SetNumberofDetectors(int* nd) {
numDet[0] = nd[0];
numDet[1] = nd[1];

View File

@ -13,7 +13,6 @@
#include <string.h>
pthread_mutex_t HDF5File::Mutex = PTHREAD_MUTEX_INITIALIZER;
H5File* HDF5File::masterfd = 0;
hid_t HDF5File::virtualfd = 0;
@ -142,19 +141,14 @@ void HDF5File::CreateFile() {
uint64_t framestosave = ((*maxFramesPerFile == 0) ? *numImages : // infinite images
(((extNumImages - subFileIndex) > (*maxFramesPerFile)) ? // save up to maximum at a time
(*maxFramesPerFile) : (extNumImages-subFileIndex)));
pthread_mutex_lock(&Mutex);
try{
HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, (*numImages > 1),
std::lock_guard<std::mutex> lock(mutex);
HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, (*numImages > 1),
subFileIndex, framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
datatype, filefd, dataspace, dataset,
HDF5_WRITER_VERSION, MAX_CHUNKED_IMAGES,
dataspace_para, dataset_para,
parameterNames, parameterDataTypes);
} catch(const RuntimeError &e) {
pthread_mutex_unlock(&Mutex);
throw;
}
pthread_mutex_unlock(&Mutex);
if(!(*silentMode)) {
FILE_LOG(logINFO) << *udpPortNumber << ": HDF5 File created: " << currentFileName;
@ -163,9 +157,10 @@ void HDF5File::CreateFile() {
void HDF5File::CloseCurrentFile() {
pthread_mutex_lock(&Mutex);
HDF5FileStatic::CloseDataFile(index, filefd);
pthread_mutex_unlock(&Mutex);
{
std::lock_guard<std::mutex> lock(mutex);
HDF5FileStatic::CloseDataFile(index, filefd);
}
for (unsigned int i = 0; i < dataset_para.size(); ++i)
delete dataset_para[i];
dataset_para.clear();
@ -178,14 +173,14 @@ void HDF5File::CloseCurrentFile() {
void HDF5File::CloseAllFiles() {
numFilesinAcquisition = 0;
pthread_mutex_lock(&Mutex);
HDF5FileStatic::CloseDataFile(index, filefd);
if (master && (*detIndex==0)) {
HDF5FileStatic::CloseMasterDataFile(masterfd);
HDF5FileStatic::CloseVirtualDataFile(virtualfd);
{
std::lock_guard<std::mutex> lock(mutex);
HDF5FileStatic::CloseDataFile(index, filefd);
if (master && (*detIndex==0)) {
HDF5FileStatic::CloseMasterDataFile(masterfd);
HDF5FileStatic::CloseVirtualDataFile(virtualfd);
}
}
pthread_mutex_unlock(&Mutex);
for (unsigned int i = 0; i < dataset_para.size(); ++i)
delete dataset_para[i];
dataset_para.clear();
@ -206,9 +201,9 @@ void HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
}
numFramesInFile++;
numActualPacketsInFile += nump;
pthread_mutex_lock(&Mutex);
try {
std::lock_guard<std::mutex> lock(mutex);
// extend dataset (when receiver start followed by many status starts (jungfrau)))
if (fnum >= extNumImages) {
HDF5FileStatic::ExtendDataset(index, dataspace, dataset,
@ -231,11 +226,6 @@ void HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
((*maxFramesPerFile == 0) ? fnum : fnum%(*maxFramesPerFile)),
dataset_para, (sls_receiver_header*) (buffer),
parameterDataTypes);
} catch (const RuntimeError &e) {
pthread_mutex_unlock(&Mutex);
throw;
}
pthread_mutex_unlock(&Mutex);
}
@ -253,15 +243,10 @@ void HDF5File::CreateMasterFile(bool mfwenable, masterAttributes& attr) {
if(!(*silentMode)) {
FILE_LOG(logINFO) << "Master File: " << masterFileName;
}
pthread_mutex_lock(&Mutex);
std::lock_guard<std::mutex> lock(mutex);
attr.version = HDF5_WRITER_VERSION;
try{
HDF5FileStatic::CreateMasterDataFile(masterfd, masterFileName,
HDF5FileStatic::CreateMasterDataFile(masterfd, masterFileName,
*overWriteEnable, attr);
} catch (const RuntimeError &e) {
pthread_mutex_unlock(&Mutex);
throw;
}
}
}
@ -288,14 +273,13 @@ void HDF5File::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
// called only by the one maser receiver
void HDF5File::CreateVirtualFile(uint64_t numf) {
pthread_mutex_lock(&Mutex);
std::lock_guard<std::mutex> lock(mutex);
std::string vname = HDF5FileStatic::CreateVirtualFileName(*filePath, *fileNamePrefix, *fileIndex);
if(!(*silentMode)) {
FILE_LOG(logINFO) << "Virtual File: " << vname;
}
try {
HDF5FileStatic::CreateVirtualDataFile(vname,
HDF5FileStatic::CreateVirtualDataFile(vname,
virtualfd, masterFileName,
*filePath, *fileNamePrefix, *fileIndex, (*numImages > 1),
*detIndex, *numUnitsPerDetector,
@ -306,11 +290,6 @@ void HDF5File::CreateVirtualFile(uint64_t numf) {
numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
HDF5_WRITER_VERSION,
parameterNames, parameterDataTypes);
} catch (const RuntimeError &e) {
pthread_mutex_unlock(&Mutex);
throw;
}
pthread_mutex_unlock(&Mutex);
}
// called only by the one maser receiver
@ -321,13 +300,7 @@ void HDF5File::LinkVirtualFileinMasterFile() {
if ((*numImages > 1)) osfn << "_f" << std::setfill('0') << std::setw(12) << 0;
std::string dsetname = osfn.str();
pthread_mutex_lock(&Mutex);
try {
HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName,
std::lock_guard<std::mutex> lock(mutex);
HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName,
dsetname, parameterNames);
} catch (const RuntimeError &e) {
pthread_mutex_unlock(&Mutex);
throw;
}
pthread_mutex_unlock(&Mutex);
}

View File

@ -24,7 +24,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
uint32_t* portno, std::string* e, uint64_t* nf, uint32_t* dr,
int64_t* us, int64_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind),
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
@ -55,7 +55,6 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
numFramesStatistic(0),
oddStartingPacket(true)
{
ThreadObject::CreateThread();
FILE_LOG(logDEBUG) << "Listener " << ind << " created";
}
@ -65,15 +64,9 @@ Listener::~Listener() {
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
ThreadObject::DestroyThread();
}
/** getters */
std::string Listener::GetType(){
return TypeName;
}
bool Listener::IsRunning() {
return runningFlag;
}
@ -153,19 +146,6 @@ void Listener::SetGeneralData(GeneralData* g) {
}
void Listener::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(thread, SCHED_FIFO, &param) == EPERM) {
if (!index) {
FILE_LOG(logWARNING) << "Could not prioritize listener thread. "
"(No Root Privileges?)";
}
} else {
FILE_LOG(logINFO) << "Priorities set - Listener: " << priority;
}
}
void Listener::CreateUDPSockets() {
if (!(*activated)) {

View File

@ -6,99 +6,71 @@
#include "ThreadObject.h"
#include "container_utils.h"
#include <iostream>
#include <syscall.h>
ThreadObject::ThreadObject(int ind):
index(ind),
alive(false),
killThread(false),
thread(0)
{
PrintMembers();
ThreadObject::ThreadObject(int threadIndex, std::string threadType)
: index(threadIndex), type(threadType) {
FILE_LOG(logDEBUG) << type << " thread created: " << index;
sem_init(&semaphore,1,0);
try {
threadObject = sls::make_unique<std::thread>(&ThreadObject::RunningThread, this);
} catch (...) {
throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index));
}
}
ThreadObject::~ThreadObject() {
DestroyThread();
}
killThread = true;
sem_post(&semaphore);
threadObject->join();
void ThreadObject::PrintMembers() {
FILE_LOG(logDEBUG) << "Index : " << index
<< "\nalive: " << alive
<< "\nkillThread: " << killThread
<< "\npthread: " << thread;
}
void ThreadObject::DestroyThread() {
if(alive){
killThread = true;
sem_post(&semaphore);
pthread_join(thread,nullptr);
sem_destroy(&semaphore);
killThread = false;
alive = false;
FILE_LOG(logDEBUG) << GetType() << " thread with index " << index << " destroyed successfully.";
}
}
void ThreadObject::CreateThread() {
if (alive) {
throw sls::RuntimeError("Cannot create " + GetType() + " thread " + std::to_string(index) + ". Already alive");
}
sem_init(&semaphore,1,0);
killThread = false;
if (pthread_create(&thread, nullptr,StartThread, (void*) this)){
throw sls::RuntimeError("Could not create " + GetType() + " thread with index " + std::to_string(index));
}
alive = true;
FILE_LOG(logDEBUG) << GetType() << " thread " << index << " created successfully.";
}
void* ThreadObject::StartThread(void* thisPointer) {
((ThreadObject*)thisPointer)->RunningThread();
return thisPointer;
sem_destroy(&semaphore);
}
void ThreadObject::RunningThread() {
FILE_LOG(logINFOBLUE) << "Created [ " << GetType() << "Thread " << index << ", "
"Tid: " << syscall(SYS_gettid) << "]";
FILE_LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
FILE_LOG(logDEBUG) << type << " thread " << index << " created successfully.";
while(true) {
while(IsRunning()) {
ThreadExecution();
}//end of inner loop
}
//wait till the next acquisition
sem_wait(&semaphore);
if(killThread) {
FILE_LOG(logINFOBLUE) << "Exiting [ " << GetType() <<
" Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
pthread_exit(nullptr);
break;
}
}//end of outer loop
}
FILE_LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully.";
FILE_LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
}
void ThreadObject::Continue() {
sem_post(&semaphore);
}
void ThreadObject::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, &param) == EPERM) {
if (!index) {
FILE_LOG(logWARNING) << "Could not prioritize " << type << " thread. "
"(No Root Privileges?)";
}
} else {
FILE_LOG(logINFO) << "Priorities set - " << type << ": " << priority;
}
}