refactor: renamed scanID to scan_id

This commit is contained in:
wakonig_k 2024-03-13 21:09:12 +01:00
parent 46063b905b
commit 1c7737ccda
15 changed files with 226 additions and 588 deletions

View File

@ -92,7 +92,7 @@ class Eiger1p5MDetector(Device):
def stage(self) -> list[object]:
scan_msg = self._get_current_scan_msg()
self.metadata = {
"scanID": scan_msg.content["scanID"],
"scan_id": scan_msg.content["scan_id"],
"RID": scan_msg.content["info"]["RID"],
"queueID": scan_msg.content["info"]["queueID"],
}

View File

@ -1,14 +1,12 @@
from bec_lib import bec_logger
from ophyd import Component
from ophyd_devices.utils import bec_utils
from ophyd_devices.epics.devices.psi_delay_generator_base import (
PSIDelayGeneratorBase,
DDGCustomMixin,
PSIDelayGeneratorBase,
TriggerSource,
)
from bec_lib import bec_logger
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
@ -34,14 +32,10 @@ class DDGSetup(DDGCustomMixin):
self.parent.set_channels("offset", self.parent.offset.get())
# Setup reference
self.parent.set_channels(
"reference",
0,
[f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs],
"reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
)
self.parent.set_channels(
"reference",
0,
[f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs],
"reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
)
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# Set threshold level for ext. pulses
@ -157,15 +151,15 @@ class DDGSetup(DDGCustomMixin):
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
def check_scanID(self) -> None:
def check_scan_id(self) -> None:
"""
Method to check if scanID has changed.
Method to check if scan_id has changed.
If yes, then it changes parent.stopped to True, which will stop further actions.
"""
old_scanID = self.parent.scaninfo.scanID
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scanID != old_scanID:
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def finished(self) -> None:
@ -211,17 +205,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal,
name="delay_burst",
kind="config",
config_storage_name="ddg_config",
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
delta_width = Component(
bec_utils.ConfigSignal,
name="delta_width",
kind="config",
config_storage_name="ddg_config",
bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
)
additional_triggers = Component(
@ -232,10 +220,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
)
polarity = Component(
bec_utils.ConfigSignal,
name="polarity",
kind="config",
config_storage_name="ddg_config",
bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
)
fixed_ttl_width = Component(
@ -246,17 +231,11 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
)
amplitude = Component(
bec_utils.ConfigSignal,
name="amplitude",
kind="config",
config_storage_name="ddg_config",
bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
)
offset = Component(
bec_utils.ConfigSignal,
name="offset",
kind="config",
config_storage_name="ddg_config",
bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
)
thres_trig_level = Component(

View File

@ -1,23 +1,18 @@
import enum
import os
import threading
import time
import numpy as np
import os
from typing import Any
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device
import numpy as np
from bec_lib import messages, threadlocked
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from std_daq_client import StdDaqClient
from bec_lib import threadlocked
from bec_lib.logger import bec_logger
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase
logger = bec_logger.logger
@ -78,16 +73,9 @@ class Eiger9MSetup(CustomDetectorMixin):
eacc = self.parent.scaninfo.username
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
signal_conditions = [
(
lambda: self.std_client.get_status()["state"],
"READY",
),
]
signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
all_signals=True,
signal_conditions=signal_conditions, timeout=self.parent.timeout, all_signals=True
):
raise EigerTimeoutError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
@ -282,11 +270,11 @@ class Eiger9MSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def check_scanID(self) -> None:
"""Checks if scanID has changed and stops the scan if it has"""
old_scanID = self.parent.scaninfo.scanID
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scanID != old_scanID:
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
@ -309,7 +297,7 @@ class Eiger9MSetup(CustomDetectorMixin):
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name),
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
@ -404,9 +392,7 @@ class Eiger9McSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = Eiger9MSetup

View File

@ -1,18 +1,14 @@
import enum
import os
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt
from ophyd.mca import EpicsMCARecord
from ophyd import Device
from bec_lib.endpoints import MessageEndpoints
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.mca import EpicsMCARecord
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase
logger = bec_logger.logger
@ -145,10 +141,7 @@ class FalconSetup(CustomDetectorMixin):
self.parent.erase_all.put(1)
signal_conditions = [
(
lambda: self.parent.state.read()[self.parent.state.name]["value"],
DetectorState.DONE,
),
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
]
if not self.wait_for_signals(
@ -225,11 +218,11 @@ class FalconSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def check_scanID(self) -> None:
"""Checks if scanID has changed and stops the scan if it has"""
old_scanID = self.parent.scaninfo.scanID
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scanID != old_scanID:
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
@ -252,7 +245,7 @@ class FalconSetup(CustomDetectorMixin):
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name),
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
@ -267,14 +260,8 @@ class FalconSetup(CustomDetectorMixin):
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(
self.parent.dxp.current_pixel.get,
total_frames,
),
(
self.parent.hdf5.array_counter.get,
total_frames,
),
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
@ -308,9 +295,7 @@ class FalconcSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = FalconSetup

View File

@ -1,14 +1,14 @@
import enum
import threading
from collections import defaultdict
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages, threadlocked
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd import EpicsSignal, EpicsSignalRO, Device, Component as Cpt
from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase
from ophyd_devices.utils import bec_utils
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import messages, MessageEndpoints, bec_logger, threadlocked
logger = bec_logger.logger
@ -127,19 +127,13 @@ class MCSSetup(CustomDetectorMixin):
if self.parent.scaninfo.scan_msg is None:
return
metadata = self.parent.scaninfo.scan_msg.metadata
metadata.update(
{
"async_update": "append",
"num_lines": self.parent.num_lines.get(),
}
)
metadata.update({"async_update": "append", "num_lines": self.parent.num_lines.get()})
msg = messages.DeviceMessage(
signals=dict(self.mca_data),
metadata=self.parent.scaninfo.scan_msg.metadata,
signals=dict(self.mca_data), metadata=self.parent.scaninfo.scan_msg.metadata
)
self.parent.connector.xadd(
topic=MessageEndpoints.device_async_readback(
scanID=self.parent.scaninfo.scanID, device=self.parent.name
scan_id=self.parent.scaninfo.scan_id, device=self.parent.name
),
msg={"data": msg},
expire=self._stream_ttl,
@ -181,14 +175,8 @@ class MCSSetup(CustomDetectorMixin):
def finished(self) -> None:
"""Check if acquisition is finished, if not successful, rais MCSTimeoutError"""
signal_conditions = [
(
lambda: self.acquisition_done,
True,
),
(
self.parent.acquiring.get,
0, # Considering making a enum.Int class for this state
),
(lambda: self.acquisition_done, True),
(self.parent.acquiring.get, 0), # Considering making a enum.Int class for this state
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
@ -272,10 +260,7 @@ class MCScSAXS(PSIDetectorBase):
# Custom signal readout from device config
num_lines = Cpt(
bec_utils.ConfigSignal,
name="num_lines",
kind="config",
config_storage_name="mcs_config",
bec_utils.ConfigSignal, name="num_lines", kind="config", config_storage_name="mcs_config"
)
def __init__(
@ -292,9 +277,7 @@ class MCScSAXS(PSIDetectorBase):
mcs_config=None,
**kwargs,
):
self.mcs_config = {
f"{name}_num_lines": 1,
}
self.mcs_config = {f"{name}_num_lines": 1}
if mcs_config is not None:
# pylint: disable=expression-not-assigned
[self.mcs_config.update({f"{name}_{key}": value}) for key, value in mcs_config.items()]

View File

@ -2,16 +2,14 @@ import enum
import json
import os
import time
import requests
import numpy as np
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device, Staged
import requests
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged
from bec_lib import messages, MessageEndpoints, bec_logger
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase
logger = bec_logger.logger
@ -346,7 +344,7 @@ class PilatusSetup(CustomDetectorMixin):
metadata={"input_path": self.parent.filepath_raw},
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name),
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
@ -360,10 +358,7 @@ class PilatusSetup(CustomDetectorMixin):
# pylint: disable=protected-access
# TODO: at the moment this relies on device.mcs.obj._staged attribute
signal_conditions = [
(
lambda: self.parent.device_manager.devices.mcs.obj._staged,
Staged.no,
),
(lambda: self.parent.device_manager.devices.mcs.obj._staged, Staged.no)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
@ -383,11 +378,11 @@ class PilatusSetup(CustomDetectorMixin):
"""Stop detector"""
self.parent.cam.acquire.put(0)
def check_scanID(self) -> None:
"""Checks if scanID has changed and stops the scan if it has"""
old_scanID = self.parent.scaninfo.scanID
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scanID != old_scanID:
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
@ -405,9 +400,7 @@ class PilatuscSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = PilatusSetup

View File

@ -1,21 +1,28 @@
import time
import enum
import time
from typing import Any
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PVPositioner, Signal, DeviceStatus
from ophyd.pseudopos import (
pseudo_position_argument,
real_position_argument,
PseudoSingle,
PseudoPositioner,
)
from ophyd.device import Staged
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
from bec_lib import bec_logger
from ophyd import (
Component,
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
Kind,
PVPositioner,
Signal,
)
from ophyd.device import Staged
from ophyd.pseudopos import (
PseudoPositioner,
PseudoSingle,
pseudo_position_argument,
real_position_argument,
)
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
@ -71,18 +78,10 @@ class DelayStatic(Device):
kind=Kind.config,
)
amplitude = Component(
EpicsSignal,
"OutputAmpAI",
write_pv="OutputAmpAO",
name="amplitude",
kind=Kind.config,
EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config
)
offset = Component(
EpicsSignal,
"OutputOffsetAI",
write_pv="OutputOffsetAO",
name="offset",
kind=Kind.config,
EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config
)
@ -178,8 +177,8 @@ class DDGCustomMixin:
Example could be to open the shutter by triggering a pulse via pre_scan.
"""
def check_scanID(self) -> None:
"""Method to check if there is a new scanID, called by stage."""
def check_scan_id(self) -> None:
"""Method to check if there is a new scan_id, called by stage."""
def is_ddg_okay(self, raise_on_error=False) -> None:
"""
@ -246,13 +245,7 @@ class PSIDelayGeneratorBase(Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
USER_ACCESS = [
"set_channels",
"_set_trigger",
"burst_enable",
"burst_disable",
"reload_config",
]
USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"]
# Assign PVs from DDG645
trigger_burst_readout = Component(
@ -307,39 +300,19 @@ class PSIDelayGeneratorBase(Device):
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
burstMode = Component(
EpicsSignal,
"BurstModeBI",
write_pv="BurstModeBO",
name="burstmode",
kind=Kind.config,
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal,
"BurstConfigBI",
write_pv="BurstConfigBO",
name="burstconfig",
kind=Kind.config,
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal,
"BurstCountLI",
write_pv="BurstCountLO",
name="burstcount",
kind=Kind.config,
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal,
"BurstDelayAI",
write_pv="BurstDelayAO",
name="burstdelay",
kind=Kind.config,
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal,
"BurstPeriodAI",
write_pv="BurstPeriodAO",
name="burstperiod",
kind=Kind.config,
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
def __init__(
@ -375,13 +348,7 @@ class PSIDelayGeneratorBase(Device):
self.name = name
self.scaninfo = None
self.timeout = 5
self.all_channels = [
"channelT0",
"channelAB",
"channelCD",
"channelEF",
"channelGH",
]
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
self.wait_for_connection(all_signals=True)
@ -446,10 +413,7 @@ class PSIDelayGeneratorBase(Device):
assert count > 0, "Number of bursts must be positive"
assert delay >= 0, "Burst delay must be larger than 0"
assert period > 0, "Burst period must be positive"
assert config in [
"all",
"first",
], "Supported burst configs are 'all' and 'first'"
assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
self.burstMode.put(1)
self.burstCount.put(count)
@ -517,14 +481,14 @@ class PSIDelayGeneratorBase(Device):
Otherwise, checks if the DDG finished acquisition
Internal Calls:
- custom_prepare.check_scanID : check if scanID changed or detector stopped
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()

View File

@ -5,8 +5,9 @@ from bec_lib.device import DeviceStatus
from bec_lib.file_utils import FileWriterMixin
from ophyd import Device
from ophyd.device import Staged
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
class DetectorInitError(Exception):
@ -99,9 +100,9 @@ class CustomDetectorMixin:
DetectorTimeoutError: if detector cannot be stopped
"""
def check_scanID(self) -> None:
def check_scan_id(self) -> None:
"""
Check if BEC is running on a new scanID
Check if BEC is running on a new scan_id
"""
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
@ -299,14 +300,14 @@ class PSIDetectorBase(Device):
if data acquisition on device finished (an was successful)
Internal Calls:
- custom_prepare.check_scanID : check if scanID changed or detector stopped
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- custom_prepare.publish_file_location : publish file location to bec
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scanID()
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()

View File

@ -1,6 +1,6 @@
import getpass
from bec_lib import DeviceManagerBase, messages, MessageEndpoints, bec_logger
from bec_lib import DeviceManagerBase, MessageEndpoints, bec_logger, messages
logger = bec_logger.logger
@ -67,7 +67,7 @@ class BecScaninfoMixin:
self.device_manager = device_manager
self.sim_mode = sim_mode
self.scan_msg = None
self.scanID = None
self.scan_id = None
if bec_info_msg is None:
infomsgmock = BECInfoMsgMock()
self.bec_info_msg = infomsgmock.get_bec_info_msg()
@ -94,11 +94,7 @@ class BecScaninfoMixin:
return None
return msg
return messages.ScanStatusMessage(
scanID="1",
status={},
info=self.bec_info_msg,
)
return messages.ScanStatusMessage(scan_id="1", status={}, info=self.bec_info_msg)
def get_username(self) -> str:
"""Get username"""
@ -119,11 +115,11 @@ class BecScaninfoMixin:
logger.info(f"{self.scan_msg}")
try:
self.metadata = {
"scanID": scan_msg.content["scanID"],
"scan_id": scan_msg.content["scan_id"],
"RID": scan_msg.content["info"]["RID"],
"queueID": scan_msg.content["info"]["queueID"],
}
self.scanID = scan_msg.content["scanID"]
self.scan_id = scan_msg.content["scan_id"]
self.scan_number = scan_msg.content["info"]["scan_number"]
self.exp_time = scan_msg.content["info"]["exp_time"]
self.frames_per_trigger = scan_msg.content["info"]["frames_per_trigger"]

View File

@ -1,7 +1,8 @@
# pylint: skip-file
import pytest
from unittest import mock
import pytest
from ophyd_devices.epics.devices.delay_generator_csaxs import DDGSetup
from ophyd_devices.epics.devices.psi_delay_generator_base import TriggerSource
@ -26,7 +27,7 @@ def mock_DDGSetup():
@pytest.fixture(
params=[
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 1,
@ -34,7 +35,7 @@ def mock_DDGSetup():
"readout_time": 0.1,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 5,
@ -42,7 +43,7 @@ def mock_DDGSetup():
"readout_time": 0,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 1,
@ -50,7 +51,7 @@ def mock_DDGSetup():
"readout_time": 0.2,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 5,
@ -135,8 +136,8 @@ def channel_pairs(request):
return request.param
def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scanID method."""
def test_check_scan_id(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scan_id method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
@ -145,12 +146,12 @@ def test_check_scanID(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_s
for k, v in ddg_config_scan.items():
getattr(mock_DDGSetup.parent, k).get.return_value = v
# Call the function you want to test
mock_DDGSetup.check_scanID()
mock_DDGSetup.check_scan_id()
mock_DDGSetup.parent.scaninfo.load_scan_metadata.assert_called_once()
def test_on_pre_scan(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan):
"""Test the check_scanID method."""
"""Test the check_scan_id method."""
# Set first attributes of parent class
for k, v in scaninfo.items():
setattr(mock_DDGSetup.parent.scaninfo, k, v)
@ -185,11 +186,7 @@ def test_on_trigger(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_sca
def test_initialize_default_parameter(
mock_DDGSetup,
scaninfo,
ddg_config_defaults,
ddg_config_scan,
channel_pairs,
mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs
):
"""Test the initialize_default_parameter method."""
# Set first attributes of parent class
@ -229,13 +226,7 @@ def test_initialize_default_parameter(
mock_DDGSetup.parent.set_channels.assert_has_calls(calls)
def test_prepare_ddg(
mock_DDGSetup,
scaninfo,
ddg_config_defaults,
ddg_config_scan,
channel_pairs,
):
def test_prepare_ddg(mock_DDGSetup, scaninfo, ddg_config_defaults, ddg_config_scan, channel_pairs):
"""Test the prepare_ddg method."""
# Set first attributes of parent class
for k, v in scaninfo.items():

View File

@ -1,13 +1,12 @@
# pylint: skip-file
import pytest
import threading
from unittest import mock
import ophyd
import pytest
from bec_lib import MessageEndpoints, messages
from bec_lib import messages, MessageEndpoints
from ophyd_devices.epics.devices.eiger9m_csaxs import Eiger9McSAXS
from tests.utils import DMMock, MockPV
@ -72,26 +71,9 @@ def test_init():
@pytest.mark.parametrize(
"trigger_source, detector_state, expected_exception",
[
(
2,
1,
True,
),
(
2,
0,
False,
),
],
"trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)]
)
def test_initialize_detector(
mock_det,
trigger_source,
detector_state,
expected_exception,
):
def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception):
"""Test the _init function:
This includes testing the functions:
@ -125,13 +107,7 @@ def test_trigger(mock_det):
@pytest.mark.parametrize(
"readout_time, expected_value",
[
(1e-3, 3e-3),
(3e-3, 3e-3),
(5e-3, 5e-3),
(None, 3e-3),
],
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
)
def test_update_readout_time(mock_det, readout_time, expected_value):
if readout_time is None:
@ -146,34 +122,10 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
@pytest.mark.parametrize(
"eacc, exp_url, daq_status, daq_cfg, expected_exception",
[
(
"e12345",
"http://xbl-daq-29:5000",
{"state": "READY"},
{"writer_user_id": 12543},
False,
),
(
"e12345",
"http://xbl-daq-29:5000",
{"state": "READY"},
{"writer_user_id": 15421},
False,
),
(
"e12345",
"http://xbl-daq-29:5000",
{"state": "BUSY"},
{"writer_user_id": 15421},
True,
),
(
"e12345",
"http://xbl-daq-29:5000",
{"state": "READY"},
{"writer_ud": 12345},
True,
),
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False),
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False),
("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True),
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True),
],
)
def test_initialize_detector_backend(
@ -215,7 +167,7 @@ def test_initialize_detector_backend(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 12.4,
},
{"state": "READY"},
@ -230,7 +182,7 @@ def test_initialize_detector_backend(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 12.4,
},
{"state": "BUSY"},
@ -245,7 +197,7 @@ def test_initialize_detector_backend(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 18.4,
},
{"state": "READY"},
@ -257,13 +209,7 @@ def test_initialize_detector_backend(
],
)
def test_stage(
mock_det,
scaninfo,
daq_status,
daq_cfg,
detector_state,
stopped,
expected_exception,
mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception
):
with mock.patch.object(
mock_det.custom_prepare, "std_client"
@ -309,7 +255,7 @@ def test_stage(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
False,
@ -320,7 +266,7 @@ def test_stage(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
False,
@ -331,7 +277,7 @@ def test_stage(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "ERROR"}},
True,
@ -373,24 +319,8 @@ def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_excep
mock_std_daq.start_writer_async.assert_called_with(daq_writer_call)
@pytest.mark.parametrize(
"stopped, expected_exception",
[
(
False,
False,
),
(
True,
True,
),
],
)
def test_unstage(
mock_det,
stopped,
expected_exception,
):
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@ -416,13 +346,13 @@ def test_stop_detector_backend(mock_det):
@pytest.mark.parametrize(
"scaninfo",
[
({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}),
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scanID = scaninfo["scanID"]
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
@ -435,7 +365,7 @@ def test_publish_file_location(mock_det, scaninfo):
)
expected_calls = [
mock.call(
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
msg,
pipe=mock_det.connector.pipeline.return_value,
),
@ -465,40 +395,28 @@ def test_stop(mock_det):
[
(
False,
{
"num_points": 500,
"frames_per_trigger": 4,
},
{"num_points": 500, "frames_per_trigger": 4},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}},
False,
),
(
False,
{
"num_points": 500,
"frames_per_trigger": 4,
},
{"num_points": 500, "frames_per_trigger": 4},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}},
True,
),
(
False,
{
"num_points": 500,
"frames_per_trigger": 1,
},
{"num_points": 500, "frames_per_trigger": 1},
1,
{"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}},
True,
),
(
False,
{
"num_points": 500,
"frames_per_trigger": 1,
},
{"num_points": 500, "frames_per_trigger": 1},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}},
False,

View File

@ -1,14 +1,14 @@
# pylint: skip-file
import pytest
from unittest import mock
import threading
import os
import threading
from unittest import mock
import ophyd
import pytest
from bec_lib import MessageEndpoints, messages
from ophyd_devices.epics.devices.falcon_csaxs import FalconcSAXS, FalconTimeoutError
from tests.utils import DMMock, MockPV
from bec_lib import messages, MessageEndpoints
def patch_dual_pvs(device):
@ -47,10 +47,7 @@ def mock_det():
@pytest.mark.parametrize(
"trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state,"
" expected_exception",
[
(1, 1, 0, 20, 0, False),
(1, 1, 0, 20, 1, True),
],
[(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)],
)
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
def test_init_detector(
@ -94,13 +91,7 @@ def test_init_detector(
@pytest.mark.parametrize(
"readout_time, expected_value",
[
(1e-3, 3e-3),
(3e-3, 3e-3),
(5e-3, 5e-3),
(None, 3e-3),
],
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
)
def test_update_readout_time(mock_det, readout_time, expected_value):
if readout_time is None:
@ -131,10 +122,10 @@ def test_initialize_default_parameter(mock_det):
"frames_per_trigger": 1,
"exp_time": 0.1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 12.4,
}
),
)
],
)
def test_stage(mock_det, scaninfo):
@ -203,13 +194,13 @@ def test_prepare_data_backend(mock_det, scaninfo):
@pytest.mark.parametrize(
"scaninfo",
[
({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}),
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scanID = scaninfo["scanID"]
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
@ -222,7 +213,7 @@ def test_publish_file_location(mock_det, scaninfo):
)
expected_calls = [
mock.call(
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
msg,
pipe=mock_det.connector.pipeline.return_value,
),
@ -235,13 +226,7 @@ def test_publish_file_location(mock_det, scaninfo):
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
@pytest.mark.parametrize(
"detector_state, expected_exception",
[
(1, False),
(0, True),
],
)
@pytest.mark.parametrize("detector_state, expected_exception", [(1, False), (0, True)])
def test_arm_acquisition(mock_det, detector_state, expected_exception):
with mock.patch.object(mock_det, "stop") as mock_stop:
mock_det.state._read_pv.mock_data = detector_state
@ -261,24 +246,8 @@ def test_trigger(mock_det):
mock_on_trigger.assert_called_once()
@pytest.mark.parametrize(
"stopped, expected_abort",
[
(
False,
False,
),
(
True,
True,
),
],
)
def test_unstage(
mock_det,
stopped,
expected_abort,
):
@pytest.mark.parametrize("stopped, expected_abort", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_abort):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:

View File

@ -1,20 +1,21 @@
# pylint: skip-file
import pytest
from unittest import mock
import pytest
from ophyd_devices.epics.devices.grashopper_tomcat import (
GrashopperTOMCATSetup,
AutoMode,
ImageMode,
DetectorState,
ImageBinning,
VideoMode,
PixelFormat,
COLORMODE,
TriggerSource,
MemoryPolling,
AutoMode,
DetectorState,
GrashopperError,
GrashopperTimeoutError,
GrashopperTOMCATSetup,
ImageBinning,
ImageMode,
MemoryPolling,
PixelFormat,
TriggerSource,
VideoMode,
)
@ -38,7 +39,7 @@ def mock_GrashopperSetup():
@pytest.fixture(
params=[
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 1,
@ -46,7 +47,7 @@ def mock_GrashopperSetup():
"readout_time": 0.1,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "step",
"num_points": 500,
"frames_per_trigger": 5,
@ -54,7 +55,7 @@ def mock_GrashopperSetup():
"readout_time": 0,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 1,
@ -62,7 +63,7 @@ def mock_GrashopperSetup():
"readout_time": 0.2,
},
{
"scanID": "1234",
"scan_id": "1234",
"scan_type": "fly",
"num_points": 500,
"frames_per_trigger": 5,

View File

@ -1,8 +1,10 @@
# pylint: skip-file
import pytest
from unittest import mock
import threading
from unittest import mock
import ophyd
import pytest
from bec_lib import MessageEndpoints, messages
from ophyd_devices.epics.devices.mcs_csaxs import (
MCScSAXS,
@ -11,9 +13,7 @@ from ophyd_devices.epics.devices.mcs_csaxs import (
ReadoutMode,
TriggerSource,
)
from tests.utils import DMMock, MockPV
from bec_lib import messages, MessageEndpoints
def patch_dual_pvs(device):
@ -89,15 +89,11 @@ def test_init():
"count_on_start": 0,
"stop_all": 1,
},
),
)
],
)
def test_initialize_detector(
mock_det,
trigger_source,
channel_advance,
channel_source1,
pv_channels,
mock_det, trigger_source, channel_advance, channel_source1, pv_channels
):
"""Test the _init function:
@ -145,10 +141,7 @@ def test_progress_update(mock_det, value, num_lines, num_points, done):
@pytest.mark.parametrize(
"values, expected_nothing",
[
([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False),
([100, 200, 300], True),
],
[([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), ([100, 200, 300], True)],
)
def test_on_mca_data(mock_det, values, expected_nothing):
"""Test the on_mca_data function:
@ -171,15 +164,15 @@ def test_on_mca_data(mock_det, values, expected_nothing):
"metadata, mca_data",
[
(
{"scanID": 123},
{"scan_id": 123},
{"mca1": [100, 120, 140], "mca3": [200, 220, 240], "mca4": [300, 320, 340]},
),
)
],
)
def test_send_data_to_bec(mock_det, metadata, mca_data):
mock_det.scaninfo.scan_msg = mock.MagicMock()
mock_det.scaninfo.scan_msg.metadata = metadata
mock_det.scaninfo.scanID = metadata["scanID"]
mock_det.scaninfo.scan_id = metadata["scan_id"]
mock_det.custom_prepare.mca_data = mca_data
mock_det.custom_prepare._send_data_to_bec()
device_metadata = mock_det.scaninfo.scan_msg.metadata
@ -187,7 +180,7 @@ def test_send_data_to_bec(mock_det, metadata, mca_data):
data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata)
calls = mock.call(
topic=MessageEndpoints.device_async_readback(
scanID=metadata["scanID"], device=mock_det.name
scan_id=metadata["scan_id"], device=mock_det.name
),
msg={"data": data},
expire=1800,
@ -200,54 +193,32 @@ def test_send_data_to_bec(mock_det, metadata, mca_data):
"scaninfo, triggersource, stopped, expected_exception",
[
(
{
"num_points": 500,
"frames_per_trigger": 1,
"scan_type": "step",
},
{"num_points": 500, "frames_per_trigger": 1, "scan_type": "step"},
TriggerSource.MODE3,
False,
False,
),
(
{
"num_points": 500,
"frames_per_trigger": 1,
"scan_type": "fly",
},
{"num_points": 500, "frames_per_trigger": 1, "scan_type": "fly"},
TriggerSource.MODE3,
False,
False,
),
(
{
"num_points": 5001,
"frames_per_trigger": 2,
"scan_type": "step",
},
{"num_points": 5001, "frames_per_trigger": 2, "scan_type": "step"},
TriggerSource.MODE3,
False,
True,
),
(
{
"num_points": 500,
"frames_per_trigger": 2,
"scan_type": "random",
},
{"num_points": 500, "frames_per_trigger": 2, "scan_type": "random"},
TriggerSource.MODE3,
False,
True,
),
],
)
def test_stage(
mock_det,
scaninfo,
triggersource,
stopped,
expected_exception,
):
def test_stage(mock_det, scaninfo, triggersource, stopped, expected_exception):
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
mock_det.scaninfo.scan_type = scaninfo["scan_type"]
@ -290,24 +261,8 @@ def test_prepare_detector_backend(mock_det):
assert mock_det.read_mode.get() == ReadoutMode.EVENT
@pytest.mark.parametrize(
"stopped, expected_exception",
[
(
False,
False,
),
(
True,
True,
),
],
)
def test_unstage(
mock_det,
stopped,
expected_exception,
):
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@ -342,30 +297,10 @@ def test_stop(mock_det):
@pytest.mark.parametrize(
"stopped, acquisition_done, acquiring_state, expected_exception",
[
(
False,
True,
0,
False,
),
(
False,
False,
0,
True,
),
(
False,
True,
1,
True,
),
(
True,
True,
0,
True,
),
(False, True, 0, False),
(False, False, 0, True),
(False, True, 1, True),
(True, True, 0, True),
],
)
def test_finished(mock_det, stopped, acquisition_done, acquiring_state, expected_exception):

View File

@ -1,14 +1,13 @@
# pylint: skip-file
import os
import pytest
import threading
from unittest import mock
import ophyd
import pytest
from bec_lib import MessageEndpoints, messages
from bec_lib import messages, MessageEndpoints
from ophyd_devices.epics.devices.pilatus_csaxs import PilatuscSAXS
from tests.utils import DMMock, MockPV
@ -45,18 +44,9 @@ def mock_det():
yield det
@pytest.mark.parametrize(
"trigger_source, detector_state",
[
(1, 0),
],
)
@pytest.mark.parametrize("trigger_source, detector_state", [(1, 0)])
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
def test_init_detector(
mock_det,
trigger_source,
detector_state,
):
def test_init_detector(mock_det, trigger_source, detector_state):
"""Test the _init function:
This includes testing the functions:
@ -82,7 +72,7 @@ def test_init_detector(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 12.4,
},
False,
@ -94,7 +84,7 @@ def test_init_detector(
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scanID": "123",
"scan_id": "123",
"mokev": 12.4,
},
True,
@ -102,12 +92,7 @@ def test_init_detector(
),
],
)
def test_stage(
mock_det,
scaninfo,
stopped,
expected_exception,
):
def test_stage(mock_det, scaninfo, stopped, expected_exception):
with mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@ -145,13 +130,7 @@ def test_pre_scan(mock_det):
@pytest.mark.parametrize(
"readout_time, expected_value",
[
(1e-3, 3e-3),
(3e-3, 3e-3),
(5e-3, 5e-3),
(None, 3e-3),
],
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
)
def test_update_readout_time(mock_det, readout_time, expected_value):
if readout_time is None:
@ -172,7 +151,7 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
"filepath_raw": "test5_raw.h5",
"successful": True,
"done": False,
"scanID": "123",
"scan_id": "123",
}
),
(
@ -181,7 +160,7 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
"filepath_raw": "test5_raw.h5",
"successful": False,
"done": True,
"scanID": "123",
"scan_id": "123",
}
),
(
@ -190,13 +169,13 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
"filepath_raw": "test5_raw.h5",
"successful": None,
"done": True,
"scanID": "123",
"scan_id": "123",
}
),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scanID = scaninfo["scanID"]
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath_raw = scaninfo["filepath_raw"]
mock_det.custom_prepare.publish_file_location(
@ -217,7 +196,7 @@ def test_publish_file_location(mock_det, scaninfo):
)
expected_calls = [
mock.call(
MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name),
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
msg,
pipe=mock_det.connector.pipeline.return_value,
),
@ -306,14 +285,7 @@ def test_stop_detector_backend(mock_det, requests_state, expected_exception, url
"user": "e12345",
},
],
[
"zmqWriter",
"e12345",
{
"frmCnt": 500,
"timeout": 2000,
},
],
["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}],
],
[
"http://x12sa-pd-2:8080/stream/pilatus_2",
@ -357,14 +329,7 @@ def test_stop_detector_backend(mock_det, requests_state, expected_exception, url
"user": "e12345",
},
],
[
"zmqWriter",
"e12345",
{
"frmCnt": 500,
"timeout": 2000,
},
],
["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}],
],
[
"http://x12sa-pd-2:8080/stream/pilatus_2",
@ -435,24 +400,8 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e
assert call == mock_call
@pytest.mark.parametrize(
"stopped, expected_exception",
[
(
False,
False,
),
(
True,
True,
),
],
)
def test_unstage(
mock_det,
stopped,
expected_exception,
):
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@ -485,21 +434,9 @@ def test_stop(mock_det):
@pytest.mark.parametrize(
"stopped, mcs_stage_state, expected_exception",
[
(
False,
ophyd.Staged.no,
False,
),
(
True,
ophyd.Staged.no,
True,
),
(
False,
ophyd.Staged.yes,
True,
),
(False, ophyd.Staged.no, False),
(True, ophyd.Staged.no, True),
(False, ophyd.Staged.yes, True),
],
)
def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):