refactor: refactoring of eiger9m class, alsmost compatible with pilatus

This commit is contained in:
2023-08-21 21:51:36 +02:00
parent f3e4575359
commit 287c667621

View File

@ -1,9 +1,5 @@
import json
import os
import requests
import numpy as np
from typing import List from typing import List
import numpy as np
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import CamBase, DetectorBase from ophyd import CamBase, DetectorBase
@ -20,7 +16,7 @@ from std_daq_client import StdDaqClient
logger = bec_logger.logger logger = bec_logger.logger
class slsDetectorCam(CamBase, FileBase): class SlsDetectorCam(CamBase, FileBase):
detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV")
setting = ADCpt(EpicsSignalWithRBV, "Setting") setting = ADCpt(EpicsSignalWithRBV, "Setting")
delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime")
@ -58,14 +54,19 @@ class slsDetectorCam(CamBase, FileBase):
json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode") json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode")
# TODO refactor class -> move away from DetectorBase and PilatusDetectorCamEx class to Device. -> this will be cleaner
class Eiger9mCsaxs(DetectorBase): class Eiger9mCsaxs(DetectorBase):
"""Eiger 9M detector for CSAXS
Parent class: DetectorBase
Device class: SlsDetectorCam
Attributes:
name str: 'eiger'
prefix (str): PV prefix (X12SA-ES-EIGER9M:)
""" """
in device config, device_access needs to be set true to inject the device manager cam = ADCpt(SlsDetectorCam, "cam1:")
"""
cam = ADCpt(slsDetectorCam, "cam1:")
def __init__( def __init__(
self, self,
@ -90,16 +91,16 @@ class Eiger9mCsaxs(DetectorBase):
) )
self.device_manager = device_manager self.device_manager = device_manager
self.name = name self.name = name
self.username = "e21206" # self.username = "e21206"
# TODO once running from BEC
# self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode() # self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode()
self._init_eiger9m() self._init_eiger9m()
self._init_standard_daq() self._init_standard_daq()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.username}/Data10/data/"} self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.username}/Data10/data/"}
self.filewriter = FileWriterMixin(self.service_cfg) # TODO check FileWriterMixin if generix self.filewriter = FileWriterMixin(self.service_cfg)
self._producer = RedisConnector(["localhost:6379"]).producer() self._producer = RedisConnector(["localhost:6379"]).producer()
# self.num_frames = 0
self.readout = 0.003 # 3 ms self.readout = 0.003 # 3 ms
self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered
@ -115,73 +116,93 @@ class Eiger9mCsaxs(DetectorBase):
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
return BECMessage.ScanStatusMessage.loads(msg) return BECMessage.ScanStatusMessage.loads(msg)
def _prepare_standard_daq(self) -> None: def _load_scan_metadata(self) -> None:
self.std_client.start_writer_async({"output_file": filename, "n_images": self.num_frames}) scan_msg = self._get_current_scan_msg()
self.metadata = {
"scanID": scan_msg.content["scanID"],
"RID": scan_msg.content["info"]["RID"],
"queueID": scan_msg.content["info"]["queueID"],
}
self.scanID = scan_msg.content["scanID"]
self.scan_number = scan_msg.content["info"]["scan_number"]
self.exp_time = scan_msg.content["info"]["exp_time"]
self.num_frames = scan_msg.content["info"]["num_points"]
self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode()
# self.triggermode = scan_msg.content["info"]["trigger_mode"]
self.filepath = self.filewriter.compile_full_filename(
self.scan_number, "eiger", 1000, 5, True
)
def _prep_det(self) -> None:
self._set_det_threshold()
self._set_acquisition_params()
def _set_det_threshold(self) -> None:
# threshold_energy PV exists on Eiger 9M?
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(self.mokev / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(self.mokev / 2)
def _set_acquisition_params(self) -> None:
self.cam.acquire_time.set(self.exp_time)
self.cam.acquire_period.set(self.exp_time + self.readout)
self.cam.num_images.set(self.num_frames)
self.cam.num_exposures.set(1)
# trigger_mode vs timing mode ??!!
self.cam.timing_mode.set(self.triggermode)
def _prep_file_writer(self) -> None:
self.std_client.start_writer_async(
{"output_file": self.filepath, "n_images": self.num_frames}
)
def _close_file_writer(self) -> None:
pass
def stage(self) -> List[object]: def stage(self) -> List[object]:
# TODO remove """stage the detector and file writer"""
# scan_msg = self._get_current_scan_msg() # TODO remove once running from BEC
# self.metadata = { # self._load_scan_metadata()
# "scanID": scan_msg.content["scanID"], self.scan_number = 10
# "RID": scan_msg.content["info"]["RID"], self.exp_time = 0.5
# "queueID": scan_msg.content["info"]["queueID"], self.num_frames = 3
# } self.mokev = 12
self.scan_number = 10 # scan_msg.content["info"]["scan_number"] self.triggermode = 0
self.exp_time = 0.5 # scan_msg.content["info"]["exp_time"]
self.num_frames = 3 # scan_msg.content["info"]["num_points"]
self.mokev = 12 # self.device_manager.devices.mokev.read()['mokev']['value']
# TODO remove
# self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode()
# set pilatus threshold self._prep_det()
self._set_eiger_threshold() self._prep_file_writer()
self._set_acquisition_params(
exp_time=self.exp_time, msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
readout=self.readout, self._producer.set_and_publish(
num_frames=self.num_frames, MessageEndpoints.public_file(self.scanID, "eiger9m"),
triggermode=self.triggermode, msg.dumps(),
) )
return super().stage() return super().stage()
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
"""unstage the detector and file writer"""
self.timing_mode = 0
self._close_file_writer()
# TODO file succesfully written?
state = True
msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state)
self.producer.set_and_publish(
MessageEndpoints.public_file(self.metadata["scanID"], self.name),
msg.dumps(),
)
return super().unstage() return super().unstage()
def _set_eiger_threshold(self) -> None:
pil_threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(self.mokev / 2, pil_threshold, rtol=0.05):
self.cam.threshold_energy.set(self.mokev / 2)
def _set_acquisition_params(
self, exp_time: float, readout: float, num_frames: int, triggermode: int
) -> None:
"""set acquisition parameters on the detector
Args:
exp_time (float): exposure time
readout (float): readout time
num_frames (int): images per scan
triggermode (int):
0 Internal
1 Ext. Enable
2 Ext. Trigger
3 Mult. Trigger
4 Alignment
Returns:
None
"""
self.cam.acquire_time.set(exp_time)
self.cam.acquire_period.set(exp_time + readout)
self.cam.num_images.set(num_frames)
self.cam.num_exposures.set(1)
# trigger_mode exists in baseclass
self.cam.timing_mode.set(triggermode)
def acquire(self) -> None: def acquire(self) -> None:
"""Start acquisition in software trigger mode,
or arm the detector in hardware of the detector
"""
self.cam.acquire.set(1) self.cam.acquire.set(1)
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer"""
self.cam.acquire.set(0) self.cam.acquire.set(0)
self.std_client.stop_writer()
self.unstage() self.unstage()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True