diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 1912bcbea..cc69e0da0 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -663,6 +663,9 @@ private: /** Total fifo size */ uint32_t fifoSize; + /** fifo buffer header size */ + uint32_t fifoBufferHeaderSize; + /** Missing Packet */ int missingPacketinFile; diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 91debdec1..1ceb7203b 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -615,13 +615,14 @@ enum communicationProtocol{ /*int k = 0;*/ while(length>0){ - nsending = (length>packet_size) ? packet_size:length; + if(lengthpacket_size) ? packet_size:length; //works for eiger to get packets to discard image header packets nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(nsent < packet_size) { - if(nsent){ - if((nsent != header_packet_size) && (nsent != -1)) + if(nsent != nsending){ //if((nsent != nsending)){ && (nsent < packet_size)){ + if(nsent && (nsent != header_packet_size) && (nsent != -1)) cprintf(RED,"Incomplete Packet size %d\n",nsent); - } break; } length-=nsent; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 8ecd46ba9..29c3f4994 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -129,7 +129,7 @@ typedef struct { - +#define JFRAU_FILE_FRAME_HEADER_LENGTH 16 #define JFRAU_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000 #define JFRAU_PACKETS_PER_FRAME 128 #define JFRAU_HEADER_LENGTH 22 diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 5823220be..bb64f86f9 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -368,7 +368,7 @@ int UDPStandardImplementation::setupFifoStructure(){ fifo[i] = new CircularFifo(fifoSize); //allocate memory - mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * fifoSize); + mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize); if (mem0[i] == NULL){ cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); return FAIL; @@ -376,10 +376,10 @@ int UDPStandardImplementation::setupFifoStructure(){ //push free address into fifoFree buffer[i]=mem0[i]; - while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * (fifoSize-1))) { + while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * (fifoSize-1))) { //cprintf(BLUE,"fifofree %d: push 0x%p\n",i,(void*)buffer[i]); /*for(int k=0;kpush(buffer[i])); @@ -387,7 +387,7 @@ int UDPStandardImplementation::setupFifoStructure(){ #ifdef DEBUG5 cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); #endif - buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); + buffer[i] += (bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize); } } cout << "Fifo structure(s) reconstructed" << endl; @@ -763,6 +763,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ maxFramesPerFile = MAX_FRAMES_PER_FILE; fifoSize = GOTTHARD_FIFO_SIZE; fifoDepth = GOTTHARD_FIFO_SIZE; + fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; //footerOffset = Not applicable; break; case PROPIX: @@ -776,6 +777,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ maxFramesPerFile = MAX_FRAMES_PER_FILE; fifoSize = PROPIX_FIFO_SIZE; fifoDepth = PROPIX_FIFO_SIZE; + fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; //footerOffset = Not applicable; break; case MOENCH: @@ -789,6 +791,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; fifoSize = MOENCH_FIFO_SIZE; fifoDepth = MOENCH_FIFO_SIZE; + fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; //footerOffset = Not applicable; break; case EIGER: @@ -804,6 +807,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ fifoSize = EIGER_FIFO_SIZE; fifoDepth = EIGER_FIFO_SIZE; footerOffset = EIGER_DATA_PACKET_HEADER_SIZE + oneDataSize; + fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; break; case JUNGFRAUCTB: packetsPerFrame = JCTB_PACKETS_PER_FRAME; @@ -816,6 +820,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ maxFramesPerFile = JFCTB_MAX_FRAMES_PER_FILE; fifoSize = JCTB_FIFO_SIZE; fifoDepth = JCTB_FIFO_SIZE; + fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; //footerOffset = Not applicable; break; case JUNGFRAU: @@ -829,6 +834,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE; fifoDepth = JFRAU_FIFO_SIZE; fifoSize = JFRAU_FIFO_SIZE; + fifoBufferHeaderSize= JFRAU_FILE_FRAME_HEADER_LENGTH; //footerOffset = Not applicable; break; default: @@ -1988,8 +1994,7 @@ void UDPStandardImplementation::startListening(){ } //write packet count to buffer - if(myDetectorType == EIGER) - (*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize); + (*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize); if(dataCompressionEnable) (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc); @@ -2037,7 +2042,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch int receivedSize = 0; //carry over from previous buffer - if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); + if(cSize) memcpy(buffer[ithread] + fifoBufferHeaderSize, temp, cSize); if(!activated){ @@ -2059,14 +2064,14 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch //copy dummy packets receivedSize = framestoclone*packetsPerFrame*onePacketSize; - memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, 0xFF,receivedSize); + memset(buffer[ithread] + fifoBufferHeaderSize, 0xFF,receivedSize); //set fnum, pnum and deactivatedpacket label eiger_packet_header_t* header; eiger_packet_footer_t* footer; int pnum=0; //loop by each packet - for(int offset=HEADER_SIZE_NUM_TOT_PACKETS; + for(int offset=fifoBufferHeaderSize; offsetReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize, lSize); + header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE)); + } + else{ + receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); + //eiger returns 0 when header packet caught + while(receivedSize < onePacketSize && status != TRANSMITTING) + receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); - if(status != TRANSMITTING) - receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); - //eiger returns 0 when header packet caught - while(receivedSize < onePacketSize && status != TRANSMITTING) - receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); + } + } totalListeningPacketCount[ithread] += (receivedSize/onePacketSize); @@ -2105,14 +2120,14 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch if(myDetectorType == JUNGFRAU){ jfrau_packet_header_t* header; - for(int iloop=0;iloop<2;iloop++){ - header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE)); - cprintf(RED,"[%d]: packetnumber:%x\n",iloop, (*( (uint8_t*) header->packetNumber))); - cprintf(RED," : framenumber :%x\n", (*( (uint32_t*) header->frameNumber))&0xffffff); + for(int iloop=0;iloop<128;iloop++){ + header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE)); + cprintf(RED,"[%d]: packetnumber:%d\n",iloop, (*( (uint8_t*) header->packetNumber))); + cprintf(RED," : framenumber :%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask); } }else if(myDetectorType == EIGER){ - eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); - eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); + eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize); + eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + fifoBufferHeaderSize); cprintf(GREEN,"thread:%d footeroffset:%dsubframenum:%d oldpacketnum:%d new pnum:%d new fnum:%d\n", ithread,footerOffset, (*( (unsigned int*) header->subFrameNumber)), @@ -2143,15 +2158,15 @@ void UDPStandardImplementation::startFrameIndices(int ithread){ startFrameIndex = 0; //frame number always resets break; case JUNGFRAU: - header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); - startFrameIndex = (*( (uint32_t*) header->frameNumber))&0xffffff; + header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize); + startFrameIndex = (*( (uint32_t*) header->frameNumber))&frameIndexMask; break; default: if(shortFrameEnable < 0){ - startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) & (frameIndexMask)) >> frameIndexOffset); }else{ - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize)))) & (frameIndexMask)) >> frameIndexOffset); } break; @@ -2302,7 +2317,7 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi case PROPIX: //for short frames, 1 packet/frame, so split frames is not a topic if(shortFrameEnable == -1){ - lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); + lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: Last Packet Offset:%d\n",ithread, lastPacketOffset); #endif @@ -2319,17 +2334,17 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi } } #ifdef DEBUG4 - cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) & (frameIndexMask)) >> frameIndexOffset)); #endif break; case MOENCH: - lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); + lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread %d: First Header:%d\t First Packet:%d\t Last Header:%d\t Last Packet:%d\tLast Packet Offset:%d\n", - (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset), - ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)), + (((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (frameIndexMask)) >> frameIndexOffset), + ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (packetIndexMask)), (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset), ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)), lastPacketOffset); @@ -2357,33 +2372,33 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi case JUNGFRAU: - lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); + lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); #ifdef DEBUG4 - header = (jfrau_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); + header = (jfrau_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize); cprintf(BLUE, "Listening_Thread: First Header:%d\t First Packet:%d\n", - (*( (uint32_t*) header->frameNumber))&0xffffff, + (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset); #ifdef DEBUG4 cprintf(BLUE, "Listening_Thread: Last Header:%du\t Last Packet:%d\n", - (*( (uint32_t*) header->frameNumber))&0xffffff, + (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif //jungfrau last packet value is 0, so find the last packet and store the others in a temp storage if((*( (uint8_t*) header->packetNumber))){ //cprintf(RED,"entering missing packet zone\n"); - lastFrameHeader64 = (*( (uint32_t*) header->frameNumber))&0xffffff; + lastFrameHeader64 = (*( (uint32_t*) header->frameNumber))&frameIndexMask; cSize += onePacketSize; lastPacketOffset -= onePacketSize; --packetCount; - while (lastFrameHeader64 == ((*( (uint32_t*) header->frameNumber))&0xffffff)){ + while (lastFrameHeader64 == ((*( (uint32_t*) header->frameNumber))&frameIndexMask)){ cSize += onePacketSize; lastPacketOffset -= onePacketSize; header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset); #ifdef DEBUG4 cprintf(RED,"new header:%d new packet:%d\n", - (*( (uint32_t*) header->frameNumber))&0xffffff, + (*( (uint32_t*) header->frameNumber))&frameIndexMask, (*( (uint8_t*) header->packetNumber))); #endif --packetCount; @@ -2713,7 +2728,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* uint64_t tempframenumber; uint32_t pnum; uint32_t snum; - if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber,pnum,snum) == FAIL){ + if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize,tempframenumber,pnum,snum) == FAIL){ //error in frame number sent by fpga while(!fifoFree[ithread]->push(wbuffer)); @@ -2732,7 +2747,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* //callback to write data if (cbAction < DO_EVERYTHING) - rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, npackets * onePacketSize, + rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd @@ -2778,7 +2793,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w //if write enabled if((fileWriteEnable) && (sfilefd[ithread])){ if(numpackets){ - int offset = HEADER_SIZE_NUM_TOT_PACKETS; + int offset = fifoBufferHeaderSize; uint64_t nextFileFrameNumber; int packetsWritten = 0; //if(ithread) cout<<"numpackets:"<push(wbuffer)); return; @@ -2942,7 +2957,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32 //copy date guiNumPackets[ithread] = numpackets; strcpy(guiFileName[ithread],completeFileName[ithread]); - memcpy(latestData[ithread],buffer+ HEADER_SIZE_NUM_TOT_PACKETS , numpackets*onePacketSize); + memcpy(latestData[ithread],buffer+ fifoBufferHeaderSize , numpackets*onePacketSize); //let it know its got data sem_post(&dataCallbackWriterSemaphore[ithread]); @@ -2970,7 +2985,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer uint64_t tempframenumber=-1; uint32_t pnum; uint32_t snum; - if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber,pnum,snum) == FAIL){ + if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize, tempframenumber,pnum,snum) == FAIL){ //error in frame number sent by fpga while(!fifoFree[ithread]->push(wbuffer)); return; @@ -2988,7 +3003,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer //variable definitions char* buff[2]={0,0}; //an array just to be compatible with copyframetogui - char* data = wbuffer+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed + char* data = wbuffer+ fifoBufferHeaderSize; //data pointer to the next memory to be analysed int ndata; //size of data returned uint32_t np; //remaining number of packets returned uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer)); //number of total packets @@ -3089,7 +3104,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer remainingsize -= ((buff[0] + ndata) - data); data = buff[0] + ndata; - if(data > (wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) + if(data > (wbuffer + fifoBufferHeaderSize + npackets * onePacketSize) ) cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread); }