From a485e33e82b4288d8722d9189403324b72e3fa0b Mon Sep 17 00:00:00 2001 From: l_maliakal_d Date: Mon, 16 Dec 2013 10:07:08 +0000 Subject: [PATCH] changed receiver to work with many writer threads for only receiver without compression git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@707 951219d9-93cf-4727-9268-0efd64621fa3 --- .../MySocketTCP/genericSocket.h | 30 +- .../slsDetectorAnalysis/postProcessing.cpp | 114 +- .../singlePhotonFilter.cpp | 12 +- .../slsReceiver/receiver_defs.h | 1 + .../slsReceiver/slsReceiverFunctionList.cpp | 2025 ++++++++--------- .../slsReceiver/slsReceiverFunctionList.h | 352 +-- .../slsReceiver/slsReceiver_funcs.cpp | 6 +- 7 files changed, 1237 insertions(+), 1303 deletions(-) diff --git a/slsDetectorSoftware/MySocketTCP/genericSocket.h b/slsDetectorSoftware/MySocketTCP/genericSocket.h index af08337a9..1fa136939 100644 --- a/slsDetectorSoftware/MySocketTCP/genericSocket.h +++ b/slsDetectorSoftware/MySocketTCP/genericSocket.h @@ -541,7 +541,7 @@ enum communicationProtocol{ }; - int ReceiveDataOnly(void* buf,int length){ + int ReceiveDataOnly(void* buf,int length=0){ if (buf==NULL) return -1; @@ -562,16 +562,26 @@ enum communicationProtocol{ break; case UDP: if (socketDescriptor<0) return -1; - // while(length>0){ - for(int i=0;ipacket_size) ? packet_size:length; - nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(!nsent) break; - length-=nsent; - total_sent+=nsent; + //if length given + if(length){ + while(length>0){ + nsending=packet_size; + nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); + if(!nsent) break; + length-=nsent; + total_sent+=nsent; + } + } + //depends on packets per frame + else{ + for(int i=0;i 0 only when using gui." << std::endl; - std::cout << "Current receiver read frequency: " << read_freq << std::endl; - } - } - - while(1){ - - cout.flush(); - cout< prevCaught) - newData=true; - else - newData=false; -#ifdef VERBOSE - std::cout << "caught:" << caught << " prevcaught:" << prevCaught << " newData:" << newData << std::endl; -#endif - prevCaught=caught; - } - - //read frame if new data or nth frame reading - if (newData){ - - if(setReceiverOnline()==ONLINE_FLAG){ - - //get data - strcpy(currentfName,""); - pthread_mutex_lock(&mg); - int* receiverData = readFrameFromReceiver(currentfName,currentfIndex);//if(currentfIndex!=-1)cout<<"--currentfIndex:"<= 0) { - fdata = decodeData(receiverData); - delete [] receiverData; - if ((fdata) && (dataReady)){ - thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentfName,getTotalNumberOfChannels()); - dataReady(thisData, currentfIndex, pCallbackArg); - delete thisData; - fdata = NULL; - } - } - else{ - ;//cout<<"****Detector returned mismatched indices/garbage or acquisition is over. Trying again.***"<registerCallBackFreeFifo(&(freeFifoBufferCallBack),this); - - - dataCompression = false; } slsReceiverFunctionList::~slsReceiverFunctionList(){ - if(latestData) delete [] latestData; - if(fifofree) delete [] fifofree; - if(fifo) delete [] fifo; + if(udpSocket) delete udpSocket; + if(eth) delete [] eth; + if(latestData) delete [] latestData; if(guiFileName) delete [] guiFileName; - if(eth) delete [] eth; - if(mem0) free(mem0); + if(mem0) free(mem0); + if(filter) delete filter; + if(fifo) delete fifo; + if(fifoFree) delete fifoFree; } - -int slsReceiverFunctionList::setEnableFileWrite(int i){ - if(i!=-1) - enableFileWrite=i; - return enableFileWrite; -} - - void slsReceiverFunctionList::setEthernetInterface(char* c){ strcpy(eth,c); } @@ -232,7 +141,7 @@ void slsReceiverFunctionList::setEthernetInterface(char* c){ uint32_t slsReceiverFunctionList::getFrameIndex(){ - if(!framesCaught) + if(!packetsCaught) frameIndex=0; else frameIndex = currframenum - startFrameIndex; @@ -242,7 +151,7 @@ uint32_t slsReceiverFunctionList::getFrameIndex(){ uint32_t slsReceiverFunctionList::getAcquisitionIndex(){ - if(!totalFramesCaught) + if(!totalPacketsCaught) acquisitionIndex=0; else acquisitionIndex = currframenum - startAcquisitionIndex; @@ -283,415 +192,50 @@ int slsReceiverFunctionList::setFileIndex(int i){ } + +int slsReceiverFunctionList::setEnableFileWrite(int i){ + if(i!=-1) + enableFileWrite=i; + return enableFileWrite; +} + + + + void slsReceiverFunctionList::resetTotalFramesCaught(){ acqStarted = false; startAcquisitionIndex = 0; - totalFramesCaught = 0; totalPacketsCaught = 0; } -int slsReceiverFunctionList::startReceiver(char message[]){ -//#ifdef VERBOSE - cout << "Starting Receiver" << endl; -//#endif - cout << endl; +int slsReceiverFunctionList::setShortFrame(int i){ + shortFrame=i; - int err = 0; - if(!receiver_threads_running){ -#ifdef VERBOSE - cout << "Starting new acquisition threadddd ...." << endl; -#endif - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - listening_thread_running = 0; - writing_thread_running = 0; - receiver_threads_running = 1; - pthread_mutex_unlock(&(status_mutex)); + if(shortFrame!=-1){ + bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; + maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; - - // creating listening thread---------- - err = pthread_create(&listening_thread, NULL,startListeningThread, (void*) this); - if(err){ - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - listening_thread_running = 0; - receiver_threads_running = 0; - pthread_mutex_unlock(&(status_mutex)); - - sprintf(message,"Cant create listening thread. Status:%d\n",status); - cout << endl << message << endl; - return FAIL; - } - //wait till udp socket created - while(!listening_thread_running); - if(listening_thread_running==-1){ - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - listening_thread_running = 0; - receiver_threads_running = 0; - pthread_mutex_unlock(&(status_mutex)); - strcpy(message,"Could not create UDP Socket.\n"); - return FAIL; - } -#ifdef VERBOSE - cout << "Listening thread created successfully." << endl; -#endif - - // creating writing thread---------- - err = 0; - err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this); - if(err){ - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - writing_thread_running = 0; - receiver_threads_running = 0; - pthread_mutex_unlock(&(status_mutex)); - - //stop listening thread - pthread_join(listening_thread,NULL); - sprintf(message,"Cant create writing thread. Status:%d\n",status); - cout << endl << message << endl; - return FAIL; - } - //wait till file is created - while(!writing_thread_running); - if(writing_thread_running==-1){ - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - listening_thread_running = 0; - receiver_threads_running = 0; - pthread_mutex_unlock(&(status_mutex)); - if(udpSocket) udpSocket->ShutDownSocket(); - pthread_join(listening_thread,NULL); - sprintf(message,"Could not create file %s.\n",savefilename); - return FAIL; - } -#ifdef VERBOSE - cout << "Writing thread created successfully." << endl; -#endif - - - //change status---------- - pthread_mutex_lock(&status_mutex); - status = RUNNING; - pthread_mutex_unlock(&(status_mutex)); - cout << "Threads created successfully." << endl; - - - - - struct sched_param tcp_param, listen_param, write_param; - int policy= SCHED_RR; - - tcp_param.sched_priority = 50; - listen_param.sched_priority = 99; - write_param.sched_priority = 90; - - /*** ???????????????????????????*/ - if(!dataCompression){ - if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM) - cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; - if (pthread_setschedparam(writing_thread, policy, &write_param) == EPERM) - cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; - if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM) - cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; - } - - //pthread_getschedparam(pthread_self(),&policy,&tcp_param); - //cout << "current priority of main tcp thread is " << tcp_param.sched_priority << endl; - - } - - //initialize semaphore - sem_init(&smp,0,1); - - return OK; -} - - - - - -int slsReceiverFunctionList::stopReceiver(){ -//#ifdef VERBOSE - cout << "Stopping Receiver" << endl; -//#endif - - if(receiver_threads_running){ -#ifdef VERBOSE - cout << "Stopping new acquisition thread" << endl; -#endif - //stop listening thread - pthread_mutex_lock(&status_mutex); - receiver_threads_running=0; - pthread_mutex_unlock(&(status_mutex)); - - if(listening_thread_running == 1){ - if(udpSocket) udpSocket->ShutDownSocket(); - pthread_join(listening_thread,NULL); - } - if(writing_thread_running == 1) - pthread_join(writing_thread,NULL); - - } - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - pthread_mutex_unlock(&(status_mutex));; - - //semaphore destroy - sem_post(&smp); - sem_destroy(&smp); - - - cout << "Receiver Stopped.\nStatus:" << status << endl; - return OK; -} - - - -void* slsReceiverFunctionList::startListeningThread(void* this_pointer){ - ((slsReceiverFunctionList*)this_pointer)->startListening(); - - return this_pointer; -} - - - - - -void slsReceiverFunctionList::processFrameForFifo(){ - - //write packet count and set/increment counters/offsets - if(currentFrameCount > -1){ - (*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount; - currentPacketCount = 0; - currentFrameCount++; -//#ifdef VERYVERBOSE - totalListeningFrameCount++; -//#endif -#ifdef VERYVERBOSE - cout<<"lcurrframnum:"<< dec<< - (((uint32_t)(*((uint32_t*)(buffer+currentPacketOffset))) & frameIndexMask) >> frameIndexOffset)<<"*"<= numJobsPerThread){ - (*((uint16_t*)buffer)) = currentFrameCount; - while(!fifo->push(buffer)); -#ifdef VERYVERBOSE - cout << "lbuf1:" << (void*)buffer << endl; -#endif - } - - //pop freefifo and reset counters, set offsets - if((currentFrameCount >= numJobsPerThread) || (currentFrameCount == -1)){ - fifofree->pop(buffer); -#ifdef VERYVERBOSE - cout << "lbuf1 popped:" << (void*)buffer << endl; -#endif - currentFrameCount = 0; - currentPacketCount = 0; - currentPacketOffset = HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS; - currentFrameOffset = HEADER_SIZE_NUM_FRAMES; - } -} - - - - -int slsReceiverFunctionList::startListening(){ -#ifdef VERYVERBOSE - cout << "In startListening()\n"); -#endif - int rc=0; - measurementStarted = false; - startFrameIndex = 0; - - int ret=1; - bool newFrame = true; - char *tempchar = new char[oneBufferSize]; - int tempoffset= 0; - - - currentPacketOffset = 0; - currentFrameOffset = 0; - currentFrameCount = -1; - currentPacketCount = 0; -//#ifdef VERYVERBOSE - totalListeningFrameCount = 0; -//#endif - - - //to increase socket receiver buffer size and max length of input queue by changing kernel settings - if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) - cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; - else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) - cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - - /** permanent setting heiner - net.core.rmem_max = 104857600 # 100MiB - net.core.netdev_max_backlog = 250000 - sysctl -p - // from the manual - sysctl -w net.core.rmem_max=16777216 - sysctl -w net.core.netdev_max_backlog=250000 - */ - - //creating udp socket - if (strchr(eth,'.')!=NULL) strcpy(eth,""); - if(!strlen(eth)){ - cout<<"warning:eth is empty.listening to all"<getErrorStatus(); - if (iret){ - //#ifdef VERBOSE - std::cout<< "Could not create UDP socket on port "<< server_port << " error:"<ReceiveDataOnly(buffer+currentPacketOffset,oneBufferSize); - if( rc <= 0){ -#ifdef VERYVERBOSE - cerr << "recvfrom() failed:"< 0){ - (*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount; - if(currentPacketCount != 0){ - currentFrameCount++; - totalListeningFrameCount++; - } - (*((uint16_t*)buffer)) = currentFrameCount; - fifo->push(buffer); -#ifdef VERYVERBOSE - cout <<" last lbuf1:" << (void*)buffer << endl; -#endif - } - //push in dummy packet - while(fifofree->isEmpty()); - fifofree->pop(buffer); - (*((uint16_t*)buffer)) = 0xFFFF; - fifo->push(buffer); -#ifdef VERYVERBOSE - cout << "pushed in dummy buffer:" << (void*)buffer << endl; -#endif - - break; - } - } - //manipulate buffer number to inlude frame number and packet number for gotthard - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - (*((uint32_t*)(buffer+currentPacketOffset)))++; - - - //start for each scan - if(!measurementStarted){ - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+currentPacketOffset)))) & (frameIndexMask)) >> frameIndexOffset); - cout<<"startFrameIndex:"<verifyFrame(buffer+currentPacketOffset); - - - //ret = -1, -3 :packets of next frame, so copy it to a later offset or to new buffer - if(ret < -1){ - if((currentFrameCount + 1) >= numJobsPerThread){ - memcpy(tempchar, buffer+currentPacketOffset, oneBufferSize); - processFrameForFifo(); - memcpy(buffer+currentPacketOffset,tempchar,oneBufferSize); - }else{ - tempoffset = currentPacketCount; - processFrameForFifo(); - memcpy(buffer+currentPacketOffset,buffer+tempoffset,oneBufferSize); - } - - //ret = -2, not last frame of next packet. so wait for next packet - if(ret == -2) - ret = 0; - //rer = -3, last packet, so new frame - } - - currentPacketCount++; - - //ret = 0, wait for next packet - if(ret == 0){ - currentPacketOffset += oneBufferSize; - newFrame = false; - } - - // ret = -1, 1, last packet rxd for current frame, so new frame please - else - newFrame = true; - } + bufferSize = GOTTHARD_BUFFER_SIZE; + maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; } - delete tempchar; - pthread_mutex_lock(&status_mutex); - listening_thread_running = FINISHED; - pthread_mutex_unlock(&(status_mutex)); + onePacketSize = bufferSize/packetsPerFrame; -//#ifdef VERYVERBOSE - cout << "Total count listened to " << totalListeningFrameCount << endl; -//#endif - return 0; + //if the filter is inititalized with the wrong readout + if(filter->getPacketsPerFrame() != packetsPerFrame) + setupFilter(); + + return shortFrame; } @@ -699,331 +243,174 @@ int slsReceiverFunctionList::startListening(){ -void* slsReceiverFunctionList::startWritingThread(void* this_pointer){ - ((slsReceiverFunctionList*)this_pointer)->startWriting(); - return this_pointer; +int slsReceiverFunctionList::setNFrameToGui(int i){ + if(i>=0){ + nFrameToGui = i; + setupFifoStructure(); + } + return nFrameToGui; } -int slsReceiverFunctionList::startWriting(){ -#ifdef VERYVERBOSE - cout << "In startWriting()" <setupAcquisitionParameters(filePath,fileName,fileIndex); - // if(enableFileWrite && cbAction > DO_NOTHING) - // This commented option doesnt exist as we save and do ebverything for data compression - //create file - i = filter->initTree(); +int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){ + if(index >= 0){ + if(index != acquisitionPeriod){ + acquisitionPeriod = index; + setupFifoStructure(); + } } - - //file/tree not created - if(i == FAIL){ - cout << " Error: Could not create file " << savefilename << endl; - pthread_mutex_lock(&status_mutex); - writing_thread_running = -1; - pthread_mutex_unlock(&(status_mutex)); - } - - else{ - //let tcp thread know it started successfully - pthread_mutex_lock(&status_mutex); - writing_thread_running = 1; - pthread_mutex_unlock(&(status_mutex)); - - cout << "Ready!" << endl; - - //will always run till acquisition over and then runs till fifo is empty - while(receiver_threads_running){// || (!fifo->isEmpty())){ - //pop fifo - if(fifo->pop(wbuf)){ - - //number of frames per buffer - numFrames = (uint16_t)(*((uint16_t*)wbuf)); + return acquisitionPeriod; +} - //last dummy packet - if(numFrames == 0xFFFF){ -#ifdef VERYVERBOSE - cout << "popped last dummy frame:" << (void*)wbuf << endl; -#endif - //fifofree->push(wbuf); -/* - cout <<"fifofree is full?:" << fifofree->isFull()<isEmpty()<isEmpty() && status == TRANSMITTING){ - //data compression - if(dataCompression){ - //check if jobs done - while(!filter->checkIfJobsDone()) - usleep(50000); - } -#ifdef VERYVERBOSE - cout << "fifo freed:" << (void*)wbuf << endl; -#endif - fifofree->push(wbuf); -/* - cout <<"fifofree is full?:" << fifofree->isFull()<isEmpty()<>frameIndexOffset); -#ifdef VERYVERBOSE - cout << "currframnum:" << dec << currframenum << endl; -#endif - } - //send it for writing and data compression - if(dataCompression) - filter->assignJobsForThread(wbuf, numFrames); - - //standard way - else{ - //write data call back - if (cbAction < DO_EVERYTHING) { - rawDataReadyCallBack(currframenum, wbuf, numFrames * bufferSize, sfilefd, guiData,pRawDataReady); - } - else { - offset = HEADER_SIZE_NUM_FRAMES; - - while(numFrames > 0){ - //to make sure, we write according to max frames per file - numFramesToBeSaved = maxFramesPerFile - framesInFile; - if(numFramesToBeSaved > numFrames) - numFramesToBeSaved = numFrames; - - //write packets to file - for(i = 0; i < numFramesToBeSaved; i++){ - //determine number of packets - npackets = (uint8_t)(*((uint8_t*)(wbuf + offset))); - totalPacketsCaught += npackets; - offset += HEADER_SIZE_NUM_PACKETS; - //write to file - if(enableFileWrite){ - if(sfilefd) - fwrite(wbuf+offset, 1, npackets * oneBufferSize, sfilefd); - else{ - if(!totalPacketsCaught) - cout << "ERROR: You do not have permissions to overwrite: " << savefilename << endl; - } - } - if(npackets == packetsPerFrame){ - framesCaught++; - totalFramesCaught++; - } - - //increment offset - offset += bufferSize; - } - - - - //increment/decrement counters - framesInFile += numFramesToBeSaved; - numFrames -= numFramesToBeSaved; - //create new file - if(framesInFile >= maxFramesPerFile) - createNewFile(); - - } - } - } - - if(((uint8_t)(*((uint8_t*)(wbuf + HEADER_SIZE_NUM_FRAMES)))) == packetsPerFrame){ - copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS); - } - - - if(!dataCompression){ - fifofree->push(wbuf); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuf<registerCallBackFreeFifo(&(freeFifoBufferCallBack),this); - if(!dataCompression){ - if(sfilefd){ -#ifdef VERBOSE - cout << "sfield:" << (int)sfilefd << endl; -#endif - fclose(sfilefd); - sfilefd = NULL; - } - cout << "Total Packets Caught and written to files:" << dec << totalPacketsCaught << endl; - } - cout << "Total Full Frames Caught:"<< dec << totalFramesCaught << endl; - - - - //acquistion over call back - if (acquisitionFinishedCallBack) - acquisitionFinishedCallBack(totalFramesCaught, pAcquisitionFinished); - - - pthread_mutex_lock(&status_mutex); - writing_thread_running = FINISHED; - pthread_mutex_unlock(&(status_mutex)); - - - return 0; } -int slsReceiverFunctionList::createNewFile(){ - //create file name - if(frameIndexNeeded==-1) - sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); - else - sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex); - - - //if filewrite and we are allowed to write - if(enableFileWrite && cbAction > DO_NOTHING){ - //close - if(sfilefd){ - fclose(sfilefd); - sfilefd = NULL; - } - //open file - if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ - cout << "Error: Could not create file " << savefilename << endl; - pthread_mutex_lock(&status_mutex); - writing_thread_running = -1; - pthread_mutex_unlock(&(status_mutex)); - return FAIL; - } - //setting buffer - setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); - //printing packet losses and file names - if(!framesCaught) - cout << savefilename << endl; +/******************* need to look at exit strategy **************************/ +void slsReceiverFunctionList::enableDataCompression(bool enable){ + dataCompression = enable; + if(filter){ + if(filter->enableCompression(enable) == FAIL) + exit(-1); else{ - cout << savefilename - << "\tpacket loss " - << setw(4)<getPacketsPerFrame() != packetsPerFrame){ - - int16_t* map; - int16_t* mask; - - int initial_offset = 2; - int x,y,i,j,offset = 0; - - switch(packetsPerFrame){ - case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard - x = 1; - y = (GOTTHARD_SHORT_DATABYTES/sizeof(int16_t)); - offset = initial_offset; - - - mask = new int16_t[x*y]; - map = new int16_t[x*y]; - - //set up mask for gotthard short - for (i=0; i < x; i++) - for (j=0; j < y; j++){ - mask[i*y+j] = 0; - } - //set up mapping for gotthard short - for (i=0; i < x; i++) - for (j=0; j < y; j++){ - map[i*y+j] = offset; - offset += 2; - } - - delete filter; - filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, - 0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum); - break; - - default: //normal readout for gotthard - x = 1; - y = (GOTTHARD_DATA_BYTES/sizeof(int16_t)); - offset = initial_offset; - - mask = new int16_t[x*y]; - map = new int16_t[x*y]; - - //set up mask for gotthard - for (i=0; i < x; i++) - for (j=0; j < y; j++){ - mask[i*y+j] = 0; - } - - //set up mapping for gotthard - for (i=0; i < x; i++) - for (j=0; j < y; j++){ - //since there are 2 packets - if (j == y/2){ - offset += initial_offset; - offset += 1; - } - map[i*y+j] = offset; - offset += 1; - } - - delete filter; - filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, - 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum); - break; - } - - - + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + //send the first one + memcpy(latestData,startbuf,bufferSize); + strcpy(guiFileName,savefilename); + guiDataReady=1; + pthread_mutex_unlock(&dataReadyMutex); } - return shortFrame; } -void slsReceiverFunctionList::startReadout(){ - //wait so that all packets which take time has arrived - usleep(50000); - - pthread_mutex_lock(&status_mutex); - status = TRANSMITTING; - pthread_mutex_unlock(&status_mutex); - cout << "Status: Transmitting" << endl; - - //push last packet - if((udpSocket) && (listening_thread_running == 1)) - udpSocket->ShutDownSocket(); -} -#endif - - - - -int slsReceiverFunctionList::setNFrameToGui(int i){ - if(i>=0){ - nFrameToGui = i; - setupFifoStructure(); - } - return nFrameToGui; -}; - - -int64_t slsReceiverFunctionList::setAcquisitionPeriod(int64_t index){ - if(index >= 0){ - if(index != acquisitionPeriod){ - acquisitionPeriod = index; - setupFifoStructure(); - } - } - return acquisitionPeriod; -}; - - void slsReceiverFunctionList::setupFifoStructure(){ int64_t i; @@ -1217,26 +498,11 @@ void slsReceiverFunctionList::setupFifoStructure(){ numJobsPerThread = i; } - /*for testing - numJobsPerThread = 3;*/ - //if same, return if(oldn == numJobsPerThread) return; - //deleting old structure and creating fifo structure - if(fifofree){ - while(!fifofree->isEmpty()) - fifofree->pop(buffer); - delete fifofree; - } - if(fifo) delete fifo; - if(mem0) free(mem0); - fifofree = new CircularFifo(fifosize); - fifo = new CircularFifo(fifosize); - - //otherwise memory too much if numjobsperthread is at max = 1000 fifosize = GOTTHARD_FIFO_SIZE; if(myDetectorType == MOENCH) @@ -1248,24 +514,749 @@ void slsReceiverFunctionList::setupFifoStructure(){ fifosize = fifosize/numJobsPerThread; + cout << "Number of Frames per buffer:" << numJobsPerThread << endl; + cout << "Fifo Size:" << fifosize << endl; - /*for testing - fifosize = 11;*/ + /* + //for testing + numJobsPerThread = 3; fifosize = 11; + */ + + //deleting old structure and creating fifo structure + if(fifoFree){ + while(!fifoFree->isEmpty()) + fifoFree->pop(buffer); + delete fifoFree; + } + if(fifo) delete fifo; + if(mem0) free(mem0); + fifoFree = new CircularFifo(fifosize); + fifo = new CircularFifo(fifosize); //allocate memory - mem0=(char*)malloc(((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*fifosize); - - if (mem0==NULL) /** shud let the client know about this */ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY!!!!!!!+++++++++++++++++++++" << endl; + mem0=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ + if (mem0==NULL) + cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; buffer=mem0; - - //push the addresses into freed fifo - while (buffer<(mem0+((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*(fifosize-1))) { - fifofree->push(buffer); - buffer+=((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES); + //push the addresses into freed fifoFree and writingFifoFree + while (buffer<(mem0+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree->push(buffer); + buffer+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); } - cout<<"Number of Frames per buffer:"<ShutDownSocket(); + + //if eth is mistaken with ip address + if (strchr(eth,'.')!=NULL) + strcpy(eth,""); + + //if no eth, listen to all + if(!strlen(eth)){ + cout<<"warning:eth is empty.listening to all"<getErrorStatus(); + if (iret){ +//#ifdef VERBOSE + cout << "Could not create UDP socket on port " << server_port << " error:" << iret << endl; +//#endif + + return FAIL; + } + return OK; +} + + + + + +void slsReceiverFunctionList::freeFifoBufferCallBack (char* fbuffer, void *this_pointer){ + ((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer); +} + + + +void slsReceiverFunctionList::freeFifoBuffer(char* fbuffer){ + fifoFree->push(fbuffer); +} + + + + +int slsReceiverFunctionList::createThreads(bool destroy){ + int i; + + if(!destroy){ + + //listening thread + pthread_mutex_lock(&status_mutex); + status = IDLE; + running = 0; + pthread_mutex_unlock(&(status_mutex)); + + sem_init(&listensmp,0,0); + if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){ + cout << "Could not create listening thread" << endl; + return FAIL; + } + + //#ifdef VERBOSE + cout << "Listening thread created successfully." << endl; + //#endif + + + //start writer threads + cout << "Creating Writer Threads"; + writerthreads_mask = 0x0; + currentWriterThreadIndex = -1; + + for(i = 0; i < numWriterThreads; ++i){ + sem_init(&writersmp[i],0,0); + thread_started = 0; + currentWriterThreadIndex = i; + if(pthread_create(&writing_thread[i], NULL,startWritingThread, (void*) this)){ + cout << "Could not create writer thread with index " << i << endl; + return FAIL; + } + while(!thread_started); + cout << "."; + cout << flush; + } + //#ifdef VERBOSE + cout << endl << "Writer threads created successfully." << endl; + //#endif + + //assign priorities + struct sched_param tcp_param, listen_param, write_param; + int policy= SCHED_RR; + bool rights = true; + + tcp_param.sched_priority = 50; + listen_param.sched_priority = 99; + write_param.sched_priority = 90; + + if (pthread_setschedparam(listening_thread, policy, &listen_param) == EPERM) + rights = false; + for(i = 0; i < numWriterThreads; ++i) + if(rights) + if (pthread_setschedparam(writing_thread[i], policy, &write_param) == EPERM){ + rights = false; + break; + } + if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM) + rights = false; + + if(!rights) + cout << "WARNING: Could not prioritize threads. You need to be super user for that." << endl; + + + //to increase socket receiver buffer size and max length of input queue by changing kernel settings + if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) + cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; + + /** permanent setting heiner + net.core.rmem_max = 104857600 # 100MiB + net.core.netdev_max_backlog = 250000 + sysctl -p + // from the manual + sysctl -w net.core.rmem_max=16777216 + sysctl -w net.core.netdev_max_backlog=250000 + */ + + + } + + else{ + //cancel threads + for(i = 0; i < numWriterThreads; ++i){ + if(pthread_cancel(writing_thread[i])!=0) + cout << "Unable to cancel Thread of index" << i << endl; + sem_post(&writersmp[i]); + sem_destroy(&writersmp[i]); + } + //semaphore destroy + sem_post(&listensmp); + sem_destroy(&listensmp); + cout << "Threads destroyed" << endl; + } + + return OK; +} + + + + + + + + + + +int slsReceiverFunctionList::setupWriter(){ + + //reset writing thread variables + packetsInFile=0; + packetsCaught=0; + frameIndex=0; + if(sfilefd) sfilefd=NULL; + guiData = NULL; + guiDataReady=0; + strcpy(guiFileName,""); + cbAction = DO_EVERYTHING; + + + //printouts + cout << "Max Packets Per File:" << maxPacketsPerFile << endl; + if (rawDataReadyCallBack) + cout << "Note: Data Write has been defined exernally" << endl; + if (dataCompression) + cout << "Data Compression is enabled with " << numJobsPerThread << " number of jobs per thread" << endl; + if(nFrameToGui) + cout << "Sending every " << nFrameToGui << "th frame to gui" << endl; + + + + //acquisition start call back returns enable write + if (startAcquisitionCallBack) + cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition); + + if(cbAction < DO_EVERYTHING) + cout << endl << "Note: Call back activated. Data saving must be taken care of by user in call back." << endl; + else if(enableFileWrite==0) + cout << endl << "Note: Data will not be saved" << endl; + + + + //creating first file + if(!dataCompression) + return createNewFile(); + else{ + //create file name for gui purposes, and set up acquistion parameters + sprintf(savefilename, "%s/%s_fxxx_%d.raw", filePath,fileName,fileIndex); + filter->setupAcquisitionParameters(filePath,fileName,fileIndex); + // if(enableFileWrite && cbAction > DO_NOTHING) + // This commented option doesnt exist as we save and do ebverything for data compression + //create file + return filter->initTree(); + } +} + + + + + +int slsReceiverFunctionList::createNewFile(){ + + //create file name + if(frameIndexNeeded==-1) + sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); + else + sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,(packetsCaught/packetsPerFrame),fileIndex); + + + //if filewrite and we are allowed to write + if(enableFileWrite && cbAction > DO_NOTHING){ + //close + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + //open file + if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ + cout << "Error: Could not create file " << savefilename << endl; + return FAIL; + } + //setting buffer + setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); + //printing packet losses and file names + if(!packetsCaught) + cout << savefilename << endl; + else{ + cout << savefilename + << "\tpacket loss " + << setw(4)<ShutDownSocket(); + + sprintf(message,"Could not create file %s.\n",savefilename); + return FAIL; + } + + //initialize semaphore + sem_init(&smp,0,1); + + //status + pthread_mutex_lock(&status_mutex); + status = RUNNING; + receiver_threads_running = 1; + running = 1; + pthread_mutex_unlock(&(status_mutex)); + + + //start listening /writing + sem_post(&listensmp); + for(int i=0; i < numWriterThreads; ++i) + sem_post(&writersmp[i]); + + cout << "Receiver Started.\nStatus:" << status << endl; + + return OK; +} + + + + +int slsReceiverFunctionList::stopReceiver(){ +//#ifdef VERBOSE + cout << "Stopping Receiver" << endl; +//#endif + + if(status == RUNNING) + startReadout(); + + while(status == TRANSMITTING) + usleep(5000); + + //semaphore destroy + sem_post(&smp); + sem_destroy(&smp); + + //change status + pthread_mutex_lock(&status_mutex); + receiver_threads_running = 0; + status = IDLE; + pthread_mutex_unlock(&(status_mutex)); + + cout << "Receiver Stopped.\nStatus:" << status << endl; + return OK; +} + + + + + +void slsReceiverFunctionList::startReadout(){ + //wait so that all packets which take time has arrived + usleep(50000); + + pthread_mutex_lock(&status_mutex); + status = TRANSMITTING; + pthread_mutex_unlock(&status_mutex); + cout << "Status: Transmitting" << endl; + + //kill udp socket to tell the listening thread to push last packet + if(udpSocket) + udpSocket->ShutDownSocket(); + +} + + + +void* slsReceiverFunctionList::startListeningThread(void* this_pointer){ + ((slsReceiverFunctionList*)this_pointer)->startListening(); + + return this_pointer; +} + + + +void* slsReceiverFunctionList::startWritingThread(void* this_pointer){ + ((slsReceiverFunctionList*)this_pointer)->startWriting(); + return this_pointer; +} + + + + + + +int slsReceiverFunctionList::startListening(){ +#ifdef VERYVERBOSE + cout << "In startListening()" << endl; +#endif + + int lastpacket, offset, rc, packetcount, maxBufferSize; + char* tempchar = NULL; + + while(1){ + //variables that need to be checked/set before each acquisition + offset = HEADER_SIZE_NUM_TOT_PACKETS; + lastpacket = 0; + maxBufferSize = packetsPerFrame * numJobsPerThread * onePacketSize; + if(tempchar) {delete [] tempchar;tempchar = NULL;} + tempchar = new char[onePacketSize]; + + + while(running){ + + //pop + fifoFree->pop(buffer); +#ifdef VERYDEBUG + cout << "*** popped from fifo free" << (void*)buffer << endl; +#endif + + //receive + offset = HEADER_SIZE_NUM_TOT_PACKETS; + if(!lastpacket){ + rc = udpSocket->ReceiveDataOnly(buffer + offset, maxBufferSize); + offset = maxBufferSize; + }else{ +#ifdef VERYDEBUG + cout << "***last packet" << endl; +#endif + //if there is a packet from previous buffer, copy it and listen to 1 less frame + memcpy(buffer + offset, tempchar, onePacketSize); + offset += onePacketSize; + rc = udpSocket->ReceiveDataOnly(buffer + offset,maxBufferSize - onePacketSize); + offset = maxBufferSize - onePacketSize; + } + +#ifdef VERYDEBUG + cout << "*** rc:" << rc << endl; + cout << "*** offset:" << offset << endl;*/ +#endif + //start indices + //start of scan + if((!measurementStarted) && (rc > 0)){ + //gotthard has +1 for frame number + if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + else + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + cout<<"startFrameIndex:"<push(buffer); + continue; + } + //push the last buffer into fifo + if(rc > 0){ + packetcount = (rc/onePacketSize); +#ifdef VERYDEBUG + cout << "*** last packetcount:" << packetcount << endl; +#endif + (*((uint16_t*)(buffer))) = packetcount; + totalListeningFrameCount += packetcount; + while(!fifo->push(buffer)); +#ifdef VERYDEBUG + cout << "*** last lbuf1:" << (void*)buffer << endl; +#endif + } + //push dummy buffer + fifoFree->pop(buffer); + (*((uint16_t*)(buffer))) = 0xFFFF; + while(!fifo->push(buffer)); +#ifdef VERYDEBUG + cout << "pushed in dummy buffer:" << (void*)buffer << endl; +#endif + cout << "Total count listened to " << totalListeningFrameCount/packetsPerFrame << endl; + pthread_mutex_lock(&status_mutex); + running = 0; + pthread_mutex_unlock(&(status_mutex)); + break; + } + + + //check if last packet valid and calculate packet count + packetcount = packetsPerFrame * numJobsPerThread; + if(shortFrame != -1) + lastpacket = 0; + else{ + switch(myDetectorType){ + case MOENCH: + /*** last 40 packets ??? last packet header calculation with no +1) copy how many*/ + break; + default: + lastpacket = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout << "last opacket:" << lastpacket << endl; +#endif + if((packetsPerFrame -1) == ((((uint32_t)(*((uint32_t*)(buffer+lastpacket))))+1) & (packetIndexMask))) + lastpacket = 0; + else{ + memcpy(tempchar,buffer+lastpacket, onePacketSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + --packetcount; + } + break; + } + } + +#ifdef VERYDEBUG + cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; + cout << "*** packetcount:" << packetcount << endl; +#endif + //write packet count and push + (*((uint16_t*)(buffer))) = packetcount; + totalListeningFrameCount += packetcount; + while(!fifo->push(buffer)); +#ifdef VERYDEBUG + cout << "*** pushed into listening fifo" << endl; +#endif + } + sem_wait(&listensmp); + } + + return OK; +} + + + + + + + + + + + + + + + +int slsReceiverFunctionList::startWriting(){ + int ithread = currentWriterThreadIndex; +#ifdef VERYVERBOSE + cout << ithread << "In startWriting()" <pop(wbuf); + numpackets = (uint16_t)(*((uint16_t*)wbuf)); +#ifdef VERYDEBUG + cout << "numpackets:" << hex << numpackets << endl; + cout << ithread << "*** popped from fifo " << numpackets << endl; +#endif + + + + + //last dummy packet + if(numpackets == 0xFFFF){ +#ifdef VERYDEBUG + cout << "popped last dummy frame:" << (void*)wbuf << endl; +#endif + //data compression, check if jobs done + if(dataCompression){ + while(!filter->checkIfJobsDone()) + usleep(50000); + } + //free fifo + while(!fifoFree->push(wbuf)); +#ifdef VERYDEBUG + cout << "fifo freed:" << (void*)wbuf << endl; +#endif + //update status + pthread_mutex_lock(&status_mutex); + status = RUN_FINISHED; + pthread_mutex_unlock(&(status_mutex)); + cout << "Status: Run Finished" << endl; + //close file + if(sfilefd){ +#ifdef VERBOSE + cout << "sfield:" << (int)sfilefd << endl; +#endif + fclose(sfilefd); + sfilefd = NULL; + } + //report + cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl; + cout << "Total Frames Caught:"<< dec << (totalPacketsCaught/packetsPerFrame) << endl; + //acquisition end + if (acquisitionFinishedCallBack) + acquisitionFinishedCallBack((totalPacketsCaught/packetsPerFrame), pAcquisitionFinished); + continue; + } + + + + + //for progress + if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + currframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + currframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + +#ifdef VERYDEBUG + cout << ithread << " currframnum:" << dec << currframenum << endl; +#endif + + + //without datacompression: write datacall back, or write data, free fifo + if(!dataCompression){ + if (cbAction < DO_EVERYTHING) + rawDataReadyCallBack(currframenum, wbuf, numpackets * onePacketSize, sfilefd, guiData,pRawDataReady); + else if (numpackets > 0) + writeToFile_withoutCompression(wbuf, numpackets); + while(!fifoFree->push(wbuf)); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuf< 0){ + //for progress and packet loss calculation(new files) + if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + currframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + currframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + +#ifdef VERYDEBUG + cout << " currframnum:" << dec << currframenum << endl; +#endif + + //to create new file when max reached + packetsToSave = maxPacketsPerFile - packetsInFile; + if(packetsToSave > numpackets) + packetsToSave = numpackets; + + fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); + offset += (packetsToSave * onePacketSize); + packetsInFile += packetsToSave; + packetsCaught += packetsToSave; + totalPacketsCaught += packetsToSave; + numpackets -= packetsToSave; + //new file + if(packetsInFile >= maxPacketsPerFile) + createNewFile(); + } + } + //no file write + else{ + packetsInFile += numpackets; + packetsCaught += numpackets; + totalPacketsCaught += numpackets; + } + + pthread_mutex_unlock(&(progress_mutex)); +} + +#endif diff --git a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h index 3152458d5..eb661cbc3 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h +++ b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h @@ -71,12 +71,12 @@ public: /** * Returns Frames Caught for each real time acquisition (eg. for each scan) */ - int getFramesCaught(){return framesCaught;}; + int getFramesCaught(){return (packetsCaught/packetsPerFrame);}; /** * Returns Total Frames Caught for an entire acquisition (including all scans) */ - int getTotalFramesCaught(){return totalFramesCaught;}; + int getTotalFramesCaught(){return (totalPacketsCaught/packetsPerFrame);}; /** * Returns the frame index at start of each real time acquisition (eg. for each scan) @@ -143,6 +143,52 @@ public: */ void resetTotalFramesCaught(); + /** + * Set short frame + * @param i if shortframe i=1 + */ + int setShortFrame(int i); + + /** + * Set the variable to send every nth frame to gui + * or if 0,send frame only upon gui request + */ + int setNFrameToGui(int i); + + /** set acquisition period if a positive number + */ + int64_t setAcquisitionPeriod(int64_t index); + + /** enabl data compression, by saving only hits + */ + void enableDataCompression(bool enable); + + /** get data compression, by saving only hits + */ + bool getDataCompression(){ return dataCompression;}; + + /** set status to transmitting and + * when fifo is empty later, sets status to run_finished + */ + void startReadout(); + + /** + * Returns the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + */ + void readFrame(char* c,char** raw); + + + /** free fifo buffer, called back from single photon filter + */ + static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer); + + /** + * Call back from single photon filter to free writingfifo + * called from freeFifoBufferCallBack + */ + void freeFifoBuffer(char* fbuffer); /** * Starts Receiver - starts to listen for packets @@ -157,50 +203,48 @@ public: */ int stopReceiver(); - /** - * Returns the buffer-current frame read by receiver - * @param c pointer to current file name - * @param raw address of pointer, pointing to current frame to send to gui - */ - void readFrame(char* c,char** raw); - - /** - * Set short frame - * @param i if shortframe i=1 - */ - int setShortFrame(int i); - - /** set status to transmitting and - * when fifo is empty later, sets status to run_finished - */ - void startReadout(); - - /** enabl data compression, by saving only hits - */ - void enableDataCompression(bool enable){dataCompression = enable;if(filter)filter->enableCompression(enable);}; - - /** get data compression, by saving only hits - */ - bool getDataCompression(){ return dataCompression;}; - - /** - * Set the variable to send every nth frame to gui - * or if 0,send frame only upon gui request - */ - int setNFrameToGui(int i); - - /** set acquisition period if a positive number - */ - int64_t setAcquisitionPeriod(int64_t index); - - /** free fifo buffer, called back from single photon filter - */ - static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer){((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer);}; - void freeFifoBuffer(char* fbuffer){fifofree->push(fbuffer);}; - - private: + /** + * Constructs the singlePhotonFilter object + */ + void setupFilter(); + + /** + * Copy frames to gui + * uses semaphore for nth frame mode + */ + void copyFrameToGui(char* startbuf); + + /** + * set up fifo according to the new numjobsperthread + */ + void setupFifoStructure (); + + /** + * creates udp socket + * \returns if success or fail + */ + int createUDPSocket(); + + /** + * create listening thread and many writer threads at class construction + * @param destroy is true to kill all threads and start again + */ + int createThreads(bool destroy = false); + + /** + * initializes variables and creates the first file + * also does the startAcquisitionCallBack + * \returns FAIL or OK + */ + int setupWriter(); + + /** + * Creates new file + *\returns OK for succces or FAIL for failure + */ + int createNewFile(); /** * Static function - Thread started which listens to packets. @@ -209,13 +253,6 @@ private: */ static void* startListeningThread(void *this_pointer); - /** - * Thread started which listens to packets. - * Called by startReceiver() - * - */ - int startListening(); - /** * Static function - Thread started which writes packets to file. * Called by startReceiver() @@ -223,6 +260,13 @@ private: */ static void* startWritingThread(void *this_pointer); + /** + * Thread started which listens to packets. + * Called by startReceiver() + * + */ + int startListening(); + /** * Thread started which writes packets to file. * Called by startReceiver() @@ -230,34 +274,38 @@ private: */ int startWriting(); - /** - * Creates new file - *\returns OK for succces or FAIL for failure - */ - int createNewFile(); /** - * Copy frames to gui - * uses semaphore for nth frame mode + * Writing to file without compression + * @param buf is the address of buffer popped out of fifo + * @param num */ - void copyFrameToGui(char* startbuf); + void writeToFile_withoutCompression(char* buf,int numpackets); + + + - /** set up fifo according to the new numjobsperthread - */ - void setupFifoStructure (); - /** - * increment counters, pop and push fifos - */ - void processFrameForFifo(); /** detector type */ detectorType myDetectorType; - /** max frames per file **/ - int maxFramesPerFile; + /** status of receiver */ + runStatus status; + + /** UDP Socket between Receiver and Detector */ + genericSocket* udpSocket; + + /** Server UDP Port*/ + int server_port; + + /** ethernet interface or IP to listen to */ + char *eth; + + /** max packets per file **/ + int maxPacketsPerFile; /** File write enable */ int enableFileWrite; @@ -277,9 +325,6 @@ private: /** if frame index required in file name */ int frameIndexNeeded; - /** Frames Caught for each real time acquisition (eg. for each scan) */ - int framesCaught; - /* Acquisition started */ bool acqStarted; @@ -292,14 +337,14 @@ private: /** Actual current frame index of each time acquisition (eg. for each scan) */ uint32_t frameIndex; - /** Total Frames Caught for an entire acquisition (including all scans) */ - int totalFramesCaught; + /** Frames Caught for each real time acquisition (eg. for each scan) */ + int packetsCaught; /** Total packets caught for an entire acquisition (including all scans) */ int totalPacketsCaught; - /** Frames currently in current file, starts new file when it reaches max */ - int framesInFile; + /** Pckets currently in current file, starts new file when it reaches max */ + int packetsInFile; /** Frame index at start of an entire acquisition (including all scans) */ uint32_t startAcquisitionIndex; @@ -307,69 +352,39 @@ private: /** Actual current frame index of an entire acquisition (including all scans) */ uint32_t acquisitionIndex; - /** Previous Frame number from buffer */ - uint32_t prevframenum; + /** number of packets per frame*/ + int packetsPerFrame; - /** thread listening to packets */ - pthread_t listening_thread; + /** frame index mask */ + uint32_t frameIndexMask; - /** thread writing packets */ - pthread_t writing_thread; + /** packet index mask */ + uint32_t packetIndexMask; - /** mutex for locking variable used by different threads */ - pthread_mutex_t status_mutex; + /** frame index offset */ + int frameIndexOffset; - /** listening thread running */ - int listening_thread_running; - - /** writing thread running */ - int writing_thread_running; - - /** status of receiver */ - runStatus status; - - /** Receiver buffer */ - char* buffer; - - /** Receiver buffer */ - char *mem0, *memfull; - - /** latest data */ - char* latestData; - - /** UDP Socket between Receiver and Detector */ - genericSocket* udpSocket; - - /** Server UDP Port*/ - int server_port; - - /** ethernet interface or IP to listen to */ - char *eth; - - /** Element structure to put inside a fifo */ - struct dataStruct { - char* buffer; - int rc; - }; - - /** circular fifo to read and write data*/ - CircularFifo* fifo; - - /** circular fifo to read and write data*/ - CircularFifo* fifofree; - - /** fifo size */ - unsigned int fifosize; + /** acquisition period */ + int64_t acquisitionPeriod; /** short frames */ int shortFrame; + /** current frame number */ + uint32_t currframenum; + + /** Previous Frame number from buffer */ + uint32_t prevframenum; + /** buffer size can be 1286*2 or 518 or 1286*40 */ int bufferSize; - /** number of packets per frame*/ - int packetsPerFrame; - + /** oen buffer size */ + int onePacketSize; + + /** latest data */ + char* latestData; + /** gui data ready */ int guiDataReady; @@ -379,17 +394,17 @@ private: /** points to the filename to send to gui */ char* guiFileName; - /** current frame number */ - uint32_t currframenum; - /** send every nth frame to gui or only upon gui request*/ int nFrameToGui; - /** frame index mask */ - int frameIndexMask; + /** fifo size */ + unsigned int fifosize; - /** frame index offset */ - int frameIndexOffset; + /** number of jobs per thread for data compression */ + int numJobsPerThread; + + /** memory allocated for the buffer */ + char *mem0; /** datacompression - save only hits */ bool dataCompression; @@ -397,35 +412,66 @@ private: /** single photon filter */ singlePhotonFilter *filter; - /** oen buffer size */ - int oneBufferSize; + /** circular fifo to store addresses of data read */ + CircularFifo* fifo; - /** semaphore to synchronize writer and guireader threads */ - sem_t smp; + /** circular fifo to store addresses of data already written and ready to be resued*/ + CircularFifo* fifoFree; - /** guiDataReady mutex */ - pthread_mutex_t dataReadyMutex; + /** Receiver buffer */ + char *buffer; - /** number of jobs per thread for data compression */ - int numJobsPerThread; + /** max number of writer threads */ + const static int MAX_NUM_WRITER_THREADS = 15; - /** offset of current frame */ - int currentFrameOffset; + /** number of writer threads */ + int numWriterThreads; - /** offset of current packet */ - int currentPacketOffset; + /** to know if listening and writer threads created properly */ + int thread_started; - /** current packet count for current frame */ - int currentPacketCount; + /** mask showing which threads are running */ + volatile int32_t writerthreads_mask; - /** current frame count for current buffer */ - int currentFrameCount; + /** current writer thread index*/ + int currentWriterThreadIndex; + + /** thread listening to packets */ + pthread_t listening_thread; + + /** thread writing packets */ + pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; /** total frame count the listening thread has listened to */ int totalListeningFrameCount; - /** acquisition period */ - int64_t acquisitionPeriod; + /** 0 if receiver is idle, 1 otherwise */ + int running; + + + +//semaphores + /** semaphore to synchronize writer and guireader threads */ + sem_t smp; + /** semaphore to synchronize listener thread */ + sem_t listensmp; + /** semaphore to synchronize writer threads */ + sem_t writersmp[MAX_NUM_WRITER_THREADS]; + +//mutex + /** guiDataReady mutex */ + pthread_mutex_t dataReadyMutex; + + /** mutex for status */ + pthread_mutex_t status_mutex; + + /** mutex for progress variable currframenum */ + pthread_mutex_t progress_mutex; + + /** mutex for writing data to file */ + pthread_mutex_t write_mutex; + + /** callback arguments are @@ -490,14 +536,12 @@ public: 0 callback takes care of open,close,wrie file 1 callback writes file, we have to open, close it 2 we open, close, write file, callback does not do anything - */ void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; /** callback argument is toatal frames caught - */ void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; diff --git a/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp b/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp index 67607b7b8..59e5ca42a 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp +++ b/slsDetectorSoftware/slsReceiver/slsReceiver_funcs.cpp @@ -733,7 +733,7 @@ int slsReceiverFuncs::start_receiver(){ ret = FAIL; } */ - else if(slsReceiverList->getStatus()!=RUNNING) + else if(slsReceiverList->getStatus()==IDLE) ret=slsReceiverList->startReceiver(mess); #endif @@ -1183,10 +1183,10 @@ int slsReceiverFuncs::gotthard_read_frame(){ cout << "index:" << hex << index << endl; #endif }else{ - bindex = (uint32_t)(*((uint32_t*)raw)); + bindex = ((uint32_t)(*((uint32_t*)raw)))+1; pindex = (bindex & GOTTHARD_PACKET_INDEX_MASK); index = ((bindex & GOTTHARD_FRAME_INDEX_MASK) >> GOTTHARD_FRAME_INDEX_OFFSET); - bindex2 = (uint32_t)(*((uint32_t*)((char*)(raw+onebuffersize)))); + bindex2 = ((uint32_t)(*((uint32_t*)((char*)(raw+onebuffersize)))))+1; pindex2 =(bindex2 & GOTTHARD_PACKET_INDEX_MASK); index2 =((bindex2 & GOTTHARD_FRAME_INDEX_MASK) >> GOTTHARD_FRAME_INDEX_OFFSET); #ifdef VERBOSE