refactor: renaming

This commit is contained in:
appel_c 2023-11-03 16:19:36 +01:00
parent 5d02a13ad0
commit a80d13ae66
2 changed files with 160 additions and 102 deletions

View File

@ -26,6 +26,6 @@ from ophyd.quadem import QuadEM
from .epics_motor_ex import EpicsMotorEx
from .mcs_csaxs import McsCsaxs
from .eiger9m_csaxs import Eiger9McSAXS
from .pilatus_csaxs import PilatusCsaxs
from .pilatus_csaxs import PilatuscSAXS
from .falcon_csaxs import FalconCsaxs
from .DelayGeneratorDG645 import DelayGeneratorDG645

View File

@ -1,36 +1,40 @@
import enum
import json
import os
import subprocess
import time
from typing import List
from bec_lib.core.devicemanager import DeviceStatus
import requests
import numpy as np
from typing import List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device, Staged
from ophyd import ADComponent as ADCpt
from ophyd_devices.utils import bec_utils as bec_utils
from bec_lib.core import BECMessage, MessageEndpoints
from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger
from ophyd_devices.utils import bec_utils as bec_utils
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class PilatusError(Exception):
"""Base class for exceptions in this module."""
pass
class PilatusTimeoutError(Exception):
class PilatusTimeoutError(PilatusError):
"""Raised when the Pilatus does not respond in time during unstage."""
pass
class TriggerSource(int, enum.Enum):
class TriggerSource(enum.IntEnum):
INTERNAL = 0
EXT_ENABLE = 1
EXT_TRIGGER = 2
@ -38,50 +42,12 @@ class TriggerSource(int, enum.Enum):
ALGINMENT = 4
class SlsDetectorCam(Device): # CamBase, FileBase):
# detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV")
# setting = ADCpt(EpicsSignalWithRBV, "Setting")
# beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
# enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits")
# bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
# trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
# high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage")
# Receiver and data callback
# receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode")
# receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream")
# enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback")
# missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV")
# # Direct settings access
# setup_file = ADCpt(EpicsSignal, "SetupFile")
# load_setup = ADCpt(EpicsSignal, "LoadSetup")
# command = ADCpt(EpicsSignal, "Command")
# Mythen 3
# counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask")
# counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold")
# counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold")
# counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold")
# gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay")
# gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width")
# gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay")
# gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width")
# gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay")
# gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width")
# Moench
# json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode")
# json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode")
class SLSDetectorCam(Device):
"""SLS Detector Camera - Pilatus
# Eiger9M
# delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime")
# num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
# acquire = ADCpt(EpicsSignal, "Acquire")
# acquire_time = ADCpt(EpicsSignal, 'AcquireTime')
# detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
# threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
# num_gates = ADCpt(EpicsSignalWithRBV, "NumGates")
# num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles")
# timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
Base class to map EPICS PVs to ophyd signals.
"""
# Pilatus_2 300k
num_images = ADCpt(EpicsSignalWithRBV, "NumImages")
num_exposures = ADCpt(EpicsSignalWithRBV, "NumExposures")
delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures")
@ -104,7 +70,7 @@ class SlsDetectorCam(Device): # CamBase, FileBase):
gap_fill = ADCpt(EpicsSignalWithRBV, "GapFill")
class PilatusCsaxs(DetectorBase):
class PilatuscSAXS(DetectorBase):
"""Pilatus_2 300k detector for CSAXS
Parent class: DetectorBase
@ -116,7 +82,12 @@ class PilatusCsaxs(DetectorBase):
"""
cam = ADCpt(SlsDetectorCam, "cam1:")
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [
"describe",
]
cam = ADCpt(SLSDetectorCam, "cam1:")
def __init__(
self,
@ -131,6 +102,18 @@ class PilatusCsaxs(DetectorBase):
sim_mode=False,
**kwargs,
):
"""Initialize the Pilatus detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix ("X12SA-ES-PILATUS300K:)
name (str): 'pilatus_2'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__(
prefix=prefix,
name=name,
@ -144,7 +127,8 @@ class PilatusCsaxs(DetectorBase):
raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True")
self.name = name
self.wait_for_connection() # Make sure to be connected before talking to PVs
self.wait_for_connection()
# Spin up connections for simulation or BEC mode
if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG
@ -152,24 +136,39 @@ class PilatusCsaxs(DetectorBase):
self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else:
base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"}
self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.filepath_h5 = ""
self.scaninfo.load_scan_metadata()
self.filewriter = FileWriterMixin(self.service_cfg)
self.readout = 1e-3 # 3 ms
self._init()
# TODO maybe needed
# self._close_file_writer()
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
return BECMessage.ScanStatusMessage.loads(msg)
def _default_parameter(self) -> None:
"""Set default parameters for Pilatus300k detector
readout (float): readout time in seconds
"""
self.reduce_readout = 1e-3
def _init_detector(self) -> None:
"""Initialize the detector"""
# TODO add check if detector is running
pass
def _init_filewriter(self) -> None:
"""Initialize the file writer"""
# TODO in case the data backend is rewritten, add check if it is ready!
pass
def _prep_det(self) -> None:
# TODO slow reaction, seemed to have timeout.
@ -181,10 +180,10 @@ class PilatusCsaxs(DetectorBase):
factor = 1
if self.cam.threshold_energy._metadata["units"] == "eV":
factor = 1000
setp_energy = int(self.mokev * factor)
setpoint = int(self.mokev * factor)
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setp_energy / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setp_energy / 2)
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setpoint / 2)
def _set_acquisition_params(self) -> None:
"""set acquisition parameters on the detector"""
@ -202,7 +201,7 @@ class PilatusCsaxs(DetectorBase):
MULTI_TRIGGER = 3
ALGINMENT = 4
"""
value = int(trigger_source)
value = trigger_source
self.cam.trigger_mode.set(value)
def _prep_file_writer(self) -> None:
@ -220,7 +219,7 @@ class PilatusCsaxs(DetectorBase):
self._stop_file_writer()
time.sleep(0.1)
self.filepath_h5 = self.filewriter.compile_full_filename(
self.filepath_raw = self.filewriter.compile_full_filename(
self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True
)
self.cam.file_path.put(f"/dev/shm/zmq/")
@ -232,19 +231,19 @@ class PilatusCsaxs(DetectorBase):
# compile filename
basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/"
self.destination_path = os.path.join(
self.filepath = os.path.join(
basepath,
self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5),
)
# Make directory if needed
os.makedirs(self.destination_path, exist_ok=True)
os.makedirs(self.filepath, exist_ok=True)
data_msg = {
"source": [
{
"searchPath": "/",
"searchPattern": "glob:*.cbf",
"destinationPath": self.destination_path,
"destinationPath": self.filepath,
}
]
}
@ -339,32 +338,76 @@ class PilatusCsaxs(DetectorBase):
res.raise_for_status()
def stage(self) -> List[object]:
"""stage the detector and file writer"""
self._acquisition_done = False
"""Stage command, called from BEC in preparation of a scan.
This will iniate the preparation of detector and file writer.
The following functuions are called:
- _prep_file_writer
- _prep_det
- _publish_file_location
The device returns a List[object] from the Ophyd Device class.
#TODO make sure this is fullfiled
Staging not idempotent and should raise
:obj:`RedundantStaging` if staged twice without an
intermediate :meth:`~BlueskyInterface.unstage`.
"""
self._stopped = False
self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
logger.info("Waiting for pilatus2 to be armed")
self._prep_det()
logger.info("Pilatus2 armed")
logger.info("Waiting for pilatus2 zmq stream to be ready")
# TODO refactor logger.info to DEBUG mode?
self._prep_file_writer()
logger.info("Pilatus2 zmq ready")
msg = BECMessage.FileMessage(
file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path}
)
self._prep_det()
state = False
self._publish_file_location(done=state, successful=state)
return super().stage()
# TODO might be useful for base class
def pre_scan(self) -> None:
""" " Pre_scan gets executed right before"""
self._arm_acquisition()
def _arm_acquisition(self) -> None:
self.acquire()
def _publish_file_location(self, done=False, successful=False) -> None:
"""Publish the filepath to REDIS
First msg for file writer and the second one for other listeners (e.g. radial integ)
"""
pipe = self._producer.pipeline()
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe
)
pipe.execute()
# TODO function for abstract class?
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
def unstage(self) -> List[object]:
"""unstage the detector and file writer"""
# Reset to software trigger
logger.info("Waiting for Pilatus to return from acquisition")
"""Unstage the device.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
"""
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ")
@ -372,27 +415,37 @@ class PilatusCsaxs(DetectorBase):
self._stopped = True
if self._stopped:
return super().unstage()
self._pilatus_finished()
msg = BECMessage.FileMessage(
file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path}
)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name),
msg.dumps(),
)
logger.info("Pilatus2 done")
self._finished()
state = True
self._publish_file_location(done=state, successful=state)
self._start_h5converter(done=state)
return super().unstage()
def _pilatus_finished(self) -> None:
# time.sleep(2)
def _start_h5converter(self, done=False) -> None:
"""Start the h5converter"""
msg = BECMessage.FileMessage(
file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath}
)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps()
)
def _finished(self) -> None:
"""Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
"""
while True:
if self.device_manager.devices.mcs.obj._staged != Staged.yes:
break
time.sleep(0.1)
# TODO implement a waiting function or not
# time.sleep(2)
# timer = 0
# while True:
@ -412,7 +465,9 @@ class PilatusCsaxs(DetectorBase):
# # f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} "
# # )
self._stop_det()
self._stop_file_writer()
# TODO explore if sleep is needed
time.sleep(0.5)
self._close_file_writer()
@ -421,19 +476,22 @@ class PilatusCsaxs(DetectorBase):
or arm the detector in hardware of the detector
"""
self.cam.acquire.put(1)
# TODO check if sleep of 1s is needed, could be that less is enough
time.sleep(1)
def _stop_det(self) -> None:
"""Stop the detector"""
self.cam.acquire.put(0)
def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer"""
self.cam.acquire.put(0)
self._stop_det()
self._stop_file_writer()
# TODO maybe needed
self._close_file_writer()
# self.unstage()
super().stop(success=success)
self._stopped = True
# Automatically connect to test environmenr if directly invoked
if __name__ == "__main__":
pilatus_2 = PilatusCsaxs(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)
pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)