diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 568183e42..7562b0056 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -413,9 +413,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Closes file / all files(if multiple files) - * @param i thread index (if multiple files used eg. root files) -1 for all threads + * @param i writer thread index */ - void closeFile(int i = -1); + void closeFile(int i = 0); //***callback functions*** diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 9ad5a7e6a..2b3a3baba 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -470,9 +470,9 @@ class UDPInterface { /** * Closes file / all files(if multiple files) - * @param i thread index (if multiple files used eg. root files) -1 for all threads + * @param i writer thread index */ - virtual void closeFile(int i = -1) = 0; + virtual void closeFile(int i = 0) = 0; //***callback functions*** diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 26c01262d..0f2c98bad 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -210,9 +210,9 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase * Overridden method * Closes file / all files(data compression involves multiple files) * TCPIPInterface can also call this in case of illegal shutdown of receiver - * @param i thread index valid for datacompression using root files, -1 for all threads + * @param i writer thread index */ - void closeFile(int i = -1); + void closeFile(int i = 0); private: /************************************************************************* @@ -307,9 +307,10 @@ private: /** * Creates new file and reset some parameters + * @param ithread writer thread index * @return OK or FAIL */ - int createNewFile(); + int createNewFile(int ithread); /** * Creates new tree and file for compression @@ -428,10 +429,11 @@ private: /** * Calle by handleWithoutDataCompression * Creating headers Writing to file without compression + * @param ithread writer thread index * @param wbuffer is the address of buffer popped out of FIFO * @param numpackets is the number of packets */ - void writeFileWithoutCompression(char* wbuffer[],uint32_t numpackets); + void writeFileWithoutCompression(int ithread, char* wbuffer[],uint32_t numpackets); /** * Called by writeToFileWithoutCompression @@ -524,6 +526,9 @@ private: /** Complete File name */ char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; + /** File Prefix with detector index */ + char receiverFilePrefix[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; + /** Maximum Packets Per File **/ int maxPacketsPerFile; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 3907f7834..92e4e9636 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -10,6 +10,8 @@ #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" +#include "fileIOStatic.h" + #include // exit() #include //set precision for printing parameters for create new file #include //map @@ -74,7 +76,8 @@ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << "Info: Deleting member pointers"; shutDownUDPSockets(); - closeFile(); + for(int i=0;i config_map){ } -/***file parameters***/ int UDPStandardImplementation::setDataCompressionEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; @@ -958,12 +961,14 @@ void UDPStandardImplementation::stopReceiver(){ //wait until status is run_finished while(status == TRANSMITTING){ - sem_post(&writerGuiSemaphore); + for(int i=0; i < numberofWriterThreads; i++) + sem_post(&writerGuiSemaphore[i]); usleep(5000); } //semaphore destroy - sem_destroy(&writerGuiSemaphore); + for(int i=0; i < numberofWriterThreads; i++) + sem_destroy(&writerGuiSemaphore[i]); //change status pthread_mutex_lock(&statusMutex); @@ -982,8 +987,6 @@ void UDPStandardImplementation::stopReceiver(){ int UDPStandardImplementation::shutDownUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - for(int i=0;iShutDownSocket(); @@ -1033,8 +1036,6 @@ void UDPStandardImplementation::startReadout(){ } } - - //set status pthread_mutex_lock(&statusMutex); status = TRANSMITTING; @@ -1049,26 +1050,26 @@ void UDPStandardImplementation::startReadout(){ - -void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ +/**make this better by asking all of it at once*/ +void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ FILE_LOG(logDEBUG) << __AT__ << " called"; //point to gui data, to let writer thread know that gui is back for data - if (guiData == NULL){ - guiData = latestData; + if (guiData[wThread] == NULL){ + guiData[wThread] = latestData[wThread]; #ifdef DEBUG4 cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n"); #endif } //copy data and filename - strcpy(c,guiFileName); - startAcq = startAcquisitionIndex; - startFrame = startFrameIndex; + strcpy(c,guiFileName[wThread]); + startAcq = startAcquisitionIndex[wThread]; + startFrame = startFrameIndex[wThread]; //gui data not copied yet - if(!guiDataReady){ + if(!guiDataReady[wThread]){ #ifdef DEBUG4 cprintf(CYAN,"Info: gui data not ready\n"); #endif @@ -1080,8 +1081,8 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq #ifdef DEBUG4 cprintf(CYAN,"Info: gui data ready\n"); #endif - *raw = guiData; - guiData = NULL; + *raw = guiData[wThread]; + guiData[wThread] = NULL; //for nth frame to gui, post semaphore so writer stops waiting if((FrameToGuiFrequency) && (writerThreadsMask)){ @@ -1089,7 +1090,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq cprintf(CYAN,"Info: gonna post\n"); #endif //release after getting data - sem_post(&writerGuiSemaphore); + sem_post(&writerGuiSemaphore[wThread]); } #ifdef DEBUG4 cprintf(CYAN,"Info: done post\n"); @@ -1105,24 +1106,24 @@ void UDPStandardImplementation::closeFile(int i){ //normal if(!dataCompressionEnable){ - if(sfilefd){ + if(sfilefd[i]){ #ifdef DEBUG4 FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); #endif - fclose(sfilefd); - sfilefd = NULL; + fclose(sfilefd[i]); + sfilefd[i] = NULL; } } //compression else{ #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) - if(sfilefd){ + if(sfilefd[i]){ #ifdef DEBUG4 - FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd; + FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd[i]; #endif - fclose(sfilefd); - sfilefd = NULL; + fclose(sfilefd[i]); + sfilefd[i] = NULL; } #endif @@ -1293,7 +1294,9 @@ void UDPStandardImplementation::setThreadPriorities(){ rights = false; if(!rights){ - FILE_LOG(logWARNING) << "No root permission to prioritize threads."; + FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option."; + }else{ + FILE_LOG(logINFO) << "Priorities set - TCP:50, Listening:99, Writing:90"; } } @@ -1375,7 +1378,8 @@ int UDPStandardImplementation::setupWriter(){ //creating first file //setting all value to 1 pthread_mutex_lock(&statusMutex); - for(int i=0; i= (uint32_t)maxPacketsPerFile) - createNewFile(); + createNewFile(0); pthread_mutex_unlock(&progressMutex); #endif