diff --git a/csaxs_bec/devices/epics/pilatus_csaxs.py b/csaxs_bec/devices/epics/pilatus_csaxs.py deleted file mode 100644 index 6a87369..0000000 --- a/csaxs_bec/devices/epics/pilatus_csaxs.py +++ /dev/null @@ -1,400 +0,0 @@ -import enum -import json -import os -import threading -import time - -import numpy as np -import requests -from bec_lib import bec_logger -from ophyd import ADComponent as ADCpt -from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) - -logger = bec_logger.logger - - -class PilatusError(Exception): - """Base class for exceptions in this module.""" - - -class PilatusTimeoutError(PilatusError): - """Raised when the Pilatus does not respond in time during unstage.""" - - -class TriggerSource(enum.IntEnum): - """Trigger source options for the detector""" - - INTERNAL = 0 - EXT_ENABLE = 1 - EXT_TRIGGER = 2 - MULTI_TRIGGER = 3 - ALGINMENT = 4 - - -class SLSDetectorCam(Device): - """SLS Detector Camera - Pilatus - - Base class to map EPICS PVs to ophyd signals. - """ - - num_images = ADCpt(EpicsSignalWithRBV, "NumImages") - num_frames = ADCpt(EpicsSignalWithRBV, "NumExposures") - delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures") - trigger_mode = ADCpt(EpicsSignalWithRBV, "TriggerMode") - acquire = ADCpt(EpicsSignal, "Acquire") - armed = ADCpt(EpicsSignalRO, "Armed") - - read_file_timeout = ADCpt(EpicsSignal, "ImageFileTmot") - detector_state = ADCpt(EpicsSignalRO, "StatusMessage_RBV") - status_message_camserver = ADCpt(EpicsSignalRO, "StringFromServer_RBV", string=True) - acquire_time = ADCpt(EpicsSignal, "AcquireTime") - acquire_period = ADCpt(EpicsSignal, "AcquirePeriod") - threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") - file_path = ADCpt(EpicsSignalWithRBV, "FilePath") - file_name = ADCpt(EpicsSignalWithRBV, "FileName") - file_number = ADCpt(EpicsSignalWithRBV, "FileNumber") - auto_increment = ADCpt(EpicsSignalWithRBV, "AutoIncrement") - file_template = ADCpt(EpicsSignalWithRBV, "FileTemplate") - file_format = ADCpt(EpicsSignalWithRBV, "FileNumber") - gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill") - - -class PilatusSetup(CustomDetectorMixin): - """Pilatus setup class for cSAXS - - Parent class: CustomDetectorMixin - - """ - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self._lock = threading.RLock() - - def on_init(self) -> None: - """Initialize the detector""" - self.initialize_default_parameter() - self.initialize_detector() - - def initialize_default_parameter(self) -> None: - """Set default parameters for Eiger9M detector""" - self.update_readout_time() - - def update_readout_time(self) -> None: - """Set readout time for Eiger9M detector""" - readout_time = ( - self.parent.scaninfo.readout_time - if hasattr(self.parent.scaninfo, "readout_time") - else self.parent.MIN_READOUT - ) - self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT) - - def initialize_detector(self) -> None: - """Initialize detector""" - # Stops the detector - self.stop_detector() - # Sets the trigger source to GATING - self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE) - - def on_stage(self) -> None: - """Stage the detector for scan""" - self.prepare_detector() - self.prepare_data_backend() - self.publish_file_location( - done=False, successful=False, metadata={"input_path": self.parent.filepath_raw} - ) - - def prepare_detector(self) -> None: - """ - Prepare detector for scan. - - Includes checking the detector threshold, - setting the acquisition parameters and setting the trigger source - """ - self.set_detector_threshold() - self.set_acquisition_params() - self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE) - - def prepare_data_backend(self) -> None: - """ - Prepare the detector backend of pilatus for a scan - - A zmq service is running on xbl-daq-34 that is waiting - for a zmq message to start the writer for the pilatus_2 x12sa-pd-2 - - """ - - self.stop_detector_backend() - - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename("pilatus_2.h5") - ).wait() - self.parent.cam.file_path.put("/dev/shm/zmq/") - self.parent.cam.file_name.put( - f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}" - ) - self.parent.cam.auto_increment.put(1) # auto increment - self.parent.cam.file_number.put(0) # first iter - self.parent.cam.file_format.put(0) # 0: TIFF - self.parent.cam.file_template.put("%s%s_%5.5d.cbf") - - # TODO better to remove hard coded path with link to home directory/pilatus_2 - basepath = f"/sls/X12SA/data/{self.parent.scaninfo.username}/Data10/pilatus_2/" - self.parent.filepath_raw = os.path.join( - basepath, - self.parent.filewriter.get_scan_directory(self.parent.scaninfo.scan_number, 1000, 5), - ) - # Make directory if needed - self.create_directory(self.parent.filepath_raw) - - headers = {"Content-Type": "application/json", "Accept": "application/json"} - # start the stream on x12sa-pd-2 - url = "http://x12sa-pd-2:8080/stream/pilatus_2" - data_msg = { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": self.parent.filepath_raw, - } - ] - } - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res.status_code} - {res.text} - {res.content}") - - if not res.ok: - res.raise_for_status() - - # start the data receiver on xbl-daq-34 - url = "http://xbl-daq-34:8091/pilatus_2/run" - data_msg = [ - "zmqWriter", - self.parent.scaninfo.username, - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ), - "timeout": 2000, - "ifType": "PULL", - "user": self.parent.scaninfo.username, - }, - ] - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res.status_code} - {res.text} - {res.content}") - - if not res.ok: - res.raise_for_status() - - # Wait for server to become available again - time.sleep(0.1) - logger.info(f"{res.status_code} -{res.text} - {res.content}") - - # Send requests.put to xbl-daq-34 to wait for data - url = "http://xbl-daq-34:8091/pilatus_2/wait" - data_msg = [ - "zmqWriter", - self.parent.scaninfo.username, - { - "frmCnt": int( - self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger - ), - "timeout": 2000, - }, - ] - try: - res = self.send_requests_put(url=url, data=data_msg, headers=headers) - logger.info(f"{res}") - - if not res.ok: - res.raise_for_status() - except Exception as exc: - logger.info(f"Pilatus2 wait threw Exception: {exc}") - - def set_detector_threshold(self) -> None: - """ - Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance - - Threshold might be in ev or keV - """ - - # get current beam energy from device manageer - mokev = self.parent.device_manager.devices.mokev.obj.read()[ - self.parent.device_manager.devices.mokev.name - ]["value"] - factor = 1 - - # Check if energies are eV or keV, assume keV as the default - unit = getattr(self.parent.cam.threshold_energy, "units", None) - if unit is not None and unit == "eV": - factor = 1000 - - # set energy on detector - setpoint = int(mokev * factor) - - # set threshold on detector - threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][ - "value" - ] - if not np.isclose(setpoint / 2, threshold, rtol=0.05): - self.parent.cam.threshold_energy.set(setpoint / 2) - - def set_acquisition_params(self) -> None: - """Set acquisition parameters for the detector""" - - # Set number of images and frames (frames is for internal burst of detector) - self.parent.cam.num_images.put( - int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger) - ) - self.parent.cam.num_frames.put(1) - - # Update the readout time of the detector - self.update_readout_time() - - def create_directory(self, filepath: str) -> None: - """Create directory if it does not exist""" - os.makedirs(filepath, exist_ok=True) - - def close_file_writer(self) -> None: - """ - Close the file writer for pilatus_2 - - Delete the data from x12sa-pd-2 - - """ - url = "http://x12sa-pd-2:8080/stream/pilatus_2" - try: - res = self.send_requests_delete(url=url) - if not res.ok: - res.raise_for_status() - except Exception as exc: - logger.info(f"Pilatus2 close threw Exception: {exc}") - - def stop_file_writer(self) -> None: - """ - Stop the file writer for pilatus_2 - - Runs on xbl-daq-34 - """ - url = "http://xbl-daq-34:8091/pilatus_2/stop" - res = self.send_requests_put(url=url) - if not res.ok: - res.raise_for_status() - - def send_requests_put(self, url: str, data: list = None, headers: dict = None) -> object: - """ - Send a put request to the given url - - Args: - url (str): url to send the request to - data (dict): data to be sent with the request (optional) - headers (dict): headers to be sent with the request (optional) - - Returns: - status code of the request - """ - return requests.put(url=url, data=json.dumps(data), headers=headers, timeout=5) - - def send_requests_delete(self, url: str, headers: dict = None) -> object: - """ - Send a delete request to the given url - - Args: - url (str): url to send the request to - headers (dict): headers to be sent with the request (optional) - - Returns: - status code of the request - """ - return requests.delete(url=url, headers=headers, timeout=5) - - def on_pre_scan(self) -> None: - """Prepare detector for scan""" - self.arm_acquisition() - - def arm_acquisition(self) -> None: - """Arms the detector for the acquisition""" - self.parent.cam.acquire.put(1) - # TODO is this sleep needed? to be tested with detector and for how long - time.sleep(0.5) - - def on_unstage(self) -> None: - """Unstage the detector""" - pass - - def on_complete(self) -> None: - """Complete the scan""" - self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS) - self.publish_file_location( - done=True, successful=True, metadata={"input_path": self.parent.filepath_raw} - ) - - def finished(self, timeout: int = 5) -> None: - """Check if acquisition is finished.""" - # pylint: disable=protected-access - # TODO: at the moment this relies on device.mcs.obj._staged attribute - signal_conditions = [ - (lambda: self.parent.device_manager.devices.mcs.obj._staged, Staged.no) - ] - if not self.wait_for_signals( - signal_conditions=signal_conditions, - timeout=timeout, - check_stopped=True, - all_signals=True, - ): - raise PilatusTimeoutError( - f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state" - f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for" - " the file writer" - ) - self.stop_detector() - self.stop_detector_backend() - - def on_stop(self) -> None: - """Stop detector""" - self.stop_detector() - self.stop_detector_backend() - - def stop_detector(self) -> None: - """Stop detector""" - self.parent.cam.acquire.put(0) - - def stop_detector_backend(self) -> None: - """Stop the file writer zmq service for pilatus_2""" - self.close_file_writer() - time.sleep(0.1) - self.stop_file_writer() - time.sleep(0.1) - - -class PilatuscSAXS(PSIDetectorBase): - """Pilatus_2 300k detector for CSAXS - - Parent class: PSIDetectorBase - - class attributes: - custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - cam (SLSDetectorCam) : Detector camera - MIN_READOUT (float) : Minimum readout time for the detector - - """ - - # Specify which functions are revealed to the user in BEC client - USER_ACCESS = [] - - # specify Setup class - custom_prepare_cls = PilatusSetup - # specify minimum readout time for detector - MIN_READOUT = 3e-3 - TIMEOUT_FOR_SIGNALS = 5 - # specify class attributes - cam = ADCpt(SLSDetectorCam, "cam1:") - - -if __name__ == "__main__": - pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True) diff --git a/tests/tests_devices/test_pilatus_csaxs.py b/tests/tests_devices/test_pilatus_csaxs.py deleted file mode 100644 index 8b04372..0000000 --- a/tests/tests_devices/test_pilatus_csaxs.py +++ /dev/null @@ -1,449 +0,0 @@ -# pylint: skip-file -import os -import threading -from unittest import mock - -import ophyd -import pytest -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV - -from csaxs_bec.devices.epics.pilatus_csaxs import PilatuscSAXS -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs - - -@pytest.fixture(scope="function") -def mock_det(): - name = "pilatus" - prefix = "X12SA-ES-PILATUS300K:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(PilatuscSAXS, "_init"): - det = PilatuscSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - yield det - - -@pytest.mark.parametrize("trigger_source, detector_state", [(1, 0)]) -# TODO rewrite this one, write test for init_detector, init_filewriter is tested -def test_init_detector(mock_det, trigger_source, detector_state): - """Test the _init function: - - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter - - Validation upon setting the correct PVs - - """ - mock_det.custom_prepare.on_init() # call the method you want to test - assert mock_det.cam.acquire.get() == detector_state - assert mock_det.cam.trigger_mode.get() == trigger_source - - -@pytest.mark.parametrize( - "scaninfo, stopped, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - True, - False, - ), - ], -) -def test_stage(mock_det, scaninfo, stopped, expected_exception): - path = "tmp" - mock_det.filepath_raw = path - with mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location: - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.device_manager.add_device("mokev", value=12.4) - mock_det.stopped = stopped - with ( - mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_data_backend, - mock.patch.object( - mock_det.custom_prepare, "update_readout_time" - ) as mock_update_readout_time, - ): - mock_det.filepath.set(scaninfo["filepath"]).wait() - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.stage() - else: - mock_det.stage() - mock_data_backend.assert_called_once() - mock_update_readout_time.assert_called() - # Check _prep_det - assert mock_det.cam.num_images.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - assert mock_det.cam.num_frames.get() == 1 - - mock_publish_file_location.assert_called_once_with( - done=False, successful=False, metadata={"input_path": path} - ) - - -def test_pre_scan(mock_det): - mock_det.custom_prepare.on_pre_scan() - assert mock_det.cam.acquire.get() == 1 - - -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - - -@pytest.mark.parametrize( - "scaninfo", - [ - ( - { - "filepath": "test.h5", - "filepath_raw": "test5_raw.h5", - "successful": True, - "done": False, - "scan_id": "123", - } - ), - ( - { - "filepath": "test.h5", - "filepath_raw": "test5_raw.h5", - "successful": False, - "done": True, - "scan_id": "123", - } - ), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.filepath_raw = scaninfo["filepath_raw"] - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], - successful=scaninfo["successful"], - metadata={"input_path": scaninfo["filepath_raw"]}, - ) - if scaninfo["successful"] is None: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], - done=scaninfo["done"], - metadata={"input_path": scaninfo["filepath_raw"]}, - ) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], - done=scaninfo["done"], - metadata={"input_path": scaninfo["filepath_raw"]}, - successful=scaninfo["successful"], - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -@pytest.mark.parametrize( - "requests_state, expected_exception, url_delete, url_put", - [ - ( - True, - False, - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/stop", - ), - ( - False, - False, - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/stop", - ), - ], -) -def test_stop_detector_backend(mock_det, requests_state, expected_exception, url_delete, url_put): - with ( - mock.patch.object( - mock_det.custom_prepare, "send_requests_delete" - ) as mock_send_requests_delete, - mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put, - ): - instance_delete = mock_send_requests_delete.return_value - instance_delete.ok = requests_state - instance_put = mock_send_requests_put.return_value - instance_put.ok = requests_state - if expected_exception: - mock_det.custom_prepare.stop_detector_backend() - mock_send_requests_delete.assert_called_once_with(url=url_delete) - mock_send_requests_put.assert_called_once_with(url=url_put) - instance_delete.raise_for_status.called_once() - instance_put.raise_for_status.called_once() - else: - mock_det.custom_prepare.stop_detector_backend() - mock_send_requests_delete.assert_called_once_with(url=url_delete) - mock_send_requests_put.assert_called_once_with(url=url_put) - - -@pytest.mark.parametrize( - "scaninfo, data_msgs, urls, requests_state, expected_exception", - [ - ( - { - "filepath_raw": "pilatus_2.h5", - "eacc": "e12345", - "scan_number": 1000, - "scan_directory": "S00000_00999", - "num_points": 500, - "frames_per_trigger": 1, - "headers": {"Content-Type": "application/json", "Accept": "application/json"}, - }, - [ - { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": ( - "/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999" - ), - } - ] - }, - [ - "zmqWriter", - "e12345", - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": 500, - "timeout": 2000, - "ifType": "PULL", - "user": "e12345", - }, - ], - ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], - ], - [ - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/run", - "http://xbl-daq-34:8091/pilatus_2/wait", - ], - True, - False, - ), - ( - { - "filepath_raw": "pilatus_2.h5", - "eacc": "e12345", - "scan_number": 1000, - "scan_directory": "S00000_00999", - "num_points": 500, - "frames_per_trigger": 1, - "headers": {"Content-Type": "application/json", "Accept": "application/json"}, - }, - [ - { - "source": [ - { - "searchPath": "/", - "searchPattern": "glob:*.cbf", - "destinationPath": ( - "/sls/X12SA/data/e12345/Data10/pilatus_2/S00000_00999" - ), - } - ] - }, - [ - "zmqWriter", - "e12345", - { - "addr": "tcp://x12sa-pd-2:8888", - "dst": ["file"], - "numFrm": 500, - "timeout": 2000, - "ifType": "PULL", - "user": "e12345", - }, - ], - ["zmqWriter", "e12345", {"frmCnt": 500, "timeout": 2000}], - ], - [ - "http://x12sa-pd-2:8080/stream/pilatus_2", - "http://xbl-daq-34:8091/pilatus_2/run", - "http://xbl-daq-34:8091/pilatus_2/wait", - ], - False, # return of res.ok is False! - True, - ), - ], -) -def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer, - mock.patch.object(mock_det, "filewriter") as mock_filewriter, - mock.patch.object(mock_det.custom_prepare, "create_directory") as mock_create_directory, - mock.patch.object(mock_det.custom_prepare, "send_requests_put") as mock_send_requests_put, - ): - mock_det.scaninfo.scan_number = scaninfo["scan_number"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.username = scaninfo["eacc"] - mock_filewriter.compile_full_filename.return_value = scaninfo["filepath_raw"] - mock_filewriter.get_scan_directory.return_value = scaninfo["scan_directory"] - instance = mock_send_requests_put.return_value - instance.ok = requests_state - instance.raise_for_status.side_effect = Exception - - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.prepare_data_backend() - mock_close_file_writer.assert_called_once() - mock_stop_file_writer.assert_called_once() - instance.raise_for_status.assert_called_once() - else: - mock_det.custom_prepare.prepare_data_backend() - - mock_close_file_writer.assert_called_once() - mock_stop_file_writer.assert_called_once() - - # Assert values set on detector - assert mock_det.cam.file_path.get() == "/dev/shm/zmq/" - assert ( - mock_det.cam.file_name.get() - == f"{scaninfo['eacc']}_2_{scaninfo['scan_number']:05d}" - ) - assert mock_det.cam.auto_increment.get() == 1 - assert mock_det.cam.file_number.get() == 0 - assert mock_det.cam.file_format.get() == 0 - assert mock_det.cam.file_template.get() == "%s%s_%5.5d.cbf" - # Remove last / from destinationPath - mock_create_directory.assert_called_once_with( - os.path.join(data_msgs[0]["source"][0]["destinationPath"]) - ) - assert mock_send_requests_put.call_count == 3 - - calls = [ - mock.call(url=url, data=data_msg, headers=scaninfo["headers"]) - for url, data_msg in zip(urls, data_msgs) - ] - for call, mock_call in zip(calls, mock_send_requests_put.call_args_list): - assert call == mock_call - - -def test_complete(mock_det): - path = "tmp" - mock_det.filepath_raw = path - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.complete() - assert mock_finished.call_count == 1 - call = mock.call(done=True, successful=True, metadata={"input_path": path}) - assert mock_publish_file_location.call_args == call - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_writer, - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_file_writer.assert_called_once() - mock_close_file_writer.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, mcs_stage_state, expected_exception", - [ - (False, ophyd.Staged.no, False), - (True, ophyd.Staged.no, True), - (False, ophyd.Staged.yes, True), - ], -) -def test_finished(mock_det, stopped, mcs_stage_state, expected_exception): - with ( - mock.patch.object(mock_det, "device_manager") as mock_dm, - mock.patch.object(mock_det.custom_prepare, "stop_file_writer") as mock_stop_file_friter, - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object(mock_det.custom_prepare, "close_file_writer") as mock_close_file_writer, - ): - mock_dm.devices.mcs.obj._staged = mcs_stage_state - mock_det.stopped = stopped - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - mock_stop_file_friter.assert_called() - mock_stop_det.assert_called_once() - mock_close_file_writer.assert_called_once() - else: - mock_det.custom_prepare.finished() - if stopped: - assert mock_det.stopped is stopped - - mock_stop_file_friter.assert_called() - mock_stop_det.assert_called_once() - mock_close_file_writer.assert_called_once()