From 9a7e45aa56a00cdeb5f932bc4fb0b7ea9664cc7d Mon Sep 17 00:00:00 2001 From: Sala Leonardo Date: Mon, 8 Sep 2014 11:45:33 +0200 Subject: [PATCH] cleaning --- .../slsReceiver/slsReceiverUDPFunctions.cpp | 2319 ----------------- 1 file changed, 2319 deletions(-) delete mode 100644 slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp deleted file mode 100644 index bb8f213a5..000000000 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp +++ /dev/null @@ -1,2319 +0,0 @@ -#ifdef SLS_RECEIVER_UDP_FUNCTIONS -/********************************************//** - * @file slsReceiverUDPFunctions.cpp - * @short does all the functions for a receiver, set/get parameters, start/stop etc. - ***********************************************/ - - -#include "slsReceiverUDPFunctions.h" -#include "UDPBaseImplementation.h" - - -#include "moench02ModuleData.h" -#include "gotthardModuleData.h" -#include "gotthardShortModuleData.h" - - -#include // SIGINT -#include // stat -#include // socket(), bind(), listen(), accept(), shut down -#include // sock_addr_in, htonl, INADDR_ANY -#include // exit() -#include //set precision -#include //munmap - - - -#include -#include -using namespace std; - - - -slsReceiverUDPFunctions * slsReceiverUDPFunctions::create(void){ - return slsReceiverUDPFunctions(); -} - -slsReceiverUDPFunctions::slsReceiverUDPFunctions(): - thread_started(0), - eth(NULL), - latestData(NULL), - guiFileName(NULL), - guiFrameNumber(0), - tengigaEnable(0){ - for(int i=0;i /proc/sys/net/core/rmem_max")) - cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; - else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) - cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - /** permanent setting heiner - net.core.rmem_max = 104857600 # 100MiB - net.core.netdev_max_backlog = 250000 - sysctl -p - // from the manual - sysctl -w net.core.rmem_max=16777216 - sysctl -w net.core.netdev_max_backlog=250000 - */ -} - - - -slsReceiverUDPFunctions::~slsReceiverUDPFunctions(){ - createListeningThreads(true); - createWriterThreads(true); - deleteMembers(); -} - - - - -void slsReceiverUDPFunctions::deleteMembers(){ - //kill threads - if(thread_started){ - createListeningThreads(true); - createWriterThreads(true); - } - - for(int i=0;i=0) - fileIndex = i; - return getFileIndex(); -} - - -int slsReceiverUDPFunctions::setFrameIndexNeeded(int i){ - frameIndexNeeded = i; - return frameIndexNeeded; -} - - -int slsReceiverUDPFunctions::getEnableFileWrite() const{ - return enableFileWrite; -} - -int slsReceiverUDPFunctions::setEnableFileWrite(int i){ - enableFileWrite=i; - return getEnableFileWrite(); -} - -int slsReceiverUDPFunctions::getEnableOverwrite() const{ - return overwrite; -} - -int slsReceiverUDPFunctions::setEnableOverwrite(int i){ - overwrite=i; - return getEnableOverwrite(); -} - - - - - -/*other parameters*/ - -slsReceiverDefs::runStatus slsReceiverUDPFunctions::getStatus() const{ - return status; -} - - -void slsReceiverUDPFunctions::initialize(const char *detectorHostName){ - if(strlen(detectorHostName)) - strcpy(detHostname,detectorHostName); -} - - -char *slsReceiverUDPFunctions::getDetectorHostname() const{ - return (char*)detHostname; -} - -void slsReceiverUDPFunctions::setEthernetInterface(char* c){ - strcpy(eth,c); -} - - -void slsReceiverUDPFunctions::setUDPPortNo(int p){ - for(int i=0;i= 0) - numberOfFrames = fnum; - - return getNumberOfFrames(); -} - -int slsReceiverUDPFunctions::getScanTag() const{ - return scanTag; -} - - -int32_t slsReceiverUDPFunctions::setScanTag(int32_t stag){ - if(stag >= 0) - scanTag = stag; - - return getScanTag(); -} - - -int slsReceiverUDPFunctions::getDynamicRange() const{ - return dynamicRange; -} - -int32_t slsReceiverUDPFunctions::setDynamicRange(int32_t dr){ - cout << "Setting Dynamic Range" << endl; - - int olddr = dynamicRange; - if(dr >= 0){ - dynamicRange = dr; - - if(myDetectorType == EIGER){ - - - if(!tengigaEnable) - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - else - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - frameSize = onePacketSize * packetsPerFrame; - bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - - - - if(olddr != dr){ - - //del - if(thread_started){ - createListeningThreads(true); - createWriterThreads(true); - } - for(int i=0;i=0){ - nFrameToGui = i; - setupFifoStructure(); - } - return nFrameToGui; -} - - - -int64_t slsReceiverUDPFunctions::setAcquisitionPeriod(int64_t index){ - - if(index >= 0){ - if(index != acquisitionPeriod){ - acquisitionPeriod = index; - setupFifoStructure(); - } - } - return acquisitionPeriod; -} - - -bool slsReceiverUDPFunctions::getDataCompression(){return dataCompression;} - -int slsReceiverUDPFunctions::enableDataCompression(bool enable){ - cout << "Data compression "; - if(enable) - cout << "enabled" << endl; - else - cout << "disabled" << endl; -#ifdef MYROOT1 - cout << " WITH ROOT" << endl; -#else - cout << " WITHOUT ROOT" << endl; -#endif - //delete filter for the current number of threads - deleteFilter(); - - dataCompression = enable; - pthread_mutex_lock(&status_mutex); - writerthreads_mask = 0x0; - pthread_mutex_unlock(&(status_mutex)); - - createWriterThreads(true); - - if(enable) - numWriterThreads = MAX_NUM_WRITER_THREADS; - else - numWriterThreads = 1; - - if(createWriterThreads() == FAIL){ - cout << "ERROR: Could not create writer threads" << endl; - return FAIL; - } - setThreadPriorities(); - - - if(enable) - setupFilter(); - - return OK; -} - - - - - - - - - - - - -/*other functions*/ - - -void slsReceiverUDPFunctions::deleteFilter(){ - int i; - cmSub=NULL; - - for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); - -} - - - -//LEO: it is not clear to me.. -void slsReceiverUDPFunctions::setupFifoStructure(){ - - int64_t i; - int oldn = numJobsPerThread; - - //if every nth frame mode - if(nFrameToGui) - numJobsPerThread = nFrameToGui; - - //random nth frame mode - else{ - if(!acquisitionPeriod) - i = SAMPLE_TIME_IN_NS; - else - i = SAMPLE_TIME_IN_NS/acquisitionPeriod; - if (i > MAX_JOBS_PER_THREAD) - numJobsPerThread = MAX_JOBS_PER_THREAD; - else if (i < 1) - numJobsPerThread = 1; - else - numJobsPerThread = i; - } - - //if same, return - if(oldn == numJobsPerThread) - return; - - if(myDetectorType == EIGER) - numJobsPerThread = 1; - - //otherwise memory too much if numjobsperthread is at max = 1000 - fifosize = GOTTHARD_FIFO_SIZE; - if(myDetectorType == MOENCH) - fifosize = MOENCH_FIFO_SIZE; - else if(myDetectorType == EIGER) - fifosize = EIGER_FIFO_SIZE; - - if(fifosize % numJobsPerThread) - fifosize = (fifosize/numJobsPerThread)+1; - else - fifosize = fifosize/numJobsPerThread; - - - cout << "Number of Frames per buffer:" << numJobsPerThread << endl; - cout << "Fifo Size:" << fifosize << endl; - - /* - //for testing - numJobsPerThread = 3; fifosize = 11; - */ - - for(int i=0;iisEmpty()) - fifoFree[i]->pop(buffer[i]); - delete fifoFree[i]; - } - if(fifo[i]) delete fifo[i]; - if(mem0[i]) free(mem0[i]); - fifoFree[i] = new CircularFifo(fifosize); - fifo[i] = new CircularFifo(fifosize); - - - //allocate memory - mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); - /** shud let the client know about this */ - if (mem0[i]==NULL){ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; - exit(-1); - } - buffer[i]=mem0[i]; - //push the addresses into freed fifoFree and writingFifoFree - while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { - fifoFree[i]->push(buffer[i]); - buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); - } - } - cout << "Fifo structure(s) reconstructed" << endl; -} - - - - - - - -/** acquisition functions */ - -void slsReceiverUDPFunctions::readFrame(char* c,char** raw, uint32_t &fnum){ - //point to gui data - if (guiData == NULL) - guiData = latestData; - - //copy data and filename - strcpy(c,guiFileName); - fnum = guiFrameNumber; - - - //could not get gui data - if(!guiDataReady){ - *raw = NULL; - } - //data ready, set guidata to receive new data - else{ - *raw = guiData; - guiData = NULL; - - pthread_mutex_lock(&dataReadyMutex); - guiDataReady = 0; - pthread_mutex_unlock(&dataReadyMutex); - if((nFrameToGui) && (writerthreads_mask)){ - /*if(nFrameToGui){*/ - //release after getting data - sem_post(&smp); - } - } -} - - - - - -void slsReceiverUDPFunctions::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ - - //random read when gui not ready - if((!nFrameToGui) && (!guiData)){ - pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; - pthread_mutex_unlock(&dataReadyMutex); - } - - //random read or nth frame read, gui needs data now - else{ - /* - //nth frame read, block current process if the guireader hasnt read it yet - if(nFrameToGui) - sem_wait(&smp); -*/ - pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; - //eiger - if(startbuf != NULL){ - int offset = 0; - int size = frameSize/EIGER_MAX_PORTS; - for(int j=0;jgetErrorStatus(); - if(iret){ -#ifdef VERBOSE - cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; -#endif - return FAIL; - } - } - - return OK; -} - - - - - - - -int slsReceiverUDPFunctions::shutDownUDPSockets(){ - for(int i=0;iShutDownSocket(); - delete udpSocket[i]; - udpSocket[i] = NULL; - } - } - return OK; -} - - - - - -int slsReceiverUDPFunctions::createListeningThreads(bool destroy){ - int i; - void* status; - - killAllListeningThreads = 0; - - pthread_mutex_lock(&status_mutex); - listeningthreads_mask = 0x0; - pthread_mutex_unlock(&(status_mutex)); - - if(!destroy){ - - //start listening threads - cout << "Creating Listening Threads(s)"; - - currentListeningThreadIndex = -1; - - for(i = 0; i < numListeningThreads; ++i){ - sem_init(&listensmp[i],1,0); - thread_started = 0; - currentListeningThreadIndex = i; - if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ - cout << "Could not create listening thread with index " << i << endl; - return FAIL; - } - while(!thread_started); - cout << "."; - cout << flush; - } -#ifdef VERBOSE - cout << "Listening thread(s) created successfully." << endl; -#else - cout << endl; -#endif - }else{ - cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); - //resets the pedestalSubtraction array and the commonModeSubtraction - singlePhotonDet[ithr]->newDataSet(); - if(myFile[ithr]==NULL){ - cout<<"file null"<IsOpen()){ - cout<<"file not open"< DO_NOTHING){ - //close - if(sfilefd){ - fclose(sfilefd); - sfilefd = NULL; - } - //open file - if(!overwrite){ - if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ - cout << "Error: Could not create new file " << savefilename << endl; - return FAIL; - } - }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ - cout << "Error: Could not create file " << savefilename << endl; - return FAIL; - } - //setting buffer - setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); - - //printing packet losses and file names - if(!packetsCaught) - cout << savefilename << endl; - else{ - cout << savefilename - << "\tpacket loss " - << setw(4)<GetCurrentFile(); - - if(myFile[ithr]->Write()) - //->Write(tall->GetName(),TObject::kOverwrite); - cout << "Thread " << ithr <<": wrote frames to file" << endl; - else - cout << "Thread " << ithr << ": could not write frames to file" << endl; - - }else - cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; - //close file - if(myTree[ithr] && myFile[ithr]) - myFile[ithr] = myTree[ithr]->GetCurrentFile(); - if(myFile[ithr] != NULL) - myFile[ithr]->Close(); - myFile[ithr] = NULL; - myTree[ithr] = NULL; - pthread_mutex_unlock(&write_mutex); - -#endif - } -} - - - - - -int slsReceiverUDPFunctions::startReceiver(char message[]){ - int i; - - -// #ifdef VERBOSE - cout << "Starting Receiver" << endl; -//#endif - - - //reset listening thread variables - measurementStarted = false; - //should be set to zero as its added to get next start frame indices for scans for eiger - if(!acqStarted) currframenum = 0; - startFrameIndex = 0; - - for(int i = 0; i < numListeningThreads; ++i) - totalListeningFrameCount[i] = 0; - - //udp socket - if(createUDPSockets() == FAIL){ - strcpy(message,"Could not create UDP Socket(s).\n"); - cout << endl << message << endl; - return FAIL; - } - cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; - - - if(setupWriter() == FAIL){ - //stop udp socket - shutDownUDPSockets(); - - sprintf(message,"Could not create file %s.\n",savefilename); - return FAIL; - } - cout << "Successfully created file(s)" << endl; - - //done to give the gui some proper name instead of always the last file name - if(dataCompression) - sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); - - //initialize semaphore - sem_init(&smp,1,0); - - //status - pthread_mutex_lock(&status_mutex); - status = RUNNING; - for(i=0;istartListening(); - - return this_pointer; -} - - - -void* slsReceiverUDPFunctions::startWritingThread(void* this_pointer){ - ((slsReceiverUDPFunctions*)this_pointer)->startWriting(); - return this_pointer; -} - - - - - - -int slsReceiverUDPFunctions::startListening(){ - int ithread = currentListeningThreadIndex; -#ifdef VERYVERBOSE - cout << "In startListening() " << endl; -#endif - - thread_started = 1; - - int i,total; - int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; - uint32_t lastframeheader;// for moench to check for all the packets in last frame - char* tempchar = NULL; - int imageheader = 0; - if(myDetectorType==EIGER) - imageheader = EIGER_IMAGE_HEADER_SIZE; - - - while(1){ - //variables that need to be checked/set before each acquisition - carryonBufferSize = 0; - //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later - maxBufferSize = bufferSize * numJobsPerThread; -#ifdef VERYDEBUG - cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; -#endif - - if(tempchar) {delete [] tempchar;tempchar = NULL;} - if(myDetectorType != EIGER) - tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size - - - while((1<pop(buffer[ithread]); -#ifdef VERYDEBUG - cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; -#endif - - - //receive - if(udpSocket[ithread] == NULL){ - rc = 0; - cout << ithread << "UDP Socket is NULL" << endl; - } - //normal listening - else if(!carryonBufferSize){ - - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - expected = maxBufferSize; - - } - //the remaining packets from previous buffer - else{ -#ifdef VERYDEBUG - cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; - cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) - & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); - expected = maxBufferSize - carryonBufferSize; - } - -#ifdef VERYDEBUG - cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; -#endif - - - - - //start indices for each start of scan/acquisition - eiger does it before - if((!measurementStarted) && (rc > 0) && (!ithread)) - startFrameIndices(ithread); - - //problem in receiving or end of acquisition - if((rc < expected)||(rc <= 0)){ - stopListening(ithread,rc,packetcount,total); - continue; - } - - - - //reset - packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; - carryonBufferSize = 0; - - - - //check if last packet valid and calculate packet count - switch(myDetectorType){ - - case MOENCH: - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; - cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; - cout << "last packet offset:" << lastpacketoffset << endl; - cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; - cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - //moench last packet value is 0 - if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ - lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - } - memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); -#ifdef VERYDEBUG - cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) - & (frameIndexMask)) >> frameIndexOffset) << endl; - cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) - & (packetIndexMask)) << endl; -#endif - } - break; - - case GOTTHARD: - if(shortFrame == -1){ - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cout << "last packet offset:" << lastpacketoffset << endl; -#endif - - if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ - memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); -#ifdef VERYDEBUG - cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) - & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - carryonBufferSize = onePacketSize; - --packetcount; - } - } -#ifdef VERYDEBUG - cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - break; - default: - - break; - - } - - - // cout<<"*********** "<fnum)<push(buffer[ithread])); -#ifdef VERYDEBUG - if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; -#endif - } - - sem_wait(&listensmp[ithread]); - - //make sure its not exiting thread - if(killAllListeningThreads){ - cout << ithread << " good bye listening thread" << endl; - if(tempchar) {delete [] tempchar;tempchar = NULL;} - pthread_exit(NULL); - } - } - - return OK; -} - - - - - - - - - - - - - -int slsReceiverUDPFunctions::startWriting(){ - int ithread = currentWriterThreadIndex; -#ifdef VERYVERBOSE - cout << ithread << "In startWriting()" <pop(wbuf[i]); - numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); -#ifdef VERYDEBUG - cout << ithread << " numpackets:" << dec << numpackets << endl; -#endif - } - -#ifdef VERYDEBUG - cout << ithread << " numpackets:" << dec << numpackets << endl; - cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; - cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; -#endif - - - //last dummy packet - if(numpackets == 0xFFFF){ - stopWriting(ithread,wbuf); - continue; - } - - - - - //for progress - if(myDetectorType == EIGER){ - tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); - tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 - }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - pthread_mutex_lock(&progress_mutex); - if(tempframenum > currframenum) - currframenum = tempframenum; - pthread_mutex_unlock(&progress_mutex); - } -//#ifdef VERYDEBUG - if(myDetectorType == EIGER) - cout << endl < 0){ - for(i=0;ipush(wbuf[i])); -#ifdef VERYDEBUG - cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; -#endif - } - - - } - else{ - //copy to gui - copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYVERBOSE - cout << ithread << " finished copying" << endl; -#endif - while(!fifoFree[0]->push(wbuf[0])); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuf[0]<fnum); - //gotthard has +1 for frame number and not a short frame - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset); - else - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) - & (frameIndexMask)) >> frameIndexOffset); - - - //start of acquisition - if(!acqStarted){ - startAcquisitionIndex=startFrameIndex; - currframenum = startAcquisitionIndex; - acqStarted = true; - cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); - exit(-1); - } - //push the last buffer into fifo - if(rc > 0){ - pc = (rc/onePacketSize); -#ifdef VERYDEBUG - cout << ithread << " *** last packetcount:" << pc << endl; -#endif - (*((uint16_t*)(buffer[ithread]))) = pc; - totalListeningFrameCount[ithread] += pc; - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef VERYDEBUG - cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; -#endif - } - - - //push dummy buffer to all writer threads - for(i=0;ipop(buffer[ithread]); - (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; -#ifdef VERYDEBUG - cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; -#endif - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef VERYDEBUG - cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; -#endif - } - - //reset mask and exit loop - pthread_mutex_lock(&status_mutex); - listeningthreads_mask^=(1< 1) - cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; -#endif - while(listeningthreads_mask) - usleep(5000); -#ifdef VERYDEBUG - t = 0; - for(i=0;ipush(wbuffer[i])); -#ifdef VERYDEBUG - cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; -#endif - } - - - - //all threads need to close file, reset mask and exit loop - closeFile(ithread); - pthread_mutex_lock(&status_mutex); - writerthreads_mask^=(1< 0){ - - //for progress and packet loss calculation(new files) - if(myDetectorType == EIGER); - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - - //lock - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - - - //to create new file when max reached - packetsToSave = maxPacketsPerFile - packetsInFile; - if(packetsToSave > numpackets) - packetsToSave = numpackets; -/**next time offset is still plus header length*/ - fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); - packetsInFile += packetsToSave; - packetsCaught += packetsToSave; - totalPacketsCaught += packetsToSave; - - - //new file - if(packetsInFile >= maxPacketsPerFile){ - //for packet loss - lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); - if(myDetectorType == EIGER); - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - //create - createNewFile(); - } - - //unlock - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - - - offset += (packetsToSave * onePacketSize); - numpackets -= packetsToSave; - } - - } - else{ - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - packetsInFile += numpackets; - packetsCaught += numpackets; - totalPacketsCaught += numpackets; - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - } -} - - - - - - - - - - - - - - -void slsReceiverUDPFunctions::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ - -#if defined(MYROOT1) && defined(ALLFILE_DEBUG) - writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); -#endif - - eventType thisEvent = PEDESTAL; - int ndata; - char* buff = 0; - data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; - int remainingsize = npackets * onePacketSize; - int np; - int once = 0; - double tot, tl, tr, bl, br; - int xmin = 1, ymin = 1, ix, iy; - - - while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ - np = ndata/onePacketSize; - - //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); - - //only for moench - if(commonModeSubtractionEnable){ - for(ix = xmin - 1; ix < xmax+1; ix++){ - for(iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); - } - } - } - - - for(ix = xmin - 1; ix < xmax+1; ix++) - for(iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); - if (nf>1000) { - tot=0; - tl=0; - tr=0; - bl=0; - br=0; - if (thisEvent==PHOTON_MAX) { - receiverdata[ithread]->getFrameNumber(buff); - //iFrame=receiverdata[ithread]->getFrameNumber(buff); -#ifdef MYROOT1 - myTree[ithread]->Fill(); - //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; -#else - pthread_mutex_lock(&write_mutex); - if((enableFileWrite) && (sfilefd)) - singlePhotonDet[ithread]->writeCluster(sfilefd); - pthread_mutex_unlock(&write_mutex); -#endif - } - } - } - - nf++; -#ifndef ALLFILE - pthread_mutex_lock(&progress_mutex); - packetsInFile += packetsPerFrame; - packetsCaught += packetsPerFrame; - totalPacketsCaught += packetsPerFrame; - if(packetsInFile >= maxPacketsPerFile) - createNewFile(); - pthread_mutex_unlock(&progress_mutex); - -#endif - if(!once){ - copyFrameToGui(NULL,-1,buff); - once = 1; - } - } - - remainingsize -= ((buff + ndata) - data); - data = buff + ndata; - if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) - cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ - - tengigaEnable = enable; - - if(myDetectorType == EIGER){ - - if(!tengigaEnable){ - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - }else{ - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; - } - frameSize = onePacketSize * packetsPerFrame; - bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) - //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - - - cout<<"packetsPerFrame:"<