Files
extrapolation/generate_combined_pdb.py

593 lines
14 KiB
Python

#!/usr/bin/env python3
import gemmi
import argparse
import tempfile
import os
import sys
import yaml
# -----------------------------
# load YAML
# -----------------------------
def load_yaml(file):
with open(file) as f:
return yaml.safe_load(f)
# -----------------------------
# build site atom map
# -----------------------------
def build_site_map(rois):
site_map = {}
for site, info in rois["regions"].items():
atoms = []
for a in info["atoms"]:
atoms.append({
"atom": a["atom"],
"resn": a["res_name"],
"chain": a["chain"],
"resid": a["resid"]
})
site_map[site] = atoms
return site_map
def is_mainchain(atom_name):
return atom_name in {"N", "CA", "C", "O"}
def get_site_for_atom(atom_name, chain, resid, resn, site_map):
for site, atoms in site_map.items():
for a in atoms:
if (
chain == a["chain"] and
resid == a["resid"] and
resn == a["resn"]
):
# FULL RESIDUE MATCH
return site
return None
# -----------------------------
# Altloc validation
# -----------------------------
def validate_altlocs(struct, label):
allowed = {'A', 'B', '', ' ', '\x00'}
found = set()
for chain in struct[0]:
for res in chain:
for atom in res:
alt = atom.altloc
if alt not in ('', ' ', '\x00'):
found.add(alt)
if alt not in allowed:
print()
print("ERROR: Unsupported altloc detected")
print("----------------------------------")
print(f"Structure : {label}")
print(f"Chain : {chain.name}")
print(f"Residue : {res.name} {res.seqid.num}")
print(f"Atom : {atom.name}")
print(f"Altloc : '{alt}'")
print()
print("Only altlocs A/B are allowed in input structures.")
print()
sys.exit(1)
if len(found) > 2:
print()
print("ERROR: More than two conformations detected")
print("-------------------------------------------")
print(f"Structure : {label}")
print(f"Altlocs : {sorted(found)}")
print()
sys.exit(1)
print(f"{label} altlocs detected:", sorted(found))
# -----------------------------
# Remove ANISOU records
# -----------------------------
def strip_anisou(pdb_in, pdb_out):
with open(pdb_in) as fin, open(pdb_out, "w") as fout:
for line in fin:
if not line.startswith("ANISOU"):
fout.write(line)
# -----------------------------
# Altloc conversion helper
# -----------------------------
def convert_light_altloc(alt):
if alt in ('', ' ', '\x00', 'A'):
return 'C'
if alt == 'B':
return 'D'
raise RuntimeError(f"Unsupported altloc '{alt}' in light structure")
# -----------------------------
# report differences between states
# -----------------------------
def report_state_differences(dark, light):
dark_atoms = set()
light_atoms = set()
for chain in dark[0]:
for res in chain:
for atom in res:
key = (
chain.name,
res.seqid.num,
res.name,
atom.name
)
dark_atoms.add(key)
for chain in light[0]:
for res in chain:
for atom in res:
key = (
chain.name,
res.seqid.num,
res.name,
atom.name
)
light_atoms.add(key)
dark_only = dark_atoms - light_atoms
light_only = light_atoms - dark_atoms
print()
print("State difference summary")
print("------------------------")
print("Atoms only in dark :", len(dark_only))
print("Atoms only in light:", len(light_only))
print()
# Print a few examples
if dark_only:
print("Example dark-only atoms:")
for x in list(dark_only)[:5]:
print(" ", x)
if light_only:
print("Example light-only atoms:")
for x in list(light_only)[:5]:
print(" ", x)
print()
# -----------------------------
# lookup and check if waters are in both light and dark
# -----------------------------
def get_atom_keys(struct):
keys = set()
for chain in struct[0]:
for res in chain:
for atom in res:
keys.add((
chain.name,
res.seqid.num,
res.name,
atom.name
))
return keys
# -----------------------------
# occupancy sanity check
# -----------------------------
def check_occupancies(struct):
occ_map = {}
for chain in struct[0]:
for res in chain:
for atom in res:
key = (
chain.name,
res.seqid.num,
res.name,
atom.name
)
occ_map.setdefault(key, 0.0)
occ_map[key] += atom.occ
problems = []
for key, occ in occ_map.items():
if occ > 1.01: # small tolerance
problems.append((key, occ))
if problems:
print()
print("WARNING: Occupancy > 1 detected")
print("-------------------------------")
for p in problems[:10]:
print(p)
print()
else:
print("Occupancy check passed (no atoms >1).")
# -----------------------------
# chain/residue check
# -----------------------------
def check_missing_light_residues(dark, light, ensemble):
ens_atoms = {
(c.name, r.seqid.num, r.name, a.name)
for c in ensemble[0]
for r in c
for a in r
}
missing = []
for chain in light[0]:
for res in chain:
for atom in res:
key = (chain.name, res.seqid.num, res.name, atom.name)
if key not in ens_atoms:
missing.append(key)
if missing:
print()
print("ERROR: atoms lost from light structure during merge")
print("---------------------------------------------------")
for m in missing[:20]:
print(" ", m)
print()
print("Ensemble construction aborted.")
print()
sys.exit(1)
# -----------------------------
# Build ensemble model
# -----------------------------
def build_ensemble(dark, light, light_occ, occ_cutoff=0.005, verbose=False,
site_occ=None, site_map=None):
dark_keys = get_atom_keys(dark)
light_keys = get_atom_keys(light)
ensemble = dark.clone()
model = ensemble[0]
dark_atoms = 0
# -----------------
# Scale dark atoms
# -----------------
for chain in model:
for res in chain:
for atom in res:
# default dark occupancy
occ = 1.0 - light_occ
# override if site-specific
if site_map and site_occ:
site = get_site_for_atom(
atom.name,
chain.name,
res.seqid.num,
res.name,
site_map
)
if site:
occ = 1.0 - site_occ.get(site, light_occ)
key = (chain.name, res.seqid.num, res.name, atom.name)
if key not in light_keys:
# no light counterpart → keep full occupancy
atom.occ *= 1.0
else:
atom.occ *= occ
if atom.occ < occ_cutoff:
continue
if atom.altloc in ('', ' ', '\x00'):
atom.altloc = 'A'
dark_atoms += 1
if site:
print(
"SITE MATCH:",
site,
chain.name,
res.name,
res.seqid.num,
atom.name,
"→ occ =", occ
)
light_atoms = 0
# -----------------
# Add light atoms
# -----------------
for chain in light[0]:
target_chain = model.find_chain(chain.name)
if target_chain is None:
target_chain = gemmi.Chain(chain.name)
model.add_chain(target_chain)
# residue lookup dictionary (O(1) lookup)
res_lookup = {
(r.seqid.num, r.seqid.icode, r.name): r
for r in target_chain
}
for res in chain:
key = (res.seqid.num, res.seqid.icode, res.name)
target_res = res_lookup.get(key)
if target_res is None:
new_res = gemmi.Residue()
new_res.name = res.name
new_res.seqid = gemmi.SeqId(res.seqid.num, res.seqid.icode)
target_chain.add_residue(new_res)
target_res = target_chain[-1]
res_lookup[key] = target_res
for atom in res:
# default light occupancy
occ = light_occ
# override if site-specific
if site_map and site_occ:
site = get_site_for_atom(
atom.name,
chain.name,
res.seqid.num,
res.name,
site_map
)
if site:
occ = site_occ.get(site, light_occ)
key = (chain.name, res.seqid.num, res.name, atom.name)
if key not in dark_keys:
# no dark counterpart → full occupancy
new_occ = atom.occ * 1.0
else:
new_occ = atom.occ * occ
if new_occ < occ_cutoff:
if verbose:
print(
"Skipping low-occupancy atom:",
f"{chain.name} {res.name} {res.seqid.num}",
f"atom {atom.name}",
f"orig_occ={atom.occ:.3f}",
f"scaled_occ={new_occ:.4f}"
)
continue
new_atom = gemmi.Atom()
new_atom.name = atom.name
new_atom.pos = gemmi.Position(atom.pos.x, atom.pos.y, atom.pos.z)
new_atom.element = atom.element
new_atom.b_iso = atom.b_iso
new_atom.occ = new_occ
new_atom.altloc = convert_light_altloc(atom.altloc)
target_res.add_atom(new_atom)
if site:
print(
"SITE MATCH:",
site,
chain.name,
res.name,
res.seqid.num,
atom.name,
"→ occ =", occ
)
light_atoms += 1
print()
print("Ensemble build summary")
print("----------------------")
print("Dark atoms written :", dark_atoms)
print("Light atoms written:", light_atoms)
print()
return ensemble
# -----------------------------
# Main
# -----------------------------
def main():
parser = argparse.ArgumentParser(
description="Generate multicopy ensemble PDB (Barends-style refinement model).",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"-r",
"--reference",
type=os.path.abspath,
help="reference-state PDB file",
required=True
)
parser.add_argument(
"-t",
"--triggered",
type=os.path.abspath,
help="Triggered-state PDB file",
required=True
)
parser.add_argument(
"-f",
"--fraction",
type=float,
help="Triggered-state occupancy number < 1",
required=True
)
parser.add_argument(
"-o",
"--output",
default="ensemble.pdb",
help="Output ensemble PDB",
)
parser.add_argument(
"-c",
"--occ-cutoff",
type=float,
default=0.005,
help="Atoms with scaled occupancy below this value are removed"
)
parser.add_argument(
"--site-occ",
help="YAML file with site occupancies"
)
parser.add_argument(
"--roi",
help="ROI YAML file"
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Print detailed filtering information"
)
args = parser.parse_args()
# Validate occupancy
if not (0.0 <= args.fraction <= 1.0):
print("ERROR: light_occ must be between 0 and 1")
sys.exit(1)
# Temporary files to remove any ANISOU records
with tempfile.NamedTemporaryFile(suffix=".pdb", delete=False) as tmp_dark, \
tempfile.NamedTemporaryFile(suffix=".pdb", delete=False) as tmp_light:
strip_anisou(args.reference, tmp_dark.name)
strip_anisou(args.triggered, tmp_light.name)
dark = gemmi.read_structure(tmp_dark.name)
light = gemmi.read_structure(tmp_light.name)
os.remove(tmp_dark.name)
os.remove(tmp_light.name)
# Validate the altocs and check only two conformations present in structure
validate_altlocs(dark, "dark")
validate_altlocs(light, "light")
# report any residue differences
report_state_differences(dark, light)
site_occ = None
site_map = None
if args.site_occ and args.roi:
site_occ_data = load_yaml(args.site_occ)
roi_data = load_yaml(args.roi)
site_occ = site_occ_data["site_occupancies"]
site_map = build_site_map(roi_data)
print("Using site-specific occupancies:")
print(site_occ)
ensemble = build_ensemble(
dark,
light,
args.fraction,
occ_cutoff=args.occ_cutoff,
verbose=args.verbose,
site_occ=site_occ,
site_map=site_map
)
# check all the residues are there
check_missing_light_residues(dark, light, ensemble)
# check occupancies in the ensemble = 1
check_occupancies(ensemble)
# Write to file
ensemble.write_pdb(args.output)
print("Ensemble written:", args.output)
if __name__ == "__main__":
main()