diff --git a/csaxs_bec/device_configs/ptycho_floimni_sim.yaml b/csaxs_bec/device_configs/ptycho_floimni_sim.yaml new file mode 100644 index 0000000..c9be711 --- /dev/null +++ b/csaxs_bec/device_configs/ptycho_floimni_sim.yaml @@ -0,0 +1,480 @@ +############################################################ +#################### flOMNI Galil motors ################### +############################################################ + +feyex: + description: Xray eye X + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: D + host: mpc2844.psi.ch + limits: + - -30 + - -1 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: -16.267 + out: -1 +feyey: + description: Xray eye Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: E + host: mpc2844.psi.ch + limits: + - -1 + - -10 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: -10.467 +fheater: + description: Heater Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: C + host: mpc2844.psi.ch + limits: + - -15 + - 0 + port: 8082 + sign: -1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +foptx: + description: Optics X + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: B + host: mpc2844.psi.ch + limits: + - -17 + - -12 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: true + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: -13.761 +fopty: + description: Optics Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: F + host: mpc2844.psi.ch + limits: + - 0 + - 4 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: true + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 0.552 + out: 0.752 +foptz: + description: Optics Z + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: A + host: mpc2844.psi.ch + limits: + - 0 + - 27 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 23 +fsamroy: + description: Sample rotation + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: A + host: mpc2844.psi.ch + limits: + - -5 + - 365 + port: 8084 + sign: -1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +fsamx: + description: Sample coarse X + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: E + host: mpc2844.psi.ch + limits: + - -162 + - 0 + port: 8081 + sign: 1 + enabled: true + onFailure: buffer + readOnly: true + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: -1.1 +fsamy: + description: Sample coarse Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: F + host: mpc2844.psi.ch + limits: + - 2 + - 3.1 + port: 8081 + sign: 1 + enabled: true + onFailure: buffer + readOnly: true + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 2.75 +ftracky: + description: Laser Tracker coarse Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: H + host: mpc2844.psi.ch + limits: + - 2.2 + - 2.8 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +ftrackz: + description: Laser Tracker coarse Z + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: G + host: mpc2844.psi.ch + limits: + - 4.5 + - 5.5 + port: 8082 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +ftransx: + description: Sample transer X + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: C + host: mpc2844.psi.ch + limits: + - 0 + - 50 + port: 8081 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +ftransy: + description: Sample transer Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: A + host: mpc2844.psi.ch + limits: + - -100 + - 0 + port: 8081 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + sensor_voltage: -2.4 +ftransz: + description: Sample transer Z + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: B + host: mpc2844.psi.ch + limits: + - 0 + - 145 + port: 8081 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +ftray: + description: Sample transfer tray + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: D + host: mpc2844.psi.ch + limits: + - -200 + - 0 + port: 8081 + sign: -1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + + +############################################################ +#################### flOMNI Sample Names ################### +############################################################ + +flomni_samples: + description: Sample names and storage + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimSampleStorage + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + +############################################################ +#################### flOMNI Smaract motors ################# +############################################################ + +fosax: + description: OSA X + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: A + host: mpc2844.psi.ch + limits: + - 10.2 + - 10.6 + port: 3334 + sign: -1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 9.124 + out: 5.3 +fosay: + description: OSA Y + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: B + host: mpc2844.psi.ch + limits: + - -3.1 + - -2.9 + port: 3334 + sign: -1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 0.367 +fosaz: + description: OSA Z + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: C + host: mpc2844.psi.ch + limits: + - -6 + - -4 + port: 3334 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + userParameter: + in: 8.5 + out: 6 + +############################################################ +#################### flOMNI RT motors ###################### +############################################################ + +rtx: + description: flomni rt + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: A + host: mpc2844.psi.ch + port: 2222 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request + connectionTimeout: 20 + userParameter: + low_signal: 10000 + min_signal: 9000 + rt_pid_voltage: -0.06219 +rty: + description: flomni rt + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: B + host: mpc2844.psi.ch + port: 2222 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request + connectionTimeout: 20 + userParameter: + tomo_additional_offsety: 0 +rtz: + description: flomni rt + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner + deviceConfig: + axis_Id: C + host: mpc2844.psi.ch + port: 2222 + sign: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request + connectionTimeout: 20 + +############################################################ +####################### Cameras ############################ +############################################################ + +cam_flomni_gripper: + description: Camera sample changer + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera + deviceConfig: + url: http://flomnicamserver:5000/video_high + num_rotation_90: 3 + transpose: false + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request + +cam_flomni_overview: + description: Camera flomni overview + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera + deviceConfig: + url: http://flomnicamserver:5001/video_high + num_rotation_90: 3 + transpose: false + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request + +cam_xeye: + description: Camera flOMNI Xray eye ID1 + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera + deviceConfig: + camera_id: 1 + bits_per_pixel: 24 + num_rotation_90: 3 + transpose: false + force_monochrome: true + m_n_colormode: 1 + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: async + +# cam_ids_rgb: +# description: Camera flOMNI Xray eye ID203 +# deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera +# deviceConfig: +# camera_id: 203 +# bits_per_pixel: 24 +# num_rotation_90: 2 +# transpose: false +# force_monochrome: false +# m_n_colormode: 1 +# enabled: true +# onFailure: buffer +# readOnly: false +# readoutPriority: async + + +# ############################################################ +# ################### flOMNI temperatures #################### +# ############################################################ +flomni_temphum: + description: flOMNI Temperatures and humidity + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimTempHum + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline +# ############################################################ +# ########## OMNY / flOMNI / LamNI fast shutter ############## +# ############################################################ +omnyfsh: + description: omnyfsh connects to fast shutter at X12 if device fsh exists + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimFastShutter + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline +############################################################ +#################### GUI Signals ########################### +############################################################ +omny_xray_gui: + description: Gui signals + deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimXRayAlignGUI + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: on_request \ No newline at end of file diff --git a/csaxs_bec/devices/sim/__init__.py b/csaxs_bec/devices/sim/__init__.py new file mode 100644 index 0000000..b06f21b --- /dev/null +++ b/csaxs_bec/devices/sim/__init__.py @@ -0,0 +1,2 @@ +"""Simulation devices for local csaxs_bec development.""" + diff --git a/csaxs_bec/devices/sim/flomni_sim.py b/csaxs_bec/devices/sim/flomni_sim.py new file mode 100644 index 0000000..c281295 --- /dev/null +++ b/csaxs_bec/devices/sim/flomni_sim.py @@ -0,0 +1,474 @@ +from __future__ import annotations + +import json +import threading + +import numpy as np +from ophyd import Component as Cpt +from ophyd import Device, Kind, Signal +from ophyd import DynamicDeviceComponent as Dcpt +from ophyd_devices import SimPositioner +from ophyd_devices.utils.bec_signals import PreviewSignal +from prettytable import PrettyTable + + +class FlomniSimController: + """Controller shim that exposes FLOMNI-specific controller APIs for local simulation.""" + + USER_ACCESS = [ + "socket_put_and_receive", + "socket_put_confirmed", + "drive_axis_to_limit", + "get_motor_limit_switch", + "all_axes_referenced", + "axis_is_referenced", + "is_motor_on", + "feedback_disable", + "feedback_enable_with_reset", + "feedback_enable_without_reset", + "feedback_is_running", + "laser_tracker_on", + "laser_tracker_off", + "laser_tracker_show_all", + "laser_tracker_check_signalstrength", + "laser_tracker_check_enabled", + "show_signal_strength_interferometer", + "read_ssi_interferometer", + "show_cyclic_error_compensation", + "get_pid_x", + "set_rotation_angle", + "fosaz_light_curtain_is_triggered", + "lights_off", + "lights_on", + "galil_show_all", + "move_open_loop_steps", + "find_reference_mark", + ] + + def __init__(self, device: FlomniSimPositioner): + self.device = device + self.connected = True + self._feedback_running = False + self._laser_tracker_enabled = False + self._lights_on = True + self._mount_mode = False + self._confirm = 0 + self._pid_x = -0.06 + self._rt_angle = 0.0 + self._sensor_voltage = -2.4 + self.tracker_info = { + "enabled_y": True, + "enabled_z": True, + "tracker_intensity": 15000, + "threshold_intensity_y": 9000, + "threshold_intensity_z": 9000, + } + + def socket_put_and_receive(self, cmd: str) -> str: + cmd_norm = cmd.strip().lower() + if "mg mntmod" in cmd_norm: + return "1" if self._mount_mode else "0" + if "mg mntprgs" in cmd_norm: + return "0" + if "mg confirm" in cmd_norm: + return str(self._confirm) + if "mg @out[9]" in cmd_norm: + return "0" + if "mg@an[1]" in cmd_norm: + return str(self._sensor_voltage) + if "mg @in[14]" in cmd_norm: + return "0" + if "axisref" in cmd_norm: + return "1" + if cmd_norm == "g": + return str(self._pid_x) + if cmd_norm == "o": + return "1" + if cmd_norm.startswith("tp") or cmd_norm.startswith("td"): + return str(float(self.device.readback.get())) + if cmd_norm.startswith("mg_bg"): + return "0" + return "0" + + def socket_put_confirmed(self, cmd: str) -> None: + cmd_norm = cmd.strip().lower() + if "xq#mntmode" in cmd_norm: + self._mount_mode = True + elif "xq#posmode" in cmd_norm: + self._mount_mode = False + elif cmd_norm.startswith("confirm="): + try: + self._confirm = int(float(cmd_norm.split("=")[1])) + except ValueError: + self._confirm = 0 + elif cmd_norm == "l0": + self._feedback_running = False + elif cmd_norm in ("l1", "l3"): + self._feedback_running = True + elif cmd_norm == "t0": + self._laser_tracker_enabled = False + elif cmd_norm == "t1": + self._laser_tracker_enabled = True + elif cmd_norm == "cb15": + self._lights_on = False + elif cmd_norm == "sb15": + self._lights_on = True + + def drive_axis_to_limit(self, axis_Id_numeric=None, direction: str = "forward") -> None: + low, high = self.device.limits + if low >= high: + return + target = high if direction == "forward" else low + status = self.device.move(target) + status.wait(timeout=2) + + def get_motor_limit_switch(self, axis_id=None) -> tuple[bool, bool]: + low, high = self.device.limits + if low >= high: + return (False, False) + pos = float(self.device.readback.get()) + eps = max(self.device.tolerance.get(), 0.05) + return (abs(pos - low) <= eps, abs(pos - high) <= eps) + + def all_axes_referenced(self) -> bool: + return True + + def axis_is_referenced(self, axis_id_numeric=None) -> bool: + return True + + def is_motor_on(self, axis_id=None) -> bool: + return True + + def feedback_disable(self) -> None: + self._feedback_running = False + + def feedback_enable_with_reset(self) -> None: + self._feedback_running = True + + def feedback_enable_without_reset(self) -> None: + self._feedback_running = True + + def feedback_is_running(self) -> bool: + return self._feedback_running + + def laser_tracker_on(self) -> None: + self._laser_tracker_enabled = True + + def laser_tracker_off(self) -> None: + self._laser_tracker_enabled = False + + def laser_tracker_show_all(self) -> None: + t = PrettyTable() + t.title = "Simulated Laser Tracker" + t.field_names = ["name", "value"] + for key, val in self.tracker_info.items(): + t.add_row([key, val]) + print(t) + + def laser_tracker_check_signalstrength(self) -> bool: + return True + + def laser_tracker_check_enabled(self) -> bool: + return self._laser_tracker_enabled + + def show_signal_strength_interferometer(self) -> None: + print("Simulated interferometer signal strength: OK") + + def read_ssi_interferometer(self) -> float: + return 12000.0 + + def show_cyclic_error_compensation(self) -> None: + print("Simulated cyclic error compensation: initialized") + + def get_pid_x(self) -> float: + return self._pid_x + + def set_rotation_angle(self, val: float) -> None: + self._rt_angle = float(val) + + def fosaz_light_curtain_is_triggered(self) -> bool: + return False + + def lights_off(self) -> None: + self._lights_on = False + + def lights_on(self) -> None: + self._lights_on = True + + def galil_show_all(self) -> None: + print(f"Simulated controller for {self.device.name} connected: {self.connected}") + + def move_open_loop_steps(self, axis_id_numeric: int, steps: int, **_kwargs) -> None: + # no-op in simulation + _ = axis_id_numeric, steps + + def find_reference_mark(self, axis_id_numeric: int, *_args) -> None: + # no-op in simulation + _ = axis_id_numeric + + +class JSONSafeSignal(Signal): + """Signal that coerces complex Python objects to JSON strings for ophyd describe().""" + + def _coerce(self, value): + if isinstance(value, np.ndarray): + value = value.tolist() + if isinstance(value, (dict, list, tuple)): + return json.dumps(value) + return value + + def put(self, value, *args, **kwargs): + return super().put(self._coerce(value), *args, **kwargs) + + def set(self, value, *args, **kwargs): + return super().set(self._coerce(value), *args, **kwargs) + + +class FlomniSimPositioner(SimPositioner): + """Simulated positioner that preserves FLOMNI controller-style API.""" + + USER_ACCESS = ["sim", "readback", "registered_proxies", "controller"] + + def __init__( + self, + name: str, + *, + axis_Id: str | None = None, + host: str | None = None, + port: int | None = None, + sign: int = 1, + limits: list[float] | tuple[float, float] | None = None, + **kwargs, + ): + super().__init__(name=name, limits=limits, **kwargs) + self.axis_Id = axis_Id + self.host = host + self.port = port + self.sign = sign + self.controller = FlomniSimController(self) + + @property + def axis_Id_numeric(self) -> int | None: + if not self.axis_Id: + return None + return ord(self.axis_Id.lower()) - 97 + + +class FlomniSimCamera(Device): + """Lightweight camera simulation with live mode controls for FLOMNI/OMNY GUI usage.""" + + USER_ACCESS = ["start_live_mode", "stop_live_mode", "live_mode_enabled", "image", "preview"] + + image = Cpt(PreviewSignal, name="image", ndim=2, num_rotation_90=0, transpose=False) + preview = Cpt(PreviewSignal, name="preview", ndim=2, num_rotation_90=0, transpose=False) + live_mode_enabled = Cpt( + Signal, + name="live_mode_enabled", + value=False, + kind=Kind.config, + ) + + def __init__( + self, + name: str, + *, + camera_id: int | None = None, + camera_ID: int | None = None, + url: str | None = None, + bits_per_pixel: int = 8, + channels: int = 1, + m_n_colormode: int = 0, + num_rotation_90: int = 0, + transpose: bool = False, + force_monochrome: bool = False, + **kwargs, + ): + super().__init__(name=name, **kwargs) + self.camera_id = camera_id if camera_id is not None else camera_ID + self.url = url + self.bits_per_pixel = bits_per_pixel + self.channels = channels + self.m_n_colormode = m_n_colormode + self.force_monochrome = force_monochrome + self.image.num_rotation_90 = num_rotation_90 + self.image.transpose = transpose + self.preview.num_rotation_90 = num_rotation_90 + self.preview.transpose = transpose + self._frame_number = 0 + self._shutdown_event = threading.Event() + self._live_mode_thread: threading.Thread | None = None + self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False) + + def start_live_mode(self) -> None: + self.live_mode_enabled.put(True) + + def stop_live_mode(self) -> None: + self.live_mode_enabled.put(False) + + def _on_live_mode_enabled_changed(self, *args, value, **kwargs): + _ = args, kwargs + if bool(value): + self._start_live_mode() + else: + self._stop_live_mode() + + def _start_live_mode(self) -> None: + if self._live_mode_thread and self._live_mode_thread.is_alive(): + return + self._shutdown_event.clear() + self._live_mode_thread = threading.Thread(target=self._live_mode_loop, daemon=True) + self._live_mode_thread.start() + + def _stop_live_mode(self) -> None: + if self._live_mode_thread is None: + return + self._shutdown_event.set() + self._live_mode_thread.join(timeout=1) + self._live_mode_thread = None + self._shutdown_event.clear() + + def _live_mode_loop(self) -> None: + while not self._shutdown_event.is_set(): + frame = self._generate_frame() + self.image.put(frame) + self.preview.put(frame) + self._shutdown_event.wait(0.2) + + def _generate_frame(self) -> np.ndarray: + self._frame_number += 1 + x = np.linspace(0, 255, 320, dtype=np.uint8) + y = np.linspace(0, 255, 240, dtype=np.uint8)[:, None] + frame = ((x + y + self._frame_number) % 255).astype(np.uint8) + return frame + + def destroy(self): + self._stop_live_mode() + return super().destroy() + + +class FlomniSimFastShutter(Device): + USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "fshstatus_readback"] + shutter = Cpt(Signal, name="shutter", value=0) + + def fshopen(self): + self.shutter.put(1) + + def fshclose(self): + self.shutter.put(0) + + def fshstatus(self): + return int(self.shutter.get()) + + def fshstatus_readback(self): + return int(self.shutter.get()) + + def fshinfo(self): + print("Using simulated fast shutter.") + + +class FlomniSimXRayAlignGUI(Device): + """Signal-only X-ray alignment GUI backend for simulation.""" + + update_frame_acqdone = Cpt(Signal, value=0) + update_frame_acq = Cpt(Signal, value=0) + enable_mv_x = Cpt(Signal, value=0) + enable_mv_y = Cpt(Signal, value=0) + send_message = Cpt(Signal, value="") + sample_name = Cpt(Signal, value="") + angle = Cpt(Signal, value=0) + pixel_size = Cpt(Signal, value=0) + submit = Cpt(Signal, name="submit", value=0) + step = Cpt(Signal, value=0) + recbg = Cpt(Signal, value=0) + mvx = Cpt(Signal, value=0) + mvy = Cpt(Signal, value=0) + fit_array = Cpt(Signal, value=np.zeros((3, 10))) + fit_params_x = Cpt(JSONSafeSignal, value="{}") + fit_params_y = Cpt(JSONSafeSignal, value="{}") + + for i in range(11): + locals()[f"width_y_{i}"] = Cpt(Signal, value=0) + for i in range(11): + locals()[f"width_x_{i}"] = Cpt(Signal, value=0) + for i in range(11): + locals()[f"xval_x_{i}"] = Cpt(Signal, value=0) + for i in range(11): + locals()[f"yval_y_{i}"] = Cpt(Signal, value=0) + for i in range(1, 6): + locals()[f"stage_pos_x_{i}"] = Cpt(Signal, value=0) + + +class FlomniSimSampleStorage(Device): + USER_ACCESS = [ + "is_sample_slot_used", + "is_sample_in_gripper", + "set_sample_slot", + "unset_sample_slot", + "set_sample_in_gripper", + "unset_sample_in_gripper", + "get_sample_name", + "show_all", + ] + + sample_placed = Dcpt({f"sample{i}": (Signal, None, {"value": 0}) for i in range(21)}) + sample_names = Dcpt({f"sample{i}": (Signal, None, {"value": "-"}) for i in range(21)}) + sample_in_gripper = Cpt(Signal, name="sample_in_gripper", value=0) + sample_in_gripper_name = Cpt(Signal, name="sample_in_gripper_name", value="-") + + def set_sample_slot(self, slot_nr: int, name: str) -> None: + getattr(self.sample_placed, f"sample{slot_nr}").set(1) + getattr(self.sample_names, f"sample{slot_nr}").set(name) + + def unset_sample_slot(self, slot_nr: int) -> None: + getattr(self.sample_placed, f"sample{slot_nr}").set(0) + getattr(self.sample_names, f"sample{slot_nr}").set("-") + + def set_sample_in_gripper(self, name: str) -> None: + self.sample_in_gripper.set(1) + self.sample_in_gripper_name.set(name) + + def unset_sample_in_gripper(self) -> None: + self.sample_in_gripper.set(0) + self.sample_in_gripper_name.set("-") + + def is_sample_slot_used(self, slot_nr: int) -> bool: + return bool(getattr(self.sample_placed, f"sample{slot_nr}").get()) + + def is_sample_in_gripper(self) -> bool: + return bool(self.sample_in_gripper.get()) + + def get_sample_name(self, slot_nr: int) -> str: + return str(getattr(self.sample_names, f"sample{slot_nr}").get()) + + def show_all(self) -> None: + t = PrettyTable() + t.title = "flOMNI sample storage (sim)" + t.field_names = ["slot", "used", "name"] + for slot in range(21): + t.add_row([slot, int(self.is_sample_slot_used(slot)), self.get_sample_name(slot)]) + print(t) + + +class FlomniSimTempHum(Device): + USER_ACCESS = ["show_all", "help"] + + temperature_mirror = Cpt(Signal, value=22.0) + temperature_osa = Cpt(Signal, value=22.0) + temperature_heater = Cpt(Signal, value=23.0) + humidity_sensor1 = Cpt(Signal, value=35.0) + humidity_sensor2 = Cpt(Signal, value=34.0) + flow = Cpt(Signal, value=2.0) + suction = Cpt(Signal, value=1.0) + + def show_all(self): + print("=== flOMNI Temperature & Humidity (sim) ===") + print(f"Mirror temperature: {self.temperature_mirror.get():.1f} C") + print(f"OSA temperature: {self.temperature_osa.get():.1f} C") + print(f"Heater temperature: {self.temperature_heater.get():.1f} C") + print(f"Humidity sensor 1: {self.humidity_sensor1.get():.1f} %RH") + print(f"Humidity sensor 2: {self.humidity_sensor2.get():.1f} %RH") + print(f"Flow: {self.flow.get():.1f} sccm") + print(f"Suction: {self.suction.get():.1f}") + + def help(self): + print("show_all() - display current simulated values")