Merge pull request 'fix bug in policies, add check macro' (#18) from x10sa_production_20260521T182415 into main
CI for pxii_bec / test (push) Successful in 32s
CI for pxii_bec / test (push) Successful in 32s
Reviewed-on: #18
This commit was merged in pull request #18.
This commit is contained in:
@@ -0,0 +1,62 @@
|
||||
from bec_lib.device import Signal, Positioner
|
||||
|
||||
def check():
|
||||
check_devices(tolerance = 0.02)
|
||||
|
||||
|
||||
|
||||
def check_devices(tolerance):
|
||||
devices = list(dev.items())
|
||||
for name, obj in devices:
|
||||
|
||||
try:
|
||||
data = obj.read()
|
||||
# deal with smargon
|
||||
if "smargon" in name:
|
||||
continue
|
||||
|
||||
actual = data[name]["value"]
|
||||
|
||||
# If signal and a camera or bpm, check reading is not 0
|
||||
if isinstance(obj, Signal):
|
||||
if 'cam' in name or 'bpm' in name:
|
||||
if actual == 0.0:
|
||||
print(f"{name} is reading 0")
|
||||
|
||||
# if signal is a motor, check in position, error, moving
|
||||
if isinstance(obj, Positioner):
|
||||
# Check set position = real position
|
||||
|
||||
sp_key = f"{name}_user_setpoint"
|
||||
|
||||
if sp_key not in data:
|
||||
continue
|
||||
|
||||
setpoint = data[sp_key]["value"]
|
||||
|
||||
diff = abs(actual - setpoint)
|
||||
inpos = diff <= tolerance
|
||||
if not inpos:
|
||||
print(
|
||||
f"{name}: "
|
||||
# f"actual={actual:.4f}, "
|
||||
# f"setpoint={setpoint:.4f}, "
|
||||
# f"diff={diff:.4f}, "
|
||||
f"in position = {inpos}"
|
||||
)
|
||||
# check if motor is in error state
|
||||
error_status = obj.motor_status.get()
|
||||
|
||||
if error_status != 0:
|
||||
print(f"{name}: error_state = {error_status}")
|
||||
|
||||
# check if motor is moving
|
||||
move_status = obj.motor_is_moving.get()
|
||||
if move_status != 0:
|
||||
print(f"{name} is moving")
|
||||
|
||||
|
||||
except Exception as exc:
|
||||
|
||||
print(f"{name}: ERROR -> {exc}")
|
||||
|
||||
@@ -54,7 +54,7 @@ def make_diag_y_policy(d):
|
||||
def diag_y_policy(target):
|
||||
cfg = d["diag_y"].positions
|
||||
# Don't move in if the goniometer is >= 'in'
|
||||
if d["gon_x"].actual >= d["gon_x"].positions["in"]:
|
||||
if d["gon_x"].actual >= d["gon_x"].positions["in"] and target > cfg['out']:
|
||||
raise GuardViolation(
|
||||
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
|
||||
)
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
'''Define guard policies for devices in the beamline.'''
|
||||
# from guards import GuardViolation, guards_setup
|
||||
|
||||
|
||||
def is_sample_area_clear_for_beamstop(d):
|
||||
'''Check if the sample area is clear of diag_y, coll_y, and gonx'''
|
||||
g = guards_setup(d)
|
||||
if g["diag_y_out"].check() and g["coll_y_out"].check() and g["gonx_out"].check():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_sample_area_clear_for_gonx(d):
|
||||
'''Check if the sample area is clear of diag_y and bs_z'''
|
||||
g = guards_setup(d)
|
||||
if g["diag_y_out"].check() and g["bs_work_min"].check():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def make_gon_x_policy(d):
|
||||
'''Create the policy for gon_x'''
|
||||
def gon_x_policy(target):
|
||||
cfg = d["gon_x"].positions
|
||||
if target >= cfg["out"] and not is_sample_area_clear_for_gonx(d):
|
||||
raise GuardViolation("Sample area is not clear")
|
||||
|
||||
return gon_x_policy
|
||||
|
||||
|
||||
def make_bs_z_policy(d):
|
||||
'''Create the policy for bs_z'''
|
||||
def bs_z_policy(target):
|
||||
"""Checks that the target position is within limits"""
|
||||
cfg = d["bs_z"].positions
|
||||
# Lower bound
|
||||
if target < cfg["work_min"] and not is_sample_area_clear_for_beamstop(d):
|
||||
raise GuardViolation("Sample area is not clear")
|
||||
if target < cfg["min"]:
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z {target} is below recommended minimum {cfg['min']}"
|
||||
)
|
||||
# Upper bound
|
||||
if d["bl_pos"].pos == "in" and target > cfg["max_blin"]:
|
||||
raise GuardViolation(
|
||||
f"Beamstop Z cannot move beyond {cfg['max_blin']} when backlight is IN"
|
||||
)
|
||||
|
||||
return bs_z_policy
|
||||
|
||||
|
||||
def make_diag_y_policy(d):
|
||||
'''Create the policy for diag_y'''
|
||||
def diag_y_policy(target):
|
||||
cfg = d["diag_y"].positions
|
||||
# Don't move in if the goniometer is >= 'in'
|
||||
if d["gon_x"].actual >= d["gon_x"].positions["in"] and target > {cfg['out']}:
|
||||
raise GuardViolation(
|
||||
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
|
||||
)
|
||||
if d['cryo_pos'].pos == "in" and target > cfg["out"]:
|
||||
raise GuardViolation(
|
||||
f"Diagnostic device cannot move beyond {cfg['out']} mm when cryo is IN"
|
||||
)
|
||||
|
||||
return diag_y_policy
|
||||
|
||||
|
||||
def attach_policies(d):
|
||||
'''Attach the policies to the devices'''
|
||||
d["bs_z"].policy = make_bs_z_policy(d)
|
||||
d["gon_x"].policy = make_gon_x_policy(d)
|
||||
d["diag_y"].policy = make_diag_y_policy(d)
|
||||
Reference in New Issue
Block a user