changed progress prints, file naming etc

This commit is contained in:
Dhanya Maliakal 2016-12-21 10:53:56 +01:00
parent 66c488b79a
commit d847a289a4
3 changed files with 107 additions and 91 deletions

View File

@ -255,6 +255,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
//***file parameters*** //***file parameters***
/** /**
* Set File Format * Set File Format
* @param f fileformat binary or hdf5
*/ */
void setFileFormat(slsReceiverDefs::fileFormat f); void setFileFormat(slsReceiverDefs::fileFormat f);

View File

@ -314,6 +314,7 @@ class UDPInterface {
//***file parameters*** //***file parameters***
/** /**
* Set File Format * Set File Format
* @param f fileformat binary or hdf5
*/ */
virtual void setFileFormat(slsReceiverDefs::fileFormat f) = 0; virtual void setFileFormat(slsReceiverDefs::fileFormat f) = 0;

View File

@ -1036,7 +1036,8 @@ int UDPStandardImplementation::startReceiver(char *c){
FILE_LOG(logINFO) << "Data Compression has been " << stringEnable(dataCompressionEnable); FILE_LOG(logINFO) << "Data Compression has been " << stringEnable(dataCompressionEnable);
} }
FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer; FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer;
FILE_LOG(logINFO) << "Max Frames Per File:" << maxFramesPerFile; if(fileFormatType == BINARY)
FILE_LOG(logINFO) << "Max Frames Per File:" << maxFramesPerFile;
if(frameToGuiFrequency) if(frameToGuiFrequency)
FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << frameToGuiFrequency; FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << frameToGuiFrequency;
else else
@ -1638,9 +1639,12 @@ int UDPStandardImplementation::createNewFile(int ithread){
//create file name //create file name
if(!frameIndexEnable) if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld", filePath,fileNamePerThread[ithread],(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_%lld", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
else else{
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex); if(fileFormatType == BINARY)
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
else
sprintf(completeFileName[ithread], "%s/%s_%lld", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
}
//file type //file type
switch(fileFormatType){ switch(fileFormatType){
#ifdef HDF5C #ifdef HDF5C
@ -1690,6 +1694,47 @@ int UDPStandardImplementation::createNewFile(int ithread){
} }
//setting file buffer size to 16mb //setting file buffer size to 16mb
setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE); setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE);
//Print packet loss and filenames
if(totalWritingPacketCount[ithread]){
if(numberofWriterThreads>1){
cprintf(BLUE,"File:%s"
"\nThread:%d"
"\tLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],ithread,
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread])
:(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}else{
cprintf(BLUE,"File:%s"
"\nLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1))
:(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}
}else
printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]);
//write file header
if(myDetectorType == EIGER)
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
} }
@ -1749,18 +1794,20 @@ int UDPStandardImplementation::createNewFile(int ithread){
int iValue=0; int iValue=0;
StrType strdatatype(PredType::C_S1,256); StrType strdatatype(PredType::C_S1,256);
DataSet dataset; DataSet dataset;
//top if(myDetectorType == EIGER){
iValue = (flippedData[0]?0:1); //top
dataset = group5.createDataSet ( "top", PredType::NATIVE_INT, dataspace ); iValue = (flippedData[0]?0:1);
dataset.write ( &iValue, PredType::NATIVE_INT); dataset = group5.createDataSet ( "top", PredType::NATIVE_INT, dataspace );
//left dataset.write ( &iValue, PredType::NATIVE_INT);
iValue = (ithread?0:1); //left
dataset = group5.createDataSet ( "left", PredType::NATIVE_INT, dataspace ); iValue = (ithread?0:1);
dataset.write ( &iValue, PredType::NATIVE_INT); dataset = group5.createDataSet ( "left", PredType::NATIVE_INT, dataspace );
//active dataset.write ( &iValue, PredType::NATIVE_INT);
iValue = activated; //active
dataset = group5.createDataSet ( "active", PredType::NATIVE_INT, dataspace ); iValue = activated;
dataset.write ( &iValue, PredType::NATIVE_INT); dataset = group5.createDataSet ( "active", PredType::NATIVE_INT, dataspace );
dataset.write ( &iValue, PredType::NATIVE_INT);
}
//Dynamic Range //Dynamic Range
dataset = group4.createDataSet ( "dynamic range", PredType::NATIVE_INT, dataspace ); dataset = group4.createDataSet ( "dynamic range", PredType::NATIVE_INT, dataspace );
dataset.write ( &dynamicRange, PredType::NATIVE_INT); dataset.write ( &dynamicRange, PredType::NATIVE_INT);
@ -1831,50 +1878,12 @@ int UDPStandardImplementation::createNewFile(int ithread){
}//end of creating file }//end of creating file
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
if(!totalWritingPacketCount[ithread])
printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]);
} }
#endif #endif
//Print packet loss and filenames
if(totalWritingPacketCount[ithread]){
if(numberofWriterThreads>1){
cprintf(BLUE,"File:%s"
"\nThread:%d"
"\tLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],ithread,
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread])
:(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}else{
cprintf(BLUE,"File:%s"
"\nLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1))
:(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}
}else
printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]);
//write file header
if(myDetectorType == EIGER && fileFormatType == BINARY)
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
} }
//reset counters for each new file //reset counters for each new file
@ -2903,6 +2912,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//Print packet loss //Print packet loss
//if(totalWritingPacketCountFromLastCheck[ithread]){ //if(totalWritingPacketCountFromLastCheck[ithread]){
#ifdef VERBOSE #ifdef VERBOSE
if(fileFormatType == BINARY){
if(numberofWriterThreads>1){ if(numberofWriterThreads>1){
printf("Thread:%d" printf("Thread:%d"
"\tLost:%lld" "\tLost:%lld"
@ -2911,11 +2921,11 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
"\tPFrame#:%lld\n", "\tPFrame#:%lld\n",
ithread, ithread,
((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread])
:(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]),
(long long int)totalWritingPacketCountFromLastCheck[ithread], (long long int)totalWritingPacketCountFromLastCheck[ithread],
(long long int)currentFrameNumber[ithread], (long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousCheck[ithread] (long long int)frameNumberInPreviousCheck[ithread]
); );
}else{ }else{
printf("Lost:%lld" printf("Lost:%lld"
@ -2923,11 +2933,11 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
"\tFrame#:%lld" "\tFrame#:%lld"
"\tPFrame#:%lld\n", "\tPFrame#:%lld\n",
((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread])
:(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]),
(long long int)totalWritingPacketCountFromLastCheck[ithread], (long long int)totalWritingPacketCountFromLastCheck[ithread],
(long long int)currentFrameNumber[ithread], (long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousCheck[ithread] (long long int)frameNumberInPreviousCheck[ithread]
); );
} }
@ -2940,12 +2950,12 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
"\tPFrame#:%lld\n", "\tPFrame#:%lld\n",
completeFileName[ithread],ithread, completeFileName[ithread],ithread,
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread]) ?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread])
:(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]), :(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]),
(long long int)totalPacketsInFile[ithread], (long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread], (long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread] (long long int)frameNumberInPreviousFile[ithread]
); );
}else{ }else{
cprintf(BLUE,"File:%s" cprintf(BLUE,"File:%s"
"\nLost:%lld" "\nLost:%lld"
@ -2954,13 +2964,14 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
"\tPFrame#:%lld\n", "\tPFrame#:%lld\n",
completeFileName[ithread], completeFileName[ithread],
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1)) ?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1))
:(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]), :(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]),
(long long int)totalPacketsInFile[ithread], (long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread], (long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread] (long long int)frameNumberInPreviousFile[ithread]
); );
} }
}
#endif #endif
//} //}
@ -3112,17 +3123,22 @@ void UDPStandardImplementation::handleCompleteFramesOnly(int ithread, char* wbuf
#ifdef HDF5C #ifdef HDF5C
||hdf5_fileId[ithread])){ ||hdf5_fileId[ithread])){
#else #else
)){ )){
#endif #endif
if(fileFormatType == BINARY && tempframenumber && (tempframenumber%maxFramesPerFile) == 0) if(fileFormatType == BINARY){
createNewFile(ithread); if(tempframenumber && (tempframenumber%maxFramesPerFile) == 0)
#ifdef HDF5C createNewFile(ithread);
fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, (bufferSize + FILE_FRAME_HEADER_LENGTH), sfilefd[ithread]);
}
if(fileFormatType == HDF5){
#ifdef HDF5C
else if (fileFormatType == HDF5){
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
//wite to file
hsize_t count[3] = {NY,NX,1}; hsize_t count[3] = {NY,NX,1};
hsize_t start[3] = {0, 0, tempframenumber}; hsize_t start[3] = {0, 0, tempframenumber};
hsize_t dims2[2]={NY,NX}; hsize_t dims2[2]={NY,NX};
@ -3139,16 +3155,14 @@ void UDPStandardImplementation::handleCompleteFramesOnly(int ithread, char* wbuf
memspace.close(); memspace.close();
} }
catch(Exception error){ catch(Exception error){
cprintf(RED,"Error in writing to file in thread %d\n",ithread); cprintf(RED,"Error in writing to file in thread %d\n",ithread);
error.printError(); error.printError();
} }
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
}else }
#endif #endif
if(fileFormatType == BINARY)
fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, (bufferSize + FILE_FRAME_HEADER_LENGTH), sfilefd[ithread]);
} }