diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 6a8c53bd3..3ece433dd 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -486,7 +486,10 @@ protected: /** structure of an eiger image header*/ typedef struct { - unsigned char header_before[20]; + //unsigned char header_before1[5]; + //unsigned char header_confirm[1]; + //unsigned char header_before2[14]; + unsigned char header_before[20]; unsigned char fnum[4]; unsigned char header_after[24]; } eiger_image_header; diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 0ccc80adf..ed5d386a1 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -94,18 +94,11 @@ enum communicationProtocol{ typedef struct { - unsigned char header_before[20]; + unsigned char header_before[20]; unsigned char fnum[4]; unsigned char header_after[24]; } eiger_image_header; -typedef struct -{ - unsigned char header_before[19]; - unsigned char fnum[4]; - unsigned char header_after[25]; -} eiger_image_header32; - genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE) : // portno(port_number), protocol(p), @@ -581,24 +574,13 @@ typedef struct case UDP: if (socketDescriptor<0) return -1; -/* - cout <<"******listening inside genericsocket"<0){ nsending = (length>packet_size) ? packet_size:length; - - /* - +/* //created for debugging on 11.05.2015 nsending=5000; nsent = recvfrom(socketDescriptor,(char*)buf,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); @@ -616,14 +598,9 @@ typedef struct k++; */ - nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); if(!nsent) break; - if(nsent == 16) { - //cout << "."; - continue; - } length-=nsent; total_sent+=nsent; } diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index b279147ae..e1a630103 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -624,7 +624,7 @@ void UDPBaseImplementation::setupFifoStructure(){ FILE_LOG(logDEBUG) << __AT__ < mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); /** shud let the client know about this */ if (mem0[i]==NULL){ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + cprintf(BG_RED,"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++\n"); exit(-1); } buffer[i]=mem0[i]; @@ -1754,7 +1754,7 @@ int i; cerr << ithread << " recvfrom() failed:"<push(buffer[ithread]); exit(-1); } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index cd4963e4b..5b6ca4bdf 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -839,14 +839,13 @@ void UDPStandardImplementation::setupFifoStructure(){ else fifosize = fifosize/numJobsPerThread; + //one packet per buffer + if(myDetectorType == EIGER) + fifosize *= (packetsPerFrame/(numListeningThreads)); cout << "Number of Frames per buffer:" << numJobsPerThread << endl; cout << "Fifo Size:" << fifosize << endl; - /* - //for testing - numJobsPerThread = 3; fifosize = 11; - */ for(int i=0;i(fifosize); fifo[i] = new CircularFifo(fifosize); //allocate memory - mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + //if eiger, allocate only one packet size per buffer + if(myDetectorType == EIGER) + mem0[i]=(char*)malloc((onePacketSize * numJobsPerThread)*fifosize); + else + mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ if (mem0[i]==NULL){ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + cprintf(BG_RED,"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++\n"); exit(-1); } buffer[i]=mem0[i]; + + //push the addresses into freed fifoFree and writingFifoFree - while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { - fifoFree[i]->push(buffer[i]); + if(myDetectorType == EIGER){ + while (buffer[i]<(mem0[i]+(onePacketSize * numJobsPerThread)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); #ifdef FIFO_DEBUG - cprintf(CYAN,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); + cprintf(CYAN,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); #endif - buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + buffer[i]+=(onePacketSize * numJobsPerThread); + } } + else{ + while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); +#ifdef FIFO_DEBUG + cprintf(CYAN,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); +#endif + buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + } + } + } cout << "Fifo structure(s) reconstructed" << endl; } @@ -1674,17 +1693,8 @@ int UDPStandardImplementation::startListening(){ int total; int lastpacketoffset, expected, rc,packetcount, maxBufferSize, carryonBufferSize; - uint32_t lastframeheader, lastpacketheader;// for moench to check for all the packets in last frame + uint32_t lastframeheader = 0, lastpacketheader = -1, tempframenum = 0;// for moench to check for all the packets in last frame char* tempchar = NULL; - int last_packet_value = 0xff; - if(myDetectorType == EIGER){ - switch(dynamicRange){ - case 4: last_packet_value = 0x40; break; - case 8: last_packet_value = 0x80; break; - case 16: last_packet_value = 0xff; break; - default: last_packet_value = 0xff; break; // 32 bit mode currently gives pnum only upto 0xff and then resets - } - } while(1){ @@ -1692,10 +1702,12 @@ int UDPStandardImplementation::startListening(){ carryonBufferSize = 0; //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later maxBufferSize = bufferSize * numJobsPerThread; + #ifdef VERYDEBUG - cprintf(BLUE, "%d maxBufferSize:%d carryonBufferSize:%d\n", ithread, maxBufferSize,carryonBufferSize); + cprintf(BLUE, "%d listenTo:%d maxBufferSize:%d carryonBufferSize:%d\n", ithread,listenTo,maxBufferSize,carryonBufferSize); #endif + //temporary buffer to copy initial packets for next frames if(tempchar) {delete [] tempchar;tempchar = NULL;} if(myDetectorType != EIGER) tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size @@ -1724,9 +1736,32 @@ int UDPStandardImplementation::startListening(){ #ifdef SOCKET_DEBUG if(!ithread){ #endif - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - //cout<<"value:"<fnum)<ReceiveDataOnly(buffer[ithread]); + //headers or crazy byte size + while (rc != onePacketSize){ + //end of acquisition + if((rc == 0) && (status == TRANSMITTING)){ + stopListening(ithread,rc,packetcount,total); + continue; + } + //16 byte or crazy packet size + if(rc != EIGER_HEADER_LENGTH) + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread]); + //header + else{ + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread])))->fnum); + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread]); + } + } + } + //other detectors + else{ + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); + //cout<<"value:"<fnum)< 0) && (!ithread)) + + if( (!measurementStarted) && (myDetectorType != EIGER) && (rc > 0) && (!ithread)) startFrameIndices(ithread); //problem in receiving or end of acquisition - if((rc < expected)||(rc <= 0)){ + //if(((rc < expected) && (status == TRANSMITTING)) || (rc<0)){ + if((status == TRANSMITTING)&& (myDetectorType != EIGER)) { /*if(myDetectorType != EIGER){ //start indices for each start of scan/acquisition - this should be done earlier for normal detectors if((!measurementStarted) && (rc > 0) && (!ithread)) @@ -1776,8 +1813,10 @@ int UDPStandardImplementation::startListening(){ */ //reset - packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; - carryonBufferSize = 0; + if (myDetectorType != EIGER){ + packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; + carryonBufferSize = 0; + } //check if last packet valid and calculate packet count @@ -1838,46 +1877,126 @@ int UDPStandardImplementation::startListening(){ case EIGER: - lastpacketoffset = (((numJobsPerThread * packetsPerFrame/numListeningThreads - 1) * onePacketSize) + EIGER_HEADER_LENGTH + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cprintf(BLUE,"%d fnum: 0x%x\n", ithread, htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)); - cprintf(BLUE,"%d 1st pnum: 0x%x\n", ithread, ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + EIGER_HEADER_LENGTH + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); - cprintf(BLUE,"%d last packet offset: %d\n",ithread,lastpacketoffset); - cprintf(BLUE,"%d last pnum: 0x%x\n", ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4)))); -/*for 32 bit try to print 64 bit value of packet header to rule out no other byte changes value other than num4 */ -#endif - //if eiger last packet value is NOT as expected according to bit mode - cprintf(BLUE,"%d lastpacket value: %d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4)))); - cprintf(BLUE,"%d lastpacket value -1: %d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset - onePacketSize)))->num4)))); - if( ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))) != last_packet_value){ -//#ifdef VERYDEBUG - cprintf(RED,"NOT full frame\n"); -//#endif + + //push missing packet if any by looking at last packetheader values + + + //packetnumber is smaller = different frame + if(lastpacketheader >= ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4)))) + tempframenum++; + + + + + //tag framenumber to the packet + (*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num1)) = tempframenum; + //remember last packet value + lastpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4))); + + + + + + + + + + //literally end of buffer + lastpacketoffset = rc + HEADER_SIZE_NUM_TOT_PACKETS; + + //extra header + //or less than 1 frame caught(end of acquisition) + if(rc < expected){ + cprintf(RED,"%d rc:%d expectted:%d\n", ithread, rc, expected); + //extra header, looking at last byte of buffer + if ((*((uint8_t*)((char*)buffer[ithread]+lastpacketoffset-1))) == 0x1){ + cprintf(RED,"%d extra header\n", ithread); + //copy extra header + lastpacketoffset-= EIGER_HEADER_LENGTH; + carryonBufferSize += EIGER_HEADER_LENGTH; + memcpy(tempchar, buffer[ithread]+(lastpacketoffset-EIGER_HEADER_LENGTH), EIGER_HEADER_LENGTH); + cprintf(RED,"%d lastpacketoffset:%d\n",ithread, lastpacketoffset); + } + + //less than 1 frame caught + else cprintf(RED,"%d less than 1 frame \n", ithread); + + //find number of packets from last packet number + lastpacketoffset -= onePacketSize; + cprintf(RED,"%d lastpacketoffset:%d\n",ithread, lastpacketoffset); + + for (k=0;k*1040num4)))); + } + + packetcount = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4)))+1; + cprintf(RED,"%d num packets caught:%d\n",ithread, packetcount); + + + + cout <<"EXITING"<< endl; + exit(-1); + + } + //more than 1 frame caught but missed the frame header packet + //or exactly 1 frame caught + else { + //points to last packet + lastpacketoffset = rc + HEADER_SIZE_NUM_TOT_PACKETS - onePacketSize; lastpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))); +#ifdef VERYDEBUG + cprintf(BLUE,"%d fnum: 0x%x\n", ithread, htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)); + cprintf(BLUE,"%d 1st pnum: 0x%x\n", ithread, ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + EIGER_HEADER_LENGTH + HEADER_SIZE_NUM_TOT_PACKETS)))->num4)))); + cprintf(BLUE,"%d last packet offset: %d\n",ithread,lastpacketoffset); + cprintf(BLUE,"%d last pnum: 0x%x\n", ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4)))); + //for 32 bit try to print 64 bit value of packet header to rule out no other byte changes value other than num4 +#endif + + //proper 1 frame + if((rc == expected) && (lastpacketheader == last_packet_value)){ + break; + } + + //incomplete frame, more than 1 frame caught, but missed header packet + cprintf(RED,"%d INCOMPLETE frame, rc:%d, expected:%d\n", ithread, rc, expected); + cprintf(RED,"%d fnum: 0x%x\n", ithread, htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)); + + //if eiger last packet value is NOT as expected according to bit mode carryonBufferSize += onePacketSize; - cprintf(BLUE,"%d lastpacket value: %d packet count: %d\n",ithread,lastpacketheader,packetcount); lastpacketoffset -= onePacketSize; --packetcount; - cprintf(BLUE,"%d lastpacket value -1: %d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset - onePacketSize)))->num4)))); + cprintf(RED,"%d lastpacketheader:%d last packet value:%d packetcount: %d\n",ithread,lastpacketheader,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))), packetcount); //while last packet value is greater than current offset packet value (till we reach ff) while (lastpacketheader > ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))) ){ + lastpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))); carryonBufferSize += onePacketSize; - cprintf(BLUE,"%d check value: %d lastpacket value: %d packet count: %d\n",ithread,lastpacketheader,packetcount); lastpacketoffset -= onePacketSize; --packetcount; - cprintf(BLUE,"%d lastpacket value -1: %d\n",ithread,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset - onePacketSize)))->num4)))); + + cprintf(RED,"%d lastpacketheader:%d last packet value:%d packetcount: %d\n",ithread,lastpacketheader,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset)))->num4))), packetcount); } - memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); -//#ifdef VERYDEBUG - cprintf(BLUE,"%d tempchar 1st pnum: 0x%x\n", ithread, ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempchar)))->num4)))); -//#endif + + cprintf(RED,"%d to copy: %d\n", ithread, carryonBufferSize); + cprintf(RED,"%d lastpacketheader:%d last packet value:%d packetcount: %d\n",ithread,lastpacketheader,((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + lastpacketoffset+onePacketSize)))->num4))), packetcount); + + memcpy(tempchar , buffer[ithread]+(HEADER_SIZE_NUM_TOT_PACKETS), EIGER_HEADER_LENGTH); + memcpy((char*)(tempchar + EIGER_HEADER_LENGTH), buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); + (*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum) = htonl(lastframeheader+1); + cprintf(RED,"%d copied\n", ithread); + //#ifdef VERYDEBUG + cprintf(RED,"%d tempchar 1st pnum: 0x%x\n", ithread, ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempchar+EIGER_HEADER_LENGTH)))->num4)))); + //#endif + } + //when we lose frame header packet + lastframeheader = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + cprintf(RED,"%d lastframeheader : %d\n", ithread, lastframeheader); break; @@ -2208,7 +2327,7 @@ int i; cprintf(BLUE, "%d recvfrom() failed\n", ithread); #endif if(status != TRANSMITTING){ - cprintf(BLUE,"%d *** should never be here********* status not transmitting***********************\n", ithread);/**/ + cprintf(BG_RED,"%d *** should never be here********* status not transmitting***********************\n", ithread);/**/ fifoFree[ithread]->push(buffer[ithread]); #ifdef FIFO_DEBUG cprintf(BLUE,"%d listener not txm free pushed into fifofree %x\n", ithread,(void*)(buffer[ithread])); @@ -2257,9 +2376,9 @@ int i; cprintf(BLUE,"%d dummy buffer num packets:%d\n", ithread(*((uint16_t*)(buffer[ithread])))); #endif while(!fifo[ithread]->push(buffer[ithread])); -#ifdef FIFO_DEBUG +//#ifdef FIFO_DEBUG cprintf(BLUE,"%d listener pushed dummy buffer into fifo %x\n", ithread,(void*)(buffer[ithread])); -#endif +//#endif } //reset mask and exit loop @@ -2504,7 +2623,7 @@ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf,int num void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[], int partialframe, int smaller){ int totalheader = HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH; - int i,j,npackets; + int i,j,npackets, ntotpackets=0; if (cbAction < DO_EVERYTHING){ @@ -2513,6 +2632,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* if(partialframe && (i!=smaller) ) continue; //for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1 npackets = (uint16_t)(*((uint16_t*)wbuffer[i])); + ntotpackets += npackets; rawDataReadyCallBack(currframenum, wbuffer[i], npackets * onePacketSize, sfilefd, guiData,pRawDataReady); } } @@ -2523,6 +2643,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* if(partialframe && (j!=smaller) ) continue; npackets = (uint16_t)(*((uint16_t*)wbuffer[j])); + ntotpackets += npackets; if (npackets > 0){ #ifdef WRITE_HEADERS @@ -2583,7 +2704,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* if(myDetectorType == EIGER) { - if(!partialframe){ + if(ntotpackets >= packetsPerFrame ) { #ifdef VERYDEBUG cprintf(GREEN,"gonna copy frame\n"); #endif