in between

This commit is contained in:
Dhanya Maliakal 2016-08-17 09:33:59 +02:00
parent b440c11a46
commit 0a2a88e23f
4 changed files with 76 additions and 54 deletions

View File

@ -413,9 +413,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/** /**
* Closes file / all files(if multiple files) * Closes file / all files(if multiple files)
* @param i thread index (if multiple files used eg. root files) -1 for all threads * @param i writer thread index
*/ */
void closeFile(int i = -1); void closeFile(int i = 0);
//***callback functions*** //***callback functions***

View File

@ -470,9 +470,9 @@ class UDPInterface {
/** /**
* Closes file / all files(if multiple files) * Closes file / all files(if multiple files)
* @param i thread index (if multiple files used eg. root files) -1 for all threads * @param i writer thread index
*/ */
virtual void closeFile(int i = -1) = 0; virtual void closeFile(int i = 0) = 0;
//***callback functions*** //***callback functions***

View File

@ -210,9 +210,9 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* Overridden method * Overridden method
* Closes file / all files(data compression involves multiple files) * Closes file / all files(data compression involves multiple files)
* TCPIPInterface can also call this in case of illegal shutdown of receiver * TCPIPInterface can also call this in case of illegal shutdown of receiver
* @param i thread index valid for datacompression using root files, -1 for all threads * @param i writer thread index
*/ */
void closeFile(int i = -1); void closeFile(int i = 0);
private: private:
/************************************************************************* /*************************************************************************
@ -307,9 +307,10 @@ private:
/** /**
* Creates new file and reset some parameters * Creates new file and reset some parameters
* @param ithread writer thread index
* @return OK or FAIL * @return OK or FAIL
*/ */
int createNewFile(); int createNewFile(int ithread);
/** /**
* Creates new tree and file for compression * Creates new tree and file for compression
@ -428,10 +429,11 @@ private:
/** /**
* Calle by handleWithoutDataCompression * Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression * Creating headers Writing to file without compression
* @param ithread writer thread index
* @param wbuffer is the address of buffer popped out of FIFO * @param wbuffer is the address of buffer popped out of FIFO
* @param numpackets is the number of packets * @param numpackets is the number of packets
*/ */
void writeFileWithoutCompression(char* wbuffer[],uint32_t numpackets); void writeFileWithoutCompression(int ithread, char* wbuffer[],uint32_t numpackets);
/** /**
* Called by writeToFileWithoutCompression * Called by writeToFileWithoutCompression
@ -524,6 +526,9 @@ private:
/** Complete File name */ /** Complete File name */
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** File Prefix with detector index */
char receiverFilePrefix[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Packets Per File **/ /** Maximum Packets Per File **/
int maxPacketsPerFile; int maxPacketsPerFile;

View File

@ -10,6 +10,8 @@
#include "gotthardModuleData.h" #include "gotthardModuleData.h"
#include "gotthardShortModuleData.h" #include "gotthardShortModuleData.h"
#include "fileIOStatic.h"
#include <stdlib.h> // exit() #include <stdlib.h> // exit()
#include <iomanip> //set precision for printing parameters for create new file #include <iomanip> //set precision for printing parameters for create new file
#include <map> //map #include <map> //map
@ -74,7 +76,8 @@ void UDPStandardImplementation::deleteMembers(){
FILE_LOG(logDEBUG) << "Info: Deleting member pointers"; FILE_LOG(logDEBUG) << "Info: Deleting member pointers";
shutDownUDPSockets(); shutDownUDPSockets();
closeFile(); for(int i=0;i<MAX_NUMBER_OF_WRITER_THREADS; i++)
closeFile(i);
//filter //filter
deleteFilter(); deleteFilter();
for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; i++){
@ -141,6 +144,7 @@ void UDPStandardImplementation::initializeMembers(){
#endif #endif
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
strcpy(completeFileName[i],""); strcpy(completeFileName[i],"");
strcpy(receiverFilePrefix[i],"");
strcpy(fileHeader[i],""); strcpy(fileHeader[i],"");
} }
maxPacketsPerFile = 0; maxPacketsPerFile = 0;
@ -415,7 +419,6 @@ void UDPStandardImplementation::configure(map<string, string> config_map){
} }
/***file parameters***/
int UDPStandardImplementation::setDataCompressionEnable(const bool b){ int UDPStandardImplementation::setDataCompressionEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -958,12 +961,14 @@ void UDPStandardImplementation::stopReceiver(){
//wait until status is run_finished //wait until status is run_finished
while(status == TRANSMITTING){ while(status == TRANSMITTING){
sem_post(&writerGuiSemaphore); for(int i=0; i < numberofWriterThreads; i++)
sem_post(&writerGuiSemaphore[i]);
usleep(5000); usleep(5000);
} }
//semaphore destroy //semaphore destroy
sem_destroy(&writerGuiSemaphore); for(int i=0; i < numberofWriterThreads; i++)
sem_destroy(&writerGuiSemaphore[i]);
//change status //change status
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
@ -982,8 +987,6 @@ void UDPStandardImplementation::stopReceiver(){
int UDPStandardImplementation::shutDownUDPSockets(){ int UDPStandardImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
for(int i=0;i<numberofListeningThreads;i++){ for(int i=0;i<numberofListeningThreads;i++){
if(udpSocket[i]){ if(udpSocket[i]){
udpSocket[i]->ShutDownSocket(); udpSocket[i]->ShutDownSocket();
@ -1033,8 +1036,6 @@ void UDPStandardImplementation::startReadout(){
} }
} }
//set status //set status
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
status = TRANSMITTING; status = TRANSMITTING;
@ -1049,26 +1050,26 @@ void UDPStandardImplementation::startReadout(){
/**make this better by asking all of it at once*/
void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data, to let writer thread know that gui is back for data //point to gui data, to let writer thread know that gui is back for data
if (guiData == NULL){ if (guiData[wThread] == NULL){
guiData = latestData; guiData[wThread] = latestData[wThread];
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n"); cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n");
#endif #endif
} }
//copy data and filename //copy data and filename
strcpy(c,guiFileName); strcpy(c,guiFileName[wThread]);
startAcq = startAcquisitionIndex; startAcq = startAcquisitionIndex[wThread];
startFrame = startFrameIndex; startFrame = startFrameIndex[wThread];
//gui data not copied yet //gui data not copied yet
if(!guiDataReady){ if(!guiDataReady[wThread]){
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data not ready\n"); cprintf(CYAN,"Info: gui data not ready\n");
#endif #endif
@ -1080,8 +1081,8 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data ready\n"); cprintf(CYAN,"Info: gui data ready\n");
#endif #endif
*raw = guiData; *raw = guiData[wThread];
guiData = NULL; guiData[wThread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting //for nth frame to gui, post semaphore so writer stops waiting
if((FrameToGuiFrequency) && (writerThreadsMask)){ if((FrameToGuiFrequency) && (writerThreadsMask)){
@ -1089,7 +1090,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq
cprintf(CYAN,"Info: gonna post\n"); cprintf(CYAN,"Info: gonna post\n");
#endif #endif
//release after getting data //release after getting data
sem_post(&writerGuiSemaphore); sem_post(&writerGuiSemaphore[wThread]);
} }
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: done post\n"); cprintf(CYAN,"Info: done post\n");
@ -1105,24 +1106,24 @@ void UDPStandardImplementation::closeFile(int i){
//normal //normal
if(!dataCompressionEnable){ if(!dataCompressionEnable){
if(sfilefd){ if(sfilefd[i]){
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd));
#endif #endif
fclose(sfilefd); fclose(sfilefd[i]);
sfilefd = NULL; sfilefd[i] = NULL;
} }
} }
//compression //compression
else{ else{
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(sfilefd){ if(sfilefd[i]){
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd; FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd[i];
#endif #endif
fclose(sfilefd); fclose(sfilefd[i]);
sfilefd = NULL; sfilefd[i] = NULL;
} }
#endif #endif
@ -1293,7 +1294,9 @@ void UDPStandardImplementation::setThreadPriorities(){
rights = false; rights = false;
if(!rights){ if(!rights){
FILE_LOG(logWARNING) << "No root permission to prioritize threads."; FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
}else{
FILE_LOG(logINFO) << "Priorities set - TCP:50, Listening:99, Writing:90";
} }
} }
@ -1375,7 +1378,8 @@ int UDPStandardImplementation::setupWriter(){
//creating first file //creating first file
//setting all value to 1 //setting all value to 1
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
for(int i=0; i<numberofWriterThreads; i++) createFileMask|=(1<<i); for(int i=0; i<numberofWriterThreads; i++)
createFileMask|=(1<<i);
pthread_mutex_unlock(&statusMutex); pthread_mutex_unlock(&statusMutex);
for(int i=0; i<numberofWriterThreads; i++){ for(int i=0; i<numberofWriterThreads; i++){
@ -1388,14 +1392,6 @@ int UDPStandardImplementation::setupWriter(){
usleep(5000); usleep(5000);
} }
if(dataCompressionEnable){
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(fileCreateSuccess != FAIL)
fileCreateSuccess = createNewFile();
#endif
}
if(fileCreateSuccess == OK){ if(fileCreateSuccess == OK){
FILE_LOG(logDEBUG) << "Successfully created file(s)"; FILE_LOG(logDEBUG) << "Successfully created file(s)";
cout << "Writer Ready ..." << endl; cout << "Writer Ready ..." << endl;
@ -1406,20 +1402,20 @@ int UDPStandardImplementation::setupWriter(){
int UDPStandardImplementation::createNewFile(){ int UDPStandardImplementation::createNewFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
int index = 0; int index = 0;
if(packetsCaught) if(packetsCaught)
index = frameIndex; index = frameIndex[ithread];
//create file name //create file name
if(!frameIndexEnable) if(!frameIndexEnable)
sprintf(completeFileName, "%s/%s_%lld.raw", filePath,fileName,(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileName,(long long int)fileIndex);
else if (myDetectorType == EIGER) else if (myDetectorType == EIGER)
sprintf(completeFileName, "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)currentFrameNumber,(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)currentFrameNumber,(long long int)fileIndex);
else else
sprintf(completeFileName, "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)(packetsCaught/packetsPerFrame),(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)(packetsCaught/packetsPerFrame),(long long int)fileIndex);
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logINFO) << completefileName; FILE_LOG(logINFO) << completefileName;
@ -2433,7 +2429,7 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//in case they are not closed already //in case they are not closed already
closeFile(); closeFile(ithread);
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread); cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread);
#endif #endif
@ -2462,14 +2458,35 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
//create file //create file
if((1<<ithread)&createFileMask){ if((1<<ithread)&createFileMask){
//change the detector index in the file names
if(myDetectorType == EIGER){
string tempname;int ci = 0, fi = 0, p = 0, di = 0; double cs0 = 0 , cs1 = 0;
tempname.assign(fileName+"_0.raw");
fileIOStatic::getVariablesFromFileName(tempname,ci, fi, p, cs0, cs1, di);
if(di!=-1) di = 0;
di = (di * 2) + ithread;
strcpy(receiverFilePrefix[ithread],createReceiverFilePrefix(fileIO::getNameFromReceiverFilePrefix(tempname),).c_str());
}
if(dataCompressionEnable){ if(dataCompressionEnable){
#ifdef MYROOT1 #ifdef MYROOT1
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
fileCreateSuccess = createCompressionFile(ithread,0); fileCreateSuccess = createCompressionFile(ithread,0);
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
#endif
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(!ithread){
//wait till its mask becomes 1 (all created except this one)
while(createFileMask!=0x1){
FILE_LOG(logDEBUG4) << "*" << flush;
usleep(5000);
}
//create the normal file
if(fileCreateSuccess != FAIL)
fileCreateSuccess = createNewFile(0);
}
#endif #endif
}else }else
fileCreateSuccess = createNewFile(); fileCreateSuccess = createNewFile(ithread);
//let startwriter know file created //let startwriter know file created
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
@ -2695,7 +2712,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],uint32_t numpackets){ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer[],uint32_t numpackets){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
@ -2740,7 +2757,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],uint
#ifdef DEBUG3 #ifdef DEBUG3
cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber); cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber);
#endif #endif
createNewFile(); createNewFile(ithread);
} }
//to create new file when max reached //to create new file when max reached
packetsToSave = maxPacketsPerFile - packetsInFile; packetsToSave = maxPacketsPerFile - packetsInFile;
@ -3109,7 +3126,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
packetsCaught += packetsPerFrame; packetsCaught += packetsPerFrame;
totalPacketsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame;
if(packetsInFile >= (uint32_t)maxPacketsPerFile) if(packetsInFile >= (uint32_t)maxPacketsPerFile)
createNewFile(); createNewFile(0);
pthread_mutex_unlock(&progressMutex); pthread_mutex_unlock(&progressMutex);
#endif #endif