fix: adapt imports to refactoring in bec_lib
This commit is contained in:
@@ -10,7 +10,6 @@ import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import threadlocked
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.ophyd_rotation_base import EpicsRotationBase
|
||||
from ophyd_devices.interfaces.protocols.bec_protocols import BECFlyerProtocol, BECScanProtocol
|
||||
@@ -141,18 +140,18 @@ class TomcatAerotechRotation(EpicsRotationBase, BECFlyerProtocol, BECScanProtoco
|
||||
self._stopped = True
|
||||
super().stop(success=success)
|
||||
|
||||
@threadlocked
|
||||
def _is_motor_moving(self):
|
||||
"""Function to check if the motor is moving.
|
||||
|
||||
This function is used in a thread to check if the motor is moving.
|
||||
It resolves by running"""
|
||||
while self.motor_done_move.get():
|
||||
if self._stopped:
|
||||
self._done_moving(success=False)
|
||||
return
|
||||
time.sleep(0.1)
|
||||
self._done_moving(success=True)
|
||||
with self._lock:
|
||||
while self.motor_done_move.get():
|
||||
if self._stopped:
|
||||
self._done_moving(success=False)
|
||||
return
|
||||
time.sleep(0.1)
|
||||
self._done_moving(success=True)
|
||||
|
||||
# TODO This logic could be refined to be more robust for various scan types, i.e. at the moment it just takes
|
||||
# the start and target position and calculates the progress based on the current position.
|
||||
|
||||
@@ -25,7 +25,8 @@ but they are executed in a specific order:
|
||||
|
||||
# import numpy as np
|
||||
|
||||
# from bec_lib import MessageEndpoints, bec_logger, messages
|
||||
# from bec_lib import bec_logger, messages
|
||||
# from bec_lib.endpoints import MessageEndpoints
|
||||
# from bec_server.scan_server.errors import ScanAbortion
|
||||
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
|
||||
|
||||
|
||||
Reference in New Issue
Block a user