refactor: refactored pilatus to psi_detector_base class and adapted tests

This commit is contained in:
appel_c 2023-11-17 11:16:46 +01:00
parent e8ec101f53
commit e9d9711aa7
5 changed files with 341 additions and 427 deletions

View File

@ -3,7 +3,7 @@ import time
import numpy as np import numpy as np
import os import os
from typing import Any from typing import Any, List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Device from ophyd import Device
@ -414,6 +414,11 @@ class Eiger9McSAXS(PSIDetectorBase):
value = trigger_source value = trigger_source
self.cam.trigger_mode.put(value) self.cam.trigger_mode.put(value)
def stage(self) -> List[object]:
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__": if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True) eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)

View File

@ -1,5 +1,6 @@
import enum import enum
import os import os
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt
from ophyd.mca import EpicsMCARecord from ophyd.mca import EpicsMCARecord
@ -365,6 +366,11 @@ class FalconcSAXS(PSIDetectorBase):
self.pixel_advance_mode.put(trigger) self.pixel_advance_mode.put(trigger)
self.ignore_gate.put(ignore_gate) self.ignore_gate.put(ignore_gate)
def stage(self) -> List[object]:
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__": if __name__ == "__main__":
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True) falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)

View File

@ -5,23 +5,19 @@ import time
import requests import requests
import numpy as np import numpy as np
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device, Staged from ophyd import Device, Staged
from ophyd import ADComponent as ADCpt from ophyd import ADComponent as ADCpt
from bec_lib import messages, MessageEndpoints, bec_logger from bec_lib import messages, MessageEndpoints, bec_logger
from bec_lib.file_utils import FileWriterMixin
from bec_lib.bec_service import SERVICE_CONFIG
from bec_lib.devicemanager import DeviceStatus
from ophyd_devices.utils import bec_utils as bec_utils from ophyd_devices.utils import bec_utils as bec_utils
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.epics.devices.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
logger = bec_logger.logger logger = bec_logger.logger
PILATUS_MIN_READOUT = 3e-3 MIN_READOUT = 3e-3
class PilatusError(Exception): class PilatusError(Exception):
@ -36,14 +32,9 @@ class PilatusTimeoutError(PilatusError):
pass pass
class PilatusInitError(PilatusError):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
pass
class TriggerSource(enum.IntEnum): class TriggerSource(enum.IntEnum):
"""Trigger source options for the detector"""
INTERNAL = 0 INTERNAL = 0
EXT_ENABLE = 1 EXT_ENABLE = 1
EXT_TRIGGER = 2 EXT_TRIGGER = 2
@ -79,204 +70,151 @@ class SLSDetectorCam(Device):
gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill") gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill")
class PilatuscSAXS(DetectorBase): class PilatusSetup(CustomDetectorMixin):
"""Pilatus_2 300k detector for CSAXS """Pilatus setup class for cSAXS
Parent class: DetectorBase Parent class: CustomDetectorMixin
Device class: PilatusDetectorCamEx
Attributes:
name str: 'pilatus_2'
prefix (str): PV prefix (X12SA-ES-PILATUS300K:)
""" """
# Specify which functions are revealed to the user in BEC client def initialize_default_parameter(self) -> None:
USER_ACCESS = [ """Set default parameters for Eiger9M detector"""
"describe", self.update_readout_time()
]
cam = ADCpt(SLSDetectorCam, "cam1:") def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
"""Initialize the Pilatus detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix ("X12SA-ES-PILATUS300K:)
name (str): 'pilatus_2'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise PilatusInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add DeviceManager to initialization or init with sim_mode=True"
)
self.sim_mode = sim_mode
self._stopped = False
self.name = name
self.service_cfg = None
self.std_client = None
self.scaninfo = None
self.filewriter = None
self.readout_time_min = PILATUS_MIN_READOUT
self.timeout = 5
self.wait_for_connection(all_signals=True)
if not sim_mode:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/"
self.service_cfg = {"base_path": os.path.expanduser(base_path)}
self._producer = self.device_manager.producer
self._update_scaninfo()
self._update_filewriter()
self._init()
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriterMixin(self.service_cfg)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config"""
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
def _default_parameter(self) -> None:
"""Set default parameters for Pilatus300k detector
readout (float): readout time in seconds
"""
self._update_readout_time()
def _update_readout_time(self) -> None:
readout_time = ( readout_time = (
self.scaninfo.readout_time self.parent.scaninfo.readout_time
if hasattr(self.scaninfo, "readout_time") if hasattr(self.parent.scaninfo, "readout_time")
else self.readout_time_min else self.parent.readout_time_min
) )
self.readout_time = max(readout_time, self.readout_time_min) self.parent.readout_time = max(readout_time, self.parent.readout_time_min)
def _init_detector(self) -> None: def initialize_detector(self) -> None:
"""Initialize the detector""" """Initialize detector"""
# TODO add check if detector is running # Stops the detector
self._stop_det() self.stop_detector()
self._set_trigger(TriggerSource.EXT_ENABLE) # Sets the trigger source to GATING
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
def _init_filewriter(self) -> None: def prepare_detector(self) -> None:
"""Initialize the file writer""" """
# TODO in case the data backend is rewritten, add check if it is ready! Prepare detector for scan.
pass
def _prep_det(self) -> None: Includes checking the detector threshold,
# TODO slow reaction, seemed to have timeout. setting the acquisition parameters and setting the trigger source
self._set_det_threshold() """
self._set_acquisition_params() self.set_detector_threshold()
self._set_trigger(TriggerSource.EXT_ENABLE) self.set_acquisition_params()
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
def _set_det_threshold(self) -> None: def set_detector_threshold(self) -> None:
# threshold_energy PV exists on Eiger 9M? """
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1 factor = 1
unit = getattr(self.cam.threshold_energy, "units", None)
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit != None and unit == "eV": if unit != None and unit == "eV":
factor = 1000 factor = 1000
setpoint = int(self.mokev * factor)
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] # set energy on detector
setpoint = int(mokev * factor)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05): if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.cam.threshold_energy.put(setpoint / 2) self.parent.cam.threshold_energy.set(setpoint / 2)
def _set_acquisition_params(self) -> None: def set_acquisition_params(self) -> None:
"""set acquisition parameters on the detector""" """Set acquisition parameters for the detector"""
# self.cam.acquire_time.set(self.exp_time)
# self.cam.acquire_period.set(self.exp_time + self.readout)
self.cam.num_images.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
self.cam.num_frames.put(1)
self._update_readout_time()
def _set_trigger(self, trigger_source: int) -> None: # Set number of images and frames (frames is for internal burst of detector)
"""Set trigger source for the detector, either directly to value or TriggerSource.* with self.parent.cam.num_images.put(
INTERNAL = 0 int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
EXT_ENABLE = 1 )
EXT_TRIGGER = 2 self.parent.cam.num_frames.put(1)
MULTI_TRIGGER = 3
ALGINMENT = 4
"""
value = trigger_source
self.cam.trigger_mode.put(value)
def _create_directory(filepath: str) -> None: # Update the readout time of the detector
self.update_readout_time()
def create_directory(filepath: str) -> None:
"""Create directory if it does not exist""" """Create directory if it does not exist"""
os.makedirs(filepath, exist_ok=True) os.makedirs(filepath, exist_ok=True)
def _prep_file_writer(self) -> None: def stop_detector_backend(self) -> None:
self.close_file_writer()
time.sleep(0.1)
self.stop_file_writer()
time.sleep(0.1)
def close_file_writer(self) -> None:
""" """
Prepare the file writer for pilatus_2 Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self.send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self.send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
def prepare_data_backend(self) -> None:
"""
Prepare the detector backend of pilatus for a scan
A zmq service is running on xbl-daq-34 that is waiting A zmq service is running on xbl-daq-34 that is waiting
for a zmq message to start the writer for the pilatus_2 x12sa-pd-2 for a zmq message to start the writer for the pilatus_2 x12sa-pd-2
""" """
# TODO explore required sleep time here
self._close_file_writer()
time.sleep(0.1)
self._stop_file_writer()
time.sleep(0.1)
self.filepath_raw = self.filewriter.compile_full_filename( self.stop_detector_backend()
self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True
self.parent.filepath = self.parent.filewriter.compile_full_filename(
self.parent.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True
) )
self.cam.file_path.put(f"/dev/shm/zmq/") self.parent.cam.file_path.put(f"/dev/shm/zmq/")
self.cam.file_name.put(f"{self.scaninfo.username}_2_{self.scaninfo.scan_number:05d}") self.parent.cam.file_name.put(
self.cam.auto_increment.put(1) # auto increment f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}"
self.cam.file_number.put(0) # first iter )
self.cam.file_format.put(0) # 0: TIFF self.parent.cam.auto_increment.put(1) # auto increment
self.cam.file_template.put("%s%s_%5.5d.cbf") self.parent.cam.file_number.put(0) # first iter
self.parent.cam.file_format.put(0) # 0: TIFF
self.parent.cam.file_template.put("%s%s_%5.5d.cbf")
# TODO remove hardcoded filepath here # TODO better to remove hard coded path with link to home directory/pilatus_2
# compile filename basepath = f"/sls/X12SA/data/{self.parent.scaninfo.username}/Data10/pilatus_2/"
basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/" self.parent.filepath_raw = os.path.join(
self.filepath = os.path.join(
basepath, basepath,
self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5), self.parent.filewriter.get_scan_directory(self.parent.scaninfo.scan_number, 1000, 5),
) )
# Make directory if needed # Make directory if needed
self._create_directory(self.filepath) self.create_directory(self.parent.filepath_raw)
headers = {"Content-Type": "application/json", "Accept": "application/json"} headers = {"Content-Type": "application/json", "Accept": "application/json"}
# start the stream on x12sa-pd-2 # start the stream on x12sa-pd-2
@ -286,11 +224,11 @@ class PilatuscSAXS(DetectorBase):
{ {
"searchPath": "/", "searchPath": "/",
"searchPattern": "glob:*.cbf", "searchPattern": "glob:*.cbf",
"destinationPath": self.filepath, "destinationPath": self.parent.filepath_raw,
} }
] ]
} }
res = self._send_requests_put(url=url, data=data_msg, headers=headers) res = self.send_requests_put(url=url, data=data_msg, headers=headers)
logger.info(f"{res.status_code} - {res.text} - {res.content}") logger.info(f"{res.status_code} - {res.text} - {res.content}")
if not res.ok: if not res.ok:
@ -300,17 +238,19 @@ class PilatuscSAXS(DetectorBase):
url = "http://xbl-daq-34:8091/pilatus_2/run" url = "http://xbl-daq-34:8091/pilatus_2/run"
data_msg = [ data_msg = [
"zmqWriter", "zmqWriter",
self.scaninfo.username, self.parent.scaninfo.username,
{ {
"addr": "tcp://x12sa-pd-2:8888", "addr": "tcp://x12sa-pd-2:8888",
"dst": ["file"], "dst": ["file"],
"numFrm": int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger), "numFrm": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
"timeout": 2000, "timeout": 2000,
"ifType": "PULL", "ifType": "PULL",
"user": self.scaninfo.username, "user": self.parent.scaninfo.username,
}, },
] ]
res = self._send_requests_put(url=url, data=data_msg, headers=headers) res = self.send_requests_put(url=url, data=data_msg, headers=headers)
logger.info(f"{res.status_code} - {res.text} - {res.content}") logger.info(f"{res.status_code} - {res.text} - {res.content}")
if not res.ok: if not res.ok:
@ -324,14 +264,16 @@ class PilatuscSAXS(DetectorBase):
url = "http://xbl-daq-34:8091/pilatus_2/wait" url = "http://xbl-daq-34:8091/pilatus_2/wait"
data_msg = [ data_msg = [
"zmqWriter", "zmqWriter",
self.scaninfo.username, self.parent.scaninfo.username,
{ {
"frmCnt": int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger), "frmCnt": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
"timeout": 2000, "timeout": 2000,
}, },
] ]
try: try:
res = self._send_requests_put(url=url, data=data_msg, headers=headers) res = self.send_requests_put(url=url, data=data_msg, headers=headers)
logger.info(f"{res}") logger.info(f"{res}")
if not res.ok: if not res.ok:
@ -339,7 +281,7 @@ class PilatuscSAXS(DetectorBase):
except Exception as exc: except Exception as exc:
logger.info(f"Pilatus2 wait threw Exception: {exc}") logger.info(f"Pilatus2 wait threw Exception: {exc}")
def _send_requests_put(self, url: str, data_msg: list = None, headers: dict = None) -> object: def send_requests_put(self, url: str, data_msg: list = None, headers: dict = None) -> object:
""" """
Send a put request to the given url Send a put request to the given url
@ -353,7 +295,7 @@ class PilatuscSAXS(DetectorBase):
""" """
return requests.put(url=url, data=json.dumps(data_msg), headers=headers) return requests.put(url=url, data=json.dumps(data_msg), headers=headers)
def _send_requests_delete(self, url: str, headers: dict = None) -> object: def send_requests_delete(self, url: str, headers: dict = None) -> object:
""" """
Send a delete request to the given url Send a delete request to the given url
@ -366,76 +308,23 @@ class PilatuscSAXS(DetectorBase):
""" """
return requests.delete(url=url, headers=headers) return requests.delete(url=url, headers=headers)
def _close_file_writer(self) -> None:
"""
Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self._send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def _stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self._send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
def stage(self) -> List[object]:
"""Stage command, called from BEC in preparation of a scan.
This will iniate the preparation of detector and file writer.
The following functuions are called:
- _prep_file_writer
- _prep_det
- _publish_file_location
The device returns a List[object] from the Ophyd Device class.
#TODO make sure this is fullfiled
Staging not idempotent and should raise
:obj:`RedundantStaging` if staged twice without an
intermediate :meth:`~BlueskyInterface.unstage`.
"""
self._stopped = False
self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
# TODO refactor logger.info to DEBUG mode?
self._prep_file_writer()
self._prep_det()
state = False
self._publish_file_location(done=state)
return super().stage()
# TODO might be useful for base class
def pre_scan(self) -> None: def pre_scan(self) -> None:
"""Pre_scan is an (optional) function that is executed by BEC just before the scan core """Pre_scan is an (optional) function that is executed by BEC just before the scan core
For the pilatus detector, it is used to arm the detector for the acquisition, For the pilatus detector, it is used to arm the detector for the acquisition,
because the detector times out after ˜7-8seconds without seeing a trigger. because the detector times out after ˜7-8 seconds without seeing a trigger.
""" """
self._arm_acquisition() self.arm_acquisition()
def _arm_acquisition(self) -> None: def arm_acquisition(self) -> None:
self.cam.acquire.put(1) self.parent.cam.acquire.put(1)
# TODO check if sleep of 1s is needed, could be that less is enough # TODO Sleep needed, to be tested how long it is needed!
time.sleep(1) time.sleep(0.5)
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS and publish the event for the h5_converter
def _publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""Publish the filepath to REDIS.
We publish two events here: We publish two events here:
- file_event: event for the filewriter - file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code) - public_file: event for any secondary service (e.g. radial integ code)
@ -443,111 +332,92 @@ class PilatuscSAXS(DetectorBase):
Args: Args:
done (bool): True if scan is finished done (bool): True if scan is finished
successful (bool): True if scan was successful successful (bool): True if scan was successful
""" """
pipe = self._producer.pipeline() pipe = self.parent._producer.pipeline()
if successful is None: if successful is None:
msg = messages.FileMessage(file_path=self.filepath, done=done) msg = messages.FileMessage(
file_path=self.parent.filepath,
done=done,
metadata={"input_path": self.parent.filepath_raw},
)
else: else:
msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful) msg = messages.FileMessage(
self._producer.set_and_publish( file_path=self.parent.filepath,
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe done=done,
successful=successful,
metadata={"input_path": self.parent.filepath_raw},
)
self.parent._producer.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name),
msg.dumps(),
pipe=pipe,
) )
self._producer.set_and_publish( self.parent._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe MessageEndpoints.file_event(self.parent.name), msg.dumps(), pipe=pipe
) )
pipe.execute() pipe.execute()
# TODO function for abstract class? def finished(self) -> None:
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
def unstage(self) -> List[object]:
"""Unstage the device.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
""" """
old_scanID = self.scaninfo.scanID Check if acquisition is finished.
self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ")
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped:
return super().unstage()
self._finished()
state = True
self._publish_file_location(done=state, successful=state)
self._start_h5converter(done=state)
return super().unstage()
def _start_h5converter(self, done=False) -> None: Be aware that we check here whether the mcs card is measuring at the moment,
"""Start the h5converter""" we were missing a suitable different signal.
msg = messages.FileMessage(
file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath}
)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps()
)
def _finished(self) -> None: #TODO remove dependency from the mcs card
"""Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
""" """
timer = 0 signal_conditions = [
sleep_time = 0.1 (
# TODO this is a workaround at the moment which relies on the status of the mcs device lambda: self.parent.device_manager.devices.mcs.obj._staged,
while True: Staged.no,
if self.device_manager.devices.mcs.obj._staged != Staged.yes: ),
break ]
if self._stopped == True: if not self.wait_for_signals(
break signal_conditions=signal_conditions,
time.sleep(sleep_time) timeout=self.parent.timeout,
timer = timer + sleep_time check_stopped=True,
if timer > self.timeout: all_signals=True,
self._stopped == True ):
self._stop_det() self.stop_detector()
self._stop_file_writer() self.stop_detector_backend()
# TODO explore if sleep is needed raise PilatusTimeoutError(
time.sleep(0.5) f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for the file writer"
self._close_file_writer() )
raise PilatusTimeoutError(f"Timeout waiting for mcs device to unstage") self.stop_detector()
self.stop_detector_backend()
self._stop_det() def stop_detector(self) -> None:
self._stop_file_writer() """Stop detector"""
# TODO explore if sleep time is needed self.parent.cam.acquire.put(0)
time.sleep(0.5)
self._close_file_writer()
def _stop_det(self) -> None:
"""Stop the detector"""
self.cam.acquire.put(0)
def stop(self, *, success=False) -> None: class PilatuscSAXS(PSIDetectorBase):
"""Stop the scan, with camera and file writer""" """Pilatus_2 300k detector for CSAXS
self._stop_det()
self._stop_file_writer() Eiger 9M detector class for cSAXS
self._close_file_writer()
super().stop(success=success) Parent class: PSIDetectorBase
self._stopped = True
class attributes:
custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
cam (SLSDetectorCam) : Detector camera
MIN_READOUT (float) : Minimum readout time for the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
custom_prepare_cls = PilatusSetup
cam = ADCpt(SLSDetectorCam, "cam1:")
MIN_READOUT = 3e-3
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector"""
value = trigger_source
self.cam.trigger_mode.put(value)
# Automatically connect to test environmenr if directly invoked # Automatically connect to test environmenr if directly invoked

View File

@ -64,7 +64,7 @@ class CustomDetectorMixin:
def prepare_data_backend(self) -> None: def prepare_data_backend(self) -> None:
""" """
Prepare the data backend for the scan Prepare detector backend for the scan
""" """
pass pass
@ -105,6 +105,24 @@ class CustomDetectorMixin:
Raises (optional): Raises (optional):
DetectorTimeoutError: if detector cannot be stopped DetectorTimeoutError: if detector cannot be stopped
""" """
pass
def check_scanID(self) -> None:
"""
Check if BEC is running on a new scanID
"""
pass
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the designated filepath from data backend to REDIS.
Typically, the following two message types are published:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
"""
pass
def wait_for_signals( def wait_for_signals(
self, self,
@ -277,7 +295,6 @@ class PSIDetectorBase(Device):
self.custom_prepare.prepare_detector() self.custom_prepare.prepare_detector()
state = False state = False
self.custom_prepare.publish_file_location(done=state) self.custom_prepare.publish_file_location(done=state)
self.custom_prepare.arm_acquisition()
# At the moment needed bc signal is not reliable, BEC too fast # At the moment needed bc signal is not reliable, BEC too fast
time.sleep(0.05) time.sleep(0.05)
return super().stage() return super().stage()

View File

@ -29,10 +29,10 @@ def mock_det():
dm = DMMock() dm = DMMock()
with mock.patch.object(dm, "producer"): with mock.patch.object(dm, "producer"):
with mock.patch( with mock.patch(
"ophyd_devices.epics.devices.pilatus_csaxs.FileWriterMixin" "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin"
) as filemixin, mock.patch( ), mock.patch(
"ophyd_devices.epics.devices.pilatus_csaxs.PilatuscSAXS._update_service_config" "ophyd_devices.epics.devices.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config: ):
with mock.patch.object(ophyd, "cl") as mock_cl: with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread mock_cl.thread_class = threading.Thread
@ -67,7 +67,7 @@ def test_init_detector(
Validation upon setting the correct PVs Validation upon setting the correct PVs
""" """
mock_det._init_detector() # call the method you want to test mock_det.custom_prepare.initialize_detector() # call the method you want to test
assert mock_det.cam.acquire.get() == detector_state assert mock_det.cam.acquire.get() == detector_state
assert mock_det.cam.trigger_mode.get() == trigger_source assert mock_det.cam.trigger_mode.get() == trigger_source
@ -107,15 +107,18 @@ def test_stage(
stopped, stopped,
expected_exception, expected_exception,
): ):
with mock.patch.object(mock_det, "_publish_file_location") as mock_publish_file_location: with mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
# TODO consider putting energy as variable in scaninfo
mock_det.device_manager.add_device("mokev", value=12.4) mock_det.device_manager.add_device("mokev", value=12.4)
mock_det._stopped = stopped mock_det._stopped = stopped
with mock.patch.object(mock_det, "_prep_file_writer") as mock_prep_fw, mock.patch.object( with mock.patch.object(
mock_det, "_update_readout_time" mock_det.custom_prepare, "prepare_data_backend"
) as mock_data_backend, mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time: ) as mock_update_readout_time:
mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
if expected_exception: if expected_exception:
@ -124,7 +127,7 @@ def test_stage(
mock_det.stage() mock_det.stage()
else: else:
mock_det.stage() mock_det.stage()
mock_prep_fw.assert_called_once() mock_data_backend.assert_called_once()
mock_update_readout_time.assert_called() mock_update_readout_time.assert_called()
# Check _prep_det # Check _prep_det
assert mock_det.cam.num_images.get() == int( assert mock_det.cam.num_images.get() == int(
@ -136,7 +139,7 @@ def test_stage(
def test_pre_scan(mock_det): def test_pre_scan(mock_det):
mock_det.pre_scan() mock_det.custom_prepare.pre_scan()
assert mock_det.cam.acquire.get() == 1 assert mock_det.cam.acquire.get() == 1
@ -151,31 +154,65 @@ def test_pre_scan(mock_det):
) )
def test_update_readout_time(mock_det, readout_time, expected_value): def test_update_readout_time(mock_det, readout_time, expected_value):
if readout_time is None: if readout_time is None:
mock_det._update_readout_time() mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value assert mock_det.readout_time == expected_value
else: else:
mock_det.scaninfo.readout_time = readout_time mock_det.scaninfo.readout_time = readout_time
mock_det._update_readout_time() mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value assert mock_det.readout_time == expected_value
@pytest.mark.parametrize( @pytest.mark.parametrize(
"scaninfo", "scaninfo",
[ [
({"filepath": "test.h5", "successful": True, "done": False, "scanID": "123"}), (
({"filepath": "test.h5", "successful": False, "done": True, "scanID": "123"}), {
({"filepath": "test.h5", "successful": None, "done": True, "scanID": "123"}), "filepath": "test.h5",
"filepath_raw": "test5_raw.h5",
"successful": True,
"done": False,
"scanID": "123",
}
),
(
{
"filepath": "test.h5",
"filepath_raw": "test5_raw.h5",
"successful": False,
"done": True,
"scanID": "123",
}
),
(
{
"filepath": "test.h5",
"filepath_raw": "test5_raw.h5",
"successful": None,
"done": True,
"scanID": "123",
}
),
], ],
) )
def test_publish_file_location(mock_det, scaninfo): def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scanID = scaninfo["scanID"] mock_det.scaninfo.scanID = scaninfo["scanID"]
mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"]) mock_det.filepath_raw = scaninfo["filepath_raw"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)
if scaninfo["successful"] is None: if scaninfo["successful"] is None:
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() msg = messages.FileMessage(
file_path=scaninfo["filepath"],
done=scaninfo["done"],
metadata={"input_path": scaninfo["filepath_raw"]},
).dumps()
else: else:
msg = messages.FileMessage( msg = messages.FileMessage(
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] file_path=scaninfo["filepath"],
done=scaninfo["done"],
metadata={"input_path": scaninfo["filepath_raw"]},
successful=scaninfo["successful"],
).dumps() ).dumps()
expected_calls = [ expected_calls = [
mock.call( mock.call(
@ -193,62 +230,42 @@ def test_publish_file_location(mock_det, scaninfo):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"requests_state, expected_exception, url", "requests_state, expected_exception, url_delete, url_put",
[ [
( (
True, True,
False, False,
"http://x12sa-pd-2:8080/stream/pilatus_2", "http://x12sa-pd-2:8080/stream/pilatus_2",
"http://xbl-daq-34:8091/pilatus_2/stop",
), ),
( (
False, False,
False, False,
"http://x12sa-pd-2:8080/stream/pilatus_2", "http://x12sa-pd-2:8080/stream/pilatus_2",
),
],
)
def test_close_file_writer(mock_det, requests_state, expected_exception, url):
with mock.patch.object(mock_det, "_send_requests_delete") as mock_send_requests_delete:
instance = mock_send_requests_delete.return_value
instance.ok = requests_state
if expected_exception:
mock_det._close_file_writer()
mock_send_requests_delete.assert_called_once_with(url=url)
instance.raise_for_status.called_once()
else:
mock_det._close_file_writer()
mock_send_requests_delete.assert_called_once_with(url=url)
@pytest.mark.parametrize(
"requests_state, expected_exception, url",
[
(
True,
False,
"http://xbl-daq-34:8091/pilatus_2/stop",
),
(
False,
True,
"http://xbl-daq-34:8091/pilatus_2/stop", "http://xbl-daq-34:8091/pilatus_2/stop",
), ),
], ],
) )
def test_stop_file_writer(mock_det, requests_state, expected_exception, url): def test_stop_detector_backend(mock_det, requests_state, expected_exception, url_delete, url_put):
with mock.patch.object(mock_det, "_send_requests_put") as mock_send_requests_put: with mock.patch.object(
instance = mock_send_requests_put.return_value mock_det.custom_prepare, "send_requests_delete"
instance.ok = requests_state ) as mock_send_requests_delete, mock.patch.object(
instance.raise_for_status.side_effect = Exception mock_det.custom_prepare, "send_requests_put"
) as mock_send_requests_put:
instance_delete = mock_send_requests_delete.return_value
instance_delete.ok = requests_state
instance_put = mock_send_requests_put.return_value
instance_put.ok = requests_state
if expected_exception: if expected_exception:
with pytest.raises(Exception): mock_det.custom_prepare.stop_detector_backend()
mock_det.timeout = 0.1 mock_send_requests_delete.assert_called_once_with(url=url_delete)
mock_det._stop_file_writer() mock_send_requests_put.assert_called_once_with(url=url_put)
mock_send_requests_put.assert_called_once_with(url=url) instance_delete.raise_for_status.called_once()
instance.raise_for_status.called_once() instance_put.raise_for_status.called_once()
else: else:
mock_det._stop_file_writer() mock_det.custom_prepare.stop_detector_backend()
mock_send_requests_put.assert_called_once_with(url=url) mock_send_requests_delete.assert_called_once_with(url=url_delete)
mock_send_requests_put.assert_called_once_with(url=url_put)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -356,15 +373,15 @@ def test_stop_file_writer(mock_det, requests_state, expected_exception, url):
) )
def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception): def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, expected_exception):
with mock.patch.object( with mock.patch.object(
mock_det, "_close_file_writer" mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer, mock.patch.object( ) as mock_close_file_writer, mock.patch.object(
mock_det, "_stop_file_writer" mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_writer, mock.patch.object( ) as mock_stop_file_writer, mock.patch.object(
mock_det, "filewriter" mock_det, "filewriter"
) as mock_filewriter, mock.patch.object( ) as mock_filewriter, mock.patch.object(
mock_det, "_create_directory" mock_det.custom_prepare, "create_directory"
) as mock_create_directory, mock.patch.object( ) as mock_create_directory, mock.patch.object(
mock_det, "_send_requests_put" mock_det.custom_prepare, "send_requests_put"
) as mock_send_requests_put: ) as mock_send_requests_put:
mock_det.scaninfo.scan_number = scaninfo["scan_number"] mock_det.scaninfo.scan_number = scaninfo["scan_number"]
mock_det.scaninfo.num_points = scaninfo["num_points"] mock_det.scaninfo.num_points = scaninfo["num_points"]
@ -379,12 +396,12 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e
if expected_exception: if expected_exception:
with pytest.raises(Exception): with pytest.raises(Exception):
mock_det.timeout = 0.1 mock_det.timeout = 0.1
mock_det._prep_file_writer() mock_det.custom_prepare.prepare_data_backend()
mock_close_file_writer.assert_called_once() mock_close_file_writer.assert_called_once()
mock_stop_file_writer.assert_called_once() mock_stop_file_writer.assert_called_once()
instance.raise_for_status.assert_called_once() instance.raise_for_status.assert_called_once()
else: else:
mock_det._prep_file_writer() mock_det.custom_prepare.prepare_data_backend()
mock_close_file_writer.assert_called_once() mock_close_file_writer.assert_called_once()
mock_stop_file_writer.assert_called_once() mock_stop_file_writer.assert_called_once()
@ -431,11 +448,9 @@ def test_unstage(
stopped, stopped,
expected_exception, expected_exception,
): ):
with mock.patch.object(mock_det, "_finished") as mock_finished, mock.patch.object( with mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, mock.patch.object(
mock_det, "_publish_file_location" mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location, mock.patch.object( ) as mock_publish_file_location:
mock_det, "_start_h5converter"
) as mock_start_h5converter:
mock_det._stopped = stopped mock_det._stopped = stopped
if expected_exception: if expected_exception:
mock_det.unstage() mock_det.unstage()
@ -444,15 +459,16 @@ def test_unstage(
mock_det.unstage() mock_det.unstage()
mock_finished.assert_called_once() mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True) mock_publish_file_location.assert_called_with(done=True, successful=True)
mock_start_h5converter.assert_called_once()
assert mock_det._stopped == False assert mock_det._stopped == False
def test_stop(mock_det): def test_stop(mock_det):
with mock.patch.object(mock_det, "_stop_det") as mock_stop_det, mock.patch.object( with mock.patch.object(
mock_det, "_stop_file_writer" mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object(
mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_writer, mock.patch.object( ) as mock_stop_file_writer, mock.patch.object(
mock_det, "_close_file_writer" mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer: ) as mock_close_file_writer:
mock_det.stop() mock_det.stop()
mock_stop_det.assert_called_once() mock_stop_det.assert_called_once()
@ -483,24 +499,24 @@ def test_stop(mock_det):
) )
def test_finished(mock_det, stopped, mcs_stage_state, expected_exception): def test_finished(mock_det, stopped, mcs_stage_state, expected_exception):
with mock.patch.object(mock_det, "device_manager") as mock_dm, mock.patch.object( with mock.patch.object(mock_det, "device_manager") as mock_dm, mock.patch.object(
mock_det, "_stop_file_writer" mock_det.custom_prepare, "stop_file_writer"
) as mock_stop_file_friter, mock.patch.object( ) as mock_stop_file_friter, mock.patch.object(
mock_det, "_stop_det" mock_det.custom_prepare, "stop_detector"
) as mock_stop_det, mock.patch.object( ) as mock_stop_det, mock.patch.object(
mock_det, "_close_file_writer" mock_det.custom_prepare, "close_file_writer"
) as mock_close_file_writer: ) as mock_close_file_writer:
mock_dm.devices.mcs.obj._staged = mcs_stage_state mock_dm.devices.mcs.obj._staged = mcs_stage_state
mock_det._stopped = stopped mock_det._stopped = stopped
if expected_exception: if expected_exception:
with pytest.raises(Exception): with pytest.raises(Exception):
mock_det.timeout = 0.1 mock_det.timeout = 0.1
mock_det._finished() mock_det.custom_prepare.finished()
assert mock_det._stopped == stopped assert mock_det._stopped == stopped
mock_stop_file_friter.assert_called() mock_stop_file_friter.assert_called()
mock_stop_det.assert_called_once() mock_stop_det.assert_called_once()
mock_close_file_writer.assert_called_once() mock_close_file_writer.assert_called_once()
else: else:
mock_det._finished() mock_det.custom_prepare.finished()
if stopped: if stopped:
assert mock_det._stopped == stopped assert mock_det._stopped == stopped