somewhere

This commit is contained in:
Dhanya Maliakal 2015-10-15 12:11:06 +02:00
parent f141624477
commit a3e12e7955
4 changed files with 106 additions and 112 deletions

View File

@ -77,8 +77,9 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* Overridden method * Overridden method
* Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard)
* @param b true for data compression enable, else false * @param b true for data compression enable, else false
* @return OK or FAIL
*/ */
void setDataCompressionEnable(const bool b); int setDataCompressionEnable(const bool b);
//***acquisition parameters*** //***acquisition parameters***
/** /**
@ -367,7 +368,33 @@ private:
*/ */
uint32_t processListeningBuffer(int ithread, int cSize,char* temp); uint32_t processListeningBuffer(int ithread, int cSize,char* temp);
bool popAndCheckEndofAcquisition(char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]); /**
* Called by StartWriting
* Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition
* @param ithread current thread index
* @param wbuffer the buffer array that is popped from all the FIFOs
* @param ready if that FIFO is allowed to pop (depends on if dummy buffer already popped/ waiting for other FIFO to finish a frame(eiger))
* @param nP number of packets in the buffer popped out
* @param toFree array of addresses to pop into fifoFree (eiger specific)
* @param toFreeOffset the number of addresses to free for each FIFO (eiger specific)
* @return true if end of acquisition else false
*/
bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]);
/**
* Called by StartWriting
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
* It frees the FIFO addresses, closes all files
* For data compression, it waits for all threads to be done
* Changes the status to RUN_FINISHED and prints statistics
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void stopWriting(int ithread, char* wbuffer[]);
void processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]);
void processWritingBufferPacketByPacket();
/************************************************************************* /*************************************************************************
* Class Members ********************************************************* * Class Members *********************************************************
@ -669,11 +696,7 @@ private:
*/ */
void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum); void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum);
/**
* When acquisition is over, this is called
* @param ithread listening thread number
*/
void stopWriting(int ithread, char* wbuffer[]);
/** /**
* updates parameters and writes to file when not a dummy frame * updates parameters and writes to file when not a dummy frame

View File

@ -8,8 +8,10 @@
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <string>
#include "ansi.h" #include "ansi.h"
typedef double double32_t; typedef double double32_t;
typedef float float32_t; typedef float float32_t;
typedef int int32_t; typedef int int32_t;
@ -115,9 +117,9 @@ public:
\param b true or false \param b true or false
\returns string enabled, disabled \returns string enabled, disabled
*/ */
static string stringEnable(bool b){\ static std::string stringEnable(bool b){\
if(b) return string("enabled"); \ if(b) return std::string("enabled"); \
else return string("disabled"); \ else return std::string("disabled"); \
}; };

View File

@ -5,9 +5,9 @@
***********************************************/ ***********************************************/
#include "UDPBaseImplementation.h" #include "UDPBaseImplementation.h"
#include "genericSocket.h"
#include <sys/stat.h> // stat #include <sys/stat.h> // stat
#include <iostream> #include <iostream>
#include <string.h> #include <string.h>
using namespace std; using namespace std;
@ -75,10 +75,6 @@ UDPBaseImplementation::~UDPBaseImplementation(){
FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
cout << "Info: Deleting base member pointers" << endl; cout << "Info: Deleting base member pointers" << endl;
if(detHostname) {delete [] detHostname; detHostname = NULL;}
if(eth) {delete [] eth; eth = NULL;}
if(fileName) {delete [] fileName; fileName = NULL;}
if(filePath) {delete [] filePath; filePath = NULL;}
} }
@ -129,7 +125,7 @@ char *UDPBaseImplementation::getFilePath() const{
return output; return output;
} }
uint32_t UDPBaseImplementation::getFileIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileIndex;} uint64_t UDPBaseImplementation::getFileIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileIndex;}
int UDPBaseImplementation::getScanTag() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return scanTag;} int UDPBaseImplementation::getScanTag() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return scanTag;}
@ -234,7 +230,7 @@ void UDPBaseImplementation::setFilePath(const char c[]){
FILE_LOG(logINFO) << "File path:" << filePath; FILE_LOG(logINFO) << "File path:" << filePath;
} }
void UDPBaseImplementation::setFileIndex(const uint32_t i){ void UDPBaseImplementation::setFileIndex(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
fileIndex = i; fileIndex = i;
@ -394,7 +390,7 @@ void UDPBaseImplementation::resetAcquisitionCount(){
FILE_LOG(logINFO) << "totalPacketsCaught:" << totalPacketsCaught << endl; FILE_LOG(logINFO) << "totalPacketsCaught:" << totalPacketsCaught << endl;
} }
int UDPBaseImplementation::startReceiver(char *c=NULL){ int UDPBaseImplementation::startReceiver(char *c){
FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logWARNING) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
return OK; return OK;
@ -433,17 +429,17 @@ void UDPBaseImplementation::closeFile(int i){
/***callback functions***/ /***callback functions***/
void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){ void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,uint64_t, uint32_t, void*),void *arg){
startAcquisitionCallBack=func; startAcquisitionCallBack=func;
pStartAcquisition=arg; pStartAcquisition=arg;
} }
void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){ void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void*),void *arg){
acquisitionFinishedCallBack=func; acquisitionFinishedCallBack=func;
pAcquisitionFinished=arg; pAcquisitionFinished=arg;
} }
void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){ void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(uint64_t, char*, uint32_t, FILE*, char*, void*),void *arg){
rawDataReadyCallBack=func; rawDataReadyCallBack=func;
pRawDataReady=arg; pRawDataReady=arg;
} }

View File

@ -764,7 +764,7 @@ void UDPStandardImplementation::resetAcquisitionCount(){
} }
int UDPStandardImplementation::startReceiver(char *c=NULL){ int UDPStandardImplementation::startReceiver(char *c){
FILE_LOG(logDEBUG1) << __AT__ << " called"; FILE_LOG(logDEBUG1) << __AT__ << " called";
cout << "Info: Starting Receiver" << endl; cout << "Info: Starting Receiver" << endl;
@ -1752,23 +1752,28 @@ void UDPStandardImplementation::startWriting(){
//pop fifo and if end of acquisition //pop fifo and if end of acquisition
if(popAndCheckEndofAcquisition(wbuf, popReady, numPackets,toFreePointers,toFreePointersOffset)){ if(popAndCheckEndofAcquisition(ithread, wbuf, popReady, numPackets,toFreePointers,toFreePointersOffset)){
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing_Thread %d: All dummy-end buffers popped\n", ithread); cprintf(GREEN,"Writing_Thread %d: All dummy-end buffers popped\n", ithread);
#endif #endif
//finish missing packets //finish missing packets
if(myDetectorType == EIGER
&& ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))); if(myDetectorType == EIGER &&
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numberofListeningThreads))));
else{ else{
stopWriting(ithread,wbuf); stopWriting(ithread,wbuf);
continue; continue;
} }
} }
//eiger-processWritingPackets(); switch(myDetectorType){
//others-processWritingBuffer(); case EIGER:
processWritingBufferPacketByPacket();
break;
default:
processWritingBuffer(ithread, wbuf, numPackets);
break;
}
}/*--end of loop for each buffer (inner loop)*/ }/*--end of loop for each buffer (inner loop)*/
@ -1835,7 +1840,8 @@ void UDPStandardImplementation::startWriting(){
bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){ bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
bool endofAcquisition = true; bool endofAcquisition = true;
int val; int val;
@ -1866,7 +1872,7 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], boo
#ifdef DEBUG4 #ifdef DEBUG4
switch(myDetectorType){ switch(myDetectorType){
case EIGER: case EIGER:
wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
//cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer))); //cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer)));
cprintf(BLUE,"Fnum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer))); cprintf(BLUE,"Fnum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer)));
cprintf(BLUE,"Pnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); cprintf(BLUE,"Pnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber));
@ -1887,6 +1893,54 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], boo
void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
cprintf(GREEN,"Info: Writing_Thread %d: End of Acquisition\n",ithread);
//free fifo
for(int i=0; i<numberofListeningThreads; ++i)
while(!fifoFree[i]->push(wbuffer[i]));
#ifdef
}
void UDPStandardImplementation::processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
}
void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){
FILE_LOG(logDEBUG1) << __AT__ << " called";
uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
//get frame number
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
//single thread, just assign and process without compression
if(!dataCompressionEnable){
currentFrameNumber = tempframenumber;
handleWithoutDataCompression(ithread, wbuffer, nP[0]);
}
//handling multiple threads
else{
pthread_mutex_lock(&progressMutex);
if(tempframenumber > currentFrameNumber)
currentFrameNumber = tempframenumber;
pthread_mutex_unlock(&progressMutex);
handleDataCompression(ithread,wbuffer,d, xmax, ymax, nf);
}
}
@ -2173,66 +2227,9 @@ int UDPStandardImplementation::startWriting(){
//pop
endofacquisition = true;
for(i=0;i<numListeningThreads;++i){
if(popready[i]){
fifo[i]->pop(wbuf[i]);
#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer poped 0x%x from fifo %d\n", ithread, (void*)(wbuf[i]), i);
#endif
numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i]));
#ifdef VERYDEBUG
cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i);
#endif
if(numpackets < 0){
cprintf(BG_RED,"negative numpackets[%d]%d\n",i,numpackets[i]);
exit(-1);
}
//dont pop again if dummy packet
else if(numpackets[i] == 0){
popready[i] = false;
#ifdef EIGER_DEBUG3
cprintf(GREEN,"%d Dummy frame popped out of fifo %d",ithread, i);
#endif
}else{
endofacquisition = false;
if(numpackets[i] == onePacketSize){
#ifdef EIGER_DEBUG3
wbuf_footer = (eiger_packet_footer_t*)(wbuf[i] + footer_offset + HEADER_SIZE_NUM_TOT_PACKETS);
//cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer)));
cprintf(BLUE,"tempframenum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer)));
cprintf(BLUE,"packetnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetnum));
#endif
}
if(myDetectorType == EIGER){
tofree[tofreeoffset[i]] = wbuf[i];
tofreeoffset[i]++;
}
}
}
}
//END OF ACQUISITION
if(endofacquisition){
#ifdef EIGER_DEBUG3
cprintf(GREEN,"%d Both dummy frames\n", ithread);
#endif
//remaining packets to be written
if((myDetectorType == EIGER) &&
((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads))));
else{
stopWriting(ithread,wbuf);
continue;
}
}
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
@ -2539,31 +2536,7 @@ int UDPStandardImplementation::startWriting(){
} }
//other detectors other than eiger
else{
//frame number for progress
if ((myDetectorType == PROPIX) ||((myDetectorType == GOTTHARD) && (shortFrame == -1)))
tempframenum[0] = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum[0] = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum[0];
else{
pthread_mutex_lock(&progress_mutex);
if(tempframenum[0] > currframenum)
currframenum = tempframenum[0];
pthread_mutex_unlock(&progress_mutex);
}
//without datacompression: write datacall back, or write data, free fifo
if(!dataCompression) handleWithoutDataCompression(ithread,wbuf, numpackets[0]);
//data compression
else handleDataCompression(ithread,wbuf,d, xmax, ymax, nf);
}
} }