changed number of files for compressed non root data to one

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@748 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d
2014-03-04 14:07:50 +00:00
parent f55aebc79b
commit 7f22b3ff99
4 changed files with 148 additions and 232 deletions

View File

@ -86,7 +86,7 @@ public:
retval.append("Could not create file to start receiver.\nCheck permissions of output directory\n"); retval.append("Could not create file to start receiver.\nCheck permissions of output directory\n");
if(slsErrorMask&COULDNOT_ENABLE_COMPRESSION) if(slsErrorMask&COULDNOT_ENABLE_COMPRESSION)
retval.append("Could not enable/disable data compression in receiver.\nThread creation failed.\n"); retval.append("Could not enable/disable data compression in receiver.\nThread creation failed or recompile code with MYROOT1 flag.\n");

View File

@ -73,7 +73,6 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
thread_started(0), thread_started(0),
currentWriterThreadIndex(-1), currentWriterThreadIndex(-1),
totalListeningFrameCount(0), totalListeningFrameCount(0),
commonModeSubtractionEnable(false),
sfilefd(NULL), sfilefd(NULL),
writerthreads_mask(0x0), writerthreads_mask(0x0),
listening_thread_running(0), listening_thread_running(0),
@ -110,19 +109,19 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
strcpy(savefilename,""); strcpy(savefilename,"");
strcpy(filePath,""); strcpy(filePath,"");
strcpy(fileName,"run"); strcpy(fileName,"run");
cmSub = NULL;
for(int i=0;i<numWriterThreads;i++){ for(int i=0;i<numWriterThreads;i++){
#ifdef MYROOT1
commonModeSubtractionEnable = false;
singlePhotonDet[i] = NULL; singlePhotonDet[i] = NULL;
receiverdata[i] = NULL; receiverdata[i] = NULL;
#ifdef ALLFILE
packetsInAllFile[i] = 0;
sfilefdAll[i] = NULL;
#endif
#ifdef MYROOT1
myTree[i] = (NULL); myTree[i] = (NULL);
myFile[i] = (NULL); myFile[i] = (NULL);
#endif #endif
} }
#ifdef MYROOT1
cmSub = NULL;
#endif
setupFifoStructure(); setupFifoStructure();
@ -171,13 +170,14 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
slsReceiverFunctionList::~slsReceiverFunctionList(){ slsReceiverFunctionList::~slsReceiverFunctionList(){
createListeningThreads(true); createListeningThreads(true);
createWriterThreads(true); createWriterThreads(true);
#ifdef MYROOT1
for(int i=0;i<numWriterThreads;i++){ for(int i=0;i<numWriterThreads;i++){
if(singlePhotonDet[i]) if(singlePhotonDet[i])
delete singlePhotonDet[i]; delete singlePhotonDet[i];
if(receiverdata[i]) if(receiverdata[i])
delete receiverdata[i]; delete receiverdata[i];
} }
#endif
if(udpSocket) delete udpSocket; if(udpSocket) delete udpSocket;
if(eth) delete [] eth; if(eth) delete [] eth;
if(latestData) delete [] latestData; if(latestData) delete [] latestData;
@ -323,6 +323,10 @@ int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){
int slsReceiverFunctionList::enableDataCompression(bool enable){ int slsReceiverFunctionList::enableDataCompression(bool enable){
#ifndef MYROOT1
return FAIL;
#endif
cout << "Data compression "; cout << "Data compression ";
if(enable) if(enable)
cout << "enabled" << endl; cout << "enabled" << endl;
@ -360,6 +364,7 @@ int slsReceiverFunctionList::enableDataCompression(bool enable){
void slsReceiverFunctionList::deleteFilter(){ void slsReceiverFunctionList::deleteFilter(){
#ifdef MYROOT1
cmSub=NULL; cmSub=NULL;
for(int i=0;i<numWriterThreads;i++){ for(int i=0;i<numWriterThreads;i++){
@ -372,23 +377,22 @@ void slsReceiverFunctionList::deleteFilter(){
receiverdata[i] = NULL; receiverdata[i] = NULL;
} }
} }
#endif
} }
void slsReceiverFunctionList::setupFilter(){ void slsReceiverFunctionList::setupFilter(){
#ifdef MYROOT1
double hc = 0; double hc = 0;
double sigma = 5; double sigma = 5;
int sign = 1; int sign = 1;
int csize; int csize;
int i; int i;
if (commonModeSubtractionEnable) if (commonModeSubtractionEnable)
cmSub=new moenchCommonMode(); cmSub=new moenchCommonMode();
switch(myDetectorType){ switch(myDetectorType){
case MOENCH: case MOENCH:
csize = 3; csize = 3;
@ -410,7 +414,7 @@ void slsReceiverFunctionList::setupFilter(){
for(i=0;i<numWriterThreads;i++) for(i=0;i<numWriterThreads;i++)
singlePhotonDet[i]=new singlePhotonDetector<uint16_t>(receiverdata[i], csize, sigma, sign, cmSub); singlePhotonDet[i]=new singlePhotonDetector<uint16_t>(receiverdata[i], csize, sigma, sign, cmSub);
#endif
} }
@ -723,12 +727,6 @@ int slsReceiverFunctionList::setupWriter(){
//reset writing thread variables //reset writing thread variables
packetsInFile=0; packetsInFile=0;
#ifdef ALLFILE
for(int i=0;i<numWriterThreads;i++){
packetsInAllFile[i] = 0;
if(sfilefdAll[i]) sfilefdAll[i] = NULL;
}
#endif
packetsCaught=0; packetsCaught=0;
frameIndex=0; frameIndex=0;
if(sfilefd) sfilefd=NULL; if(sfilefd) sfilefd=NULL;
@ -790,6 +788,16 @@ int slsReceiverFunctionList::setupWriter(){
if (createfile_mask) if (createfile_mask)
cout <<"*********************************************sooo weird:"<<createfile_mask<<endl; cout <<"*********************************************sooo weird:"<<createfile_mask<<endl;
if(dataCompression){
#ifdef ALLFILE
if(ret_createfile != FAIL){
int ret = createNewFile();
if(ret == FAIL)
ret_createfile = FAIL;
}
#endif
}
return ret_createfile; return ret_createfile;
} }
@ -820,46 +828,14 @@ int slsReceiverFunctionList::createCompressionFile(int ithr, int iframe){
cout<<"file not open"<<endl; cout<<"file not open"<<endl;
return FAIL; return FAIL;
} }
#endif
return OK; return OK;
#else
return FAIL;
#endif
} }
int slsReceiverFunctionList::createNewFile(int ithr){ int slsReceiverFunctionList::createNewFile(){
if(dataCompression){
#ifdef ALLFILE
//create file name
if(frameIndexNeeded==-1)
sprintf(savefilename, "%s/%s_%d_%d.raw", filePath,fileName,fileIndex,ithr);
else
sprintf(savefilename, "%s/%s_f%012d_%d_%d.raw", filePath,fileName,(packetsCaught/packetsPerFrame),fileIndex,ithr);
//close
if(sfilefdAll[ithr]){
fclose(sfilefdAll[ithr]);
sfilefdAll[ithr] = NULL;
}
//open file
if (NULL == (sfilefdAll[ithr] = fopen((const char *) (savefilename), "w"))){
cout << "Error: Could not create file " << savefilename << endl;
return FAIL;
}
//setting buffer
setvbuf(sfilefdAll[ithr],NULL,_IOFBF,BUF_SIZE);
cout << "File Created:" << savefilename << endl;
//reset counters for each new file
if(packetsCaught){
prevframenum = currframenum;
packetsInAllFile[ithr] = 0;
}
#endif
}else{
//create file name //create file name
if(frameIndexNeeded==-1) if(frameIndexNeeded==-1)
@ -881,6 +857,8 @@ int slsReceiverFunctionList::createNewFile(int ithr){
} }
//setting buffer //setting buffer
setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE);
if(!dataCompression){
//printing packet losses and file names //printing packet losses and file names
if(!packetsCaught) if(!packetsCaught)
cout << savefilename << endl; cout << savefilename << endl;
@ -896,13 +874,21 @@ int slsReceiverFunctionList::createNewFile(int ithr){
} }
} }
//data compression and dvpr flag allfile
else{
#ifdef ALLFILE
cout << "File created:" << savefilename << endl;
#endif
}
}
//reset counters for each new file //reset counters for each new file
if(packetsCaught){ if(packetsCaught){
prevframenum = currframenum; prevframenum = currframenum;
packetsInFile = 0; packetsInFile = 0;
} }
}
return OK; return OK;
} }
@ -934,12 +920,12 @@ void slsReceiverFunctionList::closeFile(int ithr){
#ifdef ALLFILE #ifdef ALLFILE
//close file //close file
if(sfilefdAll[ithr]){ if(sfilefd){
#ifdef VERBOSE #ifdef VERBOSE
cout << "sfield:" << (int)sfilefdAll[ithr] << endl; cout << "sfield:" << (int)sfilefd << endl;
#endif #endif
fclose(sfilefdAll[ithr]); fclose(sfilefd);
sfilefdAll[ithr] = NULL; sfilefd = NULL;
} }
#endif #endif
pthread_mutex_lock(&write_mutex); pthread_mutex_lock(&write_mutex);
@ -1471,10 +1457,15 @@ int slsReceiverFunctionList::startWriting(){
//data compression //data compression
else{ else{
#ifdef MYROOT1
#ifdef ALLFILE #ifdef ALLFILE
writeToFile_withoutCompression(wbuf, numpackets,ithread); writeToFile_withoutCompression(wbuf, numpackets);
#ifndef MYROOT1
copyFrameToGui(wbuf + HEADER_SIZE_NUM_TOT_PACKETS);
#endif #endif
#endif
#ifdef MYROOT1
eventType thisEvent = PEDESTAL; eventType thisEvent = PEDESTAL;
int ndata; int ndata;
char* buff = 0; char* buff = 0;
@ -1527,14 +1518,13 @@ int slsReceiverFunctionList::startWriting(){
} }
nf++; nf++;
#ifndef ALLFILE
pthread_mutex_lock(&progress_mutex); pthread_mutex_lock(&progress_mutex);
packetsInFile += packetsPerFrame; packetsInFile += packetsPerFrame;
packetsCaught += packetsPerFrame; packetsCaught += packetsPerFrame;
totalPacketsCaught += packetsPerFrame; totalPacketsCaught += packetsPerFrame;
pthread_mutex_unlock(&progress_mutex); pthread_mutex_unlock(&progress_mutex);
#endif
if(!once){ if(!once){
copyFrameToGui(buff); copyFrameToGui(buff);
once = 1; once = 1;
@ -1547,13 +1537,12 @@ int slsReceiverFunctionList::startWriting(){
cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<<endl; cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<<endl;
} }
#endif
while(!fifoFree->push(wbuf)); while(!fifoFree->push(wbuf));
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout<<"buf freed:"<<(void*)wbuf<<endl; cout<<"buf freed:"<<(void*)wbuf<<endl;
#endif #endif
#endif
} }
} }
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
@ -1578,13 +1567,6 @@ int slsReceiverFunctionList::startWriting(){
pthread_mutex_unlock(&write_mutex); pthread_mutex_unlock(&write_mutex);
if(ret == FAIL) if(ret == FAIL)
ret_createfile = FAIL; ret_createfile = FAIL;
#ifdef ALLFILE
if(ret != FAIL){
ret = createNewFile(ithread);
if(ret == FAIL)
ret_createfile = FAIL;
}
#endif
}else{ }else{
ret = createNewFile(); ret = createNewFile();
if(ret == FAIL) if(ret == FAIL)
@ -1629,80 +1611,15 @@ int slsReceiverFunctionList::startWriting(){
void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpackets, int ithr){ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpackets){
int packetsToSave, offset,tempframenum,lastpacket; int packetsToSave, offset,tempframenum,lastpacket;
if(dataCompression){
#ifdef ALLFILE
//file write
if((enableFileWrite) && (sfilefdAll[ithr])){
offset = HEADER_SIZE_NUM_TOT_PACKETS;
while(numpackets > 0){
//for progress and packet loss calculation(new files)
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
//to create new file when max reached
packetsToSave = maxPacketsPerFile - packetsInAllFile[ithr];
if(packetsToSave > numpackets)
packetsToSave = numpackets;
fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefdAll[ithr]);
packetsInAllFile[ithr] += packetsToSave;
//new file
if(packetsInAllFile[ithr] >= maxPacketsPerFile){
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
//for packet loss
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
createNewFile(ithr);
}
offset += (packetsToSave * onePacketSize);
numpackets -= packetsToSave;
}
}
//no file write
else{
packetsInAllFile[ithr] += numpackets;
}
#endif
}
else{
//file write //file write
if((enableFileWrite) && (sfilefd)){ if((enableFileWrite) && (sfilefd)){
offset = HEADER_SIZE_NUM_TOT_PACKETS; offset = HEADER_SIZE_NUM_TOT_PACKETS;
while(numpackets > 0){ while(numpackets > 0){
//for progress and packet loss calculation(new files) //for progress and packet loss calculation(new files)
if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
@ -1719,6 +1636,10 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif #endif
//lock
if(numWriterThreads > 1)
pthread_mutex_lock(&write_mutex);
//to create new file when max reached //to create new file when max reached
packetsToSave = maxPacketsPerFile - packetsInFile; packetsToSave = maxPacketsPerFile - packetsInFile;
@ -1730,11 +1651,11 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
packetsCaught += packetsToSave; packetsCaught += packetsToSave;
totalPacketsCaught += packetsToSave; totalPacketsCaught += packetsToSave;
//new file //new file
if(packetsInFile >= maxPacketsPerFile){ if(packetsInFile >= maxPacketsPerFile){
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
//for packet loss //for packet loss
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset);
else else
@ -1749,22 +1670,31 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif #endif
//create
createNewFile(ithr); createNewFile();
} }
//unlock
if(numWriterThreads > 1)
pthread_mutex_unlock(&write_mutex);
offset += (packetsToSave * onePacketSize); offset += (packetsToSave * onePacketSize);
numpackets -= packetsToSave; numpackets -= packetsToSave;
} }
} }
else{ else{
if(numWriterThreads > 1)
pthread_mutex_lock(&write_mutex);
packetsInFile += numpackets; packetsInFile += numpackets;
packetsCaught += numpackets; packetsCaught += numpackets;
totalPacketsCaught += numpackets; totalPacketsCaught += numpackets;
if(numWriterThreads > 1)
pthread_mutex_unlock(&write_mutex);
} }
}
} }

View File

@ -13,11 +13,10 @@
#include "genericSocket.h" #include "genericSocket.h"
#include "circularFifo.h" #include "circularFifo.h"
#ifdef MYROOT1
#include "singlePhotonDetector.h" #include "singlePhotonDetector.h"
#include "slsReceiverData.h" #include "slsReceiverData.h"
#include "moenchCommonMode.h" #include "moenchCommonMode.h"
#ifdef MYROOT1
#include <TTree.h> #include <TTree.h>
#include <TFile.h> #include <TFile.h>
#endif #endif
@ -270,10 +269,9 @@ private:
/** /**
* Creates new file * Creates new file
* @param ithr thread number
*\returns OK for succces or FAIL for failure *\returns OK for succces or FAIL for failure
*/ */
int createNewFile(int ithr = 0); int createNewFile();
/** /**
* Static function - Thread started which listens to packets. * Static function - Thread started which listens to packets.
@ -307,10 +305,9 @@ private:
/** /**
* Writing to file without compression * Writing to file without compression
* @param buf is the address of buffer popped out of fifo * @param buf is the address of buffer popped out of fifo
* @param num * @param numpackets is the number of packets
* @param ithr thread number
*/ */
void writeToFile_withoutCompression(char* buf,int numpackets,int ithr = 0); void writeToFile_withoutCompression(char* buf,int numpackets);
@ -487,14 +484,6 @@ private:
int killAllWritingThreads; int killAllWritingThreads;
//filter
singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS];
slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS];
moenchCommonMode *cmSub;
bool commonModeSubtractionEnable;
//semaphores //semaphores
@ -519,8 +508,16 @@ private:
/** mutex for writing data to file */ /** mutex for writing data to file */
pthread_mutex_t write_mutex; pthread_mutex_t write_mutex;
/** File Descriptor */
FILE *sfilefd;
#ifdef MYROOT1 #ifdef MYROOT1
//filter
singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS];
slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS];
moenchCommonMode *cmSub;
bool commonModeSubtractionEnable;
/** Tree where the hits are stored */ /** Tree where the hits are stored */
TTree *myTree[MAX_NUM_WRITER_THREADS]; TTree *myTree[MAX_NUM_WRITER_THREADS];
@ -528,17 +525,6 @@ private:
TFile *myFile[MAX_NUM_WRITER_THREADS]; TFile *myFile[MAX_NUM_WRITER_THREADS];
#endif #endif
/** File Descriptor */
FILE *sfilefd;
#ifdef ALLFILE
/** File Descriptor */
FILE *sfilefdAll[MAX_NUM_WRITER_THREADS];
/** Pckets currently in current file, starts new file when it reaches max for the current thread*/
int packetsInAllFile[MAX_NUM_WRITER_THREADS];
#endif
/** /**
callback arguments are callback arguments are