This commit is contained in:
Dhanya Maliakal 2015-10-21 11:03:57 +02:00
parent 253383331f
commit 6d6725c4e8

View File

@ -1773,12 +1773,14 @@ void UDPStandardImplementation::startWriting(){
void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
void UDPStandardImplementation::processWritingBuffer(int ithread){
FILE_LOG(logDEBUG1) << __AT__ << " called";
//variable definitions
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
sfilefd = NULL; //file pointer
int nf; //for compression, number of frames
/* outer loop - loops once for each acquisition */
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
@ -1786,6 +1788,8 @@ void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
//--reset parameters before acquisition
nf = 0;
guiData = latestData; //so that the first frame is always copied
/* inner loop - loop for each buffer */
//until mask unset (udp sockets shut down by client)
@ -1803,7 +1807,7 @@ void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
#endif
//dummy packet
//end of acquisition
if(numPackets == dummyPacketValue){
#ifdef DEBUG3
cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, 0);
@ -1829,8 +1833,6 @@ void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
waitWritingBufferForNextAcquisition(ithread);
}/*--end of loop for each acquisition (outer loop) */
}
@ -1843,32 +1845,72 @@ void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
FILE_LOG(logDEBUG1) << __AT__ << " called";
//variable definitions
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
char* packetBuffer[numberofListeningThreads]; //buffer popped from FIFO
sfilefd = NULL; //file pointer
bool popReady[numberofListeningThreads]; //if the FIFO can be popped
uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO
//eiger specific
int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets
char* toFreePointers[MAX_NUM_PACKETS]; //pointers to free for each frame
int toFreePointersOffset[numberofListeningThreads]; //offset of pointers to free added for each thread
uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO
int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets
uint32_t LAST_PACKET_VALUE; //last packet number
char* toFreePointers[MAX_NUM_PACKETS]; //pointers to free for each frame
int toFreePointersOffset[numberofListeningThreads]; //offset of pointers to free added for each thread
char* frameBuffer[MAX_NUM_PACKETS]; //buffer offset created for a whole frame
int frameBufferoffset[numberofListeningThreads]; //buffer offset created for a whole frame for both listening threads
char* blankframe[MAX_NUM_PACKETS]; //blank buffer for a whole frame with missing packets
int blankoffset; //blank buffer offset
bool fullframe[numberofListeningThreads]; //if full frame processed for each listening thread
volatile uint32_t threadFrameNumber[numberofListeningThreads]; //thread frame number for each listening thread buffer popped out
volatile uint32_t presentFrameNumber; //the current frame number aiming to be built
volatile uint32_t lastPacketNumber[numberofListeningThreads]; //last packet number got
volatile uint32_t currentPacketNumber[numberofListeningThreads];//current packet number
volatile int numberofMissingPackets[numberofListeningThreads]; // number of missing packets in this buffer
eiger_packet_header_t* blankframe_header;
for(int i=0; i<MAX_NUM_PACKETS; ++i){
toFreePointers[i] = NULL;
frameBuffer[i] = NULL;
blankframe[i] = NULL;
}
/* outer loop - loops once for each acquisition */
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
while(true){
//--reset parameters before acquisition
for(int i=0; i<numberofListeningThreads; ++i){
wbuf[i] = NULL;
packetBuffer[i] = NULL;
popReady[i] = true;
numPackets[i] = 0;
toFreePointersOffset[i] = (i*packetsPerFrame/numberofListeningThreads);
frameBufferoffset[i] = (i*packetsPerFrame/numberofListeningThreads);
fullframe[i] = false;
threadFrameNumber[i] = 0;
lastPacketNumber[i] = 0;
currentPacketNumber[i] = 0;
numberofMissingPackets[i] = 0;
}
for(int i=0; i<MAX_NUM_PACKETS; ++i){
toFreePointers[i] = NULL;
presentFrameNumber = 0;
//blank frame - initializing with missing packet values
blankoffset = 0;
unsigned char* blankframe_data=0;
for(int i=0; i<packetsPerFrame; ++i){
if(blankframe[i]){delete [] blankframe[i]; blankframe[i] = 0;}
blankframe[i] = new char[onePacketSize];
//set missing packet to 0xff
blankframe_header = (eiger_packet_header_t*) blankframe[i];
*( (uint16_t*) blankframe_header->missingPacket) = missingPacketValue;
//set each value inside blank frame to 0xff
for(int j=0;j<(oneDataSize);++j){
blankframe_data = (unsigned char*)blankframe[i] + sizeof(eiger_packet_header_t) + j;
*(blankframe_data) = 0xFF;
}
}
//--end of reset parameters before acquisition
guiData = latestData; //so that the first frame is always copied
LAST_PACKET_VALUE = (packetsPerFrame/numberofListeningThreads);
/* inner loop - loop for each buffer */
@ -1877,22 +1919,147 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
//pop fifo and if end of acquisition
if(popAndCheckEndofAcquisition(ithread, wbuf, popReady, numPackets,toFreePointers,toFreePointersOffset)){
if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,toFreePointers,toFreePointersOffset)){
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: All dummy-end buffers popped\n", ithread);
cprintf(GREEN,"All dummy-end buffers popped\n", ithread);
#endif
//finish missing packets
if(myDetectorType == EIGER &&
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numberofListeningThreads))));
if(((frameBufferoffset[0]!=0) || (frameBufferoffset[1]!=(packetsPerFrame/numberofListeningThreads))));
else{
stopWriting(ithread,wbuf);
stopWriting(ithread,packetBuffer);
continue;
}
}
for(int i=0;i<numberofListeningThreads;++i){
/**Check if ths required*/
//dummy done-------------------------------------------------------------------------
if(numPackets[i] == dummyPacketValue && frameBufferoffset[i]== (((i+1)*packetsPerFrame/numberofListeningThreads)));
//NOT FULL FRAME
else if(!fullframe[i]){
eiger_packet_footer_t* packetBuffer_footer = (eiger_packet_footer_t*)(packetBuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
//update frame number
if(numPackets[i] != dummyPacketValue){
if(!((uint32_t)(*( (uint64_t*) packetBuffer_footer)))){
cprintf(BG_RED,"Fifo %d: Error: Frame Number is zero from firmware. popready[%d]:%d\n",
i,(uint32_t)(*( (uint64_t*) packetBuffer_footer)),i,popReady[i]);
popReady[i]=true;
continue;
}
threadFrameNumber[i] = (uint32_t)(*( (uint64_t*) packetBuffer_footer));
threadFrameNumber[i] += (startFrameIndex - 1);
}
//dummy not done -----------------------------
if(numPackets[i] == dummyPacketValue){
#ifdef DEBUG4
cprintf(RED, "Fifo %d: Dummy packet: Adding missing packets to the last frame\n", i);
#endif
numberofMissingPackets[i] = (LAST_PACKET_VALUE - lastPacketNumber[i]);
threadFrameNumber[i] = dummyPacketValue;
}
//wrong packet (dummy (not full) or from next frame))--------------------------------
else if (threadFrameNumber[i] != presentFrameNumber){
#ifdef DEBUG4
cprintf(GREEN,"Fifo %d: Wrong Packet has fnum %d, (firmware fnum %d), pnum %d, last_packet %d, pnum_offset %d\n",
i,presentFrameNumber[i],(uint32_t)(*( (uint64_t*) packetBuffer_footer)),
*( (uint16_t*) packetBuffer_footer->packetNumber),lastPacketNumber[i],frameBufferoffset[i] );
cprintf(RED,"Fifo %d: Add missing packets to the right fnum %d\n",
i,presentFrameNumber);
#endif
numberofMissingPackets[i] = (LAST_PACKET_VALUE - lastPacketNumber[i]);
}
//correct packet (but never dummy frame)-------------------------------------------
else{
//update current packet
eiger_packet_footer_t* packetBuffer_footer = (eiger_packet_footer_t*)(packetBuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
currentPacketNumber[i] = *( (uint16_t*) packetBuffer_footer->packetNumber);
#ifdef DEBUG4
cprintf(GREEN,"Fifo %d: Correct Packet has fnum %d, pnum %d, last_packet %d, pnum_offset\n",
i,threadFrameNumber[i],currentPacketNumber[i],lastPacketNumber[i],frameBufferoffset[i]);
#endif
numberofMissingPackets[i] = (currentPacketNumber[i] - lastPacketNumber[i] - 1);
}
//add missing packets
for(int j=0;j<numberofMissingPackets[i];++j){
frameBuffer[frameBufferoffset[i]] = blankframe[blankoffset];
eiger_packet_header_t* frameBuffer_header = (eiger_packet_header_t*) frameBuffer[frameBufferoffset[i]];
if (*( (uint16_t*) frameBuffer_header->missingPacket)!= missingPacketValue){
blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset];
cprintf(BG_RED, "Fifo %d: Missing Packet Error: Adding blank packets mismatch "
"pnum_offset %d, fnum_thread %d, missingpacket_buffer 0x%x, missingpacket_blank 0x%x\n",
i,frameBufferoffset[i],threadFrameNumber[i],
*( (uint16_t*) frameBuffer_header->missingPacket),
*( (uint16_t*) blankframe_header->missingPacket));
exit(-1);
}else{
#ifdef DEBUG4
cprintf(GREEN, "Fifo %d: Missing Packet Adding blank packets success "
"pnum_offset %d, fnum_thread %d, missingpacket_buffer 0x%x\n",
i,frameBufferoffset[i],threadFrameNumber[i],
*( (uint16_t*) frameBuffer_header->missingPacket));
#endif
frameBufferoffset[i]++;
blankoffset++;
}
if(threadFrameNumber[i] != presentFrameNumber){
//set fullframe and dont let fifo pop over it
fullframe[i] = true;
popReady[i] = false;
}
}
//add current packet
if(threadFrameNumber[i] == presentFrameNumber){
if(currentPacketNumber[i] != (uint32_t)(frameBufferoffset[i]-(i*packetsPerFrame/numberofListeningThreads))+1){
cprintf(BG_RED, "Fifo %d: Correct Packet Error:Adding current packet mismatch "
"pnum_offset %d,pnum %d fnum %d, (firmware fnum %d)\n",
i,frameBufferoffset[i],currentPacketNumber[i],
threadFrameNumber[i],(uint32_t)(*( (uint64_t*) packetBuffer_footer)));
exit(-1);
}
frameBuffer[frameBufferoffset[i]] = packetBuffer[i] + HEADER_SIZE_NUM_TOT_PACKETS;
tempframe_header = (eiger_packet_header_t*) tempbuffer[tempoffset[i]];
tempframe_footer = (eiger_packet_footer_t*) (tempbuffer[tempoffset[i]] + footer_offset);
}
if(threadFrameNumber[i] != presentFrameNumber)
threadFrameNumber[i] = presentFrameNumber;
}
}
}
@ -1904,10 +2071,11 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
waitWritingBufferForNextAcquisition(ithread);
}/*--end of loop for each acquisition (outer loop) */
}
void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){
FILE_LOG(logDEBUG1) << __AT__ << " called";
@ -2428,7 +2596,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
br=0;
if (thisEvent==PHOTON_MAX) {
receiverData[ithread]->getFrameNumber(buff[0]);
//iFrame=receiverdata[ithread]->getFrameNumber(buff);
//iFrame=receiverData[ithread]->getFrameNumber(buff);
#ifdef MYROOT1
myTree[ithread]->Fill();
//cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl;
@ -2524,37 +2692,13 @@ int UDPStandardImplementation::startWriting(){
int ret,i,j;
bool endofacquisition;
int nf;
bool fullframe[numListeningThreads];
volatile uint32_t tempframenum[numListeningThreads];
uint32_t presentframenum;
uint32_t lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads];
int numberofmissingpackets[numListeningThreads];
char* tempbuffer[MAX_VALUE];
char* blankframe[MAX_VALUE];
int tempoffset[numListeningThreads];
int blankoffset;
for(i=0;i<MAX_VALUE;++i){
tempbuffer[i] = 0;
blankframe[i] = 0;
}
uint32_t LAST_PACKET_VALUE;
eiger_packet_footer_t* wbuf_footer=0;
eiger_packet_header_t* tempframe_header=0;
eiger_packet_footer_t* tempframe_footer=0;
eiger_packet_header_t* blankframe_header=0;
unsigned char* blankframe_data=0;
while(1){
@ -2562,59 +2706,9 @@ int UDPStandardImplementation::startWriting(){
//so that the first frame is always copied
guiData = latestData;
//blank frame
if(myDetectorType == EIGER){
for(i=0;i<packetsPerFrame;++i){
if(blankframe[i]){delete [] blankframe[i]; blankframe[i] = 0;}
//blank frame for each packet
blankframe[i] = new char[onePacketSize];
blankframe_header = (eiger_packet_header_t*) blankframe[i];
//set missing packet to 0xff
*( (uint16_t*) blankframe_header->missingpacket) = missingPacketValue;
//set each value inside blank frame to 0xff
for(j=0;j<(oneDataSize);++j){
blankframe_data = (unsigned char*)blankframe[i] + sizeof(eiger_packet_header_t) + j;
*(blankframe_data) = 0xFF;
}
//verify
if (*( (uint16_t*) blankframe_header->missingpacket) != missingPacketValue){
cprintf(RED,"blank frame not detected at %d: 0x%x\n",i,*( (uint16_t*) blankframe_header->missingpacket) );
exit(-1);
}
#ifdef FIFO_DEBUG
cprintf(GREEN,"packet %d blank frame 0x%x\n",i,(void*)(blankframe[i]));
#endif
}
//last packet numbers for different dynamic ranges
LAST_PACKET_VALUE = (packetsPerFrame/numListeningThreads);
}
//allow them all to be popped initially
for(i=0;i<numListeningThreads;++i){
fullframe[i] = false;
tempoffset[i] = (i*packetsPerFrame/numListeningThreads);
blankoffset = 0;
lastpacketheader[i] = 0;
currentpacketheader[i] = 0;
numberofmissingpackets[i] = 0;
numpackets[i] = 0;
tempframenum[i] = 0;
}
endofacquisition = false;
presentframenum = 0;
while((1<<ithread)&writerthreads_mask){
while((1<<ithread)&writerthreads_mask){
@ -2633,181 +2727,19 @@ int UDPStandardImplementation::startWriting(){
//anything that is not a data packet of right size
if(numpackets[i] != onePacketSize){
//header packet
if(numpackets[i] == EIGER_HEADER_LENGTH) {cprintf(BG_RED,"weird, header frame packet recieved. shouldnt\n"); exit(-1);}
//dummy packet
else if(!numpackets[i]){
#ifdef EIGER_DEBUG3
cprintf(RED, "Dummy packet: %d from fifo %d\n", numpackets[i],i);
#endif
//cout<<"tempoffset["<<i<<"]:"<<tempoffset[i]<<" checking against:"<<(((i+1)*packetsPerFrame/numListeningThreads))<<endl;
//cannot check for full frame as it will be false if its done with all packets OR waiting for packets
if(tempoffset[i]!= (((i+1)*packetsPerFrame/numListeningThreads))){
#ifdef VERYDEBUG
cprintf(RED, "Dummy packet: Adding missing packets\n");
#endif
//add missing packets
numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]);
//to decrement from packetsInFile to calculate packet loss
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = blankframe[blankoffset];
tempframe_header = (eiger_packet_header_t*) tempbuffer[tempoffset[i]];
blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset];
if (*( (uint16_t*) tempframe_header->missingpacket)!= missingPacketValue){
cprintf(BG_RED, "dummy blank mismatch num4 earlier2! "
"i:%d pnum:%d fnum:%d missingpacket:0x%x actual missingpacket:0x%x\n",
i,tempoffset[i],tempframenum[i],
*( (uint16_t*) tempframe_header->missingpacket),
*( (uint16_t*) blankframe_header->missingpacket));
exit(-1);
}else
#ifdef PADDING
cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d missingpacket:0x%x\n",i,
tempoffset[i],tempframenum[i],*( (uint16_t*) tempframe_header->missingpacket));
#endif
tempoffset[i]++;
blankoffset++;
}
//set fullframe and dont let fifo pop over it until written
fullframe[i] = true;
popready[i] = false;
}
}
#ifdef EIGER_DEBUG3
else{
cprintf(RED, "WARNING: Got a weird packet size: %d from fifo %d\n", numpackets[i],i);
continue;
}
#endif
}
//not a full frame
if(!fullframe[i]){
wbuf_footer = (eiger_packet_footer_t*)(wbuf[i] + footer_offset + HEADER_SIZE_NUM_TOT_PACKETS);
#ifdef EIGER_DEBUG3
cprintf(GREEN,"**pnum of %d: %d\n",i,(*( (uint16_t*) wbuf_footer->packetnum)));
#endif
//update frame number
if(!((uint32_t)(*( (uint64_t*) wbuf_footer)))){
cprintf(BG_RED,"%d VERY WEIRD frame number=%d and popready:%d\n",
i,(uint32_t)(*( (uint64_t*) wbuf_footer)),popready[i]);
popready[i]=true;
continue;
}
tempframenum[i] =(uint32_t)(*( (uint64_t*) wbuf_footer));
tempframenum[i] += (startFrameIndex-1);
//WRONG FRAME - leave
if(tempframenum[i] != presentframenum){
#ifdef PADDING
cout<<"wrong packet"<<endl;
#endif
#ifdef EIGER_DEBUG3
cprintf(RED,"fifo:%d packet from next frame %d, add missing packets to the right one %d\n",
i,tempframenum[i],presentframenum );
cprintf(RED,"current wrong frame:%d wrong frame packet number:%d\n",
(uint32_t)(*( (uint64_t*) wbuf_footer)),
*( (uint16_t*) wbuf_footer->packetnum));
#endif
tempframenum[i] = presentframenum;
//add missing packets
numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]);
#ifdef VERYDEBUG
if(numberofmissingpackets[i]>0)
cprintf(BG_RED,"fifo:%d missing packet from: %d now\n",i,lastpacketheader[i]);
#endif
//to decrement from packetsInFile to calculate packet loss
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = blankframe[blankoffset];
tempframe_header = (eiger_packet_header_t*) tempbuffer[tempoffset[i]];
blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset];
if (*( (uint16_t*) tempframe_header->missingpacket)!= missingPacketValue){
cprintf(BG_RED, "wrong blank mismatch num4 earlier2! "
"i:%d pnum:%d fnum:%d missingpacket:0x%x actual missingpacket:0x%x add:0x%p\n",
i,tempoffset[i],tempframenum[i],
*( (uint16_t*) tempframe_header->missingpacket),
*( (uint16_t*) blankframe_header->missingpacket),
(void*)(tempbuffer[tempoffset[i]]));
exit(-1);
}else
#ifdef PADDING
cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d missingpacket:0x%x add:0x%x\n",
i,tempoffset[i],tempframenum[i],
*( (uint16_t*) tempframe_header->missingpacket),
(void*)(tempbuffer[tempoffset[i]]));
#endif
tempoffset[i] ++;
blankoffset ++;
}
//set fullframe and dont let fifo pop over it until written
fullframe[i] = true;
popready[i] = false;
}
//CORRECT FRAME - continue building frame
else {
#ifdef PADDING
cout<<"correct packet"<<endl;
#endif
#ifdef EIGER_DEBUG3
cprintf(GREEN,"**tempfraemnum of %d: %d\n",i,tempframenum[i]);
#endif
//update current packet
currentpacketheader[i] = *( (uint16_t*) wbuf_footer->packetnum);
#ifdef VERYVERBOSE
cprintf(GREEN,"**fifo:%d currentpacketheader: %d lastpacketheader %d tempoffset:%d\n",
i,currentpacketheader[i],lastpacketheader[i], tempoffset[i]);
#endif
//add missing packets
numberofmissingpackets[i] = (currentpacketheader[i] - lastpacketheader[i] -1);
#ifdef VERYDEBUG
if(numberofmissingpackets[i]>0)
cprintf(BG_RED,"fifo:%d missing packet from: %d now at :%d tempoffset:%d\n",
i,lastpacketheader[i],currentpacketheader[i],tempoffset[i]);
#endif
//to decrement from packetsInFile to calculate packet loss
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = blankframe[blankoffset];
{
tempframe_header = (eiger_packet_header_t*) tempbuffer[tempoffset[i]];
blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset];
if (*( (uint16_t*) tempframe_header->missingpacket)!= missingPacketValue){
cprintf(BG_RED, "correct blank mismatch num4 earlier2! "
"i:%d pnum:%d fnum:%d missingpacket:0x%x actual missingpacket:0x%x add:0x%p\n",
i,tempoffset[i],tempframenum[i],
*( (uint16_t*) tempframe_header->missingpacket),
*( (uint16_t*) blankframe_header->missingpacket),
(void*)(tempbuffer[tempoffset[i]]));
exit(-1);
}else
#ifdef PADDING
cprintf(GREEN, "blank packet i:%d pnum:%d fnum:%d missingpacket:0x%x add:0x%x\n",
i,tempoffset[i],tempframenum[i],
*( (uint16_t*) tempframe_header->missingpacket),
(void*)(tempbuffer[tempoffset[i]]));
#endif
tempoffset[i] ++;
blankoffset ++;
}
//add current packet
if(currentpacketheader[i] != (uint32_t)(tempoffset[i]-(i*packetsPerFrame/numListeningThreads))+1){
cprintf(BG_RED, "correct pnum mismatch earlier! tempoffset[%d]:%d pnum:%d fnum:%d rfnum:%d\n",
i,tempoffset[i],currentpacketheader[i],
tempframenum[i],(uint32_t)(*( (uint64_t*) wbuf_footer)));
exit(-1);
}
tempbuffer[tempoffset[i]] = wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS;
tempframe_header = (eiger_packet_header_t*) tempbuffer[tempoffset[i]];