Files
Jungfraujoch/broker/JFJochServices.cpp
T
leonarski_f 239a441ee6
Build Packages / Unit tests (push) Successful in 1h20m34s
Build Packages / build:rpm (rocky8) (push) Successful in 13m32s
Build Packages / Generate python client (push) Successful in 24s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 13m6s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 11m32s
Build Packages / XDS test (durin plugin) (push) Successful in 10m49s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 14m8s
Build Packages / DIALS test (push) Successful in 14m57s
Build Packages / Build documentation (push) Successful in 47s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 13m30s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 14m23s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m40s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 13m14s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 11m55s
Build Packages / build:rpm (rocky9) (push) Successful in 14m23s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m48s
Build Packages / XDS test (neggia plugin) (push) Successful in 7m10s
v1.0.0-rc.140 (#50)
This is an UNSTABLE release. The release has significant modifications and bug fixes, if things go wrong, it is better to revert to 1.0.0-rc.132.

* jfjoch_broker: For DECTRIS detectors, ZeroMQ link is persistent, to save time for establishing new connection
* jfjoch_broker: Minor bug fixes for rare conditions

Reviewed-on: #50
2026-04-29 21:40:22 +02:00

314 lines
10 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "JFJochServices.h"
#include "../common/JFJochException.h"
#include "../detector_control/SLSDetectorWrapper.h"
#include "../detector_control/DectrisDetectorWrapper.h"
JFJochServices::JFJochServices(Logger &in_logger) : logger(in_logger) {}
void JFJochServices::Start(const DiffractionExperiment& experiment,
const PixelMask &pixel_mask,
const JFCalibration *calibration) {
logger.Info("Measurement start for: {}", experiment.GetFilePrefix());
cannot_stop_detector = false;
if (receiver != nullptr) {
logger.Info(" ... receiver start");
if (image_puller)
image_puller->ResumeAndClear();
receiver->Start(experiment, pixel_mask, calibration, image_puller);
{
std::shared_lock ul(detector_mutex);
if (detector && !experiment.IsUsingInternalPacketGen()) {
logger.Info(" ... detector start");
detector->Start(experiment);
}
}
}
logger.Info(" Done!");
}
void JFJochServices::Off() {
image_puller.reset();
std::unique_ptr<DetectorWrapper> old_detector;
{
std::unique_lock ul(detector_mutex);
old_detector = std::move(detector);
}
if (old_detector) {
old_detector->Deactivate();
// destroyed here, outside lock
}
}
void JFJochServices::On(DiffractionExperiment &x) {
if (x.IsUsingInternalPacketGen() || (receiver == nullptr)) {
std::unique_lock ul(detector_mutex);
detector.reset();
} else {
logger.Info("Detector on");
std::unique_ptr<DetectorWrapper> new_detector;
switch (x.GetDetectorType()) {
case DetectorType::EIGER:
case DetectorType::JUNGFRAU:
new_detector = std::make_unique<SLSDetectorWrapper>();
image_puller.reset();
break;
case DetectorType::DECTRIS:
new_detector = std::make_unique<DectrisDetectorWrapper>();
image_puller = std::make_shared<ZMQImagePuller>(x.GetDetectorSetup().GetDECTRISStream2Addr());
image_puller->Suspend();
break;
}
new_detector->Initialize(x, receiver->GetNetworkConfig());
{
std::unique_lock ul(detector_mutex);
detector = std::move(new_detector);
}
logger.Info(" ... done");
}
}
JFJochServicesOutput JFJochServices::Stop() {
JFJochServicesOutput ret;
std::unique_ptr<JFJochException> exception;
bool detector_error = false;
if (receiver != nullptr) {
try {
{
std::shared_lock ul(detector_mutex);
if (detector) {
logger.Info("Wait for detector idle");
DetectorState state = detector->GetState();
while ((!cannot_stop_detector)
&& ((state == DetectorState::WAITING) || (state == DetectorState::BUSY))) {
// check detector state every 5 ms
std::this_thread::sleep_for(std::chrono::milliseconds(5));
state = detector->GetState();
}
if (state == DetectorState::IDLE) {
logger.Info(" ... detector idle");
receiver->Cancel(true); // cancel silently
} else {
logger.Error(" ... detector in error state");
receiver->Cancel(false);
detector_error = true;
}
}
}
logger.Info("Wait for receiver done");
ret.receiver_output = receiver->Stop();
if (image_puller)
image_puller->Suspend();
logger.Info(" ... Receiver efficiency: {} % Max delay: {} Compression ratio {}x",
static_cast<int>(ret.receiver_output.efficiency * 100.0),
ret.receiver_output.status.max_receive_delay.value_or(0),
static_cast<int>(std::round(ret.receiver_output.status.compressed_ratio.value_or(1))));
if (ret.receiver_output.efficiency < 1.0) {
for (int i = 0; i < ret.receiver_output.received_packets.size(); i++) {
if (ret.receiver_output.received_packets[i] != ret.receiver_output.expected_packets[i])
logger.Info(" ... Module: {} Packets received: {} out of {}", i,
ret.receiver_output.received_packets[i], ret.receiver_output.expected_packets[i]);
}
}
logger.Info(" ... finished with success");
} catch (const JFJochException &e) {
logger.Error(" ... finished with error {}", e.what());
exception = std::make_unique<JFJochException>(e);
}
} else {
logger.Info("No receiver - sleeping for 30 seconds");
std::this_thread::sleep_for(std::chrono::seconds(30));
logger.Info("Sleep done");
}
if (exception)
throw JFJochException(*exception);
if (detector_error)
throw JFJochException(JFJochExceptionCategory::Detector, "Error in detector operation");
return ret;
}
void JFJochServices::Cancel() {
std::shared_lock ul(detector_mutex);
if (detector) {
// Best effort - if detector cannot be stopped, this is OK, important to still stop receiver
try {
detector->Stop();
} catch (...) {
cannot_stop_detector = true;
}
}
if (receiver != nullptr)
receiver->Cancel(false);
}
JFJochServices &JFJochServices::Receiver(JFJochReceiverService *input) {
receiver = input;
return *this;
}
std::optional<JFJochReceiverStatus> JFJochServices::GetReceiverStatus() const {
if (receiver == nullptr)
return {};
return receiver->GetStatus();
}
std::optional<float> JFJochServices::GetReceiverProgress() const {
if (receiver == nullptr)
return {};
return receiver->GetProgress();
}
MultiLinePlot JFJochServices::GetPlots(const PlotRequest &request) {
if (receiver == nullptr)
return {};
return receiver->GetDataProcessingPlot(request);
}
void JFJochServices::GetPlotRaw(std::vector<float> &v, PlotType type, const std::string &roi) {
if (receiver != nullptr)
receiver->GetPlotRaw(v, type, roi);
}
void JFJochServices::SetSpotFindingSettings(const SpotFindingSettings &settings) {
if (receiver)
receiver->SetSpotFindingSettings(settings);
}
void JFJochServices::Trigger() {
std::shared_lock ul(detector_mutex);
if (detector && (receiver != nullptr))
detector->Trigger();
}
std::optional<DetectorStatus> JFJochServices::GetDetectorStatus() const {
std::shared_lock ul(detector_mutex, std::defer_lock);
if (ul.try_lock_for(std::chrono::milliseconds(500)) && detector)
return detector->GetStatus();
return {};
}
std::string JFJochServices::GetPreviewJPEG(const PreviewImageSettings &settings, int64_t image_number) const {
if (receiver != nullptr)
return receiver->GetJPEGFromBuffer(settings, image_number);
else
return {};
}
std::string JFJochServices::GetPreviewTIFF(int64_t image_number) const {
if (receiver != nullptr)
return receiver->GetTIFFFromBuffer(image_number);
else
return "";
}
void JFJochServices::ConfigureDetector(const DiffractionExperiment &experiment) {
std::unique_lock ul(detector_mutex); // While configuring detector ensure exclusive access (even though pointer is not modified here)
if (detector)
detector->Configure(experiment);
}
void JFJochServices::LoadInternalGeneratorImage(const DiffractionExperiment &experiment,
const std::vector<uint16_t> &image,
uint64_t image_number) {
if (receiver)
receiver->LoadInternalGeneratorImage(experiment, image, image_number);
}
void JFJochServices::GetXFELPulseID(std::vector<uint64_t> &v) const {
if (receiver)
receiver->GetXFELPulseID(v);
}
void JFJochServices::GetXFELEventCode(std::vector<uint64_t> &v) const {
if (receiver)
receiver->GetXFELEventCode(v);
}
std::vector<DeviceStatus> JFJochServices::GetDeviceStatus() const {
std::vector<DeviceStatus> ret;
if (receiver)
ret = receiver->GetDeviceStatus();
return ret;
}
ZMQPreviewSettings JFJochServices::GetPreviewSocketSettings() {
if (receiver)
return receiver->GetPreviewSocketSettings();
return {};
}
ZMQMetadataSettings JFJochServices::GetMetadataSocketSettings() {
if (receiver)
return receiver->GetMetadataSocketSettings();
return {};
}
void JFJochServices::SetPreviewSocketSettings(const ZMQPreviewSettings &input) {
if (receiver)
receiver->PreviewSocketSettings(input);
}
void JFJochServices::SetMetadataSocketSettings(const ZMQMetadataSettings &input) {
if (receiver)
receiver->MetadataSocketSettings(input);
}
void JFJochServices::GetStartMessageFromBuffer(std::vector<uint8_t> &v) {
if (receiver)
return receiver->GetStartMessageFromBuffer(v);
}
bool JFJochServices::GetImageFromBuffer(std::vector<uint8_t> &v, int64_t image_number) {
if (receiver)
return receiver->GetImageFromBuffer(v, image_number);
return false;
}
ImageBufferStatus JFJochServices::GetImageBufferStatus() const {
if (receiver)
return receiver->GetImageBufferStatus();
else return ImageBufferStatus{.total_slots = 0, .available_slots = 0};
}
void JFJochServices::ClearImageBuffer() const {
if (receiver)
receiver->ClearImageBuffer();
}
void JFJochServices::LoadDetectorPixelMask(PixelMask &mask) {
std::shared_lock ul(detector_mutex);
if (detector)
detector->LoadPixelMask(mask);
}
void JFJochServices::SetupIndexing(const IndexingSettings &input) {
if (receiver)
receiver->Indexing(input);
}
ImagePusherStatus JFJochServices::GetImagePusherStatus() const {
if (receiver)
return receiver->GetImagePusherStatus();
return {
.pusher_type = ImagePusherType::None,
.address = {},
.connected_writers = 0
};
}