From b8dcda16961a36ce9cf99d65cae3e8e5fa2eaaad Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Fri, 14 Feb 2025 13:34:02 +0100 Subject: [PATCH] Linting to 9.22 --- .../devices/gigafrost/gigafrostcamera.py | 174 ++++++------ tomcat_bec/devices/gigafrost/pco_datasink.py | 55 ++-- tomcat_bec/devices/gigafrost/pcoedgecamera.py | 78 +++--- tomcat_bec/devices/gigafrost/stddaq_client.py | 252 ++++++++++-------- 4 files changed, 297 insertions(+), 262 deletions(-) diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 186badf..59ee53f 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -26,9 +26,11 @@ except ModuleNotFoundError: try: from bec_lib import bec_logger + logger = bec_logger.logger except ModuleNotFoundError: import logging + logger = logging.getLogger("GfCam") @@ -38,8 +40,9 @@ class GigaFrostCameraMixin(CustomDetectorMixin): This class will be called by the custom_prepare_cls attribute of the detector class. """ + # pylint: disable=protected-access def _define_backend_ip(self): - """ Select backend IP address for UDP stream""" + """Select backend IP address for UDP stream""" if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 return const.BE3_NORTH_IP, const.BE3_SOUTH_IP if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: @@ -48,7 +51,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin): raise RuntimeError(f"Backend {self.parent.backendUrl.get()} not recognized.") def _define_backend_mac(self): - """ Select backend MAC address for UDP stream""" + """Select backend MAC address for UDP stream""" if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: @@ -74,7 +77,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin): self.parent.macSouth.get(), self.parent.ipSouth.get(), dest_port, - source_port + source_port, ) else: extend_header_table( @@ -82,20 +85,20 @@ class GigaFrostCameraMixin(CustomDetectorMixin): self.parent.macNorth.get(), self.parent.ipNorth.get(), dest_port, - source_port + source_port, ) return udp_header_table def on_init(self) -> None: - """ Initialize the camera, set channel values""" + """Initialize the camera, set channel values""" # ToDo: Not sure if it's a good idea to change camera settings upon # ophyd device startup, i.e. each deviceserver restart. self._init_gigafrost() self.parent._initialized = True def _init_gigafrost(self) -> None: - """ Initialize the camera, set channel values""" + """Initialize the camera, set channel values""" # Stop acquisition self.parent.cmdStartCamera.set(0).wait() @@ -142,7 +145,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin): return super().on_init() def on_stage(self) -> None: - """ Configuration and staging + """Configuration and staging In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'. I.e. they need to know which parameters are relevant for them at each scan. @@ -158,25 +161,23 @@ class GigaFrostCameraMixin(CustomDetectorMixin): logger.warning( f"[{self.parent.name}] Ophyd device havent ran the initialization sequence," "IOC might be in unknown configuration." - ) + ) # Fish out our configuration from scaninfo (via explicit or generic addressing) scanparam = self.parent.scaninfo.scan_msg.info - alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name - # logger.warning(f"[{alias}] Scan parameters:\n{scanparam}") d = {} - if 'kwargs' in scanparam: - scanargs = scanparam['kwargs'] - if 'image_width' in scanargs and scanargs['image_width']!=None: - d['image_width'] = scanargs['image_width'] - if 'image_height' in scanargs and scanargs['image_height']!=None: - d['image_height'] = scanargs['image_height'] - if 'exp_time' in scanargs and scanargs['exp_time']!=None: - d['exposure_time_ms'] = scanargs['exp_time'] - if 'exp_burst' in scanargs and scanargs['exp_burst']!=None: - d['exposure_num_burst'] = scanargs['exp_burst'] - if 'acq_mode' in scanargs and scanargs['acq_mode']!=None: - d['acq_mode'] = scanargs['acq_mode'] + if "kwargs" in scanparam: + scanargs = scanparam["kwargs"] + if "image_width" in scanargs and scanargs["image_width"] is not None: + d["image_width"] = scanargs["image_width"] + if "image_height" in scanargs and scanargs["image_height"] is not None: + d["image_height"] = scanargs["image_height"] + if "exp_time" in scanargs and scanargs["exp_time"] is not None: + d["exposure_time_ms"] = scanargs["exp_time"] + if "exp_burst" in scanargs and scanargs["exp_burst"] is not None: + d["exposure_num_burst"] = scanargs["exp_burst"] + if "acq_mode" in scanargs and scanargs["acq_mode"] is not None: + d["acq_mode"] = scanargs["acq_mode"] # elif self.parent.scaninfo.scan_type == "step": # d['acq_mode'] = "default" @@ -217,13 +218,17 @@ class GigaFrostCameraMixin(CustomDetectorMixin): Specify actions to be executed upon receiving trigger signal. Return a DeviceStatus object or None """ - if self.parent.infoBusyFlag.get() in (0, 'IDLE'): - raise RuntimeError('GigaFrost must be running before triggering') + if self.parent.infoBusyFlag.get() in (0, "IDLE"): + raise RuntimeError("GigaFrost must be running before triggering") logger.warning(f"[{self.parent.name}] SW triggering gigafrost") # Soft triggering based on operation mode - if self.parent.autoSoftEnable.get() and self.parent.trigger_mode == 'auto' and self.parent.enable_mode == 'soft': + if ( + self.parent.autoSoftEnable.get() + and self.parent.trigger_mode == "auto" + and self.parent.enable_mode == "soft" + ): # BEC teststand operation mode: posedge of SoftEnable if Started self.parent.cmdSoftEnable.set(0).wait() self.parent.cmdSoftEnable.set(1).wait() @@ -254,6 +259,7 @@ class GigaFrostCamera(PSIDetectorBase): ---------- FRAMERATE : Ignored in soft trigger mode, period becomes 2xExposure time """ + # pylint: disable=too-many-instance-attributes custom_prepare_cls = GigaFrostCameraMixin @@ -267,12 +273,13 @@ class GigaFrostCamera(PSIDetectorBase): cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted) cfgAcqMode = Component(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config) - array_size = DynamicDeviceComponent({ - "array_size_x": (EpicsSignalRO, "ROIX", {'auto_monitor': True}), - "array_size_y": (EpicsSignalRO, "ROIY", {'auto_monitor': True}), - }, doc="Size of the array in the XY dimensions") - - + array_size = DynamicDeviceComponent( + { + "array_size_x": (EpicsSignalRO, "ROIX", {"auto_monitor": True}), + "array_size_y": (EpicsSignalRO, "ROIY", {"auto_monitor": True}), + }, + doc="Size of the array in the XY dimensions", + ) # UDP header cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config) @@ -282,24 +289,26 @@ class GigaFrostCamera(PSIDetectorBase): # Standard camera configs cfgExposure = Component( - EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config) + EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config + ) cfgFramerate = Component( - EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgRoiX = Component( - EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgRoiY = Component( - EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config) + EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config + ) + cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config) cfgScanId = Component( - EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config) + EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config + ) cfgCntNum = Component( - EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config) + EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config + ) cfgCorrMode = Component( - EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config) + EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config + ) # Software signals cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True) - cmdSoftTrigger = Component( - EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted) + cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted) cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True) cfgAcqMode = Component(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config) @@ -403,11 +412,7 @@ class GigaFrostCamera(PSIDetectorBase): kind=Kind.config, ) cfgCntEndBit = Component( - EpicsSignal, - "CNT_ENDBIT_RBV", - write_pv="CNT_ENDBIT", - put_complete=True, - kind=Kind.config + EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config ) # Line swap selection @@ -457,32 +462,41 @@ class GigaFrostCamera(PSIDetectorBase): ): # Ugly hack to pass values before on_init() self._signals_to_be_set = {} - self._signals_to_be_set['auto_soft_enable'] = auto_soft_enable - self._signals_to_be_set['backend_url'] = backend_url + self._signals_to_be_set["auto_soft_enable"] = auto_soft_enable + self._signals_to_be_set["backend_url"] = backend_url # super() will call the mixin class - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + device_manager=device_manager, + **kwargs, + ) def _init(self): """Ugly hack: values must be set before on_init() is called""" # Additional parameters self.autoSoftEnable._metadata["write_access"] = False self.backendUrl._metadata["write_access"] = False - self.autoSoftEnable.put(self._signals_to_be_set['auto_soft_enable'], force=True) - self.backendUrl.put(self._signals_to_be_set['backend_url'], force=True) + self.autoSoftEnable.put(self._signals_to_be_set["auto_soft_enable"], force=True) + self.backendUrl.put(self._signals_to_be_set["backend_url"], force=True) return super()._init() def initialize(self): - """ Initialization in separate command""" + """Initialization in separate command""" self.custom_prepare._init_gigafrost() self._initialized = True def trigger(self) -> DeviceStatus: - """ Sends a software trigger to GigaFrost""" + """Sends a software trigger to GigaFrost""" super().trigger() # There's no status readback from the camera, so we just wait - sleep_time = self.cfgExposure.value*self.cfgCntNum.value*0.001+0.2 + sleep_time = self.cfgExposure.value * self.cfgCntNum.value * 0.001 + 0.2 sleep(sleep_time) return DeviceStatus(self, done=True, success=True, settle_time=sleep_time) @@ -527,40 +541,40 @@ class GigaFrostCamera(PSIDetectorBase): # If Bluesky style configure if d is not None: # Commonly changed settings - if 'exposure_num_burst' in d: - self.cfgCntNum.set(d['exposure_num_burst']).wait() - if 'exposure_time_ms' in d: - self.cfgExposure.set(d['exposure_time_ms']).wait() - if 'exposure_period_ms' in d: - self.cfgFramerate.set(d['exposure_period_ms']).wait() - if 'image_width' in d: - if d['image_width']%48 !=0: + if "exposure_num_burst" in d: + self.cfgCntNum.set(d["exposure_num_burst"]).wait() + if "exposure_time_ms" in d: + self.cfgExposure.set(d["exposure_time_ms"]).wait() + if "exposure_period_ms" in d: + self.cfgFramerate.set(d["exposure_period_ms"]).wait() + if "image_width" in d: + if d["image_width"] % 48 != 0: raise RuntimeError(f"[{self.name}] image_width must be divisible by 48") - self.cfgRoiX.set(d['image_width']).wait() - if 'image_height' in d: - if d['image_height']%16 !=0: + self.cfgRoiX.set(d["image_width"]).wait() + if "image_height" in d: + if d["image_height"] % 16 != 0: raise RuntimeError(f"[{self.name}] image_height must be divisible by 16") - self.cfgRoiY.set(d['image_height']).wait() + self.cfgRoiY.set(d["image_height"]).wait() # Dont change these - scanid = d.get('scanid', 0) - correction_mode = d.get('correction_mode', 5) + scanid = d.get("scanid", 0) + correction_mode = d.get("correction_mode", 5) self.cfgScanId.set(scanid).wait() self.cfgCorrMode.set(correction_mode).wait() - if 'acq_mode' in d: - self.set_acquisition_mode(d['acq_mode']) + if "acq_mode" in d: + self.set_acquisition_mode(d["acq_mode"]) # Commit parameters self.cmdSetParam.set(1).wait() def bluestage(self): - """ Bluesky style stage""" + """Bluesky style stage""" # Switch to acquiring self.cmdStartCamera.set(1).wait() def set_acquisition_mode(self, acq_mode): - """ Set acquisition mode - + """Set acquisition mode + Utility function to quickly select between pre-configured and tested acquisition modes. @@ -577,7 +591,7 @@ class GigaFrostCamera(PSIDetectorBase): self.cfgEnableScheme.set(0).wait() # Set modes - #self.cmdSoftEnable.set(0).wait() + # self.cmdSoftEnable.set(0).wait() self.enable_mode = "soft" self.trigger_mode = "auto" self.exposure_mode = "timer" @@ -834,11 +848,11 @@ class GigaFrostCamera(PSIDetectorBase): The GigaFRoST enable mode. Valid arguments are: * 'soft': - The GigaFRoST enable signal is supplied through a software + The GigaFRoST enable signal is supplied through a software signal * 'external': - The GigaFRoST enable signal is supplied through an external TTL - gating signal from the rotaiton stage or some other control + The GigaFRoST enable signal is supplied through an external TTL + gating signal from the rotaiton stage or some other control unit * 'soft+ext': The GigaFRoST enable signal can be supplied either via the @@ -851,9 +865,7 @@ class GigaFrostCamera(PSIDetectorBase): """ if mode not in const.gf_valid_enable_modes: - raise ValueError( - "Invalid enable mode! Valid modes are:\n{const.gf_valid_enable_modes}" - ) + raise ValueError("Invalid enable mode! Valid modes are:\n{const.gf_valid_enable_modes}") if mode == "soft": self.cfgEnableExt.set(0).wait() diff --git a/tomcat_bec/devices/gigafrost/pco_datasink.py b/tomcat_bec/devices/gigafrost/pco_datasink.py index b76defb..e64f3ac 100644 --- a/tomcat_bec/devices/gigafrost/pco_datasink.py +++ b/tomcat_bec/devices/gigafrost/pco_datasink.py @@ -6,22 +6,19 @@ Created on Thu Jun 27 17:28:43 2024 @author: mohacsi_i """ -import json -import enum from time import sleep, time from threading import Thread import zmq -import numpy as np -from ophyd import Device, Signal, Component, Kind, DeviceStatus +from ophyd import Device, Signal, Component, Kind from ophyd_devices.interfaces.base_classes.psi_detector_base import ( CustomDetectorMixin, PSIDetectorBase, ) from bec_lib import bec_logger -logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b'' +logger = bec_logger.logger +ZMQ_TOPIC_FILTER = b"" class PcoTestConsumerMixin(CustomDetectorMixin): @@ -29,6 +26,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin): Parent class: CustomDetectorMixin """ + # pylint: disable=protected-access def on_stage(self): """Start listening for preview data stream""" if self.parent._mon is not None: @@ -72,29 +70,27 @@ class PcoTestConsumerMixin(CustomDetectorMixin): t_elapsed = t_curr - t_last if t_elapsed < self.parent.throttle.get(): continue - """ - # Unpack the Array V1 reply to metadata and array data - meta, data = r - print(meta) + # # Unpack the Array V1 reply to metadata and array data + # meta, data = r + # print(meta) - # Update image and update subscribers - header = json.loads(meta) - if header["type"] == "uint16": - image = np.frombuffer(data, dtype=np.uint16) - if image.size != np.prod(header['shape']): - err = f"Unexpected array size of {image.size} for header: {header}" - raise ValueError(err) - image = image.reshape(header['shape']) + # # Update image and update subscribers + # header = json.loads(meta) + # if header["type"] == "uint16": + # image = np.frombuffer(data, dtype=np.uint16) + # if image.size != np.prod(header['shape']): + # err = f"Unexpected array size of {image.size} for header: {header}" + # raise ValueError(err) + # image = image.reshape(header['shape']) - # Update image and update subscribers - self.parent.frame.put(header['frame'], force=True) - self.parent.image_shape.put(header['shape'], force=True) - self.parent.image.put(image, force=True) - self.parent._last_image = image - self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image) - """ + # # Update image and update subscribers + # self.parent.frame.put(header['frame'], force=True) + # self.parent.image_shape.put(header['shape'], force=True) + # self.parent.image.put(image, force=True) + # self.parent._last_image = image + # self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image) t_last = t_curr - #logger.info( + # logger.info( # f"[{self.parent.name}] Updated frame {header['frame']}\t" # f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" # ) @@ -110,7 +106,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin): finally: try: self.parent._socket.disconnect() - except: + except RuntimeError: pass self.parent._mon = None logger.info(f"[{self.parent.name}]\tDetaching monitor") @@ -126,6 +122,7 @@ class PcoTestConsumer(PSIDetectorBase): You can add a preview widget to the dock by: cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') """ + # Subscriptions for plotting image USER_ACCESS = ["get_last_image"] SUB_MONITOR = "device_monitor_2d" @@ -171,8 +168,7 @@ class PcoTestConsumer(PSIDetectorBase): self._socket.connect(self.url.get()) def disconnect(self): - """Disconnect - """ + """Disconnect""" try: if self._socket is not None: self._socket.disconnect(self.url.get()) @@ -181,7 +177,6 @@ class PcoTestConsumer(PSIDetectorBase): finally: self._socket = None - def get_image(self): return self._last_image diff --git a/tomcat_bec/devices/gigafrost/pcoedgecamera.py b/tomcat_bec/devices/gigafrost/pcoedgecamera.py index 32899e0..0f046a6 100644 --- a/tomcat_bec/devices/gigafrost/pcoedgecamera.py +++ b/tomcat_bec/devices/gigafrost/pcoedgecamera.py @@ -4,13 +4,12 @@ Created on Wed Dec 6 11:33:54 2023 @author: mohacsi_i """ - +import time from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import SubscriptionStatus, DeviceStatus -import time -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase +from ophyd_devices import BECDeviceBase from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin as CustomDeviceMixin, + CustomDetectorMixin as CustomPrepare, ) try: @@ -23,12 +22,12 @@ except ModuleNotFoundError: logger = logging.getLogger("PcoEdgeCam") -class PcoEdgeCameraMixin(CustomDeviceMixin): +class PcoEdgeCameraMixin(CustomPrepare): """Mixin class to setup the Helge camera bae class. This class will be called by the custom_prepare_cls attribute of the detector class. """ - + # pylint: disable=protected-access def on_stage(self) -> None: """Configure and arm PCO.Edge camera for acquisition""" @@ -91,7 +90,6 @@ class PcoEdgeCameraMixin(CustomDeviceMixin): NOTE: Maciej confirmed that sparse data is no problem to the stdDAQ. TODO: Optimize data transfer to launch at end and check completion at the beginning. """ - logger.warning(f"triggering PCO") # Ensure that previous data transfer finished # def sentIt(*args, value, timestamp, **kwargs): # return value==0 @@ -99,11 +97,11 @@ class PcoEdgeCameraMixin(CustomDeviceMixin): # status.wait() # Not sure if it always sends the first batch of images or the newest - def didWeReset(*args, old_value, value, timestamp, **kwargs): + def wait_bufferreset(*, old_value, value, timestamp, **_): return (value < old_value) or (value == 0) self.parent.buffer_clear.set(1).wait() - status = SubscriptionStatus(self.parent.buffer_used, didWeReset, timeout=5) + status = SubscriptionStatus(self.parent.buffer_used, wait_bufferreset, timeout=5) status.wait() t_expected = ( @@ -111,13 +109,13 @@ class PcoEdgeCameraMixin(CustomDeviceMixin): ) * self.parent.file_savestop.get() # Wait until the buffer fills up with enough images - def areWeDoneYet(*args, old_value, value, timestamp, **kwargs): + def wait_acquisition(*, value, timestamp, **_): num_target = self.parent.file_savestop.get() # logger.warning(f"{value} of {num_target}") return bool(value >= num_target) - + max_wait = max(5, 5 * t_expected) status = SubscriptionStatus( - self.parent.buffer_used, areWeDoneYet, timeout=max(5, 5 * t_expected), settle_time=0.2 + self.parent.buffer_used, wait_acquisition, timeout=max_wait, settle_time=0.2 ) status.wait() @@ -130,19 +128,18 @@ class PcoEdgeCameraMixin(CustomDeviceMixin): # against values from the previous cycle, i.e. pass automatically. t_start = time.time() - def haveWeSentIt(*args, old_value, value, timestamp, **kwargs): + def wait_sending(*args, old_value, value, timestamp, **kwargs): t_elapsed = timestamp - t_start # logger.warning(f"{old_value}\t{value}\t{t_elapsed}") return old_value == 1 and value == 0 and t_elapsed > 0 status = SubscriptionStatus( - self.parent.file_savebusy, haveWeSentIt, timeout=120, settle_time=0.2 + self.parent.file_savebusy, wait_sending, timeout=120, settle_time=0.2 ) status.wait() - logger.warning(f"done PCO") -class HelgeCameraBase(PSIDeviceBase): +class HelgeCameraBase(BECDeviceBase): """Ophyd baseclass for Helge camera IOCs This class provides wrappers for Helge's camera IOCs around SwissFEL and @@ -161,19 +158,21 @@ class HelgeCameraBase(PSIDeviceBase): UPDATE: Data sending operation modes - Switch to ZMQ streaming by setting FILEFORMAT to ZEROMQ, set SAVESTART and SAVESTOP to select a ROI of images and start file transfer with FTRANSFER. - The ZMQ connection streams out the data in PUSH-PULL mode, i.e. it needs incoming connection. + - Switch to ZMQ streaming by setting FILEFORMAT to ZEROMQ + - Set SAVESTART and SAVESTOP to select a ROI of image indices + - Start file transfer with FTRANSFER. + The ZMQ connection operates in PUSH-PULL mode, i.e. it needs incoming connection. STOREMODE sets the acquisition mode: if STOREMODE == Recorder - Fills up the buffer with images and SAVESTART and SAVESTOP selects a ROI of images to be streamed + Fills up the buffer with images. Here SAVESTART and SAVESTOP selects a ROI + of image indices to be streamed out (i.e. maximum buffer_size number of images) if STOREMODE == FIFO buffer - Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely) - - Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview - - + Continously streams out data using the buffer as a FIFO queue. + Here SAVESTART and SAVESTOP selects a ROI of image indices to be streamed continously + (i.e. a large SAVESTOP streams indefinitely). Note that in FIFO mode buffer reads are + destructive. to prevent this, we don't have EPICS preview """ # ######################################################################## @@ -274,7 +273,7 @@ class HelgeCameraBase(PSIDeviceBase): @state.setter def state(self): - raise ReadOnlyError("State is a ReadOnly property") + raise RuntimeError("State is a ReadOnly property") def configure(self, d: dict = {}) -> tuple: """Configure the base Helge camera device @@ -323,11 +322,11 @@ class HelgeCameraBase(PSIDeviceBase): # So we need a 'negedge' on SET_PARAM self.camSetParam.set(1).wait() - def fallingEdge(*args, old_value, value, timestamp, **kwargs): + def negedge(*, old_value, value, timestamp, **_): return bool(old_value and not value) # Subscribe and wait for update - status = SubscriptionStatus(self.camSetParam, fallingEdge, timeout=5, settle_time=0.5) + status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5) status.wait() def bluestage(self): @@ -341,24 +340,23 @@ class HelgeCameraBase(PSIDeviceBase): self.bufferStoreMode.get() in ("Recorder", 0) and self.file_savestop.get() > self.buffer_size.get() ): - self.logger.warning( - "You're about to send some empty images, {self.file_savestop.get()} is above buffer size" + logger.warning( + f"You'll send empty images, {self.file_savestop.get()} is above buffer size" ) # Start the acquisition (this sets parameers and starts acquisition) self.camStatusCmd.set("Running").wait() # Subscribe and wait for update - def isRunning(*args, old_value, value, timestamp, **kwargs): + def is_running(*, value, timestamp, **_): return bool(value == 6) - status = SubscriptionStatus(self.camStatusCode, isRunning, timeout=5, settle_time=0.2) + status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2) status.wait() def blueunstage(self): """Bluesky style unstage: stop the detector""" self.camStatusCmd.set("Idle").wait() - self.custom_prepare.stop_monitor = True # Data streaming is stopped by setting the max index to 0 # FIXME: This might interrupt data transfer @@ -371,13 +369,6 @@ class HelgeCameraBase(PSIDeviceBase): """ self.file_transfer.set(1).wait() - # def complete(self): - # """ Wait until the images have been sent""" - # def areWeSending(*args, value, timestamp, **kwargs): - # return not bool(value) - # status = SubscriptionStatus(self.file_savebusy, haveWeSentIt, timeout=None, settle_time=0.2) - # return status - class PcoEdge5M(HelgeCameraBase): """Ophyd baseclass for PCO.Edge cameras @@ -402,7 +393,8 @@ class PcoEdge5M(HelgeCameraBase): acqMode = Component(EpicsSignalRO, "ACQMODE", auto_monitor=True, kind=Kind.config) acqDelay = Component(EpicsSignalRO, "DELAY", auto_monitor=True, kind=Kind.config) acqTriggerEna = Component(EpicsSignalRO, "TRIGGER", auto_monitor=True, kind=Kind.config) - # acqTriggerSource = Component(EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config) + # acqTriggerSource = Component( + # EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config) # acqTriggerEdge = Component(EpicsSignalRO, "TRIGGEREDGE", auto_monitor=True, kind=Kind.config) # ######################################################################## @@ -465,9 +457,9 @@ class PcoEdge5M(HelgeCameraBase): self.pxRoiY_lo.set(2160 / 2 - height / 2).wait() self.pxRoiY_hi.set(2160 / 2 + height / 2).wait() if "image_binx" in d and d["image_binx"] is not None: - self.pxBinX.set(d["image_binx"]).wait() + self.bin_x.set(d["image_binx"]).wait() if "image_biny" in d and d["image_biny"] is not None: - self.pxBinY.set(d["image_biny"]).wait() + self.bin_y.set(d["image_biny"]).wait() # Call super() to commit the changes super().configure(d) @@ -477,5 +469,5 @@ class PcoEdge5M(HelgeCameraBase): if __name__ == "__main__": # Drive data collection - cam = PcoEdgeBase("X02DA-CCDCAM2:", name="mcpcam") + cam = PcoEdge5M("X02DA-CCDCAM2:", name="mcpcam") cam.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/stddaq_client.py b/tomcat_bec/devices/gigafrost/stddaq_client.py index fc4603f..34006b2 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_client.py +++ b/tomcat_bec/devices/gigafrost/stddaq_client.py @@ -12,25 +12,26 @@ from threading import Thread import requests import os -from ophyd import Device, Signal, Component, Kind, Staged +from ophyd import Signal, Component, Kind from ophyd.status import SubscriptionStatus -from ophyd.flyers import FlyerInterface from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase -from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin as CustomDeviceMixin, +) from bec_lib import bec_logger -from bec_lib.file_utils import FileWriter + logger = bec_logger.logger class StdDaqMixin(CustomDeviceMixin): - # parent : StdDaqClient + # pylint: disable=protected-access _mon = None def on_stage(self) -> None: - """ Configuration and staging + """Configuration and staging In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'. I.e. they need to know which parameters are relevant for them at each scan. @@ -40,30 +41,30 @@ class StdDaqMixin(CustomDeviceMixin): # Fish out our configuration from scaninfo (via explicit or generic addressing) # NOTE: Scans don't have to fully configure the device d = {} - if 'kwargs' in self.parent.scaninfo.scan_msg.info: - scanargs = self.parent.scaninfo.scan_msg.info['kwargs'] - if 'image_width' in scanargs and scanargs['image_width'] != None: - d['image_width'] = scanargs['image_width'] - if 'image_height' in scanargs and scanargs['image_height'] != None: - d['image_height'] = scanargs['image_height'] - if 'nr_writers' in scanargs and scanargs['nr_writers'] != None: - d['nr_writers'] = scanargs['nr_writers'] - if 'file_path' in scanargs and scanargs['file_path']!=None: - self.parent.file_path.set(scanargs['file_path'].replace('data','gpfs')).wait() - print(scanargs['file_path']) - if os.path.isdir(scanargs['file_path']): + if "kwargs" in self.parent.scaninfo.scan_msg.info: + scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] + if "image_width" in scanargs and scanargs["image_width"] is not None: + d["image_width"] = scanargs["image_width"] + if "image_height" in scanargs and scanargs["image_height"] is not None: + d["image_height"] = scanargs["image_height"] + if "nr_writers" in scanargs and scanargs["nr_writers"] is not None: + d["nr_writers"] = scanargs["nr_writers"] + if "file_path" in scanargs and scanargs["file_path"] is not None: + self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait() + print(scanargs["file_path"]) + if os.path.isdir(scanargs["file_path"]): print("isdir") pass else: print("creating") try: - os.makedirs(scanargs['file_path'], 0o777) - os.system('chmod -R 777 ' + scanargs['base_path']) + os.makedirs(scanargs["file_path"], 0o777) + os.system("chmod -R 777 " + scanargs["base_path"]) except: - print('Problem with creating folder') - if 'file_prefix' in scanargs and scanargs['file_prefix']!=None: - print(scanargs['file_prefix']) - self.parent.file_prefix.set(scanargs['file_prefix']).wait() + print("Problem with creating folder") + if "file_prefix" in scanargs and scanargs["file_prefix"] != None: + print(scanargs["file_prefix"]) + self.parent.file_prefix.set(scanargs["file_prefix"]).wait() if "daq_num_points" in scanargs: d["num_points_total"] = scanargs["daq_num_points"] @@ -71,13 +72,13 @@ class StdDaqMixin(CustomDeviceMixin): # Try to figure out number of points num_points = 1 points_valid = False - if "steps" in scanargs and scanargs['steps'] is not None: + if "steps" in scanargs and scanargs["steps"] is not None: num_points *= scanargs["steps"] points_valid = True - if "exp_burst" in scanargs and scanargs['exp_burst'] is not None: + if "exp_burst" in scanargs and scanargs["exp_burst"] is not None: num_points *= scanargs["exp_burst"] points_valid = True - if "repeats" in scanargs and scanargs['repeats'] is not None: + if "repeats" in scanargs and scanargs["repeats"] is not None: num_points *= scanargs["repeats"] points_valid = True if points_valid: @@ -98,19 +99,17 @@ class StdDaqMixin(CustomDeviceMixin): self._mon.start() def on_unstage(self): - """ Stop a running acquisition and close connection - """ + """Stop a running acquisition and close connection""" print("Creating virtual dataset") self.parent.create_virtual_dataset() self.parent.blueunstage() def on_stop(self): - """ Stop a running acquisition and close connection - """ + """Stop a running acquisition and close connection""" self.parent.blueunstage() def monitor(self) -> None: - """ Monitor status messages while connection is open. This will block the reply monitoring + """Monitor status messages while connection is open. This will block the reply monitoring to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing acquisition means StdDAQ will close connection, so there's no idle state polling. """ @@ -144,19 +143,31 @@ class StdDaqClient(PSIDeviceBase): daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000") ``` """ + # pylint: disable=too-many-instance-attributes custom_prepare_cls = StdDaqMixin - USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"] + USER_ACCESS = [ + "set_daq_config", + "get_daq_config", + "nuke", + "connect", + "message", + "state", + "bluestage", + "blueunstage", + ] _wsclient = None # Status attributes - ws_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) - runstatus = Component(Signal, value="unknown", kind=Kind.normal, metadata={'write_access': False}) + ws_url = Component(Signal, kind=Kind.config, metadata={"write_access": False}) + runstatus = Component( + Signal, value="unknown", kind=Kind.normal, metadata={"write_access": False} + ) num_images = Component(Signal, value=10000, kind=Kind.config) file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) file_prefix = Component(Signal, value="file", kind=Kind.config) - # Configuration attributes - rest_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) + # Configuration attributes + rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False}) cfg_detector_name = Component(Signal, kind=Kind.config) cfg_detector_type = Component(Signal, kind=Kind.config) cfg_bit_depth = Component(Signal, kind=Kind.config) @@ -176,10 +187,19 @@ class StdDaqClient(PSIDeviceBase): device_manager=None, ws_url: str = "ws://localhost:8080", rest_url: str = "http://localhost:5000", - data_source_name = None, + data_source_name=None, **kwargs, ) -> None: - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + device_manager=device_manager, + **kwargs, + ) self.ws_url.set(ws_url, force=True).wait() self.rest_url.set(rest_url, force=True).wait() self.data_source_name = data_source_name @@ -264,47 +284,49 @@ class StdDaqClient(PSIDeviceBase): """ # Configuration parameters - if 'image_width' in d and d['image_width']!=None: - self.cfg_pixel_width.set(d['image_width']).wait() - if 'image_height' in d and d['image_height']!=None: - self.cfg_pixel_height.set(d['image_height']).wait() - if 'bit_depth' in d: - self.cfg_bit_depth.set(d['bit_depth']).wait() - if 'nr_writers' in d and d['nr_writers']!=None: - self.cfg_nr_writers.set(d['nr_writers']).wait() + if "image_width" in d and d["image_width"] != None: + self.cfg_pixel_width.set(d["image_width"]).wait() + if "image_height" in d and d["image_height"] != None: + self.cfg_pixel_height.set(d["image_height"]).wait() + if "bit_depth" in d: + self.cfg_bit_depth.set(d["bit_depth"]).wait() + if "nr_writers" in d and d["nr_writers"] != None: + self.cfg_nr_writers.set(d["nr_writers"]).wait() # Run parameters - if 'num_points_total' in d: - self.num_images.set(d['num_points_total']).wait() + if "num_points_total" in d: + self.num_images.set(d["num_points_total"]).wait() # Restart the DAQ if resolution changed cfg = self.get_daq_config() - if cfg['image_pixel_height'] != self.cfg_pixel_height.get() or \ - cfg['image_pixel_width'] != self.cfg_pixel_width.get() or \ - cfg['bit_depth'] != self.cfg_bit_depth.get() or \ - cfg['number_of_writers'] != self.cfg_nr_writers.get(): - + if ( + cfg["image_pixel_height"] != self.cfg_pixel_height.get() + or cfg["image_pixel_width"] != self.cfg_pixel_width.get() + or cfg["bit_depth"] != self.cfg_bit_depth.get() + or cfg["number_of_writers"] != self.cfg_nr_writers.get() + ): + # Stop if current status is not idle if self.state() != "idle": logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") # Update retrieved config - cfg['image_pixel_height'] = int(self.cfg_pixel_height.get()) - cfg['image_pixel_width'] = int(self.cfg_pixel_width.get()) - cfg['bit_depth'] = int(self.cfg_bit_depth.get()) - cfg['number_of_writers'] = int(self.cfg_nr_writers.get()) + cfg["image_pixel_height"] = int(self.cfg_pixel_height.get()) + cfg["image_pixel_width"] = int(self.cfg_pixel_width.get()) + cfg["bit_depth"] = int(self.cfg_bit_depth.get()) + cfg["number_of_writers"] = int(self.cfg_nr_writers.get()) self.set_daq_config(cfg) sleep(1) self.get_daq_config(update=True) def bluestage(self): - """ Stages the stdDAQ + """Stages the stdDAQ - Opens a new connection to the stdDAQ, sends the start command with - the current configuration. It waits for the first reply and checks - it for obvious failures. + Opens a new connection to the stdDAQ, sends the start command with + the current configuration. It waits for the first reply and checks + it for obvious failures. """ # Can't stage into a running exposure - if self.state() != 'idle': + if self.state() != "idle": raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}") # Must make sure that image size matches the data source @@ -315,10 +337,13 @@ class StdDaqClient(PSIDeviceBase): daq_img_h = self.cfg_pixel_height.get() if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h): - raise RuntimeError(f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})") + raise RuntimeError( + f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})" + ) else: - logger.warning(f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})") - + logger.warning( + f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})" + ) file_path = self.file_path.get() num_images = self.num_images.get() @@ -327,7 +352,12 @@ class StdDaqClient(PSIDeviceBase): # New connection self._wsclient = self.connect() - message = {"command": "start", "path": file_path, "file_prefix": file_prefix, "n_image": num_images, } + message = { + "command": "start", + "path": file_path, + "file_prefix": file_prefix, + "n_image": num_images, + } reply = self.message(message) if reply is not None: @@ -338,8 +368,10 @@ class StdDaqClient(PSIDeviceBase): # Give it more time to reconfigure if reply["status"] in ("rejected"): # FIXME: running exposure is a nogo - if reply['reason'] == "driver is busy!": - raise RuntimeError(f"[{self.name}] Start stdDAQ command rejected: already running") + if reply["reason"] == "driver is busy!": + raise RuntimeError( + f"[{self.name}] Start stdDAQ command rejected: already running" + ) else: # Give it more time to consolidate sleep(1) @@ -348,16 +380,18 @@ class StdDaqClient(PSIDeviceBase): print(f"[{self.name}] Started stdDAQ in: {reply['status']}") return - raise RuntimeError(f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}") + raise RuntimeError( + f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}" + ) def blueunstage(self): - """ Unstages the stdDAQ + """Unstages the stdDAQ - Opens a new connection to the stdDAQ, sends the stop command and - waits for the idle state. + Opens a new connection to the stdDAQ, sends the stop command and + waits for the idle state. """ ii = 0 - while ii<10: + while ii < 10: # Stop the DAQ (will close connection) - reply is always "success" self._wsclient = self.connect() self.message({"command": "stop_all"}, wait_reply=False) @@ -371,7 +405,7 @@ class StdDaqClient(PSIDeviceBase): if reply is not None: logger.info(f"[{self.name}] DAQ status reply: {reply}") reply = json.loads(reply) - + if reply["status"] in ("idle", "error"): # Only 'idle' state accepted print(f"DAQ stopped on try {ii}") @@ -388,6 +422,7 @@ class StdDaqClient(PSIDeviceBase): # Bluesky flyer interface def complete(self) -> SubscriptionStatus: """Wait for current run. Must end in status 'file_saved'.""" + def is_running(*args, value, timestamp, **kwargs): result = value in ["idle", "file_saved", "error"] return result @@ -396,40 +431,35 @@ class StdDaqClient(PSIDeviceBase): return status def get_daq_config(self, update=False) -> dict: - """Read the current configuration from the DAQ - """ - r = requests.get( - self.rest_url.get() + '/api/config/get', - params={'user': "ioc"}, - timeout=2) + """Read the current configuration from the DAQ""" + r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2) if r.status_code != 200: raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") cfg = r.json() if update: - self.cfg_detector_name.set(cfg['detector_name']).wait() - self.cfg_detector_type.set(cfg['detector_type']).wait() - self.cfg_bit_depth.set(cfg['bit_depth']).wait() - self.cfg_pixel_height.set(cfg['image_pixel_height']).wait() - self.cfg_pixel_width.set(cfg['image_pixel_width']).wait() - self.cfg_nr_writers.set(cfg['number_of_writers']).wait() + self.cfg_detector_name.set(cfg["detector_name"]).wait() + self.cfg_detector_type.set(cfg["detector_type"]).wait() + self.cfg_bit_depth.set(cfg["bit_depth"]).wait() + self.cfg_pixel_height.set(cfg["image_pixel_height"]).wait() + self.cfg_pixel_width.set(cfg["image_pixel_width"]).wait() + self.cfg_nr_writers.set(cfg["number_of_writers"]).wait() return cfg def set_daq_config(self, config, settle_time=1): - """Write a full configuration to the DAQ - """ - url = self.rest_url.get() + '/api/config/set' + """Write a full configuration to the DAQ""" + url = self.rest_url.get() + "/api/config/set" r = requests.post( - url, - params={"user": "ioc"}, - json=config, - timeout=2, - headers={"Content-Type": "application/json"} - ) + url, + params={"user": "ioc"}, + json=config, + timeout=2, + headers={"Content-Type": "application/json"}, + ) if r.status_code != 200: raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") # Wait for service to restart (and connect to make sure) - #sleep(settle_time) + # sleep(settle_time) self.connect() return r.json() @@ -437,20 +467,24 @@ class StdDaqClient(PSIDeviceBase): """Combine the stddaq written files in a given folder in an interleaved h5 virtual dataset """ - url = self.rest_url.get() + '/api/h5/create_interleaved_vds' + url = self.rest_url.get() + "/api/h5/create_interleaved_vds" file_path = self.file_path.get() file_prefix = self.file_prefix.get() - + r = requests.post( url, - params = {'user': 'ioc'}, - json = {'base_path': file_path, 'file_prefix': file_prefix, 'output_file': file_prefix.rstrip('_') + '.h5'}, - timeout = 2, - headers = {'Content-type': 'application/json'} + params={"user": "ioc"}, + json={ + "base_path": file_path, + "file_prefix": file_prefix, + "output_file": file_prefix.rstrip("_") + ".h5", + }, + timeout=2, + headers={"Content-type": "application/json"}, ) def nuke(self, restarttime=5): - """ Reconfigures the stdDAQ to restart the services. This causes + """Reconfigures the stdDAQ to restart the services. This causes systemd to kill the current DAQ service and restart it with the same configuration. Which might corrupt the currently written file... """ @@ -459,18 +493,20 @@ class StdDaqClient(PSIDeviceBase): sleep(restarttime) def state(self) -> str | None: - """ Querry the current system status""" + """Querry the current system status""" try: wsclient = self.connect() - wsclient.send(json.dumps({'command': 'status'})) + wsclient.send(json.dumps({"command": "status"})) r = wsclient.recv(timeout=1) r = json.loads(r) - return r['status'] + return r["status"] except ConnectionRefusedError: raise # Automatically connect to microXAS testbench if directly invoked if __name__ == "__main__": - daq = StdDaqClient(name="daq", ws_url="ws://sls-daq-001:8080", rest_url="http://sls-daq-001:5000") + daq = StdDaqClient( + name="daq", ws_url="ws://sls-daq-001:8080", rest_url="http://sls-daq-001:5000" + ) daq.wait_for_connection()