WIP ptycho_floimni_sim simulation for local development

This commit is contained in:
2026-03-01 19:06:57 +01:00
parent 116ef7d360
commit d198f88387
3 changed files with 956 additions and 0 deletions

View File

@@ -0,0 +1,480 @@
############################################################
#################### flOMNI Galil motors ###################
############################################################
feyex:
description: Xray eye X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: D
host: mpc2844.psi.ch
limits:
- -30
- -1
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -16.267
out: -1
feyey:
description: Xray eye Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: E
host: mpc2844.psi.ch
limits:
- -1
- -10
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -10.467
fheater:
description: Heater Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- -15
- 0
port: 8082
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
foptx:
description: Optics X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- -17
- -12
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -13.761
fopty:
description: Optics Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: F
host: mpc2844.psi.ch
limits:
- 0
- 4
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.552
out: 0.752
foptz:
description: Optics Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- 0
- 27
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 23
fsamroy:
description: Sample rotation
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- -5
- 365
port: 8084
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
fsamx:
description: Sample coarse X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: E
host: mpc2844.psi.ch
limits:
- -162
- 0
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -1.1
fsamy:
description: Sample coarse Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: F
host: mpc2844.psi.ch
limits:
- 2
- 3.1
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 2.75
ftracky:
description: Laser Tracker coarse Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: H
host: mpc2844.psi.ch
limits:
- 2.2
- 2.8
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftrackz:
description: Laser Tracker coarse Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: G
host: mpc2844.psi.ch
limits:
- 4.5
- 5.5
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransx:
description: Sample transer X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- 0
- 50
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransy:
description: Sample transer Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- -100
- 0
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
sensor_voltage: -2.4
ftransz:
description: Sample transer Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- 0
- 145
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftray:
description: Sample transfer tray
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: D
host: mpc2844.psi.ch
limits:
- -200
- 0
port: 8081
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
############################################################
#################### flOMNI Sample Names ###################
############################################################
flomni_samples:
description: Sample names and storage
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimSampleStorage
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### flOMNI Smaract motors #################
############################################################
fosax:
description: OSA X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- 10.2
- 10.6
port: 3334
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 9.124
out: 5.3
fosay:
description: OSA Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- -3.1
- -2.9
port: 3334
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.367
fosaz:
description: OSA Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- -6
- -4
port: 3334
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 8.5
out: 6
############################################################
#################### flOMNI RT motors ######################
############################################################
rtx:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
low_signal: 10000
min_signal: 9000
rt_pid_voltage: -0.06219
rty:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
tomo_additional_offsety: 0
rtz:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_xeye:
description: Camera flOMNI Xray eye ID1
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
camera_id: 1
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
# cam_ids_rgb:
# description: Camera flOMNI Xray eye ID203
# deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
# deviceConfig:
# camera_id: 203
# bits_per_pixel: 24
# num_rotation_90: 2
# transpose: false
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to fast shutter at X12 if device fsh exists
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimFastShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui signals
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimXRayAlignGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request

View File

@@ -0,0 +1,2 @@
"""Simulation devices for local csaxs_bec development."""

View File

@@ -0,0 +1,474 @@
from __future__ import annotations
import json
import threading
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, Kind, Signal
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd_devices import SimPositioner
from ophyd_devices.utils.bec_signals import PreviewSignal
from prettytable import PrettyTable
class FlomniSimController:
"""Controller shim that exposes FLOMNI-specific controller APIs for local simulation."""
USER_ACCESS = [
"socket_put_and_receive",
"socket_put_confirmed",
"drive_axis_to_limit",
"get_motor_limit_switch",
"all_axes_referenced",
"axis_is_referenced",
"is_motor_on",
"feedback_disable",
"feedback_enable_with_reset",
"feedback_enable_without_reset",
"feedback_is_running",
"laser_tracker_on",
"laser_tracker_off",
"laser_tracker_show_all",
"laser_tracker_check_signalstrength",
"laser_tracker_check_enabled",
"show_signal_strength_interferometer",
"read_ssi_interferometer",
"show_cyclic_error_compensation",
"get_pid_x",
"set_rotation_angle",
"fosaz_light_curtain_is_triggered",
"lights_off",
"lights_on",
"galil_show_all",
"move_open_loop_steps",
"find_reference_mark",
]
def __init__(self, device: FlomniSimPositioner):
self.device = device
self.connected = True
self._feedback_running = False
self._laser_tracker_enabled = False
self._lights_on = True
self._mount_mode = False
self._confirm = 0
self._pid_x = -0.06
self._rt_angle = 0.0
self._sensor_voltage = -2.4
self.tracker_info = {
"enabled_y": True,
"enabled_z": True,
"tracker_intensity": 15000,
"threshold_intensity_y": 9000,
"threshold_intensity_z": 9000,
}
def socket_put_and_receive(self, cmd: str) -> str:
cmd_norm = cmd.strip().lower()
if "mg mntmod" in cmd_norm:
return "1" if self._mount_mode else "0"
if "mg mntprgs" in cmd_norm:
return "0"
if "mg confirm" in cmd_norm:
return str(self._confirm)
if "mg @out[9]" in cmd_norm:
return "0"
if "mg@an[1]" in cmd_norm:
return str(self._sensor_voltage)
if "mg @in[14]" in cmd_norm:
return "0"
if "axisref" in cmd_norm:
return "1"
if cmd_norm == "g":
return str(self._pid_x)
if cmd_norm == "o":
return "1"
if cmd_norm.startswith("tp") or cmd_norm.startswith("td"):
return str(float(self.device.readback.get()))
if cmd_norm.startswith("mg_bg"):
return "0"
return "0"
def socket_put_confirmed(self, cmd: str) -> None:
cmd_norm = cmd.strip().lower()
if "xq#mntmode" in cmd_norm:
self._mount_mode = True
elif "xq#posmode" in cmd_norm:
self._mount_mode = False
elif cmd_norm.startswith("confirm="):
try:
self._confirm = int(float(cmd_norm.split("=")[1]))
except ValueError:
self._confirm = 0
elif cmd_norm == "l0":
self._feedback_running = False
elif cmd_norm in ("l1", "l3"):
self._feedback_running = True
elif cmd_norm == "t0":
self._laser_tracker_enabled = False
elif cmd_norm == "t1":
self._laser_tracker_enabled = True
elif cmd_norm == "cb15":
self._lights_on = False
elif cmd_norm == "sb15":
self._lights_on = True
def drive_axis_to_limit(self, axis_Id_numeric=None, direction: str = "forward") -> None:
low, high = self.device.limits
if low >= high:
return
target = high if direction == "forward" else low
status = self.device.move(target)
status.wait(timeout=2)
def get_motor_limit_switch(self, axis_id=None) -> tuple[bool, bool]:
low, high = self.device.limits
if low >= high:
return (False, False)
pos = float(self.device.readback.get())
eps = max(self.device.tolerance.get(), 0.05)
return (abs(pos - low) <= eps, abs(pos - high) <= eps)
def all_axes_referenced(self) -> bool:
return True
def axis_is_referenced(self, axis_id_numeric=None) -> bool:
return True
def is_motor_on(self, axis_id=None) -> bool:
return True
def feedback_disable(self) -> None:
self._feedback_running = False
def feedback_enable_with_reset(self) -> None:
self._feedback_running = True
def feedback_enable_without_reset(self) -> None:
self._feedback_running = True
def feedback_is_running(self) -> bool:
return self._feedback_running
def laser_tracker_on(self) -> None:
self._laser_tracker_enabled = True
def laser_tracker_off(self) -> None:
self._laser_tracker_enabled = False
def laser_tracker_show_all(self) -> None:
t = PrettyTable()
t.title = "Simulated Laser Tracker"
t.field_names = ["name", "value"]
for key, val in self.tracker_info.items():
t.add_row([key, val])
print(t)
def laser_tracker_check_signalstrength(self) -> bool:
return True
def laser_tracker_check_enabled(self) -> bool:
return self._laser_tracker_enabled
def show_signal_strength_interferometer(self) -> None:
print("Simulated interferometer signal strength: OK")
def read_ssi_interferometer(self) -> float:
return 12000.0
def show_cyclic_error_compensation(self) -> None:
print("Simulated cyclic error compensation: initialized")
def get_pid_x(self) -> float:
return self._pid_x
def set_rotation_angle(self, val: float) -> None:
self._rt_angle = float(val)
def fosaz_light_curtain_is_triggered(self) -> bool:
return False
def lights_off(self) -> None:
self._lights_on = False
def lights_on(self) -> None:
self._lights_on = True
def galil_show_all(self) -> None:
print(f"Simulated controller for {self.device.name} connected: {self.connected}")
def move_open_loop_steps(self, axis_id_numeric: int, steps: int, **_kwargs) -> None:
# no-op in simulation
_ = axis_id_numeric, steps
def find_reference_mark(self, axis_id_numeric: int, *_args) -> None:
# no-op in simulation
_ = axis_id_numeric
class JSONSafeSignal(Signal):
"""Signal that coerces complex Python objects to JSON strings for ophyd describe()."""
def _coerce(self, value):
if isinstance(value, np.ndarray):
value = value.tolist()
if isinstance(value, (dict, list, tuple)):
return json.dumps(value)
return value
def put(self, value, *args, **kwargs):
return super().put(self._coerce(value), *args, **kwargs)
def set(self, value, *args, **kwargs):
return super().set(self._coerce(value), *args, **kwargs)
class FlomniSimPositioner(SimPositioner):
"""Simulated positioner that preserves FLOMNI controller-style API."""
USER_ACCESS = ["sim", "readback", "registered_proxies", "controller"]
def __init__(
self,
name: str,
*,
axis_Id: str | None = None,
host: str | None = None,
port: int | None = None,
sign: int = 1,
limits: list[float] | tuple[float, float] | None = None,
**kwargs,
):
super().__init__(name=name, limits=limits, **kwargs)
self.axis_Id = axis_Id
self.host = host
self.port = port
self.sign = sign
self.controller = FlomniSimController(self)
@property
def axis_Id_numeric(self) -> int | None:
if not self.axis_Id:
return None
return ord(self.axis_Id.lower()) - 97
class FlomniSimCamera(Device):
"""Lightweight camera simulation with live mode controls for FLOMNI/OMNY GUI usage."""
USER_ACCESS = ["start_live_mode", "stop_live_mode", "live_mode_enabled", "image", "preview"]
image = Cpt(PreviewSignal, name="image", ndim=2, num_rotation_90=0, transpose=False)
preview = Cpt(PreviewSignal, name="preview", ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
kind=Kind.config,
)
def __init__(
self,
name: str,
*,
camera_id: int | None = None,
camera_ID: int | None = None,
url: str | None = None,
bits_per_pixel: int = 8,
channels: int = 1,
m_n_colormode: int = 0,
num_rotation_90: int = 0,
transpose: bool = False,
force_monochrome: bool = False,
**kwargs,
):
super().__init__(name=name, **kwargs)
self.camera_id = camera_id if camera_id is not None else camera_ID
self.url = url
self.bits_per_pixel = bits_per_pixel
self.channels = channels
self.m_n_colormode = m_n_colormode
self.force_monochrome = force_monochrome
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self._frame_number = 0
self._shutdown_event = threading.Event()
self._live_mode_thread: threading.Thread | None = None
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
_ = args, kwargs
if bool(value):
self._start_live_mode()
else:
self._stop_live_mode()
def _start_live_mode(self) -> None:
if self._live_mode_thread and self._live_mode_thread.is_alive():
return
self._shutdown_event.clear()
self._live_mode_thread = threading.Thread(target=self._live_mode_loop, daemon=True)
self._live_mode_thread.start()
def _stop_live_mode(self) -> None:
if self._live_mode_thread is None:
return
self._shutdown_event.set()
self._live_mode_thread.join(timeout=1)
self._live_mode_thread = None
self._shutdown_event.clear()
def _live_mode_loop(self) -> None:
while not self._shutdown_event.is_set():
frame = self._generate_frame()
self.image.put(frame)
self.preview.put(frame)
self._shutdown_event.wait(0.2)
def _generate_frame(self) -> np.ndarray:
self._frame_number += 1
x = np.linspace(0, 255, 320, dtype=np.uint8)
y = np.linspace(0, 255, 240, dtype=np.uint8)[:, None]
frame = ((x + y + self._frame_number) % 255).astype(np.uint8)
return frame
def destroy(self):
self._stop_live_mode()
return super().destroy()
class FlomniSimFastShutter(Device):
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "fshstatus_readback"]
shutter = Cpt(Signal, name="shutter", value=0)
def fshopen(self):
self.shutter.put(1)
def fshclose(self):
self.shutter.put(0)
def fshstatus(self):
return int(self.shutter.get())
def fshstatus_readback(self):
return int(self.shutter.get())
def fshinfo(self):
print("Using simulated fast shutter.")
class FlomniSimXRayAlignGUI(Device):
"""Signal-only X-ray alignment GUI backend for simulation."""
update_frame_acqdone = Cpt(Signal, value=0)
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value="")
sample_name = Cpt(Signal, value="")
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(Signal, name="submit", value=0)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(JSONSafeSignal, value="{}")
fit_params_y = Cpt(JSONSafeSignal, value="{}")
for i in range(11):
locals()[f"width_y_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"width_x_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"xval_x_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"yval_y_{i}"] = Cpt(Signal, value=0)
for i in range(1, 6):
locals()[f"stage_pos_x_{i}"] = Cpt(Signal, value=0)
class FlomniSimSampleStorage(Device):
USER_ACCESS = [
"is_sample_slot_used",
"is_sample_in_gripper",
"set_sample_slot",
"unset_sample_slot",
"set_sample_in_gripper",
"unset_sample_in_gripper",
"get_sample_name",
"show_all",
]
sample_placed = Dcpt({f"sample{i}": (Signal, None, {"value": 0}) for i in range(21)})
sample_names = Dcpt({f"sample{i}": (Signal, None, {"value": "-"}) for i in range(21)})
sample_in_gripper = Cpt(Signal, name="sample_in_gripper", value=0)
sample_in_gripper_name = Cpt(Signal, name="sample_in_gripper_name", value="-")
def set_sample_slot(self, slot_nr: int, name: str) -> None:
getattr(self.sample_placed, f"sample{slot_nr}").set(1)
getattr(self.sample_names, f"sample{slot_nr}").set(name)
def unset_sample_slot(self, slot_nr: int) -> None:
getattr(self.sample_placed, f"sample{slot_nr}").set(0)
getattr(self.sample_names, f"sample{slot_nr}").set("-")
def set_sample_in_gripper(self, name: str) -> None:
self.sample_in_gripper.set(1)
self.sample_in_gripper_name.set(name)
def unset_sample_in_gripper(self) -> None:
self.sample_in_gripper.set(0)
self.sample_in_gripper_name.set("-")
def is_sample_slot_used(self, slot_nr: int) -> bool:
return bool(getattr(self.sample_placed, f"sample{slot_nr}").get())
def is_sample_in_gripper(self) -> bool:
return bool(self.sample_in_gripper.get())
def get_sample_name(self, slot_nr: int) -> str:
return str(getattr(self.sample_names, f"sample{slot_nr}").get())
def show_all(self) -> None:
t = PrettyTable()
t.title = "flOMNI sample storage (sim)"
t.field_names = ["slot", "used", "name"]
for slot in range(21):
t.add_row([slot, int(self.is_sample_slot_used(slot)), self.get_sample_name(slot)])
print(t)
class FlomniSimTempHum(Device):
USER_ACCESS = ["show_all", "help"]
temperature_mirror = Cpt(Signal, value=22.0)
temperature_osa = Cpt(Signal, value=22.0)
temperature_heater = Cpt(Signal, value=23.0)
humidity_sensor1 = Cpt(Signal, value=35.0)
humidity_sensor2 = Cpt(Signal, value=34.0)
flow = Cpt(Signal, value=2.0)
suction = Cpt(Signal, value=1.0)
def show_all(self):
print("=== flOMNI Temperature & Humidity (sim) ===")
print(f"Mirror temperature: {self.temperature_mirror.get():.1f} C")
print(f"OSA temperature: {self.temperature_osa.get():.1f} C")
print(f"Heater temperature: {self.temperature_heater.get():.1f} C")
print(f"Humidity sensor 1: {self.humidity_sensor1.get():.1f} %RH")
print(f"Humidity sensor 2: {self.humidity_sensor2.get():.1f} %RH")
print(f"Flow: {self.flow.get():.1f} sccm")
print(f"Suction: {self.suction.get():.1f}")
def help(self):
print("show_all() - display current simulated values")