mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-13 05:17:13 +02:00
vg
This commit is contained in:
@ -291,11 +291,19 @@ private:
|
|||||||
int setupWriter();
|
int setupWriter();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates new file
|
* Creates new file and reset some parameters
|
||||||
* @return OK or FAIL
|
* @return OK or FAIL
|
||||||
*/
|
*/
|
||||||
int createNewFile();
|
int createNewFile();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates new tree and file for compression
|
||||||
|
* @param ithread thread number
|
||||||
|
* @param iframe frame number
|
||||||
|
* @return OK or FAIL
|
||||||
|
*/
|
||||||
|
int createCompressionFile(int ithread, int iframe);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Static function - Starts Listening Thread of this object
|
* Static function - Starts Listening Thread of this object
|
||||||
* @param this_pointer pointer to this object
|
* @param this_pointer pointer to this object
|
||||||
@ -370,7 +378,7 @@ private:
|
|||||||
void startWriting();
|
void startWriting();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by StartWriting
|
* Called by processWritingBuffer and processWritingBufferPacketByPacket
|
||||||
* Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition
|
* Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition
|
||||||
* @param ithread current thread index
|
* @param ithread current thread index
|
||||||
* @param wbuffer the buffer array that is popped from all the FIFOs
|
* @param wbuffer the buffer array that is popped from all the FIFOs
|
||||||
@ -383,7 +391,7 @@ private:
|
|||||||
bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]);
|
bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by StartWriting
|
* Called by processWritingBuffer and processWritingBufferPacketByPacket
|
||||||
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
|
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
|
||||||
* It frees the FIFO addresses, closes all files
|
* It frees the FIFO addresses, closes all files
|
||||||
* For data compression, it waits for all threads to be done
|
* For data compression, it waits for all threads to be done
|
||||||
@ -394,7 +402,7 @@ private:
|
|||||||
void stopWriting(int ithread, char* wbuffer[]);
|
void stopWriting(int ithread, char* wbuffer[]);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by startWriting or processWritingBufferPacketByPacket upon reading a frame (for eiger)
|
* Called by processWritingBuffer and processWritingBufferPacketByPacket
|
||||||
* Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame
|
* Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame
|
||||||
* Copies data for gui display and frees addresses popped from FIFOs
|
* Copies data for gui display and frees addresses popped from FIFOs
|
||||||
* @param ithread writing thread index
|
* @param ithread writing thread index
|
||||||
@ -412,7 +420,7 @@ private:
|
|||||||
void writeFileWithoutCompression(char* wbuffer[],int numpackets);
|
void writeFileWithoutCompression(char* wbuffer[],int numpackets);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by writeToFileWithoutCompression()
|
* Called by writeToFileWithoutCompression
|
||||||
* Create headers for file writing (at the moment, this is eiger specific)
|
* Create headers for file writing (at the moment, this is eiger specific)
|
||||||
* @param wbuffer writing buffer popped from FIFOs
|
* @param wbuffer writing buffer popped from FIFOs
|
||||||
*/
|
*/
|
||||||
@ -426,7 +434,24 @@ private:
|
|||||||
*/
|
*/
|
||||||
void copyFrameToGui(char* buffer[]);
|
void copyFrameToGui(char* buffer[]);
|
||||||
|
|
||||||
void processWritingBufferPacketByPacket();
|
void processWritingBuffer(int ithread);
|
||||||
|
|
||||||
|
void processWritingBufferPacketByPacket(int ithread);
|
||||||
|
|
||||||
|
void waitWritingBufferForNextAcquisition(int ithread);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by processWritingBuffer
|
||||||
|
* Processing fifo popped buffers for data compression
|
||||||
|
* Updates parameters and writes to file
|
||||||
|
* Copies data for gui display and frees addresses popped from FIFOs
|
||||||
|
* @param ithread writing thread number
|
||||||
|
* @param wbuffer writer buffer
|
||||||
|
* @param nf number of frames
|
||||||
|
*/
|
||||||
|
void handleDataCompression(int ithread, char* wbuffer[], int &nf);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*************************************************************************
|
/*************************************************************************
|
||||||
* Class Members *********************************************************
|
* Class Members *********************************************************
|
||||||
@ -683,58 +708,6 @@ private:
|
|||||||
* 2 we open, close, write file, callback does not do anything */
|
* 2 we open, close, write file, callback does not do anything */
|
||||||
int cbAction;
|
int cbAction;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates new tree and file for compression
|
|
||||||
* @param ithr thread number
|
|
||||||
* @param iframe frame number
|
|
||||||
*\returns OK for succces or FAIL for failure
|
|
||||||
*/
|
|
||||||
int createCompressionFile(int ithr, int iframe);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* data compression for each fifo output
|
|
||||||
* @param ithread writing thread number
|
|
||||||
* @param wbuffer writer buffer
|
|
||||||
* @param data pointer to the next packet start
|
|
||||||
* @param xmax max pixels in x direction
|
|
||||||
* @param ymax max pixels in y direction
|
|
||||||
* @param nf nf
|
|
||||||
*/
|
|
||||||
void handleDataCompression(int ithread, char* wbuffer[], char* data, int xmax, int ymax, int &nf);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//filter
|
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -1382,6 +1382,36 @@ int UDPStandardImplementation::createNewFile(){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){
|
||||||
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
#ifdef MYROOT1
|
||||||
|
char temp[MAX_STR_LENGTH];
|
||||||
|
//create file name for gui purposes, and set up acquistion parameters
|
||||||
|
sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileName,fileIndex,ithread);
|
||||||
|
//file
|
||||||
|
myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp);
|
||||||
|
//tree
|
||||||
|
sprintf(temp, "%s_fxxx_%d_%d",fileName,fileIndex,ithread);
|
||||||
|
myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe);
|
||||||
|
//resets the pedestalSubtraction array and the commonModeSubtraction
|
||||||
|
singlePhotonDetectorObject[ithread]->newDataSet();
|
||||||
|
if(myFile[ithread]==NULL){
|
||||||
|
cprintf(BG_RED,"Error: File Null\n");
|
||||||
|
return FAIL;
|
||||||
|
}
|
||||||
|
if(!myFile[ithread]->IsOpen()){
|
||||||
|
cprintf(BG_RED,"Error: File Not Open\n")
|
||||||
|
return FAIL;
|
||||||
|
}
|
||||||
|
return OK;
|
||||||
|
#endif
|
||||||
|
return FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void* UDPStandardImplementation::startListeningThread(void* this_pointer){
|
void* UDPStandardImplementation::startListeningThread(void* this_pointer){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
((UDPStandardImplementation*)this_pointer)->startListening();
|
((UDPStandardImplementation*)this_pointer)->startListening();
|
||||||
@ -1399,6 +1429,8 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void UDPStandardImplementation::startListening(){
|
void UDPStandardImplementation::startListening(){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
@ -1517,6 +1549,10 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void UDPStandardImplementation::startFrameIndices(int ithread){
|
void UDPStandardImplementation::startFrameIndices(int ithread){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
@ -1552,6 +1588,8 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void UDPStandardImplementation::stopListening(int ithread, int numbytes){
|
void UDPStandardImplementation::stopListening(int ithread, int numbytes){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
@ -1629,6 +1667,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int cSize, char* temp){
|
uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int cSize, char* temp){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
@ -1721,17 +1760,99 @@ void UDPStandardImplementation::startWriting(){
|
|||||||
//let calling function know thread started and obtained current
|
//let calling function know thread started and obtained current
|
||||||
threadStarted = 1;
|
threadStarted = 1;
|
||||||
|
|
||||||
|
switch(myDetectorType){
|
||||||
|
case EIGER:
|
||||||
|
processWritingBufferPacketByPacket(ithread);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
processWritingBuffer(ithread);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void UDPStandardImplementation::processWritingBuffer(int ithread, int &nf){
|
||||||
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
//variable definitions
|
||||||
|
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
|
||||||
|
sfilefd = NULL; //file pointer
|
||||||
|
|
||||||
|
/* outer loop - loops once for each acquisition */
|
||||||
|
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
|
||||||
|
while(true){
|
||||||
|
|
||||||
|
//--reset parameters before acquisition
|
||||||
|
nf = 0;
|
||||||
|
|
||||||
|
/* inner loop - loop for each buffer */
|
||||||
|
//until mask unset (udp sockets shut down by client)
|
||||||
|
while((1 << ithread) & writerThreadsMask){
|
||||||
|
//pop
|
||||||
|
fifo[0]->pop(wbuf[0]);
|
||||||
|
#ifdef FIFODEBUG
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuf[0]),0);
|
||||||
|
#endif
|
||||||
|
int numPackets = (uint32_t)(*((uint32_t*)wbuf[0]));
|
||||||
|
if(numPackets < 0)
|
||||||
|
cprintf(BG_RED,"Error: Negative packet numbers: %d for FIFO %d\n",numPackets,0);
|
||||||
|
#ifdef DEBUG4
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
//dummy packet
|
||||||
|
if(numPackets == dummyPacketValue){
|
||||||
|
#ifdef DEBUG3
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, 0);
|
||||||
|
#endif
|
||||||
|
stopWriting(ithread,wbuf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//process
|
||||||
|
if(!dataCompressionEnable)
|
||||||
|
handleWithoutDataCompression(ithread, wbuf, numPackets);
|
||||||
|
else{
|
||||||
|
#if defined(MYROOT1) && defined(ALLFILE_DEBUG)
|
||||||
|
if(npackets > 0)
|
||||||
|
writeFileWithoutCompression(wbuf, numPackets);
|
||||||
|
#endif
|
||||||
|
handleDataCompression(ithread,wbuf,nf);
|
||||||
|
}
|
||||||
|
}/*--end of loop for each buffer (inner loop)*/
|
||||||
|
|
||||||
|
waitWritingBufferForNextAcquisition(ithread);
|
||||||
|
|
||||||
|
}/*--end of loop for each acquisition (outer loop) */
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
|
||||||
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
|
||||||
//variable definitions
|
//variable definitions
|
||||||
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
|
char* wbuf[numberofListeningThreads]; //buffer popped from FIFO
|
||||||
sfilefd = NULL; //file pointer
|
sfilefd = NULL; //file pointer
|
||||||
bool popReady[numberofListeningThreads]; //if the FIFO can be popped
|
bool popReady[numberofListeningThreads]; //if the FIFO can be popped
|
||||||
uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO
|
uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO
|
||||||
//eiger specific
|
//eiger specific
|
||||||
int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets
|
int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets
|
||||||
char* toFreePointers[MAX_NUM_PACKETS]; //pointers to free for each frame
|
char* toFreePointers[MAX_NUM_PACKETS]; //pointers to free for each frame
|
||||||
int toFreePointersOffset[numberofListeningThreads]; //offset of pointers to free added for each thread
|
int toFreePointersOffset[numberofListeningThreads]; //offset of pointers to free added for each thread
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* outer loop - loops once for each acquisition */
|
/* outer loop - loops once for each acquisition */
|
||||||
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
|
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
|
||||||
@ -1745,10 +1866,11 @@ void UDPStandardImplementation::startWriting(){
|
|||||||
toFreePointersOffset[i] = (i*packetsPerFrame/numberofListeningThreads);
|
toFreePointersOffset[i] = (i*packetsPerFrame/numberofListeningThreads);
|
||||||
}
|
}
|
||||||
for(int i=0; i<MAX_NUM_PACKETS; ++i){
|
for(int i=0; i<MAX_NUM_PACKETS; ++i){
|
||||||
toFreePointers[i] = NULL;
|
toFreePointers[i] = NULL;
|
||||||
}
|
}
|
||||||
//--end of reset parameters before acquisition
|
//--end of reset parameters before acquisition
|
||||||
|
|
||||||
|
|
||||||
/* inner loop - loop for each buffer */
|
/* inner loop - loop for each buffer */
|
||||||
//until mask unset (udp sockets shut down by client)
|
//until mask unset (udp sockets shut down by client)
|
||||||
while((1 << ithread) & writerThreadsMask){
|
while((1 << ithread) & writerThreadsMask){
|
||||||
@ -1762,95 +1884,91 @@ void UDPStandardImplementation::startWriting(){
|
|||||||
//finish missing packets
|
//finish missing packets
|
||||||
|
|
||||||
if(myDetectorType == EIGER &&
|
if(myDetectorType == EIGER &&
|
||||||
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numberofListeningThreads))));
|
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numberofListeningThreads))));
|
||||||
else{
|
else{
|
||||||
stopWriting(ithread,wbuf);
|
stopWriting(ithread,wbuf);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(myDetectorType){
|
|
||||||
case EIGER:
|
|
||||||
processWritingBufferPacketByPacket();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
if(!dataCompressionEnable)
|
|
||||||
handleWithoutDataCompression(ithread, wbuf, numPackets[0]);
|
|
||||||
else
|
|
||||||
handleDataCompression(ithread,wbuf,d, xmax, ymax, nf);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}/*--end of loop for each buffer (inner loop)*/
|
}/*--end of loop for each buffer (inner loop)*/
|
||||||
|
|
||||||
|
waitWritingBufferForNextAcquisition(ithread);
|
||||||
//in case they are not closed already
|
|
||||||
closeFile();
|
|
||||||
#ifdef DEBUG4
|
|
||||||
cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread);
|
|
||||||
#endif
|
|
||||||
//end of acquisition, wait for file create/change of parameters
|
|
||||||
sem_wait(&writerSemaphore[ithread]);
|
|
||||||
//check to exit thread (for change of parameters) - only EXIT possibility
|
|
||||||
if(killAllWritingThreads){
|
|
||||||
cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread);
|
|
||||||
//free resources at exit
|
|
||||||
for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; ++i)
|
|
||||||
if(wbuf[i]) delete[] wbuf[i];
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
#ifdef DEBUG4
|
|
||||||
cprintf(GREEN,"Writing_Thread %d: Got 1st post. Creating File\n", ithread);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
//create file
|
|
||||||
if((1<<ithread)&createFileMask){
|
|
||||||
if(dataCompressionEnable){
|
|
||||||
#ifdef MYROOT1
|
|
||||||
pthread_mutex_lock(&writeMutex);
|
|
||||||
fileCreateSuccess = createCompressionFile(ithread,0);
|
|
||||||
pthread_mutex_unlock(&writeMutex);
|
|
||||||
#endif
|
|
||||||
}else
|
|
||||||
fileCreateSuccess = createNewFile();
|
|
||||||
|
|
||||||
//let startwriter know file created
|
|
||||||
pthread_mutex_lock(&statusMutex);
|
|
||||||
createFileMask^=(1<<ithread);
|
|
||||||
pthread_mutex_unlock(&statusMutex);
|
|
||||||
}
|
|
||||||
#ifdef DEBUG4
|
|
||||||
cprintf(GREEN,"Writing_Thread %d: File Created. Waiting for 2nd sem to restart acquisition\n", ithread);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
//end of acquisition, wait for restart acquisition/change of parameters
|
|
||||||
sem_wait(&writerSemaphore[ithread]);
|
|
||||||
//check to exit thread (for change of parameters) - only EXIT possibility
|
|
||||||
if(killAllWritingThreads){
|
|
||||||
cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread);
|
|
||||||
//free resources at exit
|
|
||||||
for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; ++i)
|
|
||||||
if(wbuf[i]) delete[] wbuf[i];
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
#ifdef DEBUG4
|
|
||||||
cprintf(GREEN,"Writing_Thread %d: Got 2nd post. Restarting Acquisition\n", ithread);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
}/*--end of loop for each acquisition (outer loop) */
|
}/*--end of loop for each acquisition (outer loop) */
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){
|
||||||
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
//in case they are not closed already
|
||||||
|
closeFile();
|
||||||
|
#ifdef DEBUG4
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread);
|
||||||
|
#endif
|
||||||
|
//end of acquisition, wait for file create/change of parameters
|
||||||
|
sem_wait(&writerSemaphore[ithread]);
|
||||||
|
//check to exit thread (for change of parameters) - only EXIT possibility
|
||||||
|
if(killAllWritingThreads){
|
||||||
|
cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread);
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
|
#ifdef DEBUG4
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Got 1st post. Creating File\n", ithread);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
//create file
|
||||||
|
if((1<<ithread)&createFileMask){
|
||||||
|
if(dataCompressionEnable){
|
||||||
|
#ifdef MYROOT1
|
||||||
|
pthread_mutex_lock(&writeMutex);
|
||||||
|
fileCreateSuccess = createCompressionFile(ithread,0);
|
||||||
|
pthread_mutex_unlock(&writeMutex);
|
||||||
|
#endif
|
||||||
|
}else
|
||||||
|
fileCreateSuccess = createNewFile();
|
||||||
|
|
||||||
|
//let startwriter know file created
|
||||||
|
pthread_mutex_lock(&statusMutex);
|
||||||
|
createFileMask^=(1<<ithread);
|
||||||
|
pthread_mutex_unlock(&statusMutex);
|
||||||
|
}
|
||||||
|
#ifdef DEBUG4
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: File Created. Waiting for 2nd sem to restart acquisition\n", ithread);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
//end of acquisition, wait for restart acquisition/change of parameters
|
||||||
|
sem_wait(&writerSemaphore[ithread]);
|
||||||
|
//check to exit thread (for change of parameters) - only EXIT possibility
|
||||||
|
if(killAllWritingThreads){
|
||||||
|
cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread);
|
||||||
|
//free resources at exit
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
|
#ifdef DEBUG4
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Got 2nd post. Restarting Acquisition\n", ithread);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){
|
bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
bool endofAcquisition = true;
|
bool endofAcquisition = true;
|
||||||
int val;
|
|
||||||
for(int i=0; i<numberofListeningThreads; ++i){
|
for(int i=0; i<numberofListeningThreads; ++i){
|
||||||
//pop if ready
|
//pop if ready
|
||||||
if(ready[i]){
|
if(ready[i]){
|
||||||
@ -1858,7 +1976,7 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* w
|
|||||||
#ifdef FIFODEBUG
|
#ifdef FIFODEBUG
|
||||||
cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i);
|
cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i);
|
||||||
#endif
|
#endif
|
||||||
val = (uint32_t)(*((uint32_t*)wbuffer[i]));
|
int val = (uint32_t)(*((uint32_t*)wbuffer[i]));
|
||||||
if(val < 0)
|
if(val < 0)
|
||||||
cprintf(BG_RED,"Error: Negative packet numbers: %d for FIFO %d\n",val,i);
|
cprintf(BG_RED,"Error: Negative packet numbers: %d for FIFO %d\n",val,i);
|
||||||
nP[i] = abs(val);
|
nP[i] = abs(val);
|
||||||
@ -2223,84 +2341,135 @@ void UDPStandardImplementation::copyFrameToGui(char* buffer[]){
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], int &nf){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
//frame number
|
||||||
|
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))));
|
||||||
|
//for gotthard and normal frame, increment frame number to separate fnum and pnum
|
||||||
|
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
|
||||||
|
tempframenumber++;
|
||||||
|
//get frame number
|
||||||
|
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
|
||||||
|
//handle multi threads
|
||||||
|
pthread_mutex_lock(&progressMutex);
|
||||||
|
if(tempframenumber > currentFrameNumber)
|
||||||
|
currentFrameNumber = tempframenumber;
|
||||||
|
pthread_mutex_unlock(&progressMutex);
|
||||||
|
//set indices
|
||||||
|
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
|
||||||
|
frameIndex = currentFrameNumber - startFrameIndex;
|
||||||
|
|
||||||
|
|
||||||
}
|
//variable definitions
|
||||||
|
char* buff[2] = 0; //an array just to be compatible with copyframetogui
|
||||||
|
char* data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed
|
||||||
|
int ndata; //size of data returned
|
||||||
|
int np; //remaining number of packets returned
|
||||||
|
int npackets = (uint32_t)(*((uint32_t*)wbuffer[0])); //number of total packets
|
||||||
|
int remainingsize = npackets * onePacketSize; //size of the memory slot to be analyzed
|
||||||
|
|
||||||
|
eventType thisEvent = PEDESTAL;
|
||||||
|
int once = 0;
|
||||||
|
int xmax = 0, ymax = 0; //max pixels in x and y direction
|
||||||
|
int xmin = 1, ymin = 1; //min pixels in x and y direction
|
||||||
|
double tot, tl, tr, bl, br;
|
||||||
|
|
||||||
|
//determining xmax and ymax
|
||||||
|
switch(myDetectorType){
|
||||||
|
case MOENCH:
|
||||||
|
xmax = MOENCH_PIXELS_IN_ONE_ROW-1;
|
||||||
|
ymax = MOENCH_PIXELS_IN_ONE_ROW-1;
|
||||||
|
break;
|
||||||
|
case GOTTHARD:
|
||||||
|
if(shortFrameEnable == -1){
|
||||||
|
xmax = GOTTHARD_PIXELS_IN_ROW-1;
|
||||||
|
ymax = GOTTHARD_PIXELS_IN_COL-1;
|
||||||
|
}else{
|
||||||
|
xmax = GOTTHARD_SHORT_PIXELS_IN_ROW-1;
|
||||||
|
ymax = GOTTHARD_SHORT_PIXELS_IN_COL-1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(buff[0] = receiverData[ithread]->findNextFrame(data,ndata,remainingsize)){
|
||||||
|
|
||||||
|
//remaining number of packets
|
||||||
|
np = ndata/onePacketSize;
|
||||||
|
|
||||||
|
if ((np == packetsPerFrame) && (buff[0]!=NULL)){
|
||||||
|
if(nf == 1000)
|
||||||
|
cprintf(GREEN, "Writing_Thread %d: pedestal done\n", ithread);
|
||||||
|
|
||||||
|
singlePhotonDetectorObject[ithread]->newFrame();
|
||||||
|
|
||||||
|
//only for moench
|
||||||
|
if(commonModeSubtractionEnable){
|
||||||
|
for(int ix = xmin - 1; ix < xmax+1; ix++){
|
||||||
|
for(int iy = ymin - 1; iy < ymax+1; iy++){
|
||||||
|
thisEvent = singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for(int ix = xmin - 1; ix < xmax+1; ix++)
|
||||||
|
for(int iy = ymin - 1; iy < ymax+1; iy++){
|
||||||
|
thisEvent=singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, commonModeSubtractionEnable);
|
||||||
|
if (nf>1000) {
|
||||||
|
tot=0;
|
||||||
|
tl=0;
|
||||||
|
tr=0;
|
||||||
|
bl=0;
|
||||||
|
br=0;
|
||||||
|
if (thisEvent==PHOTON_MAX) {
|
||||||
|
receiverData[ithread]->getFrameNumber(buff[0]);
|
||||||
|
//iFrame=receiverdata[ithread]->getFrameNumber(buff);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int UDPStandardImplementation::createCompressionFile(int ithr, int iframe){
|
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
|
||||||
|
|
||||||
#ifdef MYROOT1
|
#ifdef MYROOT1
|
||||||
char temp[MAX_STR_LENGTH];
|
myTree[ithread]->Fill();
|
||||||
//create file name for gui purposes, and set up acquistion parameters
|
//cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl;
|
||||||
sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileName,fileIndex,ithr);
|
#else
|
||||||
//file
|
pthread_mutex_lock(&writeMutex);
|
||||||
myFile[ithr] = new TFile(temp,"RECREATE");/** later return error if it exists */
|
if((fileWriteEnable) && (sfilefd))
|
||||||
cout << "Thread " << ithr << ": created File: "<< temp << endl;
|
singlePhotonDetectorObject[ithread]->writeCluster(sfilefd);
|
||||||
//tree
|
pthread_mutex_unlock(&writeMutex);
|
||||||
sprintf(temp, "%s_fxxx_%d_%d",fileName,fileIndex,ithr);
|
#endif
|
||||||
myTree[ithr]=singlePhotonDet[ithr]->initEventTree(temp, &iframe);
|
}
|
||||||
//resets the pedestalSubtraction array and the commonModeSubtraction
|
}
|
||||||
singlePhotonDet[ithr]->newDataSet();
|
}
|
||||||
if(myFile[ithr]==NULL){
|
|
||||||
cout<<"file null"<<endl;
|
nf++;
|
||||||
return FAIL;
|
#ifndef ALLFILE
|
||||||
}
|
pthread_mutex_lock(&progressMutex);
|
||||||
if(!myFile[ithr]->IsOpen()){
|
packetsInFile += packetsPerFrame;
|
||||||
cout<<"file not open"<<endl;
|
packetsCaught += packetsPerFrame;
|
||||||
return FAIL;
|
totalPacketsCaught += packetsPerFrame;
|
||||||
}
|
if(packetsInFile >= (uint32_t)maxPacketsPerFile)
|
||||||
|
createNewFile();
|
||||||
|
pthread_mutex_unlock(&progressMutex);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
if(!once){
|
||||||
|
copyFrameToGui(buff);
|
||||||
|
once = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
remainingsize -= ((buff[0] + ndata) - data);
|
||||||
|
data = buff[0] + ndata;
|
||||||
|
if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) )
|
||||||
|
cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
while(!fifoFree[0]->push(wbuffer[0]));
|
||||||
|
#ifdef FIFODEBUG
|
||||||
|
cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer[0]));
|
||||||
#endif
|
#endif
|
||||||
return OK;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -2312,13 +2481,49 @@ int UDPStandardImplementation::createCompressionFile(int ithr, int iframe){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int UDPStandardImplementation::startWriting(){
|
int UDPStandardImplementation::startWriting(){
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
char *d=new char[bufferSize*numListeningThreads];
|
|
||||||
int xmax=0,ymax=0;
|
|
||||||
int ret,i,j;
|
int ret,i,j;
|
||||||
|
|
||||||
bool endofacquisition;
|
bool endofacquisition;
|
||||||
@ -2355,19 +2560,7 @@ int UDPStandardImplementation::startWriting(){
|
|||||||
while(1){
|
while(1){
|
||||||
|
|
||||||
|
|
||||||
nf = 0;
|
|
||||||
if(myDetectorType == MOENCH){
|
|
||||||
xmax = MOENCH_PIXELS_IN_ONE_ROW-1;
|
|
||||||
ymax = MOENCH_PIXELS_IN_ONE_ROW-1;
|
|
||||||
}else{
|
|
||||||
if(shortFrame == -1){
|
|
||||||
xmax = GOTTHARD_PIXELS_IN_ROW-1;
|
|
||||||
ymax = GOTTHARD_PIXELS_IN_COL-1;
|
|
||||||
}else{
|
|
||||||
xmax = GOTTHARD_SHORT_PIXELS_IN_ROW-1;
|
|
||||||
ymax = GOTTHARD_SHORT_PIXELS_IN_COL-1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//so that the first frame is always copied
|
//so that the first frame is always copied
|
||||||
guiData = latestData;
|
guiData = latestData;
|
||||||
@ -2757,118 +2950,3 @@ int UDPStandardImplementation::startWriting(){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], char* data, int xmax, int ymax, int &nf){
|
|
||||||
FILE_LOG(logDEBUG1) << __AT__ << " called";
|
|
||||||
|
|
||||||
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))));
|
|
||||||
//for gotthard and normal frame, increment frame number to separate fnum and pnum
|
|
||||||
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
|
|
||||||
tempframenumber++;
|
|
||||||
//get frame number
|
|
||||||
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
|
|
||||||
|
|
||||||
|
|
||||||
pthread_mutex_lock(&progressMutex);
|
|
||||||
if(tempframenumber > currentFrameNumber)
|
|
||||||
currentFrameNumber = tempframenumber;
|
|
||||||
pthread_mutex_unlock(&progressMutex);
|
|
||||||
//set indices
|
|
||||||
acquisitionIndex = currentFrameNumber - startAcquisitionIndex;
|
|
||||||
frameIndex = currentFrameNumber - startFrameIndex;
|
|
||||||
|
|
||||||
#if defined(MYROOT1) && defined(ALLFILE_DEBUG)
|
|
||||||
writeToFile_withoutCompression(wbuf[0], numpackets,currframenum);
|
|
||||||
#endif
|
|
||||||
int npackets = (uint32_t)(*((uint32_t*)wbuffer[0]));
|
|
||||||
eventType thisEvent = PEDESTAL;
|
|
||||||
int ndata;
|
|
||||||
char* buff = 0;
|
|
||||||
data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS;
|
|
||||||
int remainingsize = npackets * onePacketSize;
|
|
||||||
int np;
|
|
||||||
int once = 0;
|
|
||||||
double tot, tl, tr, bl, br;
|
|
||||||
int xmin = 1, ymin = 1, ix, iy;
|
|
||||||
|
|
||||||
|
|
||||||
while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){
|
|
||||||
np = ndata/onePacketSize;
|
|
||||||
|
|
||||||
//cout<<"buff framnum:"<<ithread <<":"<< ((((uint32_t)(*((uint32_t*)buff)))& (frameIndexMask)) >> frameIndexOffset)<<endl;
|
|
||||||
|
|
||||||
if ((np == packetsPerFrame) && (buff!=NULL)){
|
|
||||||
if(nf == 1000) cout << "Thread " << ithread << ": pedestal done " << endl;
|
|
||||||
|
|
||||||
|
|
||||||
singlePhotonDet[ithread]->newFrame();
|
|
||||||
|
|
||||||
//only for moench
|
|
||||||
if(commonModeSubtractionEnable){
|
|
||||||
for(ix = xmin - 1; ix < xmax+1; ix++){
|
|
||||||
for(iy = ymin - 1; iy < ymax+1; iy++){
|
|
||||||
thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
for(ix = xmin - 1; ix < xmax+1; ix++)
|
|
||||||
for(iy = ymin - 1; iy < ymax+1; iy++){
|
|
||||||
thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable);
|
|
||||||
if (nf>1000) {
|
|
||||||
tot=0;
|
|
||||||
tl=0;
|
|
||||||
tr=0;
|
|
||||||
bl=0;
|
|
||||||
br=0;
|
|
||||||
if (thisEvent==PHOTON_MAX) {
|
|
||||||
receiverdata[ithread]->getFrameNumber(buff);
|
|
||||||
//iFrame=receiverdata[ithread]->getFrameNumber(buff);
|
|
||||||
#ifdef MYROOT1
|
|
||||||
myTree[ithread]->Fill();
|
|
||||||
//cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl;
|
|
||||||
#else
|
|
||||||
pthread_mutex_lock(&write_mutex);
|
|
||||||
if((enableFileWrite) && (sfilefd))
|
|
||||||
singlePhotonDet[ithread]->writeCluster(sfilefd);
|
|
||||||
pthread_mutex_unlock(&write_mutex);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
nf++;
|
|
||||||
#ifndef ALLFILE
|
|
||||||
pthread_mutex_lock(&progress_mutex);
|
|
||||||
packetsInFile += packetsPerFrame;
|
|
||||||
packetsCaught += packetsPerFrame;
|
|
||||||
totalPacketsCaught += packetsPerFrame;
|
|
||||||
if(packetsInFile >= (uint32_t)maxPacketsPerFile)
|
|
||||||
createNewFile();
|
|
||||||
pthread_mutex_unlock(&progress_mutex);
|
|
||||||
|
|
||||||
#endif
|
|
||||||
if(!once){
|
|
||||||
copyFrameToGui(NULL,buff);
|
|
||||||
once = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
remainingsize -= ((buff + ndata) - data);
|
|
||||||
data = buff + ndata;
|
|
||||||
if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) )
|
|
||||||
cprintf(BG_RED,"ERROR SHOULD NOT COME HERE, Error 142536!\n");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
while(!fifoFree[0]->push(wbuffer[0]));
|
|
||||||
#ifdef FIFO_DEBUG
|
|
||||||
cprintf(BLUE,"%d writer compression free pushed into fifofree %x for listerner 0\n", ithread, (void*)(wbuffer[0]));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user