some changes, almost done

This commit is contained in:
Dhanya Maliakal 2016-10-31 12:27:54 +01:00
parent eb68a69e38
commit ac87ae3d5b
2 changed files with 89 additions and 4 deletions

View File

@ -463,6 +463,16 @@ private:
*/ */
void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets); void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets);
/**
* Called by processWritingBuffer for jungfrau
* writes to dummy file, doesnt need to read packet numbers
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutMissingPackets(int ithread, char* wbuffer,uint32_t npackets);
/** /**
* Calle by handleWithoutDataCompression * Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression * Creating headers Writing to file without compression

View File

@ -2099,6 +2099,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
int offset = fifoBufferHeaderSize; int offset = fifoBufferHeaderSize;
int pnum = packetsPerFrame-1; int pnum = packetsPerFrame-1;
int currentpnum; int currentpnum;
int currentfnum=-1;
//read first packet header //read first packet header
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, JFRAU_HEADER_LENGTH); receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, JFRAU_HEADER_LENGTH);
@ -2110,6 +2111,9 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
//correct packet //correct packet
if(currentpnum == pnum){ if(currentpnum == pnum){
//complete frame, get frame number while u can
if(pnum == 0)
(*((uint32_t*)(buffer[ithread]+8))) = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, oneDataSize); receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, oneDataSize);
if(!receivedSize) return 0; if(!receivedSize) return 0;
offset+=oneDataSize; offset+=oneDataSize;
@ -2133,7 +2137,6 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
header = (jfrau_packet_header_t*)(buffer[ithread] + offset); header = (jfrau_packet_header_t*)(buffer[ithread] + offset);
currentpnum = (*( (uint8_t*) header->packetNumber)); currentpnum = (*( (uint8_t*) header->packetNumber));
} }
} }
}//----- got a whole frame ------- }//----- got a whole frame -------
@ -2516,8 +2519,11 @@ void UDPStandardImplementation::startWriting(){
//jungfrau
if(myDetectorType == JUNGFRAU)
handleWithoutMissingPackets(ithread, wbuf, numPackets);
//normal //normal
if(!dataCompressionEnable) else if(!dataCompressionEnable)
handleWithoutDataCompression(ithread, wbuf, numPackets); handleWithoutDataCompression(ithread, wbuf, numPackets);
//compression //compression
@ -2760,7 +2766,6 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer, uint32_t npackets){ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer, uint32_t npackets){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//get current frame number //get current frame number
uint64_t tempframenumber; uint64_t tempframenumber;
uint32_t pnum; uint32_t pnum;
@ -2772,13 +2777,13 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
return; return;
} }
//callback to write data //callback to write data
if (cbAction < DO_EVERYTHING) if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize,
sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd
//write to file if enabled and update write parameters //write to file if enabled and update write parameters
if(npackets > 0) if(npackets > 0)
writeFileWithoutCompression(ithread, wbuffer, npackets); writeFileWithoutCompression(ithread, wbuffer, npackets);
@ -2814,6 +2819,76 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
void UDPStandardImplementation::handleWithoutMissingPackets(int ithread, char* wbuffer, uint32_t npackets){
FILE_LOG(logDEBUG) << __AT__ << " called";
//get current frame number
uint64_t tempframenumber;
tempframenumber = (*((uint32_t*)(buffer[ithread]+8)));
cout<<"handling: frame number:"<<tempframenumber<<endl;
if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack((int)tempframenumber, wbuffer, npackets * onePacketSize,
sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd
//write to file if enabled and update write parameters
if(npackets > 0){
if((fileWriteEnable) && (sfilefd[ithread])){
if(tempframenumber >= maxFramesPerFile)
createNewFile(ithread);
fwrite(wbuffer, 1, oneDataSize*packetsPerFrame+fifoBufferHeaderSize, sfilefd[ithread]);
}
totalPacketsInFile[ithread] += npackets;
totalWritingPacketCount[ithread] += npackets;
lastFrameNumberInFile[ithread] = tempframenumber;
currentFrameNumber[ithread] = tempframenumber;
if(numberofWriterThreads > 1)
pthread_mutex_lock(&writeMutex);
packetsCaught += npackets;
totalPacketsCaught += npackets;
if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex)
acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex;
if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread])
frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex;
if(numberofWriterThreads > 1)
pthread_mutex_unlock(&writeMutex);
}
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n");
#endif
//copy frame for gui
//if(npackets >= (packetsPerFrame/numberofListeningThreads))
if(dataStreamEnable && npackets > 0)
copyFrameToGui(ithread, wbuffer,npackets);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n");
#endif
//free fifo addresses
int listenfifoThread = ithread;
if(dataCompressionEnable)
listenfifoThread = 0;
while(!fifoFree[listenfifoThread]->push(wbuffer));
#ifdef EVERYFIFODEBUG
if(fifoFree[listenfifoThread]->getSemValue()<100)
cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer));
#endif
#ifdef DEBUG5
cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread);
#endif
}
void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";