diff --git a/ophyd_devices/epics/devices/aerotech/AerotechAutomation1.py b/ophyd_devices/epics/devices/aerotech/AerotechAutomation1.py index 404f2de..def43b2 100644 --- a/ophyd_devices/epics/devices/aerotech/AerotechAutomation1.py +++ b/ophyd_devices/epics/devices/aerotech/AerotechAutomation1.py @@ -1,40 +1,17 @@ -from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal -from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus -from ophyd.flyers import FlyerInterface -from time import sleep -import warnings -import numpy as np import time - -try: - from .AerotechAutomation1Enums import * - from .AerotechAutomation1Enums import ( - DataCollectionMode, - DataCollectionFrequency, - AxisDataSignal, - PsoWindowInput, - DriveDataCaptureInput, - DriveDataCaptureTrigger, - TaskDataSignal, - SystemDataSignal, - TomcatSequencerState, - ) -except: - from AerotechAutomation1Enums import * - from AerotechAutomation1Enums import ( - DataCollectionMode, - DataCollectionFrequency, - AxisDataSignal, - PsoWindowInput, - DriveDataCaptureInput, - DriveDataCaptureTrigger, - TaskDataSignal, - SystemDataSignal, - TomcatSequencerState, - ) - -from typing import Union from collections import OrderedDict +from time import sleep + +import numpy as np +from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind +from ophyd.status import DeviceStatus, Status, StatusBase, SubscriptionStatus + +from ophyd_devices.epics.devices.aerotech.AerotechAutomation1Enums import ( + DataCollectionFrequency, + DataCollectionMode, + DriveDataCaptureInput, + DriveDataCaptureTrigger, +) class EpicsMotorX(EpicsMotor): @@ -1148,32 +1125,32 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): print("PSO kicked off") return status - def complete(self) -> DeviceStatus: - """Bluesky flyer interface""" - # Array mode waits until the buffer is empty - if hasattr(self, "_distanceValue") and isinstance( - self._distanceValue, (np.ndarray, list, tuple) - ): - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 + # def complete(self) -> DeviceStatus: + # """Bluesky flyer interface""" + # # Array mode waits until the buffer is empty + # if hasattr(self, "_distanceValue") and isinstance( + # self._distanceValue, (np.ndarray, list, tuple) + # ): + # # Define wait until the busy flag goes down (excluding initial update) + # timestamp_ = 0 - def notRunning(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_ == 0) else bool(int(value) & 0x1000) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result + # def notRunning(*args, old_value, value, timestamp, **kwargs): + # nonlocal timestamp_ + # result = False if (timestamp_ == 0) else bool(int(value) & 0x1000) + # print(f"Old {old_value}\tNew: {value}\tResult: {result}") + # timestamp_ = timestamp + # return result - # Subscribe and wait for update - # status = SubscriptionStatus(self.status, notRunning, settle_time=0.5) - # Data capture can be stopped any time - status = DeviceStatus(self) - status.set_finished() - else: - # In distance trigger mode there's no specific goal - status = DeviceStatus(self) - status.set_finished() - return status + # # Subscribe and wait for update + # # status = SubscriptionStatus(self.status, notRunning, settle_time=0.5) + # # Data capture can be stopped any time + # status = DeviceStatus(self) + # status.set_finished() + # else: + # # In distance trigger mode there's no specific goal + # status = DeviceStatus(self) + # status.set_finished() + # return status def describe_collect(self) -> OrderedDict: ret = OrderedDict() @@ -1434,10 +1411,8 @@ class aa1AxisDriveDataCollection(Device): # Bluesky step scanning interface def stage(self, settle_time=0.1): - super().stage() self._switch.set("Start", settle_time=0.5).wait() - status = Status(timeout=0.1, settle_time=settle_time).set_finished() - return status + return super().stage() def unstage(self, settle_time=0.1): self._switch.set("Stop", settle_time=settle_time).wait() diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/__init__.py b/ophyd_devices/epics/devices/aerotech/__init__.py similarity index 100% rename from ophyd_devices/epics/devices/ophyd_base_devices/__init__.py rename to ophyd_devices/epics/devices/aerotech/__init__.py diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py b/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py deleted file mode 100644 index c720b4e..0000000 --- a/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py +++ /dev/null @@ -1,499 +0,0 @@ -""" This module provides a range of protocols that describe the expected interface for different types of devices. - -The protocols below can be used as teamplates for functionality to be implemeted by different type of devices. -They further facilitate runtime checks on devices and provide a minimum set of properties required for a device to be loadable by BEC. - -The protocols are: -- BECDeviceProtocol: Protocol for devices in BEC. All devices must at least implement this protocol. -- BECSignalProtocol: Protocol for signals. -- BECScanProtocol: Protocol for the scan interface. -- BECMixinProtocol: Protocol for utilities in particular relevant for detector implementations. -- BECPositionerProtocol: Protocol for positioners. -- BECFlyerProtocol: Protocol with for flyers. - -Keep in mind, that a device of type flyer should generally also implement the BECScanProtocol that provides the required functionality for scans. -Flyers in addition, also implement the BECFlyerProtocol. Similarly, positioners should also implement the BECScanProtocol and BECPositionerProtocol. - -""" - -from typing import Protocol, runtime_checkable - -from bec_lib.file_utils import FileWriterMixin -from ophyd import Component, DeviceStatus, Kind, Staged - -from ophyd_devices.utils import bec_scaninfo_mixin - - -@runtime_checkable -class BECDeviceProtocol(Protocol): - """Protocol for ophyd objects with zero functionality.""" - - _destroyed: bool - - @property - def name(self) -> str: - """name property""" - - @name.setter - def name(self, value: str) -> None: - """name setter""" - - @property - def kind(self) -> Kind: - """kind property""" - - @kind.setter - def kind(self, value: Kind): - """kind setter""" - - @property - def parent(self): - """Property to find the parent device""" - - @property - def root(self): - """Property to fint the root device""" - - @property - def hints(self) -> dict: - """hints property""" - - @property - def connected(self) -> bool: - """connected property. - Check if signals are connected - - Returns: - bool: True if connected, False otherwise - """ - - @connected.setter - def connected(self, value: bool): - """connected setter""" - - def read(self) -> dict: - """read method - - Override by child class with read method - - Returns: - dict: Dictionary with nested dictionary of signals with kind.normal or kind.hinted: - {'signal_name' : {'value' : .., "timestamp" : ..}, ...} - """ - - def read_configuration(self) -> dict: - """read_configuration method - - Override by child class with read_configuration method - - Returns: - dict: Dictionary with nested dictionary of signals with kind.config: - {'signal_name' : {'value' : .., "timestamp" : ..}, ...} - """ - - def describe(self) -> dict: - """describe method - - Override by child class with describe method - - Returns: - dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape') - """ - - def describe_configuration(self) -> dict: - """describe method - - Includes all signals of type Kind.config. - Override by child class with describe_configuration method - - Returns: - dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape') - """ - - def destroy(self) -> None: - """Destroy method. - - _destroyed must be set to True after calling destroy. - """ - - -@runtime_checkable -class BECSignalProtocol(Protocol): - """Protocol for BEC signals with zero functionality. - - This protocol adds the specific implementation for a signal. - Please be aware that a signal must also implement BECDeviceProtocol. - - Note: Currently the implementation of the protocol is not taking into account the - event_model from ophyd, i.e. _run_sbus - """ - - @property - def limits(self) -> tuple[float, float]: - """Limits property for signals. - If low_limit == high_limit, it is equivalent to NO limits! - - Returns: - tuple: Tuple with lower and upper limits - """ - - @property - def high_limit(self) -> float: - """High limit property for signals. - - Returns: - float: Upper limit - """ - - @property - def low_limit(self) -> float: - """Low limit property for signals. - - Returns: - float: Lower limit - """ - - @property - def write_access(self) -> bool: - """Write access method for signals. - - Returns: - bool: True if write access is allowed, False otherwise - """ - - def check_value(self, value: float): - """Check whether value is within limits - - Args: - value: value to check - - Raises: - LimitError in case the requested motion is not inside of limits. - """ - - def trigger(self) -> DeviceStatus: - """Trigger method for signals. - This method can be used to trigger the signal readout. - For EpicsSignal, the readback value is typically set to auto_monitor=True, - which means their readback value is updated automatically from the IOC. - - Returns: - DeviceStatus: DeviceStatus object - """ - - def put(self, value: any, force: bool = False, timeout: float = None): - """Put method for signals. - This method should resolve immediately and not block. - If not force, the method checks if the value is within limits using check_value. - - - Args: - value (any) : value to put - force (bool) : Flag to force the put and ignore limits - timeout (float) : Timeout for the put - """ - - def set(self, value: any, timeout: float = None) -> DeviceStatus: - """Set method for signals. - This method should be blocking until the set is completed. - - Args: - value (any) : value to set - timeout (float) : Timeout for the set - - Returns: - DeviceStatus : DeviceStatus object that will finish upon return - """ - - -@runtime_checkable -class BECScanProtocol(BECDeviceProtocol, Protocol): - """Protocol for devices offering an Protocol with all relevant functionality for scans. - - In BEC, scans typically follow the order of stage, (pre_scan), trigger, unstage. - Stop should be used to interrupt a scan. Be aware that pre_scan is optional and therefor - part of the BECMixinProtocol, typically useful for more complex devices such as detectors. - - This protocol allows to perform runtime checks on devices of ophyd. - It is the minimum set of properties required for a device to be loadable by BEC. - """ - - _staged: Staged - """Staged property to indicate if the device is staged.""" - - def stage(self) -> list[object]: - """Stage method to prepare the device for an upcoming acquistion. - - This prepares a device for an upcoming acquisition, i.e. it is the first - method for which the scan parameters are known and the device can be configured. - - It can be used to move scan_motors to their start position - or also prepare DAQ systems for the upcoming measurement. - We can further publish the file location for DAQ systems - to BEC and inform BEC's file writer where data will be written to. - - Stagin is not idempoent. If called twice without an unstage it should raise. - For ophyd devices, one may used self._staged = True to check if the device is staged. - - Returns: - list: List of objects that were staged, i.e. [self] - For devices with inheritance from ophyd, return - return super().stage() in the child class. - """ - - def unstage(self) -> list[object]: - """Unstage method to cleanup after the acquisition. - - It can also be used to implement checks whether the acquisition was successful, - inform BEC that the file has been succesfully written, or raise upon receiving - feedback that the scan did not finish successful. - - Unstaging is not idempotent. If called twice without a stage it should raise. - It is recommended to return super().unstage() in the child class, if - the child class also inherits from ophyd repository. - """ - - def stop(self, success: bool) -> None: - """Stop method to stop the device. - - Args: - success: Flag to indicate if the scan was successful or not. - - This method should be called to stop the device. It is recommended to call - super().stop(success=success) if class inherits from ophyd repository. - """ - - def trigger(self) -> DeviceStatus: - """Trigger method on the device - - Returns ophyd DeviceStatus object, which is used to track the status of the trigger. - It can also be blocking until the trigger is completed, and return the status object - with set_finished() method called on the DeviceStatus. - """ - - -@runtime_checkable -class BECMixinProtocol(Protocol): - """Protocol that offers BEC specific utility functionality for detectors.""" - - USER_ACCESS: list[str] - """ - List of methods/properties that will be exposed to the client interface in addition - to the the already exposed signals, methods and properties. - """ - - scaninfo: bec_scaninfo_mixin - """ - BEC scan info mixin class that provides an transparent Protocol to scan parameter - as provided by BEC. It is recommended to use this Protocol to retrieve scaninfo from Redis. - """ - - stopped: bool - """ - Flag to indicate if the device is stopped. - - The stop method should set this flag to True, and i.e. stage to set it to False. - """ - - filewriter: FileWriterMixin - """ - The file writer mixin main purpose is to unify and centralize the creation of - file paths within BEC. Therefore, we recommend devices to use the same mixin for creation of paths. - """ - - def pre_scan(self): - """Pre-scan method is called from BEC right before executing scancore, thus - right before the start of an acquisition. - - It can be used to trigger time critical functions from the device, which - are prone to run into timeouts in case called too early. - """ - - -@runtime_checkable -class BECPositionerProtocol(BECScanProtocol, Protocol): - """Protocol with functionality specific for positioners in BEC.""" - - @property - def limits(self) -> tuple[float, float]: - """Limits property for positioners. - For an EpicsMotor, BEC will automatically recover the limits from the IOC. - - If not set, it returns (0,0). - Note, low_limit = high_limit is equivalent to NO limits! - - Returns: - tuple: Tuple with lower and upper limits - """ - - @property - def low_limit(self) -> float: - """Low limit property for positioners. - - Returns: - float: Lower limit - """ - - @property - def high_limit(self) -> float: - """High limit property for positioners. - - Returns: - float: Upper limit - """ - - def check_value(self, value: float): - """Check whether value is within limits - - Args: - value: value to check - - Raises: - LimitError in case the requested motion is not inside of limits. - """ - - def move(self, position: float) -> DeviceStatus: - """Move method for positioners. - The returned DeviceStatus is marked as done once the positioner has reached the target position. - DeviceStatus.wait() can be used to block until the move is completed. - - Args: - position: position to move to - - Returns: - DeviceStatus: DeviceStatus object - """ - - def stop(self, success: bool) -> None: - """Stop method for positioners. - - Args: - success: Flag to indicate if the scan was successful or not. - """ - - -@runtime_checkable -class BECFlyerProtocol(BECScanProtocol, Protocol): - """Protocol with functionality specific for flyers in BEC.""" - - def configure(self, d: dict): - """Configure method of the flyer. - It is an optional method, but does not need to be implemented by a flyer. - Instead, stage can be used to prepare time critical operations on the device in preparation of a scan. - - Method to configure the flyer in preparation of a scan. - - Args: - d (dict): Dictionary with configuration parameters, i.e. key value pairs of signal_name : value - """ - - def kickoff(self) -> DeviceStatus: - """Kickoff method for flyers. - - The returned DeviceStatus is marked as done once the flyer start flying, - i.e. is ready to be triggered. - - Returns: - DeviceStatus: DeviceStatus object - """ - - def complete(self) -> DeviceStatus: - """Complete method for flyers. - - The returned DeviceStatus is marked as done once the flyer has completed. - - Returns: - DeviceStatus: DeviceStatus object - """ - - -@runtime_checkable -class BECRotationProtocol(Protocol): - """Protocol which defines functionality for a tomography stage for ophyd devices""" - - allow_mod360: Component - """Signal to define whether mod360 operations are allowed. """ - - @property - def has_mod360(self) -> bool: - """Property to check if the motor has mod360 option - - Returns: - bool: True if mod360 is possible on device, False otherwise - """ - - @property - def has_freerun(self) -> bool: - """Property to check if the motor has freerun option - - Returns: - bool: True if freerun is allowed, False otherwise - """ - - @property - def valid_rotation_modes(self) -> list[str]: - """Method to get the valid rotation modes for the implemented motor. - - Returns: - list: List of strings with valid rotation modes - """ - - def apply_mod360(self) -> None: - """Method to apply the modulus 360 operation on the specific device. - - Childrens should override this method - """ - - -@runtime_checkable -class BECEventProtocol(Protocol): - """Protocol for events in BEC. - - This is a first draft for the event protocol introduced throughout BEC. - It needs to be review and extended before it can be used in production. - """ - - _callbacks: dict[dict] - - @property - def event_types(self) -> tuple[str]: - """Event types property""" - - def _run_subs(self, sub_type: str, **kwargs): - """Run subscriptions for the event. - - Args: - sub_type: Subscription type - kwargs: Keyword arguments - """ - - def subscribe(self, callback: callable, event_type: str = None, run: bool = True): - """Subscribe to the event. - - Args: - callback (callable) : Callback function - The expected callback structure is: - def cb(*args, obj:OphydObject, sub_type:str, **kwargs) -> None: - pass - event_type (str) : Event type, if None it defaults to obj._default_sub - This maps to sub_type in _run_subs - run (bool) : If true, run the callback directly. - - Returns: - cid (int): Callback id - """ - - def clear_sub(self, cb: callable, event_type: str = None): - """Clear subscription, given the origianl callback fucntion - - Args: - cb (callable) : Callback - event_type (str): Event type, if None it will be remove from all event_types - """ - - def unsubscribe(self, cid: int): - """Unsubscribe from the event. - - Args: - cid (int): Callback id - """ diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py b/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py deleted file mode 100644 index 8e316ae..0000000 --- a/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py +++ /dev/null @@ -1,107 +0,0 @@ -from abc import ABC, abstractmethod - -from bec_lib import bec_logger -from ophyd import Component as Cpt -from ophyd import EpicsMotor -from typeguard import typechecked - -from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import ( - BECRotationProtocol, -) -from ophyd_devices.utils.bec_utils import ConfigSignal - -logger = bec_logger.logger - - -class OphtyRotationBaseError(Exception): - """Exception specific for implmenetation of rotation stages.""" - - -class OphydRotationBase(BECRotationProtocol, ABC): - - allow_mod360 = Cpt(ConfigSignal, name="allow_mod360", value=False, kind="config") - - def __init__(self, *args, **kwargs): - """ - Base class to implement functionality specific for rotation devices. - - Childrens should override the instance attributes: - - has_mod360 - - has_freerun - - valid_rotation_modes - - """ - # pylint: disable=protected-access - self._has_mod360 = False - self._has_freerun = False - self._valid_rotation_modes = [] - if "allow_mod360" in kwargs: - if not isinstance(kwargs["allow_mod360"], bool): - raise ValueError("allow_mod360 must be a boolean") - self.allow_mod360.put(kwargs["allow_mod360"]) - super().__init__(*args, **kwargs) - - @abstractmethod - def apply_mod360(self) -> None: - """Method to apply the modulus 360 operation on the specific device. - - Childrens should override this method - """ - - @property - def has_mod360(self) -> bool: - """Property to check if the device has mod360 operation. - - ReadOnly property, childrens should override this method. - """ - return self._has_mod360 - - @property - def has_freerun(self) -> bool: - """Property to check if the device has freerun operation. - - ReadOnly property, childrens should override this method. - """ - return self._has_freerun - - @property - def valid_rotation_modes(self) -> list: - """Method to get the valid rotation modes for the specific device.""" - return self._valid_rotation_modes - - @typechecked - @valid_rotation_modes.setter - def valid_rotation_modes(self, value: list[str]): - """Method to set the valid rotation modes for the specific device.""" - self._valid_rotation_modes = value - return self._valid_rotation_modes - - -# pylint: disable=too-many-ancestors -class EpicsRotationBase(OphydRotationBase, EpicsMotor): - """Class for Epics rotation devices.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._has_freerun = True - self._has_freerun = True - self._valid_rotation_modes = ["target", "radiography"] - - def apply_mod360(self) -> None: - """Apply modulos 360 operation for EpicsMotorRecord. - - EpicsMotor has the function "set_current_position" which can be used for this purpose. - In addition, there is a check if mod360 is allowed and available. - """ - if self.has_mod360 and self.allow_mod360.get(): - cur_val = self.user_readback.get() - new_val = cur_val % 360 - try: - self.set_current_position(new_val) - except Exception as exc: - error_msg = f"Failed to set new position {new_val} from {cur_val} on device {self.name} with error {exc}" - raise OphtyRotationBaseError(error_msg) from exc - return - logger.info( - f"Did not apply mod360 for device {self.name} with has_mod={self.has_mod360} and allow_mod={self.allow_mod360.get()}" - ) diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py b/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py deleted file mode 100644 index 86890cd..0000000 --- a/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py +++ /dev/null @@ -1,181 +0,0 @@ -""" Module for Tomcat rotation motors. - -The following classes implement the rotation motors for: - -- AerotechAutomation1 (Tomcat), based on EpicsMotorIOC. - -""" - -import threading -import time - -import numpy as np -from bec_lib import threadlocked -from ophyd import DeviceStatus - -from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import ( - BECFlyerProtocol, - BECScanProtocol, -) -from ophyd_devices.epics.devices.ophyd_base_devices.ophyd_rotation_base import ( - EpicsRotationBase, -) - - -class TomcatAerotechRotation(EpicsRotationBase, BECFlyerProtocol, BECScanProtocol): - """Special motor class that provides flyer interface and progress bar.""" - - SUB_PROGRESS = "progress" - - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - **kwargs, - ): - """Implementation of the Tomcat AerotechAutomation 1 rotation motor class. - - This motor class is based on EpicsRotationBase and provides in addition the flyer interface for BEC - and a progress update. - """ - super().__init__( - prefix=prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - self._start_position = None - self._target_position = None - self._stopped = False - self._rlock = threading.RLock() - self.subscribe(self._progress_update, run=False) - - # ------------------ alternative to using configure method --------------------- # - @property - def start_position(self) -> float: - """Get the start position.""" - return self._start_position - - @start_position.setter - def start_position(self, value: float) -> None: - """Set the start position.""" - self._start_position = value - - @property - def target_position(self) -> float: - """Get the start position.""" - return self._target_position - - @target_position.setter - def target_position(self, value: float) -> None: - """Set the start position.""" - self._target_position = value - - # ------------------ alternative to using configure method --------------------- # - - def configure(self, d: dict) -> dict: - """Configure method from the device. - - This method is usually used to set configuration parameters for the device. - - Args: - d (dict): Dictionary with configuration parameters. - - """ - if "target" in d: - self._target_position = d["target"] - del d["target"] - if "position" in d: - self._target_position = d["position"] - del d["position"] - return super().configure(d) - - def pre_scan(self): - """Perform pre-scan operation, e.g. move to start position.""" - if self._start_position: - self.move(self._start_position, wait=True) - - def kickoff(self) -> DeviceStatus: - """Kickoff the scan. - - The kickoff method should return a status object that is set to finish once the flyer flys, and is ready for the next actions. - I would consider the following implementation. - """ - self._start_position = float(self.position) - self.move(self._target_position, wait=False) - status = DeviceStatus(self) - status.set_finished() - return status - - def complete(self) -> DeviceStatus: - """Complete method of the scan. - - This will be called in a fly scan after the kickoff, thus, the stage will be moving to it's target position. - It should - - The complete method should return a status object that is set to finish once the flyer is done and the scan is complete. - I would consider the following implementation. - """ - threading.Thread(target=self._is_motor_moving, daemon=True).start() - status = DeviceStatus(self) - self.subscribe(status.set_finished, event_type=self.SUB_DONE, run=False) - return status - - def stage(self) -> list[object]: - """Stage the scan. - - We add here in addition the setting of the _stopped flag to False for the thread. - """ - self._stopped = False - return super().stage() - - def stop(self, success: bool = False) -> None: - """Stop the scan. - - If the device is stopped, the _stopped flag is set to True. - """ - self._stopped = True - super().stop(success=success) - - @threadlocked - def _is_motor_moving(self): - """Function to check if the motor is moving. - - This function is used in a thread to check if the motor is moving. - It resolves by running""" - while self.motor_done_move.get(): - if self._stopped: - self._done_moving(success=False) - return - time.sleep(0.1) - self._done_moving(success=True) - - def _progress_update(self, value, **kwargs) -> None: - """Progress update on the scan""" - if (self._start_position is None) or (self._target_position is None) or (not self.moving): - self._run_subs( - sub_type=self.SUB_PROGRESS, - value=1, - max_value=1, - done=1, - ) - return - - progress = np.abs( - (value - self._start_position) / (self._target_position - self._start_position) - ) - max_value = 100 - self._run_subs( - sub_type=self.SUB_PROGRESS, - value=int(100 * progress), - max_value=max_value, - done=int(np.isclose(max_value, progress, 1e-3)), - )