in between

This commit is contained in:
Dhanya Maliakal 2016-08-24 16:23:43 +02:00
parent f17a2ba2b8
commit e9b7a11cf6
2 changed files with 122 additions and 144 deletions

View File

@ -348,12 +348,11 @@ private:
* Also copies carryovers from previous frame in front of buffer (gotthard and moench)
* For eiger, it ignores packets less than onePacketSize
* @param ithread listening thread index
* @param lSize number of bytes to listen to
* @param cSize number of bytes carried on from previous buffer
* @param temp temporary storage of previous buffer
* @return the number of bytes actually received
*/
int prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp);
int prepareAndListenBuffer(int ithread, int cSize, char* temp);
/**
* Called by startListening
@ -380,9 +379,10 @@ private:
* @param ithread listening thread index
* @param cSize number of bytes carried over to the next buffer to reunite with split frame
* @param temp temporary buffer to store the split frame
* @param rc number of bytes received
* @return packet count
*/
uint32_t processListeningBuffer(int ithread, int cSize,char* temp);
uint32_t processListeningBuffer(int ithread, int &cSize,char* temp, int rc);
/**
* Thread started which writes packets to file.
@ -415,7 +415,7 @@ private:
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void stopWriting(int ithread, char* wbuffer[]);
void stopWriting(int ithread, char* wbuffer);
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
@ -425,7 +425,7 @@ private:
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer[],uint32_t npackets);
void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets);
/**
* Calle by handleWithoutDataCompression
@ -492,9 +492,6 @@ private:
#endif
//**detector parameters***
/** Size of 1 Frame including headers */
int frameSize;
/** Size of 1 buffer processed at a time */
int bufferSize;
@ -529,8 +526,8 @@ private:
/** Complete File name */
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Packets Per File **/
int maxPacketsPerFile;
/** Maximum Frames Per File **/
int maxFramesPerFile;
/** If file created successfully for all Writer Threads */
bool fileCreateSuccess;
@ -545,7 +542,7 @@ private:
uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex[MAX_NUMBER_OF_WRITER_THREADS];
uint64_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS];
@ -577,7 +574,7 @@ private:
uint32_t numMissingPackets[MAX_NUMBER_OF_WRITER_THREADS];
/** Total Number of Missing Packets in acquisition*/
uint32_t numTotMissingPackets[MAX_NUMBER_OF_WRITER_THREADS];
uint32_t numTotMissingPackets;
/** Number of Missing Packets in file */
uint32_t numTotMissingPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];

View File

@ -128,7 +128,6 @@ void UDPStandardImplementation::initializeMembers(){
FILE_LOG(logDEBUG) << "Info: Initializing members";
//***detector parameters***
frameSize = 0;
bufferSize = 0;
onePacketSize = 0;
oneDataSize = 0;
@ -149,18 +148,18 @@ void UDPStandardImplementation::initializeMembers(){
strcpy(fileHeader[i],"");
sfilefd[i] = NULL;
}
maxPacketsPerFile = 0;
maxFramesPerFile = 0;
fileCreateSuccess = false;
//***acquisition indices parameters***
startAcquisitionIndex = 0;
acqStarted = false;
startFrameIndex = 0;
for(int i = 0; i < MAX_NUMBER_OF_LISTENING_THREADS; ++i){
startFrameIndex[i] = 0;
measurementStarted[i] = false;
totalListeningFrameCount[i] = 0;
}
numTotMissingPackets = 0;
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
frameIndex[i] = 0;
currentFrameNumber[i] = 0;
@ -168,7 +167,6 @@ void UDPStandardImplementation::initializeMembers(){
lastFrameIndex[i] = 0;
packetsInFile[i] = 0;
numMissingPackets[i] = 0;
numTotMissingPackets[i] = 0;
numTotMissingPacketsInFile[i] = 0;
}
@ -304,10 +302,7 @@ int UDPStandardImplementation::setupFifoStructure(){
// fifo depth
uint32_t oldFifoSize = fifoSize;
if(myDetectorType == EIGER)
fifoSize = fifoDepth * packetsPerFrame;//listens to 1 packet at a time and size depends on packetsperframe
else
fifoSize = fifoDepth;
fifoSize = fifoDepth;
//reduce fifo depth if > 1 numberofJobsPerBuffer
if(fifoSize % numberofJobsPerBuffer)
@ -338,10 +333,8 @@ int UDPStandardImplementation::setupFifoStructure(){
fifoFree[i]->pop(buffer[i]);
//cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i]));
}
#ifdef DEBUG5
cprintf(BLUE,"Info: %d fifostructure popped from fifofree %p\n", i, (void*)(buffer[i]));
#endif
delete fifoFree[i];
fifoFree[i] = NULL;
}
if(fifo[i]){
while(!fifo[i]->isEmpty()){
@ -349,8 +342,12 @@ int UDPStandardImplementation::setupFifoStructure(){
//cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(buffer[i]));
}
delete fifo[i];
fifo[i] = NULL;
}
if(mem0[i]){
free(mem0[i]);
mem0[i] = NULL;
}
if(mem0[i]) free(mem0[i]);
//creating
fifoFree[i] = new CircularFifo<char>(fifoSize);
@ -469,22 +466,20 @@ void UDPStandardImplementation::setShortFrameEnable(const int i){
shortFrameEnable = i;
if(shortFrameEnable!=-1){
frameSize = GOTTHARD_SHORT_BUFFER_SIZE;
bufferSize = GOTTHARD_SHORT_BUFFER_SIZE;
onePacketSize = GOTTHARD_SHORT_BUFFER_SIZE;
oneDataSize = GOTTHARD_SHORT_DATABYTES;
maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME;
maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE;
packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME;
frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK;
frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET;
packetIndexMask = GOTTHARD_SHORT_PACKET_INDEX_MASK;
}else{
frameSize = GOTTHARD_BUFFER_SIZE;
bufferSize = GOTTHARD_BUFFER_SIZE;
onePacketSize = GOTTHARD_ONE_PACKET_SIZE;
oneDataSize = GOTTHARD_ONE_DATA_SIZE;
maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME;
frameIndexMask = GOTTHARD_FRAME_INDEX_MASK;
frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET;
@ -517,9 +512,8 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " called";
acquisitionPeriod = i;
if((myDetectorType == GOTTHARD) && (myDetectorType == MOENCH))
if(setupFifoStructure() == FAIL)
return FAIL;
if(setupFifoStructure() == FAIL)
return FAIL;
FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s";
@ -531,9 +525,8 @@ int UDPStandardImplementation::setNumberOfFrames(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " called";
numberOfFrames = i;
if((myDetectorType == GOTTHARD) && (myDetectorType == MOENCH))
if(setupFifoStructure() == FAIL)
return FAIL;
if(setupFifoStructure() == FAIL)
return FAIL;
FILE_LOG(logINFO) << "Number of Frames:" << numberOfFrames;
@ -551,12 +544,11 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
if(myDetectorType == EIGER){
//set parameters depending on new dynamic range.
packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT)
* dynamicRange * EIGER_MAX_PORTS;
frameSize = onePacketSize * packetsPerFrame;
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame;
packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT) * dynamicRange;
bufferSize = onePacketSize * packetsPerFrame;
updateFileHeader();
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
//new dynamic range, then restart threads and resetup fifo structure
if(oldDynamicRange != dynamicRange){
@ -564,7 +556,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
//gui buffer
for(int i=0;i<numberofWriterThreads;i++){
if(latestData[i]){delete[] latestData[i]; latestData[i] = NULL;}
latestData[i] = new char[frameSize];
latestData[i] = new char[bufferSize];
}
//restructure fifo
numberofJobsPerBuffer = -1;
@ -601,20 +593,16 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE;
oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE;
}
frameSize = onePacketSize * packetsPerFrame;
bufferSize = onePacketSize;
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame;
bufferSize = onePacketSize * packetsPerFrame;
footerOffset = EIGER_PACKET_HEADER_SIZE + oneDataSize;
FILE_LOG(logDEBUG) << dec <<
"packetsPerFrame:" << packetsPerFrame <<
"\nonePacketSize:" << onePacketSize <<
"\noneDataSize:" << oneDataSize <<
"\nframesize:" << frameSize <<
"\nbufferSize:" << bufferSize <<
"\nmaxPacketsPerFile:" << maxPacketsPerFile;
"\nbufferSize:" << bufferSize;
updateFileHeader();
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
//new enable, then restart threads and resetup fifo structure
if(oldTenGigaEnable != tengigaEnable){
@ -622,7 +610,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
//gui buffer
for(int i=0;i<numberofWriterThreads;i++){
if(latestData[i]){delete[] latestData[i]; latestData[i] = NULL;}
latestData[i] = new char[frameSize];
latestData[i] = new char[bufferSize];
}
//restructure fifo
@ -690,12 +678,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME;
onePacketSize = GOTTHARD_ONE_PACKET_SIZE;
oneDataSize = GOTTHARD_ONE_DATA_SIZE;
frameSize = GOTTHARD_BUFFER_SIZE;
bufferSize = GOTTHARD_BUFFER_SIZE;
frameIndexMask = GOTTHARD_FRAME_INDEX_MASK;
frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET;
packetIndexMask = GOTTHARD_PACKET_INDEX_MASK;
maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoSize = GOTTHARD_FIFO_SIZE;
fifoDepth = GOTTHARD_FIFO_SIZE;
//footerOffset = Not applicable;
@ -704,12 +691,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
packetsPerFrame = PROPIX_PACKETS_PER_FRAME;
onePacketSize = PROPIX_ONE_PACKET_SIZE;
//oneDataSize = Not applicable;
frameSize = PROPIX_BUFFER_SIZE;
bufferSize = PROPIX_BUFFER_SIZE;
frameIndexMask = PROPIX_FRAME_INDEX_MASK;
frameIndexOffset = PROPIX_FRAME_INDEX_OFFSET;
packetIndexMask = PROPIX_PACKET_INDEX_MASK;
maxPacketsPerFile = MAX_FRAMES_PER_FILE * PROPIX_PACKETS_PER_FRAME;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoSize = PROPIX_FIFO_SIZE;
fifoDepth = PROPIX_FIFO_SIZE;
//footerOffset = Not applicable;
@ -718,27 +704,25 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
packetsPerFrame = MOENCH_PACKETS_PER_FRAME;
onePacketSize = MOENCH_ONE_PACKET_SIZE;
oneDataSize = MOENCH_ONE_DATA_SIZE;
frameSize = MOENCH_BUFFER_SIZE;
bufferSize = MOENCH_BUFFER_SIZE;
frameIndexMask = MOENCH_FRAME_INDEX_MASK;
frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET;
packetIndexMask = MOENCH_PACKET_INDEX_MASK;
maxPacketsPerFile = MOENCH_MAX_FRAMES_PER_FILE * MOENCH_PACKETS_PER_FRAME;
maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
fifoSize = MOENCH_FIFO_SIZE;
fifoDepth = MOENCH_FIFO_SIZE;
//footerOffset = Not applicable;
break;
case EIGER:
//assuming 1G in the beginning
packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS;
packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange;
onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE;
oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE;
frameSize = onePacketSize * packetsPerFrame;
bufferSize = onePacketSize;
bufferSize = onePacketSize * packetsPerFrame;
frameIndexMask = EIGER_FRAME_INDEX_MASK;
frameIndexOffset = EIGER_FRAME_INDEX_OFFSET;
packetIndexMask = EIGER_PACKET_INDEX_MASK;
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame;
maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE;
fifoSize = EIGER_FIFO_SIZE;
fifoDepth = EIGER_FIFO_SIZE;
footerOffset = EIGER_PACKET_HEADER_SIZE + oneDataSize;
@ -747,12 +731,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
packetsPerFrame = JCTB_PACKETS_PER_FRAME;
onePacketSize = JCTB_ONE_PACKET_SIZE;
//oneDataSize = Not applicable;
frameSize = JCTB_BUFFER_SIZE;
bufferSize = JCTB_BUFFER_SIZE;
frameIndexMask = JCTB_FRAME_INDEX_MASK;
frameIndexOffset = JCTB_FRAME_INDEX_OFFSET;
packetIndexMask = JCTB_PACKET_INDEX_MASK;
maxPacketsPerFile = JFCTB_MAX_FRAMES_PER_FILE * JCTB_PACKETS_PER_FRAME;
maxFramesPerFile = JFCTB_MAX_FRAMES_PER_FILE;
fifoSize = JCTB_FIFO_SIZE;
fifoDepth = JCTB_FIFO_SIZE;
//footerOffset = Not applicable;
@ -761,12 +744,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
packetsPerFrame = JFRAU_PACKETS_PER_FRAME;
onePacketSize = JFRAU_ONE_PACKET_SIZE;
oneDataSize = JFRAU_DATA_BYTES;
frameSize = JFRAU_BUFFER_SIZE;
bufferSize = JFRAU_BUFFER_SIZE;
frameIndexMask = JFRAU_FRAME_INDEX_MASK;
frameIndexOffset = JFRAU_FRAME_INDEX_OFFSET;
packetIndexMask = JFRAU_PACKET_INDEX_MASK;
maxPacketsPerFile = JFRAU_MAX_FRAMES_PER_FILE * JFRAU_PACKETS_PER_FRAME;
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoDepth = JFRAU_FIFO_SIZE;
fifoSize = JFRAU_FIFO_SIZE;
//footerOffset = Not applicable;
@ -798,7 +780,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(latestData[i]) {delete[] latestData; latestData = NULL;}
guiData[i] = NULL;
latestData[i] = new char[frameSize];
latestData[i] = new char[bufferSize];
}
@ -849,16 +831,17 @@ int UDPStandardImplementation::startReceiver(char *c){
}
//for every measurement
pthread_mutex_lock(&progressMutex);
startFrameIndex = 0;
pthread_mutex_unlock(&progressMutex);
for(int i=0;i<numberofListeningThreads;i++){
startFrameIndex[i] = 0;
measurementStarted[i] = false;
totalListeningFrameCount[i] = 0;
}
numTotMissingPackets = 0;
for(int i=0;i<numberofWriterThreads;i++){
frameIndex[i] = 0;
numMissingPackets[i] = 0;
numTotMissingPackets[i] = 0;
numTotMissingPacketsInFile[i] = 0;
//reset file parameters
packetsInFile[i] = 0;
@ -886,11 +869,10 @@ int UDPStandardImplementation::startReceiver(char *c){
//Print Receiver Configuration
if(myDetectorType != EIGER){
if(myDetectorType != EIGER)
FILE_LOG(logINFO) << "Data Compression has been " << stringEnable(dataCompressionEnable);
FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer;
FILE_LOG(logINFO) << "Max Packets Per File:" << maxPacketsPerFile;
}
FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer;
FILE_LOG(logINFO) << "Max Frames Per File:" << maxFramesPerFile;
if(FrameToGuiFrequency)
FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << FrameToGuiFrequency;
else
@ -1066,7 +1048,7 @@ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, uint6
//copy data and filename
strcpy(c,guiFileName[ithread]);
startAcq = startAcquisitionIndex;
startFrame = startFrameIndex[ithread];
startFrame = startFrameIndex;
//gui data not copied yet
@ -1121,7 +1103,7 @@ void UDPStandardImplementation::closeFile(int ithread){
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(sfilefd[0]){
#ifdef DEBUG4
FILE_LOG(logDEBUG4) << "sfield: " << (int)sfilefd[i];
FILE_LOG(logDEBUG4) << "sfilefd: " << (int)sfilefd[i];
#endif
fclose(sfilefd[0]);
sfilefd[0] = NULL;
@ -1453,7 +1435,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
}else{
//Assumption for startFrameindex usign ithread: datacompression never enters here and therefore is always same number of listening and writing threads to use ithread
if (previousFrameNumber[ithread] == -1)
previousFrameNumber[ithread] = startFrameIndex[ithread]-1;
previousFrameNumber[ithread] = startFrameIndex-1;
cout << completeFileName[ithread]
<< "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec <<
@ -1545,27 +1527,25 @@ void UDPStandardImplementation::startListening(){
threadStarted = 1;
//variable definitions
int listenSize = 0; //listen to only 1 packet
uint32_t rc; //size of buffer received in bytes
//split frames
//split frames for data compression
int carryonBufferSize; //from previous buffer to keep frames together in a buffer
char* tempBuffer = NULL; //temporary buffer to store split frames
/* outer loop - loops once for each acquisition */
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
while(true){
//reset parameters before acquisition
//compression variables reset before acquisition
carryonBufferSize = 0;
if(myDetectorType != EIGER){
listenSize = bufferSize * numberofJobsPerBuffer; //listen to more than 1 packet
if(dataCompressionEnable){
if(tempBuffer!=NULL){delete []tempBuffer;tempBuffer=NULL;}
tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame
}
/* inner loop - loop for each buffer */
//until mask unset (udp sockets shut down by client)
//until mask reset (udp sockets shut down by client)
while((1 << ithread) & listeningThreadsMask){
@ -1583,14 +1563,20 @@ void UDPStandardImplementation::startListening(){
cprintf(YELLOW,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread]));
#endif
pthread_mutex_lock(&udpSocketMutex[ithread]);
//udpsocket doesnt exist
if(udpSocket[ithread] == NULL){
FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier";
stopListening(ithread,0);
pthread_mutex_unlock(&udpSocketMutex[ithread]);
continue;
}
rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer);
rc = prepareAndListenBuffer(ithread, carryonBufferSize, tempBuffer);
carryonBufferSize = 0;
pthread_mutex_unlock(&udpSocketMutex[ithread]);
//start indices for each start of scan/acquisition
if((!measurementStarted) && (rc > 0))
@ -1603,10 +1589,10 @@ void UDPStandardImplementation::startListening(){
//write packet count to buffer
if(myDetectorType == EIGER)
(*((uint32_t*)(buffer[ithread]))) = 1;
//handling split frames and writing packet Count to buffer
else
(*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer);
(*((uint32_t*)(buffer[ithread]))) = rc/onePacketSize;
if(dataCompressionEnable)
(*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc);
//push buffer to FIFO
@ -1645,24 +1631,17 @@ void UDPStandardImplementation::startListening(){
int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp){
int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, char* temp){
FILE_LOG(logDEBUG) << __AT__ << " called";
//listen to UDP packets
if(cSize)
memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
//carry over from previous buffer
if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
pthread_mutex_lock(&udpSocketMutex[ithread]);
int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize);
int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
//throw away packets that is not one packet size
while(myDetectorType == EIGER && receivedSize != onePacketSize) {
//need to check status if socket is shut down
if(status == TRANSMITTING)
break;
//print
if(receivedSize != EIGER_HEADER_LENGTH){
if(receivedSize != EIGER_HEADER_LENGTH)
cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",ithread, receivedSize);
}
#ifdef DEBUG
else
cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread);
@ -1670,7 +1649,6 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
//listen again
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
}
pthread_mutex_unlock(&udpSocketMutex[ithread]);
totalListeningFrameCount[ithread] += (receivedSize/onePacketSize);
@ -1696,8 +1674,6 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
}
}
#endif
#ifdef DEBUG
cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize);
#endif
@ -1716,18 +1692,18 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
jfrau_packet_header_t* header=0;
switch(myDetectorType){
case EIGER:
startFrameIndex[ithread] = 0; //frame number always resets
startFrameIndex = 0; //frame number always resets
break;
case JUNGFRAU:
header = (jfrau_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
startFrameIndex[ithread] = (*( (uint32_t*) header->frameNumber))&0xffffff;
startFrameIndex = (*( (uint32_t*) header->frameNumber))&0xffffff;
break;
default:
if(shortFrameEnable < 0){
startFrameIndex[ithread] = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
& (frameIndexMask)) >> frameIndexOffset);
}else{
startFrameIndex[ithread] = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))
& (frameIndexMask)) >> frameIndexOffset);
}
break;
@ -1736,14 +1712,14 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
//start of entire acquisition
if(!acqStarted){
pthread_mutex_lock(&progressMutex);
startAcquisitionIndex = startFrameIndex[ithread];
startAcquisitionIndex = startFrameIndex;
acqStarted = true;
pthread_mutex_unlock(&progressMutex);
cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex);
}
//set start of scan/real time measurement
cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex[ithread]);
cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex);
measurementStarted[ithread] = true;
}
@ -1863,13 +1839,13 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int cSize, char* temp){
uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSize, char* temp, int rc){
FILE_LOG(logDEBUG) << __AT__ << " called";
int lastPacketOffset; //the offset of the last packet
uint32_t lastFrameHeader; //frame number of last packet in buffer
uint64_t lastFrameHeader64; //frame number of last packet in buffer
uint32_t packetCount = (packetsPerFrame/numberofListeningThreads) * numberofJobsPerBuffer; //packets received
uint32_t packetCount = rc;//(packetsPerFrame/numberofListeningThreads) * numberofJobsPerBuffer; //packets received
cSize = 0; //reset size
jfrau_packet_header_t* header;
@ -2012,9 +1988,10 @@ void UDPStandardImplementation::processWritingBuffer(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called";
//variable definitions
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
sfilefd = NULL; //file pointer
uint64_t nf; //for compression, number of frames
char* wbuf; //buffer popped from FIFO
sfilefd[ithread] = NULL; //file pointer
uint64_t nf; //for compression, number of frames
int listenfifoIndex = ithread;
/* outer loop - loops once for each acquisition */
@ -2023,31 +2000,33 @@ void UDPStandardImplementation::processWritingBuffer(int ithread){
//--reset parameters before acquisition
nf = 0;
guiData = latestData; //so that the first frame is always copied
guiData[ithread] = latestData[ithread]; //so that the first frame is always copied
if(dataCompressionEnable)
listenfifoIndex = 0; //compression has only one listening thread
/* inner loop - loop for each buffer */
//until mask unset (udp sockets shut down by client)
while((1 << ithread) & writerThreadsMask){
//pop
fifo[0]->pop(wbuf[0]);
fifo[listenfifoIndex]->pop(wbuf);
#ifdef EVERYFIFODEBUG
if(fifo[0]->getSemValue()>(fifoSize-100))
cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",0,fifo[0]->getSemValue(),(void*)(wbuf[0]));
if(fifo[listenfifoIndex]->getSemValue()>(fifoSize-100))
cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",listenfifoIndex,fifo[listenfifoIndex]->getSemValue(),(void*)(wbuf));
#endif
#ifdef DEBUG5
cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuf[0]),0);
#endif
uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf[0]));
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, 0);
cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuf),listenfifoIndex);
#endif
uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf));
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, listenfifoIndex);
#endif
//end of acquisition
if(numPackets == dummyPacketValue){
#ifdef DEBUG3
cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, 0);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, listenfifoIndex);
#endif
stopWriting(ithread,wbuf);
continue;
@ -2055,9 +2034,11 @@ void UDPStandardImplementation::processWritingBuffer(int ithread){
//process
//normal
if(!dataCompressionEnable)
handleWithoutDataCompression(ithread, wbuf, numPackets);
//compression
else{
#if defined(MYROOT1) && defined(ALLFILE_DEBUG)
if(npackets > 0)
@ -2145,12 +2126,13 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
#endif
}
delete fifoTempFree[i];
fifoTempFree[i] = NULL;
}
fifoTempFree[i] = new CircularFifo<char>(MAX_NUM_PACKETS);
}
for(uint32_t i=0; i<packetsPerFrame; ++i){
if(blankframe[i]){delete [] blankframe[i]; blankframe[i] = 0;}
if(blankframe[i]){delete [] blankframe[i]; blankframe[i] = NULL;}
blankframe[i] = new char[onePacketSize];
//set missing packet to 0xff
eiger_packet_header_t* blankframe_header = (eiger_packet_header_t*) blankframe[i];
@ -2223,7 +2205,7 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
threadFrameNumber[i] = (uint32_t)(*( (uint64_t*) packetBuffer_footer));
//last frame read out
lastFrameIndex = threadFrameNumber[i];
threadFrameNumber[i] += (startFrameIndex[ithread] - 1);
threadFrameNumber[i] += (startFrameIndex - 1);
//packet number
currentPacketNumber[i] = *( (uint16_t*) packetBuffer_footer->packetNumber);
@ -2338,7 +2320,7 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
if(fullframe[0] && fullframe[1]){
currentFrameNumber = presentFrameNumber;
numTotMissingPacketsInFile += numMissingPackets;
numTotMissingPackets += numMissingPackets;
numTotMissingPackets += numMissingPackets;/**requires a lock*/
/*
cprintf(CYAN,"**framenum:%lld\n ",(long long int)currentFrameNumber);
@ -2379,7 +2361,7 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
//ensuring last packet got is not of some other future frame but of the current one
eiger_packet_footer_t* wbuf_footer1 = (eiger_packet_footer_t*)(packetBuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
uint64_t packfnum = (((uint32_t)(*( (uint64_t*) wbuf_footer1)))+(startFrameIndex[ithread] - 1));
uint64_t packfnum = (((uint32_t)(*( (uint64_t*) wbuf_footer1)))+(startFrameIndex - 1));
//to reset to get new frame: not dummy and the last packet
if((numPackets[i] != dummyPacketValue) && (currentPacketNumber[i] == LAST_PACKET_VALUE) && (packfnum == currentFrameNumber) )
@ -2592,25 +2574,24 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* w
void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logINFO) << "Writing "<< ithread << ": End of Acquisition";
//free fifo
for(int i=0; i<numberofListeningThreads; ++i){
while(!fifoFree[i]->push(wbuffer[i]));
while(!fifoFree[ithread]->push(wbuffer));
#ifdef EVERYFIFODEBUG
if(fifoFree[i]->getSemValue()<100)
cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(wbuffer[i]));
if(fifoFree[ithread]->getSemValue()<100)
cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",ithread,fifoFree[ithread]->getSemValue(),(void*)(wbuffer));
#endif
#ifdef CFIFODEBUG
if(i==0)
cprintf(CYAN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i);
else
cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i);
if(ithread==0)
cprintf(CYAN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread);
else
cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread);
#endif
}
//all threads need to close file, reset mask and exit loop
closeFile(ithread);
@ -2644,7 +2625,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
//statistics
FILE_LOG(logINFO) << "Status: Run Finished";
FILE_LOG(logINFO) << "Last Frame Number Caught:" << lastFrameIndex;
FILE_LOG(logINFO) << "Last Frame Number Caught:" << lastFrameIndex[ithread];
if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame)){
cprintf(RED, "Total Missing Packets padded: %d\n",numTotMissingPackets);
cprintf(RED, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught);
@ -2663,7 +2644,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[],uint32_t npackets){
void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets){
FILE_LOG(logDEBUG) << __AT__ << " called";
@ -2682,7 +2663,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
}
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex[ithread];
frameIndex = currentFrameNumber - startFrameIndex;
}
@ -2774,7 +2755,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex[ithread];
frameIndex = currentFrameNumber - startFrameIndex;
}
#ifdef DEBUG3
cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber);
@ -3065,7 +3046,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
pthread_mutex_unlock(&progressMutex);
//set indices
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
frameIndex = currentFrameNumber - startFrameIndex[0];
frameIndex = currentFrameNumber - startFrameIndex;
//variable definitions