mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-10 20:07:14 +02:00
make it work for multi threaded compression receiver for moench only
git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@722 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
@ -22,8 +22,8 @@
|
||||
#include <iostream>
|
||||
using namespace std;
|
||||
|
||||
FILE* slsReceiverFunctionList::sfilefd(NULL);
|
||||
int slsReceiverFunctionList::receiver_threads_running(0);
|
||||
|
||||
|
||||
|
||||
slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
|
||||
myDetectorType(det),
|
||||
@ -67,14 +67,13 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
|
||||
buffer(NULL),
|
||||
numWriterThreads(1),
|
||||
thread_started(0),
|
||||
writerthreads_mask(0x0),
|
||||
currentWriterThreadIndex(-1),
|
||||
totalListeningFrameCount(0),
|
||||
running(0),
|
||||
singlePhotonDet(NULL),
|
||||
mdecoder(NULL),
|
||||
commonModeSubtractionEnable(false),
|
||||
iFrame(0),
|
||||
sfilefd(NULL),
|
||||
writerthreads_mask(0x0),
|
||||
listening_thread_running(0),
|
||||
cbAction(DO_EVERYTHING),
|
||||
startAcquisitionCallBack(NULL),
|
||||
pStartAcquisition(NULL),
|
||||
acquisitionFinishedCallBack(NULL),
|
||||
@ -105,7 +104,14 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
|
||||
strcpy(savefilename,"");
|
||||
strcpy(filePath,"");
|
||||
strcpy(fileName,"run");
|
||||
|
||||
for(int i=0;i<numWriterThreads;i++){
|
||||
singlePhotonDet[i] = NULL;
|
||||
mdecoder[i] = NULL;
|
||||
#ifdef MYROOT1
|
||||
myTree[i] = (NULL);
|
||||
myFile[i] = (NULL);
|
||||
#endif
|
||||
}
|
||||
|
||||
setupFifoStructure();
|
||||
|
||||
@ -114,24 +120,49 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
|
||||
pthread_mutex_init(&progress_mutex,NULL);
|
||||
pthread_mutex_init(&write_mutex,NULL);
|
||||
|
||||
|
||||
|
||||
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
|
||||
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
|
||||
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
|
||||
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
|
||||
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
|
||||
|
||||
/** permanent setting heiner
|
||||
net.core.rmem_max = 104857600 # 100MiB
|
||||
net.core.netdev_max_backlog = 250000
|
||||
sysctl -p
|
||||
// from the manual
|
||||
sysctl -w net.core.rmem_max=16777216
|
||||
sysctl -w net.core.netdev_max_backlog=250000
|
||||
*/
|
||||
/*
|
||||
if(createThreads() == FAIL){
|
||||
cout << "ERROR: Could not create writer threads" << endl;
|
||||
exit (-1);
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
slsReceiverFunctionList::~slsReceiverFunctionList(){
|
||||
if(udpSocket) delete udpSocket;
|
||||
if(eth) delete [] eth;
|
||||
if(latestData) delete [] latestData;
|
||||
if(guiFileName) delete [] guiFileName;
|
||||
if(mem0) free(mem0);
|
||||
if(fifo) delete fifo;
|
||||
if(fifoFree) delete fifoFree;
|
||||
closeFile(-1);
|
||||
for(int i=0;i<numWriterThreads;i++){
|
||||
if(singlePhotonDet[i])
|
||||
delete singlePhotonDet[i];
|
||||
if(mdecoder[i])
|
||||
delete mdecoder[i];
|
||||
}
|
||||
createThreads(true);
|
||||
|
||||
if(udpSocket) delete udpSocket;
|
||||
if(eth) delete [] eth;
|
||||
if(latestData) delete [] latestData;
|
||||
if(guiFileName) delete [] guiFileName;
|
||||
if(mem0) free(mem0);
|
||||
if(fifo) delete fifo;
|
||||
}
|
||||
|
||||
|
||||
@ -265,40 +296,20 @@ int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){
|
||||
}
|
||||
|
||||
|
||||
void slsReceiverFunctionList::setupFilter(){
|
||||
cout<<"************in set up filter"<<endl;
|
||||
/*******************/
|
||||
if(mdecoder) { delete mdecoder; mdecoder = NULL;}
|
||||
if(singlePhotonDet) { delete singlePhotonDet; singlePhotonDet = NULL;}
|
||||
|
||||
if(dataCompression){
|
||||
if(myDetectorType == MOENCH){
|
||||
double hc=0;
|
||||
int sign=1;
|
||||
|
||||
mdecoder=new moench02ModuleData(hc);
|
||||
moenchCommonMode *cmSub=NULL;
|
||||
if (commonModeSubtractionEnable)
|
||||
cmSub=new moenchCommonMode();
|
||||
|
||||
singlePhotonDet=new singlePhotonDetector<uint16_t>(mdecoder, 3, 5, sign, cmSub);
|
||||
cout<<"************filter created"<<endl;
|
||||
}
|
||||
}
|
||||
/*******************/
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/******************* need to look at exit strategy **************************/
|
||||
void slsReceiverFunctionList::enableDataCompression(bool enable){
|
||||
dataCompression = enable;
|
||||
|
||||
createThreads(true);
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
listening_thread_running = false;
|
||||
writerthreads_mask = 0x0;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
|
||||
/*createThreads(true);*/
|
||||
if(enable)
|
||||
numWriterThreads = 1;//MAX_NUM_WRITER_THREADS;
|
||||
numWriterThreads = MAX_NUM_WRITER_THREADS;
|
||||
else
|
||||
numWriterThreads = 1;
|
||||
createThreads();
|
||||
@ -309,6 +320,36 @@ void slsReceiverFunctionList::enableDataCompression(bool enable){
|
||||
|
||||
|
||||
|
||||
void slsReceiverFunctionList::setupFilter(){
|
||||
for(int i=0;i<numWriterThreads;i++){
|
||||
if(singlePhotonDet[i]){
|
||||
delete singlePhotonDet[i];
|
||||
singlePhotonDet[i] = NULL;
|
||||
}
|
||||
if(mdecoder[i]){
|
||||
delete mdecoder[i];
|
||||
mdecoder[i] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if(dataCompression){
|
||||
if(myDetectorType == MOENCH){
|
||||
double hc=0;
|
||||
int sign=1;
|
||||
|
||||
|
||||
moenchCommonMode *cmSub=NULL;
|
||||
if (commonModeSubtractionEnable)
|
||||
cmSub=new moenchCommonMode();
|
||||
|
||||
for(int i=0;i<numWriterThreads;i++){
|
||||
mdecoder[i]=new moench02ModuleData(hc);
|
||||
singlePhotonDet[i]=new singlePhotonDetector<uint16_t>(mdecoder[i], 3, 5, sign, cmSub);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void slsReceiverFunctionList::readFrame(char* c,char** raw){
|
||||
//point to gui data
|
||||
@ -330,7 +371,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){
|
||||
pthread_mutex_lock(&dataReadyMutex);
|
||||
guiDataReady = 0;
|
||||
pthread_mutex_unlock(&dataReadyMutex);
|
||||
if((nFrameToGui) && (receiver_threads_running)){
|
||||
if((nFrameToGui) && (writerthreads_mask)){
|
||||
//release after getting data
|
||||
sem_post(&smp);
|
||||
}
|
||||
@ -501,18 +542,19 @@ int slsReceiverFunctionList::createThreads(bool destroy){
|
||||
|
||||
if(!destroy){
|
||||
|
||||
//listening thread
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
status = IDLE;
|
||||
running = 0;
|
||||
listening_thread_running = 0;
|
||||
writerthreads_mask = 0x0;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
|
||||
|
||||
//listening thread
|
||||
sem_init(&listensmp,0,0);
|
||||
if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){
|
||||
cout << "Could not create listening thread" << endl;
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
//#ifdef VERBOSE
|
||||
cout << "Listening thread created successfully." << endl;
|
||||
//#endif
|
||||
@ -520,9 +562,7 @@ int slsReceiverFunctionList::createThreads(bool destroy){
|
||||
|
||||
//start writer threads
|
||||
cout << "Creating Writer Threads";
|
||||
writerthreads_mask = 0x0;
|
||||
currentWriterThreadIndex = -1;
|
||||
|
||||
for(i = 0; i < numWriterThreads; ++i){
|
||||
sem_init(&writersmp[i],0,0);
|
||||
thread_started = 0;
|
||||
@ -563,25 +603,11 @@ int slsReceiverFunctionList::createThreads(bool destroy){
|
||||
cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl;
|
||||
|
||||
|
||||
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
|
||||
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
|
||||
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
|
||||
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
|
||||
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
|
||||
|
||||
/** permanent setting heiner
|
||||
net.core.rmem_max = 104857600 # 100MiB
|
||||
net.core.netdev_max_backlog = 250000
|
||||
sysctl -p
|
||||
// from the manual
|
||||
sysctl -w net.core.rmem_max=16777216
|
||||
sysctl -w net.core.netdev_max_backlog=250000
|
||||
*/
|
||||
|
||||
|
||||
}
|
||||
|
||||
else{
|
||||
else{cout<<"DESTROYNG THREADS"<<endl;
|
||||
//cancel threads
|
||||
for(i = 0; i < numWriterThreads; ++i){
|
||||
if(pthread_cancel(writing_thread[i])!=0)
|
||||
@ -590,6 +616,8 @@ int slsReceiverFunctionList::createThreads(bool destroy){
|
||||
sem_destroy(&writersmp[i]);
|
||||
}
|
||||
//semaphore destroy
|
||||
if(pthread_cancel(listening_thread)!=0)
|
||||
cout << "Unable to cancel listening Thread " << endl;
|
||||
sem_post(&listensmp);
|
||||
sem_destroy(&listensmp);
|
||||
cout << "Threads destroyed" << endl;
|
||||
@ -618,6 +646,8 @@ int slsReceiverFunctionList::setupWriter(){
|
||||
guiDataReady=0;
|
||||
strcpy(guiFileName,"");
|
||||
cbAction = DO_EVERYTHING;
|
||||
writerthreads_mask = 0x0;
|
||||
int ret,ret1 = OK;
|
||||
|
||||
|
||||
//printouts
|
||||
@ -644,33 +674,55 @@ int slsReceiverFunctionList::setupWriter(){
|
||||
|
||||
//creating first file
|
||||
if(!dataCompression)
|
||||
return createNewFile();
|
||||
ret = createNewFile();
|
||||
else{
|
||||
#ifdef MYROOT1
|
||||
/**********************************/
|
||||
//create file name for gui purposes, and set up acquistion parameters
|
||||
sprintf(savefilename, "%s/%s_fxxx_%d.raw", filePath,fileName,fileIndex);
|
||||
|
||||
myTree=singlePhotonDet->initEventTree(savefilename, &iFrame);
|
||||
|
||||
singlePhotonDet->newDataSet();
|
||||
/**********************************/
|
||||
/*
|
||||
|
||||
filter->setupAcquisitionParameters(filePath,fileName,fileIndex);
|
||||
// if(enableFileWrite && cbAction > DO_NOTHING)
|
||||
// This commented option doesnt exist as we save and do ebverything for data compression
|
||||
//create file
|
||||
return filter->initTree();*/
|
||||
#endif
|
||||
return OK;
|
||||
for(int i=0;i<numWriterThreads;i++){
|
||||
pthread_mutex_lock(&write_mutex);
|
||||
ret1 = createCompressionFile(i,0);
|
||||
pthread_mutex_unlock(&write_mutex);
|
||||
if(ret1 == FAIL)
|
||||
ret = FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int slsReceiverFunctionList::createCompressionFile(int ithr, int iframe){
|
||||
#ifdef MYROOT1
|
||||
|
||||
//create file name for gui purposes, and set up acquistion parameters
|
||||
sprintf(savefilename, "%s/%s_fxxx_%d_%d.root", filePath,fileName,fileIndex,ithr);
|
||||
//file
|
||||
myFile[ithr] = new TFile(savefilename,"RECREATE");/** later return error if it exists */
|
||||
cout<<"File created: "<<savefilename<<endl;
|
||||
//tree
|
||||
sprintf(savefilename, "%s_fxxx_%d_%d_",fileName,fileIndex,ithr);
|
||||
myTree[ithr]=singlePhotonDet[ithr]->initEventTree(savefilename, &iframe);
|
||||
//resets the pedestalSubtraction array and the commonModeSubtraction
|
||||
singlePhotonDet[ithr]->newDataSet();
|
||||
if(myFile[ithr]==NULL){
|
||||
cout<<"file not null"<<endl;
|
||||
return FAIL;
|
||||
}
|
||||
if(!myFile[ithr]->IsOpen()){
|
||||
cout<<"file not open"<<endl;
|
||||
return FAIL;
|
||||
}
|
||||
return OK;
|
||||
#else
|
||||
return FAIL;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int slsReceiverFunctionList::createNewFile(){
|
||||
|
||||
//create file name
|
||||
@ -723,6 +775,58 @@ int slsReceiverFunctionList::createNewFile(){
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void slsReceiverFunctionList::closeFile(int ithr){
|
||||
if(!dataCompression){
|
||||
//close file
|
||||
if(sfilefd){
|
||||
#ifdef VERBOSE
|
||||
cout << "sfield:" << (int)sfilefd << endl;
|
||||
#endif
|
||||
fclose(sfilefd);
|
||||
sfilefd = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
//datacompression
|
||||
else{
|
||||
#ifdef MYROOT1
|
||||
if(ithr == -1){
|
||||
for(int i=0;i<numWriterThreads;i++)
|
||||
closeFile(i);
|
||||
}
|
||||
else{
|
||||
//write to file
|
||||
/*if(packetsInFile){*/
|
||||
if(myTree[ithr] && myFile[ithr]){
|
||||
/*if(tall->Write(tall->GetName(),TObject::kOverwrite);*/
|
||||
myFile[ithr] = myTree[ithr]->GetCurrentFile();
|
||||
if(myFile[ithr]->Write())
|
||||
cout << "Thread " << ithr <<" wrote frames to file" << endl;
|
||||
else
|
||||
cout << "Thread " << ithr << " could not write frames to file" << endl;
|
||||
}else
|
||||
cout << "Thread " << ithr << " could not write frames to file: No file or No Tree" << endl;
|
||||
/* packetsInFile = 0;
|
||||
}*/
|
||||
//close file
|
||||
if(myTree[ithr] && myFile[ithr])
|
||||
myFile[ithr] = myTree[ithr]->GetCurrentFile();
|
||||
if(myFile[ithr])
|
||||
myFile[ithr]->Close();
|
||||
myFile[ithr] = NULL;
|
||||
myTree[ithr] = NULL;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int slsReceiverFunctionList::startReceiver(char message[]){
|
||||
//#ifdef VERBOSE
|
||||
cout << "Starting Receiver" << endl;
|
||||
@ -752,22 +856,27 @@ int slsReceiverFunctionList::startReceiver(char message[]){
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
//done to give the gui some proper name instead of always the last file name
|
||||
if(dataCompression)
|
||||
sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex);
|
||||
|
||||
//initialize semaphore
|
||||
sem_init(&smp,0,1);
|
||||
|
||||
//status
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
status = RUNNING;
|
||||
receiver_threads_running = 1;
|
||||
running = 1;
|
||||
for(int i=0;i<numWriterThreads;i++)
|
||||
writerthreads_mask|=(1<<i);
|
||||
listening_thread_running = 1;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
|
||||
|
||||
//start listening /writing
|
||||
sem_post(&listensmp);
|
||||
for(int i=0; i < numWriterThreads; ++i)
|
||||
for(int i=0; i < numWriterThreads; ++i){
|
||||
sem_post(&writersmp[i]);
|
||||
|
||||
}
|
||||
cout << "Receiver Started.\nStatus:" << status << endl;
|
||||
|
||||
return OK;
|
||||
@ -793,7 +902,6 @@ int slsReceiverFunctionList::stopReceiver(){
|
||||
|
||||
//change status
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
receiver_threads_running = 0;
|
||||
status = IDLE;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
|
||||
@ -849,6 +957,7 @@ int slsReceiverFunctionList::startListening(){
|
||||
int lastframeheader;// for moench to check for all the packets in last frame
|
||||
char* tempchar = NULL;
|
||||
|
||||
|
||||
while(1){
|
||||
//variables that need to be checked/set before each acquisition
|
||||
carryonBufferSize = 0;
|
||||
@ -857,7 +966,7 @@ int slsReceiverFunctionList::startListening(){
|
||||
tempchar = new char[onePacketSize * (packetsPerFrame - 1)]; //gotthard: 1packet size, moench:39 packet size
|
||||
|
||||
|
||||
while(running){
|
||||
while(listening_thread_running){
|
||||
|
||||
//pop
|
||||
fifoFree->pop(buffer);
|
||||
@ -872,6 +981,10 @@ int slsReceiverFunctionList::startListening(){
|
||||
}else{
|
||||
#ifdef VERYDEBUG
|
||||
cout << "***carry on buffer" << carryonBufferSize << endl;
|
||||
cout<<"framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar)))
|
||||
& (frameIndexMask)) >> frameIndexOffset)<<endl;
|
||||
cout <<"temochar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar)))))
|
||||
& (packetIndexMask)) << endl;
|
||||
#endif
|
||||
//if there is a packet from previous buffer, copy it and listen to n less frame
|
||||
memcpy(buffer + HEADER_SIZE_NUM_TOT_PACKETS, tempchar, carryonBufferSize);
|
||||
@ -880,8 +993,8 @@ int slsReceiverFunctionList::startListening(){
|
||||
}
|
||||
|
||||
#ifdef VERYDEBUG
|
||||
cout << "*** rc:" << rc << endl;
|
||||
cout << "*** expected:" << expected << endl;
|
||||
cout << "*** rc:" << dec << rc << endl;
|
||||
cout << "*** expected:" << dec << expected << endl;
|
||||
#endif
|
||||
//start indices
|
||||
//start of scan
|
||||
@ -930,16 +1043,20 @@ int slsReceiverFunctionList::startListening(){
|
||||
cout << "*** last lbuf1:" << (void*)buffer << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
//push dummy buffer
|
||||
fifoFree->pop(buffer);
|
||||
(*((uint16_t*)(buffer))) = 0xFFFF;
|
||||
while(!fifo->push(buffer));
|
||||
for(int i=0;i<numWriterThreads;++i){
|
||||
fifoFree->pop(buffer);
|
||||
(*((uint16_t*)(buffer))) = 0xFFFF;
|
||||
while(!fifo->push(buffer));
|
||||
#ifdef VERYDEBUG
|
||||
cout << "pushed in dummy buffer:" << (void*)buffer << endl;
|
||||
cout << "pushed in dummy buffer:" << (void*)buffer << endl;
|
||||
#endif
|
||||
}
|
||||
cout << "Total count listened to " << totalListeningFrameCount/packetsPerFrame << endl;
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
running = 0;
|
||||
listening_thread_running = 0;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
break;
|
||||
}
|
||||
@ -949,6 +1066,7 @@ int slsReceiverFunctionList::startListening(){
|
||||
packetcount = packetsPerFrame * numJobsPerThread;
|
||||
carryonBufferSize = 0;
|
||||
|
||||
|
||||
//check if last packet valid and calculate packet count
|
||||
switch(myDetectorType){
|
||||
|
||||
@ -978,8 +1096,8 @@ int slsReceiverFunctionList::startListening(){
|
||||
#ifdef VERYDEBUG
|
||||
cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar)))))
|
||||
& (frameIndexMask)) >> frameIndexOffset) << endl;
|
||||
cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS)))))
|
||||
& (frameIndexMask)) >> frameIndexOffset) << endl;
|
||||
cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar)))))
|
||||
& (packetIndexMask)) << endl;
|
||||
#endif
|
||||
}
|
||||
break;
|
||||
@ -1014,7 +1132,7 @@ int slsReceiverFunctionList::startListening(){
|
||||
|
||||
}
|
||||
#ifdef VERYDEBUG
|
||||
cout << "*** packetcount:" << packetcount << endl;
|
||||
cout << "*** packetcount:" << packetcount << " carryonbuffer:" << carryonBufferSize << endl;
|
||||
#endif
|
||||
//write packet count and push
|
||||
(*((uint16_t*)(buffer))) = packetcount;
|
||||
@ -1024,7 +1142,9 @@ int slsReceiverFunctionList::startListening(){
|
||||
cout << "*** pushed into listening fifo" << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
sem_wait(&listensmp);
|
||||
|
||||
}
|
||||
|
||||
return OK;
|
||||
@ -1052,68 +1172,89 @@ int slsReceiverFunctionList::startWriting(){
|
||||
|
||||
thread_started = 1;
|
||||
|
||||
int numpackets,tempframenum;
|
||||
int numpackets,tempframenum, nf;
|
||||
char* wbuf;
|
||||
char *data=new char[bufferSize];
|
||||
int iFrame = 0;
|
||||
|
||||
while(1){
|
||||
|
||||
int nf = 0;
|
||||
|
||||
while(receiver_threads_running){
|
||||
nf = 0;
|
||||
iFrame = 0;
|
||||
|
||||
while((1<<ithread)&writerthreads_mask){
|
||||
|
||||
//pop
|
||||
fifo->pop(wbuf);
|
||||
numpackets = (uint16_t)(*((uint16_t*)wbuf));
|
||||
#ifdef VERYDEBUG
|
||||
cout << "numpackets:" << hex << numpackets << endl;
|
||||
cout << "numpackets:" << dec << numpackets << endl;
|
||||
cout << ithread << "*** popped from fifo " << numpackets << endl;
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//last dummy packet
|
||||
if(numpackets == 0xFFFF){
|
||||
#ifdef VERYDEBUG
|
||||
cout << "popped last dummy frame:" << (void*)wbuf << endl;
|
||||
cout << "**********************popped last dummy frame:" << (void*)wbuf << " from thread " << ithread << endl;
|
||||
#endif
|
||||
//data compression, check if jobs done
|
||||
if(dataCompression){
|
||||
/*while(!filter->checkIfJobsDone())
|
||||
usleep(50000);*/
|
||||
;
|
||||
}
|
||||
|
||||
//free fifo
|
||||
while(!fifoFree->push(wbuf));
|
||||
#ifdef VERYDEBUG
|
||||
cout << "fifo freed:" << (void*)wbuf << endl;
|
||||
cout << "fifo freed:" << (void*)wbuf << " from thread " << ithread << endl;
|
||||
#endif
|
||||
//update status
|
||||
|
||||
|
||||
|
||||
//all threads need to close file, reset mask and exit loop
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
status = RUN_FINISHED;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
cout << "Status: Run Finished" << endl;
|
||||
//close file
|
||||
if(sfilefd){
|
||||
#ifdef VERBOSE
|
||||
cout << "sfield:" << (int)sfilefd << endl;
|
||||
closeFile(ithread);
|
||||
writerthreads_mask^=(1<<ithread);
|
||||
#ifdef VERYDEBUG
|
||||
cout <<"Resetting mask of current thread " << ithread << ". New Mask: " << writerthreads_mask << endl;
|
||||
#endif
|
||||
fclose(sfilefd);
|
||||
sfilefd = NULL;
|
||||
pthread_mutex_unlock(&status_mutex);
|
||||
|
||||
|
||||
//only thread 0 needs to do this
|
||||
//check if all jobs are done and wait
|
||||
//change status to run finished
|
||||
if(ithread == 0){
|
||||
if(dataCompression){
|
||||
cout<<"Waiting for jobs to be done.. current mask:"<< hex << writerthreads_mask <<endl;
|
||||
while(writerthreads_mask){
|
||||
cout << "." << flush;
|
||||
usleep(50000);
|
||||
}
|
||||
cout<<" Jobs Done!"<<endl;
|
||||
}
|
||||
//update status
|
||||
pthread_mutex_lock(&status_mutex);
|
||||
status = RUN_FINISHED;
|
||||
pthread_mutex_unlock(&(status_mutex));
|
||||
//report
|
||||
cout << "Status: Run Finished" << endl;
|
||||
cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl;
|
||||
cout << "Total Frames Caught:"<< dec << (totalPacketsCaught/packetsPerFrame) << endl;
|
||||
//acquisition end
|
||||
if (acquisitionFinishedCallBack)
|
||||
acquisitionFinishedCallBack((totalPacketsCaught/packetsPerFrame), pAcquisitionFinished);
|
||||
|
||||
}
|
||||
//report
|
||||
cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl;
|
||||
cout << "Total Frames Caught:"<< dec << (totalPacketsCaught/packetsPerFrame) << endl;
|
||||
//acquisition end
|
||||
if (acquisitionFinishedCallBack)
|
||||
acquisitionFinishedCallBack((totalPacketsCaught/packetsPerFrame), pAcquisitionFinished);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//for progress
|
||||
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
|
||||
tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
|
||||
@ -1133,6 +1274,9 @@ int slsReceiverFunctionList::startWriting(){
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//without datacompression: write datacall back, or write data, free fifo
|
||||
if(!dataCompression){
|
||||
if (cbAction < DO_EVERYTHING)
|
||||
@ -1144,6 +1288,9 @@ int slsReceiverFunctionList::startWriting(){
|
||||
if(numWriterThreads >1)
|
||||
pthread_mutex_unlock(&progress_mutex);
|
||||
}
|
||||
//copy to gui
|
||||
copyFrameToGui(wbuf + HEADER_SIZE_NUM_TOT_PACKETS);
|
||||
|
||||
while(!fifoFree->push(wbuf));
|
||||
#ifdef VERYVERBOSE
|
||||
cout<<"buf freed:"<<(void*)wbuf<<endl;
|
||||
@ -1151,104 +1298,102 @@ int slsReceiverFunctionList::startWriting(){
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//data compression
|
||||
else{
|
||||
/**********************************/
|
||||
#ifdef MYROOT1
|
||||
if(myDetectorType == MOENCH){
|
||||
//while
|
||||
|
||||
eventType thisEvent = PEDESTAL;
|
||||
int ndata;
|
||||
char* buff = wbuf+ HEADER_SIZE_NUM_TOT_PACKETS;
|
||||
int xmin = 0, ymin = 0, xmax = 160/*(NC)*/, ymax = 160/*NR*/, ix, iy;
|
||||
char* buff = 0;
|
||||
data = wbuf+ HEADER_SIZE_NUM_TOT_PACKETS;
|
||||
int xmin = 1, ymin = 1, xmax = 159/*(NC)*/, ymax = 159/*NR*/, ix, iy;
|
||||
int ir, ic;
|
||||
int remainingsize = numpackets * onePacketSize;
|
||||
int np;
|
||||
int once = 0;
|
||||
|
||||
double tot, tl, tr, bl, br, v;
|
||||
|
||||
while(buff = mdecoder->findNextFrame(buff,ndata,numpackets * onePacketSize )){
|
||||
while(buff = mdecoder[ithread]->findNextFrame(data,ndata,remainingsize )){/**need mutex??????????*/
|
||||
np = ndata/onePacketSize;
|
||||
|
||||
singlePhotonDet->newFrame();
|
||||
//cout<<"buff framnum:"<<ithread <<":"<< ((((uint32_t)(*((uint32_t*)buff)))& (frameIndexMask)) >> frameIndexOffset)<<endl;
|
||||
|
||||
if(commonModeSubtractionEnable){
|
||||
for(ix = xmin - 1; ix < xmax + 1; ix++){
|
||||
for(iy = ymin - 1; iy < ymax + 1; iy++){
|
||||
thisEvent = singlePhotonDet->getEventType(buff, ix, iy, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
if ((np == packetsPerFrame) && (buff!=NULL)){
|
||||
if(nf == 1000) cout << " pedestal done " << endl;
|
||||
|
||||
|
||||
for(ix = xmin - 1; ix < xmax + 1; ix++)
|
||||
for(iy = ymin - 1; iy < ymax + 1; iy++){
|
||||
thisEvent=singlePhotonDet->getEventType(buff, ix, iy, commonModeSubtractionEnable);
|
||||
|
||||
if (nf>1000) {
|
||||
tot=0;
|
||||
tl=0;
|
||||
tr=0;
|
||||
bl=0;
|
||||
br=0;
|
||||
|
||||
if (thisEvent==PHOTON_MAX ) {
|
||||
for (ir=-1; ir<2; ir++) {
|
||||
for (ic=-1; ic<2; ic++) {
|
||||
v=singlePhotonDet->getClusterElement(ic,ir);
|
||||
|
||||
tot+=v;
|
||||
if (ir<1) {
|
||||
if (ic<1)
|
||||
bl+=v;
|
||||
if (ic>-1)
|
||||
br+=v;
|
||||
}
|
||||
|
||||
if (ir>-1) {
|
||||
if (ic<1)
|
||||
tl+=v;
|
||||
if (ic>-1)
|
||||
tr+=v;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// if (bl>br && bl>tl && bl>tr) {
|
||||
//h2->Fill(bl, iy+NR*ix);
|
||||
//if (bl>0) {
|
||||
// hetaX->Fill((filter->getClusterElement(0,0)+filter->getClusterElement(0,-1))/bl,iy+NR*ix);
|
||||
// hetaY->Fill((filter->getClusterElement(0,0)+filter->getClusterElement(-1,0))/bl,iy+NR*ix);
|
||||
iFrame=mdecoder->getFrameNumber(buff);
|
||||
myTree->Fill();
|
||||
singlePhotonDet[ithread]->newFrame();
|
||||
if(commonModeSubtractionEnable){
|
||||
for(ix = xmin - 1; ix < xmax + 1; ix++){
|
||||
for(iy = ymin - 1; iy < ymax + 1; iy++){
|
||||
thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
for(ix = xmin - 1; ix < xmax + 1; ix++)
|
||||
for(iy = ymin - 1; iy < ymax + 1; iy++){
|
||||
thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable);
|
||||
if (nf>1000) {
|
||||
tot=0;
|
||||
tl=0;
|
||||
tr=0;
|
||||
bl=0;
|
||||
br=0;
|
||||
if (thisEvent==PHOTON_MAX) {
|
||||
|
||||
iFrame=mdecoder[ithread]->getFrameNumber(buff);/**need mutex??????????*/
|
||||
pthread_mutex_lock(&write_mutex);
|
||||
myTree[ithread]->Fill();
|
||||
pthread_mutex_unlock(&write_mutex);
|
||||
//cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nf++;
|
||||
|
||||
pthread_mutex_lock(&write_mutex);
|
||||
|
||||
packetsInFile += packetsPerFrame;
|
||||
packetsCaught += packetsPerFrame;
|
||||
totalPacketsCaught += packetsPerFrame;
|
||||
|
||||
pthread_mutex_unlock(&write_mutex);
|
||||
if(!once){
|
||||
copyFrameToGui(buff);
|
||||
//cout<<"buff framnum:"<<ithread <<":"<< ((((uint32_t)(*((uint32_t*)buff)))& (frameIndexMask)) >> frameIndexOffset)<<endl;
|
||||
once = 1;
|
||||
}
|
||||
}
|
||||
|
||||
remainingsize -= ((buff + ndata) - data);
|
||||
data = buff + ndata;
|
||||
if(data > (wbuf + HEADER_SIZE_NUM_TOT_PACKETS + numpackets * onePacketSize) )
|
||||
cout <<" **************WE HAVE A PROBLEM!"<<endl;
|
||||
//cout << "remaining size: " << remainingsize << endl;
|
||||
|
||||
cout << "=" ;
|
||||
nf++;
|
||||
buff += bufferSize;
|
||||
packetsInFile += packetsPerFrame;
|
||||
packetsCaught += packetsPerFrame;
|
||||
totalPacketsCaught += packetsPerFrame;
|
||||
}
|
||||
cout << endl;
|
||||
myTree->Write(myTree->GetName(),TObject::kOverwrite);
|
||||
cout << "Read " << nf << " frames" << endl;
|
||||
|
||||
}
|
||||
|
||||
while(!fifoFree->push(wbuf));
|
||||
#ifdef VERYVERBOSE
|
||||
cout<<"buf freed:"<<(void*)wbuf<<endl;
|
||||
#endif
|
||||
/**********************************/
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
//copy to gui
|
||||
copyFrameToGui(wbuf + HEADER_SIZE_NUM_TOT_PACKETS);
|
||||
}
|
||||
|
||||
//wait
|
||||
sem_wait(&writersmp[ithread]);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user