diff --git a/slsDetectorSoftware/src/multiSlsDetector.h b/slsDetectorSoftware/src/multiSlsDetector.h index e992658c9..a90d05e21 100755 --- a/slsDetectorSoftware/src/multiSlsDetector.h +++ b/slsDetectorSoftware/src/multiSlsDetector.h @@ -462,7 +462,6 @@ class multiSlsDetector : public virtual slsDetectorDefs { bool jointhread{false}; /** the data processing thread */ - // pthread_t dataProcessingThread; std::thread dataProcessingThread; /** detector data packed for the gui */ diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index ff0a430e8..7dbce1eb1 100755 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -118,12 +118,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { */ void SetGeneralData(GeneralData* g); - /** - * Set thread priority - * @priority priority - */ - void SetThreadPriority(int priority); - /** * Set File Format * @param f file format @@ -199,12 +193,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { private: - /** - * Get Type - * @return type - */ - std::string GetType() override; - /** * Record First Index * @param fnum frame index to record diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index f341fa960..c3fdd8d22 100755 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -79,12 +79,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { */ void SetGeneralData(GeneralData* g); - /** - * Set thread priority - * @priority priority - */ - void SetThreadPriority(int priority); - /** * Set number of detectors * @param number of detectors in both dimensions @@ -119,12 +113,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { private: - /** - * Get Type - * @return type - */ - std::string GetType(); - /** * Record First Index * @param fnum frame index to record diff --git a/slsReceiverSoftware/include/HDF5File.h b/slsReceiverSoftware/include/HDF5File.h index 5706ec126..722758cc0 100755 --- a/slsReceiverSoftware/include/HDF5File.h +++ b/slsReceiverSoftware/include/HDF5File.h @@ -17,6 +17,7 @@ #ifndef H5_NO_NAMESPACE using namespace H5; #endif +#include class HDF5File : private virtual slsDetectorDefs, public File, public HDF5FileStatic { @@ -130,9 +131,8 @@ class HDF5File : private virtual slsDetectorDefs, public File, public HDF5FileSt void UpdateDataType(); - /** mutex to update static items among objects (threads)*/ - static pthread_mutex_t Mutex; + static mutable std::mutex mutex; /** Master File handle */ static H5File* masterfd; diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 2b72b64ec..423df0fa2 100755 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -102,12 +102,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { */ void SetGeneralData(GeneralData* g); - /** - * Set thread priority - * @priority priority - */ - void SetThreadPriority(int priority); - /** * Creates UDP Sockets */ @@ -136,12 +130,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { private: - /** - * Get Type - * @return type - */ - std::string GetType() override; - /** * Record First Acquisition Index * @param fnum frame index to record diff --git a/slsReceiverSoftware/include/ThreadObject.h b/slsReceiverSoftware/include/ThreadObject.h index 6d68a8db4..cc9ffddf1 100755 --- a/slsReceiverSoftware/include/ThreadObject.h +++ b/slsReceiverSoftware/include/ThreadObject.h @@ -10,76 +10,27 @@ #include "sls_detector_defs.h" #include "logger.h" -#include + #include #include +#include +#include class ThreadObject : private virtual slsDetectorDefs { public: - /** - * Constructor - * @param ind self index - */ - ThreadObject(int ind); - - /** - * Destructor - * if alive, destroys thread - */ + ThreadObject(int threadIndex, std::string threadType); virtual ~ThreadObject(); - - /** - * Print all member values - */ - void PrintMembers(); - - - /** - * Get Type - * @return type - */ - virtual std::string GetType() = 0; - - /** - * Returns if the thread is currently running - * @returns true if thread is running, else false - */ virtual bool IsRunning() = 0; - - /** - * What is really being executed in the thread - */ - virtual void ThreadExecution() = 0; - - /** - * Post semaphore so thread can continue & start an acquisition - */ void Continue(); + void SetThreadPriority(int priority); protected: - - /** - * Destroy thread, semaphore and resets alive and killThread - */ - void DestroyThread(); - - /** - * Create Thread, sets semaphore, alive and killThread - */ - void CreateThread(); - + virtual void ThreadExecution() = 0; private: - /** - * Static function using pointer from argument to call RunningThread() - * @param thisPointer pointer to an object of ThreadObject - */ - static void* StartThread(void *thisPointer); - - /** - * Actual Thread called: An infinite while loop in which, + * Thread called: An infinite while loop in which, * semaphore starts executing its contents as long RunningMask is satisfied * Then it exits the thread on its own if killThread is true */ @@ -87,22 +38,10 @@ class ThreadObject : private virtual slsDetectorDefs { protected: - /** Self Index */ - int index; - - /** Thread is alive/dead */ - volatile bool alive; - - /** Variable monitored by thread to kills itself */ - volatile bool killThread; - - /** Thread variable */ - pthread_t thread; - - /** Semaphore to synchonize starting of each run */ + int index{0}; + std::string type; + std::atomic killThread{false}; + std::unique_ptr threadObject; sem_t semaphore; - - - }; diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 98907e54d..f81673b55 100755 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -30,7 +30,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, bool* fp, bool* act, bool* depaden, bool* sm, bool* qe, std::vector * cdl, int* cdo, int* cad) : - ThreadObject(ind), + ThreadObject(ind, TypeName), runningFlag(false), generalData(nullptr), fifo(f), @@ -62,7 +62,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, rawDataModifyReadyCallBack(nullptr), pRawDataReady(nullptr) { - ThreadObject::CreateThread(); FILE_LOG(logDEBUG) << "DataProcessor " << ind << " created"; memset((void*)&timerBegin, 0, sizeof(timespec)); } @@ -71,13 +70,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, DataProcessor::~DataProcessor() { delete file; delete [] tempBuffer; - ThreadObject::DestroyThread(); } /** getters */ -std::string DataProcessor::GetType(){ - return TypeName; -} bool DataProcessor::IsRunning() { return runningFlag; @@ -155,20 +150,6 @@ void DataProcessor::SetGeneralData(GeneralData* g) { } -void DataProcessor::SetThreadPriority(int priority) { - struct sched_param param; - param.sched_priority = priority; - if (pthread_setschedparam(thread, SCHED_FIFO, ¶m) == EPERM) { - if (!index) { - FILE_LOG(logWARNING) << "Could not prioritize dataprocessing thread. " - "(No Root Privileges?)"; - } - } else { - FILE_LOG(logINFO) << "Priorities set - DataProcessor: " << priority; - } -} - - void DataProcessor::SetFileFormat(const fileFormat f) { if ((file != nullptr) && file->GetFileType() != f) { //remember the pointer values before they are destroyed diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index cd4a95302..b2f786472 100755 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -18,7 +18,7 @@ const std::string DataStreamer::TypeName = "DataStreamer"; DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, uint64_t* fi, int fd, std::string* ajh, int* nd, bool* gpEnable, bool* qe) : - ThreadObject(ind), + ThreadObject(ind, TypeName), runningFlag(0), generalData(nullptr), fifo(f), @@ -38,7 +38,6 @@ DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, numDet[0] = nd[0]; numDet[1] = nd[1]; - ThreadObject::CreateThread(); FILE_LOG(logDEBUG) << "DataStreamer " << ind << " created"; } @@ -46,13 +45,9 @@ DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, DataStreamer::~DataStreamer() { CloseZmqSocket(); delete [] completeBuffer; - ThreadObject::DestroyThread(); } /** getters */ -std::string DataStreamer::GetType(){ - return TypeName; -} bool DataStreamer::IsRunning() { return runningFlag; @@ -104,19 +99,6 @@ void DataStreamer::SetGeneralData(GeneralData* g) { generalData->Print(); } -void DataStreamer::SetThreadPriority(int priority) { - struct sched_param param; - param.sched_priority = priority; - if (pthread_setschedparam(thread, SCHED_FIFO, ¶m) == EPERM) { - if (!index) { - FILE_LOG(logWARNING) << "Could not prioritize datastreaming thread. " - "(No Root Privileges?)"; - } - } else { - FILE_LOG(logINFO) << "Priorities set - DataStreamer: " << priority; - } -} - void DataStreamer::SetNumberofDetectors(int* nd) { numDet[0] = nd[0]; numDet[1] = nd[1]; diff --git a/slsReceiverSoftware/src/HDF5File.cpp b/slsReceiverSoftware/src/HDF5File.cpp index 2fd45038b..20f5a87e0 100755 --- a/slsReceiverSoftware/src/HDF5File.cpp +++ b/slsReceiverSoftware/src/HDF5File.cpp @@ -13,7 +13,6 @@ #include -pthread_mutex_t HDF5File::Mutex = PTHREAD_MUTEX_INITIALIZER; H5File* HDF5File::masterfd = 0; hid_t HDF5File::virtualfd = 0; @@ -142,19 +141,14 @@ void HDF5File::CreateFile() { uint64_t framestosave = ((*maxFramesPerFile == 0) ? *numImages : // infinite images (((extNumImages - subFileIndex) > (*maxFramesPerFile)) ? // save up to maximum at a time (*maxFramesPerFile) : (extNumImages-subFileIndex))); - pthread_mutex_lock(&Mutex); - try{ - HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, (*numImages > 1), + + std::lock_guard lock(mutex); + HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, (*numImages > 1), subFileIndex, framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), datatype, filefd, dataspace, dataset, HDF5_WRITER_VERSION, MAX_CHUNKED_IMAGES, dataspace_para, dataset_para, parameterNames, parameterDataTypes); - } catch(const RuntimeError &e) { - pthread_mutex_unlock(&Mutex); - throw; - } - pthread_mutex_unlock(&Mutex); if(!(*silentMode)) { FILE_LOG(logINFO) << *udpPortNumber << ": HDF5 File created: " << currentFileName; @@ -163,9 +157,10 @@ void HDF5File::CreateFile() { void HDF5File::CloseCurrentFile() { - pthread_mutex_lock(&Mutex); - HDF5FileStatic::CloseDataFile(index, filefd); - pthread_mutex_unlock(&Mutex); + { + std::lock_guard lock(mutex); + HDF5FileStatic::CloseDataFile(index, filefd); + } for (unsigned int i = 0; i < dataset_para.size(); ++i) delete dataset_para[i]; dataset_para.clear(); @@ -178,14 +173,14 @@ void HDF5File::CloseCurrentFile() { void HDF5File::CloseAllFiles() { numFilesinAcquisition = 0; - pthread_mutex_lock(&Mutex); - HDF5FileStatic::CloseDataFile(index, filefd); - if (master && (*detIndex==0)) { - HDF5FileStatic::CloseMasterDataFile(masterfd); - HDF5FileStatic::CloseVirtualDataFile(virtualfd); + { + std::lock_guard lock(mutex); + HDF5FileStatic::CloseDataFile(index, filefd); + if (master && (*detIndex==0)) { + HDF5FileStatic::CloseMasterDataFile(masterfd); + HDF5FileStatic::CloseVirtualDataFile(virtualfd); + } } - pthread_mutex_unlock(&Mutex); - for (unsigned int i = 0; i < dataset_para.size(); ++i) delete dataset_para[i]; dataset_para.clear(); @@ -206,9 +201,9 @@ void HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t } numFramesInFile++; numActualPacketsInFile += nump; - pthread_mutex_lock(&Mutex); - try { + std::lock_guard lock(mutex); + // extend dataset (when receiver start followed by many status starts (jungfrau))) if (fnum >= extNumImages) { HDF5FileStatic::ExtendDataset(index, dataspace, dataset, @@ -231,11 +226,6 @@ void HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t ((*maxFramesPerFile == 0) ? fnum : fnum%(*maxFramesPerFile)), dataset_para, (sls_receiver_header*) (buffer), parameterDataTypes); - } catch (const RuntimeError &e) { - pthread_mutex_unlock(&Mutex); - throw; - } - pthread_mutex_unlock(&Mutex); } @@ -253,15 +243,10 @@ void HDF5File::CreateMasterFile(bool mfwenable, masterAttributes& attr) { if(!(*silentMode)) { FILE_LOG(logINFO) << "Master File: " << masterFileName; } - pthread_mutex_lock(&Mutex); + std::lock_guard lock(mutex); attr.version = HDF5_WRITER_VERSION; - try{ - HDF5FileStatic::CreateMasterDataFile(masterfd, masterFileName, + HDF5FileStatic::CreateMasterDataFile(masterfd, masterFileName, *overWriteEnable, attr); - } catch (const RuntimeError &e) { - pthread_mutex_unlock(&Mutex); - throw; - } } } @@ -288,14 +273,13 @@ void HDF5File::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) { // called only by the one maser receiver void HDF5File::CreateVirtualFile(uint64_t numf) { - pthread_mutex_lock(&Mutex); + std::lock_guard lock(mutex); std::string vname = HDF5FileStatic::CreateVirtualFileName(*filePath, *fileNamePrefix, *fileIndex); if(!(*silentMode)) { FILE_LOG(logINFO) << "Virtual File: " << vname; } - try { - HDF5FileStatic::CreateVirtualDataFile(vname, + HDF5FileStatic::CreateVirtualDataFile(vname, virtualfd, masterFileName, *filePath, *fileNamePrefix, *fileIndex, (*numImages > 1), *detIndex, *numUnitsPerDetector, @@ -306,11 +290,6 @@ void HDF5File::CreateVirtualFile(uint64_t numf) { numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), HDF5_WRITER_VERSION, parameterNames, parameterDataTypes); - } catch (const RuntimeError &e) { - pthread_mutex_unlock(&Mutex); - throw; - } - pthread_mutex_unlock(&Mutex); } // called only by the one maser receiver @@ -321,13 +300,7 @@ void HDF5File::LinkVirtualFileinMasterFile() { if ((*numImages > 1)) osfn << "_f" << std::setfill('0') << std::setw(12) << 0; std::string dsetname = osfn.str(); - pthread_mutex_lock(&Mutex); - try { - HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName, + std::lock_guard lock(mutex); + HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName, dsetname, parameterNames); - } catch (const RuntimeError &e) { - pthread_mutex_unlock(&Mutex); - throw; - } - pthread_mutex_unlock(&Mutex); } diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 88957077c..2347f3da1 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -24,7 +24,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic* uint32_t* portno, std::string* e, uint64_t* nf, uint32_t* dr, int64_t* us, int64_t* as, uint32_t* fpf, frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) : - ThreadObject(ind), + ThreadObject(ind, TypeName), runningFlag(0), generalData(nullptr), fifo(f), @@ -55,7 +55,6 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic* numFramesStatistic(0), oddStartingPacket(true) { - ThreadObject::CreateThread(); FILE_LOG(logDEBUG) << "Listener " << ind << " created"; } @@ -65,15 +64,9 @@ Listener::~Listener() { sem_post(&semaphore_socket); sem_destroy(&semaphore_socket); } - - ThreadObject::DestroyThread(); } /** getters */ -std::string Listener::GetType(){ - return TypeName; -} - bool Listener::IsRunning() { return runningFlag; } @@ -153,19 +146,6 @@ void Listener::SetGeneralData(GeneralData* g) { } -void Listener::SetThreadPriority(int priority) { - struct sched_param param; - param.sched_priority = priority; - if (pthread_setschedparam(thread, SCHED_FIFO, ¶m) == EPERM) { - if (!index) { - FILE_LOG(logWARNING) << "Could not prioritize listener thread. " - "(No Root Privileges?)"; - } - } else { - FILE_LOG(logINFO) << "Priorities set - Listener: " << priority; - } -} - void Listener::CreateUDPSockets() { if (!(*activated)) { diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index 69e8dfca7..c4c426152 100755 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -6,99 +6,71 @@ #include "ThreadObject.h" +#include "container_utils.h" #include #include -ThreadObject::ThreadObject(int ind): - index(ind), - alive(false), - killThread(false), - thread(0) -{ - PrintMembers(); +ThreadObject::ThreadObject(int threadIndex, std::string threadType) + : index(threadIndex), type(threadType) { + FILE_LOG(logDEBUG) << type << " thread created: " << index; + + sem_init(&semaphore,1,0); + + try { + threadObject = sls::make_unique(&ThreadObject::RunningThread, this); + } catch (...) { + throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index)); + } } ThreadObject::~ThreadObject() { - DestroyThread(); -} + killThread = true; + sem_post(&semaphore); + threadObject->join(); -void ThreadObject::PrintMembers() { - FILE_LOG(logDEBUG) << "Index : " << index - << "\nalive: " << alive - << "\nkillThread: " << killThread - << "\npthread: " << thread; -} - - -void ThreadObject::DestroyThread() { - if(alive){ - killThread = true; - sem_post(&semaphore); - pthread_join(thread,nullptr); - sem_destroy(&semaphore); - killThread = false; - alive = false; - FILE_LOG(logDEBUG) << GetType() << " thread with index " << index << " destroyed successfully."; - } -} - - -void ThreadObject::CreateThread() { - if (alive) { - throw sls::RuntimeError("Cannot create " + GetType() + " thread " + std::to_string(index) + ". Already alive"); - } - sem_init(&semaphore,1,0); - killThread = false; - - if (pthread_create(&thread, nullptr,StartThread, (void*) this)){ - throw sls::RuntimeError("Could not create " + GetType() + " thread with index " + std::to_string(index)); - } - alive = true; - FILE_LOG(logDEBUG) << GetType() << " thread " << index << " created successfully."; -} - - -void* ThreadObject::StartThread(void* thisPointer) { - ((ThreadObject*)thisPointer)->RunningThread(); - return thisPointer; + sem_destroy(&semaphore); } void ThreadObject::RunningThread() { - FILE_LOG(logINFOBLUE) << "Created [ " << GetType() << "Thread " << index << ", " - "Tid: " << syscall(SYS_gettid) << "]"; + FILE_LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; + FILE_LOG(logDEBUG) << type << " thread " << index << " created successfully."; + while(true) { - while(IsRunning()) { - ThreadExecution(); - - }//end of inner loop - - + } //wait till the next acquisition sem_wait(&semaphore); - if(killThread) { - FILE_LOG(logINFOBLUE) << "Exiting [ " << GetType() << - " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; - pthread_exit(nullptr); + break; } - - }//end of outer loop + } + + FILE_LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully."; + FILE_LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; } - - void ThreadObject::Continue() { sem_post(&semaphore); } - +void ThreadObject::SetThreadPriority(int priority) { + struct sched_param param; + param.sched_priority = priority; + if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, ¶m) == EPERM) { + if (!index) { + FILE_LOG(logWARNING) << "Could not prioritize " << type << " thread. " + "(No Root Privileges?)"; + } + } else { + FILE_LOG(logINFO) << "Priorities set - " << type << ": " << priority; + } +}