refactor: replace deprecated imports from typing

https://peps.python.org/pep-0585/#implementation
This commit is contained in:
2023-12-12 14:55:27 +01:00
parent 43626973ca
commit 952c92e4b9
13 changed files with 30 additions and 42 deletions

View File

@ -1,8 +1,7 @@
import os
import time
from typing import List
from bec_lib import messages, MessageEndpoints, bec_logger
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Signal
@ -91,7 +90,7 @@ class Eiger1p5MDetector(Device):
floor_dir = scan_number // scan_bundle * scan_bundle
return f"S{floor_dir:0{leading_zeros}d}-{floor_dir+scan_bundle-1:0{leading_zeros}d}/S{scan_number:0{leading_zeros}d}"
def stage(self) -> List[object]:
def stage(self) -> list[object]:
scan_msg = self._get_current_scan_msg()
self.metadata = {
"scanID": scan_msg.content["scanID"],
@ -141,7 +140,7 @@ class Eiger1p5MDetector(Device):
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
time_waited = 0
sleep_time = 0.2
framesexpected = self.frames.get()

View File

@ -4,7 +4,7 @@ import time
import numpy as np
import os
from typing import Any, List
from typing import Any
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device
@ -426,7 +426,7 @@ class Eiger9McSAXS(PSIDetectorBase):
value = trigger_source
self.cam.trigger_mode.put(value)
def stage(self) -> List[object]:
def stage(self) -> list[object]:
"""
Add functionality to stage, and arm the detector

View File

@ -1,6 +1,5 @@
import enum
import os
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt
from ophyd.mca import EpicsMCARecord
@ -362,7 +361,7 @@ class FalconcSAXS(PSIDetectorBase):
self.pixel_advance_mode.put(trigger)
self.ignore_gate.put(ignore_gate)
def stage(self) -> List[object]:
def stage(self) -> list[object]:
"""Stage"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()

View File

@ -1,6 +1,5 @@
import enum
import threading
from typing import List
from collections import defaultdict
import numpy as np
@ -317,7 +316,7 @@ class MCScSAXS(PSIDetectorBase):
value = int(trigger_source)
self.input_mode.set(value)
def stage(self) -> List[object]:
def stage(self) -> list[object]:
"""stage the detector for upcoming acquisition"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()

View File

@ -1,6 +1,6 @@
import time
import enum
from typing import Any, List
from typing import Any
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PVPositioner, Signal, DeviceStatus
from ophyd.pseudopos import (
@ -409,7 +409,7 @@ class PSIDelayGeneratorBase(Device):
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.is_ddg_okay()
def set_channels(self, signal: str, value: Any, channels: List = None) -> None:
def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
"""
Method to set signals on DelayPair and DelayStatic channels.
@ -420,7 +420,7 @@ class PSIDelayGeneratorBase(Device):
Args:
signal (str) : signal to set (width, delay, amplitude, offset, polarity)
value (Any) : value to set
channels (List, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
"""
if not channels:
channels = self.all_channels
@ -465,7 +465,7 @@ class PSIDelayGeneratorBase(Device):
"""Disable burst mode"""
self.burstMode.put(0)
def stage(self) -> List[object]:
def stage(self) -> list[object]:
"""
Method to stage the device.
@ -477,7 +477,7 @@ class PSIDelayGeneratorBase(Device):
- is_ddg_okay : check if DDG is okay
Returns:
List(object): list of objects that were staged
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
@ -509,7 +509,7 @@ class PSIDelayGeneratorBase(Device):
"""
self.custom_prepare.on_pre_scan()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
"""
Method unstage gets called at the end of a scan.
@ -522,7 +522,7 @@ class PSIDelayGeneratorBase(Device):
- is_ddg_okay : check if DDG is okay
Returns:
List(object): list of objects that were unstaged
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
if self.stopped is True:

View File

@ -1,8 +1,6 @@
import time
import os
from typing import List
from ophyd import Device
from ophyd.device import Staged
@ -260,7 +258,7 @@ class PSIDetectorBase(Device):
self.custom_prepare.initialize_detector()
self.custom_prepare.initialize_detector_backend()
def stage(self) -> List[object]:
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan
@ -269,7 +267,7 @@ class PSIDetectorBase(Device):
- _prep_detector : prepare detector for measurement
Returns:
List(object): list of objects that were staged
list(object): list of objects that were staged
"""
# Method idempotent, should rais ;obj;'RedudantStaging' if staged twice
@ -295,7 +293,7 @@ class PSIDetectorBase(Device):
self.custom_prepare.on_trigger()
return super().trigger()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
"""
Unstage device in preparation for a scan
@ -309,7 +307,7 @@ class PSIDetectorBase(Device):
- custom_prepare.publish_file_location : publish file location to bec
Returns:
List(object): list of objects that were unstaged
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
if self.stopped is True:

View File

@ -1,7 +1,6 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger
@ -264,10 +263,10 @@ class FlomniGalilMotor(Device, PositionerBase):
"""The engineering units (EGU) for positions"""
return "mm"
def stage(self) -> List[object]:
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):

View File

@ -1,7 +1,6 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger
@ -308,10 +307,10 @@ class FuprGalilMotor(Device, PositionerBase):
"""The engineering units (EGU) for positions"""
return "mm"
def stage(self) -> List[object]:
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):

View File

@ -1,7 +1,6 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger
@ -576,10 +575,10 @@ class GalilMotor(Device, PositionerBase):
"""The engineering units (EGU) for positions"""
return "mm"
def stage(self) -> List[object]:
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):

View File

@ -1,7 +1,6 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger

View File

@ -1,7 +1,6 @@
import functools
import threading
import time
from typing import List
import numpy as np
from bec_lib import messages, MessageEndpoints, bec_logger
@ -827,10 +826,10 @@ class RtLamniMotor(Device, PositionerBase):
# how is this used later?
def stage(self) -> List[object]:
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):

View File

@ -2,7 +2,6 @@ import os
import threading
import time as ttime
import warnings
from typing import List
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
@ -301,7 +300,7 @@ class SynSLSDetector(Device):
threading.Thread(target=acquire, daemon=True).start()
return status
def stage(self) -> List[object]:
def stage(self) -> list[object]:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
scan_msg = messages.ScanStatusMessage.loads(msg)
self.metadata = {
@ -316,7 +315,7 @@ class SynSLSDetector(Device):
)
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
signals = {"config": self.sim_state, "data": self.file_name}
msg = messages.DeviceMessage(signals=signals, metadata=self.metadata)
self.device_manager.producer.set_and_publish(

View File

@ -1,7 +1,6 @@
import logging
import threading
import time
from typing import List
import numpy as np
from bec_lib import bec_logger
@ -275,10 +274,10 @@ class SmaractMotor(Device, PositionerBase):
"""The engineering units (EGU) for positions"""
return "mm"
def stage(self) -> List[object]:
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> List[object]:
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):