diff --git a/ophyd_devices/epics/devices/DelayGeneratorDG645.py b/ophyd_devices/epics/devices/DelayGeneratorDG645.py index caac4ec..f51576a 100644 --- a/ophyd_devices/epics/devices/DelayGeneratorDG645.py +++ b/ophyd_devices/epics/devices/DelayGeneratorDG645.py @@ -1,12 +1,6 @@ -# -*- coding: utf-8 -*- -""" -Created on Tue Nov 9 16:12:47 2021 - -@author: mohacsi_i -""" - import enum import time +from typing import Any, List from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import PVPositioner, Signal from ophyd.pseudopos import ( @@ -15,12 +9,12 @@ from ophyd.pseudopos import ( PseudoSingle, PseudoPositioner, ) +from ophyd_devices.utils.socket import data_shape, data_type +import ophyd_devices.utils.bec_utils as bec_utils -from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector -from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import bec_logger -from ophyd_devices.utils.socket import data_shape, data_type +from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin logger = bec_logger.logger @@ -145,9 +139,6 @@ class DelayPair(PseudoPositioner): # The pseudo positioner axes delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") - # The real delay axes - # ch1 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch1", put_complete=True, kind=Kind.config) - # ch2 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch2", put_complete=True, kind=Kind.config) ch1 = Component(DummyPositioner, name="ch1") ch2 = Component(DummyPositioner, name="ch2") io = Component(DelayStatic, name="io") @@ -254,6 +245,7 @@ class DelayGeneratorDG645(Device): name="trigger_rate", kind=Kind.config, ) + trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config") # Burst mode burstMode = Component( EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config @@ -271,13 +263,15 @@ class DelayGeneratorDG645(Device): EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config ) - delta_delay = Component(DDGConfigSignal, name="delta_delay", kind="config") + delay_burst = Component(DDGConfigSignal, name="delay_burst", kind="config") delta_width = Component(DDGConfigSignal, name="delta_width", kind="config") - delta_triggers = Component(DDGConfigSignal, name="delta_triggers", kind="config") + additional_triggers = Component(DDGConfigSignal, name="additional_triggers", kind="config") polarity = Component(DDGConfigSignal, name="polarity", kind="config") amplitude = Component(DDGConfigSignal, name="amplitude", kind="config") offset = Component(DDGConfigSignal, name="offset", kind="config") thres_trig_level = Component(DDGConfigSignal, name="thres_trig_level", kind="config") + set_high_on_exposure = Component(DDGConfigSignal, name="set_high_on_exposure", kind="config") + set_high_on_stage = Component(DDGConfigSignal, name="set_high_on_stage", kind="config") def __init__( self, @@ -289,6 +283,7 @@ class DelayGeneratorDG645(Device): configuration_attrs=None, parent=None, device_manager=None, + sim_mode=False, **kwargs, ): """_summary_ @@ -301,6 +296,7 @@ class DelayGeneratorDG645(Device): configuration_attrs (_type_, optional): _description_. Defaults to None. parent (_type_, optional): _description_. Defaults to None. device_manager (_type_, optional): _description_. Defaults to None. + Signals: polarity (_type_, optional): _description_. Defaults to None. amplitude (_type_, optional): _description_. Defaults to None. offset (_type_, optional): _description_. Defaults to None. @@ -310,13 +306,15 @@ class DelayGeneratorDG645(Device): delta_triggers (_type_, int): Add additional triggers to burst mode (mcs card needs +1 triggers per line). Defaults to 0. """ self.ddg_configs = { - f"{name}_delta_delay": 0, + f"{name}_delay_burst": 0, f"{name}_delta_width": 0, - f"{name}_delta_triggers": 0, + f"{name}_additional_triggers": 0, f"{name}_polarity": 1, f"{name}_amplitude": 2.5, # half amplitude -> 5V peak signal f"{name}_offset": 0, f"{name}_thres_trig_level": 1.75, # -> 3.5V + f"{name}_set_high_on_exposure": False, + f"{name}_set_high_on_stage": False, } super().__init__( prefix=prefix, @@ -327,9 +325,16 @@ class DelayGeneratorDG645(Device): parent=parent, **kwargs, ) + if device_manager is None and not sim_mode: + raise DDGError("Add DeviceManager to initialization or init with sim_mode=True") self.device_manager = device_manager - self._producer = self.device_manager.producer - self.wait_for_connection() + if not sim_mode: + self._producer = self.device_manager.producer + else: + self._producer = bec_utils.MockProducer() + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) + self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] + self.wait_for_connection() # Make sure to be connected before talking to PVs self._init_ddg() self._ddg_is_okay() @@ -362,35 +367,34 @@ class DelayGeneratorDG645(Device): Args: polarity: int | 0 negative, 1 positive defaults to 1 """ - self.channelT0.polarity.set(polarity) - self.channelAB.io.polarity.set(polarity) - self.channelCD.io.polarity.set(polarity) - self.channelEF.io.polarity.set(polarity) - self.channelGH.io.polarity.set(polarity) + self._set_channels("polarity", polarity) def _init_ddg_amp_allchannels(self, amplitude: float = 2.5) -> None: """Set amplitude for all channels (including T0) upon init Args: amplitude: float | defaults to 2.5 (value is equivalent to half amplitude -> 5V difference between low and high) """ - # TODO add check for range!! - self.channelT0.amplitude.set(amplitude) - self.channelAB.io.amplitude.set(amplitude) - self.channelCD.io.amplitude.set(amplitude) - self.channelEF.io.amplitude.set(amplitude) - self.channelGH.io.amplitude.set(amplitude) + self._set_channels("amplitude", amplitude) def _init_ddg_offset_allchannels(self, offset: float = 0) -> None: """Set offset for all channels (including T0) upon init Args: offset: float | defaults to 0 """ - # TODO add check for range!! - self.channelT0.offset.set(offset) - self.channelAB.io.offset.set(offset) - self.channelCD.io.offset.set(offset) - self.channelEF.io.offset.set(offset) - self.channelGH.io.offset.set(offset) + self._set_channels("offset", offset) + + def _set_channels(self, signal: str, value: Any, channels: List = None) -> None: + if not channels: + channels = self.all_channels + for chname in channels: + channel = getattr(self, chname, None) + if not channel: + continue + if signal in channel.component_names: + getattr(channel, signal).set(value) + continue + if "io" in channel.component_names and signal in channel.io.component_names: + getattr(channel.io, signal).set(value) def _cleanup_ddg(self) -> None: self._set_trigger(TriggerSource.SINGLE_SHOT) @@ -406,7 +410,24 @@ class DelayGeneratorDG645(Device): def stage(self): """Trigger the generator by arming to accept triggers""" - # TODO check PV TriggerDelayBO, seems to be a bug in the IOC + self.scaninfo.load_scan_metadata() + if self.scaninfo.scan_type == "step": + # define parameters + self._set_trigger(TriggerSource.SINGLE_SHOT) + exp_time = ( + self.delta_width.get() + self.scaninfo.exp_time + ) # TODO add readout+ self.scantype.readout + delay_burst = self.delay_burst.get() + num_burst_cycle = 1 + self.additional_triggers.get() + # set parameters in DDG + self.burstEnable(num_burst_cycle, delay_burst, exp_time, config="first") + self._set_channels("delay", 0) + self._set_channels("width", exp_time) + elif self.scaninfo.scan_type == "fly": + if self.set_high_on_exposure.get(): + ... + else: + raise DDGError(f"Unknown scan type {self.scaninfo.scan_type}") super().stage() @@ -415,6 +436,10 @@ class DelayGeneratorDG645(Device): self._set_trigger(TriggerSource.SINGLE_SHOT) super().stage() + def trigger(self) -> None: + if self.scaninfo.scan_type == "step": + self.trigger_shot.set(1).wait() + def burstEnable(self, count, delay, period, config="all"): """Enable the burst mode""" # Validate inputs @@ -441,4 +466,5 @@ class DelayGeneratorDG645(Device): # Automatically connect to test environmenr if directly invoked if __name__ == "__main__": - dgen = DelayGeneratorDG645("X01DA-PC-DGEN:", name="delayer") + dgen = DelayGeneratorDG645("delaygen:DG1:", name="dgen", sim_mode=True) + dgen.stage()