From 5ce6fbcbb92d2fafb6cfcb4bb7b1f5ee616140b8 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 5 Sep 2023 12:28:38 +0200 Subject: [PATCH] fix: DDG logic to wait for burst in trigger --- .../epics/devices/DelayGeneratorDG645.py | 49 ++++++++++++------- .../epics/devices/bec_scaninfo_mixin.py | 7 ++- ophyd_devices/epics/devices/eiger9m_csaxs.py | 11 ++++- ophyd_devices/epics/devices/mcs_csaxs.py | 16 ++++-- 4 files changed, 56 insertions(+), 27 deletions(-) diff --git a/ophyd_devices/epics/devices/DelayGeneratorDG645.py b/ophyd_devices/epics/devices/DelayGeneratorDG645.py index 40cc546..4a897c2 100644 --- a/ophyd_devices/epics/devices/DelayGeneratorDG645.py +++ b/ophyd_devices/epics/devices/DelayGeneratorDG645.py @@ -1,8 +1,9 @@ import enum +import threading import time from typing import Any, List from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind -from ophyd import PVPositioner, Signal +from ophyd import PVPositioner, Signal, DeviceStatus from ophyd.pseudopos import ( pseudo_position_argument, real_position_argument, @@ -141,8 +142,11 @@ class DelayGeneratorDG645(Device): "reload_config", ] - trigger_burst_readout = Component(EpicsSignal, "EventStatusLI.PROC", name="read_burst_state") + trigger_burst_readout = Component( + EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout" + ) burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state") + delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished") status = Component(EpicsSignalRO, "StatusSI", name="status") clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error") @@ -442,17 +446,23 @@ class DelayGeneratorDG645(Device): # Set threshold level for ext. pulses self.level.set(self.thres_trig_level.get()) - def _check_burst_cycle(self) -> None: + def _check_burst_cycle(self, status) -> None: """Checks burst cycle of delay generator Force readout, return value from end of burst cycle """ while True: self.trigger_burst_readout.set(1) - if self.burst_cycle_finished.read()[self.burst_cycle_finished.name]["value"] == 1: + if ( + self.burst_cycle_finished.read()[self.burst_cycle_finished.name]["value"] == 1 + and self.delay_finished.read()[self.delay_finished.name]["value"] == 1 + ): self._acquisition_done = True + status.set_finished() return if self._stopped == True: - return + status.set_finished() + break + time.sleep(0.01) def stop(self, success=False): @@ -466,11 +476,14 @@ class DelayGeneratorDG645(Device): self.scaninfo.load_scan_metadata() if self.scaninfo.scan_type == "step": # define parameters - self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get())) if self.set_high_on_exposure.get(): - num_burst_cycle = 1 - exp_time = self.delta_width.get() + self.scaninfo.num_frames * ( - self.scaninfo.exp_time + self.scaninfo.readout_time + self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get())) + num_burst_cycle = 1 + self.additional_triggers.get() + exp_time = ( + self.delta_width.get() + + self.scaninfo.num_points + * self.scaninfo.frames_per_trigger + * (self.scaninfo.exp_time + self.scaninfo.readout_time) ) total_exposure = exp_time delay_burst = self.delay_burst.get() @@ -479,24 +492,24 @@ class DelayGeneratorDG645(Device): # Set burst length to half of the experimental time! self.set_channels("width", exp_time) else: + self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get())) exp_time = self.delta_width.get() + self.scaninfo.exp_time total_exposure = exp_time + self.scaninfo.readout_time delay_burst = self.delay_burst.get() - num_burst_cycle = self.scaninfo.num_frames + self.additional_triggers.get() + num_burst_cycle = self.scaninfo.frames_per_trigger + self.additional_triggers.get() # set parameters in DDG self.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first") self.set_channels("delay", 0) # Set burst length to half of the experimental time! self.set_channels("width", exp_time) elif self.scaninfo.scan_type == "fly": - # Prepare FSH DDG if self.set_high_on_exposure.get(): # define parameters self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get())) exp_time = ( self.delta_width.get() - + self.scaninfo.exp_time * self.scaninfo.num_frames - + self.scaninfo.readout_time * (self.scaninfo.num_frames - 1) + + self.scaninfo.exp_time * self.scaninfo.num_points + + self.scaninfo.readout_time * (self.scaninfo.num_points - 1) ) total_exposure = exp_time delay_burst = self.delay_burst.get() @@ -513,7 +526,7 @@ class DelayGeneratorDG645(Device): exp_time = self.delta_width.get() + self.scaninfo.exp_time total_exposure = exp_time + self.scaninfo.readout_time delay_burst = self.delay_burst.get() - num_burst_cycle = self.scaninfo.num_frames + self.additional_triggers.get() + num_burst_cycle = self.scaninfo.num_points + self.additional_triggers.get() # set parameters in DDG self.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first") self.set_channels("delay", 0.0) @@ -531,18 +544,20 @@ class DelayGeneratorDG645(Device): def unstage(self): """Stop the trigger generator from accepting triggers""" # self._set_trigger(getattr(TriggerSource, self.set_trigger_source.get())) - self._check_burst_cycle() # Check status self._ddg_is_okay() self._stopped = False self._acquisition_done = False super().unstage() - def trigger(self) -> None: + def trigger(self) -> DeviceStatus: # if self.scaninfo.scan_type == "step": if self.source.read()[self.source.name]["value"] == int(TriggerSource.SINGLE_SHOT): self.trigger_shot.set(1).wait() - super().trigger() + status = super().trigger() + burst_state = threading.Thread(target=self._check_burst_cycle, args=(status,), daemon=True) + burst_state.start() + return status def burst_enable(self, count, delay, period, config="all"): """Enable the burst mode""" diff --git a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py index 5c79e48..09e1a0c 100644 --- a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py +++ b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py @@ -14,7 +14,7 @@ class BecScaninfoMixin: "queueID": "mockqueuid", "scan_number": 1, "exp_time": 26e-3, - "num_points": 10000, + "num_points": 9999, "readout_time": 2e-3, "scan_type": "fly", "num_lines": 10, @@ -54,9 +54,8 @@ class BecScaninfoMixin: self.scanID = scan_msg.content["scanID"] self.scan_number = scan_msg.content["info"]["scan_number"] self.exp_time = scan_msg.content["info"]["exp_time"] - self.num_frames = ( - scan_msg.content["info"]["num_points"] * scan_msg.content["info"]["frames_per_trigger"] - ) + self.frames_per_trigger = scan_msg.content["info"]["frames_per_trigger"] + self.num_points = scan_msg.content["info"]["num_points"] self.scan_type = scan_msg.content["info"].get("scan_type", "step") self.readout_time = scan_msg.content["info"]["readout_time"] self.username = self._get_username() diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 261d398..2cd6d80 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -131,16 +131,20 @@ class Eiger9mCsaxs(DetectorBase): self.name = name self.wait_for_connection() # Make sure to be connected before talking to PVs if not sim_mode: + from bec_lib.core.bec_service import SERVICE_CONFIG + self.device_manager = device_manager self._producer = self.device_manager.producer + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] else: self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() + self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) # TODO self.filepath = "" self.scaninfo.username = "e21206" - self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} + self.filewriter = FileWriterMixin(self.service_cfg) self.reduce_readout = 1e-3 # 3 ms self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered @@ -176,9 +180,11 @@ class Eiger9mCsaxs(DetectorBase): self.std_client.stop_writer() timeout = 0 self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e"))) + time.sleep(1) while not self.std_client.get_status()["state"] == "READY": time.sleep(0.1) timeout = timeout + 0.1 + logger.info("Waiting for std_daq init.") if timeout > 2: if not self.std_client.get_status()["state"]: raise EigerError( @@ -188,8 +194,11 @@ class Eiger9mCsaxs(DetectorBase): return def _prep_det(self) -> None: + logger.info("prepping thresholds") self._set_det_threshold() + logger.info("prepping detector parameter") self._set_acquisition_params() + logger.info("setting trigger") self._set_trigger(TriggerSource.GATING) def _set_det_threshold(self) -> None: diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index f00095d..49a46ad 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -1,4 +1,5 @@ import enum +import threading import time from typing import Any, List import numpy as np @@ -15,7 +16,7 @@ from bec_lib.core import BECMessage, MessageEndpoints from bec_lib.core.file_utils import FileWriterMixin from collections import defaultdict -from bec_lib.core import bec_logger +from bec_lib.core import bec_logger, threadlocked logger = bec_logger.logger @@ -186,6 +187,7 @@ class McsCsaxs(SIS38XX): self.filewriter = FileWriterMixin(self.service_cfg) self._stopped = False self._acquisition_done = False + self._lock = threading.RLock() self._init_mcs() def _init_mcs(self) -> None: @@ -212,6 +214,7 @@ class McsCsaxs(SIS38XX): signal.subscribe(self._on_mca_data, run=False) self._counter = 0 + @threadlocked def _on_mca_data(self, *args, obj=None, **kwargs) -> None: if not isinstance(kwargs["value"], (list, np.ndarray)): return @@ -229,11 +232,12 @@ class McsCsaxs(SIS38XX): self._counter += 1 if self._counter == self.num_lines.get(): self._acquisition_done = True - self._send_data_to_bec() self.stop_all.put(1, use_complete=False) self._send_data_to_bec() self.erase_all.set(1) - # TODO how to make card wait for sure! + # Require wait for + # time.sleep(0.01) + self.mca_data = defaultdict(lambda: []) self._counter = 0 return self.erase_start.set(1) @@ -252,7 +256,8 @@ class McsCsaxs(SIS38XX): ) logger.info(f"{self.mca_data}") msg = BECMessage.DeviceMessage( - signals=dict(self.mca_data), metadata=self.scaninfo.scan_msg.metadata + signals=dict(self.mca_data), + metadata=self.scaninfo.scan_msg.metadata, ).dumps() self._producer.xadd( topic=MessageEndpoints.device_async_readback( @@ -294,6 +299,7 @@ class McsCsaxs(SIS38XX): def stage(self) -> List[object]: """stage the detector and file writer""" + logger.info("Stage Eiger") self.scaninfo.load_scan_metadata() self._prep_det() self._prep_readout() @@ -311,7 +317,7 @@ class McsCsaxs(SIS38XX): break time.sleep(0.005) logger.info("mcs is ready and running") - time.sleep(5) + # time.sleep(5) return super().stage() def unstage(self) -> List[object]: