// SPDX-FileCopyrightText: 2026 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "RotationScaleMerge.h" #include #include #include #include #include #include #include #include #include #include #include "HKLKey.h" #include "../../common/CorrelationCoefficient.h" #include "../../common/CrystalLattice.h" #include "../../common/Definitions.h" #include "../../common/JFJochException.h" #include "../../common/ResolutionShells.h" namespace { // These mirror the classic path (ScaleOnTheFly / Merge / Combine3D) verbatim so this flat // implementation is numerically identical - see the comments there for the physics. constexpr size_t MIN_REFLECTIONS = 20; // per-frame scale needs at least this many constexpr double SCALE_ROBUST_K = 3.0; // Cauchy loss scale (sigma units) for the per-frame G fit constexpr float MAX_FRAME_GAP = 2.0f; // a rocking event is a run of frames no more apart than this constexpr double CHI2_1_MEDIAN = 0.454936; double SafeInv(double x, double fallback) { if (!std::isfinite(x) || x == 0.0) return fallback; return 1.0 / x; } // Kabsch rotation partiality: the fraction of a reflection recorded in the sampled slice, from the // erf of the rocking angle relative to the mosaic width. Identical to ScaleOnTheFly's RotationPartiality // (and the predictor's), so recomputing here just swaps in the smoothed mosaicity. float RotationPartiality(double delta_phi_deg, double zeta, double mosaicity_deg, double wedge_deg) { const double half_wedge = wedge_deg / 2.0; const double c1 = zeta / std::sqrt(2.0); const double arg_plus = (delta_phi_deg + half_wedge) * c1 / mosaicity_deg; const double arg_minus = (delta_phi_deg - half_wedge) * c1 / mosaicity_deg; return static_cast((std::erf(arg_plus) - std::erf(arg_minus)) / 2.0); } // Deterministic CC1/2 half from the frame's stable index (splitmix64), matching Merge.cpp. int HalfForImage(int64_t image_id) { uint64_t z = static_cast(image_id) + 0x9e3779b97f4a7c15ULL; z = (z ^ (z >> 30)) * 0xbf58476d1ce4e5b9ULL; z = (z ^ (z >> 27)) * 0x94d049bb133111ebULL; z = z ^ (z >> 31); return static_cast(z & 1ULL); } struct ScaleObs { double coeff, Iobs, weight; }; // Robust per-frame scale (linear in G): the exact objective ScaleOnTheFly::SolveScaleIRLS solves. double SolveScaleIRLS(const std::vector &obs, double robust_k) { auto weighted_scale = [&obs](auto robust_weight) { double num = 0.0, den = 0.0; for (const auto &o : obs) { const double rw = robust_weight(o); const double w2 = o.weight * o.weight; num += rw * w2 * o.coeff * o.Iobs; den += rw * w2 * o.coeff * o.coeff; } return den > 0.0 ? num / den : NAN; }; double G = weighted_scale([](const ScaleObs &) { return 1.0; }); if (!std::isfinite(G)) return 1.0; G = std::max(0.0, G); const double k2 = robust_k * robust_k; for (int iter = 0; iter < 30; ++iter) { const double G_prev = G; const double G_next = weighted_scale([&](const ScaleObs &o) { const double res = o.weight * (G * o.coeff - o.Iobs); return 1.0 / (1.0 + res * res / k2); }); if (!std::isfinite(G_next)) break; G = std::max(0.0, G_next); if (std::abs(G - G_prev) <= 1e-7 * std::max(G, 1.0)) break; } return G; } // Run fn(i) for i in [0, n) over `nthreads` workers pulling from a shared atomic counter - the same // self-load-balancing pattern the rest of the codebase uses (heavy frames don't stall light ones). // Work-stealing per-item parallel: one atomic fetch per item. Use ONLY when the per-item work is // heavy and uneven (e.g. per-frame fits) - the atomic amortises. For millions of tiny uniform items // use ParallelChunks instead; a per-item atomic there is pure contention. template void ParallelFor(int n, size_t nthreads, Fn fn) { if (n <= 0) return; if (nthreads <= 1 || n == 1) { for (int i = 0; i < n; ++i) fn(i); return; } const size_t local = std::min(nthreads, static_cast(n)); std::atomic next = 0; std::vector> futures; futures.reserve(local); for (size_t t = 0; t < local; ++t) futures.emplace_back(std::async(std::launch::async, [&] { for (int i = next.fetch_add(1); i < n; i = next.fetch_add(1)) fn(i); })); for (auto &f : futures) f.get(); } // Chunked parallel: each worker gets one contiguous [lo, hi) range, no per-item synchronisation. // Right for millions of cheap uniform items (the CPU stand-in for a flat CUDA grid-stride kernel). template void ParallelChunks(int n, size_t nthreads, Fn fn) { if (n <= 0) return; const int nt = static_cast(std::max(1, std::min(nthreads, static_cast(n)))); if (nt == 1) { fn(0, n); return; } const int chunk = (n + nt - 1) / nt; std::vector> futures; futures.reserve(nt); for (int t = 0; t < nt; ++t) { const int lo = t * chunk, hi = std::min(n, lo + chunk); if (lo >= hi) break; futures.emplace_back(std::async(std::launch::async, [&fn, lo, hi] { fn(lo, hi); })); } for (auto &f : futures) f.get(); } double median_of(std::vector &v) { std::nth_element(v.begin(), v.begin() + v.size() / 2, v.end()); return v[v.size() / 2]; } } RotationScaleMerge::RotationScaleMerge(const DiffractionExperiment &experiment, std::vector &partial_outcomes, std::optional reference_cell, int scaling_iterations, float ice_ring_half_width_q, size_t nthreads, Logger &logger, std::string observation_dump_path) : x(experiment), partials_out(partial_outcomes), reference_cell(std::move(reference_cell)), nthreads(nthreads == 0 ? std::thread::hardware_concurrency() : nthreads), logger(logger), observation_dump_path(std::move(observation_dump_path)) { const auto s = x.GetScalingSettings(); min_partiality = s.GetMinPartiality(); d_min_limit = s.GetHighResolutionLimit_A(); merge_friedel = s.GetMergeFriedel(); capture_uncertainty_coeff = s.GetCaptureUncertaintyCoeff(); reject_nsigma = s.GetOutlierRejectNsigma(); reject_outliers = reject_nsigma > 0.0; rfree_fraction = s.GetRfreeFraction(); scale_fulls = s.GetScaleFulls(); scaling_iter = std::max(1, scaling_iterations); if (const auto forced = s.GetForcedMosaicity(); forced.has_value()) mosaicity_deg = *forced; else mosaicity_deg = s.GetDefaultMosaicity(); ice_half_width_q = ice_ring_half_width_q; } void RotationScaleMerge::Ingest() { n_frames = static_cast(partials_out.size()); size_t total = 0; for (const auto &o : partials_out) total += o.reflections.size(); partials.clear(); partials.reserve(total); frame_start.assign(n_frames, 0); frame_count.assign(n_frames, 0); frame_cell_ok.assign(n_frames, 1); g_partial.assign(n_frames, 1.0); const float dist_tol = x.GetIndexingSettings().GetUnitCellDistTolerance(); const float ang_tol = x.GetIndexingSettings().GetUnitCellAngleTolerance_deg(); for (int o = 0; o < n_frames; ++o) { frame_start[o] = static_cast(partials.size()); if (reference_cell) { const auto cell = partials_out[o].latt.GetUnitCell(); frame_cell_ok[o] = cell.is_close(*reference_cell, dist_tol, ang_tol) ? 1 : 0; } for (const auto &r : partials_out[o].reflections) { Obs obs{}; obs.h = r.h; obs.k = r.k; obs.l = r.l; obs.I = r.I; obs.sigma = r.sigma; obs.d = r.d; obs.rlp = r.rlp; obs.partiality = r.partiality; obs.zeta = r.zeta; obs.delta_phi = r.delta_phi_deg; obs.bkg = r.bkg; obs.image_number = r.image_number; obs.frame = o; obs.on_ice = r.on_ice_ring ? 1 : 0; obs.corr = r.image_scale_corr; obs.group = -1; partials.push_back(obs); } frame_count[o] = static_cast(partials.size()) - frame_start[o]; } // Sort ONCE by (raw h,k,l, image_number) and split into raw-hkl runs. This is the one expensive sort; // both the 3D combine (event split) and the per-space-group ASU grouping reuse this order. perm.resize(partials.size()); for (int i = 0; i < static_cast(partials.size()); ++i) perm[i] = i; std::sort(perm.begin(), perm.end(), [&](int32_t a, int32_t b) { const auto &x1 = partials[a]; const auto &y1 = partials[b]; if (x1.h != y1.h) return x1.h < y1.h; if (x1.k != y1.k) return x1.k < y1.k; if (x1.l != y1.l) return x1.l < y1.l; return x1.image_number < y1.image_number; }); rawrun_start.clear(); rawrun_count.clear(); rawrun_h.clear(); rawrun_k.clear(); rawrun_l.clear(); rawrun_d.clear(); for (int i = 0; i < static_cast(perm.size()); ) { const auto &o0 = partials[perm[i]]; int j = i; float d = NAN; while (j < static_cast(perm.size())) { const auto &o = partials[perm[j]]; if (o.h != o0.h || o.k != o0.k || o.l != o0.l) break; if (!std::isfinite(d) && std::isfinite(o.d) && o.d > 0.0f) d = o.d; ++j; } rawrun_start.push_back(i); rawrun_count.push_back(j - i); rawrun_h.push_back(o0.h); rawrun_k.push_back(o0.k); rawrun_l.push_back(o0.l); rawrun_d.push_back(d); i = j; } rawrun_group.assign(rawrun_start.size(), -1); logger.Info("RotationScaleMerge: ingested {} partial observations from {} frames ({} distinct hkl)", total, n_frames, rawrun_start.size()); SmoothMosaicityAndPartiality(); } void RotationScaleMerge::SmoothMosaicityAndPartiality() { // Raw per-frame mosaicity as measured at integration (image-local, deterministic). std::vector mos_raw(n_frames, NAN); for (int o = 0; o < n_frames; ++o) { const auto &m = partials_out[o].mosaicity_deg; if (m && std::isfinite(*m) && *m > 0.0f) mos_raw[o] = *m; } // Frame-order moving average with the same window as smooth-G (a rotation range -> frame count). // With smoothing off, fall back to the per-frame value (still deterministic, just unsmoothed). const auto ss = x.GetScalingSettings(); const double smooth_deg = ss.GetSmoothGDegrees(); const auto gon = x.GetGoniometer(); const double osc = gon ? std::fabs(gon->GetIncrement_deg()) : 0.0; mos_smooth.assign(n_frames, NAN); if (smooth_deg > 0.0 && osc > 1e-6) { int window = std::max(1, static_cast(std::lround(smooth_deg / osc))); if (window % 2 == 0) ++window; const int half = window / 2; for (int o = 0; o < n_frames; ++o) { double sum = 0.0; int cnt = 0; for (int j = std::max(0, o - half); j <= std::min(n_frames - 1, o + half); ++j) if (std::isfinite(mos_raw[j])) { sum += mos_raw[j]; ++cnt; } if (cnt > 0) mos_smooth[o] = static_cast(sum / cnt); } } else { for (int o = 0; o < n_frames; ++o) mos_smooth[o] = static_cast(mos_raw[o]); } // Recompute each partial's partiality from the smoothed mosaicity (same wedge the predictor used). // Frames without a mosaicity keep the stored partiality. const double wedge = gon ? std::fabs(gon->GetWedge_deg()) : 0.0; ParallelChunks(static_cast(partials.size()), nthreads, [&](int lo, int hi) { for (int i = lo; i < hi; ++i) { auto &o = partials[i]; const float mos = mos_smooth[o.frame]; if (std::isfinite(mos) && mos > 1e-6f && std::isfinite(o.zeta) && o.zeta > 0.0f && std::isfinite(o.delta_phi)) o.partiality = RotationPartiality(o.delta_phi, o.zeta, mos, wedge); } }); logger.Info("Recomputed partiality from frame-order-smoothed mosaicity"); } int RotationScaleMerge::ComputeAsuGroups(const HKLKeyGenerator &keygen) { // One ASU reduction per distinct raw hkl (not per observation): a raw hkl is eligible if it is not // systematically absent and its resolution is in range. Group the eligible raw hkls by ASU key // (sort the ~#distinct-hkl keys, not the millions of observations), then hand out dense ids. const int n_run = static_cast(rawrun_start.size()); std::vector key(n_run); std::vector eligible(n_run, 0); // The gemmi ASU reduction / absence test per raw hkl is the cost here and is independent per run // (HKLKeyGenerator is const, so concurrent reads are safe) - compute keys in parallel chunks. ParallelChunks(n_run, nthreads, [&](int lo, int hi) { for (int r = lo; r < hi; ++r) { rawrun_group[r] = -1; if (keygen.IsSystematicallyAbsent(rawrun_h[r], rawrun_k[r], rawrun_l[r])) continue; const float d = rawrun_d[r]; // resolution is a per-raw-hkl property (all its partials share d) if (!std::isfinite(d) || d <= 0.0f) continue; if (d_min_limit && d < *d_min_limit) continue; key[r] = keygen(rawrun_h[r], rawrun_k[r], rawrun_l[r]).pack(); eligible[r] = 1; } }); std::vector idx; idx.reserve(n_run); for (int r = 0; r < n_run; ++r) if (eligible[r]) idx.push_back(r); std::sort(idx.begin(), idx.end(), [&](int32_t a, int32_t b) { return key[a] < key[b]; }); group_h.clear(); group_k.clear(); group_l.clear(); int n_groups = 0; for (size_t j = 0; j < idx.size(); ++j) { if (j == 0 || key[idx[j]] != key[idx[j - 1]]) { const int r = idx[j]; const auto hkl = keygen(rawrun_h[r], rawrun_k[r], rawrun_l[r]); group_h.push_back(hkl.plus ? hkl.h : -hkl.h); group_k.push_back(hkl.plus ? hkl.k : -hkl.k); group_l.push_back(hkl.plus ? hkl.l : -hkl.l); ++n_groups; } rawrun_group[idx[j]] = n_groups - 1; } // Stamp each partial's group from its raw hkl, keeping the exact per-observation AcceptReflection // finiteness (finite I, finite rlp != 0, finite sigma > 0) that grouping by raw hkl alone would miss. // Parallel over raw-hkl runs: distinct runs own disjoint perm ranges, hence disjoint observations. ParallelChunks(n_run, nthreads, [&](int rlo, int rhi) { for (int r = rlo; r < rhi; ++r) { const int g = rawrun_group[r]; const int lo = rawrun_start[r], hi = rawrun_start[r] + rawrun_count[r]; for (int p = lo; p < hi; ++p) { auto &o = partials[perm[p]]; o.group = (g >= 0 && std::isfinite(o.I) && std::isfinite(o.rlp) && o.rlp != 0.0f && std::isfinite(o.sigma) && o.sigma > 0.0f) ? g : -1; } } }); return n_groups; } void RotationScaleMerge::ReduceGroupMeans(const std::vector &obs, int n_groups, bool exclude_ice, const std::vector &masked, std::vector &out_mean) const { // Inverse-variance per-group mean of I*corr = the merge reference (a segmented reduction over the // groups; the CPU stand-in for a CUDA reduce_by_key). No cell mask here: the scaling reference // (MergeAll) is built without a reference cell - only the final merge applies it. std::vector sw(n_groups, 0.0), swI(n_groups, 0.0); for (const auto &o : obs) { if (o.group < 0) continue; if (!(o.corr > 0.0f) || !std::isfinite(o.corr)) continue; if (exclude_ice && o.on_ice) continue; if (!masked.empty()) { const int ring = IceRingIndex(o.d, ice_half_width_q); if (ring >= 0 && ring < static_cast(masked.size()) && masked[ring]) continue; } if (o.partiality < min_partiality) continue; const float I_corr = o.I * o.corr; const float sigma_corr = o.sigma * o.corr; if (!std::isfinite(I_corr) || !std::isfinite(sigma_corr) || sigma_corr <= 0.0f) continue; const double w = 1.0 / (static_cast(sigma_corr) * sigma_corr); sw[o.group] += w; swI[o.group] += w * I_corr; } out_mean.assign(n_groups, NAN); for (int g = 0; g < n_groups; ++g) if (sw[g] > 0.0) out_mean[g] = swI[g] / sw[g]; } void RotationScaleMerge::FitPerFrameG(std::vector &obs, const std::vector &fstart, const std::vector &fcount, const std::vector &group_mean_in, bool unity, std::vector &g) { std::vector scaled(fstart.size(), 0); ParallelFor(static_cast(fstart.size()), nthreads, [&](int f) { std::vector so; so.reserve(fcount[f]); const int lo = fstart[f], hi = fstart[f] + fcount[f]; for (int i = lo; i < hi; ++i) { const auto &o = obs[i]; if (o.group < 0) continue; if (o.on_ice) continue; const double mean = group_mean_in[o.group]; if (!std::isfinite(mean)) continue; double coeff; if (unity) { coeff = mean; // partiality already folded into the full, rlp = 1 } else { if (!(std::isfinite(o.zeta) && o.zeta > 0.0f)) continue; // Rotation model needs zeta > 0 coeff = o.partiality * SafeInv(o.rlp, 1.0) * mean; } so.push_back({coeff, static_cast(o.I), SafeInv(o.sigma, 1.0)}); } if (so.size() < MIN_REFLECTIONS) return; // leave g[f]/corr untouched (as ScaleOnTheFly does) g[f] = SolveScaleIRLS(so, SCALE_ROBUST_K); scaled[f] = 1; }); // Remember which frames were fitted this call (so the caller updates corr only there). frame_scaled_scratch = std::move(scaled); } void RotationScaleMerge::UpdateCorr(std::vector &obs, const std::vector &g, const std::vector &frame_scaled) const { ParallelChunks(static_cast(obs.size()), nthreads, [&](int lo, int hi) { for (int i = lo; i < hi; ++i) { auto &o = obs[i]; if (!frame_scaled[o.frame]) continue; const double denom = static_cast(o.partiality) * g[o.frame]; // B_term = 1 (no B refine) if (std::isfinite(o.rlp) && std::isfinite(denom) && denom > 0.0) o.corr = static_cast(o.rlp / denom); else o.corr = NAN; } }); } void RotationScaleMerge::SmoothG(std::vector &obs, std::vector &g, int window) const { const int n = static_cast(g.size()); const int half = window / 2; std::vector g_smooth(n, NAN); for (int o = 0; o < n; ++o) { double sum_log = 0.0; int count = 0; for (int j = std::max(0, o - half); j <= std::min(n - 1, o + half); ++j) { if (frame_scaled_scratch[j] && std::isfinite(g[j]) && g[j] > 0.0) { sum_log += std::log(g[j]); ++count; } } if (count > 0) g_smooth[o] = std::exp(sum_log / count); } for (auto &o : obs) { const int f = o.frame; if (!frame_scaled_scratch[f] || !std::isfinite(g[f]) || g[f] <= 0.0 || !std::isfinite(g_smooth[f])) continue; if (std::isfinite(o.corr)) o.corr = static_cast(o.corr * (g[f] / g_smooth[f])); } for (int f = 0; f < n; ++f) if (frame_scaled_scratch[f] && std::isfinite(g[f]) && g[f] > 0.0 && std::isfinite(g_smooth[f])) g[f] = g_smooth[f]; } void RotationScaleMerge::Combine() { fulls.clear(); g_full.assign(n_frames, 1.0); // Combine one raw-hkl run into fulls, appended to `out`. Independent per run (the events of one hkl // touch no shared state), so runs parallelise cleanly. Returns the number of usable partials seen. // `dump` (serial path only) writes each emitted full for the diagnostic observation dump. auto process_rawrun = [&](int r, std::vector &out, std::ofstream *dump) -> size_t { const int lo = rawrun_start[r], hi = rawrun_start[r] + rawrun_count[r]; std::vector ev; // usable perm-indices of this raw hkl, in image-number order ev.reserve(hi - lo); for (int p = lo; p < hi; ++p) { const auto &o = partials[perm[p]]; if (!std::isfinite(o.corr) || o.corr <= 0.0f) continue; if (!std::isfinite(o.I) || !std::isfinite(o.sigma) || o.sigma <= 0.0f) continue; ev.push_back(perm[p]); } const int group = rawrun_group[r]; size_t i = 0; while (i < ev.size()) { size_t kk = i + 1; float last_frame = partials[ev[i]].image_number; while (kk < ev.size()) { const float frame = partials[ev[kk]].image_number; if (frame - last_frame > MAX_FRAME_GAP) break; last_frame = frame; ++kk; } double pooled_bkg = 0.0; int n_pool = 0; for (size_t m = i; m < kk; ++m) { const float b = partials[ev[m]].bkg; if (std::isfinite(b)) { pooled_bkg += b; ++n_pool; } } pooled_bkg = n_pool > 0 ? pooled_bkg / n_pool : 0.0; auto pooled_I = [&](const Obs &r2) { const double n_bkg = std::max(0.0, static_cast(r2.sigma) * r2.sigma - r2.I) / std::max(r2.bkg, 1.0f); return static_cast(r2.I) + n_bkg * (static_cast(r2.bkg) - pooled_bkg); }; double sum_w = 0.0, sum_wI = 0.0, sum_partiality = 0.0; float d = NAN; int peak_outcome = partials[ev[i]].frame; float peak_frame = partials[ev[i]].image_number; float peak_partiality = -1.0f; const bool on_ice = partials[ev[i]].on_ice; for (size_t m = i; m < kk; ++m) { const auto &r2 = partials[ev[m]]; const double sigma_corr = static_cast(r2.sigma) * r2.corr; const double w = 1.0 / (sigma_corr * sigma_corr); sum_w += w; sum_wI += w * pooled_I(r2) * r2.corr; sum_partiality += r2.partiality; if (r2.partiality > peak_partiality) { peak_partiality = r2.partiality; peak_outcome = r2.frame; peak_frame = r2.image_number; } if (!std::isfinite(d) && std::isfinite(r2.d) && r2.d > 0.0f) d = r2.d; } double F = sum_wI / sum_w; for (int iter = 0; iter < 3; ++iter) { sum_w = 0.0; sum_wI = 0.0; for (size_t m = i; m < kk; ++m) { const auto &r2 = partials[ev[m]]; const double corr = r2.corr; const double I_corr = pooled_I(r2) * corr; const double sigma_corr = static_cast(r2.sigma) * corr; const double bkg_var = sigma_corr * sigma_corr - corr * I_corr; double var = std::max(0.0, bkg_var) + corr * std::max(0.0, F); if (!(var > 0.0)) var = sigma_corr * sigma_corr; const double w = 1.0 / var; sum_w += w; sum_wI += w * I_corr; } F = sum_wI / sum_w; } const int n_frames_event = static_cast(kk - i); i = kk; if (sum_w <= 0.0 || sum_partiality < min_partiality) continue; double sigma_full = 1.0 / std::sqrt(sum_w); if (capture_uncertainty_coeff > 0.0) { const double frac = std::min(1.0, sum_partiality); const double extra = capture_uncertainty_coeff * (1.0 - frac) * std::max(0.0, F); sigma_full = std::sqrt(sigma_full * sigma_full + extra * extra); } Obs full{}; full.h = rawrun_h[r]; full.k = rawrun_k[r]; full.l = rawrun_l[r]; full.I = static_cast(F); full.sigma = static_cast(sigma_full); full.d = d; full.rlp = 1.0f; full.partiality = 1.0f; full.corr = 1.0f; full.image_number = peak_frame; full.frame = peak_outcome; full.on_ice = on_ice ? 1 : 0; full.group = group; // the raw hkl's ASU group for the current space group (<0 = absent) out.push_back(full); if (dump != nullptr) *dump << full.h << ' ' << full.k << ' ' << full.l << ' ' << full.I << ' ' << full.sigma << ' ' << d << ' ' << n_frames_event << ' ' << sum_partiality << ' ' << static_cast(peak_frame) << '\n'; } return ev.size(); }; const int n_run = static_cast(rawrun_start.size()); size_t n_used = 0; if (!observation_dump_path.empty() || nthreads <= 1) { // Serial (diagnostic dump needs a single writer). std::ofstream dump; if (!observation_dump_path.empty()) { dump.open(observation_dump_path); dump << "# h k l I sigma d n_frames captured_fraction\n"; } fulls.reserve(n_run); for (int r = 0; r < n_run; ++r) n_used += process_rawrun(r, fulls, dump.is_open() ? &dump : nullptr); } else { // Parallel over contiguous rawrun chunks; concatenate the per-thread fulls in run order so the // result is deterministic. const int nt = static_cast(std::min(nthreads, static_cast(n_run))); const int chunk = (n_run + nt - 1) / nt; std::vector> part(nt); std::vector used(nt, 0); std::vector> futures; futures.reserve(nt); for (int t = 0; t < nt; ++t) { const int r0 = t * chunk, r1 = std::min(n_run, r0 + chunk); if (r0 >= r1) break; futures.emplace_back(std::async(std::launch::async, [&, t, r0, r1] { part[t].reserve(r1 - r0); for (int r = r0; r < r1; ++r) used[t] += process_rawrun(r, part[t], nullptr); })); } for (auto &f : futures) f.get(); size_t total = 0; for (int t = 0; t < nt; ++t) { total += part[t].size(); n_used += used[t]; } fulls.reserve(total); for (int t = 0; t < nt; ++t) fulls.insert(fulls.end(), part[t].begin(), part[t].end()); } // Sort the fulls by their (peak) frame and build per-frame CSR ranges, so the scale-fulls step can // fit a per-frame G by slicing contiguous ranges (the same layout the partials use). std::sort(fulls.begin(), fulls.end(), [](const Obs &a, const Obs &b) { return a.frame < b.frame; }); fulls_frame_start.assign(n_frames, 0); fulls_frame_count.assign(n_frames, 0); for (int i = 0; i < static_cast(fulls.size()); ) { const int f = fulls[i].frame; fulls_frame_start[f] = i; int j = i; while (j < static_cast(fulls.size()) && fulls[j].frame == f) ++j; fulls_frame_count[f] = j - i; i = j; } logger.Info("3D combine: {} fulls from {} partials", fulls.size(), n_used); } void RotationScaleMerge::FinalizePerFrameScale(int n_groups, const std::vector &partial_group_mean, const std::vector &frame_scaled) { // Per-frame CC vs the merged reference (CalculateGlobalCC), computed once now (not every iteration). std::vector cc(n_frames, NAN); std::vector cc_n(n_frames, 0); ParallelFor(n_frames, nthreads, [&](int f) { double sx = 0, sy = 0, sx2 = 0, sy2 = 0, sxy = 0; size_t n = 0; const int lo = frame_start[f], hi = frame_start[f] + frame_count[f]; for (int i = lo; i < hi; ++i) { const auto &o = partials[i]; if (o.on_ice) continue; if (o.group < 0) continue; if (o.partiality < min_partiality) continue; if (!std::isfinite(o.I) || !std::isfinite(o.corr) || o.corr <= 0.0f) continue; if (!std::isfinite(o.sigma) || o.sigma <= 0.0f) continue; const double mean = partial_group_mean[o.group]; if (!std::isfinite(mean)) continue; const double img = static_cast(o.I) * o.corr; sx += img; sy += mean; sx2 += img * img; sy2 += mean * mean; sxy += img * mean; ++n; } if (n < MIN_REFLECTIONS) return; const double nd = static_cast(n); const double cov = sxy - sx * sy / nd; const double vx = sx2 - sx * sx / nd; const double vy = sy2 - sy * sy / nd; if (vx > 0.0 && vy > 0.0) { cc[f] = cov / std::sqrt(vx * vy); cc_n[f] = static_cast(n); } }); for (int f = 0; f < n_frames; ++f) { auto &o = partials_out[f]; if (frame_scaled[f]) { o.image_scale_g = static_cast(g_partial[f]); o.mosaicity_deg = (f < static_cast(mos_smooth.size()) && std::isfinite(mos_smooth[f])) ? mos_smooth[f] : static_cast(mosaicity_deg); if (std::isfinite(cc[f])) { o.image_scale_cc = static_cast(cc[f]); o.image_scale_cc_n = cc_n[f]; } else { o.image_scale_cc.reset(); o.image_scale_cc_n.reset(); } } else { o.image_scale_g.reset(); o.image_scale_cc.reset(); o.image_scale_cc_n.reset(); o.mosaicity_deg.reset(); } o.image_scale_b_factor_Ang2.reset(); o.image_scale_wedge_deg.reset(); } } namespace { // Possible unique reflections per shell for the completeness column - mirrors CalcPossibleReflections. void PossiblePerShell(int space_group_number, const UnitCell &cell, double d_min, double d_max, const ResolutionShells &shells, bool merge_friedel, std::vector &possible) { gemmi::UnitCell gemmi_cell = cell; const gemmi::SpaceGroup *sg = gemmi::find_spacegroup_by_number(space_group_number); if (sg == nullptr) return; const std::vector hkls = gemmi::make_miller_vector(gemmi_cell, sg, d_min, d_max, true); const gemmi::GroupOps gops = sg->operations(); CrystalLattice lattice(cell); const auto astar = lattice.Astar(), bstar = lattice.Bstar(), cstar = lattice.Cstar(); for (const auto &hkl : hkls) { const auto q = hkl[0] * astar + hkl[1] * bstar + hkl[2] * cstar; const auto qlen = q.Length(); if (qlen < 1e-6) continue; const auto shell = shells.GetShell(1.0 / qlen); if (!shell.has_value()) continue; const int s = *shell; if (s >= 0 && s < static_cast(possible.size())) possible[s] += (merge_friedel || gops.is_reflection_centric(hkl)) ? 1 : 2; } } } RotationScaleMerge::Result RotationScaleMerge::MergeAndStats(int n_groups, bool for_search, const std::vector &masked) { // A full is usable for the merge / error model if it passes AddImage's filters (with the current // ice/masked-ring context). group >= 0 already encodes "not absent and passes AcceptReflection". auto masked_ring = [&](const Obs &o) { if (masked.empty()) return false; const int ring = IceRingIndex(o.d, ice_half_width_q); return ring >= 0 && ring < static_cast(masked.size()) && masked[ring]; }; auto usable_merge = [&](const Obs &o) { if (o.group < 0) return false; if (!frame_cell_ok[o.frame]) return false; if (!(o.corr > 0.0f) || !std::isfinite(o.corr)) return false; if (for_search && o.on_ice) return false; if (masked_ring(o)) return false; if (o.partiality < min_partiality) return false; const float I_corr = o.I * o.corr, sigma_corr = o.sigma * o.corr; return std::isfinite(I_corr) && std::isfinite(sigma_corr) && sigma_corr > 0.0f; }; // ---- Error model: fit dev2 = a*sigma^2 + b^2*^2 from symmetry-equivalent scatter. ---- std::vector em_mean(n_groups, NAN); std::vector reject_median(n_groups, NAN); double error_model_a = 1.0, error_model_b = 0.0, error_model_chi2 = 0.0; bool error_model_active = false; { // Per-group inverse-variance mean over usable fulls (>=2 obs), and the leverage-corrected samples. std::vector sw(n_groups, 0.0), swI(n_groups, 0.0); std::vector cnt(n_groups, 0); for (const auto &o : fulls) { if (!usable_merge(o)) continue; const double sigma_corr = static_cast(o.sigma) * o.corr; const double w = 1.0 / (sigma_corr * sigma_corr); sw[o.group] += w; swI[o.group] += w * (static_cast(o.I) * o.corr); cnt[o.group]++; } for (int g = 0; g < n_groups; ++g) if (cnt[g] >= 2 && sw[g] > 0.0) em_mean[g] = swI[g] / sw[g]; if (reject_outliers) { std::vector> iv(n_groups); for (const auto &o : fulls) if (usable_merge(o) && cnt[o.group] >= 2) iv[o.group].push_back(o.I * o.corr); for (int g = 0; g < n_groups; ++g) if (!iv[g].empty()) { std::nth_element(iv[g].begin(), iv[g].begin() + iv[g].size() / 2, iv[g].end()); reject_median[g] = iv[g][iv[g].size() / 2]; } } struct Sample { double s2, I2, dev2; }; std::vector samples; samples.reserve(fulls.size()); for (const auto &o : fulls) { if (!usable_merge(o) || cnt[o.group] < 2) continue; const double mean = em_mean[o.group]; if (!std::isfinite(mean)) continue; const double sigma_corr = static_cast(o.sigma) * o.corr; const double s2 = sigma_corr * sigma_corr; const double w = 1.0 / s2; const double factor = 1.0 - w / sw[o.group]; if (factor < 0.05) continue; const double resid = static_cast(o.I) * o.corr - mean; samples.push_back({s2, mean * mean, resid * resid / factor}); } constexpr int n_bins = 16; if (samples.size() >= static_cast(8 * n_bins)) { std::sort(samples.begin(), samples.end(), [](const Sample &a, const Sample &b) { return a.I2 < b.I2; }); std::vector bs2, bI2, bd2; const size_t per = samples.size() / n_bins; for (int bin = 0; bin < n_bins; ++bin) { const size_t lo = bin * per; const size_t hi = (bin == n_bins - 1) ? samples.size() : lo + per; std::vector vs2, vI2, vd2; for (size_t i = lo; i < hi; ++i) { vs2.push_back(samples[i].s2); vI2.push_back(samples[i].I2); vd2.push_back(samples[i].dev2); } bs2.push_back(median_of(vs2)); bI2.push_back(median_of(vI2)); bd2.push_back(median_of(vd2) / CHI2_1_MEDIAN); } std::vector bd2_sorted = bd2; const double dev2_floor = std::max(1e-30, 1e-3 * median_of(bd2_sorted)); double Ass = 0, AsI = 0, AII = 0, Bs = 0, BI = 0; for (int bin = 0; bin < n_bins; ++bin) { const double s2 = bs2[bin], I2 = bI2[bin], d2 = bd2[bin]; const double d2w = std::max(d2, dev2_floor); const double wgt = 1.0 / (d2w * d2w); Ass += wgt * s2 * s2; AsI += wgt * s2 * I2; AII += wgt * I2 * I2; Bs += wgt * s2 * d2; BI += wgt * I2 * d2; } const double det = Ass * AII - AsI * AsI; if (det > 1e-10 * Ass * AII) { error_model_a = std::clamp((Bs * AII - BI * AsI) / det, 0.25, 100.0); const double b2 = std::max((Ass * BI - AsI * Bs) / det, 0.0); error_model_b = std::sqrt(b2); error_model_active = true; std::vector chi2; chi2.reserve(samples.size()); for (const auto &s : samples) { const double v = error_model_a * s.s2 + b2 * s.I2; if (v > 0.0) chi2.push_back(s.dev2 / v); } error_model_chi2 = chi2.empty() ? 0.0 : median_of(chi2) / CHI2_1_MEDIAN; } } } if (error_model_active) logger.Info("Error model: a={:.3f} b={:.3f} ISa={:.1f} chi2={:.2f}", error_model_a, error_model_b, error_model_b > 0 ? 1.0 / error_model_b : 0.0, error_model_chi2); auto corrected_sigma = [&](float I_corr, float sigma_corr, int g) -> float { if (!error_model_active) return sigma_corr; const double I_for_b = std::isfinite(em_mean[g]) ? em_mean[g] : I_corr; const double v = error_model_a * static_cast(sigma_corr) * sigma_corr + (error_model_b * I_for_b) * (error_model_b * I_for_b); return v > 0.0 ? static_cast(std::sqrt(v)) : sigma_corr; }; // ---- Merge: per-group inverse-variance sums with corrected sigma + deterministic half sets. ---- struct Accum { double swI = 0, sw = 0, swIh[2] = {0, 0}, swh[2] = {0, 0}; size_t nh[2] = {0, 0}; float d = NAN; }; std::vector acc(n_groups); size_t reject_count = 0; for (const auto &o : fulls) { if (!usable_merge(o)) continue; const float I_corr = o.I * o.corr; float sigma_corr = o.sigma * o.corr; sigma_corr = corrected_sigma(I_corr, sigma_corr, o.group); if (reject_outliers && error_model_active && std::isfinite(reject_median[o.group]) && std::fabs(I_corr - reject_median[o.group]) > reject_nsigma * sigma_corr) { ++reject_count; continue; } const double w = 1.0 / (static_cast(sigma_corr) * sigma_corr); const double wI = w * I_corr; const int half = HalfForImage(o.frame); auto &a = acc[o.group]; a.swI += wI; a.sw += w; a.swIh[half] += wI; a.swh[half] += w; a.nh[half]++; if (!std::isfinite(a.d) && std::isfinite(o.d) && o.d > 0.0f) a.d = o.d; } // ---- Export merged reflections (+ resolution-shell R-free flags). ---- Result result; result.isa = error_model_b > 0 ? 1.0 / error_model_b : 0.0; std::vector merged_I(n_groups, NAN); float d_min = std::numeric_limits::max(), d_max = 0.0f; for (int g = 0; g < n_groups; ++g) { const auto &a = acc[g]; if (a.sw <= 0.0) continue; MergedReflection mr{}; mr.h = group_h[g]; mr.k = group_k[g]; mr.l = group_l[g]; mr.I = static_cast(a.swI / a.sw); mr.sigma = 1.0f / std::sqrt(static_cast(a.sw)); mr.I_half[0] = mr.I_half[1] = NAN; mr.sigma_half[0] = mr.sigma_half[1] = NAN; mr.d = a.d; if (a.nh[0] + a.nh[1] > 0 && a.swh[0] > 0.0 && a.swh[1] > 0.0) { for (int i = 0; i < 2; ++i) { mr.I_half[i] = static_cast(a.swIh[i] / a.swh[i]); mr.sigma_half[i] = 1.0f / std::sqrt(static_cast(a.swh[i])); } } if (!std::isfinite(a.d) || a.d <= 0.0f) continue; d_min = std::min(d_min, a.d); d_max = std::max(d_max, a.d); merged_I[g] = mr.I; result.merged.push_back(mr); } if (rfree_fraction > 0.0 && !result.merged.empty() && d_min < d_max && d_min > 0.0f) { constexpr int n_shells = 20; ResolutionShells shells(d_min * 0.999f, d_max * 1.001f, n_shells); std::vector> shell_groups(n_shells); for (size_t i = 0; i < result.merged.size(); ++i) { const auto shell = shells.GetShell(result.merged[i].d); if (shell && *shell >= 0 && *shell < n_shells) shell_groups[*shell].push_back(i); } std::mt19937 rng(12345u); std::bernoulli_distribution dist(rfree_fraction); for (const auto &grp : shell_groups) for (const size_t i : grp) result.merged[i].rfree_flag = dist(rng); } if (reject_count > 0) logger.Info("Merge outlier rejection: dropped {} observations", reject_count); // ---- Statistics (10 shells): completeness, multiplicity, , R_meas, CC1/2. ---- constexpr int n_shells = 10; float sd_min = std::numeric_limits::max(), sd_max = 0.0f; for (const auto &m : result.merged) { if (!std::isfinite(m.d) || m.d <= 0.0f) continue; if (d_min_limit && m.d < *d_min_limit) continue; sd_min = std::min(sd_min, m.d); sd_max = std::max(sd_max, m.d); } if (!(sd_min < sd_max && sd_min > 0.0f)) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "RotationScaleMerge: resolution calculation failed"); const float d_min_pad = sd_min * 0.999f, d_max_pad = sd_max * 1.001f; ResolutionShells shells(d_min_pad, d_max_pad, n_shells); const auto shell_mean_1_d2 = shells.GetShellMeanOneOverResSq(); const auto shell_min_res = shells.GetShellMinRes(); struct ShellAcc { int unique = 0, total_obs = 0, possible = 0; double sum_i_over_sigma = 0.0; int n_i_over_sigma = 0; CorrelationCoefficient cc_half; }; std::vector sa(n_shells); std::vector possible(n_shells, 0); if (reference_cell) PossiblePerShell(x.GetSpaceGroupNumber().value_or(1), *reference_cell, d_min_pad, d_max_pad, shells, merge_friedel, possible); for (int s = 0; s < n_shells; ++s) sa[s].possible = possible[s]; CorrelationCoefficient cc_half_overall; for (const auto &m : result.merged) { const auto shell = shells.GetShell(m.d); if (!shell || *shell < 0 || *shell >= n_shells) continue; if (std::isfinite(m.I) && std::isfinite(m.sigma) && m.sigma > 0.0) { auto &s = sa[*shell]; s.unique++; s.sum_i_over_sigma += m.I / m.sigma; s.n_i_over_sigma++; if (std::isfinite(m.I_half[0]) && std::isfinite(m.I_half[1])) { s.cc_half.Add(m.I_half[0], m.I_half[1]); cc_half_overall.Add(m.I_half[0], m.I_half[1]); } } } // R_meas: re-walk the fulls (Mask = cell only; no ice / masked-ring / error-model), accumulate // |I_i - | per reflection. struct RmeasObs { double sum_abs_dev = 0, sum_I = 0; int n = 0, shell = -1; }; std::vector rmeas(n_groups); for (const auto &o : fulls) { if (o.group < 0) continue; if (!frame_cell_ok[o.frame]) continue; if (!(o.corr > 0.0f) || !std::isfinite(o.corr)) continue; if (o.partiality < min_partiality) continue; const float I_corr = o.I * o.corr, sigma_corr = o.sigma * o.corr; if (!std::isfinite(I_corr) || !std::isfinite(sigma_corr) || sigma_corr <= 0.0f) continue; const auto shell = shells.GetShell(o.d); if (!shell || *shell < 0 || *shell >= n_shells) continue; sa[*shell].total_obs++; if (std::isfinite(merged_I[o.group])) { auto &r = rmeas[o.group]; r.sum_abs_dev += std::fabs(static_cast(I_corr) - merged_I[o.group]); r.sum_I += I_corr; r.n++; r.shell = *shell; } } std::vector rmeas_num(n_shells, 0.0), rmeas_den(n_shells, 0.0); double rmeas_num_all = 0.0, rmeas_den_all = 0.0; for (const auto &r : rmeas) { if (r.n < 2 || r.shell < 0 || r.shell >= n_shells) continue; const double factor = std::sqrt(static_cast(r.n) / (r.n - 1)); rmeas_num[r.shell] += factor * r.sum_abs_dev; rmeas_den[r.shell] += r.sum_I; rmeas_num_all += factor * r.sum_abs_dev; rmeas_den_all += r.sum_I; } MergeStatistics &out = result.statistics; out.shells.resize(n_shells); for (int s = 0; s < n_shells; ++s) { auto &ss = out.shells[s]; ss.mean_one_over_d2 = shell_mean_1_d2[s]; ss.d_min = shell_min_res[s]; ss.d_max = s == 0 ? d_max_pad : shell_min_res[s - 1]; ss.total_observations = sa[s].total_obs; ss.unique_reflections = sa[s].unique; ss.possible_unique_reflections = sa[s].possible; ss.mean_i_over_sigma = sa[s].n_i_over_sigma > 0 ? sa[s].sum_i_over_sigma / sa[s].n_i_over_sigma : 0.0; ss.cc_half = sa[s].cc_half.GetCC(); ss.cc_ref = NAN; ss.r_meas = rmeas_den[s] > 0.0 ? rmeas_num[s] / rmeas_den[s] : NAN; } auto &overall = out.overall; overall.d_min = sd_min; overall.d_max = sd_max; double sum_ios = 0.0; int n_ios = 0; for (int s = 0; s < n_shells; ++s) { overall.total_observations += sa[s].total_obs; overall.unique_reflections += sa[s].unique; overall.possible_unique_reflections += sa[s].possible; sum_ios += sa[s].sum_i_over_sigma; n_ios += sa[s].n_i_over_sigma; } overall.mean_i_over_sigma = n_ios > 0 ? sum_ios / n_ios : 0.0; overall.cc_half = cc_half_overall.GetCC(); overall.cc_ref = NAN; overall.r_meas = rmeas_den_all > 0.0 ? rmeas_num_all / rmeas_den_all : NAN; logger.Info("Merge complete ({} unique reflections)", result.merged.size()); return result; } RotationScaleMerge::Result RotationScaleMerge::Run(bool for_search, const std::vector &masked_ice_rings) { auto t_last = std::chrono::steady_clock::now(); auto lap = [&](const char *what) { const auto now = std::chrono::steady_clock::now(); logger.Info("[rsm] {}: {:.2f} s", what, std::chrono::duration(now - t_last).count()); t_last = now; }; const int sg_number = x.GetSpaceGroupNumber().value_or(1); HKLKeyGenerator keygen(merge_friedel, sg_number); // --- 1. Per-frame partial scaling (Rotation model, per-image G only). --- const int n_groups = ComputeAsuGroups(keygen); // one ASU grouping, shared by partials and fulls lap("group hkl"); std::vector partial_mean; for (int it = 0; it < scaling_iter; ++it) { ReduceGroupMeans(partials, n_groups, false, {}, partial_mean); FitPerFrameG(partials, frame_start, frame_count, partial_mean, /*unity=*/false, g_partial); UpdateCorr(partials, g_partial, frame_scaled_scratch); } lap("scale partials"); const std::vector partial_scaled = frame_scaled_scratch; // --- 2. Smooth G across frames (XDS DELPHI-like) before the combine. --- const auto s = x.GetScalingSettings(); const double smooth_g_deg = s.GetSmoothGDegrees(); const auto gonio = x.GetGoniometer(); const double osc_deg = gonio ? std::fabs(gonio->GetIncrement_deg()) : 0.0; if (smooth_g_deg > 0.0 && osc_deg > 1e-6) { int window = std::max(1, static_cast(std::lround(smooth_g_deg / osc_deg))); if (window % 2 == 0) ++window; SmoothG(partials, g_partial, window); } // Per-frame CC + write G/CC/mosaicity back onto the partials (once). ReduceGroupMeans(partials, n_groups, false, {}, partial_mean); FinalizePerFrameScale(n_groups, partial_mean, partial_scaled); // --- 3. 3D combine of per-frame partials into fulls (fulls inherit their ASU group here). --- Combine(); lap("combine"); // --- 4. Scale the fulls (XDS order, Unity model). --- if (scale_fulls) { std::vector full_mean; for (int it = 0; it < scaling_iter; ++it) { ReduceGroupMeans(fulls, n_groups, false, {}, full_mean); FitPerFrameG(fulls, fulls_frame_start, fulls_frame_count, full_mean, /*unity=*/true, g_full); UpdateCorr(fulls, g_full, frame_scaled_scratch); } logger.Info("Scaled fulls (XDS order, Unity model)"); } lap("scale fulls"); // --- 5. Error model + merge + statistics. --- auto r = MergeAndStats(n_groups, for_search, masked_ice_rings); lap("merge+stats"); return r; }