/********************************************//** * @file UDPStandardImplementation.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "UDPStandardImplementation.h" #include "moench02ModuleData.h" #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" //#include // socket(), bind(), listen(), accept(), shut down //#include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision for printing parameters for create new file #include //map //#include //munmap #include #include #include #include using namespace std; #define WRITE_HEADERS /************************************************************************* * Constructor & Destructor ********************************************** * They access local cache of configuration or detector parameters ******* *************************************************************************/ UDPStandardImplementation::UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; initializeMembers(); //***mutex*** pthread_mutex_init(&statusMutex,NULL); pthread_mutex_init(&writeMutex,NULL); pthread_mutex_init(&dataReadyMutex,NULL); pthread_mutex_init(&progressMutex,NULL); //to increase socket receiver buffer size and max length of input queue by changing kernel settings if(myDetectorType == EIGER); else if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")){ FILE_LOG(logDEBUG) << "Warning: No root permission to change socket receiver buffer size in file /proc/sys/net/core/rmem_max"; }else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")){ FILE_LOG(logDEBUG) << "Warning: No root permission to change max length of input queue in file /proc/sys/net/core/netdev_max_backlog"; } /** permanent setting by heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 */ } UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; closeFile(); deleteMembers(); } /************************************************************************* * Setters *************************************************************** * They modify the local cache of configuration or detector parameters *** *************************************************************************/ /***initial parameters***/ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << "Info: Deleting member pointers"; shutDownUDPSockets(); closeFile(); //filter deleteFilter(); for(int i=0; i(receiverData[i], csize, sigma, sign, moenchCommonModeSubtraction); } int UDPStandardImplementation::setupFifoStructure(){ FILE_LOG(logDEBUG) << __AT__ << " called"; int64_t i; int oldNumberofJobsPerBuffer = numberofJobsPerBuffer; uint32_t oldFifoSize = fifoSize; //eiger always listens to 1 packet at a time if(myDetectorType == EIGER){ numberofJobsPerBuffer = 1; FILE_LOG(logDEBUG) << "Info: 1 packet per buffer"; } //else calculate best possible number of frames to listen to at a time (for fast readouts like gotthard) else{ //if frequency to gui is not random (every nth frame), then listen to only n frames per buffer if(FrameToGuiFrequency) numberofJobsPerBuffer = FrameToGuiFrequency; //random frame sent to gui, then frames per buffer depends on acquisition period else{ //calculate 100ms/period to get frames to listen to at a time if(!acquisitionPeriod) i = SAMPLE_TIME_IN_NS; else i = SAMPLE_TIME_IN_NS/acquisitionPeriod; //max frames to listen to at a time is limited by 1000 if (i > MAX_JOBS_PER_THREAD) numberofJobsPerBuffer = MAX_JOBS_PER_THREAD; else if (i < 1) numberofJobsPerBuffer = 1; else numberofJobsPerBuffer = i; } FILE_LOG(logINFO) << "Number of Frames per buffer:" << numberofJobsPerBuffer << endl; } //set fifo depth //eiger listens to 1 packet at a time and size changes depending on packets per frame if(myDetectorType == EIGER) fifoSize = EIGER_FIFO_SIZE * packetsPerFrame; else{ fifoSize = GOTTHARD_FIFO_SIZE; if(myDetectorType == MOENCH) fifoSize = MOENCH_FIFO_SIZE; else if(myDetectorType == PROPIX) fifoSize = PROPIX_FIFO_SIZE; //reduce fifo depth if more frames listened to at a time if(fifoSize % numberofJobsPerBuffer) fifoSize = (fifoSize/numberofJobsPerBuffer)+1; else fifoSize = fifoSize/numberofJobsPerBuffer; } FILE_LOG(logDEBUG) << "Info: Fifo Depth:" << fifoSize; //do not rebuild fifo structure if it is the same if((oldNumberofJobsPerBuffer == numberofJobsPerBuffer) && (oldFifoSize == fifoSize)) return OK; int count = 0; //set up fifo structure for(int i=0;iisEmpty()) fifoFree[i]->pop(buffer[i]); #ifdef DEBUG5 cprintf(BLUE,"Info: %d fifostructure popped from fifofree %p\n", i, (void*)(buffer[i])); #endif delete fifoFree[i]; } if(fifo[i]){ while(!fifo[i]->isEmpty()) fifo[i]->pop(buffer[i]); delete fifo[i]; } if(mem0[i]) free(mem0[i]); //creating fifoFree[i] = new CircularFifo(fifoSize); fifo[i] = new CircularFifo(fifoSize); //cout<<"buffersize:"<push(buffer[i]); //#ifdef DEBUG5 if(count==0 || count == 127998) cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); //#endif buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); count++; } } FILE_LOG(logDEBUG) << "Info: Fifo structure(s) reconstructed"; return OK; } void UDPStandardImplementation::configure(map config_map){ FILE_LOG(logDEBUG) << __AT__ << " starting"; map::const_iterator pos; pos = config_map.find("mode"); if (pos != config_map.end() ){ int b; if(!sscanf(pos->second.c_str(), "%d", &b)){ cout << "Warning: Could not parse mode. Assuming top mode." << endl; b = 0; } bottomEnable = b!= 0; FILE_LOG(logINFO) << "Bottom: " << stringEnable(bottomEnable); } } /***file parameters***/ int UDPStandardImplementation::setDataCompressionEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; if(myDetectorType != EIGER){ cout << "Info: Setting up Data Compression Enable to " << stringEnable(b); #ifdef MYROOT1 cout << " WITH ROOT" << endl; #else cout << " WITHOUT ROOT" << endl; #endif } //set data compression enable dataCompressionEnable = b; //-- create writer threads depending on enable pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); createWriterThreads(true); if(b) numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS; else numberofWriterThreads = 1; if(createWriterThreads() == FAIL){ cprintf(BG_RED,"Error: Could not create writer threads\n"); return FAIL; } //-- end of create writer threads setThreadPriorities(); //filter deleteFilter(); if(b) initializeFilter(); FILE_LOG(logINFO) << "Data Compression: " << stringEnable(dataCompressionEnable); return OK; } /***acquisition parameters***/ void UDPStandardImplementation::setShortFrameEnable(const int i){ FILE_LOG(logDEBUG) << __AT__ << " called"; shortFrameEnable = i; if(shortFrameEnable!=-1){ frameSize = GOTTHARD_SHORT_BUFFER_SIZE; bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; onePacketSize = GOTTHARD_SHORT_BUFFER_SIZE; oneDataSize = GOTTHARD_SHORT_DATABYTES; maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; packetIndexMask = GOTTHARD_SHORT_PACKET_INDEX_MASK; }else{ frameSize = GOTTHARD_BUFFER_SIZE; bufferSize = GOTTHARD_BUFFER_SIZE; onePacketSize = GOTTHARD_ONE_PACKET_SIZE; oneDataSize = GOTTHARD_ONE_DATA_SIZE; maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; } //filter deleteFilter(); if(dataCompressionEnable) initializeFilter(); FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable; } int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; FrameToGuiFrequency = i; if(setupFifoStructure() == FAIL) return FAIL; FILE_LOG(logINFO) << "Frame to Gui Frequency: " << FrameToGuiFrequency; return OK; } int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; acquisitionPeriod = i; if(setupFifoStructure() == FAIL) return FAIL; FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s"; return OK; } int UDPStandardImplementation::setDynamicRange(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; uint32_t oldDynamicRange = dynamicRange; FILE_LOG(logDEBUG) << "Info: Setting Dynamic Range to " << i; dynamicRange = i; if(myDetectorType == EIGER){ //set parameters depending on new dynamic range. packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT) * dynamicRange * EIGER_MAX_PORTS; frameSize = onePacketSize * packetsPerFrame; maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; //new dynamic range, then restart threads and resetup fifo structure if(oldDynamicRange != dynamicRange){ //delete threads if(threadStarted){ createListeningThreads(true); createWriterThreads(true); } //gui buffer if(latestData){delete[] latestData; latestData = NULL;} latestData = new char[frameSize]; //restructure fifo if(setupFifoStructure() == FAIL) return FAIL; //create threads if(createListeningThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create listening thread"; return FAIL; } if(createWriterThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create writer threads"; return FAIL; } setThreadPriorities(); } } FILE_LOG(logINFO) << "Dynamic Range: " << dynamicRange; return OK; } int UDPStandardImplementation::setTenGigaEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "Info: Setting Ten Giga to " << stringEnable(b); bool oldTenGigaEnable = tengigaEnable; tengigaEnable = b; if(myDetectorType == EIGER){ //set parameters depending on 10g if(tengigaEnable){ packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE; }else{ packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; } frameSize = onePacketSize * packetsPerFrame; bufferSize = onePacketSize; maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; FILE_LOG(logDEBUG) << dec << "packetsPerFrame:" << packetsPerFrame << "\nonePacketSize:" << onePacketSize << "\noneDataSize:" << oneDataSize << "\nframesize:" << frameSize << "\nbufferSize:" << bufferSize << "\nmaxPacketsPerFile:" << maxPacketsPerFile; //new enable, then restart threads and resetup fifo structure if(oldTenGigaEnable != tengigaEnable){ //delete threads if(threadStarted){ createListeningThreads(true); createWriterThreads(true); } //gui buffer if(latestData){delete[] latestData; latestData = NULL;} latestData = new char[frameSize]; //restructure fifo if(setupFifoStructure() == FAIL) return FAIL; //create threads if(createListeningThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create listening thread"; return FAIL; } if(createWriterThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create writer threads"; return FAIL; } setThreadPriorities(); } } FILE_LOG(logINFO) << "Ten Giga: " << stringEnable(tengigaEnable); return OK; } /************************************************************************* * Behavioral functions*************************************************** * They may modify the status of the receiver **************************** *************************************************************************/ /***initial functions***/ int UDPStandardImplementation::setDetectorType(const detectorType d){ FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "Setting receiver type"; deleteMembers(); initializeBaseMembers(); initializeMembers(); myDetectorType = d; switch(myDetectorType){ case GOTTHARD: case PROPIX: case MOENCH: case EIGER: case JUNGFRAUCTB: case JUNGFRAU: FILE_LOG(logINFO) << " ***** This is a " << getDetectorType(d) << " Receiver *****"; break; default: FILE_LOG(logERROR) << "This is an unknown receiver type " << (int)d; return FAIL; } //set detector specific variables switch(myDetectorType){ case GOTTHARD: packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; onePacketSize = GOTTHARD_ONE_PACKET_SIZE; oneDataSize = GOTTHARD_ONE_DATA_SIZE; frameSize = GOTTHARD_BUFFER_SIZE; bufferSize = GOTTHARD_BUFFER_SIZE; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; fifoSize = GOTTHARD_FIFO_SIZE; //footerOffset = Not applicable; break; case PROPIX: packetsPerFrame = PROPIX_PACKETS_PER_FRAME; onePacketSize = PROPIX_ONE_PACKET_SIZE; //oneDataSize = Not applicable; frameSize = PROPIX_BUFFER_SIZE; bufferSize = PROPIX_BUFFER_SIZE; frameIndexMask = PROPIX_FRAME_INDEX_MASK; frameIndexOffset = PROPIX_FRAME_INDEX_OFFSET; packetIndexMask = PROPIX_PACKET_INDEX_MASK; maxPacketsPerFile = MAX_FRAMES_PER_FILE * PROPIX_PACKETS_PER_FRAME; fifoSize = PROPIX_FIFO_SIZE; //footerOffset = Not applicable; break; case MOENCH: packetsPerFrame = MOENCH_PACKETS_PER_FRAME; onePacketSize = MOENCH_ONE_PACKET_SIZE; oneDataSize = MOENCH_ONE_DATA_SIZE; frameSize = MOENCH_BUFFER_SIZE; bufferSize = MOENCH_BUFFER_SIZE; frameIndexMask = MOENCH_FRAME_INDEX_MASK; frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; packetIndexMask = MOENCH_PACKET_INDEX_MASK; maxPacketsPerFile = MOENCH_MAX_FRAMES_PER_FILE * MOENCH_PACKETS_PER_FRAME; fifoSize = MOENCH_FIFO_SIZE; //footerOffset = Not applicable; break; case EIGER: //assuming 1G in the beginning packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; frameSize = onePacketSize * packetsPerFrame; bufferSize = onePacketSize; frameIndexMask = EIGER_FRAME_INDEX_MASK; frameIndexOffset = EIGER_FRAME_INDEX_OFFSET; packetIndexMask = EIGER_PACKET_INDEX_MASK; maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; fifoSize = EIGER_FIFO_SIZE; footerOffset = EIGER_PACKET_HEADER_SIZE + oneDataSize; break; case JUNGFRAUCTB: case JUNGFRAU: packetsPerFrame = JCTB_PACKETS_PER_FRAME; onePacketSize = JCTB_ONE_PACKET_SIZE; //oneDataSize = Not applicable; frameSize = JCTB_BUFFER_SIZE; bufferSize = JCTB_BUFFER_SIZE; frameIndexMask = JCTB_FRAME_INDEX_MASK; frameIndexOffset = JCTB_FRAME_INDEX_OFFSET; packetIndexMask = JCTB_PACKET_INDEX_MASK; maxPacketsPerFile = JFCTB_MAX_FRAMES_PER_FILE * JCTB_PACKETS_PER_FRAME; fifoSize = JCTB_FIFO_SIZE; //footerOffset = Not applicable; break; default: FILE_LOG(logERROR) << "This is an unknown receiver type " << (int)d; return FAIL; } //delete threads and set number of listening threads if(myDetectorType == EIGER){ pthread_mutex_lock(&statusMutex); listeningThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); if(threadStarted) createListeningThreads(true); numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS; } //set up fifo structure -1 for numberofJobsPerBuffer ensure it is done numberofJobsPerBuffer = -1; setupFifoStructure(); //create threads if(createListeningThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create listening thread"; return FAIL; } if(createWriterThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create writer threads"; return FAIL; } setThreadPriorities(); //allocate for latest data (frame copy for gui) latestData = new char[frameSize]; FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d); return OK; } /***acquisition functions***/ void UDPStandardImplementation::resetAcquisitionCount(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; totalPacketsCaught = 0; acqStarted = false; startAcquisitionIndex = 0; FILE_LOG(logINFO) << "Acquisition Count has been reset"; } int UDPStandardImplementation::startReceiver(char *c){ FILE_LOG(logDEBUG) << __AT__ << " called"; cout << "Starting Receiver" << endl; //RESET //reset measurement variables measurementStarted = false; startFrameIndex = 0; frameIndex = 0; if(!acqStarted){ currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan acquisitionIndex = 0; frameIndex = 0; } for(int i = 0; i < numberofListeningThreads; ++i) totalListeningFrameCount[i] = 0; packetsCaught = 0; numMissingPackets = 0; numTotMissingPackets = 0; numTotMissingPacketsInFile = 0; //reset file parameters packetsInFile = 0; if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } //reset gui variables guiData = NULL; guiDataReady=0; strcpy(guiFileName,""); //reset masks pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; createFileMask = 0x0; fileCreateSuccess = false; pthread_mutex_unlock(&statusMutex); //Print Receiver Configuration if(myDetectorType != EIGER){ FILE_LOG(logINFO) << "Data Compression has been " << stringEnable(dataCompressionEnable); FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer; FILE_LOG(logINFO) << "Max Packets Per File:" << maxPacketsPerFile; } if(FrameToGuiFrequency) FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << FrameToGuiFrequency; else FILE_LOG(logINFO) << "Frequency of frames sent to gui: Random"; //create UDP sockets if(createUDPSockets() == FAIL){ strcpy(c,"Could not create UDP Socket(s)."); FILE_LOG(logERROR) << c; return FAIL; } if(setupWriter() == FAIL){ //stop udp socket shutDownUDPSockets(); sprintf(c,"Could not create file %s.",completeFileName); FILE_LOG(logERROR) << c; return FAIL; } //For compression, just for gui purposes if(dataCompressionEnable) sprintf(completeFileName, "%s/%s_fxxx_%lld_xx.root", filePath,fileName,(long long int)fileIndex); //initialize semaphore to synchronize between writer and gui reader threads sem_init(&writerGuiSemaphore,1,0); //status and thread masks pthread_mutex_lock(&statusMutex); status = RUNNING; for(int i=0;iShutDownSocket(); delete udpSocket[i]; udpSocket[i] = NULL; } } return OK; } /** * Pre: status is running, udp sockets have been initialized, stop receiver initiated * Post:udp sockets closed, status is transmitting * */ void UDPStandardImplementation::startReadout(){ FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "Info: Transmitting last data"; if(status == RUNNING){ //wait for all packets uint64_t prev = totalPacketsCaught; usleep(50000); while(prev!=totalPacketsCaught){ prev=totalPacketsCaught; usleep(50000); } //set status pthread_mutex_lock(&statusMutex); status = TRANSMITTING; pthread_mutex_unlock(&statusMutex); cout << "Status: Transmitting" << endl; } //shut down udp sockets and make listeners push dummy (end) packets for writers shutDownUDPSockets(); } void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ FILE_LOG(logDEBUG) << __AT__ << " called"; //point to gui data, to let writer thread know that gui is back for data if (guiData == NULL){ guiData = latestData; #ifdef DEBUG4 cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n"); #endif } //copy data and filename strcpy(c,guiFileName); startAcq = startAcquisitionIndex; startFrame = startFrameIndex; //gui data not copied yet if(!guiDataReady){ #ifdef DEBUG4 cprintf(CYAN,"Info: gui data not ready\n"); #endif *raw = NULL; } //gui data ready, pass address to gui to copy the data else{ #ifdef DEBUG4 cprintf(CYAN,"Info: gui data ready\n"); #endif *raw = guiData; guiData = NULL; //for nth frame to gui, post semaphore so writer stops waiting if((FrameToGuiFrequency) && (writerThreadsMask)){ #ifdef DEBUG4 cprintf(CYAN,"Info: gonna post\n"); #endif //release after getting data sem_post(&writerGuiSemaphore); } #ifdef DEBUG4 cprintf(CYAN,"Info: done post\n"); #endif } } void UDPStandardImplementation::closeFile(int i){ FILE_LOG(logDEBUG) << __AT__ << " called for " << i ; //normal if(!dataCompressionEnable){ if(sfilefd){ #ifdef DEBUG4 FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); #endif fclose(sfilefd); sfilefd = NULL; } } //compression else{ #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) if(sfilefd){ #ifdef DEBUG4 FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd; #endif fclose(sfilefd); sfilefd = NULL; } #endif #ifdef MYROOT1 pthread_mutex_lock(&writeMutex); //write to file if(myTree[i] && myFile[i]){ myFile[i] = myTree[i]->GetCurrentFile(); if(myFile[i]->Write()) //->Write(tall->GetName(),TObject::kOverwrite); cout << "Thread " << i <<": wrote frames to file" << endl; else cout << "Thread " << i << ": could not write frames to file" << endl; }else cout << "Thread " << i << ": could not write frames to file: No file or No Tree" << endl; //close file if(myTree[i] && myFile[i]) myFile[i] = myTree[i]->GetCurrentFile(); if(myFile[i] != NULL) myFile[i]->Close(); myFile[i] = NULL; myTree[i] = NULL; pthread_mutex_unlock(&writeMutex); #endif } } /************************************************************************* * Listening and Writing Threads ***************************************** *************************************************************************/ int UDPStandardImplementation::createListeningThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting"; //reset masks killAllListeningThreads = false; pthread_mutex_lock(&statusMutex); listeningThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); //destroy if(destroy){ FILE_LOG(logDEBUG) << "Info: Destroying Listening Thread(s)"; killAllListeningThreads = true; for(int i = 0; i < numberofListeningThreads; ++i){ sem_post(&listenSemaphore[i]); pthread_join(listeningThreads[i],NULL); FILE_LOG(logDEBUG) << "." << flush; } killAllListeningThreads = false; threadStarted = false; FILE_LOG(logDEBUG) << "Info: Listening thread(s) destroyed"; } //create else{ FILE_LOG(logDEBUG) << "Info: Creating Listening Thread(s)"; //reset current index currentThreadIndex = -1; for(int i = 0; i < numberofListeningThreads; ++i){ sem_init(&listenSemaphore[i],1,0); threadStarted = false; currentThreadIndex = i; if(pthread_create(&listeningThreads[i], NULL,startListeningThread, (void*) this)){ FILE_LOG(logERROR) << "Could not create listening thread with index " << i; return FAIL; } while(!threadStarted); FILE_LOG(logDEBUG) << "." << flush; } FILE_LOG(logDEBUG) << "Info: Listening thread(s) created successfully."; } return OK; } int UDPStandardImplementation::createWriterThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting"; //reset masks killAllWritingThreads = false; pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; createFileMask = 0x0; pthread_mutex_unlock(&(statusMutex)); //destroy threads if(destroy){ FILE_LOG(logDEBUG) << "Info: Destroying Writer Thread(s)"; killAllWritingThreads = true; for(int i = 0; i < numberofWriterThreads; ++i){ sem_post(&writerSemaphore[i]); pthread_join(writingThreads[i],NULL); FILE_LOG(logDEBUG) <<"."<getErrorStatus(); if(!iret){ cout << "UDP port opened at port " << port[i] << endl; }else{ FILE_LOG(logERROR) << "Could not create UDP socket on port " << port[i] << " error: " << iret; shutDownUDPSockets(); return FAIL; } } FILE_LOG(logDEBUG) << "UDP socket(s) created successfully."; cout << "Listener Ready ..." << endl; return OK; } int UDPStandardImplementation::setupWriter(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; //acquisition start call back returns enable write cbAction = DO_EVERYTHING; if (startAcquisitionCallBack) cbAction=startAcquisitionCallBack(filePath,fileName,(int)fileIndex,bufferSize,pStartAcquisition); if(cbAction < DO_EVERYTHING){ FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back."; if (rawDataReadyCallBack){ FILE_LOG(logINFO) << "Data Write has been defined externally"; } }else if(!fileWriteEnable) FILE_LOG(logINFO) << "Data will not be saved"; //creating first file //setting all value to 1 pthread_mutex_lock(&statusMutex); for(int i=0; i DO_NOTHING){ //close file pointers if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } //create file if(!overwriteEnable){ if (NULL == (sfilefd = fopen((const char *) (completeFileName), "wx"))){ FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName; return FAIL; } }else if (NULL == (sfilefd = fopen((const char *) (completeFileName), "w"))){ FILE_LOG(logERROR) << "Could not create file" << completeFileName; return FAIL; } //setting file buffer size to 16mb setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); //Print packet loss and filenames if(!packetsCaught){ previousFrameNumber = -1; cout << "File: " << completeFileName << endl; }else{ cout << completeFileName << "\tPacket Loss: " << setw(4)<initEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDetectorObject[ithread]->newDataSet(); if(myFile[ithread]==NULL){ FILE_LOG(logERROR) << "File Null"; return FAIL; } if(!myFile[ithread]->IsOpen()){ FILE_LOG(logERROR) << "File Not Open"; return FAIL; } return OK; #endif return FAIL; } void* UDPStandardImplementation::startListeningThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startListening(); return this_pointer; } void* UDPStandardImplementation::startWritingThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startWriting(); return this_pointer; } void UDPStandardImplementation::startListening(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //set current thread value index int ithread = currentThreadIndex; //let calling function know thread started and obtained current threadStarted = 1; //variable definitions int listenSize = 0; //listen to only 1 packet uint32_t rc; //size of buffer received in bytes //split frames int carryonBufferSize; //from previous buffer to keep frames together in a buffer char* tempBuffer = NULL; //temporary buffer to store split frames /* outer loop - loops once for each acquisition */ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ //reset parameters before acquisition carryonBufferSize = 0; if(myDetectorType != EIGER){ listenSize = bufferSize * numberofJobsPerBuffer; //listen to more than 1 packet if(tempBuffer!=NULL){delete []tempBuffer;tempBuffer=NULL;} tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame } /* inner loop - loop for each buffer */ //until mask unset (udp sockets shut down by client) while((1 << ithread) & listeningThreadsMask){ //pop from fifo fifoFree[ithread]->pop(buffer[ithread]); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif //udpsocket doesnt exist if(udpSocket[ithread] == NULL){ FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created"; stopListening(ithread,0); continue; } rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer); //start indices for each start of scan/acquisition if((!measurementStarted) && (rc > 0)){ pthread_mutex_lock(&progressMutex); if(!measurementStarted) startFrameIndices(ithread); pthread_mutex_unlock(&progressMutex); } //problem in receiving or end of acquisition if (status == TRANSMITTING){ stopListening(ithread,rc); continue; } //write packet count to buffer if(myDetectorType == EIGER) (*((uint32_t*)(buffer[ithread]))) = 1; //handling split frames and writing packet Count to buffer else (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer); //push buffer to FIFO while(!fifo[ithread]->push(buffer[ithread])); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); #endif }/*--end of loop for each buffer (inner loop)*/ //end of acquisition, wait for next acquisition/change of parameters sem_wait(&listenSemaphore[ithread]); //check to exit thread (for change of parameters) - only EXIT possibility if(killAllListeningThreads){ cprintf(BLUE,"Listening_Thread %d:Goodbye!\n",ithread); //free resources at exit if(tempBuffer) delete[] tempBuffer; pthread_exit(NULL); } }/*--end of loop for each acquisition (outer loop) */ } int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp){ FILE_LOG(logDEBUG) << __AT__ << " called"; //listen to UDP packets if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); //throw away packets that is not one packet size, need to check status if socket is shut down while(status != TRANSMITTING && myDetectorType == EIGER && receivedSize != onePacketSize) { if(receivedSize != EIGER_HEADER_LENGTH) cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",ithread, receivedSize); #ifdef DEBUG else cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread); #endif receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); } #ifdef MANUALDEBUG eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); cprintf(GREEN,"thread:%d subframenum:%d oldpacketnum:%d new pnum:%d\n", ithread, (*( (unsigned int*) header->subFameNumber)), (*( (uint8_t*) header->dynamicRange)), (*( (uint16_t*) footer->packetNumber))); #endif #ifdef DEBUG cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize); #endif return receivedSize; } void UDPStandardImplementation::startFrameIndices(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //determine startFrameIndex switch(myDetectorType){ case EIGER: startFrameIndex = 0; //frame number always resets break; default: if(shortFrameEnable < 0){ startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset); }else{ startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) & (frameIndexMask)) >> frameIndexOffset); } break; } //start of entire acquisition if(!acqStarted){ startAcquisitionIndex = startFrameIndex; acqStarted = true; cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex); } //set start of scan/real time measurement cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex); measurementStarted = true; } void UDPStandardImplementation::stopListening(int ithread, int numbytes){ FILE_LOG(logDEBUG) << __AT__ << " called"; #ifdef DEBUG4 cprintf(BLUE,"Listening_Thread %d: Stop Listening\nStatus: %s\n", ithread, runStatusType(status).c_str()); #endif //less than 1 packet size (especially for eiger), ignore the buffer (so that 2 dummy buffers are not sent with pc=0) if(numbytes < onePacketSize) numbytes = 0; //free empty buffer if(numbytes <= 0){ cprintf(BLUE,"Listening_Thread %d :End of Acquisition\n", ithread); while(!fifoFree[ithread]->push(buffer[ithread])); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif } //push last non empty buffer into fifo else{ (*((uint32_t*)(buffer[ithread]))) = numbytes/onePacketSize; totalListeningFrameCount[ithread] += (numbytes/onePacketSize); #ifdef DEBUG cprintf(BLUE,"Listening_Thread %d: Last Buffer numBytes:%d\n",ithread, numbytes); cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread, numbytes/onePacketSize); #endif while(!fifo[ithread]->push(buffer[ithread])); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif } //push dummy-end buffer into fifo for all writer threads for(int i=0; ipop(buffer[ithread]); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); #endif //creating dummy-end buffer with pc=0xFFFF (*((uint32_t*)(buffer[ithread]))) = dummyPacketValue; while(!fifo[ithread]->push(buffer[ithread])); #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif } //reset mask and exit loop pthread_mutex_lock(&statusMutex); listeningThreadsMask^=(1< 1) cprintf(BLUE,"Listening_Thread %d: Waiting for other listening threads to be done.. current mask:0x%x\n", ithread, listeningThreadsMask); #endif while(listeningThreadsMask) usleep(5000); #ifdef DEBUG4 int t=0; for(i=0;i> frameIndexOffset)); #endif cSize = onePacketSize; --packetCount; } } #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset)); #endif break; case MOENCH: lastPacketOffset = (((numberofJobsPerBuffer * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: First Header:%d\t First Packet:%d\t Last Header:%d\t Last Packet:%d\tLast Packet Offset:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)), (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)), lastPacketOffset); #endif //moench last packet value is 0, so find the last packet and store the others in a temp storage if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (packetIndexMask))){ lastFrameHeader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset; cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; while (lastFrameHeader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset)){ cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; } memcpy(temp, buffer[ithread]+(lastPacketOffset+onePacketSize), cSize); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: temp Header:%d\t temp Packet:%d\n", (((((uint32_t)(*((uint32_t*)(temp)))))& (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(temp))))) & (packetIndexMask))); #endif } break; default: cprintf(RED,"Listening_Thread %d: Error: This detector %s is not implemented in the receiver\n", ithread, getDetectorType(myDetectorType).c_str()); break; } #ifdef DEBUG4 cprintf(BLUE,"Listening_Thread %d: PacketCount:%d CarryonBufferSize:%d\n",ithread, packetCount, cSize); #endif return packetCount; } void UDPStandardImplementation::startWriting(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //set current thread value index int ithread = currentThreadIndex; //let calling function know thread started and obtained current threadStarted = 1; switch(myDetectorType){ case EIGER: processWritingBufferPacketByPacket(ithread); break; default: processWritingBuffer(ithread); break; } } void UDPStandardImplementation::processWritingBuffer(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //variable definitions char* wbuf[numberofListeningThreads]; //buffer popped from FIFO sfilefd = NULL; //file pointer uint64_t nf; //for compression, number of frames /* outer loop - loops once for each acquisition */ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ //--reset parameters before acquisition nf = 0; guiData = latestData; //so that the first frame is always copied /* inner loop - loop for each buffer */ //until mask unset (udp sockets shut down by client) while((1 << ithread) & writerThreadsMask){ //pop fifo[0]->pop(wbuf[0]); #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuf[0]),0); #endif uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf[0])); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, 0); #endif //end of acquisition if(numPackets == dummyPacketValue){ #ifdef DEBUG3 cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, 0); #endif stopWriting(ithread,wbuf); continue; } //process if(!dataCompressionEnable) handleWithoutDataCompression(ithread, wbuf, numPackets); else{ #if defined(MYROOT1) && defined(ALLFILE_DEBUG) if(npackets > 0) writeFileWithoutCompression(wbuf, numPackets); #endif handleDataCompression(ithread,wbuf,nf); } }/*--end of loop for each buffer (inner loop)*/ waitWritingBufferForNextAcquisition(ithread); }/*--end of loop for each acquisition (outer loop) */ } void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //variable definitions char* packetBuffer[numberofListeningThreads]; //buffer popped from FIFO sfilefd = NULL; //file pointer bool popReady[numberofListeningThreads]; //if the FIFO can be popped uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets uint32_t LAST_PACKET_VALUE; //last packet number CircularFifo* fifoTempFree[numberofListeningThreads];//ciruclar fifo to keep track of one frame packets to be freed and reused later char* temp = NULL; char* frameBuffer[MAX_NUM_PACKETS]; //buffer offset created for a whole frame int frameBufferoffset[numberofListeningThreads]; //buffer offset created for a whole frame for both listening threads char* blankframe[MAX_NUM_PACKETS]; //blank buffer for a whole frame with missing packets int blankoffset; //blank buffer offset bool fullframe[numberofListeningThreads]; //if full frame processed for each listening thread volatile uint32_t threadFrameNumber[numberofListeningThreads]; //thread frame number for each listening thread buffer popped out volatile uint32_t presentFrameNumber; //the current frame number aiming to be built volatile uint32_t lastPacketNumber[numberofListeningThreads]; //last packet number got volatile uint32_t currentPacketNumber[numberofListeningThreads];//current packet number volatile int numberofMissingPackets[numberofListeningThreads]; // number of missing packets in this buffer for(int i=0; iisEmpty()) fifoTempFree[i]->pop(temp); delete fifoTempFree[i]; } fifoTempFree[i] = new CircularFifo(MAX_NUM_PACKETS); } for(uint32_t i=0; imissingPacket) = missingPacketValue; //set each value inside blank frame to 0xff for(int j=0;j<(oneDataSize);++j){ unsigned char* blankframe_data = (unsigned char*)blankframe[i] + sizeof(eiger_packet_header_t) + j; *(blankframe_data) = 0xFF; } } /* inner loop - loop for each buffer */ //until mask unset (udp sockets shut down by client) while((1 << ithread) & writerThreadsMask){ //pop fifo and if end of acquisition if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,fifoTempFree)){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread All dummy-end buffers popped\n", ithread); #endif //finish missing packets if(((frameBufferoffset[0]!=0) || (frameBufferoffset[1]!=((int)packetsPerFrame/numberofListeningThreads)))); else{ stopWriting(ithread,packetBuffer); continue; } } //get a full frame------------------------------------------------------------------------------------------------------- for(int i=0;ipacketNumber); } //calculate number of missing packets----------------------------------------------------- numberofMissingPackets[i] = 0; #ifdef DEBUG4 if(numPackets[i] == dummyPacketValue) cprintf(GREEN, "Fifo %d: Dummy packet: Adding missing packets to the last frame\n", i); else{ cprintf(GREEN,"Fifo %d: fnum %d, fnum_thread %d, pnum %d, last_pnum %d, pnum_offset %d\n" "Fifo %d: Add missing packets to the right fnum %d\n", i,presentFrameNumber[i],threadFrameNumber[i], currentPacketNumber[i],lastPacketNumber[i],frameBufferoffset[i], i,presentFrameNumber); } #endif if((numPackets[i] == dummyPacketValue) || (threadFrameNumber[i] != presentFrameNumber)) numberofMissingPackets[i] = (LAST_PACKET_VALUE - lastPacketNumber[i]); else numberofMissingPackets[i] = (currentPacketNumber[i] - lastPacketNumber[i] - 1); //add missing packets--------------------------------------------------------------------- for(int j=0;jmissingPacket)!= missingPacketValue){ eiger_packet_header_t* blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset]; cprintf(BG_RED, "Fifo %d: Missing Packet Error: Adding blank packets mismatch " "pnum_offset %d, pnum %d, fnum_thread %d, missingpacket_buffer 0x%x, missingpacket_blank 0x%x\n", i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i], *( (uint16_t*) frameBuffer_header->missingPacket), *( (uint16_t*) blankframe_header->missingPacket)); exit(-1); }else{ #ifdef DEBUG4 cprintf(RED, "Fifo %d: Missing Packet: Adding blank packets success " "pnum_offset %d, pnum %d, fnum_thread %d, missingpacket_buffer 0x%x\n", i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i], *( (uint16_t*) frameBuffer_header->missingPacket)); #endif frameBufferoffset[i]++; blankoffset++; } } //missed packets/future packet: do not pop over and determine fullframe-------------------- if(numberofMissingPackets[i]){ popReady[i] = false; if((numPackets[i] == dummyPacketValue) ||(threadFrameNumber[i] != presentFrameNumber)) fullframe[i] = true; else{ fullframe[i] = false; //update last packet lastPacketNumber[i] = currentPacketNumber[i] - 1; } if(threadFrameNumber[i] != presentFrameNumber) threadFrameNumber[i] = presentFrameNumber; numMissingPackets += numberofMissingPackets[i]; } //no missed packet: add current packet-------------------------------------------------------------- else{ if(currentPacketNumber[i] != (uint32_t)(frameBufferoffset[i]-(i*packetsPerFrame/numberofListeningThreads))+1){ cprintf(BG_RED, "Fifo %d: Correct Packet Offset Error:Adding current packet mismatch " "pnum_offset %d,pnum %d fnum_thread %d\n", i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i]); exit(-1); } frameBuffer[frameBufferoffset[i]] = packetBuffer[i] + HEADER_SIZE_NUM_TOT_PACKETS; #ifdef DEBUG4 eiger_packet_header_t* frameBuffer_header = (eiger_packet_header_t*) frameBuffer[frameBufferoffset[i]]; eiger_packet_footer_t* frameBuffer_footer = (eiger_packet_footer_t*) (frameBuffer[frameBufferoffset[i]] + footer_offset); cprintf(GREEN, "Fifo %d: Current Packet added success:" "pnum_offset %d, pnum %d, fnum_thread %d, missingpacket_buffer 0x%x\n", i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i], *( (uint16_t*) frameBuffer_header->missingPacket)); #endif frameBufferoffset[i]++; //update last packet lastPacketNumber[i] = currentPacketNumber[i]; popReady[i] = true; fullframe[i] = false; if(currentPacketNumber[i] == LAST_PACKET_VALUE){ #ifdef DEBUG4 cprintf(GREEN, "Fifo %d: Got last packet\n",i); #endif popReady[i] = false; fullframe[i] = true; } } } } //full frame if(fullframe[0] && fullframe[1]){ currentFrameNumber = presentFrameNumber; numTotMissingPacketsInFile += numMissingPackets; numTotMissingPackets += numMissingPackets; //#ifdef FNUM_DEBUG cprintf(GREEN,"**fnum:%d**\n",currentFrameNumber); //#endif //#ifdef MISSINGP_DEBUG if(numMissingPackets){ cprintf(RED, "Total missing packets %d for fnum %d\n",numMissingPackets,currentFrameNumber); for (int j=0;jmissingPacket)==missingPacketValue) cprintf(RED,"Found missing packet at pnum %d\n",j); } } //#endif //write and copy to gui handleWithoutDataCompression(ithread,frameBuffer,packetsPerFrame); //freeing for(int i=0; iisEmpty()){ fifoTempFree[i]->pop(temp); fifoFree[i]->push(temp); count++; #ifdef CFIFODEBUG if(i==0) cprintf(CYAN,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i,count, (void*)(temp)); else cprintf(YELLOW,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i, count,(void*)(temp)); #endif } } #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: finished freeing\n"); #endif //reset a few stuff presentFrameNumber++; for(int i=0; ipacketNumber), (void*)(packetBuffer[i])); } #endif }/*--end of loop for each buffer (inner loop)*/ waitWritingBufferForNextAcquisition(ithread); }/*--end of loop for each acquisition (outer loop) */ } void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //in case they are not closed already closeFile(); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread); #endif //end of acquisition, wait for file create/change of parameters sem_wait(&writerSemaphore[ithread]); //check to exit thread (for change of parameters) - only EXIT possibility if(killAllWritingThreads){ cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread); pthread_exit(NULL); } #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Got 1st post. Creating File\n", ithread); #endif //create file if((1<* fifoTempFree[]){ FILE_LOG(logDEBUG) << __AT__ << " called"; bool endofAcquisition = true; for(int i=0; ipop(wbuffer[i]); #ifdef CFIFODEBUG if(i == 0) cprintf(CYAN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); else cprintf(YELLOW,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); #endif nP[i] = (uint32_t)(*((uint32_t*)wbuffer[i])); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, nP[i], i); #endif //dummy-end buffer if(nP[i] == dummyPacketValue){ ready[i] = false; #ifdef DEBUG3 cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, i); #endif } //normal buffer popped out else{ endofAcquisition = false; #ifdef DEBUG4 switch(myDetectorType){ case EIGER: eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); //cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer))); cprintf(BLUE,"Fnum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer))); cprintf(BLUE,"Pnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); break; default: break; } #endif if(myDetectorType == EIGER){ while(!fifoTempFree[i]->push(wbuffer[i])); } } } //when both are not popped but curretn frame number is being processed else{ if(nP[i] != dummyPacketValue) endofAcquisition = false; } } return endofAcquisition; } void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ FILE_LOG(logDEBUG) << __AT__ << " called"; cprintf(GREEN,"Info: Writing_Thread %d: End of Acquisition\n",ithread); //free fifo for(int i=0; ipush(wbuffer[i])); #ifdef CFIFODEBUG if(i==0) cprintf(CYAN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); else cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); #endif } //all threads need to close file, reset mask and exit loop closeFile(ithread); pthread_mutex_lock(&statusMutex); writerThreadsMask^=(1<> frameIndexOffset; //set indices acquisitionIndex = currentFrameNumber - startAcquisitionIndex; frameIndex = currentFrameNumber - startFrameIndex; } //callback to write data if (cbAction < DO_EVERYTHING){ switch(myDetectorType){ case EIGER: for(uint32_t i=0;i 0) writeFileWithoutCompression(wbuffer, npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); #endif //copy frame for gui if(npackets >= packetsPerFrame) copyFrameToGui(wbuffer); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Copied frame\n"); #endif //free fifo addresses (eiger frees for each packet later) if(myDetectorType != EIGER){ while(!fifoFree[0]->push(wbuffer[0])); #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener 0\n",ithread, (void*)(wbuffer[0])); #endif } } void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; //create headers for eiger #ifdef WRITE_HEADERS if (myDetectorType == EIGER && cbAction == DO_EVERYTHING) createHeaders(wbuffer); #endif //if write enabled if((fileWriteEnable) && (sfilefd)){ int offset = HEADER_SIZE_NUM_TOT_PACKETS; //offset (not eiger) to keep track of how many packets saved uint32_t packetsToSave; //how many packets to save at a time volatile uint64_t tempframenumber; int lastpacket; //loop to take care of creating new files when it reaches max packets per file while(numpackets > 0){ //to create new file when max reached packetsToSave = maxPacketsPerFile - packetsInFile; if(packetsToSave > numpackets) packetsToSave = numpackets; //write to file if(cbAction == DO_EVERYTHING){ switch(myDetectorType){ case EIGER: for(uint32_t i=0; i= (uint32_t)maxPacketsPerFile){ //for packet loss, because currframenum is the latest one for eiger if(myDetectorType != EIGER){ lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); //get frame number (eiger already gets it when it does packet to packet processing) tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + lastpacket)))); //for gotthard and normal frame, increment frame number to separate fnum and pnum if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) tempframenumber++; //get frame number currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; //set indices acquisitionIndex = currentFrameNumber - startAcquisitionIndex; frameIndex = currentFrameNumber - startFrameIndex; } #ifdef DEBUG3 cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber); #endif createNewFile(); } //increase offset if(myDetectorType != EIGER) offset += (packetsToSave * onePacketSize); numpackets -= packetsToSave; } } //only update parameters else{ if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); packetsInFile += numpackets; packetsCaught += (numpackets - numMissingPackets); totalPacketsCaught += (numpackets - numMissingPackets); numMissingPackets = 0; if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); } } void UDPStandardImplementation::createHeaders(char* wbuffer[]){ int port = 0, missingPacket; for (uint32_t i = 0; i < packetsPerFrame; i++){ eiger_packet_header_t* wbuf_header = (eiger_packet_header_t*) wbuffer[i]; eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset); #ifdef DEBUG4 cprintf(GREEN, "Loop index:%d Pnum:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); #endif //which port if (i ==(packetsPerFrame/2)) port = 1; //missing packet if (*( (uint16_t*) wbuf_header->missingPacket)== missingPacketValue){ #ifdef DEBUG4 cprintf(GREEN,"Missing packet at %d\n", i+1); #endif missingPacket = 1; //add frame and packet numbers *( (uint64_t*) wbuf_footer) = (uint64_t)((currentFrameNumber+1)); *( (uint16_t*) wbuf_footer->packetNumber) = (i+1); } //normal packet else{ missingPacket = 0; //correct the packet numbers of port2 so that port1 and 2 are not the same if(port) *( (uint16_t*) wbuf_footer->packetNumber) = (*( (uint16_t*) wbuf_footer->packetNumber))+(packetsPerFrame/2); } //DEBUGGING if(*( (uint16_t*) wbuf_footer->packetNumber) != (i+1)){ cprintf(BG_RED, "Writing_Thread: Packet Number Mismatch! " "i %d, pnum %d, fnum %lld, missingPacket 0x%x\n", i,*( (uint16_t*) wbuf_footer->packetNumber),(long long int)currentFrameNumber,*( (uint16_t*) wbuf_header->missingPacket)); exit(-1); } //overwriting port number and dynamic range *( (uint8_t*) wbuf_header->portIndex) = port; *( (uint8_t*) wbuf_header->dynamicRange) = dynamicRange; } } void UDPStandardImplementation::copyFrameToGui(char* buffer[]){ FILE_LOG(logDEBUG) << __AT__ << " called"; //random read (gui not ready) //need to toggle guiDataReady or the second frame wont be copied if((!FrameToGuiFrequency) && (!guiData)){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); } //random read (gui ready) or nth frame read: gui needs data now or it is the first frame else{ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n"); #endif switch(myDetectorType){ case EIGER: for(uint32_t i=0; i> frameIndexOffset; //handle multi threads pthread_mutex_lock(&progressMutex); if(tempframenumber > currentFrameNumber) currentFrameNumber = tempframenumber; pthread_mutex_unlock(&progressMutex); //set indices acquisitionIndex = currentFrameNumber - startAcquisitionIndex; frameIndex = currentFrameNumber - startFrameIndex; //variable definitions char* buff[2]={0,0}; //an array just to be compatible with copyframetogui char* data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed int ndata; //size of data returned uint32_t np; //remaining number of packets returned uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer[0])); //number of total packets int remainingsize = npackets * onePacketSize; //size of the memory slot to be analyzed eventType thisEvent = PEDESTAL; int once = 0; int xmax = 0, ymax = 0; //max pixels in x and y direction int xmin = 1, ymin = 1; //min pixels in x and y direction double tot, tl, tr, bl, br; //determining xmax and ymax switch(myDetectorType){ case MOENCH: xmax = MOENCH_PIXELS_IN_ONE_ROW-1; ymax = MOENCH_PIXELS_IN_ONE_ROW-1; break; case GOTTHARD: if(shortFrameEnable == -1){ xmax = GOTTHARD_PIXELS_IN_ROW-1; ymax = GOTTHARD_PIXELS_IN_COL-1; }else{ xmax = GOTTHARD_SHORT_PIXELS_IN_ROW-1; ymax = GOTTHARD_SHORT_PIXELS_IN_COL-1; } break; default: break; } while(buff[0] = receiverData[ithread]->findNextFrame(data,ndata,remainingsize)){ //remaining number of packets np = ndata/onePacketSize; if ((np == packetsPerFrame) && (buff[0]!=NULL)){ if(nf == 1000) cprintf(GREEN, "Writing_Thread %d: pedestal done\n", ithread); singlePhotonDetectorObject[ithread]->newFrame(); //only for moench if(commonModeSubtractionEnable){ for(int ix = xmin - 1; ix < xmax+1; ix++){ for(int iy = ymin - 1; iy < ymax+1; iy++){ thisEvent = singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, 0); } } } for(int ix = xmin - 1; ix < xmax+1; ix++) for(int iy = ymin - 1; iy < ymax+1; iy++){ thisEvent=singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, commonModeSubtractionEnable); if (nf>1000) { tot=0; tl=0; tr=0; bl=0; br=0; if (thisEvent==PHOTON_MAX) { receiverData[ithread]->getFrameNumber(buff[0]); //iFrame=receiverData[ithread]->getFrameNumber(buff); #ifdef MYROOT1 myTree[ithread]->Fill(); //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; #else pthread_mutex_lock(&writeMutex); if((fileWriteEnable) && (sfilefd)) singlePhotonDetectorObject[ithread]->writeCluster(sfilefd); pthread_mutex_unlock(&writeMutex); #endif } } } nf++; #ifndef ALLFILE pthread_mutex_lock(&progressMutex); packetsInFile += packetsPerFrame; packetsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame; if(packetsInFile >= (uint32_t)maxPacketsPerFile) createNewFile(); pthread_mutex_unlock(&progressMutex); #endif if(!once){ copyFrameToGui(buff); once = 1; } } remainingsize -= ((buff[0] + ndata) - data); data = buff[0] + ndata; if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread); } while(!fifoFree[0]->push(wbuffer[0])); #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer[0])); #endif }