in between

This commit is contained in:
Dhanya Maliakal
2016-08-24 11:54:15 +02:00
parent 0a2a88e23f
commit f17a2ba2b8
6 changed files with 176 additions and 135 deletions

View File

@ -413,9 +413,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/** /**
* Closes file / all files(if multiple files) * Closes file / all files(if multiple files)
* @param i writer thread index * @param ithread writer thread index
*/ */
void closeFile(int i = 0); void closeFile(int ithread = 0);
//***callback functions*** //***callback functions***

View File

@ -470,9 +470,9 @@ class UDPInterface {
/** /**
* Closes file / all files(if multiple files) * Closes file / all files(if multiple files)
* @param i writer thread index * @param ithread writer thread index
*/ */
virtual void closeFile(int i = 0) = 0; virtual void closeFile(int ithread = 0) = 0;
//***callback functions*** //***callback functions***

View File

@ -199,20 +199,21 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
/** /**
* Overridden method * Overridden method
* Get the buffer-current frame read by receiver * Get the buffer-current frame read by receiver
* @param ithread writer thread
* @param c pointer to current file name * @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui * @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition * @param startAcq start index of the acquisition
* @param startFrame start index of the scan * @param startFrame start index of the scan
*/ */
void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame); void readFrame(int ithread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame);
/** /**
* Overridden method * Overridden method
* Closes file / all files(data compression involves multiple files) * Closes file / all files(data compression involves multiple files)
* TCPIPInterface can also call this in case of illegal shutdown of receiver * TCPIPInterface can also call this in case of illegal shutdown of receiver
* @param i writer thread index * @param ithread writer thread index
*/ */
void closeFile(int i = 0); void closeFile(int ithread = 0);
private: private:
/************************************************************************* /*************************************************************************
@ -444,16 +445,18 @@ private:
/** /**
* Updates the file header char aray, each time the corresp parameter is changed * Updates the file header char aray, each time the corresp parameter is changed
* @param ithread writer thread index
*/ */
void updateFileHeader(); void updateFileHeader(int ithread);
/** /**
* Called by handleWithoutDataCompression and handleWithCompression after writing to file * Called by handleWithoutDataCompression and handleWithCompression after writing to file
* Copy frames for GUI and updates appropriate parameters for frequency frames to gui * Copy frames for GUI and updates appropriate parameters for frequency frames to gui
* Uses semaphore for nth frame mode * Uses semaphore for nth frame mode
* @param ithread writer thread index
* @param buffer buffer to copy * @param buffer buffer to copy
*/ */
void copyFrameToGui(char* buffer[]); void copyFrameToGui(int ithread, char* buffer[]);
void processWritingBuffer(int ithread); void processWritingBuffer(int ithread);
@ -526,9 +529,6 @@ private:
/** Complete File name */ /** Complete File name */
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** File Prefix with detector index */
char receiverFilePrefix[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Packets Per File **/ /** Maximum Packets Per File **/
int maxPacketsPerFile; int maxPacketsPerFile;
@ -542,10 +542,10 @@ private:
//***acquisition indices/count parameters*** //***acquisition indices/count parameters***
/** Frame Number of First Frame of an entire Acquisition (including all scans) */ /** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t startAcquisitionIndex[MAX_NUMBER_OF_LISTENING_THREADS]; uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */ /** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex[MAX_NUMBER_OF_LISTENING_THREADS]; uint64_t startFrameIndex[MAX_NUMBER_OF_WRITER_THREADS];
/** Actual current frame index of each time acquisition (eg. for each scan) */ /** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS]; uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS];
@ -564,8 +564,8 @@ private:
/* Acquisition started */ /* Acquisition started */
bool acqStarted; bool acqStarted;
/* Measurement started */ /* Measurement started - for each thread to get progress print outs*/
bool measurementStarted; bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total Frame Count listened to by listening threads */ /** Total Frame Count listened to by listening threads */
int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS]; int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS];
@ -582,6 +582,10 @@ private:
/** Number of Missing Packets in file */ /** Number of Missing Packets in file */
uint32_t numTotMissingPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; uint32_t numTotMissingPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];
/** packets caught per thread */
uint64_t packetsCaughtPerThread[MAX_NUMBER_OF_WRITER_THREADS];
@ -720,6 +724,9 @@ private:
/** Progress (currentFrameNumber) Mutex */ /** Progress (currentFrameNumber) Mutex */
pthread_mutex_t progressMutex; pthread_mutex_t progressMutex;
/** Progress (currentFrameNumber) Mutex */
pthread_mutex_t udpSocketMutex[MAX_NUMBER_OF_LISTENING_THREADS];
//***callback*** //***callback***
/** The action which decides what the user and default responsibilities to save data are /** The action which decides what the user and default responsibilities to save data are
* 0 raw data ready callback takes care of open,close,write file * 0 raw data ready callback takes care of open,close,write file

View File

@ -442,7 +442,7 @@ void UDPBaseImplementation::abort(){
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
} }
void UDPBaseImplementation::closeFile(int i){ void UDPBaseImplementation::closeFile(int ithread){
FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logWARNING) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
} }

View File

@ -38,6 +38,8 @@ UDPStandardImplementation::UDPStandardImplementation(){
pthread_mutex_init(&writeMutex,NULL); pthread_mutex_init(&writeMutex,NULL);
pthread_mutex_init(&dataReadyMutex,NULL); pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&progressMutex,NULL); pthread_mutex_init(&progressMutex,NULL);
for(int i=0;i<MAX_NUMBER_OF_LISTENING_THREADS;i++)
pthread_mutex_init(&udpSocketMutex[i],NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings //to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(myDetectorType == EIGER); if(myDetectorType == EIGER);
@ -144,20 +146,21 @@ void UDPStandardImplementation::initializeMembers(){
#endif #endif
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
strcpy(completeFileName[i],""); strcpy(completeFileName[i],"");
strcpy(receiverFilePrefix[i],"");
strcpy(fileHeader[i],""); strcpy(fileHeader[i],"");
sfilefd[i] = NULL;
} }
maxPacketsPerFile = 0; maxPacketsPerFile = 0;
fileCreateSuccess = false; fileCreateSuccess = false;
//***acquisition indices parameters*** //***acquisition indices parameters***
startAcquisitionIndex = 0;
acqStarted = false;
for(int i = 0; i < MAX_NUMBER_OF_LISTENING_THREADS; ++i){ for(int i = 0; i < MAX_NUMBER_OF_LISTENING_THREADS; ++i){
startAcquisitionIndex[i] = 0;
startFrameIndex[i] = 0; startFrameIndex[i] = 0;
measurementStarted[i] = false;
totalListeningFrameCount[i] = 0; totalListeningFrameCount[i] = 0;
} }
acqStarted = false;
measurementStarted = false;
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
frameIndex[i] = 0; frameIndex[i] = 0;
currentFrameNumber[i] = 0; currentFrameNumber[i] = 0;
@ -179,9 +182,6 @@ void UDPStandardImplementation::initializeMembers(){
fifoFree[i] = NULL; fifoFree[i] = NULL;
udpSocket[i] = NULL; udpSocket[i] = NULL;
} }
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
sfilefd[i] = NULL;
}
numberofJobsPerBuffer = -1; numberofJobsPerBuffer = -1;
fifoSize = 0; fifoSize = 0;
@ -803,8 +803,10 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
//updates File Header //updates File Header
if(myDetectorType == EIGER) if(myDetectorType == EIGER){
updateFileHeader(); for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
}
FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d); FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d);
@ -816,13 +818,10 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
void UDPStandardImplementation::resetAcquisitionCount(){ void UDPStandardImplementation::resetAcquisitionCount(){
FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
for(int i=0;i<numberofListeningThreads;i++)
startAcquisitionIndex[i] = 0;
pthread_mutex_lock(&progressMutex); pthread_mutex_lock(&progressMutex);
startAcquisitionIndex = 0;
acqStarted = false; acqStarted = false;
pthread_mutex_unlock(&progressMutex); pthread_mutex_unlock(&progressMutex);
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
totalPacketsCaught = 0; totalPacketsCaught = 0;
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
@ -838,26 +837,25 @@ int UDPStandardImplementation::startReceiver(char *c){
//reseting variables //reseting variables
pthread_mutex_lock(&progressMutex);
measurementStarted = false;
pthread_mutex_unlock(&progressMutex);
//for every acquisition start (not every scan) //for every acquisition start (not every scan)
if(!acqStarted){ if(!acqStarted){
pthread_mutex_lock(&progressMutex); pthread_mutex_lock(&progressMutex);
acquisitionIndex = 0; acquisitionIndex = 0;
pthread_mutex_unlock(&progressMutex); pthread_mutex_unlock(&progressMutex);
for(int i=0;i<numberofWriterThreads;i++){ for(int i=0;i<numberofWriterThreads;i++){
currentFrameNumber[i] = 0; //has to be zero to add to startframeindex for each scan currentFrameNumber[i] = 0; //has to be zero to add to startframeindex for each scan
frameIndex[i] = 0; frameIndex[i] = 0;
} }
} }
//for every measurement
for(int i=0;i<numberofListeningThreads;i++){ for(int i=0;i<numberofListeningThreads;i++){
startFrameIndex[i] = 0; startFrameIndex[i] = 0;
measurementStarted[i] = false;
totalListeningFrameCount[i] = 0; totalListeningFrameCount[i] = 0;
} }
for(int i=0;i<numberofWriterThreads;i++){ for(int i=0;i<numberofWriterThreads;i++){
frametoGuiCounter[i] = 0;
frameIndex[i] = 0; frameIndex[i] = 0;
numMissingPackets[i] = 0; numMissingPackets[i] = 0;
numTotMissingPackets[i] = 0; numTotMissingPackets[i] = 0;
@ -869,9 +867,11 @@ int UDPStandardImplementation::startReceiver(char *c){
sfilefd[i] = NULL; sfilefd[i] = NULL;
} }
//reset gui variables //reset gui variables
frametoGuiCounter[i] = 0;
guiData[i] = NULL; guiData[i] = NULL;
guiDataReady[i]=0; guiDataReady[i]=0;
strcpy(guiFileName[i],""); strcpy(guiFileName[i],"");
packetsCaughtPerThread[i] = 0;
} }
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
packetsCaught = 0; packetsCaught = 0;
@ -915,10 +915,9 @@ int UDPStandardImplementation::startReceiver(char *c){
return FAIL; return FAIL;
} }
//For compression, just for gui purposes //For compression, just for gui purposes
if(dataCompressionEnable) if(dataCompressionEnable)
sprintf(completeFileName[0], "%s/%s_fxxx_%lld_xx.root", filePath,fileName,(long long int)fileIndex); sprintf(completeFileName[0], "%s/%s_fxxx_%lld_xx.root", filePath,fileName[0],(long long int)fileIndex);
//initialize semaphore to synchronize between writer and gui reader threads //initialize semaphore to synchronize between writer and gui reader threads
for(int i=0;i<numberofWriterThreads;i++) for(int i=0;i<numberofWriterThreads;i++)
@ -989,10 +988,12 @@ int UDPStandardImplementation::shutDownUDPSockets(){
for(int i=0;i<numberofListeningThreads;i++){ for(int i=0;i<numberofListeningThreads;i++){
if(udpSocket[i]){ if(udpSocket[i]){
pthread_mutex_lock(&udpSocketMutex[i]);
udpSocket[i]->ShutDownSocket(); udpSocket[i]->ShutDownSocket();
FILE_LOG(logINFO) << "Shut down UDP Socket " << i; FILE_LOG(logINFO) << "Shut down UDP Socket " << i;
delete udpSocket[i]; delete udpSocket[i];
udpSocket[i] = NULL; udpSocket[i] = NULL;
pthread_mutex_unlock(&udpSocketMutex[i]);
} }
} }
return OK; return OK;
@ -1051,25 +1052,25 @@ void UDPStandardImplementation::startReadout(){
/**make this better by asking all of it at once*/ /**make this better by asking all of it at once*/
void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data, to let writer thread know that gui is back for data //point to gui data, to let writer thread know that gui is back for data
if (guiData[wThread] == NULL){ if (guiData[ithread] == NULL){
guiData[wThread] = latestData[wThread]; guiData[ithread] = latestData[ithread];
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n"); cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n");
#endif #endif
} }
//copy data and filename //copy data and filename
strcpy(c,guiFileName[wThread]); strcpy(c,guiFileName[ithread]);
startAcq = startAcquisitionIndex[wThread]; startAcq = startAcquisitionIndex;
startFrame = startFrameIndex[wThread]; startFrame = startFrameIndex[ithread];
//gui data not copied yet //gui data not copied yet
if(!guiDataReady[wThread]){ if(!guiDataReady[ithread]){
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data not ready\n"); cprintf(CYAN,"Info: gui data not ready\n");
#endif #endif
@ -1081,8 +1082,8 @@ void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint6
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: gui data ready\n"); cprintf(CYAN,"Info: gui data ready\n");
#endif #endif
*raw = guiData[wThread]; *raw = guiData[ithread];
guiData[wThread] = NULL; guiData[ithread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting //for nth frame to gui, post semaphore so writer stops waiting
if((FrameToGuiFrequency) && (writerThreadsMask)){ if((FrameToGuiFrequency) && (writerThreadsMask)){
@ -1090,7 +1091,7 @@ void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint6
cprintf(CYAN,"Info: gonna post\n"); cprintf(CYAN,"Info: gonna post\n");
#endif #endif
//release after getting data //release after getting data
sem_post(&writerGuiSemaphore[wThread]); sem_post(&writerGuiSemaphore[ithread]);
} }
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(CYAN,"Info: done post\n"); cprintf(CYAN,"Info: done post\n");
@ -1101,53 +1102,53 @@ void UDPStandardImplementation::readFrame(int wThread, char* c,char** raw, uint6
void UDPStandardImplementation::closeFile(int i){ void UDPStandardImplementation::closeFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called for " << i ; FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ;
//normal //normal
if(!dataCompressionEnable){ if(!dataCompressionEnable){
if(sfilefd[i]){ if(sfilefd[ithread]){
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd));
#endif #endif
fclose(sfilefd[i]); fclose(sfilefd[ithread]);
sfilefd[i] = NULL; sfilefd[ithread] = NULL;
} }
} }
//compression //compression
else{ else{
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(sfilefd[i]){ if(sfilefd[0]){
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd[i]; FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd[i];
#endif #endif
fclose(sfilefd[i]); fclose(sfilefd[0]);
sfilefd[i] = NULL; sfilefd[0] = NULL;
} }
#endif #endif
#ifdef MYROOT1 #ifdef MYROOT1
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
//write to file //write to file
if(myTree[i] && myFile[i]){ if(myTree[ithread] && myFile[ithread]){
myFile[i] = myTree[i]->GetCurrentFile(); myFile[ithread] = myTree[ithread]->GetCurrentFile();
if(myFile[i]->Write()) if(myFile[ithread]->Write())
//->Write(tall->GetName(),TObject::kOverwrite); //->Write(tall->GetName(),TObject::kOverwrite);
cout << "Thread " << i <<": wrote frames to file" << endl; cout << "Thread " << ithread <<": wrote frames to file" << endl;
else else
cout << "Thread " << i << ": could not write frames to file" << endl; cout << "Thread " << ithread << ": could not write frames to file" << endl;
}else }else
cout << "Thread " << i << ": could not write frames to file: No file or No Tree" << endl; cout << "Thread " << ithread << ": could not write frames to file: No file or No Tree" << endl;
//close file //close file
if(myTree[i] && myFile[i]) if(myTree[ithread] && myFile[ithread])
myFile[i] = myTree[i]->GetCurrentFile(); myFile[ithread] = myTree[ithread]->GetCurrentFile();
if(myFile[i] != NULL) if(myFile[ithread] != NULL)
myFile[i]->Close(); myFile[ithread]->Close();
myFile[i] = NULL; myFile[ithread] = NULL;
myTree[i] = NULL; myTree[ithread] = NULL;
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
#endif #endif
@ -1363,7 +1364,8 @@ int UDPStandardImplementation::setupWriter(){
//acquisition start call back returns enable write //acquisition start call back returns enable write
cbAction = DO_EVERYTHING; cbAction = DO_EVERYTHING;
if (startAcquisitionCallBack) if (startAcquisitionCallBack)
cbAction=startAcquisitionCallBack(filePath,fileName,(int)fileIndex,bufferSize,pStartAcquisition); cbAction=startAcquisitionCallBack(filePath,fileName[0],(int)fileIndex,bufferSize,pStartAcquisition);
if(cbAction < DO_EVERYTHING){ if(cbAction < DO_EVERYTHING){
FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back."; FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back.";
@ -1406,16 +1408,16 @@ int UDPStandardImplementation::createNewFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
int index = 0; int index = 0;
if(packetsCaught) if(packetsCaughtPerThread[ithread])
index = frameIndex[ithread]; index = frameIndex[ithread];
//create file name //create file name
if(!frameIndexEnable) if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileName,(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileName[ithread],(long long int)fileIndex);
else if (myDetectorType == EIGER) else if (myDetectorType == EIGER)
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)currentFrameNumber,(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName[ithread],(long long int)currentFrameNumber[ithread],(long long int)fileIndex);
else else
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName,(long long int)(packetsCaught/packetsPerFrame),(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileName[ithread],(long long int)(packetsCaught[ithread]/packetsPerFrame),(long long int)fileIndex);
#ifdef DEBUG4 #ifdef DEBUG4
FILE_LOG(logINFO) << completefileName; FILE_LOG(logINFO) << completefileName;
@ -1425,54 +1427,56 @@ int UDPStandardImplementation::createNewFile(int ithread){
if(fileWriteEnable && cbAction > DO_NOTHING){ if(fileWriteEnable && cbAction > DO_NOTHING){
//close file pointers //close file pointers
if(sfilefd){ if(sfilefd[ithread]){
fclose(sfilefd); fclose(sfilefd[ithread]);
sfilefd = NULL; sfilefd[ithread] = NULL;
} }
//create file //create file
if(!overwriteEnable){ if(!overwriteEnable){
if (NULL == (sfilefd = fopen((const char *) (completeFileName), "wx"))){ if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){
FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName; FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread];
return FAIL; return FAIL;
} }
}else if (NULL == (sfilefd = fopen((const char *) (completeFileName), "w"))){ }else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){
FILE_LOG(logERROR) << "Could not create file" << completeFileName; FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread];
return FAIL; return FAIL;
} }
//setting file buffer size to 16mb //setting file buffer size to 16mb
setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE);
//Print packet loss and filenames //Print packet loss and filenames
if(!packetsCaught){ if(!packetsCaughtPerThread[ithread]){
previousFrameNumber = -1; previousFrameNumber[ithread] = -1;
cout << "File: " << completeFileName << endl; cout << "File: " << completeFileName[ithread] << endl;
}else{ }else{
if (previousFrameNumber == -1) //Assumption for startFrameindex usign ithread: datacompression never enters here and therefore is always same number of listening and writing threads to use ithread
previousFrameNumber = startFrameIndex-1; if (previousFrameNumber[ithread] == -1)
previousFrameNumber[ithread] = startFrameIndex[ithread]-1;
cout << completeFileName cout << completeFileName[ithread]
<< "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec << << "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec <<
(int)((( (currentFrameNumber-previousFrameNumber) - ((packetsInFile-numTotMissingPacketsInFile)/packetsPerFrame))/ (int)((( (currentFrameNumber[ithread]-previousFrameNumber[ithread]) - ((packetsInFile[ithread]-numTotMissingPacketsInFile[ithread])/packetsPerFrame))/
(double)(currentFrameNumber-previousFrameNumber))*100.000) (double)(currentFrameNumber[ithread]-previousFrameNumber[ithread]))*100.000)
<< "%\tFramenumber: " << currentFrameNumber << "%\tFramenumber: " << currentFrameNumber[ithread]
<< "\t\t PreviousFrameNumber: " << previousFrameNumber << "\t\t PreviousFrameNumber: " << previousFrameNumber[ithread]
<< "\tIndex " << dec << index << "\tIndex " << dec << index
<< "\tLost " << dec << ( ((int)(currentFrameNumber-previousFrameNumber)) - << "\tLost " << dec << ( ((int)(currentFrameNumber[ithread]-previousFrameNumber[ithread])) -
((packetsInFile-numTotMissingPacketsInFile)/packetsPerFrame)) << endl; ((packetsInFile[ithread]-numTotMissingPacketsInFile[ithread])/packetsPerFrame)) << endl;
} }
//write file header //write file header
if(myDetectorType == EIGER) if(myDetectorType == EIGER)
fwrite((void*)fileHeader, 1, strlen(fileHeader), sfilefd); fwrite((void*)fileHeader[ithread], 1, strlen(fileHeader[ithread]), sfilefd[ithread]);
} }
//reset counters for each new file //reset counters for each new file
if(packetsCaught){ if(packetsCaughtPerThread[ithread]){
previousFrameNumber = currentFrameNumber; previousFrameNumber[ithread] = currentFrameNumber[ithread];
packetsInFile = 0; packetsInFile[ithread] = 0;
numTotMissingPacketsInFile = 0; numTotMissingPacketsInFile[ithread] = 0;
} }
@ -1489,12 +1493,12 @@ int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){
#ifdef MYROOT1 #ifdef MYROOT1
char temp[MAX_STR_LENGTH]; char temp[MAX_STR_LENGTH];
//create file name for gui purposes, and set up acquistion parameters //create file name for gui purposes, and set up acquistion parameters
sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileName,fileIndex,ithread); sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileName[ithread],fileIndex,ithread);
//file //file
myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */ myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */
cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp); cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp);
//tree //tree
sprintf(temp, "%s_fxxx_%d_%d",fileName,fileIndex,ithread); sprintf(temp, "%s_fxxx_%d_%d",fileName[ithread],fileIndex,ithread);
myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe); myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe);
//resets the pedestalSubtraction array and the commonModeSubtraction //resets the pedestalSubtraction array and the commonModeSubtraction
singlePhotonDetectorObject[ithread]->newDataSet(); singlePhotonDetectorObject[ithread]->newDataSet();
@ -1589,19 +1593,14 @@ void UDPStandardImplementation::startListening(){
rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer); rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer);
//start indices for each start of scan/acquisition //start indices for each start of scan/acquisition
if((!measurementStarted) && (rc > 0)){ if((!measurementStarted) && (rc > 0))
pthread_mutex_lock(&progressMutex); startFrameIndices(ithread);
if(!measurementStarted)
startFrameIndices(ithread);
pthread_mutex_unlock(&progressMutex);
}
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if (status == TRANSMITTING){ if (status == TRANSMITTING){
stopListening(ithread,rc); stopListening(ithread,rc);
continue; continue;
} }
//write packet count to buffer //write packet count to buffer
if(myDetectorType == EIGER) if(myDetectorType == EIGER)
(*((uint32_t*)(buffer[ithread]))) = 1; (*((uint32_t*)(buffer[ithread]))) = 1;
@ -1653,10 +1652,14 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
if(cSize) if(cSize)
memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
pthread_mutex_lock(&udpSocketMutex[ithread]);
int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize);
//throw away packets that is not one packet size
//throw away packets that is not one packet size, need to check status if socket is shut down while(myDetectorType == EIGER && receivedSize != onePacketSize) {
while(status != TRANSMITTING && myDetectorType == EIGER && receivedSize != onePacketSize) { //need to check status if socket is shut down
if(status == TRANSMITTING)
break;
//print
if(receivedSize != EIGER_HEADER_LENGTH){ if(receivedSize != EIGER_HEADER_LENGTH){
cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",ithread, receivedSize); cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",ithread, receivedSize);
} }
@ -1664,8 +1667,11 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
else else
cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread); cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread);
#endif #endif
//listen again
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
} }
pthread_mutex_unlock(&udpSocketMutex[ithread]);
totalListeningFrameCount[ithread] += (receivedSize/onePacketSize); totalListeningFrameCount[ithread] += (receivedSize/onePacketSize);
#ifdef MANUALDEBUG #ifdef MANUALDEBUG
@ -1710,18 +1716,18 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
jfrau_packet_header_t* header=0; jfrau_packet_header_t* header=0;
switch(myDetectorType){ switch(myDetectorType){
case EIGER: case EIGER:
startFrameIndex = 0; //frame number always resets startFrameIndex[ithread] = 0; //frame number always resets
break; break;
case JUNGFRAU: case JUNGFRAU:
header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
startFrameIndex = (*( (uint32_t*) header->frameNumber))&0xffffff; startFrameIndex[ithread] = (*( (uint32_t*) header->frameNumber))&0xffffff;
break; break;
default: default:
if(shortFrameEnable < 0){ if(shortFrameEnable < 0){
startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) startFrameIndex[ithread] = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
& (frameIndexMask)) >> frameIndexOffset); & (frameIndexMask)) >> frameIndexOffset);
}else{ }else{
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) startFrameIndex[ithread] = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))
& (frameIndexMask)) >> frameIndexOffset); & (frameIndexMask)) >> frameIndexOffset);
} }
break; break;
@ -1729,14 +1735,16 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
//start of entire acquisition //start of entire acquisition
if(!acqStarted){ if(!acqStarted){
startAcquisitionIndex = startFrameIndex; pthread_mutex_lock(&progressMutex);
startAcquisitionIndex = startFrameIndex[ithread];
acqStarted = true; acqStarted = true;
pthread_mutex_unlock(&progressMutex);
cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex); cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex);
} }
//set start of scan/real time measurement //set start of scan/real time measurement
cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex); cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex[ithread]);
measurementStarted = true; measurementStarted[ithread] = true;
} }
@ -2215,7 +2223,7 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
threadFrameNumber[i] = (uint32_t)(*( (uint64_t*) packetBuffer_footer)); threadFrameNumber[i] = (uint32_t)(*( (uint64_t*) packetBuffer_footer));
//last frame read out //last frame read out
lastFrameIndex = threadFrameNumber[i]; lastFrameIndex = threadFrameNumber[i];
threadFrameNumber[i] += (startFrameIndex - 1); threadFrameNumber[i] += (startFrameIndex[ithread] - 1);
//packet number //packet number
currentPacketNumber[i] = *( (uint16_t*) packetBuffer_footer->packetNumber); currentPacketNumber[i] = *( (uint16_t*) packetBuffer_footer->packetNumber);
@ -2371,7 +2379,7 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
//ensuring last packet got is not of some other future frame but of the current one //ensuring last packet got is not of some other future frame but of the current one
eiger_packet_footer_t* wbuf_footer1 = (eiger_packet_footer_t*)(packetBuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_footer_t* wbuf_footer1 = (eiger_packet_footer_t*)(packetBuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
uint64_t packfnum = (((uint32_t)(*( (uint64_t*) wbuf_footer1)))+(startFrameIndex - 1)); uint64_t packfnum = (((uint32_t)(*( (uint64_t*) wbuf_footer1)))+(startFrameIndex[ithread] - 1));
//to reset to get new frame: not dummy and the last packet //to reset to get new frame: not dummy and the last packet
if((numPackets[i] != dummyPacketValue) && (currentPacketNumber[i] == LAST_PACKET_VALUE) && (packfnum == currentFrameNumber) ) if((numPackets[i] != dummyPacketValue) && (currentPacketNumber[i] == LAST_PACKET_VALUE) && (packfnum == currentFrameNumber) )
@ -2460,13 +2468,27 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
if((1<<ithread)&createFileMask){ if((1<<ithread)&createFileMask){
//change the detector index in the file names //change the detector index in the file names
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
string tempname;int ci = 0, fi = 0, p = 0, di = 0; double cs0 = 0 , cs1 = 0; int detindex = -1;
tempname.assign(fileName+"_0.raw"); string tempname(fileName[ithread]);
fileIOStatic::getVariablesFromFileName(tempname,ci, fi, p, cs0, cs1, di); cout<<"tempname:"<<tempname<<endl;
if(di!=-1) di = 0;
di = (di * 2) + ithread; size_t uscore=tempname.rfind("_");
strcpy(receiverFilePrefix[ithread],createReceiverFilePrefix(fileIO::getNameFromReceiverFilePrefix(tempname),).c_str()); if (uscore!=string::npos){
if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"%d",&detindex)) {
cout<<"got detindex :" << detindex<< endl;
tempname=tempname.substr(0,uscore);
cout<<"truncate fname to :"<<tempname<<endl;
sprintf(fileName[ithread],"%s_%d",tempname,detindex*2+ithread);
cout<<"new fname:"<<fileName[ithread]<<endl;
}
}
if(detindex == -1){
sprintf(fileName[ithread],"%s_%d",fileName[ithread],ithread);
cout<<"only one det. so added:"<<fileName[ithread]<<endl;
}
} }
if(dataCompressionEnable){ if(dataCompressionEnable){
#ifdef MYROOT1 #ifdef MYROOT1
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
@ -2660,7 +2682,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
} }
//set indices //set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex; acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex; frameIndex = currentFrameNumber - startFrameIndex[ithread];
} }
@ -2752,7 +2774,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//set indices //set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex; acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex; frameIndex = currentFrameNumber - startFrameIndex[ithread];
} }
#ifdef DEBUG3 #ifdef DEBUG3
cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber); cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber);
@ -2781,13 +2803,16 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
packetsInFile += packetsToSave; packetsInFile += packetsToSave;
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing Thread: packetsCaught till now:%d packetsToSave:%d numMissingPackets:%d packetsCaught now:%d\n", cprintf(GREEN,"Writing Thread: packetsCaught till now:%d packetsToSave:%d numMissingPackets:%d packetsCaught now:%d\n",
packetsCaught,packetsToSave,numMissingPackets,(packetsToSave - numMissingPackets)); packetsCaughtPerThread[ithread],packetsToSave,numMissingPackets,(packetsToSave - numMissingPackets));
#endif #endif
packetsCaughtPerThread[ithread] += (packetsToSave - numMissingPackets);
pthread_mutex_lock(&progressMutex);
packetsCaught += (packetsToSave - numMissingPackets); packetsCaught += (packetsToSave - numMissingPackets);
pthread_mutex_unlock(&progressMutex);
totalPacketsCaught += (packetsToSave - numMissingPackets); totalPacketsCaught += (packetsToSave - numMissingPackets);
numMissingPackets = 0; numMissingPackets = 0;
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing Thread: packetscaught:%d totalPacketsCaught:%d\n", packetsCaught,totalPacketsCaught); cprintf(GREEN,"Writing Thread: packetscaught:%d totalPacketsCaught:%d\n", packetsCaughtPerThread[ithread],totalPacketsCaught);
#endif #endif
//increase offset //increase offset
@ -2801,7 +2826,10 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
else{ else{
if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex);
packetsInFile += numpackets; packetsInFile += numpackets;
packetsCaughtPerThread[ithread] += (numpackets - numMissingPackets);
pthread_mutex_lock(&progressMutex);
packetsCaught += (numpackets - numMissingPackets); packetsCaught += (numpackets - numMissingPackets);
pthread_mutex_unlock(&progressMutex);
totalPacketsCaught += (numpackets - numMissingPackets); totalPacketsCaught += (numpackets - numMissingPackets);
numMissingPackets = 0; numMissingPackets = 0;
if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex);
@ -2906,7 +2934,7 @@ void UDPStandardImplementation::createHeaders(char* wbuffer[]){
} }
void UDPStandardImplementation::updateFileHeader(){ void UDPStandardImplementation::updateFileHeader(int ithread){
int xpix=-1,ypix=-1; int xpix=-1,ypix=-1;
//create detector specific packet header //create detector specific packet header
@ -2934,10 +2962,10 @@ void UDPStandardImplementation::updateFileHeader(){
//update file header //update file header
time_t t = time(0); time_t t = time(0);
int length = sizeof(fileHeader); int length = sizeof(fileHeader[ithread]);
while((unsigned int)length!=strlen(fileHeader)){ while((unsigned int)length!=strlen(fileHeader[ithread])){
length = strlen(fileHeader); length = strlen(fileHeader[ithread]);
sprintf(fileHeader,"\nHeader\t\t %d bytes\n" sprintf(fileHeader[ithread],"\nHeader\t\t %d bytes\n"
"Dynamic Range\t %d\n" "Dynamic Range\t %d\n"
"Packet\t\t %d bytes\n" "Packet\t\t %d bytes\n"
"x\t\t %d pixels\n" "x\t\t %d pixels\n"
@ -3037,7 +3065,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
pthread_mutex_unlock(&progressMutex); pthread_mutex_unlock(&progressMutex);
//set indices //set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex; acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex; frameIndex = currentFrameNumber - startFrameIndex[0];
//variable definitions //variable definitions
@ -3123,6 +3151,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
#ifndef ALLFILE #ifndef ALLFILE
pthread_mutex_lock(&progressMutex); pthread_mutex_lock(&progressMutex);
packetsInFile += packetsPerFrame; packetsInFile += packetsPerFrame;
packetsCaughtPerThread[0] += packetsPerFrame;
packetsCaught += packetsPerFrame; packetsCaught += packetsPerFrame;
totalPacketsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame;
if(packetsInFile >= (uint32_t)maxPacketsPerFile) if(packetsInFile >= (uint32_t)maxPacketsPerFile)

View File

@ -370,6 +370,7 @@ int slsReceiverTCPIPInterface::set_detector_type(){
} }
if(ret != FAIL){ if(ret != FAIL){
#ifndef REST #ifndef REST
if(receiverBase) delete receiverBase;
receiverBase = UDPInterface::create("standard"); receiverBase = UDPInterface::create("standard");
if(startAcquisitionCallBack) if(startAcquisitionCallBack)
receiverBase->registerCallBackStartAcquisition(startAcquisitionCallBack,pStartAcquisition); receiverBase->registerCallBackStartAcquisition(startAcquisitionCallBack,pStartAcquisition);
@ -998,6 +999,10 @@ int slsReceiverTCPIPInterface::reset_frames_caught(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE); strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL; ret=FAIL;
} }
else if(receiverBase->getStatus()==RUNNING){
strcpy(mess,"Cannot reset frames caught while status is running\n");
ret=FAIL;
}
else else
receiverBase->resetAcquisitionCount(); receiverBase->resetAcquisitionCount();
} }