/********************************************//** * @file UDPStandardImplementation.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "UDPStandardImplementation.h" #include "moench02ModuleData.h" #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" #include // exit() #include //set precision for printing parameters for create new file #include //map #include #include #include #include #include //zmq #include using namespace std; #define WRITE_HEADERS /************************************************************************* * Constructor & Destructor ********************************************** * They access local cache of configuration or detector parameters ******* *************************************************************************/ UDPStandardImplementation::UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; initializeMembers(); //***mutex*** pthread_mutex_init(&statusMutex,NULL); pthread_mutex_init(&writeMutex,NULL); pthread_mutex_init(&dataReadyMutex,NULL); pthread_mutex_init(&progressMutex,NULL); //to increase socket receiver buffer size and max length of input queue by changing kernel settings if(myDetectorType == EIGER); else if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")){ FILE_LOG(logDEBUG) << "Warning: No root permission to change socket receiver buffer size in file /proc/sys/net/core/rmem_max"; }else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")){ FILE_LOG(logDEBUG) << "Warning: No root permission to change max length of input queue in file /proc/sys/net/core/netdev_max_backlog"; } /** permanent setting by heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 */ } UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; for(int i=0;i(receiverData[i], csize, sigma, sign, moenchCommonModeSubtraction); } int UDPStandardImplementation::setupFifoStructure(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //number of jobs per buffer int64_t i; int oldNumberofJobsPerBuffer = numberofJobsPerBuffer; //eiger always listens to 1 packet at a time if((myDetectorType == EIGER) || (myDetectorType == JUNGFRAU)){ numberofJobsPerBuffer = 1; FILE_LOG(logDEBUG) << "Info: 1 packet per buffer"; } //else calculate best possible number of frames to listen to at a time (for fast readouts like gotthard) else{ //if frequency to gui is not random (every nth frame), then listen to only n frames per buffer if(frameToGuiFrequency) numberofJobsPerBuffer = frameToGuiFrequency; //random frame sent to gui, then frames per buffer depends on acquisition period else{ //calculate 100ms/period to get frames to listen to at a time if(!acquisitionPeriod) i = SAMPLE_TIME_IN_NS; else i = SAMPLE_TIME_IN_NS/acquisitionPeriod; //max frames to listen to at a time is limited by 1000 if (i > MAX_JOBS_PER_THREAD) numberofJobsPerBuffer = MAX_JOBS_PER_THREAD; else if (i < 1) numberofJobsPerBuffer = 1; else numberofJobsPerBuffer = i; } FILE_LOG(logINFO) << "Number of Frames per buffer:" << numberofJobsPerBuffer << endl; } // fifo depth uint32_t oldFifoSize = fifoSize; fifoSize = fifoDepth; //reduce fifo depth if > 1 numberofJobsPerBuffer if(fifoSize % numberofJobsPerBuffer) fifoSize = (fifoSize/numberofJobsPerBuffer)+1; else fifoSize = fifoSize/numberofJobsPerBuffer; //do not rebuild fifo structure if it is the same (oldfifosize differs only for different packetsperframe) if((oldNumberofJobsPerBuffer == numberofJobsPerBuffer) && (oldFifoSize == fifoSize)) return OK; FILE_LOG(logINFO) << "Info: Total Fifo Size:" << fifoSize; //delete threads if(threadStarted){ createListeningThreads(true); createWriterThreads(true); } //set up fifo structure for(int i=0;iisEmpty()){ fifoFree[i]->pop(buffer[i]); //cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i])); } delete fifoFree[i]; fifoFree[i] = 0; } if(fifo[i]){ while(!fifo[i]->isEmpty()){ fifo[i]->pop(buffer[i]); //cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(buffer[i])); } delete fifo[i]; fifo[i] = 0; } if(mem0[i]){ free(mem0[i]); mem0[i] = 0; } //creating fifoFree[i] = new CircularFifo(fifoSize); fifo[i] = new CircularFifo(fifoSize); //allocate memory mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize); if (mem0[i] == NULL){ cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); return FAIL; } //push free address into fifoFree buffer[i]=mem0[i]; while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * (fifoSize-1))) { //cprintf(BLUE,"fifofree %d: push 0x%p\n",i,(void*)buffer[i]); /*for(int k=0;kpush(buffer[i])); //cprintf(GREEN,"Fifofree[%d]: value:%d, push 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i])); #ifdef DEBUG5 cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); #endif buffer[i] += (bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize); } } cout << "Fifo structure(s) reconstructed" << endl; //create threads if(createListeningThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create listening thread"; return FAIL; } if(createWriterThreads() == FAIL){ FILE_LOG(logERROR) << "Could not create writer threads"; return FAIL; } setThreadPriorities(); return OK; } void UDPStandardImplementation::configure(map config_map){ FILE_LOG(logDEBUG) << __AT__ << " starting"; map::const_iterator pos; pos = config_map.find("mode"); if (pos != config_map.end() ){ int b; if(!sscanf(pos->second.c_str(), "%d", &b)){ cout << "Warning: Could not parse mode. Assuming top mode." << endl; b = 0; } bottomEnable = b!= 0; FILE_LOG(logINFO) << "Bottom: " << stringEnable(bottomEnable); } } void UDPStandardImplementation::setFileName(const char c[]){ FILE_LOG(logDEBUG) << __AT__ << " starting"; char oldfilename[MAX_STR_LENGTH]; strcpy(oldfilename,fileName); if(strlen(c)) strcpy(fileName, c); if(strlen(fileName)){ int detindex = -1; string tempname(fileName); size_t uscore=tempname.rfind("_"); if (uscore!=string::npos){ if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"d%d",&detindex)) { detID = detindex; } } if(detindex == -1) detID = 0; } FILE_LOG(logINFO) << "File name:" << fileName; } int UDPStandardImplementation::setDataCompressionEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; if(myDetectorType != EIGER){ cout << "Info: Setting up Data Compression Enable to " << stringEnable(b); #ifdef MYROOT1 cout << " WITH ROOT" << endl; #else cout << " WITHOUT ROOT" << endl; #endif } //set data compression enable dataCompressionEnable = b; //-- create writer threads depending on enable pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); createWriterThreads(true); if(b) numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS; else numberofWriterThreads = 1; if(createWriterThreads() == FAIL){ cprintf(BG_RED,"Error: Could not create writer threads\n"); return FAIL; } //-- end of create writer threads setThreadPriorities(); //filter deleteFilter(); if(b) initializeFilter(); FILE_LOG(logINFO) << "Data Compression: " << stringEnable(dataCompressionEnable); return OK; } /***acquisition count parameters***/ uint64_t UDPStandardImplementation::getTotalFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return (totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)); } uint64_t UDPStandardImplementation::getFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return (packetsCaught/(packetsPerFrame*numberofListeningThreads)); } /***acquisition parameters***/ void UDPStandardImplementation::setShortFrameEnable(const int i){ FILE_LOG(logDEBUG) << __AT__ << " called"; shortFrameEnable = i; if(shortFrameEnable!=-1){ bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; onePacketSize = GOTTHARD_SHORT_BUFFER_SIZE; oneDataSize = GOTTHARD_SHORT_DATABYTES; maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE; packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; packetIndexMask = GOTTHARD_SHORT_PACKET_INDEX_MASK; }else{ bufferSize = GOTTHARD_BUFFER_SIZE; onePacketSize = GOTTHARD_ONE_PACKET_SIZE; oneDataSize = GOTTHARD_ONE_DATA_SIZE; maxFramesPerFile = MAX_FRAMES_PER_FILE; packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; } //filter deleteFilter(); if(dataCompressionEnable) initializeFilter(); FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable; } int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){ FILE_LOG(logDEBUG) << __AT__ << " called"; frameToGuiFrequency = freq; if(setupFifoStructure() == FAIL) return FAIL; FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency; return OK; } uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){ FILE_LOG(logDEBUG) << __AT__ << " called"; int oldvalue = dataStreamEnable; dataStreamEnable = enable; FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable; if(oldvalue!=dataStreamEnable){ //data sockets have to be created again as the client ones are if(zmqThreadStarted) createDataCallbackThreads(true); if(dataStreamEnable){ numberofDataCallbackThreads = numberofListeningThreads; if(createDataCallbackThreads() == FAIL){ cprintf(BG_RED,"Error: Could not create data callback threads\n"); } } } return OK; } int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; acquisitionPeriod = i; if(setupFifoStructure() == FAIL) return FAIL; FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s"; return OK; } int UDPStandardImplementation::setNumberOfFrames(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; numberOfFrames = i; if(setupFifoStructure() == FAIL) return FAIL; FILE_LOG(logINFO) << "Number of Frames:" << numberOfFrames; return OK; } int UDPStandardImplementation::setDynamicRange(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " called"; uint32_t oldDynamicRange = dynamicRange; FILE_LOG(logDEBUG) << "Info: Setting Dynamic Range to " << i; dynamicRange = i; if(myDetectorType == EIGER){ //set parameters depending on new dynamic range. packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT) * dynamicRange; bufferSize = onePacketSize * packetsPerFrame; for(int i=0; iShutDownSocket(); FILE_LOG(logINFO) << "Shut down UDP Socket " << i; delete udpSocket[i]; udpSocket[i] = 0; } } return OK; } /** * Pre: status is running, udp sockets have been initialized, stop receiver initiated * Post:udp sockets closed, status is transmitting * */ void UDPStandardImplementation::startReadout(){ FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "Transmitting last data"; if(status == RUNNING){ //needs to wait for packets only if activated if(activated){ //check if all packets got int totalP = 0,prev=-1,i; for(i=0; igetCurrentTotalReceived(); //wait for all packets if(totalP!=numberOfFrames*packetsPerFrame*numberofListeningThreads){ //wait as long as there is change from prev totalP, //and also change from received in buffer to previous value //(as one listens to many at a time, shouldnt cut off in between) while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){ #ifdef DEBUG5 cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); #endif usleep(5000);/* Need to find optimal time (exposure time and acquisition period) **/ prev = totalP; totalP = 0; for(i=0; igetCurrentTotalReceived(); #ifdef DEBUG5 cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); #endif } } } //set status pthread_mutex_lock(&statusMutex); status = TRANSMITTING; pthread_mutex_unlock(&statusMutex); FILE_LOG(logINFO) << "Status: Transmitting"; } //shut down udp sockets and make listeners push dummy (end) packets for writers shutDownUDPSockets(); } /**make this better by asking all of it at once*/ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame){ FILE_LOG(logDEBUG) << __AT__ << " called"; } void UDPStandardImplementation::closeFile(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ; //normal if(!dataCompressionEnable){ if(sfilefd[ithread]){ #ifdef DEBUG4 FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); #endif fclose(sfilefd[ithread]); sfilefd[ithread] = 0; } } //compression else{ #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) if(sfilefd[0]){ #ifdef DEBUG4 FILE_LOG(logDEBUG4) << "sfilefd: " << (int)sfilefd[i]; #endif fclose(sfilefd[0]); sfilefd[0] = 0; } #endif #ifdef MYROOT1 pthread_mutex_lock(&writeMutex); //write to file if(myTree[ithread] && myFile[ithread]){ myFile[ithread] = myTree[ithread]->GetCurrentFile(); if(myFile[ithread]->Write()) //->Write(tall->GetName(),TObject::kOverwrite); cout << "Thread " << ithread <<": wrote frames to file" << endl; else cout << "Thread " << ithread << ": could not write frames to file" << endl; }else cout << "Thread " << ithread << ": could not write frames to file: No file or No Tree" << endl; //close file if(myTree[ithread] && myFile[ithread]) myFile[ithread] = myTree[ithread]->GetCurrentFile(); if(myFile[ithread] != 0) myFile[ithread]->Close(); myFile[ithread] = 0; myTree[ithread] = 0; pthread_mutex_unlock(&writeMutex); #endif } } /************************************************************************* * Listening and Writing Threads ***************************************** *************************************************************************/ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting"; if(!destroy) cprintf(MAGENTA,"Data Callback thread created\n"); else cprintf(MAGENTA,"Data Callback thread destroyed\n"); //reset masks killAllDataCallbackThreads = false; pthread_mutex_lock(&statusMutex); dataCallbackThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); //destroy if(destroy){ FILE_LOG(logDEBUG) << "Info: Destroying Data Callback Thread(s)"; killAllDataCallbackThreads = true; for(int i = 0; i < numberofDataCallbackThreads; ++i){ sem_post(&dataCallbackSemaphore[i]); pthread_join(dataCallbackThreads[i],NULL); sem_destroy(&dataCallbackSemaphore[i]); FILE_LOG(logDEBUG) << "." << flush; } killAllDataCallbackThreads = false; zmqThreadStarted = false; FILE_LOG(logINFO) << "Data Callback thread(s) destroyed"; } //create else{ FILE_LOG(logINFO) << "Creating Data Callback Thread(s)"; //reset current index currentThreadIndex = -1; for(int i = 0; i < numberofDataCallbackThreads; ++i){ sem_init(&dataCallbackSemaphore[i],1,0); zmqThreadStarted = false; currentThreadIndex = i; if(pthread_create(&dataCallbackThreads[i], NULL,startDataCallbackThread, (void*) this)){ FILE_LOG(logERROR) << "Could not create data call back thread with index " << i; return FAIL; } while(!zmqThreadStarted); FILE_LOG(logDEBUG) << "." << flush; } FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) created successfully."; } return OK; } int UDPStandardImplementation::createListeningThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting"; if(!destroy) cprintf(BLUE,"Listening thread created\n"); else cprintf(BLUE,"Listening thread destroyed\n"); //reset masks killAllListeningThreads = false; pthread_mutex_lock(&statusMutex); listeningThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); //destroy if(destroy){ FILE_LOG(logDEBUG) << "Info: Destroying Listening Thread(s)"; killAllListeningThreads = true; for(int i = 0; i < numberofListeningThreads; ++i){ sem_post(&listenSemaphore[i]); pthread_join(listeningThreads[i],NULL); sem_destroy(&listenSemaphore[i]); FILE_LOG(logDEBUG) << "." << flush; } killAllListeningThreads = false; threadStarted = false; FILE_LOG(logINFO) << "Listening thread(s) destroyed"; } //create else{ FILE_LOG(logINFO) << "Creating Listening Thread(s)"; //reset current index currentThreadIndex = -1; for(int i = 0; i < numberofListeningThreads; ++i){ sem_init(&listenSemaphore[i],1,0); threadStarted = false; currentThreadIndex = i; if(pthread_create(&listeningThreads[i], NULL,startListeningThread, (void*) this)){ FILE_LOG(logERROR) << "Could not create listening thread with index " << i; return FAIL; } while(!threadStarted); FILE_LOG(logDEBUG) << "." << flush; } FILE_LOG(logDEBUG) << "Info: Listening thread(s) created successfully."; } return OK; } int UDPStandardImplementation::createWriterThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting"; if(!destroy) cprintf(GREEN,"Writer thread created\n"); else cprintf(GREEN,"Writer thread destroyed\n"); //reset masks killAllWritingThreads = false; pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; createFileMask = 0x0; pthread_mutex_unlock(&(statusMutex)); //destroy threads if(destroy){ FILE_LOG(logDEBUG) << "Info: Destroying Writer Thread(s)"; killAllWritingThreads = true; for(int i = 0; i < numberofWriterThreads; ++i){ sem_post(&writerSemaphore[i]); pthread_join(writingThreads[i],NULL); sem_destroy(&writerSemaphore[i]); FILE_LOG(logDEBUG) <<"."<getErrorStatus(); if(!iret){ cout << "UDP port opened at port " << port[i] << endl; }else{ FILE_LOG(logERROR) << "Could not create UDP socket on port " << port[i] << " error: " << iret; shutDownUDPSockets(); return FAIL; } } FILE_LOG(logDEBUG) << "UDP socket(s) created successfully."; cout << "Listener Ready ..." << endl; return OK; } int UDPStandardImplementation::setupWriter(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; //acquisition start call back returns enable write cbAction = DO_EVERYTHING; if (startAcquisitionCallBack) cbAction=startAcquisitionCallBack(filePath,fileNamePerThread[0],(int)fileIndex,bufferSize,pStartAcquisition); if(cbAction < DO_EVERYTHING){ FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back."; if (rawDataReadyCallBack){ FILE_LOG(logINFO) << "Data Write has been defined externally"; } }else if(!fileWriteEnable){ FILE_LOG(logINFO) << "Data will not be saved"; } //creating first file //setting all value to 1 pthread_mutex_lock(&statusMutex); for(int i=0; i DO_NOTHING){ //close file pointers if(sfilefd[ithread]){ fclose(sfilefd[ithread]); sfilefd[ithread] = 0; } //create file if(!overwriteEnable){ if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){ FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread]; sfilefd[ithread] = 0; return FAIL; } }else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){ FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread]; sfilefd[ithread] = 0; return FAIL; } //setting file buffer size to 16mb setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE); //Print packet loss and filenames if(!totalWritingPacketCount[ithread]){ frameNumberInPreviousFile[ithread] = -1; //printf("Thread:%d File:%s\n",ithread,completeFileName[ithread]); }else{ printf("\nThread:%d File:%s\n" //"\ttotalpacketsinfile:%d\t" "Packets Lost:%d" "\tCurrentFrameNumber:%lld\tPreviousFrameNumber:%lld" "\n", ithread,completeFileName[ithread], //totalPacketsInFile[ithread], ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) - totalPacketsInFile[ithread]) ,currentFrameNumber[ithread],frameNumberInPreviousFile[ithread] ); } //write file header if(myDetectorType == EIGER) fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]); } //reset counters for each new file if(totalWritingPacketCount[ithread]){ frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread]; totalPacketsInFile[ithread] = 0; }else frameNumberInPreviousFile[ithread] = -1; return OK; } int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){ FILE_LOG(logDEBUG) << __AT__ << " called"; #ifdef MYROOT1 char temp[MAX_STR_LENGTH]; //create file name for gui purposes, and set up acquistion parameters sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileNamePerThread[ithread],fileIndex,ithread); //file myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */ cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp); //tree sprintf(temp, "%s_fxxx_%d_%d",fileNamePerThread[ithread],fileIndex,ithread); myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDetectorObject[ithread]->newDataSet(); if(myFile[ithread]==NULL){ FILE_LOG(logERROR) << "File Null"; return FAIL; } if(!myFile[ithread]->IsOpen()){ FILE_LOG(logERROR) << "File Not Open"; return FAIL; } return OK; #endif return FAIL; } void* UDPStandardImplementation::startDataCallbackThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startDataCallback(); return this_pointer; } void* UDPStandardImplementation::startListeningThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startListening(); return this_pointer; } void* UDPStandardImplementation::startWritingThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startWriting(); return this_pointer; } void UDPStandardImplementation::startDataCallback(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //set current thread value index int ithread = currentThreadIndex; struct timespec begin,end; // server address to bind char hostName[100] = "tcp://*:";//"tcp://127.0.0.1:"; int portno = DEFAULT_ZMQ_PORTNO + (detID*numberofListeningThreads+ithread); sprintf(hostName,"%s%d",hostName,portno); //socket details void *context = zmq_ctx_new(); void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher int val = -1; zmq_setsockopt(zmqsocket, ZMQ_LINGER, &val,sizeof(val)); // wait for the unsent packets before closing socket //val = 10; //zmq_setsockopt(zmqsocket,ZMQ_SNDHWM,&val,sizeof(val)); //set SEND HIGH WATER MARK (8-9ms slower) cprintf(RED,"bind ret: %d\n",zmq_bind(zmqsocket,hostName)); // bind FILE_LOG(logINFO) << "Thread" << ithread << ": ZMQ Server at " << hostName; int headersize=0; switch(myDetectorType){ case EIGER: headersize = EIGER_DATA_PACKET_HEADER_SIZE; break; default: headersize = 0; break; } //let calling function know thread started and obtained current (after sockets created) if(!zmqThreadStarted) zmqThreadStarted = true; /* outer loop - loops once for each acquisition */ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ int oneframesize = oneDataSize * packetsPerFrame; char* buffer = new char[packetsPerFrame*oneDataSize]; memset(buffer,0xFF,oneframesize); int bufferoffset = 0; int size = 0; int offset = 0; int currentfnum = 0; uint64_t fnum = 0; uint32_t pnum = 0; uint32_t snum = 0; bool randomSendNow = true; bool newFrame = false; //header details const char *type = "float64"; const char *shape= "[1024, 512]"; const char *jsonFmt ="{\"htype\":[\"chunk-1.0\"], \"type\":\"%s\", \"shape\":%s, \"acqIndex\":%d, \"fIndex\":%d, \"subfnum\":%d, \"fname\":\"%s\"}"; char buf[1000]; int acquisitionIndex = -1; int frameIndex = -1; int subframeIndex = -1; #ifdef DEBUG int oldpnum = -1; #endif int datapacketscaught = 0; /* inner loop - loop for each buffer */ //until mask reset (dummy pcaket got by writer) while((1 << ithread) & dataCallbackThreadsMask){ //let the writer thread continue, while we process carry over if any sem_post(&writerGuiSemaphore[ithread]); //wait for receiver to send more data sem_wait(&dataCallbackWriterSemaphore[ithread]); //end if acquistion if(guiNumPackets[ithread] == dummyPacketValue){ //sending previous half frames if any if((myDetectorType != JUNGFRAU) && newFrame){ //send header //update frame details frameIndex = fnum;if(frameIndex==-1) cprintf(RED,"frameindex = -1, 111\n"); acquisitionIndex = fnum - startAcquisitionIndex; if(dynamicRange == 32) subframeIndex = snum; int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); //send data zmq_send(zmqsocket, buffer, oneframesize, 0); newFrame = false; } //send final header //update frame details #ifdef DEBUG cout << "sending dummy" << endl; #endif frameIndex = -9; acquisitionIndex = -9; subframeIndex = -9; int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); //send final data zmq_send (zmqsocket, "end", 3, 0); pthread_mutex_lock(&statusMutex); dataCallbackThreadsMask^=(1<0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum,snum)==FAIL)){ offset+= onePacketSize; } //if(!ithread) cout<< ithread <<" fnum:"<< fnum<<" pnum:"<= size) break; if(!frameToGuiFrequency) currentfnum = fnum; //last packet of same frame if(fnum == currentfnum && pnum == (packetsPerFrame-1)){ #ifdef DEBUG oldpnum=0; #endif memcpy(buffer+(pnum*oneDataSize), latestData[ithread]+offset+headersize,oneDataSize); offset+= onePacketSize; //send header //update frame details frameIndex = fnum; acquisitionIndex = fnum - startAcquisitionIndex; if(dynamicRange == 32) subframeIndex = snum; int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); //send data zmq_send(zmqsocket, buffer, oneframesize, 0); newFrame = false; #ifdef DEBUG if(!ithread)cprintf(BLUE,"%d sent (last packet)\n",ithread); #endif currentfnum++; //start clock after sending if(!frameToGuiFrequency){ randomSendNow = false; clock_gettime(CLOCK_REALTIME, &begin); } memset(buffer,0xFF,oneframesize); } //same frame (not last) or next frame else { //next frame #ifdef DEBUG int once = true; #endif while(fnum > currentfnum){ #ifdef DEBUG if(once){ if((fnum-currentfnum-1)>1) cprintf(RED,"%d Complete sub image missing:%d (cfnum:%d nfnum:%d)\n", ithread,fnum-currentfnum-1,currentfnum,fnum); once = false; } #endif //send header //update frame details frameIndex = fnum; acquisitionIndex = fnum - startAcquisitionIndex; if(dynamicRange == 32) subframeIndex = snum; int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); //send data zmq_send(zmqsocket, buffer, oneframesize, 0); newFrame = false; #ifdef DEBUG cprintf(BLUE,"%d sent (last packet of previous frame)\n",ithread); #endif currentfnum++; //start clock after sending if(!frameToGuiFrequency){ randomSendNow = false; clock_gettime(CLOCK_REALTIME, &begin); } memset(buffer,0xFF,oneframesize); } memcpy(buffer+(pnum*oneDataSize), latestData[ithread]+offset+headersize,oneDataSize); offset+= onePacketSize; newFrame = true; } } } }/*--end of loop for each buffer (inner loop)*/ //free resources delete[] buffer; //end of acquisition, wait for next acquisition/change of parameters sem_wait(&dataCallbackSemaphore[ithread]); //check to exit thread (for change of parameters) - only EXIT possibility if(killAllDataCallbackThreads){ break;//pthread_exit(NULL); } }/*--end of loop for each acquisition (outer loop) */ //free resources zmq_unbind(zmqsocket, hostName); /* will this be too soon and cut the sending*/ zmq_close(zmqsocket); zmq_ctx_destroy(context); cprintf(MAGENTA,"DataCallback_Thread %d:Goodbye!\n",ithread); } void UDPStandardImplementation::startListening(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //set current thread value index int ithread = currentThreadIndex; //let calling function know thread started and obtained current threadStarted = 1; uint32_t rc; //size of buffer received in bytes //split frames for data compression int carryonBufferSize; //from previous buffer to keep frames together in a buffer char* tempBuffer = 0; //temporary buffer to store split frames /* outer loop - loops once for each acquisition */ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ //compression variables reset before acquisition carryonBufferSize = 0; if(dataCompressionEnable){ if(tempBuffer){delete []tempBuffer;tempBuffer=0;} tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame } /* inner loop - loop for each buffer */ //until mask reset (udp sockets shut down by client) while((1 << ithread) & listeningThreadsMask){ //pop from fifo fifoFree[ithread]->pop(buffer[ithread]); #ifdef EVERYFIFODEBUG if(fifoFree[ithread]->getSemValue()<100) cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif //udpsocket doesnt exist if(activated && !udpSocket[ithread]){ FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier"; stopListening(ithread,0); continue; } rc = prepareAndListenBuffer(ithread, carryonBufferSize, tempBuffer); carryonBufferSize = 0; //start indices for each start of scan/acquisition if((!measurementStarted[ithread]) && (rc > 0)) startFrameIndices(ithread); //problem in receiving or end of acquisition if (status == TRANSMITTING||(rc == 0 && activated == 0)){ stopListening(ithread,rc); continue; } //write packet count to buffer if(myDetectorType == JUNGFRAU) (*((uint32_t*)(buffer[ithread]))) = (rc/oneDataSize); else (*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize); if(dataCompressionEnable) (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc); //push buffer to FIFO while(!fifo[ithread]->push(buffer[ithread])); #ifdef EVERYFIFODEBUG if(fifo[ithread]->getSemValue()>(fifoSize-100)) cprintf(MAGENTA,"Fifo[%d]: value:%d, push 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); #endif }/*--end of loop for each buffer (inner loop)*/ //end of acquisition, wait for next acquisition/change of parameters sem_wait(&listenSemaphore[ithread]); //check to exit thread (for change of parameters) - only EXIT possibility if(killAllListeningThreads){ cprintf(BLUE,"Listening_Thread %d:Goodbye!\n",ithread); //free resources at exit if(tempBuffer) delete[] tempBuffer; pthread_exit(NULL); } }/*--end of loop for each acquisition (outer loop) */ } int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, char* temp){ FILE_LOG(logDEBUG) << __AT__ << " called"; int receivedSize = 0; //carry over from previous buffer if(cSize) memcpy(buffer[ithread] + fifoBufferHeaderSize, temp, cSize); if(!activated){ //cSize = 0 for deactivated int framestoclone = 0; //first if(deactivatedFrameNumber[ithread]==0) deactivatedFrameNumber[ithread]++; //done if(deactivatedFrameNumber[ithread] == (numberOfFrames+1)) return 0; //last if((deactivatedFrameNumber[ithread] + deactivatedFrameIncrement) > (numberOfFrames+1)) framestoclone = (numberOfFrames+1) - deactivatedFrameNumber[ithread]; //in progress else framestoclone = deactivatedFrameIncrement; //copy dummy packets receivedSize = framestoclone*packetsPerFrame*onePacketSize; memset(buffer[ithread] + fifoBufferHeaderSize, 0xFF,receivedSize); //set fnum, pnum and deactivatedpacket label eiger_packet_header_t* header; eiger_packet_footer_t* footer; int pnum=0; //loop by each packet for(int offset=fifoBufferHeaderSize; offsetpacketNumber) = ++pnum; #ifdef MANUALDEBUG if(!ithread){ cprintf(GREEN,"thread:%d pnum:%d fnum:%d\n", ithread, (*( (uint16_t*) footer->packetNumber)), (uint32_t)(*( (uint64_t*) footer))); } #endif if(pnum == packetsPerFrame){ pnum = 0; deactivatedFrameNumber[ithread]++; } } return receivedSize; } if(status != TRANSMITTING){ if(myDetectorType == JUNGFRAU){ jfrau_packet_header_t* header; int offset = fifoBufferHeaderSize; int pnum = packetsPerFrame-1; int currentpnum; int currentfnum=-1; //read first packet receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset); if(!receivedSize) return 0; header = (jfrau_packet_header_t*)(buffer[ithread] + offset); currentpnum = (*( (uint8_t*) header->packetNumber)); //cout<<"1 currentpnum:"<frameNumber))&frameIndexMask); (*((uint64_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS+JFRAU_FILE_HEADER_BUNCHID_OFFSET))) = (*( (uint64_t*) header->bunchid)); //cout<<"current fnum:"<<(*((uint32_t*)(buffer[ithread]+8)))<ReceiveDataOnly(buffer[ithread] + offset); if(!receivedSize){ totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum); return 0; } header = (jfrau_packet_header_t*)(buffer[ithread] + offset); currentpnum = (*( (uint8_t*) header->packetNumber)); //cout<<"next currentpnum :"<ReceiveDataOnly(buffer[ithread] + offset); if(!receivedSize) return 0; header = (jfrau_packet_header_t*)(buffer[ithread] + offset); currentpnum = (*( (uint8_t*) header->packetNumber)); //cout<<"trying to find: currentpnum:"<ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); //eiger returns 0 when header packet caught while(receivedSize < onePacketSize && status != TRANSMITTING) receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); } } totalListeningPacketCount[ithread] += (receivedSize/onePacketSize); #ifdef MANUALDEBUG if(receivedSize>0){ if(myDetectorType == JUNGFRAU){ jfrau_packet_header_t* header; for(int iloop=0;iloop<128;iloop++){ header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE)); cprintf(RED,"[%d]: packetnumber:%d\n",iloop, (*( (uint8_t*) header->packetNumber))); cprintf(RED," : framenumber :%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask); } }else if(myDetectorType == EIGER){ eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize); eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + fifoBufferHeaderSize); cprintf(GREEN,"thread:%d footeroffset:%dsubframenum:%d oldpacketnum:%d new pnum:%d new fnum:%d\n", ithread,footerOffset, (*( (unsigned int*) header->subFrameNumber)), (*( (uint8_t*) header->dynamicRange)), (*( (uint16_t*) footer->packetNumber)), (uint32_t)(*( (uint64_t*) footer))); } } #endif #ifdef DEBUG cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize); #endif return receivedSize; } void UDPStandardImplementation::startFrameIndices(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //determine startFrameIndex jfrau_packet_header_t* header=0; switch(myDetectorType){ case EIGER: startFrameIndex = 0; //frame number always resets break; case JUNGFRAU: startFrameIndex = (*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))); break; default: if(shortFrameEnable < 0){ startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) & (frameIndexMask)) >> frameIndexOffset); }else{ startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize)))) & (frameIndexMask)) >> frameIndexOffset); } break; } //start of entire acquisition if(!acqStarted){ pthread_mutex_lock(&progressMutex); startAcquisitionIndex = startFrameIndex; acqStarted = true; pthread_mutex_unlock(&progressMutex); cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex); } //set start of scan/real time measurement cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex); measurementStarted[ithread] = true; } void UDPStandardImplementation::stopListening(int ithread, int numbytes){ FILE_LOG(logDEBUG) << __AT__ << " called"; #ifdef DEBUG4 cprintf(BLUE,"Listening_Thread %d: Stop Listening\nStatus: %s numbytes:%d\n", ithread, runStatusType(status).c_str(),numbytes); #endif //less than 1 packet size (especially for eiger), ignore the buffer (so that 2 dummy buffers are not sent with pc=0) if(numbytes < onePacketSize) numbytes = 0; //free empty buffer if(numbytes <= 0){ FILE_LOG(logINFO) << "Listening "<< ithread << ": End of Acquisition"; while(!fifoFree[ithread]->push(buffer[ithread])); #ifdef EVERYFIFODEBUG if(fifoFree[ithread]->getSemValue()<100) cprintf(GREEN,"Fifofree[%d]: value:%d, push 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif } //push last non empty buffer into fifo else{ (*((uint32_t*)(buffer[ithread]))) = numbytes/onePacketSize; totalListeningPacketCount[ithread] += (numbytes/onePacketSize); #ifdef DEBUG cprintf(BLUE,"Listening_Thread %d: Last Buffer numBytes:%d\n",ithread, numbytes); cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread, numbytes/onePacketSize); #endif while(!fifo[ithread]->push(buffer[ithread])); #ifdef EVERYFIFODEBUG if(fifo[ithread]->getSemValue()>(fifoSize-100)) cprintf(MAGENTA,"Fifo[%d]: value:%d, push 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif } //push dummy-end buffer into fifo for all writer threads fifoFree[ithread]->pop(buffer[ithread]); #ifdef EVERYFIFODEBUG if(fifoFree[ithread]->getSemValue()<100) cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); #endif //creating dummy-end buffer with pc=0xFFFF (*((uint32_t*)(buffer[ithread]))) = dummyPacketValue; while(!fifo[ithread]->push(buffer[ithread])); #ifdef EVERYFIFODEBUG if(fifo[ithread]->getSemValue()>(fifoSize-100)) cprintf(MAGENTA,"Fifo[%d]: value:%d, push 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(buffer[ithread])); #endif #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); else cprintf(YELLOW,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif //reset mask and exit loop pthread_mutex_lock(&statusMutex); listeningThreadsMask^=(1< 1) cprintf(BLUE,"Listening_Thread %d: Waiting for other listening threads to be done.. current mask:0x%x\n", ithread, listeningThreadsMask); #endif while(listeningThreadsMask) usleep(5000); #ifdef DEBUG4 int t=0; for(i=0;i> frameIndexOffset)); #endif cSize = onePacketSize; --packetCount; } } #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) & (frameIndexMask)) >> frameIndexOffset)); #endif break; case MOENCH: lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: First Header:%d\t First Packet:%d\t Last Header:%d\t Last Packet:%d\tLast Packet Offset:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (packetIndexMask)), (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)), lastPacketOffset); #endif //moench last packet value is 0, so find the last packet and store the others in a temp storage if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (packetIndexMask))){ lastFrameHeader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset; cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; while (lastFrameHeader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset)){ cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; } memcpy(temp, buffer[ithread]+(lastPacketOffset+onePacketSize), cSize); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: temp Header:%d\t temp Packet:%d\n", (((((uint32_t)(*((uint32_t*)(temp)))))& (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(temp))))) & (packetIndexMask))); #endif } break; case JUNGFRAU: lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); #ifdef DEBUG4 header = (jfrau_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize); cprintf(BLUE, "Listening_Thread: First Header:%d\t First Packet:%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread: Last Header:%du\t Last Packet:%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif //jungfrau last packet value is 0, so find the last packet and store the others in a temp storage if((*( (uint8_t*) header->packetNumber))){ //cprintf(RED,"entering missing packet zone\n"); lastFrameHeader64 = (*( (uint32_t*) header->frameNumber))&frameIndexMask; cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; while (lastFrameHeader64 == ((*( (uint32_t*) header->frameNumber))&frameIndexMask)){ cSize += onePacketSize; lastPacketOffset -= onePacketSize; header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset); #ifdef DEBUG4 cprintf(RED,"new header:%d new packet:%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif --packetCount; } memcpy(temp, buffer[ithread]+(lastPacketOffset+onePacketSize), cSize); } break; default: cprintf(RED,"Listening_Thread %d: Error: This detector %s is not implemented in the receiver\n", ithread, getDetectorType(myDetectorType).c_str()); break; } #ifdef DEBUG4 cprintf(BLUE,"Listening_Thread %d: PacketCount:%d CarryonBufferSize:%d\n",ithread, packetCount, cSize); #endif return packetCount; } void UDPStandardImplementation::startWriting(){ FILE_LOG(logDEBUG) << __AT__ << " called"; //set current thread value index int ithread = currentThreadIndex; //let calling function know thread started and obtained current threadStarted = 1; //variable definitions char* wbuf; //buffer popped from FIFO sfilefd[ithread] = 0; //file pointer uint64_t nf; //for compression, number of frames int listenfifoIndex = ithread; /* outer loop - loops once for each acquisition */ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ //--reset parameters before acquisition nf = 0; if(dataCompressionEnable) listenfifoIndex = 0; //compression has only one listening thread /* inner loop - loop for each buffer */ //until mask unset (udp sockets shut down by client) while((1 << ithread) & writerThreadsMask){ //pop fifo[listenfifoIndex]->pop(wbuf); #ifdef EVERYFIFODEBUG if(fifo[listenfifoIndex]->getSemValue()>(fifoSize-100)) cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",listenfifoIndex,fifo[listenfifoIndex]->getSemValue(),(void*)(wbuf)); #endif #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuf),listenfifoIndex); #endif uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf)); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, listenfifoIndex); #endif //end of acquisition if(numPackets == dummyPacketValue){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, listenfifoIndex); #endif stopWriting(ithread,wbuf); continue; } //jungfrau if(myDetectorType == JUNGFRAU) handleWithoutMissingPackets(ithread, wbuf, numPackets); //normal else if(!dataCompressionEnable) handleWithoutDataCompression(ithread, wbuf, numPackets); //compression else{ #if defined(MYROOT1) && defined(ALLFILE_DEBUG) if(npackets > 0) writeFileWithoutCompression(wbuf, numPackets); #endif handleDataCompression(ithread,wbuf,nf); } }/*--end of loop for each buffer (inner loop)*/ waitWritingBufferForNextAcquisition(ithread); }/*--end of loop for each acquisition (outer loop) */ } void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; //in case they are not closed already closeFile(ithread); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread); #endif //end of acquisition, wait for file create/change of parameters sem_wait(&writerSemaphore[ithread]); //check to exit thread (for change of parameters) - only EXIT possibility if(killAllWritingThreads){ cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread); pthread_exit(NULL); } #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: Got 1st post. Creating File\n", ithread); #endif //pop fifo so that its empty char* temp; while(!fifo[ithread]->isEmpty()){ cprintf(RED,"%d:fifo emptied\n", ithread); fifo[ithread]->pop(temp); fifoFree[ithread]->push(temp); #ifdef EVERYFIFODEBUG if(fifo[ithread]->getSemValue()>(fifoSize-100)) cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(temp)); #endif } //create file if((1<push(wbuffer)); #ifdef EVERYFIFODEBUG if(fifoFree[ithread]->getSemValue()<100) cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(wbuffer)); #endif #ifdef CFIFODEBUG if(ithread==0) cprintf(CYAN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread); else cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread); #endif if(dataStreamEnable){ //ensure previous frame was processed sem_wait(&writerGuiSemaphore[ithread]); guiNumPackets[ithread] = dummyPacketValue; //let it know its got data sem_post(&dataCallbackWriterSemaphore[ithread]); } //all threads need to close file, reset mask and exit loop missingPacketinFile = (long long int)numberOfFrames*packetsPerFrame-totalWritingPacketCount[ithread]; if(myDetectorType == EIGER && fileWriteEnable && (cbAction > DO_NOTHING) && missingPacketinFile){ updateFileHeader(ithread); fseek(sfilefd[ithread],0,0); fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]); } if(totalWritingPacketCount[ithread]){ printf("\nThread:%d File:%s\n" //"\ttotalpacketsinfile:%d\t" "Packets Lost:%d" //"\tCurrentFrameNumber:%lld\tPreviousFrameNumber:%lld" "\n", ithread,completeFileName[ithread], //totalPacketsInFile[ithread], ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) - totalPacketsInFile[ithread]) //,currentFrameNumber[ithread],frameNumberInPreviousFile[ithread] ); } closeFile(ithread); pthread_mutex_lock(&statusMutex); writerThreadsMask^=(1<push(wbuffer)); return; } //callback to write data if (cbAction < DO_EVERYTHING) rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd //write to file if enabled and update write parameters if(npackets > 0) writeFileWithoutCompression(ithread, wbuffer, npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); #endif //copy frame for gui //if(npackets >= (packetsPerFrame/numberofListeningThreads)) if(dataStreamEnable && npackets > 0) copyFrameToGui(ithread, wbuffer,npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Copied frame\n"); #endif //free fifo addresses int listenfifoThread = ithread; if(dataCompressionEnable) listenfifoThread = 0; while(!fifoFree[listenfifoThread]->push(wbuffer)); #ifdef EVERYFIFODEBUG if(fifoFree[listenfifoThread]->getSemValue()<100) cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer)); #endif #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread); #endif } void UDPStandardImplementation::handleWithoutMissingPackets(int ithread, char* wbuffer, uint32_t npackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; //get current frame number uint64_t tempframenumber; tempframenumber = (*((uint32_t*)(wbuffer+HEADER_SIZE_NUM_TOT_PACKETS))); //cout<<"handling: before frame number:"< 0){ if((fileWriteEnable) && (sfilefd[ithread])){ if(tempframenumber && (tempframenumber%maxFramesPerFile) == 0){ createNewFile(ithread); } fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, oneDataSize*packetsPerFrame+fifoBufferHeaderSize-HEADER_SIZE_NUM_TOT_PACKETS, sfilefd[ithread]); } totalPacketsInFile[ithread] += npackets; totalWritingPacketCount[ithread] += npackets; lastFrameNumberInFile[ithread] = tempframenumber; currentFrameNumber[ithread] = tempframenumber; //cout<<"curentframenumber:"< 1) pthread_mutex_lock(&writeMutex); packetsCaught += npackets; totalPacketsCaught += npackets; if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); } #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); #endif //copy frame for gui //if(npackets >= (packetsPerFrame/numberofListeningThreads)) if(dataStreamEnable && npackets > 0) copyFrameToGui(ithread, wbuffer,npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Copied frame\n"); #endif //free fifo addresses int listenfifoThread = ithread; if(dataCompressionEnable) listenfifoThread = 0; while(!fifoFree[listenfifoThread]->push(wbuffer)); #ifdef EVERYFIFODEBUG if(fifoFree[listenfifoThread]->getSemValue()<100) cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer)); #endif #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread); #endif } void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; //if write enabled if((fileWriteEnable) && (sfilefd[ithread])){ if(numpackets){ int offset = fifoBufferHeaderSize; uint64_t nextFileFrameNumber; int packetsWritten = 0; //if(ithread) cout<<"numpackets:"<=0)){ //get start frame (required to create new file at the right juncture) uint64_t startframe =-1; uint32_t pnum; uint32_t snum; //if(ithread) cout<<"getting start frame number"<push(wbuffer)); return; } //if(ithread) cout<<"done getting start frame number"<=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))) createNewFile(ithread); //frames to save in one file nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) + (maxFramesPerFile - ((lastFrameNumberInFile[ithread]+1)%maxFramesPerFile)); if(writeUptoFrameNumber(ithread, wbuffer, offset, nextFileFrameNumber, numpackets, packetsWritten) == FAIL) //weird frame number of zero from fpga return; //update stats numpackets -= packetsWritten; totalPacketsInFile[ithread] += packetsWritten; totalWritingPacketCount[ithread] += packetsWritten; pthread_mutex_lock(&writeMutex); packetsCaught += packetsWritten; totalPacketsCaught += packetsWritten; pthread_mutex_unlock(&writeMutex); currentFrameNumber[ithread] = lastFrameNumberInFile[ithread]; } } } //only update parameters else{ if(numpackets){ //get last frame number uint64_t finalLastFrameNumberToSave = 0; uint32_t pnum; uint32_t snum; if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum,snum) == FAIL){ //error in frame number sent by fpga while(!fifoFree[ithread]->push(wbuffer)); return; } totalPacketsInFile[ithread] += numpackets; totalWritingPacketCount[ithread] += numpackets; lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave; currentFrameNumber[ithread] = finalLastFrameNumberToSave; } if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); packetsCaught += numpackets; totalPacketsCaught += numpackets; if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); } //set indices pthread_mutex_lock(&progressMutex); if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; pthread_mutex_unlock(&progressMutex); } void UDPStandardImplementation::updateFileHeader(int ithread){ //update file header time_t t = time(0); sprintf(fileHeader[ithread], "\nHeader\t\t %d bytes\n" "Top\t\t %d\n" "Left\t\t %d\n" "Active\t\t %d\n" "Packets Lost\t %d\n" "Dynamic Range\t %d\n" "Ten Giga\t %d\n" "Packet\t\t %d bytes\n" "Data\t\t %d bytes\n" "x\t\t %d pixels\n" "y\t\t %d pixels\n" "Timestamp\t %s\n\n" //only for eiger right now "#Packet Header\n" "Sub Frame Number 4 bytes\n" "Unused\t\t 2 bytes\n" "Port Number\t 1 byte\n" "Unused\t\t 1 byte\n\n" "#Packet Footer\n" "Frame Number\t 6 bytes\n" "Packet Number\t 2 bytes\n", FILE_HEADER_SIZE, (bottomEnable?0:1),(ithread?0:1), activated, missingPacketinFile, dynamicRange,tengigaEnable, onePacketSize,oneDataSize, //only for eiger right now EIGER_PIXELS_IN_ONE_ROW,EIGER_PIXELS_IN_ONE_COL, ctime(&t)); if(strlen(fileHeader[ithread]) > FILE_HEADER_SIZE) cprintf(BG_RED,"File Header Size is too small for file header\n"); } //called only if datacallback enabled void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; //if nthe frame, wait for your turn (1st frame always shown as its zero) if(frameToGuiFrequency && ((frametoGuiCounter[ithread])%frameToGuiFrequency)); //random read (gui ready) or nth frame read: gui needs data now or it is the first frame else{ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n"); #endif //ensure previous frame was processed sem_wait(&writerGuiSemaphore[ithread]); //copy date guiNumPackets[ithread] = numpackets; strcpy(guiFileName[ithread],completeFileName[ithread]); if(myDetectorType == JUNGFRAU) //copy also the header memcpy(latestData[ithread],buffer+HEADER_SIZE_NUM_TOT_PACKETS, numpackets*onePacketSize+fifoBufferHeaderSize-HEADER_SIZE_NUM_TOT_PACKETS); else //copy only the data memcpy(latestData[ithread],buffer+ fifoBufferHeaderSize , numpackets*onePacketSize); //let it know its got data sem_post(&dataCallbackWriterSemaphore[ithread]); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Copied Data\n"); #endif } //update the counter for nth frame if(frameToGuiFrequency) frametoGuiCounter[ithread]++; } void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer, uint64_t &nf){ FILE_LOG(logDEBUG) << __AT__ << " called"; //get frame number uint64_t tempframenumber=-1; uint32_t pnum; uint32_t snum; if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize, tempframenumber,pnum,snum) == FAIL){ //error in frame number sent by fpga while(!fifoFree[ithread]->push(wbuffer)); return; } currentFrameNumber[ithread] = tempframenumber; //set indices pthread_mutex_lock(&progressMutex); if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; pthread_mutex_unlock(&progressMutex); //variable definitions char* buff[2]={0,0}; //an array just to be compatible with copyframetogui char* data = wbuffer+ fifoBufferHeaderSize; //data pointer to the next memory to be analysed int ndata; //size of data returned uint32_t np; //remaining number of packets returned uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer)); //number of total packets int remainingsize = npackets * onePacketSize; //size of the memory slot to be analyzed eventType thisEvent = PEDESTAL; int once = 0; int xmax = 0, ymax = 0; //max pixels in x and y direction int xmin = 1, ymin = 1; //min pixels in x and y direction double tot, tl, tr, bl, br; //determining xmax and ymax switch(myDetectorType){ case MOENCH: xmax = MOENCH_PIXELS_IN_ONE_ROW-1; ymax = MOENCH_PIXELS_IN_ONE_ROW-1; break; case GOTTHARD: if(shortFrameEnable == -1){ xmax = GOTTHARD_PIXELS_IN_ROW-1; ymax = GOTTHARD_PIXELS_IN_COL-1; }else{ xmax = GOTTHARD_SHORT_PIXELS_IN_ROW-1; ymax = GOTTHARD_SHORT_PIXELS_IN_COL-1; } break; default: break; } while(buff[0] = receiverData[ithread]->findNextFrame(data,ndata,remainingsize)){ //remaining number of packets np = ndata/onePacketSize; if ((np == packetsPerFrame) && (buff[0]!=NULL)){ if(nf == 1000) cprintf(GREEN, "Writing_Thread %d: pedestal done\n", ithread); singlePhotonDetectorObject[ithread]->newFrame(); //only for moench if(commonModeSubtractionEnable){ for(int ix = xmin - 1; ix < xmax+1; ix++){ for(int iy = ymin - 1; iy < ymax+1; iy++){ thisEvent = singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, 0); } } } for(int ix = xmin - 1; ix < xmax+1; ix++) for(int iy = ymin - 1; iy < ymax+1; iy++){ thisEvent=singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, commonModeSubtractionEnable); if (nf>1000) { tot=0; tl=0; tr=0; bl=0; br=0; if (thisEvent==PHOTON_MAX) { receiverData[ithread]->getFrameNumber(buff[0]); //iFrame=receiverData[ithread]->getFrameNumber(buff); #ifdef MYROOT1 myTree[ithread]->Fill(); //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; #else pthread_mutex_lock(&writeMutex); if((fileWriteEnable) && (sfilefd[0])) singlePhotonDetectorObject[ithread]->writeCluster(sfilefd[0]); pthread_mutex_unlock(&writeMutex); #endif } } } nf++; #ifndef ALLFILE totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame); totalWritingPacketCount[ithread] += (bufferSize/packetsPerFrame); pthread_mutex_lock(&writeMutex); if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile) createNewFile(ithread); packetsCaught += (bufferSize/packetsPerFrame); totalPacketsCaught += (bufferSize/packetsPerFrame); pthread_mutex_unlock(&writeMutex); #endif if(!once){ if(dataStreamEnable) copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame); once = 1; } } remainingsize -= ((buff[0] + ndata) - data); data = buff[0] + ndata; if(data > (wbuffer + fifoBufferHeaderSize + npackets * onePacketSize) ) cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread); } while(!fifoFree[0]->push(wbuffer)); #ifdef EVERYFIFODEBUG if(fifoFree[0]->getSemValue()<100) cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",0,fifoFree[0]->getSemValue(),(void*)(wbuffer)); #endif #ifdef DEBUG5 cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer)); #endif } int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber,uint32_t &subframenumber){ FILE_LOG(logDEBUG) << __AT__ << " called"; eiger_packet_footer_t* footer=0; eiger_packet_header_t* e_header=0; jfrau_packet_header_t* header=0; framenumber = 0; packetnumber = 0; subframenumber = 0; switch(myDetectorType){ case EIGER: footer = (eiger_packet_footer_t*)(wbuffer + footerOffset); framenumber = (uint32_t)(*( (uint64_t*) footer)); //error in frame number sent by fpga if(!((uint32_t)(*( (uint64_t*) footer)))){ framenumber = -1; FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware."; return FAIL; } packetnumber = (*( (uint16_t*) footer->packetNumber))-1; e_header = (eiger_packet_header_t*) (wbuffer); subframenumber = *( (uint32_t*) e_header->subFrameNumber); #ifdef DEBUG4 if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d subfnum:%d footeroffset:%d\n", ithread, (long long int)framenumber, packetnumber, framenumber, subframenumber, footerOffset); #endif framenumber += (startFrameIndex - 1); break; case JUNGFRAU: header = (jfrau_packet_header_t*)(wbuffer); framenumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask; packetnumber = (uint32_t)(*( (uint8_t*) header->packetNumber)); #ifdef DEBUG4 cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n", (long long int)framenumber, packetnumber); #endif framenumber += startFrameIndex; break; default: framenumber = ((uint32_t)(*((uint32_t*)(wbuffer)))); //for gotthard and normal frame, increment frame number to separate fnum and pnum if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) framenumber++; packetnumber = framenumber&packetIndexMask; framenumber = (framenumber & frameIndexMask) >> frameIndexOffset; #ifdef DEBUG4 cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n", (long long int)framenumber, packetnumber); #endif framenumber += startFrameIndex; break; } return OK; } int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten){ FILE_LOG(logDEBUG) << __AT__ << " called"; //if(ithread) cout<<"at writeUptoFrameNumber " << nextFrameNumber<< endl; int startoffset = offset; int endoffset = startoffset + numpackets * onePacketSize; uint64_t tempframenumber=-1; offset = endoffset; uint32_t pnum; uint32_t snum; //get last frame number if(getFrameandPacketNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber,pnum,snum) == FAIL){ //error in frame number sent by fpga while(!fifoFree[ithread]->push(wbuffer)); return FAIL; } //last packet's frame number < nextframenumber if(tempframenumber=nextFrameNumber){ offset -= bigIncrements; if(offsetpush(wbuffer)); return FAIL; } } if(offsetpush(wbuffer)); return FAIL; } } while(tempframenumberpush(wbuffer)); return FAIL; } } fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]); numPacketsWritten += ((offset-startoffset)/onePacketSize); lastFrameNumberInFile[ithread] = (nextFrameNumber-1); //if(ithread) cout<<"done with writeUptoFrameNumber" << endl; return OK; }