This commit is contained in:
Dhanya Maliakal 2016-09-02 15:47:28 +02:00
parent 5b3ab9a2b4
commit 4eceb3b5f7
4 changed files with 139 additions and 127 deletions

View File

@ -220,6 +220,8 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/ */
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame); void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
void resetGuiPointer(int ithread);
/** /**
* Overridden method * Overridden method
* Closes file / all files(data compression involves multiple files) * Closes file / all files(data compression involves multiple files)
@ -456,8 +458,9 @@ private:
* Uses semaphore for nth frame mode * Uses semaphore for nth frame mode
* @param ithread writer thread index * @param ithread writer thread index
* @param buffer buffer to copy * @param buffer buffer to copy
* @param numpackets number of packets to copy
*/ */
void copyFrameToGui(int ithread, char* buffer); void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets);
void waitWritingBufferForNextAcquisition(int ithread); void waitWritingBufferForNextAcquisition(int ithread);
@ -589,6 +592,9 @@ private:
/** packets in current file */ /** packets in current file */
uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];
/**Total packet count written by each writing thread */
uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];

View File

@ -436,6 +436,7 @@ void UDPBaseImplementation::readFrame(int ithread, char* c,char** raw, int64_t &
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
} }
//FIXME: needed, isnt stopReceiver enough? //FIXME: needed, isnt stopReceiver enough?
void UDPBaseImplementation::abort(){ void UDPBaseImplementation::abort(){
FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logWARNING) << __AT__ << " doing nothing...";

View File

@ -161,6 +161,7 @@ void UDPStandardImplementation::initializeMembers(){
frameNumberInPreviousFile[i] = -1; frameNumberInPreviousFile[i] = -1;
lastFrameNumberInFile[i] = -1; lastFrameNumberInFile[i] = -1;
totalPacketsInFile[i] = 0; totalPacketsInFile[i] = 0;
totalWritingPacketCount[i] = 0;
} }
@ -849,6 +850,7 @@ int UDPStandardImplementation::startReceiver(char *c){
//reset file parameters //reset file parameters
lastFrameNumberInFile[i] = -1; lastFrameNumberInFile[i] = -1;
totalPacketsInFile[i] = 0; totalPacketsInFile[i] = 0;
totalWritingPacketCount[i] = 0;
if(sfilefd[i]){ if(sfilefd[i]){
fclose(sfilefd[i]); fclose(sfilefd[i]);
sfilefd[i] = NULL; sfilefd[i] = NULL;
@ -1015,7 +1017,7 @@ void UDPStandardImplementation::startReadout(){
//wait as long as there is change from prev totalP, //wait as long as there is change from prev totalP,
//and also change from received in buffer to previous value //and also change from received in buffer to previous value
//(as one listens to many at a time, shouldnt cut off in between) //(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) && (prevReceivedInBuffer!= currentReceivedInBuffer)){ while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
#ifdef DEBUG5 #ifdef DEBUG5
cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
@ -1031,7 +1033,12 @@ void UDPStandardImplementation::startReadout(){
currentReceivedInBuffer = 0; currentReceivedInBuffer = 0;
for(i=0; i<numberofListeningThreads; ++i) for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived(); currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
#ifdef DEBUG5
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
#endif
} }
} }
//set status //set status
@ -1080,6 +1087,7 @@ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64
cprintf(CYAN,"Info: gui data ready\n"); cprintf(CYAN,"Info: gui data ready\n");
#endif #endif
*raw = guiData[ithread]; *raw = guiData[ithread];
guiData[ithread] = NULL; guiData[ithread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting //for nth frame to gui, post semaphore so writer stops waiting
@ -1097,7 +1105,25 @@ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64
} }
} }
/*
void UDPStandardImplementation::resetGuiPointer(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called";
guiData[ithread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting
if((FrameToGuiFrequency) && (writerThreadsMask)){
#ifdef DEBUG4
cprintf(CYAN,"Info: gonna post\n");
#endif
//release after getting data
sem_post(&writerGuiSemaphore[ithread]);
}
#ifdef DEBUG4
cprintf(CYAN,"Info: done post\n");
#endif
}
*/
void UDPStandardImplementation::closeFile(int ithread){ void UDPStandardImplementation::closeFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ; FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ;
@ -1408,7 +1434,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
int index = 0; int index = 0;
if(packetsCaught) if(totalWritingPacketCount[ithread])
index = frameIndex[ithread]; index = frameIndex[ithread];
//create file name //create file name
@ -1445,7 +1471,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
//Print packet loss and filenames //Print packet loss and filenames
if(!packetsCaught){ if(!totalWritingPacketCount[ithread]){
frameNumberInPreviousFile[ithread] = -1; frameNumberInPreviousFile[ithread] = -1;
cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl; cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl;
}else{ }else{
@ -1458,7 +1484,8 @@ int UDPStandardImplementation::createNewFile(int ithread){
<< "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec << << "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec <<
(int)((( ((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]) - ((totalPacketsInFile[ithread])/packetsPerFrame))/ (int)((( ((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]) - ((totalPacketsInFile[ithread])/packetsPerFrame))/
(double)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]))*100.000) (double)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]))*100.000)
<< "%\tFrame Number: " << currentFrameNumber[ithread] << "\tFrame Number: " << currentFrameNumber[ithread]
<< "\tTotal Packets in File: " << totalPacketsInFile[ithread]
// << "\t\t frameNumberInPreviousFile: " << frameNumberInPreviousFile[ithread] // << "\t\t frameNumberInPreviousFile: " << frameNumberInPreviousFile[ithread]
// << "\tIndex " << dec << index // << "\tIndex " << dec << index
<< "\tPackets Lost " << dec << ( ((int)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread])) - << "\tPackets Lost " << dec << ( ((int)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread])) -
@ -1473,7 +1500,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
} }
//reset counters for each new file //reset counters for each new file
if(packetsCaught){ if(totalWritingPacketCount[ithread]){
frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread]; frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread];
totalPacketsInFile[ithread] = 0; totalPacketsInFile[ithread] = 0;
} }
@ -1660,7 +1687,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
totalListeningPacketCount[ithread] += (receivedSize/onePacketSize); totalListeningPacketCount[ithread] += (receivedSize/onePacketSize);
//#ifdef MANUALDEBUG #ifdef MANUALDEBUG
if(receivedSize>0){ if(receivedSize>0){
if(myDetectorType == JUNGFRAU){ if(myDetectorType == JUNGFRAU){
jfrau_packet_header_t* header; jfrau_packet_header_t* header;
@ -1681,7 +1708,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
(uint32_t)(*( (uint64_t*) footer))); (uint32_t)(*( (uint64_t*) footer)));
} }
} }
//#endif #endif
#ifdef DEBUG #ifdef DEBUG
cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize); cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize);
#endif #endif
@ -2202,31 +2229,33 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//statistics //statistics
FILE_LOG(logINFO) << "Status: Run Finished"; FILE_LOG(logINFO) << "Status: Run Finished";
if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame*numberofListeningThreads)){
cprintf(RED, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught);
cprintf(RED, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught);
cprintf(RED, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)));
int64_t lastFrameNumber = 0;
for(int i=0;i<numberofListeningThreads;i++){ for(int i=0;i<numberofListeningThreads;i++){
if(totalWritingPacketCount[i] < ((uint64_t)numberOfFrames*packetsPerFrame)){
cprintf(RED, "\nPort %d\n",udpPortNum[i]);
cprintf(RED, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*packetsPerFrame-totalWritingPacketCount[i]);
cprintf(RED, "Packets Caught \t\t: %lld\n",(long long int)totalWritingPacketCount[i]);
cprintf(RED, "Frames Caught \t\t: %lld\n",(long long int)(totalWritingPacketCount[i]/packetsPerFrame));
int64_t lastFrameNumber = 0;
lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written
if(myDetectorType == EIGER) if(myDetectorType == EIGER)
lastFrameNumber+= 1; lastFrameNumber+= 1;
cprintf(RED, "Last Frame Number aught at Port %d: %lld\n",udpPortNum[i],(long long int)lastFrameNumber); cprintf(RED, "Last Frame Number Caught :%lld\n",(long long int)lastFrameNumber);
}
cout<<endl;
}else{ }else{
cprintf(GREEN, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught); cprintf(GREEN, "\nPort %d\n",udpPortNum[i]);
cprintf(GREEN, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught); cprintf(GREEN, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*packetsPerFrame-totalWritingPacketCount[i]);
cprintf(GREEN, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads))); cprintf(GREEN, "Packets Caught \t\t: %lld\n",(long long int)totalWritingPacketCount[i]);
cprintf(GREEN, "Frames Caught \t\t: %lld\n",(long long int)(totalWritingPacketCount[i]/packetsPerFrame));
int64_t lastFrameNumber = 0; int64_t lastFrameNumber = 0;
for(int i=0;i<numberofListeningThreads;i++){
lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written
if(myDetectorType == EIGER) if(myDetectorType == EIGER)
lastFrameNumber+= 1; lastFrameNumber+= 1;
cprintf(GREEN, "Last Frame Number Caught at Port %d: %lld\n",udpPortNum[i],(long long int)lastFrameNumber); cprintf(GREEN, "Last Frame Number Caught: %lld\n",(long long int)lastFrameNumber);
} }
cout<<endl;
} }
//acquisition end //acquisition end
if (acquisitionFinishedCallBack) if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((int)totalPacketsCaught, pAcquisitionFinished); acquisitionFinishedCallBack((int)totalPacketsCaught, pAcquisitionFinished);
@ -2275,8 +2304,9 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//copy frame for gui //copy frame for gui
if(npackets >= packetsPerFrame)/**needs to be reworked*/ //if(npackets >= (packetsPerFrame/numberofListeningThreads))
copyFrameToGui(ithread, wbuffer); if(npackets)
copyFrameToGui(ithread, wbuffer,npackets);
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n"); cprintf(GREEN,"Writing_Thread: Copied frame\n");
#endif #endif
@ -2331,6 +2361,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//update stats //update stats
numpackets -= packetsWritten; numpackets -= packetsWritten;
totalPacketsInFile[ithread] += packetsWritten; totalPacketsInFile[ithread] += packetsWritten;
totalWritingPacketCount[ithread] += packetsWritten;
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
packetsCaught += packetsWritten; packetsCaught += packetsWritten;
totalPacketsCaught += packetsWritten; totalPacketsCaught += packetsWritten;
@ -2342,9 +2373,9 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
while(numpackets){ while(numpackets){
//new file //new file
//create new file only if something has been written and modulus works //create new file only if something has been written and modulus works
if((lastFrameNumberInFile[ithread]>=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))){ if((lastFrameNumberInFile[ithread]>=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile)))
createNewFile(ithread); createNewFile(ithread);
}
//frames to save in one file //frames to save in one file
nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) + nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) +
@ -2357,6 +2388,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//update stats //update stats
numpackets -= packetsWritten; numpackets -= packetsWritten;
totalPacketsInFile[ithread] += packetsWritten; totalPacketsInFile[ithread] += packetsWritten;
totalWritingPacketCount[ithread] += packetsWritten;
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
packetsCaught += packetsWritten; packetsCaught += packetsWritten;
totalPacketsCaught += packetsWritten; totalPacketsCaught += packetsWritten;
@ -2378,8 +2410,10 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
return; return;
} }
totalPacketsInFile[ithread] += numpackets; totalPacketsInFile[ithread] += numpackets;
totalWritingPacketCount[ithread] += numpackets;
lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave; lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave;
currentFrameNumber[ithread] = finalLastFrameNumberToSave; currentFrameNumber[ithread] = finalLastFrameNumberToSave;
} }
if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex);
@ -2447,7 +2481,7 @@ void UDPStandardImplementation::updateFileHeader(int ithread){
} }
void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer){ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32_t numpackets){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
@ -2475,7 +2509,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer){
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n"); cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n");
#endif #endif
memcpy(latestData[ithread],buffer + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); memcpy(latestData[ithread],buffer , numpackets*onePacketSize);
strcpy(guiFileName[ithread],completeFileName[ithread]); strcpy(guiFileName[ithread],completeFileName[ithread]);
guiDataReady[ithread]=1; guiDataReady[ithread]=1;
pthread_mutex_unlock(&dataReadyMutex); pthread_mutex_unlock(&dataReadyMutex);
@ -2612,6 +2646,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
#ifndef ALLFILE #ifndef ALLFILE
totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame); totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame);
totalWritingPacketCount[ithread] += (bufferSize/packetsPerFrame);
pthread_mutex_lock(&writeMutex); pthread_mutex_lock(&writeMutex);
if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile) if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile)
createNewFile(ithread); createNewFile(ithread);
@ -2622,7 +2657,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
#endif #endif
if(!once){ if(!once){
copyFrameToGui(ithread, buff[0]); copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame);
once = 1; once = 1;
} }
} }
@ -2662,7 +2697,6 @@ int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64
if(!((uint32_t)(*( (uint64_t*) footer)))){ if(!((uint32_t)(*( (uint64_t*) footer)))){
tempframenumber = -1; tempframenumber = -1;
FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware."; FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware.";
exit(-1);
return FAIL; return FAIL;
} }
#ifdef DEBUG4 #ifdef DEBUG4
@ -2713,38 +2747,35 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//if(ithread) cout<<"at writeUptoFrameNumber " << nextFrameNumber<< endl; //if(ithread) cout<<"at writeUptoFrameNumber " << nextFrameNumber<< endl;
int bigIncrements = onePacketSize * packetsPerFrame; //a frame at a time
if(numberofJobsPerBuffer == 1) bigIncrements = onePacketSize; //a packet at a time as we listen to only one frame in a buffer
int startoffset = offset; int startoffset = offset;
int endoffset = startoffset + numpackets * onePacketSize; int endoffset = startoffset + numpackets * onePacketSize;
int expectedoffset = startoffset + ((nextFrameNumber - (lastFrameNumberInFile[ithread]+1)) * onePacketSize * packetsPerFrame);
bool expectedoffsetATlastpacket = false;
if(expectedoffset >= endoffset){
expectedoffset = startoffset + ((numpackets -1) * onePacketSize);
expectedoffsetATlastpacket = true;
}
offset = expectedoffset;
//get frame number at expected offset
uint64_t tempframenumber=-1; uint64_t tempframenumber=-1;
uint64_t frameNumberWritten=-1;//if(ithread) cout<<"frame number at expected ofset"<<endl; offset = endoffset;
if(getFrameNumber(ithread, wbuffer + expectedoffset, tempframenumber) == FAIL){
//get last frame number
if(getFrameNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber) == FAIL){
//error in frame number sent by fpga //error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer)); while(!fifoFree[ithread]->push(wbuffer));
return FAIL; return FAIL;
} }
//last packet's frame number < nextframenumber
if(tempframenumber<nextFrameNumber){
fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]);
numPacketsWritten += ((offset-startoffset)/onePacketSize);
lastFrameNumberInFile[ithread] = tempframenumber;
return OK;
}
//last packet in buffer does not reach the nextframenumber, write all
if(expectedoffsetATlastpacket && tempframenumber < nextFrameNumber){
frameNumberWritten = tempframenumber;
offset += onePacketSize;
}else{
//if tempframenumber is too high, go backwards fast (by frame) and then slowly (by each packet) frontwards
if(tempframenumber>=nextFrameNumber){ //somewhere in between
int bigIncrements = onePacketSize * packetsPerFrame * 10; //10 frames at a time
if(numberofJobsPerBuffer == 1) bigIncrements = onePacketSize; //a packet at a time as we listen to only one frame in a buffer
cout<<ithread<<" lastFrameNumberInFile:"<<lastFrameNumberInFile[ithread]<<endl;
cout<<ithread<<" nextFeame number:"<<nextFrameNumber<<endl;
cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
while(tempframenumber>=nextFrameNumber){ while(tempframenumber>=nextFrameNumber){
offset -= bigIncrements; offset -= bigIncrements;
if(offset<startoffset) if(offset<startoffset)
@ -2771,45 +2802,11 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
return FAIL; return FAIL;
} }
} }
}
//if tempframenumber is too low, go forwards fast (by frame) and then slowly (by each packet) backwards
else{
while(tempframenumber<nextFrameNumber){
offset += bigIncrements;
if(offset>endoffset)
break;//if(ithread) cout<<"frame number at going forwards fast f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
if(offset>endoffset){
offset = endoffset;//if(ithread) cout<<"frame number at offset>endoffset f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
while(tempframenumber>nextFrameNumber){
offset -= onePacketSize;//if(ithread) cout<<"frame number at going bacckwards slow f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
offset += onePacketSize;
}
frameNumberWritten = nextFrameNumber-1;
}
fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]); fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]);
numPacketsWritten += ((offset-startoffset)/onePacketSize); numPacketsWritten += ((offset-startoffset)/onePacketSize);
lastFrameNumberInFile[ithread] = frameNumberWritten; lastFrameNumberInFile[ithread] = (nextFrameNumber-1);
//if(ithread) cout<<"done with writeUptoFrameNumber" << endl; //if(ithread) cout<<"done with writeUptoFrameNumber" << endl;
return OK; return OK;
} }

View File

@ -1164,7 +1164,7 @@ int slsReceiverTCPIPInterface::moench_read_frame(){
else{ else{
bindex = ((uint32_t)(*((uint32_t*)raw))); bindex = ((uint32_t)(*((uint32_t*)raw)));
memcpy(origVal,raw,bufferSize); memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize);
raw=NULL; raw=NULL;
//************** packet number order********************** //************** packet number order**********************
@ -1369,7 +1369,7 @@ int slsReceiverTCPIPInterface::gotthard_read_frame(){
#endif #endif
} }
memcpy(origVal,raw,bufferSize); memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize);
raw=NULL; raw=NULL;
@ -1535,7 +1535,7 @@ int slsReceiverTCPIPInterface::propix_read_frame(){
cout << "index2:" << hex << index << endl; cout << "index2:" << hex << index << endl;
#endif #endif
memcpy(origVal,raw,bufferSize); memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize);
raw=NULL; raw=NULL;
/*//ignore if half frame is missing /*//ignore if half frame is missing
@ -1649,8 +1649,8 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
char* raw; char* raw;
char* origVal = new char[frameSize]; char* origVal = new char[frameSize];
char* retval = new char[dataSize]; char* retval = new char[dataSize];
memset(origVal,0xF,frameSize); memset(origVal,0xFF,frameSize);
memset(retval,0xF,dataSize); memset(retval,0xFF,dataSize);
int64_t startAcquisitionIndex=0; int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0; int64_t startFrameIndex=0;
@ -1677,6 +1677,9 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
// acq started // acq started
else{ else{
ret = OK; ret = OK;
int fnum[EIGER_MAX_PORTS];
for(int i=0;i<EIGER_MAX_PORTS;i++)
fnum[i] = -1;
//read a frame //read a frame
for(int i=0;i<EIGER_MAX_PORTS;i++){ for(int i=0;i<EIGER_MAX_PORTS;i++){
receiverBase->readFrame(i,fName,&raw,startAcquisitionIndex,startFrameIndex); receiverBase->readFrame(i,fName,&raw,startAcquisitionIndex,startFrameIndex);
@ -1684,27 +1687,28 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
if (raw == NULL){ if (raw == NULL){
startAcquisitionIndex = -1; startAcquisitionIndex = -1;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout<<"data not ready for gui yet"<<endl; cout<<"data not ready for gui yet for "<< i << endl;
break;
#endif #endif
raw=NULL;
break;
} }
else{ else{
eiger_packet_footer_t* wbuf_footer; eiger_packet_footer_t* wbuf_footer;
wbuf_footer = (eiger_packet_footer_t*)(raw + oneDataSize + sizeof(eiger_packet_header_t)); wbuf_footer = (eiger_packet_footer_t*)(raw + HEADER_SIZE_NUM_TOT_PACKETS + oneDataSize + sizeof(eiger_packet_header_t));
index =(uint32_t)(*( (uint64_t*) wbuf_footer)); index =(uint32_t)(*( (uint64_t*) wbuf_footer));
index += (startFrameIndex-1); index += (startFrameIndex-1);
fnum[i] = index;
if(dynamicrange == 32){ if(dynamicrange == 32){
eiger_packet_header_t* wbuf_header; eiger_packet_header_t* wbuf_header;
wbuf_header = (eiger_packet_header_t*) raw; wbuf_header = (eiger_packet_header_t*) (raw + HEADER_SIZE_NUM_TOT_PACKETS);
subframenumber = *( (uint32_t*) wbuf_header->subFrameNumber); subframenumber = *( (uint32_t*) wbuf_header->subFrameNumber);
} }
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "index:" << dec << index << endl; cout << "index:" << dec << index << endl;
if(index>10000) exit(-1);
cout << "subframenumber:" << dec << subframenumber << endl; cout << "subframenumber:" << dec << subframenumber << endl;
#endif #endif
int numpackets = (uint32_t)(*( (uint32_t*) raw));
memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw,(frameSize/EIGER_MAX_PORTS)); memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw + HEADER_SIZE_NUM_TOT_PACKETS,numpackets*onePacketSize);
raw=NULL; raw=NULL;
} }
} }
@ -1712,8 +1716,12 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
if(startAcquisitionIndex != -1){ if(startAcquisitionIndex != -1){
//cout<<"**** got proper frame ******"<<endl; //cout<<"**** got proper frame ******"<<endl;
//let them continue
//for(int i=0;i<EIGER_MAX_PORTS;++i)
//receiverBase->resetGuiPointer(i);
if(fnum[0]!=fnum[1])
cprintf(BG_RED,"Fnums differ %d and %d\n",fnum[0],fnum[1]);
int c1=8;//first port int c1=8;//first port
int c2=(frameSize/2) + 8; //second port int c2=(frameSize/2) + 8; //second port
@ -1942,7 +1950,7 @@ int slsReceiverTCPIPInterface::jungfrau_read_frame(){
//proper frame //proper frame
else{ else{
//cout<<"**** got proper frame ******"<<endl; //cout<<"**** got proper frame ******"<<endl;
memcpy(origVal,raw,frameSize); memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,frameSize);
raw=NULL; raw=NULL;
//fixed frame number //fixed frame number