604 lines
17 KiB
Python
604 lines
17 KiB
Python
import os
|
|
import subprocess
|
|
import pandas as pd
|
|
import numpy as np
|
|
import regex as re
|
|
import glob
|
|
import datetime
|
|
|
|
XTRAPOL8_ROOT = "xtrapol8_runs_2"
|
|
TEMPLATE_PHIL = "xtrapol8.phil" # same directory as script
|
|
|
|
def extract_replicate(filename):
|
|
"""
|
|
Extract replicate number from filename.
|
|
Examples:
|
|
dark_2.mtz -> 2
|
|
raw_3.mtz -> 3
|
|
dark.mtz -> 1
|
|
"""
|
|
m = re.search(r'_(\d+)\.mtz$', filename.lower())
|
|
if m:
|
|
return int(m.group(1))
|
|
return 1
|
|
|
|
def collect_cif_files(folder):
|
|
"""
|
|
Return list of absolute paths to .cif files in folder.
|
|
"""
|
|
cif_files = []
|
|
|
|
for f in os.listdir(folder):
|
|
if f.lower().endswith(".cif"):
|
|
cif_files.append(os.path.abspath(os.path.join(folder, f)))
|
|
|
|
return cif_files
|
|
|
|
def replace_phil_field(content, field_name, new_value):
|
|
"""
|
|
Replace a single-line PHIL field assignment.
|
|
"""
|
|
pattern = rf'({field_name}\s*=\s*)(.*)'
|
|
replacement = rf'\1"{new_value}"'
|
|
return re.sub(pattern, replacement, content)
|
|
|
|
|
|
def replace_additional_files_block(content, new_block):
|
|
"""
|
|
Replace the additional_files line(s).
|
|
"""
|
|
pattern = r'additional_files\s*=.*'
|
|
return re.sub(pattern, new_block, content)
|
|
|
|
def write_run_phil(
|
|
dark_mtz,
|
|
raw_mtz,
|
|
pdb_file,
|
|
cif_files,
|
|
output_dir,
|
|
top_dir,
|
|
dmin=None
|
|
):
|
|
"""
|
|
Copy template PHIL and modify required fields for this run.
|
|
Returns path to new phil file.
|
|
"""
|
|
|
|
dark_mtz = os.path.abspath(dark_mtz)
|
|
raw_mtz = os.path.abspath(raw_mtz)
|
|
pdb_file = os.path.abspath(pdb_file)
|
|
cif_files = [os.path.abspath(c) for c in cif_files]
|
|
output_dir = os.path.abspath(output_dir)
|
|
|
|
# Read template
|
|
with open(TEMPLATE_PHIL, "r") as f:
|
|
content = f.read()
|
|
|
|
# Replace input paths
|
|
content = replace_phil_field(content, "reference_mtz", dark_mtz)
|
|
content = replace_phil_field(content, "triggered_mtz", raw_mtz)
|
|
content = replace_phil_field(content, "reference_pdb", pdb_file)
|
|
|
|
# Replace resolution if requested
|
|
if dmin is not None:
|
|
content = replace_phil_field(content, "high_resolution", str(dmin))
|
|
|
|
# Handle CIF files
|
|
if cif_files:
|
|
cif_block = "\n".join(
|
|
[f' additional_files = "{c}"' for c in cif_files]
|
|
)
|
|
content = replace_additional_files_block(content, cif_block)
|
|
else:
|
|
content = replace_phil_field(content, "additional_files", "None")
|
|
|
|
# Replace output directory
|
|
content = replace_phil_field(content, "outdir", output_dir)
|
|
|
|
# Write run phil
|
|
run_phil_path = os.path.join(top_dir, "xtrapol8_run.phil")
|
|
|
|
with open(run_phil_path, "w") as f:
|
|
f.write(content)
|
|
|
|
return run_phil_path
|
|
|
|
def create_xtrapol8_shell_script(output_dir):
|
|
"""
|
|
Creates shell script that:
|
|
- Loads modules
|
|
- Runs Xtrapol8 with phil file
|
|
"""
|
|
|
|
output_dir = os.path.abspath(output_dir)
|
|
script_path = os.path.join(output_dir, "run_xtrapol8.sh")
|
|
|
|
script_content = f"""#!/bin/bash
|
|
module purge
|
|
module load Xtrapol8/1.2.1
|
|
module load phenix/phenix-1.19.1-4122
|
|
module load ccp4
|
|
|
|
cd {output_dir}
|
|
|
|
phenix.python ${{XTRAPOL8_DIR}}/bin/Fextr.py xtrapol8_run.phil
|
|
"""
|
|
|
|
with open(script_path, "w") as f:
|
|
f.write(script_content)
|
|
|
|
os.chmod(script_path, 0o755)
|
|
|
|
return script_path
|
|
|
|
def run_shell_script(script_path):
|
|
"""
|
|
Executes a shell script and returns (success, stdout, stderr).
|
|
"""
|
|
|
|
result = subprocess.run(
|
|
["bash", script_path],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
success = result.returncode == 0
|
|
return success, result.stdout, result.stderr
|
|
|
|
def parse_peak_file(peak_file):
|
|
|
|
if not os.path.exists(peak_file):
|
|
return None, None
|
|
|
|
top_pos_int = None
|
|
top_neg_int = None
|
|
header_count = 0
|
|
|
|
with open(peak_file, "r") as f:
|
|
for line in f:
|
|
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Detect start of table
|
|
if line.startswith("Resn"):
|
|
header_count += 1
|
|
continue
|
|
|
|
parts = line.split()
|
|
|
|
# Need at least 4 columns for tail parsing
|
|
if len(parts) < 4:
|
|
continue
|
|
|
|
# Check last 3 tokens are numeric
|
|
try:
|
|
float(parts[-1]) # voxels
|
|
Int_value = float(parts[-2])
|
|
float(parts[-3]) # peak
|
|
except ValueError:
|
|
continue
|
|
|
|
# First block = positive
|
|
if header_count == 1 and top_pos_int is None:
|
|
top_pos_int = Int_value
|
|
|
|
# Second block = negative
|
|
elif header_count == 2 and top_neg_int is None:
|
|
top_neg_int = Int_value
|
|
|
|
if top_pos_int is not None and top_neg_int is not None:
|
|
break
|
|
|
|
return top_pos_int, top_neg_int
|
|
|
|
def parse_xtrapol8_outputs(timestamp_dir):
|
|
|
|
log_file = None
|
|
peak_file = os.path.join(timestamp_dir, "peakintegration.txt")
|
|
|
|
for f in os.listdir(timestamp_dir):
|
|
if f.endswith("_Xtrapol8.log"):
|
|
log_file = os.path.join(timestamp_dir, f)
|
|
break
|
|
|
|
if log_file is None:
|
|
return (None,) * 8
|
|
|
|
Riso = None
|
|
CCiso = None
|
|
alpha_max = None
|
|
alpha_pearson = None
|
|
high_res = None
|
|
Rwork = None
|
|
Rfree = None
|
|
top_pos_int = None
|
|
top_neg_int = None
|
|
|
|
# --------------------------------------------------
|
|
# Parse LOG FILE
|
|
# --------------------------------------------------
|
|
with open(log_file, "r") as f:
|
|
for line in f:
|
|
|
|
# --- Resolution ---
|
|
if "Resolution range=" in line:
|
|
m = re.search(r"Resolution range=\s*[0-9.]+\s*-\s*([0-9.]+)", line)
|
|
if m:
|
|
high_res = float(m.group(1))
|
|
|
|
# --- Riso ---
|
|
if "Overall R_iso" in line:
|
|
m = re.search(r"Overall R_iso\s*=\s*([0-9.]+)", line)
|
|
if m:
|
|
Riso = float(m.group(1))
|
|
|
|
# --- CCiso ---
|
|
if "Overall cc_iso" in line:
|
|
m = re.search(r"Overall cc_iso\s*=\s*([0-9.]+)", line)
|
|
if m:
|
|
CCiso = float(m.group(1))
|
|
|
|
# --- Rwork ---
|
|
if "R_work:" in line:
|
|
m = re.search(r"R_work:\s*([0-9.]+)", line)
|
|
if m:
|
|
Rwork = float(m.group(1))
|
|
|
|
# --- Rfree ---
|
|
if "R_free:" in line:
|
|
m = re.search(r"R_free:\s*([0-9.]+)", line)
|
|
if m:
|
|
Rfree = float(m.group(1))
|
|
|
|
# --- Alpha (maximization) ---
|
|
if "Difference map maximization" in line:
|
|
m = re.search(r"Best alpha value is\s*([0-9.]+)", line)
|
|
if m:
|
|
alpha_max = float(m.group(1))
|
|
|
|
# --- Alpha (PearsonCC) ---
|
|
if "Difference map PearsonCC" in line:
|
|
m = re.search(r"Best alpha value is\s*([0-9.]+)", line)
|
|
if m:
|
|
alpha_pearson = float(m.group(1))
|
|
|
|
# --------------------------------------------------
|
|
# Parse peak integration file
|
|
# --------------------------------------------------
|
|
if os.path.exists(peak_file):
|
|
|
|
top_pos_int, top_neg_int = parse_peak_file(peak_file)
|
|
|
|
return (
|
|
Riso,
|
|
CCiso,
|
|
alpha_max,
|
|
alpha_pearson,
|
|
high_res,
|
|
Rwork,
|
|
Rfree,
|
|
top_pos_int,
|
|
top_neg_int
|
|
)
|
|
|
|
def find_successful_run(pair_root, debug=False):
|
|
|
|
if not os.path.isdir(pair_root):
|
|
print("not -", pair_root)
|
|
return None
|
|
|
|
# Get timestamp folders
|
|
subdirs = [
|
|
os.path.join(pair_root, d)
|
|
for d in os.listdir(pair_root)
|
|
if os.path.isdir(os.path.join(pair_root, d))
|
|
]
|
|
|
|
if not subdirs:
|
|
return None
|
|
|
|
# Newest first (timestamps sort lexicographically)
|
|
subdirs.sort(reverse=True)
|
|
|
|
for folder in subdirs:
|
|
|
|
if debug:
|
|
print("Checking:", folder)
|
|
|
|
# Look for *Xtrapol8.log exactly
|
|
log_files = glob.glob(os.path.join(folder, "*Xtrapol8.log"))
|
|
|
|
if not log_files:
|
|
if debug:
|
|
print(" No *Xtrapol8.log found")
|
|
continue
|
|
|
|
# Look for peakintegration.txt (correct spelling)
|
|
peak_file = os.path.join(folder, "peakintegration.txt")
|
|
|
|
if not os.path.exists(peak_file):
|
|
if debug:
|
|
print(" No peakintegration.txt found")
|
|
continue
|
|
|
|
if debug:
|
|
print(" SUCCESS:", folder)
|
|
|
|
return folder
|
|
|
|
return None
|
|
|
|
def build_pair_root(dark_mtz, raw_mtz, pdb_file, data_root="pdb_data"):
|
|
|
|
# --------------------------------------------------
|
|
# Absolute paths
|
|
# --------------------------------------------------
|
|
dark_mtz = os.path.abspath(dark_mtz)
|
|
raw_mtz = os.path.abspath(raw_mtz)
|
|
pdb_file = os.path.abspath(pdb_file)
|
|
data_root = os.path.abspath(data_root)
|
|
|
|
# --------------------------------------------------
|
|
# Extract dark code from pdb filename
|
|
# --------------------------------------------------
|
|
dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper()
|
|
|
|
# --------------------------------------------------
|
|
# Extract raw code safely from MTZ filename
|
|
# --------------------------------------------------
|
|
m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz))
|
|
if m:
|
|
raw_code = m.group(1).upper()
|
|
else:
|
|
raise ValueError(f"Could not extract raw PDB code from {raw_mtz}")
|
|
|
|
pair_name = f"{raw_code}_minus_{dark_code}"
|
|
|
|
# --------------------------------------------------
|
|
# Mirror directory structure relative to pdb_data
|
|
# --------------------------------------------------
|
|
rel_path = os.path.relpath(dark_mtz, start=data_root)
|
|
rel_dir = os.path.dirname(rel_path)
|
|
|
|
pair_root = os.path.abspath(
|
|
os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name)
|
|
)
|
|
|
|
os.makedirs(pair_root, exist_ok=True)
|
|
|
|
return pair_root
|
|
|
|
def run_xtrapol8_pair(
|
|
dark_mtz,
|
|
raw_mtz,
|
|
pdb_file,
|
|
dmin,
|
|
data_root="pdb_data"
|
|
):
|
|
|
|
# --------------------------------------------------
|
|
# Absolute paths
|
|
# --------------------------------------------------
|
|
dark_mtz = os.path.abspath(dark_mtz)
|
|
raw_mtz = os.path.abspath(raw_mtz)
|
|
pdb_file = os.path.abspath(pdb_file)
|
|
|
|
# --------------------------------------------------
|
|
# Extract PDB codes
|
|
# --------------------------------------------------
|
|
dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper()
|
|
|
|
# safer extraction than slicing
|
|
m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz))
|
|
if m:
|
|
raw_code = m.group(1).upper()
|
|
else:
|
|
raw_code = "UNKNOWN"
|
|
|
|
pair_name = f"{raw_code}_minus_{dark_code}"
|
|
|
|
# --------------------------------------------------
|
|
# Build mirrored directory structure
|
|
# --------------------------------------------------
|
|
rel_path = os.path.relpath(dark_mtz, start=os.path.abspath(data_root))
|
|
rel_dir = os.path.dirname(rel_path)
|
|
|
|
pair_root = os.path.abspath(
|
|
os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name)
|
|
)
|
|
|
|
os.makedirs(pair_root, exist_ok=True)
|
|
|
|
print(" Pair directory:", pair_root)
|
|
|
|
# --------------------------------------------------
|
|
# Collect CIF files from dataset folder
|
|
# --------------------------------------------------
|
|
dataset_folder = os.path.dirname(dark_mtz)
|
|
cif_files = [
|
|
os.path.join(dataset_folder, f)
|
|
for f in os.listdir(dataset_folder)
|
|
if f.lower().endswith(".cif")
|
|
]
|
|
|
|
# --------------------------------------------------
|
|
# Write PHIL file
|
|
# --------------------------------------------------
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
xtrapol8_dir = "xtrapol8_{0}".format( timestamp )
|
|
output_dir = os.path.join(pair_root, xtrapol8_dir)
|
|
|
|
run_phil = write_run_phil(
|
|
dark_mtz,
|
|
raw_mtz,
|
|
pdb_file,
|
|
cif_files,
|
|
output_dir,
|
|
pair_root,
|
|
dmin=dmin
|
|
)
|
|
|
|
# --------------------------------------------------
|
|
# Write shell script
|
|
# --------------------------------------------------
|
|
script_path = create_xtrapol8_shell_script(pair_root)
|
|
|
|
# --------------------------------------------------
|
|
# Execute Xtrapol8 via shell
|
|
# --------------------------------------------------
|
|
|
|
result = subprocess.run(
|
|
["bash", script_path],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
print("Xtrapol8 failed:")
|
|
print(result.stderr)
|
|
|
|
def analyze_folder(folder_path, dmin_cutoff=None):
|
|
|
|
paper_folder = os.path.basename(folder_path)
|
|
results = []
|
|
|
|
# --------------------------------------------------
|
|
# Find all MTZ files recursively
|
|
# --------------------------------------------------
|
|
mtz_files = []
|
|
for root, dirs, files in os.walk(folder_path):
|
|
for f in files:
|
|
if f.lower().endswith(".mtz"):
|
|
mtz_files.append(os.path.join(root, f))
|
|
|
|
if not mtz_files:
|
|
return pd.DataFrame()
|
|
|
|
# --------------------------------------------------
|
|
# Group by replicate (explicit dict)
|
|
# --------------------------------------------------
|
|
grouped = {}
|
|
|
|
for f in mtz_files:
|
|
name = os.path.basename(f).lower()
|
|
rep = extract_replicate(name)
|
|
|
|
if rep not in grouped:
|
|
grouped[rep] = {"dark": [], "raw": []}
|
|
|
|
if "dark" in name:
|
|
grouped[rep]["dark"].append(f)
|
|
elif "raw" in name:
|
|
grouped[rep]["raw"].append(f)
|
|
|
|
# --------------------------------------------------
|
|
# Loop through replicate groups
|
|
# --------------------------------------------------
|
|
df = pd.DataFrame()
|
|
|
|
for rep in sorted(grouped.keys()):
|
|
|
|
dark_files = grouped[rep]["dark"]
|
|
raw_files = grouped[rep]["raw"]
|
|
|
|
if not dark_files or not raw_files:
|
|
continue
|
|
|
|
for dark_file in dark_files:
|
|
|
|
print(f"Working on dark file: {dark_file}")
|
|
|
|
# Determine PDB path from dark file
|
|
folder = os.path.dirname(dark_file)
|
|
pdb_code = os.path.basename(dark_file)[1:5].upper()
|
|
pdb_path = os.path.join(folder, f"{pdb_code}.pdb")
|
|
|
|
if not os.path.exists(pdb_path):
|
|
print("Missing PDB:", pdb_path)
|
|
continue
|
|
|
|
for raw_file in raw_files:
|
|
|
|
print(f" Pairing with raw file: {raw_file}")
|
|
|
|
pair_root = build_pair_root(dark_file, raw_file, pdb_path)
|
|
|
|
successful_dir = find_successful_run(pair_root)
|
|
|
|
if successful_dir is None:
|
|
print(f" running Xtrapol8")
|
|
run_xtrapol8_pair(
|
|
dark_file,
|
|
raw_file,
|
|
pdb_path,
|
|
dmin_cutoff,
|
|
data_root="pdb_data"
|
|
)
|
|
successful_dir = find_successful_run(pair_root)
|
|
|
|
if successful_dir is None:
|
|
print("Pair failed")
|
|
return None
|
|
|
|
# --------------------------------------------------
|
|
# Parse log
|
|
# --------------------------------------------------
|
|
(
|
|
Riso,
|
|
CCiso,
|
|
alpha_max,
|
|
alpha_pearson,
|
|
high_res,
|
|
Rwork,
|
|
Rfree,
|
|
top_pos_int,
|
|
top_neg_int
|
|
) = parse_xtrapol8_outputs(successful_dir)
|
|
|
|
if alpha_max is not None:
|
|
print(
|
|
f" Overall Riso : {Riso}\n"
|
|
f" Overall CCiso : {CCiso}\n"
|
|
f" Alpha (maximization): {alpha_max} 1/b : {1/alpha_max} \n"
|
|
f" Alpha (Pearson CC) : {alpha_pearson} 1/b : {1/alpha_pearson} \n"
|
|
f" High resolution : {high_res}\n"
|
|
f" Rwork : {Rwork}\n"
|
|
f" Rfree : {Rfree}\n"
|
|
f" Top +ve peak : {top_pos_int}\n"
|
|
f" Top -ve peak : {top_neg_int}\n"
|
|
)
|
|
|
|
results = [{
|
|
"paper_folder": paper_folder,
|
|
"replicate": rep,
|
|
"dark_file": os.path.basename(dark_file),
|
|
"raw_file": os.path.basename(raw_file),
|
|
"Riso": Riso,
|
|
"CCiso": CCiso,
|
|
"alpha_max": alpha_max,
|
|
"alpha_person": alpha_pearson,
|
|
"high_res": high_res,
|
|
"Rwork": Rwork,
|
|
"Rfree": Rfree,
|
|
"top_pos_int": top_pos_int,
|
|
"top_neg_int": top_neg_int
|
|
}]
|
|
df_1 = pd.DataFrame( results )
|
|
df = pd.concat( ( df, df_1 ) )
|
|
|
|
return df
|
|
|
|
root = "pdb_data"
|
|
results_df = pd.DataFrame()
|
|
|
|
for sub in os.listdir(root):
|
|
path = os.path.join(root, sub)
|
|
if os.path.isdir(path):
|
|
df = analyze_folder(path, dmin_cutoff=None)
|
|
|
|
results_df = pd.concat( ( results_df, df ) )
|
|
|
|
print( results_df )
|
|
results_df.to_csv( "xtrapol8_analysis.csv", sep="," ) |