Merge branch 'csaxs_detectors' into 'master'

Csaxs detectors

Closes #3

See merge request bec/ophyd_devices!35
This commit is contained in:
2023-10-24 17:33:31 +02:00
3 changed files with 605 additions and 385 deletions

View File

@ -1,78 +1,60 @@
import enum import enum
import threading import threading
import time import time
from typing import Any, List from bec_lib.core.devicemanager import DeviceStatus
import numpy as np import numpy as np
import os import os
from typing import Any, List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device from ophyd import DetectorBase, Device
from ophyd import ADComponent as ADCpt from ophyd import ADComponent as ADCpt
from std_daq_client import StdDaqClient
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked from bec_lib.core import BECMessage, MessageEndpoints, threadlocked
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger from bec_lib.core import bec_logger
from ophyd_devices.utils import bec_utils as bec_utils
from std_daq_client import StdDaqClient
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger logger = bec_logger.logger
class EigerError(Exception): class EigerError(Exception):
"""Base class for exceptions in this module."""
pass pass
class EigerTimeoutError(Exception): class EigerTimeoutError(Exception):
"""Raised when the Eiger does not respond in time during unstage."""
pass pass
class SlsDetectorCam(Device): class SlsDetectorCam(Device):
# detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") """SLS Detector Camera - Eiger 9M
# setting = ADCpt(EpicsSignalWithRBV, "Setting")
# delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") Base class to map EPICS PVs to ophyd signals.
"""
threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy") beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
# enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits")
bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth") bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
# num_gates = ADCpt(EpicsSignalWithRBV, "NumGates") num_images = ADCpt(EpicsSignalWithRBV, "NumCycles")
num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles")
num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames") num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
trigger_software = ADCpt(EpicsSignal, "TriggerSoftware") trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
# high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage")
# Receiver and data callback
# receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode")
# receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream")
# enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback")
# missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV")
# Direct settings access
# setup_file = ADCpt(EpicsSignal, "SetupFile")
# load_setup = ADCpt(EpicsSignal, "LoadSetup")
# command = ADCpt(EpicsSignal, "Command")
# Mythen 3
# counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask")
# counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold")
# counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold")
# counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold")
# gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay")
# gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width")
# gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay")
# gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width")
# gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay")
# gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width")
# # Moench
# json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode")
# json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode")
# fixes due to missing PVs from CamBase
acquire = ADCpt(EpicsSignal, "Acquire") acquire = ADCpt(EpicsSignal, "Acquire")
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV") detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
class TriggerSource(int, enum.Enum): class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0 AUTO = 0
TRIGGER = 1 TRIGGER = 1
GATING = 2 GATING = 2
@ -80,6 +62,8 @@ class TriggerSource(int, enum.Enum):
class DetectorState(int, enum.Enum): class DetectorState(int, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = 0 IDLE = 0
ERROR = 1 ERROR = 1
WAITING = 2 WAITING = 2
@ -105,6 +89,7 @@ class Eiger9mCsaxs(DetectorBase):
""" """
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [ USER_ACCESS = [
"describe", "describe",
] ]
@ -124,6 +109,18 @@ class Eiger9mCsaxs(DetectorBase):
sim_mode=False, sim_mode=False,
**kwargs, **kwargs,
): ):
"""Initialize the Eiger9M detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix (X12SA-ES-EIGER9M:)
name (str): 'eiger'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__( super().__init__(
prefix=prefix, prefix=prefix,
name=name, name=name,
@ -133,13 +130,16 @@ class Eiger9mCsaxs(DetectorBase):
parent=parent, parent=parent,
**kwargs, **kwargs,
) )
self._stopped = False
self._lock = threading.RLock()
if device_manager is None and not sim_mode: if device_manager is None and not sim_mode:
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True") raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
# Not sure if this is needed, comment it for now!
# self._lock = threading.RLock()
self._stopped = False
self.name = name self.name = name
self.wait_for_connection() # Make sure to be connected before talking to PVs self.wait_for_connection()
# Spin up connections for simulation or BEC mode
# TODO check if sim_mode still works. Is it needed? I believe filewriting might be handled properly
if not sim_mode: if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.core.bec_service import SERVICE_CONFIG
@ -147,32 +147,70 @@ class Eiger9mCsaxs(DetectorBase):
self._producer = self.device_manager.producer self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else: else:
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self._producer = bec_utils.MockProducer() self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager() self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
# TODO
self.filepath = ""
self.filewriter = FileWriterMixin(self.service_cfg) self.filewriter = FileWriterMixin(self.service_cfg)
self.reduce_readout = 1e-3 # 3 ms self._init()
self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered
self._init_eiger9m()
self._init_standard_daq()
# self.mokev = self.device_manager.devices.mokev.read()[ # TODO function for abstract class?
# self.device_manager.devices.mokev.name def _init(self) -> None:
# ]["value"] """Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
def _init_eiger9m(self) -> None: # TODO function for abstract class?
"""Init parameters for Eiger 9m""" def _default_parameter(self) -> None:
"""Set default parameters for Eiger 9M
readout (float) : readout time in seconds
"""
self.reduce_readout = 1e-3
# TODO function for abstract class?
def _init_detector(self) -> None:
"""Init parameters for Eiger 9m.
Depends on hardware configuration and delay generators.
At this point it is set up for gating mode (09/2023).
"""
self._stop_det()
self._set_trigger(TriggerSource.GATING) self._set_trigger(TriggerSource.GATING)
self.stop_acquisition()
# TODO function for abstract class?
def _init_filewriter(self) -> None:
"""Init parameters for filewriter.
For the Eiger9M, the data backend is std_daq client.
Setting up these parameters depends on the backend, and would need to change upon changes in the backend.
"""
self.std_rest_server_url = "http://xbl-daq-29:5000"
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
self.std_client.stop_writer()
timeout = 0
# TODO changing e-account was not possible during beamtimes.
# self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e")))
# time.sleep(5)
# TODO is this the only state to wait for or should we wait for more from the std_daq client?
while not self.std_client.get_status()["state"] == "READY":
time.sleep(0.1)
timeout = timeout + 0.1
logger.info("Waiting for std_daq init.")
if timeout > 5:
if not self.std_client.get_status()["state"]:
raise EigerError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
)
else:
return
def _update_std_cfg(self, cfg_key: str, value: Any) -> None: def _update_std_cfg(self, cfg_key: str, value: Any) -> None:
"""Update std_daq config with new e-account for the current beamtime"""
# TODO Do we need all the loggers here, should this be properly refactored with a DEBUG mode?
cfg = self.std_client.get_config() cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key) old_value = cfg.get(cfg_key)
logger.info(old_value) logger.info(old_value)
@ -189,71 +227,54 @@ class Eiger9mCsaxs(DetectorBase):
logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
self.std_client.set_config(cfg) self.std_client.set_config(cfg)
def _init_standard_daq(self) -> None: # TODO function for abstract class?
self.std_rest_server_url = "http://xbl-daq-29:5000" def stage(self) -> List[object]:
self.std_client = StdDaqClient(url_base=self.std_rest_server_url) """Stage command, called from BEC in preparation of a scan.
self.std_client.stop_writer() This will iniate the preparation of detector and file writer.
timeout = 0 The following functuions are called (at least):
# TODO put back change of e-account! - _prep_file_writer
# self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e"))) - _prep_det
# time.sleep(5) - _publish_file_location
while not self.std_client.get_status()["state"] == "READY": The device returns a List[object] from the Ophyd Device class.
time.sleep(0.1)
timeout = timeout + 0.1
logger.info("Waiting for std_daq init.")
if timeout > 5:
if not self.std_client.get_status()["state"]:
raise EigerError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
)
else:
return
def _prep_det(self) -> None: #TODO make sure this is fullfiled
self._set_det_threshold()
self._set_acquisition_params()
self._set_trigger(TriggerSource.GATING)
def _set_det_threshold(self) -> None: Staging not idempotent and should raise
# threshold_energy PV exists on Eiger 9M? :obj:`RedundantStaging` if staged twice without an
factor = 1 intermediate :meth:`~BlueskyInterface.unstage`.
if self.cam.threshold_energy._metadata["units"] == "eV":
factor = 1000
setp_energy = int(self.mokev * factor)
energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"]
if setp_energy != energy:
self.cam.beam_energy.set(setp_energy) # .wait()
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setp_energy / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setp_energy / 2) # .wait()
def _set_acquisition_params(self) -> None:
# self.cam.acquire_time.set(self.scaninfo.exp_time)
# Set acquisition parameters slightly shorter then cycle
# self.cam.acquire_period.set(
# self.scaninfo.exp_time + (self.scaninfo.readout_time - self.reduce_readout)
# )
self.cam.num_cycles.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
self.cam.num_frames.put(1)
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector, either directly to value or TriggerSource.* with
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
""" """
value = int(trigger_source) self._stopped = False
self.cam.timing_mode.put(value) self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
# TODO refactor logger.info to DEBUG mode?
self._prep_file_writer()
self._prep_det()
state = False
self._publish_file_location(done=state, successful=state)
self._arm_acquisition()
# TODO Fix should take place in EPICS or directly on the hardware!
# We observed that the detector missed triggers in the beginning in case BEC was to fast. Adding 50ms delay solved this
time.sleep(0.05)
return super().stage()
# TODO function for abstract class?
def _prep_file_writer(self) -> None: def _prep_file_writer(self) -> None:
"""Prepare file writer for scan
self.filewriter is a FileWriterMixin object that hosts logic for compiling the filepath
"""
self.filepath = self.filewriter.compile_full_filename( self.filepath = self.filewriter.compile_full_filename(
self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True
) )
# TODO needed, should be checked from the filerwriter mixin right?
while not os.path.exists(os.path.dirname(self.filepath)): while not os.path.exists(os.path.dirname(self.filepath)):
time.sleep(0.1) time.sleep(0.1)
self._close_file_writer()
self._stop_file_writer()
logger.info(f" std_daq output filepath {self.filepath}") logger.info(f" std_daq output filepath {self.filepath}")
# TODO Discuss with Leo if this is needed, or how to start the async writing best
try: try:
self.std_client.start_writer_async( self.std_client.start_writer_async(
{ {
@ -272,100 +293,66 @@ class Eiger9mCsaxs(DetectorBase):
break break
time.sleep(0.005) time.sleep(0.005)
def _close_file_writer(self) -> None: # TODO function for abstract class?
def _stop_file_writer(self) -> None:
"""Close file writer"""
self.std_client.stop_writer() self.std_client.stop_writer()
pass # TODO can I wait for a status message here maybe? To ensure writer returned
def stage(self) -> List[object]: # TODO function for abstract class?
"""stage the detector and file writer""" def _prep_det(self) -> None:
self._stopped = False """Prepare detector for scan.
self._acquisition_done = False Includes checking the detector threshold, setting the acquisition parameters and setting the trigger source
self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
self._prep_file_writer()
self._prep_det()
logger.info("Waiting for std daq to be armed")
logger.info("std_daq is ready")
msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name),
msg.dumps(),
)
self.arm_acquisition()
self._stopped = False
# We see that we miss a trigger occasionally, it seems that status msg from the ioc are not realiable
time.sleep(0.05)
return super().stage()
@threadlocked
def unstage(self) -> List[object]:
"""unstage the detector and file writer"""
logger.info("Waiting for Eiger9M to finish")
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ")
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped == True:
return super().unstage()
self._eiger9M_finished()
# Message to BEC
state = True
msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
self._stopped = False
logger.info("Eiger9M finished")
return super().unstage()
@threadlocked
def _eiger9M_finished(self):
"""Function with 10s timeout"""
timer = 0
while True:
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
# det_ctrl = 0
std_ctrl = self.std_client.get_status()["acquisition"]["state"]
status = self.std_client.get_status()
received_frames = status["acquisition"]["stats"]["n_write_completed"]
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
# TODO if no writing was performed before
if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames:
break
if self._stopped == True:
self.stop_acquisition()
self._close_file_writer()
break
time.sleep(0.1)
timer += 0.1
if timer > 5:
self._stopped == True
self._close_file_writer()
self.stop_acquisition()
raise EigerTimeoutError(
f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer"
)
self._close_file_writer()
def arm_acquisition(self) -> None:
"""Start acquisition in software trigger mode,
or arm the detector in hardware of the detector
""" """
self._set_det_threshold()
self._set_acquisition_params()
self._set_trigger(TriggerSource.GATING)
def _set_det_threshold(self) -> None:
"""Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance"""
# threshold energy might be in eV or keV
factor = 1
if self.cam.threshold_energy._metadata["units"] == "eV":
factor = 1000
setpoint = int(self.mokev * factor)
energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"]
if setpoint != energy:
self.cam.beam_energy.set(setpoint)
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setpoint / 2)
def _set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
self.cam.num_images.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
self.cam.num_frames.put(1)
# TODO function for abstract class? + call it for each scan??
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
"""
value = int(trigger_source)
self.cam.trigger_mode.put(value)
def _publish_file_location(self, done=False, successful=False) -> None:
"""Publish the filepath to REDIS
First msg for file writer and the second one for other listeners (e.g. radial integ)
"""
pipe = self._producer.pipeline()
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe
)
pipe.execute()
# TODO function for abstract class?
def _arm_acquisition(self) -> None:
"""Arm Eiger detector for acquisition"""
self.cam.acquire.put(1) self.cam.acquire.put(1)
logger.info("Waiting for Eiger9m to be armed")
while True: while True:
det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"]
if det_ctrl == int(DetectorState.RUNNING): if det_ctrl == int(DetectorState.RUNNING):
@ -373,15 +360,96 @@ class Eiger9mCsaxs(DetectorBase):
if self._stopped == True: if self._stopped == True:
break break
time.sleep(0.005) time.sleep(0.005)
logger.info("Eiger9m is armed")
def stop_acquisition(self) -> None: # TODO function for abstract class?
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
# TODO function for abstract class?
# TODO threadlocked was attached, in principle unstage needs to be fast and should possibly called multiple times
@threadlocked
def unstage(self) -> List[object]:
"""Unstage the device.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
"""
# TODO solution for multiple calls of the function to avoid calling the finished loop.
# Loop to avoid calling the finished loop multiple times
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped == True:
return super().unstage()
self._finished()
state = True
self._publish_file_location(done=state, successful=state)
self._stopped = False
return super().unstage()
# TODO function for abstract class?
# TODO necessary, how can we make this cleaner.
@threadlocked
def _finished(self):
"""Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
"""
sleep_time = 0.1
timeout = 5
timer = 0
# Check status with timeout, break out if _stopped=True
while True:
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
std_ctrl = self.std_client.get_status()["acquisition"]["state"]
status = self.std_client.get_status()
received_frames = status["acquisition"]["stats"]["n_write_completed"]
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames:
break
if self._stopped == True:
self._stop_det()
self._stop_file_writer()
break
time.sleep(sleep_time)
timer += sleep_time
if timer > timeout:
self._stopped == True
self._stop_det()
self._stop_file_writer()
raise EigerTimeoutError(
f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer"
)
self._stop_det()
self._stop_file_writer()
def _stop_det(self) -> None:
"""Stop the detector and wait for the proper status message""" """Stop the detector and wait for the proper status message"""
logger.info("Waiting for Eiger9m to be armed")
elapsed_time = 0 elapsed_time = 0
sleep_time = 0.01 sleep_time = 0.01
timeout = 5
# Stop acquisition
self.cam.acquire.put(0) self.cam.acquire.put(0)
retry = False retry = False
# Check status
while True: while True:
det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"]
if det_ctrl == int(DetectorState.IDLE): if det_ctrl == int(DetectorState.IDLE):
@ -390,16 +458,17 @@ class Eiger9mCsaxs(DetectorBase):
break break
time.sleep(sleep_time) time.sleep(sleep_time)
elapsed_time += sleep_time elapsed_time += sleep_time
if elapsed_time > 2 and not retry: if elapsed_time > timeout // 2 and not retry:
retry = True retry = True
# Retry to stop acquisition
self.cam.acquire.put(0) self.cam.acquire.put(0)
if elapsed_time > 5: if elapsed_time > timeout:
raise EigerTimeoutError("Failed to stop the acquisition. IOC did not update.") raise EigerTimeoutError("Failed to stop the acquisition. IOC did not update.")
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self.stop_acquisition() self._stop_det()
self._close_file_writer() self._stop_file_writer()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True

View File

@ -1,37 +1,47 @@
import enum import enum
import os import os
import time import time
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device
from typing import List
from bec_lib.core.devicemanager import DeviceStatus
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device
from ophyd.mca import EpicsMCARecord from ophyd.mca import EpicsMCARecord
from ophyd.areadetector.plugins import HDF5Plugin_V21, FilePlugin_V22
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import MessageEndpoints, BECMessage from bec_lib.core import MessageEndpoints, BECMessage
from bec_lib.core import bec_logger from bec_lib.core import bec_logger
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
logger = bec_logger.logger logger = bec_logger.logger
class FalconError(Exception): class FalconError(Exception):
"""Base class for exceptions in this module."""
pass pass
class FalconTimeoutError(Exception): class FalconTimeoutError(Exception):
"""Raised when the Falcon does not respond in time during unstage."""
pass pass
class DetectorState(int, enum.Enum): class DetectorState(int, enum.Enum):
"""Detector states for Falcon detector"""
DONE = 0 DONE = 0
ACQUIRING = 1 ACQUIRING = 1
class EpicsDXPFalcon(Device): class EpicsDXPFalcon(Device):
"""All high-level DXP parameters for each channel""" """DXP parameters for Falcon detector
Base class to map EPICS PVs from DXP parameters to ophyd signals.
"""
elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime") elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime")
elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime") elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime")
@ -51,15 +61,17 @@ class EpicsDXPFalcon(Device):
current_pixel = Cpt(EpicsSignalRO, "CurrentPixel") current_pixel = Cpt(EpicsSignalRO, "CurrentPixel")
class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22): class FalconHDF5Plugins(Device):
"""HDF5 parameters for Falcon detector
Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.
"""
capture = Cpt(EpicsSignalWithRBV, "Capture") capture = Cpt(EpicsSignalWithRBV, "Capture")
enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config") enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config")
xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config") xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config")
lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'") lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'")
temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True) temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True)
# file_path = Cpt(
# EpicsSignalWithRBV, "FilePath", string=True, kind="config", path_semantics="posix"
# )
file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config") file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config")
file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config") file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config")
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
@ -70,29 +82,40 @@ class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22):
class FalconCsaxs(Device): class FalconCsaxs(Device):
"""FalxonX1 with HDF5 writer""" """Falcon Sitoro detector for CSAXS
Parent class: Device
Device classes: EpicsDXPFalcon dxp1:, EpicsMCARecord mca1, FalconHDF5Plugins HDF1:
Attributes:
name str: 'falcon'
prefix (str): PV prefix ("X12SA-SITORO:)
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
dxp = Cpt(EpicsDXPFalcon, "dxp1:") dxp = Cpt(EpicsDXPFalcon, "dxp1:")
mca = Cpt(EpicsMCARecord, "mca1") mca = Cpt(EpicsMCARecord, "mca1")
hdf5 = Cpt(FalconHDF5Plugins, "HDF1:") hdf5 = Cpt(FalconHDF5Plugins, "HDF1:")
# Control # specify Epics PVs for Falcon
# TODO consider moving this outside of this class!
stop_all = Cpt(EpicsSignal, "StopAll") stop_all = Cpt(EpicsSignal, "StopAll")
erase_all = Cpt(EpicsSignal, "EraseAll") erase_all = Cpt(EpicsSignal, "EraseAll")
start_all = Cpt(EpicsSignal, "StartAll") start_all = Cpt(EpicsSignal, "StartAll")
state = Cpt(EpicsSignal, "Acquiring") state = Cpt(EpicsSignal, "Acquiring")
# Preset options
preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers
preset_real = Cpt(EpicsSignal, "PresetReal") preset_real = Cpt(EpicsSignal, "PresetReal")
preset_events = Cpt(EpicsSignal, "PresetEvents") preset_events = Cpt(EpicsSignal, "PresetEvents")
preset_triggers = Cpt(EpicsSignal, "PresetTriggers") preset_triggers = Cpt(EpicsSignal, "PresetTriggers")
# read-only diagnostics
triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True) triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True)
events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True) events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True)
input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True) input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True)
output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True) output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True)
# Mapping control
collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping
pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode") pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode")
ignore_gate = Cpt(EpicsSignal, "IgnoreGate") ignore_gate = Cpt(EpicsSignal, "IgnoreGate")
@ -102,8 +125,6 @@ class FalconCsaxs(Device):
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun") pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode") nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
# HDF5
def __init__( def __init__(
self, self,
prefix="", prefix="",
@ -117,6 +138,18 @@ class FalconCsaxs(Device):
sim_mode=False, sim_mode=False,
**kwargs, **kwargs,
): ):
"""Initialize Falcon detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix ("X12SA-SITORO:)
name (str): 'falcon'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__( super().__init__(
prefix=prefix, prefix=prefix,
name=name, name=name,
@ -130,7 +163,8 @@ class FalconCsaxs(Device):
raise FalconError("Add DeviceManager to initialization or init with sim_mode=True") raise FalconError("Add DeviceManager to initialization or init with sim_mode=True")
self._stopped = False self._stopped = False
self.name = name self.name = name
self.wait_for_connection() # Make sure to be connected before talking to PVs self.wait_for_connection()
# Spin up connections for simulation or BEC mode
if not sim_mode: if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.core.bec_service import SERVICE_CONFIG
@ -138,37 +172,60 @@ class FalconCsaxs(Device):
self._producer = self.device_manager.producer self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else: else:
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self._producer = bec_utils.MockProducer() self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager() self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.filewriter = FileWriterMixin(self.service_cfg) self.filewriter = FileWriterMixin(self.service_cfg)
self._init()
self.readout = 0.003 # 3 ms def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
def _default_parameter(self) -> None:
"""Set default parameters for Falcon
readout (float): readout time in seconds
_value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro"""
self.readout = 1e-3
self._value_pixel_per_buffer = 20 # 16 self._value_pixel_per_buffer = 20 # 16
self._clean_up() self._stop_det()
self._init_hdf5_saving() self._stop_file_writer()
self._init_mapping_mode()
def _clean_up(self) -> None: def _stop_det(self) -> None:
"""Clean up""" """ "Stop detector"""
self.hdf5.capture.put(0)
self.stop_all.put(1) self.stop_all.put(1)
self.erase_all.put(1) self.erase_all.put(1)
def _init_hdf5_saving(self) -> None: def _stop_file_writer(self) -> None:
"""Set up hdf5 save parameters""" """ "Stop the file writer"""
self.hdf5.capture.put(0)
def _init_filewriter(self) -> None:
"""Initialize file writer for Falcon.
This includes setting variables for the HDF5 plugin (EPICS) that is used to write the data.
"""
self.hdf5.enable.put(1) # EnableCallbacks self.hdf5.enable.put(1) # EnableCallbacks
self.hdf5.xml_file_name.put("layout.xml") # Points to hardcopy of HDF5 Layout xml file self.hdf5.xml_file_name.put("layout.xml") # Points to hardcopy of HDF5 Layout xml file
self.hdf5.lazy_open.put(1) # Yes -> To be checked how to add FilePlugin_V21+ self.hdf5.lazy_open.put(
1
) # Potentially not needed, means a temp data file is created first, could be 0
self.hdf5.temp_suffix.put("") # -> To be checked how to add FilePlugin_V22+ self.hdf5.temp_suffix.put("") # -> To be checked how to add FilePlugin_V22+
self.hdf5.queue_size.put(2000) self.hdf5.queue_size.put(2000) # size of queue for spectra in the buffer
def _init_mapping_mode(self) -> None: def _init_detector(self) -> None:
"""Set up mapping mode params""" """Initialize Falcon detector.
The detector is operated in MCA mapping mode.
Parameters here affect the triggering, gating etc.
This includes also the readout chunk size and whether data is segmented into spectra in EPICS.
"""
self.collect_mode.put(1) # 1 MCA Mapping, 0 MCA Spectrum self.collect_mode.put(1) # 1 MCA Mapping, 0 MCA Spectrum
self.preset_mode.put(1) # 1 Realtime self.preset_mode.put(1) # 1 Realtime
self.input_logic_polarity.put(0) # 0 Normal, 1 Inverted self.input_logic_polarity.put(0) # 0 Normal, 1 Inverted
@ -176,24 +233,47 @@ class FalconCsaxs(Device):
self.ignore_gate.put(0) # 1 Yes, 0 No self.ignore_gate.put(0) # 1 Yes, 0 No
self.auto_pixels_per_buffer.put(0) # 0 Manual 1 Auto self.auto_pixels_per_buffer.put(0) # 0 Manual 1 Auto
self.pixels_per_buffer.put(self._value_pixel_per_buffer) # self.pixels_per_buffer.put(self._value_pixel_per_buffer) #
self.nd_array_mode.put(1) self.nd_array_mode.put(1) # Segmentation happens in EPICS
def stage(self) -> List[object]:
"""Stage command, called from BEC in preparation of a scan.
This will iniate the preparation of detector and file writer.
The following functuions are called (at least):
- _prep_file_writer
- _prep_det
- _publish_file_location
The device returns a List[object] from the Ophyd Device class.
#TODO make sure this is fullfiled
Staging not idempotent and should raise
:obj:`RedundantStaging` if staged twice without an
intermediate :meth:`~BlueskyInterface.unstage`.
"""
self._stopped = False
self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
self._prep_file_writer()
self._prep_det()
state = False
self._publish_file_location(done=state, successful=state)
self._arm_acquisition()
return super().stage()
def _prep_det(self) -> None: def _prep_det(self) -> None:
"""Prepare detector for acquisition""" """Prepare detector for acquisition"""
self.collect_mode.put(1) self.collect_mode.put(1)
self.preset_real.put(self.scaninfo.exp_time) self.preset_real.put(self.scaninfo.exp_time)
self.pixels_per_run.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)) self.pixels_per_run.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
# self.auto_pixels_per_buffer.put(0)
# self.pixels_per_buffer.put(self._value_pixel_per_buffer)
def _prep_file_writer(self) -> None: def _prep_file_writer(self) -> None:
"""Prep HDF5 weriting""" """Prepare filewriting from HDF5 plugin"""
# TODO creta filename and destination path from filepath self.filepath = self.filewriter.compile_full_filename(
self.destination_path = self.filewriter.compile_full_filename(
self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True
) )
# self.hdf5.file_path.set(self.destination_path) file_path, file_name = os.path.split(self.filepath)
file_path, file_name = os.path.split(self.destination_path)
self.hdf5.file_path.put(file_path) self.hdf5.file_path.put(file_path)
self.hdf5.file_name.put(file_name) self.hdf5.file_name.put(file_name)
self.hdf5.file_template.put(f"%s%s") self.hdf5.file_template.put(f"%s%s")
@ -202,29 +282,23 @@ class FalconCsaxs(Device):
self.hdf5.array_counter.put(0) self.hdf5.array_counter.put(0)
self.hdf5.capture.put(1) self.hdf5.capture.put(1)
def stage(self) -> List[object]: def _publish_file_location(self, done=False, successful=False) -> None:
"""stage the detector and file writer""" """Publish the filepath to REDIS
# TODO clean up needed? First msg for file writer and the second one for other listeners (e.g. radial integ)
self._stopped = False """
self.scaninfo.load_scan_metadata() pipe = self._producer.pipeline()
self.mokev = self.device_manager.devices.mokev.obj.read()[ msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful)
self.device_manager.devices.mokev.name
]["value"]
logger.info("Waiting for pilatus2 to be armed")
self._prep_det()
logger.info("Pilatus2 armed")
logger.info("Waiting for pilatus2 zmq stream to be ready")
self._prep_file_writer()
logger.info("Pilatus2 zmq ready")
msg = BECMessage.FileMessage(file_path=self.destination_path, done=False)
self._producer.set_and_publish( self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
msg.dumps(),
) )
self.arm_acquisition() self._producer.set_and_publish(
logger.info("Waiting for Falcon to be armed") MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe
)
pipe.execute()
def _arm_acquisition(self) -> None:
"""Arm Falcon detector for acquisition"""
self.start_all.put(1)
while True: while True:
det_ctrl = self.state.read()[self.state.name]["value"] det_ctrl = self.state.read()[self.state.name]["value"]
if det_ctrl == int(DetectorState.ACQUIRING): if det_ctrl == int(DetectorState.ACQUIRING):
@ -232,14 +306,28 @@ class FalconCsaxs(Device):
if self._stopped == True: if self._stopped == True:
break break
time.sleep(0.005) time.sleep(0.005)
logger.info("Falcon is armed")
return super().stage()
def arm_acquisition(self) -> None: # TODO function for abstract class?
self.start_all.put(1) def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
logger.info("Waiting for Falcon to return from acquisition") """Unstage the device.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
"""
old_scanID = self.scaninfo.scanID old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ") logger.info(f"Old scanID: {old_scanID}, ")
@ -247,20 +335,25 @@ class FalconCsaxs(Device):
self._stopped = True self._stopped = True
if self._stopped: if self._stopped:
return super().unstage() return super().unstage()
self._falcon_finished() self._finished()
self._clean_up()
state = True state = True
msg = BECMessage.FileMessage(file_path=self.destination_path, done=True, successful=state) self._publish_file_location(done=state, successful=state)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.metadata["scanID"], self.name),
msg.dumps(),
)
self._stopped = False self._stopped = False
logger.info("Falcon done")
return super().unstage() return super().unstage()
def _falcon_finished(self): def _finished(self):
"""Function with 10s timeout""" """Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
"""
sleep_time = 0.1
timeout = 5
timer = 0 timer = 0
while True: while True:
det_ctrl = self.state.read()[self.state.name]["value"] det_ctrl = self.state.read()[self.state.name]["value"]
@ -268,25 +361,25 @@ class FalconCsaxs(Device):
received_frames = self.dxp.current_pixel.get() received_frames = self.dxp.current_pixel.get()
written_frames = self.hdf5.array_counter.get() written_frames = self.hdf5.array_counter.get()
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
# TODO if no writing was performed before # TODO Could check state of detector (det_ctrl) and file writer (writer_ctrl)
if total_frames == received_frames and total_frames == written_frames: if total_frames == received_frames and total_frames == written_frames:
break break
if self._stopped == True: if self._stopped == True:
break break
time.sleep(0.1) time.sleep(sleep_time)
timer += 0.1 timer += sleep_time
if timer > 5: if timer > timeout:
logger.info( logger.info(
f"Falcon missed a trigger: received trigger {received_frames}, send data {written_frames} from total_frames {total_frames}" f"Falcon missed a trigger: received trigger {received_frames}, send data {written_frames} from total_frames {total_frames}"
) )
break break
# raise FalconTimeoutError self._stop_det()
# f"Reached timeout with detector state {det_ctrl}, falcon state {writer_ctrl}, received trigger {received_frames} and files written {written_frames}" self._stop_file_writer()
# )
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self._clean_up() self._stop_det()
self._stop_file_writer()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True

View File

@ -1,32 +1,36 @@
import enum import enum
import json import json
import os import os
import subprocess
import time import time
from typing import List from bec_lib.core.devicemanager import DeviceStatus
import requests import requests
import numpy as np import numpy as np
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device, Staged from ophyd import DetectorBase, Device, Staged
from ophyd import ADComponent as ADCpt from ophyd import ADComponent as ADCpt
from ophyd_devices.utils import bec_utils as bec_utils
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib.core import BECMessage, MessageEndpoints
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger from bec_lib.core import bec_logger
from ophyd_devices.utils import bec_utils as bec_utils
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger logger = bec_logger.logger
class PilatusError(Exception): class PilatusError(Exception):
"""Base class for exceptions in this module."""
pass pass
class PilatusTimeoutError(Exception): class PilatusTimeoutError(Exception):
"""Raised when the Pilatus does not respond in time during unstage."""
pass pass
@ -38,50 +42,12 @@ class TriggerSource(int, enum.Enum):
ALGINMENT = 4 ALGINMENT = 4
class SlsDetectorCam(Device): # CamBase, FileBase): class SlsDetectorCam(Device):
# detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") """SLS Detector Camera - Pilatus
# setting = ADCpt(EpicsSignalWithRBV, "Setting")
# beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
# enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits")
# bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
# trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
# high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage")
# Receiver and data callback
# receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode")
# receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream")
# enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback")
# missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV")
# # Direct settings access
# setup_file = ADCpt(EpicsSignal, "SetupFile")
# load_setup = ADCpt(EpicsSignal, "LoadSetup")
# command = ADCpt(EpicsSignal, "Command")
# Mythen 3
# counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask")
# counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold")
# counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold")
# counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold")
# gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay")
# gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width")
# gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay")
# gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width")
# gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay")
# gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width")
# Moench
# json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode")
# json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode")
# Eiger9M Base class to map EPICS PVs to ophyd signals.
# delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") """
# num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
# acquire = ADCpt(EpicsSignal, "Acquire")
# acquire_time = ADCpt(EpicsSignal, 'AcquireTime')
# detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
# threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
# num_gates = ADCpt(EpicsSignalWithRBV, "NumGates")
# num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles")
# timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
# Pilatus_2 300k
num_images = ADCpt(EpicsSignalWithRBV, "NumImages") num_images = ADCpt(EpicsSignalWithRBV, "NumImages")
num_exposures = ADCpt(EpicsSignalWithRBV, "NumExposures") num_exposures = ADCpt(EpicsSignalWithRBV, "NumExposures")
delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures") delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures")
@ -116,6 +82,11 @@ class PilatusCsaxs(DetectorBase):
""" """
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
cam = ADCpt(SlsDetectorCam, "cam1:") cam = ADCpt(SlsDetectorCam, "cam1:")
def __init__( def __init__(
@ -131,6 +102,18 @@ class PilatusCsaxs(DetectorBase):
sim_mode=False, sim_mode=False,
**kwargs, **kwargs,
): ):
"""Initialize the Pilatus detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix ("X12SA-ES-PILATUS300K:)
name (str): 'pilatus_2'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__( super().__init__(
prefix=prefix, prefix=prefix,
name=name, name=name,
@ -144,7 +127,8 @@ class PilatusCsaxs(DetectorBase):
raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True") raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True")
self.name = name self.name = name
self.wait_for_connection() # Make sure to be connected before talking to PVs self.wait_for_connection()
# Spin up connections for simulation or BEC mode
if not sim_mode: if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.core.bec_service import SERVICE_CONFIG
@ -152,24 +136,39 @@ class PilatusCsaxs(DetectorBase):
self._producer = self.device_manager.producer self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else: else:
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self._producer = bec_utils.MockProducer() self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager() self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.filepath_h5 = "" self.scaninfo.load_scan_metadata()
self.filewriter = FileWriterMixin(self.service_cfg) self.filewriter = FileWriterMixin(self.service_cfg)
self.readout = 1e-3 # 3 ms self._init()
# TODO maybe needed def _init(self) -> None:
# self._close_file_writer() """Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage: def _default_parameter(self) -> None:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) """Set default parameters for Pilatus300k detector
return BECMessage.ScanStatusMessage.loads(msg) readout (float): readout time in seconds
"""
self.reduce_readout = 1e-3
def _init_detector(self) -> None:
"""Initialize the detector"""
# TODO add check if detector is running
pass
def _init_filewriter(self) -> None:
"""Initialize the file writer"""
# TODO in case the data backend is rewritten, add check if it is ready!
pass
def _prep_det(self) -> None: def _prep_det(self) -> None:
# TODO slow reaction, seemed to have timeout. # TODO slow reaction, seemed to have timeout.
@ -181,10 +180,10 @@ class PilatusCsaxs(DetectorBase):
factor = 1 factor = 1
if self.cam.threshold_energy._metadata["units"] == "eV": if self.cam.threshold_energy._metadata["units"] == "eV":
factor = 1000 factor = 1000
setp_energy = int(self.mokev * factor) setpoint = int(self.mokev * factor)
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setp_energy / 2, threshold, rtol=0.05): if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setp_energy / 2) self.cam.threshold_energy.set(setpoint / 2)
def _set_acquisition_params(self) -> None: def _set_acquisition_params(self) -> None:
"""set acquisition parameters on the detector""" """set acquisition parameters on the detector"""
@ -220,7 +219,7 @@ class PilatusCsaxs(DetectorBase):
self._stop_file_writer() self._stop_file_writer()
time.sleep(0.1) time.sleep(0.1)
self.filepath_h5 = self.filewriter.compile_full_filename( self.filepath_raw = self.filewriter.compile_full_filename(
self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True
) )
self.cam.file_path.put(f"/dev/shm/zmq/") self.cam.file_path.put(f"/dev/shm/zmq/")
@ -232,19 +231,19 @@ class PilatusCsaxs(DetectorBase):
# compile filename # compile filename
basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/" basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/"
self.destination_path = os.path.join( self.filepath = os.path.join(
basepath, basepath,
self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5), self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5),
) )
# Make directory if needed # Make directory if needed
os.makedirs(self.destination_path, exist_ok=True) os.makedirs(self.filepath, exist_ok=True)
data_msg = { data_msg = {
"source": [ "source": [
{ {
"searchPath": "/", "searchPath": "/",
"searchPattern": "glob:*.cbf", "searchPattern": "glob:*.cbf",
"destinationPath": self.destination_path, "destinationPath": self.filepath,
} }
] ]
} }
@ -339,32 +338,76 @@ class PilatusCsaxs(DetectorBase):
res.raise_for_status() res.raise_for_status()
def stage(self) -> List[object]: def stage(self) -> List[object]:
"""stage the detector and file writer""" """Stage command, called from BEC in preparation of a scan.
self._acquisition_done = False This will iniate the preparation of detector and file writer.
The following functuions are called:
- _prep_file_writer
- _prep_det
- _publish_file_location
The device returns a List[object] from the Ophyd Device class.
#TODO make sure this is fullfiled
Staging not idempotent and should raise
:obj:`RedundantStaging` if staged twice without an
intermediate :meth:`~BlueskyInterface.unstage`.
"""
self._stopped = False self._stopped = False
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[ self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name self.device_manager.devices.mokev.name
]["value"] ]["value"]
# TODO refactor logger.info to DEBUG mode?
logger.info("Waiting for pilatus2 to be armed")
self._prep_det()
logger.info("Pilatus2 armed")
logger.info("Waiting for pilatus2 zmq stream to be ready")
self._prep_file_writer() self._prep_file_writer()
logger.info("Pilatus2 zmq ready") self._prep_det()
msg = BECMessage.FileMessage( state = False
file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path} self._publish_file_location(done=state, successful=state)
)
return super().stage() return super().stage()
# TODO might be useful for base class
def pre_scan(self) -> None: def pre_scan(self) -> None:
""" " Pre_scan gets executed right before"""
self._arm_acquisition()
def _arm_acquisition(self) -> None:
self.acquire() self.acquire()
def _publish_file_location(self, done=False, successful=False) -> None:
"""Publish the filepath to REDIS
First msg for file writer and the second one for other listeners (e.g. radial integ)
"""
pipe = self._producer.pipeline()
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe
)
pipe.execute()
# TODO function for abstract class?
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
"""unstage the detector and file writer""" """Unstage the device.
# Reset to software trigger
logger.info("Waiting for Pilatus to return from acquisition") This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
"""
old_scanID = self.scaninfo.scanID old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ") logger.info(f"Old scanID: {old_scanID}, ")
@ -372,27 +415,37 @@ class PilatusCsaxs(DetectorBase):
self._stopped = True self._stopped = True
if self._stopped: if self._stopped:
return super().unstage() return super().unstage()
self._pilatus_finished() self._finished()
msg = BECMessage.FileMessage( state = True
file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path} self._publish_file_location(done=state, successful=state)
) self._start_h5converter(done=state)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name),
msg.dumps(),
)
logger.info("Pilatus2 done")
return super().unstage() return super().unstage()
def _pilatus_finished(self) -> None: def _start_h5converter(self, done=False) -> None:
# time.sleep(2) """Start the h5converter"""
msg = BECMessage.FileMessage(
file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath}
)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps()
)
def _finished(self) -> None:
"""Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
"""
while True: while True:
if self.device_manager.devices.mcs.obj._staged != Staged.yes: if self.device_manager.devices.mcs.obj._staged != Staged.yes:
break break
time.sleep(0.1) time.sleep(0.1)
# TODO implement a waiting function or not
# time.sleep(2) # time.sleep(2)
# timer = 0 # timer = 0
# while True: # while True:
@ -412,7 +465,9 @@ class PilatusCsaxs(DetectorBase):
# # f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} " # # f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} "
# # ) # # )
self._stop_det()
self._stop_file_writer() self._stop_file_writer()
# TODO explore if sleep is needed
time.sleep(0.5) time.sleep(0.5)
self._close_file_writer() self._close_file_writer()
@ -421,15 +476,18 @@ class PilatusCsaxs(DetectorBase):
or arm the detector in hardware of the detector or arm the detector in hardware of the detector
""" """
self.cam.acquire.put(1) self.cam.acquire.put(1)
# TODO check if sleep of 1s is needed, could be that less is enough
time.sleep(1) time.sleep(1)
def _stop_det(self) -> None:
"""Stop the detector"""
self.cam.acquire.put(0)
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self.cam.acquire.put(0) self._stop_det()
self._stop_file_writer() self._stop_file_writer()
# TODO maybe needed
self._close_file_writer() self._close_file_writer()
# self.unstage()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True