diff --git a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp index 1d69c0b3a..1b7c118b5 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp +++ b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp @@ -22,8 +22,8 @@ #include using namespace std; -FILE* slsReceiverFunctionList::sfilefd(NULL); -int slsReceiverFunctionList::receiver_threads_running(0); + + slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): myDetectorType(det), @@ -67,14 +67,13 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): buffer(NULL), numWriterThreads(1), thread_started(0), - writerthreads_mask(0x0), currentWriterThreadIndex(-1), totalListeningFrameCount(0), - running(0), - singlePhotonDet(NULL), - mdecoder(NULL), commonModeSubtractionEnable(false), - iFrame(0), + sfilefd(NULL), + writerthreads_mask(0x0), + listening_thread_running(0), + cbAction(DO_EVERYTHING), startAcquisitionCallBack(NULL), pStartAcquisition(NULL), acquisitionFinishedCallBack(NULL), @@ -105,7 +104,14 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): strcpy(savefilename,""); strcpy(filePath,""); strcpy(fileName,"run"); - + for(int i=0;i /proc/sys/net/core/rmem_max")) + cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; + + /** permanent setting heiner + net.core.rmem_max = 104857600 # 100MiB + net.core.netdev_max_backlog = 250000 + sysctl -p + // from the manual + sysctl -w net.core.rmem_max=16777216 + sysctl -w net.core.netdev_max_backlog=250000 + */ +/* if(createThreads() == FAIL){ cout << "ERROR: Could not create writer threads" << endl; exit (-1); } - +*/ } slsReceiverFunctionList::~slsReceiverFunctionList(){ - if(udpSocket) delete udpSocket; - if(eth) delete [] eth; - if(latestData) delete [] latestData; - if(guiFileName) delete [] guiFileName; - if(mem0) free(mem0); - if(fifo) delete fifo; - if(fifoFree) delete fifoFree; + closeFile(-1); + for(int i=0;i(mdecoder, 3, 5, sign, cmSub); - cout<<"************filter created"<(mdecoder[i], 3, 5, sign, cmSub); + } + } + } +} + void slsReceiverFunctionList::readFrame(char* c,char** raw){ //point to gui data @@ -330,7 +371,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){ pthread_mutex_lock(&dataReadyMutex); guiDataReady = 0; pthread_mutex_unlock(&dataReadyMutex); - if((nFrameToGui) && (receiver_threads_running)){ + if((nFrameToGui) && (writerthreads_mask)){ //release after getting data sem_post(&smp); } @@ -501,18 +542,19 @@ int slsReceiverFunctionList::createThreads(bool destroy){ if(!destroy){ - //listening thread pthread_mutex_lock(&status_mutex); status = IDLE; - running = 0; + listening_thread_running = 0; + writerthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); + + //listening thread sem_init(&listensmp,0,0); if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){ cout << "Could not create listening thread" << endl; return FAIL; } - //#ifdef VERBOSE cout << "Listening thread created successfully." << endl; //#endif @@ -520,9 +562,7 @@ int slsReceiverFunctionList::createThreads(bool destroy){ //start writer threads cout << "Creating Writer Threads"; - writerthreads_mask = 0x0; currentWriterThreadIndex = -1; - for(i = 0; i < numWriterThreads; ++i){ sem_init(&writersmp[i],0,0); thread_started = 0; @@ -563,25 +603,11 @@ int slsReceiverFunctionList::createThreads(bool destroy){ cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; - //to increase socket receiver buffer size and max length of input queue by changing kernel settings - if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) - cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; - else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) - cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - - /** permanent setting heiner - net.core.rmem_max = 104857600 # 100MiB - net.core.netdev_max_backlog = 250000 - sysctl -p - // from the manual - sysctl -w net.core.rmem_max=16777216 - sysctl -w net.core.netdev_max_backlog=250000 - */ } - else{ + else{cout<<"DESTROYNG THREADS"<initEventTree(savefilename, &iFrame); - - singlePhotonDet->newDataSet(); - /**********************************/ - /* - - filter->setupAcquisitionParameters(filePath,fileName,fileIndex); - // if(enableFileWrite && cbAction > DO_NOTHING) - // This commented option doesnt exist as we save and do ebverything for data compression - //create file - return filter->initTree();*/ -#endif - return OK; + for(int i=0;iinitEventTree(savefilename, &iframe); + //resets the pedestalSubtraction array and the commonModeSubtraction + singlePhotonDet[ithr]->newDataSet(); + if(myFile[ithr]==NULL){ + cout<<"file not null"<IsOpen()){ + cout<<"file not open"<Write(tall->GetName(),TObject::kOverwrite);*/ + myFile[ithr] = myTree[ithr]->GetCurrentFile(); + if(myFile[ithr]->Write()) + cout << "Thread " << ithr <<" wrote frames to file" << endl; + else + cout << "Thread " << ithr << " could not write frames to file" << endl; + }else + cout << "Thread " << ithr << " could not write frames to file: No file or No Tree" << endl; + /* packetsInFile = 0; + }*/ + //close file + if(myTree[ithr] && myFile[ithr]) + myFile[ithr] = myTree[ithr]->GetCurrentFile(); + if(myFile[ithr]) + myFile[ithr]->Close(); + myFile[ithr] = NULL; + myTree[ithr] = NULL; + } +#endif + } +} + + + + + int slsReceiverFunctionList::startReceiver(char message[]){ //#ifdef VERBOSE cout << "Starting Receiver" << endl; @@ -752,22 +856,27 @@ int slsReceiverFunctionList::startReceiver(char message[]){ return FAIL; } + //done to give the gui some proper name instead of always the last file name + if(dataCompression) + sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + //initialize semaphore sem_init(&smp,0,1); //status pthread_mutex_lock(&status_mutex); status = RUNNING; - receiver_threads_running = 1; - running = 1; + for(int i=0;ipop(buffer); @@ -872,6 +981,10 @@ int slsReceiverFunctionList::startListening(){ }else{ #ifdef VERYDEBUG cout << "***carry on buffer" << carryonBufferSize << endl; + cout<<"framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) + & (frameIndexMask)) >> frameIndexOffset)<pop(buffer); - (*((uint16_t*)(buffer))) = 0xFFFF; - while(!fifo->push(buffer)); + for(int i=0;ipop(buffer); + (*((uint16_t*)(buffer))) = 0xFFFF; + while(!fifo->push(buffer)); #ifdef VERYDEBUG - cout << "pushed in dummy buffer:" << (void*)buffer << endl; + cout << "pushed in dummy buffer:" << (void*)buffer << endl; #endif + } cout << "Total count listened to " << totalListeningFrameCount/packetsPerFrame << endl; pthread_mutex_lock(&status_mutex); - running = 0; + listening_thread_running = 0; pthread_mutex_unlock(&(status_mutex)); break; } @@ -949,6 +1066,7 @@ int slsReceiverFunctionList::startListening(){ packetcount = packetsPerFrame * numJobsPerThread; carryonBufferSize = 0; + //check if last packet valid and calculate packet count switch(myDetectorType){ @@ -978,8 +1096,8 @@ int slsReceiverFunctionList::startListening(){ #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) & (frameIndexMask)) >> frameIndexOffset) << endl; - cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS))))) - & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) + & (packetIndexMask)) << endl; #endif } break; @@ -1014,7 +1132,7 @@ int slsReceiverFunctionList::startListening(){ } #ifdef VERYDEBUG - cout << "*** packetcount:" << packetcount << endl; + cout << "*** packetcount:" << packetcount << " carryonbuffer:" << carryonBufferSize << endl; #endif //write packet count and push (*((uint16_t*)(buffer))) = packetcount; @@ -1024,7 +1142,9 @@ int slsReceiverFunctionList::startListening(){ cout << "*** pushed into listening fifo" << endl; #endif } + sem_wait(&listensmp); + } return OK; @@ -1052,68 +1172,89 @@ int slsReceiverFunctionList::startWriting(){ thread_started = 1; - int numpackets,tempframenum; + int numpackets,tempframenum, nf; char* wbuf; + char *data=new char[bufferSize]; + int iFrame = 0; while(1){ - int nf = 0; - - while(receiver_threads_running){ + nf = 0; + iFrame = 0; + while((1<pop(wbuf); numpackets = (uint16_t)(*((uint16_t*)wbuf)); #ifdef VERYDEBUG - cout << "numpackets:" << hex << numpackets << endl; + cout << "numpackets:" << dec << numpackets << endl; cout << ithread << "*** popped from fifo " << numpackets << endl; #endif + + //last dummy packet if(numpackets == 0xFFFF){ #ifdef VERYDEBUG - cout << "popped last dummy frame:" << (void*)wbuf << endl; + cout << "**********************popped last dummy frame:" << (void*)wbuf << " from thread " << ithread << endl; #endif - //data compression, check if jobs done - if(dataCompression){ - /*while(!filter->checkIfJobsDone()) - usleep(50000);*/ - ; - } + //free fifo while(!fifoFree->push(wbuf)); #ifdef VERYDEBUG - cout << "fifo freed:" << (void*)wbuf << endl; + cout << "fifo freed:" << (void*)wbuf << " from thread " << ithread << endl; #endif - //update status + + + + //all threads need to close file, reset mask and exit loop pthread_mutex_lock(&status_mutex); - status = RUN_FINISHED; - pthread_mutex_unlock(&(status_mutex)); - cout << "Status: Run Finished" << endl; - //close file - if(sfilefd){ -#ifdef VERBOSE - cout << "sfield:" << (int)sfilefd << endl; + closeFile(ithread); + writerthreads_mask^=(1<> frameIndexOffset); @@ -1133,6 +1274,9 @@ int slsReceiverFunctionList::startWriting(){ #endif + + + //without datacompression: write datacall back, or write data, free fifo if(!dataCompression){ if (cbAction < DO_EVERYTHING) @@ -1144,6 +1288,9 @@ int slsReceiverFunctionList::startWriting(){ if(numWriterThreads >1) pthread_mutex_unlock(&progress_mutex); } + //copy to gui + copyFrameToGui(wbuf + HEADER_SIZE_NUM_TOT_PACKETS); + while(!fifoFree->push(wbuf)); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuf<findNextFrame(buff,ndata,numpackets * onePacketSize )){ + while(buff = mdecoder[ithread]->findNextFrame(data,ndata,remainingsize )){/**need mutex??????????*/ + np = ndata/onePacketSize; - singlePhotonDet->newFrame(); + //cout<<"buff framnum:"<> frameIndexOffset)<getEventType(buff, ix, iy, 0); - } - } - } + if ((np == packetsPerFrame) && (buff!=NULL)){ + if(nf == 1000) cout << " pedestal done " << endl; - for(ix = xmin - 1; ix < xmax + 1; ix++) - for(iy = ymin - 1; iy < ymax + 1; iy++){ - thisEvent=singlePhotonDet->getEventType(buff, ix, iy, commonModeSubtractionEnable); - - if (nf>1000) { - tot=0; - tl=0; - tr=0; - bl=0; - br=0; - - if (thisEvent==PHOTON_MAX ) { - for (ir=-1; ir<2; ir++) { - for (ic=-1; ic<2; ic++) { - v=singlePhotonDet->getClusterElement(ic,ir); - - tot+=v; - if (ir<1) { - if (ic<1) - bl+=v; - if (ic>-1) - br+=v; - } - - if (ir>-1) { - if (ic<1) - tl+=v; - if (ic>-1) - tr+=v; - } - - } - } - - // if (bl>br && bl>tl && bl>tr) { - //h2->Fill(bl, iy+NR*ix); - //if (bl>0) { - // hetaX->Fill((filter->getClusterElement(0,0)+filter->getClusterElement(0,-1))/bl,iy+NR*ix); - // hetaY->Fill((filter->getClusterElement(0,0)+filter->getClusterElement(-1,0))/bl,iy+NR*ix); - iFrame=mdecoder->getFrameNumber(buff); - myTree->Fill(); + singlePhotonDet[ithread]->newFrame(); + if(commonModeSubtractionEnable){ + for(ix = xmin - 1; ix < xmax + 1; ix++){ + for(iy = ymin - 1; iy < ymax + 1; iy++){ + thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); } } } + for(ix = xmin - 1; ix < xmax + 1; ix++) + for(iy = ymin - 1; iy < ymax + 1; iy++){ + thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); + if (nf>1000) { + tot=0; + tl=0; + tr=0; + bl=0; + br=0; + if (thisEvent==PHOTON_MAX) { + + iFrame=mdecoder[ithread]->getFrameNumber(buff);/**need mutex??????????*/ + pthread_mutex_lock(&write_mutex); + myTree[ithread]->Fill(); + pthread_mutex_unlock(&write_mutex); + //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; + } + } + } + + nf++; + + pthread_mutex_lock(&write_mutex); + + packetsInFile += packetsPerFrame; + packetsCaught += packetsPerFrame; + totalPacketsCaught += packetsPerFrame; + + pthread_mutex_unlock(&write_mutex); + if(!once){ + copyFrameToGui(buff); + //cout<<"buff framnum:"<> frameIndexOffset)< (wbuf + HEADER_SIZE_NUM_TOT_PACKETS + numpackets * onePacketSize) ) + cout <<" **************WE HAVE A PROBLEM!"<push(wbuf)); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuf<