removed bug where it shuts down socket and tries to read it as it is -1

This commit is contained in:
Dhanya Maliakal 2016-11-25 14:34:49 +01:00
parent af8c750b5a
commit 5445552b17

View File

@ -1140,13 +1140,13 @@ void UDPStandardImplementation::startReadout(){
//needs to wait for packets only if activated
if(activated){
//check if all packets got
int totalP = 0,prev=-1,i;
for(i=0; i<numberofListeningThreads; ++i)
int totalP = 0,prev=-1;
for(int i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
//check if current buffer still receiving something
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
for(i=0; i<numberofListeningThreads; ++i)
for(int i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
//wait for all packets
@ -1157,19 +1157,19 @@ void UDPStandardImplementation::startReadout(){
//(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
#ifdef DEBUG5
cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d PrevBuffer:%d currentBuffer:%d\n",prev,totalP,prevReceivedInBuffer,currentReceivedInBuffer);
#endif
//usleep(2*1000*1000);
usleep(5*1000);/* Need to find optimal time (exposure time and acquisition period) **/
prev = totalP;
totalP = 0;
for(i=0; i<numberofListeningThreads; ++i)
for(int i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
prevReceivedInBuffer = currentReceivedInBuffer;
currentReceivedInBuffer = 0;
for(i=0; i<numberofListeningThreads; ++i)
for(int i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
#ifdef DEBUG5
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
@ -1662,7 +1662,8 @@ int UDPStandardImplementation::createNewFile(int ithread){
);
}
}
}else
printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]);
//write file header
if(myDetectorType == EIGER)
@ -2074,7 +2075,7 @@ void UDPStandardImplementation::startListening(){
}
//problem in receiving or end of acquisition
if (status == TRANSMITTING||(rc == 0 && activated == 0)){
if (status == TRANSMITTING||(rc <= 0 && activated == 0)){
stopListening(ithread,rc);
continue;
}
@ -2119,14 +2120,15 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize,
(bufferSize * numberofJobsPerBuffer) - cSize);
//write packet count to buffer
*((uint32_t*)(buffer[ithread])) = (receivedSize/onePacketSize);
totalListeningPacketCount[ithread] += (receivedSize/onePacketSize);
//start indices for each start of scan/acquisition
if((!measurementStarted[ithread]) && (receivedSize > 0))
startFrameIndices(ithread);
if(receivedSize > 0){
//write packet count to buffer
*((uint32_t*)(buffer[ithread])) = (receivedSize/onePacketSize);
totalListeningPacketCount[ithread] += (receivedSize/onePacketSize);
//start indices for each start of scan/acquisition
if(!measurementStarted[ithread]) //and rc>0
startFrameIndices(ithread);
}
#ifdef DEBUG
cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize);
#endif
@ -2202,10 +2204,13 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
//read first packet
pnum = FIRSTPNUM; //first packet number to validate
if(status != TRANSMITTING) rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset);
if(!rc) return 0;
if(getFrameandPacketNumber(ithread,buffer[ithread] + offset,fi,pi,si,bi) == FAIL)
if(rc <= 0) return 0;
if(getFrameandPacketNumber(ithread,buffer[ithread] + offset,fi,pi,si,bi) == FAIL){
pi = ALL_MASK_32; //got 0 from fpga
fnum = fi; //fnum of first packet
fi = ALL_MASK_32;
}
else
fnum = fi; //fnum of first packet
bnum = bi; //bnum of first packet
totalListeningPacketCount[ithread]++;
#ifdef VERBOSE
@ -2237,7 +2242,7 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
rc=0; //listen again
if(status != TRANSMITTING)
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset);
if(!rc){ //end: update ignored and return
if(rc <= 0){ //end: update ignored and return
if(myDetectorType == JUNGFRAU)
totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum);
else
@ -2245,10 +2250,13 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
return 0;
}
totalListeningPacketCount[ithread]++;
if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL)
if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL){
pi = ALL_MASK_32; //got 0 from fpga
if(myDetectorType == EIGER)
fnum = fi; //update currentfnum for eiger (next packets should have currentfnum value)
fi = ALL_MASK_32;
totalIgnoredPacketCount[ithread] += (pnum + 1);
}
else if(myDetectorType == EIGER)
fnum = fi; //update currentfnum for eiger (next packets should have currentfnum value)
#ifdef VERBOSE
if(!ithread) cout << "next currentpnum :" << pnum << endl;
#endif
@ -2274,7 +2282,7 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
rc=0;
if(status != TRANSMITTING)
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset);
if(!rc){
if(rc <= 0){
if(myDetectorType == JUNGFRAU)
totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum);
else
@ -2282,13 +2290,16 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
return 0;
}
totalListeningPacketCount[ithread]++;
if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL)
if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL){
pi = ALL_MASK_32; //got 0 from fpga
fi = ALL_MASK_32;
}
#ifdef VERBOSE
if(!ithread) cout << "trying to find pnum:" << pnum << " got " << pi << endl;
#endif
}
fnum = fi; //fnum of first packet
if(fi!=ALL_MASK_32)
fnum = fi; //fnum of first packet
bnum = bi; //bnum of first packet
}
}
@ -2508,17 +2519,17 @@ void UDPStandardImplementation::startWriting(){
//--reset parameters before acquisition (depending on compression)
nf = 0; //compression has only one listening thread (anything not eiger)
if(dataCompressionEnable)
listenfifoIndex = 0; //compression has only one listening thread (anything not eiger)
else
listenfifoIndex = ithread;
/* inner loop - loop for each buffer */
//until mask unset (udp sockets shut down by client)
while((1 << ithread) & writerThreadsMask){
//pop
fifo[listenfifoIndex]->pop(wbuf);
if(!dataCompressionEnable)
fifo[ithread]->pop(wbuf);
else
fifo[0]->pop(wbuf);
uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf));
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, listenfifoIndex);
@ -2684,6 +2695,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//Print packet loss
//if(totalWritingPacketCountFromLastCheck[ithread]){
#ifdef VERBOSE
if(numberofWriterThreads>1){
printf("Thread:%d"
"\tLost:%lld"
@ -2711,6 +2723,38 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
(long long int)frameNumberInPreviousCheck[ithread]
);
}
if(numberofWriterThreads>1){
cprintf(BLUE,"File:%s"
"\nThread:%d"
"\tLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],ithread,
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread])
:(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}else{
cprintf(BLUE,"File:%s"
"\nLost:%lld"
"\t\tPackets:%lld"
"\tFrame#:%lld"
"\tPFrame#:%lld\n",
completeFileName[ithread],
((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames)
?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1))
:(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]),
(long long int)totalPacketsInFile[ithread],
(long long int)currentFrameNumber[ithread],
(long long int)frameNumberInPreviousFile[ithread]
);
}
#endif
//}
closeFile(ithread);