fix bug in policies, add check macro #18

Merged
pauluhn merged 1 commits from x10sa_production_20260521T182415 into main 2026-05-22 15:18:00 +02:00
3 changed files with 63 additions and 74 deletions
+62
View File
@@ -0,0 +1,62 @@
from bec_lib.device import Signal, Positioner
def check():
check_devices(tolerance = 0.02)
def check_devices(tolerance):
devices = list(dev.items())
for name, obj in devices:
try:
data = obj.read()
# deal with smargon
if "smargon" in name:
continue
actual = data[name]["value"]
# If signal and a camera or bpm, check reading is not 0
if isinstance(obj, Signal):
if 'cam' in name or 'bpm' in name:
if actual == 0.0:
print(f"{name} is reading 0")
# if signal is a motor, check in position, error, moving
if isinstance(obj, Positioner):
# Check set position = real position
sp_key = f"{name}_user_setpoint"
if sp_key not in data:
continue
setpoint = data[sp_key]["value"]
diff = abs(actual - setpoint)
inpos = diff <= tolerance
if not inpos:
print(
f"{name}: "
# f"actual={actual:.4f}, "
# f"setpoint={setpoint:.4f}, "
# f"diff={diff:.4f}, "
f"in position = {inpos}"
)
# check if motor is in error state
error_status = obj.motor_status.get()
if error_status != 0:
print(f"{name}: error_state = {error_status}")
# check if motor is moving
move_status = obj.motor_is_moving.get()
if move_status != 0:
print(f"{name} is moving")
except Exception as exc:
print(f"{name}: ERROR -> {exc}")
+1 -1
View File
@@ -54,7 +54,7 @@ def make_diag_y_policy(d):
def diag_y_policy(target):
cfg = d["diag_y"].positions
# Don't move in if the goniometer is >= 'in'
if d["gon_x"].actual >= d["gon_x"].positions["in"]:
if d["gon_x"].actual >= d["gon_x"].positions["in"] and target > cfg['out']:
raise GuardViolation(
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
)
-73
View File
@@ -1,73 +0,0 @@
'''Define guard policies for devices in the beamline.'''
# from guards import GuardViolation, guards_setup
def is_sample_area_clear_for_beamstop(d):
'''Check if the sample area is clear of diag_y, coll_y, and gonx'''
g = guards_setup(d)
if g["diag_y_out"].check() and g["coll_y_out"].check() and g["gonx_out"].check():
return True
return False
def is_sample_area_clear_for_gonx(d):
'''Check if the sample area is clear of diag_y and bs_z'''
g = guards_setup(d)
if g["diag_y_out"].check() and g["bs_work_min"].check():
return True
return False
def make_gon_x_policy(d):
'''Create the policy for gon_x'''
def gon_x_policy(target):
cfg = d["gon_x"].positions
if target >= cfg["out"] and not is_sample_area_clear_for_gonx(d):
raise GuardViolation("Sample area is not clear")
return gon_x_policy
def make_bs_z_policy(d):
'''Create the policy for bs_z'''
def bs_z_policy(target):
"""Checks that the target position is within limits"""
cfg = d["bs_z"].positions
# Lower bound
if target < cfg["work_min"] and not is_sample_area_clear_for_beamstop(d):
raise GuardViolation("Sample area is not clear")
if target < cfg["min"]:
raise GuardViolation(
f"Requested beamstop Z {target} is below recommended minimum {cfg['min']}"
)
# Upper bound
if d["bl_pos"].pos == "in" and target > cfg["max_blin"]:
raise GuardViolation(
f"Beamstop Z cannot move beyond {cfg['max_blin']} when backlight is IN"
)
return bs_z_policy
def make_diag_y_policy(d):
'''Create the policy for diag_y'''
def diag_y_policy(target):
cfg = d["diag_y"].positions
# Don't move in if the goniometer is >= 'in'
if d["gon_x"].actual >= d["gon_x"].positions["in"] and target > {cfg['out']}:
raise GuardViolation(
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
)
if d['cryo_pos'].pos == "in" and target > cfg["out"]:
raise GuardViolation(
f"Diagnostic device cannot move beyond {cfg['out']} mm when cryo is IN"
)
return diag_y_policy
def attach_policies(d):
'''Attach the policies to the devices'''
d["bs_z"].policy = make_bs_z_policy(d)
d["gon_x"].policy = make_gon_x_policy(d)
d["diag_y"].policy = make_diag_y_policy(d)