changes to make jungfrau work

This commit is contained in:
Dhanya Maliakal 2016-10-27 08:27:31 +02:00
parent 0cd9261332
commit 942f024761
4 changed files with 69 additions and 50 deletions

View File

@ -663,6 +663,9 @@ private:
/** Total fifo size */
uint32_t fifoSize;
/** fifo buffer header size */
uint32_t fifoBufferHeaderSize;
/** Missing Packet */
int missingPacketinFile;

View File

@ -615,13 +615,14 @@ enum communicationProtocol{
/*int k = 0;*/
while(length>0){
nsending = (length>packet_size) ? packet_size:length;
if(length<packet_size)
nsending = length; //works for jungfrau to read packet header
else
nsending = (length>packet_size) ? packet_size:length; //works for eiger to get packets to discard image header packets
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(nsent < packet_size) {
if(nsent){
if((nsent != header_packet_size) && (nsent != -1))
if(nsent != nsending){ //if((nsent != nsending)){ && (nsent < packet_size)){
if(nsent && (nsent != header_packet_size) && (nsent != -1))
cprintf(RED,"Incomplete Packet size %d\n",nsent);
}
break;
}
length-=nsent;

View File

@ -129,7 +129,7 @@ typedef struct {
#define JFRAU_FILE_FRAME_HEADER_LENGTH 16
#define JFRAU_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define JFRAU_PACKETS_PER_FRAME 128
#define JFRAU_HEADER_LENGTH 22

View File

@ -368,7 +368,7 @@ int UDPStandardImplementation::setupFifoStructure(){
fifo[i] = new CircularFifo<char>(fifoSize);
//allocate memory
mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * fifoSize);
mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize);
if (mem0[i] == NULL){
cprintf(BG_RED,"Error: Could not allocate memory for listening \n");
return FAIL;
@ -376,10 +376,10 @@ int UDPStandardImplementation::setupFifoStructure(){
//push free address into fifoFree
buffer[i]=mem0[i];
while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * (fifoSize-1))) {
while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * (fifoSize-1))) {
//cprintf(BLUE,"fifofree %d: push 0x%p\n",i,(void*)buffer[i]);
/*for(int k=0;k<bufferSize;k=k+4){
sprintf(buffer[i]+HEADER_SIZE_NUM_TOT_PACKETS+k,"mem%d",i);
sprintf(buffer[i]+fifoBufferHeaderSize+k,"mem%d",i);
}*/
sprintf(buffer[i],"mem%d",i);
while(!fifoFree[i]->push(buffer[i]));
@ -387,7 +387,7 @@ int UDPStandardImplementation::setupFifoStructure(){
#ifdef DEBUG5
cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i]));
#endif
buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS);
buffer[i] += (bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize);
}
}
cout << "Fifo structure(s) reconstructed" << endl;
@ -763,6 +763,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoSize = GOTTHARD_FIFO_SIZE;
fifoDepth = GOTTHARD_FIFO_SIZE;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
//footerOffset = Not applicable;
break;
case PROPIX:
@ -776,6 +777,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoSize = PROPIX_FIFO_SIZE;
fifoDepth = PROPIX_FIFO_SIZE;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
//footerOffset = Not applicable;
break;
case MOENCH:
@ -789,6 +791,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
fifoSize = MOENCH_FIFO_SIZE;
fifoDepth = MOENCH_FIFO_SIZE;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
//footerOffset = Not applicable;
break;
case EIGER:
@ -804,6 +807,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
fifoSize = EIGER_FIFO_SIZE;
fifoDepth = EIGER_FIFO_SIZE;
footerOffset = EIGER_DATA_PACKET_HEADER_SIZE + oneDataSize;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
break;
case JUNGFRAUCTB:
packetsPerFrame = JCTB_PACKETS_PER_FRAME;
@ -816,6 +820,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
maxFramesPerFile = JFCTB_MAX_FRAMES_PER_FILE;
fifoSize = JCTB_FIFO_SIZE;
fifoDepth = JCTB_FIFO_SIZE;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
//footerOffset = Not applicable;
break;
case JUNGFRAU:
@ -829,6 +834,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoDepth = JFRAU_FIFO_SIZE;
fifoSize = JFRAU_FIFO_SIZE;
fifoBufferHeaderSize= JFRAU_FILE_FRAME_HEADER_LENGTH;
//footerOffset = Not applicable;
break;
default:
@ -1988,8 +1994,7 @@ void UDPStandardImplementation::startListening(){
}
//write packet count to buffer
if(myDetectorType == EIGER)
(*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize);
(*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize);
if(dataCompressionEnable)
(*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc);
@ -2037,7 +2042,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
int receivedSize = 0;
//carry over from previous buffer
if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
if(cSize) memcpy(buffer[ithread] + fifoBufferHeaderSize, temp, cSize);
if(!activated){
@ -2059,14 +2064,14 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
//copy dummy packets
receivedSize = framestoclone*packetsPerFrame*onePacketSize;
memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, 0xFF,receivedSize);
memset(buffer[ithread] + fifoBufferHeaderSize, 0xFF,receivedSize);
//set fnum, pnum and deactivatedpacket label
eiger_packet_header_t* header;
eiger_packet_footer_t* footer;
int pnum=0;
//loop by each packet
for(int offset=HEADER_SIZE_NUM_TOT_PACKETS;
for(int offset=fifoBufferHeaderSize;
offset<receivedSize;
offset+=onePacketSize){
header = (eiger_packet_header_t*)(buffer[ithread] + offset);
@ -2090,12 +2095,22 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
return receivedSize;
}
if(status != TRANSMITTING){
if(myDetectorType == JUNGFRAU){
jfrau_packet_header_t* header;
int pcount = packetsPerFrame-1;
lSize = JFRAU_HEADER_LENGTH;
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize, lSize);
header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE));
}
else{
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
//eiger returns 0 when header packet caught
while(receivedSize < onePacketSize && status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
if(status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
//eiger returns 0 when header packet caught
while(receivedSize < onePacketSize && status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
}
}
totalListeningPacketCount[ithread] += (receivedSize/onePacketSize);
@ -2105,14 +2120,14 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
if(myDetectorType == JUNGFRAU){
jfrau_packet_header_t* header;
for(int iloop=0;iloop<2;iloop++){
header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE));
cprintf(RED,"[%d]: packetnumber:%x\n",iloop, (*( (uint8_t*) header->packetNumber)));
cprintf(RED," : framenumber :%x\n", (*( (uint32_t*) header->frameNumber))&0xffffff);
for(int iloop=0;iloop<128;iloop++){
header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize + iloop * (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE));
cprintf(RED,"[%d]: packetnumber:%d\n",iloop, (*( (uint8_t*) header->packetNumber)));
cprintf(RED," : framenumber :%d\n", (*( (uint32_t*) header->frameNumber))&frameIndexMask);
}
}else if(myDetectorType == EIGER){
eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS);
eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize);
eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + fifoBufferHeaderSize);
cprintf(GREEN,"thread:%d footeroffset:%dsubframenum:%d oldpacketnum:%d new pnum:%d new fnum:%d\n",
ithread,footerOffset,
(*( (unsigned int*) header->subFrameNumber)),
@ -2143,15 +2158,15 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
startFrameIndex = 0; //frame number always resets
break;
case JUNGFRAU:
header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
startFrameIndex = (*( (uint32_t*) header->frameNumber))&0xffffff;
header = (jfrau_packet_header_t*)(buffer[ithread] + fifoBufferHeaderSize);
startFrameIndex = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
break;
default:
if(shortFrameEnable < 0){
startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1)
& (frameIndexMask)) >> frameIndexOffset);
}else{
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))
& (frameIndexMask)) >> frameIndexOffset);
}
break;
@ -2302,7 +2317,7 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi
case PROPIX:
//for short frames, 1 packet/frame, so split frames is not a topic
if(shortFrameEnable == -1){
lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS);
lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize);
#ifdef DEBUG4
cprintf(BLUE, "Listening_Thread %d: Last Packet Offset:%d\n",ithread, lastPacketOffset);
#endif
@ -2319,17 +2334,17 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi
}
}
#ifdef DEBUG4
cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1)
& (frameIndexMask)) >> frameIndexOffset));
#endif
break;
case MOENCH:
lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS);
lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize);
#ifdef DEBUG4
cprintf(BLUE, "Listening_Thread %d: First Header:%d\t First Packet:%d\t Last Header:%d\t Last Packet:%d\tLast Packet Offset:%d\n",
(((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset),
((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)),
(((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (frameIndexMask)) >> frameIndexOffset),
((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (packetIndexMask)),
(((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset),
((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)),
lastPacketOffset);
@ -2357,33 +2372,33 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi
case JUNGFRAU:
lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS);
lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize);
#ifdef DEBUG4
header = (jfrau_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS);
header = (jfrau_packet_header_t*) (buffer[ithread]+fifoBufferHeaderSize);
cprintf(BLUE, "Listening_Thread: First Header:%d\t First Packet:%d\n",
(*( (uint32_t*) header->frameNumber))&0xffffff,
(*( (uint32_t*) header->frameNumber))&frameIndexMask,
(*( (uint8_t*) header->packetNumber)));
#endif
header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset);
#ifdef DEBUG4
cprintf(BLUE, "Listening_Thread: Last Header:%du\t Last Packet:%d\n",
(*( (uint32_t*) header->frameNumber))&0xffffff,
(*( (uint32_t*) header->frameNumber))&frameIndexMask,
(*( (uint8_t*) header->packetNumber)));
#endif
//jungfrau last packet value is 0, so find the last packet and store the others in a temp storage
if((*( (uint8_t*) header->packetNumber))){
//cprintf(RED,"entering missing packet zone\n");
lastFrameHeader64 = (*( (uint32_t*) header->frameNumber))&0xffffff;
lastFrameHeader64 = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
cSize += onePacketSize;
lastPacketOffset -= onePacketSize;
--packetCount;
while (lastFrameHeader64 == ((*( (uint32_t*) header->frameNumber))&0xffffff)){
while (lastFrameHeader64 == ((*( (uint32_t*) header->frameNumber))&frameIndexMask)){
cSize += onePacketSize;
lastPacketOffset -= onePacketSize;
header = (jfrau_packet_header_t*) (buffer[ithread]+lastPacketOffset);
#ifdef DEBUG4
cprintf(RED,"new header:%d new packet:%d\n",
(*( (uint32_t*) header->frameNumber))&0xffffff,
(*( (uint32_t*) header->frameNumber))&frameIndexMask,
(*( (uint8_t*) header->packetNumber)));
#endif
--packetCount;
@ -2713,7 +2728,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
uint64_t tempframenumber;
uint32_t pnum;
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber,pnum,snum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize,tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
@ -2732,7 +2747,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//callback to write data
if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, npackets * onePacketSize,
rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + fifoBufferHeaderSize, npackets * onePacketSize,
sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd
@ -2778,7 +2793,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//if write enabled
if((fileWriteEnable) && (sfilefd[ithread])){
if(numpackets){
int offset = HEADER_SIZE_NUM_TOT_PACKETS;
int offset = fifoBufferHeaderSize;
uint64_t nextFileFrameNumber;
int packetsWritten = 0;
//if(ithread) cout<<"numpackets:"<<numpackets<<"lastframenumberinfile:"<<lastFrameNumberInFile[ithread]<<endl;
@ -2850,7 +2865,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
uint64_t finalLastFrameNumberToSave = 0;
uint32_t pnum;
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum,snum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2942,7 +2957,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
//copy date
guiNumPackets[ithread] = numpackets;
strcpy(guiFileName[ithread],completeFileName[ithread]);
memcpy(latestData[ithread],buffer+ HEADER_SIZE_NUM_TOT_PACKETS , numpackets*onePacketSize);
memcpy(latestData[ithread],buffer+ fifoBufferHeaderSize , numpackets*onePacketSize);
//let it know its got data
sem_post(&dataCallbackWriterSemaphore[ithread]);
@ -2970,7 +2985,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
uint64_t tempframenumber=-1;
uint32_t pnum;
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber,pnum,snum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize, tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2988,7 +3003,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
//variable definitions
char* buff[2]={0,0}; //an array just to be compatible with copyframetogui
char* data = wbuffer+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed
char* data = wbuffer+ fifoBufferHeaderSize; //data pointer to the next memory to be analysed
int ndata; //size of data returned
uint32_t np; //remaining number of packets returned
uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer)); //number of total packets
@ -3089,7 +3104,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
remainingsize -= ((buff[0] + ndata) - data);
data = buff[0] + ndata;
if(data > (wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) )
if(data > (wbuffer + fifoBufferHeaderSize + npackets * onePacketSize) )
cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread);
}