fix: stepscan logic implemented in ddg

This commit is contained in:
e21206 2023-08-23 17:56:41 +02:00
parent 6ee819de53
commit c365b8e954

View File

@ -1,12 +1,6 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 9 16:12:47 2021
@author: mohacsi_i
"""
import enum
import time
from typing import Any, List
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PVPositioner, Signal
from ophyd.pseudopos import (
@ -15,12 +9,12 @@ from ophyd.pseudopos import (
PseudoSingle,
PseudoPositioner,
)
from ophyd_devices.utils.socket import data_shape, data_type
import ophyd_devices.utils.bec_utils as bec_utils
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger
from ophyd_devices.utils.socket import data_shape, data_type
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
@ -145,9 +139,6 @@ class DelayPair(PseudoPositioner):
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
# The real delay axes
# ch1 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch1", put_complete=True, kind=Kind.config)
# ch2 = Component(EpicsSignal, "DelayAI", write_pv="DelayAO", name="ch2", put_complete=True, kind=Kind.config)
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
io = Component(DelayStatic, name="io")
@ -254,6 +245,7 @@ class DelayGeneratorDG645(Device):
name="trigger_rate",
kind=Kind.config,
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
# Burst mode
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
@ -271,13 +263,15 @@ class DelayGeneratorDG645(Device):
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
delta_delay = Component(DDGConfigSignal, name="delta_delay", kind="config")
delay_burst = Component(DDGConfigSignal, name="delay_burst", kind="config")
delta_width = Component(DDGConfigSignal, name="delta_width", kind="config")
delta_triggers = Component(DDGConfigSignal, name="delta_triggers", kind="config")
additional_triggers = Component(DDGConfigSignal, name="additional_triggers", kind="config")
polarity = Component(DDGConfigSignal, name="polarity", kind="config")
amplitude = Component(DDGConfigSignal, name="amplitude", kind="config")
offset = Component(DDGConfigSignal, name="offset", kind="config")
thres_trig_level = Component(DDGConfigSignal, name="thres_trig_level", kind="config")
set_high_on_exposure = Component(DDGConfigSignal, name="set_high_on_exposure", kind="config")
set_high_on_stage = Component(DDGConfigSignal, name="set_high_on_stage", kind="config")
def __init__(
self,
@ -289,6 +283,7 @@ class DelayGeneratorDG645(Device):
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
"""_summary_
@ -301,6 +296,7 @@ class DelayGeneratorDG645(Device):
configuration_attrs (_type_, optional): _description_. Defaults to None.
parent (_type_, optional): _description_. Defaults to None.
device_manager (_type_, optional): _description_. Defaults to None.
Signals:
polarity (_type_, optional): _description_. Defaults to None.
amplitude (_type_, optional): _description_. Defaults to None.
offset (_type_, optional): _description_. Defaults to None.
@ -310,13 +306,15 @@ class DelayGeneratorDG645(Device):
delta_triggers (_type_, int): Add additional triggers to burst mode (mcs card needs +1 triggers per line). Defaults to 0.
"""
self.ddg_configs = {
f"{name}_delta_delay": 0,
f"{name}_delay_burst": 0,
f"{name}_delta_width": 0,
f"{name}_delta_triggers": 0,
f"{name}_additional_triggers": 0,
f"{name}_polarity": 1,
f"{name}_amplitude": 2.5, # half amplitude -> 5V peak signal
f"{name}_offset": 0,
f"{name}_thres_trig_level": 1.75, # -> 3.5V
f"{name}_set_high_on_exposure": False,
f"{name}_set_high_on_stage": False,
}
super().__init__(
prefix=prefix,
@ -327,9 +325,16 @@ class DelayGeneratorDG645(Device):
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DDGError("Add DeviceManager to initialization or init with sim_mode=True")
self.device_manager = device_manager
self._producer = self.device_manager.producer
self.wait_for_connection()
if not sim_mode:
self._producer = self.device_manager.producer
else:
self._producer = bec_utils.MockProducer()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.wait_for_connection() # Make sure to be connected before talking to PVs
self._init_ddg()
self._ddg_is_okay()
@ -362,35 +367,34 @@ class DelayGeneratorDG645(Device):
Args:
polarity: int | 0 negative, 1 positive defaults to 1
"""
self.channelT0.polarity.set(polarity)
self.channelAB.io.polarity.set(polarity)
self.channelCD.io.polarity.set(polarity)
self.channelEF.io.polarity.set(polarity)
self.channelGH.io.polarity.set(polarity)
self._set_channels("polarity", polarity)
def _init_ddg_amp_allchannels(self, amplitude: float = 2.5) -> None:
"""Set amplitude for all channels (including T0) upon init
Args:
amplitude: float | defaults to 2.5 (value is equivalent to half amplitude -> 5V difference between low and high)
"""
# TODO add check for range!!
self.channelT0.amplitude.set(amplitude)
self.channelAB.io.amplitude.set(amplitude)
self.channelCD.io.amplitude.set(amplitude)
self.channelEF.io.amplitude.set(amplitude)
self.channelGH.io.amplitude.set(amplitude)
self._set_channels("amplitude", amplitude)
def _init_ddg_offset_allchannels(self, offset: float = 0) -> None:
"""Set offset for all channels (including T0) upon init
Args:
offset: float | defaults to 0
"""
# TODO add check for range!!
self.channelT0.offset.set(offset)
self.channelAB.io.offset.set(offset)
self.channelCD.io.offset.set(offset)
self.channelEF.io.offset.set(offset)
self.channelGH.io.offset.set(offset)
self._set_channels("offset", offset)
def _set_channels(self, signal: str, value: Any, channels: List = None) -> None:
if not channels:
channels = self.all_channels
for chname in channels:
channel = getattr(self, chname, None)
if not channel:
continue
if signal in channel.component_names:
getattr(channel, signal).set(value)
continue
if "io" in channel.component_names and signal in channel.io.component_names:
getattr(channel.io, signal).set(value)
def _cleanup_ddg(self) -> None:
self._set_trigger(TriggerSource.SINGLE_SHOT)
@ -406,7 +410,24 @@ class DelayGeneratorDG645(Device):
def stage(self):
"""Trigger the generator by arming to accept triggers"""
# TODO check PV TriggerDelayBO, seems to be a bug in the IOC
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_type == "step":
# define parameters
self._set_trigger(TriggerSource.SINGLE_SHOT)
exp_time = (
self.delta_width.get() + self.scaninfo.exp_time
) # TODO add readout+ self.scantype.readout
delay_burst = self.delay_burst.get()
num_burst_cycle = 1 + self.additional_triggers.get()
# set parameters in DDG
self.burstEnable(num_burst_cycle, delay_burst, exp_time, config="first")
self._set_channels("delay", 0)
self._set_channels("width", exp_time)
elif self.scaninfo.scan_type == "fly":
if self.set_high_on_exposure.get():
...
else:
raise DDGError(f"Unknown scan type {self.scaninfo.scan_type}")
super().stage()
@ -415,6 +436,10 @@ class DelayGeneratorDG645(Device):
self._set_trigger(TriggerSource.SINGLE_SHOT)
super().stage()
def trigger(self) -> None:
if self.scaninfo.scan_type == "step":
self.trigger_shot.set(1).wait()
def burstEnable(self, count, delay, period, config="all"):
"""Enable the burst mode"""
# Validate inputs
@ -441,4 +466,5 @@ class DelayGeneratorDG645(Device):
# Automatically connect to test environmenr if directly invoked
if __name__ == "__main__":
dgen = DelayGeneratorDG645("X01DA-PC-DGEN:", name="delayer")
dgen = DelayGeneratorDG645("delaygen:DG1:", name="dgen", sim_mode=True)
dgen.stage()