diff --git a/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py b/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py index 582e88e..79ed520 100644 --- a/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py +++ b/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py @@ -92,7 +92,7 @@ class Eiger1p5MDetector(Device): def stage(self) -> list[object]: scan_msg = self._get_current_scan_msg() self.metadata = { - "scanID": scan_msg.content["scanID"], + "scan_id": scan_msg.content["scan_id"], "RID": scan_msg.content["info"]["RID"], "queueID": scan_msg.content["info"]["queueID"], } diff --git a/ophyd_devices/epics/devices/delay_generator_csaxs.py b/ophyd_devices/epics/devices/delay_generator_csaxs.py index 18fa474..bea33fa 100644 --- a/ophyd_devices/epics/devices/delay_generator_csaxs.py +++ b/ophyd_devices/epics/devices/delay_generator_csaxs.py @@ -1,14 +1,12 @@ +from bec_lib import bec_logger from ophyd import Component -from ophyd_devices.utils import bec_utils from ophyd_devices.epics.devices.psi_delay_generator_base import ( - PSIDelayGeneratorBase, DDGCustomMixin, + PSIDelayGeneratorBase, TriggerSource, ) - -from bec_lib import bec_logger - +from ophyd_devices.utils import bec_utils logger = bec_logger.logger @@ -34,14 +32,10 @@ class DDGSetup(DDGCustomMixin): self.parent.set_channels("offset", self.parent.offset.get()) # Setup reference self.parent.set_channels( - "reference", - 0, - [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs], + "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs] ) self.parent.set_channels( - "reference", - 0, - [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs], + "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs] ) self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get())) # Set threshold level for ext. pulses @@ -157,15 +151,15 @@ class DDGSetup(DDGCustomMixin): if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT: self.parent.trigger_shot.put(1) - def check_scanID(self) -> None: + def check_scan_id(self) -> None: """ - Method to check if scanID has changed. + Method to check if scan_id has changed. If yes, then it changes parent.stopped to True, which will stop further actions. """ - old_scanID = self.parent.scaninfo.scanID + old_scan_id = self.parent.scaninfo.scan_id self.parent.scaninfo.load_scan_metadata() - if self.parent.scaninfo.scanID != old_scanID: + if self.parent.scaninfo.scan_id != old_scan_id: self.parent.stopped = True def finished(self) -> None: @@ -211,17 +205,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): custom_prepare_cls = DDGSetup delay_burst = Component( - bec_utils.ConfigSignal, - name="delay_burst", - kind="config", - config_storage_name="ddg_config", + bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config" ) delta_width = Component( - bec_utils.ConfigSignal, - name="delta_width", - kind="config", - config_storage_name="ddg_config", + bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config" ) additional_triggers = Component( @@ -232,10 +220,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): ) polarity = Component( - bec_utils.ConfigSignal, - name="polarity", - kind="config", - config_storage_name="ddg_config", + bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config" ) fixed_ttl_width = Component( @@ -246,17 +231,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase): ) amplitude = Component( - bec_utils.ConfigSignal, - name="amplitude", - kind="config", - config_storage_name="ddg_config", + bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config" ) offset = Component( - bec_utils.ConfigSignal, - name="offset", - kind="config", - config_storage_name="ddg_config", + bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config" ) thres_trig_level = Component( diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 9873326..0b2c1aa 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -1,23 +1,18 @@ import enum +import os import threading import time -import numpy as np -import os - from typing import Any -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd import Device +import numpy as np +from bec_lib import messages, threadlocked +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from ophyd import ADComponent as ADCpt - +from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from std_daq_client import StdDaqClient -from bec_lib import threadlocked -from bec_lib.logger import bec_logger -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints - -from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin +from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase logger = bec_logger.logger @@ -78,16 +73,9 @@ class Eiger9MSetup(CustomDetectorMixin): eacc = self.parent.scaninfo.username self.update_std_cfg("writer_user_id", int(eacc.strip(" e"))) - signal_conditions = [ - ( - lambda: self.std_client.get_status()["state"], - "READY", - ), - ] + signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")] if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=self.parent.timeout, - all_signals=True, + signal_conditions=signal_conditions, timeout=self.parent.timeout, all_signals=True ): raise EigerTimeoutError( f"Std client not in READY state, returns: {self.std_client.get_status()}" @@ -282,11 +270,11 @@ class Eiger9MSetup(CustomDetectorMixin): f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}" ) - def check_scanID(self) -> None: - """Checks if scanID has changed and stops the scan if it has""" - old_scanID = self.parent.scaninfo.scanID + def check_scan_id(self) -> None: + """Checks if scan_id has changed and stops the scan if it has""" + old_scan_id = self.parent.scaninfo.scan_id self.parent.scaninfo.load_scan_metadata() - if self.parent.scaninfo.scanID != old_scanID: + if self.parent.scaninfo.scan_id != old_scan_id: self.parent.stopped = True def publish_file_location(self, done: bool = False, successful: bool = None) -> None: @@ -309,7 +297,7 @@ class Eiger9MSetup(CustomDetectorMixin): file_path=self.parent.filepath, done=done, successful=successful ) self.parent.connector.set_and_publish( - MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), + MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), msg, pipe=pipe, ) @@ -404,9 +392,7 @@ class Eiger9McSAXS(PSIDetectorBase): """ # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [ - "describe", - ] + USER_ACCESS = ["describe"] # specify Setup class custom_prepare_cls = Eiger9MSetup diff --git a/ophyd_devices/epics/devices/falcon_csaxs.py b/ophyd_devices/epics/devices/falcon_csaxs.py index 09d61ff..5b6bc70 100644 --- a/ophyd_devices/epics/devices/falcon_csaxs.py +++ b/ophyd_devices/epics/devices/falcon_csaxs.py @@ -1,18 +1,14 @@ import enum import os -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt -from ophyd.mca import EpicsMCARecord -from ophyd import Device - - -from bec_lib.endpoints import MessageEndpoints from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV +from ophyd.mca import EpicsMCARecord - -from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin - +from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase logger = bec_logger.logger @@ -145,10 +141,7 @@ class FalconSetup(CustomDetectorMixin): self.parent.erase_all.put(1) signal_conditions = [ - ( - lambda: self.parent.state.read()[self.parent.state.name]["value"], - DetectorState.DONE, - ), + (lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE) ] if not self.wait_for_signals( @@ -225,11 +218,11 @@ class FalconSetup(CustomDetectorMixin): f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}" ) - def check_scanID(self) -> None: - """Checks if scanID has changed and stops the scan if it has""" - old_scanID = self.parent.scaninfo.scanID + def check_scan_id(self) -> None: + """Checks if scan_id has changed and stops the scan if it has""" + old_scan_id = self.parent.scaninfo.scan_id self.parent.scaninfo.load_scan_metadata() - if self.parent.scaninfo.scanID != old_scanID: + if self.parent.scaninfo.scan_id != old_scan_id: self.parent.stopped = True def publish_file_location(self, done: bool = False, successful: bool = None) -> None: @@ -252,7 +245,7 @@ class FalconSetup(CustomDetectorMixin): file_path=self.parent.filepath, done=done, successful=successful ) self.parent.connector.set_and_publish( - MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), + MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), msg, pipe=pipe, ) @@ -267,14 +260,8 @@ class FalconSetup(CustomDetectorMixin): self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger ) signal_conditions = [ - ( - self.parent.dxp.current_pixel.get, - total_frames, - ), - ( - self.parent.hdf5.array_counter.get, - total_frames, - ), + (self.parent.dxp.current_pixel.get, total_frames), + (self.parent.hdf5.array_counter.get, total_frames), ] if not self.wait_for_signals( signal_conditions=signal_conditions, @@ -308,9 +295,7 @@ class FalconcSAXS(PSIDetectorBase): """ # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [ - "describe", - ] + USER_ACCESS = ["describe"] # specify Setup class custom_prepare_cls = FalconSetup diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index 5158b78..2041a7a 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -1,14 +1,14 @@ import enum import threading from collections import defaultdict + import numpy as np +from bec_lib import MessageEndpoints, bec_logger, messages, threadlocked +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO -from ophyd import EpicsSignal, EpicsSignalRO, Device, Component as Cpt +from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase from ophyd_devices.utils import bec_utils -from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin - -from bec_lib import messages, MessageEndpoints, bec_logger, threadlocked - logger = bec_logger.logger @@ -127,19 +127,13 @@ class MCSSetup(CustomDetectorMixin): if self.parent.scaninfo.scan_msg is None: return metadata = self.parent.scaninfo.scan_msg.metadata - metadata.update( - { - "async_update": "append", - "num_lines": self.parent.num_lines.get(), - } - ) + metadata.update({"async_update": "append", "num_lines": self.parent.num_lines.get()}) msg = messages.DeviceMessage( - signals=dict(self.mca_data), - metadata=self.parent.scaninfo.scan_msg.metadata, + signals=dict(self.mca_data), metadata=self.parent.scaninfo.scan_msg.metadata ) self.parent.connector.xadd( topic=MessageEndpoints.device_async_readback( - scanID=self.parent.scaninfo.scanID, device=self.parent.name + scan_id=self.parent.scaninfo.scan_id, device=self.parent.name ), msg={"data": msg}, expire=self._stream_ttl, @@ -181,14 +175,8 @@ class MCSSetup(CustomDetectorMixin): def finished(self) -> None: """Check if acquisition is finished, if not successful, rais MCSTimeoutError""" signal_conditions = [ - ( - lambda: self.acquisition_done, - True, - ), - ( - self.parent.acquiring.get, - 0, # Considering making a enum.Int class for this state - ), + (lambda: self.acquisition_done, True), + (self.parent.acquiring.get, 0), # Considering making a enum.Int class for this state ] if not self.wait_for_signals( signal_conditions=signal_conditions, @@ -272,10 +260,7 @@ class MCScSAXS(PSIDetectorBase): # Custom signal readout from device config num_lines = Cpt( - bec_utils.ConfigSignal, - name="num_lines", - kind="config", - config_storage_name="mcs_config", + bec_utils.ConfigSignal, name="num_lines", kind="config", config_storage_name="mcs_config" ) def __init__( @@ -292,9 +277,7 @@ class MCScSAXS(PSIDetectorBase): mcs_config=None, **kwargs, ): - self.mcs_config = { - f"{name}_num_lines": 1, - } + self.mcs_config = {f"{name}_num_lines": 1} if mcs_config is not None: # pylint: disable=expression-not-assigned [self.mcs_config.update({f"{name}_{key}": value}) for key, value in mcs_config.items()] diff --git a/ophyd_devices/epics/devices/pilatus_csaxs.py b/ophyd_devices/epics/devices/pilatus_csaxs.py index 390db83..fd62695 100644 --- a/ophyd_devices/epics/devices/pilatus_csaxs.py +++ b/ophyd_devices/epics/devices/pilatus_csaxs.py @@ -2,16 +2,14 @@ import enum import json import os import time -import requests + import numpy as np - -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd import Device, Staged +import requests +from bec_lib import MessageEndpoints, bec_logger, messages from ophyd import ADComponent as ADCpt +from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged -from bec_lib import messages, MessageEndpoints, bec_logger - -from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin +from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase logger = bec_logger.logger @@ -346,7 +344,7 @@ class PilatusSetup(CustomDetectorMixin): metadata={"input_path": self.parent.filepath_raw}, ) self.parent.connector.set_and_publish( - MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), + MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), msg, pipe=pipe, ) @@ -360,10 +358,7 @@ class PilatusSetup(CustomDetectorMixin): # pylint: disable=protected-access # TODO: at the moment this relies on device.mcs.obj._staged attribute signal_conditions = [ - ( - lambda: self.parent.device_manager.devices.mcs.obj._staged, - Staged.no, - ), + (lambda: self.parent.device_manager.devices.mcs.obj._staged, Staged.no) ] if not self.wait_for_signals( signal_conditions=signal_conditions, @@ -383,11 +378,11 @@ class PilatusSetup(CustomDetectorMixin): """Stop detector""" self.parent.cam.acquire.put(0) - def check_scanID(self) -> None: - """Checks if scanID has changed and stops the scan if it has""" - old_scanID = self.parent.scaninfo.scanID + def check_scan_id(self) -> None: + """Checks if scan_id has changed and stops the scan if it has""" + old_scan_id = self.parent.scaninfo.scan_id self.parent.scaninfo.load_scan_metadata() - if self.parent.scaninfo.scanID != old_scanID: + if self.parent.scaninfo.scan_id != old_scan_id: self.parent.stopped = True @@ -405,9 +400,7 @@ class PilatuscSAXS(PSIDetectorBase): """ # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [ - "describe", - ] + USER_ACCESS = ["describe"] # specify Setup class custom_prepare_cls = PilatusSetup diff --git a/ophyd_devices/epics/devices/psi_delay_generator_base.py b/ophyd_devices/epics/devices/psi_delay_generator_base.py index 5068343..03c76ea 100644 --- a/ophyd_devices/epics/devices/psi_delay_generator_base.py +++ b/ophyd_devices/epics/devices/psi_delay_generator_base.py @@ -1,21 +1,28 @@ -import time import enum +import time from typing import Any -from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind -from ophyd import PVPositioner, Signal, DeviceStatus -from ophyd.pseudopos import ( - pseudo_position_argument, - real_position_argument, - PseudoSingle, - PseudoPositioner, -) -from ophyd.device import Staged - -from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin -from ophyd_devices.utils import bec_utils from bec_lib import bec_logger +from ophyd import ( + Component, + Device, + DeviceStatus, + EpicsSignal, + EpicsSignalRO, + Kind, + PVPositioner, + Signal, +) +from ophyd.device import Staged +from ophyd.pseudopos import ( + PseudoPositioner, + PseudoSingle, + pseudo_position_argument, + real_position_argument, +) +from ophyd_devices.utils import bec_utils +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin logger = bec_logger.logger @@ -71,18 +78,10 @@ class DelayStatic(Device): kind=Kind.config, ) amplitude = Component( - EpicsSignal, - "OutputAmpAI", - write_pv="OutputAmpAO", - name="amplitude", - kind=Kind.config, + EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config ) offset = Component( - EpicsSignal, - "OutputOffsetAI", - write_pv="OutputOffsetAO", - name="offset", - kind=Kind.config, + EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config ) @@ -178,8 +177,8 @@ class DDGCustomMixin: Example could be to open the shutter by triggering a pulse via pre_scan. """ - def check_scanID(self) -> None: - """Method to check if there is a new scanID, called by stage.""" + def check_scan_id(self) -> None: + """Method to check if there is a new scan_id, called by stage.""" def is_ddg_okay(self, raise_on_error=False) -> None: """ @@ -246,13 +245,7 @@ class PSIDelayGeneratorBase(Device): SUB_VALUE = "value" _default_sub = SUB_VALUE - USER_ACCESS = [ - "set_channels", - "_set_trigger", - "burst_enable", - "burst_disable", - "reload_config", - ] + USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"] # Assign PVs from DDG645 trigger_burst_readout = Component( @@ -307,39 +300,19 @@ class PSIDelayGeneratorBase(Device): ) trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config") burstMode = Component( - EpicsSignal, - "BurstModeBI", - write_pv="BurstModeBO", - name="burstmode", - kind=Kind.config, + EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config ) burstConfig = Component( - EpicsSignal, - "BurstConfigBI", - write_pv="BurstConfigBO", - name="burstconfig", - kind=Kind.config, + EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config ) burstCount = Component( - EpicsSignal, - "BurstCountLI", - write_pv="BurstCountLO", - name="burstcount", - kind=Kind.config, + EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config ) burstDelay = Component( - EpicsSignal, - "BurstDelayAI", - write_pv="BurstDelayAO", - name="burstdelay", - kind=Kind.config, + EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config ) burstPeriod = Component( - EpicsSignal, - "BurstPeriodAI", - write_pv="BurstPeriodAO", - name="burstperiod", - kind=Kind.config, + EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config ) def __init__( @@ -375,13 +348,7 @@ class PSIDelayGeneratorBase(Device): self.name = name self.scaninfo = None self.timeout = 5 - self.all_channels = [ - "channelT0", - "channelAB", - "channelCD", - "channelEF", - "channelGH", - ] + self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] self.all_delay_pairs = ["AB", "CD", "EF", "GH"] self.wait_for_connection(all_signals=True) @@ -446,10 +413,7 @@ class PSIDelayGeneratorBase(Device): assert count > 0, "Number of bursts must be positive" assert delay >= 0, "Burst delay must be larger than 0" assert period > 0, "Burst period must be positive" - assert config in [ - "all", - "first", - ], "Supported burst configs are 'all' and 'first'" + assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'" self.burstMode.put(1) self.burstCount.put(count) @@ -517,14 +481,14 @@ class PSIDelayGeneratorBase(Device): Otherwise, checks if the DDG finished acquisition Internal Calls: - - custom_prepare.check_scanID : check if scanID changed or detector stopped + - custom_prepare.check_scan_id : check if scan_id changed or detector stopped - custom_prepare.finished : check if device finished acquisition (succesfully) - is_ddg_okay : check if DDG is okay Returns: list(object): list of objects that were unstaged """ - self.custom_prepare.check_scanID() + self.custom_prepare.check_scan_id() if self.stopped is True: return super().unstage() self.custom_prepare.finished() diff --git a/ophyd_devices/epics/devices/psi_detector_base.py b/ophyd_devices/epics/devices/psi_detector_base.py index 5e18518..f8ee8d8 100644 --- a/ophyd_devices/epics/devices/psi_detector_base.py +++ b/ophyd_devices/epics/devices/psi_detector_base.py @@ -5,8 +5,9 @@ from bec_lib.device import DeviceStatus from bec_lib.file_utils import FileWriterMixin from ophyd import Device from ophyd.device import Staged -from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin + from ophyd_devices.utils import bec_utils +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin class DetectorInitError(Exception): @@ -99,9 +100,9 @@ class CustomDetectorMixin: DetectorTimeoutError: if detector cannot be stopped """ - def check_scanID(self) -> None: + def check_scan_id(self) -> None: """ - Check if BEC is running on a new scanID + Check if BEC is running on a new scan_id """ def publish_file_location(self, done: bool = False, successful: bool = None) -> None: @@ -299,14 +300,14 @@ class PSIDetectorBase(Device): if data acquisition on device finished (an was successful) Internal Calls: - - custom_prepare.check_scanID : check if scanID changed or detector stopped + - custom_prepare.check_scan_id : check if scan_id changed or detector stopped - custom_prepare.finished : check if device finished acquisition (succesfully) - custom_prepare.publish_file_location : publish file location to bec Returns: list(object): list of objects that were unstaged """ - self.custom_prepare.check_scanID() + self.custom_prepare.check_scan_id() if self.stopped is True: return super().unstage() self.custom_prepare.finished() diff --git a/ophyd_devices/utils/bec_scaninfo_mixin.py b/ophyd_devices/utils/bec_scaninfo_mixin.py index f005927..809437a 100644 --- a/ophyd_devices/utils/bec_scaninfo_mixin.py +++ b/ophyd_devices/utils/bec_scaninfo_mixin.py @@ -1,6 +1,6 @@ import getpass -from bec_lib import DeviceManagerBase, messages, MessageEndpoints, bec_logger +from bec_lib import DeviceManagerBase, MessageEndpoints, bec_logger, messages logger = bec_logger.logger @@ -67,7 +67,7 @@ class BecScaninfoMixin: self.device_manager = device_manager self.sim_mode = sim_mode self.scan_msg = None - self.scanID = None + self.scan_id = None if bec_info_msg is None: infomsgmock = BECInfoMsgMock() self.bec_info_msg = infomsgmock.get_bec_info_msg() @@ -94,11 +94,7 @@ class BecScaninfoMixin: return None return msg - return messages.ScanStatusMessage( - scanID="1", - status={}, - info=self.bec_info_msg, - ) + return messages.ScanStatusMessage(scan_id="1", status={}, info=self.bec_info_msg) def get_username(self) -> str: """Get username""" @@ -119,11 +115,11 @@ class BecScaninfoMixin: logger.info(f"{self.scan_msg}") try: self.metadata = { - "scanID": scan_msg.content["scanID"], + "scan_id": scan_msg.content["scan_id"], "RID": scan_msg.content["info"]["RID"], "queueID": scan_msg.content["info"]["queueID"], } - self.scanID = scan_msg.content["scanID"] + self.scan_id = scan_msg.content["scan_id"] self.scan_number = scan_msg.content["info"]["scan_number"] self.exp_time = scan_msg.content["info"]["exp_time"] self.frames_per_trigger = scan_msg.content["info"]["frames_per_trigger"] diff --git a/tests/test_delay_generator_csaxs.py b/tests/test_delay_generator_csaxs.py index ecac0b7..27be21b 100644 --- a/tests/test_delay_generator_csaxs.py +++ b/tests/test_delay_generator_csaxs.py @@ -1,7 +1,8 @@ # pylint: skip-file -import pytest from unittest import mock +import pytest + from ophyd_devices.epics.devices.delay_generator_csaxs import DDGSetup from ophyd_devices.epics.devices.psi_delay_generator_base import TriggerSource @@ -26,7 +27,7 @@ def mock_DDGSetup(): @pytest.fixture( params=[ { - "scanID": "1234", + "scan_id": "1234", "scan_type": "step", "num_points": 500, "frames_per_trigger": 1, @@ -34,7 +35,7 @@ def mock_DDGSetup(): "readout_time": 0.1, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "step", "num_points": 500, "frames_per_trigger": 5, @@ -42,7 +43,7 @@ def mock_DDGSetup(): "readout_time": 0, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "fly", "num_points": 500, "frames_per_trigger": 1, @@ -50,7 +51,7 @@ def mock_DDGSetup(): "readout_time": 0.2, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "fly", "num_points": 500, "frames_per_trigger": 5, @@ -135,8 +136,8 @@ def channel_pairs(request): return request.param -def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): - """Test the check_scanID method.""" +def test_check_scan_id(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): + """Test the check_scan_id method.""" # Set first attributes of parent class for k, v in scaninfo.items(): setattr(mock_DDGSetup.parent.scaninfo, k, v) @@ -145,12 +146,12 @@ def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_s for k, v in ddg_config_scan.items(): getattr(mock_DDGSetup.parent, k).get.return_value = v # Call the function you want to test - mock_DDGSetup.check_scanID() + mock_DDGSetup.check_scan_id() mock_DDGSetup.parent.scaninfo.load_scan_metadata.assert_called_once() def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan): - """Test the check_scanID method.""" + """Test the check_scan_id method.""" # Set first attributes of parent class for k, v in scaninfo.items(): setattr(mock_DDGSetup.parent.scaninfo, k, v) @@ -185,11 +186,7 @@ def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_sca def test_initialize_default_parameter( - mock_DDGSetup, - scaninfo, - ddg_config_defaults, - ddg_config_scan, - channel_pairs, + mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs ): """Test the initialize_default_parameter method.""" # Set first attributes of parent class @@ -229,13 +226,7 @@ def test_initialize_default_parameter( mock_DDGSetup.parent.set_channels.assert_has_calls(calls) -def test_prepare_ddg( - mock_DDGSetup, - scaninfo, - ddg_config_defaults, - ddg_config_scan, - channel_pairs, -): +def test_prepare_ddg(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs): """Test the prepare_ddg method.""" # Set first attributes of parent class for k, v in scaninfo.items(): diff --git a/tests/test_eiger9m_csaxs.py b/tests/test_eiger9m_csaxs.py index cf25ec3..7be017e 100644 --- a/tests/test_eiger9m_csaxs.py +++ b/tests/test_eiger9m_csaxs.py @@ -1,13 +1,12 @@ # pylint: skip-file -import pytest import threading from unittest import mock import ophyd +import pytest +from bec_lib import MessageEndpoints, messages -from bec_lib import messages, MessageEndpoints from ophyd_devices.epics.devices.eiger9m_csaxs import Eiger9McSAXS - from tests.utils import DMMock, MockPV @@ -72,26 +71,9 @@ def test_init(): @pytest.mark.parametrize( - "trigger_source, detector_state, expected_exception", - [ - ( - 2, - 1, - True, - ), - ( - 2, - 0, - False, - ), - ], + "trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)] ) -def test_initialize_detector( - mock_det, - trigger_source, - detector_state, - expected_exception, -): +def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception): """Test the _init function: This includes testing the functions: @@ -125,13 +107,7 @@ def test_trigger(mock_det): @pytest.mark.parametrize( - "readout_time, expected_value", - [ - (1e-3, 3e-3), - (3e-3, 3e-3), - (5e-3, 5e-3), - (None, 3e-3), - ], + "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] ) def test_update_readout_time(mock_det, readout_time, expected_value): if readout_time is None: @@ -146,34 +122,10 @@ def test_update_readout_time(mock_det, readout_time, expected_value): @pytest.mark.parametrize( "eacc, exp_url, daq_status, daq_cfg, expected_exception", [ - ( - "e12345", - "http://xbl-daq-29:5000", - {"state": "READY"}, - {"writer_user_id": 12543}, - False, - ), - ( - "e12345", - "http://xbl-daq-29:5000", - {"state": "READY"}, - {"writer_user_id": 15421}, - False, - ), - ( - "e12345", - "http://xbl-daq-29:5000", - {"state": "BUSY"}, - {"writer_user_id": 15421}, - True, - ), - ( - "e12345", - "http://xbl-daq-29:5000", - {"state": "READY"}, - {"writer_ud": 12345}, - True, - ), + ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False), + ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False), + ("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True), + ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True), ], ) def test_initialize_detector_backend( @@ -215,7 +167,7 @@ def test_initialize_detector_backend( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 12.4, }, {"state": "READY"}, @@ -230,7 +182,7 @@ def test_initialize_detector_backend( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 12.4, }, {"state": "BUSY"}, @@ -245,7 +197,7 @@ def test_initialize_detector_backend( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 18.4, }, {"state": "READY"}, @@ -257,13 +209,7 @@ def test_initialize_detector_backend( ], ) def test_stage( - mock_det, - scaninfo, - daq_status, - daq_cfg, - detector_state, - stopped, - expected_exception, + mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception ): with mock.patch.object( mock_det.custom_prepare, "std_client" @@ -309,7 +255,7 @@ def test_stage( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", }, {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, False, @@ -320,7 +266,7 @@ def test_stage( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", }, {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, False, @@ -331,7 +277,7 @@ def test_stage( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", }, {"state": "BUSY", "acquisition": {"state": "ERROR"}}, True, @@ -373,24 +319,8 @@ def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_excep mock_std_daq.start_writer_async.assert_called_with(daq_writer_call) -@pytest.mark.parametrize( - "stopped, expected_exception", - [ - ( - False, - False, - ), - ( - True, - True, - ), - ], -) -def test_unstage( - mock_det, - stopped, - expected_exception, -): +@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)]) +def test_unstage(mock_det, stopped, expected_exception): with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object( mock_det.custom_prepare, "publish_file_location" ) as mock_publish_file_location: @@ -416,13 +346,13 @@ def test_stop_detector_backend(mock_det): @pytest.mark.parametrize( "scaninfo", [ - ({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}), - ({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}), + ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), + ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), + ({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}), ], ) def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scanID = scaninfo["scanID"] + mock_det.scaninfo.scan_id = scaninfo["scan_id"] mock_det.filepath = scaninfo["filepath"] mock_det.custom_prepare.publish_file_location( done=scaninfo["done"], successful=scaninfo["successful"] @@ -435,7 +365,7 @@ def test_publish_file_location(mock_det, scaninfo): ) expected_calls = [ mock.call( - MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), + MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), msg, pipe=mock_det.connector.pipeline.return_value, ), @@ -465,40 +395,28 @@ def test_stop(mock_det): [ ( False, - { - "num_points": 500, - "frames_per_trigger": 4, - }, + {"num_points": 500, "frames_per_trigger": 4}, 0, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}}, False, ), ( False, - { - "num_points": 500, - "frames_per_trigger": 4, - }, + {"num_points": 500, "frames_per_trigger": 4}, 0, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}}, True, ), ( False, - { - "num_points": 500, - "frames_per_trigger": 1, - }, + {"num_points": 500, "frames_per_trigger": 1}, 1, {"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}}, True, ), ( False, - { - "num_points": 500, - "frames_per_trigger": 1, - }, + {"num_points": 500, "frames_per_trigger": 1}, 0, {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}}, False, diff --git a/tests/test_falcon_csaxs.py b/tests/test_falcon_csaxs.py index b07915e..6154c77 100644 --- a/tests/test_falcon_csaxs.py +++ b/tests/test_falcon_csaxs.py @@ -1,14 +1,14 @@ # pylint: skip-file -import pytest -from unittest import mock -import threading import os +import threading +from unittest import mock + import ophyd +import pytest +from bec_lib import MessageEndpoints, messages from ophyd_devices.epics.devices.falcon_csaxs import FalconcSAXS, FalconTimeoutError - from tests.utils import DMMock, MockPV -from bec_lib import messages, MessageEndpoints def patch_dual_pvs(device): @@ -47,10 +47,7 @@ def mock_det(): @pytest.mark.parametrize( "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," " expected_exception", - [ - (1, 1, 0, 20, 0, False), - (1, 1, 0, 20, 1, True), - ], + [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], ) # TODO rewrite this one, write test for init_detector, init_filewriter is tested def test_init_detector( @@ -94,13 +91,7 @@ def test_init_detector( @pytest.mark.parametrize( - "readout_time, expected_value", - [ - (1e-3, 3e-3), - (3e-3, 3e-3), - (5e-3, 5e-3), - (None, 3e-3), - ], + "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] ) def test_update_readout_time(mock_det, readout_time, expected_value): if readout_time is None: @@ -131,10 +122,10 @@ def test_initialize_default_parameter(mock_det): "frames_per_trigger": 1, "exp_time": 0.1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 12.4, } - ), + ) ], ) def test_stage(mock_det, scaninfo): @@ -203,13 +194,13 @@ def test_prepare_data_backend(mock_det, scaninfo): @pytest.mark.parametrize( "scaninfo", [ - ({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}), - ({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}), + ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), + ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), + ({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}), ], ) def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scanID = scaninfo["scanID"] + mock_det.scaninfo.scan_id = scaninfo["scan_id"] mock_det.filepath = scaninfo["filepath"] mock_det.custom_prepare.publish_file_location( done=scaninfo["done"], successful=scaninfo["successful"] @@ -222,7 +213,7 @@ def test_publish_file_location(mock_det, scaninfo): ) expected_calls = [ mock.call( - MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), + MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), msg, pipe=mock_det.connector.pipeline.return_value, ), @@ -235,13 +226,7 @@ def test_publish_file_location(mock_det, scaninfo): assert mock_det.connector.set_and_publish.call_args_list == expected_calls -@pytest.mark.parametrize( - "detector_state, expected_exception", - [ - (1, False), - (0, True), - ], -) +@pytest.mark.parametrize("detector_state, expected_exception", [(1, False), (0, True)]) def test_arm_acquisition(mock_det, detector_state, expected_exception): with mock.patch.object(mock_det, "stop") as mock_stop: mock_det.state._read_pv.mock_data = detector_state @@ -261,24 +246,8 @@ def test_trigger(mock_det): mock_on_trigger.assert_called_once() -@pytest.mark.parametrize( - "stopped, expected_abort", - [ - ( - False, - False, - ), - ( - True, - True, - ), - ], -) -def test_unstage( - mock_det, - stopped, - expected_abort, -): +@pytest.mark.parametrize("stopped, expected_abort", [(False, False), (True, True)]) +def test_unstage(mock_det, stopped, expected_abort): with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object( mock_det.custom_prepare, "publish_file_location" ) as mock_publish_file_location: diff --git a/tests/test_grashopper_tomcat.py b/tests/test_grashopper_tomcat.py index d8ab5e0..1bbf2a1 100644 --- a/tests/test_grashopper_tomcat.py +++ b/tests/test_grashopper_tomcat.py @@ -1,20 +1,21 @@ # pylint: skip-file -import pytest from unittest import mock +import pytest + from ophyd_devices.epics.devices.grashopper_tomcat import ( - GrashopperTOMCATSetup, - AutoMode, - ImageMode, - DetectorState, - ImageBinning, - VideoMode, - PixelFormat, COLORMODE, - TriggerSource, - MemoryPolling, + AutoMode, + DetectorState, GrashopperError, GrashopperTimeoutError, + GrashopperTOMCATSetup, + ImageBinning, + ImageMode, + MemoryPolling, + PixelFormat, + TriggerSource, + VideoMode, ) @@ -38,7 +39,7 @@ def mock_GrashopperSetup(): @pytest.fixture( params=[ { - "scanID": "1234", + "scan_id": "1234", "scan_type": "step", "num_points": 500, "frames_per_trigger": 1, @@ -46,7 +47,7 @@ def mock_GrashopperSetup(): "readout_time": 0.1, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "step", "num_points": 500, "frames_per_trigger": 5, @@ -54,7 +55,7 @@ def mock_GrashopperSetup(): "readout_time": 0, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "fly", "num_points": 500, "frames_per_trigger": 1, @@ -62,7 +63,7 @@ def mock_GrashopperSetup(): "readout_time": 0.2, }, { - "scanID": "1234", + "scan_id": "1234", "scan_type": "fly", "num_points": 500, "frames_per_trigger": 5, diff --git a/tests/test_mcs_card.py b/tests/test_mcs_card.py index 841dac6..f519a19 100644 --- a/tests/test_mcs_card.py +++ b/tests/test_mcs_card.py @@ -1,8 +1,10 @@ # pylint: skip-file -import pytest -from unittest import mock import threading +from unittest import mock + import ophyd +import pytest +from bec_lib import MessageEndpoints, messages from ophyd_devices.epics.devices.mcs_csaxs import ( MCScSAXS, @@ -11,9 +13,7 @@ from ophyd_devices.epics.devices.mcs_csaxs import ( ReadoutMode, TriggerSource, ) - from tests.utils import DMMock, MockPV -from bec_lib import messages, MessageEndpoints def patch_dual_pvs(device): @@ -89,15 +89,11 @@ def test_init(): "count_on_start": 0, "stop_all": 1, }, - ), + ) ], ) def test_initialize_detector( - mock_det, - trigger_source, - channel_advance, - channel_source1, - pv_channels, + mock_det, trigger_source, channel_advance, channel_source1, pv_channels ): """Test the _init function: @@ -145,10 +141,7 @@ def test_progress_update(mock_det, value, num_lines, num_points, done): @pytest.mark.parametrize( "values, expected_nothing", - [ - ([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), - ([100, 200, 300], True), - ], + [([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), ([100, 200, 300], True)], ) def test_on_mca_data(mock_det, values, expected_nothing): """Test the on_mca_data function: @@ -171,15 +164,15 @@ def test_on_mca_data(mock_det, values, expected_nothing): "metadata, mca_data", [ ( - {"scanID": 123}, + {"scan_id": 123}, {"mca1": [100, 120, 140], "mca3": [200, 220, 240], "mca4": [300, 320, 340]}, - ), + ) ], ) def test_send_data_to_bec(mock_det, metadata, mca_data): mock_det.scaninfo.scan_msg = mock.MagicMock() mock_det.scaninfo.scan_msg.metadata = metadata - mock_det.scaninfo.scanID = metadata["scanID"] + mock_det.scaninfo.scan_id = metadata["scan_id"] mock_det.custom_prepare.mca_data = mca_data mock_det.custom_prepare._send_data_to_bec() device_metadata = mock_det.scaninfo.scan_msg.metadata @@ -187,7 +180,7 @@ def test_send_data_to_bec(mock_det, metadata, mca_data): data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata) calls = mock.call( topic=MessageEndpoints.device_async_readback( - scanID=metadata["scanID"], device=mock_det.name + scan_id=metadata["scan_id"], device=mock_det.name ), msg={"data": data}, expire=1800, @@ -200,54 +193,32 @@ def test_send_data_to_bec(mock_det, metadata, mca_data): "scaninfo, triggersource, stopped, expected_exception", [ ( - { - "num_points": 500, - "frames_per_trigger": 1, - "scan_type": "step", - }, + {"num_points": 500, "frames_per_trigger": 1, "scan_type": "step"}, TriggerSource.MODE3, False, False, ), ( - { - "num_points": 500, - "frames_per_trigger": 1, - "scan_type": "fly", - }, + {"num_points": 500, "frames_per_trigger": 1, "scan_type": "fly"}, TriggerSource.MODE3, False, False, ), ( - { - "num_points": 5001, - "frames_per_trigger": 2, - "scan_type": "step", - }, + {"num_points": 5001, "frames_per_trigger": 2, "scan_type": "step"}, TriggerSource.MODE3, False, True, ), ( - { - "num_points": 500, - "frames_per_trigger": 2, - "scan_type": "random", - }, + {"num_points": 500, "frames_per_trigger": 2, "scan_type": "random"}, TriggerSource.MODE3, False, True, ), ], ) -def test_stage( - mock_det, - scaninfo, - triggersource, - stopped, - expected_exception, -): +def test_stage(mock_det, scaninfo, triggersource, stopped, expected_exception): mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] mock_det.scaninfo.scan_type = scaninfo["scan_type"] @@ -290,24 +261,8 @@ def test_prepare_detector_backend(mock_det): assert mock_det.read_mode.get() == ReadoutMode.EVENT -@pytest.mark.parametrize( - "stopped, expected_exception", - [ - ( - False, - False, - ), - ( - True, - True, - ), - ], -) -def test_unstage( - mock_det, - stopped, - expected_exception, -): +@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)]) +def test_unstage(mock_det, stopped, expected_exception): with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object( mock_det.custom_prepare, "publish_file_location" ) as mock_publish_file_location: @@ -342,30 +297,10 @@ def test_stop(mock_det): @pytest.mark.parametrize( "stopped, acquisition_done, acquiring_state, expected_exception", [ - ( - False, - True, - 0, - False, - ), - ( - False, - False, - 0, - True, - ), - ( - False, - True, - 1, - True, - ), - ( - True, - True, - 0, - True, - ), + (False, True, 0, False), + (False, False, 0, True), + (False, True, 1, True), + (True, True, 0, True), ], ) def test_finished(mock_det, stopped, acquisition_done, acquiring_state, expected_exception): diff --git a/tests/test_pilatus_csaxs.py b/tests/test_pilatus_csaxs.py index 642e0e0..7c8929f 100644 --- a/tests/test_pilatus_csaxs.py +++ b/tests/test_pilatus_csaxs.py @@ -1,14 +1,13 @@ # pylint: skip-file import os -import pytest import threading from unittest import mock import ophyd +import pytest +from bec_lib import MessageEndpoints, messages -from bec_lib import messages, MessageEndpoints from ophyd_devices.epics.devices.pilatus_csaxs import PilatuscSAXS - from tests.utils import DMMock, MockPV @@ -45,18 +44,9 @@ def mock_det(): yield det -@pytest.mark.parametrize( - "trigger_source, detector_state", - [ - (1, 0), - ], -) +@pytest.mark.parametrize("trigger_source, detector_state", [(1, 0)]) # TODO rewrite this one, write test for init_detector, init_filewriter is tested -def test_init_detector( - mock_det, - trigger_source, - detector_state, -): +def test_init_detector(mock_det, trigger_source, detector_state): """Test the _init function: This includes testing the functions: @@ -82,7 +72,7 @@ def test_init_detector( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 12.4, }, False, @@ -94,7 +84,7 @@ def test_init_detector( "num_points": 500, "frames_per_trigger": 1, "filepath": "test.h5", - "scanID": "123", + "scan_id": "123", "mokev": 12.4, }, True, @@ -102,12 +92,7 @@ def test_init_detector( ), ], ) -def test_stage( - mock_det, - scaninfo, - stopped, - expected_exception, -): +def test_stage(mock_det, scaninfo, stopped, expected_exception): with mock.patch.object( mock_det.custom_prepare, "publish_file_location" ) as mock_publish_file_location: @@ -145,13 +130,7 @@ def test_pre_scan(mock_det): @pytest.mark.parametrize( - "readout_time, expected_value", - [ - (1e-3, 3e-3), - (3e-3, 3e-3), - (5e-3, 5e-3), - (None, 3e-3), - ], + "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] ) def test_update_readout_time(mock_det, readout_time, expected_value): if readout_time is None: @@ -172,7 +151,7 @@ def test_update_readout_time(mock_det, readout_time, expected_value): "filepath_raw": "test5_raw.h5", "successful": True, "done": False, - "scanID": "123", + "scan_id": "123", } ), ( @@ -181,7 +160,7 @@ def test_update_readout_time(mock_det, readout_time, expected_value): "filepath_raw": "test5_raw.h5", "successful": False, "done": True, - "scanID": "123", + "scan_id": "123", } ), ( @@ -190,13 +169,13 @@ def test_update_readout_time(mock_det, readout_time, expected_value): "filepath_raw": "test5_raw.h5", "successful": None, "done": True, - "scanID": "123", + "scan_id": "123", } ), ], ) def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scanID = scaninfo["scanID"] + mock_det.scaninfo.scan_id = scaninfo["scan_id"] mock_det.filepath = scaninfo["filepath"] mock_det.filepath_raw = scaninfo["filepath_raw"] mock_det.custom_prepare.publish_file_location( @@ -217,7 +196,7 @@ def test_publish_file_location(mock_det, scaninfo): ) expected_calls = [ mock.call( - MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), + MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), msg, pipe=mock_det.connector.pipeline.return_value, ), @@ -306,14 +285,7 @@ def test_stop_detector_backend(mock_det, requests_state, expected_exception, url "user": "e12345", }, ], - [ - "zmqWriter", - "e12345", - { - "frmCnt": 500, - "timeout": 2000, - }, - ], + ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], ], [ "http://x12sa-pd-2:8080/stream/pilatus_2", @@ -357,14 +329,7 @@ def test_stop_detector_backend(mock_det, requests_state, expected_exception, url "user": "e12345", }, ], - [ - "zmqWriter", - "e12345", - { - "frmCnt": 500, - "timeout": 2000, - }, - ], + ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], ], [ "http://x12sa-pd-2:8080/stream/pilatus_2", @@ -435,24 +400,8 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e assert call == mock_call -@pytest.mark.parametrize( - "stopped, expected_exception", - [ - ( - False, - False, - ), - ( - True, - True, - ), - ], -) -def test_unstage( - mock_det, - stopped, - expected_exception, -): +@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)]) +def test_unstage(mock_det, stopped, expected_exception): with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object( mock_det.custom_prepare, "publish_file_location" ) as mock_publish_file_location: @@ -485,21 +434,9 @@ def test_stop(mock_det): @pytest.mark.parametrize( "stopped, mcs_stage_state, expected_exception", [ - ( - False, - ophyd.Staged.no, - False, - ), - ( - True, - ophyd.Staged.no, - True, - ), - ( - False, - ophyd.Staged.yes, - True, - ), + (False, ophyd.Staged.no, False), + (True, ophyd.Staged.no, True), + (False, ophyd.Staged.yes, True), ], ) def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):