#ifdef SLS_RECEIVER_FUNCTION_LIST /********************************************//** * @file slsReceiverFunctionList.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "slsReceiverFunctionList.h" #include // SIGINT #include // stat #include // socket(), bind(), listen(), accept(), shut down #include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision #include //munmap #include #include using namespace std; FILE* slsReceiverFunctionList::sfilefd(NULL); int slsReceiverFunctionList::receiver_threads_running(0); slsReceiverFunctionList::slsReceiverFunctionList(detectorType det): myDetectorType(det), maxFramesPerFile(MAX_FRAMES_PER_FILE), enableFileWrite(1), fileIndex(0), frameIndexNeeded(0), framesCaught(0), acqStarted(false), measurementStarted(false), startFrameIndex(0), frameIndex(0), totalFramesCaught(0), totalPacketsCaught(0), framesInFile(0), startAcquisitionIndex(0), acquisitionIndex(0), prevframenum(0), listening_thread_running(0), writing_thread_running(0), status(IDLE), latestData(NULL), udpSocket(NULL), server_port(DEFAULT_UDP_PORTNO), fifo(NULL), fifofree(NULL), fifosize(GOTTHARD_FIFO_SIZE), shortFrame(-1), bufferSize(GOTTHARD_BUFFER_SIZE), packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME), guiDataReady(0), guiData(NULL), guiFileName(NULL), currframenum(0), nFrameToGui(0), frameIndexMask(GOTTHARD_FRAME_INDEX_MASK), frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET), dataCompression(false), numJobsPerThread(-1), acquisitionPeriod(SAMPLE_TIME_IN_NS), startAcquisitionCallBack(NULL), pStartAcquisition(NULL), acquisitionFinishedCallBack(NULL), pAcquisitionFinished(NULL), rawDataReadyCallBack(NULL), pRawDataReady(NULL) { if(myDetectorType == MOENCH){ fifosize = MOENCH_FIFO_SIZE; maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; bufferSize = MOENCH_BUFFER_SIZE; packetsPerFrame = MOENCH_PACKETS_PER_FRAME; frameIndexMask = MOENCH_FRAME_INDEX_MASK; frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; } oneBufferSize = bufferSize/packetsPerFrame; strcpy(savefilename,""); strcpy(filePath,""); strcpy(fileName,"run"); guiFileName = new char[MAX_STR_LENGTH]; strcpy(guiFileName,""); eth = new char[MAX_STR_LENGTH]; strcpy(eth,""); latestData = new char[bufferSize]; setupFifoStructure(); //for the filter int16_t* map; int16_t* mask; int initial_offset = 2; int later_offset = 1; int mask_y_offset = 120; int mask_adc = 0x7fff; int num_packets_in_col = 4; int num_packets_in_row = 10; int num_pixels_per_packet_in_row = 40; int num_pixels_per_packet_in_col = 16; int offset,ipacket; int x,y,i,j; int ipx,ipy,ix,iy; /** not for roi */ //filter switch(myDetectorType){ case MOENCH: x = MOENCH_PIXELS_IN_ONE_ROW; y = MOENCH_PIXELS_IN_ONE_ROW; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for moench for(i=0;iregisterCallBackFreeFifo(&(freeFifoBufferCallBack),this); dataCompression = false; } slsReceiverFunctionList::~slsReceiverFunctionList(){ if(latestData) delete [] latestData; if(fifofree) delete [] fifofree; if(fifo) delete [] fifo; if(guiFileName) delete [] guiFileName; if(eth) delete [] eth; if(mem0) free(mem0); } int slsReceiverFunctionList::setEnableFileWrite(int i){ if(i!=-1) enableFileWrite=i; return enableFileWrite; } void slsReceiverFunctionList::setEthernetInterface(char* c){ strcpy(eth,c); } uint32_t slsReceiverFunctionList::getFrameIndex(){ if(!framesCaught) frameIndex=0; else frameIndex = currframenum - startFrameIndex; return frameIndex; } uint32_t slsReceiverFunctionList::getAcquisitionIndex(){ if(!totalFramesCaught) acquisitionIndex=0; else acquisitionIndex = currframenum - startAcquisitionIndex; return acquisitionIndex; } char* slsReceiverFunctionList::setFileName(char c[]){ if(strlen(c)) strcpy(fileName,c); return getFileName(); } char* slsReceiverFunctionList::setFilePath(char c[]){ if(strlen(c)){ //check if filepath exists struct stat st; if(stat(c,&st) == 0) strcpy(filePath,c); else{ strcpy(filePath,""); cout<<"FilePath does not exist:"<=0) fileIndex = i; return getFileIndex(); } void slsReceiverFunctionList::resetTotalFramesCaught(){ acqStarted = false; startAcquisitionIndex = 0; totalFramesCaught = 0; totalPacketsCaught = 0; } int slsReceiverFunctionList::startReceiver(char message[]){ //#ifdef VERBOSE cout << "Starting Receiver" << endl; //#endif cout << endl; int err = 0; if(!receiver_threads_running){ #ifdef VERBOSE cout << "Starting new acquisition threadddd ...." << endl; #endif //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; writing_thread_running = 0; receiver_threads_running = 1; pthread_mutex_unlock(&(status_mutex)); // creating listening thread---------- err = pthread_create(&listening_thread, NULL,startListeningThread, (void*) this); if(err){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); sprintf(message,"Cant create listening thread. Status:%d\n",status); cout << endl << message << endl; return FAIL; } //wait till udp socket created while(!listening_thread_running); if(listening_thread_running==-1){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); strcpy(message,"Could not create UDP Socket.\n"); return FAIL; } #ifdef VERBOSE cout << "Listening thread created successfully." << endl; #endif // creating writing thread---------- err = 0; err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this); if(err){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; writing_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); //stop listening thread pthread_join(listening_thread,NULL); sprintf(message,"Cant create writing thread. Status:%d\n",status); cout << endl << message << endl; return FAIL; } //wait till file is created while(!writing_thread_running); if(writing_thread_running==-1){ //change status pthread_mutex_lock(&status_mutex); status = IDLE; listening_thread_running = 0; receiver_threads_running = 0; pthread_mutex_unlock(&(status_mutex)); if(udpSocket) udpSocket->ShutDownSocket(); pthread_join(listening_thread,NULL); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; } #ifdef VERBOSE cout << "Writing thread created successfully." << endl; #endif //change status---------- pthread_mutex_lock(&status_mutex); status = RUNNING; pthread_mutex_unlock(&(status_mutex)); cout << "Threads created successfully." << endl; struct sched_param tcp_param, listen_param, write_param; int policy= SCHED_RR; tcp_param.sched_priority = 50; listen_param.sched_priority = 99; write_param.sched_priority = 90; /*** ???????????????????????????*/ if(!dataCompression){ if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; if (pthread_setschedparam(writing_thread, policy, &write_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM) cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; } //pthread_getschedparam(pthread_self(),&policy,&tcp_param); //cout << "current priority of main tcp thread is " << tcp_param.sched_priority << endl; } //initialize semaphore sem_init(&smp,0,1); return OK; } int slsReceiverFunctionList::stopReceiver(){ //#ifdef VERBOSE cout << "Stopping Receiver" << endl; //#endif if(receiver_threads_running){ #ifdef VERBOSE cout << "Stopping new acquisition thread" << endl; #endif //stop listening thread pthread_mutex_lock(&status_mutex); receiver_threads_running=0; pthread_mutex_unlock(&(status_mutex)); if(listening_thread_running == 1){ if(udpSocket) udpSocket->ShutDownSocket(); pthread_join(listening_thread,NULL); } if(writing_thread_running == 1) pthread_join(writing_thread,NULL); } //change status pthread_mutex_lock(&status_mutex); status = IDLE; pthread_mutex_unlock(&(status_mutex));; //semaphore destroy sem_post(&smp); sem_destroy(&smp); cout << "Receiver Stopped.\nStatus:" << status << endl; return OK; } void* slsReceiverFunctionList::startListeningThread(void* this_pointer){ ((slsReceiverFunctionList*)this_pointer)->startListening(); return this_pointer; } int slsReceiverFunctionList::startListening(){ #ifdef VERYVERBOSE cout << "In startListening()\n"); #endif int rc=0; measurementStarted = false; startFrameIndex = 0; int offset=0; int frameStartOffset = 0; int ret=1; int i=0; int framesCount = -1; int packetsCount = 0; char *tempchar = new char[oneBufferSize]; //#ifdef VERYVERBOSE int totalcount = 0; //#endif //to increase socket receiver buffer size and max length of input queue by changing kernel settings if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; /** permanent setting heiner net.core.rmem_max = 104857600 # 100MiB net.core.netdev_max_backlog = 250000 sysctl -p // from the manual sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.netdev_max_backlog=250000 */ //creating udp socket if (strchr(eth,'.')!=NULL) strcpy(eth,""); if(!strlen(eth)){ cout<<"warning:eth is empty.listening to all"<getErrorStatus(); if (iret){ //#ifdef VERBOSE std::cout<< "Could not create UDP socket on port "<< server_port << " error:"<= numJobsPerThread){ //write frame count for each buffer (*((uint16_t*)buffer)) = framesCount; while(!fifo->push(buffer)); #ifdef VERYVERBOSE cout << "lbuf1:" << (void*)buffer << endl; #endif } //pop freefifo if((framesCount >= numJobsPerThread) || (framesCount == -1)){ //reset frame count and packet count framesCount = 0; packetsCount = 0; //pop freefifo /*while(fifofree->isEmpty());*/ fifofree->pop(buffer); #ifdef VERYVERBOSE cout << "lbuf1 popped:" << (void*)buffer << endl; #endif //increment offsets offset = HEADER_SIZE_NUM_FRAMES; offset += HEADER_SIZE_NUM_PACKETS; frameStartOffset = HEADER_SIZE_NUM_FRAMES; } //let tcp thread know this thread is in working condition if(!startFrameIndex){ if(!listening_thread_running){ pthread_mutex_lock(&status_mutex); listening_thread_running = 1; pthread_mutex_unlock(&(status_mutex)); } } //ret -2, remaining, start new frame with curent packet, then progress to ret = 0 (waiting for next packet) if(ret == -2){ memcpy(buffer+offset,tempchar,oneBufferSize); ret = 0; } //ret = -3, remaning: start new frame with current packet, progress to ret = -1 (invalidate remaining packets, start new frame) else if(ret == -3){ memcpy(buffer+offset,tempchar,oneBufferSize); ret = -1; } else{ //receive 1 packet rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize); if( rc <= 0){ #ifdef VERYVERBOSE cerr << "recvfrom() failed:"< 0){ (*((uint16_t*)buffer)) = framesCount; fifo->push(buffer); #ifdef VERYVERBOSE cout <<" last lbuf1:" << (void*)buffer << endl; #endif } //push in dummy packet while(fifofree->isEmpty()); fifofree->pop(buffer); (*((uint16_t*)buffer)) = 0xFFFF; fifo->push(buffer); #ifdef VERYVERBOSE cout << "pushed in dummy buffer:" << (void*)buffer << endl; #endif break; } } //manipulate buffer number to inlude frame number and packet number for gotthard if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) (*((uint32_t*)(buffer+offset)))++; //start for each scan if(!measurementStarted){ startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+offset)))) & (frameIndexMask)) >> frameIndexOffset); cout<<"startFrameIndex:"<verifyFrame(buffer+offset); /* rets case 0: waiting for next packet of new frame case 1: finished with full frame, start new frame case -1: last packet of current frame, invalidate remaining packets, start new frame case -2: first packet of new frame, invalidate remaining packets, check buffer needs to be pushed, start new frame with the current packet, then ret = 0 case -3: last packet of new frame, invalidate remaining packets, check buffer needs to be pushed, start new frame with current packet, then ret = -1 (invalidate remaining packets and start a new frame) */ } //for each packet packetsCount++; //ret = 0, so just increment offset and continue if(ret == 0){ offset += oneBufferSize; continue; } //ret -2, -3, copy the current packet temporarily if(ret < -1){ memcpy(tempchar, buffer+offset, oneBufferSize); packetsCount --; } //ret -1, change needed only for the remaining packets else if (ret == -1){ offset += oneBufferSize; ret = 1; } //ret = -1, -2, -3, invalidate remaining packets if(ret < 0){ for( i = offset; i < bufferSize; i += oneBufferSize) (*((uint32_t*)(buffer+i))) = 0xFFFFFFFF; } //for each frame //write packet count (*((uint8_t*)(buffer+frameStartOffset))) = packetsCount; //reset packet count packetsCount = 0; //increment frame count framesCount++; //#ifdef VERYVERBOSE totalcount++; //#endif #ifdef VERYVERBOSE cout<<"lcurrframnum:"<< dec<< (((uint32_t)(*((uint32_t*)(buffer+offset))) & frameIndexMask) >> frameIndexOffset)<<"*"<startWriting(); return this_pointer; } int slsReceiverFunctionList::startWriting(){ #ifdef VERYVERBOSE cout << "In startWriting()" <setupAcquisitionParameters(filePath,fileName,fileIndex); // if(enableFileWrite && cbAction > DO_NOTHING) // This commented option doesnt exist as we save and do ebverything for data compression //create file i = filter->initTree(); } //file/tree not created if(i == FAIL){ cout << " Error: Could not create file " << savefilename << endl; pthread_mutex_lock(&status_mutex); writing_thread_running = -1; pthread_mutex_unlock(&(status_mutex)); } else{ //let tcp thread know it started successfully pthread_mutex_lock(&status_mutex); writing_thread_running = 1; pthread_mutex_unlock(&(status_mutex)); cout << "Ready!" << endl; //will always run till acquisition over and then runs till fifo is empty while(receiver_threads_running){// || (!fifo->isEmpty())){ //pop fifo if(fifo->pop(wbuf)){ //number of frames per buffer numFrames = (uint16_t)(*((uint16_t*)wbuf)); //last dummy packet if(numFrames == 0xFFFF){ #ifdef VERYVERBOSE cout << "popped last dummy frame:" << (void*)wbuf << endl; #endif //fifofree->push(wbuf); /* cout <<"fifofree is full?:" << fifofree->isFull()<isEmpty()<isEmpty() && status == TRANSMITTING){ //data compression if(dataCompression){ //check if jobs done while(!filter->checkIfJobsDone()) usleep(50000); } #ifdef VERYVERBOSE cout << "fifo freed:" << (void*)wbuf << endl; #endif fifofree->push(wbuf); /* cout <<"fifofree is full?:" << fifofree->isFull()<isEmpty()<>frameIndexOffset); #ifdef VERYVERBOSE cout << "currframnum:" << dec << currframenum << endl; #endif } //send it for writing and data compression if(dataCompression) filter->assignJobsForThread(wbuf, numFrames); //standard way else{ //write data call back if (cbAction < DO_EVERYTHING) { rawDataReadyCallBack(currframenum, wbuf, numFrames * bufferSize, sfilefd, guiData,pRawDataReady); } else { offset = HEADER_SIZE_NUM_FRAMES; while(numFrames > 0){ //to make sure, we write according to max frames per file numFramesToBeSaved = maxFramesPerFile - framesInFile; if(numFramesToBeSaved > numFrames) numFramesToBeSaved = numFrames; //write packets to file for(i = 0; i < numFramesToBeSaved; i++){ //determine number of packets npackets = (uint8_t)(*((uint8_t*)(wbuf + offset))); totalPacketsCaught += npackets; offset += HEADER_SIZE_NUM_PACKETS; //write to file if(enableFileWrite){ if(sfilefd) fwrite(wbuf+offset, 1, npackets * oneBufferSize, sfilefd); else{ if(!totalPacketsCaught) cout << "ERROR: You do not have permissions to overwrite: " << savefilename << endl; } } //increment offset offset += bufferSize; } //increment/decrement counters framesInFile += numFramesToBeSaved; framesCaught += numFramesToBeSaved; totalFramesCaught += numFramesToBeSaved; numFrames -= numFramesToBeSaved; //create new file if(framesInFile >= maxFramesPerFile) createNewFile(); } } } copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS); if(!dataCompression){ fifofree->push(wbuf); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuf< DO_NOTHING){ //close if(sfilefd){ fclose(sfilefd); sfilefd = NULL; } //open file if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ cout << "Error: Could not create file " << savefilename << endl; pthread_mutex_lock(&status_mutex); writing_thread_running = -1; pthread_mutex_unlock(&(status_mutex)); return FAIL; } //setting buffer setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); //printing packet losses and file names if(!framesCaught) cout << savefilename << endl; else{ cout << savefilename << "\tpacket loss " << setw(4)<getPacketsPerFrame() != packetsPerFrame){ int16_t* map; int16_t* mask; int initial_offset = 2; int x,y,i,j,offset = 0; switch(packetsPerFrame){ case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard x = 1; y = (GOTTHARD_SHORT_DATABYTES/sizeof(int16_t)); offset = initial_offset; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for gotthard short for (i=0; i < x; i++) for (j=0; j < y; j++){ mask[i*y+j] = 0; } //set up mapping for gotthard short for (i=0; i < x; i++) for (j=0; j < y; j++){ map[i*y+j] = offset; offset += 2; } delete filter; filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum); break; default: //normal readout for gotthard x = 1; y = (GOTTHARD_DATA_BYTES/sizeof(int16_t)); offset = initial_offset; mask = new int16_t[x*y]; map = new int16_t[x*y]; //set up mask for gotthard for (i=0; i < x; i++) for (j=0; j < y; j++){ mask[i*y+j] = 0; } //set up mapping for gotthard for (i=0; i < x; i++) for (j=0; j < y; j++){ //since there are 2 packets if (j == y/2){ offset += initial_offset; offset += 1; } map[i*y+j] = offset; offset += 1; } delete filter; filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum); break; } } return shortFrame; } void slsReceiverFunctionList::startReadout(){ //wait so that all packets which take time has arrived usleep(50000); pthread_mutex_lock(&status_mutex); status = TRANSMITTING; pthread_mutex_unlock(&status_mutex); cout << "Status: Transmitting" << endl; //push last packet if((udpSocket) && (listening_thread_running == 1)) udpSocket->ShutDownSocket(); } #endif int slsReceiverFunctionList::setNFrameToGui(int i){ if(i>=0){ nFrameToGui = i; setupFifoStructure(); } return nFrameToGui; }; int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){ if(index >= 0){ if(index != acquisitionPeriod){ acquisitionPeriod = index; setupFifoStructure(); } } return acquisitionPeriod; }; void slsReceiverFunctionList::setupFifoStructure(){ int64_t i; int oldn = numJobsPerThread; //if every nth frame mode if(nFrameToGui) numJobsPerThread = nFrameToGui; //random nth frame mode else{ if(!acquisitionPeriod) i = SAMPLE_TIME_IN_NS; else i = SAMPLE_TIME_IN_NS/acquisitionPeriod; if (i > MAX_JOBS_PER_THREAD) numJobsPerThread = MAX_JOBS_PER_THREAD; else if (i < 1) numJobsPerThread = 1; else numJobsPerThread = i; } /*for testing numJobsPerThread = 3;*/ //if same, return if(oldn == numJobsPerThread) return; //deleting old structure and creating fifo structure if(fifofree){ while(!fifofree->isEmpty()) fifofree->pop(buffer); delete fifofree; } if(fifo) delete fifo; if(mem0) free(mem0); fifofree = new CircularFifo(fifosize); fifo = new CircularFifo(fifosize); //otherwise memory too much if numjobsperthread is at max = 1000 fifosize = GOTTHARD_FIFO_SIZE; if(myDetectorType == MOENCH) fifosize = MOENCH_FIFO_SIZE; if(fifosize % numJobsPerThread) fifosize = (fifosize/numJobsPerThread)+1; else fifosize = fifosize/numJobsPerThread; /*for testing fifosize = 11;*/ //allocate memory mem0=(char*)malloc(((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*fifosize); if (mem0==NULL) /** shud let the client know about this */ cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY!!!!!!!+++++++++++++++++++++" << endl; buffer=mem0; //push the addresses into freed fifo while (buffer<(mem0+((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*(fifosize-1))) { fifofree->push(buffer); buffer+=((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES); } cout<<"Number of Frames per buffer:"<