593 lines
14 KiB
Python
593 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import gemmi
|
|
import argparse
|
|
import tempfile
|
|
import os
|
|
import sys
|
|
import yaml
|
|
|
|
# -----------------------------
|
|
# load YAML
|
|
# -----------------------------
|
|
def load_yaml(file):
|
|
with open(file) as f:
|
|
return yaml.safe_load(f)
|
|
|
|
|
|
# -----------------------------
|
|
# build site atom map
|
|
# -----------------------------
|
|
def build_site_map(rois):
|
|
|
|
site_map = {}
|
|
|
|
for site, info in rois["regions"].items():
|
|
|
|
atoms = []
|
|
|
|
for a in info["atoms"]:
|
|
|
|
atoms.append({
|
|
"atom": a["atom"],
|
|
"resn": a["res_name"],
|
|
"chain": a["chain"],
|
|
"resid": a["resid"]
|
|
})
|
|
|
|
site_map[site] = atoms
|
|
|
|
return site_map
|
|
|
|
|
|
def is_mainchain(atom_name):
|
|
return atom_name in {"N", "CA", "C", "O"}
|
|
|
|
|
|
def get_site_for_atom(atom_name, chain, resid, resn, site_map):
|
|
|
|
for site, atoms in site_map.items():
|
|
|
|
for a in atoms:
|
|
|
|
if (
|
|
chain == a["chain"] and
|
|
resid == a["resid"] and
|
|
resn == a["resn"]
|
|
):
|
|
# FULL RESIDUE MATCH
|
|
return site
|
|
|
|
return None
|
|
|
|
|
|
# -----------------------------
|
|
# Altloc validation
|
|
# -----------------------------
|
|
def validate_altlocs(struct, label):
|
|
|
|
allowed = {'A', 'B', '', ' ', '\x00'}
|
|
found = set()
|
|
|
|
for chain in struct[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
|
|
alt = atom.altloc
|
|
|
|
if alt not in ('', ' ', '\x00'):
|
|
found.add(alt)
|
|
|
|
if alt not in allowed:
|
|
|
|
print()
|
|
print("ERROR: Unsupported altloc detected")
|
|
print("----------------------------------")
|
|
print(f"Structure : {label}")
|
|
print(f"Chain : {chain.name}")
|
|
print(f"Residue : {res.name} {res.seqid.num}")
|
|
print(f"Atom : {atom.name}")
|
|
print(f"Altloc : '{alt}'")
|
|
print()
|
|
print("Only altlocs A/B are allowed in input structures.")
|
|
print()
|
|
|
|
sys.exit(1)
|
|
|
|
if len(found) > 2:
|
|
|
|
print()
|
|
print("ERROR: More than two conformations detected")
|
|
print("-------------------------------------------")
|
|
print(f"Structure : {label}")
|
|
print(f"Altlocs : {sorted(found)}")
|
|
print()
|
|
|
|
sys.exit(1)
|
|
|
|
print(f"{label} altlocs detected:", sorted(found))
|
|
|
|
|
|
# -----------------------------
|
|
# Remove ANISOU records
|
|
# -----------------------------
|
|
def strip_anisou(pdb_in, pdb_out):
|
|
|
|
with open(pdb_in) as fin, open(pdb_out, "w") as fout:
|
|
for line in fin:
|
|
if not line.startswith("ANISOU"):
|
|
fout.write(line)
|
|
|
|
|
|
# -----------------------------
|
|
# Altloc conversion helper
|
|
# -----------------------------
|
|
def convert_light_altloc(alt):
|
|
|
|
if alt in ('', ' ', '\x00', 'A'):
|
|
return 'C'
|
|
|
|
if alt == 'B':
|
|
return 'D'
|
|
|
|
raise RuntimeError(f"Unsupported altloc '{alt}' in light structure")
|
|
|
|
|
|
# -----------------------------
|
|
# report differences between states
|
|
# -----------------------------
|
|
def report_state_differences(dark, light):
|
|
|
|
dark_atoms = set()
|
|
light_atoms = set()
|
|
|
|
for chain in dark[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
key = (
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
atom.name
|
|
)
|
|
dark_atoms.add(key)
|
|
|
|
for chain in light[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
key = (
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
atom.name
|
|
)
|
|
light_atoms.add(key)
|
|
|
|
dark_only = dark_atoms - light_atoms
|
|
light_only = light_atoms - dark_atoms
|
|
|
|
print()
|
|
print("State difference summary")
|
|
print("------------------------")
|
|
print("Atoms only in dark :", len(dark_only))
|
|
print("Atoms only in light:", len(light_only))
|
|
print()
|
|
|
|
# Print a few examples
|
|
if dark_only:
|
|
print("Example dark-only atoms:")
|
|
for x in list(dark_only)[:5]:
|
|
print(" ", x)
|
|
|
|
if light_only:
|
|
print("Example light-only atoms:")
|
|
for x in list(light_only)[:5]:
|
|
print(" ", x)
|
|
|
|
print()
|
|
|
|
|
|
# -----------------------------
|
|
# lookup and check if waters are in both light and dark
|
|
# -----------------------------
|
|
def get_atom_keys(struct):
|
|
|
|
keys = set()
|
|
|
|
for chain in struct[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
|
|
keys.add((
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
atom.name
|
|
))
|
|
|
|
return keys
|
|
|
|
|
|
# -----------------------------
|
|
# occupancy sanity check
|
|
# -----------------------------
|
|
def check_occupancies(struct):
|
|
|
|
occ_map = {}
|
|
|
|
for chain in struct[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
|
|
key = (
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
atom.name
|
|
)
|
|
|
|
occ_map.setdefault(key, 0.0)
|
|
occ_map[key] += atom.occ
|
|
|
|
problems = []
|
|
|
|
for key, occ in occ_map.items():
|
|
if occ > 1.01: # small tolerance
|
|
problems.append((key, occ))
|
|
|
|
if problems:
|
|
|
|
print()
|
|
print("WARNING: Occupancy > 1 detected")
|
|
print("-------------------------------")
|
|
|
|
for p in problems[:10]:
|
|
print(p)
|
|
|
|
print()
|
|
|
|
else:
|
|
print("Occupancy check passed (no atoms >1).")
|
|
|
|
|
|
# -----------------------------
|
|
# chain/residue check
|
|
# -----------------------------
|
|
def check_missing_light_residues(dark, light, ensemble):
|
|
|
|
ens_atoms = {
|
|
(c.name, r.seqid.num, r.name, a.name)
|
|
for c in ensemble[0]
|
|
for r in c
|
|
for a in r
|
|
}
|
|
|
|
missing = []
|
|
|
|
for chain in light[0]:
|
|
for res in chain:
|
|
for atom in res:
|
|
|
|
key = (chain.name, res.seqid.num, res.name, atom.name)
|
|
|
|
if key not in ens_atoms:
|
|
missing.append(key)
|
|
|
|
if missing:
|
|
|
|
print()
|
|
print("ERROR: atoms lost from light structure during merge")
|
|
print("---------------------------------------------------")
|
|
|
|
for m in missing[:20]:
|
|
print(" ", m)
|
|
|
|
print()
|
|
print("Ensemble construction aborted.")
|
|
print()
|
|
|
|
sys.exit(1)
|
|
|
|
# -----------------------------
|
|
# Build ensemble model
|
|
# -----------------------------
|
|
def build_ensemble(dark, light, light_occ, occ_cutoff=0.005, verbose=False,
|
|
site_occ=None, site_map=None):
|
|
|
|
dark_keys = get_atom_keys(dark)
|
|
light_keys = get_atom_keys(light)
|
|
|
|
ensemble = dark.clone()
|
|
model = ensemble[0]
|
|
|
|
dark_atoms = 0
|
|
|
|
# -----------------
|
|
# Scale dark atoms
|
|
# -----------------
|
|
for chain in model:
|
|
for res in chain:
|
|
for atom in res:
|
|
|
|
# default dark occupancy
|
|
occ = 1.0 - light_occ
|
|
|
|
# override if site-specific
|
|
if site_map and site_occ:
|
|
site = get_site_for_atom(
|
|
atom.name,
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
site_map
|
|
)
|
|
if site:
|
|
occ = 1.0 - site_occ.get(site, light_occ)
|
|
|
|
key = (chain.name, res.seqid.num, res.name, atom.name)
|
|
|
|
if key not in light_keys:
|
|
# no light counterpart → keep full occupancy
|
|
atom.occ *= 1.0
|
|
else:
|
|
atom.occ *= occ
|
|
|
|
if atom.occ < occ_cutoff:
|
|
continue
|
|
|
|
if atom.altloc in ('', ' ', '\x00'):
|
|
atom.altloc = 'A'
|
|
|
|
dark_atoms += 1
|
|
|
|
if site:
|
|
print(
|
|
"SITE MATCH:",
|
|
site,
|
|
chain.name,
|
|
res.name,
|
|
res.seqid.num,
|
|
atom.name,
|
|
"→ occ =", occ
|
|
)
|
|
|
|
light_atoms = 0
|
|
|
|
# -----------------
|
|
# Add light atoms
|
|
# -----------------
|
|
for chain in light[0]:
|
|
|
|
target_chain = model.find_chain(chain.name)
|
|
|
|
if target_chain is None:
|
|
target_chain = gemmi.Chain(chain.name)
|
|
model.add_chain(target_chain)
|
|
|
|
# residue lookup dictionary (O(1) lookup)
|
|
res_lookup = {
|
|
(r.seqid.num, r.seqid.icode, r.name): r
|
|
for r in target_chain
|
|
}
|
|
|
|
for res in chain:
|
|
|
|
key = (res.seqid.num, res.seqid.icode, res.name)
|
|
|
|
target_res = res_lookup.get(key)
|
|
|
|
if target_res is None:
|
|
|
|
new_res = gemmi.Residue()
|
|
new_res.name = res.name
|
|
new_res.seqid = gemmi.SeqId(res.seqid.num, res.seqid.icode)
|
|
|
|
target_chain.add_residue(new_res)
|
|
target_res = target_chain[-1]
|
|
|
|
res_lookup[key] = target_res
|
|
|
|
for atom in res:
|
|
|
|
# default light occupancy
|
|
occ = light_occ
|
|
|
|
# override if site-specific
|
|
if site_map and site_occ:
|
|
site = get_site_for_atom(
|
|
atom.name,
|
|
chain.name,
|
|
res.seqid.num,
|
|
res.name,
|
|
site_map
|
|
)
|
|
if site:
|
|
occ = site_occ.get(site, light_occ)
|
|
|
|
key = (chain.name, res.seqid.num, res.name, atom.name)
|
|
|
|
if key not in dark_keys:
|
|
# no dark counterpart → full occupancy
|
|
new_occ = atom.occ * 1.0
|
|
else:
|
|
new_occ = atom.occ * occ
|
|
|
|
if new_occ < occ_cutoff:
|
|
|
|
if verbose:
|
|
print(
|
|
"Skipping low-occupancy atom:",
|
|
f"{chain.name} {res.name} {res.seqid.num}",
|
|
f"atom {atom.name}",
|
|
f"orig_occ={atom.occ:.3f}",
|
|
f"scaled_occ={new_occ:.4f}"
|
|
)
|
|
|
|
continue
|
|
|
|
new_atom = gemmi.Atom()
|
|
|
|
new_atom.name = atom.name
|
|
new_atom.pos = gemmi.Position(atom.pos.x, atom.pos.y, atom.pos.z)
|
|
new_atom.element = atom.element
|
|
new_atom.b_iso = atom.b_iso
|
|
new_atom.occ = new_occ
|
|
new_atom.altloc = convert_light_altloc(atom.altloc)
|
|
|
|
target_res.add_atom(new_atom)
|
|
|
|
|
|
if site:
|
|
print(
|
|
"SITE MATCH:",
|
|
site,
|
|
chain.name,
|
|
res.name,
|
|
res.seqid.num,
|
|
atom.name,
|
|
"→ occ =", occ
|
|
)
|
|
|
|
light_atoms += 1
|
|
|
|
print()
|
|
print("Ensemble build summary")
|
|
print("----------------------")
|
|
print("Dark atoms written :", dark_atoms)
|
|
print("Light atoms written:", light_atoms)
|
|
print()
|
|
|
|
return ensemble
|
|
|
|
|
|
# -----------------------------
|
|
# Main
|
|
# -----------------------------
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Generate multicopy ensemble PDB (Barends-style refinement model).",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-r",
|
|
"--reference",
|
|
type=os.path.abspath,
|
|
help="reference-state PDB file",
|
|
required=True
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-t",
|
|
"--triggered",
|
|
type=os.path.abspath,
|
|
help="Triggered-state PDB file",
|
|
required=True
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-f",
|
|
"--fraction",
|
|
type=float,
|
|
help="Triggered-state occupancy number < 1",
|
|
required=True
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output",
|
|
default="ensemble.pdb",
|
|
help="Output ensemble PDB",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-c",
|
|
"--occ-cutoff",
|
|
type=float,
|
|
default=0.005,
|
|
help="Atoms with scaled occupancy below this value are removed"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--site-occ",
|
|
help="YAML file with site occupancies"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--roi",
|
|
help="ROI YAML file"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-v",
|
|
"--verbose",
|
|
action="store_true",
|
|
help="Print detailed filtering information"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Validate occupancy
|
|
if not (0.0 <= args.fraction <= 1.0):
|
|
print("ERROR: light_occ must be between 0 and 1")
|
|
sys.exit(1)
|
|
|
|
# Temporary files to remove any ANISOU records
|
|
with tempfile.NamedTemporaryFile(suffix=".pdb", delete=False) as tmp_dark, \
|
|
tempfile.NamedTemporaryFile(suffix=".pdb", delete=False) as tmp_light:
|
|
|
|
strip_anisou(args.reference, tmp_dark.name)
|
|
strip_anisou(args.triggered, tmp_light.name)
|
|
|
|
dark = gemmi.read_structure(tmp_dark.name)
|
|
light = gemmi.read_structure(tmp_light.name)
|
|
|
|
os.remove(tmp_dark.name)
|
|
os.remove(tmp_light.name)
|
|
|
|
# Validate the altocs and check only two conformations present in structure
|
|
validate_altlocs(dark, "dark")
|
|
validate_altlocs(light, "light")
|
|
|
|
# report any residue differences
|
|
report_state_differences(dark, light)
|
|
|
|
site_occ = None
|
|
site_map = None
|
|
|
|
if args.site_occ and args.roi:
|
|
|
|
site_occ_data = load_yaml(args.site_occ)
|
|
roi_data = load_yaml(args.roi)
|
|
|
|
site_occ = site_occ_data["site_occupancies"]
|
|
site_map = build_site_map(roi_data)
|
|
|
|
print("Using site-specific occupancies:")
|
|
print(site_occ)
|
|
|
|
ensemble = build_ensemble(
|
|
dark,
|
|
light,
|
|
args.fraction,
|
|
occ_cutoff=args.occ_cutoff,
|
|
verbose=args.verbose,
|
|
site_occ=site_occ,
|
|
site_map=site_map
|
|
)
|
|
|
|
# check all the residues are there
|
|
check_missing_light_residues(dark, light, ensemble)
|
|
|
|
# check occupancies in the ensemble = 1
|
|
check_occupancies(ensemble)
|
|
|
|
# Write to file
|
|
ensemble.write_pdb(args.output)
|
|
|
|
print("Ensemble written:", args.output)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |