feat: added otf sim

This commit is contained in:
wakonig_k 2023-05-03 18:11:27 +02:00
parent 7a889f3fa5
commit f3513207d9
3 changed files with 114 additions and 11 deletions

View File

@ -1,19 +1,19 @@
"""
ophyd device classes for X07MA beamline
"""
from collections import OrderedDict
import time
import traceback
from collections import OrderedDict
from typing import Any
from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, PVPositioner, EpicsMotor
from ophyd.flyers import FlyerInterface
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd.pv_positioner import PVPositionerComparator
import traceback
from bec_utils import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import FormattedComponent as FCpt
from ophyd import Kind, PVPositioner
from ophyd.flyers import FlyerInterface
from ophyd.pv_positioner import PVPositionerComparator
from ophyd.status import DeviceStatus, SubscriptionStatus
logger = bec_logger.logger

View File

@ -1 +1,2 @@
from .sim import SynAxisMonitor, SynAxisOPAAS, SynFlyer, SynSignalRO, SynSLSDetector
from .sim_xtreme import SynXtremeOtf

View File

@ -7,7 +7,7 @@ from typing import List
import numpy as np
from bec_utils import BECMessage, MessageEndpoints, bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Signal
from ophyd.sim import _ReadbackSignal, _SetpointSignal
from ophyd.utils import LimitError, ReadOnlyError
@ -438,7 +438,109 @@ class SynFlyer(Device, PositionerBase):
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
# time.sleep(0.01)
class SynController(OphydObject):
def on(self):
pass
def off(self):
pass
class SynFlyerLamNI(Device, PositionerBase):
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
self.controller = SynController(name="SynController")
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
positions = np.asarray(positions)
def produce_data(device, metadata):
buffer_time = 0.2
elapsed_time = 0
bundle = BECMessage.BundleMessage()
for ii in range(num_pos):
bundle.append(
BECMessage.DeviceMessage(
signals={
"syn_flyer_lamni": {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
"flyer_samy": {"value": positions[ii, 1], "timestamp": 0},
}
},
metadata={"pointID": ii, **metadata},
).dumps()
)
ttime.sleep(exp_time)
elapsed_time += exp_time
if elapsed_time > buffer_time:
elapsed_time = 0
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
bundle = BECMessage.BundleMessage()
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage(
device=device.name,
status=1,
metadata={"pointID": ii, **metadata},
).dumps(),
)
device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps()
)
device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage(
device=device.name,
status=0,
metadata={"pointID": num_pos, **metadata},
).dumps(),
)
print("done")
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
class SynAxisOPAAS(Device, PositionerBase):