removed unnecessary functions in listener and dataprocessor and ensure processedin measurement is not same saas processed frames in acqustion

This commit is contained in:
Dhanya Maliakal 2017-05-17 15:37:51 +02:00
parent 001d6415bf
commit cb008bb700
5 changed files with 85 additions and 68 deletions

View File

@ -93,6 +93,11 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
*/ */
uint64_t GetProcessedAcquisitionIndex(); uint64_t GetProcessedAcquisitionIndex();
/**
* Get Current Frame Index thats been processed for each real time acquisition (eg. for each scan)
* @return -1 if no frames have been caught, else current frame index
*/
uint64_t GetProcessedMeasurementIndex();
//*** setters *** //*** setters ***
/** /**
@ -250,33 +255,14 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
/** Fifo structure */ /** Fifo structure */
Fifo* fifo; Fifo* fifo;
/** Data Stream Enable */
bool* dataStreamEnable;
/** Aquisition Started flag */
bool acquisitionStartedFlag;
/** Measurement Started flag */
bool measurementStartedFlag;
/**Number of complete frames caught for an entire acquisition (including all scans) */
uint64_t numTotalFramesCaught;
/** Number of complete frames caught for each real time acquisition (eg. for each scan) */
uint64_t numFramesCaught;
/** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t firstAcquisitionIndex;
/** Frame Number of First Frame for each real time acquisition (eg. for each scan) */
uint64_t firstMeasurementIndex;
/** Frame Number of latest processed frame number of an entire Acquisition (including all scans) */
uint64_t currentFrameIndex;
//individual members
/** File writer implemented as binary or hdf5 File */ /** File writer implemented as binary or hdf5 File */
File* file; File* file;
/** Data Stream Enable */
bool* dataStreamEnable;
/** File Format Type */ /** File Format Type */
fileFormat* fileFormatType; fileFormat* fileFormatType;
@ -284,6 +270,35 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
bool* fileWriteEnable; bool* fileWriteEnable;
//acquisition start
/** Aquisition Started flag */
bool acquisitionStartedFlag;
/** Measurement Started flag */
bool measurementStartedFlag;
/** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t firstAcquisitionIndex;
/** Frame Number of First Frame for each real time acquisition (eg. for each scan) */
uint64_t firstMeasurementIndex;
//for statistics
/**Number of complete frames caught for an entire acquisition (including all scans) */
uint64_t numTotalFramesCaught;
/** Number of complete frames caught for each real time acquisition (eg. for each scan) */
uint64_t numFramesCaught;
/** Frame Number of latest processed frame number of an entire Acquisition (including all scans) */
uint64_t currentFrameIndex;
//call back
/** /**
* Call back for raw data * Call back for raw data
* args to raw data ready callback are * args to raw data ready callback are

View File

@ -73,10 +73,10 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
bool GetMeasurementStartedFlag(); bool GetMeasurementStartedFlag();
/** /**
* Get Total Packets caught in an acquisition * Get Packets caught in a real time acquisition (start and stop of receiver)
* @return Total Packets caught in an acquisition * @return Packets caught in a real time acquisition
*/ */
uint64_t GetTotalPacketsCaught(); uint64_t GetPacketsCaught();
/** /**
* Get Last Frame index caught * Get Last Frame index caught
@ -217,12 +217,6 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
/** Detector Type */ /** Detector Type */
detectorType myDetectorType; detectorType myDetectorType;
/** Aquisition Started flag */
bool acquisitionStartedFlag;
/** Measurement Started flag */
bool measurementStartedFlag;
/** Receiver Status */ /** Receiver Status */
runStatus* status; runStatus* status;
@ -244,11 +238,13 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
/** Dynamic Range */ /** Dynamic Range */
uint32_t* dynamicRange; uint32_t* dynamicRange;
/**Number of complete Packets caught for an entire acquisition (including all scans) */
volatile uint64_t numTotalPacketsCaught;
/** Number of complete Packets caught for each real time acquisition (eg. for each scan) */ // acquisition start
volatile uint64_t numPacketsCaught; /** Aquisition Started flag */
bool acquisitionStartedFlag;
/** Measurement Started flag */
bool measurementStartedFlag;
/** Frame Number of First Frame of an entire Acquisition (including all scans) */ /** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t firstAcquisitionIndex; uint64_t firstAcquisitionIndex;
@ -256,14 +252,21 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
/** Frame Number of First Frame for each real time acquisition (eg. for each scan) */ /** Frame Number of First Frame for each real time acquisition (eg. for each scan) */
uint64_t firstMeasurementIndex; uint64_t firstMeasurementIndex;
// for statistics
/** Number of complete Packets caught for each real time acquisition (eg. for each scan (start& stop of receiver)) */
volatile uint64_t numPacketsCaught;
/** Last Frame Index caught from udp network */
uint64_t lastCaughtFrameIndex;
// parameters to acquire image
/** Current Frame Index, default value is 0 /** Current Frame Index, default value is 0
* ( always check acquisitionStartedFlag for validity first) * ( always check acquisitionStartedFlag for validity first)
*/ */
uint64_t currentFrameIndex; uint64_t currentFrameIndex;
/** Last Frame Index caught from udp network */
uint64_t lastCaughtFrameIndex;
/** True if there is a packet carry over from previous Image */ /** True if there is a packet carry over from previous Image */
bool carryOverFlag; bool carryOverFlag;

View File

@ -39,17 +39,17 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool*
ThreadObject(NumberofDataProcessors), ThreadObject(NumberofDataProcessors),
generalData(0), generalData(0),
fifo(f), fifo(f),
dataStreamEnable(dsEnable),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
numTotalFramesCaught(0),
numFramesCaught(0),
firstAcquisitionIndex(0),
firstMeasurementIndex(0),
currentFrameIndex(0),
file(0), file(0),
dataStreamEnable(dsEnable),
fileFormatType(ftype), fileFormatType(ftype),
fileWriteEnable(fwenable), fileWriteEnable(fwenable),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
firstMeasurementIndex(0),
numTotalFramesCaught(0),
numFramesCaught(0),
currentFrameIndex(0),
rawDataReadyCallBack(dataReadycb), rawDataReadyCallBack(dataReadycb),
pRawDataReady(pDataReadycb) pRawDataReady(pDataReadycb)
{ {
@ -114,6 +114,9 @@ uint64_t DataProcessor::GetProcessedAcquisitionIndex() {
return currentFrameIndex - firstAcquisitionIndex; return currentFrameIndex - firstAcquisitionIndex;
} }
uint64_t DataProcessor::GetProcessedMeasurementIndex() {
return currentFrameIndex - firstMeasurementIndex;
}

View File

@ -32,8 +32,6 @@ Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno,
generalData(0), generalData(0),
fifo(f), fifo(f),
myDetectorType(dtype), myDetectorType(dtype),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
status(s), status(s),
udpSocket(0), udpSocket(0),
udpPortNumber(portno), udpPortNumber(portno),
@ -41,12 +39,13 @@ Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno,
activated(act), activated(act),
numImages(nf), numImages(nf),
dynamicRange(dr), dynamicRange(dr),
numTotalPacketsCaught(0), acquisitionStartedFlag(false),
numPacketsCaught(0), measurementStartedFlag(false),
firstAcquisitionIndex(0), firstAcquisitionIndex(0),
firstMeasurementIndex(0), firstMeasurementIndex(0),
currentFrameIndex(0), numPacketsCaught(0),
lastCaughtFrameIndex(0), lastCaughtFrameIndex(0),
currentFrameIndex(0),
carryOverFlag(0), carryOverFlag(0),
carryOverPacket(0), carryOverPacket(0),
listeningPacket(0) listeningPacket(0)
@ -103,8 +102,8 @@ bool Listener::GetMeasurementStartedFlag(){
return measurementStartedFlag; return measurementStartedFlag;
} }
uint64_t Listener::GetTotalPacketsCaught() { uint64_t Listener::GetPacketsCaught() {
return numTotalPacketsCaught; return numPacketsCaught;
} }
uint64_t Listener::GetLastFrameIndexCaught() { uint64_t Listener::GetLastFrameIndexCaught() {
@ -133,7 +132,6 @@ void Listener::SetFifo(Fifo*& f) {
void Listener::ResetParametersforNewAcquisition() { void Listener::ResetParametersforNewAcquisition() {
acquisitionStartedFlag = false; acquisitionStartedFlag = false;
numTotalPacketsCaught = 0;
firstAcquisitionIndex = 0; firstAcquisitionIndex = 0;
currentFrameIndex = 0; currentFrameIndex = 0;
lastCaughtFrameIndex = 0; lastCaughtFrameIndex = 0;
@ -387,7 +385,6 @@ uint32_t Listener::ListenToAnImage(char* buf) {
//update parameters //update parameters
numPacketsCaught++; //record immediately to get more time before socket shutdown numPacketsCaught++; //record immediately to get more time before socket shutdown
numTotalPacketsCaught++;
// -------------------------- new header ---------------------------------------------------------------------- // -------------------------- new header ----------------------------------------------------------------------
if (myDetectorType == JUNGFRAU) { if (myDetectorType == JUNGFRAU) {
@ -463,7 +460,6 @@ uint32_t Listener::CreateAnImage(char* buf) {
//update parameters //update parameters
numPacketsCaught++; //record immediately to get more time before socket shutdown numPacketsCaught++; //record immediately to get more time before socket shutdown
numTotalPacketsCaught++;
//reset data to -1 //reset data to -1
memset(buf, 0xFF, generalData->dataSize); memset(buf, 0xFF, generalData->dataSize);

View File

@ -492,12 +492,12 @@ void UDPStandardImplementation::stopReceiver(){
//create virtual file //create virtual file
if (fileWriteEnable && fileFormatType == HDF5) { if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxFramescaught = 0; uint64_t maxIndexCaught = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxFramescaught = max(maxFramescaught, (*it)->GetNumFramesCaught()); maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
} }
if (maxFramescaught) if (maxIndexCaught)
dataProcessor[0]->EndofAcquisition(maxFramescaught); dataProcessor[0]->EndofAcquisition(maxIndexCaught); //to create virtual file
} }
while(DataStreamer::GetRunningMask()){ while(DataStreamer::GetRunningMask()){
@ -515,7 +515,7 @@ void UDPStandardImplementation::stopReceiver(){
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
tot += dataProcessor[i]->GetNumFramesCaught(); tot += dataProcessor[i]->GetNumFramesCaught();
uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught(); uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught();
if (missingpackets) { if (missingpackets) {
cprintf(RED, "\n[Port %d]\n",udpPortNum[i]); cprintf(RED, "\n[Port %d]\n",udpPortNum[i]);
cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets); cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets);
@ -555,7 +555,7 @@ void UDPStandardImplementation::startReadout(){
//current packets caught //current packets caught
volatile int totalP = 0,prev=-1; volatile int totalP = 0,prev=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetTotalPacketsCaught(); totalP += (*it)->GetPacketsCaught();
//wait for all packets //wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){ if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
@ -574,7 +574,7 @@ void UDPStandardImplementation::startReadout(){
totalP = 0; totalP = 0;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetTotalPacketsCaught(); totalP += (*it)->GetPacketsCaught();
#ifdef VERY_VERBOSE #ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP); cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
#endif #endif
@ -602,13 +602,13 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() { void UDPStandardImplementation::closeFiles() {
uint64_t maxFramescaught = 0; uint64_t maxIndexCaught = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles(); (*it)->CloseFiles();
maxFramescaught = max(maxFramescaught, (*it)->GetNumFramesCaught()); maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
} }
if (maxFramescaught) if (maxIndexCaught)
dataProcessor[0]->EndofAcquisition(maxFramescaught); dataProcessor[0]->EndofAcquisition(maxIndexCaught);
} }