231 lines
6.7 KiB
Python
231 lines
6.7 KiB
Python
import os
|
|
import re
|
|
import numpy as np
|
|
import gemmi
|
|
import pandas as pd
|
|
from scipy.ndimage import label
|
|
from scipy.ndimage import minimum_filter
|
|
from scipy.signal import savgol_filter
|
|
from scipy.optimize import curve_fit
|
|
import matplotlib.pyplot as plt
|
|
|
|
# --------------------------------------------------
|
|
# Map metric
|
|
# --------------------------------------------------
|
|
def compute_negative_density_sum(map_file, sigma_threshold=3.0, radius=2):
|
|
|
|
m = gemmi.read_ccp4_map(map_file)
|
|
|
|
try:
|
|
m.setup()
|
|
except TypeError:
|
|
m.setup(0.0)
|
|
|
|
rho = np.array(m.grid, copy=False)
|
|
|
|
sigma = rho.std()
|
|
threshold = -sigma_threshold * sigma
|
|
|
|
print("sigma:", sigma)
|
|
print("min density:", rho.min())
|
|
|
|
# find local minima
|
|
local_min = rho == minimum_filter(rho, size=3)
|
|
|
|
peak_mask = (rho < threshold) & local_min
|
|
|
|
peak_positions = np.argwhere(peak_mask)
|
|
|
|
neg_sum = 0.0
|
|
|
|
# for x, y, z in peak_positions:
|
|
|
|
# x0 = max(0, x-radius)
|
|
# x1 = min(rho.shape[0], x+radius+1)
|
|
|
|
# y0 = max(0, y-radius)
|
|
# y1 = min(rho.shape[1], y+radius+1)
|
|
|
|
# z0 = max(0, z-radius)
|
|
# z1 = min(rho.shape[2], z+radius+1)
|
|
|
|
# patch = rho[x0:x1, y0:y1, z0:z1]
|
|
|
|
# #neg_sum += np.sum(np.abs(patch[patch < 0]))
|
|
neg_sum = np.sum(np.abs(rho[rho < 0])) / rho.size
|
|
|
|
return neg_sum
|
|
|
|
|
|
# --------------------------------------------------
|
|
# Collect occupancy data for one timestamp folder
|
|
# --------------------------------------------------
|
|
|
|
def collect_run_knee_data(timestamp_dir):
|
|
|
|
print(f" Processing timestamp: {os.path.basename(timestamp_dir)}")
|
|
|
|
rows = []
|
|
|
|
for folder in sorted(os.listdir(timestamp_dir)):
|
|
|
|
if not folder.startswith("occupancy_"):
|
|
continue
|
|
|
|
try:
|
|
occ = float(folder.split("_")[1])
|
|
except:
|
|
continue
|
|
|
|
occ_dir = os.path.join(timestamp_dir, folder)
|
|
|
|
map_file = None
|
|
for f in os.listdir(occ_dir):
|
|
if f.endswith(".ccp4"):
|
|
map_file = os.path.join(occ_dir, f)
|
|
break
|
|
|
|
if map_file and os.path.exists(map_file):
|
|
neg_sum = compute_negative_density_sum(map_file)
|
|
|
|
rows.append({
|
|
"occupancy": occ,
|
|
"alpha": 1.0 / occ,
|
|
"neg_sum": neg_sum
|
|
})
|
|
|
|
df = pd.DataFrame(rows).sort_values("alpha")
|
|
|
|
print(f" → Collected {len(df)} occupancy points\n")
|
|
|
|
return df
|
|
|
|
# --------------------------------------------------
|
|
# Walk full xtrapol8 tree
|
|
# --------------------------------------------------
|
|
|
|
def scan_xtrapol8_root(root_dir):
|
|
|
|
print(f"\nScanning root directory: {root_dir}\n")
|
|
|
|
df = pd.DataFrame()
|
|
|
|
target_journal = "2023_science_1"
|
|
|
|
for journal in os.listdir(root_dir):
|
|
journal_path = os.path.join(root_dir, journal)
|
|
# if not os.path.isdir(journal_path):
|
|
# continue
|
|
|
|
if journal != target_journal:
|
|
continue
|
|
|
|
print(f"Journal: {journal}")
|
|
|
|
for dark_code in os.listdir(journal_path):
|
|
dark_path = os.path.join(journal_path, dark_code)
|
|
if not os.path.isdir(dark_path):
|
|
continue
|
|
|
|
print(f" Dark dataset: {dark_code}")
|
|
|
|
for pair in os.listdir(dark_path):
|
|
pair_path = os.path.join(dark_path, pair)
|
|
if not os.path.isdir(pair_path):
|
|
continue
|
|
|
|
print(f" Pair: {pair}")
|
|
|
|
found_timestamp = False
|
|
|
|
# timestamp folders match YYYYMMDD_HHMMSS
|
|
for sub in os.listdir(pair_path):
|
|
if re.match(r"xtrapol8_\d{8}_\d{6}", sub):
|
|
|
|
found_timestamp = True
|
|
timestamp_dir = os.path.join(pair_path, sub)
|
|
|
|
knee_data = collect_run_knee_data(timestamp_dir)
|
|
|
|
x = np.log10(knee_data.alpha)
|
|
y = knee_data.neg_sum
|
|
|
|
y_smooth = savgol_filter(y, 9, 3)
|
|
|
|
def sigmoid(x, A, B, k, x0):
|
|
return A + B / (1 + np.exp(-k*(x-x0)))
|
|
|
|
p0 = [min(y_smooth), max(y_smooth)-min(y_smooth), 3, np.median(x)]
|
|
|
|
params, _ = curve_fit(
|
|
sigmoid,
|
|
x,
|
|
y_smooth,
|
|
p0=p0,
|
|
bounds=(
|
|
[min(y_smooth)*0.5, 0, 0.1, min(x)],
|
|
[max(y_smooth)*2, max(y_smooth)*2, 20, max(x)]
|
|
),
|
|
maxfev=10000
|
|
)
|
|
|
|
A, B, k, x0 = params
|
|
x_knee = x0 - 2/k
|
|
alpha_knee = 10**x_knee
|
|
|
|
|
|
xx = np.linspace(min(x), max(x), 200)
|
|
yy = sigmoid(xx, *params)
|
|
|
|
xx = 10**xx
|
|
|
|
plt.figure()
|
|
|
|
plt.plot(knee_data["alpha"], knee_data["neg_sum"], "o-", color="blue")
|
|
plt.plot(knee_data["alpha"], y_smooth, linestyle='dashed', color="green" )
|
|
plt.plot(xx, yy, linestyle='dotted', color="green" )
|
|
|
|
plt.axvline( alpha_knee )
|
|
|
|
plt.xscale("log")
|
|
#plt.yscale("log")
|
|
|
|
plt.xlabel("alpha")
|
|
plt.ylabel("Σ|neg density|")
|
|
|
|
plt.title(f"{pair} {sub}")
|
|
|
|
plt.show()
|
|
|
|
if len(knee_data) > 3:
|
|
print(" ✓ Valid run detected\n")
|
|
|
|
data = [ {
|
|
"journal": journal,
|
|
"dark": dark_code,
|
|
"pair": pair,
|
|
"timestamp": sub,
|
|
}]
|
|
df_1 = pd.DataFrame( data )
|
|
df_1 = pd.concat( ( df_1, knee_data ), axis=1 )
|
|
df = pd.concat( ( df, df_1 ) )
|
|
|
|
else:
|
|
print(" ✗ Not enough occupancy points\n")
|
|
|
|
if not found_timestamp:
|
|
print(" ⚠ No timestamp folder found")
|
|
|
|
print(f"\nFinished scan. Total valid runs: {len(df)}\n")
|
|
|
|
return df
|
|
|
|
runs = scan_xtrapol8_root("xtrapol8_runs_2")
|
|
|
|
print( runs )
|
|
|
|
for run in runs:
|
|
N_vals = np.array([x[0] for x in run["data"]])
|
|
sums = np.array([x[1] for x in run["data"]])
|
|
|
|
# detect knee here |