diff --git a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp index 53d799c68..b8d7bd3d5 100644 --- a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp +++ b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp @@ -428,6 +428,102 @@ void* postProcessing::processData(int delflag) { */ + int progress = 0; + char currentfName[MAX_STR_LENGTH]=""; + int currentfIndex = -1; + bool newData = false; + int nthframe = setReadReceiverFrequency(0); +#ifdef VERBOSE + std::cout << "receiver read freq:" << nthframe << std::endl; +#endif + + //if nth frame + if(nthframe){ + newData = true; + //and no gui + if(!dataReady){ + std::cout << "Error: receiver read freq is set to " << nthframe << " but should be > 0 only when using gui." << std::endl; + nthframe = 0; + std::cout << "Current receiver read frequency: " << nthframe << std::endl; + } + } + + //repeat forever until joined by the calling thread + while(1){ + + cout.flush(); + cout< progress) + newData = true; + } + + if(newData){ + if(setReceiverOnline()==ONLINE_FLAG){ + //get data + strcpy(currentfName,""); + pthread_mutex_lock(&mg); + int* receiverData = readFrameFromReceiver(currentfName,currentfIndex); + pthread_mutex_unlock(&mg); + /*cout<<"index:"<= 0) { + fdata = decodeData(receiverData); + delete [] receiverData; + if ((fdata) && (dataReady)){ + thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentfName,getTotalNumberOfChannels()); + dataReady(thisData, currentfIndex, pCallbackArg); + delete thisData; + fdata = NULL; + progress = currentfIndex; + if(!nthframe) + newData = false; + } + } + else{ + ;//cout<<"****Detector returned mismatched indices/garbage or acquisition is over. Trying again.***"< *currentframenum){ - pthread_mutex_lock(&frnum_mutex); - *currentframenum = clusteriframe; - pthread_mutex_unlock(&frnum_mutex); - } - +/*cout<<"currentframenum:"<<(*currentframenum)< -1){ + (*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount; + currentPacketCount = 0; + currentFrameCount++; +//#ifdef VERYVERBOSE + totalListeningFrameCount++; +//#endif +#ifdef VERYVERBOSE + cout<<"lcurrframnum:"<< dec<< + (((uint32_t)(*((uint32_t*)(buffer+currentPacketOffset))) & frameIndexMask) >> frameIndexOffset)<<"*"<= numJobsPerThread){ + (*((uint16_t*)buffer)) = currentFrameCount; + while(!fifo->push(buffer)); +#ifdef VERYVERBOSE + cout << "lbuf1:" << (void*)buffer << endl; +#endif + } + + //pop freefifo and reset counters, set offsets + if((currentFrameCount >= numJobsPerThread) || (currentFrameCount == -1)){ + fifofree->pop(buffer); +#ifdef VERYVERBOSE + cout << "lbuf1 popped:" << (void*)buffer << endl; +#endif + currentFrameCount = 0; + currentPacketCount = 0; + currentPacketOffset = HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS; + currentFrameOffset = HEADER_SIZE_NUM_FRAMES; + } +} @@ -475,17 +519,21 @@ int slsReceiverFunctionList::startListening(){ measurementStarted = false; startFrameIndex = 0; - int offset=0; - int frameStartOffset = 0; int ret=1; - int i=0; - int framesCount = -1; - int packetsCount = 0; + bool newFrame = true; char *tempchar = new char[oneBufferSize]; + int tempoffset= 0; + + + currentPacketOffset = 0; + currentFrameOffset = 0; + currentFrameCount = -1; + currentPacketCount = 0; //#ifdef VERYVERBOSE - int totalcount = 0; + totalListeningFrameCount = 0; //#endif + //to increase socket receiver buffer size and max length of input queue by changing kernel settings if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")) cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; @@ -527,34 +575,10 @@ int slsReceiverFunctionList::startListening(){ while (receiver_threads_running) { - //push buffer - if(framesCount >= numJobsPerThread){ - //write frame count for each buffer - (*((uint16_t*)buffer)) = framesCount; - while(!fifo->push(buffer)); -#ifdef VERYVERBOSE - cout << "lbuf1:" << (void*)buffer << endl; -#endif - } + if(newFrame) + processFrameForFifo(); - //pop freefifo - if((framesCount >= numJobsPerThread) || (framesCount == -1)){ - //reset frame count and packet count - framesCount = 0; - packetsCount = 0; - //pop freefifo - /*while(fifofree->isEmpty());*/ - fifofree->pop(buffer); -#ifdef VERYVERBOSE - cout << "lbuf1 popped:" << (void*)buffer << endl; -#endif - //increment offsets - offset = HEADER_SIZE_NUM_FRAMES; - offset += HEADER_SIZE_NUM_PACKETS; - frameStartOffset = HEADER_SIZE_NUM_FRAMES; - } - //let tcp thread know this thread is in working condition if(!startFrameIndex){ if(!listening_thread_running){ @@ -565,148 +589,97 @@ int slsReceiverFunctionList::startListening(){ } - //ret -2, remaining, start new frame with curent packet, then progress to ret = 0 (waiting for next packet) - if(ret == -2){ - memcpy(buffer+offset,tempchar,oneBufferSize); - ret = 0; - } - - //ret = -3, remaning: start new frame with current packet, progress to ret = -1 (invalidate remaining packets, start new frame) - else if(ret == -3){ - memcpy(buffer+offset,tempchar,oneBufferSize); - ret = -1; - } - - - - else{ - //receive 1 packet - rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize); - if( rc <= 0){ + //receive 1 packet + rc = udpSocket->ReceiveDataOnly(buffer+currentPacketOffset,oneBufferSize); + if( rc <= 0){ #ifdef VERYVERBOSE - cerr << "recvfrom() failed:"< 0){ - (*((uint16_t*)buffer)) = framesCount; - fifo->push(buffer); -#ifdef VERYVERBOSE - cout <<" last lbuf1:" << (void*)buffer << endl; + cerr << "recvfrom() failed:"< 0){ + (*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount; + if(currentPacketCount != 0){ + currentFrameCount++; + totalListeningFrameCount++; } - //push in dummy packet - while(fifofree->isEmpty()); - fifofree->pop(buffer); - (*((uint16_t*)buffer)) = 0xFFFF; + (*((uint16_t*)buffer)) = currentFrameCount; fifo->push(buffer); #ifdef VERYVERBOSE - cout << "pushed in dummy buffer:" << (void*)buffer << endl; + cout <<" last lbuf1:" << (void*)buffer << endl; #endif - - break; } - } - //manipulate buffer number to inlude frame number and packet number for gotthard - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - (*((uint32_t*)(buffer+offset)))++; - - - //start for each scan - if(!measurementStarted){ - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+offset)))) & (frameIndexMask)) >> frameIndexOffset); - cout<<"startFrameIndex:"<verifyFrame(buffer+offset); - /* - rets - case 0: waiting for next packet of new frame - case 1: finished with full frame, - start new frame - case -1: last packet of current frame, - invalidate remaining packets, - start new frame - case -2: first packet of new frame, - invalidate remaining packets, - check buffer needs to be pushed, - start new frame with the current packet, - then ret = 0 - case -3: last packet of new frame, - invalidate remaining packets, - check buffer needs to be pushed, - start new frame with current packet, - then ret = -1 (invalidate remaining packets and start a new frame) - */ - - } - - - //for each packet - packetsCount++; - - //ret = 0, so just increment offset and continue - if(ret == 0){ - offset += oneBufferSize; - continue; - } - - - //ret -2, -3, copy the current packet temporarily - if(ret < -1){ - memcpy(tempchar, buffer+offset, oneBufferSize); - packetsCount --; - } - //ret -1, change needed only for the remaining packets - else if (ret == -1){ - offset += oneBufferSize; - ret = 1; - } - - //ret = -1, -2, -3, invalidate remaining packets - if(ret < 0){ - for( i = offset; i < bufferSize; i += oneBufferSize) - (*((uint32_t*)(buffer+i))) = 0xFFFFFFFF; - } - - - - //for each frame - //write packet count - (*((uint8_t*)(buffer+frameStartOffset))) = packetsCount; - //reset packet count - packetsCount = 0; - //increment frame count - framesCount++; -//#ifdef VERYVERBOSE - totalcount++; -//#endif + //push in dummy packet + while(fifofree->isEmpty()); + fifofree->pop(buffer); + (*((uint16_t*)buffer)) = 0xFFFF; + fifo->push(buffer); #ifdef VERYVERBOSE - cout<<"lcurrframnum:"<< dec<< - (((uint32_t)(*((uint32_t*)(buffer+offset))) & frameIndexMask) >> frameIndexOffset)<<"*"<> frameIndexOffset); + cout<<"startFrameIndex:"<verifyFrame(buffer+currentPacketOffset); + + + //ret = -1, -3 :packets of next frame, so copy it to a later offset or to new buffer + if(ret < -1){ + if((currentFrameCount + 1) >= numJobsPerThread){ + memcpy(tempchar, buffer+currentPacketOffset, oneBufferSize); + processFrameForFifo(); + memcpy(buffer+currentPacketOffset,tempchar,oneBufferSize); + }else{ + tempoffset = currentPacketCount; + processFrameForFifo(); + memcpy(buffer+currentPacketOffset,buffer+tempoffset,oneBufferSize); + } + + //ret = -2, not last frame of next packet. so wait for next packet + if(ret == -2) + ret = 0; + //rer = -3, last packet, so new frame + } + + currentPacketCount++; + + //ret = 0, wait for next packet + if(ret == 0){ + currentPacketOffset += oneBufferSize; + newFrame = false; + } + + // ret = -1, 1, last packet rxd for current frame, so new frame please + else + newFrame = true; } - } delete tempchar; @@ -716,7 +689,7 @@ int slsReceiverFunctionList::startListening(){ pthread_mutex_unlock(&(status_mutex)); //#ifdef VERYVERBOSE - cout << "Total count listened to " << totalcount << endl; + cout << "Total count listened to " << totalListeningFrameCount << endl; //#endif return 0; } @@ -904,7 +877,10 @@ int slsReceiverFunctionList::startWriting(){ cout << "ERROR: You do not have permissions to overwrite: " << savefilename << endl; } } - + if(npackets == packetsPerFrame){ + framesCaught++; + totalFramesCaught++; + } //increment offset offset += bufferSize; @@ -914,8 +890,6 @@ int slsReceiverFunctionList::startWriting(){ //increment/decrement counters framesInFile += numFramesToBeSaved; - framesCaught += numFramesToBeSaved; - totalFramesCaught += numFramesToBeSaved; numFrames -= numFramesToBeSaved; //create new file if(framesInFile >= maxFramesPerFile) @@ -925,8 +899,9 @@ int slsReceiverFunctionList::startWriting(){ } } - - copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS); + if(((uint8_t)(*((uint8_t*)(wbuf + HEADER_SIZE_NUM_FRAMES)))) == packetsPerFrame){ + copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS); + } if(!dataCompression){ @@ -1041,11 +1016,11 @@ void slsReceiverFunctionList::copyFrameToGui(char* startbuf){ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; - pthread_mutex_unlock(&dataReadyMutex); + /*pthread_mutex_unlock(&dataReadyMutex);*/ //send the first one memcpy(latestData,startbuf,bufferSize); strcpy(guiFileName,savefilename); - pthread_mutex_lock(&dataReadyMutex); + /*pthread_mutex_lock(&dataReadyMutex);*/ guiDataReady=1; pthread_mutex_unlock(&dataReadyMutex); } @@ -1059,6 +1034,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){ //point to gui data if (guiData == NULL) guiData = latestData; + //copy data and filename strcpy(c,guiFileName); //could not get gui data @@ -1069,6 +1045,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){ else{ *raw = guiData; guiData = NULL; + pthread_mutex_lock(&dataReadyMutex); guiDataReady = 0; pthread_mutex_unlock(&dataReadyMutex); diff --git a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h index 97d43fe03..3152458d5 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h +++ b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.h @@ -157,6 +157,51 @@ public: */ int stopReceiver(); + /** + * Returns the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + */ + void readFrame(char* c,char** raw); + + /** + * Set short frame + * @param i if shortframe i=1 + */ + int setShortFrame(int i); + + /** set status to transmitting and + * when fifo is empty later, sets status to run_finished + */ + void startReadout(); + + /** enabl data compression, by saving only hits + */ + void enableDataCompression(bool enable){dataCompression = enable;if(filter)filter->enableCompression(enable);}; + + /** get data compression, by saving only hits + */ + bool getDataCompression(){ return dataCompression;}; + + /** + * Set the variable to send every nth frame to gui + * or if 0,send frame only upon gui request + */ + int setNFrameToGui(int i); + + /** set acquisition period if a positive number + */ + int64_t setAcquisitionPeriod(int64_t index); + + /** free fifo buffer, called back from single photon filter + */ + static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer){((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer);}; + void freeFifoBuffer(char* fbuffer){fifofree->push(fbuffer);}; + + + +private: + /** * Static function - Thread started which listens to packets. * Called by startReceiver() @@ -197,49 +242,17 @@ public: */ void copyFrameToGui(char* startbuf); - /** - * Returns the buffer-current frame read by receiver - * @param c pointer to current file name - * @param raw address of pointer, pointing to current frame to send to gui + /** set up fifo according to the new numjobsperthread */ - void readFrame(char* c,char** raw); - - /** - * Set short frame - * @param i if shortframe i=1 - */ - int setShortFrame(int i); - - /** set status to transmitting and - * when fifo is empty later, sets status to run_finished */ - void startReadout(); - - /** enabl data compression, by saving only hits */ - void enableDataCompression(bool enable){dataCompression = enable;if(filter)filter->enableCompression(enable);}; - - /** get data compression, by saving only hits */ - bool getDataCompression(){ return dataCompression;}; - - /** - * Set the variable to send every nth frame to gui - * or if 0,send frame only upon gui request - */ - int setNFrameToGui(int i); - - /** set acquisition period if a positive number */ - int64_t setAcquisitionPeriod(int64_t index); - - /** set up fifo according to the new numjobsperthread */ void setupFifoStructure (); - /** free fifo buffer, called back from single photon filter */ - static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer){((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer);}; - void freeFifoBuffer(char* fbuffer){/*cout<< "fifo freed:"<<(void*)fbuffer<push(fbuffer);}; + /** + * increment counters, pop and push fifos + */ + void processFrameForFifo(); -private: - /** detector type */ detectorType myDetectorType; @@ -393,9 +406,24 @@ private: /** guiDataReady mutex */ pthread_mutex_t dataReadyMutex; - /** Number of jobs per thread for data compression */ + /** number of jobs per thread for data compression */ int numJobsPerThread; + /** offset of current frame */ + int currentFrameOffset; + + /** offset of current packet */ + int currentPacketOffset; + + /** current packet count for current frame */ + int currentPacketCount; + + /** current frame count for current buffer */ + int currentFrameCount; + + /** total frame count the listening thread has listened to */ + int totalListeningFrameCount; + /** acquisition period */ int64_t acquisitionPeriod;