fix: separated core simulation classes from additional devices

This commit is contained in:
2024-02-09 22:00:30 +01:00
parent 6fb912bd9c
commit 2225dafb74
3 changed files with 265 additions and 220 deletions

View File

@ -4,13 +4,9 @@ from .galil.galil_ophyd import GalilMotor
from .galil.sgalil_ophyd import SGalilMotor
from .npoint.npoint import NPointAxis
from .rt_lamni import RtLamniMotor
from .sim.sim import SimPositioner, SimMonitor
from .sim.sim import SimPositioner as SynAxisOPAAS
from .sim.sim import SimMonitor as SynAxisMonitor
from .sim.sim import SimMonitor as SynGaussBEC
from .sim.sim import SimCamera as SynSLSDetector
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
from .sim.sim import SynDeviceOPAAS, SynFlyer
from .sim.sim import SimPositioner, SimMonitor, SimCamera, SynDeviceOPAAS, SynFlyer
from .sim.sim_signals import ReadOnlySignal, SetableSignal, ComputedReadOnlySignal
from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages
from .smaract.smaract_ophyd import SmaractMotor
from .utils.static_device_test import launch
from .utils.bec_device_base import BECDeviceBase

View File

@ -6,12 +6,13 @@ import warnings
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import Device, DeviceStatus, Kind
from ophyd import PositionerBase, Signal
from ophyd.sim import SynSignal
from ophyd.utils import LimitError
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor
from ophyd_devices.sim.sim_additional_devices import DummyController
@ -102,7 +103,9 @@ class SimCamera(Device):
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
@ -128,6 +131,7 @@ class SimCamera(Device):
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
self._staged = False
self.scaninfo = None
self._update_scaninfo()
@ -175,15 +179,16 @@ class SimCamera(Device):
is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stag e()
return super().stage()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
)
self.frames = self.scaninfo.num_points * self.scaninfo.frames_per_trigger
self.exp_time = self.scaninfo.exp_time
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False
return super().stage()
@ -199,11 +204,9 @@ class SimCamera(Device):
if signal.item._kind == Kind.config
}
signals = {"config": config_readout, "data": self.file_name}
signals = {"config": config_readout, "data": self.file_path.get()}
msg = messages.DeviceMessage(signals=signals, metadata=self.scaninfo.metadata)
self.device_manager.producer.set_and_publish(
MessageEndpoints.device_read(self.name), msg.dumps()
)
self.device_manager.producer.set_and_publish(MessageEndpoints.device_read(self.name), msg)
def unstage(self) -> list[object]:
"""Unstage the device
@ -222,205 +225,6 @@ class SimCamera(Device):
super().stop(success=success)
class DummyControllerDevice(Device):
USER_ACCESS = ["controller"]
class SynFlyer(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
@property
def hints(self):
return {"fields": ["flyer_samx", "flyer_samy"]}
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
positions = np.asarray(positions)
def produce_data(device, metadata):
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
self.name: {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
}
},
metadata={"pointID": ii, **metadata},
).dumps()
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
class SynFlyerLamNI(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
self.controller = SynController(name="SynController")
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
positions = np.asarray(positions)
def produce_data(device, metadata):
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
"syn_flyer_lamni": {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
}
},
metadata={"pointID": ii, **metadata},
).dumps()
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
class SimPositioner(Device, PositionerBase):
"""
A simulated device mimicing any 1D Axis device (position, temperature, rotation).
@ -625,6 +429,105 @@ class SimPositioner(Device, PositionerBase):
return "mm"
class SynFlyer(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
@property
def hints(self):
return {"fields": ["flyer_samx", "flyer_samy"]}
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
positions = np.asarray(positions)
def produce_data(device, metadata):
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
self.name: {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
}
},
metadata={"pointID": ii, **metadata},
).dumps()
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
class SynDynamicComponents(Device):
messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)})
@ -636,8 +539,4 @@ class SynDeviceSubOPAAS(Device):
class SynDeviceOPAAS(Device):
x = Cpt(SimPositioner, name="x")
y = Cpt(SimPositioner, name="y")
z = Cpt(SynDeviceSubOPAAS, name="z")
if __name__ == "__main__":
gauss = SynGaussBEC(name="gauss")
z = Cpt(SimPositioner, name="z")

View File

@ -0,0 +1,150 @@
import time as ttime
import threading
import numpy as np
from ophyd import OphydObject, Device, PositionerBase
from bec_lib import messages, MessageEndpoints
class DummyControllerDevice(Device):
USER_ACCESS = ["controller"]
class DummyController:
USER_ACCESS = [
"some_var",
"controller_show_all",
"_func_with_args",
"_func_with_args_and_kwargs",
"_func_with_kwargs",
"_func_without_args_kwargs",
]
some_var = 10
another_var = 20
def on(self):
self._connected = True
def off(self):
self._connected = False
def _func_with_args(self, *args):
return args
def _func_with_args_and_kwargs(self, *args, **kwargs):
return args, kwargs
def _func_with_kwargs(self, **kwargs):
return kwargs
def _func_without_args_kwargs(self):
return None
def controller_show_all(self):
"""dummy controller show all
Raises:
in: _description_
LimitError: _description_
Returns:
_type_: _description_
"""
print(self.some_var)
class SynController(OphydObject):
def on(self):
pass
def off(self):
pass
class SynFlyerLamNI(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
self.controller = SynController(name="SynController")
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
positions = np.asarray(positions)
def produce_data(device, metadata):
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
for ii in range(num_pos):
bundle.append(
messages.DeviceMessage(
signals={
"syn_flyer_lamni": {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
}
},
metadata={"pointID": ii, **metadata},
).dumps()
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
messages.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()