Files
extrapolation/process_database_v1.py

604 lines
17 KiB
Python

import os
import subprocess
import pandas as pd
import numpy as np
import regex as re
import glob
import datetime
XTRAPOL8_ROOT = "xtrapol8_runs_2"
TEMPLATE_PHIL = "xtrapol8.phil" # same directory as script
def extract_replicate(filename):
"""
Extract replicate number from filename.
Examples:
dark_2.mtz -> 2
raw_3.mtz -> 3
dark.mtz -> 1
"""
m = re.search(r'_(\d+)\.mtz$', filename.lower())
if m:
return int(m.group(1))
return 1
def collect_cif_files(folder):
"""
Return list of absolute paths to .cif files in folder.
"""
cif_files = []
for f in os.listdir(folder):
if f.lower().endswith(".cif"):
cif_files.append(os.path.abspath(os.path.join(folder, f)))
return cif_files
def replace_phil_field(content, field_name, new_value):
"""
Replace a single-line PHIL field assignment.
"""
pattern = rf'({field_name}\s*=\s*)(.*)'
replacement = rf'\1"{new_value}"'
return re.sub(pattern, replacement, content)
def replace_additional_files_block(content, new_block):
"""
Replace the additional_files line(s).
"""
pattern = r'additional_files\s*=.*'
return re.sub(pattern, new_block, content)
def write_run_phil(
dark_mtz,
raw_mtz,
pdb_file,
cif_files,
output_dir,
top_dir,
dmin=None
):
"""
Copy template PHIL and modify required fields for this run.
Returns path to new phil file.
"""
dark_mtz = os.path.abspath(dark_mtz)
raw_mtz = os.path.abspath(raw_mtz)
pdb_file = os.path.abspath(pdb_file)
cif_files = [os.path.abspath(c) for c in cif_files]
output_dir = os.path.abspath(output_dir)
# Read template
with open(TEMPLATE_PHIL, "r") as f:
content = f.read()
# Replace input paths
content = replace_phil_field(content, "reference_mtz", dark_mtz)
content = replace_phil_field(content, "triggered_mtz", raw_mtz)
content = replace_phil_field(content, "reference_pdb", pdb_file)
# Replace resolution if requested
if dmin is not None:
content = replace_phil_field(content, "high_resolution", str(dmin))
# Handle CIF files
if cif_files:
cif_block = "\n".join(
[f' additional_files = "{c}"' for c in cif_files]
)
content = replace_additional_files_block(content, cif_block)
else:
content = replace_phil_field(content, "additional_files", "None")
# Replace output directory
content = replace_phil_field(content, "outdir", output_dir)
# Write run phil
run_phil_path = os.path.join(top_dir, "xtrapol8_run.phil")
with open(run_phil_path, "w") as f:
f.write(content)
return run_phil_path
def create_xtrapol8_shell_script(output_dir):
"""
Creates shell script that:
- Loads modules
- Runs Xtrapol8 with phil file
"""
output_dir = os.path.abspath(output_dir)
script_path = os.path.join(output_dir, "run_xtrapol8.sh")
script_content = f"""#!/bin/bash
module purge
module load Xtrapol8/1.2.1
module load phenix/phenix-1.19.1-4122
module load ccp4
cd {output_dir}
phenix.python ${{XTRAPOL8_DIR}}/bin/Fextr.py xtrapol8_run.phil
"""
with open(script_path, "w") as f:
f.write(script_content)
os.chmod(script_path, 0o755)
return script_path
def run_shell_script(script_path):
"""
Executes a shell script and returns (success, stdout, stderr).
"""
result = subprocess.run(
["bash", script_path],
capture_output=True,
text=True
)
success = result.returncode == 0
return success, result.stdout, result.stderr
def parse_peak_file(peak_file):
if not os.path.exists(peak_file):
return None, None
top_pos_int = None
top_neg_int = None
header_count = 0
with open(peak_file, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
# Detect start of table
if line.startswith("Resn"):
header_count += 1
continue
parts = line.split()
# Need at least 4 columns for tail parsing
if len(parts) < 4:
continue
# Check last 3 tokens are numeric
try:
float(parts[-1]) # voxels
Int_value = float(parts[-2])
float(parts[-3]) # peak
except ValueError:
continue
# First block = positive
if header_count == 1 and top_pos_int is None:
top_pos_int = Int_value
# Second block = negative
elif header_count == 2 and top_neg_int is None:
top_neg_int = Int_value
if top_pos_int is not None and top_neg_int is not None:
break
return top_pos_int, top_neg_int
def parse_xtrapol8_outputs(timestamp_dir):
log_file = None
peak_file = os.path.join(timestamp_dir, "peakintegration.txt")
for f in os.listdir(timestamp_dir):
if f.endswith("_Xtrapol8.log"):
log_file = os.path.join(timestamp_dir, f)
break
if log_file is None:
return (None,) * 8
Riso = None
CCiso = None
alpha_max = None
alpha_pearson = None
high_res = None
Rwork = None
Rfree = None
top_pos_int = None
top_neg_int = None
# --------------------------------------------------
# Parse LOG FILE
# --------------------------------------------------
with open(log_file, "r") as f:
for line in f:
# --- Resolution ---
if "Resolution range=" in line:
m = re.search(r"Resolution range=\s*[0-9.]+\s*-\s*([0-9.]+)", line)
if m:
high_res = float(m.group(1))
# --- Riso ---
if "Overall R_iso" in line:
m = re.search(r"Overall R_iso\s*=\s*([0-9.]+)", line)
if m:
Riso = float(m.group(1))
# --- CCiso ---
if "Overall cc_iso" in line:
m = re.search(r"Overall cc_iso\s*=\s*([0-9.]+)", line)
if m:
CCiso = float(m.group(1))
# --- Rwork ---
if "R_work:" in line:
m = re.search(r"R_work:\s*([0-9.]+)", line)
if m:
Rwork = float(m.group(1))
# --- Rfree ---
if "R_free:" in line:
m = re.search(r"R_free:\s*([0-9.]+)", line)
if m:
Rfree = float(m.group(1))
# --- Alpha (maximization) ---
if "Difference map maximization" in line:
m = re.search(r"Best alpha value is\s*([0-9.]+)", line)
if m:
alpha_max = float(m.group(1))
# --- Alpha (PearsonCC) ---
if "Difference map PearsonCC" in line:
m = re.search(r"Best alpha value is\s*([0-9.]+)", line)
if m:
alpha_pearson = float(m.group(1))
# --------------------------------------------------
# Parse peak integration file
# --------------------------------------------------
if os.path.exists(peak_file):
top_pos_int, top_neg_int = parse_peak_file(peak_file)
return (
Riso,
CCiso,
alpha_max,
alpha_pearson,
high_res,
Rwork,
Rfree,
top_pos_int,
top_neg_int
)
def find_successful_run(pair_root, debug=False):
if not os.path.isdir(pair_root):
print("not -", pair_root)
return None
# Get timestamp folders
subdirs = [
os.path.join(pair_root, d)
for d in os.listdir(pair_root)
if os.path.isdir(os.path.join(pair_root, d))
]
if not subdirs:
return None
# Newest first (timestamps sort lexicographically)
subdirs.sort(reverse=True)
for folder in subdirs:
if debug:
print("Checking:", folder)
# Look for *Xtrapol8.log exactly
log_files = glob.glob(os.path.join(folder, "*Xtrapol8.log"))
if not log_files:
if debug:
print(" No *Xtrapol8.log found")
continue
# Look for peakintegration.txt (correct spelling)
peak_file = os.path.join(folder, "peakintegration.txt")
if not os.path.exists(peak_file):
if debug:
print(" No peakintegration.txt found")
continue
if debug:
print(" SUCCESS:", folder)
return folder
return None
def build_pair_root(dark_mtz, raw_mtz, pdb_file, data_root="pdb_data"):
# --------------------------------------------------
# Absolute paths
# --------------------------------------------------
dark_mtz = os.path.abspath(dark_mtz)
raw_mtz = os.path.abspath(raw_mtz)
pdb_file = os.path.abspath(pdb_file)
data_root = os.path.abspath(data_root)
# --------------------------------------------------
# Extract dark code from pdb filename
# --------------------------------------------------
dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper()
# --------------------------------------------------
# Extract raw code safely from MTZ filename
# --------------------------------------------------
m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz))
if m:
raw_code = m.group(1).upper()
else:
raise ValueError(f"Could not extract raw PDB code from {raw_mtz}")
pair_name = f"{raw_code}_minus_{dark_code}"
# --------------------------------------------------
# Mirror directory structure relative to pdb_data
# --------------------------------------------------
rel_path = os.path.relpath(dark_mtz, start=data_root)
rel_dir = os.path.dirname(rel_path)
pair_root = os.path.abspath(
os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name)
)
os.makedirs(pair_root, exist_ok=True)
return pair_root
def run_xtrapol8_pair(
dark_mtz,
raw_mtz,
pdb_file,
dmin,
data_root="pdb_data"
):
# --------------------------------------------------
# Absolute paths
# --------------------------------------------------
dark_mtz = os.path.abspath(dark_mtz)
raw_mtz = os.path.abspath(raw_mtz)
pdb_file = os.path.abspath(pdb_file)
# --------------------------------------------------
# Extract PDB codes
# --------------------------------------------------
dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper()
# safer extraction than slicing
m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz))
if m:
raw_code = m.group(1).upper()
else:
raw_code = "UNKNOWN"
pair_name = f"{raw_code}_minus_{dark_code}"
# --------------------------------------------------
# Build mirrored directory structure
# --------------------------------------------------
rel_path = os.path.relpath(dark_mtz, start=os.path.abspath(data_root))
rel_dir = os.path.dirname(rel_path)
pair_root = os.path.abspath(
os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name)
)
os.makedirs(pair_root, exist_ok=True)
print(" Pair directory:", pair_root)
# --------------------------------------------------
# Collect CIF files from dataset folder
# --------------------------------------------------
dataset_folder = os.path.dirname(dark_mtz)
cif_files = [
os.path.join(dataset_folder, f)
for f in os.listdir(dataset_folder)
if f.lower().endswith(".cif")
]
# --------------------------------------------------
# Write PHIL file
# --------------------------------------------------
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
xtrapol8_dir = "xtrapol8_{0}".format( timestamp )
output_dir = os.path.join(pair_root, xtrapol8_dir)
run_phil = write_run_phil(
dark_mtz,
raw_mtz,
pdb_file,
cif_files,
output_dir,
pair_root,
dmin=dmin
)
# --------------------------------------------------
# Write shell script
# --------------------------------------------------
script_path = create_xtrapol8_shell_script(pair_root)
# --------------------------------------------------
# Execute Xtrapol8 via shell
# --------------------------------------------------
result = subprocess.run(
["bash", script_path],
capture_output=True,
text=True
)
if result.returncode != 0:
print("Xtrapol8 failed:")
print(result.stderr)
def analyze_folder(folder_path, dmin_cutoff=None):
paper_folder = os.path.basename(folder_path)
results = []
# --------------------------------------------------
# Find all MTZ files recursively
# --------------------------------------------------
mtz_files = []
for root, dirs, files in os.walk(folder_path):
for f in files:
if f.lower().endswith(".mtz"):
mtz_files.append(os.path.join(root, f))
if not mtz_files:
return pd.DataFrame()
# --------------------------------------------------
# Group by replicate (explicit dict)
# --------------------------------------------------
grouped = {}
for f in mtz_files:
name = os.path.basename(f).lower()
rep = extract_replicate(name)
if rep not in grouped:
grouped[rep] = {"dark": [], "raw": []}
if "dark" in name:
grouped[rep]["dark"].append(f)
elif "raw" in name:
grouped[rep]["raw"].append(f)
# --------------------------------------------------
# Loop through replicate groups
# --------------------------------------------------
df = pd.DataFrame()
for rep in sorted(grouped.keys()):
dark_files = grouped[rep]["dark"]
raw_files = grouped[rep]["raw"]
if not dark_files or not raw_files:
continue
for dark_file in dark_files:
print(f"Working on dark file: {dark_file}")
# Determine PDB path from dark file
folder = os.path.dirname(dark_file)
pdb_code = os.path.basename(dark_file)[1:5].upper()
pdb_path = os.path.join(folder, f"{pdb_code}.pdb")
if not os.path.exists(pdb_path):
print("Missing PDB:", pdb_path)
continue
for raw_file in raw_files:
print(f" Pairing with raw file: {raw_file}")
pair_root = build_pair_root(dark_file, raw_file, pdb_path)
successful_dir = find_successful_run(pair_root)
if successful_dir is None:
print(f" running Xtrapol8")
run_xtrapol8_pair(
dark_file,
raw_file,
pdb_path,
dmin_cutoff,
data_root="pdb_data"
)
successful_dir = find_successful_run(pair_root)
if successful_dir is None:
print("Pair failed")
return None
# --------------------------------------------------
# Parse log
# --------------------------------------------------
(
Riso,
CCiso,
alpha_max,
alpha_pearson,
high_res,
Rwork,
Rfree,
top_pos_int,
top_neg_int
) = parse_xtrapol8_outputs(successful_dir)
if alpha_max is not None:
print(
f" Overall Riso : {Riso}\n"
f" Overall CCiso : {CCiso}\n"
f" Alpha (maximization): {alpha_max} 1/b : {1/alpha_max} \n"
f" Alpha (Pearson CC) : {alpha_pearson} 1/b : {1/alpha_pearson} \n"
f" High resolution : {high_res}\n"
f" Rwork : {Rwork}\n"
f" Rfree : {Rfree}\n"
f" Top +ve peak : {top_pos_int}\n"
f" Top -ve peak : {top_neg_int}\n"
)
results = [{
"paper_folder": paper_folder,
"replicate": rep,
"dark_file": os.path.basename(dark_file),
"raw_file": os.path.basename(raw_file),
"Riso": Riso,
"CCiso": CCiso,
"alpha_max": alpha_max,
"alpha_person": alpha_pearson,
"high_res": high_res,
"Rwork": Rwork,
"Rfree": Rfree,
"top_pos_int": top_pos_int,
"top_neg_int": top_neg_int
}]
df_1 = pd.DataFrame( results )
df = pd.concat( ( df, df_1 ) )
return df
root = "pdb_data"
results_df = pd.DataFrame()
for sub in os.listdir(root):
path = os.path.join(root, sub)
if os.path.isdir(path):
df = analyze_folder(path, dmin_cutoff=None)
results_df = pd.concat( ( results_df, df ) )
print( results_df )
results_df.to_csv( "xtrapol8_analysis.csv", sep="," )