diff --git a/slsReceiverSoftware/Makefile b/slsReceiverSoftware/Makefile index 4aa0c3fde..8f6e00910 100644 --- a/slsReceiverSoftware/Makefile +++ b/slsReceiverSoftware/Makefile @@ -14,8 +14,10 @@ DFLAGS= -g -DDACS_INT -DSLS_RECEIVER_UDP_FUNCTIONS INCLUDES?= -I. -Iincludes -IMySocketTCP -IslsReceiver -IslsDetectorCalibration -IslsReceiver/eigerReceiver -I$(ASM) #-IslsReceiverInterface -SRC_CLNT= MySocketTCP/MySocketTCP.cpp slsReceiver/slsReceiver.cpp slsReceiver/slsReceiverUDPFunctions.cpp slsReceiver/slsReceiverTCPIPInterface.cpp slsReceiver/slsReceiverUsers.cpp +SRC_CLNT= MySocketTCP/MySocketTCP.cpp slsReceiver/slsReceiver.cpp slsReceiver/UDPInterface.cpp slsReceiver/UDPBaseImplementation.cpp slsReceiver/slsReceiverTCPIPInterface.cpp slsReceiver/slsReceiverUsers.cpp + #slsReceiverInterface/receiverInterface.cpp +#slsReceiver/slsReceiverUDPFunctions.cpp OBJS = $(SRC_CLNT:.cpp=.o) OBJS += slsReceiver/eigerReceiver.o diff --git a/slsReceiverSoftware/slsReceiver/Makefile b/slsReceiverSoftware/slsReceiver/Makefile index d5f02c2ea..b4a681c3f 100644 --- a/slsReceiverSoftware/slsReceiver/Makefile +++ b/slsReceiverSoftware/slsReceiver/Makefile @@ -13,7 +13,7 @@ LDFLAGRXR ?= -L$(LIBDIR) -lSlsReceiver -L/usr/lib64/ -lpthread LDFLAGRXR += -lm -lstdc++ -INCLUDES ?= -I ../MySocketTCP -I ../slsDetectorCalibration -I ../includes -I eigerReceiver -I . +INCLUDES ?= -I ../MySocketTCP -I ../slsDetectorCalibration -I ../includes/ -I eigerReceiver -I . SRC_CLNT = main.cpp @@ -40,19 +40,21 @@ $(DESTDIR)/sslsReceiver: lib $(DESTDIR)/slsReceiver: eigerReceiver lib + echo "AAAAAAAAAAAA" $(CXX) -o $@ $(SRC_CLNT) $(FLAGS) $(INCLUDES) $(CLAGS) $(LIBS) $(LDFLAGRXR) -fPIC $(CXX) -o $@ $(SRC_CLNT) $(FLAGS) $(INCLUDES) $(CLAGS) $(LIBS) $(LDFLAGRXR) -fPIC #$(EIGERFLAGS) ifeq ($(EIGERSLS), yes) eigerReceiver: - $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiverTest.o eigerReceiver/eigerReceiverTest.cpp $(EIGERFLAGS) - $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiver.o eigerReceiver/eigerReceiver.cpp $(EIGERFLAGS) - $(CXX) eigerReceiverTest.o eigerReceiver.o -o eigerReceiver/eigerReceiverTest $(EIGERFLAGS) +# $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiverTest.o eigerReceiver/eigerReceiverTest.cpp $(EIGERFLAGS) +# $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiver.o eigerReceiver/eigerReceiver.cpp $(EIGERFLAGS) +# $(CXX) eigerReceiverTest.o eigerReceiver.o -o eigerReceiver/eigerReceiverTest $(EIGERFLAGS) + $(CXX) $(FLAGS) $(CFLAGS) $(INCLUDES) -fPIC -c -o eigerReceiver.o eigerReceiverImplementation.cpp $(EIGERFLAGS) else ifeq ($(ROOTSLS), yes) eigerReceiver: eigerReceiver/eigerReceiverDummy.cpp echo "Compiling with root" - $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiver.o eigerReceiver/eeigerReceiverDummy.cpp $(ROOTFLAGS) + $(CXX) $(FLAGS) $(CFLAGS) -fPIC -c -o eigerReceiver.o eigerReceiver/eigerReceiverDummy.cpp $(ROOTFLAGS) else eigerReceiver: eigerReceiver/eigerReceiverDummy.cpp $(CXX) $(FLAGS) $(CFLAGS) $(INCLUDES) -fPIC -c -o eigerReceiver.o eigerReceiver/eigerReceiverDummy.cpp @@ -63,6 +65,6 @@ lib: clean: rm -rf $(PROGS) *.o eigerReceiverTest $(DESTDIR)/libSlsReceiver.a $(DESTDIR)/libSlsReceiver.so core - + diff --git a/slsReceiverSoftware/slsReceiver/UDPBaseImplementation.cpp b/slsReceiverSoftware/slsReceiver/UDPBaseImplementation.cpp new file mode 100644 index 000000000..aa0aa77e1 --- /dev/null +++ b/slsReceiverSoftware/slsReceiver/UDPBaseImplementation.cpp @@ -0,0 +1,2317 @@ +#ifdef SLS_RECEIVER_UDP_FUNCTIONS +/********************************************//** + * @file UDPBaseImplementation.cpp + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + ***********************************************/ + + +#include "UDPBaseImplementation.h" + +#include "moench02ModuleData.h" +#include "gotthardModuleData.h" +#include "gotthardShortModuleData.h" + + +#include // SIGINT +#include // stat +#include // socket(), bind(), listen(), accept(), shut down +#include // sock_addr_in, htonl, INADDR_ANY +#include // exit() +#include //set precision +#include //munmap + + + +#include +#include + + + +using namespace std; + + + +UDPBaseImplementation::UDPBaseImplementation(): + thread_started(0), + eth(NULL), + latestData(NULL), + guiFileName(NULL), + guiFrameNumber(0), + tengigaEnable(0){ + + for(int i=0;i /proc/sys/net/core/rmem_max")) + cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; + /** permanent setting heiner + net.core.rmem_max = 104857600 # 100MiB + net.core.netdev_max_backlog = 250000 + sysctl -p + // from the manual + sysctl -w net.core.rmem_max=16777216 + sysctl -w net.core.netdev_max_backlog=250000 + */ +} + + + +UDPBaseImplementation::~UDPBaseImplementation(){ + createListeningThreads(true); + createWriterThreads(true); + deleteMembers(); +} + + + + +void UDPBaseImplementation::deleteMembers(){ + //kill threads + if(thread_started){ + createListeningThreads(true); + createWriterThreads(true); + } + + for(int i=0;i=0) + fileIndex = i; + return getFileIndex(); +} + + +int UDPBaseImplementation::setFrameIndexNeeded(int i){ + frameIndexNeeded = i; + return frameIndexNeeded; +} + + +int UDPBaseImplementation::getEnableFileWrite() const{ + return enableFileWrite; +} + +int UDPBaseImplementation::setEnableFileWrite(int i){ + enableFileWrite=i; + return getEnableFileWrite(); +} + +int UDPBaseImplementation::getEnableOverwrite() const{ + return overwrite; +} + +int UDPBaseImplementation::setEnableOverwrite(int i){ + overwrite=i; + return getEnableOverwrite(); +} + + + + + +/*other parameters*/ + +slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ + return status; +} + + +void UDPBaseImplementation::initialize(const char *detectorHostName){ + if(strlen(detectorHostName)) + strcpy(detHostname,detectorHostName); +} + + +char *UDPBaseImplementation::getDetectorHostname() const{ + return (char*)detHostname; +} + +void UDPBaseImplementation::setEthernetInterface(char* c){ + strcpy(eth,c); +} + + +void UDPBaseImplementation::setUDPPortNo(int p){ + for(int i=0;i= 0) + numberOfFrames = fnum; + + return getNumberOfFrames(); +} + +int UDPBaseImplementation::getScanTag() const{ + return scanTag; +} + + +int32_t UDPBaseImplementation::setScanTag(int32_t stag){ + if(stag >= 0) + scanTag = stag; + + return getScanTag(); +} + + +int UDPBaseImplementation::getDynamicRange() const{ + return dynamicRange; +} + +int32_t UDPBaseImplementation::setDynamicRange(int32_t dr){ + cout << "Setting Dynamic Range" << endl; + + int olddr = dynamicRange; + if(dr >= 0){ + dynamicRange = dr; + + if(myDetectorType == EIGER){ + + + if(!tengigaEnable) + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + else + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + frameSize = onePacketSize * packetsPerFrame; + bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + + + if(olddr != dr){ + + //del + if(thread_started){ + createListeningThreads(true); + createWriterThreads(true); + } + for(int i=0;i=0){ + nFrameToGui = i; + setupFifoStructure(); + } + return nFrameToGui; +} + + + +int64_t UDPBaseImplementation::setAcquisitionPeriod(int64_t index){ + + if(index >= 0){ + if(index != acquisitionPeriod){ + acquisitionPeriod = index; + setupFifoStructure(); + } + } + return acquisitionPeriod; +} + + +bool UDPBaseImplementation::getDataCompression(){return dataCompression;} + +int UDPBaseImplementation::enableDataCompression(bool enable){ + cout << "Data compression "; + if(enable) + cout << "enabled" << endl; + else + cout << "disabled" << endl; +#ifdef MYROOT1 + cout << " WITH ROOT" << endl; +#else + cout << " WITHOUT ROOT" << endl; +#endif + //delete filter for the current number of threads + deleteFilter(); + + dataCompression = enable; + pthread_mutex_lock(&status_mutex); + writerthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + createWriterThreads(true); + + if(enable) + numWriterThreads = MAX_NUM_WRITER_THREADS; + else + numWriterThreads = 1; + + if(createWriterThreads() == FAIL){ + cout << "ERROR: Could not create writer threads" << endl; + return FAIL; + } + setThreadPriorities(); + + + if(enable) + setupFilter(); + + return OK; +} + + + + + + + + + + + + +/*other functions*/ + + +void UDPBaseImplementation::deleteFilter(){ + int i; + cmSub=NULL; + + for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); + +} + + + +//LEO: it is not clear to me.. +void UDPBaseImplementation::setupFifoStructure(){ + + int64_t i; + int oldn = numJobsPerThread; + + //if every nth frame mode + if(nFrameToGui) + numJobsPerThread = nFrameToGui; + + //random nth frame mode + else{ + if(!acquisitionPeriod) + i = SAMPLE_TIME_IN_NS; + else + i = SAMPLE_TIME_IN_NS/acquisitionPeriod; + if (i > MAX_JOBS_PER_THREAD) + numJobsPerThread = MAX_JOBS_PER_THREAD; + else if (i < 1) + numJobsPerThread = 1; + else + numJobsPerThread = i; + } + + //if same, return + if(oldn == numJobsPerThread) + return; + + if(myDetectorType == EIGER) + numJobsPerThread = 1; + + //otherwise memory too much if numjobsperthread is at max = 1000 + fifosize = GOTTHARD_FIFO_SIZE; + if(myDetectorType == MOENCH) + fifosize = MOENCH_FIFO_SIZE; + else if(myDetectorType == EIGER) + fifosize = EIGER_FIFO_SIZE; + + if(fifosize % numJobsPerThread) + fifosize = (fifosize/numJobsPerThread)+1; + else + fifosize = fifosize/numJobsPerThread; + + + cout << "Number of Frames per buffer:" << numJobsPerThread << endl; + cout << "Fifo Size:" << fifosize << endl; + + /* + //for testing + numJobsPerThread = 3; fifosize = 11; + */ + + for(int i=0;iisEmpty()) + fifoFree[i]->pop(buffer[i]); + delete fifoFree[i]; + } + if(fifo[i]) delete fifo[i]; + if(mem0[i]) free(mem0[i]); + fifoFree[i] = new CircularFifo(fifosize); + fifo[i] = new CircularFifo(fifosize); + + + //allocate memory + mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ + if (mem0[i]==NULL){ + cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + exit(-1); + } + buffer[i]=mem0[i]; + //push the addresses into freed fifoFree and writingFifoFree + while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); + buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + } + } + cout << "Fifo structure(s) reconstructed" << endl; +} + + + + + + + +/** acquisition functions */ + +void UDPBaseImplementation::readFrame(char* c,char** raw, uint32_t &fnum){ + //point to gui data + if (guiData == NULL) + guiData = latestData; + + //copy data and filename + strcpy(c,guiFileName); + fnum = guiFrameNumber; + + + //could not get gui data + if(!guiDataReady){ + *raw = NULL; + } + //data ready, set guidata to receive new data + else{ + *raw = guiData; + guiData = NULL; + + pthread_mutex_lock(&dataReadyMutex); + guiDataReady = 0; + pthread_mutex_unlock(&dataReadyMutex); + if((nFrameToGui) && (writerthreads_mask)){ + /*if(nFrameToGui){*/ + //release after getting data + sem_post(&smp); + } + } +} + + + + + +void UDPBaseImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ + + //random read when gui not ready + if((!nFrameToGui) && (!guiData)){ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + pthread_mutex_unlock(&dataReadyMutex); + } + + //random read or nth frame read, gui needs data now + else{ + /* + //nth frame read, block current process if the guireader hasnt read it yet + if(nFrameToGui) + sem_wait(&smp); +*/ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + //eiger + if(startbuf != NULL){ + int offset = 0; + int size = frameSize/EIGER_MAX_PORTS; + for(int j=0;jgetErrorStatus(); + if(iret){ +#ifdef VERBOSE + cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; +#endif + return FAIL; + } + } + + return OK; +} + + + + + + + +int UDPBaseImplementation::shutDownUDPSockets(){ + for(int i=0;iShutDownSocket(); + delete udpSocket[i]; + udpSocket[i] = NULL; + } + } + return OK; +} + + + + + +int UDPBaseImplementation::createListeningThreads(bool destroy){ + int i; + void* status; + + killAllListeningThreads = 0; + + pthread_mutex_lock(&status_mutex); + listeningthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + if(!destroy){ + + //start listening threads + cout << "Creating Listening Threads(s)"; + + currentListeningThreadIndex = -1; + + for(i = 0; i < numListeningThreads; ++i){ + sem_init(&listensmp[i],1,0); + thread_started = 0; + currentListeningThreadIndex = i; + if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ + cout << "Could not create listening thread with index " << i << endl; + return FAIL; + } + while(!thread_started); + cout << "."; + cout << flush; + } +#ifdef VERBOSE + cout << "Listening thread(s) created successfully." << endl; +#else + cout << endl; +#endif + }else{ + cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); + //resets the pedestalSubtraction array and the commonModeSubtraction + singlePhotonDet[ithr]->newDataSet(); + if(myFile[ithr]==NULL){ + cout<<"file null"<IsOpen()){ + cout<<"file not open"< DO_NOTHING){ + //close + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + //open file + if(!overwrite){ + if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ + cout << "Error: Could not create new file " << savefilename << endl; + return FAIL; + } + }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ + cout << "Error: Could not create file " << savefilename << endl; + return FAIL; + } + //setting buffer + setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); + + //printing packet losses and file names + if(!packetsCaught) + cout << savefilename << endl; + else{ + cout << savefilename + << "\tpacket loss " + << setw(4)<GetCurrentFile(); + + if(myFile[ithr]->Write()) + //->Write(tall->GetName(),TObject::kOverwrite); + cout << "Thread " << ithr <<": wrote frames to file" << endl; + else + cout << "Thread " << ithr << ": could not write frames to file" << endl; + + }else + cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; + //close file + if(myTree[ithr] && myFile[ithr]) + myFile[ithr] = myTree[ithr]->GetCurrentFile(); + if(myFile[ithr] != NULL) + myFile[ithr]->Close(); + myFile[ithr] = NULL; + myTree[ithr] = NULL; + pthread_mutex_unlock(&write_mutex); + +#endif + } +} + + + + + +int UDPBaseImplementation::startReceiver(char message[]){ + int i; + + +// #ifdef VERBOSE + cout << "Starting Receiver" << endl; +//#endif + + + //reset listening thread variables + measurementStarted = false; + //should be set to zero as its added to get next start frame indices for scans for eiger + if(!acqStarted) currframenum = 0; + startFrameIndex = 0; + + for(int i = 0; i < numListeningThreads; ++i) + totalListeningFrameCount[i] = 0; + + //udp socket + if(createUDPSockets() == FAIL){ + strcpy(message,"Could not create UDP Socket(s).\n"); + cout << endl << message << endl; + return FAIL; + } + cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; + + + if(setupWriter() == FAIL){ + //stop udp socket + shutDownUDPSockets(); + + sprintf(message,"Could not create file %s.\n",savefilename); + return FAIL; + } + cout << "Successfully created file(s)" << endl; + + //done to give the gui some proper name instead of always the last file name + if(dataCompression) + sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + + //initialize semaphore + sem_init(&smp,1,0); + + //status + pthread_mutex_lock(&status_mutex); + status = RUNNING; + for(i=0;istartListening(); + + return this_pointer; +} + + + +void* UDPBaseImplementation::startWritingThread(void* this_pointer){ + ((UDPBaseImplementation*)this_pointer)->startWriting(); + return this_pointer; +} + + + + + + +int UDPBaseImplementation::startListening(){ + int ithread = currentListeningThreadIndex; +#ifdef VERYVERBOSE + cout << "In startListening() " << endl; +#endif + + thread_started = 1; + + int i,total; + int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; + uint32_t lastframeheader;// for moench to check for all the packets in last frame + char* tempchar = NULL; + int imageheader = 0; + if(myDetectorType==EIGER) + imageheader = EIGER_IMAGE_HEADER_SIZE; + + + while(1){ + //variables that need to be checked/set before each acquisition + carryonBufferSize = 0; + //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later + maxBufferSize = bufferSize * numJobsPerThread; +#ifdef VERYDEBUG + cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; +#endif + + if(tempchar) {delete [] tempchar;tempchar = NULL;} + if(myDetectorType != EIGER) + tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size + + + while((1<pop(buffer[ithread]); +#ifdef VERYDEBUG + cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; +#endif + + + //receive + if(udpSocket[ithread] == NULL){ + rc = 0; + cout << ithread << "UDP Socket is NULL" << endl; + } + //normal listening + else if(!carryonBufferSize){ + + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); + expected = maxBufferSize; + + } + //the remaining packets from previous buffer + else{ +#ifdef VERYDEBUG + cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; + cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) + & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); + expected = maxBufferSize - carryonBufferSize; + } + +#ifdef VERYDEBUG + cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; +#endif + + + + + //start indices for each start of scan/acquisition - eiger does it before + if((!measurementStarted) && (rc > 0) && (!ithread)) + startFrameIndices(ithread); + + //problem in receiving or end of acquisition + if((rc < expected)||(rc <= 0)){ + stopListening(ithread,rc,packetcount,total); + continue; + } + + + + //reset + packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; + carryonBufferSize = 0; + + + + //check if last packet valid and calculate packet count + switch(myDetectorType){ + + case MOENCH: + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; + cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; + cout << "last packet offset:" << lastpacketoffset << endl; + cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; + cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + //moench last packet value is 0 + if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ + lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + } + memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) + & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) + & (packetIndexMask)) << endl; +#endif + } + break; + + case GOTTHARD: + if(shortFrame == -1){ + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout << "last packet offset:" << lastpacketoffset << endl; +#endif + + if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ + memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + carryonBufferSize = onePacketSize; + --packetcount; + } + } +#ifdef VERYDEBUG + cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + break; + default: + + break; + + } + + + // cout<<"*********** "<fnum)<push(buffer[ithread])); +#ifdef VERYDEBUG + if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; +#endif + } + + sem_wait(&listensmp[ithread]); + + //make sure its not exiting thread + if(killAllListeningThreads){ + cout << ithread << " good bye listening thread" << endl; + if(tempchar) {delete [] tempchar;tempchar = NULL;} + pthread_exit(NULL); + } + } + + return OK; +} + + + + + + + + + + + + + +int UDPBaseImplementation::startWriting(){ + int ithread = currentWriterThreadIndex; +#ifdef VERYVERBOSE + cout << ithread << "In startWriting()" <pop(wbuf[i]); + numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; +#endif + } + +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; +#endif + + + //last dummy packet + if(numpackets == 0xFFFF){ + stopWriting(ithread,wbuf); + continue; + } + + + + + //for progress + if(myDetectorType == EIGER){ + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + pthread_mutex_lock(&progress_mutex); + if(tempframenum > currframenum) + currframenum = tempframenum; + pthread_mutex_unlock(&progress_mutex); + } +//#ifdef VERYDEBUG + if(myDetectorType == EIGER) + cout << endl < 0){ + for(i=0;ipush(wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; +#endif + } + + + } + else{ + //copy to gui + copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYVERBOSE + cout << ithread << " finished copying" << endl; +#endif + while(!fifoFree[0]->push(wbuf[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuf[0]<fnum); + //gotthard has +1 for frame number and not a short frame + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + else + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + + + //start of acquisition + if(!acqStarted){ + startAcquisitionIndex=startFrameIndex; + currframenum = startAcquisitionIndex; + acqStarted = true; + cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); + exit(-1); + } + //push the last buffer into fifo + if(rc > 0){ + pc = (rc/onePacketSize); +#ifdef VERYDEBUG + cout << ithread << " *** last packetcount:" << pc << endl; +#endif + (*((uint16_t*)(buffer[ithread]))) = pc; + totalListeningFrameCount[ithread] += pc; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; +#endif + } + + + //push dummy buffer to all writer threads + for(i=0;ipop(buffer[ithread]); + (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; +#ifdef VERYDEBUG + cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; +#endif + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; +#endif + } + + //reset mask and exit loop + pthread_mutex_lock(&status_mutex); + listeningthreads_mask^=(1< 1) + cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; +#endif + while(listeningthreads_mask) + usleep(5000); +#ifdef VERYDEBUG + t = 0; + for(i=0;ipush(wbuffer[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; +#endif + } + + + + //all threads need to close file, reset mask and exit loop + closeFile(ithread); + pthread_mutex_lock(&status_mutex); + writerthreads_mask^=(1< 0){ + + //for progress and packet loss calculation(new files) + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + + //lock + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + + + //to create new file when max reached + packetsToSave = maxPacketsPerFile - packetsInFile; + if(packetsToSave > numpackets) + packetsToSave = numpackets; +/**next time offset is still plus header length*/ + fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); + packetsInFile += packetsToSave; + packetsCaught += packetsToSave; + totalPacketsCaught += packetsToSave; + + + //new file + if(packetsInFile >= maxPacketsPerFile){ + //for packet loss + lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + //create + createNewFile(); + } + + //unlock + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + + + offset += (packetsToSave * onePacketSize); + numpackets -= packetsToSave; + } + + } + else{ + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + packetsInFile += numpackets; + packetsCaught += numpackets; + totalPacketsCaught += numpackets; + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + } +} + + + + + + + + + + + + + + +void UDPBaseImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ + +#if defined(MYROOT1) && defined(ALLFILE_DEBUG) + writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); +#endif + + eventType thisEvent = PEDESTAL; + int ndata; + char* buff = 0; + data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; + int remainingsize = npackets * onePacketSize; + int np; + int once = 0; + double tot, tl, tr, bl, br; + int xmin = 1, ymin = 1, ix, iy; + + + while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ + np = ndata/onePacketSize; + + //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); + + //only for moench + if(commonModeSubtractionEnable){ + for(ix = xmin - 1; ix < xmax+1; ix++){ + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); + } + } + } + + + for(ix = xmin - 1; ix < xmax+1; ix++) + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); + if (nf>1000) { + tot=0; + tl=0; + tr=0; + bl=0; + br=0; + if (thisEvent==PHOTON_MAX) { + receiverdata[ithread]->getFrameNumber(buff); + //iFrame=receiverdata[ithread]->getFrameNumber(buff); +#ifdef MYROOT1 + myTree[ithread]->Fill(); + //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; +#else + pthread_mutex_lock(&write_mutex); + if((enableFileWrite) && (sfilefd)) + singlePhotonDet[ithread]->writeCluster(sfilefd); + pthread_mutex_unlock(&write_mutex); +#endif + } + } + } + + nf++; +#ifndef ALLFILE + pthread_mutex_lock(&progress_mutex); + packetsInFile += packetsPerFrame; + packetsCaught += packetsPerFrame; + totalPacketsCaught += packetsPerFrame; + if(packetsInFile >= maxPacketsPerFile) + createNewFile(); + pthread_mutex_unlock(&progress_mutex); + +#endif + if(!once){ + copyFrameToGui(NULL,-1,buff); + once = 1; + } + } + + remainingsize -= ((buff + ndata) - data); + data = buff + ndata; + if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) + cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ + + tengigaEnable = enable; + + if(myDetectorType == EIGER){ + + if(!tengigaEnable){ + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + }else{ + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; + } + frameSize = onePacketSize * packetsPerFrame; + bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) + //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + + cout<<"packetsPerFrame:"< +#include +#endif + + +#include +#include +#include +#include + + +/** + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + */ + +class UDPBaseImplementation : private virtual slsReceiverDefs, public UDPInterface { + + public: + /** + * Constructor + */ + UDPBaseImplementation(); + + /** + * Destructor + */ + virtual ~UDPBaseImplementation(); + + + + /** + * delete and free member parameters + */ + void deleteMembers(); + + /** + * initialize member parameters + */ + void initializeMembers(); + + /** + * Set receiver type + * @param det detector type + * Returns success or FAIL + */ + int setDetectorType(detectorType det); + + + //Frame indices and numbers caught + /** + * Returns current Frame Index Caught for an entire acquisition (including all scans) + */ + uint32_t getAcquisitionIndex(); + + /** + * Returns if acquisition started + */ + bool getAcquistionStarted(); + + /** + * Returns Frames Caught for each real time acquisition (eg. for each scan) + */ + int getFramesCaught(); + + /** + * Returns Total Frames Caught for an entire acquisition (including all scans) + */ + int getTotalFramesCaught(); + + /** + * Returns the frame index at start of each real time acquisition (eg. for each scan) + */ + uint32_t getStartFrameIndex(); + + /** + * Returns current Frame Index for each real time acquisition (eg. for each scan) + */ + uint32_t getFrameIndex(); + + /** + * Returns if measurement started + */ + bool getMeasurementStarted(); + + /** + * Resets the Total Frames Caught + * This is how the receiver differentiates between entire acquisitions + * Returns 0 + */ + void resetTotalFramesCaught(); + + + + + //file parameters + /** + * Returns File Path + */ + char* getFilePath() const; + + /** + * Set File Path + * @param c file path + */ + char* setFilePath(const char c[]); + + /** + * Returns File Name + */ + char* getFileName() const; + + /** + * Set File Name (without frame index, file index and extension) + * @param c file name + */ + char* setFileName(const char c[]); + + /** + * Returns File Index + */ + int getFileIndex(); + + /** + * Set File Index + * @param i file index + */ + int setFileIndex(int i); + + /** + * Set Frame Index Needed + * @param i frame index needed + */ + int setFrameIndexNeeded(int i); + + /** + * Set enable file write + * @param i file write enable + * Returns file write enable + */ + int setEnableFileWrite(int i); + + /** + * Enable/disable overwrite + * @param i enable + * Returns enable over write + */ + int setEnableOverwrite(int i); + + /** + * Returns file write enable + * 1: YES 0: NO + */ + int getEnableFileWrite() const; + + /** + * Returns file over write enable + * 1: YES 0: NO + */ + int getEnableOverwrite() const; + +//other parameters + + /** + * abort acquisition with minimum damage: close open files, cleanup. + * does nothing if state already is 'idle' + */ + void abort() {}; + + /** + * Returns status of receiver: idle, running or error + */ + runStatus getStatus() const; + + /** + * Set detector hostname + * @param c hostname + */ + void initialize(const char *detectorHostName); + + /* Returns detector hostname + /returns hostname + * caller needs to deallocate the returned char array. + * if uninitialized, it must return NULL + */ + char *getDetectorHostname() const; + + /** + * Set Ethernet Interface or IP to listen to + */ + void setEthernetInterface(char* c); + + /** + * Set UDP Port Number + */ + void setUDPPortNo(int p); + + /* + * Returns number of frames to receive + * This is the number of frames to expect to receiver from the detector. + * The data receiver will change from running to idle when it got this number of frames + */ + int getNumberOfFrames() const; + + /** + * set frame number if a positive number + */ + int32_t setNumberOfFrames(int32_t fnum); + + /** + * Returns scan tag + */ + int getScanTag() const; + + /** + * set scan tag if its is a positive number + */ + int32_t setScanTag(int32_t stag); + + /** + * Returns the number of bits per pixel + */ + int getDynamicRange() const; + + /** + * set dynamic range if its is a positive number + */ + int32_t setDynamicRange(int32_t dr); + + /** + * Set short frame + * @param i if shortframe i=1 + */ + int setShortFrame(int i); + + /** + * Set the variable to send every nth frame to gui + * or if 0,send frame only upon gui request + */ + int setNFrameToGui(int i); + + /** set acquisition period if a positive number + */ + int64_t setAcquisitionPeriod(int64_t index); + + /** get data compression, by saving only hits + */ + bool getDataCompression(); + + /** enabl data compression, by saving only hits + /returns if failed + */ + int enableDataCompression(bool enable); + + /** + * enable 10Gbe + @param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out + \returns enable for 10Gbe + */ + int enableTenGiga(int enable = -1); + + + +//other functions + + /** + * Returns the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + * @param fnum frame number for eiger as it is not in the packet + */ + void readFrame(char* c,char** raw, uint32_t &fnum); + + /** + * Closes all files + * @param ithr thread index + */ + void closeFile(int ithr = -1); + + /** + * Starts Receiver - starts to listen for packets + * @param message is the error message if there is an error + * Returns success + */ + int startReceiver(char message[]); + + /** + * Stops Receiver - stops listening for packets + * Returns success + */ + int stopReceiver(); + + /** set status to transmitting and + * when fifo is empty later, sets status to run_finished + */ + void startReadout(); + + /** + * shuts down the udp sockets + * \returns if success or fail + */ + int shutDownUDPSockets(); + +private: + + /* + void not_implemented(string method_name){ + std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; + }; + */ + /** + * Deletes all the filter objects for single photon data + */ + void deleteFilter(); + + /** + * Constructs the filter for single photon data + */ + void setupFilter(); + + /** + * set up fifo according to the new numjobsperthread + */ + void setupFifoStructure (); + + /** + * Copy frames to gui + * uses semaphore for nth frame mode + */ + void copyFrameToGui(char* startbuf[], uint32_t fnum=-1, char* buf=NULL); + + /** + * creates udp sockets + * \returns if success or fail + */ + int createUDPSockets(); + + /** + * create listening thread + * @param destroy is true to kill all threads and start again + */ + int createListeningThreads(bool destroy = false); + + /** + * create writer threads + * @param destroy is true to kill all threads and start again + */ + int createWriterThreads(bool destroy = false); + + /** + * set thread priorities + */ + void setThreadPriorities(); + + /** + * initializes variables and creates the first file + * also does the startAcquisitionCallBack + * \returns FAIL or OK + */ + int setupWriter(); + + /** + * Creates new tree and file for compression + * @param ithr thread number + * @param iframe frame number + *\returns OK for succces or FAIL for failure + */ + int createCompressionFile(int ithr, int iframe); + + /** + * Creates new file + *\returns OK for succces or FAIL for failure + */ + int createNewFile(); + + /** + * Static function - Thread started which listens to packets. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startListeningThread(void *this_pointer); + + /** + * Static function - Thread started which writes packets to file. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startWritingThread(void *this_pointer); + + /** + * Thread started which listens to packets. + * Called by startReceiver() + * + */ + int startListening(); + + /** + * Thread started which writes packets to file. + * Called by startReceiver() + * + */ + int startWriting(); + + /** + * Writing to file without compression + * @param buf is the address of buffer popped out of fifo + * @param numpackets is the number of packets + * @param framenum current frame number + */ + void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum); + + /** + * Its called for the first packet of a scan or acquistion + * Sets the startframeindices and the variables to know if acquisition started + * @param ithread listening thread number + */ + void startFrameIndices(int ithread); + + /** + * This is called when udp socket is shut down + * It pops ffff instead of packet number into fifo + * to inform writers about the end of listening session + * @param ithread listening thread number + * @param rc number of bytes received + * @param pc packet count + * @param t total packets listened to + */ + void stopListening(int ithread, int rc, int &pc, int &t); + + /** + * When acquisition is over, this is called + * @param ithread listening thread number + * @param wbuffer writer buffer + */ + void stopWriting(int ithread, char* wbuffer[]); + + + /** + * data compression for each fifo output + * @param ithread listening thread number + * @param wbuffer writer buffer + * @param npackets number of packets from the fifo + * @param data pointer to the next packet start + * @param xmax max pixels in x direction + * @param ymax max pixels in y direction + * @param nf nf + */ + void handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf); + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char header_before[20]; + unsigned char fnum[4]; + unsigned char header_after[24]; + } eiger_image_header; + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char num1[4]; + unsigned char num2[4]; + } eiger_packet_header; + + /** max number of listening threads */ + const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS; + + /** max number of writer threads */ + const static int MAX_NUM_WRITER_THREADS = 15; + + /** detector type */ + detectorType myDetectorType; + + /** detector hostname */ + char detHostname[MAX_STR_LENGTH]; + + /** status of receiver */ + runStatus status; + + /** UDP Socket between Receiver and Detector */ + genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; + + /** Server UDP Port*/ + int server_port[MAX_NUM_LISTENING_THREADS]; + + /** ethernet interface or IP to listen to */ + char *eth; + + /** max packets per file **/ + int maxPacketsPerFile; + + /** File write enable */ + int enableFileWrite; + + /** File over write enable */ + int overwrite; + + /** Complete File name */ + char savefilename[MAX_STR_LENGTH]; + + /** File Name without frame index, file index and extension*/ + char fileName[MAX_STR_LENGTH]; + + /** File Path */ + char filePath[MAX_STR_LENGTH]; + + /** File Index */ + int fileIndex; + + /** scan tag */ + int scanTag; + + /** if frame index required in file name */ + int frameIndexNeeded; + + /* Acquisition started */ + bool acqStarted; + + /* Measurement started */ + bool measurementStarted; + + /** Frame index at start of each real time acquisition (eg. for each scan) */ + uint32_t startFrameIndex; + + /** Actual current frame index of each time acquisition (eg. for each scan) */ + uint32_t frameIndex; + + /** Frames Caught for each real time acquisition (eg. for each scan) */ + int packetsCaught; + + /** Total packets caught for an entire acquisition (including all scans) */ + int totalPacketsCaught; + + /** Pckets currently in current file, starts new file when it reaches max */ + int packetsInFile; + + /** Frame index at start of an entire acquisition (including all scans) */ + uint32_t startAcquisitionIndex; + + /** Actual current frame index of an entire acquisition (including all scans) */ + uint32_t acquisitionIndex; + + /** number of packets per frame*/ + int packetsPerFrame; + + /** frame index mask */ + uint32_t frameIndexMask; + + /** packet index mask */ + uint32_t packetIndexMask; + + /** frame index offset */ + int frameIndexOffset; + + /** acquisition period */ + int64_t acquisitionPeriod; + + /** frame number */ + int32_t numberOfFrames; + + /** dynamic range */ + int dynamicRange; + + /** short frames */ + int shortFrame; + + /** current frame number */ + uint32_t currframenum; + + /** Previous Frame number from buffer */ + uint32_t prevframenum; + + /** size of one frame */ + int frameSize; + + /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ + int bufferSize; + + /** oen buffer size */ + int onePacketSize; + + /** latest data */ + char* latestData; + + /** gui data ready */ + int guiDataReady; + + /** points to the data to send to gui */ + char* guiData; + + /** points to the filename to send to gui */ + char* guiFileName; + + /** temporary number for eiger frame number as its not included in the packet */ + uint32_t guiFrameNumber; + + /** send every nth frame to gui or only upon gui request*/ + int nFrameToGui; + + /** fifo size */ + unsigned int fifosize; + + /** number of jobs per thread for data compression */ + int numJobsPerThread; + + /** datacompression - save only hits */ + bool dataCompression; + + /** memory allocated for the buffer */ + char *mem0[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data read */ + CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data already written and ready to be resued*/ + CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; + + /** Receiver buffer */ + char *buffer[MAX_NUM_LISTENING_THREADS]; + + /** number of writer threads */ + int numListeningThreads; + + /** number of writer threads */ + int numWriterThreads; + + /** to know if listening and writer threads created properly */ + int thread_started; + + /** current listening thread index*/ + int currentListeningThreadIndex; + + /** current writer thread index*/ + int currentWriterThreadIndex; + + /** thread listening to packets */ + pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; + + /** thread writing packets */ + pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; + + /** total frame count the listening thread has listened to */ + int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; + + /** mask showing which listening threads are running */ + volatile uint32_t listeningthreads_mask; + + /** mask showing which writer threads are running */ + volatile uint32_t writerthreads_mask; + + /** mask showing which threads have created files*/ + volatile uint32_t createfile_mask; + + /** OK if file created was successful */ + int ret_createfile; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllListeningThreads; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllWritingThreads; + + /** 10Gbe enable*/ + int tengigaEnable; + + + + +//semaphores + /** semaphore to synchronize writer and guireader threads */ + sem_t smp; + /** semaphore to synchronize listener threads */ + sem_t listensmp[MAX_NUM_LISTENING_THREADS]; + /** semaphore to synchronize writer threads */ + sem_t writersmp[MAX_NUM_WRITER_THREADS]; + + +//mutex + /** guiDataReady mutex */ + pthread_mutex_t dataReadyMutex; + + /** mutex for status */ + pthread_mutex_t status_mutex; + + /** mutex for progress variable currframenum */ + pthread_mutex_t progress_mutex; + + /** mutex for writing data to file */ + pthread_mutex_t write_mutex; + + /** File Descriptor */ + FILE *sfilefd; + + //filter + singlePhotonDetector *singlePhotonDet[MAX_NUM_WRITER_THREADS]; + slsReceiverData *receiverdata[MAX_NUM_WRITER_THREADS]; + moenchCommonMode *cmSub; + bool commonModeSubtractionEnable; + +#ifdef MYROOT1 + /** Tree where the hits are stored */ + TTree *myTree[MAX_NUM_WRITER_THREADS]; + + /** File where the tree is saved */ + TFile *myFile[MAX_NUM_WRITER_THREADS]; +#endif + + + + /** + callback arguments are + filepath + filename + fileindex + data size + + return value is + 0 callback takes care of open,close,write file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + + */ + int (*startAcquisitionCallBack)(char*, char*,int, int, void*); + void *pStartAcquisition; + + /** + args to acquisition finished callback + total frames caught + + */ + void (*acquisitionFinishedCallBack)(int, void*); + void *pAcquisitionFinished; + + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void (*rawDataReadyCallBack)(int, char*, int, FILE*, char*, void*); + void *pRawDataReady; + + /** The action which decides what the user and default responsibilites to save data are + * 0 raw data ready callback takes care of open,close,write file + * 1 callback writes file, we have to open, close it + * 2 we open, close, write file, callback does not do anything */ + int cbAction; + + +public: + + + /** + callback arguments are + filepath + filename + fileindex + datasize + + return value is + 0 callback takes care of open,close,wrie file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + */ + void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; + + /** + callback argument is + toatal frames caught + */ + void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;}; +}; + + +#endif + +//#endif diff --git a/slsReceiverSoftware/slsReceiver/UDPInterface.cpp b/slsReceiverSoftware/slsReceiver/UDPInterface.cpp new file mode 100644 index 000000000..d53166f9e --- /dev/null +++ b/slsReceiverSoftware/slsReceiver/UDPInterface.cpp @@ -0,0 +1,43 @@ +//#ifdef SLS_RECEIVER_UDP_FUNCTIONS +/********************************************//** + * @file slsReceiverUDPFunctions.cpp + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + ***********************************************/ + + + + +#include // SIGINT +#include // stat +#include // socket(), bind(), listen(), accept(), shut down +#include // sock_addr_in, htonl, INADDR_ANY +#include // exit() +#include //set precision +#include //munmap + +#include +#include +using namespace std; + +#include "UDPInterface.h" +#include "UDPBaseImplementation.h" + +#include "moench02ModuleData.h" +#include "gotthardModuleData.h" +#include "gotthardShortModuleData.h" + + +using namespace std; + +UDPInterface * UDPInterface::create(string receiver_type){ + + if (receiver_type == "standard") + return new UDPBaseImplementation(); + else{ + cout << "[ERROR] UDP interface not supported, using standard implementation" << endl; + return new UDPBaseImplementation(); + } +} + + +//#endif diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverBase.h b/slsReceiverSoftware/slsReceiver/UDPInterface.h similarity index 93% rename from slsReceiverSoftware/slsReceiver/slsReceiverBase.h rename to slsReceiverSoftware/slsReceiver/UDPInterface.h index 007bf4a97..74f471ae0 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverBase.h +++ b/slsReceiverSoftware/slsReceiver/UDPInterface.h @@ -1,7 +1,7 @@ -#ifndef SLSRECEIVERBASE_H -#define SLSRECEIVERBASE_H +#ifndef UDPINTERFACE_H +#define UDPINTERFACE_H /*********************************************** - * @file slsReceiverBase.h + * @file UDPInterface.h * @short base class with all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ /** @@ -12,19 +12,35 @@ * @short base class with all the functions for a receiver, set/get parameters, start/stop etc. */ -class slsReceiverBase { +#include "sls_receiver_defs.h" +#include "receiver_defs.h" +#include "MySocketTCP.h" -public: +/* +void print_not_implemented(string method_name){ + std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; +} +*/ - /** - * constructor - */ - slsReceiverBase(){}; - - /** - * Destructor - */ - virtual ~slsReceiverBase() {}; +class UDPInterface { + + public: + + /** + * constructor + */ + //UDPInterface(){}; + + /** + * Destructor + */ + virtual ~UDPInterface() {}; + + /** + * Factory create method + */ + static UDPInterface *create(string receiver_type = "standard"); + /** * Initialize the Receiver @@ -32,7 +48,7 @@ public: * you can call this function only once. You must call it before you call startReceiver() for the first time. */ virtual void initialize(const char *detectorHostName) = 0; - + /* Returns detector hostname /returns hostname @@ -44,7 +60,7 @@ public: /** * Returns status of receiver: idle, running or error */ - virtual slsReceiverDefs::runStatus getStatus() const = 0; + virtual slsReceiverDefs::runStatus getStatus() const = 0; /** * Returns File Name diff --git a/slsReceiverSoftware/slsReceiver/main.cpp b/slsReceiverSoftware/slsReceiver/main.cpp index 3513a26c7..87d947a7d 100644 --- a/slsReceiverSoftware/slsReceiver/main.cpp +++ b/slsReceiverSoftware/slsReceiver/main.cpp @@ -75,6 +75,7 @@ int main(int argc, char *argv[]) { //start tcp server thread if(user->start() == slsReceiverDefs::OK){ + cout << "DONE!" << endl; string str; cin>>str; //wait and look for an exit keyword diff --git a/slsReceiverSoftware/slsReceiver/slsReceiver.cpp b/slsReceiverSoftware/slsReceiver/slsReceiver.cpp index cd5ab551d..45359d02f 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiver.cpp +++ b/slsReceiverSoftware/slsReceiver/slsReceiver.cpp @@ -10,24 +10,26 @@ #include #include "slsReceiver.h" -#include "slsReceiverUDPFunctions.h" -#include "eigerReceiver.h" +//#include "slsReceiverUDPFunctions.h" +//#include "eigerReceiver.h" + +#include "UDPInterface.h" +//#include "UDPBaseImplementation.h" + + +#include "utilities.h" using namespace std; slsReceiver::slsReceiver(int argc, char *argv[], int &success){ + //creating base receiver - cout << "SLS Receiver" << endl; - receiverBase = new slsReceiverUDPFunctions(); int tcpip_port_no=-1; - - ifstream infile; string sLine,sargname; int iline = 0; - success=OK; string fname = ""; @@ -38,34 +40,34 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){ if(iarg+1==argc){ cout << "no config file name given. Exiting." << endl; success=FAIL; - }else + } + else fname.assign(argv[iarg+1]); } } if((!fname.empty()) && (success == OK)){ -#ifdef VERBOSE - std::cout<< "config file name "<< fname << std::endl; -#endif + + VERBOSE_PRINT("config file name " + fname ); + infile.open(fname.c_str(), ios_base::in); if (infile.is_open()) { while(infile.good()){ getline(infile,sLine); iline++; -#ifdef VERBOSE - cout << sLine << endl; -#endif + + VERBOSE_PRINT(sLine); + if(sLine.find('#')!=string::npos){ -#ifdef VERBOSE - cout << "Line is a comment " << endl; -#endif + VERBOSE_PRINT( "Line is a comment "); continue; - }else if(sLine.length()<2){ -#ifdef VERBOSE - cout << "Empty line " << endl; -#endif + } + else if(sLine.length()<2){ + VERBOSE_PRINT("Empty line "); continue; - }else{ + } + else{ istringstream sstr(sLine); + //parameter name if(sstr.good()) sstr >> sargname; @@ -85,14 +87,13 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){ } } infile.close(); - }else { + } + else { cout << "Error opening configuration file " << fname << endl; success = FAIL; } -#ifdef VERBOSE - cout << "Read configuration file of " << iline << " lines" << endl; -#endif + VERBOSE_PRINT("Read configuration file of " + iline + " lines"); } @@ -131,16 +132,16 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){ } - if (success==OK) - tcpipInterface = new slsReceiverTCPIPInterface(success,receiverBase, tcpip_port_no); - //tcp ip interface - - - + if (success==OK){ + cout << "SLS Receiver" << endl; + udp_interface = UDPInterface::create("stasndard"); + tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no); + //tcp ip interface + } } -slsReceiver::~slsReceiver() {if(receiverBase) delete receiverBase; if(tcpipInterface) delete tcpipInterface;} +slsReceiver::~slsReceiver() {if(udp_interface) delete udp_interface; if(tcpipInterface) delete tcpipInterface;} int slsReceiver::start() { @@ -158,7 +159,6 @@ void slsReceiver::closeFile(int p) { } - int64_t slsReceiver::getReceiverVersion(){ tcpipInterface->getReceiverVersion(); } @@ -166,20 +166,20 @@ int64_t slsReceiver::getReceiverVersion(){ void slsReceiver::registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){ //tcpipInterface - receiverBase->registerCallBackStartAcquisition(func,arg); + udp_interface->registerCallBackStartAcquisition(func,arg); } void slsReceiver::registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){ //tcpipInterface - receiverBase->registerCallBackAcquisitionFinished(func,arg); + udp_interface->registerCallBackAcquisitionFinished(func,arg); } void slsReceiver::registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){ - //tcpipInterface - receiverBase->registerCallBackRawDataReady(func,arg); + //tcpipInterface + udp_interface->registerCallBackRawDataReady(func,arg); } diff --git a/slsReceiverSoftware/slsReceiver/slsReceiver.h b/slsReceiverSoftware/slsReceiver/slsReceiver.h index 1ab8c764a..899dabe8d 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiver.h +++ b/slsReceiverSoftware/slsReceiver/slsReceiver.h @@ -7,8 +7,12 @@ #include "slsReceiverTCPIPInterface.h" -#include "slsReceiverBase.h" +#include "UDPInterface.h" +#include "UDPBaseImplementation.h" +#include "sls_receiver_defs.h" +#include "receiver_defs.h" +#include "MySocketTCP.h" @@ -17,8 +21,8 @@ */ class slsReceiver : private virtual slsReceiverDefs { - -public: + + public: /** * Constructor * creates the tcp interface and the udp class @@ -80,9 +84,9 @@ public: void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg); -private: - slsReceiverTCPIPInterface* tcpipInterface; - slsReceiverBase* receiverBase; + private: + slsReceiverTCPIPInterface* tcpipInterface; + UDPInterface* udp_interface; }; diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.cpp index cc143efc4..ea59d4bce 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.cpp @@ -4,7 +4,7 @@ ***********************************************/ #include "slsReceiverTCPIPInterface.h" -#include "slsReceiverBase.h" +#include "UDPInterface.h" #include "gitInfoReceiver.h" #include "slsReceiverUsers.h" #include "slsReceiver.h" @@ -28,7 +28,7 @@ slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() { } -slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, slsReceiverBase* rbase, int pn): +slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn): myDetectorType(GOTTHARD), receiverBase(rbase), ret(OK), @@ -124,9 +124,9 @@ int slsReceiverTCPIPInterface::start(){ cout << "Could not create TCP Server thread" << endl; return FAIL; } -#ifdef VERBOSE + //#ifdef VERBOSE cout << "TCP Server thread created successfully." << endl; -#endif + //#endif return OK; } @@ -503,7 +503,7 @@ int slsReceiverTCPIPInterface::set_file_dir() { - +// LEO: do we need it in the base class? int slsReceiverTCPIPInterface::set_file_index() { ret=OK; int retval=-1; @@ -608,7 +608,7 @@ int slsReceiverTCPIPInterface::set_frame_index() { - +//LEO: is the client that commands the setup, or you just need the args? int slsReceiverTCPIPInterface::setup_udp(){ ret=OK; strcpy(mess,"could not set up udp connection"); @@ -1773,7 +1773,7 @@ int slsReceiverTCPIPInterface::set_detector_hostname() { - +//LEO: why the receiver should set the dynamic range? int slsReceiverTCPIPInterface::set_dynamic_range() { ret=OK; int retval=-1; diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.h index 423d0949a..adff81697 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/slsReceiver/slsReceiverTCPIPInterface.h @@ -9,7 +9,7 @@ #include "sls_receiver_defs.h" #include "receiver_defs.h" #include "MySocketTCP.h" -#include "slsReceiverBase.h" +#include "UDPInterface.h" @@ -18,8 +18,8 @@ */ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { - -public: + + public: /** * Constructor * reads config file, creates socket, assigns function table @@ -27,16 +27,16 @@ public: * @param rbase pointer to the receiver base * @param pn port number (defaults to default port number) */ - slsReceiverTCPIPInterface(int &success, slsReceiverBase* rbase, int pn=-1); - + slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn=-1); + /** * Sets the port number to listen to. Take care that the client must know to whcih port it has to listen to, so normally it is better to use a fixes port from the instatiation or change it from the client. @param pn port number (-1 only get) \returns actual port number - */ - int setPortNumber(int pn=-1); - + */ + int setPortNumber(int pn=-1); + /** * Starts listening on the TCP port for client comminication \returns OK or FAIL @@ -234,7 +234,7 @@ private: detectorType myDetectorType; /** slsReceiverBase object */ - slsReceiverBase *receiverBase; + UDPInterface *receiverBase; /** Number of functions */ static const int numberOfFunctions = 256; diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp index 3cc4fef38..bb8f213a5 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp +++ b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp @@ -6,6 +6,8 @@ #include "slsReceiverUDPFunctions.h" +#include "UDPBaseImplementation.h" + #include "moench02ModuleData.h" #include "gotthardModuleData.h" @@ -28,6 +30,9 @@ using namespace std; +slsReceiverUDPFunctions * slsReceiverUDPFunctions::create(void){ + return slsReceiverUDPFunctions(); +} slsReceiverUDPFunctions::slsReceiverUDPFunctions(): thread_started(0), @@ -695,7 +700,7 @@ void slsReceiverUDPFunctions::setupFilter(){ - +//LEO: it is not clear to me.. void slsReceiverUDPFunctions::setupFifoStructure(){ int64_t i; diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h index 347a14358..6adce30ad 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h +++ b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h @@ -7,15 +7,17 @@ ***********************************************/ -#include "sls_receiver_defs.h" -#include "receiver_defs.h" -#include "genericSocket.h" +//#include "sls_receiver_defs.h" +//#include "receiver_defs.h" +//#include "genericSocket.h" #include "circularFifo.h" #include "singlePhotonDetector.h" #include "slsReceiverData.h" #include "moenchCommonMode.h" -#include "slsReceiverBase.h" + +#include "UDPInterface.h" +#include "UDPBaseImplementation.h" #ifdef MYROOT1 @@ -34,9 +36,10 @@ * @short does all the functions for a receiver, set/get parameters, start/stop etc. */ -class slsReceiverUDPFunctions : private virtual slsReceiverDefs, public slsReceiverBase { -public: +class slsReceiverUDPFunctions : private virtual slsReceiverDefs, public UDPInterface { + + public: /** * Constructor */ diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUsers.h b/slsReceiverSoftware/slsReceiver/slsReceiverUsers.h index a9d3626d2..50d6f38fe 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUsers.h +++ b/slsReceiverSoftware/slsReceiver/slsReceiverUsers.h @@ -1,4 +1,3 @@ - #ifndef SLS_RECEIVER_USERS_H #define SLS_RECEIVER_USERS_H @@ -29,7 +28,7 @@ public: * @param argv from command line * @param succecc socket creation was successfull */ - slsReceiverUsers(int argc, char *argv[], int &success); + slsReceiverUsers(int argc, char *argv[], int &success); /** Destructor */