refactor!: moved to new ophyd_devices repo structure

BREAKING CHANGE: cleaned up and migrated to the new repo structure. Only shared devices will be hosted in ophyd_devices. Everything else will be in the beamline-specific repositories
This commit is contained in:
2024-05-08 11:37:51 +02:00
parent be689baa29
commit 3415ae2007
38 changed files with 119 additions and 4434 deletions

View File

View File

@@ -0,0 +1,105 @@
from abc import ABC, abstractmethod
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsMotor
from typeguard import typechecked
from ophyd_devices.interfaces.protocols.bec_protocols import BECRotationProtocol
from ophyd_devices.utils.bec_utils import ConfigSignal
logger = bec_logger.logger
class OphtyRotationBaseError(Exception):
"""Exception specific for implmenetation of rotation stages."""
class OphydRotationBase(BECRotationProtocol, ABC):
allow_mod360 = Cpt(ConfigSignal, name="allow_mod360", value=False, kind="config")
def __init__(self, *args, **kwargs):
"""
Base class to implement functionality specific for rotation devices.
Childrens should override the instance attributes:
- has_mod360
- has_freerun
- valid_rotation_modes
"""
# pylint: disable=protected-access
self._has_mod360 = False
self._has_freerun = False
self._valid_rotation_modes = []
if "allow_mod360" in kwargs:
if not isinstance(kwargs["allow_mod360"], bool):
raise ValueError("allow_mod360 must be a boolean")
self.allow_mod360.put(kwargs["allow_mod360"])
super().__init__(*args, **kwargs)
@abstractmethod
def apply_mod360(self) -> None:
"""Method to apply the modulus 360 operation on the specific device.
Childrens should override this method
"""
@property
def has_mod360(self) -> bool:
"""Property to check if the device has mod360 operation.
ReadOnly property, childrens should override this method.
"""
return self._has_mod360
@property
def has_freerun(self) -> bool:
"""Property to check if the device has freerun operation.
ReadOnly property, childrens should override this method.
"""
return self._has_freerun
@property
def valid_rotation_modes(self) -> list:
"""Method to get the valid rotation modes for the specific device."""
return self._valid_rotation_modes
@typechecked
@valid_rotation_modes.setter
def valid_rotation_modes(self, value: list[str]):
"""Method to set the valid rotation modes for the specific device."""
self._valid_rotation_modes = value
return self._valid_rotation_modes
# pylint: disable=too-many-ancestors
class EpicsRotationBase(OphydRotationBase, EpicsMotor):
"""Class for Epics rotation devices."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._has_freerun = True
self._has_freerun = True
self._valid_rotation_modes = ["target", "radiography"]
def apply_mod360(self) -> None:
"""Apply modulos 360 operation for EpicsMotorRecord.
EpicsMotor has the function "set_current_position" which can be used for this purpose.
In addition, there is a check if mod360 is allowed and available.
"""
if self.has_mod360 and self.allow_mod360.get():
cur_val = self.user_readback.get()
new_val = cur_val % 360
try:
self.set_current_position(new_val)
except Exception as exc:
error_msg = f"Failed to set new position {new_val} from {cur_val} on device {self.name} with error {exc}"
raise OphtyRotationBaseError(error_msg) from exc
return
logger.info(
f"Did not apply mod360 for device {self.name} with has_mod={self.has_mod360} and allow_mod={self.allow_mod360.get()}"
)

View File

@@ -0,0 +1,510 @@
import enum
import time
from typing import Any
from bec_lib import bec_logger
from ophyd import (
Component,
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
Kind,
PVPositioner,
Signal,
)
from ophyd.device import Staged
from ophyd.pseudopos import (
PseudoPositioner,
PseudoSingle,
pseudo_position_argument,
real_position_argument,
)
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class DeviceInitError(DelayGeneratorError):
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
class DelayGeneratorNotOkay(DelayGeneratorError):
"""Error when DDG is not okay"""
class TriggerSource(enum.IntEnum):
"""
Class for trigger options of DG645
Used to set the trigger source of the DG645 by setting the value
e.g. source.put(TriggerSource.Internal)
Exp:
TriggerSource.Internal
"""
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class DelayStatic(Device):
"""
Static axis for the T0 output channel
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end
of the holdoff period.
"""
# Other channel stuff
ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config)
nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config)
polarity = Component(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.config,
)
amplitude = Component(
EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config
)
offset = Component(
EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config
)
class DummyPositioner(PVPositioner):
"""Dummy Positioner to set AO, AI and ReferenceMO."""
setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config)
done = Component(Signal, value=1)
reference = Component(EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config)
class DelayPair(PseudoPositioner):
"""
Delay pair interface
Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH).
It offers a simple delay and pulse width interface.
"""
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
io = Component(DelayStatic, name="io")
def __init__(self, *args, **kwargs):
# Change suffix names before connecting (a bit of dynamic connections)
self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0]
self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1]
self.__class__.__dict__["io"].suffix = kwargs["channel"]
del kwargs["channel"]
# Call parent to start the connections
super().__init__(*args, **kwargs)
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1)
class DDGCustomMixin:
"""
Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase.
This class provides a parent class for implementation of BL specific logic of the device.
It is also possible to pass implementing certain methods, e.g. finished or on_trigger,
based on the setup and desired operation mode at the beamline.
Args:
parent (object): instance of PSIDelayGeneratorBase
**kwargs: keyword arguments
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Method to initialize default parameters for DDG.
Called upon initiating the base class.
It should be used to set the DDG default parameters.
These may include: amplitude, offsets, delays, etc.
"""
def prepare_ddg(self) -> None:
"""
Method to prepare the DDG for the upcoming scan.
Called by the stage method of the base class.
It should be used to set the DDG parameters for the upcoming scan.
"""
def on_trigger(self) -> None:
"""Method executed upon trigger call in parent class"""
def finished(self) -> None:
"""Method to check if DDG is finished with the scan"""
def on_pre_scan(self) -> None:
"""
Method executed upon pre_scan call in parent class.
Covenient to implement time sensitive actions to be executed right before start of the scan.
Example could be to open the shutter by triggering a pulse via pre_scan.
"""
def check_scan_id(self) -> None:
"""Method to check if there is a new scan_id, called by stage."""
def is_ddg_okay(self, raise_on_error=False) -> None:
"""
Method to check if DDG is okay
It checks the status PV of the DDG and tries to clear the error if it is not okay.
It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay.
Args:
raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
"""
status = self.parent.status.read()[self.parent.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.parent.clear_error()
time.sleep(1)
self.is_ddg_okay(raise_on_error=True)
elif status != "STATUS OK":
raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
class PSIDelayGeneratorBase(Device):
"""
Abstract base class for DelayGenerator DG645
This class implements a thin Ophyd wrapper around the Stanford Research DG645
digital delay generator.
The DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0. Front panel outputs
T0, AB, CD, EF and GH are combinations of these signals. Back panel outputs are
directly routed signals. Signals are not independent.
Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They
have a TTL pulse width, delay and a reference signal to which they are being triggered.
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str) : EPICS PV prefix for component (optional)
name (str) : name of the device, as will be reported via read()
kind (str) : member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list) : sequence of attribute names to read
configuration_attrs (list) : sequence of attribute names via config_parameters
parent (object) : instance of the parent device
device_manager (object) : bec device manager
sim_mode (bool) : simulation mode, if True, no device manager is required
**kwargs : keyword arguments
attributes : lazy_wait_for_connection : bool
"""
# Custom_prepare_cls
custom_prepare_cls = DDGCustomMixin
SUB_PROGRESS = "progress"
SUB_VALUE = "value"
_default_sub = SUB_VALUE
USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"]
# Assign PVs from DDG645
trigger_burst_readout = Component(
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout"
)
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
# Front Panel
channelT0 = Component(DelayStatic, "T0", name="T0")
channelAB = Component(DelayPair, "", name="AB", channel="AB")
channelCD = Component(DelayPair, "", name="CD", channel="CD")
channelEF = Component(DelayPair, "", name="EF", channel="EF")
channelGH = Component(DelayPair, "", name="GH", channel="GH")
holdoff = Component(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
inhibit = Component(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.config,
)
source = Component(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.config,
)
level = Component(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.config,
)
rate = Component(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.config,
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DeviceInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
" DeviceManager to initialization or init with sim_mode=True"
)
# Init variables
self.sim_mode = sim_mode
self.stopped = False
self.name = name
self.scaninfo = None
self.timeout = 5
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
self.wait_for_connection(all_signals=True)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if not sim_mode:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
self._update_scaninfo()
self._init()
def _update_scaninfo(self) -> None:
"""
Method to updated scaninfo from BEC.
In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _init(self) -> None:
"""Method to initialize custom parameters of the DDG."""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.is_ddg_okay()
def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
"""
Method to set signals on DelayPair and DelayStatic channels.
Signals can be set on the DelayPair and DelayStatic channels. The method checks
if the signal is available on the channel and sets it. It works for both, DelayPair
and Delay Static although signals are hosted in different layers.
Args:
signal (str) : signal to set (width, delay, amplitude, offset, polarity)
value (Any) : value to set
channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
"""
if not channels:
channels = self.all_channels
for chname in channels:
channel = getattr(self, chname, None)
if not channel:
continue
if signal in channel.component_names:
getattr(channel, signal).set(value)
continue
if "io" in channel.component_names and signal in channel.io.component_names:
getattr(channel.io, signal).set(value)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source on DDG - possible values defined in TriggerSource enum"""
value = int(trigger_source)
self.source.put(value)
def burst_enable(self, count, delay, period, config="all"):
"""Enable the burst mode"""
# Validate inputs
count = int(count)
assert count > 0, "Number of bursts must be positive"
assert delay >= 0, "Burst delay must be larger than 0"
assert period > 0, "Burst period must be positive"
assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
self.burstMode.put(1)
self.burstCount.put(count)
self.burstDelay.put(delay)
self.burstPeriod.put(period)
if config == "all":
self.burstConfig.put(0)
elif config == "first":
self.burstConfig.put(1)
def burst_disable(self):
"""Disable burst mode"""
self.burstMode.put(0)
def stage(self) -> list[object]:
"""
Method to stage the device.
Called in preparation for a scan.
Internal Calls:
- scaninfo.load_scan_metadata : load scan metadata
- custom_prepare.prepare_ddg : prepare DDG for measurement
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.prepare_ddg()
self.custom_prepare.is_ddg_okay()
# At the moment needed bc signal might not be reliable, BEC too fast.
# Consider removing this overhead in future!
time.sleep(0.05)
return super().stage()
def trigger(self) -> DeviceStatus:
"""
Method to trigger the acquisition.
Internal Call:
- custom_prepare.on_trigger : execute BL specific action
"""
self.custom_prepare.on_trigger()
return super().trigger()
def pre_scan(self) -> None:
"""
Method pre_scan gets executed directly before the scan
Internal Call:
- custom_prepare.on_pre_scan : execute BL specific action
"""
self.custom_prepare.on_pre_scan()
def unstage(self) -> list[object]:
"""
Method unstage gets called at the end of a scan.
If scan (self.stopped is True) is stopped, returns directly.
Otherwise, checks if the DDG finished acquisition
Internal Calls:
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()
self.custom_prepare.is_ddg_okay()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Method to stop the DDG
#TODO Check if the pulse generation can be interruppted
Internal Call:
- custom_prepare.is_ddg_okay : check if DDG is okay
"""
self.custom_prepare.is_ddg_okay()
super().stop(success=success)
self.stopped = True

View File

@@ -0,0 +1,330 @@
import os
import time
from bec_lib.device import DeviceStatus
from bec_lib.file_utils import FileWriter
from ophyd import Device
from ophyd.device import Staged
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
class CustomDetectorMixin:
"""
Mixin class for custom detector logic
This class is used to implement BL specific logic for the detector.
It is used in the PSIDetectorBase class.
For the integration of a new detector, the following functions should
help with integrating functionality, but additional ones can be added.
Check PSIDetectorBase for the functions that are called during relevant function calls of
stage, unstage, trigger, stop and _init.
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
def initialize_detector(self) -> None:
"""
Init parameters for the detector
Raises (optional):
DetectorTimeoutError: if detector cannot be initialized
"""
def initialize_detector_backend(self) -> None:
"""
Init parameters for teh detector backend (filewriter)
Raises (optional):
DetectorTimeoutError: if filewriter cannot be initialized
"""
def prepare_detector(self) -> None:
"""
Prepare detector for the scan
"""
def prepare_detector_backend(self) -> None:
"""
Prepare detector backend for the scan
"""
def stop_detector(self) -> None:
"""
Stop the detector
"""
def stop_detector_backend(self) -> None:
"""
Stop the detector backend
"""
def on_trigger(self) -> None:
"""
Specify actions to be executed upon receiving trigger signal
"""
def pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan
BEC calls pre_scan just before execution of the scan core.
It is convenient to execute time critical features of the detector,
e.g. arming it, but it is recommended to keep this function as short/fast as possible.
"""
def finished(self) -> None:
"""
Specify actions to be executed during unstage
This may include checks if acquisition was succesful
Raises (optional):
DetectorTimeoutError: if detector cannot be stopped
"""
def check_scan_id(self) -> None:
"""
Check if BEC is running on a new scan_id
"""
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the designated filepath from data backend to REDIS.
Typically, the following two message types are published:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
"""
def wait_for_signals(
self,
signal_conditions: list,
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wait for signals to reach a certain condition
Args:
signal_conditions (tuple): tuple of (get_current_state, condition) functions
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self.parent.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if timer > timeout:
return False
time.sleep(interval)
timer += interval
class PSIDetectorBase(Device):
"""
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Min_readout (float): minimum readout time for detector
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list): sequence of attribute names to read
configuration_attrs (list): sequence of attribute names via config_parameters
parent (object): instance of the parent device
device_manager (object): bec device manager
sim_mode (bool): simulation mode, if True, no device manager is required
**kwargs: keyword arguments
attributes: lazy_wait_for_connection : bool
"""
custom_prepare_cls = CustomDetectorMixin
MIN_READOUT = 1e-3
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DetectorInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
" DeviceManager to initialization or init with sim_mode=True"
)
# Init variables
self.sim_mode = sim_mode
self.stopped = False
self.name = name
self.service_cfg = None
self.scaninfo = None
self.filewriter = None
self.timeout = 5
self.wait_for_connection(all_signals=True)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if not sim_mode:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self.connector = self.device_manager.connector
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config"""
from bec_lib.bec_service import SERVICE_CONFIG
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.initialize_detector()
self.custom_prepare.initialize_detector_backend()
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan
Internal Calls:
- _prep_backend : prepare detector filewriter for measurement
- _prep_detector : prepare detector for measurement
Returns:
list(object): list of objects that were staged
"""
# Method idempotent, should rais ;obj;'RedudantStaging' if staged twice
if self._staged != Staged.no:
return super().stage()
# Reset flag for detector stopped
self.stopped = False
# Load metadata of the scan
self.scaninfo.load_scan_metadata()
# Prepare detector and file writer
self.custom_prepare.prepare_detector_backend()
self.custom_prepare.prepare_detector()
state = False
self.custom_prepare.publish_file_location(done=state)
# At the moment needed bc signal might not be reliable, BEC too fast.
# Consider removing this overhead in future!
time.sleep(0.05)
return super().stage()
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self.custom_prepare.on_trigger()
return super().trigger()
def unstage(self) -> list[object]:
"""
Unstage device in preparation for a scan
Returns directly if self.stopped,
otherwise checks with self._finished
if data acquisition on device finished (an was successful)
Internal Calls:
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- custom_prepare.publish_file_location : publish file location to bec
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()
state = True
self.custom_prepare.publish_file_location(done=state, successful=state)
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
Internal Calls:
- custom_prepare.stop_detector : stop detector
- custom_prepare.stop_backend : stop detector filewriter
"""
self.custom_prepare.stop_detector()
self.custom_prepare.stop_detector_backend()
super().stop(success=success)
self.stopped = True

View File

@@ -0,0 +1,495 @@
""" This module provides a range of protocols that describe the expected interface for different types of devices.
The protocols below can be used as teamplates for functionality to be implemeted by different type of devices.
They further facilitate runtime checks on devices and provide a minimum set of properties required for a device to be loadable by BEC.
The protocols are:
- BECDeviceProtocol: Protocol for devices in BEC. All devices must at least implement this protocol.
- BECSignalProtocol: Protocol for signals.
- BECScanProtocol: Protocol for the scan interface.
- BECMixinProtocol: Protocol for utilities in particular relevant for detector implementations.
- BECPositionerProtocol: Protocol for positioners.
- BECFlyerProtocol: Protocol with for flyers.
Keep in mind, that a device of type flyer should generally also implement the BECScanProtocol that provides the required functionality for scans.
Flyers in addition, also implement the BECFlyerProtocol. Similarly, positioners should also implement the BECScanProtocol and BECPositionerProtocol.
"""
from typing import Protocol, runtime_checkable
from bec_lib.file_utils import FileWriter
from ophyd import Component, DeviceStatus, Kind, Staged
from ophyd_devices.utils import bec_scaninfo_mixin
@runtime_checkable
class BECDeviceProtocol(Protocol):
"""Protocol for ophyd objects with zero functionality."""
_destroyed: bool
@property
def name(self) -> str:
"""name property"""
@name.setter
def name(self, value: str) -> None:
"""name setter"""
@property
def kind(self) -> Kind:
"""kind property"""
@kind.setter
def kind(self, value: Kind):
"""kind setter"""
@property
def parent(self) -> object:
"""Property to find the parent device"""
@property
def root(self) -> object:
"""Property to fint the root device"""
@property
def hints(self) -> dict:
"""hints property"""
@property
def connected(self) -> bool:
"""connected property.
Check if signals are connected
Returns:
bool: True if connected, False otherwise
"""
@connected.setter
def connected(self, value: bool):
"""connected setter"""
def read(self) -> dict:
"""read method
Override by child class with read method
Returns:
dict: Dictionary with nested dictionary of signals with kind.normal or kind.hinted:
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
"""
def read_configuration(self) -> dict:
"""read_configuration method
Override by child class with read_configuration method
Returns:
dict: Dictionary with nested dictionary of signals with kind.config:
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
"""
def describe(self) -> dict:
"""describe method
Override by child class with describe method
Returns:
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
"""
def describe_configuration(self) -> dict:
"""describe method
Includes all signals of type Kind.config.
Override by child class with describe_configuration method
Returns:
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
"""
def destroy(self) -> None:
"""Destroy method.
_destroyed must be set to True after calling destroy.
"""
def trigger(self) -> DeviceStatus:
"""Trigger method on the device
Returns ophyd DeviceStatus object, which is used to track the status of the trigger.
It can also be blocking until the trigger is completed, and return the status object
with set_finished() method called on the DeviceStatus.
"""
@runtime_checkable
class BECSignalProtocol(Protocol):
"""Protocol for BEC signals with zero functionality.
This protocol adds the specific implementation for a signal.
Please be aware that a signal must also implement BECDeviceProtocol.
Note: Currently the implementation of the protocol is not taking into account the
event_model from ophyd, i.e. _run_sbus
"""
@property
def limits(self) -> tuple[float, float]:
"""Limits property for signals.
If low_limit == high_limit, it is equivalent to NO limits!
Returns:
tuple: Tuple with lower and upper limits
"""
@property
def high_limit(self) -> float:
"""High limit property for signals.
Returns:
float: Upper limit
"""
@property
def low_limit(self) -> float:
"""Low limit property for signals.
Returns:
float: Lower limit
"""
@property
def write_access(self) -> bool:
"""Write access method for signals.
Returns:
bool: True if write access is allowed, False otherwise
"""
def check_value(self, value: float) -> None:
"""Check whether value is within limits
Args:
value: value to check
Raises:
LimitError in case the requested motion is not inside of limits.
"""
def put(self, value: any, force: bool = False, timeout: float = None):
"""Put method for signals.
This method should resolve immediately and not block.
If not force, the method checks if the value is within limits using check_value.
Args:
value (any) : value to put
force (bool) : Flag to force the put and ignore limits
timeout (float) : Timeout for the put
"""
def set(self, value: any, timeout: float = None) -> DeviceStatus:
"""Set method for signals.
This method should be blocking until the set is completed.
Args:
value (any) : value to set
timeout (float) : Timeout for the set
Returns:
DeviceStatus : DeviceStatus object that will finish upon return
"""
@runtime_checkable
class BECScanProtocol(BECDeviceProtocol, Protocol):
"""Protocol for devices offering an Protocol with all relevant functionality for scans.
In BEC, scans typically follow the order of stage, (pre_scan), trigger, unstage.
Stop should be used to interrupt a scan. Be aware that pre_scan is optional and therefor
part of the BECMixinProtocol, typically useful for more complex devices such as detectors.
This protocol allows to perform runtime checks on devices of ophyd.
It is the minimum set of properties required for a device to be loadable by BEC.
"""
_staged: Staged
"""Staged property to indicate if the device is staged."""
def stage(self) -> list[object]:
"""Stage method to prepare the device for an upcoming acquistion.
This prepares a device for an upcoming acquisition, i.e. it is the first
method for which the scan parameters are known and the device can be configured.
It can be used to move scan_motors to their start position
or also prepare DAQ systems for the upcoming measurement.
We can further publish the file location for DAQ systems
to BEC and inform BEC's file writer where data will be written to.
Stagin is not idempotent. If called twice without an unstage it should raise.
For ophyd devices, one may used self._staged = True to check if the device is staged.
Returns:
list: List of objects that were staged, i.e. [self]
For devices with inheritance from ophyd, return
return super().stage() in the child class.
"""
def unstage(self) -> list[object]:
"""Unstage method to cleanup after the acquisition.
It can also be used to implement checks whether the acquisition was successful,
inform BEC that the file has been succesfully written, or raise upon receiving
feedback that the scan did not finish successful.
Unstaging is not idempotent. If called twice it should simply resolve.
It is recommended to return super().unstage() in the child class, if
the child class also inherits from ophyd repository.
"""
def stop(self, success: bool) -> None:
"""Stop method to stop the device.
Args:
success: Flag to indicate if the scan was successful or not.
This method should be called to stop the device. It is recommended to call
super().stop(success=success) if class inherits from ophyd repository.
"""
@runtime_checkable
class BECMixinProtocol(Protocol):
"""Protocol that offers BEC specific utility functionality for detectors."""
USER_ACCESS: list[str]
"""
List of methods/properties that will be exposed to the client interface in addition
to the the already exposed signals, methods and properties.
"""
scaninfo: bec_scaninfo_mixin
"""
BEC scan info mixin class that provides an transparent Protocol to scan parameter
as provided by BEC. It is recommended to use this Protocol to retrieve scaninfo from Redis.
"""
stopped: bool
"""
Flag to indicate if the device is stopped.
The stop method should set this flag to True, and i.e. stage to set it to False.
"""
filewriter: FileWriter
"""
The file writer mixin main purpose is to unify and centralize the creation of
file paths within BEC. Therefore, we recommend devices to use the same mixin for creation of paths.
"""
def pre_scan(self):
"""Pre-scan method is called from BEC right before executing scancore, thus
right before the start of an acquisition.
It can be used to trigger time critical functions from the device, which
are prone to run into timeouts in case called too early.
"""
@runtime_checkable
class BECPositionerProtocol(Protocol):
"""Protocol with functionality specific for positioners in BEC."""
@property
def limits(self) -> tuple[float, float]:
"""Limits property for positioners.
For an EpicsMotor, BEC will automatically recover the limits from the IOC.
If not set, it returns (0,0).
Note, low_limit = high_limit is equivalent to NO limits!
Returns:
tuple: Tuple with lower and upper limits
"""
@property
def low_limit(self) -> float:
"""Low limit property for positioners.
Returns:
float: Lower limit
"""
@property
def high_limit(self) -> float:
"""High limit property for positioners.
Returns:
float: Upper limit
"""
def check_value(self, value: float) -> None:
"""Check whether value is within limits
Args:
value: value to check
Raises:
LimitError in case the requested motion is not inside of limits.
"""
def move(self, position: float) -> DeviceStatus:
"""Move method for positioners.
The returned DeviceStatus is marked as done once the positioner has reached the target position.
DeviceStatus.wait() can be used to block until the move is completed.
Args:
position: position to move to
Returns:
DeviceStatus: DeviceStatus object
"""
def set(self, position: float) -> DeviceStatus:
"""Set method for positioners.
In principle, a set command is the same as move. This comes from ophyd upstream.
We will have to review whether BEC requires both.
Args:
position: position to move to
Returns:
DeviceStatus: DeviceStatus object
"""
@runtime_checkable
class BECFlyerProtocol(BECScanProtocol, Protocol):
"""Protocol with functionality specific for flyers in BEC."""
# def configure(self, d: dict):
# """Configure method of the flyer.
# It is an optional method, but does not need to be implemented by a flyer.
# Instead, stage can be used to prepare time critical operations on the device in preparation of a scan.
# Method to configure the flyer in preparation of a scan.
# Args:
# d (dict): Dictionary with configuration parameters, i.e. key value pairs of signal_name : value
# """
def kickoff(self) -> DeviceStatus:
"""Kickoff method for flyers.
The returned DeviceStatus is marked as done once the flyer start flying,
i.e. is ready to be triggered.
Returns:
DeviceStatus: DeviceStatus object
"""
def complete(self) -> DeviceStatus:
"""Complete method for flyers.
The returned DeviceStatus is marked as done once the flyer has completed.
Returns:
DeviceStatus: DeviceStatus object
"""
@runtime_checkable
class BECRotationProtocol(Protocol):
"""Protocol which defines functionality for a tomography stage for ophyd devices"""
allow_mod360: Component
"""Signal to define whether mod360 operations are allowed. """
@property
def has_mod360(self) -> bool:
"""Property to check if the motor has mod360 option
Returns:
bool: True if mod360 is possible on device, False otherwise
"""
@property
def has_freerun(self) -> bool:
"""Property to check if the motor has freerun option
Returns:
bool: True if freerun is allowed, False otherwise
"""
@property
def valid_rotation_modes(self) -> list[str]:
"""Method to get the valid rotation modes for the implemented motor.
Returns:
list: List of strings with valid rotation modes
"""
def apply_mod360(self) -> None:
"""Method to apply the modulus 360 operation on the specific device.
Childrens should override this method
"""
@runtime_checkable
class BECEventProtocol(Protocol):
"""Protocol for events in BEC.
This is a first draft for the event protocol introduced throughout BEC.
It needs to be review and extended before it can be used in production.
"""
_callbacks: dict[dict]
@property
def event_types(self) -> tuple[str]:
"""Event types property"""
def _run_subs(self, sub_type: str, **kwargs):
"""Run subscriptions for the event.
Args:
sub_type: Subscription type
kwargs: Keyword arguments
"""
def subscribe(self, callback: callable, event_type: str = None, run: bool = True):
"""Subscribe to the event.
Args:
callback (callable) : Callback function
The expected callback structure is:
def cb(*args, obj:OphydObject, sub_type:str, **kwargs) -> None:
pass
event_type (str) : Event type, if None it defaults to obj._default_sub
This maps to sub_type in _run_subs
run (bool) : If true, run the callback directly.
Returns:
cid (int): Callback id
"""
def clear_sub(self, cb: callable, event_type: str = None):
"""Clear subscription, given the origianl callback fucntion
Args:
cb (callable) : Callback
event_type (str): Event type, if None it will be remove from all event_types
"""
def unsubscribe(self, cid: int):
"""Unsubscribe from the event.
Args:
cid (int): Callback id
"""