refactor(progress-signal): ProgressSignal for mo1_bragg and nidaq

This commit is contained in:
2025-06-18 14:15:52 +02:00
parent 827557b667
commit e941647750
2 changed files with 8 additions and 15 deletions

View File

@@ -16,6 +16,7 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd_devices import ProgressSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
@@ -81,6 +82,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG:
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"]
def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore
@@ -289,9 +292,6 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.stopped = True # Needs to be set to stop motion
######### Utility Methods #########
# FIXME this should become the ProgressSignal
# pylint: disable=unused-argument
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
@@ -300,12 +300,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
value (int) : current progress value
"""
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(max_value == value),
)
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.

View File

@@ -7,6 +7,7 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd_devices import ProgressSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal
@@ -342,6 +343,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
scan_info (ScanInfo) : ScanInfo object passed by BEC's devicemanager.
"""
progress_signal = Cpt(ProgressSignal, name="progress_signal")
USER_ACCESS = ["set_config"]
def __init__(self, prefix: str = "", *, name: str, scan_info: ScanInfo = None, **kwargs):
@@ -649,12 +652,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
return
value = scan_duration - value
max_value = scan_duration
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(value == max_value),
)
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
def on_stop(self) -> None:
"""Called when the device is stopped."""