/********************************************//** * @file UDPStandardImplementation.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "UDPStandardImplementation.h" #include "moench02ModuleData.h" #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" #include // SIGINT #include // stat #include // socket(), bind(), listen(), accept(), shut down #include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision #include //munmap #include #include using namespace std; #define WRITE_HEADERS UDPStandardImplementation::UDPStandardImplementation() //: //thread_started(0), //eth(NULL), //latestData(NULL), //guiFileName(NULL), //guiFrameNumber(0), //tengigaEnable(0) { thread_started = 0; eth = NULL; latestData = NULL; guiFileName = NULL; guiFrameNumber = NULL; tengigaEnable = 0; for(int i=0;i /proc/sys/net/core/rmem_max")) cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; /** permanent setting heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 */ } void UDPStandardImplementation::configure(map config_map){ FILE_LOG(logWARNING) << __AT__ << " called"; map::const_iterator pos; pos = config_map.find("mode"); if (pos != config_map.end() ){ int b; if(!sscanf(pos->second.c_str(), "%d", &b)){ cout << "Warning: Could not parse mode. Assuming top mode." << endl; b = 0; } bottom = b!= 0; cout << "bottom:"<< bottom << endl; } }; void UDPStandardImplementation::initializeMembers(){ myDetectorType = GENERIC; maxPacketsPerFile = 0; enableFileWrite = 1; overwrite = 1; fileIndex = 0; scanTag = 0; frameIndexNeeded = 0; acqStarted = false; measurementStarted = false; startFrameIndex = 0; frameIndex = 0; packetsCaught = 0; totalPacketsCaught = 0; packetsInFile = 0; numTotMissingPackets = 0; numTotMissingPacketsInFile = 0; numMissingPackets = 0; startAcquisitionIndex = 0; acquisitionIndex = 0; packetsPerFrame = 0; frameIndexMask = 0; packetIndexMask = 0; frameIndexOffset = 0; acquisitionPeriod = SAMPLE_TIME_IN_NS; numberOfFrames = 0; dynamicRange = 16; shortFrame = -1; currframenum = 0; prevframenum = 0; frameSize = 0; bufferSize = 0; onePacketSize = 0; guiDataReady = 0; nFrameToGui = 0; fifosize = 0; numJobsPerThread = -1; dataCompression = false; numListeningThreads = 1; numWriterThreads = 1; thread_started = 0; currentListeningThreadIndex = -1; currentWriterThreadIndex = -1; for(int i=0;i=0) fileIndex = i; return getFileIndex(); } */ /* int UDPStandardImplementation::setFrameIndexNeeded(int i){ frameIndexNeeded = i; return frameIndexNeeded; } */ /* int UDPStandardImplementation::getEnableFileWrite() const{ return enableFileWrite; } */ /* int UDPStandardImplementation::setEnableFileWrite(int i){ enableFileWrite=i; return getEnableFileWrite(); } */ /* int UDPStandardImplementation::getEnableOverwrite() const{ return overwrite; } */ /* int UDPStandardImplementation::setEnableOverwrite(int i){ overwrite=i; return getEnableOverwrite(); } */ /*other parameters*/ slsReceiverDefs::runStatus UDPStandardImplementation::getStatus() const{ FILE_LOG(logDEBUG) << __AT__ << " called, status: " << status; return status; } void UDPStandardImplementation::initialize(const char *detectorHostName){ if(strlen(detectorHostName)) strcpy(detHostname,detectorHostName); } char *UDPStandardImplementation::getDetectorHostname() const{ return (char*)detHostname; } void UDPStandardImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBUG) << __AT__ << " called"; strcpy(eth,c); } void UDPStandardImplementation::setUDPPortNo(int p){ FILE_LOG(logDEBUG) << __AT__ << " called"; server_port[0] = p; } void UDPStandardImplementation::setUDPPortNo2(int p){ FILE_LOG(logDEBUG) << __AT__ << " called"; server_port[1] = p; } int UDPStandardImplementation::getNumberOfFrames() const { return numberOfFrames; } int32_t UDPStandardImplementation::setNumberOfFrames(int32_t fnum){ FILE_LOG(logDEBUG) << __AT__ << " called"; if(fnum >= 0) numberOfFrames = fnum; return getNumberOfFrames(); } int UDPStandardImplementation::getScanTag() const{ return scanTag; } int32_t UDPStandardImplementation::setScanTag(int32_t stag){ FILE_LOG(logDEBUG) << __AT__ << " called"; if(stag >= 0) scanTag = stag; return getScanTag(); } int UDPStandardImplementation::getDynamicRange() const{ return dynamicRange; } int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){ FILE_LOG(logDEBUG) << __AT__ << " called"; int olddr = dynamicRange; if(dr >= 0){ cout << "Setting Dynamic Range to " << dr << endl; dynamicRange = dr; if(myDetectorType == EIGER){ if(!tengigaEnable){ packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; }else{ packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; } frameSize = onePacketSize * packetsPerFrame; bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; if(olddr != dr){ //del if(thread_started){ createListeningThreads(true); createWriterThreads(true); } for(int i=0;i=0){ nFrameToGui = i; setupFifoStructure(); } return nFrameToGui; } int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){ FILE_LOG(logDEBUG) << __AT__ << " called"; if(index >= 0){ if(index != acquisitionPeriod){ acquisitionPeriod = index; setupFifoStructure(); } } return acquisitionPeriod; } bool UDPStandardImplementation::getDataCompression(){ FILE_LOG(logDEBUG) << __AT__ << " called"; return dataCompression;} int UDPStandardImplementation::enableDataCompression(bool enable){ FILE_LOG(logDEBUG) << __AT__ << " called"; cout << "Data compression "; if(enable) cout << "enabled" << endl; else cout << "disabled" << endl; #ifdef MYROOT1 cout << " WITH ROOT" << endl; #else cout << " WITHOUT ROOT" << endl; #endif //delete filter for the current number of threads deleteFilter(); dataCompression = enable; pthread_mutex_lock(&status_mutex); writerthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); createWriterThreads(true); if(enable) numWriterThreads = MAX_NUM_WRITER_THREADS; else numWriterThreads = 1; if(createWriterThreads() == FAIL){ cprintf(BG_RED,"ERROR: Could not create writer threads\n"); return FAIL; } setThreadPriorities(); if(enable) setupFilter(); return OK; } /*other functions*/ void UDPStandardImplementation::deleteFilter(){ FILE_LOG(logDEBUG) << __AT__ << " called"; int i; cmSub=NULL; for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); } //LEO: it is not clear to me.. void UDPStandardImplementation::setupFifoStructure(){ FILE_LOG(logDEBUG) << __AT__ << " called"; int64_t i; int oldn = numJobsPerThread; //if every nth frame mode if(nFrameToGui) numJobsPerThread = nFrameToGui; //random nth frame mode else{ if(!acquisitionPeriod) i = SAMPLE_TIME_IN_NS; else i = SAMPLE_TIME_IN_NS/acquisitionPeriod; if (i > MAX_JOBS_PER_THREAD) numJobsPerThread = MAX_JOBS_PER_THREAD; else if (i < 1) numJobsPerThread = 1; else numJobsPerThread = i; } //if same, return if(oldn == numJobsPerThread) return; if(myDetectorType == EIGER) numJobsPerThread = 1; //otherwise memory too much if numjobsperthread is at max = 1000 fifosize = GOTTHARD_FIFO_SIZE; if(myDetectorType == MOENCH) fifosize = MOENCH_FIFO_SIZE; if(myDetectorType == PROPIX) fifosize = PROPIX_FIFO_SIZE; else if(myDetectorType == EIGER) fifosize = EIGER_FIFO_SIZE * packetsPerFrame; if(fifosize % numJobsPerThread) fifosize = (fifosize/numJobsPerThread)+1; else fifosize = fifosize/numJobsPerThread; if(myDetectorType == EIGER) cout << "1 packet per buffer" << endl; else cout << "Number of Frames per buffer:" << numJobsPerThread << endl; #ifdef VERBOSE cout << "Fifo Size:" << fifosize << endl; #endif /* //for testing numJobsPerThread = 3; fifosize = 11; */ for(int i=0;iisEmpty()) fifoFree[i]->pop(buffer[i]); #ifdef FIFO_DEBUG //cprintf(GREEN,"%d fifostructure popped from fifofree %x\n", i, (void*)(buffer[i])); #endif delete fifoFree[i]; } if(fifo[i]) delete fifo[i]; if(mem0[i]) free(mem0[i]); fifoFree[i] = new CircularFifo(fifosize); fifo[i] = new CircularFifo(fifosize); int whatperbuffer = bufferSize; if(myDetectorType == EIGER) whatperbuffer = onePacketSize; //allocate memory mem0[i]=(char*)malloc((whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); /** shud let the client know about this */ if (mem0[i]==NULL){ cprintf(BG_RED,"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++\n"); exit(-1); } buffer[i]=mem0[i]; //push the addresses into freed fifoFree and writingFifoFree while (buffer[i]<(mem0[i]+(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { fifoFree[i]->push(buffer[i]); #ifdef FIFO_DEBUG cprintf(BLUE,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); #endif buffer[i]+=(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); } } cout << "Fifo structure(s) reconstructed" << endl; } /** acquisition functions */ void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &startAcquisitionIndex, uint32_t &startFrameIndex){ FILE_LOG(logDEBUG) << __AT__ << " called"; //point to gui data if (guiData == NULL){ guiData = latestData; #ifdef VERY_VERY_DEBUG cprintf(CYAN,"gui data not null anymore\n"); #endif } //copy data and filename strcpy(c,guiFileName); fnum = guiFrameNumber; startAcquisitionIndex = getStartAcquisitionIndex(); startFrameIndex = getStartFrameIndex(); //could not get gui data if(!guiDataReady){ #ifdef VERY_VERY_DEBUG cprintf(CYAN,"gui data not ready\n"); #endif *raw = NULL; } //data ready, set guidata to receive new data else{ #ifdef VERY_VERY_DEBUG cprintf(CYAN,"gui data ready\n"); #endif *raw = guiData; guiData = NULL; /*pthread_mutex_lock(&dataReadyMutex); WHY WAS THIS HERE IN THE FIRST PLACE guiDataReady = 0; pthread_mutex_unlock(&dataReadyMutex);*/ if((nFrameToGui) && (writerthreads_mask)){ #ifdef VERY_VERY_DEBUG cprintf(CYAN,"gonna post\n"); #endif /*if(nFrameToGui){*/ //release after getting data sem_post(&smp); } #ifdef VERY_VERY_DEBUG cprintf(CYAN,"done post\n"); #endif } } void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ FILE_LOG(logDEBUG) << __AT__ << " called"; #ifdef VERY_VERY_DEBUG cout << "copyframe" << endl; #endif //random read when gui not ready , also command line doesnt have nthframetogui //else guidata always null as guidataready is always 1 after 1st frame, and seccond data never gets copied if((!nFrameToGui) && (!guiData)){ #ifdef VERY_VERY_DEBUG cprintf(GREEN,"doing nothing\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); } //random read or nth frame read, gui needs data now or it is the first frame else{ #ifdef VERY_VERY_DEBUG cprintf(GREEN,"gui needs data now or 1st frame\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; #ifdef VERY_VERY_DEBUG cprintf(GREEN,"guidataready is 0, copying data\n"); #endif //eiger if(startbuf != NULL){ for(int j=0;jgetErrorStatus(); if(!iret) cout << "UDP port opened at port " << port[i] << endl; else{ #ifdef VERBOSE cprintf(BG_RED,"Could not create UDP socket on port %d error: %d\n", port[i], iret); #endif shutDownUDPSockets(); return FAIL; } } return OK; } int UDPStandardImplementation::shutDownUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << " called"; for(int i=0;iShutDownSocket(); delete udpSocket[i]; udpSocket[i] = NULL; } } return OK; } // TODO: add a destroyListeningThreads int UDPStandardImplementation::createListeningThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " called"; int i; void* status; killAllListeningThreads = 0; pthread_mutex_lock(&status_mutex); listeningthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); FILE_LOG(logDEBUG) << "Starting " << __func__ << endl; if(!destroy){ //start listening threads cout << "Creating Listening Threads(s)"; currentListeningThreadIndex = -1; for(i = 0; i < numListeningThreads; ++i){ sem_init(&listensmp[i],1,0); thread_started = 0; currentListeningThreadIndex = i; if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ cout << "Could not create listening thread with index " << i << endl; return FAIL; } while(!thread_started); cout << "."; cout << flush; } #ifdef VERBOSE cout << "Listening thread(s) created successfully." << endl; #else cout << endl; #endif }else{ cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDet[ithr]->newDataSet(); if(myFile[ithr]==NULL){ cout<<"file null"<IsOpen()){ cout<<"file not open"< DO_NOTHING){ //close if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } //open file if(!overwrite){ if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ cprintf(BG_RED,"Error: Could not create new file %s\n",savefilename); return FAIL; } }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ cprintf(BG_RED,"Error: Could not create file %s\n",savefilename); return FAIL; } //setting buffer setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); //printing packet losses and file names if(!packetsCaught) cout << savefilename << endl; else{ cout << savefilename << "\tpacket loss " << setw(4)<GetCurrentFile(); if(myFile[ithr]->Write()) //->Write(tall->GetName(),TObject::kOverwrite); cout << "Thread " << ithr <<": wrote frames to file" << endl; else cout << "Thread " << ithr << ": could not write frames to file" << endl; }else cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; //close file if(myTree[ithr] && myFile[ithr]) myFile[ithr] = myTree[ithr]->GetCurrentFile(); if(myFile[ithr] != NULL) myFile[ithr]->Close(); myFile[ithr] = NULL; myTree[ithr] = NULL; pthread_mutex_unlock(&write_mutex); #endif } } /** * Pre: * Post: eiger req. time for 32bit before acq start * */ int UDPStandardImplementation::startReceiver(char message[]){ FILE_LOG(logDEBUG) << __AT__ << " called"; int i; // #ifdef VERBOSE cout << "Starting Receiver" << endl; //#endif //reset listening thread variables measurementStarted = false; //should be set to zero as its added to get next start frame indices for scans for eiger if(!acqStarted) currframenum = 0; startFrameIndex = 0; for(int i = 0; i < numListeningThreads; ++i) totalListeningFrameCount[i] = 0; //udp socket if(createUDPSockets() == FAIL){ strcpy(message,"Could not create UDP Socket(s).\n"); cout << endl << message << endl; return FAIL; } cout << "UDP socket(s) created successfully." << endl; if(setupWriter() == FAIL){ //stop udp socket shutDownUDPSockets(); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; } cout << "Successfully created file(s)" << endl; //done to give the gui some proper name instead of always the last file name if(dataCompression) sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); //initialize semaphore sem_init(&smp,1,0); //status pthread_mutex_lock(&status_mutex); status = RUNNING; for(i=0;istartListening(); return this_pointer; } void* UDPStandardImplementation::startWritingThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startWriting(); return this_pointer; } int UDPStandardImplementation::startListening(){ FILE_LOG(logDEBUG) << __AT__ << " called"; int ithread = currentListeningThreadIndex; #ifdef VERYVERBOSE cprintf(BLUE, "In startListening()\n "); #endif thread_started = 1; int total; int lastpacketoffset, expected, rc,packetcount, maxBufferSize, carryonBufferSize; uint32_t lastframeheader;// for moench to check for all the packets in last frame char* tempchar = NULL; uint32_t prenum=0; while(1){ //variables that need to be checked/set before each acquisition carryonBufferSize = 0; maxBufferSize = bufferSize * numJobsPerThread; #ifdef VERYDEBUG cprintf(BLUE, "%d maxBufferSize:%d carryonBufferSize:%d\n", ithread,maxBufferSize,carryonBufferSize); #endif //missing packets compensation in listening thread if(tempchar) {delete [] tempchar;tempchar = NULL;} if(myDetectorType != EIGER) tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size else maxBufferSize = 0; while((1<pop(buffer[ithread]); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener popped from fifofree %x\n", ithread, (void*)(buffer[ithread])); #endif //ensure udpsocket exists if(udpSocket[ithread] == NULL){ rc = 0; cprintf(BLUE, "%d UDP Socket is NULL\n",ithread); } //normal listening else if(!carryonBufferSize){ #ifdef SOCKET_DEBUG if(!ithread){ #endif rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); if(rc == EIGER_HEADER_LENGTH && myDetectorType == EIGER) { while(rc == EIGER_HEADER_LENGTH){ rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); /*cprintf(MAGENTA,"%d got a header*****************************\n",ithread); cprintf(MAGENTA,"tempframenum[%d]:%d\n",ithread,(htonl(*(uint32_t*)(((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)))); */} } /* if(rc == 1040){ cprintf(CYAN,"tempframenum[%d]:%d\n",ithread,((*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1)))); cprintf(CYAN,"packetnum[%d]:%d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); cprintf(CYAN,"add[%d]:0x%x\n",ithread,(void*)(buffer[ithread])); }*/ expected = maxBufferSize; #ifdef SOCKET_DEBUG }else{ while(1) usleep(100000000); } #endif } //the remaining packets from previous buffer, copy it and listen to n less frame else{ #ifdef VERYDEBUG cprintf(BLUE, "%d carry on buffer size:%d\n",ithread,carryonBufferSize); cprintf(BLUE, "%d framennum in tempchar:%d\n",((((uint32_t)(*((uint32_t*)tempchar))) & (frameIndexMask)) >> frameIndexOffset)); cprintf(BLUE, "%d tempchar packet:%d\n", ((((uint32_t)(*((uint32_t*)(tempchar))))) & (packetIndexMask))); #endif memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, tempchar, carryonBufferSize); rc = udpSocket[ithread]->ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); expected = maxBufferSize - carryonBufferSize; } #ifdef EIGER_DEBUG cprintf(BLUE, "%d rc: %d. expected: %d\n", ithread, rc, expected); #endif //start indices for each start of scan/acquisition if((!measurementStarted) && (rc > 0)){ pthread_mutex_lock(&progress_mutex); if(!measurementStarted) startFrameIndices(ithread, rc); pthread_mutex_unlock(&progress_mutex); } //problem in receiving or end of acquisition if (status == TRANSMITTING){ stopListening(ithread,rc,packetcount,total); continue; } //reset packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; carryonBufferSize = 0; //check if last packet valid and calculate packet count switch(myDetectorType){ case MOENCH: lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; cout << "last packet offset:" << lastpacketoffset << endl; cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif //moench last packet value is 0 if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; } memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) & (frameIndexMask)) >> frameIndexOffset) << endl; cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) & (packetIndexMask)) << endl; #endif } break; case GOTTHARD: case PROPIX: if(shortFrame == -1){ lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG cprintf(BLUE, "%d last packet offset:%d\n",ithread, lastpacketoffset); #endif //if not last packet if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); #ifdef VERYDEBUG cprintf(BLUE, "%d tempchar header:%d\n",ithread,(((((uint32_t)(*((uint32_t*)(tempchar))))+1) & (frameIndexMask)) >> frameIndexOffset)); #endif carryonBufferSize = onePacketSize; --packetcount; } } #ifdef VERYDEBUG cprintf(BLUE, "%d header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset)); #endif break; case EIGER: //because even headers might be included, so not packet count (*((uint32_t*)(buffer[ithread]))) = rc; packetcount = 1; break; default: break; } //write packet count and push #ifdef VERYDEBUG cprintf(BLUE, "%d packetcount:%d carryonbuffer:%d\n", ithread, packetcount, carryonBufferSize); #endif if(myDetectorType != EIGER) (*((uint32_t*)(buffer[ithread]))) = packetcount; totalListeningFrameCount[ithread] += packetcount; #ifdef VERYDEBUG cprintf(BLUE,"%d listener going to push fifo: 0x%x\n", ithread,(void*)(buffer[ithread])); #endif while(!fifo[ithread]->push(buffer[ithread])); /*cprintf(YELLOW,"tempframenum[%d]:%d\n",ithread,((*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1)))); cprintf(YELLOW,"packetnum[%d]:%d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); cprintf(YELLOW,"add[%d]:0x%x\n",ithread,(void*)(buffer[ithread]));*/ #ifdef FIFO_DEBUG cprintf(BLUE, "%d listener pushed into fifo %x\n",ithread, (void*)(buffer[ithread])); #endif } sem_wait(&listensmp[ithread]); //make sure its not exiting thread if(killAllListeningThreads){ cout << ithread << " good bye listening thread" << endl; if(tempchar) {delete [] tempchar;tempchar = NULL;} pthread_exit(NULL); } if(tempchar) {delete [] tempchar;tempchar = NULL;} } return OK; } int UDPStandardImplementation::startWriting(){ FILE_LOG(logDEBUG) << __AT__ << " called"; int ithread = currentWriterThreadIndex; #ifdef VERYVERBOSE cprintf(GREEN,"%d In startWriting()\n", ithread); #endif thread_started = 1; char* wbuf[numListeningThreads];//interleaved char *d=new char[bufferSize*numListeningThreads]; int xmax=0,ymax=0; int ret,i,j; bool endofacquisition; int numpackets[numListeningThreads], nf; bool fullframe[numListeningThreads],popready[numListeningThreads]; uint32_t tempframenum[numListeningThreads]; uint32_t presentframenum; uint32_t lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads]; int numberofmissingpackets[numListeningThreads]; int MAX_VALUE = 1024; char* tofree[MAX_VALUE]; char* tempbuffer[MAX_VALUE]; char* blankframe[MAX_VALUE]; int tofreeoffset[numListeningThreads]; int tempoffset[numListeningThreads]; int blankoffset; for(i=0;inum3)) = 0xFF; for(j=0;j<(onePacketSize-16);++j) (*((uint8_t*)((char*)(blankframe[i])+8+j))) = 0xFF; if ((*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[i])))->num3)) != 0xFF){ cprintf(RED,"blank frame not detected at %d: 0x%x\n",i,(*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[i])))->num3)) ); exit(-1); } #ifdef FIFO_DEBUG cprintf(GREEN,"packet %d blank frame 0x%x\n",i,(void*)(blankframe[i])); #endif } } //allow them all to be popped initially for(i=0;ipop(wbuf[i]); #ifdef FIFO_DEBUG cprintf(GREEN,"%d writer poped 0x%x from fifo %d\n", ithread, (void*)(wbuf[i]), i); #endif numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i])); #ifdef VERYDEBUG cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i); #endif if(numpackets < 0){ cprintf(BG_RED,"negative numpackets[%d]%d\n",i,numpackets[i]); exit(-1); } //dont pop again if dummy packet else if(numpackets[i] == 0){ popready[i] = false; //#ifdef EIGER_DEBUG3 cprintf(GREEN,"%d Dummy frame popped out of fifo %d",ithread, i); //#endif }else{ endofacquisition = false; if(numpackets[i] == 1040){ /* cprintf(BLUE,"tempframenum[%d]:%d\n",i,((*(uint32_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1)))); cprintf(BLUE,"packetnum[%d]:%d\n",i,((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); */}else if(numpackets[i] == EIGER_HEADER_LENGTH){ cprintf(BG_RED, "got header in writer, weirdd packetsize:%d\n",numpackets[i]); exit(-1); } if(myDetectorType == EIGER){ tofree[tofreeoffset[i]] = wbuf[i]; tofreeoffset[i]++; } } } } //END OF ACQUISITION if(endofacquisition){ //#ifdef VERYDEBUG cprintf(GREEN,"%d Both dummy frames\n", ithread); //#endif //remaining packets to be written if((myDetectorType == EIGER) && ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))); else{ stopWriting(ithread,wbuf); continue; } } if(myDetectorType == EIGER){ //NOT FULL FRAME if(!fullframe[0] || !fullframe[1]){ for(i=0;inum3))&0x2) cprintf(RED,"1 fifo:%d missing packet added at pnum:%d\n",i,tempoffset[i]); else cprintf(RED, "1 fifo:%d Weird at pnum:%d\n",i,tempoffset[i]); #endif if (!((*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3))&0x2)){ cprintf(BG_RED, "dummy blank mismatch num4 earlier2! i:%d pnum:%d fnum:%d num3:0x%x actual num3:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[blankoffset])))->num3))); exit(-1); }else /*cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d num3:0x%x\n",i,tempoffset[i],tempframenum[i],(*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3))); */ tempoffset[i] ++; blankoffset ++; } //set fullframe and dont let fifo pop over it until written fullframe[i] = true; popready[i] = false; } } //#ifdef EIGER_DEBUG3 else{ cprintf(RED, "WARNING: Got a weird packet size: %d from fifo %d\n", numpackets[i],i); continue; } //#endif } //not a full frame if(!fullframe[i]){ //update frame number //tempframenum[i] = (htonl(*(uint32_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1))); tempframenum[i] = ((*(uint32_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1))); if(!tempframenum[i]) cprintf(RED,"**VERY WEIRD frame numbers for fifo %d: %d\n",i,tempframenum[i]); tempframenum[i] += (startFrameIndex-1); //WRONG FRAME - leave if(tempframenum[i] != presentframenum){/*cout<<"wrong packet"<num1))), ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); #endif tempframenum[i] = presentframenum; //add missing packets numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); #ifdef VERYDEBUG if(numberofmissingpackets[i]>0) cprintf(BG_RED,"fifo:%d missing packet from: %d now\n",i,lastpacketheader[i]); #endif //to decrement from packetsInFile to calculate packet loss for(j=0;jnum3))&0x2) cprintf(RED,"5 fifo:%d missing packet added at pnum:%d\n",i,tempoffset[i]); else cprintf(RED, "5 fifo:%d WEird at pnum:%d\n",i,tempoffset[i]); #endif if (!((*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3))&0x2)){ cprintf(BG_RED, "wrong blank mismatch num4 earlier2! " "i:%d pnum:%d fnum:%d num3:0x%x actual num3:0x%x add:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[blankoffset])))->num3)), (void*)(tempbuffer[tempoffset[i]])); exit(-1); }else /*cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d num3:0x%x add:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (void*)(tempbuffer[tempoffset[i]]));*/ tempoffset[i] ++; blankoffset ++; } //set fullframe and dont let fifo pop over it until written fullframe[i] = true; popready[i] = false; } //CORRECT FRAME - continue building frame else {/*cout<<"correct packet"<num4))); #ifdef VERYVERBOSE cprintf(GREEN,"**fifo:%d currentpacketheader: %d lastpacketheader %d tempoffset:%d\n",i,currentpacketheader[i],lastpacketheader[i], tempoffset[i]); #endif //add missing packets numberofmissingpackets[i] = (currentpacketheader[i] - lastpacketheader[i] -1); #ifdef VERYDEBUG if(numberofmissingpackets[i]>0) cprintf(BG_RED,"fifo:%d missing packet from: %d now at :%d tempoffset:%d\n",i,lastpacketheader[i],currentpacketheader[i],tempoffset[i]); #endif //to decrement from packetsInFile to calculate packet loss for(j=0;jnum3))&0x2) cprintf(RED,"4 fifo:%d missing packet added at pnum:%d\n",i,tempoffset[i]); else cprintf(RED, "4 fifo:%d WEird at pnum:%d\n",i,tempoffset[i]); #endif if (!((*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3))&0x2)){ cprintf(BG_RED, "correct blank mismatch num4 earlier2! " "i:%d pnum:%d fnum:%d num3:0x%x actual num3:0x%x add:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[blankoffset])))->num3)), (void*)(tempbuffer[tempoffset[i]])); exit(-1); }else /* cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d num3:0x%x add:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (void*)(tempbuffer[tempoffset[i]]));*/ tempoffset[i] ++; blankoffset ++; } //add current packet if(currentpacketheader[i] != (tempoffset[i]-(i*packetsPerFrame/numListeningThreads))){ cprintf(BG_RED, "correct pnum mismatch earlier! tempoffset[%d]:%d pnum:%d fnum:%d rfnum:%d\n", i,tempoffset[i],currentpacketheader[i], tempframenum[i],(*(uint32_t*)(((eiger_packet_header *)((char*)(wbuf[i]+ HEADER_SIZE_NUM_TOT_PACKETS)))->num1))); exit(-1); } tempbuffer[tempoffset[i]] = wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS; #ifdef EIGER_DEBUG3 cprintf(GREEN,"**fifo:%d currentpacketheader: %d tempoffset:%d\n",i,(*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num4)),tempoffset[i]); #endif if((*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num4)) != (tempoffset[i]-(i*packetsPerFrame/numListeningThreads))){ cprintf(BG_RED, "pnum mismatch num4 earlier! i:%d pnum:%d fnum:%d add:0x%x\n", i,(*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num4)), tempframenum[i],(void*)(tempbuffer[tempoffset[i]])); exit(-1); } /*cprintf(GREEN, "normal packet i:%d pnum:%d fnum:%d num3:0x%x add:0x%x\n", i,tempoffset[i],tempframenum[i], (*(uint8_t*)(((eiger_packet_header *)((char*)(tempbuffer[tempoffset[i]])))->num3)), (void*)(tempbuffer[tempoffset[i]]));*/ tempoffset[i] ++; //update last packet lastpacketheader[i] = currentpacketheader[i]; popready[i] = true; //last frame got, this will save time and also for last frames, it doesnt wait for stop receiver if(currentpacketheader[i] == LAST_PACKET_VALUE){ #ifdef EIGER_DEBUG3 cprintf(GREEN, "Got last packet\n"); #endif fullframe[i] = true; popready[i] = false; } } } } } //FULL FRAME if(fullframe[0] && fullframe[1]){ //determine frame number if(tempframenum[0] != tempframenum[1]) cprintf(RED,"Frame numbers mismatch!!! %d %d\n",tempframenum[0],tempframenum[1]); currframenum = tempframenum[0]; /*//to resolve for missing frame packets tempframenum[0]++; tempframenum[1]++;*/ numMissingPackets += (numberofmissingpackets[0]+numberofmissingpackets[1]); numTotMissingPacketsInFile += numMissingPackets; numTotMissingPackets += numMissingPackets; //#ifdef EIGER_DEBUG2 cprintf(GREEN,"**fnum:%d**\n",currframenum); //#endif #ifdef EIGER_DEBUG3 if(numberofmissingpackets[0]) cprintf(RED, "fifo 0 missing packets:%d fnum:%d\n",numberofmissingpackets[0],currframenum); if(numberofmissingpackets[1]) cprintf(RED, "fifo 1 missing packets:%d fnum:%d\n",numberofmissingpackets[1],currframenum); if(numMissingPackets){ cprintf(RED, "numMissingPackets:%d fnum:%d\n",numMissingPackets,currframenum); for (j=0;jnum3))&0x2) cprintf(RED,"found the missing packet at pnum:%d\n",j); } #endif //write and copy to gui handleWithoutDataCompression(ithread,tempbuffer,packetsPerFrame); //freeing for(j=0;jpush(tofree[j])); #ifdef FIFO_DEBUG cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tofree[j]),0); #endif } for(j=(packetsPerFrame/numListeningThreads);jpush(tofree[j])); #ifdef FIFO_DEBUG cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tofree[j]),1); #endif } #ifdef VERYDEBUG cprintf(GREEN,"finished freeing\n"); #endif //reset a few stuff presentframenum = tempframenum[0]+1; for(int i=0;inum1)))); cprintf(GREEN,"packetnum[%d]:%d\n",i,((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); }*/ } //other detectors other than eiger else{ //frame number for progress if ((myDetectorType == PROPIX) ||((myDetectorType == GOTTHARD) && (shortFrame == -1))) tempframenum[0] = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum[0] = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum[0]; else{ pthread_mutex_lock(&progress_mutex); if(tempframenum[0] > currframenum) currframenum = tempframenum[0]; pthread_mutex_unlock(&progress_mutex); } //without datacompression: write datacall back, or write data, free fifo if(!dataCompression) handleWithoutDataCompression(ithread,wbuf, numpackets[0]); //data compression else handleDataCompression(ithread,wbuf,d, xmax, ymax, nf); } } #ifdef VERYVERBOSE cprintf(GREEN,"%d gonna wait for 1st sem\n", ithread); #endif //wait sem_wait(&writersmp[ithread]); if(killAllWritingThreads){ for(i=0;ifnum))-1; //missed header packet, so default value else startFrameIndex = ((*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1))-1); cout<<"startFrameIndex["<> frameIndexOffset); else startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) & (frameIndexMask)) >> frameIndexOffset); //start of acquisition if(!acqStarted){ startAcquisitionIndex=startFrameIndex; //currframenum = startAcquisitionIndex; acqStarted = true; cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex); } /*//for scans, cuz currfraenum resets else if (myDetectorType == EIGER){ startFrameIndex += (currframenum+1); }*/ cprintf(BLUE,"%d startFrameIndex: %d\n", ithread,startFrameIndex); prevframenum=startFrameIndex-1; //so that there is no packet loss, when currframenum(max,20) - prevframenum(1) measurementStarted = true; } void UDPStandardImplementation::stopListening(int ithread, int rc, int &pc, int &t){ FILE_LOG(logDEBUG) << __AT__ << " called"; int i; #ifdef VERYVERBOSE cprintf(BLUE, "%d Stop Listening\n", ithread); #endif if(status != TRANSMITTING){ cprintf(BG_RED,"%d *** udp socket not shut down from client ***********************\n", ithread); while(!fifoFree[ithread]->push(buffer[ithread])); exit(-1); } //free buffer if(rc <= 0){ cprintf(BLUE,"%d End of acquisition for Listening Thread\n", ithread); while(!fifoFree[ithread]->push(buffer[ithread])); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener empty buffer pushed into fifofree %x\n", ithread, (void*)(buffer[ithread])); #endif } //push the last buffer into fifo else{ if(myDetectorType == EIGER){ (*((uint32_t*)(buffer[ithread]))) = rc; pc = 1; }else{ pc = (rc/onePacketSize); (*((uint32_t*)(buffer[ithread]))) = pc; } #ifdef VERYDEBUG cprintf(BLUE,"%d last rc:%d\n",ithread, rc); cprintf(BLUE,"%d last packetcount:%d\n", ithread, pc); #endif totalListeningFrameCount[ithread] += pc; while(!fifo[ithread]->push(buffer[ithread])); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener last buffer pushed into fifo %x\n", ithread,(void*)(buffer[ithread])); #endif } //push dummy buffer to all writer threads for(i=0;ipop(buffer[ithread]); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener popped dummy buffer from fifofree %x\n", ithread,(void*)(buffer[ithread])); #endif (*((uint32_t*)(buffer[ithread]))) = 0x0; #ifdef VERYDEBUG cprintf(BLUE,"%d dummy buffer num packets:%d\n", ithread(*((uint16_t*)(buffer[ithread])))); #endif while(!fifo[ithread]->push(buffer[ithread])); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener pushed dummy buffer into fifo %x\n", ithread,(void*)(buffer[ithread])); #endif } //reset mask and exit loop pthread_mutex_lock(&status_mutex); listeningthreads_mask^=(1< 1) cprintf(BLUE,"%d Waiting for listening to be done.. current mask:0x%x\n", ithread, listeningthreads_mask); #endif while(listeningthreads_mask) usleep(5000); #ifdef VERYDEBUG t = 0; for(i=0;ipush(wbuffer[i])); #ifdef FIFO_DEBUG cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i); #endif } //all threads need to close file, reset mask and exit loop closeFile(ithread); pthread_mutex_lock(&status_mutex); writerthreads_mask^=(1<num1))); cprintf(RED, "p1:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p0 num:%d - %d\n", k, (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num4))); k = 1; cprintf(RED, "p2 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p2:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p1 num:%d - %d\n", k,(*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num4))); k = 2; cprintf(RED, "p3 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p3:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p2 num:%d - %d\n", k,(*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num4))); }else{ k = 0; cprintf(RED, "\np1 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p1:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p0 num:%d - %d\n", k, (*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); k = 1; cprintf(RED, "p2 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p2:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p1 num:%d - %d\n", k, (*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); k = 2; cprintf(RED, "p3 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p3:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p2 num:%d - %d\n", k, (*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); k = 256; cprintf(RED, "p257 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p257:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p256 num:%d - %d\n", k, (*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); k = 512; cprintf(RED, "p513 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p513:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p512 num:%d - %d\n", k, (*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); k = 768; cprintf(RED, "p769 fnum:0x%x\n", (*(unsigned int*)(((eiger_packet_header *)((char*)(buf[k])))->num1))); cprintf(RED, "p769:0x%x\n", (*(uint8_t*)(((eiger_packet_header *)((char*)(buf[k])))->num3))); cprintf(RED, "p768 num:%d - %d\n", k,(*(uint16_t*)(((eiger_packet_header *)((char*)(buf[k])))->num2))); } } #endif #endif } while(numpackets > 0){ //for progress and packet loss calculation(new files) if(myDetectorType == EIGER); else if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1))) tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum; else{ if(tempframenum > currframenum) currframenum = tempframenum; } #ifdef VERYDEBUG cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif //lock if(numWriterThreads > 1) pthread_mutex_lock(&write_mutex); //to create new file when max reached packetsToSave = maxPacketsPerFile - packetsInFile; if(packetsToSave > numpackets) packetsToSave = numpackets; /**next time offset is still plus header length*/ if(myDetectorType == EIGER){ for(i=0;i= maxPacketsPerFile){ //for packet loss, because currframenum is the latest one for eiger if(myDetectorType != EIGER){ lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1))) tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); } if(numWriterThreads == 1) currframenum = tempframenum; else{ if(tempframenum > currframenum) currframenum = tempframenum; } #ifdef VERYDEBUG cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif //create createNewFile(); } //unlock if(numWriterThreads > 1) pthread_mutex_unlock(&write_mutex); if(myDetectorType != EIGER) offset += (packetsToSave * onePacketSize); numpackets -= packetsToSave; } } else{ if(numWriterThreads > 1) pthread_mutex_lock(&write_mutex); packetsInFile += numpackets; packetsCaught += (numpackets - numMissingPackets); totalPacketsCaught += (numpackets - numMissingPackets); numMissingPackets = 0; if(numWriterThreads > 1) pthread_mutex_unlock(&write_mutex); } } void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets){ int i,j, missingpacket,port = 0, pnuminc; if (cbAction < DO_EVERYTHING){ if (myDetectorType == EIGER){ for(i=0;i 0){ #ifdef WRITE_HEADERS if (myDetectorType == EIGER){ for (i = 0; i < packetsPerFrame; i++){ //which port if (i ==(packetsPerFrame/2)) port = 1; //missing packet if ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3))&0x2){ missingpacket = 1; //add packet numbers (*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)) = (i+1); (*(uint32_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num1)) = currframenum+1; }else{ missingpacket = 0; if((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4)) != (i-(port*packetsPerFrame/numListeningThreads))){ cprintf(BG_RED, "pnum mismatch num4! i:%d pnum:%d fnum:%d\n",i,(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4)),currframenum); exit(-1); } if(dynamicRange != 32){ //move packet numbers to num2, and compensate for port1 starting pnum from 0 if(!port) (*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)) = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4))+1); else (*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)) = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4))+(packetsPerFrame/2) +1); } //dr == 32 else{ if(i == 0) pnuminc = 0; else if(i == (packetsPerFrame/4)) pnuminc = (packetsPerFrame/4); else if(i == (packetsPerFrame/2)) pnuminc = (packetsPerFrame/2); else if(i == (3*packetsPerFrame/4)) pnuminc = (3*packetsPerFrame/4); (*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)) = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4))+pnuminc+1); } } if((*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)) != (i+1)){ cprintf(BG_RED, "pnum mismatch! i:%d pnum:%d fnum:%d\n",i,(*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2)),currframenum); if ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3))&0x2) cprintf(BG_RED,"missing packet though\n"); exit(-1); } //overwriting port number and dynamic range (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3)) = ((dynamicRange<<2)|(missingpacket<<1)|(port)); //frame number //(*(uint32_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num1)) = currframenum; #ifdef VERYDEBUG if((i==0)||(i==1)){ cprintf(GREEN, "%d packet header:0x%016llx num3:0x%x\n",i, ((uint64_t)(*((uint64_t*)(wbuffer[i])))), (uint8_t)(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3))); } cprintf(GREEN, "%d - 0x%x - %d - %d\n", i, (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3)), (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num4)), (*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num2))); #endif /* cprintf(GREEN,"at writing, fnum:%d, pnum:%d,num3:0x%x add:0x%x\n", currframenum, i, (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[i])))->num3)), (void*)(wbuffer[i])); */ } } #endif writeToFile_withoutCompression(wbuffer, npackets,currframenum); } #ifdef VERYDEBUG cprintf(GREEN,"written everyting\n"); #endif } if(myDetectorType == EIGER) { #ifdef VERYDEBUG cprintf(GREEN,"gonna copy frame\n"); #endif copyFrameToGui(wbuffer,currframenum); #ifdef VERYDEBUG cprintf(GREEN,"copied frame\n"); #endif }else{ //copy to gui if(npackets >= packetsPerFrame){//min 1 frame, but neednt be //if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames copyFrameToGui(NULL,-1,wbuffer[0]+HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYVERBOSE cout << ithread << " finished copying" << endl; #endif } //else cout << "unfinished buffersize" << endl; while(!fifoFree[0]->push(wbuffer[0])); #ifdef FIFO_DEBUG cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0])); #endif } } void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], char* data, int xmax, int ymax, int &nf){ FILE_LOG(logDEBUG) << __AT__ << " called"; #if defined(MYROOT1) && defined(ALLFILE_DEBUG) writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); #endif int npackets = (uint32_t)(*((uint32_t*)wbuffer[0])); eventType thisEvent = PEDESTAL; int ndata; char* buff = 0; data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; int remainingsize = npackets * onePacketSize; int np; int once = 0; double tot, tl, tr, bl, br; int xmin = 1, ymin = 1, ix, iy; while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ np = ndata/onePacketSize; //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); //only for moench if(commonModeSubtractionEnable){ for(ix = xmin - 1; ix < xmax+1; ix++){ for(iy = ymin - 1; iy < ymax+1; iy++){ thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); } } } for(ix = xmin - 1; ix < xmax+1; ix++) for(iy = ymin - 1; iy < ymax+1; iy++){ thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); if (nf>1000) { tot=0; tl=0; tr=0; bl=0; br=0; if (thisEvent==PHOTON_MAX) { receiverdata[ithread]->getFrameNumber(buff); //iFrame=receiverdata[ithread]->getFrameNumber(buff); #ifdef MYROOT1 myTree[ithread]->Fill(); //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; #else pthread_mutex_lock(&write_mutex); if((enableFileWrite) && (sfilefd)) singlePhotonDet[ithread]->writeCluster(sfilefd); pthread_mutex_unlock(&write_mutex); #endif } } } nf++; #ifndef ALLFILE pthread_mutex_lock(&progress_mutex); packetsInFile += packetsPerFrame; packetsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame; if(packetsInFile >= maxPacketsPerFile) createNewFile(); pthread_mutex_unlock(&progress_mutex); #endif if(!once){ copyFrameToGui(NULL,-1,buff); once = 1; } } remainingsize -= ((buff + ndata) - data); data = buff + ndata; if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) cprintf(BG_RED,"ERROR SHOULD NOT COME HERE, Error 142536!\n"); } while(!fifoFree[0]->push(wbuffer[0])); #ifdef FIFO_DEBUG cprintf(BLUE,"%d writer compression free pushed into fifofree %x for listerner 0\n", ithread, (void*)(wbuffer[0])); #endif } int UDPStandardImplementation::enableTenGiga(int enable){ FILE_LOG(logDEBUG) << __AT__ << " called"; cout << "Enabling 10Gbe to " << enable << endl; int oldtengiga = tengigaEnable; if(enable >= 0){ tengigaEnable = enable; if(myDetectorType == EIGER){ if(!tengigaEnable){ packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; }else{ packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; } frameSize = onePacketSize * packetsPerFrame; bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; #ifdef VERBOSE cout<<"packetsPerFrame:"<