Files
Jungfraujoch/receiver/JFJochReceiver.h
Filip Leonarski 64002f1e29
Some checks failed
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 11m14s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 10m43s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 11m35s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 9m20s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 10m23s
Build Packages / Generate python client (push) Successful in 39s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 11m24s
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 1m0s
Build Packages / build:rpm (rocky8) (push) Successful in 10m35s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 10m35s
Build Packages / build:rpm (rocky9) (push) Successful in 11m17s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m9s
Build Packages / Unit tests (push) Failing after 1h18m57s
v1.0.0-rc.129 (#36)
This is an UNSTABLE release. It contains significant modifications and bug fixes; if things go wrong, it is better to revert to 1.0.0-rc.124.

* jfjoch_broker: Significant improvements in the TCP image socket, making it a viable alternative to ZeroMQ sockets (only a single port on the broker side, dynamically changeable number of writers, acknowledgments for written files)
* jfjoch_broker: Delta phi is calculated also for still data in Bragg prediction
* jfjoch_broker: Image pusher statistics are accessible via the REST interface
* jfjoch_writer: Supports TCP image sockets, with an auto-forking option for them

Reviewed-on: #36
Co-authored-by: Filip Leonarski <filip.leonarski@psi.ch>
Co-committed-by: Filip Leonarski <filip.leonarski@psi.ch>
2026-03-05 22:13:12 +01:00

130 lines
4.1 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#ifndef JFJOCH_JFJOCHRECEIVER_H
#define JFJOCH_JFJOCHRECEIVER_H
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>
#include "../common/DiffractionExperiment.h"
#include "../image_analysis/spot_finding/SpotFindingSettings.h"
#include "JFJochReceiverOutput.h"
#include "../image_puller/ZMQImagePuller.h"
#include "../common/ImageBuffer.h"
#include "../image_pusher/ImagePusher.h"
#include "../preview/ZMQPreviewSocket.h"
#include "../preview/ZMQMetadataSocket.h"
#include "../preview/PreviewImage.h"
#include "JFJochReceiverPlots.h"
#include "LossyFilter.h"
#include "../common/MovingAverage.h"
#include "../common/NUMAHWPolicy.h"
#include "../common/ScanResultGenerator.h"
#include "../image_analysis/indexing/IndexerThreadPool.h"
#include "../image_analysis/IndexAndRefine.h"
/**
 * Abstract base class for Jungfraujoch receivers.
 *
 * Bundles the state that receiver implementations share: the experiment
 * description, spot finding settings, transfer counters, references to
 * externally supplied buffer / pusher / preview / status / plot objects, and
 * running statistics. Subclasses have to provide StopReceiver(),
 * GetEfficiency(), GetProgress() and SetSpotFindingSettings().
 *
 * NOTE: data members are constructed in declaration order; keep the order
 * stable when editing this class.
 */
class JFJochReceiver {
protected:
    Logger &logger;                     // non-owning reference
    DiffractionExperiment experiment;   // held by value

    // Spot finding settings plus the mutex declared next to them
    // (presumably serializes access to the settings - confirm in the .cpp).
    SpotFindingSettings spot_finding_settings;
    std::mutex spot_finding_settings_mutex;
    SpotFindingSettings GetSpotFindingSettings();

    // Atomic status flags, both start out false.
    std::atomic<bool> writer_queue_full{false};
    std::atomic<bool> cancelled{false};

    // No in-class default; expected to be set by the constructor.
    bool push_images_to_writer;

    // Atomic counters, all start at zero.
    std::atomic<size_t> compressed_size{0};
    std::atomic<uint64_t> uncompressed_size{0};
    std::atomic<size_t> images_sent{0};
    std::atomic<size_t> images_collected{0};
    std::atomic<size_t> images_skipped{0};

    // Wall-clock (system_clock) time stamps.
    std::chrono::system_clock::time_point start_time;
    std::chrono::system_clock::time_point end_time;

    // Highest image number sent, with its own mutex and update helper.
    int64_t max_image_number_sent{0};
    std::mutex max_image_number_sent_mutex;
    void UpdateMaxImageSent(int64_t image_number);

    // Highest image number received, with its own mutex and update helper.
    int64_t max_image_number_received{0};
    std::mutex max_image_number_received_mutex;
    void UpdateMaxImageReceived(int64_t image_number);

    // Collaborators passed in through the constructor; none are owned here.
    ImageBuffer &image_buffer;
    ImagePusher &image_pusher;
    PreviewImage &preview_image;
    ZMQPreviewSocket *zmq_preview_socket;    // constructor default is nullptr
    ZMQMetadataSocket *zmq_metadata_socket;  // constructor default is nullptr
    JFJochReceiverCurrentStatus &current_status;
    JFJochReceiverPlots &plots;

    ScanResultGenerator scan_result;
    LossyFilter serialmx_filter;

    // Moving averages, each constructed with the argument 1000.
    MovingAverage saturated_pixels{1000};
    MovingAverage error_pixels{1000};
    MovingAverage roi_beam_npixel{1000};
    MovingAverage roi_beam_sum{1000};

    std::optional<int64_t> images_written;  // empty until assigned

    NUMAHWPolicy numa_policy;

    std::vector<std::unique_ptr<ADUHistogram>> adu_histogram_module;

    PixelMask pixel_mask;
    AzimuthalIntegration az_int_mapping;

    // Optional maximum delay, with its own mutex and update helper.
    std::optional<uint64_t> max_delay;
    std::mutex max_delay_mutex;

    IndexAndRefine indexer;
    std::optional<CrystalLattice> rotation_indexing_lattice;

    std::string writer_error;

    void UpdateMaxDelay(uint64_t delay);

    JFJochReceiverStatus GetStatus() const;

    void SendStartMessage();
    void SendEndMessage();
    void SaveStartMessageToImageBuffer(const StartMessage &msg);

    /// Cancellation overload taking the exception involved; overridable.
    virtual void Cancel(const JFJochException &e);

public:
    /**
     * @param experiment             experiment description
     * @param image_buffer           image buffer, kept by reference
     * @param image_pusher           image pusher, kept by reference
     * @param preview_image          preview image object, kept by reference
     * @param current_status         current status object, kept by reference
     * @param plots                  plots object, kept by reference
     * @param spot_finding_settings  initial spot finding settings
     * @param logger                 logger, kept by reference
     * @param numa_policy            NUMA hardware policy
     * @param pixel_mask             pixel mask
     * @param zmq_preview_socket     optional preview socket (defaults to nullptr)
     * @param zmq_metadata_socket    optional metadata socket (defaults to nullptr)
     * @param indexing_thread_pool   optional indexer thread pool (defaults to nullptr)
     */
    JFJochReceiver(const DiffractionExperiment &experiment,
                   ImageBuffer &image_buffer,
                   ImagePusher &image_pusher,
                   PreviewImage &preview_image,
                   JFJochReceiverCurrentStatus &current_status,
                   JFJochReceiverPlots &plots,
                   const SpotFindingSettings &spot_finding_settings,
                   Logger &logger,
                   const NUMAHWPolicy &numa_policy,
                   const PixelMask &pixel_mask,
                   ZMQPreviewSocket *zmq_preview_socket = nullptr,
                   ZMQMetadataSocket *zmq_metadata_socket = nullptr,
                   IndexerThreadPool *indexing_thread_pool = nullptr);

    virtual ~JFJochReceiver();

    /// Pure virtual: stop the receiver.
    virtual void StopReceiver() = 0;

    /// Pure virtual: efficiency figure reported by the concrete receiver.
    virtual float GetEfficiency() const = 0;

    /// Cancellation overload taking a `silent` flag; overridable.
    virtual void Cancel(bool silent);

    /// Pure virtual: progress figure reported by the concrete receiver.
    virtual float GetProgress() const = 0;

    /// Pure virtual: replace the spot finding settings.
    virtual void SetSpotFindingSettings(const SpotFindingSettings &spot_finding_settings) = 0;

    /// Final statistics as a JFJochReceiverOutput; overridable.
    virtual JFJochReceiverOutput GetFinalStatistics() const;
};
#endif //JFJOCH_JFJOCHRECEIVER_H