fix: split simulation classes in multiple files

This commit is contained in:
guijar_m 2024-06-20 17:43:12 +02:00
parent f05af706c7
commit 2622ddbee2
10 changed files with 750 additions and 719 deletions

View File

@ -3,18 +3,23 @@ from .ophyd_patch import monkey_patch_ophyd
monkey_patch_ophyd()
from .devices.sls_devices import SLSInfo, SLSOperatorMessages
from .sim.sim import SimCamera
from .sim.sim import SimFlyer
from .sim.sim import SimFlyer as SynFlyer
from .sim.sim import SimMonitor
from .sim.sim import SimMonitor as SynAxisMonitor
from .sim.sim import SimMonitor as SynGaussBEC
from .sim.sim import SimPositioner
from .sim.sim import SimPositioner as SynAxisOPAAS
from .sim.sim import SimWaveform, SynDeviceOPAAS
from .sim.sim_camera import SimCamera
from .sim.sim_monitor import SimMonitor, SimMonitorAsync
SynAxisMonitor = SimMonitor
SynGaussBEC = SimMonitor
from .sim.sim_positioner import SimPositioner
SynAxisOPAAS = SimPositioner
from .sim.sim_flyer import SimFlyer
SynFlyer = SimFlyer
from .sim.sim import SynDeviceOPAAS, SynDynamicComponents
from .sim.sim_frameworks import DeviceProxy, H5ImageReplayProxy, SlitProxy
from .sim.sim_signals import ReadOnlySignal
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
from .sim.sim_waveform import SimWaveform
SynSignalRO = ReadOnlySignal
from .utils.bec_device_base import BECDeviceBase
from .utils.dynamic_pseudo import ComputedSignal
from .utils.static_device_test import launch

View File

@ -1,7 +1,10 @@
from .sim import SimCamera
from .sim import SimFlyer
from .sim import SimFlyer as SynFlyer
from .sim import SimMonitor, SimPositioner, SimWaveform
from .sim_camera import SimCamera
from .sim_flyer import SimFlyer
SynFlyer = SimFlyer
from .sim_frameworks import SlitProxy
from .sim_monitor import SimMonitor
from .sim_positioner import SimPositioner
from .sim_signals import ReadOnlySignal, SetableSignal
from .sim_waveform import SimWaveform
from .sim_xtreme import SynXtremeOtf

View File

@ -1,681 +1,10 @@
import os
import threading
import time as ttime
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import Kind, PositionerBase
from ophyd.flyers import FlyerInterface
from ophyd.sim import SynSignal
from ophyd.status import StatusBase
from ophyd.utils import LimitError
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_data import (
SimulatedDataCamera,
SimulatedDataMonitor,
SimulatedDataWaveform,
SimulatedPositioner,
)
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_test_devices import DummyController
from ophyd_devices.sim.sim_utils import H5Writer
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class DeviceStop(Exception):
pass
class SimMonitor(Device):
"""
A simulated device mimic any 1D Axis (position, temperature, beam).
It's readback is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataMonitor, more details on defaults within the simulation class.
>>> monitor = SimMonitor(name="monitor")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataMonitor
BIT_DEPTH = np.uint32
readback = Cpt(ReadOnlySignal, value=BIT_DEPTH(0), kind=Kind.hinted, compute_readback=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
def __init__(
self,
name,
*,
precision: int = 3,
sim_init: dict = None,
parent=None,
kind=None,
device_manager=None,
**kwargs,
):
self.precision = precision
self.init_sim_params = sim_init
self.device_manager = device_manager
self.sim = self.sim_cls(parent=self, **kwargs)
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
class SimCameraSetup(CustomDetectorMixin):
"""Mixin class for the SimCamera device."""
def on_trigger(self) -> None:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
try:
for _ in range(self.parent.burst.get()):
data = self.parent.image.get()
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data)
if self.parent.stopped:
raise DeviceStop
if self.parent.write_to_disk.get():
self.parent.h5_writer.receive_data(data)
except DeviceStop:
pass
finally:
self.parent.stopped = False
def on_stage(self) -> None:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}")
).wait()
self.parent.frames.set(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
self.parent.exp_time.set(self.parent.scaninfo.exp_time)
self.parent.burst.set(self.parent.scaninfo.frames_per_trigger)
if self.parent.write_to_disk.get():
self.parent.h5_writer.prepare(
file_path=self.parent.filepath.get(), h5_entry="/entry/data/data"
)
self.publish_file_location(done=False)
self.parent.stopped = False
def on_unstage(self) -> None:
"""Unstage the device
Send reads from all config signals to redis
"""
if self.parent.write_to_disk.get():
self.publish_file_location(done=True, successful=True)
class SimCamera(PSIDetectorBase):
"""A simulated device mimic any 2D camera.
It's image is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataCamera, more details on defaults within the simulation class.
>>> camera = SimCamera(name="camera")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
custom_prepare_cls = SimCameraSetup
sim_cls = SimulatedDataCamera
SHAPE = (100, 100)
BIT_DEPTH = np.uint16
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
ReadOnlySignal,
name="image",
value=np.empty(SHAPE, dtype=BIT_DEPTH),
compute_readback=True,
kind=Kind.omitted,
)
write_to_disk = Cpt(SetableSignal, name="write_to_disk", value=False, kind=Kind.config)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs)
self.h5_writer = H5Writer()
super().__init__(
name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs
)
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
if self.write_to_disk.get():
self.h5_writer.write_data()
status.set_finished()
return status
class SimWaveform(Device):
"""A simulated device mimic any 1D Waveform detector.
It's waveform is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataWaveform, more details on defaults within the simulation class.
>>> waveform = SimWaveform(name="waveform")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataWaveform
SHAPE = (1000,)
BIT_DEPTH = np.uint16
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
waveform_shape = Cpt(SetableSignal, name="waveform_shape", value=SHAPE, kind=Kind.config)
waveform = Cpt(
ReadOnlySignal,
name="waveform",
value=np.empty(SHAPE, dtype=BIT_DEPTH),
compute_readback=True,
kind=Kind.omitted,
)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
self._staged = False
self.scaninfo = None
self._update_scaninfo()
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
def acquire():
try:
for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get())
if self._stopped:
raise DeviceStop
except DeviceStop:
pass
finally:
self._stopped = False
self._done_acquiring()
threading.Thread(target=acquire, daemon=True).start()
return status
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
def stage(self) -> list[object]:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stage()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
)
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False
return super().stage()
def unstage(self) -> list[object]:
"""Unstage the device
Send reads from all config signals to redis
"""
if self._stopped is True or not self._staged:
return super().unstage()
return super().unstage()
def stop(self, *, success=False):
"""Stop the device"""
self._stopped = True
super().stop(success=success)
class SimPositioner(Device, PositionerBase):
"""
A simulated device mimicing any 1D Axis device (position, temperature, rotation).
>>> motor = SimPositioner(name="motor")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.\
Optional parameters:
----------
delay (int) : If 0, execution of move will be instant. If 1, exectution will depend on motor velocity. Default is 1.
update_frequency (int) : Frequency in Hz of the update of the simulated state during a move. Default is 2 Hz.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
limits (tuple) : Tuple of the low and high limits of the positioner. Overrides low/high_limit_travel is specified. Default is None.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
"""
# Specify which attributes are accessible via BEC client
USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"]
sim_cls = SimulatedPositioner
# Define the signals as class attributes
readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted)
setpoint = Cpt(SetableSignal, value=0, kind=Kind.normal)
motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal)
# Config signals
velocity = Cpt(SetableSignal, value=100, kind=Kind.config)
acceleration = Cpt(SetableSignal, value=1, kind=Kind.config)
tolerance = Cpt(SetableSignal, value=0.5, kind=Kind.config)
# Ommitted signals
high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
unused = Cpt(SetableSignal, value=1, kind=Kind.omitted)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
# pylint: disable=too-many-arguments
def __init__(
self,
name,
*,
delay: int = 1,
update_frequency=2,
precision=3,
limits=None,
parent=None,
kind=None,
device_manager=None,
sim_init: dict = None,
**kwargs,
):
self.delay = delay
self.device_manager = device_manager
self.precision = precision
self.init_sim_params = sim_init
self._registered_proxies = {}
self.update_frequency = update_frequency
self._stopped = False
self.dummy_controller = DummyController()
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
# @property
# def connected(self):
# """Return the connected state of the simulated device."""
# return self.dummy_controller.connected
@property
def limits(self):
"""Return the limits of the simulated device."""
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
"""Return the low limit of the simulated device."""
return self.limits[0]
@property
def high_limit(self):
"""Return the high limit of the simulated device."""
return self.limits[1]
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
# pylint: disable=arguments-differ
def check_value(self, value: any):
"""
Check that requested position is within existing limits.
This function has to be implemented on the top level of the positioner.
"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def _set_sim_state(self, signal_name: str, value: any) -> None:
"""Update the simulated state of the device."""
self.sim.sim_state[signal_name]["value"] = value
self.sim.sim_state[signal_name]["timestamp"] = ttime.time()
def _get_sim_state(self, signal_name: str) -> any:
"""Return the simulated state of the device."""
return self.sim.sim_state[signal_name]["value"]
def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiated a motion."""
self._stopped = False
self.check_value(value)
old_setpoint = self._get_sim_state(self.setpoint.name)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
def update_state(val):
"""Update the state of the simulated device."""
if self._stopped:
raise DeviceStop
old_readback = self._get_sim_state(self.readback.name)
self._set_sim_state(self.readback.name, val)
# Run subscription on "readback"
self._run_subs(
sub_type=self.SUB_READBACK,
old_value=old_readback,
value=self.sim.sim_state[self.readback.name]["value"],
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
st = DeviceStatus(device=self)
if self.delay:
def move_and_finish():
"""Move the simulated device and finish the motion."""
success = True
try:
move_val = self._get_sim_state(
self.setpoint.name
) + self.tolerance.get() * np.random.uniform(-1, 1)
updates = np.ceil(
np.abs(old_setpoint - move_val)
/ self.velocity.get()
* self.update_frequency
)
for ii in np.linspace(old_setpoint, move_val, int(updates)):
ttime.sleep(1 / self.update_frequency)
update_state(ii)
update_state(move_val)
self._set_sim_state(self.motor_is_moving, 0)
except DeviceStop:
success = False
finally:
self._stopped = False
self._done_moving(success=success)
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
threading.Thread(target=move_and_finish, daemon=True).start()
else:
update_state(value)
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
return st
def stop(self, *, success=False):
"""Stop the motion of the simulated device."""
super().stop(success=success)
self._stopped = True
@property
def position(self) -> float:
"""Return the current position of the simulated device."""
return self.readback.get()
@property
def egu(self):
"""Return the engineering units of the simulated device."""
return "mm"
class SimFlyer(Device, FlyerInterface):
"""A simulated device mimicing any 2D Flyer device (position, temperature, rotation).
The corresponding simulation class is sim_cls=SimulatedPositioner, more details on defaults within the simulation class.
>>> flyer = SimFlyer(name="flyer")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedPositioner
readback = Cpt(
ReadOnlySignal, name="readback", value=0, kind=Kind.hinted, compute_readback=False
)
def __init__(
self,
name: str,
*,
precision: int = 3,
parent=None,
kind=None,
device_manager=None,
# TODO remove after refactoring config
delay: int = 1,
update_frequency: int = 100,
**kwargs,
):
self.sim = self.sim_cls(parent=self, **kwargs)
self.precision = precision
self.device_manager = device_manager
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
@property
def hints(self):
"""Return the hints of the simulated device."""
return {"fields": ["flyer_samx", "flyer_samy"]}
@property
def egu(self) -> str:
"""Return the engineering units of the simulated device."""
return "mm"
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
status.set_finished()
return status
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
"""Kickoff the flyer to execute code during the scan."""
positions = np.asarray(positions)
def produce_data(device, metadata):
"""Simulate the data being produced by the flyer."""
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
},
metadata={"point_id": ii, **metadata},
)
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
logger.info(f"Sending data point {ii} for {device.name}.")
device.device_manager.connector.set_and_publish(
MessageEndpoints.device_read(device.name), bundle
)
bundle = messages.BundleMessage()
device.device_manager.connector.set(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name, status=1, metadata={"point_id": ii, **metadata}
),
)
device.device_manager.connector.set_and_publish(
MessageEndpoints.device_read(device.name), bundle
)
device.device_manager.connector.set(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name, status=0, metadata={"point_id": num_pos, **metadata}
),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
from ophyd_devices.sim.sim_positioner import SimPositionerWithCommFailure # noqa: F401
from ophyd_devices.sim.sim_positioner import SimPositioner
from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal
class SynDeviceSubOPAAS(Device):
@ -690,29 +19,3 @@ class SynDeviceOPAAS(Device):
class SynDynamicComponents(Device):
messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)})
class SimPositionerWithCommFailure(SimPositioner):
fails = Cpt(SetableSignal, value=0)
def move(self, value: float, **kwargs) -> DeviceStatus:
if self.fails.get() == 1:
raise RuntimeError("Communication failure")
if self.fails.get() == 2:
while not self._stopped:
ttime.sleep(1)
status = DeviceStatus(self)
status.set_exception(RuntimeError("Communication failure"))
return super().move(value, **kwargs)
if __name__ == "__main__":
waveform = SimWaveform(name="waveform")
waveform.sim.sim_select_model(waveform.sim.sim_get_models()[7])
waveform.sim.sim_params = {
"amplitude": 1500,
"noise_multiplier": 168,
"sigma": 50,
"center": 350,
}
waveform.waveform.get()

View File

@ -0,0 +1,144 @@
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind
from ophyd.status import StatusBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_data import SimulatedDataCamera
from ophyd_devices.sim.sim_exception import DeviceStop
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_utils import H5Writer
logger = bec_logger.logger
class SimCameraSetup(CustomDetectorMixin):
"""Mixin class for the SimCamera device."""
def on_trigger(self) -> None:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
try:
for _ in range(self.parent.burst.get()):
data = self.parent.image.get()
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=data)
if self.parent.stopped:
raise DeviceStop
if self.parent.write_to_disk.get():
self.parent.h5_writer.receive_data(data)
except DeviceStop:
pass
finally:
self.parent.stopped = False
def on_stage(self) -> None:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}")
).wait()
self.parent.frames.set(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
self.parent.exp_time.set(self.parent.scaninfo.exp_time)
self.parent.burst.set(self.parent.scaninfo.frames_per_trigger)
if self.parent.write_to_disk.get():
self.parent.h5_writer.prepare(
file_path=self.parent.filepath.get(), h5_entry="/entry/data/data"
)
self.publish_file_location(done=False)
self.parent.stopped = False
def on_unstage(self) -> None:
"""Unstage the device
Send reads from all config signals to redis
"""
if self.parent.write_to_disk.get():
self.publish_file_location(done=True, successful=True)
class SimCamera(PSIDetectorBase):
"""A simulated device mimic any 2D camera.
It's image is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataCamera, more details on defaults within the simulation class.
>>> camera = SimCamera(name="camera")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
custom_prepare_cls = SimCameraSetup
sim_cls = SimulatedDataCamera
SHAPE = (100, 100)
BIT_DEPTH = np.uint16
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
ReadOnlySignal,
name="image",
value=np.empty(SHAPE, dtype=BIT_DEPTH),
compute_readback=True,
kind=Kind.omitted,
)
write_to_disk = Cpt(SetableSignal, name="write_to_disk", value=False, kind=Kind.config)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs)
self.h5_writer = H5Writer()
super().__init__(
name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs
)
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
if self.write_to_disk.get():
self.h5_writer.write_data()
status.set_finished()
return status

View File

@ -0,0 +1,2 @@
class DeviceStop(Exception):
pass

View File

@ -0,0 +1,133 @@
import threading
import time
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind
from ophyd.flyers import FlyerInterface
from ophyd.status import StatusBase
from ophyd_devices.sim.sim_data import SimulatedPositioner
from ophyd_devices.sim.sim_signals import ReadOnlySignal
logger = bec_logger.logger
class SimFlyer(Device, FlyerInterface):
"""A simulated device mimicing any 2D Flyer device (position, temperature, rotation).
The corresponding simulation class is sim_cls=SimulatedPositioner, more details on defaults within the simulation class.
>>> flyer = SimFlyer(name="flyer")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedPositioner
readback = Cpt(
ReadOnlySignal, name="readback", value=0, kind=Kind.hinted, compute_readback=False
)
def __init__(
self,
name: str,
*,
precision: int = 3,
parent=None,
kind=None,
device_manager=None,
# TODO remove after refactoring config
delay: int = 1,
update_frequency: int = 100,
**kwargs,
):
self.sim = self.sim_cls(parent=self, **kwargs)
self.precision = precision
self.device_manager = device_manager
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
@property
def hints(self):
"""Return the hints of the simulated device."""
return {"fields": ["flyer_samx", "flyer_samy"]}
@property
def egu(self) -> str:
"""Return the engineering units of the simulated device."""
return "mm"
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
status.set_finished()
return status
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
"""Kickoff the flyer to execute code during the scan."""
positions = np.asarray(positions)
def produce_data(device, metadata):
"""Simulate the data being produced by the flyer."""
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
},
metadata={"point_id": ii, **metadata},
)
)
time.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
logger.info(f"Sending data point {ii} for {device.name}.")
device.device_manager.connector.set_and_publish(
MessageEndpoints.device_read(device.name), bundle
)
bundle = messages.BundleMessage()
device.device_manager.connector.set(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name, status=1, metadata={"point_id": ii, **metadata}
),
)
device.device_manager.connector.set_and_publish(
MessageEndpoints.device_read(device.name), bundle
)
device.device_manager.connector.set(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name, status=0, metadata={"point_id": num_pos, **metadata}
),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()

View File

@ -1,10 +1,9 @@
"""This module provides an asynchronous monitor to simulate the behaviour of a device sending data not in sync with the point ID."""
from typing import Literal
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, Kind
from typeguard import typechecked
@ -16,6 +15,65 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
from ophyd_devices.sim.sim_data import SimulatedDataMonitor
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
logger = bec_logger.logger
class SimMonitor(Device):
"""
A simulated device mimic any 1D Axis (position, temperature, beam).
It's readback is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataMonitor, more details on defaults within the simulation class.
>>> monitor = SimMonitor(name="monitor")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataMonitor
BIT_DEPTH = np.uint32
readback = Cpt(ReadOnlySignal, value=BIT_DEPTH(0), kind=Kind.hinted, compute_readback=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
def __init__(
self,
name,
*,
precision: int = 3,
sim_init: dict = None,
parent=None,
kind=None,
device_manager=None,
**kwargs,
):
self.precision = precision
self.init_sim_params = sim_init
self.device_manager = device_manager
self.sim = self.sim_cls(parent=self, **kwargs)
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
class SimMonitorAsyncPrepare(CustomDetectorMixin):
"""Custom prepare for the SimMonitorAsync class."""

View File

@ -0,0 +1,233 @@
import threading
import time as ttime
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind, PositionerBase
from ophyd.utils import LimitError
from ophyd_devices.sim.sim_data import SimulatedPositioner
from ophyd_devices.sim.sim_exception import DeviceStop
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.sim.sim_test_devices import DummyController
logger = bec_logger.logger
class SimPositioner(Device, PositionerBase):
"""
A simulated device mimicing any 1D Axis device (position, temperature, rotation).
>>> motor = SimPositioner(name="motor")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.\
Optional parameters:
----------
delay (int) : If 0, execution of move will be instant. If 1, exectution will depend on motor velocity. Default is 1.
update_frequency (int) : Frequency in Hz of the update of the simulated state during a move. Default is 2 Hz.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
limits (tuple) : Tuple of the low and high limits of the positioner. Overrides low/high_limit_travel is specified. Default is None.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
"""
# Specify which attributes are accessible via BEC client
USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"]
sim_cls = SimulatedPositioner
# Define the signals as class attributes
readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted)
setpoint = Cpt(SetableSignal, value=0, kind=Kind.normal)
motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal)
# Config signals
velocity = Cpt(SetableSignal, value=100, kind=Kind.config)
acceleration = Cpt(SetableSignal, value=1, kind=Kind.config)
tolerance = Cpt(SetableSignal, value=0.5, kind=Kind.config)
# Ommitted signals
high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
unused = Cpt(SetableSignal, value=1, kind=Kind.omitted)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
# pylint: disable=too-many-arguments
def __init__(
self,
name,
*,
delay: int = 1,
update_frequency=2,
precision=3,
limits=None,
parent=None,
kind=None,
device_manager=None,
sim_init: dict = None,
**kwargs,
):
self.delay = delay
self.device_manager = device_manager
self.precision = precision
self.init_sim_params = sim_init
self._registered_proxies = {}
self.update_frequency = update_frequency
self._stopped = False
self.dummy_controller = DummyController()
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
# @property
# def connected(self):
# """Return the connected state of the simulated device."""
# return self.dummy_controller.connected
@property
def limits(self):
"""Return the limits of the simulated device."""
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
"""Return the low limit of the simulated device."""
return self.limits[0]
@property
def high_limit(self):
"""Return the high limit of the simulated device."""
return self.limits[1]
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
# pylint: disable=arguments-differ
def check_value(self, value: any):
"""
Check that requested position is within existing limits.
This function has to be implemented on the top level of the positioner.
"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def _set_sim_state(self, signal_name: str, value: any) -> None:
"""Update the simulated state of the device."""
self.sim.sim_state[signal_name]["value"] = value
self.sim.sim_state[signal_name]["timestamp"] = ttime.time()
def _get_sim_state(self, signal_name: str) -> any:
"""Return the simulated state of the device."""
return self.sim.sim_state[signal_name]["value"]
def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiated a motion."""
self._stopped = False
self.check_value(value)
old_setpoint = self._get_sim_state(self.setpoint.name)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
def update_state(val):
"""Update the state of the simulated device."""
if self._stopped:
raise DeviceStop
old_readback = self._get_sim_state(self.readback.name)
self._set_sim_state(self.readback.name, val)
# Run subscription on "readback"
self._run_subs(
sub_type=self.SUB_READBACK,
old_value=old_readback,
value=self.sim.sim_state[self.readback.name]["value"],
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
st = DeviceStatus(device=self)
if self.delay:
def move_and_finish():
"""Move the simulated device and finish the motion."""
success = True
try:
move_val = self._get_sim_state(
self.setpoint.name
) + self.tolerance.get() * np.random.uniform(-1, 1)
updates = np.ceil(
np.abs(old_setpoint - move_val)
/ self.velocity.get()
* self.update_frequency
)
for ii in np.linspace(old_setpoint, move_val, int(updates)):
ttime.sleep(1 / self.update_frequency)
update_state(ii)
update_state(move_val)
self._set_sim_state(self.motor_is_moving, 0)
except DeviceStop:
success = False
finally:
self._stopped = False
self._done_moving(success=success)
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
threading.Thread(target=move_and_finish, daemon=True).start()
else:
update_state(value)
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
return st
def stop(self, *, success=False):
"""Stop the motion of the simulated device."""
super().stop(success=success)
self._stopped = True
@property
def position(self) -> float:
"""Return the current position of the simulated device."""
return self.readback.get()
@property
def egu(self):
"""Return the engineering units of the simulated device."""
return "mm"
class SimPositionerWithCommFailure(SimPositioner):
fails = Cpt(SetableSignal, value=0)
def move(self, value: float, **kwargs) -> DeviceStatus:
if self.fails.get() == 1:
raise RuntimeError("Communication failure")
if self.fails.get() == 2:
while not self._stopped:
ttime.sleep(1)
status = DeviceStatus(self)
status.set_exception(RuntimeError("Communication failure"))
return super().move(value, **kwargs)

View File

@ -0,0 +1,148 @@
import os
import threading
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind
from ophyd_devices.sim.sim_data import SimulatedDataWaveform
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class SimWaveform(Device):
"""A simulated device mimic any 1D Waveform detector.
It's waveform is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataWaveform, more details on defaults within the simulation class.
>>> waveform = SimWaveform(name="waveform")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataWaveform
SHAPE = (1000,)
BIT_DEPTH = np.uint16
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
waveform_shape = Cpt(SetableSignal, name="waveform_shape", value=SHAPE, kind=Kind.config)
waveform = Cpt(
ReadOnlySignal,
name="waveform",
value=np.empty(SHAPE, dtype=BIT_DEPTH),
compute_readback=True,
kind=Kind.omitted,
)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
self._staged = False
self.scaninfo = None
self._update_scaninfo()
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
def acquire():
try:
for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get())
if self._stopped:
raise DeviceStop
except DeviceStop:
pass
finally:
self._stopped = False
self._done_acquiring()
threading.Thread(target=acquire, daemon=True).start()
return status
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
def stage(self) -> list[object]:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stage()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
)
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False
return super().stage()
def unstage(self) -> list[object]:
"""Unstage the device
Send reads from all config signals to redis
"""
if self._stopped is True or not self._staged:
return super().unstage()
return super().unstage()
def stop(self, *, success=False):
"""Stop the device"""
self._stopped = True
super().stop(success=success)

View File

@ -20,9 +20,11 @@ from ophyd_devices.interfaces.protocols.bec_protocols import (
BECScanProtocol,
BECSignalProtocol,
)
from ophyd_devices.sim.sim import SimCamera, SimFlyer, SimMonitor, SimPositioner
from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_flyer import SimFlyer
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy
from ophyd_devices.sim.sim_monitor_async import SimMonitorAsync
from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync
from ophyd_devices.sim.sim_positioner import SimPositioner
from ophyd_devices.sim.sim_signals import ReadOnlySignal
from ophyd_devices.sim.sim_utils import H5Writer
from ophyd_devices.utils.bec_device_base import BECDevice, BECDeviceBase