Files
Jungfraujoch/broker/JFJochServices.cpp
T
leonarski_fandClaude Opus 4.8 145b300fa2 broker: fail the run loudly when the writer breaks mid-acquisition
A writer connection that dropped mid-run left a truncated file, yet the
acquisition was still logged as "finished with success": the writer error
returned by ImagePusher::Finalize() (set via transmission_error on a broken
session connection) was captured into receiver_output.writer_err but never
acted upon.

Treat a non-empty writer_err as a failed acquisition: throw so it is reported
as "finished with error" and surfaced to the caller, instead of silently
succeeding with incomplete data. Applies to both the Lite and FPGA workflows,
which share this stop/finalize path. Combined with the liveness changes (which
no longer tear a connection down for a transient stall), this fires only on a
genuine writer break.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 18:27:04 +02:00

322 lines
11 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "JFJochServices.h"
#include "../common/JFJochException.h"
#include "../detector_control/SLSDetectorWrapper.h"
#include "../detector_control/DectrisDetectorWrapper.h"
// Constructs the service object; the supplied logger is used to initialise the logger member.
JFJochServices::JFJochServices(Logger &in_logger)
    : logger(in_logger) {
}
void JFJochServices::Start(const DiffractionExperiment& experiment,
const PixelMask &pixel_mask,
const JFCalibration *calibration) {
logger.Info("Measurement start for: {}", experiment.GetFilePrefix());
cannot_stop_detector = false;
if (receiver != nullptr) {
logger.Info(" ... receiver start");
if (image_puller)
image_puller->ResumeAndClear();
receiver->Start(experiment, pixel_mask, calibration, image_puller);
{
std::shared_lock ul(detector_mutex);
if (detector && !experiment.IsUsingInternalPacketGen()) {
logger.Info(" ... detector start");
detector->Start(experiment);
}
}
}
logger.Info(" Done!");
}
void JFJochServices::Off() {
image_puller.reset();
std::unique_ptr<DetectorWrapper> old_detector;
{
std::unique_lock ul(detector_mutex);
old_detector = std::move(detector);
}
if (old_detector) {
old_detector->Deactivate();
// destroyed here, outside lock
}
}
// Switches the detector on for the given experiment.
// - With the internal packet generator, or when no receiver is attached, any existing detector
//   object is simply dropped.
// - Otherwise a detector wrapper matching the detector type is created, initialised with the
//   receiver's network configuration, and installed under an exclusive lock.
//   For DECTRIS detectors an image puller is also created (in suspended state); for
//   EIGER/JUNGFRAU the image puller is removed.
// Throws JFJochException (category Detector) if the detector type is not handled below;
// exceptions from the wrapper's Initialize() propagate unchanged.
void JFJochServices::On(DiffractionExperiment &x) {
    if (x.IsUsingInternalPacketGen() || (receiver == nullptr)) {
        std::unique_lock ul(detector_mutex);
        detector.reset();
    } else {
        logger.Info("Detector on");

        std::unique_ptr<DetectorWrapper> new_detector;

        switch (x.GetDetectorType()) {
            case DetectorType::EIGER:
            case DetectorType::JUNGFRAU:
                new_detector = std::make_unique<SLSDetectorWrapper>();
                image_puller.reset();
                break;
            case DetectorType::DECTRIS:
                new_detector = std::make_unique<DectrisDetectorWrapper>();
                image_puller = std::make_shared<ZMQImagePuller>(x.GetDetectorSetup().GetDECTRISStream2Addr());
                image_puller->Suspend();
                break;
        }

        // A detector type not covered by the switch would leave the pointer empty;
        // fail with a clear error instead of dereferencing a null pointer.
        if (!new_detector)
            throw JFJochException(JFJochExceptionCategory::Detector, "Unsupported detector type");

        // Initialise before taking the lock, so the mutex is only held for the pointer swap
        new_detector->Initialize(x, receiver->GetNetworkConfig());
        {
            std::unique_lock ul(detector_mutex);
            detector = std::move(new_detector);
        }
        logger.Info(" ... done");
    }
}
// Finishes a measurement and returns the receiver output.
//
// With a receiver attached:
//  1. If a detector is present, poll its state every 5 ms until it leaves WAITING/BUSY
//     (or until Cancel() flagged that the detector cannot be stopped). An IDLE detector leads
//     to a silent receiver cancel; any other state is treated as a detector error.
//  2. Wait for the receiver to stop, suspend the image puller (if any) and log statistics.
//  3. A non-empty writer error is turned into a FileWriteError exception.
// Without a receiver the function just sleeps for 30 seconds.
//
// Throws JFJochException:
//  - a copy of any JFJochException caught during the sequence above (logged first), or
//  - category Detector, if the detector did not end up in the IDLE state.
JFJochServicesOutput JFJochServices::Stop() {
    JFJochServicesOutput ret;

    // Exception caught inside the try block; rethrown at the end of the function
    std::unique_ptr<JFJochException> exception;
    bool detector_error = false;

    if (receiver != nullptr) {
        try {
            {
                std::shared_lock ul(detector_mutex);
                if (detector) {
                    logger.Info("Wait for detector idle");
                    DetectorState state = detector->GetState();
                    while ((!cannot_stop_detector)
                           && ((state == DetectorState::WAITING) || (state == DetectorState::BUSY))) {
                        // check detector state every 5 ms
                        std::this_thread::sleep_for(std::chrono::milliseconds(5));
                        state = detector->GetState();
                    }
                    if (state == DetectorState::IDLE) {
                        logger.Info(" ... detector idle");
                        receiver->Cancel(true); // cancel silently
                    } else {
                        logger.Error(" ... detector in error state");
                        receiver->Cancel(false);
                        detector_error = true;
                    }
                }
            }

            logger.Info("Wait for receiver done");
            ret.receiver_output = receiver->Stop();

            if (image_puller)
                image_puller->Suspend();

            logger.Info(" ... Receiver efficiency: {} % Max delay: {} Compression ratio {}x",
                        static_cast<int>(ret.receiver_output.efficiency * 100.0),
                        ret.receiver_output.status.max_receive_delay.value_or(0),
                        static_cast<int>(std::round(ret.receiver_output.status.compressed_ratio.value_or(1))));

            if (ret.receiver_output.efficiency < 1.0) {
                const auto &received = ret.receiver_output.received_packets;
                const auto &expected = ret.receiver_output.expected_packets;

                // Report per-module losses. Iterate only over modules present in both containers,
                // so a length mismatch can never cause an out-of-bounds read.
                const size_t module_count =
                        (received.size() < expected.size()) ? received.size() : expected.size();

                for (size_t i = 0; i < module_count; i++) {
                    if (received[i] != expected[i])
                        logger.Info(" ... Module: {} Packets received: {} out of {}", i,
                                    received[i], expected[i]);
                }
            }

            // A writer that broke mid-run (e.g. a lost connection) leaves a truncated file.
            // Surface that as a failed acquisition instead of silently reporting success.
            if (!ret.receiver_output.writer_err.empty())
                throw JFJochException(JFJochExceptionCategory::FileWriteError,
                                      "Writer error, written data may be incomplete: "
                                      + ret.receiver_output.writer_err);

            logger.Info(" ... finished with success");
        } catch (const JFJochException &e) {
            logger.Error(" ... finished with error {}", e.what());
            exception = std::make_unique<JFJochException>(e);
        }
    } else {
        logger.Info("No receiver - sleeping for 30 seconds");
        std::this_thread::sleep_for(std::chrono::seconds(30));
        logger.Info("Sleep done");
    }

    // A caught exception takes precedence over the generic detector error
    if (exception)
        throw JFJochException(*exception);

    if (detector_error)
        throw JFJochException(JFJochExceptionCategory::Detector, "Error in detector operation");

    return ret;
}
void JFJochServices::Cancel() {
std::shared_lock ul(detector_mutex);
if (detector) {
// Best effort - if detector cannot be stopped, this is OK, important to still stop receiver
try {
detector->Stop();
} catch (...) {
cannot_stop_detector = true;
}
}
if (receiver != nullptr)
receiver->Cancel(false);
}
// Attaches (or detaches, when given nullptr) the receiver service; returns *this for chaining.
JFJochServices &JFJochServices::Receiver(JFJochReceiverService *input) {
    this->receiver = input;
    return *this;
}
std::optional<JFJochReceiverStatus> JFJochServices::GetReceiverStatus() const {
if (receiver == nullptr)
return {};
return receiver->GetStatus();
}
// Returns the receiver progress, or an empty optional when no receiver is attached.
std::optional<float> JFJochServices::GetReceiverProgress() const {
    if (receiver != nullptr)
        return receiver->GetProgress();

    return std::nullopt;
}
// Returns the data processing plot for the request; a default-constructed plot when
// no receiver is attached.
MultiLinePlot JFJochServices::GetPlots(const PlotRequest &request) {
    if (receiver != nullptr)
        return receiver->GetDataProcessingPlot(request);

    return {};
}
// Forwards the raw plot request to the receiver; v is left untouched when no receiver is attached.
void JFJochServices::GetPlotRaw(std::vector<float> &v, PlotType type, const std::string &roi) {
    if (receiver == nullptr)
        return;

    receiver->GetPlotRaw(v, type, roi);
}
// Passes spot finding settings on to the receiver; no-op without a receiver.
void JFJochServices::SetSpotFindingSettings(const SpotFindingSettings &settings) {
    if (!receiver)
        return;

    receiver->SetSpotFindingSettings(settings);
}
// Triggers the detector; requires both a detector and a receiver to be present.
void JFJochServices::Trigger() {
    std::shared_lock guard(detector_mutex);

    if (!detector || (receiver == nullptr))
        return;

    detector->Trigger();
}
// Returns the detector status. Yields an empty optional when the shared lock cannot be
// acquired within 500 ms or when there is no detector.
std::optional<DetectorStatus> JFJochServices::GetDetectorStatus() const {
    std::shared_lock guard(detector_mutex, std::defer_lock);

    const bool lock_acquired = guard.try_lock_for(std::chrono::milliseconds(500));
    if (!lock_acquired || !detector)
        return {};

    return detector->GetStatus();
}
// Returns the JPEG preview produced by the receiver for the given image number;
// an empty string without a receiver.
std::string JFJochServices::GetPreviewJPEG(const PreviewImageSettings &settings, int64_t image_number) const {
    if (receiver == nullptr)
        return {};

    return receiver->GetJPEGFromBuffer(settings, image_number);
}
// Returns the TIFF preview produced by the receiver for the given image number;
// an empty string without a receiver.
std::string JFJochServices::GetPreviewTIFF(int64_t image_number) const {
    if (receiver == nullptr)
        return "";

    return receiver->GetTIFFFromBuffer(image_number);
}
// Configures the detector (if present) for the given experiment.
void JFJochServices::ConfigureDetector(const DiffractionExperiment &experiment) {
    // Exclusive lock on purpose: the pointer itself is not modified, but nobody else
    // should access the detector while it is being configured.
    std::unique_lock guard(detector_mutex);

    if (!detector)
        return;

    detector->Configure(experiment);
}
// Hands an image for the internal generator over to the receiver; no-op without a receiver.
void JFJochServices::LoadInternalGeneratorImage(const DiffractionExperiment &experiment,
                                                const std::vector<uint16_t> &image,
                                                uint64_t image_number) {
    if (!receiver)
        return;

    receiver->LoadInternalGeneratorImage(experiment, image, image_number);
}
// Asks the receiver to fill v with XFEL pulse IDs; v is left untouched without a receiver.
void JFJochServices::GetXFELPulseID(std::vector<uint64_t> &v) const {
    if (!receiver)
        return;

    receiver->GetXFELPulseID(v);
}
// Asks the receiver to fill v with XFEL event codes; v is left untouched without a receiver.
void JFJochServices::GetXFELEventCode(std::vector<uint64_t> &v) const {
    if (!receiver)
        return;

    receiver->GetXFELEventCode(v);
}
// Returns the device status list reported by the receiver; an empty list without a receiver.
std::vector<DeviceStatus> JFJochServices::GetDeviceStatus() const {
    if (receiver == nullptr)
        return {};

    return receiver->GetDeviceStatus();
}
// Returns the receiver's preview socket settings; default-constructed settings without a receiver.
ZMQPreviewSettings JFJochServices::GetPreviewSocketSettings() {
    if (receiver == nullptr)
        return {};

    return receiver->GetPreviewSocketSettings();
}
// Returns the receiver's metadata socket settings; default-constructed settings without a receiver.
ZMQMetadataSettings JFJochServices::GetMetadataSocketSettings() {
    if (receiver == nullptr)
        return {};

    return receiver->GetMetadataSocketSettings();
}
// Applies preview socket settings on the receiver; no-op without a receiver.
void JFJochServices::SetPreviewSocketSettings(const ZMQPreviewSettings &input) {
    if (!receiver)
        return;

    receiver->PreviewSocketSettings(input);
}
// Applies metadata socket settings on the receiver; no-op without a receiver.
void JFJochServices::SetMetadataSocketSettings(const ZMQMetadataSettings &input) {
    if (!receiver)
        return;

    receiver->MetadataSocketSettings(input);
}
// Asks the receiver to place the start message into v; v is left untouched without a receiver.
void JFJochServices::GetStartMessageFromBuffer(std::vector<uint8_t> &v) {
    if (!receiver)
        return;

    return receiver->GetStartMessageFromBuffer(v);
}
// Asks the receiver to place the requested image into v and returns the receiver's result;
// returns false (leaving v untouched) without a receiver.
bool JFJochServices::GetImageFromBuffer(std::vector<uint8_t> &v, int64_t image_number) {
    if (!receiver)
        return false;

    return receiver->GetImageFromBuffer(v, image_number);
}
// Returns the receiver's image buffer status; without a receiver, a status with
// zero total and zero available slots.
ImageBufferStatus JFJochServices::GetImageBufferStatus() const {
    if (!receiver)
        return ImageBufferStatus{.total_slots = 0, .available_slots = 0};

    return receiver->GetImageBufferStatus();
}
// Clears the receiver's image buffer; no-op without a receiver.
void JFJochServices::ClearImageBuffer() const {
    if (!receiver)
        return;

    receiver->ClearImageBuffer();
}
// Lets the detector (if present) load its pixel mask into mask, under a shared lock.
void JFJochServices::LoadDetectorPixelMask(PixelMask &mask) {
    std::shared_lock guard(detector_mutex);

    if (!detector)
        return;

    detector->LoadPixelMask(mask);
}
// Passes indexing settings on to the receiver; no-op without a receiver.
void JFJochServices::SetupIndexing(const IndexingSettings &input) {
    if (!receiver)
        return;

    receiver->Indexing(input);
}
// Returns the receiver's image pusher status; without a receiver, a placeholder status
// (pusher type None, empty address, zero connected writers).
ImagePusherStatus JFJochServices::GetImagePusherStatus() const {
    if (!receiver) {
        return {
            .pusher_type = ImagePusherType::None,
            .address = {},
            .connected_writers = 0
        };
    }

    return receiver->GetImagePusherStatus();
}