feat: bec_scaninfo_mixin class for scaninfo

This commit is contained in:
e21206 2023-08-23 14:17:31 +02:00
parent 39142ffc92
commit 49f95e0476

View File

@ -5,6 +5,8 @@ Created on Tue Nov 9 16:12:47 2021
@author: mohacsi_i @author: mohacsi_i
""" """
import enum
import time
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PVPositioner, Signal from ophyd import PVPositioner, Signal
from ophyd.pseudopos import ( from ophyd.pseudopos import (
@ -14,6 +16,92 @@ from ophyd.pseudopos import (
PseudoPositioner, PseudoPositioner,
) )
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger
from ophyd_devices.utils.socket import data_shape, data_type
logger = bec_logger.logger
DEFAULT_EPICSSIGNAL_VALUE = object()
class DDGError(Exception):
pass
class DDGConfigSignal(Signal):
def get(self):
self._readback = self.parent.ddg_configs[self.name]
return self._readback
def put(
self,
value,
connection_timeout=1,
callback=None,
timeout=1,
**kwargs,
):
"""Using channel access, set the write PV to `value`.
Keyword arguments are passed on to callbacks
Parameters
----------
value : any
The value to set
connection_timeout : float, optional
If not already connected, allow up to `connection_timeout` seconds
for the connection to complete.
use_complete : bool, optional
Override put completion settings
callback : callable
Callback for when the put has completed
timeout : float, optional
Timeout before assuming that put has failed. (Only relevant if
put completion is used.)
"""
old_value = self.get()
timestamp = time.time()
self.parent.ddg_configs[self.name] = value
super().put(value, timestamp=timestamp, force=True)
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=value,
timestamp=timestamp,
)
def describe(self):
"""Provide schema and meta-data for :meth:`~BlueskyInterface.read`
This keys in the `OrderedDict` this method returns must match the
keys in the `OrderedDict` return by :meth:`~BlueskyInterface.read`.
This provides schema related information, (ex shape, dtype), the
source (ex PV name), and if available, units, limits, precision etc.
Returns
-------
data_keys : OrderedDict
The keys must be strings and the values must be dict-like
with the ``event_model.event_descriptor.data_key`` schema.
"""
if self._readback is DEFAULT_EPICSSIGNAL_VALUE:
val = self.get()
else:
val = self._readback
return {
self.name: {
"source": f"{self.parent.prefix}:{self.name}",
"dtype": data_type(val),
"shape": data_shape(val),
}
}
class DelayStatic(Device): class DelayStatic(Device):
"""Static axis for the T0 output channel """Static axis for the T0 output channel
@ -85,6 +173,16 @@ class DelayPair(PseudoPositioner):
return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1) return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1)
class TriggerSource(int, enum.Enum):
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class DelayGeneratorDG645(Device): class DelayGeneratorDG645(Device):
"""DG645 delay generator """DG645 delay generator
@ -111,6 +209,7 @@ class DelayGeneratorDG645(Device):
state = Component(EpicsSignalRO, "EventStatusLI", name="status_register") state = Component(EpicsSignalRO, "EventStatusLI", name="status_register")
status = Component(EpicsSignalRO, "StatusSI", name="status") status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
# Front Panel # Front Panel
channelT0 = Component(DelayStatic, "T0", name="T0") channelT0 = Component(DelayStatic, "T0", name="T0")
@ -155,10 +254,6 @@ class DelayGeneratorDG645(Device):
name="trigger_rate", name="trigger_rate",
kind=Kind.config, kind=Kind.config,
) )
# Command PVs
# arm = Component(EpicsSignal, "TriggerDelayBO", name="arm", kind=Kind.omitted)
# Burst mode # Burst mode
burstMode = Component( burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
@ -176,15 +271,148 @@ class DelayGeneratorDG645(Device):
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
) )
delta_delay = Component(DDGConfigSignal, name="delta_delay", kind="config")
delta_width = Component(DDGConfigSignal, name="delta_width", kind="config")
delta_triggers = Component(DDGConfigSignal, name="delta_triggers", kind="config")
polarity = Component(DDGConfigSignal, name="polarity", kind="config")
amplitude = Component(DDGConfigSignal, name="amplitude", kind="config")
offset = Component(DDGConfigSignal, name="offset", kind="config")
thres_trig_level = Component(DDGConfigSignal, name="thres_trig_level", kind="config")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
**kwargs,
):
"""_summary_
Args:
name (_type_): _description_
prefix (str, optional): _description_. Defaults to "".
kind (_type_, optional): _description_. Defaults to None.
read_attrs (_type_, optional): _description_. Defaults to None.
configuration_attrs (_type_, optional): _description_. Defaults to None.
parent (_type_, optional): _description_. Defaults to None.
device_manager (_type_, optional): _description_. Defaults to None.
polarity (_type_, optional): _description_. Defaults to None.
amplitude (_type_, optional): _description_. Defaults to None.
offset (_type_, optional): _description_. Defaults to None.
thres_trig_level (_type_, optional): _description_. Defaults to None.
delta_delay (_type_, float): Add delay for triggering in software trigger mode to allow fast shutter to open. Defaults to 0.
delta_width (_type_, float): Add width to fast shutter signal to make sure its open during acquisition. Defaults to 0.
delta_triggers (_type_, int): Add additional triggers to burst mode (mcs card needs +1 triggers per line). Defaults to 0.
"""
self.ddg_configs = {
f"{name}_delta_delay": 0,
f"{name}_delta_width": 0,
f"{name}_delta_triggers": 0,
f"{name}_polarity": 1,
f"{name}_amplitude": 2.5, # half amplitude -> 5V peak signal
f"{name}_offset": 0,
f"{name}_thres_trig_level": 1.75, # -> 3.5V
}
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.device_manager = device_manager
self._producer = self.device_manager.producer
self.wait_for_connection()
self._init_ddg()
self._ddg_is_okay()
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source to value of list below, or string
Accepts integer 0-6 or TriggerSource.* with *
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
"""
value = int(trigger_source)
self.source.set(value)
def _ddg_is_okay(self, raise_on_error=False) -> None:
status = self.status.read()[self.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.clear_error()
time.sleep(1)
self._ddg_is_okay(rais_on_error=True)
elif status != "STATUS OK":
raise DDGError(f"DDG failed to start with status: {status}")
def _init_ddg_pol_allchannels(self, polarity: int = 1) -> None:
"""Set Polarity for all channels (including T0) upon init
Args:
polarity: int | 0 negative, 1 positive defaults to 1
"""
self.channelT0.polarity.set(polarity)
self.channelAB.io.polarity.set(polarity)
self.channelCD.io.polarity.set(polarity)
self.channelEF.io.polarity.set(polarity)
self.channelGH.io.polarity.set(polarity)
def _init_ddg_amp_allchannels(self, amplitude: float = 2.5) -> None:
"""Set amplitude for all channels (including T0) upon init
Args:
amplitude: float | defaults to 2.5 (value is equivalent to half amplitude -> 5V difference between low and high)
"""
# TODO add check for range!!
self.channelT0.amplitude.set(amplitude)
self.channelAB.io.amplitude.set(amplitude)
self.channelCD.io.amplitude.set(amplitude)
self.channelEF.io.amplitude.set(amplitude)
self.channelGH.io.amplitude.set(amplitude)
def _init_ddg_offset_allchannels(self, offset: float = 0) -> None:
"""Set offset for all channels (including T0) upon init
Args:
offset: float | defaults to 0
"""
# TODO add check for range!!
self.channelT0.offset.set(offset)
self.channelAB.io.offset.set(offset)
self.channelCD.io.offset.set(offset)
self.channelEF.io.offset.set(offset)
self.channelGH.io.offset.set(offset)
def _cleanup_ddg(self) -> None:
self._set_trigger(TriggerSource.SINGLE_SHOT)
def _init_ddg(self) -> None:
self._init_ddg_pol_allchannels(self.polarity.get())
self._init_ddg_amp_allchannels(self.amplitude.get())
self._init_ddg_offset_allchannels(self.offset.get())
self._set_trigger(TriggerSource.SINGLE_SHOT)
self.level.set(self.thres_trig_level.get())
# TODO add delta_delay, delta_width, delta triggers!
def stage(self): def stage(self):
"""Trigger the generator by arming to accept triggers""" """Trigger the generator by arming to accept triggers"""
# TODO check PV TriggerDelayBO, seems to be a bug in the IOC # TODO check PV TriggerDelayBO, seems to be a bug in the IOC
# self.arm.write(1).wait()
super().stage() super().stage()
def unstage(self): def unstage(self):
"""Stop the trigger generator from accepting triggers""" """Stop the trigger generator from accepting triggers"""
# self.arm.write(0).wait() self._set_trigger(TriggerSource.SINGLE_SHOT)
super().stage() super().stage()
def burstEnable(self, count, delay, period, config="all"): def burstEnable(self, count, delay, period, config="all"):