This commit is contained in:
Dhanya Maliakal 2015-07-17 13:29:23 +02:00
parent ca9f195f0b
commit 26a42d8f67
2 changed files with 98 additions and 41 deletions

View File

@ -659,6 +659,9 @@ protected:
/** Receiver buffer */ /** Receiver buffer */
char *buffer[MAX_NUM_LISTENING_THREADS]; char *buffer[MAX_NUM_LISTENING_THREADS];
/** Missing Packet */
char *missingPacket;
/** number of writer threads */ /** number of writer threads */
int numListeningThreads; int numListeningThreads;

View File

@ -54,6 +54,7 @@ UDPStandardImplementation::UDPStandardImplementation()
udpSocket[i] = NULL; udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i; server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL; mem0[i] = NULL;
missingPacket[i] = NULL;
fifo[i] = NULL; fifo[i] = NULL;
fifoFree[i] = NULL; fifoFree[i] = NULL;
} }
@ -163,6 +164,7 @@ void UDPStandardImplementation::initializeMembers(){
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = NULL; udpSocket[i] = NULL;
mem0[i] = NULL; mem0[i] = NULL;
missingPacket[i] = NULL;
fifo[i] = NULL; fifo[i] = NULL;
fifoFree[i] = NULL; fifoFree[i] = NULL;
buffer[i] = NULL; buffer[i] = NULL;
@ -243,6 +245,7 @@ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ <
if(guiFileName) {delete [] guiFileName; guiFileName = NULL;} if(guiFileName) {delete [] guiFileName; guiFileName = NULL;}
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;} if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(missingPacket[i]){free(missingPacket[i]);missingPacket[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;} if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;} if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;}
} }
@ -337,7 +340,8 @@ int UDPStandardImplementation::setDetectorType(detectorType det){ FILE_LOG(logD
} }
latestData = new char[frameSize]; latestData = new char[frameSize];
//to force the set up
numJobsPerThread = -1;
setupFifoStructure(); setupFifoStructure();
if(createListeningThreads() == FAIL){ if(createListeningThreads() == FAIL){
@ -596,6 +600,7 @@ int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){ FILE_LOG(logDEB
} }
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;} if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(missingPacket[i]){free(missingPacket[i]);missingPacket[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;} if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;} if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;}
buffer[i] = NULL; buffer[i] = NULL;
@ -820,7 +825,7 @@ void UDPStandardImplementation::setupFifoStructure(){
numJobsPerThread = i; numJobsPerThread = i;
} }
//if same, return //if same, return (but oldn =-1 for change in dr, enable10giga etc that changes buffersize)
if(oldn == numJobsPerThread) if(oldn == numJobsPerThread)
return; return;
@ -901,6 +906,30 @@ void UDPStandardImplementation::setupFifoStructure(){
} }
cout << "Fifo structure(s) reconstructed" << endl; cout << "Fifo structure(s) reconstructed" << endl;
//missing packet
if(myDetectorType == EIGER){
if (missingPacket)
free(missingPacket);
missingPacket=(char*)malloc(onePacketSize);
if (missingPacket==NULL){
cprintf(BG_RED,"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR missing packet !!!!!!!+++++++++++++++++++++\n");
exit(-1);
}
//fill missing packet
else{
for(int j = 0; j < onePacketSize/4; j++)
(*((uint32_t*)(missingPacket + j))) = 0xffffffff;
//to check
for(int j = 0; j < onePacketSize/4; j++)
cout << i << ":"<< (*((uint32_t*)(missingPacket + j))) << endl;
}
}
} }
@ -1693,7 +1722,8 @@ int UDPStandardImplementation::startListening(){
int total; int total;
int lastpacketoffset, expected, rc,packetcount, maxBufferSize, carryonBufferSize; int lastpacketoffset, expected, rc,packetcount, maxBufferSize, carryonBufferSize;
uint32_t lastframeheader = 0, lastpacketheader = -1, tempframenum = 0;// for moench to check for all the packets in last frame uint32_t lastframeheader = 0, lastpacketheader = -1, currentpacketheader = 0,tempframenum = 0;// for moench to check for all the packets in last frame
int numberofmissingpackets, LAST_PACKET_VALUE,i;
char* tempchar = NULL; char* tempchar = NULL;
@ -1711,7 +1741,15 @@ int UDPStandardImplementation::startListening(){
if(tempchar) {delete [] tempchar;tempchar = NULL;} if(tempchar) {delete [] tempchar;tempchar = NULL;}
if(myDetectorType != EIGER) if(myDetectorType != EIGER)
tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size
else{
switch(dynamicRange){
case 4: LAST_PACKET_VALUE = 0x40; break;
case 8: LAST_PACKET_VALUE = 0x80; break;
case 16: LAST_PACKET_VALUE = 0xff; break;
case 32: LAST_PACKET_VALUE = 0xff; break;
default: break;
}
}
while((1<<ithread)&listeningthreads_mask){ while((1<<ithread)&listeningthreads_mask){
@ -1878,29 +1916,62 @@ int UDPStandardImplementation::startListening(){
case EIGER: case EIGER:
currentpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4)));
numberofmissingpackets = 0;
//push missing packet if any by looking at last packetheader values //update tempframenumber if header packet missed out, ie. if packetnumber is smaller = different frame
if(lastpacketheader >= currentpacketheader){
//packetnumber is smaller = different frame
if(lastpacketheader >= ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4))))
tempframenum++; tempframenum++;
//add missing packets for previous frame
numberofmissingpackets += (LAST_PACKET_VALUE - lastpacketheader);
//add for missing frames
numberofmissingpackets += ((tempframenum-lastframeheader -1) * (packetsPerFrame/numListeningThreads));
//add missing packets for current frame
numberofmissingpackets += (currentpacketheader);
}
//tag framenumber to the packet
(*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num1)) = tempframenum;
//remember last packet value //remember last packet value
lastpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4))); lastpacketheader = ((*(uint8_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num4)));
lastframeheader = tempframenum;
//temporarily store current buffer if there are missing packets
if(numberofmissingpackets)
tempchar = buffer[ithread];
for(i=0;i<numberofmissingpackets;i++){
buffer[ithread] = missingPacket;
//tag framenumber to the packet
(*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num1)) = tempframenum;
#ifdef VERYDEBUG
cprintf(BLUE,"%d listener going to push fifo: 0x%x\n", ithread,(void*)(buffer[ithread]));
#endif
while(!fifo[ithread]->push(buffer[ithread]));
#ifdef FIFO_DEBUG
//if(!ithread)
cprintf(BLUE, "%d listener pushed into fifo %x\n",ithread, (void*)(buffer[ithread]));
#endif
#ifdef VERYDEBUG
cprintf(BLUE, "%d waiting to pop out of listeningfifo\n",ithread);
#endif
//pop
fifoFree[ithread]->pop(buffer[ithread]);
#ifdef FIFO_DEBUG
cprintf(BLUE,"%d listener popped from fifofree %x\n", ithread, (void*)(buffer[ithread]));
#endif
}
buffer[ithread] = tempchar;
//tag framenumber to the packet
(*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread])))->num1)) = tempframenum;
packetcount = 1;
/*
//literally end of buffer //literally end of buffer
lastpacketoffset = rc + HEADER_SIZE_NUM_TOT_PACKETS; lastpacketoffset = rc + HEADER_SIZE_NUM_TOT_PACKETS;
@ -1997,6 +2068,8 @@ int UDPStandardImplementation::startListening(){
//when we lose frame header packet //when we lose frame header packet
lastframeheader = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); lastframeheader = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
cprintf(RED,"%d lastframeheader : %d\n", ithread, lastframeheader); cprintf(RED,"%d lastframeheader : %d\n", ithread, lastframeheader);
*/
break; break;
@ -2007,11 +2080,14 @@ int UDPStandardImplementation::startListening(){
//#ifdef VERYDEBUG //#ifdef VERYDEBUG
cprintf(BLUE, "%d packetcount:%d carryonbuffer:%d\n", ithread, packetcount, carryonBufferSize); cprintf(BLUE, "%d packetcount:%d carryonbuffer:%d\n", ithread, packetcount, carryonBufferSize);
//#endif //#endif
//write packet count and push //write packet count and push
(*((uint16_t*)(buffer[ithread]))) = packetcount; if(myDetectorType != EIGER)
(*((uint16_t*)(buffer[ithread]))) = packetcount;
totalListeningFrameCount[ithread] += packetcount; totalListeningFrameCount[ithread] += packetcount;
#ifdef VERYDEBUG #ifdef VERYDEBUG
cprintf(BLUE,"%d listener going to push fifo: 0x%x\n", ithread,(void*)(buffer[ithread])); cprintf(BLUE,"%d listener going to push fifo: 0x%x\n", ithread,(void*)(buffer[ithread]));
@ -2067,8 +2143,7 @@ int UDPStandardImplementation::startWriting(){
int xmax=0,ymax=0; int xmax=0,ymax=0;
int ret,i; int ret,i;
int packetsPerThread = packetsPerFrame/numListeningThreads; int packetsPerThread = packetsPerFrame/numListeningThreads;
int allDummyFramesPopped;
int smaller, onlyoneport=0;
while(1){ while(1){
@ -2099,16 +2174,10 @@ int UDPStandardImplementation::startWriting(){
while((1<<ithread)&writerthreads_mask){ while((1<<ithread)&writerthreads_mask){
#ifdef VERYDEBUG
cprintf(GREEN,"%d waiting to pop out of writing fifo\n", ithread);
#endif
//pop //pop
allDummyFramesPopped=1;
for(i=0;i<numListeningThreads;++i){ for(i=0;i<numListeningThreads;++i){
//if previous popped out frame was last dummy frame, dont pop out or you're stuck
//also check if there is a previous frame, then also dont pop out or you miss one frame
if(!previousframe[i]){
//#ifdef VERYDEBUG //#ifdef VERYDEBUG
cprintf(GREEN,"%d writer gonna pop from fifo: %d\n",ithread,i); cprintf(GREEN,"%d writer gonna pop from fifo: %d\n",ithread,i);
//#endif //#endif
@ -2120,26 +2189,11 @@ int UDPStandardImplementation::startWriting(){
//#ifdef VERYDEBUG //#ifdef VERYDEBUG
cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i); cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i);
//#endif //#endif
//if last dummy packet, free it
if(numpackets[i] == 0xFFFF){
//#ifdef VERYDEBUG
cprintf(GREEN, "%d popped last dummy frame:0x%x for listen thread %d\n", ithread, (void*)wbuf[i], i);
//#endif
while(!fifoFree[i]->push(wbuf[i]));
#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuf[i]),i);
#endif
}else allDummyFramesPopped = 0;
}
} }
//if all last dummy frames popped //if all last dummy frames popped
if(allDummyFramesPopped){ if(numpackets[i] == 0xFFFF){
#ifdef VERYDEBUG #ifdef VERYDEBUG
cprintf(GREEN,"%d all dummy frames popped\n", ithread); cprintf(GREEN,"%d all dummy frames popped\n", ithread);
#endif #endif