#ifdef SLS_RECEIVER_FUNCTION_LIST /********************************************//** * @file slsReceiverFunctionList.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "slsReceiverFunctionList.h" #include // SIGINT #include // stat #include // socket(), bind(), listen(), accept(), shut down #include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision #include //munmap #include #include using namespace std; FILE* slsReceiverFunctionList::sfilefd(NULL); int slsReceiverFunctionList::receiver_threads_running(0); slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): myDetectorType(det), maxFramesPerFile(MAX_FRAMES_PER_FILE), enableFileWrite(1), fileIndex(0), frameIndexNeeded(0), framesCaught(0), acqStarted(false), measurementStarted(false), startFrameIndex(0), frameIndex(0), totalFramesCaught(0), totalPacketsCaught(0), startAcquisitionIndex(0), acquisitionIndex(0), packetsInFile(0), prevframenum(0), listening_thread_running(0), writing_thread_running(0), status(IDLE), latestData(NULL), udpSocket(NULL), server_port(DEFAULT_UDP_PORTNO), fifo(NULL), fifofree(NULL), fifosize(GOTTHARD_FIFO_SIZE), shortFrame(-1), bufferSize(GOTTHARD_BUFFER_SIZE), packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME), guiDataReady(0), guiData(NULL), guiFileName(NULL), currframenum(0), nFrameToGui(0), frameIndexMask(GOTTHARD_FRAME_INDEX_MASK), frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET), dataCompression(false), numJobsPerThread(GOTTHARD_NUM_JOBS_P_THREAD), userDefinedNumJobsPerThread(0), startAcquisitionCallBack(NULL), pStartAcquisition(NULL), acquisitionFinishedCallBack(NULL), pAcquisitionFinished(NULL), rawDataReadyCallBack(NULL), pRawDataReady(NULL) { int aligned_frame_size = GOTTHARD_ALIGNED_FRAME_SIZE; if(myDetectorType == MOENCH){ aligned_frame_size = MOENCH_ALIGNED_FRAME_SIZE; fifosize = MOENCH_FIFO_SIZE; maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; bufferSize = MOENCH_BUFFER_SIZE; packetsPerFrame = MOENCH_PACKETS_PER_FRAME; frameIndexMask = MOENCH_FRAME_INDEX_MASK; frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; numJobsPerThread = MOENCH_NUM_JOBS_P_THREAD; } if(userDefinedNumJobsPerThread) numJobsPerThread = userDefinedNumJobsPerThread; oneBufferSize = bufferSize/packetsPerFrame; strcpy(savefilename,""); strcpy(filePath,""); strcpy(fileName,"run"); guiFileName = new char[MAX_STR_LENGTH]; strcpy(guiFileName,""); eth = new char[MAX_STR_LENGTH]; strcpy(eth,""); latestData = new char[bufferSize]; fifofree = new CircularFifo(fifosize); fifo = new CircularFifo(fifosize); mem0=(char*)malloc(aligned_frame_size*fifosize); if (mem0==NULL) { cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY!!!!!!!+++++++++++++++++++++" << endl; } buffer=mem0; while (buffer<(mem0+aligned_frame_size*(fifosize-1))) { fifofree->push(buffer); buffer+=aligned_frame_size; } int16_t* map; int16_t* mask; int initial_offset = 2; int later_offset = 1; int mask_y_offset = 120; int mask_adc = 0x7fff; int num_packets_in_col = 4; int num_packets_in_row = 10; int num_pixels_per_packet_in_row = 40; int num_pixels_per_packet_in_col = 16; int offset,ipacket; int x,y,i,j; int ipx,ipy,ix,iy; /** not for roi */ //filter switch(myDetectorType){ case MOENCH: x = MOENCH_PIXELS_IN_ONE_ROW; y = MOENCH_PIXELS_IN_ONE_ROW; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for moench for(i=0;i=0) fileIndex = i; return getFileIndex(); } void slsReceiverFunctionList::resetTotalFramesCaught(){ acqStarted = false; startAcquisitionIndex = 0; totalFramesCaught = 0; totalPacketsCaught = 0; } int slsReceiverFunctionList::startReceiver(char message[]){ #ifdef VERBOSE cout << "Starting Receiver" << endl; #endif cout << endl; int err = 0; if(!receiver_threads_running){ #ifdef VERBOSE cout << "Starting new acquisition threadddd ...." << endl; #endif //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; writing_thread_running = 0; receiver_threads_running = 1; pthread_mutex_unlock(&(status_mutex)); // creating listening thread---------- err = pthread_create(&listening_thread, NULL,startListeningThread, (void*) this); if(err){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); sprintf(message,"Cant create listening thread. Status:%d\n",status); cout << endl << message << endl; return FAIL; } //wait till udp socket created while(!listening_thread_running); if(listening_thread_running==-1){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); strcpy(message,"Could not create UDP Socket.\n"); return FAIL; } #ifdef VERBOSE cout << "Listening thread created successfully." << endl; #endif // creating writing thread---------- err = 0; err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this); if(err){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; writing_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); //stop listening thread pthread_join(listening_thread,NULL); sprintf(message,"Cant create writing thread. Status:%d\n",status); cout << endl << message << endl; return FAIL; } //wait till file is created while(!writing_thread_running); if(writing_thread_running==-1){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); if(udpSocket) udpSocket->ShutDownSocket(); pthread_join(listening_thread,NULL); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; } #ifdef VERBOSE cout << "Writing thread created successfully." << endl; #endif //change status---------- pthread_mutex_lock(&status_mutex); status = RUNNING; pthread_mutex_unlock(&(status_mutex)); cout << "Threads created successfully." << endl; struct sched_param tcp_param, listen_param, write_param; int policy= SCHED_RR; tcp_param.sched_priority = 50; listen_param.sched_priority = 99; write_param.sched_priority = 90; if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; if (pthread_setschedparam(writing_thread, policy, &write_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; //pthread_getschedparam(pthread_self(),&policy,&tcp_param); //cout << "current priority of main tcp thread is " << tcp_param.sched_priority << endl; } //initialize semaphore sem_init(&smp,0,1); return OK; } int slsReceiverFunctionList::stopReceiver(){ //#ifdef VERBOSE cout << "Stopping Receiver" << endl; //#endif if(receiver_threads_running){ //#ifdef VERBOSE cout << "Stopping new acquisition thread" << endl; //#endif //stop listening thread pthread_mutex_lock(&status_mutex); receiver_threads_running=0; pthread_mutex_unlock(&(status_mutex)); if(udpSocket) udpSocket->ShutDownSocket(); pthread_join(listening_thread,NULL); pthread_join(writing_thread,NULL); /*if(dataCompression) filter->enableCompression(false);*/ } //change status pthread_mutex_lock(&status_mutex); status = IDLE; pthread_mutex_unlock(&(status_mutex));; //semaphore destroy sem_post(&smp); sem_destroy(&smp); cout << "Receiver Stopped.\nStatus:" << status << endl; return OK; } void* slsReceiverFunctionList::startListeningThread(void* this_pointer){ ((slsReceiverFunctionList*)this_pointer)->startListening(); return this_pointer; } int slsReceiverFunctionList::startListening(){ #ifdef VERYVERBOSE cout << "In startListening()\n"); #endif int rc=0; measurementStarted = false; startFrameIndex = 0; int offset=0; int ret=1; int i=0; char *tempchar = new char[oneBufferSize]; //to increase socket receiver buffer size and max length of input queue by changing kernel settings if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; /** permanent setting heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 */ //creating udp socket if (strchr(eth,'.')!=NULL) strcpy(eth,""); if(!strlen(eth)){ cout<<"warning:eth is empty.listening to all"<getErrorStatus(); if (iret){ //#ifdef VERBOSE std::cout<< "Could not create UDP socket on port "<< server_port << " error:"<setupAcquisitionParameters();*/ while (receiver_threads_running) { if (!fifofree->isEmpty()) { if (ret!=0) fifofree->pop(buffer); if(ret == -3){ memcpy(buffer,tempchar,oneBufferSize); offset = oneBufferSize; //set all the new frame header and remaining headers invalid for(i = offset; i < bufferSize; i += oneBufferSize) (*((uint32_t*)(buffer+i))) = 0xFFFFFFFF; //start from beginning offset = 0; ret = 0; } else{ if(ret == -2){/** for moench, in between nxt, means ots -1*/ memcpy(buffer,tempchar,oneBufferSize); offset = oneBufferSize; ret = 0; } if(!startFrameIndex){ if(!listening_thread_running){ pthread_mutex_lock(&status_mutex); listening_thread_running = 1; pthread_mutex_unlock(&(status_mutex)); } } //receiver 2 half frames / 1 short frame / 40 moench frames rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize); if( rc <= 0){ //#ifdef VERYVERBOSE cerr << "recvfrom() failed:"<push(buffer); //#endif continue; } //manipulate buffer number to inlude frame number and packet number for gotthard if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) (*((uint32_t*)(buffer+offset)))++; ret = filter->verifyFrame(buffer+offset); //start for each scan if(!measurementStarted){ startFrameIndex = ((((uint32_t)(*((uint32_t*)buffer))) & (frameIndexMask)) >> frameIndexOffset); cout<<"startFrameIndex:"<isFull())){ fifo->push(buffer); } } } } delete tempchar; return 0; } void* slsReceiverFunctionList::startWritingThread(void* this_pointer){ ((slsReceiverFunctionList*)this_pointer)->startWriting(); return this_pointer; } int slsReceiverFunctionList::startWriting(){ #ifdef VERYVERBOSE cout << "In startWriting()" <isEmpty())){ //start a new file if ((strlen(savefilename) == 0) || ((packetsInFile/packetsPerFrame) >= maxFramesPerFile)){ //create file name if(!dataCompression){ if(frameIndexNeeded==-1) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex); } //only for gui display else if(strlen(savefilename) == 0){ sprintf(savefilename, "%s/%s_fxxx_%d.raw", filePath,fileName,fileIndex); filter->setupAcquisitionParameters(filePath,fileName,fileIndex); } if(enableFileWrite && cbAction > DO_NOTHING){ if (dataCompression){ //only the first time if ((!framesCaught) && (filter->initTree()== FAIL)){ cout << " Error: Could not create file " << savefilename << endl; pthread_mutex_lock(&status_mutex); writing_thread_running = -1; pthread_mutex_unlock(&(status_mutex)); break; } } //standard way else{ //close and open file if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ cout << "Error: Could not create file " << savefilename << endl; pthread_mutex_lock(&status_mutex); writing_thread_running = -1; pthread_mutex_unlock(&(status_mutex)); break; } //setting buffer setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); //printing packet losses and file names if(!framesCaught) cout << savefilename << endl; else{ cout << savefilename << "\tpacket loss " << setw(4)<isEmpty()){ if(fifo->pop(wbuf)){ currframenum = ((uint32_t)(*((uint32_t*)wbuf))& frameIndexMask) >>frameIndexOffset; //cout<<"currframenum2:"<<((((uint32_t)(*((uint32_t*)((char*)(wbuf+oneBufferSize)))))& frameIndexMask) >> frameIndexOffset)<= (fifosize-1)){ /*cout<<"*** sending not normally "<assignJobsForThread(startingmem,iJob); startingmem = mem0; iJob = 0; pointinfifo = 0; /*cout<<"***************1trialmem:"<<(void*)startingmem<=(numJobsPerThread)){ /*cout<<"*** sending normally "<assignJobsForThread(startingmem,numJobsPerThread); iJob = -2; } } //standard way else{ //find number of packets received for(i=0,p=0; i < bufferSize; i+=oneBufferSize,++p){ if(((uint32_t)(*((uint32_t*)((wbuf+i))))) == 0xFFFFFFFF){ break; } } //increment counters totalPacketsCaught += p; packetsInFile += p; if(p == packetsPerFrame){ framesCaught++; totalFramesCaught++; } //write to file if(sfilefd) fwrite(wbuf, 1, p*oneBufferSize, sfilefd); else{ cout << "You do not have permissions to overwrite: " << savefilename << endl; usleep(50000); } } } //does not read every frame if(!nFrameToGui){ if((guiData) && (p == packetsPerFrame)){/*change it in funcs*/ /* if(guiData){*/ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); memcpy(latestData,wbuf,bufferSize); strcpy(guiFileName,savefilename); pthread_mutex_lock(&dataReadyMutex); guiDataReady=1; pthread_mutex_unlock(&dataReadyMutex); }else{ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); } } //reads every nth frame else{ if (p != packetsPerFrame)//so no 1 packet frame writing over previous 2 packet frame ; else if(frameFactor){ frameFactor--; }else{ frameFactor = nFrameToGui-1; //block current process if the guireader hasnt read it yet sem_wait(&smp); //copy data and set guidataready pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); memcpy(latestData,wbuf,bufferSize); strcpy(guiFileName,savefilename); pthread_mutex_lock(&dataReadyMutex); guiDataReady = 1; pthread_mutex_unlock(&dataReadyMutex); } } if(!dataCompression) fifofree->push(wbuf); } } else{//cout<<"************************fifo empty**********************************"<0){ cout<<"sending at the end "<assignJobsForThread(startingmem,iJob); iJob = -2; } //no more popped data else{ //all jobs done if(filter->checkIfJobsDone()){ //its all done pthread_mutex_lock(&status_mutex); status = RUN_FINISHED; pthread_mutex_unlock(&(status_mutex)); cout << "Status: Run Finished" << endl; } } } //standard way else{ pthread_mutex_lock(&status_mutex); status = RUN_FINISHED; pthread_mutex_unlock(&(status_mutex)); cout << "Status: Run Finished" << endl; } } //acquisition not done in detector else{ sleepnumber++; usleep(50000); } } } cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl; //cout << "RealTime Full Frames Caught:" << dec << framesCaught << endl; cout << "Total Full Frames Caught:"<< dec << totalFramesCaught << endl; if((!dataCompression)&&(sfilefd)){ #ifdef VERBOSE cout << "sfield:" << (int)sfilefd << endl; #endif fclose(sfilefd); sfilefd = NULL; } //acquistion over call back if (acquisitionFinishedCallBack) acquisitionFinishedCallBack(totalFramesCaught, pAcquisitionFinished); return 0; } void slsReceiverFunctionList::readFrame(char* c,char** raw){ //point to gui data if (guiData == NULL) guiData = latestData; //copy data and filename strcpy(c,guiFileName); //could not get gui data if(!guiDataReady){ *raw = NULL; } //data ready, set guidata to receive new data else{ *raw = guiData; guiData = NULL; pthread_mutex_lock(&dataReadyMutex); guiDataReady = 0; pthread_mutex_unlock(&dataReadyMutex); if((nFrameToGui) && (receiver_threads_running)){ //release after getting data sem_post(&smp); } } } int slsReceiverFunctionList::setShortFrame(int i){ shortFrame=i; if(shortFrame!=-1){ bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE; packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; numJobsPerThread = GOTTHARD_SHORT_NUM_JOBS_P_THREAD; }else{ bufferSize = GOTTHARD_BUFFER_SIZE; maxFramesPerFile = MAX_FRAMES_PER_FILE; packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; numJobsPerThread = GOTTHARD_NUM_JOBS_P_THREAD; } if(userDefinedNumJobsPerThread) numJobsPerThread = userDefinedNumJobsPerThread; oneBufferSize = bufferSize/packetsPerFrame; //if the filter is inititalized with the wrong readout if(filter->getPacketsPerFrame() != packetsPerFrame){ int16_t* map; int16_t* mask; int initial_offset = 2; int later_offset = 1; int x,y,i,j,offset = 0; switch(packetsPerFrame){ case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard x = 1; y = (GOTTHARD_SHORT_DATABYTES/sizeof(int16_t)); offset = initial_offset; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for gotthard short for (int i=0; i < x; i++) for (int j=0; j < y; j++){ mask[i*y+j] = 0; } //set up mapping for gotthard short for (int i=0; i < x; i++) for (int j=0; j < y; j++){ map[i*y+j] = offset; offset += 2; } delete filter; filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE); break; default: //normal readout for gotthard x = 1; y = (GOTTHARD_DATA_BYTES/sizeof(int16_t)); offset = initial_offset; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for gotthard for (int i=0; i < x; i++) for (int j=0; j < y; j++){ mask[i*y+j] = 0; } //set up mapping for gotthard for (int i=0; i < x; i++) for (int j=0; j < y; j++){ //since there are 2 packets if (j == y/2){ offset += initial_offset; offset += 1; } map[i*y+j] = offset; offset += 1; } delete filter; filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE); break; } } return shortFrame; } void slsReceiverFunctionList::startReadout(){ pthread_mutex_lock(&status_mutex); status = TRANSMITTING; pthread_mutex_unlock(&status_mutex); cout << "Status: Transmitting" << endl; } #endif