style(black): skip magic trailing comma

This commit is contained in:
wakonig_k 2024-04-10 16:00:14 +02:00
parent 6ba2428dd8
commit b1f353139b
16 changed files with 206 additions and 291 deletions

View File

@ -58,7 +58,7 @@ class X07MAUndulator(PVPositioner):
configuration_attrs=None,
parent=None,
egu="",
**kwargs
**kwargs,
):
super().__init__(
prefix,
@ -68,7 +68,7 @@ class X07MAUndulator(PVPositioner):
configuration_attrs=configuration_attrs,
parent=parent,
egu=egu,
**kwargs
**kwargs,
)
self.readback.name = self.name
@ -96,7 +96,7 @@ class PGMMonochromator(PVPositioner):
configuration_attrs=None,
parent=None,
egu="",
**kwargs
**kwargs,
):
super().__init__(
prefix,
@ -106,7 +106,7 @@ class PGMMonochromator(PVPositioner):
configuration_attrs=configuration_attrs,
parent=parent,
egu=egu,
**kwargs
**kwargs,
)
self.readback.name = self.name
@ -237,7 +237,7 @@ class X07MAExitSlit(PVPositioner):
configuration_attrs=None,
parent=None,
egu="",
**kwargs
**kwargs,
):
super().__init__(
prefix,
@ -247,7 +247,7 @@ class X07MAExitSlit(PVPositioner):
configuration_attrs=configuration_attrs,
parent=parent,
egu=egu,
**kwargs
**kwargs,
)
self.readback.name = self.name
@ -294,9 +294,7 @@ class X07MAMagnetAxis(PVPositioner):
class NormSignal(Signal):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metadata.update(
write_access=False,
)
self._metadata.update(write_access=False)
def wait_for_connection(self, timeout=0):
super().wait_for_connection(timeout)
@ -389,7 +387,7 @@ class X07MATemperatureController(Device):
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs
**kwargs,
):
super().__init__(
prefix,
@ -398,7 +396,7 @@ class X07MATemperatureController(Device):
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs
**kwargs,
)
self.readback.name = self.name

View File

@ -22,7 +22,7 @@ class EpicsMotorEx(EpicsMotor):
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs
**kwargs,
):
# get configuration attributes from kwargs and then remove them
attrs = {}
@ -39,7 +39,7 @@ class EpicsMotorEx(EpicsMotor):
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs
**kwargs,
)
# set configuration attributes

View File

@ -263,11 +263,7 @@ class Bpm4i(Device):
ch2 = Component(EpicsSignalRO, "S3", auto_monitor=True, kind=Kind.omitted, name="ch2")
ch3 = Component(EpicsSignalRO, "S4", auto_monitor=True, kind=Kind.omitted, name="ch3")
ch4 = Component(EpicsSignalRO, "S5", auto_monitor=True, kind=Kind.omitted, name="ch4")
sum = Component(
CurrentSum,
kind=Kind.hinted,
name="sum",
)
sum = Component(CurrentSum, kind=Kind.hinted, name="sum")
def __init__(
self,

View File

@ -468,11 +468,7 @@ class GalilMotorIsMoving(GalilSignalRO):
def get(self):
val = super().get()
if val is not None:
self._run_subs(
sub_type=self.SUB_VALUE,
value=val,
timestamp=time.time(),
)
self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time())
return val
@ -490,11 +486,7 @@ class SGalilMotor(Device, PositionerBase):
"""
USER_ACCESS = ["controller"]
readback = Cpt(
GalilReadbackSignal,
signal_name="readback",
kind="hinted",
)
readback = Cpt(GalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
all_axes_referenced = Cpt(GalilAxesReferenced, signal_name="all_axes_referenced", kind="config")
@ -624,18 +616,10 @@ class SGalilMotor(Device, PositionerBase):
while self.motor_is_moving.get():
logger.info("motor is moving")
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(1.5)
val = self.readback.read()
success = np.isclose(
val[self.name]["value"],
position,
atol=self.tolerance,
)
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
if not success:
print(" stop")
@ -712,11 +696,7 @@ class SGalilMotor(Device, PositionerBase):
)
return status
def configure(
self,
parameter: dict,
**kwargs,
) -> None:
def configure(self, parameter: dict, **kwargs) -> None:
self._kickoff_params = parameter

View File

@ -73,10 +73,7 @@ class NPointController:
_channel_base = ["11", "83"]
def __init__(
self,
comm_socket: SocketIO,
server_ip: str = "129.129.99.87",
server_port: int = 23,
self, comm_socket: SocketIO, server_ip: str = "129.129.99.87", server_port: int = 23
) -> None:
self._lock = threading.RLock()
super().__init__()
@ -107,12 +104,7 @@ class NPointController:
t.field_names = ["Channel", "Range", "Position", "Target"]
for ii in range(self.NUM_CHANNELS):
t.add_row(
[
ii,
self._get_range(ii),
self._get_current_pos(ii),
self._get_target_pos(ii),
]
[ii, self._get_range(ii), self._get_current_pos(ii), self._get_target_pos(ii)]
)
print(t)

View File

@ -52,14 +52,9 @@ DEFAULT_PARAMS_LMFIT = {
"sigma": 1,
}
DEFAULT_PARAMS_NOISE = {
"noise": NoiseType.UNIFORM,
"noise_multiplier": 10,
}
DEFAULT_PARAMS_NOISE = {"noise": NoiseType.UNIFORM, "noise_multiplier": 10}
DEFAULT_PARAMS_MOTOR = {
"ref_motor": "samx",
}
DEFAULT_PARAMS_MOTOR = {"ref_motor": "samx"}
DEFAULT_PARAMS_CAMERA_GAUSSIAN = {
"amplitude": 100,
@ -67,17 +62,11 @@ DEFAULT_PARAMS_CAMERA_GAUSSIAN = {
"covariance": np.array([[400, 100], [100, 400]]),
}
DEFAULT_PARAMS_CAMERA_CONSTANT = {
"amplitude": 100,
}
DEFAULT_PARAMS_CAMERA_CONSTANT = {"amplitude": 100}
DEFAULT_PARAMS_HOT_PIXEL = {
"hot_pixel_coords": np.array([[24, 24], [50, 20], [4, 40]]),
"hot_pixel_types": [
HotPixelType.FLUCTUATING,
HotPixelType.CONSTANT,
HotPixelType.FLUCTUATING,
],
"hot_pixel_types": [HotPixelType.FLUCTUATING, HotPixelType.CONSTANT, HotPixelType.FLUCTUATING],
"hot_pixel_values": np.array([1e4, 1e6, 1e4]),
}
@ -98,12 +87,7 @@ class SimulatedDataBase(ABC):
- update_sim_state: update the simulated state of the device
"""
USER_ACCESS = [
"sim_params",
"sim_select_model",
"sim_get_models",
"sim_show_all",
]
USER_ACCESS = ["sim_params", "sim_select_model", "sim_get_models", "sim_show_all"]
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
"""
@ -254,12 +238,7 @@ class SimulatedDataBase(ABC):
table.title = "Available methods within the simulation module"
table.field_names = ["Method", "Docstring"]
table.add_row(
[
self.sim_get_models.__name__,
f"{self.sim_get_models.__doc__}",
]
)
table.add_row([self.sim_get_models.__name__, f"{self.sim_get_models.__doc__}"])
table.add_row([self.sim_select_model.__name__, self.sim_select_model.__doc__])
table.add_row(["sim_params", self.__class__.sim_params.__doc__])
table.max_table_width = width
@ -621,11 +600,7 @@ class SimulatedDataCamera(SimulatedDataBase):
) from exc
def _compute_multivariate_gaussian(
self,
pos: np.ndarray | list,
cen_off: np.ndarray | list,
cov: np.ndarray | list,
amp: float,
self, pos: np.ndarray | list, cen_off: np.ndarray | list, cov: np.ndarray | list, amp: float
) -> np.ndarray:
"""Computes and returns the multivariate Gaussian distribution.

View File

@ -19,13 +19,7 @@ class DeviceProxy(BECDeviceBase, ABC):
The minimum requirement for a device proxy is to implement the _compute method.
"""
def __init__(
self,
name,
*args,
device_manager=None,
**kwargs,
):
def __init__(self, name, *args, device_manager=None, **kwargs):
self.name = name
self.device_manager = device_manager
self.config = None
@ -111,13 +105,7 @@ class SlitProxy(DeviceProxy):
USER_ACCESS = ["enabled", "lookup", "help"]
def __init__(
self,
name,
*args,
device_manager=None,
**kwargs,
):
def __init__(self, name, *args, device_manager=None, **kwargs):
self._gaussian_blur_sigma = 5
super().__init__(name, *args, device_manager=device_manager, **kwargs)
@ -225,13 +213,7 @@ class H5ImageReplayProxy(DeviceProxy):
USER_ACCESS = ["file_source", "h5_entry"]
def __init__(
self,
name,
*args,
device_manager=None,
**kwargs,
):
def __init__(self, name, *args, device_manager=None, **kwargs):
self.h5_file = None
self.h5_dataset = None
self._number_of_images = None

View File

@ -44,10 +44,7 @@ class SetableSignal(Signal):
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self._metadata.update(connected=True, write_access=False)
self._value = value
self.precision = precision
self.sim = getattr(self.parent, "sim", None)
@ -138,10 +135,7 @@ class ReadOnlySignal(Signal):
**kwargs,
):
super().__init__(*args, name=name, parent=parent, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self._metadata.update(connected=True, write_access=False)
self._value = value
self.precision = precision
self.compute_readback = compute_readback
@ -293,11 +287,7 @@ class CustomSetableSignal(BECDeviceBase):
Core function for signal.
"""
res = {
self.name: {
"source": str(self.__class__),
"dtype": self._dtype,
"shape": self._shape,
}
self.name: {"source": str(self.__class__), "dtype": self._dtype, "shape": self._shape}
}
if self.precision is not None:
res[self.name]["precision"] = self.precision
@ -314,12 +304,7 @@ class CustomSetableSignal(BECDeviceBase):
def read(self):
"""Read method"""
return {
self.name: {
"value": self.get(),
"timestamp": self.timestamp,
}
}
return {self.name: {"value": self.get(), "timestamp": self.timestamp}}
def read_configuration(self):
"""Read method"""

View File

@ -50,10 +50,7 @@ class SynSetpoint(Signal):
old_val = self._readback
self._readback = self._dtype(value)
self._run_subs(
sub_type="value",
old_value=old_val,
value=self._readback,
timestamp=time.time(),
sub_type="value", old_value=old_val, value=self._readback, timestamp=time.time()
)
def get(self):

View File

@ -90,11 +90,7 @@ class SmaractAxisLimits(SmaractSignalBase):
class SmaractMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(
SmaractReadbackSignal,
signal_name="readback",
kind="hinted",
)
readback = Cpt(SmaractReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(SmaractSetpointSignal, signal_name="setpoint")
# motor_resolution = Cpt(
@ -217,18 +213,10 @@ class SmaractMotor(Device, PositionerBase):
def move_and_finish():
while self.motor_is_moving.get():
val = self.readback.read()
self._run_subs(
sub_type=self.SUB_READBACK,
value=val,
timestamp=time.time(),
)
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.1)
val = self.readback.read()
success = np.isclose(
val[self.name]["value"],
position,
atol=self.tolerance,
)
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
self._done_moving(success=success)
threading.Thread(target=move_and_finish, daemon=True).start()
@ -312,20 +300,10 @@ if __name__ == "__main__":
from ophyd_devices.utils.socket import SocketMock
lsmarA = SmaractMotor(
"A",
name="lsmarA",
host="mpc2680.psi.ch",
port=8085,
sign=1,
socket_cls=SocketMock,
"A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
)
lsmarB = SmaractMotor(
"B",
name="lsmarB",
host="mpc2680.psi.ch",
port=8085,
sign=1,
socket_cls=SocketMock,
"B", name="lsmarB", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
)
lsmarA.stage()
lsmarB.stage()

View File

@ -140,10 +140,7 @@ class SocketSignal(abc.ABC, Signal):
timestamp = time.time()
super().put(value, timestamp=timestamp, force=True)
self._run_subs(
sub_type=self.SUB_SETPOINT,
old_value=old_value,
value=value,
timestamp=timestamp,
sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=timestamp
)
def describe(self):

View File

@ -27,8 +27,11 @@ def mock_det():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"), mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"),
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
@ -48,18 +51,25 @@ def test_init():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"), mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"),
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
with mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter"
) as mock_default, mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_detector"
) as mock_init_det, mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
) as mock_init_backend:
with (
mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter"
) as mock_default,
mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_detector"
) as mock_init_det,
mock.patch(
"ophyd_devices.epics.devices.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
) as mock_init_backend,
):
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
@ -207,11 +217,12 @@ def test_initialize_detector_backend(
def test_stage(
mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception
):
with mock.patch.object(
mock_det.custom_prepare, "std_client"
) as mock_std_daq, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_std_daq.stop_writer.return_value = None
mock_std_daq.get_status.return_value = daq_status
mock_std_daq.get_config.return_value = daq_cfg
@ -281,14 +292,11 @@ def test_stage(
],
)
def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception):
with mock.patch.object(
mock_det.custom_prepare, "std_client"
) as mock_std_daq, mock.patch.object(
mock_det.custom_prepare, "filepath_exists"
) as mock_file_path_exists, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_backend, mock.patch.object(
mock_det, "scaninfo"
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists,
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
mock.patch.object(mock_det, "scaninfo"),
):
mock_std_daq.start_writer_async.return_value = None
mock_std_daq.get_status.return_value = daq_status
@ -317,9 +325,12 @@ def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_excep
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
@ -375,11 +386,12 @@ def test_publish_file_location(mock_det, scaninfo):
def test_stop(mock_det):
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend:
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend,
):
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_detector_backend.assert_called_once()
@ -420,13 +432,11 @@ def test_stop(mock_det):
],
)
def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception):
with mock.patch.object(
mock_det.custom_prepare, "std_client"
) as mock_std_daq, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_backend, mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det:
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
):
mock_std_daq.get_status.return_value = daq_status
mock_det.cam.acquire._read_pv.mock_state = cam_state
mock_det.scaninfo.num_points = scaninfo["num_points"]

View File

@ -28,11 +28,12 @@ def mock_det():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.FileWriter"
) as filemixin, mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config:
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter") as filemixin,
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config,
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
@ -133,13 +134,16 @@ def test_stage(mock_det, scaninfo):
This includes testing _prep_det
"""
with mock.patch.object(mock_det, "set_trigger") as mock_set_trigger, mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_prep_data_backend, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location, mock.patch.object(
mock_det.custom_prepare, "arm_acquisition"
) as mock_arm_acquisition:
with (
mock.patch.object(mock_det, "set_trigger") as mock_set_trigger,
mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_prep_data_backend,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
mock.patch.object(mock_det.custom_prepare, "arm_acquisition") as mock_arm_acquisition,
):
mock_det.scaninfo.exp_time = scaninfo["exp_time"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
@ -248,9 +252,12 @@ def test_trigger(mock_det):
@pytest.mark.parametrize("stopped, expected_abort", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_abort):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_abort:
mock_det.unstage()
@ -264,11 +271,12 @@ def test_unstage(mock_det, stopped, expected_abort):
def test_stop(mock_det):
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend:
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend,
):
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_detector_backend.assert_called_once()
@ -283,11 +291,12 @@ def test_stop(mock_det):
],
)
def test_finished(mock_det, stopped, scaninfo):
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_file_writer:
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_file_writer,
):
mock_det.stopped = stopped
mock_det.dxp.current_pixel._read_pv.mock_data = int(
scaninfo["num_points"] * scaninfo["frames_per_trigger"]

View File

@ -134,11 +134,12 @@ def test_prepare_detector(mock_GrashopperSetup, scaninfo):
setattr(mock_GrashopperSetup.parent.scaninfo, k, v)
# Call the function you want to test
with mock.patch.object(
mock_GrashopperSetup, "set_acquisition_params"
) as mock_set_acquisition_params, mock.patch.object(
mock_GrashopperSetup, "set_exposure_time"
) as mock_set_exposure_time:
with (
mock.patch.object(
mock_GrashopperSetup, "set_acquisition_params"
) as mock_set_acquisition_params,
mock.patch.object(mock_GrashopperSetup, "set_exposure_time") as mock_set_exposure_time,
):
mock_GrashopperSetup.prepare_detector()
# Assert the correct methods are called with the expected arguments

View File

@ -33,11 +33,12 @@ def mock_det():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.FileWriter"
) as filemixin, mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config:
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter") as filemixin,
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config,
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
@ -54,18 +55,25 @@ def test_init():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"), mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"),
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
with mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_default_parameter"
) as mock_default, mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_detector"
) as mock_init_det, mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_detector_backend"
) as mock_init_backend:
with (
mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_default_parameter"
) as mock_default,
mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_detector"
) as mock_init_det,
mock.patch(
"ophyd_devices.epics.devices.mcs_csaxs.MCSSetup.initialize_detector_backend"
) as mock_init_backend,
):
MCScSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
@ -261,9 +269,12 @@ def test_prepare_detector_backend(mock_det):
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
@ -281,11 +292,12 @@ def test_stop_detector_backend(mock_det):
def test_stop(mock_det):
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend:
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend,
):
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_detector_backend.assert_called_once()

View File

@ -28,8 +28,11 @@ def mock_det():
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"), mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
with (
mock.patch("ophyd_devices.epics.devices.psi_detector_base.FileWriter"),
mock.patch(
"ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
@ -99,11 +102,14 @@ def test_stage(mock_det, scaninfo, stopped, expected_exception):
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
mock_det.device_manager.add_device("mokev", value=12.4)
mock_det.stopped = stopped
with mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_data_backend, mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time:
with (
mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_data_backend,
mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time,
):
mock_det.filepath = scaninfo["filepath"]
if expected_exception:
with pytest.raises(Exception):
@ -225,11 +231,12 @@ def test_publish_file_location(mock_det, scaninfo):
],
)
def test_stop_detector_backend(mock_det, requests_state, expected_exception, url_delete, url_put):
with mock.patch.object(
mock_det.custom_prepare, "send_requests_delete"
) as mock_send_requests_delete, mock.patch.object(
mock_det.custom_prepare, "send_requests_put"
) as mock_send_requests_put:
with (
mock.patch.object(
mock_det.custom_prepare, "send_requests_delete"
) as mock_send_requests_delete,
mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put,
):
instance_delete = mock_send_requests_delete.return_value
instance_delete.ok = requests_state
instance_put = mock_send_requests_put.return_value
@ -340,17 +347,13 @@ def test_stop_detector_backend(mock_det, requests_state, expected_exception, url
],
)
def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception):
with mock.patch.object(
mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer, mock.patch.object(
mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_writer, mock.patch.object(
mock_det, "filewriter"
) as mock_filewriter, mock.patch.object(
mock_det.custom_prepare, "create_directory"
) as mock_create_directory, mock.patch.object(
mock_det.custom_prepare, "send_requests_put"
) as mock_send_requests_put:
with (
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer,
mock.patch.object(mock_det, "filewriter") as mock_filewriter,
mock.patch.object(mock_det.custom_prepare, "create_directory") as mock_create_directory,
mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put,
):
mock_det.scaninfo.scan_number = scaninfo["scan_number"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
@ -400,9 +403,12 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
@ -415,13 +421,11 @@ def test_unstage(mock_det, stopped, expected_exception):
def test_stop(mock_det):
with mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_writer, mock.patch.object(
mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer:
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer,
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
):
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_file_writer.assert_called_once()
@ -438,13 +442,12 @@ def test_stop(mock_det):
],
)
def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):
with mock.patch.object(mock_det, "device_manager") as mock_dm, mock.patch.object(
mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_friter, mock.patch.object(
mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer:
with (
mock.patch.object(mock_det, "device_manager") as mock_dm,
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_friter,
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
):
mock_dm.devices.mcs.obj._staged = mcs_stage_state
mock_det.stopped = stopped
if expected_exception: