From 3a126976cd7cfb3f294556110d77249da6fbc99d Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 6 Sep 2023 08:00:42 +0200 Subject: [PATCH] fix: online changes --- .../epics/devices/bec_scaninfo_mixin.py | 8 +- ophyd_devices/epics/devices/eiger9m_csaxs.py | 9 +- ophyd_devices/epics/devices/mcs_csaxs.py | 6 +- ophyd_devices/epics/devices/pilatus_csaxs.py | 241 ++++++++++++------ 4 files changed, 168 insertions(+), 96 deletions(-) diff --git a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py index 09e1a0c..2a10b3b 100644 --- a/ophyd_devices/epics/devices/bec_scaninfo_mixin.py +++ b/ophyd_devices/epics/devices/bec_scaninfo_mixin.py @@ -13,11 +13,11 @@ class BecScaninfoMixin: "RID": "mockrid", "queueID": "mockqueuid", "scan_number": 1, - "exp_time": 26e-3, - "num_points": 9999, - "readout_time": 2e-3, + "exp_time": 12e-3, + "num_points": 500, + "readout_time": 3e-3, "scan_type": "fly", - "num_lines": 10, + "num_lines": 1, "frames_per_trigger": 1, } diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 45dded0..15f208b 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -139,11 +139,13 @@ class Eiger9mCsaxs(DetectorBase): else: self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) + self.scaninfo.load_scan_metadata() self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) + self.scaninfo.load_scan_metadata() # TODO self.filepath = "" - self.scaninfo.username = "e21206" self.filewriter = FileWriterMixin(self.service_cfg) self.reduce_readout = 1e-3 # 3 ms @@ -194,11 +196,8 @@ class Eiger9mCsaxs(DetectorBase): return def _prep_det(self) -> None: - logger.info("prepping thresholds") self._set_det_threshold() - logger.info("prepping detector parameter") self._set_acquisition_params() - logger.info("setting trigger") self._set_trigger(TriggerSource.GATING) def _set_det_threshold(self) -> None: @@ -274,7 +273,7 @@ class Eiger9mCsaxs(DetectorBase): ) msg = BECMessage.FileMessage(file_path=self.filepath, done=False) self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), + MessageEndpoints.file_event(self.name), msg.dumps(), ) self.arm_acquisition() diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index c2806cb..a041b01 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -276,12 +276,12 @@ class McsCsaxs(SIS38XX): def _set_acquisition_params(self) -> None: if self.scaninfo.scan_type == "step": - n_points = self.scaninfo.frames_per_trigger + 1 + n_points = int(self.scaninfo.frames_per_trigger + 1) elif self.scaninfo.scan_type == "fly": - n_points = self.scaninfo.num_points / int(self.num_lines.get()) + 1 + n_points = int(self.scaninfo.num_points / int(self.num_lines.get()) + 1) else: raise MCSError(f"Scantype {self.scaninfo} not implemented for MCS card") - if n_points > 1000: + if n_points > 10000: raise MCSError( f"Requested number of points N={n_points} exceeds hardware limit of mcs card 10000 (N-1)" ) diff --git a/ophyd_devices/epics/devices/pilatus_csaxs.py b/ophyd_devices/epics/devices/pilatus_csaxs.py index 74b5557..db97798 100644 --- a/ophyd_devices/epics/devices/pilatus_csaxs.py +++ b/ophyd_devices/epics/devices/pilatus_csaxs.py @@ -1,24 +1,41 @@ +import enum import json import os +import time from typing import List import requests import numpy as np from ophyd.areadetector import ADComponent as ADCpt, PilatusDetectorCam, DetectorBase from ophyd.areadetector.plugins import FileBase +from ophyd_devices.utils import bec_utils as bec_utils from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import bec_logger +from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin + logger = bec_logger.logger +class PilatusError(Exception): + pass + + class PilatusDetectorCamEx(PilatusDetectorCam, FileBase): pass +class TriggerSource(int, enum.Enum): + INTERNAL = 0 + EXT_ENABLE = 1 + EXT_TRIGGER = 2 + MULTI_TRIGGER = 3 + ALGINMENT = 4 + + class PilatusCsaxs(DetectorBase): """Pilatus_2 300k detector for CSAXS @@ -43,6 +60,7 @@ class PilatusCsaxs(DetectorBase): configuration_attrs=None, parent=None, device_manager=None, + sim_mode=False, **kwargs, ): super().__init__( @@ -54,41 +72,55 @@ class PilatusCsaxs(DetectorBase): parent=parent, **kwargs, ) - self.device_manager = device_manager - self.name = name - self.username = "e21206" - # TODO once running from BEC - # self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode() + if device_manager is None and not sim_mode: + raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True") + + self.name = name + self.wait_for_connection() # Make sure to be connected before talking to PVs + if not sim_mode: + from bec_lib.core.bec_service import SERVICE_CONFIG + + self.device_manager = device_manager + self._producer = self.device_manager.producer + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] + else: + self._producer = bec_utils.MockProducer() + self.device_manager = bec_utils.MockDeviceManager() + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) + self.scaninfo.load_scan_metadata() + self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} + + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) + self.filepath = "" - self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.username}/Data10/data/"} self.filewriter = FileWriterMixin(self.service_cfg) - self._producer = RedisConnector(["localhost:6379"]).producer() - self.readout = 0.003 # 3 ms - self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered + self.readout = 1e-3 # 3 ms def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage: msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) return BECMessage.ScanStatusMessage.loads(msg) - def _load_scan_metadata(self) -> None: - scan_msg = self._get_current_scan_msg() - self.metadata = { - "scanID": scan_msg.content["scanID"], - "RID": scan_msg.content["info"]["RID"], - "queueID": scan_msg.content["info"]["queueID"], - } - self.scanID = scan_msg.content["scanID"] - self.scan_number = scan_msg.content["info"]["scan_number"] - self.exp_time = scan_msg.content["info"]["exp_time"] - self.num_frames = scan_msg.content["info"]["num_points"] - self.username = self.device_manager.producer.get(MessageEndpoints.account()).decode() - self.device_manager.devices.mokev.read()["mokev"]["value"] - # self.triggermode = scan_msg.content["info"]["trigger_mode"] - # self.filename = self.filewriter.compile_full_filename( - # self.scan_number, "pilatus_2", 1000, 5, True - # ) - # TODO fix with BEC running - # self.filename = '/sls/X12SA/Data10/e21206/data/test.h5' + # def _load_scan_metadata(self) -> None: + # scan_msg = self._get_current_scan_msg() + # self.metadata = { + # "scanID": scan_msg.content["scanID"], + # "RID": scan_msg.content["info"]["RID"], + # "queueID": scan_msg.content["info"]["queueID"], + # } + # self.scanID = scan_msg.content["scanID"] + # self.scan_number = scan_msg.content["info"]["scan_number"] + # self.exp_time = scan_msg.content["info"]["exp_time"] + # self.num_frames = scan_msg.content["info"]["num_points"] + # self.username = self.device_manager.producer.get( + # MessageEndpoints.account() + # ).decode() + # self.device_manager.devices.mokev.read()["mokev"]["value"] + # # self.triggermode = scan_msg.content["info"]["trigger_mode"] + # # self.filename = self.filewriter.compile_full_filename( + # # self.scan_number, "pilatus_2", 1000, 5, True + # # ) + # # TODO fix with BEC running + # # self.filename = '/sls/X12SA/Data10/e21206/data/test.h5' def _prep_det(self) -> None: # TODO slow reaction, seemed to have timeout. @@ -96,37 +128,57 @@ class PilatusCsaxs(DetectorBase): self._set_acquisition_params() def _set_det_threshold(self) -> None: + # threshold_energy PV exists on Eiger 9M? + factor = 1 + if self.cam.threshold_energy._metadata["units"] == "eV": + factor = 1000 + setp_energy = int(self.mokev * factor) threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] - # threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]['value'] - if not np.isclose(self.mokev / 2, threshold, rtol=0.05): - self.cam.threshold_energy.set(self.mokev / 2) + if not np.isclose(setp_energy / 2, threshold, rtol=0.05): + self.cam.threshold_energy.set(setp_energy / 2) def _set_acquisition_params(self) -> None: """set acquisition parameters on the detector""" - self.cam.acquire_time.set(self.exp_time) - self.cam.acquire_period.set(self.exp_time + self.readout) - self.cam.num_images.set(self.num_frames) + # self.cam.acquire_time.set(self.exp_time) + # self.cam.acquire_period.set(self.exp_time + self.readout) + self.cam.num_images.set(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)) self.cam.num_exposures.set(1) - self.cam.trigger_mode.set(self.triggermode) + self._set_trigger(TriggerSource.INTERNAL) # EXT_TRIGGER) + + def _set_trigger(self, trigger_source: TriggerSource) -> None: + """Set trigger source for the detector, either directly to value or TriggerSource.* with + INTERNAL = 0 + EXT_ENABLE = 1 + EXT_TRIGGER = 2 + MULTI_TRIGGER = 3 + ALGINMENT = 4 + """ + value = int(trigger_source) + self.cam.trigger_mode.set(value) def _prep_file_writer(self) -> None: """Prepare the file writer for pilatus_2 a zmq service is running on xbl-daq-34 that is waiting for a zmq message to start the writer for the pilatus_2 x12sa-pd-2 """ + self.filepath_h5 = self.filewriter.compile_full_filename( + self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True + ) self.cam.file_path.set(f"/dev/shm/zmq/") - self.cam.file_name.set(f"{self.username}_2_{self.scan_number:05d}") + self.cam.file_name.set(f"{self.scaninfo.username}_2_{self.scaninfo.scan_number:05d}") self.cam.auto_increment.set(1) # auto increment self.cam.file_number.set(0) # first iter self.cam.file_format.set(0) # 0: TIFF self.cam.file_template.set("%s%s_%5.5d.cbf") - # TODO Filewriter Plugin to write cbfs to h5! - # Pilatus_2 writes cbf files -> where do we like to write those! - # scan_dir = self.filewriter._get_scan_directory( - # scan_bundle=1000, scan_number=self.scan_number, leading_zeros=5 - # ) # os.path.join(self.service_cfg["base_path"], scan_dir) - self.destination_path = "/sls/X12SA/data/{self.username}/Data10/pilatus_2/" + # compile filename + basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/" + self.destination_path = os.path.join( + basepath, + self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5), + ) + # Make directory if needed + os.makedirs(os.path.dirname(self.destination_path), exist_ok=True) data_msg = { "source": [ @@ -146,6 +198,7 @@ class PilatusCsaxs(DetectorBase): data=json.dumps(data_msg), headers=headers, ) + logger.info(f"{res.status_code} - {res.text} - {res.content}") if not res.ok: res.raise_for_status() @@ -153,14 +206,14 @@ class PilatusCsaxs(DetectorBase): # prepare writer data_msg = [ "zmqWriter", - self.username, + self.scaninfo.username, { "addr": "tcp://x12sa-pd-2:8888", "dst": ["file"], - "numFrm": self.num_frames, + "numFrm": int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger), "timeout": 2000, "ifType": "PULL", - "user": self.username, + "user": self.scaninfo.username, }, ] @@ -170,55 +223,69 @@ class PilatusCsaxs(DetectorBase): headers=headers, ) + logger.info(f"{res.status_code} - {res.text} - {res.content}") + if not res.ok: res.raise_for_status() + # Wait for server to become available again + time.sleep(0.1) + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + data_msg = [ + "zmqWriter", + self.scaninfo.username, + { + "frmCnt": int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger), + "timeout": 2000, + }, + ] + logger.info(f"{res.status_code} -{res.text} - {res.content}") + + try: + res = requests.put( + url="http://xbl-daq-34:8091/pilatus_2/wait", + data=json.dumps(data_msg), + # headers=headers, + ) + + logger.info(f"{res}") + + if not res.ok: + res.raise_for_status() + except Exception as exc: + logger.info("exc") + def _close_file_writer(self) -> None: """Close the file writer for pilatus_2 a zmq service is running on xbl-daq-34 that is waiting for a zmq message to stop the writer for the pilatus_2 x12sa-pd-2 """ - headers = {"Content-Type": "application/json", "Accept": "application/json"} - data_msg = [ - "zmqWriter", - self.username, - { - "frmCnt": self.num_frames, - "timeout": 2000, - }, - ] - logger.info(data_msg) - - res = requests.put( - url="http://xbl-daq-34:8091/pilatus_2/wait", - data=json.dumps(data_msg), - headers=headers, - ) - - if not res.ok: - res.raise_for_status() res = requests.delete(url="http://x12sa-pd-2:8080/stream/pilatus_2") if not res.ok: res.raise_for_status() + def _stop_file_writer(self) -> None: + res = requests.put( + url="http://xbl-daq-34:8091/pilatus_2/stop", + # data=json.dumps(data_msg), + # headers=headers, + ) + + if not res.ok: + res.raise_for_status() + def stage(self) -> List[object]: """stage the detector and file writer""" - # TODO remove once running from BEC - # self._load_scan_metadata() - self.scan_number = 10 - self.exp_time = 0.5 - self.num_frames = 3 - self.mokev = 12 + self.scaninfo.load_scan_metadata() + self.mokev = self.device_manager.devices.mokev.obj.read()[ + self.device_manager.devices.mokev.name + ]["value"] self._prep_det() self._prep_file_writer() - - # msg = BECMessage.FileMessage(file_path=self.filename, done=False) - # self._producer.set_and_publish( - # MessageEndpoints.public_file(self.scanID, "pilatus_2"), - # msg.dumps(), - # ) + self.acquire() return super().stage() @@ -227,13 +294,18 @@ class PilatusCsaxs(DetectorBase): # Reset to software trigger self.triggermode = 0 self._close_file_writer() - # TODO check if acquisition is done and successful! - state = True - # msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state) - # self.producer.set_and_publish( - # MessageEndpoints.public_file(self.metadata["scanID"], self.name), - # msg.dumps(), - # ) + self._stop_file_writer() + # Only sent this out once data is written to disk since cbf to hdf5 converter will be triggered + msg = BECMessage.FileMessage(file_path=self.filepath, done=True) + self._producer.set_and_publish( + MessageEndpoints.public_file(self.scaninfo.scanID, self.name), + msg.dumps(), + ) + msg = BECMessage.FileMessage(file_path=self.filepath, done=True) + self._producer.set_and_publish( + MessageEndpoints.file_event(self.name), + msg.dumps(), + ) return super().unstage() def acquire(self) -> None: @@ -245,7 +317,8 @@ class PilatusCsaxs(DetectorBase): def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" self.cam.acquire.set(0) - self.unstage() + self._stop_file_writer() + # self.unstage() super().stop(success=success) self._stopped = True