diff --git a/slsDetectorSoftware/slsReceiver/Makefile b/slsDetectorSoftware/slsReceiver/Makefile index d603db3f1..75f47fe90 100644 --- a/slsDetectorSoftware/slsReceiver/Makefile +++ b/slsDetectorSoftware/slsReceiver/Makefile @@ -1,5 +1,5 @@ CC = g++ -CLAGS += -DSLS_RECEIVER_FUNCTION_LIST +CLAGS += -DSLS_RECEIVER_FUNCTION_LIST LDFLAG= -L/usr/lib64/ -lpthread -lm -lstdc++ diff --git a/slsDetectorSoftware/slsReceiver/circularFifo.h b/slsDetectorSoftware/slsReceiver/circularFifo.h index 3c4c05670..e57a59192 100644 --- a/slsDetectorSoftware/slsReceiver/circularFifo.h +++ b/slsDetectorSoftware/slsReceiver/circularFifo.h @@ -15,11 +15,17 @@ #ifndef CIRCULARFIFO_H_ #define CIRCULARFIFO_H_ -#include "sls_detector_defs.h" +//#include "sls_detector_defs.h" #include using namespace std; +typedef double double32_t; +typedef float float32_t; +typedef int int32_t; + + + /** Circular Fifo (a.k.a. Circular Buffer) * Thread safe for one reader, and one writer */ template diff --git a/slsDetectorSoftware/slsReceiver/receiver_defs.h b/slsDetectorSoftware/slsReceiver/receiver_defs.h index 90ac83c19..ebe3f7a62 100755 --- a/slsDetectorSoftware/slsReceiver/receiver_defs.h +++ b/slsDetectorSoftware/slsReceiver/receiver_defs.h @@ -16,7 +16,7 @@ //all max frames defined in sls_detector_defs.h. 20000 gotthard, 100000 for short gotthard, 1000 for moench -#define GOTTHARD_FIFO_SIZE 25000 +#define GOTTHARD_FIFO_SIZE 25000//11 #define GOTTHARD_ALIGNED_FRAME_SIZE 4096 #define GOTTHARD_PACKETS_PER_FRAME 2 #define GOTTHARD_ONE_PACKET_SIZE 1286 @@ -35,7 +35,9 @@ #define GOTTHARD_PACKET_INDEX_MASK 0x1 - +#define GOTTHARD_NUM_JOBS_P_THREAD 5000//20000//3 with 25 frames +#define GOTTHARD_SHORT_NUM_JOBS_P_THREAD 2500//40000 +#define MOENCH_NUM_JOBS_P_THREAD 1000//10000 #define MOENCH_FIFO_SIZE 2500 #define MOENCH_ALIGNED_FRAME_SIZE 65536 diff --git a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp index 7260fa77e..a56c46a4c 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp +++ b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp @@ -16,6 +16,7 @@ #include //munmap + #include #include using namespace std; @@ -60,6 +61,8 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): frameIndexMask(GOTTHARD_FRAME_INDEX_MASK), frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET), dataCompression(false), + numJobsPerThread(GOTTHARD_NUM_JOBS_P_THREAD), + userDefinedNumJobsPerThread(0), startAcquisitionCallBack(NULL), pStartAcquisition(NULL), acquisitionFinishedCallBack(NULL), @@ -80,7 +83,13 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): packetsPerFrame = MOENCH_PACKETS_PER_FRAME; frameIndexMask = MOENCH_FRAME_INDEX_MASK; frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; + numJobsPerThread = MOENCH_NUM_JOBS_P_THREAD; } + + if(userDefinedNumJobsPerThread) + numJobsPerThread = userDefinedNumJobsPerThread; + + oneBufferSize = bufferSize/packetsPerFrame; strcpy(savefilename,""); @@ -108,10 +117,10 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): } - vector > map; - vector > mask; - int initial_offset = 4; - int later_offset = 2; + int16_t* map; + int16_t* mask; + int initial_offset = 2; + int later_offset = 1; int mask_y_offset = 120; int mask_adc = 0x7fff; @@ -131,21 +140,17 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): case MOENCH: x = MOENCH_PIXELS_IN_ONE_ROW; y = MOENCH_PIXELS_IN_ONE_ROW; - mask.resize(x); - for(i=0;iShutDownSocket(); + pthread_join(listening_thread,NULL); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; } @@ -401,7 +423,6 @@ int slsReceiverFunctionList::startReceiver(char message[]){ //initialize semaphore sem_init(&smp,0,1); - return OK; } @@ -410,14 +431,14 @@ int slsReceiverFunctionList::startReceiver(char message[]){ int slsReceiverFunctionList::stopReceiver(){ -#ifdef VERBOSE +//#ifdef VERBOSE cout << "Stopping Receiver" << endl; -#endif +//#endif if(receiver_threads_running){ -#ifdef VERBOSE - cout << "Stopping new acquisition threadddd ...." << endl; -#endif +//#ifdef VERBOSE + cout << "Stopping new acquisition thread" << endl; +//#endif //stop listening thread pthread_mutex_lock(&status_mutex); receiver_threads_running=0; @@ -426,11 +447,13 @@ int slsReceiverFunctionList::stopReceiver(){ if(udpSocket) udpSocket->ShutDownSocket(); pthread_join(listening_thread,NULL); pthread_join(writing_thread,NULL); + /*if(dataCompression) + filter->enableCompression(false);*/ } //change status pthread_mutex_lock(&status_mutex); status = IDLE; - pthread_mutex_unlock(&(status_mutex)); + pthread_mutex_unlock(&(status_mutex));; //semaphore destroy sem_post(&smp); @@ -462,144 +485,155 @@ int slsReceiverFunctionList::startListening(){ #ifdef VERYVERBOSE cout << "In startListening()\n"); #endif - int rc; - + int rc=0; measurementStarted = false; startFrameIndex = 0; - int offset=0; int ret=1; int i=0; - uint32_t *framenum; char *tempchar = new char[oneBufferSize]; - // A do/while(FALSE) loop is used to make error cleanup easier. The - // close() of each of the socket descriptors is only done once at the - // very end of the program. - do { + //to increase socket receiver buffer size and max length of input queue by changing kernel settings + if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) + cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - //to increase socket receiver buffer size and max length of input queue by changing kernel settings - if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) - cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; - else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) - cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - - /** permanent setting heiner + /** permanent setting heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 - */ + */ - //creating udp socket - if (strchr(eth,'.')!=NULL) strcpy(eth,""); - if(!strlen(eth)){ - cout<<"warning:eth is empty.listening to all"<getErrorStatus()){ -#ifdef VERBOSE - std::cout<< "Could not create UDP socket "<< server_port << std::endl; -#endif - pthread_mutex_lock(&status_mutex); - listening_thread_running = -1; - pthread_mutex_unlock(&status_mutex); - break; + //creating udp socket + if (strchr(eth,'.')!=NULL) strcpy(eth,""); + if(!strlen(eth)){ + cout<<"warning:eth is empty.listening to all"<getErrorStatus(); + if (iret){ + //#ifdef VERBOSE + std::cout<< "Could not create UDP socket on port "<< server_port << " error:"<setupAcquisitionParameters();*/ - filter->setupAcquisitionParameters(); while (receiver_threads_running) { - if(!listening_thread_running){ - pthread_mutex_lock(&status_mutex); - listening_thread_running = 1; - pthread_mutex_unlock(&(status_mutex)); - } if (!fifofree->isEmpty()) { if (ret!=0) fifofree->pop(buffer); - if(ret == -2){ + + if(ret == -3){ memcpy(buffer,tempchar,oneBufferSize); offset = oneBufferSize; - } - - //receiver 2 half frames / 1 short frame / 40 moench frames - rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize); - if( rc <= 0){ -#ifdef VERYVERBOSE - cerr << "recvfrom() failed" << endl; -#endif - continue; - } - //cout<<"got index:"<> frameIndexOffset); - - - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - (*((uint32_t*)(buffer+offset)))++; - - - ret = filter->verifyFrame(buffer+offset); - - - //start for each scan - if(!measurementStarted){ - startFrameIndex = ((((uint32_t)(*((uint32_t*)buffer))) & (frameIndexMask)) >> frameIndexOffset); - cout<<"startFrameIndex:"<ReceiveDataOnly(buffer+offset,oneBufferSize); + if( rc <= 0){ + //#ifdef VERYVERBOSE + cerr << "recvfrom() failed:"<push(buffer); + //#endif + continue; + } + + + //manipulate buffer number to inlude frame number and packet number for gotthard + if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + (*((uint32_t*)(buffer+offset)))++; + + + ret = filter->verifyFrame(buffer+offset); + + //start for each scan + if(!measurementStarted){ + startFrameIndex = ((((uint32_t)(*((uint32_t*)buffer))) & (frameIndexMask)) >> frameIndexOffset); + cout<<"startFrameIndex:"<Disconnect(); - delete tempchar; - -#ifdef VERBOSE - cout << "receiver_threads_running:" << receiver_threads_running << endl; -#endif + } + delete tempchar; return 0; } @@ -634,7 +656,6 @@ int slsReceiverFunctionList::startListening(){ void* slsReceiverFunctionList::startWritingThread(void* this_pointer){ ((slsReceiverFunctionList*)this_pointer)->startWriting(); - return this_pointer; } @@ -649,7 +670,14 @@ int slsReceiverFunctionList::startWriting(){ char *wbuf; int sleepnumber=0; int frameFactor=0; - int i; + int i,p; + +/* char* trialarr[GOTTHARD_COMPRESSION_FIFO_SIZE];*/ + int iJob = -2; + char* startingmem = 0; + int header_of_last_packet = (packetsPerFrame-1)*oneBufferSize; + int pointinfifo=0; + int firsttime = 1; packetsInFile=0; framesCaught=0; @@ -667,10 +695,11 @@ int slsReceiverFunctionList::startWriting(){ cout << "Max Frames Per File:" << maxFramesPerFile << endl; if (rawDataReadyCallBack) cout << "Note: Data Write has been defined exernally" << endl; + if (dataCompression) + cout << "Data Compression is enabled with " << numJobsPerThread << " number of jobs per thread" << endl; if(nFrameToGui) cout << "Sending every " << nFrameToGui << "th frame to gui" << endl; - //by default, we read/write everything cbAction = DO_EVERYTHING; //acquisition start call back returns enable write @@ -685,68 +714,83 @@ int slsReceiverFunctionList::startWriting(){ cout << "Ready!" << endl; - if (dataCompression) - filter->enableFilter(true); - //will always run till acquisition over and then runs till fifo is empty while(receiver_threads_running || (!fifo->isEmpty())){ //start a new file - if (((int)(packetsInFile/packetsPerFrame) >= maxFramesPerFile) || (strlen(savefilename) == 0)){ + if ((strlen(savefilename) == 0) || ((packetsInFile/packetsPerFrame) >= maxFramesPerFile)){ //create file name - if(frameIndexNeeded==-1) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); - else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex); + if(!dataCompression){ + if(frameIndexNeeded==-1) + sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); + else + sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex); + } + + //only for gui display + else if(strlen(savefilename) == 0){ + sprintf(savefilename, "%s/%s_fxxx_%d.raw", filePath,fileName,fileIndex); + filter->setupAcquisitionParameters(filePath,fileName,fileIndex); + } + + if(enableFileWrite && cbAction > DO_NOTHING){ - - //create tree and file - if(dataCompression){ - if(enableFileWrite){ - filter->writeToFile(); - filter->initTree(savefilename); + if (dataCompression){ + //only the first time + if ((!framesCaught) && (filter->initTree()== FAIL)){ + cout << " Error: Could not create file " << savefilename << endl; + pthread_mutex_lock(&status_mutex); + writing_thread_running = -1; + pthread_mutex_unlock(&(status_mutex)); + break; } } - /*else{*///the standard way - if(sfilefd){ - fclose(sfilefd); - sfilefd = NULL; - } - if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ - cout << "Error: Could not create file " << savefilename << endl; - pthread_mutex_lock(&status_mutex); - writing_thread_running = -1; - pthread_mutex_unlock(&(status_mutex)); - break; - } - //setting buffer - setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); - /*}*/ - //printing packet losses and file names - //if(prevframenum != 0) - if(!framesCaught) - cout << savefilename << endl; + //standard way else{ - cout << savefilename - << "\tpacket loss " - << setw(4)<isEmpty()){ if(fifo->pop(wbuf)){ - currframenum = ((uint32_t)(*((uint32_t*)wbuf))& frameIndexMask) >>frameIndexOffset; - //cout<<"currframenum: "<> frameIndexOffset)<> frameIndexOffset)<> frameIndexOffset)<= (fifosize-1)){ + /*cout<<"*** sending not normally "<assignJobsForThread(startingmem,iJob); + startingmem = mem0; + iJob = 0; + pointinfifo = 0; + /*cout<<"***************1trialmem:"<<(void*)startingmem<=(numJobsPerThread)){ + /*cout<<"*** sending normally "<assignJobsForThread(startingmem,numJobsPerThread); + iJob = -2; + } + } + + + //standard way + else{ + //find number of packets received + for(i=0,p=0; i < bufferSize; i+=oneBufferSize,++p){ + if(((uint32_t)(*((uint32_t*)((wbuf+i))))) == 0xFFFFFFFF){ + break; + } + } + //increment counters + totalPacketsCaught += p; + packetsInFile += p; + if(p == packetsPerFrame){ + framesCaught++; + totalFramesCaught++; + } + //write to file + if(sfilefd) + fwrite(wbuf, 1, p*oneBufferSize, sfilefd); + else{ + cout << "You do not have permissions to overwrite: " << savefilename << endl; + usleep(50000); + } } - /*}*/ } - - //does not read every frame if(!nFrameToGui){ - if((guiData) && (i == packetsPerFrame)){ + if((guiData) && (p == packetsPerFrame)){/*change it in funcs*/ + /* if(guiData){*/ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); @@ -816,7 +919,7 @@ int slsReceiverFunctionList::startWriting(){ } //reads every nth frame else{ - if (i != packetsPerFrame)//so no 1 packet frame writing over previous 2 packet frame + if (p != packetsPerFrame)//so no 1 packet frame writing over previous 2 packet frame ; else if(frameFactor){ frameFactor--; @@ -837,33 +940,60 @@ int slsReceiverFunctionList::startWriting(){ } } - fifofree->push(wbuf); + + if(!dataCompression) + fifofree->push(wbuf); } } else{//cout<<"************************fifo empty**********************************"<0){ + cout<<"sending at the end "<assignJobsForThread(startingmem,iJob); + iJob = -2; + } + //no more popped data + else{ + //all jobs done + if(filter->checkIfJobsDone()){ + //its all done + pthread_mutex_lock(&status_mutex); + status = RUN_FINISHED; + pthread_mutex_unlock(&(status_mutex)); + cout << "Status: Run Finished" << endl; + } + } + } + + //standard way + else{ + pthread_mutex_lock(&status_mutex); + status = RUN_FINISHED; + pthread_mutex_unlock(&(status_mutex)); + cout << "Status: Run Finished" << endl; + } + } + //acquisition not done in detector + else{ sleepnumber++; usleep(50000); } } } - pthread_mutex_lock(&status_mutex); - receiver_threads_running=0; - pthread_mutex_unlock(&status_mutex); cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl; //cout << "RealTime Full Frames Caught:" << dec << framesCaught << endl; cout << "Total Full Frames Caught:"<< dec << totalFramesCaught << endl; - if(sfilefd){ + if((!dataCompression)&&(sfilefd)){ #ifdef VERBOSE cout << "sfield:" << (int)sfilefd << endl; #endif @@ -921,6 +1051,7 @@ int slsReceiverFunctionList::setShortFrame(int i){ packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; + numJobsPerThread = GOTTHARD_SHORT_NUM_JOBS_P_THREAD; }else{ bufferSize = GOTTHARD_BUFFER_SIZE; @@ -928,77 +1059,80 @@ int slsReceiverFunctionList::setShortFrame(int i){ packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; + numJobsPerThread = GOTTHARD_NUM_JOBS_P_THREAD; } + if(userDefinedNumJobsPerThread) + numJobsPerThread = userDefinedNumJobsPerThread; + oneBufferSize = bufferSize/packetsPerFrame; //if the filter is inititalized with the wrong readout if(filter->getPacketsPerFrame() != packetsPerFrame){ - vector > map; - vector > mask; - int initial_offset = 4; - int later_offset = 2; + int16_t* map; + int16_t* mask; + + int initial_offset = 2; + int later_offset = 1; int x,y,i,j,offset = 0; switch(packetsPerFrame){ case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard x = 1; - y = (GOTTHARD_DATA_BYTES/GOTTHARD_PACKETS_PER_FRAME)/2; + y = (GOTTHARD_SHORT_DATABYTES/sizeof(int16_t)); offset = initial_offset; - mask.resize(x); - for(int i=0;ienableCompression(enable);}; /** get data compression, by saving only hits */ bool getDataCompression(){ return dataCompression;}; + /** Set Number of Jobs Per Thread */ + void setNumberOfJobsPerThread(int i){userDefinedNumJobsPerThread = i; numJobsPerThread = i;}; + private: /** detector type */ detectorType myDetectorType; /** max frames per file **/ - uint32_t maxFramesPerFile; + int maxFramesPerFile; /** File write enable */ int enableFileWrite; @@ -241,7 +244,7 @@ private: int frameIndexNeeded; /** Frames Caught for each real time acquisition (eg. for each scan) */ - uint32_t framesCaught; + int framesCaught; /* Acquisition started */ bool acqStarted; @@ -268,7 +271,7 @@ private: uint32_t acquisitionIndex; /** Packets currently in current file, starts new file when it reaches max */ - uint32_t packetsInFile; + int packetsInFile; /** Previous Frame number from buffer */ uint32_t prevframenum; @@ -328,10 +331,10 @@ private: int shortFrame; /** buffer size can be 1286*2 or 518 or 1286*40 */ - uint32_t bufferSize; + int bufferSize; /** number of packets per frame*/ - uint32_t packetsPerFrame; + int packetsPerFrame; /** gui data ready */ int guiDataReady; @@ -369,6 +372,12 @@ private: /** guiDataReady mutex */ pthread_mutex_t dataReadyMutex; + /** Number of jobs per thread for data compression */ + int numJobsPerThread; + + /** user defined number of jobs per thread for data compression */ + int userDefinedNumJobsPerThread; + /** callback arguments are filepath @@ -412,6 +421,7 @@ private: int cbAction; + public: /** File Descriptor */ static FILE *sfilefd; diff --git a/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp b/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp index 85a4e927a..d34038a13 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp +++ b/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp @@ -40,6 +40,8 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success): ifstream infile; string sLine,sargname; int iline = 0; + bool dcompr = false; + int jobthread = -1; success=OK; @@ -130,17 +132,20 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success): //parse command line for type etc.. more priority if(success == OK){ for(int iarg=1;iargenableDataCompression(dcompr); + if(jobthread!=-1) slsReceiverList->setNumberOfJobsPerThread(jobthread); + #ifdef VERBOSE cout << "Function table assigned." << endl; #endif @@ -1146,8 +1201,10 @@ int slsReceiverFuncs::gotthard_read_frame(){ if(shortFrame!=-1){ if(bindex != 0xFFFFFFFF) memcpy((((char*)retval)+(GOTTHARD_SHORT_DATABYTES*shortFrame)),((char*) origVal)+4, GOTTHARD_SHORT_DATABYTES); - else + else{ index = startIndex - 1; + cout << "Missing Packet,Not sending to gui" << endl; + } } //all adc else{ @@ -1170,8 +1227,10 @@ int slsReceiverFuncs::gotthard_read_frame(){ }else cout << "different frames caught. frame1:"<< hex << index << ":"<