# File: extrapolation/barends_refine.py
# (644 lines, 15 KiB, Python)
#!/usr/bin/env python3
import os
import subprocess
import argparse
import numpy as np
import yaml
import math
import re
# -------------------------------------------------
# find value at point
# -------------------------------------------------
def load_rois(yml):
    """Load ROI (region-of-interest) definitions from a YAML file.

    Parameters
    ----------
    yml : str
        Path to a YAML file with a top-level "regions" mapping.

    Returns
    -------
    dict
        The "regions" mapping.

    Raises
    ------
    KeyError
        If the file has no "regions" key.
    """
    with open(yml) as f:
        data = yaml.safe_load(f)
    # A leftover debug print of the full YAML payload was removed here.
    return data["regions"]
# -------------------------------------------------
# run phenix.map_value_at_point
# -------------------------------------------------
def run_map_values(mtz, pdb, outfile):
    """Write and execute a shell script running phenix.map_value_at_point.

    The script loads the phenix module, runs map_value_at_point on the
    given MTZ/PDB pair with the FOFCWT map label, and redirects stdout
    to *outfile*.  The script path is derived from *outfile*
    (".dat" -> ".sh") and made executable before being run via bash.
    """
    script_path = outfile.replace(".dat", ".sh")
    script_lines = [
        "#!/bin/bash\n",
        "module load phenix\n\n",
        f"phenix.map_value_at_point {mtz} {pdb} "
        f"label=\"FOFCWT\" > {outfile}\n",
    ]
    with open(script_path, "w") as fh:
        fh.writelines(script_lines)
    os.chmod(script_path, 0o755)
    subprocess.run(["bash", script_path], check=True)
def parse_mapvalues(filename):
    """Parse phenix.map_value_at_point output into a list of atom dicts.

    Lines of the form '... "<PDB atom record>" ... Map value: <float>'
    are decoded; everything else is skipped.  Each returned dict holds
    the atom name, residue name, chain, residue id, altloc and the map
    value at that atom.
    """
    parsed = []
    with open(filename) as fh:
        for raw in fh:
            if "Map value:" not in raw:
                continue
            # The quoted section is a fixed-column PDB atom record.
            record = raw.split('"')[1]
            parsed.append({
                "atom": record[12:16].strip(),
                "resn": record[17:20].strip(),
                "chain": record[21].strip(),
                "resid": int(record[22:26]),
                "altloc": record[16].strip(),
                "value": float(raw.split("Map value:")[1]),
            })
    return parsed
# -------------------------------------------------
# read atoms from pdb
# -------------------------------------------------
def read_pdb_atoms(pdbfile):
    """Read ATOM/HETATM records from a PDB file.

    Returns a list of dicts holding the atom name, residue name, chain,
    residue id, altloc and Cartesian coordinates, parsed from the
    fixed-column PDB format.
    """
    records = []
    with open(pdbfile) as fh:
        for row in fh:
            if not row.startswith(("ATOM", "HETATM")):
                continue
            coords = (
                float(row[30:38]),
                float(row[38:46]),
                float(row[46:54]),
            )
            records.append({
                "atom": row[12:16].strip(),
                "resn": row[17:20].strip(),
                "chain": row[21].strip(),
                "resid": int(row[22:26]),
                "altloc": row[16].strip(),
                "xyz": coords,
            })
    return records
def distance(a, b):
    """Euclidean distance between two 3D points given as (x, y, z) tuples."""
    return math.sqrt(sum((p - q) ** 2 for p, q in zip(a, b)))
# -------------------------------------------------
# find ROI atom
# -------------------------------------------------
def find_roi_atom(atoms, roi):
    """Locate the atom matching an ROI spec.

    The spec dict uses keys "atom", "res_name", "chain", "resid"; the
    atom dicts use "atom", "resn", "chain", "resid".  Returns the first
    matching atom, or raises RuntimeError if none matches.
    """
    key_pairs = (
        ("atom", "atom"),
        ("resn", "res_name"),
        ("chain", "chain"),
        ("resid", "resid"),
    )
    for candidate in atoms:
        if all(candidate[ak] == roi[rk] for ak, rk in key_pairs):
            return candidate
    raise RuntimeError("ROI atom not found")
# -------------------------------------------------
# atoms within radius
# -------------------------------------------------
def atoms_within_radius(atoms, center_atom, radius=2.0, max_neighbors=2):
    """Return the center atom plus its closest neighbors.

    Collects every atom within *radius* of *center_atom* (Euclidean
    distance), keeps any entry comparing equal to the center itself,
    then appends up to *max_neighbors* of the remaining atoms in order
    of increasing distance.
    """
    center_xyz = center_atom["xyz"]
    candidates = []
    for entry in atoms:
        sep = distance(entry["xyz"], center_xyz)
        if sep <= radius:
            candidates.append((sep, entry))
    candidates.sort(key=lambda pair: pair[0])
    # The ROI atom itself (any entry equal to it) always leads the result.
    selected = [entry for _, entry in candidates if entry == center_atom]
    extras = 0
    for _, entry in candidates:
        if entry == center_atom:
            continue
        selected.append(entry)
        extras += 1
        if extras >= max_neighbors:
            break
    return selected
# -------------------------------------------------
# site refinement functions
# -------------------------------------------------
def is_mainchain(atom):
    """True if *atom* is a protein backbone atom (N, CA, C or O).

    Water molecules (HOH/WAT) are never considered mainchain, even
    though their oxygen atom is named "O".
    """
    if atom["resn"] in ("HOH", "WAT"):
        return False
    return atom["atom"] in {"N", "CA", "C", "O"}
def split_altloc(atoms):
    """Partition atoms by altloc: A/B -> dark state, C/D -> triggered state.

    Atoms with any other altloc (including blank) appear in neither list.
    Returns (dark_atoms, triggered_atoms).
    """
    dark_state = [a for a in atoms if a["altloc"] in ("A", "B")]
    triggered_state = [a for a in atoms if a["altloc"] in ("C", "D")]
    return dark_state, triggered_state
def build_refinement_group(neighbors):
    """Build occupancy-refinement selections for dark and triggered states.

    Splits *neighbors* by altloc (see split_altloc), then describes each
    atom either as a whole-residue selection (mainchain atoms) or as a
    single-atom selection (sidechain/solvent atoms).

    Returns a dict with "triggered" and "dark" lists of selection
    tuples: ("residue", chain, resid) or ("atom", chain, resid, name).
    """
    def _selection(a):
        # Mainchain atoms pull in the whole residue; everything else is
        # selected individually.  (This collapses the two duplicated
        # loops of the original implementation.)
        if is_mainchain(a):
            return ("residue", a["chain"], a["resid"])
        return ("atom", a["chain"], a["resid"], a["atom"])

    dark_atoms, trig_atoms = split_altloc(neighbors)
    return {
        "triggered": [_selection(a) for a in trig_atoms],
        "dark": [_selection(a) for a in dark_atoms],
    }
# -------------------------------------------------
# find zero density crossing point
# -------------------------------------------------
def find_zero_crossing(data):
    """Estimate the occupancy where the ROI density crosses zero.

    Fits a quadratic to (occupancy, density) pairs and returns the first
    real root in the open interval (0, 1), or None if there is none.

    Parameters
    ----------
    data : sequence of (occ, value) pairs (at least 3 for a stable fit)

    Returns
    -------
    float | None
    """
    occs = np.array([pt[0] for pt in data])
    vals = np.array([pt[1] for pt in data])
    coeffs = np.polyfit(occs, vals, 2)
    for root in np.roots(coeffs):
        # np.roots yields complex values when the parabola never crosses
        # zero; comparing those with "0 < r < 1" used to raise TypeError.
        # Only real roots are physical occupancies.
        if np.isreal(root) and 0 < root.real < 1:
            return float(root.real)
    return None
# -------------------------------------------------
# extract ROI densities
# -------------------------------------------------
def extract_roi_values(atoms, regions):
    """Average map values over the triggered-state atoms of each ROI.

    For every region, map values are collected from atoms with altloc
    C/D that match one of the region's atom specs (atom name, residue
    name, chain, resid).  Returns {region_name: mean or None} where None
    marks regions with no matching atom.
    """
    triggered = ("C", "D")

    def _matches(entry, spec):
        return (entry["altloc"] in triggered
                and entry["atom"] == spec["atom"]
                and entry["resn"] == spec["res_name"]
                and entry["chain"] == spec["chain"]
                and entry["resid"] == spec["resid"])

    averaged = {}
    for name, region in regions.items():
        collected = [
            entry["value"]
            for spec in region["atoms"]
            for entry in atoms
            if _matches(entry, spec)
        ]
        averaged[name] = sum(collected) / len(collected) if collected else None
    return averaged
# -------------------------------------------------
# run phenix refinement
# -------------------------------------------------
def run_refinement(params, tag):
    """Write and execute a shell script that runs phenix.refine.

    The script is named "<tag>_refine.sh", loads the phenix module and
    invokes phenix.refine with *params* (a parameter-file path or
    argument string), then is executed via bash.
    """
    script_path = f"{tag}_refine.sh"
    body = "#!/bin/bash\n" + "module load phenix\n\n" + f"phenix.refine {params}"
    with open(script_path, "w") as fh:
        fh.write(body)
    os.chmod(script_path, 0o755)
    subprocess.run(["bash", script_path], check=True)
# -------------------------------------------------
# update parameter file
# -------------------------------------------------
def write_params(template, output, pdb, mtz, prefix):
    """Instantiate a phenix.refine parameter file from a template.

    Fills in the model path (only the FIRST "file_name = None"
    occurrence), the reflection-data path inside the xray_data block,
    and the output prefix, then writes the result to *output*.
    """
    with open(template) as fh:
        text = fh.read()
    # (pattern, replacement, max count; None means replace all)
    substitutions = [
        ("file_name = None", f"file_name = {pdb}", 1),
        ("xray_data {\n file_name = None",
         f"xray_data {{\n file_name = {mtz}", None),
        ("prefix = None", f"prefix = {prefix}", None),
    ]
    for old, new, count in substitutions:
        if count is None:
            text = text.replace(old, new)
        else:
            text = text.replace(old, new, count)
    with open(output, "w") as fh:
        fh.write(text)
# -------------------------------------------------
# extract Rfree from phenix log
# -------------------------------------------------
def extract_rfree(logfile):
    """Return the LAST R-free value reported in a phenix log file.

    Scans every line for "R-free = <number>" and keeps the final match,
    which corresponds to the last refinement macro-cycle.

    Raises RuntimeError when no such line is found.
    """
    pattern = re.compile(r"R-free\s*=\s*([0-9.]+)")
    latest = None
    with open(logfile) as fh:
        for row in fh:
            hit = pattern.search(row)
            if hit:
                latest = float(hit.group(1))
    if latest is None:
        raise RuntimeError("Could not extract R-free from log")
    return latest
# -------------------------------------------------
# occupancy scan
# -------------------------------------------------
def run_coarse_scan(args):
    """Run the coarse occupancy scan.

    For each occupancy on the grid [min_occ, max_occ] (step args.step):
    build an ensemble model with the external builder script, refine it
    with phenix, extract R-free from the log, run map_value_at_point on
    the refined model/map, and average the difference density over each
    ROI.  Completed refinements and map files found on disk are reused.

    Returns a list of dicts: {"occ": float, "rfree": float, "roi": dict}.
    """
    cwd = os.getcwd()
    ensemble_builder = os.path.abspath(args.builder)
    mtz = os.path.abspath(args.mtz)
    # NOTE(review): the params template is hard-coded to ./ensemble.params
    # here, while scan_site uses args.params — confirm which is intended.
    params_input = os.path.join(cwd, "ensemble.params")
    dark = os.path.join(cwd, args.dark)
    light = os.path.join(cwd, args.light)
    yml = os.path.abspath(args.roi_yml)
    # read yml regions
    rois = load_rois(yml)
    # Inclusive occupancy grid; rounded to 3 decimals so directory names
    # stay clean despite float noise from np.arange.
    occ_values = np.round(np.arange(
        args.min_occ,
        args.max_occ + args.step,
        args.step
    ), 3)
    results = []
    os.makedirs(args.workdir, exist_ok=True)
    for occ in occ_values:
        # Run directory tag uses 2 decimals; assumes args.step >= 0.01.
        tag = f"occ_{occ:.2f}"
        rundir = os.path.join(cwd, args.workdir, tag)
        ensemble_pdb = os.path.join(rundir, f"ensemble_{tag}.pdb")
        os.makedirs(rundir, exist_ok=True)
        print("\n====================================")
        print("Running occupancy:", occ)
        print("====================================")
        # parameter file name (relative; written inside rundir after chdir)
        params_occ = f"ensemble_{tag}.params"
        # check if refinement already finished
        output_pdb = os.path.join(rundir, f"{tag}_001.pdb")
        if os.path.exists(output_pdb):
            print("Refinement already exists - skipping:", output_pdb)
        else:
            # Work inside rundir so phenix outputs land there.
            os.chdir(rundir)
            # build ensemble at this triggered-state occupancy fraction
            subprocess.run([
                "python",
                ensemble_builder,
                "-r", dark,
                "-t", light,
                "-f", str(occ),
                "-o", ensemble_pdb
            ], check=True)
            # write parameter file
            write_params(
                params_input,
                params_occ,
                ensemble_pdb,
                mtz,
                tag
            )
            # run refinement
            run_refinement(
                params_occ,
                tag
            )
            os.chdir(cwd)
        # find log file (assumes phenix wrote <tag>_001.log in rundir)
        log = os.path.join(rundir, f"{tag}_001.log")
        rfree = extract_rfree(log)
        print("Rfree:", rfree)
        # -------------------------------------------------
        # compute map densities
        # -------------------------------------------------
        refined_pdb = os.path.join(rundir, f"{tag}_001.pdb")
        refined_mtz = os.path.join(rundir, f"{tag}_001.mtz")
        mapfile = os.path.join(rundir, f"{tag}_mapvalues.dat")
        if not os.path.exists(mapfile):
            print("Running map_value_at_point...")
            os.chdir(rundir)
            run_map_values(refined_mtz, refined_pdb, mapfile)
            os.chdir(cwd)
        print("parsing map values")
        # extract map values per atom
        map_atoms = parse_mapvalues(mapfile)
        # match atoms to rois (mean density per region, None if no match)
        roi_vals = extract_roi_values(map_atoms, rois)
        results.append({
            "occ": occ,
            "rfree": rfree,
            "roi": roi_vals
        })
    return results
# -------------------------------------------------
# site-specific occupancy scan
# -------------------------------------------------
def scan_site(args, site_name, site_info, occ_values):
    """Fine occupancy scan for a single ROI site.

    For each occupancy in *occ_values*: build an ensemble model, refine
    it, run map_value_at_point, and average the triggered-state density
    over the site's atoms.

    Returns a list of (occ, density) pairs.
    """
    results = []
    print("\n==============================")
    print("Scanning site:", site_name)
    print("==============================")
    for occ in occ_values:
        tag = f"{site_name}_occ_{occ:.3f}"
        rundir = os.path.join(args.workdir, "site_scans", tag)
        os.makedirs(rundir, exist_ok=True)
        ensemble_pdb = os.path.join(rundir, f"ensemble_{tag}.pdb")
        params_occ = f"ensemble_{tag}.params"
        # --- build ensemble ---
        subprocess.run([
            "python",
            args.builder,
            "-r", args.dark,
            "-t", args.light,
            "-f", str(occ),
            "-o", ensemble_pdb
        ], check=True)
        # --- write params ---
        write_params(
            args.params,
            params_occ,
            ensemble_pdb,
            args.mtz,
            tag
        )
        # --- refine ---
        run_refinement(params_occ, tag)
        # --- extract density ---
        # NOTE(review): unlike run_coarse_scan, this never chdirs into
        # rundir, so the refined outputs are resolved relative to the
        # current working directory — confirm where phenix writes them.
        refined_pdb = f"{tag}_001.pdb"
        refined_mtz = f"{tag}_001.mtz"
        mapfile = f"{tag}_map.dat"
        run_map_values(refined_mtz, refined_pdb, mapfile)
        atoms = parse_mapvalues(mapfile)
        roi_vals = extract_roi_values(atoms, {site_name: site_info})
        val = roi_vals[site_name]
        # NOTE(review): val is None when no atom matched the site spec;
        # the :.3f format below would then raise TypeError.
        print(f"occ={occ:.3f} density={val:.3f}")
        results.append((occ, val))
    return results
# -------------------------------------------------
# main
# -------------------------------------------------
def main():
    """Command-line entry point for the Barends occupancy refinement scan.

    Runs the coarse occupancy scan, estimates the zero-crossing
    occupancy of each ROI's difference density, and writes a summary to
    barends_scan.dat.
    """
    parser = argparse.ArgumentParser(
        description="Barends occupancy refinement scan"
    )
    parser.add_argument("--dark", required=True)
    parser.add_argument("--light", required=True)
    parser.add_argument("--mtz", required=True)
    parser.add_argument(
        "--params",
        required=True,
        help="phenix params template"
    )
    parser.add_argument(
        "--builder",
        default="generate_combined_pdb.py",
        help="ensemble builder script"
    )
    parser.add_argument("--min-occ", type=float, default=0.08)
    parser.add_argument("--max-occ", type=float, default=0.16)
    parser.add_argument("--step", type=float, default=0.02)
    parser.add_argument(
        "-y",
        "--roi_yml",
        type=str,
        help="yml file with list of roi regions",
        required=True
    )
    parser.add_argument("--workdir", default="barends_scan")
    args = parser.parse_args()

    results = run_coarse_scan(args)

    # Group (occ, density) pairs per ROI site.  Sites with no data at
    # all are skipped entirely — an empty list would make the quadratic
    # fit in find_zero_crossing fail.
    site_data = {}
    for entry in results:
        for site, val in entry["roi"].items():
            if val is not None:
                site_data.setdefault(site, []).append((entry["occ"], val))

    # Estimate the occupancy at which each site's difference density
    # vanishes (zero crossing of a quadratic fit; None if no real root
    # lies in (0, 1)).
    site_occupancies = {}
    for site, data in site_data.items():
        occ = find_zero_crossing(data)
        print(f"{site}: {occ}")
        site_occupancies[site] = occ

    # Write a summary: per-occupancy R-free, then the per-site
    # zero-crossing estimates.  (Replaces a dead commented-out writer
    # that targeted an obsolete tuple-based result format.)
    outfile = "barends_scan.dat"
    with open(outfile, "w") as f:
        f.write("occ rfree\n")
        for entry in results:
            f.write(f"{entry['occ']:.3f} {entry['rfree']:.5f}\n")
        f.write("\n# site zero-crossing occupancies\n")
        for site, occ in site_occupancies.items():
            f.write(f"# {site}: {occ}\n")
    print("\nScan complete")
    print("Results written to", outfile)


if __name__ == "__main__":
    main()