fix: Fix MCS card and DDG implementation after testing with hardware at cSAXS

This commit is contained in:
2025-12-09 10:00:16 +01:00
parent 14c56939bf
commit 188e23df48
5 changed files with 290 additions and 337 deletions

View File

@@ -127,14 +127,27 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE,
and turns off burst mode.
"""
self.burst_disable() # it is possible to miss setting settings if burst is enabled
# NOTE First we make sure that there is nothing running on the DDG. This seems to
# help to tackle that the DDG occasionally freezes during the first scan
# after reconnecting to it. Do not remove.
self.stop_ddg()
# NOTE Setting DEFAULT configurations for IO config, trigger config and references.
# The three dictionaries above 'DEFAULT_IO_CONFIG', 'DEFAULT_TRIGGER_SOURCE' and
# 'DEFAULT_REFERNCES' should be used to adapt configurations if needed.
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(DEFAULT_REFERENCES)
# Set proc status to passively update with 5Hz (0.2s)
# NOTE Set state proc_status to be event based. This triggers readouts of the EventStatusLI bit
# based on events. This was empirically found to be a stable solution in combination with the poll
# loop of the state.
self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT)
# NOTE Burst delay should be set to 0, don't remove as this will not be checked
self.burst_delay.put(0)
def on_stage(self) -> None:
"""
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
@@ -143,45 +156,87 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
This DDG is always not in burst mode.
"""
# NOTE Only set relevant channels on burst_mode channel
# After mutliple tests with the HW, this procedure has been determined empirically
# to improve stability and avoid HW getting stuck in triggering cycles
# Please also note that this should happen first, before setting delay times on the chabnnels.
if self.burst_mode.get() == 0:
self.burst_mode.put(1)
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
self.burst_enable(1, 0, exp_time)
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
if self.burst_period.get() != exp_time:
self.burst_period.put(exp_time)
if self.burst_delay.get() != 0:
self.burst_delay.put(0)
#########################################
### Setup delay pairs for acquisition ###
#########################################
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
# Trigger DDG2
# a = t0 + 2ms, b = a + 1us
# a has reference to t0, b has reference to a
self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6)
# Trigger shutter
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
# d = c/t0 + 2ms + exp_time * burst_count + 1ms
# c has reference to t0, d has reference to c
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
time.sleep(
0.2
) # After staging, make sure that the DDG HW has some time to process changes properly.
# NOTE Add additional sleep to make sure that the IOC and DDG HW process the values properly
# This value has been choosen empirically after testing with the HW. Please acknowledge that
# this is called in parallel, so it should not add significant overhead to acquisition. It's
# also just called once per scan.
time.sleep(0.2)
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
"""Prepare the MCS card for the next trigger.
This method holds the logic to ensure that the MCS card is ready to read.
It's logic is coupled to the MCS card implementation and the DDG1 trigger logic.
"""
mcs.stop_all.put(1)
# NOTE: It is crucial to first wait for the MCS card to finish it's acquisition before
# the DDG moves on to the next trigger cycle.
status = CompareStatus(mcs.acquiring, ACQUIRING.DONE)
self.cancel_on_stop(status)
status.wait(timeout=5)
# NOTE: Important logic on the MCS card, this makes sure that callbacks from the MCA channels
# are not surpressed. Please check MCS card and 'erase_all' comment.
mcs._omit_mca_callbacks.clear()
status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
self.cancel_on_stop(status_acquiring)
mcs.erase_start.put(1)
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
mcs.erase_start.put(1)
# NOTE: Now we wait for the card to go to Acuiring after we've called erase_start
# Please increase the timeout if this turns out to be problematic
status_acquiring.wait(timeout=3)
def _poll_event_status(self) -> None:
"""
Poll the event status register in a background thread. Control
the polling with the _poll_thread_run_event and _poll_thread_kill_event.
"""
# NOTE hook to kill the loop, only needed if device is destroyed
while not self._poll_thread_kill_event.is_set():
# The thread will wait in this event if IDLE. Polling can be started
# by setting 'poll_thread_run_event.set()'. Please check usage for software
# triggered scans from BEC within on_trigger.
self._poll_thread_run_event.wait()
# NOTE Event to indicate that polling is taking place currently. This is needed as there
# are sleeps of 20ms in the poll loop which were empirically determined after long testing
# to improve stability in communication with the HW.
self._poll_thread_poll_loop_done.clear()
while (
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
@@ -193,29 +248,36 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
logger.error(
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
)
# NOTE Important to set the event again. The next trigger loop waits for the poll thread to become
# IDLE again. Do not remove.
self._poll_thread_poll_loop_done.set()
def _poll_loop(self) -> None:
"""
Poll loop to update event status.
The checks ensure that the loop exist after each operation and be stuck in sleep.
The 20ms sleep was added to ensure that the event status is not polled too frequently,
and to give the device time to process the previous command. This was found empirically
to be necessary to avoid missing events.
IMPORTANT: Do not remove sleeps or try to optimize this logic. This seems to be a
fragile balance between polling frequency and device processing time. Also in between
start/stop of polling. Please also consider that there is a sleep in on_trigger and
that this might also be necessary to avoid that HW becomes unavailable/unstable.
"""
self.state.proc_status.put(1, use_complete=True)
time.sleep(0.02) # 20ms delay for processing, important for not missing events
#NOTE: Important sleep that has been empirically determined after testing for a long time
# Only remove if absolutely certain that the DDG logic of polling the EventStatusLI works without it.
time.sleep(0.02)
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
self.state.event_status.get(use_monitor=False)
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
time.sleep(0.02) # 20ms delay for processing, important for not missing events
#NOTE: Again important sleep that has been empirically determined after testing for a long time
# Only remove if certain that logic can be replaced to not risk HW failures.
time.sleep(0.02)
def _start_polling(self) -> None:
"""Start the polling loop in the background thread."""
@@ -246,7 +308,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
"""Callback to cancel the status if the device is stopped."""
self._stop_polling()
if status.done:
self._stop_polling()
# Run false is important to ensure that the status is only checked on the next event status update
status = StatusBitsCompareStatus(
@@ -257,20 +320,27 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
return status
def on_trigger(self) -> DeviceStatus:
"""Note, we need to add a delay to the StatusBits callback on the event_status.
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
This method is called from BEC as a software trigger.
It first stops any active polling if still running. The sleep of 20ms is important
for proper functionality of the card. Then it checks if the 'mcs' card is in the config
and enabled, and prepares the card for triggering. For now this is still relevant, but may
be moved to a high level logic in BEC in the future (neeeds).
Then a status_object is prepared that receives the EventStatusLI epics channel (self.state.event_status),
and attaches a callback that resolves once the burst is done. The polling thread is enabled to manually
trigger a reading of the event status before a software trigger is sent via trigger_shot.
"""
# Stop polling, poll once manually to ensure that the register is clean
self._stop_polling()
self._poll_thread_poll_loop_done.wait(timeout=1)
# IMPORTANT: Keep this sleep setting, as it is necessary to avoid that the HW
# becomes unresponsive. This was found empirically and seems to be necessary
# NOTE: THis sleep is important for the HW to process the event and avoid that
# becomes unresponsive. This was found empirically after long testing.
time.sleep(0.02)
# Prepare the MCS card for the next software trigger
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None:
if mcs is None or mcs.enabled is False:
logger.info("Did not find mcs card with name 'mcs' in current session")
else:
self._prepare_mcs_on_trigger(mcs)

View File

@@ -3,6 +3,11 @@ Delay generator implementation for CSAXS.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
On the beamline consoles, the caqtdm panel can be started via:
caqtdm -noMsg -attach -macro P=X12SA-CPCL-DDG,R=1: srsDG645.ui
R=1,2,3 for 3 different DDG units installed at CSAXS.
"""
import enum
@@ -151,8 +156,9 @@ class StatusBitsCompareStatus(SubscriptionStatus):
run=run,
)
def _compare_callback(self, value, **kwargs) -> bool:
def _compare_callback(self, *args, value, **kwargs) -> bool:
"""Callback for subscription status"""
logger.info(f"StatusBitsCompareStatus: Received value {value}")
obj = kwargs.get("obj", None)
if obj is None:
name = "no object received"
@@ -167,7 +173,9 @@ class StatusBitsCompareStatus(SubscriptionStatus):
return False
if self._add_delay != 0:
time.sleep(self._add_delay)
logger.info(
f"Returning comparison for {name}: {(STATUSBITS(value) & self._value) == self._value}"
)
return (STATUSBITS(value) & self._value) == self._value
@@ -533,6 +541,7 @@ class DelayGeneratorCSAXS(Device):
write_pv="BurstDelayAO",
name="burst_delay",
kind=Kind.omitted,
auto_monitor=True,
doc="Delay before bursts start in seconds. Must be >=0.",
)
burst_period = Cpt(

View File

@@ -170,11 +170,12 @@ class MCSCard(Device):
kind=Kind.omitted,
doc="Indicates whether the SNL program has connected to all PVs.",
)
# NOTE: Please note that the erase_all command sends the mca or waveform records to process after erasing, potentially also values of 0. This logic needs to be considered when running callbacks on the mca channels.
erase_all = Cpt(
EpicsSignal,
"EraseAll",
kind=Kind.omitted,
doc="Erases all mca or waveform records, setting elapsed times and counts in all channels to 0.",
doc="Erases all mca or waveform records, setting elapsed times and counts in all channels to 0. Please note that this operation sends the mca or waveform records to process after erasing, potentially also 0s.",
)
erase_start = Cpt(
EpicsSignal,
@@ -192,6 +193,7 @@ class MCSCard(Device):
EpicsSignalRO,
"Acquiring",
kind=Kind.omitted,
auto_monitor=True,
doc="Acquiring (=1) when acquisition is in progress and Done (=0) when acquisition is complete.",
)
stop_all = Cpt(EpicsSignal, "StopAll", kind=Kind.omitted, doc="Stops acquisition.")
@@ -279,11 +281,12 @@ class MCSCard(Device):
kind=Kind.omitted,
doc="The current acquisition mode (MCS=0 or Scaler=1). This record is used to turn off the scaler record Autocount in MCS mode.",
)
# NOTE: Setting mux_output programmatically results in occasional errors on the IOC; it is recommended to avoid using it.
mux_output = Cpt(
EpicsSignal,
"MUXOutput",
kind=Kind.omitted,
doc="Value of 0-32 used to select which input signal is routed to output signal 7 on the SIS3820 in output mode 3.",
doc="Value of 0-32 used to select which input signal is routed to output signal 7 on the SIS3820 in output mode 3. NOTE: This settings seems to occasionally result in errors on the IOC; it is recommended to avoid using it.",
)
user_led = Cpt(
EpicsSignal,

View File

@@ -2,15 +2,19 @@
from __future__ import annotations
import threading
import time
import traceback
from contextlib import contextmanager
from functools import partial
from threading import RLock
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind, SignalRO
from ophyd_devices import AsyncSignal, CompareStatus, ProgressSignal, TransitionStatus
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import AsyncSignal, CompareStatus, ProgressSignal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncMultiSignal
@@ -26,6 +30,23 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
MCSCard,
)
@contextmanager
def suppress_mca_callbacks(mcs_card: MCSCard):
"""
Context manager to temporarily disable MCA channel callbacks.
Required to avoid additional callbacks when erasing all channels.
Args:
mcs_card (MCSCard): The MCSCard instance to suppress callbacks for.
"""
mcs_card._omit_mca_callbacks.set()
try:
yield
finally:
pass
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase, ScanInfo
@@ -36,276 +57,27 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
"""
Implementation of the MCSCard SIS3820 for CSAXS, prefix 'X12SA-MCS:'.
The basic functionality is inherited from the MCSCard class.
Please note that the number of channels is fixed to 32, so there will be data for all
32 channels even if not all channels are used in the experiment. This setting can not
be realibly changed on the SIS3820 card's IOC through mux_output, so it is fixed here.
Mux_output should therefore also be set to 32 in the IOC configuration.
"""
USER_ACCESS = ["mcs_recovery"]
NUM_MCA_CHANNELS: int = 32
# All counter from the MCS card.
# mca = Cpt(
# AsyncMultiSignal,
# name="counters",
# signals=[
# f"mca{i}" for i in range(1, 33)
# ], # This needs to be in sync with counters DynamicDeviceComponent
# ndim=0,
# async_update={"type": "add", "max_shape": [None]},
# max_size=1000,
# kind=Kind.normal,
# doc="AsyncMultiSignal for MCA card channels 1-32",
# )
mca1 = Cpt(
AsyncSignal,
name="mca1",
kind=Kind.normal,
mca = Cpt(
AsyncMultiSignal,
name="counters",
signals=[
f"mca{i}" for i in range(1, 33)
], # This needs to be in sync with counters DynamicDeviceComponent
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 1",
)
mca2 = Cpt(
AsyncSignal,
name="mca2",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 2",
)
mca3 = Cpt(
AsyncSignal,
name="mca3",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 3",
)
mca4 = Cpt(
AsyncSignal,
name="mca4",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 4",
)
mca5 = Cpt(
AsyncSignal,
name="mca5",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 5",
)
mca6 = Cpt(
AsyncSignal,
name="mca6",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 6",
)
mca7 = Cpt(
AsyncSignal,
name="mca7",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 7",
)
mca8 = Cpt(
AsyncSignal,
name="mca8",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 8",
)
mca9 = Cpt(
AsyncSignal,
name="mca9",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 9",
)
mca10 = Cpt(
AsyncSignal,
name="mca10",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 10",
)
mca11 = Cpt(
AsyncSignal,
name="mca11",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 11",
)
mca12 = Cpt(
AsyncSignal,
name="mca12",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 12",
)
mca13 = Cpt(
AsyncSignal,
name="mca13",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 13",
)
mca14 = Cpt(
AsyncSignal,
name="mca14",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 14",
)
mca15 = Cpt(
AsyncSignal,
name="mca15",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 15",
)
mca16 = Cpt(
AsyncSignal,
name="mca16",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 16",
)
mca17 = Cpt(
AsyncSignal,
name="mca17",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 17",
)
mca18 = Cpt(
AsyncSignal,
name="mca18",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 18",
)
mca19 = Cpt(
AsyncSignal,
name="mca19",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 19",
)
mca20 = Cpt(
AsyncSignal,
name="mca20",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 20",
)
mca21 = Cpt(
AsyncSignal,
name="mca21",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 21",
)
mca22 = Cpt(
AsyncSignal,
name="mca22",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 22",
)
mca23 = Cpt(
AsyncSignal,
name="mca23",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 23",
)
mca24 = Cpt(
AsyncSignal,
name="mca24",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 24",
)
mca25 = Cpt(
AsyncSignal,
name="mca25",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 25",
)
mca26 = Cpt(
AsyncSignal,
name="mca26",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 26",
)
mca27 = Cpt(
AsyncSignal,
name="mca27",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 27",
)
mca28 = Cpt(
AsyncSignal,
name="mca28",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 28",
)
mca29 = Cpt(
AsyncSignal,
name="mca29",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 29",
)
mca30 = Cpt(
AsyncSignal,
name="mca30",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 30",
)
mca31 = Cpt(
AsyncSignal,
name="mca31",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 31",
)
mca32 = Cpt(
AsyncSignal,
name="mca32",
kind=Kind.normal,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="AsyncSignal for MCA channel 32",
doc="AsyncMultiSignal for MCA card channels 1-32",
)
# Progress Signal
progress = Cpt(ProgressSignal, doc="ProgressSignal indicating the progress of the device")
@@ -314,7 +86,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self,
name: str,
prefix: str = "",
num_connected_channels: int = 5,
scan_info: ScanInfo | None = None,
device_manager: DeviceManagerBase | None = None,
**kwargs,
@@ -328,29 +99,36 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz
self._pv_timeout = 2.0 # seconds
self._rlock = RLock() # Needed to ensure thread safety for counter updates
self.num_connected_channels = num_connected_channels
self._received_updates: dict[
str, dict[Literal["value", "timestamp"], list[int] | float]
] = {}
self._acquisition_group: str = "monitored"
self._num_total_triggers: int = 0
# Event logic to schedule async data emission & monitoring
# Please note that complete needs to wait until all data was sent
# This requires additional logic and a thread to monitor the data emission
self._scan_done_thread_kill_event: threading.Event = threading.Event()
self._start_monitor_async_data_emission: threading.Event = threading.Event()
self._scan_done_callbacks: list[Callable[[], None]] = []
self._scan_done_thread: threading.Thread = threading.Thread(
target=self._monitor_async_data_emission, daemon=True
)
self._current_data_index: int = 0
self._mca_counter_index: int = 0
self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
self._omit_mca_callbacks: threading.Event = threading.Event()
def on_connected(self):
"""
Called when the device is connected.
"""
# Make sure card is not running
self.stop_all.put(1)
# Setup the MCS card settings
self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout)
self.channel1_source.set(CHANNEL1SOURCE.EXTERNAL).wait(timeout=self._pv_timeout)
self.prescale.set(1).wait(timeout=self._pv_timeout)
self.user_led.set(0).wait(timeout=self._pv_timeout)
# Set mux_output to number of connected channels. Connect channels in increasing order
self.mux_output.set(self.num_connected_channels).wait(timeout=self._pv_timeout)
# Set the input and output modes & polarities
self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout)
self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
@@ -367,11 +145,17 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Subscribe the progress signal
self.current_channel.subscribe(self._progress_update, run=False)
self.mcs_recovery()
# Subscribe to the mca updates
for sig in self.counters.component_names:
sig_obj: EpicsSignalRO = getattr(self.counters, sig)
sig_obj.subscribe(self._on_counter_update, run=False)
# Start monitoring thread
self._scan_done_thread.start()
def _on_counter_update(self, value, **kwargs) -> None:
"""
Callback for counter updates of the mca channels (1-32).
@@ -381,74 +165,166 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
"""
with self._rlock:
self._mca_counter_index += 1
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
signal = kwargs.get("obj", None)
if signal is None:
logger.error(f"Called without 'obj' in kwargs: {kwargs}")
return
attr_name = signal.name
mca_channel = getattr(self, attr_name, None)
if mca_channel is None:
logger.error(f"Could not find matching MCA channel for signal {signal.name}")
signal: EpicsSignalRO
attr_name = signal.attr_name
# Ignore updates for channels that are not setup through num_connected_channels
index = int(attr_name[3:]) # Extract index from 'mcaX'
if index > self.NUM_MCA_CHANNELS:
return
mca_channel: AsyncSignal
if isinstance(value, np.ndarray):
value = value.tolist() # Convert numpy array to list
else:
value = [value] # Received single value, convert to list
data = {
attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}
}
mca_channel.put(data)
# self._received_updates.update(data)
# if len(self._received_updates) == self.num_connected_channels:
# # Send out data on multi async signal
# self.mca.put(self._received_updates, acquisition_group=self._acquisition_group)
# self._received_updates.clear()
self._current_data.update(
{attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}}
)
if len(self._current_data) == self.NUM_MCA_CHANNELS:
# Send out data on multi async signal
self.mca.put(self._current_data, acquisition_group=self._acquisition_group)
self._current_data.clear()
self._mca_counter_index = 0
self._current_data_index += 1
def _progress_update(self, value, **kwargs) -> None:
def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None:
"""Callback for progress updates from ophyd subscription on current_channel."""
self.progress.put(
value=value,
max_value=self._num_total_triggers,
done=bool(value == self._num_total_triggers),
)
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
def on_stage(self) -> None:
"""
Called when the device is staged.
"""
self.erase_all.set(1).wait(timeout=self._pv_timeout)
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
# This should never happen as the card is properly stopped during unstage
# Can only happen if user manually interferes with the IOC through other means
if self.acquiring.get() == ACQUIRING.ACQUIRING:
logger.warning(
f"MCS Card {self.name} was still acquiring on staging. Stopping acquisition."
)
self.stop_all.put(1)
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
status.wait(timeout=10)
# NOTE: Erase all will result in data emission through mca callback subscriptions
# The buffer needs to be cleared as this will otherwise lead to missing
# triggers during the scan. Again, this should not happen if unstage is properly called.
# But user interference or a restart of the device_server may lead to this situation.
# self.erase_all.put(1)
# time.sleep(3)
if self.current_channel.get() != 0:
with suppress_mca_callbacks(self):
logger.warning(
f"MCS Card {self.name} had still data in buffer Erased all data on staging and sleeping for 1 second."
)
# Erase all data on the MCS card
self.erase_all.put(1)
time.sleep(1) # Allow time to process erase
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
num_points = self.scan_info.msg.num_points
self._num_total_triggers = triggers * num_points
self._acquisition_group = "monitored" if triggers == 1 else "burst_group"
self.preset_real.set(0).wait(timeout=self._pv_timeout)
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
# Reset data
self._received_updates.clear()
# Make sure to clear any remaining data in the local buffer
with self._rlock:
self._current_data.clear()
# Reset monitoring of async data emission
self._start_monitor_async_data_emission.clear()
self._scan_done_callbacks.clear()
self._current_data_index = 0
def on_unstage(self) -> None:
"""
Called when the device is unstaged.
"""
self.stop_all.put(1)
self.erase_all.set(0).wait(timeout=self._pv_timeout)
# Make sure that upon unstaging, all data on the MCS card is erased.
with suppress_mca_callbacks(self):
with self._rlock:
self._current_data.clear()
self.erase_all.put(1)
def on_pre_scan(self) -> None:
"""
Called before the scan starts.
"""
def _monitor_async_data_emission(self, timeout: int = 10) -> None:
"""Monitor data emission after scan is done."""
while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait():
try:
if self._current_data_index == self.scan_info.msg.num_points:
for callback in self._scan_done_callbacks:
callback()
time.sleep(0.02) # 20ms delay to avoid busy loop
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Exception in monitoring thread of complete for {self.name}:\n{content}"
"Running callbacks to avoid deadlock."
)
for callback in self._scan_done_callbacks:
callback()
def _status_callback(self, status: StatusBase) -> None:
"""Callback for status completion."""
if not status.done:
status.set_finished()
self._start_monitor_async_data_emission.clear() # Stop monitoring
def _status_failed_callback(self, status: StatusBase) -> None:
"""Callback for status failure."""
if status.done and not status.success:
self._start_monitor_async_data_emission.clear() # Stop monitoring
def on_complete(self) -> CompareStatus:
"""On scan completion."""
# Check if we should get a signal based on updates from the MCA channels
# Prepare callback for data emission done
status_async_data = StatusBase(obj=self)
self._scan_done_callbacks.append(partial(self._status_callback, status_async_data))
# Start done callback loop
self._start_monitor_async_data_emission.set()
# Add CompareStatus for Acquiring DONE
status = CompareStatus(self.acquiring, ACQUIRING.DONE)
self.cancel_on_stop(status)
# Combine both statuses
ret_status = status & status_async_data
ret_status.add_callback(self._status_failed_callback)
self.cancel_on_stop(ret_status)
return status
def on_destroy(self):
self._scan_done_thread_kill_event.set()
self._start_monitor_async_data_emission.set()
if self._scan_done_thread.is_alive():
self._scan_done_thread.join(timeout=2.0)
if self._scan_done_thread.is_alive():
logger.warning(f"Thread for device {self.name} did not terminate properly.")
def on_stop(self) -> None:
"""
Called when the scan is stopped.
"""
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout:int=1) -> None:
"""Recovery procedure for the mcs card"""
sleep_time = timeout / 2 # 2 sleeps
logger.info(f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep")
self.erase_start.put(1)
time.sleep(sleep_time)
self.stop_all.put(1)
self.erase_all.put(1)
time.sleep(sleep_time)

View File

@@ -21,7 +21,7 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
READMODE,
MCSCard,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import READYTOREAD, MCSCardCSAXS
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
@pytest.fixture(scope="function")
@@ -85,8 +85,6 @@ def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs):
assert mcs.prescale.get() == 1
#
assert mcs.user_led.get() == 0
# Only 5 channels are connected
assert mcs.mux_output.get() == 5
# input output settings
assert mcs.input_mode.get() == INPUTMODE.MODE_3
assert mcs.input_polarity.get() == POLARITY.NORMAL
@@ -122,7 +120,6 @@ def test_mcs_card_csaxs_unstage(mock_mcs_csaxs):
mcs.erase_all.put(1)
mcs.unstage()
assert mcs.stop_all.get() == 1
assert mcs.ready_to_read.get() == READYTOREAD.DONE
assert mcs.erase_all.get() == 0
@@ -133,14 +130,12 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs):
st = mcs.complete()
assert st.done is False
mcs.stop_all.put(0)
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
mcs.stop()
with pytest.raises(Exception):
st.wait(timeout=3)
assert st.done is True
assert st.success is False
assert mcs.stop_all.get() == 1
assert mcs.ready_to_read.get() == READYTOREAD.DONE
def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs):