added flyer sim

This commit is contained in:
wakonig_k 2022-07-25 14:01:05 +02:00
parent 298094d58b
commit 619ff60e73
3 changed files with 87 additions and 4 deletions

View File

@ -1,6 +1,6 @@
from .galil.galil_ophyd import GalilMotor from .galil.galil_ophyd import GalilMotor
from .npoint.npoint import NPointAxis from .npoint.npoint import NPointAxis
from .smaract.smaract_ophyd import SmaractMotor
from .sim.sim import SynAxisMonitor, SynAxisOPAAS
from .sls_devices.sls_devices import SLSOperatorMessages
from .rt_lamni import RtLamniMotor from .rt_lamni import RtLamniMotor
from .sim.sim import SynAxisMonitor, SynAxisOPAAS, SynFlyer
from .sls_devices.sls_devices import SLSOperatorMessages
from .smaract.smaract_ophyd import SmaractMotor

View File

@ -1 +1 @@
from .sim import SynAxisMonitor, SynAxisOPAAS from .sim import SynAxisMonitor, SynAxisOPAAS, SynFlyer

View File

@ -3,6 +3,7 @@ import time as ttime
import warnings import warnings
import numpy as np import numpy as np
from bec_utils import BECMessage, MessageEndpoints
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.sim import _ReadbackSignal, _SetpointSignal from ophyd.sim import _ReadbackSignal, _SetpointSignal
@ -208,6 +209,88 @@ class DummyController:
print(self.some_var) print(self.some_var)
class SynFlyer(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
def kickoff(self, metadata, num_pos):
def produce_data(device, metadata):
for ii in range(num_pos):
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name),
BECMessage.DeviceMessage(
signals={
"flyer_samx": {"value": ii * 1.22, "timestamp": 0},
"flyer_samy": {"value": ii * 1.45, "timestamp": 0},
},
metadata={"pointID": ii, **metadata},
).dumps(),
)
ttime.sleep(0.05)
if ii % 100 == 0:
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
# time.sleep(0.01)
class SynAxisOPAAS(Device, PositionerBase): class SynAxisOPAAS(Device, PositionerBase):
""" """
A synthetic settable Device mimic any 1D Axis (position, temperature). A synthetic settable Device mimic any 1D Axis (position, temperature).