Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 13m18s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 14m31s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m57s
Build Packages / build:rpm (rocky8) (push) Successful in 14m6s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 15m15s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 15m23s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 15m34s
Build Packages / XDS test (neggia plugin) (push) Successful in 8m55s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m17s
Build Packages / XDS test (durin plugin) (push) Successful in 9m25s
Build Packages / Create release (push) Skipped
Build Packages / Generate python client (push) Successful in 28s
Build Packages / Build documentation (push) Successful in 57s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 11m31s
Build Packages / build:rpm (rocky9) (push) Successful in 12m45s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 11m44s
Build Packages / DIALS test (push) Successful in 13m3s
Build Packages / Unit tests (push) Successful in 58m30s
Build Packages / build:windows:cuda (push) Successful in 20m21s
Build Packages / build:windows:nocuda (push) Successful in 10m30s
Adds an opt-in smooth absorption correction for rotation scaling. After the rot3d fulls are scaled, --absorption[=num] fits a multiplicative surface A(s1_crystal) - a degree<=4 monomial basis (real spherical harmonics up to l=4, as XDS/DIALS) of the diffracted-beam direction in the crystal/goniometer frame, by ridge-regularized log-linear least-squares of I_scaled/I_ref weighted by (I/sigma)^2, over num iterations (default 3); the surface divides image_scale_corr and the fulls are re-merged. Off by default and a no-op without rot3d. On the test panel (~13 keV, thin crystals) it is metric-neutral - fitted rms(log A) ~3-4%, ISa/CC1/2 unchanged - because absorption is negligible there and the per-frame scale G(phi) already absorbs the angular part. It is kept as a lever for low-energy data (e.g. 6 keV) where absorption becomes significant. Stored as ScalingSettings::absorption_iter. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
669 lines
33 KiB
C++
669 lines
33 KiB
C++
// SPDX-FileCopyrightText: 2026 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include "JFJochProcess.h"
|
|
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <cmath>
|
|
#include <functional>
|
|
#include <future>
|
|
#include <numeric>
|
|
#include <set>
|
|
#include <sstream>
|
|
|
|
#include "../reader/JFJochHDF5Reader.h"
|
|
#include "../common/Logger.h"
|
|
#include "../common/AzimuthalIntegrationMapping.h"
|
|
#include "../common/AzimuthalIntegrationProfile.h"
|
|
#include "../common/CUDAWrapper.h"
|
|
#include "../common/time_utc.h"
|
|
#include "../writer/FileWriter.h"
|
|
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
|
|
#include "../image_analysis/IndexAndRefine.h"
|
|
#include "../image_analysis/indexing/IndexerThreadPool.h"
|
|
#include "../image_analysis/azint/AzIntEngineCPU.h"
|
|
#include "../image_analysis/image_preprocessing/ImagePreprocessorCPU.h"
|
|
#include "../image_analysis/image_preprocessing/ImagePreprocessorBuffer.h"
|
|
#include "../image_analysis/scale_merge/Merge.h"
|
|
#include "../image_analysis/scale_merge/ScaleOnTheFly.h"
|
|
#include "../image_analysis/scale_merge/ScalingResult.h"
|
|
#include "../image_analysis/scale_merge/SearchSpaceGroup.h"
|
|
#include "../image_analysis/scale_merge/TwinningAnalysis.h"
|
|
#include "../image_analysis/scale_merge/Combine3D.h"
|
|
#include "../image_analysis/scale_merge/HKLKey.h"
|
|
#include "../image_analysis/WriteReflections.h"
|
|
#include <array>
|
|
#include <map>
|
|
#include <Eigen/Dense>
|
|
|
|
namespace {
|
|
// Pick up to requested_images ordinals spread evenly across [0, images_to_process) for the
|
|
// first pass of two-pass rotation indexing.
|
|
std::vector<int> select_equally_spaced_image_ordinals(int images_to_process, int requested_images) {
|
|
std::vector<int> ret;
|
|
if (images_to_process <= 0 || requested_images <= 0)
|
|
return ret;
|
|
|
|
const int n = std::min(images_to_process, requested_images);
|
|
if (n == 1) {
|
|
ret.push_back(0);
|
|
return ret;
|
|
}
|
|
|
|
std::set<int> unique_ordinals;
|
|
for (int i = 0; i < n; i++)
|
|
unique_ordinals.insert(static_cast<int>(
|
|
std::llround(static_cast<double>(i) * static_cast<double>(images_to_process - 1) /
|
|
static_cast<double>(n - 1))));
|
|
|
|
ret.assign(unique_ordinals.begin(), unique_ordinals.end());
|
|
return ret;
|
|
}
|
|
|
|
// XDS-order scaling. The rot3d combine emits fulls with partiality == 1 (image_scale_corr == 1),
|
|
// so they were only ever scaled as per-frame *partials* upstream - their per-frame scale is
|
|
// entangled with the rocking-curve/partiality model. This refits a per-frame scale directly on
|
|
// the complete reflections with the Unity model (no partiality term, G*Itrue - I_full), the way
|
|
// XDS/DIALS scale 3D-integrated fulls. A pure post-correction: it updates image_scale_corr on the
|
|
// fulls (1 -> 1/G) without re-combining.
|
|
void ScaleFulls(const DiffractionExperiment &experiment,
|
|
std::vector<IntegrationOutcome> &fulls, int scaling_iter,
|
|
size_t nthreads, Logger &logger) {
|
|
DiffractionExperiment unity = experiment;
|
|
ScalingSettings ss = unity.GetScalingSettings();
|
|
ss.SetPartialityModel(PartialityModel::Unity);
|
|
unity.ImportScalingSettings(ss);
|
|
|
|
for (int i = 0; i < scaling_iter; i++) {
|
|
const auto reference = MergeAll(unity, fulls, false);
|
|
ScaleOnTheFly(unity, reference).Scale(fulls, nthreads);
|
|
}
|
|
logger.Info("Scaled fulls (XDS order, Unity model)");
|
|
}
|
|
|
|
// Absorption surface. A smooth multiplicative correction A(s1_crystal) shared across the sweep,
|
|
// fit by ridge-regularized log-linear least-squares of I_scaled/I_ref against a degree<=4
|
|
// monomial basis of the diffracted-beam direction in the crystal (goniometer) frame - the same
|
|
// function space as real spherical harmonics up to l=4, as XDS/DIALS use. Applied by dividing
|
|
// image_scale_corr by A, then re-merged. Negligible at hard X-rays / thin crystals but matters
|
|
// at low energy (e.g. 6 keV).
|
|
void AbsorptionSurface(const DiffractionExperiment &exp,
|
|
std::vector<IntegrationOutcome> &fulls, int n_iter, Logger &logger) {
|
|
const auto gon_opt = exp.GetGoniometer();
|
|
if (!gon_opt) { logger.Warning("Absorption: no goniometer, skipping"); return; }
|
|
const auto &gon = *gon_opt;
|
|
const Coord axis = gon.GetAxis().Normalize();
|
|
const DiffractionGeometry geom = exp.GetDiffractionGeometry();
|
|
const HKLKeyGenerator keygen(exp.GetScalingSettings().GetMergeFriedel(),
|
|
exp.GetSpaceGroupNumber().value_or(1));
|
|
constexpr int MAXDEG = 4;
|
|
constexpr int NB = (MAXDEG + 1) * (MAXDEG + 2) * (MAXDEG + 3) / 6; // # monomials of degree 0..4
|
|
auto basis = [&](const Coord &s, std::array<double, NB> &b) {
|
|
int n = 0;
|
|
for (int deg = 0; deg <= MAXDEG; ++deg)
|
|
for (int i = deg; i >= 0; --i)
|
|
for (int j = deg - i; j >= 0; --j) {
|
|
const int kk = deg - i - j;
|
|
double v = 1.0;
|
|
for (int a = 0; a < i; ++a) v *= s.x;
|
|
for (int a = 0; a < j; ++a) v *= s.y;
|
|
for (int a = 0; a < kk; ++a) v *= s.z;
|
|
b[n++] = v;
|
|
}
|
|
};
|
|
auto s1_crystal = [&](const Reflection &r) {
|
|
const Coord s1lab = geom.LabCoord(r.predicted_x, r.predicted_y).Normalize();
|
|
const double phi = gon.GetAngle_deg(r.image_number) * 3.14159265358979323846 / 180.0;
|
|
return RotMatrix(static_cast<float>(-phi), axis) * s1lab;
|
|
};
|
|
|
|
for (int iter = 0; iter < n_iter; ++iter) {
|
|
const auto ref = MergeAll(exp, fulls, false);
|
|
std::map<HKLKey, float> refI;
|
|
for (const auto &m : ref) refI[keygen(m)] = m.I;
|
|
|
|
Eigen::MatrixXd AtA = Eigen::MatrixXd::Zero(NB, NB);
|
|
Eigen::VectorXd Atb = Eigen::VectorXd::Zero(NB);
|
|
std::array<double, NB> b{};
|
|
for (const auto &oc : fulls)
|
|
for (const auto &r : oc.reflections) {
|
|
if (!r.observed || !std::isfinite(r.image_scale_corr) || r.image_scale_corr <= 0.0f) continue;
|
|
const double Iscaled = static_cast<double>(r.I) * r.image_scale_corr;
|
|
const double sig = static_cast<double>(r.sigma) * r.image_scale_corr;
|
|
if (!(Iscaled > 0.0) || !(sig > 0.0)) continue;
|
|
const auto it = refI.find(keygen(r));
|
|
if (it == refI.end() || !(it->second > 0.0f)) continue;
|
|
basis(s1_crystal(r), b);
|
|
const double y = std::log(Iscaled / it->second);
|
|
const double w = std::min(100.0, (Iscaled / sig) * (Iscaled / sig)); // strong reflections define the surface
|
|
for (int a = 0; a < NB; ++a) {
|
|
Atb(a) += w * b[a] * y;
|
|
for (int c = 0; c < NB; ++c) AtA(a, c) += w * b[a] * b[c];
|
|
}
|
|
}
|
|
const double lambda = 1e-2 * AtA.diagonal().mean(); // ridge: x^2+y^2+z^2=1 makes the basis rank-deficient
|
|
for (int a = 0; a < NB; ++a) AtA(a, a) += lambda;
|
|
const Eigen::VectorXd c = AtA.ldlt().solve(Atb);
|
|
|
|
double sumsq = 0.0; size_t n_app = 0;
|
|
for (auto &oc : fulls)
|
|
for (auto &r : oc.reflections) {
|
|
if (!std::isfinite(r.image_scale_corr) || r.image_scale_corr <= 0.0f) continue;
|
|
basis(s1_crystal(r), b);
|
|
double surf = 0.0;
|
|
for (int a = 0; a < NB; ++a) surf += c(a) * b[a];
|
|
r.image_scale_corr = static_cast<float>(r.image_scale_corr / std::exp(surf));
|
|
sumsq += surf * surf; ++n_app;
|
|
}
|
|
logger.Info("Absorption surface iter {}/{}: {} fulls, rms(log A)={:.3f}",
|
|
iter + 1, n_iter, n_app, std::sqrt(sumsq / std::max<size_t>(n_app, 1)));
|
|
}
|
|
}
|
|
|
|
// Smooth the per-frame scale G across frames before the rot3d combine. ScaleOnTheFly fits each
|
|
// frame's G independently, so the few partials of one rocking event get inconsistent scales when
|
|
// weight-summed; a centered moving average of log(G) over a small odd frame window removes that
|
|
// jitter. Only G changed, so each reflection's image_scale_corr (proportional to 1/G) is rescaled
|
|
// by g_old/g_smooth. Frames without a fitted G (n<MIN_REFLECTIONS) are skipped on both sides.
|
|
void SmoothImageScaleG(std::vector<IntegrationOutcome> &outcomes, int window, Logger &logger) {
|
|
const int n = static_cast<int>(outcomes.size());
|
|
const int half = window / 2;
|
|
std::vector<double> g_smooth(n, NAN);
|
|
for (int o = 0; o < n; o++) {
|
|
double sum_log = 0.0;
|
|
int count = 0;
|
|
for (int j = std::max(0, o - half); j <= std::min(n - 1, o + half); j++) {
|
|
const auto &g = outcomes[j].image_scale_g;
|
|
if (g && std::isfinite(*g) && *g > 0.0) {
|
|
sum_log += std::log(*g);
|
|
count++;
|
|
}
|
|
}
|
|
if (count > 0)
|
|
g_smooth[o] = std::exp(sum_log / count);
|
|
}
|
|
size_t n_smoothed = 0;
|
|
for (int o = 0; o < n; o++) {
|
|
const auto &g_old = outcomes[o].image_scale_g;
|
|
if (!g_old || !std::isfinite(*g_old) || *g_old <= 0.0 || !std::isfinite(g_smooth[o]))
|
|
continue;
|
|
const double factor = *g_old / g_smooth[o];
|
|
for (auto &r : outcomes[o].reflections)
|
|
if (std::isfinite(r.image_scale_corr))
|
|
r.image_scale_corr = static_cast<float>(r.image_scale_corr * factor);
|
|
outcomes[o].image_scale_g = g_smooth[o];
|
|
n_smoothed++;
|
|
}
|
|
logger.Info("Smoothed per-frame scale G over a {}-frame window ({} of {} frames)",
|
|
window, n_smoothed, n);
|
|
}
|
|
}
|
|
|
|
JFJochProcess::JFJochProcess(JFJochHDF5Reader &reader, DiffractionExperiment experiment,
|
|
PixelMask pixel_mask, ProcessConfig config)
|
|
: reader_(reader), experiment_(std::move(experiment)),
|
|
pixel_mask_(std::move(pixel_mask)), config_(std::move(config)) {}
|
|
|
|
ProcessResult JFJochProcess::Run(JFJochProcessObserver *observer) {
|
|
Logger logger("JFJochProcess");
|
|
ProcessResult result;
|
|
|
|
const auto dataset = reader_.GetDataset();
|
|
if (!dataset)
|
|
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
|
|
"No experiment dataset found in the input file");
|
|
|
|
if (config_.stride <= 0)
|
|
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image stride must be positive");
|
|
|
|
const auto total_images_in_file = static_cast<int>(reader_.GetNumberOfImages());
|
|
int end_image = config_.end_image;
|
|
if (end_image < 0 || end_image > total_images_in_file)
|
|
end_image = total_images_in_file;
|
|
const int start_image = config_.start_image;
|
|
const int images_to_process = (end_image - start_image) / config_.stride;
|
|
if (images_to_process <= 0) {
|
|
logger.Warning("No images to process (start {}, end {}, stride {}, total {})",
|
|
start_image, end_image, config_.stride, total_images_in_file);
|
|
return result;
|
|
}
|
|
|
|
const bool full = (config_.mode == ProcessMode::FullAnalysis);
|
|
const bool write_files = !config_.output_prefix.empty();
|
|
|
|
// Output/runtime invariants. Algorithm settings (indexing, scaling, integration, polarization,
|
|
// space group, unit cell, ...) are configured on experiment_ by the caller.
|
|
experiment_.BitDepthImage(32).PixelSigned(true);
|
|
experiment_.Mode(DetectorMode::Standard);
|
|
experiment_.OverwriteExistingFiles(true);
|
|
experiment_.FilePrefix(config_.output_prefix.empty() ? "output" : config_.output_prefix);
|
|
experiment_.SetFileWriterFormat(FileWriterFormat::NXmxLegacy);
|
|
experiment_.ImagesPerTrigger(images_to_process);
|
|
experiment_.NumTriggers(1);
|
|
if (full)
|
|
experiment_.Compression(CompressionAlgorithm::BSHUF_LZ4);
|
|
|
|
// The pipeline indexes images 0..N-1 within this run; if we process a sub-range/strided
|
|
// selection, shift the goniometer so local index i maps to the angle of original image
|
|
// start+i*stride (keeping the per-image rotation wedge), otherwise rotation angles would be
|
|
// wrong for any start_image != 0.
|
|
if (const auto g = experiment_.GetGoniometer();
|
|
g.has_value() && (start_image != 0 || config_.stride != 1)) {
|
|
const float incr = g->GetIncrement_deg();
|
|
GoniometerAxis shifted(g->GetName(),
|
|
g->GetStart_deg() + incr * static_cast<float>(start_image),
|
|
incr * static_cast<float>(config_.stride),
|
|
g->GetAxis(), g->GetHelicalStep());
|
|
shifted.ScreeningWedge(g->GetScreeningWedge().value_or(incr));
|
|
experiment_.Goniometer(shifted);
|
|
}
|
|
|
|
AzimuthalIntegrationMapping mapping(experiment_, pixel_mask_);
|
|
|
|
JFJochReceiverPlots plots;
|
|
plots.Setup(experiment_, mapping);
|
|
|
|
// Output file (NXmxIntegrated master that links back to the original images).
|
|
StartMessage start_message;
|
|
experiment_.FillMessage(start_message);
|
|
start_message.arm_date = dataset->arm_date;
|
|
start_message.az_int_bin_to_q = mapping.GetBinToQ();
|
|
start_message.az_int_bin_to_two_theta = mapping.GetBinToTwoTheta();
|
|
start_message.az_int_q_bin_count = mapping.GetQBinCount();
|
|
start_message.az_int_phi_bin_count = mapping.GetAzimuthalBinCount();
|
|
if (mapping.GetAzimuthalBinCount() > 1)
|
|
start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
|
|
start_message.pixel_mask["default"] = pixel_mask_.GetMask(experiment_);
|
|
if (full) {
|
|
start_message.rois = experiment_.ROI().ExportMetadata();
|
|
if (!experiment_.ROI().empty())
|
|
start_message.roi_map = experiment_.ExportROIMap();
|
|
start_message.max_spot_count = experiment_.GetMaxSpotCount();
|
|
}
|
|
start_message.master_suffix = "process";
|
|
start_message.file_format = FileWriterFormat::NXmxIntegrated;
|
|
start_message.write_master_file = true;
|
|
start_message.write_images = false;
|
|
start_message.hdf5_source_data = reader_.GetHDF5DataSource(start_image, images_to_process);
|
|
|
|
std::unique_ptr<FileWriter> writer;
|
|
if (write_files && config_.write_process_h5)
|
|
writer = std::make_unique<FileWriter>(start_message);
|
|
|
|
logger.Info("Processing {} images (range {}-{}, stride {}) using {} threads [{}]",
|
|
images_to_process, start_image, end_image, config_.stride, config_.nthreads,
|
|
full ? "full analysis" : "azimuthal integration");
|
|
if (observer)
|
|
observer->OnPhase(full ? "Full analysis" : "Azimuthal integration");
|
|
|
|
// Full-analysis shared engines.
|
|
std::unique_ptr<IndexerThreadPool> indexer_pool;
|
|
std::unique_ptr<IndexAndRefine> indexer;
|
|
if (full) {
|
|
indexer_pool = std::make_unique<IndexerThreadPool>(experiment_.GetIndexingSettings());
|
|
indexer = std::make_unique<IndexAndRefine>(experiment_, indexer_pool.get());
|
|
if (!config_.reference_data.empty())
|
|
indexer->ReferenceIntensities(config_.reference_data);
|
|
}
|
|
|
|
const auto start_time = std::chrono::steady_clock::now();
|
|
|
|
// First pass of two-pass rotation indexing (full analysis only).
|
|
if (full && config_.forced_rotation_lattice.has_value()) {
|
|
indexer->ForceRotationIndexerLattice(*config_.forced_rotation_lattice);
|
|
logger.Info("Rotation indexer lattice forced externally - skipping first pass");
|
|
} else if (full && config_.rotation_indexing && config_.two_pass_rotation) {
|
|
if (observer)
|
|
observer->OnPhase("Rotation indexing (first pass)");
|
|
const auto selected = select_equally_spaced_image_ordinals(images_to_process,
|
|
config_.rotation_indexing_image_count);
|
|
logger.Info("First-pass rotation indexing using {} images{}", selected.size(),
|
|
config_.reuse_rotation_spots ? " and stored spots" : "");
|
|
|
|
for (const int ordinal: selected) {
|
|
if (cancelled_) break;
|
|
const int image_idx = start_image + ordinal * config_.stride;
|
|
DataMessage msg{};
|
|
msg.number = ordinal; // index into the rotation indexer (0..images_to_process-1)
|
|
msg.original_number = image_idx;
|
|
try {
|
|
if (config_.reuse_rotation_spots) {
|
|
msg.spots = reader_.ReadSpots(image_idx);
|
|
} else {
|
|
auto img = reader_.GetRawImage(image_idx);
|
|
if (!img) continue;
|
|
MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer);
|
|
AzimuthalIntegrationProfile profile(mapping);
|
|
auto first_pass = config_.spot_finding;
|
|
first_pass.indexing = false;
|
|
first_pass.quick_integration = false;
|
|
msg.image = img->image;
|
|
if (dataset->efficiency.size() > image_idx)
|
|
msg.image_collection_efficiency = dataset->efficiency[image_idx];
|
|
analysis.Analyze(msg, profile, first_pass);
|
|
}
|
|
indexer->AddImageToRotationIndexer(msg);
|
|
} catch (const std::exception &e) {
|
|
logger.Warning("First-pass rotation indexing failed for image {}: {}", image_idx, e.what());
|
|
}
|
|
}
|
|
|
|
if (!cancelled_ && !indexer->FinalizeRotationIndexing().has_value())
|
|
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
|
|
"Two-pass rotation indexing failed");
|
|
if (!cancelled_)
|
|
logger.Info("Two-pass rotation indexing found lattice");
|
|
}
|
|
|
|
// Main per-image loop, spread over N worker threads pulling from a shared counter. HDF5 reads
|
|
// are serialized by the global hdf5_mutex; the analysis runs in parallel.
|
|
std::atomic<int> next_ordinal = 0;
|
|
std::atomic<int> finished_count = 0;
|
|
std::atomic<uint64_t> total_uncompressed_bytes = 0;
|
|
|
|
auto azint_worker = [&]() {
|
|
std::vector<uint8_t> decompression_buffer;
|
|
ImagePreprocessorCPU preprocessor(experiment_, pixel_mask_);
|
|
ImagePreprocessorBuffer buffer(experiment_.GetPixelsNum());
|
|
AzIntEngineCPU azint(mapping);
|
|
AzimuthalIntegrationProfile profile(mapping);
|
|
|
|
while (!cancelled_) {
|
|
const int ordinal = next_ordinal.fetch_add(1);
|
|
const int image_idx = start_image + ordinal * config_.stride;
|
|
if (image_idx >= end_image) break;
|
|
|
|
std::shared_ptr<JFJochReaderRawImage> img;
|
|
try {
|
|
img = reader_.GetRawImage(image_idx);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Failed to load image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
if (!img) continue;
|
|
|
|
DataMessage msg{};
|
|
msg.image = img->image;
|
|
msg.number = ordinal;
|
|
msg.original_number = image_idx;
|
|
if (dataset->efficiency.size() > image_idx)
|
|
msg.image_collection_efficiency = dataset->efficiency[image_idx];
|
|
total_uncompressed_bytes += msg.image.GetUncompressedSize();
|
|
|
|
const auto t0 = std::chrono::steady_clock::now();
|
|
try {
|
|
const uint8_t *image_ptr = msg.image.GetUncompressedPtr(decompression_buffer);
|
|
preprocessor.Analyze(buffer, image_ptr, msg.image.GetMode());
|
|
azint.Run(buffer, profile);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Error integrating image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
msg.azint_time_s = std::chrono::duration<float>(std::chrono::steady_clock::now() - t0).count();
|
|
msg.processing_time_s = msg.azint_time_s;
|
|
msg.az_int_profile = profile.GetResult();
|
|
msg.az_int_profile_count = profile.GetPixelCount();
|
|
msg.az_int_profile_std = profile.GetStd();
|
|
msg.bkg_estimate = profile.GetBkgEstimate(mapping.Settings());
|
|
msg.run_number = experiment_.GetRunNumber();
|
|
msg.run_name = experiment_.GetRunName();
|
|
|
|
plots.Add(msg, profile);
|
|
if (writer) writer->Write(msg);
|
|
if (observer) observer->OnImageProcessed(msg);
|
|
const int done = finished_count.fetch_add(1) + 1;
|
|
if (observer) observer->OnProgress(done, images_to_process);
|
|
}
|
|
};
|
|
|
|
auto full_worker = [&]() {
|
|
pin_gpu(); // round-robin per worker thread; must precede engine construction
|
|
MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer);
|
|
AzimuthalIntegrationProfile profile(mapping);
|
|
|
|
while (!cancelled_) {
|
|
const int ordinal = next_ordinal.fetch_add(1);
|
|
const int image_idx = start_image + ordinal * config_.stride;
|
|
if (image_idx >= end_image) break;
|
|
|
|
std::shared_ptr<JFJochReaderRawImage> img;
|
|
try {
|
|
img = reader_.GetRawImage(image_idx);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Failed to load image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
if (!img) continue;
|
|
|
|
DataMessage msg{};
|
|
msg.image = img->image;
|
|
msg.number = ordinal;
|
|
msg.original_number = image_idx;
|
|
if (dataset->efficiency.size() > image_idx)
|
|
msg.image_collection_efficiency = dataset->efficiency[image_idx];
|
|
total_uncompressed_bytes += msg.image.GetUncompressedSize();
|
|
|
|
const auto t0 = std::chrono::steady_clock::now();
|
|
try {
|
|
analysis.Analyze(msg, profile, config_.spot_finding);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
msg.processing_time_s = std::chrono::duration<float>(std::chrono::steady_clock::now() - t0).count();
|
|
msg.run_number = experiment_.GetRunNumber();
|
|
msg.run_name = experiment_.GetRunName();
|
|
|
|
plots.Add(msg, profile);
|
|
if (writer) writer->Write(msg);
|
|
if (observer) observer->OnImageProcessed(msg);
|
|
const int done = finished_count.fetch_add(1) + 1;
|
|
if (observer) observer->OnProgress(done, images_to_process);
|
|
}
|
|
};
|
|
|
|
if (observer)
|
|
observer->OnPhase("Processing images");
|
|
|
|
std::function<void()> worker = full ? std::function<void()>(full_worker)
|
|
: std::function<void()>(azint_worker);
|
|
std::vector<std::future<void> > futures;
|
|
futures.reserve(config_.nthreads);
|
|
for (int i = 0; i < config_.nthreads; ++i)
|
|
futures.push_back(std::async(std::launch::async, worker));
|
|
for (auto &f: futures)
|
|
f.get();
|
|
|
|
const auto end_time = std::chrono::steady_clock::now();
|
|
result.cancelled = cancelled_;
|
|
result.images_processed = finished_count.load();
|
|
result.processing_time_s = std::chrono::duration<double>(end_time - start_time).count();
|
|
if (result.processing_time_s > 0.0) {
|
|
result.frame_rate_hz = static_cast<double>(result.images_processed) / result.processing_time_s;
|
|
result.throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (result.processing_time_s * 1e6);
|
|
}
|
|
result.mean_processing_time = plots.GetMeanProcessingTime();
|
|
result.indexing_rate = plots.GetIndexingRate();
|
|
|
|
// End message (also written to the file).
|
|
EndMessage end_msg;
|
|
end_msg.max_image_number = result.images_processed;
|
|
end_msg.images_collected_count = result.images_processed;
|
|
end_msg.images_sent_to_write_count = result.images_processed;
|
|
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
|
|
end_msg.run_number = experiment_.GetRunNumber();
|
|
end_msg.run_name = experiment_.GetRunName();
|
|
end_msg.bkg_estimate = plots.GetBkgEstimate();
|
|
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
|
|
end_msg.indexing_rate = result.indexing_rate;
|
|
|
|
if (full && !cancelled_) {
|
|
if (const auto rot = indexer->FinalizeRotationIndexing(); rot.has_value()) {
|
|
end_msg.rotation_lattice = rot->lattice;
|
|
end_msg.rotation_lattice_type = LatticeMessage{
|
|
.centering = rot->search_result.centering,
|
|
.niggli_class = rot->search_result.niggli_class,
|
|
.crystal_system = rot->search_result.system
|
|
};
|
|
result.rotation_lattice_found = true;
|
|
}
|
|
result.consensus_cell = indexer->GetConsensusUnitCell();
|
|
end_msg.unit_cell = result.consensus_cell;
|
|
}
|
|
|
|
// Scaling and merging (full analysis only).
|
|
if (full && !cancelled_ && result.indexing_rate.has_value() && result.indexing_rate > 0
|
|
&& (config_.run_scaling || !config_.reference_data.empty())) {
|
|
// Scaling/merging is a long post-pass; report each sub-step as a phase so the GUI progress
|
|
// bar reflects what is happening instead of freezing on one label. Also time each phase
|
|
// (logged on transition) so the bottlenecks are visible.
|
|
auto t_phase = std::chrono::steady_clock::now();
|
|
std::string prev_phase;
|
|
auto phase = [&](const std::string &p) {
|
|
const auto now = std::chrono::steady_clock::now();
|
|
if (!prev_phase.empty())
|
|
logger.Info("[timing] {}: {:.2f} s", prev_phase,
|
|
std::chrono::duration<double>(now - t_phase).count());
|
|
t_phase = now;
|
|
prev_phase = p;
|
|
if (observer) observer->OnPhase(p);
|
|
};
|
|
phase("Scaling and merging");
|
|
|
|
// Scale the images and merge. Factored so it can run twice: first in P1 to give the
|
|
// space-group search a merged dataset, then again in the determined space group so the
|
|
// scaling sees symmetry equivalents and the final statistics are in the right symmetry.
|
|
// (With a user-fixed space group it simply runs once, already in that symmetry.)
|
|
struct ScaleMergeResult {
|
|
std::vector<MergedReflection> merged;
|
|
MergeStatistics statistics;
|
|
};
|
|
auto scale_and_merge = [&](const std::string &label) -> ScaleMergeResult {
|
|
// ScaleOnTheFly self-scaling is only for the no-reference path; with a reference each
|
|
// image is already scaled against it during the per-image pass, so we merge directly.
|
|
if (config_.reference_data.empty()) {
|
|
logger.Info("Running scaling ({}) ...", label);
|
|
for (int i = 0; i < config_.scaling_iter; i++) {
|
|
phase("Scaling images (" + label + ", iteration " + std::to_string(i + 1) + "/"
|
|
+ std::to_string(config_.scaling_iter) + ")");
|
|
auto merge_result = MergeAll(experiment_, indexer->GetIntegrationOutcome(), false);
|
|
indexer->ScaleAllImages(merge_result);
|
|
}
|
|
}
|
|
|
|
// -P rot3d: weight-sum each reflection's per-frame partials into one full before
|
|
// merging, so the error model sees counting statistics (high ISa) instead of
|
|
// rocking-curve slicing scatter.
|
|
const bool rot3d = experiment_.GetScalingSettings().GetCombine3D();
|
|
if (rot3d && experiment_.GetScalingSettings().GetSmoothGWindow() > 0) {
|
|
phase("Smoothing per-frame scale G");
|
|
SmoothImageScaleG(indexer->GetIntegrationOutcome(),
|
|
experiment_.GetScalingSettings().GetSmoothGWindow(), logger);
|
|
}
|
|
std::vector<IntegrationOutcome> combined;
|
|
if (rot3d) {
|
|
phase("Combining 3D partials");
|
|
combined = CombineRotationObservations(indexer->GetIntegrationOutcome(), experiment_, &logger,
|
|
config_.observation_dump_path);
|
|
}
|
|
if (rot3d && experiment_.GetScalingSettings().GetScaleFulls()) {
|
|
phase("Scaling fulls (XDS order)");
|
|
ScaleFulls(experiment_, combined, static_cast<int>(config_.scaling_iter), config_.nthreads, logger);
|
|
}
|
|
if (rot3d && experiment_.GetScalingSettings().GetAbsorptionIter() > 0) {
|
|
phase("Absorption surface");
|
|
AbsorptionSurface(experiment_, combined, experiment_.GetScalingSettings().GetAbsorptionIter(), logger);
|
|
}
|
|
const std::vector<IntegrationOutcome> &merge_input =
|
|
rot3d ? combined : indexer->GetIntegrationOutcome();
|
|
|
|
phase("Merging");
|
|
MergeOnTheFly merge_engine(experiment_);
|
|
if (result.consensus_cell.has_value())
|
|
merge_engine.ReferenceCell(*result.consensus_cell);
|
|
merge_engine.RefineErrorModel(merge_input);
|
|
if (merge_engine.ErrorModelActive())
|
|
logger.Info("Error model: a={:.3f} b={:.3f} ISa={:.1f} chi2={:.2f}", merge_engine.ErrorModelA(),
|
|
merge_engine.ErrorModelB(),
|
|
merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0,
|
|
merge_engine.ErrorModelChi2());
|
|
for (const auto &outcome: merge_input)
|
|
merge_engine.AddImage(outcome);
|
|
|
|
ScaleMergeResult out;
|
|
out.merged = merge_engine.ExportReflections();
|
|
phase("Computing statistics");
|
|
out.statistics = merge_engine.MergeStats(out.merged, merge_input, config_.reference_data);
|
|
result.error_model_isa = merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0;
|
|
logger.Info("Merge complete ({} unique reflections, {})", out.merged.size(), label);
|
|
return out;
|
|
};
|
|
|
|
// First pass: P1 when searching, or directly the user-fixed space group.
|
|
const auto initial_sg = experiment_.GetGemmiSpaceGroup();
|
|
auto sm = scale_and_merge(initial_sg ? initial_sg->short_name() : "P1");
|
|
|
|
std::ostringstream stats_text;
|
|
if (!experiment_.GetGemmiSpaceGroup().has_value()) {
|
|
SearchSpaceGroupOptions sg_opts;
|
|
sg_opts.merge_friedel = experiment_.GetScalingSettings().GetMergeFriedel();
|
|
sg_opts.d_min_limit_A = experiment_.GetScalingSettings().GetHighResolutionLimit_A().value_or(0.0);
|
|
// Constrain the search to subgroups of the lattice (metric) symmetry found by rotation
|
|
// indexing. Centering is not constrained here - it is determined from the absences.
|
|
if (end_msg.rotation_lattice_type.has_value())
|
|
sg_opts.lattice_system = end_msg.rotation_lattice_type->crystal_system;
|
|
const auto sg_search = SearchSpaceGroup(sm.merged, sg_opts);
|
|
stats_text << SearchSpaceGroupResultToText(sg_search) << "\n\n";
|
|
|
|
// Adopt the determined space group and re-scale/merge in it, so scaling uses symmetry
|
|
// equivalents and the statistics come out in the right symmetry. P1 stands when nothing
|
|
// is determined.
|
|
if (sg_search.best_space_group.has_value()) {
|
|
const auto &sg = *sg_search.best_space_group;
|
|
logger.Info("Adopting space group {} (number {})", sg.short_name(), sg.number);
|
|
experiment_.SpaceGroupNumber(sg.number);
|
|
end_msg.space_group_number = sg.number;
|
|
result.space_group_number = sg.number;
|
|
phase("Re-scaling in space group " + sg.short_name());
|
|
sm = scale_and_merge(sg.short_name());
|
|
}
|
|
}
|
|
|
|
const auto twin_sg_number = experiment_.GetSpaceGroupNumber();
|
|
const gemmi::SpaceGroup *twin_sg = twin_sg_number
|
|
? gemmi::find_spacegroup_by_number(twin_sg_number.value()) : nullptr;
|
|
result.twinning = AnalyzeTwinning(sm.merged, twin_sg);
|
|
stats_text << TwinningAnalysisToText(result.twinning) << "\n";
|
|
stats_text << sm.statistics;
|
|
result.merge_statistics_text = stats_text.str();
|
|
result.has_merge_statistics = true;
|
|
result.merge_statistics = sm.statistics;
|
|
result.has_reference = !config_.reference_data.empty();
|
|
|
|
if (result.consensus_cell && write_files) {
|
|
phase("Writing reflections");
|
|
WriteReflections(sm.merged, *result.consensus_cell, experiment_, config_.output_prefix);
|
|
// Per-image scaling table (G, B-factor, mosaicity, wedge, CC) for inspection / XDS
|
|
// comparison. The offline self-scaling result is otherwise not exposed (process.h5's
|
|
// per-image arrays are only filled on the online per-image path). Sourced from the
|
|
// partials, which carry the first-pass per-image scale.
|
|
ScalingResult(indexer->GetIntegrationOutcome()).SaveToFile(config_.output_prefix);
|
|
}
|
|
phase(""); // flush the last phase's timing
|
|
}
|
|
|
|
if (writer) {
|
|
writer->WriteHDF5(end_msg);
|
|
writer->Finalize();
|
|
result.written_master_path = config_.output_prefix + "_process.h5";
|
|
}
|
|
|
|
if (observer)
|
|
observer->OnPhase(cancelled_ ? "Cancelled" : "Done");
|
|
logger.Info("{} {} images in {:.2f} s ({:.2f} Hz)", cancelled_ ? "Cancelled after" : "Processed",
|
|
result.images_processed, result.processing_time_s, result.frame_rate_hz);
|
|
return result;
|
|
}
|