This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -81,6 +81,7 @@ def set_mirror_stripe(energy_ev):
|
||||
def mono_pitch_scan(plot=True):
|
||||
"""Scan the monochromator pitch and move to the peak."""
|
||||
# Move to the calculated pitch value for the current energy
|
||||
print("Starting Mono Pitch Scan.")
|
||||
energy = get_current_energy()
|
||||
pos = get_dcm_motors_positions(energy)
|
||||
print(f"Setting DCM Pitch to default value of {pos['dcm_pitch']}")
|
||||
|
||||
@@ -0,0 +1,280 @@
|
||||
"""Guards for preventing clashing devices in
|
||||
the sample environment"""
|
||||
|
||||
# PD_guards2.py
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable, List, Dict
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Exceptions
|
||||
# ----------------------------
|
||||
|
||||
|
||||
class GuardViolation(RuntimeError):
|
||||
"""Raised when a guarded move is not allowed."""
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Guarded axis
|
||||
# ----------------------------
|
||||
|
||||
|
||||
class GuardedAxis:
|
||||
""" Motor axis protected by guard policy """
|
||||
def __init__(
|
||||
self,
|
||||
bec_name: str,
|
||||
policy: Callable[[float], None],
|
||||
config: Dict[str, float] = None
|
||||
):
|
||||
self.bec_name = bec_name
|
||||
self.policy = policy
|
||||
self.config = config or {}
|
||||
self.mot = getattr(dev, self.bec_name)
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""Returns the current motor position"""
|
||||
return self.mot.read()[self.bec_name]["value"]
|
||||
|
||||
def move(self, target: float):
|
||||
"""Used to move a guarded axis to a target value"""
|
||||
self.policy(target) # must raise if disallowed
|
||||
scans.umv(self.mot, target, relative=False)
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Positioned device (IN / OUT)
|
||||
# ----------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class PositionedDevice:
|
||||
"""Applies to devices that only have IN and OUT positions
|
||||
Guards are defined by guard rules to ensure their safe operation"""
|
||||
|
||||
bec_name: str
|
||||
inpos: float
|
||||
outpos: float
|
||||
tol: float = 0.01
|
||||
guards: List[Callable[[], None]] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self):
|
||||
self.mot = getattr(dev, self.bec_name)
|
||||
|
||||
def _check_guards(self):
|
||||
for g in self.guards:
|
||||
g()
|
||||
|
||||
def mvin(self):
|
||||
"""Move a positioned device to IN position"""
|
||||
self._check_guards()
|
||||
scans.umv(self.mot, self.inpos, relative=False)
|
||||
|
||||
def mvout(self):
|
||||
"""Move a positioned device to OUT position"""
|
||||
self._check_guards()
|
||||
scans.umv(self.mot, self.outpos, relative=False)
|
||||
|
||||
def is_in(self):
|
||||
"""Returns true if the device is IN"""
|
||||
return abs(self.mot.read()[self.bec_name]["value"] - self.inpos) <= self.tol
|
||||
|
||||
def is_out(self):
|
||||
"""Returns true if the device is OUT"""
|
||||
return abs(self.mot.read()[self.bec_name]["value"] - self.outpos) <= self.tol
|
||||
|
||||
|
||||
@dataclass
|
||||
class MultiPositionDevice:
|
||||
""" Devices that have multiple defined positions. Guards rules are defined to
|
||||
ensure their safe operation"""
|
||||
bec_name: str
|
||||
positions: Dict[str, float] # {"out": 0.0, "scint": 10.0, "i1": 20.0}
|
||||
tol: float = 0.01
|
||||
guards: List[Callable[[], None]] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self):
|
||||
self.mot = getattr(dev, self.bec_name)
|
||||
|
||||
def _check_guards(self):
|
||||
"""Check guard conditions"""
|
||||
for g in self.guards:
|
||||
g()
|
||||
|
||||
def move_to(self, state: str):
|
||||
"""Move to one of the states defined in self.positions"""
|
||||
if state not in self.positions:
|
||||
raise ValueError(f"Unknown state '{state}'")
|
||||
|
||||
self._check_guards()
|
||||
scans.umv(self.mot, self.positions[state], relative=False)
|
||||
|
||||
def is_at(self, state: str) -> bool:
|
||||
"""Check if device is at a given state"""
|
||||
if state not in self.positions:
|
||||
raise ValueError(f"Unknown state '{state}'")
|
||||
|
||||
return abs(self.mot.read()[self.bec_name]["value"] - self.positions[state]) <= self.tol
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""Returns current motor position"""
|
||||
return self.mot.read()[self.bec_name]["value"]
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
"""Returns current state"""
|
||||
for name, pos in self.positions.items():
|
||||
if abs(self.mot.read()[self.bec_name]["value"] - pos) <= self.tol:
|
||||
return name
|
||||
return "unknown"
|
||||
|
||||
def is_clear(self):
|
||||
"""Returns true if device is at OUT or below e.g. PARK"""
|
||||
if "out" not in self.positions:
|
||||
raise ValueError("MultiPosition device requires 'out' state")
|
||||
return self.actual < (self.positions["out"] + self.tol)
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# PD namespace (filled at runtime)
|
||||
# ----------------------------
|
||||
|
||||
|
||||
class PD:
|
||||
"""Populated when the PD devices are initialised"""
|
||||
pass
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Guard rules for BS_Z
|
||||
# ----------------------------
|
||||
# BS positioner must be in for BS_Z to move
|
||||
def bs_z_requires_bs_pos_in():
|
||||
"""Cannot move bs_z unless the BS positioner is in"""
|
||||
if not PD.bs_pos.is_in():
|
||||
raise GuardViolation("BS_Z cannot move unless beamstop positioner is IN")
|
||||
|
||||
def bs_z_range_check(target):
|
||||
"""Checks that the target position is within limits"""
|
||||
cfg = PD.bs_z.config
|
||||
|
||||
# Lower bound
|
||||
if target < cfg["work_min"] and not is_sample_area_clear(beamstop=True):
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z {target} is below working range minimum {cfg['work_min']}"
|
||||
)
|
||||
if target < cfg["min"]:
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z {target} is below absolute minimum {cfg['min']}"
|
||||
)
|
||||
|
||||
# Maximum position depend on backlight position
|
||||
if PD.bl_pos.is_in():
|
||||
if target > cfg["max_blin"]:
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z value of {target} mm exceeds maximum allowed"
|
||||
f"value of {cfg['max_blin']} while backlight is IN"
|
||||
)
|
||||
else:
|
||||
if target > cfg["max_blout"]:
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z value of {target} mm exceeds maximum allowed "
|
||||
f"value of {cfg['max_blout']} mm"
|
||||
)
|
||||
|
||||
|
||||
def is_sample_area_clear(beamstop=True):
|
||||
"""Check if the sample area is clear, raising GuardViolation if constraints are not met."""
|
||||
if beamstop:
|
||||
# Check collimator, and diagnostic device positions
|
||||
if not PD.coll_y.is_clear():
|
||||
raise GuardViolation("Sample area is not clear: Collimator is IN")
|
||||
if not PD.diag_y.is_clear():
|
||||
raise GuardViolation("Sample area is not clear: Diagnostic device is IN")
|
||||
|
||||
# Validate goniometer position
|
||||
if not abs(PD.gon_x.actual - PD.gon_x.config["out"]) < PD.gon_x.config["tol"]:
|
||||
raise GuardViolation("Sample area is not clear: Goniometer is IN")
|
||||
else:
|
||||
# Check that diagnostic (scintillator/i1) device is out
|
||||
if not PD.diag_y.is_clear():
|
||||
raise GuardViolation("Sample are is not clear: Diagnostic device is IN")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def bs_z_policy(target):
|
||||
"""Defines the policy for bs_z operation"""
|
||||
# Beamstop z can only move when the positioner is in
|
||||
bs_z_requires_bs_pos_in()
|
||||
|
||||
# Check the allowed range for bs_z
|
||||
bs_z_range_check(target)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def gon_x_policy(target):
|
||||
"""Defines the policy for gon_x operation"""
|
||||
is_sample_area_clear(beamstop=False)
|
||||
bs_z_above_work_min()
|
||||
return True
|
||||
|
||||
|
||||
def bs_pos_requires_bs_z_safe():
|
||||
"""bs_pos can only move when bs_z is at the safe position"""
|
||||
safe = PD.bs_z.config["safe"]
|
||||
actual = PD.bs_z.actual
|
||||
tol = 0.1
|
||||
|
||||
if abs(actual - safe) > tol:
|
||||
raise GuardViolation(f"Beamstop positioner can only move when BS_Z is at {safe} mm")
|
||||
|
||||
|
||||
def bs_z_above_work_min():
|
||||
"""work_min specifies the minimum bs_z value that is
|
||||
outside of the sample area i.e. no clashes with diagnostic
|
||||
device or collimator"""
|
||||
work_min = PD.bs_z.config["work_min"]
|
||||
if PD.bs_z.actual < work_min:
|
||||
raise GuardViolation(f"BS_Z must be greater than {work_min} mm")
|
||||
|
||||
|
||||
def bs_z_below_max_blin():
|
||||
"""Maximum bs_z vale when the backlight is in"""
|
||||
max_blin = PD.bs_z.config["max_blin"]
|
||||
if PD.bs_z.actual > max_blin:
|
||||
raise GuardViolation(f"BS_Z must be less than {max_blin} mm")
|
||||
|
||||
|
||||
def gonio_is_out():
|
||||
"""Maximum bs_z value when the backlight is out"""
|
||||
if not abs(PD.gon_x.actual - PD.gon_x.config["out"]) < PD.gon_x.config["tol"]:
|
||||
raise GuardViolation(f"Goniometer must be OUT ({PD.gon_x.config['out']} mm)")
|
||||
|
||||
|
||||
def get_policy_for_axis(bec_name):
|
||||
"""Specify the policy for guarded axis"""
|
||||
policy_registry = {"bs_z": bs_z_policy, "gon_x": gon_x_policy}
|
||||
return policy_registry.get(bec_name, lambda target: True)
|
||||
|
||||
|
||||
def init_collision_guards():
|
||||
"""Add the guard rules for positioned devices"""
|
||||
PD.bs_pos.guards.append(bs_pos_requires_bs_z_safe)
|
||||
PD.bl_pos.guards.append(bs_z_below_max_blin)
|
||||
PD.coll_y.guards.append(bs_z_above_work_min)
|
||||
PD.diag_y.guards.append(bs_z_above_work_min)
|
||||
PD.diag_y.guards.append(gonio_is_out)
|
||||
PD.diag_y.guards.append(bs_z_requires_bs_pos_in)
|
||||
|
||||
|
||||
def init_positioned_devices():
|
||||
"""Initialises the positioned devices"""
|
||||
file = "/sls/x10sa/config/bec/production/pxii_bec/pxii_bec/device_configs/pxii-autogenerated.yaml"
|
||||
build_pd(file)
|
||||
init_collision_guards()
|
||||
print("Defined positions for devices have been updated from pxii-autogenerated.yaml")
|
||||
@@ -1,10 +1,7 @@
|
||||
"""File to store beamline parameters and defaults"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable
|
||||
import numpy as np
|
||||
import yaml
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -16,7 +13,7 @@ class EnergyDefaults:
|
||||
min_energy_ev = 4800
|
||||
max_energy_ev = 30002
|
||||
beam_offset = 6
|
||||
signals = {"sig1": dev.lu_bpmsum, "sig2": dev.bsc_bpmsum, "sig3": dev.bcu_bpmsum}
|
||||
signals = {"sig1": dev.lu_bpmsum, "sig2": dev.ss_bpmsum, "sig3": dev.bcu_bpmsum}
|
||||
energy = dev.dcm_bragg
|
||||
mono_pitch = dev.dcm_pitch
|
||||
mono_perp = dev.dcm_perp
|
||||
@@ -148,15 +145,15 @@ class BPMScans:
|
||||
"y_device": dev.lu_bpm_y,
|
||||
}
|
||||
bsc = {
|
||||
"x_name": dev.bsc_bpm_x.name,
|
||||
"y_name": dev.bsc_bpm_y.name,
|
||||
"z1_name": dev.bsc_bpm1.name,
|
||||
"z2_name": dev.bsc_bpm2.name,
|
||||
"z3_name": dev.bsc_bpm3.name,
|
||||
"z4_name": dev.bsc_bpm4.name,
|
||||
"z5_name": dev.bsc_bpmsum.name,
|
||||
"x_device": dev.bsc_bpm_x,
|
||||
"y_device": dev.bsc_bpm_y,
|
||||
"x_name": dev.ss_bpm_x.name,
|
||||
"y_name": dev.ss_bpm_y.name,
|
||||
"z1_name": dev.ss_bpm1.name,
|
||||
"z2_name": dev.ss_bpm2.name,
|
||||
"z3_name": dev.ss_bpm3.name,
|
||||
"z4_name": dev.ss_bpm4.name,
|
||||
"z5_name": dev.ss_bpmsum.name,
|
||||
"x_device": dev.ss_bpm_x,
|
||||
"y_device": dev.ss_bpm_y,
|
||||
}
|
||||
bcu = {
|
||||
"x_name": dev.bcu_bpm_x.name,
|
||||
@@ -191,148 +188,3 @@ class MirrorConfig:
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PositionedDevice:
|
||||
"""Class for devices with defined in and out positions"""
|
||||
|
||||
device_name: str
|
||||
type: str
|
||||
name: str
|
||||
inpos: float
|
||||
outpos: float
|
||||
tol: float
|
||||
mot: str
|
||||
reader: Callable[[], float]
|
||||
|
||||
@property
|
||||
def actual(self):
|
||||
"""Returns current motor position"""
|
||||
return self.reader()
|
||||
|
||||
def checkin(self):
|
||||
"""Returns True if motor in in the 'in' position"""
|
||||
return abs(self.actual - self.inpos) <= self.tol
|
||||
|
||||
def mvin(self):
|
||||
"""Moves motor to the 'in' position"""
|
||||
scans.umv(self.mot, self.inpos, relative=False)
|
||||
|
||||
def mvout(self):
|
||||
"""Moves motor to the 'out' position"""
|
||||
scans.umv(self.mot, self.outpos, relative=False)
|
||||
|
||||
def status(self):
|
||||
""" Check if device is in or out or moving"""
|
||||
positions = ("in", "out", "moving", "undefined")
|
||||
target_in = self.inpos
|
||||
target_out = self.outpos
|
||||
actual = self.actual
|
||||
delta_in = actual - target_in
|
||||
delta_out = actual - target_out
|
||||
# Check if motor is moving
|
||||
if "Signal" in self.type:
|
||||
moving = 0
|
||||
elif "Motor" in self.type:
|
||||
d = getattr(dev, self.device_name)
|
||||
moving = d.motor_is_moving.get()
|
||||
if moving:
|
||||
pos = positions[2]
|
||||
return {"position": pos.upper(),
|
||||
"name": self.name,
|
||||
"moving": moving}
|
||||
|
||||
if abs(delta_in) > self.tol and abs(delta_out) > self.tol:
|
||||
pos = positions[3]
|
||||
return {"position": pos.upper(),
|
||||
"name": self.name,
|
||||
"actual": actual,
|
||||
"moving": moving}
|
||||
|
||||
elif abs(delta_in) <= self.tol:
|
||||
target = self.inpos
|
||||
pos = positions[0]
|
||||
delta = delta_in
|
||||
elif abs(delta_out) <= self.tol:
|
||||
target = self.outpos
|
||||
pos = positions[1]
|
||||
delta = delta_out
|
||||
|
||||
return {
|
||||
"name": self.name,
|
||||
"position": pos.upper(),
|
||||
"target": target,
|
||||
"actual": actual,
|
||||
"delta": delta,
|
||||
"tol": self.tol,
|
||||
"moving": moving,
|
||||
}
|
||||
|
||||
def report(self):
|
||||
""" Print status of motor """
|
||||
s = self.status()
|
||||
|
||||
if s['position'] == "UNDEFINED":
|
||||
return (f"{s['name']:15s}: "
|
||||
f"{s['position']} "
|
||||
f"position {s['actual']:.3f}")
|
||||
elif s['position'] == "MOVING":
|
||||
return (f"{s['name']:15s}: "
|
||||
f"{s['position']} ")
|
||||
else:
|
||||
return (
|
||||
f"{s['name']:15s}: "
|
||||
f"[{s['position']}] "
|
||||
f"actual = {s['actual']:.3f} "
|
||||
f"target = {s['target']:.3f} "
|
||||
f"delta = {s['delta']:.3f}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PD:
|
||||
"""Class for positioned device positions"""
|
||||
|
||||
|
||||
def build_pd(yaml_file):
|
||||
"""Takes the in and out values from the yaml file
|
||||
and adds them to the PD class
|
||||
"""
|
||||
with open(yaml_file, encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f)
|
||||
for device_name, cfg in data.items():
|
||||
# Skip devices without userParameter
|
||||
user = cfg.get("userParameter")
|
||||
if not user:
|
||||
continue
|
||||
# Set tolerance
|
||||
if "tol" not in user:
|
||||
user["tol"] = 0.01
|
||||
|
||||
try:
|
||||
dev_obj = getattr(dev, device_name)
|
||||
except:
|
||||
raise KeyError(f"Device {device_name} not found in device list")
|
||||
desc = cfg.get("description")
|
||||
type = cfg.get("deviceClass")
|
||||
target = PositionedDevice(
|
||||
device_name=device_name,
|
||||
type = type,
|
||||
name=desc,
|
||||
inpos=user["in"],
|
||||
outpos=user["out"],
|
||||
tol=user["tol"],
|
||||
mot=dev_obj,
|
||||
reader=lambda d=dev_obj, n=device_name: d.read()[n]["value"],
|
||||
)
|
||||
|
||||
setattr(PD, device_name, target)
|
||||
|
||||
|
||||
def init_positioned_devices():
|
||||
"""Initialises the positioned devices"""
|
||||
file = (
|
||||
"/sls/x10sa/config/bec/production/pxii_bec/pxii_bec/device_configs/pxii-autogenerated.yaml"
|
||||
)
|
||||
build_pd(file)
|
||||
print("Defined positions for devices have been updated from pxii-autogenerated.yaml")
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
"""
|
||||
update_PD_from_yaml.py
|
||||
|
||||
Creates PositionedDevice, MultiPositionDevice and GuardedAxis
|
||||
instances from YAML configuration.
|
||||
"""
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def build_pd(yaml_file):
|
||||
"""Takes the defined positions from the device yaml file
|
||||
and adds them to the PD class
|
||||
"""
|
||||
pos_devs = []
|
||||
mp_devs = []
|
||||
ga_devs = []
|
||||
|
||||
|
||||
with open(yaml_file, encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
for bec_name, cfg in data.items():
|
||||
# Skip devices without userParameter
|
||||
user = cfg.get("userParameter")
|
||||
|
||||
if not user:
|
||||
continue
|
||||
# ------------------------------------------------------------------
|
||||
# Positioned device
|
||||
# ------------------------------------------------------------------
|
||||
if user["type"] == "positioner":
|
||||
pos_devs.append(bec_name)
|
||||
posdev = PositionedDevice(bec_name=bec_name, inpos=1.0, outpos=0.0)
|
||||
setattr(PD, bec_name, posdev)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Multi-position device
|
||||
# ------------------------------------------------------------------
|
||||
elif user["type"] == "multi-position":
|
||||
mp_devs.append(bec_name)
|
||||
positions = {k: v for k, v in user.items() if k != "type"}
|
||||
mpdev = MultiPositionDevice(bec_name=bec_name, positions=positions)
|
||||
setattr(PD, bec_name, mpdev)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Guarded device
|
||||
# ------------------------------------------------------------------
|
||||
elif user["type"] == "guarded":
|
||||
ga_devs.append(bec_name)
|
||||
config = {k: v for k, v in user.items() if k != "type"}
|
||||
gadev = GuardedAxis(
|
||||
bec_name=bec_name, policy=get_policy_for_axis(bec_name), config=config
|
||||
)
|
||||
setattr(PD, bec_name, gadev)
|
||||
|
||||
print(f"Positioned devices: {pos_devs}")
|
||||
print(f"Guarded axes: {ga_devs}")
|
||||
print(f"Multi position devices: {mp_devs}")
|
||||
Reference in New Issue
Block a user