diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 42650e9bb..3462ded22 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -442,8 +442,9 @@ private: * Its called for the first packet of a scan or acquistion * Sets the startframeindices and the variables to know if acquisition started * @param ithread listening thread number + * @param numbytes number of bytes it listened to */ - void startFrameIndices(int ithread); + void startFrameIndices(int ithread, int numbytes); /** * This is called when udp socket is shut down diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 84d1adcbc..ed0df1b47 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -828,15 +828,17 @@ void UDPStandardImplementation::setupFifoStructure(){ if(myDetectorType == MOENCH) fifosize = MOENCH_FIFO_SIZE; else if(myDetectorType == EIGER) - fifosize = EIGER_FIFO_SIZE; + fifosize = EIGER_FIFO_SIZE * packetsPerFrame; if(fifosize % numJobsPerThread) fifosize = (fifosize/numJobsPerThread)+1; else fifosize = fifosize/numJobsPerThread; - - cout << "Number of Frames per buffer:" << numJobsPerThread << endl; + if(myDetectorType == EIGER) + cout << "1 packet per buffer" << endl; + else + cout << "Number of Frames per buffer:" << numJobsPerThread << endl; cout << "Fifo Size:" << fifosize << endl; /* @@ -854,27 +856,32 @@ void UDPStandardImplementation::setupFifoStructure(){ #endif delete fifoFree[i]; } - if(fifo[i]) delete fifo[i]; + if(fifo[i]) delete fifo[i]; if(mem0[i]) free(mem0[i]); fifoFree[i] = new CircularFifo(fifosize); fifo[i] = new CircularFifo(fifosize); + int whatperbuffer = bufferSize; + if(myDetectorType == EIGER) + whatperbuffer = onePacketSize; + //allocate memory - mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + mem0[i]=(char*)malloc((whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); /** shud let the client know about this */ if (mem0[i]==NULL){ cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; exit(-1); } + buffer[i]=mem0[i]; //push the addresses into freed fifoFree and writingFifoFree - while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + while (buffer[i]<(mem0[i]+(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { fifoFree[i]->push(buffer[i]); #ifdef FIFO_DEBUG cprintf(BLUE,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); #endif - buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + buffer[i]+=(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); } } cout << "Fifo structure(s) reconstructed" << endl; @@ -1683,7 +1690,8 @@ int UDPStandardImplementation::startListening(){ if(tempchar) {delete [] tempchar;tempchar = NULL;} if(myDetectorType != EIGER) tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size - + else + maxBufferSize = 0; @@ -1747,7 +1755,7 @@ int UDPStandardImplementation::startListening(){ //start indices for each start of scan/acquisition if((!measurementStarted) && (rc > 0)){ pthread_mutex_lock(&progress_mutex); - startFrameIndices(ithread); + startFrameIndices(ithread, rc); pthread_mutex_unlock(&progress_mutex); } @@ -1825,6 +1833,7 @@ int UDPStandardImplementation::startListening(){ case EIGER: //because even headers might be included, so not packet count (*((uint32_t*)(buffer[ithread]))) = rc; + packetcount = 1; break; default: @@ -1890,16 +1899,16 @@ int UDPStandardImplementation::startWriting(){ thread_started = 1; int totalheader = HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH; - int numpackets[numListeningThreads], popready[numListeningThreads], woffset[numListeningThreads], nf; - bool endofacquisition, startheader[numListeningThreads]; + int numpackets[numListeningThreads], popready[numListeningThreads], nf; + bool startdatapacket[numListeningThreads],fullframe[numListeningThreads]; uint32_t tempframenum[numListeningThreads]; uint32_t lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads]; int numberofmissingpackets[numListeningThreads]; - char* tempbuffer[numListeningThreads] = NULL; + char* tempbuffer = NULL; int tempoffset[numListeningThreads]; int LAST_PACKET_VALUE; - bool fullframe; + char* wbuf[numListeningThreads];//interleaved @@ -1930,31 +1939,33 @@ int UDPStandardImplementation::startWriting(){ //allow them all to be popped initially for(i=0;ipop(wbuf[i]); #ifdef FIFO_DEBUG @@ -1977,27 +1985,18 @@ int UDPStandardImplementation::startWriting(){ #endif numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i])); #ifdef VERYDEBUG - cout << i << " numpackets:" << dec << numpackets[i] << "for fifo :"<< i << endl; + cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i); #endif - //dont pop until ready - popready[i] = 0; - //reset offset - woffset[i] = 0; + //dont pop again if dummy packet + if(!numpackets[i]) + popready[i] = 0; } } - //check for end of acquisition - endofacquisition = true; - for(i=0;i= numpackets[i]){ - popready[i] = 1; - fullframe = false; - break; - } + //header packet + if( 0x01 == (*(uint8_t*)(((eiger_image_header *)((char*)(wbuf[i])))->header_confirm))){ - //check if header - if( 0x01 == (*(uint16_t*)(((eiger_image_header *)((char*)(wbuf[i] + woffset[i])))->header_confirm))){ - //expected frame header -eiger frame numbers start at 1, so need to -1 - if(tempframenum[i] == (htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + woffset[i])))->fnum)+(startFrameIndex-1))){ - woffset[i] += EIGER_HEADER_LENGTH; - startheader[i] = true; - numberofmissingpackets[i] = 0; - lastpacketheader[i] = -1; - tempoffset[i] = 0; - } - //wrong header - leave - else{ - numberofmissingpackets[i] += (LAST_PACKET_VALUE - lastpacketheader[i]); - tempframenum[i]++; - //add missing packets - for(j=0;jfnum)+(startFrameIndex-1); + //next frame, leave else{ - //update current packet - currentpacketheader[i] = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + woffset[i])))->num4))); - //last packet - leave - if(currentpacketheader[i] == LAST_PACKET_VALUE){ - //fill buffer - tempbuffer[i][tempoffset[i]] = wbuf[i] + woffset[i]; - woffset[i] += onePacketSize; - //reset - startheader[i] = false; - lastpacketheader[i] = -1; - tempoffset[i] = 0; - break; - } - //same frame packet - if(currentpacketheader[i] > lastpacketheader[i]){ - - if(!startheader[i]){ - tempframenum[i]++; - } - else - startheader[i] = false; - lastpacketheader[i] = -1; - numberofmissingpackets[i] = 0; - numberofmissingpackets[i] += (currentpacketheader[i] - lastpacketheader[i] -1); - //add missing packets - for(j=0;jnum4))); + //same frame packet - continue building frame + if(currentpacketheader[i] > lastpacketheader[i]){ + //add missing packets + numberofmissingpackets[i] += (currentpacketheader[i] - lastpacketheader[i] -1); + for(j=0;jnum1)) = currframenum; - //overwriting port number and dynamic range - if (!j) (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num3)) = (dynamicRange<<2); - else (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num3)) = ((dynamicRange<<2)|(0x1)); - - #ifdef VERYDEBUG - cprintf(GREEN, "%d - 0x%x - %d\n", i, - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)), - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4))); - #endif - - } - - //for 32 bit,port number needs to be changed and packet number reconstructed - if(dynamicRange == 32){ - for (i = 0; i < packetsPerFrame/4; i++){ - //new packet number that has space for 16 bit - (*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num2)) - = ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num4))); - - #ifdef VERYDEBUG - cprintf(GREEN, "%d - 0x%x - %d - %d\n", i, - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)), - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4)), - (*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num2))); - #endif - } - for (i = packetsPerFrame/4; i < packetsPerFrame/2; i++){ - //new packet number that has space for 16 bit - (*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num2)) - = ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num4))+(packetsPerFrame/4)); - - #ifdef VERYDEBUG - cprintf(GREEN, "%d -0x%x - %d - %d\n", i, - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)), - (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4)), - (*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num2))); - #endif + //next frame packet - leave + else{ + //add missing packets + numberofmissingpackets += (LAST_PACKET_VALUE = lastpacketheader[i]); + for(j=0;jpush(tempoffset[i])); - #ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tempoffset[i]),i); - #endif - } + //check if a full frame received + if(fullframe[0] && fullframe[1]){ + for(int i=0;ifnum); + //check if its a header + if(EIGER_HEADER_LENGTH == numbytes) + startFrameIndex = (htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum))-1; + //missed header packet, so default value + else + startFrameIndex = 0; } //gotthard has +1 for frame number and not a short frame else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) @@ -2321,13 +2229,13 @@ void UDPStandardImplementation::startFrameIndices(int ithread){ //start of acquisition if(!acqStarted){ startAcquisitionIndex=startFrameIndex; - currframenum = startAcquisitionIndex; + //currframenum = startAcquisitionIndex; acqStarted = true; cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex); } //for scans, cuz currfraenum resets else if (myDetectorType == EIGER){ - startFrameIndex += currframenum; + startFrameIndex += (currframenum+1); } @@ -2369,12 +2277,18 @@ void UDPStandardImplementation::stopListening(int ithread, int rc, int &pc, int //push the last buffer into fifo else{ - pc = (rc/onePacketSize); + if(myDetectorType == EIGER){ + (*((uint32_t*)(buffer[ithread]))) = rc; + pc = 1; + }else{ + pc = (rc/onePacketSize); + (*((uint32_t*)(buffer[ithread]))) = pc; + } #ifdef VERYDEBUG cprintf(BLUE,"%d last rc:%d\n",ithread, rc); cprintf(BLUE,"%d last packetcount:%d\n", ithread, pc); #endif - (*((uint32_t*)(buffer[ithread]))) = pc; + totalListeningFrameCount[ithread] += pc; while(!fifo[ithread]->push(buffer[ithread])); #ifdef FIFO_DEBUG