mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-12 21:07:13 +02:00
- framescaught and frameindex now returns a vector for each port
- progress looks at activated or enabled ports, so progress does not stagnate - (eiger) disable datastreaming also for virtual servers only for 10g - missing packets also takes care of disabled ports
This commit is contained in:
@ -847,9 +847,13 @@ int ClientInterface::get_file_index(Interface &socket) {
|
||||
}
|
||||
|
||||
int ClientInterface::get_frame_index(Interface &socket) {
|
||||
uint64_t retval = impl()->getCurrentFrameIndex();
|
||||
LOG(logDEBUG1) << "frame index:" << retval;
|
||||
return socket.sendResult(retval);
|
||||
auto retval = impl()->getCurrentFrameIndex();
|
||||
LOG(logDEBUG1) << "frames index:" << sls::ToString(retval);
|
||||
auto size = static_cast<int>(retval.size());
|
||||
socket.Send(OK);
|
||||
socket.Send(size);
|
||||
socket.Send(retval);
|
||||
return OK;
|
||||
}
|
||||
|
||||
int ClientInterface::get_missing_packets(Interface &socket) {
|
||||
@ -863,9 +867,13 @@ int ClientInterface::get_missing_packets(Interface &socket) {
|
||||
}
|
||||
|
||||
int ClientInterface::get_frames_caught(Interface &socket) {
|
||||
int64_t retval = impl()->getFramesCaught();
|
||||
LOG(logDEBUG1) << "frames caught:" << retval;
|
||||
return socket.sendResult(retval);
|
||||
auto retval = impl()->getFramesCaught();
|
||||
LOG(logDEBUG1) << "frames caught:" << sls::ToString(retval);
|
||||
auto size = static_cast<int>(retval.size());
|
||||
socket.Send(OK);
|
||||
socket.Send(size);
|
||||
socket.Send(retval);
|
||||
return OK;
|
||||
}
|
||||
|
||||
int ClientInterface::set_file_write(Interface &socket) {
|
||||
|
@ -52,19 +52,7 @@ DataProcessor::~DataProcessor() { DeleteFiles(); }
|
||||
|
||||
/** getters */
|
||||
|
||||
bool DataProcessor::GetStartedFlag() { return startedFlag_; }
|
||||
|
||||
uint64_t DataProcessor::GetNumFramesCaught() { return numFramesCaught_; }
|
||||
|
||||
uint64_t DataProcessor::GetNumCompleteFramesCaught() {
|
||||
return numCompleteFramesCaught_;
|
||||
}
|
||||
|
||||
uint64_t DataProcessor::GetCurrentFrameIndex() { return currentFrameIndex_; }
|
||||
|
||||
uint64_t DataProcessor::GetProcessedIndex() {
|
||||
return currentFrameIndex_ - firstIndex_;
|
||||
}
|
||||
bool DataProcessor::GetStartedFlag() const { return startedFlag_; }
|
||||
|
||||
void DataProcessor::SetFifo(Fifo *fifo) { fifo_ = fifo; }
|
||||
|
||||
@ -72,7 +60,6 @@ void DataProcessor::ResetParametersforNewAcquisition() {
|
||||
StopRunning();
|
||||
startedFlag_ = false;
|
||||
numFramesCaught_ = 0;
|
||||
numCompleteFramesCaught_ = 0;
|
||||
firstIndex_ = 0;
|
||||
currentFrameIndex_ = 0;
|
||||
firstStreamerFrame_ = true;
|
||||
@ -215,10 +202,9 @@ void DataProcessor::CreateVirtualFile(
|
||||
}
|
||||
virtualFile_ = new HDF5VirtualFile(hdf5Lib);
|
||||
|
||||
uint64_t numImagesProcessed = GetProcessedIndex() + 1;
|
||||
// maxframesperfile = 0 for infinite files
|
||||
uint32_t framesPerFile =
|
||||
((maxFramesPerFile == 0) ? numImagesProcessed + 1 : maxFramesPerFile);
|
||||
((maxFramesPerFile == 0) ? numFramesCaught_ : maxFramesPerFile);
|
||||
|
||||
// TODO: assumption 1: create virtual file even if no data in other
|
||||
// files (they exist anyway) assumption2: virtual file max frame index
|
||||
@ -344,9 +330,6 @@ uint64_t DataProcessor::ProcessAnImage(char *buf) {
|
||||
currentFrameIndex_ = fnum;
|
||||
numFramesCaught_++;
|
||||
uint32_t nump = header.packetNumber;
|
||||
if (nump == generalData_->packetsPerFrame) {
|
||||
numCompleteFramesCaught_++;
|
||||
}
|
||||
|
||||
LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum;
|
||||
|
||||
|
@ -36,13 +36,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
|
||||
|
||||
~DataProcessor() override;
|
||||
|
||||
bool GetStartedFlag();
|
||||
uint64_t GetNumFramesCaught();
|
||||
uint64_t GetNumCompleteFramesCaught();
|
||||
/** (-1 if no frames have been caught */
|
||||
uint64_t GetCurrentFrameIndex();
|
||||
/** (-1 if no frames have been caught) */
|
||||
uint64_t GetProcessedIndex();
|
||||
bool GetStartedFlag() const;
|
||||
|
||||
void SetFifo(Fifo *f);
|
||||
void ResetParametersforNewAcquisition();
|
||||
@ -178,9 +172,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
|
||||
/** Number of frames caught */
|
||||
uint64_t numFramesCaught_{0};
|
||||
|
||||
/** Number of complete frames caught */
|
||||
uint64_t numCompleteFramesCaught_{0};
|
||||
|
||||
/** Frame Number of latest processed frame number */
|
||||
std::atomic<uint64_t> currentFrameIndex_{0};
|
||||
|
||||
|
@ -453,51 +453,51 @@ void Implementation::setFramesPerFile(const uint32_t i) {
|
||||
* ************************************************/
|
||||
slsDetectorDefs::runStatus Implementation::getStatus() const { return status; }
|
||||
|
||||
uint64_t Implementation::getFramesCaught() const {
|
||||
uint64_t min = -1;
|
||||
uint32_t flagsum = 0;
|
||||
|
||||
for (const auto &it : dataProcessor) {
|
||||
flagsum += it->GetStartedFlag();
|
||||
min = std::min(min, it->GetNumCompleteFramesCaught());
|
||||
std::vector<int64_t> Implementation::getFramesCaught() const {
|
||||
std::vector<int64_t> numFramesCaught(numUDPInterfaces);
|
||||
int index = 0;
|
||||
for (const auto &it : listener) {
|
||||
if (it->GetStartedFlag()) {
|
||||
numFramesCaught[index] = it->GetNumCompleteFramesCaught();
|
||||
}
|
||||
++index;
|
||||
}
|
||||
// no data processed
|
||||
if (flagsum != dataProcessor.size())
|
||||
return 0;
|
||||
|
||||
return min;
|
||||
return numFramesCaught;
|
||||
}
|
||||
|
||||
uint64_t Implementation::getCurrentFrameIndex() const {
|
||||
uint64_t max = 0;
|
||||
uint32_t flagsum = 0;
|
||||
|
||||
std::vector<int64_t> Implementation::getCurrentFrameIndex() const {
|
||||
std::vector<int64_t> frameIndex(numUDPInterfaces);
|
||||
int index = 0;
|
||||
for (const auto &it : listener) {
|
||||
flagsum += it->GetStartedFlag();
|
||||
max = std::max(max, it->GetCurrentFrameIndex());
|
||||
if (it->GetStartedFlag()) {
|
||||
frameIndex[index] = it->GetCurrentFrameIndex();
|
||||
}
|
||||
++index;
|
||||
}
|
||||
// no data processed
|
||||
if (flagsum != listener.size())
|
||||
return 0;
|
||||
return max;
|
||||
return frameIndex;
|
||||
}
|
||||
|
||||
double Implementation::getProgress() const {
|
||||
// get minimum of processed frame indices
|
||||
uint64_t currentFrameIndex = 0;
|
||||
uint32_t flagsum = 0;
|
||||
if (!activated || (!detectorDataStream[0] && !detectorDataStream[1])) {
|
||||
return 100.00;
|
||||
}
|
||||
|
||||
// if disabled, considering only 1 port
|
||||
double totalFrames = (double)(numberOfTotalFrames * listener.size());
|
||||
if (!detectorDataStream[0] || !detectorDataStream[1]) {
|
||||
totalFrames /= 2;
|
||||
}
|
||||
|
||||
double progress = 0;
|
||||
int index = 0;
|
||||
for (const auto &it : listener) {
|
||||
flagsum += it->GetStartedFlag();
|
||||
currentFrameIndex = std::max(currentFrameIndex, it->GetListenedIndex());
|
||||
if (detectorDataStream[index] && it->GetStartedFlag()) {
|
||||
progress += (it->GetListenedIndex() + 1) / totalFrames;
|
||||
}
|
||||
++index;
|
||||
}
|
||||
// no data processed
|
||||
if (flagsum != listener.size()) {
|
||||
currentFrameIndex = -1;
|
||||
}
|
||||
|
||||
return (100.00 *
|
||||
((double)(currentFrameIndex + 1) / (double)numberOfTotalFrames));
|
||||
progress *= 100;
|
||||
return progress;
|
||||
}
|
||||
|
||||
std::vector<int64_t> Implementation::getNumMissingPackets() const {
|
||||
@ -627,7 +627,7 @@ void Implementation::stopReceiver() {
|
||||
// print summary
|
||||
uint64_t tot = 0;
|
||||
for (int i = 0; i < numUDPInterfaces; i++) {
|
||||
int nf = dataProcessor[i]->GetNumCompleteFramesCaught();
|
||||
int nf = listener[i]->GetNumCompleteFramesCaught();
|
||||
tot += nf;
|
||||
std::string mpMessage = std::to_string(mp[i]);
|
||||
if (mp[i] < 0) {
|
||||
|
@ -84,8 +84,8 @@ class Implementation : private virtual slsDetectorDefs {
|
||||
* *
|
||||
* ************************************************/
|
||||
runStatus getStatus() const;
|
||||
uint64_t getFramesCaught() const;
|
||||
uint64_t getCurrentFrameIndex() const;
|
||||
std::vector<int64_t> getFramesCaught() const;
|
||||
std::vector<int64_t> getCurrentFrameIndex() const;
|
||||
double getProgress() const;
|
||||
std::vector<int64_t> getNumMissingPackets() const;
|
||||
void setScan(slsDetectorDefs::scanParameters s);
|
||||
|
@ -36,12 +36,22 @@ Listener::~Listener() = default;
|
||||
|
||||
uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; }
|
||||
|
||||
uint64_t Listener::GetNumCompleteFramesCaught() const {
|
||||
return numCompleteFramesCaught;
|
||||
}
|
||||
|
||||
uint64_t Listener::GetLastFrameIndexCaught() const {
|
||||
return lastCaughtFrameIndex;
|
||||
}
|
||||
|
||||
int64_t Listener::GetNumMissingPacket(bool stoppedFlag,
|
||||
uint64_t numPackets) const {
|
||||
if (!activated) {
|
||||
return 0;
|
||||
}
|
||||
if (!(*detectorDataStream)) {
|
||||
return 0;
|
||||
}
|
||||
if (!stoppedFlag) {
|
||||
return (numPackets - numPacketsCaught);
|
||||
}
|
||||
@ -53,11 +63,11 @@ int64_t Listener::GetNumMissingPacket(bool stoppedFlag,
|
||||
numPacketsCaught;
|
||||
}
|
||||
|
||||
bool Listener::GetStartedFlag() { return startedFlag; }
|
||||
bool Listener::GetStartedFlag() const { return startedFlag; }
|
||||
|
||||
uint64_t Listener::GetCurrentFrameIndex() { return lastCaughtFrameIndex; }
|
||||
uint64_t Listener::GetCurrentFrameIndex() const { return lastCaughtFrameIndex; }
|
||||
|
||||
uint64_t Listener::GetListenedIndex() {
|
||||
uint64_t Listener::GetListenedIndex() const {
|
||||
return lastCaughtFrameIndex - firstIndex;
|
||||
}
|
||||
|
||||
@ -67,6 +77,7 @@ void Listener::ResetParametersforNewAcquisition() {
|
||||
StopRunning();
|
||||
startedFlag = false;
|
||||
numPacketsCaught = 0;
|
||||
numCompleteFramesCaught = 0;
|
||||
firstIndex = 0;
|
||||
currentFrameIndex = 0;
|
||||
lastCaughtFrameIndex = 0;
|
||||
@ -607,6 +618,9 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
// complete image
|
||||
new_header->detHeader.packetNumber = numpackets; // number of packets caught
|
||||
new_header->detHeader.frameNumber = currentFrameIndex;
|
||||
if (numpackets == pperFrame) {
|
||||
++numCompleteFramesCaught;
|
||||
}
|
||||
++currentFrameIndex;
|
||||
return imageSize;
|
||||
}
|
||||
|
@ -51,53 +51,19 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
*/
|
||||
~Listener();
|
||||
|
||||
/**
|
||||
* Get Packets caught
|
||||
* @return Packets caught
|
||||
*/
|
||||
uint64_t GetPacketsCaught() const;
|
||||
|
||||
/**
|
||||
* Get Last Frame index caught
|
||||
* @return last frame index caught
|
||||
*/
|
||||
uint64_t GetNumCompleteFramesCaught() const;
|
||||
uint64_t GetLastFrameIndexCaught() const;
|
||||
|
||||
/** Get number of missing packets, returns negative values in case to extra
|
||||
* packet */
|
||||
/** negative values in case of extra packets */
|
||||
int64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
|
||||
bool GetStartedFlag() const;
|
||||
uint64_t GetCurrentFrameIndex() const;
|
||||
uint64_t GetListenedIndex() const;
|
||||
|
||||
bool GetStartedFlag();
|
||||
|
||||
uint64_t GetCurrentFrameIndex();
|
||||
/** (-1 if no frames have been caught) */
|
||||
uint64_t GetListenedIndex();
|
||||
|
||||
/**
|
||||
* Set Fifo pointer to the one given
|
||||
* @param f address of Fifo pointer
|
||||
*/
|
||||
void SetFifo(Fifo *f);
|
||||
|
||||
/**
|
||||
* Reset parameters for new acquisition
|
||||
*/
|
||||
void ResetParametersforNewAcquisition();
|
||||
|
||||
/**
|
||||
* Set GeneralData pointer to the one given
|
||||
* @param g address of GeneralData (Detector Data) pointer
|
||||
*/
|
||||
void SetGeneralData(GeneralData *g);
|
||||
|
||||
/**
|
||||
* Creates UDP Sockets
|
||||
*/
|
||||
void CreateUDPSockets();
|
||||
|
||||
/**
|
||||
* Shuts down and deletes UDP Sockets
|
||||
*/
|
||||
void ShutDownUDPSocket();
|
||||
|
||||
/**
|
||||
@ -116,10 +82,6 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
void SetHardCodedPosition(uint16_t r, uint16_t c);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Record First Acquisition Index
|
||||
* @param fnum frame index to record
|
||||
*/
|
||||
void RecordFirstIndex(uint64_t fnum);
|
||||
|
||||
/**
|
||||
@ -146,55 +108,25 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
*/
|
||||
uint32_t ListenToAnImage(char *buf);
|
||||
|
||||
/**
|
||||
* Print Fifo Statistics
|
||||
*/
|
||||
void PrintFifoStatistics();
|
||||
|
||||
/** type of thread */
|
||||
static const std::string TypeName;
|
||||
|
||||
/** GeneralData (Detector Data) object */
|
||||
GeneralData *generalData{nullptr};
|
||||
|
||||
/** Fifo structure */
|
||||
Fifo *fifo;
|
||||
|
||||
// individual members
|
||||
/** Detector Type */
|
||||
detectorType myDetectorType;
|
||||
|
||||
/** Receiver Status */
|
||||
std::atomic<runStatus> *status;
|
||||
|
||||
/** UDP Socket - Detector to Receiver */
|
||||
std::unique_ptr<sls::UdpRxSocket> udpSocket{nullptr};
|
||||
|
||||
/** UDP Port Number */
|
||||
uint32_t *udpPortNumber;
|
||||
|
||||
/** ethernet interface */
|
||||
std::string *eth;
|
||||
|
||||
/** UDP Socket Buffer Size */
|
||||
int *udpSocketBufferSize;
|
||||
|
||||
/** actual UDP Socket Buffer Size (double due to kernel bookkeeping) */
|
||||
/** double due to kernel bookkeeping */
|
||||
int *actualUDPSocketBufferSize;
|
||||
|
||||
/** frames per file */
|
||||
uint32_t *framesPerFile;
|
||||
|
||||
/** frame discard policy */
|
||||
frameDiscardPolicy *frameDiscardMode;
|
||||
|
||||
/** Activated/Deactivated */
|
||||
bool *activated;
|
||||
|
||||
/** detector data stream */
|
||||
bool *detectorDataStream;
|
||||
|
||||
/** Silent Mode */
|
||||
bool *silentMode;
|
||||
|
||||
/** row hardcoded as 1D or 2d,
|
||||
@ -209,15 +141,12 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
// acquisition start
|
||||
/** Aquisition Started flag */
|
||||
std::atomic<bool> startedFlag{false};
|
||||
|
||||
/** Frame Number of First Frame */
|
||||
uint64_t firstIndex{0};
|
||||
|
||||
// for acquisition summary
|
||||
/** Number of complete Packets caught */
|
||||
std::atomic<uint64_t> numPacketsCaught{0};
|
||||
|
||||
/** Last Frame Index caught from udp network */
|
||||
std::atomic<uint64_t> numCompleteFramesCaught{0};
|
||||
std::atomic<uint64_t> lastCaughtFrameIndex{0};
|
||||
|
||||
// parameters to acquire image
|
||||
@ -225,25 +154,16 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
* ( always check startedFlag for validity first)
|
||||
*/
|
||||
uint64_t currentFrameIndex{0};
|
||||
|
||||
/** True if there is a packet carry over from previous Image */
|
||||
bool carryOverFlag{false};
|
||||
|
||||
/** Carry over packet buffer */
|
||||
std::unique_ptr<char[]> carryOverPacket;
|
||||
|
||||
/** Listening buffer for one packet - might be removed when we can peek and
|
||||
* eiger fnum is in header */
|
||||
std::unique_ptr<char[]> listeningPacket;
|
||||
|
||||
/** if the udp socket is connected */
|
||||
std::atomic<bool> udpSocketAlive{false};
|
||||
|
||||
// for print progress during acquisition
|
||||
/** number of packets for statistic */
|
||||
// for print progress during acquisition*/
|
||||
uint32_t numPacketsStatistic{0};
|
||||
|
||||
/** number of images for statistic */
|
||||
uint32_t numFramesStatistic{0};
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user