This commit is contained in:
@@ -43,3 +43,6 @@ if _args.session.lower() == "alignment":
|
||||
# SETUP PROMPTS
|
||||
bec._ip.prompts.username = _session_name
|
||||
bec._ip.prompts.status = 1
|
||||
|
||||
d, planner = init_beamline_environment()
|
||||
|
||||
|
||||
@@ -10,8 +10,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: out
|
||||
fl_bright: 'off'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
sample_alignment:
|
||||
@@ -25,8 +31,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: out
|
||||
fl_bright: 'on'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: work
|
||||
aerotech_z: work
|
||||
# aerotech_u: mount
|
||||
# smargon_x: mount
|
||||
# smargon_y: mount
|
||||
# smargon_chi: mount
|
||||
# smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
data_collection:
|
||||
@@ -40,8 +52,14 @@ states:
|
||||
det_cov: 'open'
|
||||
diag_y: out
|
||||
fl_bright: 'on'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: work
|
||||
aerotech_z: work
|
||||
# aerotech_u: mount
|
||||
# smargon_x: mount
|
||||
# smargon_y: mount
|
||||
# smargon_chi: mount
|
||||
# smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
DC_XRF:
|
||||
@@ -55,8 +73,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: out
|
||||
fl_bright: 'on'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: work
|
||||
aerotech_z: work
|
||||
aerotech_u: mount
|
||||
# smargon_x: mount
|
||||
# smargon_y: mount
|
||||
# smargon_chi: mount
|
||||
# smargon_phi: mount
|
||||
xrf_pos: in
|
||||
|
||||
manual_sample_exchange:
|
||||
@@ -70,8 +94,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: park
|
||||
fl_bright: 'off'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
beam_visualisation:
|
||||
@@ -84,8 +114,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: scint
|
||||
fl_bright: 'off'
|
||||
gon_x: out
|
||||
# smargon: not implemented
|
||||
aerotech_x: out
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
flux_measurement:
|
||||
@@ -98,8 +134,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: i1
|
||||
fl_bright: 'off'
|
||||
gon_x: out
|
||||
# smargon: not implemented
|
||||
aerotech_x: out
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
beamstop_alignment:
|
||||
@@ -112,8 +154,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: out
|
||||
fl_bright: 'on'
|
||||
gon_x: out
|
||||
# smargon: not implemented
|
||||
aerotech_x: out
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
maintenance:
|
||||
@@ -127,8 +175,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: park
|
||||
fl_bright: 'off'
|
||||
gon_x: out
|
||||
# smargon: not implemented
|
||||
aerotech_x: out
|
||||
aerotech_y: mount
|
||||
aerotech_z: mount
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
xtal_snapshot:
|
||||
@@ -142,8 +196,14 @@ states:
|
||||
det_cov: 'close'
|
||||
diag_y: out
|
||||
fl_bright: 'on'
|
||||
gon_x: in
|
||||
# smargon: not implemented
|
||||
aerotech_x: in
|
||||
aerotech_y: work
|
||||
aerotech_z: work
|
||||
aerotech_u: mount
|
||||
smargon_x: mount
|
||||
smargon_y: mount
|
||||
smargon_chi: mount
|
||||
smargon_phi: mount
|
||||
xrf_pos: out
|
||||
|
||||
|
||||
|
||||
@@ -1,81 +0,0 @@
|
||||
import csv
|
||||
import json
|
||||
|
||||
|
||||
|
||||
|
||||
def str_to_bool(val):
|
||||
return str(val).strip().lower() in ["yes", "true", "1"]
|
||||
|
||||
def create(INPUT_CSV, OUTPUT_YAML):
|
||||
with open(INPUT_CSV, newline="") as csvfile:
|
||||
reader = csv.DictReader(csvfile)
|
||||
|
||||
with open(OUTPUT_YAML, "w") as yamlfile:
|
||||
for row in reader:
|
||||
include = row["include"]
|
||||
name = row["name"]
|
||||
desc = row["description"]
|
||||
device_class = row["deviceClass"]
|
||||
pv = row["PV"]
|
||||
readout_priority = row["readoutPriority"]
|
||||
tag = row["tag"]
|
||||
read_only = str_to_bool(row["readOnly"])
|
||||
user_param = row.get("userParameter", "").strip()
|
||||
|
||||
if str(include).strip().lower() != "yes":
|
||||
continue
|
||||
|
||||
yamlfile.write(f"{name}:\n")
|
||||
yamlfile.write(f" description: {desc}\n")
|
||||
|
||||
if device_class == "Motor" or device_class == "MotorEC":
|
||||
yamlfile.write(f" deviceClass: ophyd_devices.Epics{device_class}\n")
|
||||
yamlfile.write(f" deviceConfig: {{prefix: '{pv}'}}\n")
|
||||
else:
|
||||
yamlfile.write(f" deviceClass: ophyd.Epics{device_class}\n")
|
||||
yamlfile.write(
|
||||
f" deviceConfig: {{read_pv: '{pv}', auto_monitor: true}}\n"
|
||||
)
|
||||
|
||||
yamlfile.write(" onFailure: buffer\n")
|
||||
yamlfile.write(" enabled: True\n")
|
||||
yamlfile.write(f" readoutPriority: {readout_priority}\n")
|
||||
yamlfile.write(" deviceTags:\n")
|
||||
yamlfile.write(f" - {tag}\n")
|
||||
yamlfile.write(f" readOnly: {read_only}\n")
|
||||
yamlfile.write(" softwareTrigger: false\n")
|
||||
|
||||
# Only add userParameter for Motors if present
|
||||
# if device_class == "Motor" and user_param:
|
||||
if user_param:
|
||||
try:
|
||||
parsed = json.loads(user_param)
|
||||
yamlfile.write(" userParameter:\n")
|
||||
for k, v in parsed.items():
|
||||
yamlfile.write(f" {k}: {v}\n")
|
||||
except json.JSONDecodeError:
|
||||
yamlfile.write(f" userParameter: {user_param}\n")
|
||||
|
||||
yamlfile.write("\n")
|
||||
|
||||
print(f"YAML written to {OUTPUT_YAML}")
|
||||
|
||||
def main():
|
||||
|
||||
devices = "pxiii-standard-devices"
|
||||
states = "pxiii-state-devices"
|
||||
|
||||
device_files = [
|
||||
f"{devices}.csv",
|
||||
f"{devices}.yaml"
|
||||
]
|
||||
state_files = [
|
||||
f"{states}.csv",
|
||||
f"{states}.yaml"
|
||||
]
|
||||
|
||||
create(device_files[0],device_files[1])
|
||||
create(state_files[0],state_files[1])
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,18 @@
|
||||
aerotech_x:
|
||||
userParameter: {"type": continuous, "in": 0.0, "out": -10.0, "safe": -100, "tol": 0.5}
|
||||
aerotech_y:
|
||||
userParameter: {"type": continuous, "mount": 0.0, "work": 0.01, "tol": 0.002}
|
||||
aerotech_z:
|
||||
userParameter: {"type": continuous, "mount": 0.0, "work": 0.02, "tol": 0.01}
|
||||
aerotech_u:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
smargon_x:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
smargon_y:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
smargon_z:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
smargon_chi:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
smargon_phi:
|
||||
userParameter: {"type": continuous, "mount": 0.0}
|
||||
@@ -1,127 +0,0 @@
|
||||
name,description,deviceClass,PV,readoutPriority,tag,readOnly,include,userParameter,
|
||||
sls_current,SLS Current,SignalRO,ARS07-DPCT-0100:CURR,monitored,SLS,yes,yes,,
|
||||
fe_sl_xr,FE Slit X Ring,MotorEC,X06DA-FE-SLDI:TRXR,baseline,fe,no,yes,,
|
||||
fe_sl_yt,FE Slit Y Top,MotorEC,X06DA-FE-SLDI:TRYT,baseline,fe,no,yes,,
|
||||
fe_sl_xw,FE Slit X Wall,MotorEC,X06DA-FE-SLDI:TRXW,baseline,fe,no,yes,,
|
||||
fe_sl_yb,FE Slit Y Bottom,MotorEC,X06DA-FE-SLDI:TRYB,baseline,fe,no,yes,,
|
||||
fe_sl_xcen,FE Slit X Centre,MotorEC,X06DA-FE-SLDI:CENTERX,baseline,fe,no,yes,,
|
||||
fe_sl_xsize,FE Slit X Size,MotorEC,X06DA-FE-SLDI:SIZEX,baseline,fe,no,yes,,
|
||||
fe_sl_ycen,FE Slit Y Centre,MotorEC,X06DA-FE-SLDI:CENTERY,baseline,fe,no,yes,,
|
||||
fe_sl_ysize,FE Slit Y Size,MotorEC,X06DA-FE-SLDI:SIZEY,baseline,fe,no,yes,,
|
||||
tm_xu,TorM Upstream X,MotorEC,X06DA-FE-MI1:TRXU,baseline,tm,no,yes,,
|
||||
tm_xd,TorM Downstream X,MotorEC,X06DA-FE-MI1:TRXD,baseline,tm,no,yes,,
|
||||
tm_yur,TorM Upstream Ring Y,MotorEC,X06DA-FE-MI1:TRYUR,baseline,tm,no,yes,,
|
||||
tm_yw,TorM Wall Y,MotorEC,X06DA-FE-MI1:TRYUW,baseline,tm,no,yes,,
|
||||
tm_yd,TorM Downstream Y,MotorEC,X06DA-FE-MI1:TRYD,baseline,tm,no,yes,,
|
||||
tm_b1,TorM Bender,MotorEC,X06DA-FE-MI1:BEND1,baseline,tm,no,yes,,
|
||||
tm_yaw,TorM Virtual Yaw,MotorEC,X06DA-FE-MI1:YAW,baseline,tm,no,yes,,
|
||||
tm_roll,TorM Virtual Roll,MotorEC,X06DA-FE-MI1:ROLL,baseline,tm,no,yes,,
|
||||
tm_pitch,TorM Virtual Pitch,MotorEC,X06DA-FE-MI1:PITCH,baseline,tm,no,yes,,
|
||||
tm_x,TorM Virtual X,MotorEC,X06DA-FE-MI1:TRX,baseline,tm,no,yes,,
|
||||
tm_y,TorM Virtual Y ,MotorEC,X06DA-FE-MI1:TRY,baseline,tm,no,yes,,
|
||||
bsf_bpm1,BSF BPM Channel 1,SignalRO,X06DA-OP-BSFBPM:SIGNAL1,monitored,bpm,yes,no,,
|
||||
bsf_bpm2,BSF BPM Channel 2,SignalRO,X06DA-OP-BSFBPM:SIGNAL2,monitored,bpm,yes,no,,
|
||||
bsf_bpm3,BSF BPM Channel 3,SignalRO,X06DA-OP-BSFBPM:SIGNAL3,monitored,bpm,yes,no,,
|
||||
bsf_bpm4,BSF BPM Channel 4,SignalRO,X06DA-OP-BSFBPM:SIGNAL4,monitored,bpm,yes,no,,
|
||||
bsf_bpmsum,BSF BPM Summed,SignalRO,X06DA-OP-BSFBPM:SUM,monitored,bpm,yes,no,,
|
||||
bsf_sl_xw,BSF Slit outboard,MotorEC,X06DA-OP-BSFSLH:TRXW,baseline,bsf,no,yes,,
|
||||
bsf_sl_xr,BSF Slit inboard,MotorEC,X06DA-OP-BSFSLH:TRXR,baseline,bsf,no,yes,,
|
||||
bsf_sl_xcen,BSF X Centre,MotorEC,X06DA-OP-BSFSLH:CENTER,baseline,bsf,no,yes,,
|
||||
bsf_sl_xsize,BSF X Size,MotorEC,X06DA-OP-BSFSLH:SIZE,baseline,bsf,no,yes,,
|
||||
bsf_f1_y,BSF Filter 1 Y,MotorEC,X06DA-OP-BSFFI1:TRY,baseline,bsf,no,yes,,
|
||||
dccm_theta1,DCCM Theta Xtal1,MotorEC,X06DA-OP-DCCM:ROTX-CR1,baseline,dccm,no,yes,,
|
||||
dccm_theta2,DCCM Theta Xtal2,MotorEC,X06DA-OP-DCCM:ROTX-CR2,baseline,dccm,no,yes,,
|
||||
dccm_rotz,DCCM RotZ Xtal 2,MotorEC,X06DA-OP-DCCM:ROTZ-CR2,baseline,dccm,no,yes,,
|
||||
dccm_xbpm1_y,DCCM BPM1 Y,MotorEC,X06DA-OP-DCCMXBPM1:TRY,baseline,dccm,no,yes,,
|
||||
dccm_xbpm2_y,DCCM BPM2 Y,MotorEC,X06DA-OP-DCCMXBPM2:TRY,baseline,dccm,no,yes,,
|
||||
dccm_energy,DCCM Energy,Motor,X06DA-OP-DCCM:ENERGY,baseline,dccm,no,yes,,
|
||||
dccm_di_top,DCCM Diode Top,SignalRO,X06DA-OP-DCCMXBPM1T:READOUT,monitored,dccm,no,yes,,
|
||||
dccm_di_bot,DCCM Diode Bottom,SignalRO,X06DA-OP-DCCMXBPM1B:READOUT,monitored,dccm,no,yes,,
|
||||
dccm_bpm1,DCCM BPM Channel 1,SignalRO,X06DA-OP-DCCMXBPM2:Current1:MeanValue_RBV,monitored,dccm,no,yes,,
|
||||
dccm_bpm2,DCCM BPM Channel 2,SignalRO,X06DA-OP-DCCMXBPM2:Current2:MeanValue_RBV,monitored,dccm,no,yes,,
|
||||
dccm_bpm3,DCCM BPM Channel 3,SignalRO,X06DA-OP-DCCMXBPM2:Current3:MeanValue_RBV,monitored,dccm,no,yes,,
|
||||
dccm_bpm4,DCCM BPM Channel 4,SignalRO,X06DA-OP-DCCMXBPM2:Current4:MeanValue_RBV,monitored,dccm,no,yes,,
|
||||
dccm_bpmsum,DCCM BPM Summed,SignalRO,X06DA-OP-DCCMXBPM2:SumAll:MeanValue_RBV,monitored,dccm,no,yes,,
|
||||
ss_bpm1,SS BPM Channel 1,SignalRO,X06DA-ES-SSBPM:Current1:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
ss_bpm2,SS BPM Channel 2,SignalRO,X06DA-ES-SSBPM:Current2:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
ss_bpm3,SS BPM Channel 3,SignalRO,X06DA-ES-SSBPM:Current3:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
ss_bpm4,SS BPM Channel 4,SignalRO,X06DA-ES-SSBPM:Current4:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
ss_bpmsum,SS BPM Summed,SignalRO,X06DA-ES-SSBPM:SumAll:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
ss_bpm_x,SS BPM X,Motor,X06DA-ES-SSBPM:TRX,baseline,ss,no,yes,,
|
||||
ss_bpm_y,SS BPM Y,Motor,X06DA-ES-SSBPM:TRY,baseline,ss,no,yes,,
|
||||
ss_sl_xw,SS Slit Wall,Motor,X06DA-ES-SSSLH:TRXW,baseline,ss,no,yes,,
|
||||
ss_sl_xr,SS Slit Ring,Motor,X06DA-ES-SSSLH:TRXR,baseline,ss,no,yes,,
|
||||
ss_sl_xcen,SS Slit X Centre,Motor,X06DA-ES-SSSLH:CENTER,baseline,ss,no,yes,,
|
||||
ss_sl_xsize,SS Slit X Size,Motor,X06DA-ES-SSSLH:SIZE,baseline,ss,no,yes,,
|
||||
ss_sl_yt,SS Slit Top,Motor,X06DA-ES-SSSLV:TRYT,baseline,ss,no,yes,,
|
||||
ss_sl_yb,SS Slit Bottom,Motor,X06DA-ES-SSSLV:TRYB,baseline,ss,no,yes,,
|
||||
ss_sl_ycen,SS Slit Y Centre,Motor,X06DA-ES-SSSLV:CENTER,baseline,ss,no,yes,,
|
||||
ss_sl_ysize,SS Slit Y Size,Motor,X06DA-ES-SSSLV:SIZE,baseline,ss,no,yes,,
|
||||
ss_xi_x,SS X-ray Eye X,Motor,X06DA-ES-SSXI:TRX,baseline,ss,no,yes,"{""type"": multi-position,""in"": 7.5, ""out"": -2.1}",
|
||||
ss_xi_y,SS X-ray Eye Y,Motor,X06DA-ES-SSXI:TRY,baseline,ss,no,yes,,
|
||||
ss_xicam_x,SS Camera X,SignalRO,X06DA-ES-SSCAM:Stats5:CentroidX_RBV,baseline,ss,yes,yes,,
|
||||
ss_xicam_y,SS Camera Y,SignalRO,X06DA-ES-SSCAM:Stats5:CentroidY_RBV,baseline,ss,yes,yes,,
|
||||
ss_xicam_max,SS Cam Max,SignalRO,X06DA-ES-SSCAM:Stats5:MaxValue_RBV,monitored,ss,yes,yes,,
|
||||
ss_xicam_exp,SS Camera Exposure,Signal,X06DA-ES-SSCAM:cam1:AcquireTime,baseline,ss,no,yes,,
|
||||
ss_xicam_gain,SS Camera Gain,Signal,X06DA-ES-SSCAM:cam1:Gain,baseline,ss,no,yes,,
|
||||
ss_xicam_xsig,SS Camera X Sigma,Signal,X06DA-ES-SSCAM:Stats5:SigmaX_RBV,baseline,ss,yes,yes,,
|
||||
ss_xicam_ysig,SS Camera Y Sigma,Signal,X06DA-ES-SSCAM:Stats5:SigmaY_RBV,baseline,ss,yes,yes,,
|
||||
vfm_xu,VFM Upstream X,MotorEC,X06DA-ES-VFM:TRXU,baseline,vfm,no,yes,,
|
||||
vfm_xd,VFM Downstream X,MotorEC,X06DA-ES-VFM:TRXD,baseline,vfm,no,yes,,
|
||||
vfm_yur,VFM Upstream Ring Y,MotorEC,X06DA-ES-VFM:TRYUR,baseline,vfm,no,yes,,
|
||||
vfm_yw,VFM Wall Y,MotorEC,X06DA-ES-VFM:TRYW,baseline,vfm,no,yes,,
|
||||
vfm_ydr,VFM Downstream Ring Y,MotorEC,X06DA-ES-VFM:TRYDR,baseline,vfm,no,yes,,
|
||||
vfm_bu,VFM Upstream Bender,MotorEC,X06DA-ES-VFM:BNDU,baseline,vfm,no,yes,,
|
||||
vfm_bd,VFM Downstream Bender,MotorEC,X06DA-ES-VFM:BNDD,baseline,vfm,no,yes,,
|
||||
vfm_yaw,VFM Virtual Yaw,MotorEC,X06DA-ES-VFM:YAW,baseline,vfm,no,yes,,
|
||||
vfm_roll,VFM Virtual Roll,MotorEC,X06DA-ES-VFM:ROLL,baseline,vfm,no,yes,,
|
||||
vfm_pitch,VFM Virtual Pitch,MotorEC,X06DA-ES-VFM:PITCH,baseline,vfm,no,yes,,
|
||||
vfm_x,VFM Virtual X,MotorEC,X06DA-ES-VFM:TRX,baseline,vfm,no,yes,,
|
||||
vfm_y,VFM Virtual Y ,MotorEC,X06DA-ES-VFM:TRY,baseline,vfm,no,yes,,
|
||||
hfm_xu,HFM Upstream X,MotorEC,X06DA-ES-HFM:TRXU,baseline,hfm,no,yes,,
|
||||
hfm_xd,HFM Downstream X,MotorEC,X06DA-ES-HFM:TRXD,baseline,hfm,no,yes,,
|
||||
hfm_yuw,HFM Upstream Wall Y,MotorEC,X06DA-ES-HFM:TRYUW,baseline,hfm,no,yes,,
|
||||
hfm_yr,HFM Ring Y,MotorEC,X06DA-ES-HFM:TRYR,baseline,hfm,no,yes,,
|
||||
hfm_ydw,HFM Downstream Wall Y,MotorEC,X06DA-ES-HFM:TRYDW,baseline,hfm,no,yes,,
|
||||
hfm_bu,HFM Upstream Bender,MotorEC,X06DA-ES-HFM:BNDU,baseline,hfm,no,yes,,
|
||||
hfm_bd,HFM Downstream Bender,MotorEC,X06DA-ES-HFM:BNDD,baseline,hfm,no,yes,,
|
||||
hfm_yaw,HFM Virtual Yaw,MotorEC,X06DA-ES-HFM:YAW,baseline,hfm,no,yes,,
|
||||
hfm_roll,HFM Virtual Roll,MotorEC,X06DA-ES-HFM:ROLL,baseline,hfm,no,yes,,
|
||||
hfm_pitch,HFM Virtual Pitch,MotorEC,X06DA-ES-HFM:PITCH,baseline,hfm,no,yes,,
|
||||
hfm_x,HFM Virtual X,MotorEC,X06DA-ES-HFM:TRX,baseline,hfm,no,yes,,
|
||||
hfm_y,HFM Virtual Y ,MotorEC,X06DA-ES-HFM:TRY,baseline,hfm,no,yes,,
|
||||
bcu_bpm1,BCU BPM Channel 1 ,SignalRO,X06DA-ES-BCBPM:Current1:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
bcu_bpm2,BCU BPM Channel 2,SignalRO,X06DA-ES-BCBPM:Current2:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
bcu_bpm3,BCU BPM Channel 3,SignalRO,X06DA-ES-BCBPM:Current3:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
bcu_bpm4,BCU BPM Channel 4,SignalRO,X06DA-ES-BCBPM:Current4:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
bcu_bpmsum,BCU BPM Summed,SignalRO,X06DA-ES-BCBPM:SumAll:MeanValue_RBV,monitored,bpm,yes,yes,,
|
||||
bcu_bpm_x,BCU BPM X,Motor,X06DA-ES-BCBPM:TRX,baseline,bcu,no,yes,,
|
||||
bcu_bpm_y,BCU BPM Y ,Motor,X06DA-ES-BCBPM:TRY,baseline,bcu,no,yes,,
|
||||
bcu_sl_xw,BCU Slit Wall,Motor,X06DA-ES-BCSLH:TRXW,baseline,bcu,no,yes,,
|
||||
bcu_sl_xr,BCU Slit Ring,Motor,X06DA-ES-BCSLH:TRXR,baseline,bcu,no,yes,,
|
||||
bcu_sl_xcen,BCU Slit X Centre,Motor,X06DA-ES-BCSLH:CENTER,baseline,bcu,no,yes,,
|
||||
bcu_sl_xsize,BCU Slit X Size,Motor,X06DA-ES-BCSLH:SIZE,baseline,bcu,no,yes,,
|
||||
bcu_sl_yt,BCU Slit top,Motor,X06DA-ES-BCSLV:TRYT,baseline,bcu,no,yes,,
|
||||
bcu_sl_yb,BCU Slit Bottom,Motor,X06DA-ES-BCSLV:TRYB,baseline,bcu,no,yes,,
|
||||
bcu_sl_ycen,BCU Slit Y Centre,Motor,X06DA-ES-BCSLV:CENTER,baseline,bcu,no,yes,,
|
||||
bcu_sl_ysize,BCU Slit Y Size,Motor,X06DA-ES-BCSLV:SIZE,baseline,bcu,no,yes,,
|
||||
samcam_x,Sample Camera X ,SignalRO,X06DA-ES-MS:Stats5:CentroidX_RBV,baseline,scam,yes,no,,
|
||||
samcam_xsig,Sample Camera X Sigma,SignalRO,X06DA-ES-MS:Stats5:SigmaX_RBV,monitored,scam,yes,no,,
|
||||
samcam_y,Sample Camera Y ,SignalRO,X06DA-ES-MS:Stats5:CentroidY_RBV,baseline,scam,yes,no,,
|
||||
samcam_ysig,Sample Camera Y Sigma,SignalRO,X06DA-ES-MS:Stats5:SigmaY_RBV,monitored,scam,yes,no,,
|
||||
samcam_max,Sample Camera Max,SignalRO,X06DA-ES-MS:Stats5:MaxValue_RBV,monitored,scam,yes,no,,
|
||||
samcam_exp,Sample Camera Exposure,Signal,X06DA-ES-MS:cam1:AcquireTime,baseline,scam,no,no,,
|
||||
samcam_gain,Sample Camera Gain,Signal,X06DA-ES-MS:cam1:Gain,baseline,scam,no,no,,
|
||||
scam_zoom,Sample Camera Zoom,Motor,X06DA-ES-MS:ZOOM,baseline,scam,no,yes,,
|
||||
coll_x,Collimator X,Motor,X06DA-ES-COL:TRX,baseline,se,no,no,,
|
||||
diag_z,Scintillator/diode Z,Motor,X06DA-ES-SCL:TRZ,baseline,se,no,no,,
|
||||
i1,I1 diode,SignalRO,X06DA-ES-SCLDI:READOUT,monitored,bpm,yes,no,,
|
||||
bs_x,Beamstop X,Motor,X06DA-ES-BS:TRX,baseline,se,no,no,,
|
||||
bs_y,Beamstop Y,Motor,X06DA-ES-BS:TRY,baseline,se,no,no,,
|
||||
gon_y,Goniometer Y,Motor,X06DA-ES-DF1:TRY1,baseline,det,no,no,,
|
||||
gon_z,Goniometer X,Motor,X06DA-ES-DF1:TRZ1,baseline,det,no,no,,
|
||||
omega,Omega,Motor,X06DA-ES-DF1:ROTU,baseline,det,no,no,,
|
||||
cryo_x,Cryo X ,Motor,X06DA-ES-CS:TRX,baseline,se,no,no,,
|
||||
cryo_temp,Cryo Temperature,SignalRO,X06DA-ES-CS:TEMP_RBV,baseline,se,no,no,,
|
||||
det_y,Detector Y,MotorEC,X06DA-ES-DET:TRY,baseline,det,no,no,,
|
||||
det_z,Detector Z,MotorEC,X06DA-ES-DET:TRZ,baseline,det,no,no,,
|
||||
|
@@ -1223,3 +1223,111 @@ scam_zoom:
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
coll_x:
|
||||
description: Collimator X
|
||||
deviceClass: ophyd_devices.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-COL:TRX'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
diag_z:
|
||||
description: Scintillator/diode Z
|
||||
deviceClass: ophyd_devices.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-SCL:TRZ'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
i1:
|
||||
description: I1 diode
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig: {read_pv: 'X06DA-ES-SCLDI:READOUT', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: monitored
|
||||
deviceTags:
|
||||
- bpm
|
||||
readOnly: True
|
||||
softwareTrigger: false
|
||||
|
||||
bs_x:
|
||||
description: Beamstop X
|
||||
deviceClass: ophyd_devices.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-BS:TRX'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
bs_y:
|
||||
description: Beamstop Y
|
||||
deviceClass: ophyd_devices.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-BS:TRY'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
cryo_x:
|
||||
description: Cryo X
|
||||
deviceClass: ophyd_devices.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-ES-CS:TRX'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
cryo_temp:
|
||||
description: Cryo Temperature
|
||||
deviceClass: ophyd.EpicsSignalRO
|
||||
deviceConfig: {read_pv: 'X06DA-ES-CS:TEMP_RBV', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- se
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
det_y:
|
||||
description: Detector Y
|
||||
deviceClass: ophyd_devices.EpicsMotorEC
|
||||
deviceConfig: {prefix: 'X06DA-ES-DET:TRY'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- det
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
det_z:
|
||||
description: Detector Z
|
||||
deviceClass: ophyd_devices.EpicsMotorEC
|
||||
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- det
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
name,description,deviceClass,PV,readoutPriority,tag,readOnly,include,userParameter
|
||||
bl_bright,Backlight Brightness,Signal,X06DA-ES-BL:SET,baseline,state,no,yes,
|
||||
bl_pos,Backlight Positioner,Signal,X06DA-ES-BL:POS-SET,baseline,state,no,no,"{""type"":positioner}"
|
||||
bs_pos,Beamstop Positioner,Signal,X06DA-ES-BS:POS-SET,baseline,state,no,no,"{""type"":positioner}"
|
||||
bs_z,Beamstop Z,Motor,X06DA-ES-BS:TRZ,baseline,state,no,yes,"{""type"": guarded, ""min"": 13, ""samp"": 15, ""work_min"": 20, ""safe"": 41, ""max_blin"": 42, ""max_blout"": 70}"
|
||||
coll_y,Collimator Y,Motor,X06DA-ES-COL:TRY,baseline,state,no,yes,"{""type"": multi-position, ""in"": 40, ""out"": 20.0, ""park"": 0,""tol"":0.05}"
|
||||
cryo_pos,Cryo positioner,Signal,X06DA-ES-CS:POS-SET,baseline,state,no,no,"{""type"":positioner}"
|
||||
det_cov,Detector cover,Signal,X06DA-ES-DETCOV:SET,baseline,state,no,no,"{""type"":positioner}"
|
||||
diag_y,Scintillator/diode Y,Motor,X06DA-ES-SCL:TRY,baseline,state,no,yes,"{""type"": multi-position, ""scint"": 39, ""i1"": 44.0, ""out"": 20.0,""park"": 0,""tol"":0.3}"
|
||||
fl_bright,Frontlight Brightness,Signal,X06DA-ES-FL:SET,baseline,state,no,yes,
|
||||
xrf_pos,XRF Positioner,Signal,X06DA-ES-XRF:POS-SET,baseline,state,no,no,"{""type"":positioner}"
|
||||
|
@@ -9,6 +9,33 @@ bl_bright:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": continuous, "on": 1.3, "off": 0, “tol”: 0.01}
|
||||
|
||||
bl_pos:
|
||||
description: Backlight Positioner
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BL:POS-SET', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": discrete, "in": 1, "out": 0}
|
||||
|
||||
bs_pos:
|
||||
description: Beamstop Positioner
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BS:POS-SET', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": discrete, "in": 1, "out": 0}
|
||||
|
||||
bs_z:
|
||||
description: Beamstop Z
|
||||
@@ -21,7 +48,7 @@ bs_z:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": guarded, "min": 13, "samp": 15, "work_min": 20, "safe": 41, "max_blin": 42, "max_blout": 70}
|
||||
userParameter: {"type": continuous, "min": 13, "samp": 15, "work_min": 20, "safe": 23.8, "max_blin": 24, "max_blout": 35}
|
||||
|
||||
coll_y:
|
||||
description: Collimator Y
|
||||
@@ -34,7 +61,33 @@ coll_y:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": multi-position, "in": 40, "out": 20.0, "park": 0,"tol":0.05}
|
||||
userParameter: {"type": continuous, "in": 40, "out": 20.0, "park": 0,"tol":0.05}
|
||||
|
||||
cryo_pos:
|
||||
description: Cryo Positioner
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-CS:POS-SET', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": discrete, "in": 1, "out": 0}
|
||||
|
||||
det_cov:
|
||||
description: Detector Cover
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-DETCOV:SET', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": discrete, "open": 2, "close": 1}
|
||||
|
||||
diag_y:
|
||||
description: Scintillator/diode Y
|
||||
@@ -47,7 +100,7 @@ diag_y:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": multi-position, "scint": 39, "i1": 44.0, "out": 20.0,"park": 0,"tol":0.3}
|
||||
userParameter: {"type": continuous, "scint": 25, "i1": 29, "out": 5.0,"park": 0,"tol":0.3}
|
||||
|
||||
fl_bright:
|
||||
description: Frontlight Brightness
|
||||
@@ -60,4 +113,18 @@ fl_bright:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": continuous, "on": 3.0, "off": 0, “tol”: 0.01}
|
||||
|
||||
xrf_pos:
|
||||
description: XRF Positioner
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-XRF:POS-SET', auto_monitor: true}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- state
|
||||
readOnly: False
|
||||
softwareTrigger: false
|
||||
userParameter: {"type": discrete, "in": 1, "out": 0}
|
||||
|
||||
|
||||
@@ -2,3 +2,29 @@ base_config:
|
||||
- !include ./pxiii-standard-devices.yaml
|
||||
states_config:
|
||||
- !include ./pxiii-state-devices.yaml
|
||||
|
||||
smargon:
|
||||
description: REST-based device which connects to Smargopolo
|
||||
deviceClass: pxiii_bec.devices.smargopolo_smargon.Smargon
|
||||
deviceConfig: {prefix: 'http://x06da-smargopolo.psi.ch:3000'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- smargon
|
||||
- motors
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
aerotech:
|
||||
description: REST-based device which connects to AareScan and Aerotech
|
||||
deviceClass: pxiii_bec.devices.aerotech.Aerotech
|
||||
deviceConfig: {prefix: 'http://mx-x06da-queue-01:5234'}
|
||||
onFailure: buffer
|
||||
enabled: True
|
||||
readoutPriority: baseline
|
||||
deviceTags:
|
||||
- aerotech
|
||||
- motors
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
from ophyd import Component as Cpt
|
||||
|
||||
from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
|
||||
|
||||
|
||||
class AerotechController(HttpDeviceController):
|
||||
_readback_endpoint = "/status"
|
||||
_target_endpoint = "/position"
|
||||
|
||||
def __init__(self, *, prefix, **kwargs):
|
||||
self._readbacks: dict[str, dict[str, float | bool]] = {}
|
||||
super().__init__(prefix=prefix, **kwargs)
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_post(body={axis: val})
|
||||
|
||||
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
|
||||
with self._readback_lock:
|
||||
if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks:
|
||||
return None
|
||||
return self._readbacks.get(axis_id)["pos"], self._readbacks.get(TIMESTAMP_ID) # type: ignore
|
||||
|
||||
|
||||
class Aerotech(HttpOphydDevice):
|
||||
controller_class = AerotechController
|
||||
|
||||
x = Cpt(HttpDeviceSignal, axis_identifier="x", tolerance=0.01)
|
||||
y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01)
|
||||
z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01)
|
||||
u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01)
|
||||
vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01)
|
||||
|
||||
|
||||
def _test():
|
||||
a = Aerotech(name="aerotech", prefix="http://mx-x06da-queue-01:5234")
|
||||
a.wait_for_connection()
|
||||
return a
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
aerotech = _test()
|
||||
print(aerotech.read())
|
||||
aerotech.stop()
|
||||
@@ -0,0 +1,178 @@
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from threading import Event, RLock, Thread
|
||||
from typing import Any
|
||||
|
||||
from ophyd import OphydObject
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.socket import SocketSignal
|
||||
from requests import Response, Session
|
||||
|
||||
TIMESTAMP_ID = "__timestamp"
|
||||
_POLL_INTERVAL_SLOW = 0.1
|
||||
|
||||
|
||||
class HttpRestError(Exception):
|
||||
"""Error for rest calls from a HttpRestSignal."""
|
||||
|
||||
def __init__(self, resp: Response, *args: object, value: Any | None = None) -> None:
|
||||
method, url = resp.request.method, resp.request.url
|
||||
data = f"{str(value)} to " if value is not None else ""
|
||||
super().__init__(
|
||||
f"Could not {method} {data}{url}. Code: {resp.status_code}. Reason: {resp.reason}.",
|
||||
*args,
|
||||
)
|
||||
|
||||
|
||||
class HttpDeviceController(OphydObject, ABC):
|
||||
"""Controller to consolidate polling loops and other REST calls for devices which communicate
|
||||
with HTTP REST interfaces"""
|
||||
|
||||
_readback_endpoint: str
|
||||
_target_endpoint: str
|
||||
|
||||
def __init__(self, *, prefix, **kwargs):
|
||||
self._readbacks: dict
|
||||
self._session = Session()
|
||||
self._prefix = prefix
|
||||
self._targets = {}
|
||||
self._signal_registry: set[str] = set()
|
||||
self._readback_poll_interval: float = _POLL_INTERVAL_SLOW
|
||||
|
||||
super().__init__(**kwargs)
|
||||
self._setup_readback()
|
||||
|
||||
def _setup_readback(self):
|
||||
self._stop_monitor_readback_event = Event()
|
||||
self._readback_lock = RLock()
|
||||
self._monitor_readback_thread = Thread(
|
||||
target=self._monitor,
|
||||
args=[
|
||||
self._readback_endpoint,
|
||||
self._stop_monitor_readback_event,
|
||||
self._readback_lock,
|
||||
self._readbacks,
|
||||
],
|
||||
)
|
||||
|
||||
def manual_update(self):
|
||||
self._update_reading(self._readback_endpoint, self._readback_lock, self._readbacks)
|
||||
|
||||
def _update_reading(self, endpoint: str, lock: RLock, buffer: dict):
|
||||
data = self._rest_get(endpoint)
|
||||
timestamp = time.monotonic()
|
||||
with lock:
|
||||
buffer.update(data)
|
||||
buffer["__timestamp"] = timestamp
|
||||
|
||||
def _monitor(self, endpoint: str, event: Event, lock: RLock, buffer: dict):
|
||||
while not event.is_set():
|
||||
self._update_reading(endpoint, lock, buffer)
|
||||
time.sleep(self._readback_poll_interval)
|
||||
|
||||
def _clean_monitor(self):
|
||||
if self._monitor_readback_thread.is_alive():
|
||||
self._stop_monitor_readback_event.set()
|
||||
self._monitor_readback_thread.join(timeout=2)
|
||||
if self._monitor_readback_thread.is_alive():
|
||||
raise RuntimeError("Failed to clean up Aerotech monitor thread.")
|
||||
|
||||
def register(self, axis_id: str):
|
||||
self._signal_registry.add(axis_id)
|
||||
|
||||
def _rest_get(self, endpoint):
|
||||
resp = self._session.get(self._prefix + endpoint)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp)
|
||||
return resp.json()
|
||||
|
||||
def _rest_put(self, params: dict | None = None, body: dict | None = None):
|
||||
resp = self._session.put(self._prefix + self._target_endpoint, params=params, json=body)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=params)
|
||||
|
||||
def _rest_post(self, params: dict | None = None, body: dict | None = None):
|
||||
resp = self._session.post(self._prefix + self._target_endpoint, params=params, json=body)
|
||||
if not resp.ok:
|
||||
raise HttpRestError(resp, value=params)
|
||||
|
||||
def start_monitor(self):
|
||||
"""Start or restart the automonitor thread."""
|
||||
self._clean_monitor()
|
||||
self._setup_readback()
|
||||
self._monitor_readback_thread.start()
|
||||
|
||||
def monitor_stopped(self):
|
||||
return not self._monitor_readback_thread.is_alive()
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_put({axis: val})
|
||||
|
||||
@abstractmethod
|
||||
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
|
||||
"""Return a tuple (reading, timestamp) if the axis_id exists"""
|
||||
|
||||
def stop(self):
|
||||
# There doesn't appear to be a stop endpoint on the server
|
||||
# Best effort: set the target to the current position
|
||||
pass
|
||||
# TODO: self._rest_put(self._readbacks)
|
||||
|
||||
|
||||
class HttpDeviceSignal(SocketSignal):
|
||||
"""Ophyd signal which gets and puts to a REST API rather than EPICS PVs, mediated through the Aerotech
|
||||
Controller"""
|
||||
|
||||
def __init__(self, *args, axis_identifier: str, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
controller: HttpDeviceController | None = getattr(self.root, "controller", None)
|
||||
if controller is None:
|
||||
raise TypeError("HttpDeviceSignal must be used in a device with a HttpDeviceController")
|
||||
self._controller = controller
|
||||
self._axis_id = axis_identifier
|
||||
self._controller.register(self._axis_id)
|
||||
|
||||
def _socket_get(self): # type: ignore
|
||||
self._readback, self.metadata["timestamp"] = self._controller.get_readback(
|
||||
self._axis_id
|
||||
) or (0.0, 0.0)
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val: float):
|
||||
self._controller.put(self._axis_id, val)
|
||||
|
||||
def get(self, **kwargs):
|
||||
if self._controller.monitor_stopped():
|
||||
self._controller.start_monitor()
|
||||
return super().get(**kwargs)
|
||||
|
||||
|
||||
class HttpOphydDevice(PSIDeviceBase):
|
||||
controller_class: type[HttpDeviceController]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
prefix: str = "",
|
||||
scan_info=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = self.controller_class(prefix=prefix)
|
||||
super().__init__(
|
||||
name=name,
|
||||
prefix=prefix,
|
||||
scan_info=scan_info,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def wait_for_connection(self, **kwargs): # type: ignore
|
||||
self.controller.start_monitor()
|
||||
self.controller.manual_update()
|
||||
return super().wait_for_connection(**kwargs)
|
||||
|
||||
def stop(self, *, success: bool = False) -> None:
|
||||
self.controller.stop()
|
||||
return super().stop(success=success)
|
||||
@@ -0,0 +1,42 @@
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
|
||||
from .http import HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
|
||||
|
||||
_TIMESTAMP_ID = "__timestamp"
|
||||
_POLL_INTERVAL_SLOW = 0.1
|
||||
|
||||
|
||||
class SmargonController(HttpDeviceController):
|
||||
"""Controller to consolidate polling loops and other REST calls for the smargon"""
|
||||
|
||||
_readback_endpoint = "/readbackSCS"
|
||||
_target_endpoint = "/targetSCS"
|
||||
|
||||
def __init__(self, *, prefix, **kwargs):
|
||||
self._readbacks: dict[str, float] = {}
|
||||
super().__init__(prefix=prefix, **kwargs)
|
||||
|
||||
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
|
||||
with self._readback_lock:
|
||||
if axis_id not in self._readbacks or _TIMESTAMP_ID not in self._readbacks:
|
||||
return None
|
||||
return self._readbacks.get(axis_id), self._readbacks.get(_TIMESTAMP_ID) # type: ignore
|
||||
|
||||
def put(self, axis: str, val: float):
|
||||
self._rest_put(params={axis: val})
|
||||
|
||||
def stop(self):
|
||||
# There doesn't appear to be a stop endpoint on the server
|
||||
# Best effort: set the target to the current position
|
||||
self._rest_put(params=self._readbacks)
|
||||
|
||||
|
||||
class Smargon(HttpOphydDevice):
|
||||
controller_class = SmargonController
|
||||
|
||||
x = Cpt(HttpDeviceSignal, axis_identifier="SHX", tolerance=0.01)
|
||||
y = Cpt(HttpDeviceSignal, axis_identifier="SHY", tolerance=0.01)
|
||||
z = Cpt(HttpDeviceSignal, axis_identifier="SHZ", tolerance=0.01)
|
||||
phi = Cpt(HttpDeviceSignal, axis_identifier="PHI", tolerance=0.01)
|
||||
chi = Cpt(HttpDeviceSignal, axis_identifier="CHI", tolerance=0.01)
|
||||
@@ -0,0 +1,194 @@
|
||||
"""Planner to move between beamline statesΩ"""
|
||||
|
||||
import time
|
||||
from collections import defaultdict, deque
|
||||
# from enums import BeamlineState, TemperatureMode
|
||||
# from matcher import DeviceMatcher, nonzero_is_on
|
||||
|
||||
|
||||
class StateChangePlanner:
|
||||
"""Moves devices to the correct positions to achieve a given state"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
devices,
|
||||
states: dict[BeamlineState, dict[str, str]],
|
||||
allow_modifiers=None,
|
||||
deps=None,
|
||||
stage_timeout=5,
|
||||
debug=False,
|
||||
):
|
||||
self.devices = devices
|
||||
self.states = states
|
||||
self.allow_modifiers = allow_modifiers or {}
|
||||
self.deps = deps
|
||||
self.stage_timeout = stage_timeout
|
||||
self.debug = debug
|
||||
|
||||
self.modifiers = {
|
||||
TemperatureMode.CRYO: {"cryo_pos": "in"},
|
||||
TemperatureMode.ROOM_TEMP: {"cryo_pos": "out"},
|
||||
}
|
||||
|
||||
self.matcher = DeviceMatcher()
|
||||
# Register rules
|
||||
self.matcher.register("bl_bright", nonzero_is_on)
|
||||
self.matcher.register("fl_bright", nonzero_is_on)
|
||||
# self.matcher.register("bs_z", threshold_rule("safe"))
|
||||
|
||||
def _merged_state(self, state, modifier):
|
||||
target = dict(self.states[state])
|
||||
|
||||
if modifier:
|
||||
if isinstance(modifier, str):
|
||||
modifier = TemperatureMode(modifier)
|
||||
|
||||
if self.allow_modifiers.get(state, False):
|
||||
target.update(self.modifiers[modifier])
|
||||
return target
|
||||
|
||||
def execute_plan(self, stage, state_name):
|
||||
"""Execute the planning stage"""
|
||||
statuses = []
|
||||
moved = []
|
||||
start = time.time()
|
||||
|
||||
# trigger moves in parallel
|
||||
for dev, pos in stage:
|
||||
d = self.devices[dev]
|
||||
if not d.is_at(pos):
|
||||
# status = d.mv(pos)
|
||||
status = d.mv(pos)
|
||||
statuses.append((dev, pos, status))
|
||||
moved.append((dev, d, pos))
|
||||
|
||||
# wait for all to finish
|
||||
for dev, pos, status in statuses:
|
||||
remaining = self.stage_timeout - (time.time() - start)
|
||||
if remaining <= 0:
|
||||
raise RuntimeError(f"Stage timeout while moving to {state_name.name}")
|
||||
try:
|
||||
status.wait(timeout=remaining)
|
||||
except Exception:
|
||||
print(f"\nTimeout waiting for {dev} -> {pos}")
|
||||
# print("Positions:", self.print_positions())
|
||||
raise
|
||||
|
||||
# optional final verification (recommended for beamlines)
|
||||
for dev, d, pos in moved:
|
||||
if not self.matcher.matches(dev, d, pos):
|
||||
raise RuntimeError(
|
||||
f"{dev} did not reach position '{pos}' while moving to "
|
||||
f"{state_name.name}. Check motor status in EPICS."
|
||||
)
|
||||
print("Stage complete.")
|
||||
|
||||
def move_to(self, state_name, modifier=None):
|
||||
"""Move devices to the correct positions to achieve a given state"""
|
||||
if isinstance(state_name, str):
|
||||
state_name = BeamlineState(state_name)
|
||||
target = self._merged_state(state_name, modifier)
|
||||
|
||||
plan = self._plan(target)
|
||||
print(len(plan), "stages to reach target state")
|
||||
|
||||
# print("PLAN:")
|
||||
# for i, stage in enumerate(plan):
|
||||
# print(f"Stage {i + 1}: {stage}")
|
||||
seq = 1
|
||||
for stage in plan:
|
||||
print(f"Stage {seq}: {stage}")
|
||||
self.execute_plan(stage, state_name)
|
||||
seq += 1
|
||||
|
||||
def available_states(self):
|
||||
"""Return a list of available states"""
|
||||
return list(self.states.keys())
|
||||
|
||||
def get_positions(self):
|
||||
"""Return current positions of all SE devices"""
|
||||
return {name: dev.pos for name, dev in self.devices.items()}
|
||||
|
||||
def print_positions(self):
|
||||
"""Return current state of all devices"""
|
||||
for name, device in self.devices.items():
|
||||
print(f"{name:10s} : {device.pos:10s} value: {device.actual}")
|
||||
|
||||
def diff_states(self, before):
|
||||
"""Return a dict of {device: (before, after)} pairs for devices that changed state"""
|
||||
after = self.get_positions()
|
||||
return {k: (before[k], after[k]) for k in before if before[k] != after[k]}
|
||||
|
||||
def current_state(self):
|
||||
"""Return all current matching BeamlineState and TemperatureMode combinations,
|
||||
prioritizing non-None modifiers first."""
|
||||
matches = [] # Store all matching (state, modifier) pairs
|
||||
|
||||
for state in self.states:
|
||||
# Start with prioritized modifiers: Non-None first, then None.
|
||||
modifiers = list(self.modifiers.keys())
|
||||
modifiers.append(None) # Add `None` as a fallback after real modifiers.
|
||||
|
||||
for modifier in modifiers:
|
||||
# Combine state and modifier to get full configuration
|
||||
config = self._merged_state(state, modifier)
|
||||
|
||||
# Check if all devices match their expected positions
|
||||
all_match = True
|
||||
|
||||
for d, expected in config.items():
|
||||
dev = self.devices[d]
|
||||
|
||||
if not self.matcher.matches(d, dev, expected):
|
||||
all_match = False
|
||||
break
|
||||
|
||||
if all_match:
|
||||
matches.append((state.name, modifier.name if modifier else None))
|
||||
|
||||
return matches if matches else None
|
||||
|
||||
def is_state(self, state, modifier=None):
|
||||
"""Check if the current state matches the given state and modifier."""
|
||||
actual = self.current_state()
|
||||
if not actual:
|
||||
return False
|
||||
|
||||
if modifier is None:
|
||||
# match any modifier
|
||||
return any(s == state.name for s, _ in actual)
|
||||
|
||||
return (state.name, modifier.name) in actual
|
||||
|
||||
def _plan(self, target):
|
||||
|
||||
graph = defaultdict(set)
|
||||
indeg = defaultdict(int)
|
||||
nodes = set()
|
||||
|
||||
for dev, pos in target.items():
|
||||
node = (dev, pos)
|
||||
nodes.add(node)
|
||||
|
||||
for dep in self.deps.get(node, []):
|
||||
graph[dep].add(node)
|
||||
indeg[node] += 1
|
||||
nodes.add(dep)
|
||||
|
||||
q = deque(n for n in nodes if indeg[n] == 0)
|
||||
stages = []
|
||||
|
||||
while q:
|
||||
stage = list(q)
|
||||
stages.append(stage)
|
||||
q.clear()
|
||||
|
||||
for n in stage:
|
||||
for m in graph[n]:
|
||||
indeg[m] -= 1
|
||||
if indeg[m] == 0:
|
||||
q.append(m)
|
||||
|
||||
if sum(len(s) for s in stages) != len(nodes):
|
||||
raise RuntimeError("Circular dependency in state dependencies")
|
||||
return stages
|
||||
@@ -0,0 +1,21 @@
|
||||
# from enums import BeamlineState
|
||||
import yaml
|
||||
|
||||
|
||||
class DefineStatesManager:
|
||||
@staticmethod
|
||||
def initialize_states(states_file):
|
||||
"""
|
||||
Returns the states and modifiers defined in the specified states file.
|
||||
"""
|
||||
with open(states_file, "r", encoding="utf-8") as f:
|
||||
cfg = yaml.safe_load(f)
|
||||
states = {}
|
||||
allow_modifiers = {}
|
||||
|
||||
for name, config in cfg["states"].items():
|
||||
state = BeamlineState(name)
|
||||
allow_modifiers[state] = config.pop("allow_modifiers", False)
|
||||
states[state] = config
|
||||
|
||||
return states, allow_modifiers
|
||||
@@ -0,0 +1,70 @@
|
||||
""" Build the sample environment devices"""
|
||||
import yaml
|
||||
|
||||
# from position_device import PositionDevice
|
||||
|
||||
def motor_resolver(bec_name):
|
||||
|
||||
candidates = [
|
||||
bec_name,
|
||||
bec_name.replace("_", "."),
|
||||
]
|
||||
|
||||
for path in candidates:
|
||||
try:
|
||||
obj = dev
|
||||
|
||||
for part in path.split("."):
|
||||
obj = getattr(obj, part)
|
||||
|
||||
return obj
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
raise ValueError(f"Cannot resolve motor for '{bec_name}'")
|
||||
|
||||
def build_devices(yaml_file, mock_devices):
|
||||
""" Build devices from the beamline states yaml"""
|
||||
|
||||
|
||||
state_devices = {}
|
||||
|
||||
with open(yaml_file, encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
for bec_name, cfg in data.items():
|
||||
|
||||
user = cfg.get("userParameter")
|
||||
|
||||
# Skip devices without user parameters
|
||||
if not user:
|
||||
continue
|
||||
|
||||
|
||||
tol = user.get("tol", 0.1)
|
||||
|
||||
positions = {
|
||||
k: v for k, v in user.items()
|
||||
if k not in ("type", "tol")
|
||||
}
|
||||
|
||||
allow_arbitrary = (user["type"] == "continuous")
|
||||
|
||||
|
||||
|
||||
pos_dev = PositionDevice(
|
||||
bec_name=bec_name,
|
||||
mot_device = motor_resolver(bec_name),
|
||||
positions=positions,
|
||||
tol=tol,
|
||||
allow_arbitrary=allow_arbitrary,
|
||||
use_mock=bec_name in mock_devices,
|
||||
)
|
||||
|
||||
state_devices[bec_name] = pos_dev
|
||||
return state_devices
|
||||
|
||||
|
||||
|
||||
|
||||
+44
-82
@@ -1,3 +1,5 @@
|
||||
"""Start of a beamline health checker"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Callable
|
||||
@@ -12,6 +14,8 @@ from bec_lib.device import Signal, Positioner
|
||||
|
||||
|
||||
class Status(Enum):
|
||||
"""Define standard statuses"""
|
||||
|
||||
OK = 0
|
||||
WARNING = 1
|
||||
ERROR = 2
|
||||
@@ -25,15 +29,12 @@ class Status(Enum):
|
||||
Status.ERROR: "red",
|
||||
Status.UNKNOWN: "blue",
|
||||
}[self]
|
||||
|
||||
|
||||
@property
|
||||
def color_scilog(self):
|
||||
return {
|
||||
Status.OK: "green",
|
||||
Status.WARNING: "",
|
||||
Status.ERROR: "red",
|
||||
Status.UNKNOWN: "",
|
||||
}[self]
|
||||
return {Status.OK: "green", Status.WARNING: "", Status.ERROR: "red", Status.UNKNOWN: ""}[
|
||||
self
|
||||
]
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
@@ -43,9 +44,10 @@ class Status(Enum):
|
||||
|
||||
@dataclass
|
||||
class HealthCheckResult:
|
||||
"""Define the output of the health check"""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
description: str
|
||||
status: Status
|
||||
|
||||
value: Any = None
|
||||
@@ -59,32 +61,24 @@ class HealthCheckResult:
|
||||
if self.status == Status.OK:
|
||||
return f"[{self.status.name}] {self.description}"
|
||||
|
||||
return (
|
||||
f"[{self.status.name}] "
|
||||
f"{self.description}: {self.message}"
|
||||
)
|
||||
|
||||
return f"[{self.status.name}] " f"{self.description}: {self.message}"
|
||||
|
||||
def formatted_message(self):
|
||||
if self.status == Status.OK:
|
||||
return f"[{self.status.name}] {self.name}"
|
||||
|
||||
return (
|
||||
f"[{self.status.name}] "
|
||||
f"{self.description}: {self.message}"
|
||||
)
|
||||
|
||||
return f"[{self.status.name}] " f"{self.description}: {self.message}"
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Send to SciLog
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def send_to_scilog(results):
|
||||
|
||||
counts = {
|
||||
Status.OK: 0,
|
||||
Status.WARNING: 0,
|
||||
Status.ERROR: 0,
|
||||
Status.UNKNOWN: 0,
|
||||
}
|
||||
def send_to_scilog(results):
|
||||
"""Make a scilog entry of the health check"""
|
||||
|
||||
counts = {Status.OK: 0, Status.WARNING: 0, Status.ERROR: 0, Status.UNKNOWN: 0}
|
||||
|
||||
for result in results:
|
||||
counts[result.status] += 1
|
||||
@@ -92,24 +86,18 @@ def send_to_scilog(results):
|
||||
|
||||
msg = bec.messaging.scilog.new()
|
||||
|
||||
msg.add_text(
|
||||
f"Beamline Health Summary {timestamp}",
|
||||
bold = True
|
||||
)
|
||||
msg.add_text(f"Beamline Health Summary {timestamp}", bold=True)
|
||||
|
||||
msg.add_text("\n")
|
||||
for status, count in counts.items():
|
||||
msg.add_text(
|
||||
f"{status.name:<10}: {count}",
|
||||
bold=True,
|
||||
)
|
||||
msg.add_text(f"{status.name:<10}: {count}", bold=True)
|
||||
msg.add_text("\n")
|
||||
|
||||
for result in results:
|
||||
msg.add_text(
|
||||
result.formatted_message(),
|
||||
# bold=result.status != Status.OK,
|
||||
color = result.status.color_scilog
|
||||
color=result.status.color_scilog,
|
||||
)
|
||||
msg.add_text("\n")
|
||||
msg.add_tags(["beamline health check"])
|
||||
@@ -120,14 +108,13 @@ def send_to_scilog(results):
|
||||
# Configuration
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class BeamlineHealthConfig:
|
||||
"""Define some rules to check against"""
|
||||
|
||||
signal_rules: dict[str, Callable] = field(
|
||||
default_factory=lambda: {
|
||||
"cam": lambda x: x != 0,
|
||||
"bpm": lambda x: x != 0,
|
||||
}
|
||||
default_factory=lambda: {"cam": lambda x: x != 0, "bpm": lambda x: x != 0}
|
||||
)
|
||||
|
||||
motor_tolerances: dict[str, float] = field(
|
||||
@@ -147,6 +134,7 @@ class BeamlineHealthConfig:
|
||||
|
||||
|
||||
def get_devices():
|
||||
"""Return a list of all the beamline devices"""
|
||||
return list(dev.items())
|
||||
|
||||
|
||||
@@ -156,14 +144,11 @@ def get_devices():
|
||||
|
||||
|
||||
def check_signals(devices, config: BeamlineHealthConfig):
|
||||
"""Check the signal devices"""
|
||||
|
||||
results = []
|
||||
|
||||
signal_devices = [
|
||||
(name, obj)
|
||||
for name, obj in devices
|
||||
if isinstance(obj, Signal)
|
||||
]
|
||||
signal_devices = [(name, obj) for name, obj in devices if isinstance(obj, Signal)]
|
||||
|
||||
for name, obj in signal_devices:
|
||||
|
||||
@@ -250,14 +235,11 @@ def check_signals(devices, config: BeamlineHealthConfig):
|
||||
|
||||
|
||||
def check_motors(devices, config: BeamlineHealthConfig):
|
||||
"""Check the standard motor devices"""
|
||||
|
||||
results = []
|
||||
|
||||
motor_devices = [
|
||||
(name, obj)
|
||||
for name, obj in devices
|
||||
if isinstance(obj, Positioner)
|
||||
]
|
||||
motor_devices = [(name, obj) for name, obj in devices if isinstance(obj, Positioner)]
|
||||
|
||||
for name, obj in motor_devices:
|
||||
|
||||
@@ -337,10 +319,7 @@ def check_motors(devices, config: BeamlineHealthConfig):
|
||||
|
||||
diff = abs(actual - setpoint)
|
||||
|
||||
tolerance = config.motor_tolerances.get(
|
||||
name,
|
||||
config.default_motor_tolerance,
|
||||
)
|
||||
tolerance = config.motor_tolerances.get(name, config.default_motor_tolerance)
|
||||
|
||||
if diff > tolerance:
|
||||
|
||||
@@ -392,6 +371,7 @@ def check_motors(devices, config: BeamlineHealthConfig):
|
||||
|
||||
|
||||
def check2(config: BeamlineHealthConfig | None = None):
|
||||
"""Perform the checks"""
|
||||
if config is None:
|
||||
config = BeamlineHealthConfig()
|
||||
|
||||
@@ -407,9 +387,7 @@ def check2(config: BeamlineHealthConfig | None = None):
|
||||
# Sort by severity
|
||||
# ---------------------------------------------------------------
|
||||
|
||||
results.sort(
|
||||
key=lambda r: r.status.value,
|
||||
)
|
||||
results.sort(key=lambda r: r.status.value)
|
||||
|
||||
return results
|
||||
|
||||
@@ -420,10 +398,10 @@ def check2(config: BeamlineHealthConfig | None = None):
|
||||
|
||||
|
||||
def summary_text(results):
|
||||
"""Summarise the results in a text table"""
|
||||
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
|
||||
|
||||
|
||||
n_ok = sum(r.status == Status.OK for r in results)
|
||||
|
||||
n_warn = sum(r.status == Status.WARNING for r in results)
|
||||
@@ -448,48 +426,32 @@ def summary_text(results):
|
||||
# -------------------------------------------------------------------
|
||||
# Filter results
|
||||
# -------------------------------------------------------------------
|
||||
def filter_results(results, statuses = None):
|
||||
def filter_results(results, statuses=None):
|
||||
"""Filter the results"""
|
||||
if statuses is None:
|
||||
return results
|
||||
|
||||
return [
|
||||
r for r in results
|
||||
if r.status in statuses
|
||||
]
|
||||
|
||||
return [r for r in results if r.status in statuses]
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# CLI Entry Point
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
|
||||
def run_check(show_all= False):
|
||||
def run_check(show_all=False):
|
||||
"""Runs the checks and outputs the results"""
|
||||
|
||||
results = check2()
|
||||
|
||||
print(summary_text(results))
|
||||
|
||||
problem_results = filter_results(
|
||||
results,
|
||||
statuses={
|
||||
Status.WARNING,
|
||||
Status.ERROR,
|
||||
Status.UNKNOWN,
|
||||
}
|
||||
results, statuses={Status.WARNING, Status.ERROR, Status.UNKNOWN}
|
||||
)
|
||||
send_to_scilog(results)
|
||||
|
||||
|
||||
if not show_all:
|
||||
results = filter_results(
|
||||
results,
|
||||
statuses={
|
||||
Status.WARNING,
|
||||
Status.ERROR,
|
||||
Status.UNKNOWN
|
||||
}
|
||||
)
|
||||
results = filter_results(results, statuses={Status.WARNING, Status.ERROR, Status.UNKNOWN})
|
||||
for result in results:
|
||||
print(result)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
"""Planner dependencies"""
|
||||
def planner_deps():
|
||||
"""Define the dependencies between beamline positions"""
|
||||
return {
|
||||
("bs_z", "samp"): [
|
||||
("aerotech_x", "out"),
|
||||
("diag_y", "out"),
|
||||
("coll_y", "out"),
|
||||
],
|
||||
("aerotech_x", "in"): [
|
||||
("diag_y", "out"),
|
||||
("bs_z", "safe"),
|
||||
],
|
||||
("aerotech_x", "out"): [
|
||||
("diag_y", "out"),
|
||||
("bs_z", "safe"),
|
||||
],
|
||||
("diag_y", "scint"): [
|
||||
("aerotech_x", "out"),
|
||||
("bs_z", "safe"),
|
||||
("cryo_pos", "out"),
|
||||
],
|
||||
("diag_y", "i1"): [
|
||||
("aerotech_x", "out"),
|
||||
("bs_z", "safe"),
|
||||
("cryo_pos", "out"),
|
||||
],
|
||||
("bs_pos", "out"): [("bs_z", "safe")],
|
||||
("bs_pos", "in"): [("bs_z", "safe")],
|
||||
("diag_y", "out"): [("bs_z", "safe")],
|
||||
("diag_y", "park"): [("bs_z", "safe")],
|
||||
("coll_y", "out"): [("bs_z", "safe")],
|
||||
("coll_y", "park"): [("bs_z", "safe")],
|
||||
("coll_y", "in"): [("bs_z", "safe")],
|
||||
("coll_y", "intermediate"): [("bs_z", "safe")],
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
"""
|
||||
This module manages the initialization of devices."""
|
||||
|
||||
# from guards import attach_guards
|
||||
# from policies import attach_policies
|
||||
# from build_devices import build_devices
|
||||
|
||||
|
||||
class DeviceManager:
|
||||
"""Class for building devices and attaching safety guards and policies."""
|
||||
|
||||
@staticmethod
|
||||
def initialize_devices(state_devices_file, rest_devices_file, mock_devices):
|
||||
"""
|
||||
Initializes sample environment devices from the specified file.
|
||||
"""
|
||||
devices = build_devices(state_devices_file, mock_devices)
|
||||
rest_devices = build_devices(rest_devices_file, mock_devices)
|
||||
devices.update(rest_devices)
|
||||
attach_guards(devices)
|
||||
attach_policies(devices)
|
||||
return devices
|
||||
@@ -0,0 +1,22 @@
|
||||
"""Enums for beamline states"""
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class BeamlineState(str, Enum):
|
||||
"""List of beamline states"""
|
||||
ROBOT_SAMPLE_EXCHANGE = "robot_sample_exchange"
|
||||
SAMPLE_ALIGNMENT = "sample_alignment"
|
||||
DATA_COLLECTION = "data_collection"
|
||||
DC_XRF = "DC_XRF"
|
||||
MANUAL_SAMPLE_EXCHANGE = "manual_sample_exchange"
|
||||
BEAM_VISUALISATION = "beam_visualisation"
|
||||
FLUX_MEASUREMENT = "flux_measurement"
|
||||
BEAMSTOP_ALIGNMENT = "beamstop_alignment"
|
||||
MAINTENANCE = "maintenance"
|
||||
XTAL_SNAPSHOT = "xtal_snapshot"
|
||||
|
||||
|
||||
class TemperatureMode(str, Enum):
|
||||
"""List of temperature modes"""
|
||||
CRYO = "cryo"
|
||||
ROOM_TEMP = "room_temp"
|
||||
@@ -0,0 +1,95 @@
|
||||
"""Setup guards for devices."""
|
||||
|
||||
|
||||
class GuardViolation(Exception):
|
||||
"""Raised when a guarded move is not allowed."""
|
||||
|
||||
|
||||
class AtPositionGuard:
|
||||
"""Guard that checks if a device is in a specific position."""
|
||||
|
||||
def __init__(self, device, position):
|
||||
self.device = device
|
||||
self.pos = position
|
||||
|
||||
def check(self):
|
||||
"""Check if the device is in the specified position."""
|
||||
if self.device.pos != self.pos:
|
||||
raise GuardViolation(
|
||||
f"{self.device.bec_name} must be in the '{self.pos}' position"
|
||||
)
|
||||
# print("move allowed")
|
||||
return True
|
||||
|
||||
def requirement(self):
|
||||
"""Return the requirement for the guard."""
|
||||
return (self.device.bec_name, self.pos)
|
||||
|
||||
|
||||
class MinMaxGuard:
|
||||
"""Guard that checks if a device is within a specific range."""
|
||||
|
||||
def __init__(self, device, limit_value, direction):
|
||||
self.device = device
|
||||
self.limit_value = limit_value
|
||||
self.direction = direction # direction: 'max' or 'min'
|
||||
|
||||
def check(self):
|
||||
"""Check if the device is within the specified range."""
|
||||
if self.direction == "less_than":
|
||||
if not (self.device.actual - self.device.tol) <= self.limit_value:
|
||||
raise GuardViolation(
|
||||
f"{self.device.bec_name} must be less than or equal to {self.limit_value} mm"
|
||||
)
|
||||
elif self.direction == "more_than":
|
||||
if not (self.device.actual + self.device.tol) >= self.limit_value:
|
||||
raise GuardViolation(
|
||||
f"{self.device.bec_name} must be greater than or equal to {self.limit_value} mm"
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid direction '{self.direction}'. Use 'less_than' or 'more_than'."
|
||||
)
|
||||
|
||||
# print("move allowed")
|
||||
return True
|
||||
|
||||
def requirement(self):
|
||||
"""Return the requirement for the guard."""
|
||||
# planner cannot handle numeric constraints directly
|
||||
# return None -> planner ignores
|
||||
return None
|
||||
|
||||
|
||||
def guards_setup(d):
|
||||
"""Define guards for devices."""
|
||||
guards = {}
|
||||
guards["bs_safe"] = AtPositionGuard(d["bs_z"], position="safe")
|
||||
guards["bs_max_blin"] = MinMaxGuard(
|
||||
d["bs_z"], direction="less_than", limit_value=d["bs_z"].positions["max_blin"]
|
||||
)
|
||||
guards["bs_work_min"] = MinMaxGuard(
|
||||
d["bs_z"], direction="more_than", limit_value=d["bs_z"].positions["work_min"]
|
||||
)
|
||||
guards["bs_pos_in"] = AtPositionGuard(d["bs_pos"], position="in")
|
||||
guards["gonx_out"] = MinMaxGuard(
|
||||
d["aerotech_x"], direction="less_than", limit_value=d["aerotech_x"].positions["out"]
|
||||
)
|
||||
guards["gonx_safe"] = AtPositionGuard(d["aerotech_x"], position="safe")
|
||||
guards["diag_y_out"] = MinMaxGuard(
|
||||
d["diag_y"], direction="less_than", limit_value=d["diag_y"].positions["out"]
|
||||
)
|
||||
guards["coll_y_out"] = MinMaxGuard(
|
||||
d["coll_y"], direction="less_than", limit_value=d["coll_y"].positions["out"]
|
||||
)
|
||||
return guards
|
||||
|
||||
|
||||
def attach_guards(d):
|
||||
"""Attach guards to devices."""
|
||||
g = guards_setup(d)
|
||||
d["diag_y"].guards.append(g["bs_work_min"].check)
|
||||
d["bl_pos"].guards.append(g["bs_max_blin"].check)
|
||||
d["bs_pos"].guards.append(g["bs_safe"].check)
|
||||
d["bs_z"].guards.append(g["bs_pos_in"].check)
|
||||
d["coll_y"].guards.append(g["bs_work_min"].check)
|
||||
@@ -0,0 +1,97 @@
|
||||
"""Initialise sample environment devices and beamline states"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
# from devices_manager import DeviceManager
|
||||
# from beamline_state_manager import DefineStatesManager
|
||||
# from dependencies import planner_deps
|
||||
# from beamline_planner import StateChangePlanner
|
||||
|
||||
@dataclass
|
||||
class Environment:
|
||||
|
||||
device_mocks: dict[str, bool] = None
|
||||
|
||||
def __post_init__(self):
|
||||
|
||||
if self.device_mocks is None:
|
||||
self.device_mocks = {
|
||||
"aerotech_x": "mock",
|
||||
"aerotech_y": "mock",
|
||||
"aerotech_z": "mock",
|
||||
"aerotech_u": "mock",
|
||||
"bl_bright": "mock",
|
||||
"bl_pos": "mock",
|
||||
"bs_pos": "mock",
|
||||
"bs_z": "mock",
|
||||
"coll_y": "mock",
|
||||
"cryo_pos": "mock",
|
||||
"det_cov": "mock",
|
||||
"diag_y": "mock",
|
||||
"fl_bright": "mock",
|
||||
"smargon_x": "mock",
|
||||
"smargon_y": "mock",
|
||||
"smargon_z": "mock",
|
||||
"smargon_chi": "mock",
|
||||
"smargon_phi": "mock",
|
||||
"xrf_pos": "mock",
|
||||
}
|
||||
|
||||
mocks = sorted(
|
||||
name
|
||||
for name, backend in self.device_mocks.items()
|
||||
if backend == "mock"
|
||||
)
|
||||
|
||||
reals = sorted(
|
||||
name
|
||||
for name, backend in self.device_mocks.items()
|
||||
if backend == "real"
|
||||
)
|
||||
|
||||
print(f"Mock devices ({len(mocks)}): {mocks}")
|
||||
print(f"Real devices ({len(reals)}): {reals}")
|
||||
|
||||
devdir = "/sls/x06da/config/bec/production/pxiii_bec/pxiii_bec/device_configs/"
|
||||
|
||||
state_devices_file: str = devdir + "pxiii-state-devices.yaml"
|
||||
rest_devices_file: str = devdir + "pxiii-rest-devices.yaml"
|
||||
states_file: str = devdir + "beamline_states.yaml"
|
||||
|
||||
@property
|
||||
def mock_devices(self):
|
||||
mock_names = set()
|
||||
for name, device in self.device_mocks.items():
|
||||
if device == "mock":
|
||||
mock_names.add(name)
|
||||
return mock_names
|
||||
|
||||
def init_beamline_environment():
|
||||
"""
|
||||
Initializes the beamline with real or mock devices.
|
||||
"""
|
||||
|
||||
env = Environment()
|
||||
|
||||
# Initialize devices
|
||||
device_manager = DeviceManager()
|
||||
devices = device_manager.initialize_devices(
|
||||
env.state_devices_file,
|
||||
env.rest_devices_file,
|
||||
env.mock_devices
|
||||
)
|
||||
|
||||
# Initialize states
|
||||
state_manager = DefineStatesManager()
|
||||
states, allow_modifiers = state_manager.initialize_states(env.states_file)
|
||||
|
||||
# Setup dependencies
|
||||
deps = planner_deps()
|
||||
|
||||
# Setup planner
|
||||
planner = StateChangePlanner(devices, states, allow_modifiers, deps)
|
||||
print("Initializing beamline state planner")
|
||||
return devices, planner
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
"""Used for checking device positions match current state"""
|
||||
class DeviceMatcher:
|
||||
"""Class for checking device positions match current state"""
|
||||
def __init__(self):
|
||||
self._rules = {}
|
||||
|
||||
def register(self, device_name, func):
|
||||
"""Register a matching function for a device."""
|
||||
self._rules[device_name] = func
|
||||
|
||||
def matches(self, device_name, device, expected):
|
||||
"""Return True if device matches expected state."""
|
||||
if expected is None:
|
||||
return True # "don't care"
|
||||
|
||||
if device_name in self._rules:
|
||||
return self._rules[device_name](device, expected)
|
||||
|
||||
# default fallback
|
||||
return device.is_at(expected)
|
||||
|
||||
def explain(self, device_name, device, expected):
|
||||
"""Return True if device matches expected state."""
|
||||
val = device.readback.get()
|
||||
|
||||
if device_name in self._rules:
|
||||
ok = self._rules[device_name](device, expected)
|
||||
else:
|
||||
ok = device.is_at(expected)
|
||||
|
||||
return ok, val
|
||||
|
||||
def nonzero_is_on(device, expected, eps=1e-6):
|
||||
"""Define that anything > 0 is on"""
|
||||
val = device.actual
|
||||
|
||||
if expected == "off":
|
||||
return abs(val) < eps
|
||||
if expected == "on":
|
||||
return val > eps
|
||||
|
||||
return abs(val - expected) < eps
|
||||
|
||||
|
||||
# def threshold_rule(param_name):
|
||||
# def _rule(device, expected):
|
||||
# val = device.actual
|
||||
#
|
||||
# if expected == param_name:
|
||||
# threshold = device.position
|
||||
# return val >= threshold
|
||||
#
|
||||
# return device.is_at(expected)
|
||||
#
|
||||
# return _rule
|
||||
#
|
||||
#
|
||||
# def within_tolerance(tol):
|
||||
# def _rule(device, expected):
|
||||
# val = device.readback.get()
|
||||
# return abs(val - expected) < tol
|
||||
#
|
||||
# return _rule
|
||||
@@ -0,0 +1,77 @@
|
||||
"""Define guard policies for devices in the beamline."""
|
||||
|
||||
# from guards import GuardViolation, guards_setup
|
||||
|
||||
|
||||
def is_sample_area_clear_for_beamstop(d):
|
||||
"""Check if the sample area is clear of diag_y, coll_y, and gonx"""
|
||||
g = guards_setup(d)
|
||||
if g["diag_y_out"].check() and g["coll_y_out"].check() and g["gonx_out"].check():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_sample_area_clear_for_gonx(d):
|
||||
"""Check if the sample area is clear of diag_y and bs_z"""
|
||||
g = guards_setup(d)
|
||||
if g["diag_y_out"].check() and g["bs_work_min"].check():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def make_aerotech_x_policy(d):
|
||||
"""Create the policy for aerotech_x"""
|
||||
|
||||
def aerotech_x_policy(target):
|
||||
cfg = d["aerotech_x"].positions
|
||||
if target >= cfg["out"] and not is_sample_area_clear_for_gonx(d):
|
||||
raise GuardViolation("Sample area is not clear")
|
||||
|
||||
return aerotech_x_policy
|
||||
|
||||
|
||||
def make_bs_z_policy(d):
|
||||
"""Create the policy for bs_z"""
|
||||
|
||||
def bs_z_policy(target):
|
||||
"""Checks that the target position is within limits"""
|
||||
cfg = d["bs_z"].positions
|
||||
# Lower bound
|
||||
if target < cfg["work_min"] and not is_sample_area_clear_for_beamstop(d):
|
||||
raise GuardViolation("Sample area is not clear")
|
||||
if target < cfg["min"]:
|
||||
raise GuardViolation(
|
||||
f"Requested beamstop Z {target} is below recommended minimum {cfg['min']}"
|
||||
)
|
||||
# Upper bound
|
||||
if d["bl_pos"].pos == "in" and target > cfg["max_blin"]:
|
||||
raise GuardViolation(
|
||||
f"Beamstop Z cannot move beyond {cfg['max_blin']} when backlight is IN"
|
||||
)
|
||||
|
||||
return bs_z_policy
|
||||
|
||||
|
||||
def make_diag_y_policy(d):
|
||||
"""Create the policy for diag_y"""
|
||||
|
||||
def diag_y_policy(target):
|
||||
cfg = d["diag_y"].positions
|
||||
# Don't move in if the goniometer is in
|
||||
if d["aerotech_x"].actual >= d['aerotech_x'].positions['in'] and target > cfg["out"]:
|
||||
raise GuardViolation(
|
||||
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
|
||||
)
|
||||
# Don't move if cryocooler is in
|
||||
if d['cryo_pos'].pos == 'in' and target > cfg['out']:
|
||||
raise GuardViolation(
|
||||
f"Diagnostic device cannot move beyond {cfg['out']} mm when cryocooler is IN"
|
||||
)
|
||||
return diag_y_policy
|
||||
|
||||
|
||||
def attach_policies(d):
|
||||
"""Attach the policies to the devices"""
|
||||
d["bs_z"].policy = make_bs_z_policy(d)
|
||||
d["aerotech_x"].policy = make_aerotech_x_policy(d)
|
||||
d["diag_y"].policy = make_diag_y_policy(d)
|
||||
@@ -0,0 +1,254 @@
|
||||
"""Set up the positioned devices for mock or real motors"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable, List, Dict, Optional, Union
|
||||
|
||||
import time
|
||||
|
||||
|
||||
class SimpleStatus:
|
||||
"""Makes a mock motor return a status"""
|
||||
|
||||
def __init__(self, motor, target, delay=0.0, success=True, name=""):
|
||||
self.motor = motor
|
||||
self.target = target
|
||||
self.delay = delay
|
||||
self._success = success
|
||||
self.name = name
|
||||
self._done = False
|
||||
|
||||
def wait(self, timeout=None):
|
||||
start = time.time()
|
||||
|
||||
while True:
|
||||
# simulate motion completion
|
||||
if not self._done:
|
||||
if time.time() - start >= self.delay:
|
||||
if self._success:
|
||||
self.motor.position = self.target
|
||||
self._done = True
|
||||
|
||||
if self._done:
|
||||
if not self._success:
|
||||
raise RuntimeError(f"Motor {self.name} failed")
|
||||
return True
|
||||
|
||||
if timeout is not None and (time.time() - start) > timeout:
|
||||
raise TimeoutError(f"Timeout waiting for {self.name}")
|
||||
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
class MotorAdapter:
|
||||
"""Motor adapter for setting up mock/real motors"""
|
||||
|
||||
def move(self, pos: float):
|
||||
"""Move the motor to the given position"""
|
||||
raise NotImplementedError
|
||||
|
||||
def move_with_status(self, pos: float):
|
||||
"""Move the motor to the given position with a status"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_fail(self, value: bool):
|
||||
"""Put the motor into a failure state"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""The actual position of the motor"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class MockMotorAdapter(MotorAdapter):
|
||||
"""Motor adapter for mock motors"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
self._motor = PositionDevice.MockMotor(name)
|
||||
|
||||
def move(self, pos: float):
|
||||
"""Move the motor to the given position"""
|
||||
self._motor.move(pos)
|
||||
|
||||
def move_with_status(self, pos: float):
|
||||
"""Move the motor to the given position with a status"""
|
||||
if self._motor.fail:
|
||||
return SimpleStatus(
|
||||
motor=self._motor,
|
||||
target=pos,
|
||||
delay=self._motor.delay,
|
||||
success=False,
|
||||
name=self._motor.name,
|
||||
)
|
||||
|
||||
# don't update position immediately
|
||||
return SimpleStatus(
|
||||
motor=self._motor,
|
||||
target=pos,
|
||||
delay=self._motor.delay,
|
||||
success=True,
|
||||
name=self._motor.name,
|
||||
)
|
||||
|
||||
def set_fail(self, value: bool):
|
||||
"""Put the motor into a failure state"""
|
||||
self._motor.fail = value
|
||||
|
||||
def set_delay(self, value: float):
|
||||
self._motor.delay = value
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""The actual position of the motor"""
|
||||
return self._motor.position
|
||||
|
||||
|
||||
class RealMotorAdapter(MotorAdapter):
|
||||
"""Motor adapter for real motors"""
|
||||
|
||||
def __init__(self, mot, name):
|
||||
self._motor = mot
|
||||
self._name = name
|
||||
|
||||
def move(self, pos: float):
|
||||
"""Move the motor to the given position"""
|
||||
scans.umv(self._motor, pos, relative=False)
|
||||
|
||||
def move_with_status(self, pos: float):
|
||||
"""Move the motor to the given position with a status"""
|
||||
return scans.mv(self._motor, pos, relative=False)
|
||||
|
||||
def set_fail(self, value: bool):
|
||||
"""Put the motor into a failure state"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""The actual position of the motor"""
|
||||
return self._motor.read()[self._name]["value"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class PositionDevice:
|
||||
"""Generic device that moves between named or numeric positions"""
|
||||
|
||||
bec_name: str
|
||||
mot_device: any = None
|
||||
positions: Dict[str, float] = field(default_factory=dict)
|
||||
tol: float = 0.1
|
||||
# device_timeout = 3
|
||||
guards: List[Callable[[], None]] = field(default_factory=list)
|
||||
policy: Optional[Callable[[float], None]] = None
|
||||
allow_arbitrary: bool = False
|
||||
use_mock: bool = True
|
||||
|
||||
def __post_init__(self):
|
||||
if self.use_mock:
|
||||
self.mot = MockMotorAdapter(self.bec_name)
|
||||
else:
|
||||
# real_motor = getattr(dev, self.bec_name)
|
||||
self.mot = RealMotorAdapter(self.mot_device, self.bec_name)
|
||||
# self.mot = RealMotorAdapter(real_motor, self.bec_name)
|
||||
# self.mot = getattr(dev, self.bec_name)
|
||||
|
||||
# Normalize position names
|
||||
self.positions = {k.lower(): v for k, v in self.positions.items()}
|
||||
|
||||
def _check_guards(self):
|
||||
"""Check if guards exist"""
|
||||
for g in self.guards:
|
||||
g()
|
||||
|
||||
def _resolve_target(self, target: Union[str, float]) -> float:
|
||||
"""Convert target into a motor position"""
|
||||
|
||||
if isinstance(target, str):
|
||||
name = target.lower()
|
||||
|
||||
if name not in self.positions:
|
||||
raise ValueError(f"Unknown position '{target}'")
|
||||
|
||||
return self.positions[name]
|
||||
|
||||
if isinstance(target, (float, int)):
|
||||
if not self.allow_arbitrary:
|
||||
raise ValueError(f"{self.bec_name} only accepts named positions")
|
||||
return float(target)
|
||||
|
||||
raise TypeError("Target must be str or float")
|
||||
|
||||
def move(self, target: Union[str, float]):
|
||||
"""Move devices"""
|
||||
pos = self._resolve_target(target)
|
||||
|
||||
self._check_guards()
|
||||
|
||||
if self.policy:
|
||||
self.policy(pos)
|
||||
|
||||
self.mot.move(pos)
|
||||
|
||||
def mv(self, target: Union[str, float]):
|
||||
"""move devices with a timeout"""
|
||||
|
||||
pos = self._resolve_target(target)
|
||||
|
||||
self._check_guards()
|
||||
|
||||
if self.policy:
|
||||
self.policy(pos)
|
||||
|
||||
# status.wait(self.device_timeout)
|
||||
return self.mot.move_with_status(pos)
|
||||
|
||||
def set_position(self, target: Union[str, float]):
|
||||
"""Only to be used for testing purposes, bypasses guards"""
|
||||
pos = self._resolve_target(target)
|
||||
self.mot.move(pos)
|
||||
|
||||
@property
|
||||
def actual(self) -> float:
|
||||
"""Return the actual position of the device."""
|
||||
# return self.mot.read()[self.bec_name]["value"]
|
||||
return self.mot.actual
|
||||
|
||||
@property
|
||||
def pos(self) -> str:
|
||||
"""Return the closest matching position"""
|
||||
|
||||
for name, pos in self.positions.items():
|
||||
if abs(self.actual - pos) <= self.tol:
|
||||
return name
|
||||
|
||||
return "unknown"
|
||||
|
||||
#
|
||||
|
||||
def is_at(self, target: Union[str, float]) -> bool:
|
||||
"""Return True if the device is at the given position."""
|
||||
|
||||
pos = self._resolve_target(target)
|
||||
return abs(self.actual - pos) <= self.tol
|
||||
|
||||
# -------------------------
|
||||
# Mock Motor Implementation
|
||||
# -------------------------
|
||||
|
||||
class MockMotor:
|
||||
"""Mock motor implementation"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
self.position = 0.0
|
||||
self._target = self.position
|
||||
self.fail = False
|
||||
self.delay = 0
|
||||
|
||||
#
|
||||
def move(self, pos: float):
|
||||
"""Move the motor to the given position."""
|
||||
if self.fail:
|
||||
return
|
||||
# raise RuntimeError(f"Motor {self.name} failed")
|
||||
time.sleep(self.delay)
|
||||
self.position = pos
|
||||
Reference in New Issue
Block a user