Files
Jungfraujoch/process/JFJochProcess.cpp
T
leonarski_fandClaude Opus 4.8 dfc6703f4a jfjoch_process: don't double-smooth per-frame G on the reference path
SmoothImageScaleG rewrites the partials in place (image_scale_corr and
image_scale_g). On the no-reference path that is harmless: each scaling
pass recomputes G from scratch via ScaleAllImages, so smoothing always
runs on freshly-refined G. On the reference path the scaling loop is
skipped, so G is computed once and stays; running scale_and_merge twice
(P1 then the adopted space group) smoothed the already-smoothed G a
second time, compounding into a ~2x wider effective kernel than the
configured --smooth-g and biasing the merged intensities.

Smooth only on the first pass of the reference path (G is unchanged
afterwards, and the smoothed partials persist into the second pass's
combine3D). The no-reference path is unchanged.

Verified on lyso (600 frames, -P rot3d -z ref.mtz -M): the reference run
now logs the smoothing once instead of twice, and the merged MTZ changes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 14:24:05 +02:00

705 lines
36 KiB
C++

// SPDX-FileCopyrightText: 2026 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "JFJochProcess.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <functional>
#include <future>
#include <numeric>
#include <set>
#include <sstream>
#include "../reader/JFJochHDF5Reader.h"
#include "../common/Logger.h"
#include "../common/AzimuthalIntegrationMapping.h"
#include "../common/AzimuthalIntegrationProfile.h"
#include "../common/CUDAWrapper.h"
#include "../common/time_utc.h"
#include "../writer/FileWriter.h"
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
#include "../image_analysis/IndexAndRefine.h"
#include "../image_analysis/indexing/IndexerThreadPool.h"
#include "../image_analysis/azint/AzIntEngineCPU.h"
#include "../image_analysis/image_preprocessing/ImagePreprocessorCPU.h"
#include "../image_analysis/image_preprocessing/ImagePreprocessorBuffer.h"
#include "../image_analysis/scale_merge/Merge.h"
#include "../image_analysis/scale_merge/ScaleOnTheFly.h"
#include "../image_analysis/scale_merge/ScalingResult.h"
#include "../image_analysis/scale_merge/SearchSpaceGroup.h"
#include "../image_analysis/scale_merge/TwinningAnalysis.h"
#include "../image_analysis/scale_merge/Combine3D.h"
#include "../image_analysis/scale_merge/HKLKey.h"
#include "../image_analysis/WriteReflections.h"
#include "../common/Definitions.h"
#include <array>
#include <map>
#include <Eigen/Dense>
namespace {
// Pick up to requested_images ordinals spread evenly across [0, images_to_process) for the
// first pass of two-pass rotation indexing.
std::vector<int> select_equally_spaced_image_ordinals(int images_to_process, int requested_images) {
std::vector<int> ret;
if (images_to_process <= 0 || requested_images <= 0)
return ret;
const int n = std::min(images_to_process, requested_images);
if (n == 1) {
ret.push_back(0);
return ret;
}
std::set<int> unique_ordinals;
for (int i = 0; i < n; i++)
unique_ordinals.insert(static_cast<int>(
std::llround(static_cast<double>(i) * static_cast<double>(images_to_process - 1) /
static_cast<double>(n - 1))));
ret.assign(unique_ordinals.begin(), unique_ordinals.end());
return ret;
}
// XDS-order scaling. The rot3d combine emits fulls with partiality == 1 (image_scale_corr == 1),
// so they were only ever scaled as per-frame *partials* upstream - their per-frame scale is
// entangled with the rocking-curve/partiality model. This refits a per-frame scale directly on
// the complete reflections with the Unity model (no partiality term, G*Itrue - I_full), the way
// XDS/DIALS scale 3D-integrated fulls. A pure post-correction: it updates image_scale_corr on the
// fulls (1 -> 1/G) without re-combining.
void ScaleFulls(const DiffractionExperiment &experiment,
std::vector<IntegrationOutcome> &fulls, int scaling_iter,
size_t nthreads, Logger &logger) {
DiffractionExperiment unity = experiment;
ScalingSettings ss = unity.GetScalingSettings();
ss.SetPartialityModel(PartialityModel::Unity);
unity.ImportScalingSettings(ss);
for (int i = 0; i < scaling_iter; i++) {
const auto reference = MergeAll(unity, fulls, false);
ScaleOnTheFly(unity, reference).Scale(fulls, nthreads);
}
logger.Info("Scaled fulls (XDS order, Unity model)");
}
// Absorption surface. A smooth multiplicative correction A(s1_crystal) shared across the sweep,
// fit by ridge-regularized log-linear least-squares of I_scaled/I_ref against a degree<=4
// monomial basis of the diffracted-beam direction in the crystal (goniometer) frame - the same
// function space as real spherical harmonics up to l=4, as XDS/DIALS use. Applied by dividing
// image_scale_corr by A, then re-merged. Negligible at hard X-rays / thin crystals but matters
// at low energy (e.g. 6 keV).
void AbsorptionSurface(const DiffractionExperiment &exp,
std::vector<IntegrationOutcome> &fulls, int n_iter, Logger &logger) {
const auto gon_opt = exp.GetGoniometer();
if (!gon_opt) { logger.Warning("Absorption: no goniometer, skipping"); return; }
const auto &gon = *gon_opt;
const Coord axis = gon.GetAxis().Normalize();
const DiffractionGeometry geom = exp.GetDiffractionGeometry();
const HKLKeyGenerator keygen(exp.GetScalingSettings().GetMergeFriedel(),
exp.GetSpaceGroupNumber().value_or(1));
constexpr int MAXDEG = 4;
constexpr int NB = (MAXDEG + 1) * (MAXDEG + 2) * (MAXDEG + 3) / 6; // # monomials of degree 0..4
auto basis = [&](const Coord &s, std::array<double, NB> &b) {
int n = 0;
for (int deg = 0; deg <= MAXDEG; ++deg)
for (int i = deg; i >= 0; --i)
for (int j = deg - i; j >= 0; --j) {
const int kk = deg - i - j;
double v = 1.0;
for (int a = 0; a < i; ++a) v *= s.x;
for (int a = 0; a < j; ++a) v *= s.y;
for (int a = 0; a < kk; ++a) v *= s.z;
b[n++] = v;
}
};
auto s1_crystal = [&](const Reflection &r) {
const Coord s1lab = geom.LabCoord(r.predicted_x, r.predicted_y).Normalize();
const double phi = gon.GetAngle_deg(r.image_number) * 3.14159265358979323846 / 180.0;
return RotMatrix(static_cast<float>(-phi), axis) * s1lab;
};
for (int iter = 0; iter < n_iter; ++iter) {
const auto ref = MergeAll(exp, fulls, false);
std::map<HKLKey, float> refI;
for (const auto &m : ref) refI[keygen(m)] = m.I;
Eigen::MatrixXd AtA = Eigen::MatrixXd::Zero(NB, NB);
Eigen::VectorXd Atb = Eigen::VectorXd::Zero(NB);
std::array<double, NB> b{};
for (const auto &oc : fulls)
for (const auto &r : oc.reflections) {
if (!r.observed || !std::isfinite(r.image_scale_corr) || r.image_scale_corr <= 0.0f) continue;
const double Iscaled = static_cast<double>(r.I) * r.image_scale_corr;
const double sig = static_cast<double>(r.sigma) * r.image_scale_corr;
if (!(Iscaled > 0.0) || !(sig > 0.0)) continue;
const auto it = refI.find(keygen(r));
if (it == refI.end() || !(it->second > 0.0f)) continue;
basis(s1_crystal(r), b);
const double y = std::log(Iscaled / it->second);
const double w = std::min(100.0, (Iscaled / sig) * (Iscaled / sig)); // strong reflections define the surface
for (int a = 0; a < NB; ++a) {
Atb(a) += w * b[a] * y;
for (int c = 0; c < NB; ++c) AtA(a, c) += w * b[a] * b[c];
}
}
const double lambda = 1e-2 * AtA.diagonal().mean(); // ridge: x^2+y^2+z^2=1 makes the basis rank-deficient
for (int a = 0; a < NB; ++a) AtA(a, a) += lambda;
const Eigen::VectorXd c = AtA.ldlt().solve(Atb);
double sumsq = 0.0; size_t n_app = 0;
for (auto &oc : fulls)
for (auto &r : oc.reflections) {
if (!std::isfinite(r.image_scale_corr) || r.image_scale_corr <= 0.0f) continue;
basis(s1_crystal(r), b);
double surf = 0.0;
for (int a = 0; a < NB; ++a) surf += c(a) * b[a];
r.image_scale_corr = static_cast<float>(r.image_scale_corr / std::exp(surf));
sumsq += surf * surf; ++n_app;
}
logger.Info("Absorption surface iter {}/{}: {} fulls, rms(log A)={:.3f}",
iter + 1, n_iter, n_app, std::sqrt(sumsq / std::max<size_t>(n_app, 1)));
}
}
// Smooth the per-frame scale G across frames before the rot3d combine. ScaleOnTheFly fits each
// frame's G independently, so the few partials of one rocking event get inconsistent scales when
// weight-summed; a centered moving average of log(G) over a small odd frame window removes that
// jitter. Only G changed, so each reflection's image_scale_corr (proportional to 1/G) is rescaled
// by g_old/g_smooth. Frames without a fitted G (n<MIN_REFLECTIONS) are skipped on both sides.
void SmoothImageScaleG(std::vector<IntegrationOutcome> &outcomes, int window, Logger &logger) {
const int n = static_cast<int>(outcomes.size());
const int half = window / 2;
std::vector<double> g_smooth(n, NAN);
for (int o = 0; o < n; o++) {
double sum_log = 0.0;
int count = 0;
for (int j = std::max(0, o - half); j <= std::min(n - 1, o + half); j++) {
const auto &g = outcomes[j].image_scale_g;
if (g && std::isfinite(*g) && *g > 0.0) {
sum_log += std::log(*g);
count++;
}
}
if (count > 0)
g_smooth[o] = std::exp(sum_log / count);
}
size_t n_smoothed = 0;
for (int o = 0; o < n; o++) {
const auto &g_old = outcomes[o].image_scale_g;
if (!g_old || !std::isfinite(*g_old) || *g_old <= 0.0 || !std::isfinite(g_smooth[o]))
continue;
const double factor = *g_old / g_smooth[o];
for (auto &r : outcomes[o].reflections)
if (std::isfinite(r.image_scale_corr))
r.image_scale_corr = static_cast<float>(r.image_scale_corr * factor);
outcomes[o].image_scale_g = g_smooth[o];
n_smoothed++;
}
logger.Info("Smoothed per-frame scale G over a {}-frame window ({} of {} frames)",
window, n_smoothed, n);
}
}
JFJochProcess::JFJochProcess(JFJochHDF5Reader &reader, DiffractionExperiment experiment,
PixelMask pixel_mask, ProcessConfig config)
: reader_(reader), experiment_(std::move(experiment)),
pixel_mask_(std::move(pixel_mask)), config_(std::move(config)) {}
ProcessResult JFJochProcess::Run(JFJochProcessObserver *observer) {
Logger logger("JFJochProcess");
ProcessResult result;
const auto dataset = reader_.GetDataset();
if (!dataset)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"No experiment dataset found in the input file");
if (config_.stride <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image stride must be positive");
const auto total_images_in_file = static_cast<int>(reader_.GetNumberOfImages());
int end_image = config_.end_image;
if (end_image < 0 || end_image > total_images_in_file)
end_image = total_images_in_file;
const int start_image = config_.start_image;
const int images_to_process = (end_image - start_image) / config_.stride;
if (images_to_process <= 0) {
logger.Warning("No images to process (start {}, end {}, stride {}, total {})",
start_image, end_image, config_.stride, total_images_in_file);
return result;
}
const bool full = (config_.mode == ProcessMode::FullAnalysis);
const bool write_files = !config_.output_prefix.empty();
// Output/runtime invariants. Algorithm settings (indexing, scaling, integration, polarization,
// space group, unit cell, ...) are configured on experiment_ by the caller.
experiment_.BitDepthImage(32).PixelSigned(true);
experiment_.Mode(DetectorMode::Standard);
experiment_.OverwriteExistingFiles(true);
experiment_.FilePrefix(config_.output_prefix.empty() ? "output" : config_.output_prefix);
experiment_.SetFileWriterFormat(FileWriterFormat::NXmxLegacy);
experiment_.ImagesPerTrigger(images_to_process);
experiment_.NumTriggers(1);
if (full)
experiment_.Compression(CompressionAlgorithm::BSHUF_LZ4);
// The pipeline indexes images 0..N-1 within this run; if we process a sub-range/strided
// selection, shift the goniometer so local index i maps to the angle of original image
// start+i*stride (keeping the per-image rotation wedge), otherwise rotation angles would be
// wrong for any start_image != 0.
if (const auto g = experiment_.GetGoniometer();
g.has_value() && (start_image != 0 || config_.stride != 1)) {
const float incr = g->GetIncrement_deg();
GoniometerAxis shifted(g->GetName(),
g->GetStart_deg() + incr * static_cast<float>(start_image),
incr * static_cast<float>(config_.stride),
g->GetAxis(), g->GetHelicalStep());
shifted.ScreeningWedge(g->GetScreeningWedge().value_or(incr));
experiment_.Goniometer(shifted);
}
AzimuthalIntegrationMapping mapping(experiment_, pixel_mask_);
JFJochReceiverPlots plots;
plots.Setup(experiment_, mapping);
// Output file (NXmxIntegrated master that links back to the original images).
StartMessage start_message;
experiment_.FillMessage(start_message);
start_message.arm_date = dataset->arm_date;
start_message.az_int_bin_to_q = mapping.GetBinToQ();
start_message.az_int_bin_to_two_theta = mapping.GetBinToTwoTheta();
start_message.az_int_q_bin_count = mapping.GetQBinCount();
start_message.az_int_phi_bin_count = mapping.GetAzimuthalBinCount();
if (mapping.GetAzimuthalBinCount() > 1)
start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
start_message.pixel_mask["default"] = pixel_mask_.GetMask(experiment_);
if (full) {
start_message.rois = experiment_.ROI().ExportMetadata();
if (!experiment_.ROI().empty())
start_message.roi_map = experiment_.ExportROIMap();
start_message.max_spot_count = experiment_.GetMaxSpotCount();
}
start_message.master_suffix = "process";
start_message.file_format = FileWriterFormat::NXmxIntegrated;
start_message.write_master_file = true;
start_message.write_images = false;
start_message.hdf5_source_data = reader_.GetHDF5DataSource(start_image, images_to_process);
std::unique_ptr<FileWriter> writer;
if (write_files && config_.write_process_h5)
writer = std::make_unique<FileWriter>(start_message);
logger.Info("Processing {} images (range {}-{}, stride {}) using {} threads [{}]",
images_to_process, start_image, end_image, config_.stride, config_.nthreads,
full ? "full analysis" : "azimuthal integration");
if (observer)
observer->OnPhase(full ? "Full analysis" : "Azimuthal integration");
// Full-analysis shared engines.
std::unique_ptr<IndexerThreadPool> indexer_pool;
std::unique_ptr<IndexAndRefine> indexer;
if (full) {
indexer_pool = std::make_unique<IndexerThreadPool>(experiment_.GetIndexingSettings());
indexer = std::make_unique<IndexAndRefine>(experiment_, indexer_pool.get());
if (!config_.reference_data.empty())
indexer->ReferenceIntensities(config_.reference_data);
}
const auto start_time = std::chrono::steady_clock::now();
// First pass of two-pass rotation indexing (full analysis only).
if (full && config_.forced_rotation_lattice.has_value()) {
indexer->ForceRotationIndexerLattice(*config_.forced_rotation_lattice);
logger.Info("Rotation indexer lattice forced externally - skipping first pass");
} else if (full && config_.rotation_indexing && config_.two_pass_rotation) {
if (observer)
observer->OnPhase("Rotation indexing (first pass)");
const auto selected = select_equally_spaced_image_ordinals(images_to_process,
config_.rotation_indexing_image_count);
logger.Info("First-pass rotation indexing using {} images{}", selected.size(),
config_.reuse_rotation_spots ? " and stored spots" : "");
for (const int ordinal: selected) {
if (cancelled_) break;
const int image_idx = start_image + ordinal * config_.stride;
DataMessage msg{};
msg.number = ordinal; // index into the rotation indexer (0..images_to_process-1)
msg.original_number = image_idx;
try {
if (config_.reuse_rotation_spots) {
msg.spots = reader_.ReadSpots(image_idx);
} else {
auto img = reader_.GetRawImage(image_idx);
if (!img) continue;
MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer);
AzimuthalIntegrationProfile profile(mapping);
auto first_pass = config_.spot_finding;
first_pass.indexing = false;
first_pass.quick_integration = false;
msg.image = img->image;
if (dataset->efficiency.size() > image_idx)
msg.image_collection_efficiency = dataset->efficiency[image_idx];
analysis.Analyze(msg, profile, first_pass);
}
indexer->AddImageToRotationIndexer(msg);
} catch (const std::exception &e) {
logger.Warning("First-pass rotation indexing failed for image {}: {}", image_idx, e.what());
}
}
if (!cancelled_ && !indexer->FinalizeRotationIndexing().has_value())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Two-pass rotation indexing failed");
if (!cancelled_)
logger.Info("Two-pass rotation indexing found lattice");
}
// Main per-image loop, spread over N worker threads pulling from a shared counter. HDF5 reads
// are serialized by the global hdf5_mutex; the analysis runs in parallel.
std::atomic<int> next_ordinal = 0;
std::atomic<int> finished_count = 0;
std::atomic<uint64_t> total_uncompressed_bytes = 0;
auto azint_worker = [&]() {
std::vector<uint8_t> decompression_buffer;
ImagePreprocessorCPU preprocessor(experiment_, pixel_mask_);
ImagePreprocessorBuffer buffer(experiment_.GetPixelsNum());
AzIntEngineCPU azint(mapping);
AzimuthalIntegrationProfile profile(mapping);
while (!cancelled_) {
const int ordinal = next_ordinal.fetch_add(1);
const int image_idx = start_image + ordinal * config_.stride;
if (image_idx >= end_image) break;
std::shared_ptr<JFJochReaderRawImage> img;
try {
img = reader_.GetRawImage(image_idx);
} catch (const std::exception &e) {
logger.Error("Failed to load image {}: {}", image_idx, e.what());
continue;
}
if (!img) continue;
DataMessage msg{};
msg.image = img->image;
msg.number = ordinal;
msg.original_number = image_idx;
if (dataset->efficiency.size() > image_idx)
msg.image_collection_efficiency = dataset->efficiency[image_idx];
total_uncompressed_bytes += msg.image.GetUncompressedSize();
const auto t0 = std::chrono::steady_clock::now();
try {
const uint8_t *image_ptr = msg.image.GetUncompressedPtr(decompression_buffer);
preprocessor.Analyze(buffer, image_ptr, msg.image.GetMode());
azint.Run(buffer, profile);
} catch (const std::exception &e) {
logger.Error("Error integrating image {}: {}", image_idx, e.what());
continue;
}
msg.azint_time_s = std::chrono::duration<float>(std::chrono::steady_clock::now() - t0).count();
msg.processing_time_s = msg.azint_time_s;
msg.az_int_profile = profile.GetResult();
msg.az_int_profile_count = profile.GetPixelCount();
msg.az_int_profile_std = profile.GetStd();
msg.bkg_estimate = profile.GetBkgEstimate(mapping.Settings());
msg.run_number = experiment_.GetRunNumber();
msg.run_name = experiment_.GetRunName();
plots.Add(msg, profile);
if (writer) writer->Write(msg);
if (observer) observer->OnImageProcessed(msg);
const int done = finished_count.fetch_add(1) + 1;
if (observer) observer->OnProgress(done, images_to_process);
}
};
auto full_worker = [&]() {
pin_gpu(); // round-robin per worker thread; must precede engine construction
MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer);
AzimuthalIntegrationProfile profile(mapping);
while (!cancelled_) {
const int ordinal = next_ordinal.fetch_add(1);
const int image_idx = start_image + ordinal * config_.stride;
if (image_idx >= end_image) break;
std::shared_ptr<JFJochReaderRawImage> img;
try {
img = reader_.GetRawImage(image_idx);
} catch (const std::exception &e) {
logger.Error("Failed to load image {}: {}", image_idx, e.what());
continue;
}
if (!img) continue;
DataMessage msg{};
msg.image = img->image;
msg.number = ordinal;
msg.original_number = image_idx;
if (dataset->efficiency.size() > image_idx)
msg.image_collection_efficiency = dataset->efficiency[image_idx];
total_uncompressed_bytes += msg.image.GetUncompressedSize();
const auto t0 = std::chrono::steady_clock::now();
try {
analysis.Analyze(msg, profile, config_.spot_finding);
} catch (const std::exception &e) {
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
continue;
}
msg.processing_time_s = std::chrono::duration<float>(std::chrono::steady_clock::now() - t0).count();
msg.run_number = experiment_.GetRunNumber();
msg.run_name = experiment_.GetRunName();
plots.Add(msg, profile);
if (writer) writer->Write(msg);
if (observer) observer->OnImageProcessed(msg);
const int done = finished_count.fetch_add(1) + 1;
if (observer) observer->OnProgress(done, images_to_process);
}
};
if (observer)
observer->OnPhase("Processing images");
std::function<void()> worker = full ? std::function<void()>(full_worker)
: std::function<void()>(azint_worker);
std::vector<std::future<void> > futures;
futures.reserve(config_.nthreads);
for (int i = 0; i < config_.nthreads; ++i)
futures.push_back(std::async(std::launch::async, worker));
for (auto &f: futures)
f.get();
const auto end_time = std::chrono::steady_clock::now();
result.cancelled = cancelled_;
result.images_processed = finished_count.load();
result.processing_time_s = std::chrono::duration<double>(end_time - start_time).count();
if (result.processing_time_s > 0.0) {
result.frame_rate_hz = static_cast<double>(result.images_processed) / result.processing_time_s;
result.throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (result.processing_time_s * 1e6);
}
result.mean_processing_time = plots.GetMeanProcessingTime();
result.indexing_rate = plots.GetIndexingRate();
// End message (also written to the file).
EndMessage end_msg;
end_msg.max_image_number = result.images_processed;
end_msg.images_collected_count = result.images_processed;
end_msg.images_sent_to_write_count = result.images_processed;
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
end_msg.run_number = experiment_.GetRunNumber();
end_msg.run_name = experiment_.GetRunName();
end_msg.bkg_estimate = plots.GetBkgEstimate();
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
end_msg.indexing_rate = result.indexing_rate;
if (full && !cancelled_) {
if (const auto rot = indexer->FinalizeRotationIndexing(); rot.has_value()) {
end_msg.rotation_lattice = rot->lattice;
end_msg.rotation_lattice_type = LatticeMessage{
.centering = rot->search_result.centering,
.niggli_class = rot->search_result.niggli_class,
.crystal_system = rot->search_result.system
};
result.rotation_lattice_found = true;
}
result.consensus_cell = indexer->GetConsensusUnitCell();
end_msg.unit_cell = result.consensus_cell;
}
// Scaling and merging (full analysis only).
if (full && !cancelled_ && result.indexing_rate.has_value() && result.indexing_rate > 0
&& (config_.run_scaling || !config_.reference_data.empty())) {
// Scaling/merging is a long post-pass; report each sub-step as a phase so the GUI progress
// bar reflects what is happening instead of freezing on one label. Also time each phase
// (logged on transition) so the bottlenecks are visible.
auto t_phase = std::chrono::steady_clock::now();
std::string prev_phase;
auto phase = [&](const std::string &p) {
const auto now = std::chrono::steady_clock::now();
if (!prev_phase.empty())
logger.Info("[timing] {}: {:.2f} s", prev_phase,
std::chrono::duration<double>(now - t_phase).count());
t_phase = now;
prev_phase = p;
if (observer) observer->OnPhase(p);
};
phase("Scaling and merging");
// Ice-ring handling (--detect-ice-rings): drop reflections sitting on a hexagonal-ice powder
// ring before scaling. Their integrated intensity is contaminated by the strong, variable ice
// background, so leaving them in mis-scales the whole frame (the per-image fit is dragged) and
// inflates the error model. Removed once here, so every scaling pass, the combine, the merge
// and the statistics all skip them.
if (experiment_.IsDetectIceRings()) {
const float ice_width = config_.spot_finding.ice_ring_width_Q_recipA;
size_t total = 0, dropped = 0;
for (auto &outcome : indexer->GetIntegrationOutcome()) {
total += outcome.reflections.size();
const size_t before = outcome.reflections.size();
std::erase_if(outcome.reflections,
[&](const Reflection &r) { return IsOnIceRing(r.d, ice_width); });
dropped += before - outcome.reflections.size();
}
logger.Info("Ice-ring exclusion: dropped {} of {} reflections on ice rings (half-width {:.3f} A^-1)",
dropped, total, ice_width);
}
// Scale the images and merge. Factored so it can run twice: first in P1 to give the
// space-group search a merged dataset, then again in the determined space group so the
// scaling sees symmetry equivalents and the final statistics are in the right symmetry.
// (With a user-fixed space group it simply runs once, already in that symmetry.)
struct ScaleMergeResult {
std::vector<MergedReflection> merged;
MergeStatistics statistics;
};
// The reference path computes each image's G once (per-image scaling against the
// reference); the scaling loop below is skipped, so G is stable across the two passes.
// Smoothing it more than once would compound the correction, so do it only on the first
// pass. The no-reference path recomputes G from scratch each pass and re-smooths correctly.
bool reference_g_smoothed = false;
auto scale_and_merge = [&](const std::string &label) -> ScaleMergeResult {
// ScaleOnTheFly self-scaling is only for the no-reference path; with a reference each
// image is already scaled against it during the per-image pass, so we merge directly.
if (config_.reference_data.empty()) {
logger.Info("Running scaling ({}) ...", label);
for (int i = 0; i < config_.scaling_iter; i++) {
phase("Scaling images (" + label + ", iteration " + std::to_string(i + 1) + "/"
+ std::to_string(config_.scaling_iter) + ")");
auto merge_result = MergeAll(experiment_, indexer->GetIntegrationOutcome(), false);
indexer->ScaleAllImages(merge_result);
}
}
// -P rot3d: weight-sum each reflection's per-frame partials into one full before
// merging, so the error model sees counting statistics (high ISa) instead of
// rocking-curve slicing scatter.
const bool rot3d = experiment_.GetScalingSettings().GetCombine3D();
const double smooth_g_deg = experiment_.GetScalingSettings().GetSmoothGDegrees();
const auto gonio = experiment_.GetGoniometer();
const double osc_deg = gonio ? std::fabs(gonio->GetIncrement_deg()) : 0.0;
const bool reference_path = !config_.reference_data.empty();
if (rot3d && smooth_g_deg > 0.0 && osc_deg > 1e-6 && !(reference_path && reference_g_smoothed)) {
int window = std::max(1, static_cast<int>(std::lround(smooth_g_deg / osc_deg)));
if (window % 2 == 0)
++window; // odd window for a centered average
phase("Smoothing per-frame scale G");
logger.Info("Smoothing per-frame scale G over {:.2f} deg ({} frames at {:.3f} deg/frame)",
smooth_g_deg, window, osc_deg);
SmoothImageScaleG(indexer->GetIntegrationOutcome(), window, logger);
reference_g_smoothed = true;
}
std::vector<IntegrationOutcome> combined;
if (rot3d) {
phase("Combining 3D partials");
combined = CombineRotationObservations(indexer->GetIntegrationOutcome(), experiment_, &logger,
config_.observation_dump_path);
}
if (rot3d && experiment_.GetScalingSettings().GetScaleFulls()) {
phase("Scaling fulls (XDS order)");
ScaleFulls(experiment_, combined, static_cast<int>(config_.scaling_iter), config_.nthreads, logger);
}
if (rot3d && experiment_.GetScalingSettings().GetAbsorptionIter() > 0) {
phase("Absorption surface");
AbsorptionSurface(experiment_, combined, experiment_.GetScalingSettings().GetAbsorptionIter(), logger);
}
const std::vector<IntegrationOutcome> &merge_input =
rot3d ? combined : indexer->GetIntegrationOutcome();
phase("Merging");
MergeOnTheFly merge_engine(experiment_);
if (result.consensus_cell.has_value())
merge_engine.ReferenceCell(*result.consensus_cell);
merge_engine.RefineErrorModel(merge_input);
if (merge_engine.ErrorModelActive())
logger.Info("Error model: a={:.3f} b={:.3f} ISa={:.1f} chi2={:.2f}", merge_engine.ErrorModelA(),
merge_engine.ErrorModelB(),
merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0,
merge_engine.ErrorModelChi2());
for (size_t i = 0; i < merge_input.size(); ++i)
merge_engine.AddImage(merge_input[i], static_cast<int64_t>(i));
ScaleMergeResult out;
out.merged = merge_engine.ExportReflections();
phase("Computing statistics");
out.statistics = merge_engine.MergeStats(out.merged, merge_input, config_.reference_data);
result.error_model_isa = merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0;
logger.Info("Merge complete ({} unique reflections, {})", out.merged.size(), label);
return out;
};
// First pass: P1 when searching, or directly the user-fixed space group.
const auto initial_sg = experiment_.GetGemmiSpaceGroup();
auto sm = scale_and_merge(initial_sg ? initial_sg->short_name() : "P1");
std::ostringstream stats_text;
if (!experiment_.GetGemmiSpaceGroup().has_value()) {
SearchSpaceGroupOptions sg_opts;
sg_opts.merge_friedel = experiment_.GetScalingSettings().GetMergeFriedel();
sg_opts.d_min_limit_A = experiment_.GetScalingSettings().GetHighResolutionLimit_A().value_or(0.0);
// Constrain the search to subgroups of the lattice (metric) symmetry found by rotation
// indexing. Centering is not constrained here - it is determined from the absences.
if (end_msg.rotation_lattice_type.has_value())
sg_opts.lattice_system = end_msg.rotation_lattice_type->crystal_system;
const auto sg_search = SearchSpaceGroup(sm.merged, sg_opts);
stats_text << SearchSpaceGroupResultToText(sg_search) << "\n\n";
// Adopt the determined space group and re-scale/merge in it, so scaling uses symmetry
// equivalents and the statistics come out in the right symmetry. P1 stands when nothing
// is determined.
if (sg_search.best_space_group.has_value()) {
const auto &sg = *sg_search.best_space_group;
logger.Info("Adopting space group {} (number {})", sg.short_name(), sg.number);
experiment_.SpaceGroupNumber(sg.number);
end_msg.space_group_number = sg.number;
result.space_group_number = sg.number;
phase("Re-scaling in space group " + sg.short_name());
sm = scale_and_merge(sg.short_name());
}
}
const auto twin_sg_number = experiment_.GetSpaceGroupNumber();
const gemmi::SpaceGroup *twin_sg = twin_sg_number
? gemmi::find_spacegroup_by_number(twin_sg_number.value()) : nullptr;
result.twinning = AnalyzeTwinning(sm.merged, twin_sg);
stats_text << TwinningAnalysisToText(result.twinning) << "\n";
stats_text << sm.statistics;
result.merge_statistics_text = stats_text.str();
result.has_merge_statistics = true;
result.merge_statistics = sm.statistics;
result.has_reference = !config_.reference_data.empty();
if (result.consensus_cell && write_files) {
phase("Writing reflections");
WriteReflections(sm.merged, *result.consensus_cell, experiment_, sm.statistics,
result.error_model_isa > 0 ? fmt::format("{:.2f}", result.error_model_isa) : "?",
config_.output_prefix);
// Per-image scaling table (G, B-factor, mosaicity, wedge, CC) for inspection / XDS
// comparison. The offline self-scaling result is otherwise not exposed (process.h5's
// per-image arrays are only filled on the online per-image path). Sourced from the
// partials, which carry the first-pass per-image scale.
ScalingResult(indexer->GetIntegrationOutcome()).SaveToFile(config_.output_prefix);
}
phase(""); // flush the last phase's timing
}
if (writer) {
writer->WriteHDF5(end_msg);
writer->Finalize();
result.written_master_path = config_.output_prefix + "_process.h5";
}
if (observer)
observer->OnPhase(cancelled_ ? "Cancelled" : "Done");
logger.Info("{} {} images in {:.2f} s ({:.2f} Hz)", cancelled_ ? "Cancelled after" : "Processed",
result.images_processed, result.processing_time_s, result.frame_rate_hz);
return result;
}