import os import re import numpy as np import gemmi import pandas as pd from scipy.ndimage import label from scipy.ndimage import minimum_filter from scipy.signal import savgol_filter from scipy.optimize import curve_fit import matplotlib.pyplot as plt # -------------------------------------------------- # Map metric # -------------------------------------------------- def compute_negative_density_sum(map_file, sigma_threshold=3.0, radius=2): m = gemmi.read_ccp4_map(map_file) try: m.setup() except TypeError: m.setup(0.0) rho = np.array(m.grid, copy=False) sigma = rho.std() threshold = -sigma_threshold * sigma print("sigma:", sigma) print("min density:", rho.min()) # find local minima local_min = rho == minimum_filter(rho, size=3) peak_mask = (rho < threshold) & local_min peak_positions = np.argwhere(peak_mask) neg_sum = 0.0 # for x, y, z in peak_positions: # x0 = max(0, x-radius) # x1 = min(rho.shape[0], x+radius+1) # y0 = max(0, y-radius) # y1 = min(rho.shape[1], y+radius+1) # z0 = max(0, z-radius) # z1 = min(rho.shape[2], z+radius+1) # patch = rho[x0:x1, y0:y1, z0:z1] # #neg_sum += np.sum(np.abs(patch[patch < 0])) neg_sum = np.sum(np.abs(rho[rho < 0])) / rho.size return neg_sum # -------------------------------------------------- # Collect occupancy data for one timestamp folder # -------------------------------------------------- def collect_run_knee_data(timestamp_dir): print(f" Processing timestamp: {os.path.basename(timestamp_dir)}") rows = [] for folder in sorted(os.listdir(timestamp_dir)): if not folder.startswith("occupancy_"): continue try: occ = float(folder.split("_")[1]) except: continue occ_dir = os.path.join(timestamp_dir, folder) map_file = None for f in os.listdir(occ_dir): if f.endswith(".ccp4"): map_file = os.path.join(occ_dir, f) break if map_file and os.path.exists(map_file): neg_sum = compute_negative_density_sum(map_file) rows.append({ "occupancy": occ, "alpha": 1.0 / occ, "neg_sum": neg_sum }) df = pd.DataFrame(rows).sort_values("alpha") print(f" → Collected {len(df)} occupancy points\n") return df # -------------------------------------------------- # Walk full xtrapol8 tree # -------------------------------------------------- def scan_xtrapol8_root(root_dir): print(f"\nScanning root directory: {root_dir}\n") df = pd.DataFrame() target_journal = "2023_science_1" for journal in os.listdir(root_dir): journal_path = os.path.join(root_dir, journal) # if not os.path.isdir(journal_path): # continue if journal != target_journal: continue print(f"Journal: {journal}") for dark_code in os.listdir(journal_path): dark_path = os.path.join(journal_path, dark_code) if not os.path.isdir(dark_path): continue print(f" Dark dataset: {dark_code}") for pair in os.listdir(dark_path): pair_path = os.path.join(dark_path, pair) if not os.path.isdir(pair_path): continue print(f" Pair: {pair}") found_timestamp = False # timestamp folders match YYYYMMDD_HHMMSS for sub in os.listdir(pair_path): if re.match(r"xtrapol8_\d{8}_\d{6}", sub): found_timestamp = True timestamp_dir = os.path.join(pair_path, sub) knee_data = collect_run_knee_data(timestamp_dir) x = np.log10(knee_data.alpha) y = knee_data.neg_sum y_smooth = savgol_filter(y, 9, 3) def sigmoid(x, A, B, k, x0): return A + B / (1 + np.exp(-k*(x-x0))) p0 = [min(y_smooth), max(y_smooth)-min(y_smooth), 3, np.median(x)] params, _ = curve_fit( sigmoid, x, y_smooth, p0=p0, bounds=( [min(y_smooth)*0.5, 0, 0.1, min(x)], [max(y_smooth)*2, max(y_smooth)*2, 20, max(x)] ), maxfev=10000 ) A, B, k, x0 = params x_knee = x0 - 2/k alpha_knee = 10**x_knee xx = np.linspace(min(x), max(x), 200) yy = sigmoid(xx, *params) xx = 10**xx plt.figure() plt.plot(knee_data["alpha"], knee_data["neg_sum"], "o-", color="blue") plt.plot(knee_data["alpha"], y_smooth, linestyle='dashed', color="green" ) plt.plot(xx, yy, linestyle='dotted', color="green" ) plt.axvline( alpha_knee ) plt.xscale("log") #plt.yscale("log") plt.xlabel("alpha") plt.ylabel("Σ|neg density|") plt.title(f"{pair} {sub}") plt.show() if len(knee_data) > 3: print(" ✓ Valid run detected\n") data = [ { "journal": journal, "dark": dark_code, "pair": pair, "timestamp": sub, }] df_1 = pd.DataFrame( data ) df_1 = pd.concat( ( df_1, knee_data ), axis=1 ) df = pd.concat( ( df, df_1 ) ) else: print(" ✗ Not enough occupancy points\n") if not found_timestamp: print(" ⚠ No timestamp folder found") print(f"\nFinished scan. Total valid runs: {len(df)}\n") return df runs = scan_xtrapol8_root("xtrapol8_runs_2") print( runs ) for run in runs: N_vals = np.array([x[0] for x in run["data"]]) sums = np.array([x[1] for x in run["data"]]) # detect knee here