This commit is contained in:
Dhanya Maliakal 2015-08-04 15:20:00 +02:00
parent 902ab36705
commit 4fbfe186ef

View File

@ -1922,43 +1922,34 @@ int UDPStandardImplementation::startWriting(){
thread_started = 1; thread_started = 1;
int numpackets[numListeningThreads], nf;
bool startdatapacket[numListeningThreads],fullframe[numListeningThreads],popready[numListeningThreads];
uint32_t tempframenum[numListeningThreads];
int lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads];
int numberofmissingpackets[numListeningThreads];
int LAST_PACKET_VALUE;
char* wbuf[numListeningThreads];//interleaved char* wbuf[numListeningThreads];//interleaved
char *d=new char[bufferSize*numListeningThreads]; char *d=new char[bufferSize*numListeningThreads];
int xmax=0,ymax=0; int xmax=0,ymax=0;
int ret,i,j; int ret,i,j;
int numpackets[numListeningThreads], nf;
bool startdatapacket[numListeningThreads],fullframe[numListeningThreads],popready[numListeningThreads];
uint32_t tempframenum[numListeningThreads];
int lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads];
int numberofmissingpackets[numListeningThreads];
char* tofree[packetsPerFrame] ; int MAX_VALUE = 1024;
int tofreeoffset[packetsPerFrame]; char* tofree[MAX_VALUE];
char* tempbuffer[packetsPerFrame]; char* tempbuffer[MAX_VALUE];
char* blankframe[packetsPerFrame]; char* blankframe[MAX_VALUE];
int blankoffset; int tofreeoffset[numListeningThreads];
int tempoffset[numListeningThreads]; int tempoffset[numListeningThreads];
int blankoffset;
for(i=0;i<MAX_VALUE;++i){
tempbuffer[i] = 0;
tofree[i] = 0;
blankframe[i] = 0;
}
int LAST_PACKET_VALUE;
//last packet numbers for different dynamic ranges
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
for(i=0;i<packetsPerFrame;++i){
tempbuffer[i] = 0;
tofree[i] = 0;
blankframe[i] =new char[onePacketSize];
for(j=0;j<onePacketSize;++j)
(*((uint8_t*)((char*)(blankframe[i])+j))) = 0xFF;
if((*(uint8_t*)(((eiger_packet_header *)((char*)(blankframe[i])))->num3)) != 0xFF)
cprintf(RED,"blank frame header is not FF\n");
cprintf(GREEN,"packet %d blank frame 0x%x\n",i,(void*)(blankframe[i]));
}
//last packet numbers for different dynamic ranges
switch(dynamicRange){ switch(dynamicRange){
case 4: LAST_PACKET_VALUE = 0x40; break; case 4: LAST_PACKET_VALUE = 0x40; break;
case 8: LAST_PACKET_VALUE = 0x80; break; case 8: LAST_PACKET_VALUE = 0x80; break;
@ -1966,9 +1957,10 @@ int UDPStandardImplementation::startWriting(){
case 32: LAST_PACKET_VALUE = 0xff; break; case 32: LAST_PACKET_VALUE = 0xff; break;
default: break; default: break;
} }
} }
while(1){ while(1){
@ -1989,6 +1981,20 @@ int UDPStandardImplementation::startWriting(){
//so that the first frame is always copied //so that the first frame is always copied
guiData = latestData; guiData = latestData;
//blank frame
if(myDetectorType == EIGER){
for(i=0;i<packetsPerFrame;++i){
if(blankframe[i]){delete [] blankframe[i]; blankframe[i] = 0;}
blankframe[i] = new char[onePacketSize];
for(j=0;j<onePacketSize;++j)
(*((uint8_t*)((char*)(blankframe[i])+j))) = 0xFF;
#ifdef FIFO_DEBUG
cprintf(GREEN,"packet %d blank frame 0x%x\n",i,(void*)(blankframe[i]));
#endif
}
}
//allow them all to be popped initially //allow them all to be popped initially
for(i=0;i<numListeningThreads;++i){ for(i=0;i<numListeningThreads;++i){
fullframe[i] = false; fullframe[i] = false;
@ -2029,7 +2035,9 @@ int UDPStandardImplementation::startWriting(){
//dont pop again if dummy packet //dont pop again if dummy packet
if(!numpackets[i]){ if(!numpackets[i]){
popready[i] = false; popready[i] = false;
cprintf(RED,"%d dummy frame popped out of fifo %d",ithread, i); #ifdef EIGER_DEBUG3
cprintf(RED,"%d Dummy frame popped out of fifo %d",ithread, i);
#endif
}else{ }else{
tofree[tofreeoffset[i]] = wbuf[i]; tofree[tofreeoffset[i]] = wbuf[i];
tofreeoffset[i]++; tofreeoffset[i]++;
@ -2040,15 +2048,17 @@ int UDPStandardImplementation::startWriting(){
//end of acquisition //END OF ACQUISITION
if((!numpackets[0])&& (!numpackets[1])){ if((!numpackets[0])&& (!numpackets[1])){
//#ifdef VERYDEBUG #ifdef VERYDEBUG
cprintf(GREEN,"%d Both dummy frames\n", ithread); cprintf(GREEN,"%d Both dummy frames\n", ithread);
//#endif #endif
//remaning packets to be written //remaning packets to be written
if((myDetectorType == EIGER) && if((myDetectorType == EIGER) &&
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))){ ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))){
cprintf(RED,"**missing packets and end of acquisition\n"); #ifdef EIGER_DEBUG3
cprintf(RED,"**End of Acquisition but didnt get last packet\n");
#endif
for(i=0;i<numListeningThreads;++i){ for(i=0;i<numListeningThreads;++i){
//add missing packets //add missing packets
numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]);
@ -2075,37 +2085,28 @@ int UDPStandardImplementation::startWriting(){
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
//trying to find a full frame
if(!fullframe[0] || !fullframe[1]){ if(!fullframe[0] || !fullframe[1]){
//trying to find a full frame
for(i=0;i<numListeningThreads;++i){ for(i=0;i<numListeningThreads;++i){
//offset outside boundaries, also eliminates dummy packet //offset outside boundaries, also eliminates dummy packet
if((numpackets[i] != EIGER_HEADER_LENGTH) && (numpackets[i] != onePacketSize)){ if((numpackets[i] != EIGER_HEADER_LENGTH) && (numpackets[i] != onePacketSize)){
if(numpackets[i]){ if(numpackets[i])
cprintf(RED, "WARNING: Got a weird packet size: %d from fifo %d\n", numpackets[i],i); cprintf(RED, "WARNING: Got a weird packet size: %d from fifo %d\n", numpackets[i],i);
if(numpackets[i] == 16)
cprintf(RED,"weird packet, x%x\n",(*(uint16_t*)((uint16_t *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))));
}
#ifdef VERBOSE #ifdef VERBOSE
else else
cprintf(RED, "WARNING: Dummy packet: %d from fifo %d\n", numpackets[i],i); cprintf(RED, "WARNING: Dummy packet: %d from fifo %d\n", numpackets[i],i);
#endif #endif
if(numpackets[i]){
while(!fifoFree[i]->push((wbuf[i])));
//#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer freed unknown length pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuf[i]),i);
//#endif
}
continue; continue;
} }
//not dummy buffer and not after getting a full frame //not dummy buffer and not after getting a full frame
if(numpackets[i] && (!fullframe[i])){ if(numpackets[i] && (!fullframe[i])){
//header packet //IMAGE HEADER PACKET
if( 0x01 == (*(uint8_t*)(((eiger_image_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->header_confirm))){ if( 0x01 == (*(uint8_t*)(((eiger_image_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->header_confirm))){
//new frame (no datapacket received yet), update frame num and corrected for fnum reset for scans //new frame (no datapacket received yet), update frame num and corrected for fnum reset for scans
@ -2114,12 +2115,37 @@ int UDPStandardImplementation::startWriting(){
if(!tempframenum[i]) if(!tempframenum[i])
cprintf(RED,"**VERY WEIRD frame numbers for fifo %d: %d\n",i,tempframenum[i]); cprintf(RED,"**VERY WEIRD frame numbers for fifo %d: %d\n",i,tempframenum[i]);
tempframenum[i] += (startFrameIndex-1); tempframenum[i] += (startFrameIndex-1);
//#ifdef VERYVERBOSE
cprintf(GREEN,"**tempfraemnum of %d: %d\n",i,tempframenum[i]); //normal frame packet (also exception of tempnum 0 and currfnum 0)
//#endif if((tempframenum[i] == (currframenum+1))||(!tempframenum[i] && !currframenum)){
}//next frame, leave #ifdef EIGER_DEBUG3
cprintf(GREEN,"**tempfraemnum of %d: %d\n",i,tempframenum[i]);
#endif
}
//frame too far ahead
else{
#ifdef EIGER_DEBUG3
cprintf(RED,"frame number too far ahead, missing packets\n");
#endif
tempframenum[i] = currframenum + 1;
//add missing packets
numberofmissingpackets[i] = (LAST_PACKET_VALUE);
//to decrement from packetsInFile to calculate packet loss
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = blankframe[blankoffset];
tempoffset[i] ++;
blankoffset ++;
}
//set fullframe and dont let fifo pop over it until written
fullframe[i] = true;
popready[i] = false;
}
}//two image headers at a time = next frame, leave
else{ else{
cprintf(RED,"**missing packets and got header\n"); #ifdef EIGER_DEBUG3
cprintf(RED,"received frame header twice, missing packets\n");
#endif
//add missing packets //add missing packets
numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]);
//to decrement from packetsInFile to calculate packet loss //to decrement from packetsInFile to calculate packet loss
@ -2133,7 +2159,7 @@ int UDPStandardImplementation::startWriting(){
popready[i] = false; popready[i] = false;
} }
} }
//data packet //DATA PACKET
else{ else{
startdatapacket[i] = true; startdatapacket[i] = true;
//update current packet //update current packet
@ -2158,19 +2184,21 @@ int UDPStandardImplementation::startWriting(){
lastpacketheader[i] = currentpacketheader[i]; lastpacketheader[i] = currentpacketheader[i];
//last frame got, this will save time and also for last frames, it doesnt wait for stop receiver //last frame got, this will save time and also for last frames, it doesnt wait for stop receiver
if(currentpacketheader[i] == LAST_PACKET_VALUE){ if(currentpacketheader[i] == LAST_PACKET_VALUE){
cout<<"last value"<<endl; #ifdef EIGER_DEBUG3
cprintf(GREEN, "Got last packet\n");
#endif
fullframe[i] = true; fullframe[i] = true;
popready[i] = false; popready[i] = false;
} }
} }
//next frame packet - leave //next frame packet - leave
else{cout<<"abnormal next frame packet!! "<<endl; else{
#ifdef EIGER_DEBUG3
cprintf(RED,"packet from next frame, missing packets"<<endl;
#endif
//add missing packets //add missing packets
cprintf(RED,"LAST_PACKET_VALUE:%d lastpacketheader[%d]:%d\n",LAST_PACKET_VALUE,i,lastpacketheader[i]);
numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]);
cprintf(RED, "fifo:%d abrupt missing packets:%d\n",i,numberofmissingpackets[i]);
//to decrement from packetsInFile to calculate packet loss //to decrement from packetsInFile to calculate packet loss
for(j=0;j<numberofmissingpackets[i];++j){ for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = blankframe[blankoffset]; tempbuffer[tempoffset[i]] = blankframe[blankoffset];
@ -2196,25 +2224,23 @@ int UDPStandardImplementation::startWriting(){
if(tempframenum[0] != tempframenum[1]) if(tempframenum[0] != tempframenum[1])
cprintf(RED,"Frame numbers mismatch!!! %d %d\n",tempframenum[0],tempframenum[1]); cprintf(RED,"Frame numbers mismatch!!! %d %d\n",tempframenum[0],tempframenum[1]);
currframenum = tempframenum[0]; currframenum = tempframenum[0];
//#ifdef EIGER_DEBUG2 //to resolve for missing frame packets
cprintf(GREEN,"%d **fnum:%d**\n",currframenum); tempframenum[0]++;
if(numberofmissingpackets[0]) tempframenum[1]++;
cprintf(RED, "fifo 0 missing packets:%d\n",numberofmissingpackets[0]);
if(numberofmissingpackets[1])
cprintf(RED, "fifo 1 missing packets:%d\n",numberofmissingpackets[1]);
//#endif
if(blankoffset)
cprintf(RED, "blankoffset:%d\n",blankoffset);
numMissingPackets += (numberofmissingpackets[0]+numberofmissingpackets[1]); numMissingPackets += (numberofmissingpackets[0]+numberofmissingpackets[1]);
numTotMissingPacketsInFile += numMissingPackets;
#ifdef EIGER_DEBUG2
cprintf(GREEN,"%d **fnum:%d**\n",currframenum);
#endif
#ifdef EIGER_DEBUG3
if(numberofmissingpackets[0])
cprintf(RED, "fifo 0 missing packets:%d\n",numberofmissingpackets[0]);
if(numberofmissingpackets[1])
cprintf(RED, "fifo 1 missing packets:%d\n",numberofmissingpackets[1]);
if(numMissingPackets) if(numMissingPackets)
cprintf(RED, "numMissingPackets:%d\n",numMissingPackets); cprintf(RED, "numMissingPackets:%d\n",numMissingPackets);
#endif
numTotMissingPacketsInFile += numMissingPackets;
/*if(numTotMissingPacketsInFile)
cprintf(RED, "numTotMissingPacketsInFile:%d\n",numTotMissingPacketsInFile);*/
//write and copy to gui //write and copy to gui
@ -2237,9 +2263,9 @@ int UDPStandardImplementation::startWriting(){
//#ifdef VERYDEBUG #ifdef VERYDEBUG
cprintf(GREEN,"finished freeing\n"); cprintf(GREEN,"finished freeing\n");
//#endif #endif
//reset a few stuff //reset a few stuff
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
@ -2518,9 +2544,9 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
//free fifo //free fifo
for(i=0;i<numListeningThreads;++i){ for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuffer[i])); while(!fifoFree[i]->push(wbuffer[i]));
//#ifdef FIFO_DEBUG #ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i); cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i);
//#endif #endif
} }
@ -2584,7 +2610,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
void UDPStandardImplementation::writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum){ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
cout<<"in write to file numpackets:"<<numpackets<<endl;
int packetsToSave, offset,lastpacket,i; int packetsToSave, offset,lastpacket,i;
uint32_t tempframenum = framenum; uint32_t tempframenum = framenum;
@ -2675,15 +2701,17 @@ cout<<"in write to file numpackets:"<<numpackets<<endl;
}else }else
fwrite(buf[0]+offset, 1, packetsToSave * onePacketSize, sfilefd); fwrite(buf[0]+offset, 1, packetsToSave * onePacketSize, sfilefd);
packetsInFile += packetsToSave; packetsInFile += packetsToSave;
cout<<"packetscaught earlier:"<<packetsCaught <<" totalPacketsCaught earlier:"<<totalPacketsCaught<<endl; #ifdef EIGER_DEBUG3
cout<<"packetstosave:"<<packetsToSave<<" numMissingPackets:"<<numMissingPackets<< " adding on:"<< (packetsToSave - numMissingPackets)<<endl; cprintf(GREEN,"packetscaught earlier:%d packetstosave:%d numMissingPackets:%d addingon:%d\n",
packetsCaught,packetsToSave,numMissingPackets,(packetsToSave - numMissingPackets));
#endif
packetsCaught += (packetsToSave - numMissingPackets); packetsCaught += (packetsToSave - numMissingPackets);
totalPacketsCaught += (packetsToSave - numMissingPackets); totalPacketsCaught += (packetsToSave - numMissingPackets);
numMissingPackets = 0; numMissingPackets = 0;
//#ifdef VERYDEBUG #ifdef EIGER_DEBUG3
cprintf(GREEN,"packetscaught:%d\n", packetsCaught); cprintf(GREEN,"packetscaught:%d\n", packetsCaught);
cprintf(GREEN,"totalPacketsCaught:%d\n", totalPacketsCaught); cprintf(GREEN,"totalPacketsCaught:%d\n", totalPacketsCaught);
//#endif #endif
//new file //new file
if(packetsInFile >= maxPacketsPerFile){ if(packetsInFile >= maxPacketsPerFile){
@ -2814,28 +2842,10 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
cprintf(GREEN,"gonna copy frame\n"); cprintf(GREEN,"gonna copy frame\n");
#endif #endif
copyFrameToGui(wbuffer,currframenum); copyFrameToGui(wbuffer,currframenum);
//#ifdef VERYDEBUG #ifdef VERYDEBUG
cprintf(GREEN,"copied frame\n"); cprintf(GREEN,"copied frame\n");
//#endif #endif
}else{
/*
for(j=0;j<packetsPerFrame;++j){
if((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j])))->num3)) != 0xFF){
while(!fifoFree[(j/(packetsPerFrame/2))]->push(&(wbuffer[j] - HEADER_SIZE_NUM_TOT_PACKETS)));
//#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuffer[j]- HEADER_SIZE_NUM_TOT_PACKETS),(j/(packetsPerFrame/2)));
//#endif
}else cprintf(GREEN,"blank frame 0x%x\n",(void*)(wbuffer[j]));
}
//#ifdef VERYDEBUG
cprintf(GREEN,"finished freeing\n");
//#endif
*/
}
else{
//copy to gui //copy to gui
if(npackets >= packetsPerFrame){//min 1 frame, but neednt be if(npackets >= packetsPerFrame){//min 1 frame, but neednt be
//if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames //if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames
@ -2843,11 +2853,14 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << ithread << " finished copying" << endl; cout << ithread << " finished copying" << endl;
#endif #endif
}//else cout << "unfinished buffersize" << endl; }
/* freeing now done at function call
//else cout << "unfinished buffersize" << endl;
while(!fifoFree[0]->push(wbuffer[0])); while(!fifoFree[0]->push(wbuffer[0]));
#ifdef FIFO_DEBUG #ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0])); cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0]));
#endif #endif
*/
} }
} }