Files

231 lines
6.7 KiB
Python

import os
import re
import numpy as np
import gemmi
import pandas as pd
from scipy.ndimage import label
from scipy.ndimage import minimum_filter
from scipy.signal import savgol_filter
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
# --------------------------------------------------
# Map metric
# --------------------------------------------------
def compute_negative_density_sum(map_file, sigma_threshold=3.0, radius=2):
m = gemmi.read_ccp4_map(map_file)
try:
m.setup()
except TypeError:
m.setup(0.0)
rho = np.array(m.grid, copy=False)
sigma = rho.std()
threshold = -sigma_threshold * sigma
print("sigma:", sigma)
print("min density:", rho.min())
# find local minima
local_min = rho == minimum_filter(rho, size=3)
peak_mask = (rho < threshold) & local_min
peak_positions = np.argwhere(peak_mask)
neg_sum = 0.0
# for x, y, z in peak_positions:
# x0 = max(0, x-radius)
# x1 = min(rho.shape[0], x+radius+1)
# y0 = max(0, y-radius)
# y1 = min(rho.shape[1], y+radius+1)
# z0 = max(0, z-radius)
# z1 = min(rho.shape[2], z+radius+1)
# patch = rho[x0:x1, y0:y1, z0:z1]
# #neg_sum += np.sum(np.abs(patch[patch < 0]))
neg_sum = np.sum(np.abs(rho[rho < 0])) / rho.size
return neg_sum
# --------------------------------------------------
# Collect occupancy data for one timestamp folder
# --------------------------------------------------
def collect_run_knee_data(timestamp_dir):
print(f" Processing timestamp: {os.path.basename(timestamp_dir)}")
rows = []
for folder in sorted(os.listdir(timestamp_dir)):
if not folder.startswith("occupancy_"):
continue
try:
occ = float(folder.split("_")[1])
except:
continue
occ_dir = os.path.join(timestamp_dir, folder)
map_file = None
for f in os.listdir(occ_dir):
if f.endswith(".ccp4"):
map_file = os.path.join(occ_dir, f)
break
if map_file and os.path.exists(map_file):
neg_sum = compute_negative_density_sum(map_file)
rows.append({
"occupancy": occ,
"alpha": 1.0 / occ,
"neg_sum": neg_sum
})
df = pd.DataFrame(rows).sort_values("alpha")
print(f" → Collected {len(df)} occupancy points\n")
return df
# --------------------------------------------------
# Walk full xtrapol8 tree
# --------------------------------------------------
def scan_xtrapol8_root(root_dir):
print(f"\nScanning root directory: {root_dir}\n")
df = pd.DataFrame()
target_journal = "2023_science_1"
for journal in os.listdir(root_dir):
journal_path = os.path.join(root_dir, journal)
# if not os.path.isdir(journal_path):
# continue
if journal != target_journal:
continue
print(f"Journal: {journal}")
for dark_code in os.listdir(journal_path):
dark_path = os.path.join(journal_path, dark_code)
if not os.path.isdir(dark_path):
continue
print(f" Dark dataset: {dark_code}")
for pair in os.listdir(dark_path):
pair_path = os.path.join(dark_path, pair)
if not os.path.isdir(pair_path):
continue
print(f" Pair: {pair}")
found_timestamp = False
# timestamp folders match YYYYMMDD_HHMMSS
for sub in os.listdir(pair_path):
if re.match(r"xtrapol8_\d{8}_\d{6}", sub):
found_timestamp = True
timestamp_dir = os.path.join(pair_path, sub)
knee_data = collect_run_knee_data(timestamp_dir)
x = np.log10(knee_data.alpha)
y = knee_data.neg_sum
y_smooth = savgol_filter(y, 9, 3)
def sigmoid(x, A, B, k, x0):
return A + B / (1 + np.exp(-k*(x-x0)))
p0 = [min(y_smooth), max(y_smooth)-min(y_smooth), 3, np.median(x)]
params, _ = curve_fit(
sigmoid,
x,
y_smooth,
p0=p0,
bounds=(
[min(y_smooth)*0.5, 0, 0.1, min(x)],
[max(y_smooth)*2, max(y_smooth)*2, 20, max(x)]
),
maxfev=10000
)
A, B, k, x0 = params
x_knee = x0 - 2/k
alpha_knee = 10**x_knee
xx = np.linspace(min(x), max(x), 200)
yy = sigmoid(xx, *params)
xx = 10**xx
plt.figure()
plt.plot(knee_data["alpha"], knee_data["neg_sum"], "o-", color="blue")
plt.plot(knee_data["alpha"], y_smooth, linestyle='dashed', color="green" )
plt.plot(xx, yy, linestyle='dotted', color="green" )
plt.axvline( alpha_knee )
plt.xscale("log")
#plt.yscale("log")
plt.xlabel("alpha")
plt.ylabel("Σ|neg density|")
plt.title(f"{pair} {sub}")
plt.show()
if len(knee_data) > 3:
print(" ✓ Valid run detected\n")
data = [ {
"journal": journal,
"dark": dark_code,
"pair": pair,
"timestamp": sub,
}]
df_1 = pd.DataFrame( data )
df_1 = pd.concat( ( df_1, knee_data ), axis=1 )
df = pd.concat( ( df, df_1 ) )
else:
print(" ✗ Not enough occupancy points\n")
if not found_timestamp:
print(" ⚠ No timestamp folder found")
print(f"\nFinished scan. Total valid runs: {len(df)}\n")
return df
runs = scan_xtrapol8_root("xtrapol8_runs_2")
print( runs )
for run in runs:
N_vals = np.array([x[0] for x in run["data"]])
sums = np.array([x[1] for x in run["data"]])
# detect knee here