separated

This commit is contained in:
Dhanya Maliakal 2016-08-31 17:23:44 +02:00
parent fbf6e2bff1
commit a3369a7d21
7 changed files with 205 additions and 181 deletions

View File

@ -398,12 +398,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/**
* Get the buffer-current frame read by receiver
* @param ithread port thread index
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame);
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
/**
* abort acquisition with minimum damage: close open files, cleanup.

View File

@ -455,12 +455,13 @@ class UDPInterface {
/**
* Get the buffer-current frame read by receiver
* @param ithread port thread index
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
virtual void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame)=0;
virtual void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame)=0;
/**
* abort acquisition with minimum damage: close open files, cleanup.

View File

@ -218,7 +218,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
void readFrame(int ithread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame);
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
/**
* Overridden method
@ -571,8 +571,6 @@ private:
/** Previous Frame number from buffer to calculate loss */
int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS];
/** Last Frame Index Listened To */
int64_t lastFrameIndex[MAX_NUMBER_OF_WRITER_THREADS];
@ -586,7 +584,7 @@ private:
int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Pckets currently in current file, starts new file when it reaches max */
uint64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS];
int64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS];
/** packets in current file */
uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];

View File

@ -3,7 +3,7 @@
#define GENERIC_SOCKET_H
#include "ansi.h"
/**
@ -105,7 +105,8 @@ enum communicationProtocol{
packet_size(ps),
nsending(0),
nsent(0),
total_sent(0)// sender (client): where to? ip
total_sent(0),// sender (client): where to? ip
header_packet_size(0)
{
//memset(&serverAddress, 0, sizeof(sockaddr_in));
//memset(&clientAddress, 0, sizeof(sockaddr_in));
@ -161,7 +162,7 @@ enum communicationProtocol{
*/
genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL):
genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL, int hsize=0):
//portno(port_number),
protocol(p),
is_a_server(1),
@ -170,7 +171,8 @@ enum communicationProtocol{
packet_size(ps),
nsending(0),
nsent(0),
total_sent(0)
total_sent(0),
header_packet_size(hsize)
{
/* // you can specify an IP address: */
@ -616,7 +618,8 @@ enum communicationProtocol{
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(nsent < packet_size) {
if(nsent){
cout << "Incomplete Packet size " << nsent << endl;
if(nsent != header_packet_size)
cprintf(RED,"Incomplete Packet size %d\n",nsent);
}
break;
}
@ -690,6 +693,11 @@ enum communicationProtocol{
}
int getCurrentTotalReceived(){
return total_sent;
}
char lastClientIP[INET_ADDRSTRLEN];
char thisClientIP[INET_ADDRSTRLEN];
int differentClients;
@ -712,7 +720,7 @@ enum communicationProtocol{
int nsending;
int nsent;
int total_sent;
int header_packet_size;
// pthread_mutex_t mp;

View File

@ -431,7 +431,7 @@ int UDPBaseImplementation::shutDownUDPSockets(){
return OK;
}
void UDPBaseImplementation::readFrame(char* c,char** raw, uint64_t &startAcquisitionIndex, uint64_t &startFrameIndex){
void UDPBaseImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcquisitionIndex, int64_t &startFrameIndex){
FILE_LOG(logWARNING) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}

View File

@ -159,8 +159,7 @@ void UDPStandardImplementation::initializeMembers(){
frameIndex[i] = 0;
currentFrameNumber[i] = 0;
frameNumberInPreviousFile[i] = -1;
lastFrameIndex[i] = 0;
lastFrameNumberInFile[i] = 0;
lastFrameNumberInFile[i] = -1;
totalPacketsInFile[i] = 0;
}
@ -551,7 +550,6 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
//set parameters depending on new dynamic range.
packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT) * dynamicRange;
bufferSize = onePacketSize * packetsPerFrame;
cout<<"packetsPerFrame:"<<packetsPerFrame<<" bufferSize:"<<bufferSize<<endl;
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
@ -729,7 +727,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
frameIndexMask = EIGER_FRAME_INDEX_MASK;
frameIndexOffset = EIGER_FRAME_INDEX_OFFSET;
packetIndexMask = EIGER_PACKET_INDEX_MASK;
maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE;
maxFramesPerFile = 5;//EIGER_MAX_FRAMES_PER_FILE;
fifoSize = EIGER_FIFO_SIZE;
fifoDepth = EIGER_FIFO_SIZE;
footerOffset = EIGER_PACKET_HEADER_SIZE + oneDataSize;
@ -848,9 +846,8 @@ int UDPStandardImplementation::startReceiver(char *c){
for(int i=0;i<numberofWriterThreads;i++){
frameIndex[i] = 0;
lastFrameIndex[i]=-1;
//reset file parameters
lastFrameNumberInFile[i] = 0;
lastFrameNumberInFile[i] = -1;
totalPacketsInFile[i] = 0;
if(sfilefd[i]){
fclose(sfilefd[i]);
@ -1003,26 +1000,37 @@ void UDPStandardImplementation::startReadout(){
if(status == RUNNING){
//check if all packets got
int totalP = 0,prev,i;
for(i=0; i<numberofListeningThreads; ++i){
int totalP = 0,prev=-1,i;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
}
//check if current buffer still receiving something
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
//wait for all packets
if(totalP!=numberOfFrames*packetsPerFrame*numberofListeningThreads){
prev = -1;
//wait as long as there is change from prev totalP
while(prev != totalP){
//wait as long as there is change from prev totalP,
//and also change from received in buffer to previous value
//(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) && (prevReceivedInBuffer!= currentReceivedInBuffer)){
#ifdef DEBUG5
cprintf(MAGENTA,"waiting for all packets totalP:%d\n",totalP);
cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
#endif
usleep(5000);/* Need to find optimal time (exposure time and acquisition period) **/
prev = totalP;
totalP=0;
for(i=0; i<numberofListeningThreads; ++i){
totalP = 0;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
}
prevReceivedInBuffer = currentReceivedInBuffer;
currentReceivedInBuffer = 0;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
}
}
@ -1041,7 +1049,7 @@ void UDPStandardImplementation::startReadout(){
/**make this better by asking all of it at once*/
void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){
void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame){
FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data, to let writer thread know that gui is back for data
@ -1311,20 +1319,23 @@ int UDPStandardImplementation::createUDPSockets(){
strcpy(eth,"");
shutDownUDPSockets();
int headerpacketsize = 0;
if(myDetectorType == EIGER)
headerpacketsize = EIGER_HEADER_LENGTH;
//if no eth, listen to all
if(!strlen(eth)){
FILE_LOG(logWARNING) << "eth is empty. Listening to all";
for(int i=0;i<numberofListeningThreads;i++)
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,onePacketSize);
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,onePacketSize,NULL,headerpacketsize);
}
//normal socket
else{
FILE_LOG(logINFO) << "Ethernet Interface:" << eth;
for(int i=0;i<numberofListeningThreads;i++)
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,onePacketSize,eth);
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,onePacketSize,eth,headerpacketsize);
}
//error
@ -1395,7 +1406,6 @@ int UDPStandardImplementation::setupWriter(){
int UDPStandardImplementation::createNewFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called";
if(!ithread) cout<<ithread <<" ****Creating new file" << endl;
int index = 0;
if(packetsCaught)
@ -1405,7 +1415,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
else
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread],(long long int)fileIndex);
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
#ifdef DEBUG4
FILE_LOG(logINFO) << completefileName;
@ -1437,21 +1447,23 @@ int UDPStandardImplementation::createNewFile(int ithread){
//Print packet loss and filenames
if(!packetsCaught){
frameNumberInPreviousFile[ithread] = -1;
cout << "File: " << completeFileName[ithread] << endl;
cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl;
}else{
//Assumption for startFrameindex usign ithread: datacompression never enters here and therefore is always same number of listening and writing threads to use ithread
if (frameNumberInPreviousFile[ithread] == -1)
frameNumberInPreviousFile[ithread] = startFrameIndex-1;
cout << completeFileName[ithread]
cout << "Thread " << ithread << " File:" << completeFileName[ithread]
<< "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec <<
(int)((( (currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread]) - ((totalPacketsInFile[ithread])/packetsPerFrame))/
(double)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread]))*100.000)
<< "%\tFramenumber: " << currentFrameNumber[ithread]
<< "\t\t frameNumberInPreviousFile: " << frameNumberInPreviousFile[ithread]
<< "\tIndex " << dec << index
<< "\tLost " << dec << ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])) -
((totalPacketsInFile[ithread])/packetsPerFrame)) << endl;
(int)((( ((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]) - ((totalPacketsInFile[ithread])/packetsPerFrame))/
(double)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]))*100.000)
<< "%\tFrame Number: " << currentFrameNumber[ithread]
// << "\t\t frameNumberInPreviousFile: " << frameNumberInPreviousFile[ithread]
// << "\tIndex " << dec << index
<< "\tPackets Lost " << dec << ( ((int)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread])) -
((totalPacketsInFile[ithread])/packetsPerFrame))
<< endl;
}
@ -1463,7 +1475,6 @@ int UDPStandardImplementation::createNewFile(int ithread){
//reset counters for each new file
if(packetsCaught){
frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread];
lastFrameNumberInFile[ithread] = 0;
totalPacketsInFile[ithread] = 0;
}
@ -1643,7 +1654,10 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
if(status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
cout<<ithread<<" receivedSize:"<<receivedSize<<endl;
//eiger returns 0 when header packet caught
if(!receivedSize && status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
totalListeningPacketCount[ithread] += (receivedSize/onePacketSize);
#ifdef MANUALDEBUG
@ -1774,32 +1788,31 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
}
//push dummy-end buffer into fifo for all writer threads
for(int i=0; i<numberofWriterThreads; ++i){
fifoFree[ithread]->pop(buffer[ithread]);
fifoFree[ithread]->pop(buffer[ithread]);
#ifdef EVERYFIFODEBUG
if(fifoFree[ithread]->getSemValue()<100)
if(fifoFree[ithread]->getSemValue()<100)
cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(buffer[ithread]));
#endif
#ifdef CFIFODEBUG
if(ithread == 0)
cprintf(CYAN,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread]));
else
cprintf(YELLOW,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread]));
if(ithread == 0)
cprintf(CYAN,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread]));
else
cprintf(YELLOW,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread]));
#endif
//creating dummy-end buffer with pc=0xFFFF
(*((uint32_t*)(buffer[ithread]))) = dummyPacketValue;
while(!fifo[ithread]->push(buffer[ithread]));
//creating dummy-end buffer with pc=0xFFFF
(*((uint32_t*)(buffer[ithread]))) = dummyPacketValue;
while(!fifo[ithread]->push(buffer[ithread]));
#ifdef EVERYFIFODEBUG
if(fifo[ithread]->getSemValue()>(fifoSize-100))
if(fifo[ithread]->getSemValue()>(fifoSize-100))
cprintf(MAGENTA,"Fifo[%d]: value:%d, push 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(buffer[ithread]));
#endif
#ifdef CFIFODEBUG
if(ithread == 0)
cprintf(CYAN,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread]));
else
cprintf(YELLOW,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread]));
if(ithread == 0)
cprintf(CYAN,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread]));
else
cprintf(YELLOW,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread]));
#endif
}
//reset mask and exit loop
@ -2061,8 +2074,9 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
//pop fifo so that its empty
char* temp;
while(!fifo[ithread]->isEmpty()){
cprintf(RED,"%d:emptied buffer in fifo\n", ithread);
cprintf(RED,"%d:fifo emptied\n", ithread);
fifo[ithread]->pop(temp);
fifoFree[ithread]->push(temp);
#ifdef EVERYFIFODEBUG
if(fifo[ithread]->getSemValue()>(fifoSize-100))
cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",ithread,fifo[ithread]->getSemValue(),(void*)(temp));
@ -2087,7 +2101,6 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
if(detindex == -1)
sprintf(fileNamePerThread[ithread],"%s_d%d",fileName,ithread);
cout << "file name changed to include det Id:" << fileNamePerThread[ithread] << endl;
}
if(dataCompressionEnable){
@ -2189,15 +2202,34 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//statistics
FILE_LOG(logINFO) << "Status: Run Finished";
FILE_LOG(logINFO) << "Last Frame Number Caught:" << lastFrameIndex[ithread];
if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame*numberofListeningThreads)){
cprintf(RED, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught);
cprintf(RED, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught);
cprintf(RED, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)));
cprintf(RED, "Last Frame Number Caught: ");
int64_t lastFrameNumber = 0;
for(int i=0;i<numberofListeningThreads;i++){
if(i) cprintf(RED, ", ");
lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written
if(myDetectorType == EIGER)
lastFrameNumber+= 1;
cprintf(RED, "%lld",(long long int)lastFrameNumber);
}
cout<<endl;
}else{
cprintf(GREEN, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught);
cprintf(GREEN, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught);
cprintf(GREEN, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)));
cprintf(GREEN, "Last Frame Number Caught: ");
int64_t lastFrameNumber = 0;
for(int i=0;i<numberofListeningThreads;i++){
if(i) cprintf(GREEN, ", ");
lastFrameNumber = lastFrameNumberInFile[i] - startFrameIndex;//lastFrameNumberInFile updated even if not written
if(myDetectorType == EIGER)
lastFrameNumber+= 1;
cprintf(GREEN, "%lld",(long long int)lastFrameNumber);
}
cout<<endl;
}
//acquisition end
if (acquisitionFinishedCallBack)
@ -2221,12 +2253,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
return;
}
//update current frame number
lastFrameIndex[ithread] = tempframenumber;
if(myDetectorType == EIGER)
currentFrameNumber[ithread] = tempframenumber + (startFrameIndex - 1);
else
currentFrameNumber[ithread] = tempframenumber-startFrameIndex;
currentFrameNumber[ithread] = tempframenumber;
//set indices
pthread_mutex_lock(&progressMutex);
@ -2282,7 +2309,6 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//if write enabled
if((fileWriteEnable) && (sfilefd[ithread])){
cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<lastFrameNumberInFile[ithread]<<endl;
if(numpackets){
int offset = HEADER_SIZE_NUM_TOT_PACKETS;
uint64_t nextFileFrameNumber;
@ -2291,7 +2317,7 @@ cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<l
//handle half frames from previous buffer
//second part to not check when there has been something written previously
if(numpackets &&(lastFrameNumberInFile[ithread])){
if(numpackets &&(lastFrameNumberInFile[ithread]>=0)){
//get start frame (required to create new file at the right juncture)
uint64_t startframe =-1;
if(getFrameNumber(ithread, wbuffer + offset, startframe) == FAIL){
@ -2299,8 +2325,8 @@ cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<l
while(!fifoFree[ithread]->push(wbuffer));
return;
}
cout<<"222"<<endl;
if(startframe == lastFrameNumberInFile[ithread]-1){
if(startframe == lastFrameNumberInFile[ithread]){
if(writeUptoFrameNumber(ithread, wbuffer, offset, startframe+1, numpackets, packetsWritten) == FAIL)
//weird frame number of zero from fpga
return;
@ -2311,26 +2337,21 @@ cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<l
pthread_mutex_lock(&writeMutex);
packetsCaught += packetsWritten;
totalPacketsCaught += packetsWritten;
cout<<"totalpacketscaught:"<<totalPacketsCaught<<" packetscaught:" <<packetsCaught <<endl;
pthread_mutex_unlock(&writeMutex);
}
}
while(numpackets){
if(!ithread) cout<<ithread<<" loop: lastFrameNumberInFile[ithread]:"<<lastFrameNumberInFile[ithread]<<endl;
//new file
//create new file only if something has been written and modulus works
if((lastFrameNumberInFile[ithread]) &&(!(lastFrameNumberInFile[ithread] % maxFramesPerFile))){
if(!ithread) cout<<ithread <<" *** gonna create a new file!!"<<endl;
if((lastFrameNumberInFile[ithread]>=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))){
createNewFile(ithread);
}
//frames to save in one file
nextFileFrameNumber = lastFrameNumberInFile[ithread] +
(maxFramesPerFile - (lastFrameNumberInFile[ithread]%maxFramesPerFile));
if(!ithread) cout<<ithread<<" nextFileFrameNumber:"<<nextFileFrameNumber<<endl;
if(!ithread) cout<<ithread<<" offset:"<<offset<<endl;
nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) +
(maxFramesPerFile - ((lastFrameNumberInFile[ithread]+1)%maxFramesPerFile));
if(writeUptoFrameNumber(ithread, wbuffer, offset, nextFileFrameNumber, numpackets, packetsWritten) == FAIL)
//weird frame number of zero from fpga
@ -2342,9 +2363,8 @@ cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<l
pthread_mutex_lock(&writeMutex);
packetsCaught += packetsWritten;
totalPacketsCaught += packetsWritten;
pthread_mutex_unlock(&writeMutex);
currentFrameNumber[ithread] += lastFrameNumberInFile[ithread]-1;
currentFrameNumber[ithread] += lastFrameNumberInFile[ithread];
}
}
}
@ -2361,7 +2381,7 @@ cout<<ithread<<" numpackets:"<<numpackets<<" lastFrameNumberInFile[ithread]:"<<l
return;
}
totalPacketsInFile[ithread] += numpackets;
lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave+1;
lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave;
currentFrameNumber[ithread] = finalLastFrameNumberToSave;
}
@ -2502,7 +2522,8 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
while(!fifoFree[ithread]->push(wbuffer));
return;
}
currentFrameNumber[ithread] = tempframenumber;
currentFrameNumber[ithread] = tempframenumber;
//set indices
pthread_mutex_lock(&progressMutex);
@ -2648,14 +2669,15 @@ int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64
FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware.";
return FAIL;
}
//#ifdef DEBUG4
#ifdef DEBUG4
if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d footeroffset:%d\n",
ithread,
(long long int)tempframenumber,
(*( (uint16_t*) footer->packetNumber)),
(uint32_t)(*( (uint64_t*) footer)),
footerOffset);
//#endif
#endif
tempframenumber += (startFrameIndex - 1);
break;
case JUNGFRAU:
@ -2666,6 +2688,7 @@ int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64
(long long int)tempframenumber,
(*( (uint8_t*) header->packetNumber)));
#endif
tempframenumber += startFrameIndex;
break;
default:
@ -2680,7 +2703,7 @@ int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64
(long long int)tempframenumber,
pnum);
#endif
tempframenumber += startFrameIndex;
break;
}
return OK;
@ -2694,119 +2717,103 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
FILE_LOG(logDEBUG) << __AT__ << " called";
bool expectedoffsetATlastpacket = false;
int bigIncrements = onePacketSize * packetsPerFrame; //a frame at a time
if(numberofJobsPerBuffer == 1) bigIncrements = onePacketSize; //a packet at a time as we listen to only one frame in a buffer
int startoffset = offset;
if(!ithread) cout<<ithread<<" startoffset:"<<startoffset<<endl;
int endoffset = startoffset + numpackets * onePacketSize;
if(!ithread) cout<<ithread<<" endoffset:"<<endoffset<<endl;
if(!ithread) cout<<ithread<<" nextFrameNumber:"<<nextFrameNumber<<endl;
if(!ithread) cout<<ithread<<" lastFrameNumberInFile[ithread]:"<<lastFrameNumberInFile[ithread]<<endl;
int expectedoffset = startoffset + ((nextFrameNumber - lastFrameNumberInFile[ithread]) * onePacketSize);
if(!ithread) cout<<ithread<<" expected offset before:"<<expectedoffset<<endl;
int expectedoffset = startoffset + ((nextFrameNumber - (lastFrameNumberInFile[ithread]+1)) * onePacketSize * packetsPerFrame);
bool expectedoffsetATlastpacket = false;
if(expectedoffset >= endoffset){
expectedoffset = startoffset + ((numpackets -1) * onePacketSize);
expectedoffsetATlastpacket = true;
}
offset = expectedoffset;
if(!ithread) cout<<ithread<<" expected offset:"<<expectedoffset<<endl;
//get frame number at expected offset
uint64_t tempframenumber=-1;
uint64_t frameNumberWritten=-1;
if(getFrameNumber(ithread, wbuffer + expectedoffset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
if(!ithread) cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
//last packet in buffer does not reach the nextframenumber, write all
if(expectedoffsetATlastpacket && tempframenumber < nextFrameNumber){
frameNumberWritten = tempframenumber;
offset += onePacketSize;
if(!ithread) cout<<ithread<<" final offset:"<<offset<<endl;
}else{
//if tempframenumber is too high, go backwards fast (by frame) and then slowly (by each packet) frontwards
if(tempframenumber>=nextFrameNumber){
if(!ithread) cout<<ithread<<" too high, so going backwards fast:"<<endl;
while(tempframenumber>=nextFrameNumber){
offset -= (onePacketSize*packetsPerFrame);/** its ok..if jonbsperthread is 1, go packet by packet*/
if(!ithread) cout<<ithread<<" offset:"<<offset<<endl;
if(offset<startoffset){
if(!ithread) cout<<ithread<<" offset less than endoffset, breaking out"<<endl;
offset -= bigIncrements;
if(offset<startoffset)
break;
}if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
if(!ithread) cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
}
if(offset<startoffset){
offset = startoffset;
if(!ithread) cout<<ithread<<" offset < start offset so offset now:"<<offset<<endl;
}
if(!ithread) cout<<ithread<<" lower, so going forwards slow"<<endl;
while(tempframenumber<nextFrameNumber){
offset += onePacketSize;
if(!ithread) cout<<ithread<<" offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
if(!ithread) cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
}
if(!ithread) cout<<ithread<<" final offset:"<<offset<<endl;
/** here*/
if(offset<startoffset){
offset = startoffset;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
while(tempframenumber<nextFrameNumber){
offset += onePacketSize;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
}
//if tempframenumber is too low, go forwards fast (by frame) and then slowly (by each packet) backwards
else{
if(!ithread) cout<<ithread<<" too low, so going forwards fast:"<<endl;
while(tempframenumber<nextFrameNumber){
offset += (onePacketSize*packetsPerFrame);
if(!ithread) cout<<ithread<<" offset:"<<offset<<endl;
if(offset>endoffset){if(!ithread) cout<<ithread<<" offset greater than endoffset, breaking out"<<endl;
break;}
offset += bigIncrements;
if(offset>endoffset)
break;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
if(!ithread) cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
}
if(offset>endoffset){
offset = endoffset;
if(!ithread) cout<<ithread<<" offset > end offset so offset now:"<<offset<<endl;
}
if(!ithread) cout<<ithread<<" higher, so going backewards slow"<<endl;
while(tempframenumber>nextFrameNumber){
offset -= onePacketSize;
if(!ithread) cout<<ithread<<" offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
if(!ithread) cout<<ithread<<" tempframenumber:"<<tempframenumber<<endl;
}
while(tempframenumber>nextFrameNumber){
offset -= onePacketSize;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
}
}
offset += onePacketSize;
if(!ithread) cout<<ithread<<" final offset:"<<offset<<endl;
/** here*/
}
frameNumberWritten = nextFrameNumber-1;
}
fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]);
if(!ithread) cout<<" written "<<offset-startoffset<<" bytes"<<endl;
numPacketsWritten += ((offset-startoffset)/onePacketSize);
if(!ithread) cout<<" numPacketsWritten:"<<numPacketsWritten<<endl;
lastFrameNumberInFile[ithread] += tempframenumber;
if(!ithread) cout<<" lastFrameNumberInFile[ithread]:"<<lastFrameNumberInFile[ithread]<<endl;
lastFrameNumberInFile[ithread] = frameNumberWritten;
return OK;
}

View File

@ -930,7 +930,6 @@ int slsReceiverTCPIPInterface::get_frames_caught(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}else retval=receiverBase->getTotalFramesCaught();
cout<<"frames caught sent:"<<retval<<endl;
#endif
if(ret==OK && socket->differentClients){
FILE_LOG(logDEBUG) << "Force update";
@ -1132,8 +1131,8 @@ int slsReceiverTCPIPInterface::moench_read_frame(){
char* raw;
uint64_t startAcquisitionIndex=0;
uint64_t startFrameIndex=0;
int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0;
uint32_t index = -1,bindex = 0, offset=0;
strcpy(mess,"Could not read frame\n");
@ -1153,7 +1152,7 @@ int slsReceiverTCPIPInterface::moench_read_frame(){
else{
ret = OK;
receiverBase->readFrame(fName,&raw,startAcquisitionIndex,startFrameIndex);
receiverBase->readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex);
/**send garbage with -1 index to try again*/
if (raw == NULL){
@ -1321,8 +1320,8 @@ int slsReceiverTCPIPInterface::gotthard_read_frame(){
uint32_t index=-1,index2=0;
uint32_t pindex=0,pindex2=0;
uint32_t bindex=0,bindex2=0;
uint64_t startAcquisitionIndex=0;
uint64_t startFrameIndex=0;
int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0;
strcpy(mess,"Could not read frame\n");
@ -1341,7 +1340,7 @@ int slsReceiverTCPIPInterface::gotthard_read_frame(){
cout<<"haven't caught any frame yet"<<endl;
}else{
ret = OK;
receiverBase->readFrame(fName,&raw,startAcquisitionIndex,startFrameIndex);
receiverBase->readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex);
/**send garbage with -1 index to try again*/
if (raw == NULL){
@ -1496,8 +1495,8 @@ int slsReceiverTCPIPInterface::propix_read_frame(){
uint32_t index=-1,index2=0;
uint32_t pindex=0,pindex2=0;
uint32_t bindex=0,bindex2=0;
uint64_t startAcquisitionIndex=0;
uint64_t startFrameIndex=0;
int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0;
strcpy(mess,"Could not read frame\n");
@ -1516,7 +1515,7 @@ int slsReceiverTCPIPInterface::propix_read_frame(){
cout<<"haven't caught any frame yet"<<endl;
}else{
ret = OK;
receiverBase->readFrame(fName,&raw,startAcquisitionIndex,startFrameIndex);
receiverBase->readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex);
/**send garbage with -1 index to try again*/
if (raw == NULL){
@ -1634,22 +1633,27 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
char fName[MAX_STR_LENGTH]="";
int acquisitionIndex = -1;
int frameIndex= -1;
uint32_t index=0;
uint32_t subframenumber=-1;
int index=0;
int subframenumber=-1;
int frameSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE * packetsPerFrame;
int dataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE * packetsPerFrame;
int frameSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE * packetsPerFrame * EIGER_MAX_PORTS;
int dataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE * packetsPerFrame * EIGER_MAX_PORTS;
int oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE;
int onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE;
if(tenGigaEnable){
frameSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE * packetsPerFrame;
dataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE * packetsPerFrame;
frameSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE * packetsPerFrame * EIGER_MAX_PORTS;
dataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE * packetsPerFrame * EIGER_MAX_PORTS;
oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE;
onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE;
}
char* raw;
char* origVal = new char[frameSize];
char* retval = new char[dataSize];
uint64_t startAcquisitionIndex=0;
uint64_t startFrameIndex=0;
memset(origVal,0xF,frameSize);
memset(retval,0xF,dataSize);
int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0;
strcpy(mess,"Could not read frame\n");
@ -1674,35 +1678,40 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
else{
ret = OK;
//read a frame
receiverBase->readFrame(fName,&raw,startAcquisitionIndex,startFrameIndex);
//send garbage with -1 index to try again
if (raw == NULL){
startAcquisitionIndex = -1;
for(int i=0;i<EIGER_MAX_PORTS;i++){
receiverBase->readFrame(i,fName,&raw,startAcquisitionIndex,startFrameIndex);
//send garbage with -1 index to try again
if (raw == NULL){
startAcquisitionIndex = -1;
#ifdef VERYVERBOSE
cout<<"data not ready for gui yet"<<endl;
cout<<"data not ready for gui yet"<<endl;
#endif
}
else{
memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw,(frameSize/EIGER_MAX_PORTS));
raw=NULL;
}
}
//proper frame
else{//cout<<"**** got proper frame ******"<<endl;
if(startAcquisitionIndex != -1){
//cout<<"**** got proper frame ******"<<endl;
eiger_packet_footer_t* wbuf_footer;
wbuf_footer = (eiger_packet_footer_t*)(raw + oneDataSize + sizeof(eiger_packet_header_t));
wbuf_footer = (eiger_packet_footer_t*)(((char*)origVal) + oneDataSize + sizeof(eiger_packet_header_t));
index =(uint32_t)(*( (uint64_t*) wbuf_footer));
index += (startFrameIndex-1);
if(dynamicrange == 32){
eiger_packet_header_t* wbuf_header;
wbuf_header = (eiger_packet_header_t*) raw;
wbuf_header = (eiger_packet_header_t*) ((char*)origVal);
subframenumber = *( (uint32_t*) wbuf_header->subFrameNumber);
}
#ifdef VERYVERBOSE
//#ifdef VERYVERBOSE
cout << "index:" << dec << index << endl;
cout << "subframenumber:" << dec << subframenumber << endl;
#endif
//#endif
memcpy(origVal,raw,frameSize);
raw=NULL;
int c1=8;//first port
int c2=(frameSize/2) + 8; //second port
@ -1815,18 +1824,18 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
startFrameIndex = -1;
else
frameIndex = index-startFrameIndex;
#ifdef VERY_VERY_DEBUG
//#ifdef VERY_VERY_DEBUG
cout << "acquisitionIndex calculated is:" << acquisitionIndex << endl;
cout << "frameIndex calculated is:" << frameIndex << endl;
cout << "index:" << index << endl;
cout << "startAcquisitionIndex:" << startAcquisitionIndex << endl;
cout << "startFrameIndex:" << startFrameIndex << endl;
cout << "subframenumber:" << subframenumber << endl;
#endif
//#endif
}
}
#ifdef VERYVERBOSE
//#ifdef VERYVERBOSE
if(frameIndex!=-1){
cout << "fName:" << fName << endl;
cout << "acquisitionIndex:" << acquisitionIndex << endl;
@ -1835,7 +1844,7 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
cout << "startFrameIndex:" << startFrameIndex << endl;
cout << "subframenumber:" << subframenumber << endl;
}
#endif
//#endif
@ -1880,8 +1889,8 @@ int slsReceiverTCPIPInterface::jungfrau_read_frame(){
int acquisitionIndex = -1;
int frameIndex= -1;
int64_t currentIndex=0;
uint64_t startAcquisitionIndex=0;
uint64_t startFrameIndex=0;
int64_t startAcquisitionIndex=0;
int64_t startFrameIndex=0;
strcpy(mess,"Could not read frame\n");
@ -1919,7 +1928,7 @@ int slsReceiverTCPIPInterface::jungfrau_read_frame(){
else{
ret = OK;
//read a frame
receiverBase->readFrame(fName,&raw,startAcquisitionIndex,startFrameIndex);
receiverBase->readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex);
//send garbage with -1 index to try again
if (raw == NULL){
startAcquisitionIndex = -1;