import os import subprocess import pandas as pd import numpy as np import regex as re import glob import datetime XTRAPOL8_ROOT = "xtrapol8_runs_2" TEMPLATE_PHIL = "xtrapol8.phil" # same directory as script def extract_replicate(filename): """ Extract replicate number from filename. Examples: dark_2.mtz -> 2 raw_3.mtz -> 3 dark.mtz -> 1 """ m = re.search(r'_(\d+)\.mtz$', filename.lower()) if m: return int(m.group(1)) return 1 def collect_cif_files(folder): """ Return list of absolute paths to .cif files in folder. """ cif_files = [] for f in os.listdir(folder): if f.lower().endswith(".cif"): cif_files.append(os.path.abspath(os.path.join(folder, f))) return cif_files def replace_phil_field(content, field_name, new_value): """ Replace a single-line PHIL field assignment. """ pattern = rf'({field_name}\s*=\s*)(.*)' replacement = rf'\1"{new_value}"' return re.sub(pattern, replacement, content) def replace_additional_files_block(content, new_block): """ Replace the additional_files line(s). """ pattern = r'additional_files\s*=.*' return re.sub(pattern, new_block, content) def write_run_phil( dark_mtz, raw_mtz, pdb_file, cif_files, output_dir, top_dir, dmin=None ): """ Copy template PHIL and modify required fields for this run. Returns path to new phil file. """ dark_mtz = os.path.abspath(dark_mtz) raw_mtz = os.path.abspath(raw_mtz) pdb_file = os.path.abspath(pdb_file) cif_files = [os.path.abspath(c) for c in cif_files] output_dir = os.path.abspath(output_dir) # Read template with open(TEMPLATE_PHIL, "r") as f: content = f.read() # Replace input paths content = replace_phil_field(content, "reference_mtz", dark_mtz) content = replace_phil_field(content, "triggered_mtz", raw_mtz) content = replace_phil_field(content, "reference_pdb", pdb_file) # Replace resolution if requested if dmin is not None: content = replace_phil_field(content, "high_resolution", str(dmin)) # Handle CIF files if cif_files: cif_block = "\n".join( [f' additional_files = "{c}"' for c in cif_files] ) content = replace_additional_files_block(content, cif_block) else: content = replace_phil_field(content, "additional_files", "None") # Replace output directory content = replace_phil_field(content, "outdir", output_dir) # Write run phil run_phil_path = os.path.join(top_dir, "xtrapol8_run.phil") with open(run_phil_path, "w") as f: f.write(content) return run_phil_path def create_xtrapol8_shell_script(output_dir): """ Creates shell script that: - Loads modules - Runs Xtrapol8 with phil file """ output_dir = os.path.abspath(output_dir) script_path = os.path.join(output_dir, "run_xtrapol8.sh") script_content = f"""#!/bin/bash module purge module load Xtrapol8/1.2.1 module load phenix/phenix-1.19.1-4122 module load ccp4 cd {output_dir} phenix.python ${{XTRAPOL8_DIR}}/bin/Fextr.py xtrapol8_run.phil """ with open(script_path, "w") as f: f.write(script_content) os.chmod(script_path, 0o755) return script_path def run_shell_script(script_path): """ Executes a shell script and returns (success, stdout, stderr). """ result = subprocess.run( ["bash", script_path], capture_output=True, text=True ) success = result.returncode == 0 return success, result.stdout, result.stderr def parse_peak_file(peak_file): if not os.path.exists(peak_file): return None, None top_pos_int = None top_neg_int = None header_count = 0 with open(peak_file, "r") as f: for line in f: line = line.strip() if not line: continue # Detect start of table if line.startswith("Resn"): header_count += 1 continue parts = line.split() # Need at least 4 columns for tail parsing if len(parts) < 4: continue # Check last 3 tokens are numeric try: float(parts[-1]) # voxels Int_value = float(parts[-2]) float(parts[-3]) # peak except ValueError: continue # First block = positive if header_count == 1 and top_pos_int is None: top_pos_int = Int_value # Second block = negative elif header_count == 2 and top_neg_int is None: top_neg_int = Int_value if top_pos_int is not None and top_neg_int is not None: break return top_pos_int, top_neg_int def parse_xtrapol8_outputs(timestamp_dir): log_file = None peak_file = os.path.join(timestamp_dir, "peakintegration.txt") for f in os.listdir(timestamp_dir): if f.endswith("_Xtrapol8.log"): log_file = os.path.join(timestamp_dir, f) break if log_file is None: return (None,) * 8 Riso = None CCiso = None alpha_max = None alpha_pearson = None high_res = None Rwork = None Rfree = None top_pos_int = None top_neg_int = None # -------------------------------------------------- # Parse LOG FILE # -------------------------------------------------- with open(log_file, "r") as f: for line in f: # --- Resolution --- if "Resolution range=" in line: m = re.search(r"Resolution range=\s*[0-9.]+\s*-\s*([0-9.]+)", line) if m: high_res = float(m.group(1)) # --- Riso --- if "Overall R_iso" in line: m = re.search(r"Overall R_iso\s*=\s*([0-9.]+)", line) if m: Riso = float(m.group(1)) # --- CCiso --- if "Overall cc_iso" in line: m = re.search(r"Overall cc_iso\s*=\s*([0-9.]+)", line) if m: CCiso = float(m.group(1)) # --- Rwork --- if "R_work:" in line: m = re.search(r"R_work:\s*([0-9.]+)", line) if m: Rwork = float(m.group(1)) # --- Rfree --- if "R_free:" in line: m = re.search(r"R_free:\s*([0-9.]+)", line) if m: Rfree = float(m.group(1)) # --- Alpha (maximization) --- if "Difference map maximization" in line: m = re.search(r"Best alpha value is\s*([0-9.]+)", line) if m: alpha_max = float(m.group(1)) # --- Alpha (PearsonCC) --- if "Difference map PearsonCC" in line: m = re.search(r"Best alpha value is\s*([0-9.]+)", line) if m: alpha_pearson = float(m.group(1)) # -------------------------------------------------- # Parse peak integration file # -------------------------------------------------- if os.path.exists(peak_file): top_pos_int, top_neg_int = parse_peak_file(peak_file) return ( Riso, CCiso, alpha_max, alpha_pearson, high_res, Rwork, Rfree, top_pos_int, top_neg_int ) def find_successful_run(pair_root, debug=False): if not os.path.isdir(pair_root): print("not -", pair_root) return None # Get timestamp folders subdirs = [ os.path.join(pair_root, d) for d in os.listdir(pair_root) if os.path.isdir(os.path.join(pair_root, d)) ] if not subdirs: return None # Newest first (timestamps sort lexicographically) subdirs.sort(reverse=True) for folder in subdirs: if debug: print("Checking:", folder) # Look for *Xtrapol8.log exactly log_files = glob.glob(os.path.join(folder, "*Xtrapol8.log")) if not log_files: if debug: print(" No *Xtrapol8.log found") continue # Look for peakintegration.txt (correct spelling) peak_file = os.path.join(folder, "peakintegration.txt") if not os.path.exists(peak_file): if debug: print(" No peakintegration.txt found") continue if debug: print(" SUCCESS:", folder) return folder return None def build_pair_root(dark_mtz, raw_mtz, pdb_file, data_root="pdb_data"): # -------------------------------------------------- # Absolute paths # -------------------------------------------------- dark_mtz = os.path.abspath(dark_mtz) raw_mtz = os.path.abspath(raw_mtz) pdb_file = os.path.abspath(pdb_file) data_root = os.path.abspath(data_root) # -------------------------------------------------- # Extract dark code from pdb filename # -------------------------------------------------- dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper() # -------------------------------------------------- # Extract raw code safely from MTZ filename # -------------------------------------------------- m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz)) if m: raw_code = m.group(1).upper() else: raise ValueError(f"Could not extract raw PDB code from {raw_mtz}") pair_name = f"{raw_code}_minus_{dark_code}" # -------------------------------------------------- # Mirror directory structure relative to pdb_data # -------------------------------------------------- rel_path = os.path.relpath(dark_mtz, start=data_root) rel_dir = os.path.dirname(rel_path) pair_root = os.path.abspath( os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name) ) os.makedirs(pair_root, exist_ok=True) return pair_root def run_xtrapol8_pair( dark_mtz, raw_mtz, pdb_file, dmin, data_root="pdb_data" ): # -------------------------------------------------- # Absolute paths # -------------------------------------------------- dark_mtz = os.path.abspath(dark_mtz) raw_mtz = os.path.abspath(raw_mtz) pdb_file = os.path.abspath(pdb_file) # -------------------------------------------------- # Extract PDB codes # -------------------------------------------------- dark_code = os.path.basename(pdb_file).replace(".pdb", "").upper() # safer extraction than slicing m = re.search(r"r([0-9a-zA-Z]{4})", os.path.basename(raw_mtz)) if m: raw_code = m.group(1).upper() else: raw_code = "UNKNOWN" pair_name = f"{raw_code}_minus_{dark_code}" # -------------------------------------------------- # Build mirrored directory structure # -------------------------------------------------- rel_path = os.path.relpath(dark_mtz, start=os.path.abspath(data_root)) rel_dir = os.path.dirname(rel_path) pair_root = os.path.abspath( os.path.join(XTRAPOL8_ROOT, rel_dir, pair_name) ) os.makedirs(pair_root, exist_ok=True) print(" Pair directory:", pair_root) # -------------------------------------------------- # Collect CIF files from dataset folder # -------------------------------------------------- dataset_folder = os.path.dirname(dark_mtz) cif_files = [ os.path.join(dataset_folder, f) for f in os.listdir(dataset_folder) if f.lower().endswith(".cif") ] # -------------------------------------------------- # Write PHIL file # -------------------------------------------------- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") xtrapol8_dir = "xtrapol8_{0}".format( timestamp ) output_dir = os.path.join(pair_root, xtrapol8_dir) run_phil = write_run_phil( dark_mtz, raw_mtz, pdb_file, cif_files, output_dir, pair_root, dmin=dmin ) # -------------------------------------------------- # Write shell script # -------------------------------------------------- script_path = create_xtrapol8_shell_script(pair_root) # -------------------------------------------------- # Execute Xtrapol8 via shell # -------------------------------------------------- result = subprocess.run( ["bash", script_path], capture_output=True, text=True ) if result.returncode != 0: print("Xtrapol8 failed:") print(result.stderr) def analyze_folder(folder_path, dmin_cutoff=None): paper_folder = os.path.basename(folder_path) results = [] # -------------------------------------------------- # Find all MTZ files recursively # -------------------------------------------------- mtz_files = [] for root, dirs, files in os.walk(folder_path): for f in files: if f.lower().endswith(".mtz"): mtz_files.append(os.path.join(root, f)) if not mtz_files: return pd.DataFrame() # -------------------------------------------------- # Group by replicate (explicit dict) # -------------------------------------------------- grouped = {} for f in mtz_files: name = os.path.basename(f).lower() rep = extract_replicate(name) if rep not in grouped: grouped[rep] = {"dark": [], "raw": []} if "dark" in name: grouped[rep]["dark"].append(f) elif "raw" in name: grouped[rep]["raw"].append(f) # -------------------------------------------------- # Loop through replicate groups # -------------------------------------------------- df = pd.DataFrame() for rep in sorted(grouped.keys()): dark_files = grouped[rep]["dark"] raw_files = grouped[rep]["raw"] if not dark_files or not raw_files: continue for dark_file in dark_files: print(f"Working on dark file: {dark_file}") # Determine PDB path from dark file folder = os.path.dirname(dark_file) pdb_code = os.path.basename(dark_file)[1:5].upper() pdb_path = os.path.join(folder, f"{pdb_code}.pdb") if not os.path.exists(pdb_path): print("Missing PDB:", pdb_path) continue for raw_file in raw_files: print(f" Pairing with raw file: {raw_file}") pair_root = build_pair_root(dark_file, raw_file, pdb_path) successful_dir = find_successful_run(pair_root) if successful_dir is None: print(f" running Xtrapol8") run_xtrapol8_pair( dark_file, raw_file, pdb_path, dmin_cutoff, data_root="pdb_data" ) successful_dir = find_successful_run(pair_root) if successful_dir is None: print("Pair failed") return None # -------------------------------------------------- # Parse log # -------------------------------------------------- ( Riso, CCiso, alpha_max, alpha_pearson, high_res, Rwork, Rfree, top_pos_int, top_neg_int ) = parse_xtrapol8_outputs(successful_dir) if alpha_max is not None: print( f" Overall Riso : {Riso}\n" f" Overall CCiso : {CCiso}\n" f" Alpha (maximization): {alpha_max} 1/b : {1/alpha_max} \n" f" Alpha (Pearson CC) : {alpha_pearson} 1/b : {1/alpha_pearson} \n" f" High resolution : {high_res}\n" f" Rwork : {Rwork}\n" f" Rfree : {Rfree}\n" f" Top +ve peak : {top_pos_int}\n" f" Top -ve peak : {top_neg_int}\n" ) results = [{ "paper_folder": paper_folder, "replicate": rep, "dark_file": os.path.basename(dark_file), "raw_file": os.path.basename(raw_file), "Riso": Riso, "CCiso": CCiso, "alpha_max": alpha_max, "alpha_person": alpha_pearson, "high_res": high_res, "Rwork": Rwork, "Rfree": Rfree, "top_pos_int": top_pos_int, "top_neg_int": top_neg_int }] df_1 = pd.DataFrame( results ) df = pd.concat( ( df, df_1 ) ) return df root = "pdb_data" results_df = pd.DataFrame() for sub in os.listdir(root): path = os.path.join(root, sub) if os.path.isdir(path): df = analyze_folder(path, dmin_cutoff=None) results_df = pd.concat( ( results_df, df ) ) print( results_df ) results_df.to_csv( "xtrapol8_analysis.csv", sep="," )