feat(pilatus): deprecate pilatus integration
This commit is contained in:
@@ -1,400 +0,0 @@
|
||||
import enum
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import requests
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PilatusError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class PilatusTimeoutError(PilatusError):
|
||||
"""Raised when the Pilatus does not respond in time during unstage."""
|
||||
|
||||
|
||||
class TriggerSource(enum.IntEnum):
|
||||
"""Trigger source options for the detector"""
|
||||
|
||||
INTERNAL = 0
|
||||
EXT_ENABLE = 1
|
||||
EXT_TRIGGER = 2
|
||||
MULTI_TRIGGER = 3
|
||||
ALGINMENT = 4
|
||||
|
||||
|
||||
class SLSDetectorCam(Device):
|
||||
"""SLS Detector Camera - Pilatus
|
||||
|
||||
Base class to map EPICS PVs to ophyd signals.
|
||||
"""
|
||||
|
||||
num_images = ADCpt(EpicsSignalWithRBV, "NumImages")
|
||||
num_frames = ADCpt(EpicsSignalWithRBV, "NumExposures")
|
||||
delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures")
|
||||
trigger_mode = ADCpt(EpicsSignalWithRBV, "TriggerMode")
|
||||
acquire = ADCpt(EpicsSignal, "Acquire")
|
||||
armed = ADCpt(EpicsSignalRO, "Armed")
|
||||
|
||||
read_file_timeout = ADCpt(EpicsSignal, "ImageFileTmot")
|
||||
detector_state = ADCpt(EpicsSignalRO, "StatusMessage_RBV")
|
||||
status_message_camserver = ADCpt(EpicsSignalRO, "StringFromServer_RBV", string=True)
|
||||
acquire_time = ADCpt(EpicsSignal, "AcquireTime")
|
||||
acquire_period = ADCpt(EpicsSignal, "AcquirePeriod")
|
||||
threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
|
||||
file_path = ADCpt(EpicsSignalWithRBV, "FilePath")
|
||||
file_name = ADCpt(EpicsSignalWithRBV, "FileName")
|
||||
file_number = ADCpt(EpicsSignalWithRBV, "FileNumber")
|
||||
auto_increment = ADCpt(EpicsSignalWithRBV, "AutoIncrement")
|
||||
file_template = ADCpt(EpicsSignalWithRBV, "FileTemplate")
|
||||
file_format = ADCpt(EpicsSignalWithRBV, "FileNumber")
|
||||
gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill")
|
||||
|
||||
|
||||
class PilatusSetup(CustomDetectorMixin):
|
||||
"""Pilatus setup class for cSAXS
|
||||
|
||||
Parent class: CustomDetectorMixin
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""Initialize the detector"""
|
||||
self.initialize_default_parameter()
|
||||
self.initialize_detector()
|
||||
|
||||
def initialize_default_parameter(self) -> None:
|
||||
"""Set default parameters for Eiger9M detector"""
|
||||
self.update_readout_time()
|
||||
|
||||
def update_readout_time(self) -> None:
|
||||
"""Set readout time for Eiger9M detector"""
|
||||
readout_time = (
|
||||
self.parent.scaninfo.readout_time
|
||||
if hasattr(self.parent.scaninfo, "readout_time")
|
||||
else self.parent.MIN_READOUT
|
||||
)
|
||||
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
|
||||
|
||||
def initialize_detector(self) -> None:
|
||||
"""Initialize detector"""
|
||||
# Stops the detector
|
||||
self.stop_detector()
|
||||
# Sets the trigger source to GATING
|
||||
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Stage the detector for scan"""
|
||||
self.prepare_detector()
|
||||
self.prepare_data_backend()
|
||||
self.publish_file_location(
|
||||
done=False, successful=False, metadata={"input_path": self.parent.filepath_raw}
|
||||
)
|
||||
|
||||
def prepare_detector(self) -> None:
|
||||
"""
|
||||
Prepare detector for scan.
|
||||
|
||||
Includes checking the detector threshold,
|
||||
setting the acquisition parameters and setting the trigger source
|
||||
"""
|
||||
self.set_detector_threshold()
|
||||
self.set_acquisition_params()
|
||||
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
|
||||
|
||||
def prepare_data_backend(self) -> None:
|
||||
"""
|
||||
Prepare the detector backend of pilatus for a scan
|
||||
|
||||
A zmq service is running on xbl-daq-34 that is waiting
|
||||
for a zmq message to start the writer for the pilatus_2 x12sa-pd-2
|
||||
|
||||
"""
|
||||
|
||||
self.stop_detector_backend()
|
||||
|
||||
self.parent.filepath.set(
|
||||
self.parent.filewriter.compile_full_filename("pilatus_2.h5")
|
||||
).wait()
|
||||
self.parent.cam.file_path.put("/dev/shm/zmq/")
|
||||
self.parent.cam.file_name.put(
|
||||
f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}"
|
||||
)
|
||||
self.parent.cam.auto_increment.put(1) # auto increment
|
||||
self.parent.cam.file_number.put(0) # first iter
|
||||
self.parent.cam.file_format.put(0) # 0: TIFF
|
||||
self.parent.cam.file_template.put("%s%s_%5.5d.cbf")
|
||||
|
||||
# TODO better to remove hard coded path with link to home directory/pilatus_2
|
||||
basepath = f"/sls/X12SA/data/{self.parent.scaninfo.username}/Data10/pilatus_2/"
|
||||
self.parent.filepath_raw = os.path.join(
|
||||
basepath,
|
||||
self.parent.filewriter.get_scan_directory(self.parent.scaninfo.scan_number, 1000, 5),
|
||||
)
|
||||
# Make directory if needed
|
||||
self.create_directory(self.parent.filepath_raw)
|
||||
|
||||
headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
||||
# start the stream on x12sa-pd-2
|
||||
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
|
||||
data_msg = {
|
||||
"source": [
|
||||
{
|
||||
"searchPath": "/",
|
||||
"searchPattern": "glob:*.cbf",
|
||||
"destinationPath": self.parent.filepath_raw,
|
||||
}
|
||||
]
|
||||
}
|
||||
res = self.send_requests_put(url=url, data=data_msg, headers=headers)
|
||||
logger.info(f"{res.status_code} - {res.text} - {res.content}")
|
||||
|
||||
if not res.ok:
|
||||
res.raise_for_status()
|
||||
|
||||
# start the data receiver on xbl-daq-34
|
||||
url = "http://xbl-daq-34:8091/pilatus_2/run"
|
||||
data_msg = [
|
||||
"zmqWriter",
|
||||
self.parent.scaninfo.username,
|
||||
{
|
||||
"addr": "tcp://x12sa-pd-2:8888",
|
||||
"dst": ["file"],
|
||||
"numFrm": int(
|
||||
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
|
||||
),
|
||||
"timeout": 2000,
|
||||
"ifType": "PULL",
|
||||
"user": self.parent.scaninfo.username,
|
||||
},
|
||||
]
|
||||
res = self.send_requests_put(url=url, data=data_msg, headers=headers)
|
||||
logger.info(f"{res.status_code} - {res.text} - {res.content}")
|
||||
|
||||
if not res.ok:
|
||||
res.raise_for_status()
|
||||
|
||||
# Wait for server to become available again
|
||||
time.sleep(0.1)
|
||||
logger.info(f"{res.status_code} -{res.text} - {res.content}")
|
||||
|
||||
# Send requests.put to xbl-daq-34 to wait for data
|
||||
url = "http://xbl-daq-34:8091/pilatus_2/wait"
|
||||
data_msg = [
|
||||
"zmqWriter",
|
||||
self.parent.scaninfo.username,
|
||||
{
|
||||
"frmCnt": int(
|
||||
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
|
||||
),
|
||||
"timeout": 2000,
|
||||
},
|
||||
]
|
||||
try:
|
||||
res = self.send_requests_put(url=url, data=data_msg, headers=headers)
|
||||
logger.info(f"{res}")
|
||||
|
||||
if not res.ok:
|
||||
res.raise_for_status()
|
||||
except Exception as exc:
|
||||
logger.info(f"Pilatus2 wait threw Exception: {exc}")
|
||||
|
||||
def set_detector_threshold(self) -> None:
|
||||
"""
|
||||
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
|
||||
|
||||
Threshold might be in ev or keV
|
||||
"""
|
||||
|
||||
# get current beam energy from device manageer
|
||||
mokev = self.parent.device_manager.devices.mokev.obj.read()[
|
||||
self.parent.device_manager.devices.mokev.name
|
||||
]["value"]
|
||||
factor = 1
|
||||
|
||||
# Check if energies are eV or keV, assume keV as the default
|
||||
unit = getattr(self.parent.cam.threshold_energy, "units", None)
|
||||
if unit is not None and unit == "eV":
|
||||
factor = 1000
|
||||
|
||||
# set energy on detector
|
||||
setpoint = int(mokev * factor)
|
||||
|
||||
# set threshold on detector
|
||||
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
|
||||
"value"
|
||||
]
|
||||
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
|
||||
self.parent.cam.threshold_energy.set(setpoint / 2)
|
||||
|
||||
def set_acquisition_params(self) -> None:
|
||||
"""Set acquisition parameters for the detector"""
|
||||
|
||||
# Set number of images and frames (frames is for internal burst of detector)
|
||||
self.parent.cam.num_images.put(
|
||||
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
|
||||
)
|
||||
self.parent.cam.num_frames.put(1)
|
||||
|
||||
# Update the readout time of the detector
|
||||
self.update_readout_time()
|
||||
|
||||
def create_directory(self, filepath: str) -> None:
|
||||
"""Create directory if it does not exist"""
|
||||
os.makedirs(filepath, exist_ok=True)
|
||||
|
||||
def close_file_writer(self) -> None:
|
||||
"""
|
||||
Close the file writer for pilatus_2
|
||||
|
||||
Delete the data from x12sa-pd-2
|
||||
|
||||
"""
|
||||
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
|
||||
try:
|
||||
res = self.send_requests_delete(url=url)
|
||||
if not res.ok:
|
||||
res.raise_for_status()
|
||||
except Exception as exc:
|
||||
logger.info(f"Pilatus2 close threw Exception: {exc}")
|
||||
|
||||
def stop_file_writer(self) -> None:
|
||||
"""
|
||||
Stop the file writer for pilatus_2
|
||||
|
||||
Runs on xbl-daq-34
|
||||
"""
|
||||
url = "http://xbl-daq-34:8091/pilatus_2/stop"
|
||||
res = self.send_requests_put(url=url)
|
||||
if not res.ok:
|
||||
res.raise_for_status()
|
||||
|
||||
def send_requests_put(self, url: str, data: list = None, headers: dict = None) -> object:
|
||||
"""
|
||||
Send a put request to the given url
|
||||
|
||||
Args:
|
||||
url (str): url to send the request to
|
||||
data (dict): data to be sent with the request (optional)
|
||||
headers (dict): headers to be sent with the request (optional)
|
||||
|
||||
Returns:
|
||||
status code of the request
|
||||
"""
|
||||
return requests.put(url=url, data=json.dumps(data), headers=headers, timeout=5)
|
||||
|
||||
def send_requests_delete(self, url: str, headers: dict = None) -> object:
|
||||
"""
|
||||
Send a delete request to the given url
|
||||
|
||||
Args:
|
||||
url (str): url to send the request to
|
||||
headers (dict): headers to be sent with the request (optional)
|
||||
|
||||
Returns:
|
||||
status code of the request
|
||||
"""
|
||||
return requests.delete(url=url, headers=headers, timeout=5)
|
||||
|
||||
def on_pre_scan(self) -> None:
|
||||
"""Prepare detector for scan"""
|
||||
self.arm_acquisition()
|
||||
|
||||
def arm_acquisition(self) -> None:
|
||||
"""Arms the detector for the acquisition"""
|
||||
self.parent.cam.acquire.put(1)
|
||||
# TODO is this sleep needed? to be tested with detector and for how long
|
||||
time.sleep(0.5)
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Unstage the detector"""
|
||||
pass
|
||||
|
||||
def on_complete(self) -> None:
|
||||
"""Complete the scan"""
|
||||
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
|
||||
self.publish_file_location(
|
||||
done=True, successful=True, metadata={"input_path": self.parent.filepath_raw}
|
||||
)
|
||||
|
||||
def finished(self, timeout: int = 5) -> None:
|
||||
"""Check if acquisition is finished."""
|
||||
# pylint: disable=protected-access
|
||||
# TODO: at the moment this relies on device.mcs.obj._staged attribute
|
||||
signal_conditions = [
|
||||
(lambda: self.parent.device_manager.devices.mcs.obj._staged, Staged.no)
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=timeout,
|
||||
check_stopped=True,
|
||||
all_signals=True,
|
||||
):
|
||||
raise PilatusTimeoutError(
|
||||
f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state"
|
||||
f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for"
|
||||
" the file writer"
|
||||
)
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop detector"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def stop_detector(self) -> None:
|
||||
"""Stop detector"""
|
||||
self.parent.cam.acquire.put(0)
|
||||
|
||||
def stop_detector_backend(self) -> None:
|
||||
"""Stop the file writer zmq service for pilatus_2"""
|
||||
self.close_file_writer()
|
||||
time.sleep(0.1)
|
||||
self.stop_file_writer()
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
class PilatuscSAXS(PSIDetectorBase):
|
||||
"""Pilatus_2 300k detector for CSAXS
|
||||
|
||||
Parent class: PSIDetectorBase
|
||||
|
||||
class attributes:
|
||||
custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,
|
||||
inherits from CustomDetectorMixin
|
||||
cam (SLSDetectorCam) : Detector camera
|
||||
MIN_READOUT (float) : Minimum readout time for the detector
|
||||
|
||||
"""
|
||||
|
||||
# Specify which functions are revealed to the user in BEC client
|
||||
USER_ACCESS = []
|
||||
|
||||
# specify Setup class
|
||||
custom_prepare_cls = PilatusSetup
|
||||
# specify minimum readout time for detector
|
||||
MIN_READOUT = 3e-3
|
||||
TIMEOUT_FOR_SIGNALS = 5
|
||||
# specify class attributes
|
||||
cam = ADCpt(SLSDetectorCam, "cam1:")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)
|
||||
@@ -1,449 +0,0 @@
|
||||
# pylint: skip-file
|
||||
import os
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
from csaxs_bec.devices.epics.pilatus_csaxs import PilatuscSAXS
|
||||
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_det():
|
||||
name = "pilatus"
|
||||
prefix = "X12SA-ES-PILATUS300K:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
),
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
with mock.patch.object(PilatuscSAXS, "_init"):
|
||||
det = PilatuscSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
patch_dual_pvs(det)
|
||||
yield det
|
||||
|
||||
|
||||
@pytest.mark.parametrize("trigger_source, detector_state", [(1, 0)])
|
||||
# TODO rewrite this one, write test for init_detector, init_filewriter is tested
|
||||
def test_init_detector(mock_det, trigger_source, detector_state):
|
||||
"""Test the _init function:
|
||||
|
||||
This includes testing the functions:
|
||||
- _init_detector
|
||||
- _stop_det
|
||||
- _set_trigger
|
||||
--> Testing the filewriter is done in test_init_filewriter
|
||||
|
||||
Validation upon setting the correct PVs
|
||||
|
||||
"""
|
||||
mock_det.custom_prepare.on_init() # call the method you want to test
|
||||
assert mock_det.cam.acquire.get() == detector_state
|
||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, stopped, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
True,
|
||||
False,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_stage(mock_det, scaninfo, stopped, expected_exception):
|
||||
path = "tmp"
|
||||
mock_det.filepath_raw = path
|
||||
with mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location:
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
mock_det.device_manager.add_device("mokev", value=12.4)
|
||||
mock_det.stopped = stopped
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_data_backend,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "update_readout_time"
|
||||
) as mock_update_readout_time,
|
||||
):
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.stage()
|
||||
else:
|
||||
mock_det.stage()
|
||||
mock_data_backend.assert_called_once()
|
||||
mock_update_readout_time.assert_called()
|
||||
# Check _prep_det
|
||||
assert mock_det.cam.num_images.get() == int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
assert mock_det.cam.num_frames.get() == 1
|
||||
|
||||
mock_publish_file_location.assert_called_once_with(
|
||||
done=False, successful=False, metadata={"input_path": path}
|
||||
)
|
||||
|
||||
|
||||
def test_pre_scan(mock_det):
|
||||
mock_det.custom_prepare.on_pre_scan()
|
||||
assert mock_det.cam.acquire.get() == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
|
||||
)
|
||||
def test_update_readout_time(mock_det, readout_time, expected_value):
|
||||
if readout_time is None:
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
else:
|
||||
mock_det.scaninfo.readout_time = readout_time
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
(
|
||||
{
|
||||
"filepath": "test.h5",
|
||||
"filepath_raw": "test5_raw.h5",
|
||||
"successful": True,
|
||||
"done": False,
|
||||
"scan_id": "123",
|
||||
}
|
||||
),
|
||||
(
|
||||
{
|
||||
"filepath": "test.h5",
|
||||
"filepath_raw": "test5_raw.h5",
|
||||
"successful": False,
|
||||
"done": True,
|
||||
"scan_id": "123",
|
||||
}
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_publish_file_location(mock_det, scaninfo):
|
||||
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
mock_det.filepath_raw = scaninfo["filepath_raw"]
|
||||
mock_det.custom_prepare.publish_file_location(
|
||||
done=scaninfo["done"],
|
||||
successful=scaninfo["successful"],
|
||||
metadata={"input_path": scaninfo["filepath_raw"]},
|
||||
)
|
||||
if scaninfo["successful"] is None:
|
||||
msg = messages.FileMessage(
|
||||
file_path=scaninfo["filepath"],
|
||||
done=scaninfo["done"],
|
||||
metadata={"input_path": scaninfo["filepath_raw"]},
|
||||
)
|
||||
else:
|
||||
msg = messages.FileMessage(
|
||||
file_path=scaninfo["filepath"],
|
||||
done=scaninfo["done"],
|
||||
metadata={"input_path": scaninfo["filepath_raw"]},
|
||||
successful=scaninfo["successful"],
|
||||
)
|
||||
expected_calls = [
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"requests_state, expected_exception, url_delete, url_put",
|
||||
[
|
||||
(
|
||||
True,
|
||||
False,
|
||||
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||
"http://xbl-daq-34:8091/pilatus_2/stop",
|
||||
),
|
||||
(
|
||||
False,
|
||||
False,
|
||||
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||
"http://xbl-daq-34:8091/pilatus_2/stop",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_stop_detector_backend(mock_det, requests_state, expected_exception, url_delete, url_put):
|
||||
with (
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "send_requests_delete"
|
||||
) as mock_send_requests_delete,
|
||||
mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put,
|
||||
):
|
||||
instance_delete = mock_send_requests_delete.return_value
|
||||
instance_delete.ok = requests_state
|
||||
instance_put = mock_send_requests_put.return_value
|
||||
instance_put.ok = requests_state
|
||||
if expected_exception:
|
||||
mock_det.custom_prepare.stop_detector_backend()
|
||||
mock_send_requests_delete.assert_called_once_with(url=url_delete)
|
||||
mock_send_requests_put.assert_called_once_with(url=url_put)
|
||||
instance_delete.raise_for_status.called_once()
|
||||
instance_put.raise_for_status.called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.stop_detector_backend()
|
||||
mock_send_requests_delete.assert_called_once_with(url=url_delete)
|
||||
mock_send_requests_put.assert_called_once_with(url=url_put)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, data_msgs, urls, requests_state, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"filepath_raw": "pilatus_2.h5",
|
||||
"eacc": "e12345",
|
||||
"scan_number": 1000,
|
||||
"scan_directory": "S00000_00999",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"headers": {"Content-Type": "application/json", "Accept": "application/json"},
|
||||
},
|
||||
[
|
||||
{
|
||||
"source": [
|
||||
{
|
||||
"searchPath": "/",
|
||||
"searchPattern": "glob:*.cbf",
|
||||
"destinationPath": (
|
||||
"/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999"
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
[
|
||||
"zmqWriter",
|
||||
"e12345",
|
||||
{
|
||||
"addr": "tcp://x12sa-pd-2:8888",
|
||||
"dst": ["file"],
|
||||
"numFrm": 500,
|
||||
"timeout": 2000,
|
||||
"ifType": "PULL",
|
||||
"user": "e12345",
|
||||
},
|
||||
],
|
||||
["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}],
|
||||
],
|
||||
[
|
||||
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||
"http://xbl-daq-34:8091/pilatus_2/run",
|
||||
"http://xbl-daq-34:8091/pilatus_2/wait",
|
||||
],
|
||||
True,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"filepath_raw": "pilatus_2.h5",
|
||||
"eacc": "e12345",
|
||||
"scan_number": 1000,
|
||||
"scan_directory": "S00000_00999",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"headers": {"Content-Type": "application/json", "Accept": "application/json"},
|
||||
},
|
||||
[
|
||||
{
|
||||
"source": [
|
||||
{
|
||||
"searchPath": "/",
|
||||
"searchPattern": "glob:*.cbf",
|
||||
"destinationPath": (
|
||||
"/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999"
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
[
|
||||
"zmqWriter",
|
||||
"e12345",
|
||||
{
|
||||
"addr": "tcp://x12sa-pd-2:8888",
|
||||
"dst": ["file"],
|
||||
"numFrm": 500,
|
||||
"timeout": 2000,
|
||||
"ifType": "PULL",
|
||||
"user": "e12345",
|
||||
},
|
||||
],
|
||||
["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}],
|
||||
],
|
||||
[
|
||||
"http://x12sa-pd-2:8080/stream/pilatus_2",
|
||||
"http://xbl-daq-34:8091/pilatus_2/run",
|
||||
"http://xbl-daq-34:8091/pilatus_2/wait",
|
||||
],
|
||||
False, # return of res.ok is False!
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer,
|
||||
mock.patch.object(mock_det, "filewriter") as mock_filewriter,
|
||||
mock.patch.object(mock_det.custom_prepare, "create_directory") as mock_create_directory,
|
||||
mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put,
|
||||
):
|
||||
mock_det.scaninfo.scan_number = scaninfo["scan_number"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.scaninfo.username = scaninfo["eacc"]
|
||||
mock_filewriter.compile_full_filename.return_value = scaninfo["filepath_raw"]
|
||||
mock_filewriter.get_scan_directory.return_value = scaninfo["scan_directory"]
|
||||
instance = mock_send_requests_put.return_value
|
||||
instance.ok = requests_state
|
||||
instance.raise_for_status.side_effect = Exception
|
||||
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
instance.raise_for_status.assert_called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
|
||||
mock_close_file_writer.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
|
||||
# Assert values set on detector
|
||||
assert mock_det.cam.file_path.get() == "/dev/shm/zmq/"
|
||||
assert (
|
||||
mock_det.cam.file_name.get()
|
||||
== f"{scaninfo['eacc']}_2_{scaninfo['scan_number']:05d}"
|
||||
)
|
||||
assert mock_det.cam.auto_increment.get() == 1
|
||||
assert mock_det.cam.file_number.get() == 0
|
||||
assert mock_det.cam.file_format.get() == 0
|
||||
assert mock_det.cam.file_template.get() == "%s%s_%5.5d.cbf"
|
||||
# Remove last / from destinationPath
|
||||
mock_create_directory.assert_called_once_with(
|
||||
os.path.join(data_msgs[0]["source"][0]["destinationPath"])
|
||||
)
|
||||
assert mock_send_requests_put.call_count == 3
|
||||
|
||||
calls = [
|
||||
mock.call(url=url, data=data_msg, headers=scaninfo["headers"])
|
||||
for url, data_msg in zip(urls, data_msgs)
|
||||
]
|
||||
for call, mock_call in zip(calls, mock_send_requests_put.call_args_list):
|
||||
assert call == mock_call
|
||||
|
||||
|
||||
def test_complete(mock_det):
|
||||
path = "tmp"
|
||||
mock_det.filepath_raw = path
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_det.complete()
|
||||
assert mock_finished.call_count == 1
|
||||
call = mock.call(done=True, successful=True, metadata={"input_path": path})
|
||||
assert mock_publish_file_location.call_args == call
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer,
|
||||
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
|
||||
):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_file_writer.assert_called_once()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stopped, mcs_stage_state, expected_exception",
|
||||
[
|
||||
(False, ophyd.Staged.no, False),
|
||||
(True, ophyd.Staged.no, True),
|
||||
(False, ophyd.Staged.yes, True),
|
||||
],
|
||||
)
|
||||
def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det, "device_manager") as mock_dm,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_friter,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer,
|
||||
):
|
||||
mock_dm.devices.mcs.obj._staged = mcs_stage_state
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det.stopped is stopped
|
||||
mock_stop_file_friter.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
else:
|
||||
mock_det.custom_prepare.finished()
|
||||
if stopped:
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
mock_stop_file_friter.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_close_file_writer.assert_called_once()
|
||||
Reference in New Issue
Block a user