From 7b00519ef4fa1ab2e53b944d7f2d151243a3d3f9 Mon Sep 17 00:00:00 2001 From: beale_j Date: Fri, 6 Mar 2026 08:24:24 +0100 Subject: [PATCH] attempt to auto Marius Schmidt style alpha estimate --- knee_analysis.py | 231 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 knee_analysis.py diff --git a/knee_analysis.py b/knee_analysis.py new file mode 100644 index 0000000..85381ca --- /dev/null +++ b/knee_analysis.py @@ -0,0 +1,231 @@ +import os +import re +import numpy as np +import gemmi +import pandas as pd +from scipy.ndimage import label +from scipy.ndimage import minimum_filter +from scipy.signal import savgol_filter +from scipy.optimize import curve_fit +import matplotlib.pyplot as plt + +# -------------------------------------------------- +# Map metric +# -------------------------------------------------- +def compute_negative_density_sum(map_file, sigma_threshold=3.0, radius=2): + + m = gemmi.read_ccp4_map(map_file) + + try: + m.setup() + except TypeError: + m.setup(0.0) + + rho = np.array(m.grid, copy=False) + + sigma = rho.std() + threshold = -sigma_threshold * sigma + + print("sigma:", sigma) + print("min density:", rho.min()) + + # find local minima + local_min = rho == minimum_filter(rho, size=3) + + peak_mask = (rho < threshold) & local_min + + peak_positions = np.argwhere(peak_mask) + + neg_sum = 0.0 + + # for x, y, z in peak_positions: + + # x0 = max(0, x-radius) + # x1 = min(rho.shape[0], x+radius+1) + + # y0 = max(0, y-radius) + # y1 = min(rho.shape[1], y+radius+1) + + # z0 = max(0, z-radius) + # z1 = min(rho.shape[2], z+radius+1) + + # patch = rho[x0:x1, y0:y1, z0:z1] + + # #neg_sum += np.sum(np.abs(patch[patch < 0])) + neg_sum = np.sum(np.abs(rho[rho < 0])) / rho.size + + return neg_sum + + +# -------------------------------------------------- +# Collect occupancy data for one timestamp folder +# -------------------------------------------------- + +def collect_run_knee_data(timestamp_dir): + + print(f" Processing timestamp: {os.path.basename(timestamp_dir)}") + + rows = [] + + for folder in sorted(os.listdir(timestamp_dir)): + + if not folder.startswith("occupancy_"): + continue + + try: + occ = float(folder.split("_")[1]) + except: + continue + + occ_dir = os.path.join(timestamp_dir, folder) + + map_file = None + for f in os.listdir(occ_dir): + if f.endswith(".ccp4"): + map_file = os.path.join(occ_dir, f) + break + + if map_file and os.path.exists(map_file): + neg_sum = compute_negative_density_sum(map_file) + + rows.append({ + "occupancy": occ, + "alpha": 1.0 / occ, + "neg_sum": neg_sum + }) + + df = pd.DataFrame(rows).sort_values("alpha") + + print(f" → Collected {len(df)} occupancy points\n") + + return df + +# -------------------------------------------------- +# Walk full xtrapol8 tree +# -------------------------------------------------- + +def scan_xtrapol8_root(root_dir): + + print(f"\nScanning root directory: {root_dir}\n") + + df = pd.DataFrame() + + target_journal = "2023_science_1" + + for journal in os.listdir(root_dir): + journal_path = os.path.join(root_dir, journal) +# if not os.path.isdir(journal_path): +# continue + + if journal != target_journal: + continue + + print(f"Journal: {journal}") + + for dark_code in os.listdir(journal_path): + dark_path = os.path.join(journal_path, dark_code) + if not os.path.isdir(dark_path): + continue + + print(f" Dark dataset: {dark_code}") + + for pair in os.listdir(dark_path): + pair_path = os.path.join(dark_path, pair) + if not os.path.isdir(pair_path): + continue + + print(f" Pair: {pair}") + + found_timestamp = False + + # timestamp folders match YYYYMMDD_HHMMSS + for sub in os.listdir(pair_path): + if re.match(r"xtrapol8_\d{8}_\d{6}", sub): + + found_timestamp = True + timestamp_dir = os.path.join(pair_path, sub) + + knee_data = collect_run_knee_data(timestamp_dir) + + x = np.log10(knee_data.alpha) + y = knee_data.neg_sum + + y_smooth = savgol_filter(y, 9, 3) + + def sigmoid(x, A, B, k, x0): + return A + B / (1 + np.exp(-k*(x-x0))) + + p0 = [min(y_smooth), max(y_smooth)-min(y_smooth), 3, np.median(x)] + + params, _ = curve_fit( + sigmoid, + x, + y_smooth, + p0=p0, + bounds=( + [min(y_smooth)*0.5, 0, 0.1, min(x)], + [max(y_smooth)*2, max(y_smooth)*2, 20, max(x)] + ), + maxfev=10000 + ) + + A, B, k, x0 = params + x_knee = x0 - 2/k + alpha_knee = 10**x_knee + + + xx = np.linspace(min(x), max(x), 200) + yy = sigmoid(xx, *params) + + xx = 10**xx + + plt.figure() + + plt.plot(knee_data["alpha"], knee_data["neg_sum"], "o-", color="blue") + plt.plot(knee_data["alpha"], y_smooth, linestyle='dashed', color="green" ) + plt.plot(xx, yy, linestyle='dotted', color="green" ) + + plt.axvline( alpha_knee ) + + plt.xscale("log") + #plt.yscale("log") + + plt.xlabel("alpha") + plt.ylabel("Σ|neg density|") + + plt.title(f"{pair} {sub}") + + plt.show() + + if len(knee_data) > 3: + print(" ✓ Valid run detected\n") + + data = [ { + "journal": journal, + "dark": dark_code, + "pair": pair, + "timestamp": sub, + }] + df_1 = pd.DataFrame( data ) + df_1 = pd.concat( ( df_1, knee_data ), axis=1 ) + df = pd.concat( ( df, df_1 ) ) + + else: + print(" ✗ Not enough occupancy points\n") + + if not found_timestamp: + print(" ⚠ No timestamp folder found") + + print(f"\nFinished scan. Total valid runs: {len(df)}\n") + + return df + +runs = scan_xtrapol8_root("xtrapol8_runs_2") + +print( runs ) + +for run in runs: + N_vals = np.array([x[0] for x in run["data"]]) + sums = np.array([x[1] for x in run["data"]]) + + # detect knee here \ No newline at end of file