#ifdef SLS_RECEIVER_UDP_FUNCTIONS /********************************************//** * @file UDPRESTImplementation.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "UDPRESTImplementation.h" #include "moench02ModuleData.h" #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" #include // SIGINT #include // stat #include // socket(), bind(), listen(), accept(), shut down #include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision #include //munmap #include #include //#include "utilities.h" using namespace std; UDPRESTImplementation::UDPRESTImplementation() : isInitialized(false), status(slsReceiverDefs::ERROR) {} UDPRESTImplementation::~UDPRESTImplementation(){} void UDPRESTImplementation::initialize(const char *detectorHostName){ string name; if (detectorHostName != NULL) name = detectorHostName; if (name.empty()) { FILE_LOG(logDEBUG) << "initialize(): can't initialize with empty string or NULL for detectorHostname"; } else if (isInitialized == true) { FILE_LOG(logDEBUG) << "initialize(): already initialized, can't initialize several times"; } else { FILE_LOG(logDEBUG) << "initialize(): initialize() with: detectorHostName=" << name; strcpy(detHostname,detectorHostName); //init_config.detectorHostname = name; //REST call - hardcoded //RestHelper rest ; rest->init(detHostname, 8080); std::string answer; int code = rest->get_json("status", &answer); if (code != 0){ //throw -1; std::cout << "I SHOULD THROW AN EXCEPTION!!!" << std::endl; } else{ isInitialized = true; status = slsReceiverDefs::IDLE; } std::cout << "Answer: " << answer << std::endl; /* std::std::cout << string << std::endl; << "---- REST test 3: true, json object "<< std::endl; JsonBox::Value json_value; code = rest.get_json("status", &json_value); std::cout << "JSON " << json_value["status"] << std::endl; */ } } int UDPRESTImplementation::setDetectorType(detectorType det){ cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; return OK; } /*Frame indices and numbers caught*/ bool UDPRESTImplementation::getAcquistionStarted(){return acqStarted;}; bool UDPRESTImplementation::getMeasurementStarted(){return measurementStarted;}; int UDPRESTImplementation::getFramesCaught(){return (packetsCaught/packetsPerFrame);} int UDPRESTImplementation::getTotalFramesCaught(){return (totalPacketsCaught/packetsPerFrame);} uint32_t UDPRESTImplementation::getStartFrameIndex(){return startFrameIndex;} uint32_t UDPRESTImplementation::getFrameIndex(){ if(!packetsCaught) frameIndex=-1; else frameIndex = currframenum - startFrameIndex; return frameIndex; } uint32_t UDPRESTImplementation::getAcquisitionIndex(){ if(!totalPacketsCaught) acquisitionIndex=-1; else acquisitionIndex = currframenum - startAcquisitionIndex; return acquisitionIndex; } void UDPRESTImplementation::resetTotalFramesCaught(){ acqStarted = false; startAcquisitionIndex = 0; totalPacketsCaught = 0; } /*file parameters*/ int UDPRESTImplementation::getFileIndex(){ return fileIndex; } int UDPRESTImplementation::setFileIndex(int i){ cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; /* if(i>=0) fileIndex = i; */ return getFileIndex(); } int UDPRESTImplementation::setFrameIndexNeeded(int i){ cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; frameIndexNeeded = i; return frameIndexNeeded; } /* int UDPRESTImplementation::getEnableFileWrite() const{ return enableFileWrite; } int UDPRESTImplementation::setEnableFileWrite(int i){ enableFileWrite=i; return getEnableFileWrite(); } int UDPRESTImplementation::getEnableOverwrite() const{ return overwrite; } int UDPRESTImplementation::setEnableOverwrite(int i){ overwrite=i; return getEnableOverwrite(); } */ /*other parameters*/ slsReceiverDefs::runStatus UDPRESTImplementation::getStatus() const{ return status; } /* char *UDPRESTImplementation::getDetectorHostname() const{ return (char*)detHostname; } */ void UDPRESTImplementation::setEthernetInterface(char* c){ strcpy(eth,c); } void UDPRESTImplementation::setUDPPortNo(int p){ for(int i=0;i= 0) numberOfFrames = fnum; return getNumberOfFrames(); } */ /* int UDPRESTImplementation::getScanTag() const{ return scanTag; } */ /* int32_t UDPRESTImplementation::setScanTag(int32_t stag){ if(stag >= 0) scanTag = stag; return getScanTag(); } */ int32_t UDPRESTImplementation::setDynamicRange(int32_t dr){ cout << "Setting Dynamic Range" << endl; int olddr = dynamicRange; if(dr >= 0){ dynamicRange = dr; } return getDynamicRange(); } int UDPRESTImplementation::setShortFrame(int i){ shortFrame=i; if(shortFrame!=-1){ bufferSize = GOTTHARD_SHORT_ONE_PACKET_SIZE; frameSize = GOTTHARD_SHORT_BUFFER_SIZE; maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; }else{ onePacketSize = GOTTHARD_ONE_PACKET_SIZE; bufferSize = GOTTHARD_BUFFER_SIZE; frameSize = GOTTHARD_BUFFER_SIZE; maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; } deleteFilter(); if(dataCompression) setupFilter(); return shortFrame; } int UDPRESTImplementation::setNFrameToGui(int i){ if(i>=0){ nFrameToGui = i; setupFifoStructure(); } return nFrameToGui; } int64_t UDPRESTImplementation::setAcquisitionPeriod(int64_t index){ if(index >= 0){ if(index != acquisitionPeriod){ acquisitionPeriod = index; setupFifoStructure(); } } return acquisitionPeriod; } bool UDPRESTImplementation::getDataCompression(){return dataCompression;} int UDPRESTImplementation::enableDataCompression(bool enable){ cout << "Data compression "; if(enable) cout << "enabled" << endl; else cout << "disabled" << endl; #ifdef MYROOT1 cout << " WITH ROOT" << endl; #else cout << " WITHOUT ROOT" << endl; #endif //delete filter for the current number of threads deleteFilter(); dataCompression = enable; pthread_mutex_lock(&status_mutex); writerthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); createWriterThreads(true); if(enable) numWriterThreads = MAX_NUM_WRITER_THREADS; else numWriterThreads = 1; if(createWriterThreads() == FAIL){ cout << "ERROR: Could not create writer threads" << endl; return FAIL; } setThreadPriorities(); if(enable) setupFilter(); return OK; } /*other functions*/ void UDPRESTImplementation::deleteFilter(){ int i; cmSub=NULL; for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); } //LEO: it is not clear to me.. void UDPRESTImplementation::setupFifoStructure(){ int64_t i; int oldn = numJobsPerThread; //if every nth frame mode if(nFrameToGui) numJobsPerThread = nFrameToGui; //random nth frame mode else{ if(!acquisitionPeriod) i = SAMPLE_TIME_IN_NS; else i = SAMPLE_TIME_IN_NS/acquisitionPeriod; if (i > MAX_JOBS_PER_THREAD) numJobsPerThread = MAX_JOBS_PER_THREAD; else if (i < 1) numJobsPerThread = 1; else numJobsPerThread = i; } //if same, return if(oldn == numJobsPerThread) return; if(myDetectorType == EIGER) numJobsPerThread = 1; //otherwise memory too much if numjobsperthread is at max = 1000 fifosize = GOTTHARD_FIFO_SIZE; if(myDetectorType == MOENCH) fifosize = MOENCH_FIFO_SIZE; else if(myDetectorType == EIGER) fifosize = EIGER_FIFO_SIZE; if(fifosize % numJobsPerThread) fifosize = (fifosize/numJobsPerThread)+1; else fifosize = fifosize/numJobsPerThread; cout << "Number of Frames per buffer:" << numJobsPerThread << endl; cout << "Fifo Size:" << fifosize << endl; /* //for testing numJobsPerThread = 3; fifosize = 11; */ for(int i=0;iisEmpty()) fifoFree[i]->pop(buffer[i]); delete fifoFree[i]; } if(fifo[i]) delete fifo[i]; if(mem0[i]) free(mem0[i]); fifoFree[i] = new CircularFifo(fifosize); fifo[i] = new CircularFifo(fifosize); //allocate memory mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); /** shud let the client know about this */ if (mem0[i]==NULL){ cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; exit(-1); } buffer[i]=mem0[i]; //push the addresses into freed fifoFree and writingFifoFree while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { fifoFree[i]->push(buffer[i]); buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); } } cout << "Fifo structure(s) reconstructed" << endl; } /** acquisition functions */ void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum){ //point to gui data if (guiData == NULL) guiData = latestData; //copy data and filename strcpy(c,guiFileName); fnum = guiFrameNumber; //could not get gui data if(!guiDataReady){ *raw = NULL; } //data ready, set guidata to receive new data else{ *raw = guiData; guiData = NULL; pthread_mutex_lock(&dataReadyMutex); guiDataReady = 0; pthread_mutex_unlock(&dataReadyMutex); if((nFrameToGui) && (writerthreads_mask)){ /*if(nFrameToGui){*/ //release after getting data sem_post(&smp); } } } void UDPRESTImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ //random read when gui not ready if((!nFrameToGui) && (!guiData)){ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); } //random read or nth frame read, gui needs data now else{ /* //nth frame read, block current process if the guireader hasnt read it yet if(nFrameToGui) sem_wait(&smp); */ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; //eiger if(startbuf != NULL){ int offset = 0; int size = frameSize/EIGER_MAX_PORTS; for(int j=0;jgetErrorStatus(); if(iret){ #ifdef VERBOSE cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; #endif return FAIL; } } return OK; } int UDPRESTImplementation::shutDownUDPSockets(){ for(int i=0;iShutDownSocket(); delete udpSocket[i]; udpSocket[i] = NULL; } } return OK; } int UDPRESTImplementation::createListeningThreads(bool destroy){ int i; void* status; killAllListeningThreads = 0; pthread_mutex_lock(&status_mutex); listeningthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); if(!destroy){ //start listening threads cout << "Creating Listening Threads(s)"; currentListeningThreadIndex = -1; for(i = 0; i < numListeningThreads; ++i){ sem_init(&listensmp[i],1,0); thread_started = 0; currentListeningThreadIndex = i; if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ cout << "Could not create listening thread with index " << i << endl; return FAIL; } while(!thread_started); cout << "."; cout << flush; } #ifdef VERBOSE cout << "Listening thread(s) created successfully." << endl; #else cout << endl; #endif }else{ cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDet[ithr]->newDataSet(); if(myFile[ithr]==NULL){ cout<<"file null"<IsOpen()){ cout<<"file not open"< DO_NOTHING){ //close if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } //open file if(!overwrite){ if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ cout << "Error: Could not create new file " << savefilename << endl; return FAIL; } }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ cout << "Error: Could not create file " << savefilename << endl; return FAIL; } //setting buffer setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); //printing packet losses and file names if(!packetsCaught) cout << savefilename << endl; else{ cout << savefilename << "\tpacket loss " << setw(4)<GetCurrentFile(); if(myFile[ithr]->Write()) //->Write(tall->GetName(),TObject::kOverwrite); cout << "Thread " << ithr <<": wrote frames to file" << endl; else cout << "Thread " << ithr << ": could not write frames to file" << endl; }else cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; //close file if(myTree[ithr] && myFile[ithr]) myFile[ithr] = myTree[ithr]->GetCurrentFile(); if(myFile[ithr] != NULL) myFile[ithr]->Close(); myFile[ithr] = NULL; myTree[ithr] = NULL; pthread_mutex_unlock(&write_mutex); #endif } } int UDPRESTImplementation::startReceiver(char message[]){ int i; // #ifdef VERBOSE cout << "Starting Receiver" << endl; //#endif //reset listening thread variables measurementStarted = false; //should be set to zero as its added to get next start frame indices for scans for eiger if(!acqStarted) currframenum = 0; startFrameIndex = 0; for(int i = 0; i < numListeningThreads; ++i) totalListeningFrameCount[i] = 0; //udp socket if(createUDPSockets() == FAIL){ strcpy(message,"Could not create UDP Socket(s).\n"); cout << endl << message << endl; return FAIL; } cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; if(setupWriter() == FAIL){ //stop udp socket shutDownUDPSockets(); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; } cout << "Successfully created file(s)" << endl; //done to give the gui some proper name instead of always the last file name if(dataCompression) sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); //initialize semaphore sem_init(&smp,1,0); //status pthread_mutex_lock(&status_mutex); status = RUNNING; for(i=0;istartListening(); return this_pointer; } void* UDPRESTImplementation::startWritingThread(void* this_pointer){ ((UDPRESTImplementation*)this_pointer)->startWriting(); return this_pointer; } int UDPRESTImplementation::startListening(){ int ithread = currentListeningThreadIndex; #ifdef VERYVERBOSE cout << "In startListening() " << endl; #endif thread_started = 1; int i,total; int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; uint32_t lastframeheader;// for moench to check for all the packets in last frame char* tempchar = NULL; int imageheader = 0; if(myDetectorType==EIGER) imageheader = EIGER_IMAGE_HEADER_SIZE; while(1){ //variables that need to be checked/set before each acquisition carryonBufferSize = 0; //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later maxBufferSize = bufferSize * numJobsPerThread; #ifdef VERYDEBUG cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; #endif if(tempchar) {delete [] tempchar;tempchar = NULL;} if(myDetectorType != EIGER) tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size while((1<pop(buffer[ithread]); #ifdef VERYDEBUG cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; #endif //receive if(udpSocket[ithread] == NULL){ rc = 0; cout << ithread << "UDP Socket is NULL" << endl; } //normal listening else if(!carryonBufferSize){ rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); expected = maxBufferSize; } //the remaining packets from previous buffer else{ #ifdef VERYDEBUG cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); expected = maxBufferSize - carryonBufferSize; } #ifdef VERYDEBUG cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; #endif //start indices for each start of scan/acquisition - eiger does it before if((!measurementStarted) && (rc > 0) && (!ithread)) startFrameIndices(ithread); //problem in receiving or end of acquisition if((rc < expected)||(rc <= 0)){ stopListening(ithread,rc,packetcount,total); continue; } //reset packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; carryonBufferSize = 0; //check if last packet valid and calculate packet count switch(myDetectorType){ case MOENCH: lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; cout << "last packet offset:" << lastpacketoffset << endl; cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif //moench last packet value is 0 if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; } memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) & (frameIndexMask)) >> frameIndexOffset) << endl; cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) & (packetIndexMask)) << endl; #endif } break; case GOTTHARD: if(shortFrame == -1){ lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG cout << "last packet offset:" << lastpacketoffset << endl; #endif if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif carryonBufferSize = onePacketSize; --packetcount; } } #ifdef VERYDEBUG cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif break; default: break; } // cout<<"*********** "<fnum)<push(buffer[ithread])); #ifdef VERYDEBUG if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; #endif } sem_wait(&listensmp[ithread]); //make sure its not exiting thread if(killAllListeningThreads){ cout << ithread << " good bye listening thread" << endl; if(tempchar) {delete [] tempchar;tempchar = NULL;} pthread_exit(NULL); } } return OK; } int UDPRESTImplementation::startWriting(){ int ithread = currentWriterThreadIndex; #ifdef VERYVERBOSE cout << ithread << "In startWriting()" <pop(wbuf[i]); numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); #ifdef VERYDEBUG cout << ithread << " numpackets:" << dec << numpackets << endl; #endif } #ifdef VERYDEBUG cout << ithread << " numpackets:" << dec << numpackets << endl; cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; #endif //last dummy packet if(numpackets == 0xFFFF){ stopWriting(ithread,wbuf); continue; } //for progress if(myDetectorType == EIGER){ tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum; else{ pthread_mutex_lock(&progress_mutex); if(tempframenum > currframenum) currframenum = tempframenum; pthread_mutex_unlock(&progress_mutex); } //#ifdef VERYDEBUG if(myDetectorType == EIGER) cout << endl < 0){ for(i=0;ipush(wbuf[i])); #ifdef VERYDEBUG cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; #endif } } else{ //copy to gui copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYVERBOSE cout << ithread << " finished copying" << endl; #endif while(!fifoFree[0]->push(wbuf[0])); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuf[0]<fnum); //gotthard has +1 for frame number and not a short frame else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset); else startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) & (frameIndexMask)) >> frameIndexOffset); //start of acquisition if(!acqStarted){ startAcquisitionIndex=startFrameIndex; currframenum = startAcquisitionIndex; acqStarted = true; cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); exit(-1); } //push the last buffer into fifo if(rc > 0){ pc = (rc/onePacketSize); #ifdef VERYDEBUG cout << ithread << " *** last packetcount:" << pc << endl; #endif (*((uint16_t*)(buffer[ithread]))) = pc; totalListeningFrameCount[ithread] += pc; while(!fifo[ithread]->push(buffer[ithread])); #ifdef VERYDEBUG cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; #endif } //push dummy buffer to all writer threads for(i=0;ipop(buffer[ithread]); (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; #ifdef VERYDEBUG cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; #endif while(!fifo[ithread]->push(buffer[ithread])); #ifdef VERYDEBUG cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; #endif } //reset mask and exit loop pthread_mutex_lock(&status_mutex); listeningthreads_mask^=(1< 1) cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; #endif while(listeningthreads_mask) usleep(5000); #ifdef VERYDEBUG t = 0; for(i=0;ipush(wbuffer[i])); #ifdef VERYDEBUG cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; #endif } //all threads need to close file, reset mask and exit loop closeFile(ithread); pthread_mutex_lock(&status_mutex); writerthreads_mask^=(1< 0){ //for progress and packet loss calculation(new files) if(myDetectorType == EIGER); else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum; else{ if(tempframenum > currframenum) currframenum = tempframenum; } #ifdef VERYDEBUG cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif //lock if(numWriterThreads > 1) pthread_mutex_lock(&write_mutex); //to create new file when max reached packetsToSave = maxPacketsPerFile - packetsInFile; if(packetsToSave > numpackets) packetsToSave = numpackets; /**next time offset is still plus header length*/ fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); packetsInFile += packetsToSave; packetsCaught += packetsToSave; totalPacketsCaught += packetsToSave; //new file if(packetsInFile >= maxPacketsPerFile){ //for packet loss lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); if(myDetectorType == EIGER); else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum; else{ if(tempframenum > currframenum) currframenum = tempframenum; } #ifdef VERYDEBUG cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif //create createNewFile(); } //unlock if(numWriterThreads > 1) pthread_mutex_unlock(&write_mutex); offset += (packetsToSave * onePacketSize); numpackets -= packetsToSave; } } else{ if(numWriterThreads > 1) pthread_mutex_lock(&write_mutex); packetsInFile += numpackets; packetsCaught += numpackets; totalPacketsCaught += numpackets; if(numWriterThreads > 1) pthread_mutex_unlock(&write_mutex); } } void UDPRESTImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ #if defined(MYROOT1) && defined(ALLFILE_DEBUG) writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); #endif eventType thisEvent = PEDESTAL; int ndata; char* buff = 0; data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; int remainingsize = npackets * onePacketSize; int np; int once = 0; double tot, tl, tr, bl, br; int xmin = 1, ymin = 1, ix, iy; while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ np = ndata/onePacketSize; //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); //only for moench if(commonModeSubtractionEnable){ for(ix = xmin - 1; ix < xmax+1; ix++){ for(iy = ymin - 1; iy < ymax+1; iy++){ thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); } } } for(ix = xmin - 1; ix < xmax+1; ix++) for(iy = ymin - 1; iy < ymax+1; iy++){ thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); if (nf>1000) { tot=0; tl=0; tr=0; bl=0; br=0; if (thisEvent==PHOTON_MAX) { receiverdata[ithread]->getFrameNumber(buff); //iFrame=receiverdata[ithread]->getFrameNumber(buff); #ifdef MYROOT1 myTree[ithread]->Fill(); //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; #else pthread_mutex_lock(&write_mutex); if((enableFileWrite) && (sfilefd)) singlePhotonDet[ithread]->writeCluster(sfilefd); pthread_mutex_unlock(&write_mutex); #endif } } } nf++; #ifndef ALLFILE pthread_mutex_lock(&progress_mutex); packetsInFile += packetsPerFrame; packetsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame; if(packetsInFile >= maxPacketsPerFile) createNewFile(); pthread_mutex_unlock(&progress_mutex); #endif if(!once){ copyFrameToGui(NULL,-1,buff); once = 1; } } remainingsize -= ((buff + ndata) - data); data = buff + ndata; if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ tengigaEnable = enable; if(myDetectorType == EIGER){ if(!tengigaEnable){ packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; }else{ packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; } frameSize = onePacketSize * packetsPerFrame; bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; cout<<"packetsPerFrame:"<