This commit is contained in:
Dhanya Maliakal 2015-10-16 10:47:21 +02:00
parent a3e12e7955
commit 169a91b6ac
2 changed files with 405 additions and 492 deletions

View File

@ -317,15 +317,6 @@ private:
*/
void startListening();
/**
* Thread started which writes packets to file.
* It pops the fifo, processes and writes packets to file and pushes the addresses into the fifoFree
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startWriting();
/**
* Called by startListening
* Listens to buffer, until packet(s) received or shutdownUDPsocket called by client
@ -368,6 +359,16 @@ private:
*/
uint32_t processListeningBuffer(int ithread, int cSize,char* temp);
/**
* Thread started which writes packets to file.
* It calls popAndCheckEndofAcquisition to pop fifo and check if it is a dummy end buffer
* It then calls a function to process and write packets to file and pushes the addresses into the fifoFree
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startWriting();
/**
* Called by StartWriting
* Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition
@ -392,9 +393,40 @@ private:
*/
void stopWriting(int ithread, char* wbuffer[]);
void processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]);
void processWritingBufferPacketByPacket();
/**
* Called by startWriting or processWritingBufferPacketByPacket upon reading a frame (for eiger)
* Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets);
/**
* Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression
* @param wbuffer is the address of buffer popped out of FIFO
* @param numpackets is the number of packets
*/
void writeFileWithoutCompression(char* wbuffer[],int numpackets);
/**
* Called by writeToFileWithoutCompression()
* Create headers for file writing (at the moment, this is eiger specific)
* @param wbuffer writing buffer popped from FIFOs
*/
void createHeaders(char* wbuffer[]);
/**
* Called by handleWithoutDataCompression and handleWithCompression after writing to file
* Copy frames for GUI and updates appropriate parameters for frequency frames to gui
* Uses semaphore for nth frame mode
* @param buffer buffer to copy
*/
void copyFrameToGui(char* buffer[]);
void processWritingBufferPacketByPacket();
/*************************************************************************
* Class Members *********************************************************
@ -674,11 +706,7 @@ private:
private:
/**
* Copy frames to gui
* uses semaphore for nth frame mode
*/
void copyFrameToGui(char* startbuf[], char* buf=NULL);
/**
* Creates new tree and file for compression
@ -688,26 +716,6 @@ private:
*/
int createCompressionFile(int ithr, int iframe);
/**
* Writing to file without compression
* @param buf is the address of buffer popped out of fifo
* @param numpackets is the number of packets
* @param framenum current frame number
*/
void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum);
/**
* updates parameters and writes to file when not a dummy frame
* Also calls writeToFile_withoutCompression or handleDataCompression
* Called by startWriting()
* @param ithread writing thread number
* @param wbuffer writer buffer
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets);
/**
* data compression for each fifo output
* @param ithread writing thread number

View File

@ -775,8 +775,11 @@ int UDPStandardImplementation::startReceiver(char *c){
measurementStarted = false;
startFrameIndex = 0;
frameIndex = 0;
if(!acqStarted)
if(!acqStarted){
currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan
acquisitionIndex = 0;
frameIndex = 0;
}
for(int i = 0; i < numberofListeningThreads; ++i)
totalListeningFrameCount[i] = 0;
packetsCaught = 0;
@ -983,7 +986,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq
cprintf(CYAN,"Info: gonna post\n");
#endif
//release after getting data
sem_post(&smp);
sem_post(&writerGuiSemaphore);
}
#ifdef DEBUG4
cprintf(CYAN,"Info: done post\n");
@ -1771,7 +1774,10 @@ void UDPStandardImplementation::startWriting(){
processWritingBufferPacketByPacket();
break;
default:
processWritingBuffer(ithread, wbuf, numPackets);
if(!dataCompressionEnable)
handleWithoutDataCompression(ithread, wbuf, numPackets[0]);
else
handleDataCompression(ithread,wbuf,d, xmax, ymax, nf);
break;
}
@ -1899,180 +1905,372 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
cprintf(GREEN,"Info: Writing_Thread %d: End of Acquisition\n",ithread);
//free fifo
for(int i=0; i<numberofListeningThreads; ++i)
for(int i=0; i<numberofListeningThreads; ++i){
while(!fifoFree[i]->push(wbuffer[i]));
#ifdef
}
void UDPStandardImplementation::processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
}
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
//get frame number
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
//single thread, just assign and process without compression
if(!dataCompressionEnable){
currentFrameNumber = tempframenumber;
handleWithoutDataCompression(ithread, wbuffer, nP[0]);
#ifdef FIFODEBUG
cprintf(GREEN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i);
#endif
}
//handling multiple threads
else{
pthread_mutex_lock(&progressMutex);
if(tempframenumber > currentFrameNumber)
currentFrameNumber = tempframenumber;
pthread_mutex_unlock(&progressMutex);
handleDataCompression(ithread,wbuffer,d, xmax, ymax, nf);
//all threads need to close file, reset mask and exit loop
closeFile(ithread);
pthread_mutex_lock(&statusMutex);
writerThreadsMask^=(1<<ithread);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Resetting mask. New Mask: 0x%x\n", ithread,writerThreadsMask );
#endif
pthread_mutex_unlock(&statusMutex);
//thread 0 waits for all threads to finish & print statistics
if(ithread == 0){
//wait for all other threads
if(dataCompressionEnable){
cprintf(GREEN,"Writing_Thread %d: Waiting for jobs to be done.. current mask:0x%x\n",ithread, writerThreadsMask);
while(writerThreadsMask){
/*cout << "." << flush;*/
usleep(50000);
}
cprintf(GREEN,"Writing_Thread %d: Jobs Done!\n",ithread);
}
//ensure listening threads done before updating status as it returns to client (from stopReceiver)
while(listeningThreadsMask)
usleep(5000);
//update status
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
pthread_mutex_unlock(&(statusMutex));
//statistics
cprintf(GREEN, "Status: Run Finished\n");
if(!totalPacketsCaught){
cprintf(RED, "Total Missing Packets padded:%d\n",numTotMissingPackets);
cprintf(RED, "Total Packets Caught: 0\n");
cprintf(RED, "Total Frames Caught: 0\n");
}else{
cprintf(GREEN, "Total Missing Packets padded:%d\n",numTotMissingPackets);
cprintf(GREEN, "Total Packets Caught:%d\n", totalPacketsCaught);
cprintf(GREEN, "Total Frames Caught:%d\n",(totalPacketsCaught/packetsPerFrame));
}
//acquisition end
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((totalPacketsCaught/packetsPerFrame), pAcquisitionFinished);
}
}
void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets){
FILE_LOG(logDEBUG1) << __AT__ << " called";
int i;
//get frame number (eiger already gets it when it does packet to packet processing)
if (myDetectorType != EIGER){
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
//get frame number
currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex;
}
}
/*
acquisitionIndex = currframenum - startAcquisitionIndex
frameIndex = currframenum - startFrameIndex;
}
*/
void UDPStandardImplementation::copyFrameToGui(char* startbuf[], char* buf){
FILE_LOG(logDEBUG1) << __AT__ << " called";
#ifdef VERY_VERY_DEBUG
cout << "copyframe" << endl;
//callback to write data
if (cbAction < DO_EVERYTHING){
switch(myDetectorType){
case EIGER:
for(i=0;i<npackets;++i)
rawDataReadyCallBack(currentFrameNumber, wbuffer[i], onePacketSize, sfilefd, guiData, pRawDataReady);
break;
default:
rawDataReadyCallBack(currentFrameNumber, wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS, npackets * onePacketSize,
sfilefd, guiData,pRawDataReady);
break;
}
}
//write to file if enabled and update write parameters
if(npackets > 0)
writeFileWithoutCompression(wbuffer, npackets);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n");
#endif
//random read when gui not ready , also command line doesnt have nthframetogui
//else guidata always null as guidataready is always 1 after 1st frame, and seccond data never gets copied
if((!nFrameToGui) && (!guiData)){
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"doing nothing\n");
//copy frame for gui
if(npackets >= packetsPerFrame)
copyFrameToGui(wbuffer);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n");
#endif
//free fifo addresses (eiger frees for each packet later)
if(myDetectorType != EIGER){
while(!fifoFree[0]->push(wbuffer[0]));
#ifdef FIFODEBUG
cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener 0\n",ithread, (void*)(wbuffer[0]));
#endif
}
}
void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],int numpackets){
FILE_LOG(logDEBUG1) << __AT__ << " called";
int i;
//create headers for eiger
#ifdef WRITE_HEADERS
if (myDetectorType == EIGER && cbAction == DO_EVERYTHING)
createHeaders(wbuffer);
#endif
//if write enabled
if((fileWriteEnable) && (sfilefd)){
int offset = HEADER_SIZE_NUM_TOT_PACKETS; //offset (not eiger) to keep track of how many packets saved
int packetsToSave; //how many packets to save at a time
volatile uint64_t tempframenumber;
int lastpacket;
//loop to take care of creating new files when it reaches max packets per file
while(numpackets > 0){
//to create new file when max reached
packetsToSave = maxPacketsPerFile - packetsInFile;
if(packetsToSave > numpackets)
packetsToSave = numpackets;
//write to file
if(cbAction == DO_EVERYTHING){
switch(myDetectorType){
case EIGER:
for(i=0; i<packetsToSave; ++i)
fwrite((void*)wbuffer[i], 1, onePacketSize, sfilefd);
break;
default:
fwrite(wbuffer[0] + offset, 1, packetsToSave * onePacketSize, sfilefd);
break;
}
}
//update parameters
packetsInFile += packetsToSave;
#ifdef DEBUG4
cprintf(GREEN,"Writing Thread: packetsCaught till now:%d packetsToSave:%d numMissingPackets:%d packetsCaught now:%d\n",
packetsCaught,packetsToSave,numMissingPackets,(packetsToSave - numMissingPackets));
#endif
packetsCaught += (packetsToSave - numMissingPackets);
totalPacketsCaught += (packetsToSave - numMissingPackets);
numMissingPackets = 0;
#ifdef DEBUG4
cprintf(GREEN,"Writing Thread: packetscaught:%d totalPacketsCaught:%d\n", packetsCaught,totalPacketsCaught);
#endif
//new file
if(packetsInFile >= (uint32_t)maxPacketsPerFile){
//for packet loss, because currframenum is the latest one for eiger
if(myDetectorType != EIGER){
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
//get frame number (eiger already gets it when it does packet to packet processing)
tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + lastpacket))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
//get frame number
currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex;
}
#ifdef DEBUG3
cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber);
#endif
createNewFile();
}
//increase offset
if(myDetectorType != EIGER)
offset += (packetsToSave * onePacketSize);
numpackets -= packetsToSave;
}
}
//only update parameters
else{
if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex);
packetsInFile += numpackets;
packetsCaught += (numpackets - numMissingPackets);
totalPacketsCaught += (numpackets - numMissingPackets);
numMissingPackets = 0;
if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex);
}
}
void UDPStandardImplementation::createHeaders(char* wbuffer[]){
eiger_packet_header_t* wbuf_header=0;
eiger_packet_footer_t* wbuf_footer=0;
int port = 0, missingPacket;
for (int i = 0; i < packetsPerFrame; i++){
wbuf_header = (eiger_packet_header_t*) wbuffer[i];
wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset);
#ifdef DEBUG4
cprintf(GREEN, "Loop index:%d Pnum:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber));
#endif
//which port
if (i ==(packetsPerFrame/2)) port = 1;
//missing packet
if (*( (uint16_t*) wbuf_header->missingPacket)== missingPacketValue){
#ifdef DEBUG4
cprintf(GREEN,"Missing packet at %d\n", i+1);
#endif
missingPacket = 1;
//add frame and packet numbers
*( (uint64_t*) wbuf_footer) = (uint64_t)((currentFrameNumber+1));
*( (uint16_t*) wbuf_footer->packetNumber) = (i+1);
}
//normal packet
else{
missingPacket = 0;
//correct the packet numbers of port2 so that port1 and 2 are not the same
if(port) *( (uint16_t*) wbuf_footer->packetNumber) = (*( (uint16_t*) wbuf_footer->packetNumber))+(packetsPerFrame/2);
}
//DEBUGGING
if(*( (uint16_t*) wbuf_footer->packetNumber) != (i+1)){
cprintf(BG_RED, "Packet Number Mismatch! i:%d pnum:%d fnum:%d missingPacket:%d\n",
i,*( (uint16_t*) wbuf_footer->packetNumber),currentFrameNumber,*( (uint16_t*) wbuf_header->missingPacket));
exit(-1);
}
//overwriting port number and dynamic range
*( (uint8_t*) wbuf_header->portIndex) = port;
*( (uint8_t*) wbuf_header->dynamicRange) = dynamicRange;
}
}
void UDPStandardImplementation::copyFrameToGui(char* buffer[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
int i;
//random read (gui not ready)
//need to toggle guiDataReady or the second frame wont be copied
if((!FrameToGuiFrequency) && (!guiData)){
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n");
#endif
pthread_mutex_lock(&dataReadyMutex);
guiDataReady=0;
pthread_mutex_unlock(&dataReadyMutex);
}
//random read or nth frame read, gui needs data now or it is the first frame
//random read (gui ready) or nth frame read: gui needs data now or it is the first frame
else{
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"gui needs data now or 1st frame\n");
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n");
#endif
pthread_mutex_lock(&dataReadyMutex);
guiDataReady=0;
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"guidataready is 0, copying data\n");
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n");
#endif
//eiger
if(startbuf != NULL){
switch(myDetectorType){
case EIGER:
for(int i=0; i<packetsPerFrame; ++i)
memcpy((((char*)latestData)+i * onePacketSize) ,buffer[i],onePacketSize);
break;
default:
memcpy(latestData,buffer[0] + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize);
break;
}
for(int j=0;j<packetsPerFrame;++j)
memcpy((((char*)latestData)+j * onePacketSize) ,startbuf[j],onePacketSize);
}else//other detectors
memcpy(latestData,buf,bufferSize);
strcpy(guiFileName,savefilename);
strcpy(guiFileName,completeFileName);
guiDataReady=1;
pthread_mutex_unlock(&dataReadyMutex);
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"guidataready = 1\n");
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Copied Data, guidataready is 1\n");
#endif
//nth frame read, block current process if the guireader hasnt read it yet
if(nFrameToGui){
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"waiting after copying\n");
if(FrameToGuiFrequency){
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Waiting after copying\n");
#endif
sem_wait(&smp);
#ifdef VERY_VERY_DEBUG
cprintf(GREEN,"done waiting\n");
sem_wait(&writerGuiSemaphore);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Done waiting\n");
#endif
}
}
}
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
}
@ -2554,316 +2752,7 @@ int UDPStandardImplementation::startWriting(){
void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
cprintf(GREEN,"%d End of Acquisition for Writing Thread\n",ithread);
int i;
//free fifo
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuffer[i]));
#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i);
#endif
}
//all threads need to close file, reset mask and exit loop
closeFile(ithread);
pthread_mutex_lock(&statusMutex);
writerthreads_mask^=(1<<ithread);
#ifdef VERYDEBUG
cprintf(GREEN,"%d Resetting mask of current writing thread. New Mask: 0x%x\n", ithread,writerthreads_mask );
#endif
pthread_mutex_unlock(&statusMutex);
//only thread 0 needs to do this
//check if all jobs are done and wait
//change status to run finished
if(ithread == 0){
if(dataCompression){
cprintf(GREEN,"%d Waiting for jobs to be done.. current mask:0x%x\n",ithread, writerthreads_mask);
while(writerthreads_mask){
/*cout << "." << flush;*/
usleep(50000);
}
cprintf(GREEN," Jobs Done!\n");
}
//to make sure listening threads are done before you update status, as that returns to client
while(listeningthreads_mask)
usleep(5000);
//update status
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
pthread_mutex_unlock(&(statusMutex));
//report
cprintf(GREEN, "Status: Run Finished\n");
if(!totalPacketsCaught){
cprintf(RED, "Total Missing Packets padded:%d\n",numTotMissingPackets);
cprintf(RED, "Total Packets Caught: 0\n");
cprintf(RED, "Total Frames Caught: 0\n");
}else{
cprintf(GREEN, "Total Missing Packets padded:%d\n",numTotMissingPackets);
cprintf(GREEN, "Total Packets Caught:%d\n", totalPacketsCaught);
cprintf(GREEN, "Total Frames Caught:%d\n",(totalPacketsCaught/packetsPerFrame));
}
//acquisition end
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((totalPacketsCaught/packetsPerFrame), pAcquisitionFinished);
}
}
void UDPStandardImplementation::writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum){
FILE_LOG(logDEBUG1) << __AT__ << " called";
int packetsToSave, offset,lastpacket,i;
uint32_t tempframenum = framenum;
//file write
if((enableFileWrite) && (sfilefd)){
offset = HEADER_SIZE_NUM_TOT_PACKETS;
while(numpackets > 0){
//for progress and packet loss calculation(new files)
if(myDetectorType == EIGER);
else if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1)))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
//lock
if(numWriterThreads > 1)
pthread_mutex_lock(&write_mutex);
//to create new file when max reached
packetsToSave = maxPacketsPerFile - packetsInFile;
if(packetsToSave > numpackets)
packetsToSave = numpackets;
/**next time offset is still plus header length*/
if(myDetectorType == EIGER){
for(i=0;i<packetsToSave;++i)
fwrite((void*)buf[i], 1, onePacketSize, sfilefd);
//fwrite((void*)buf, 1, packetsToSave * onePacketSize, sfilefd);
}else
fwrite(buf[0]+offset, 1, packetsToSave * onePacketSize, sfilefd);
packetsInFile += packetsToSave;
#ifdef EIGER_DEBUG3
cprintf(GREEN,"packetscaught earlier:%d packetstosave:%d numMissingPackets:%d addingon:%d\n",
packetsCaught,packetsToSave,numMissingPackets,(packetsToSave - numMissingPackets));
#endif
packetsCaught += (packetsToSave - numMissingPackets);
totalPacketsCaught += (packetsToSave - numMissingPackets);
numMissingPackets = 0;
#ifdef EIGER_DEBUG3
cprintf(GREEN,"packetscaught:%d\n", packetsCaught);
cprintf(GREEN,"totalPacketsCaught:%d\n", totalPacketsCaught);
#endif
//new file
if(packetsInFile >= (uint32_t)maxPacketsPerFile){
//for packet loss, because currframenum is the latest one for eiger
if(myDetectorType != EIGER){
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1)))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))& (frameIndexMask)) >> frameIndexOffset);
}
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
//create
createNewFile();
}
//unlock
if(numWriterThreads > 1)
pthread_mutex_unlock(&write_mutex);
if(myDetectorType != EIGER)
offset += (packetsToSave * onePacketSize);
numpackets -= packetsToSave;
}
}
else{
if(numWriterThreads > 1)
pthread_mutex_lock(&write_mutex);
packetsInFile += numpackets;
packetsCaught += (numpackets - numMissingPackets);
totalPacketsCaught += (numpackets - numMissingPackets);
numMissingPackets = 0;
if(numWriterThreads > 1)
pthread_mutex_unlock(&write_mutex);
}
}
void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets){
int i, missingpacket,port = 0;
if (cbAction < DO_EVERYTHING){
if (myDetectorType == EIGER){
for(i=0;i<npackets;++i)
rawDataReadyCallBack(currframenum, wbuffer[i], onePacketSize, sfilefd, guiData,pRawDataReady);
}else
rawDataReadyCallBack(currframenum, wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS, npackets * onePacketSize, sfilefd, guiData,pRawDataReady);
}
else {
if (npackets > 0){
#ifdef WRITE_HEADERS
if (myDetectorType == EIGER){
eiger_packet_header_t* wbuf_header=0;
eiger_packet_footer_t* wbuf_footer=0;
for (i = 0; i < packetsPerFrame; i++){
wbuf_header = (eiger_packet_header_t*) wbuffer[i];
wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footer_offset);
#ifdef EIGER_DEBUG3
cprintf(GREEN, "i:%d pnum:%d \n",i,*( (uint16_t*) wbuf_footer->packetnum));
#endif
//which port
if (i ==(packetsPerFrame/2))
port = 1;
//missing packet
if (*( (uint16_t*) wbuf_header->missingpacket)== missingPacketValue){
#ifdef VERY_VERBOSE
cprintf(GREEN,"missing packet at %d\n", i+1);
#endif
missingpacket = 1;
//add frame and packet numbers
*( (uint64_t*) wbuf_footer) = (uint64_t)((currframenum+1));
*( (uint16_t*) wbuf_footer->packetnum) = (i+1);
}else{
missingpacket = 0;
if(*( (uint16_t*) wbuf_footer->packetnum)!= (i-(port*packetsPerFrame/numListeningThreads))+1){
cprintf(BG_RED, "pnum mismatch num4! i:%d pnum:%d fnum:%d\n",
i,*( (uint16_t*) wbuf_footer->packetnum),currframenum);
exit(-1);
}
//move packet numbers to num2, and compensate for port1 starting pnum from 0
if(port)
*( (uint16_t*) wbuf_footer->packetnum) = (*( (uint16_t*) wbuf_footer->packetnum))+(packetsPerFrame/2);
}
if(*( (uint16_t*) wbuf_footer->packetnum) != (i+1)){
cprintf(BG_RED, "pnum mismatch! i:%d pnum:%d fnum:%d\n",
i,*( (uint16_t*) wbuf_footer->packetnum),currframenum);
if (*( (uint16_t*) wbuf_header->missingpacket) == missingPacketValue)
cprintf(BG_RED,"missing packet though\n");
exit(-1);
}
//overwriting port number and dynamic range
*( (uint8_t*) wbuf_header->portnum) = port;
*( (uint8_t*) wbuf_header->dynamicrange) = dynamicRange;
#ifdef VERYDEBUG
if((i==0)||(i==1)){
cprintf(GREEN, "%d packet header:0x%016llx missingpacket:0x%x\n",i,
(uint64_t)()*( (uint64_t*) wbuf_header)), *( (uint16_t*) wbuf_header->missingpacket));
cprintf(GREEN, "%d - 0x%x - %d\n", i,
*( (uint16_t*) wbuf_header->missingpacket), *( (uint16_t*) wbuf_footer->packetnum));
}
#endif
}
}
#endif
writeToFile_withoutCompression(wbuffer, npackets,currframenum);
}
#ifdef VERYDEBUG
cprintf(GREEN,"written everyting\n");
#endif
}
if(myDetectorType == EIGER) {
#ifdef VERYDEBUG
cprintf(GREEN,"gonna copy frame\n");
#endif
copyFrameToGui(wbuffer);
#ifdef VERYDEBUG
cprintf(GREEN,"copied frame\n");
#endif
}else{
//copy to gui
if(npackets >= packetsPerFrame){//min 1 frame, but neednt be
//if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames
copyFrameToGui(NULL,wbuffer[0]+HEADER_SIZE_NUM_TOT_PACKETS);
#ifdef VERYVERBOSE
cout << ithread << " finished copying" << endl;
#endif
}
//else cout << "unfinished buffersize" << endl;
while(!fifoFree[0]->push(wbuffer[0]));
#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0]));
#endif
}
}
@ -2875,6 +2764,22 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], char* data, int xmax, int ymax, int &nf){
FILE_LOG(logDEBUG1) << __AT__ << " called";
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
//get frame number
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
pthread_mutex_lock(&progressMutex);
if(tempframenumber > currentFrameNumber)
currentFrameNumber = tempframenumber;
pthread_mutex_unlock(&progressMutex);
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex;
#if defined(MYROOT1) && defined(ALLFILE_DEBUG)
writeToFile_withoutCompression(wbuf[0], numpackets,currframenum);
#endif