Linting to 9.22

This commit is contained in:
gac-x05la
2025-02-14 13:34:02 +01:00
parent 163ef3c7a5
commit b8dcda1696
4 changed files with 297 additions and 262 deletions

View File

@@ -26,9 +26,11 @@ except ModuleNotFoundError:
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("GfCam")
@@ -38,8 +40,9 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
This class will be called by the custom_prepare_cls attribute of the
detector class.
"""
# pylint: disable=protected-access
def _define_backend_ip(self):
""" Select backend IP address for UDP stream"""
"""Select backend IP address for UDP stream"""
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
@@ -48,7 +51,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
raise RuntimeError(f"Backend {self.parent.backendUrl.get()} not recognized.")
def _define_backend_mac(self):
""" Select backend MAC address for UDP stream"""
"""Select backend MAC address for UDP stream"""
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
@@ -74,7 +77,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
self.parent.macSouth.get(),
self.parent.ipSouth.get(),
dest_port,
source_port
source_port,
)
else:
extend_header_table(
@@ -82,20 +85,20 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
self.parent.macNorth.get(),
self.parent.ipNorth.get(),
dest_port,
source_port
source_port,
)
return udp_header_table
def on_init(self) -> None:
""" Initialize the camera, set channel values"""
"""Initialize the camera, set channel values"""
# ToDo: Not sure if it's a good idea to change camera settings upon
# ophyd device startup, i.e. each deviceserver restart.
self._init_gigafrost()
self.parent._initialized = True
def _init_gigafrost(self) -> None:
""" Initialize the camera, set channel values"""
"""Initialize the camera, set channel values"""
# Stop acquisition
self.parent.cmdStartCamera.set(0).wait()
@@ -142,7 +145,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
return super().on_init()
def on_stage(self) -> None:
""" Configuration and staging
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
@@ -158,25 +161,23 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
logger.warning(
f"[{self.parent.name}] Ophyd device havent ran the initialization sequence,"
"IOC might be in unknown configuration."
)
)
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
# logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
if 'image_width' in scanargs and scanargs['image_width']!=None:
d['image_width'] = scanargs['image_width']
if 'image_height' in scanargs and scanargs['image_height']!=None:
d['image_height'] = scanargs['image_height']
if 'exp_time' in scanargs and scanargs['exp_time']!=None:
d['exposure_time_ms'] = scanargs['exp_time']
if 'exp_burst' in scanargs and scanargs['exp_burst']!=None:
d['exposure_num_burst'] = scanargs['exp_burst']
if 'acq_mode' in scanargs and scanargs['acq_mode']!=None:
d['acq_mode'] = scanargs['acq_mode']
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if "image_width" in scanargs and scanargs["image_width"] is not None:
d["image_width"] = scanargs["image_width"]
if "image_height" in scanargs and scanargs["image_height"] is not None:
d["image_height"] = scanargs["image_height"]
if "exp_time" in scanargs and scanargs["exp_time"] is not None:
d["exposure_time_ms"] = scanargs["exp_time"]
if "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
d["exposure_num_burst"] = scanargs["exp_burst"]
if "acq_mode" in scanargs and scanargs["acq_mode"] is not None:
d["acq_mode"] = scanargs["acq_mode"]
# elif self.parent.scaninfo.scan_type == "step":
# d['acq_mode'] = "default"
@@ -217,13 +218,17 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
if self.parent.infoBusyFlag.get() in (0, 'IDLE'):
raise RuntimeError('GigaFrost must be running before triggering')
if self.parent.infoBusyFlag.get() in (0, "IDLE"):
raise RuntimeError("GigaFrost must be running before triggering")
logger.warning(f"[{self.parent.name}] SW triggering gigafrost")
# Soft triggering based on operation mode
if self.parent.autoSoftEnable.get() and self.parent.trigger_mode == 'auto' and self.parent.enable_mode == 'soft':
if (
self.parent.autoSoftEnable.get()
and self.parent.trigger_mode == "auto"
and self.parent.enable_mode == "soft"
):
# BEC teststand operation mode: posedge of SoftEnable if Started
self.parent.cmdSoftEnable.set(0).wait()
self.parent.cmdSoftEnable.set(1).wait()
@@ -254,6 +259,7 @@ class GigaFrostCamera(PSIDetectorBase):
----------
FRAMERATE : Ignored in soft trigger mode, period becomes 2xExposure time
"""
# pylint: disable=too-many-instance-attributes
custom_prepare_cls = GigaFrostCameraMixin
@@ -267,12 +273,13 @@ class GigaFrostCamera(PSIDetectorBase):
cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted)
cfgAcqMode = Component(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
array_size = DynamicDeviceComponent({
"array_size_x": (EpicsSignalRO, "ROIX", {'auto_monitor': True}),
"array_size_y": (EpicsSignalRO, "ROIY", {'auto_monitor': True}),
}, doc="Size of the array in the XY dimensions")
array_size = DynamicDeviceComponent(
{
"array_size_x": (EpicsSignalRO, "ROIX", {"auto_monitor": True}),
"array_size_y": (EpicsSignalRO, "ROIY", {"auto_monitor": True}),
},
doc="Size of the array in the XY dimensions",
)
# UDP header
cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
@@ -282,24 +289,26 @@ class GigaFrostCamera(PSIDetectorBase):
# Standard camera configs
cfgExposure = Component(
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config)
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config
)
cfgFramerate = Component(
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiX = Component(
EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiY = Component(
EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config)
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config
)
cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgScanId = Component(
EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config
)
cfgCntNum = Component(
EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config)
EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config
)
cfgCorrMode = Component(
EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config)
EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config
)
# Software signals
cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True)
cmdSoftTrigger = Component(
EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted)
cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted)
cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True)
cfgAcqMode = Component(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
@@ -403,11 +412,7 @@ class GigaFrostCamera(PSIDetectorBase):
kind=Kind.config,
)
cfgCntEndBit = Component(
EpicsSignal,
"CNT_ENDBIT_RBV",
write_pv="CNT_ENDBIT",
put_complete=True,
kind=Kind.config
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# Line swap selection
@@ -457,32 +462,41 @@ class GigaFrostCamera(PSIDetectorBase):
):
# Ugly hack to pass values before on_init()
self._signals_to_be_set = {}
self._signals_to_be_set['auto_soft_enable'] = auto_soft_enable
self._signals_to_be_set['backend_url'] = backend_url
self._signals_to_be_set["auto_soft_enable"] = auto_soft_enable
self._signals_to_be_set["backend_url"] = backend_url
# super() will call the mixin class
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
**kwargs,
)
def _init(self):
"""Ugly hack: values must be set before on_init() is called"""
# Additional parameters
self.autoSoftEnable._metadata["write_access"] = False
self.backendUrl._metadata["write_access"] = False
self.autoSoftEnable.put(self._signals_to_be_set['auto_soft_enable'], force=True)
self.backendUrl.put(self._signals_to_be_set['backend_url'], force=True)
self.autoSoftEnable.put(self._signals_to_be_set["auto_soft_enable"], force=True)
self.backendUrl.put(self._signals_to_be_set["backend_url"], force=True)
return super()._init()
def initialize(self):
""" Initialization in separate command"""
"""Initialization in separate command"""
self.custom_prepare._init_gigafrost()
self._initialized = True
def trigger(self) -> DeviceStatus:
""" Sends a software trigger to GigaFrost"""
"""Sends a software trigger to GigaFrost"""
super().trigger()
# There's no status readback from the camera, so we just wait
sleep_time = self.cfgExposure.value*self.cfgCntNum.value*0.001+0.2
sleep_time = self.cfgExposure.value * self.cfgCntNum.value * 0.001 + 0.2
sleep(sleep_time)
return DeviceStatus(self, done=True, success=True, settle_time=sleep_time)
@@ -527,40 +541,40 @@ class GigaFrostCamera(PSIDetectorBase):
# If Bluesky style configure
if d is not None:
# Commonly changed settings
if 'exposure_num_burst' in d:
self.cfgCntNum.set(d['exposure_num_burst']).wait()
if 'exposure_time_ms' in d:
self.cfgExposure.set(d['exposure_time_ms']).wait()
if 'exposure_period_ms' in d:
self.cfgFramerate.set(d['exposure_period_ms']).wait()
if 'image_width' in d:
if d['image_width']%48 !=0:
if "exposure_num_burst" in d:
self.cfgCntNum.set(d["exposure_num_burst"]).wait()
if "exposure_time_ms" in d:
self.cfgExposure.set(d["exposure_time_ms"]).wait()
if "exposure_period_ms" in d:
self.cfgFramerate.set(d["exposure_period_ms"]).wait()
if "image_width" in d:
if d["image_width"] % 48 != 0:
raise RuntimeError(f"[{self.name}] image_width must be divisible by 48")
self.cfgRoiX.set(d['image_width']).wait()
if 'image_height' in d:
if d['image_height']%16 !=0:
self.cfgRoiX.set(d["image_width"]).wait()
if "image_height" in d:
if d["image_height"] % 16 != 0:
raise RuntimeError(f"[{self.name}] image_height must be divisible by 16")
self.cfgRoiY.set(d['image_height']).wait()
self.cfgRoiY.set(d["image_height"]).wait()
# Dont change these
scanid = d.get('scanid', 0)
correction_mode = d.get('correction_mode', 5)
scanid = d.get("scanid", 0)
correction_mode = d.get("correction_mode", 5)
self.cfgScanId.set(scanid).wait()
self.cfgCorrMode.set(correction_mode).wait()
if 'acq_mode' in d:
self.set_acquisition_mode(d['acq_mode'])
if "acq_mode" in d:
self.set_acquisition_mode(d["acq_mode"])
# Commit parameters
self.cmdSetParam.set(1).wait()
def bluestage(self):
""" Bluesky style stage"""
"""Bluesky style stage"""
# Switch to acquiring
self.cmdStartCamera.set(1).wait()
def set_acquisition_mode(self, acq_mode):
""" Set acquisition mode
"""Set acquisition mode
Utility function to quickly select between pre-configured and tested
acquisition modes.
@@ -577,7 +591,7 @@ class GigaFrostCamera(PSIDetectorBase):
self.cfgEnableScheme.set(0).wait()
# Set modes
#self.cmdSoftEnable.set(0).wait()
# self.cmdSoftEnable.set(0).wait()
self.enable_mode = "soft"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
@@ -834,11 +848,11 @@ class GigaFrostCamera(PSIDetectorBase):
The GigaFRoST enable mode. Valid arguments are:
* 'soft':
The GigaFRoST enable signal is supplied through a software
The GigaFRoST enable signal is supplied through a software
signal
* 'external':
The GigaFRoST enable signal is supplied through an external TTL
gating signal from the rotaiton stage or some other control
The GigaFRoST enable signal is supplied through an external TTL
gating signal from the rotaiton stage or some other control
unit
* 'soft+ext':
The GigaFRoST enable signal can be supplied either via the
@@ -851,9 +865,7 @@ class GigaFrostCamera(PSIDetectorBase):
"""
if mode not in const.gf_valid_enable_modes:
raise ValueError(
"Invalid enable mode! Valid modes are:\n{const.gf_valid_enable_modes}"
)
raise ValueError("Invalid enable mode! Valid modes are:\n{const.gf_valid_enable_modes}")
if mode == "soft":
self.cfgEnableExt.set(0).wait()

View File

@@ -6,22 +6,19 @@ Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import json
import enum
from time import sleep, time
from threading import Thread
import zmq
import numpy as np
from ophyd import Device, Signal, Component, Kind, DeviceStatus
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b''
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class PcoTestConsumerMixin(CustomDetectorMixin):
@@ -29,6 +26,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
Parent class: CustomDetectorMixin
"""
# pylint: disable=protected-access
def on_stage(self):
"""Start listening for preview data stream"""
if self.parent._mon is not None:
@@ -72,29 +70,27 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
continue
"""
# Unpack the Array V1 reply to metadata and array data
meta, data = r
print(meta)
# # Unpack the Array V1 reply to metadata and array data
# meta, data = r
# print(meta)
# Update image and update subscribers
header = json.loads(meta)
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# # Update image and update subscribers
# header = json.loads(meta)
# if header["type"] == "uint16":
# image = np.frombuffer(data, dtype=np.uint16)
# if image.size != np.prod(header['shape']):
# err = f"Unexpected array size of {image.size} for header: {header}"
# raise ValueError(err)
# image = image.reshape(header['shape'])
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
"""
# # Update image and update subscribers
# self.parent.frame.put(header['frame'], force=True)
# self.parent.image_shape.put(header['shape'], force=True)
# self.parent.image.put(image, force=True)
# self.parent._last_image = image
# self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last = t_curr
#logger.info(
# logger.info(
# f"[{self.parent.name}] Updated frame {header['frame']}\t"
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
# )
@@ -110,7 +106,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
finally:
try:
self.parent._socket.disconnect()
except:
except RuntimeError:
pass
self.parent._mon = None
logger.info(f"[{self.parent.name}]\tDetaching monitor")
@@ -126,6 +122,7 @@ class PcoTestConsumer(PSIDetectorBase):
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
USER_ACCESS = ["get_last_image"]
SUB_MONITOR = "device_monitor_2d"
@@ -171,8 +168,7 @@ class PcoTestConsumer(PSIDetectorBase):
self._socket.connect(self.url.get())
def disconnect(self):
"""Disconnect
"""
"""Disconnect"""
try:
if self._socket is not None:
self._socket.disconnect(self.url.get())
@@ -181,7 +177,6 @@ class PcoTestConsumer(PSIDetectorBase):
finally:
self._socket = None
def get_image(self):
return self._last_image

View File

@@ -4,13 +4,12 @@ Created on Wed Dec 6 11:33:54 2023
@author: mohacsi_i
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus, DeviceStatus
import time
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices import BECDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
CustomDetectorMixin as CustomPrepare,
)
try:
@@ -23,12 +22,12 @@ except ModuleNotFoundError:
logger = logging.getLogger("PcoEdgeCam")
class PcoEdgeCameraMixin(CustomDeviceMixin):
class PcoEdgeCameraMixin(CustomPrepare):
"""Mixin class to setup the Helge camera bae class.
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
# pylint: disable=protected-access
def on_stage(self) -> None:
"""Configure and arm PCO.Edge camera for acquisition"""
@@ -91,7 +90,6 @@ class PcoEdgeCameraMixin(CustomDeviceMixin):
NOTE: Maciej confirmed that sparse data is no problem to the stdDAQ.
TODO: Optimize data transfer to launch at end and check completion at the beginning.
"""
logger.warning(f"triggering PCO")
# Ensure that previous data transfer finished
# def sentIt(*args, value, timestamp, **kwargs):
# return value==0
@@ -99,11 +97,11 @@ class PcoEdgeCameraMixin(CustomDeviceMixin):
# status.wait()
# Not sure if it always sends the first batch of images or the newest
def didWeReset(*args, old_value, value, timestamp, **kwargs):
def wait_bufferreset(*, old_value, value, timestamp, **_):
return (value < old_value) or (value == 0)
self.parent.buffer_clear.set(1).wait()
status = SubscriptionStatus(self.parent.buffer_used, didWeReset, timeout=5)
status = SubscriptionStatus(self.parent.buffer_used, wait_bufferreset, timeout=5)
status.wait()
t_expected = (
@@ -111,13 +109,13 @@ class PcoEdgeCameraMixin(CustomDeviceMixin):
) * self.parent.file_savestop.get()
# Wait until the buffer fills up with enough images
def areWeDoneYet(*args, old_value, value, timestamp, **kwargs):
def wait_acquisition(*, value, timestamp, **_):
num_target = self.parent.file_savestop.get()
# logger.warning(f"{value} of {num_target}")
return bool(value >= num_target)
max_wait = max(5, 5 * t_expected)
status = SubscriptionStatus(
self.parent.buffer_used, areWeDoneYet, timeout=max(5, 5 * t_expected), settle_time=0.2
self.parent.buffer_used, wait_acquisition, timeout=max_wait, settle_time=0.2
)
status.wait()
@@ -130,19 +128,18 @@ class PcoEdgeCameraMixin(CustomDeviceMixin):
# against values from the previous cycle, i.e. pass automatically.
t_start = time.time()
def haveWeSentIt(*args, old_value, value, timestamp, **kwargs):
def wait_sending(*args, old_value, value, timestamp, **kwargs):
t_elapsed = timestamp - t_start
# logger.warning(f"{old_value}\t{value}\t{t_elapsed}")
return old_value == 1 and value == 0 and t_elapsed > 0
status = SubscriptionStatus(
self.parent.file_savebusy, haveWeSentIt, timeout=120, settle_time=0.2
self.parent.file_savebusy, wait_sending, timeout=120, settle_time=0.2
)
status.wait()
logger.warning(f"done PCO")
class HelgeCameraBase(PSIDeviceBase):
class HelgeCameraBase(BECDeviceBase):
"""Ophyd baseclass for Helge camera IOCs
This class provides wrappers for Helge's camera IOCs around SwissFEL and
@@ -161,19 +158,21 @@ class HelgeCameraBase(PSIDeviceBase):
UPDATE: Data sending operation modes
Switch to ZMQ streaming by setting FILEFORMAT to ZEROMQ, set SAVESTART and SAVESTOP to select a ROI of images and start file transfer with FTRANSFER.
The ZMQ connection streams out the data in PUSH-PULL mode, i.e. it needs incoming connection.
- Switch to ZMQ streaming by setting FILEFORMAT to ZEROMQ
- Set SAVESTART and SAVESTOP to select a ROI of image indices
- Start file transfer with FTRANSFER.
The ZMQ connection operates in PUSH-PULL mode, i.e. it needs incoming connection.
STOREMODE sets the acquisition mode:
if STOREMODE == Recorder
Fills up the buffer with images and SAVESTART and SAVESTOP selects a ROI of images to be streamed
Fills up the buffer with images. Here SAVESTART and SAVESTOP selects a ROI
of image indices to be streamed out (i.e. maximum buffer_size number of images)
if STOREMODE == FIFO buffer
Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely)
Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview
Continously streams out data using the buffer as a FIFO queue.
Here SAVESTART and SAVESTOP selects a ROI of image indices to be streamed continously
(i.e. a large SAVESTOP streams indefinitely). Note that in FIFO mode buffer reads are
destructive. to prevent this, we don't have EPICS preview
"""
# ########################################################################
@@ -274,7 +273,7 @@ class HelgeCameraBase(PSIDeviceBase):
@state.setter
def state(self):
raise ReadOnlyError("State is a ReadOnly property")
raise RuntimeError("State is a ReadOnly property")
def configure(self, d: dict = {}) -> tuple:
"""Configure the base Helge camera device
@@ -323,11 +322,11 @@ class HelgeCameraBase(PSIDeviceBase):
# So we need a 'negedge' on SET_PARAM
self.camSetParam.set(1).wait()
def fallingEdge(*args, old_value, value, timestamp, **kwargs):
def negedge(*, old_value, value, timestamp, **_):
return bool(old_value and not value)
# Subscribe and wait for update
status = SubscriptionStatus(self.camSetParam, fallingEdge, timeout=5, settle_time=0.5)
status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5)
status.wait()
def bluestage(self):
@@ -341,24 +340,23 @@ class HelgeCameraBase(PSIDeviceBase):
self.bufferStoreMode.get() in ("Recorder", 0)
and self.file_savestop.get() > self.buffer_size.get()
):
self.logger.warning(
"You're about to send some empty images, {self.file_savestop.get()} is above buffer size"
logger.warning(
f"You'll send empty images, {self.file_savestop.get()} is above buffer size"
)
# Start the acquisition (this sets parameers and starts acquisition)
self.camStatusCmd.set("Running").wait()
# Subscribe and wait for update
def isRunning(*args, old_value, value, timestamp, **kwargs):
def is_running(*, value, timestamp, **_):
return bool(value == 6)
status = SubscriptionStatus(self.camStatusCode, isRunning, timeout=5, settle_time=0.2)
status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2)
status.wait()
def blueunstage(self):
"""Bluesky style unstage: stop the detector"""
self.camStatusCmd.set("Idle").wait()
self.custom_prepare.stop_monitor = True
# Data streaming is stopped by setting the max index to 0
# FIXME: This might interrupt data transfer
@@ -371,13 +369,6 @@ class HelgeCameraBase(PSIDeviceBase):
"""
self.file_transfer.set(1).wait()
# def complete(self):
# """ Wait until the images have been sent"""
# def areWeSending(*args, value, timestamp, **kwargs):
# return not bool(value)
# status = SubscriptionStatus(self.file_savebusy, haveWeSentIt, timeout=None, settle_time=0.2)
# return status
class PcoEdge5M(HelgeCameraBase):
"""Ophyd baseclass for PCO.Edge cameras
@@ -402,7 +393,8 @@ class PcoEdge5M(HelgeCameraBase):
acqMode = Component(EpicsSignalRO, "ACQMODE", auto_monitor=True, kind=Kind.config)
acqDelay = Component(EpicsSignalRO, "DELAY", auto_monitor=True, kind=Kind.config)
acqTriggerEna = Component(EpicsSignalRO, "TRIGGER", auto_monitor=True, kind=Kind.config)
# acqTriggerSource = Component(EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config)
# acqTriggerSource = Component(
# EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config)
# acqTriggerEdge = Component(EpicsSignalRO, "TRIGGEREDGE", auto_monitor=True, kind=Kind.config)
# ########################################################################
@@ -465,9 +457,9 @@ class PcoEdge5M(HelgeCameraBase):
self.pxRoiY_lo.set(2160 / 2 - height / 2).wait()
self.pxRoiY_hi.set(2160 / 2 + height / 2).wait()
if "image_binx" in d and d["image_binx"] is not None:
self.pxBinX.set(d["image_binx"]).wait()
self.bin_x.set(d["image_binx"]).wait()
if "image_biny" in d and d["image_biny"] is not None:
self.pxBinY.set(d["image_biny"]).wait()
self.bin_y.set(d["image_biny"]).wait()
# Call super() to commit the changes
super().configure(d)
@@ -477,5 +469,5 @@ class PcoEdge5M(HelgeCameraBase):
if __name__ == "__main__":
# Drive data collection
cam = PcoEdgeBase("X02DA-CCDCAM2:", name="mcpcam")
cam = PcoEdge5M("X02DA-CCDCAM2:", name="mcpcam")
cam.wait_for_connection()

View File

@@ -12,25 +12,26 @@ from threading import Thread
import requests
import os
from ophyd import Device, Signal, Component, Kind, Staged
from ophyd import Signal, Component, Kind
from ophyd.status import SubscriptionStatus
from ophyd.flyers import FlyerInterface
from websockets.sync.client import connect, ClientConnection
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from bec_lib import bec_logger
from bec_lib.file_utils import FileWriter
logger = bec_logger.logger
class StdDaqMixin(CustomDeviceMixin):
# parent : StdDaqClient
# pylint: disable=protected-access
_mon = None
def on_stage(self) -> None:
""" Configuration and staging
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
@@ -40,30 +41,30 @@ class StdDaqMixin(CustomDeviceMixin):
# Fish out our configuration from scaninfo (via explicit or generic addressing)
# NOTE: Scans don't have to fully configure the device
d = {}
if 'kwargs' in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info['kwargs']
if 'image_width' in scanargs and scanargs['image_width'] != None:
d['image_width'] = scanargs['image_width']
if 'image_height' in scanargs and scanargs['image_height'] != None:
d['image_height'] = scanargs['image_height']
if 'nr_writers' in scanargs and scanargs['nr_writers'] != None:
d['nr_writers'] = scanargs['nr_writers']
if 'file_path' in scanargs and scanargs['file_path']!=None:
self.parent.file_path.set(scanargs['file_path'].replace('data','gpfs')).wait()
print(scanargs['file_path'])
if os.path.isdir(scanargs['file_path']):
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "image_width" in scanargs and scanargs["image_width"] is not None:
d["image_width"] = scanargs["image_width"]
if "image_height" in scanargs and scanargs["image_height"] is not None:
d["image_height"] = scanargs["image_height"]
if "nr_writers" in scanargs and scanargs["nr_writers"] is not None:
d["nr_writers"] = scanargs["nr_writers"]
if "file_path" in scanargs and scanargs["file_path"] is not None:
self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait()
print(scanargs["file_path"])
if os.path.isdir(scanargs["file_path"]):
print("isdir")
pass
else:
print("creating")
try:
os.makedirs(scanargs['file_path'], 0o777)
os.system('chmod -R 777 ' + scanargs['base_path'])
os.makedirs(scanargs["file_path"], 0o777)
os.system("chmod -R 777 " + scanargs["base_path"])
except:
print('Problem with creating folder')
if 'file_prefix' in scanargs and scanargs['file_prefix']!=None:
print(scanargs['file_prefix'])
self.parent.file_prefix.set(scanargs['file_prefix']).wait()
print("Problem with creating folder")
if "file_prefix" in scanargs and scanargs["file_prefix"] != None:
print(scanargs["file_prefix"])
self.parent.file_prefix.set(scanargs["file_prefix"]).wait()
if "daq_num_points" in scanargs:
d["num_points_total"] = scanargs["daq_num_points"]
@@ -71,13 +72,13 @@ class StdDaqMixin(CustomDeviceMixin):
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs['steps'] is not None:
if "steps" in scanargs and scanargs["steps"] is not None:
num_points *= scanargs["steps"]
points_valid = True
if "exp_burst" in scanargs and scanargs['exp_burst'] is not None:
if "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
if "repeats" in scanargs and scanargs['repeats'] is not None:
if "repeats" in scanargs and scanargs["repeats"] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
@@ -98,19 +99,17 @@ class StdDaqMixin(CustomDeviceMixin):
self._mon.start()
def on_unstage(self):
""" Stop a running acquisition and close connection
"""
"""Stop a running acquisition and close connection"""
print("Creating virtual dataset")
self.parent.create_virtual_dataset()
self.parent.blueunstage()
def on_stop(self):
""" Stop a running acquisition and close connection
"""
"""Stop a running acquisition and close connection"""
self.parent.blueunstage()
def monitor(self) -> None:
""" Monitor status messages while connection is open. This will block the reply monitoring
"""Monitor status messages while connection is open. This will block the reply monitoring
to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing
acquisition means StdDAQ will close connection, so there's no idle state polling.
"""
@@ -144,19 +143,31 @@ class StdDaqClient(PSIDeviceBase):
daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000")
```
"""
# pylint: disable=too-many-instance-attributes
custom_prepare_cls = StdDaqMixin
USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"]
USER_ACCESS = [
"set_daq_config",
"get_daq_config",
"nuke",
"connect",
"message",
"state",
"bluestage",
"blueunstage",
]
_wsclient = None
# Status attributes
ws_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
runstatus = Component(Signal, value="unknown", kind=Kind.normal, metadata={'write_access': False})
ws_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
runstatus = Component(
Signal, value="unknown", kind=Kind.normal, metadata={"write_access": False}
)
num_images = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
file_prefix = Component(Signal, value="file", kind=Kind.config)
# Configuration attributes
rest_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
# Configuration attributes
rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config)
@@ -176,10 +187,19 @@ class StdDaqClient(PSIDeviceBase):
device_manager=None,
ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000",
data_source_name = None,
data_source_name=None,
**kwargs,
) -> None:
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
**kwargs,
)
self.ws_url.set(ws_url, force=True).wait()
self.rest_url.set(rest_url, force=True).wait()
self.data_source_name = data_source_name
@@ -264,47 +284,49 @@ class StdDaqClient(PSIDeviceBase):
"""
# Configuration parameters
if 'image_width' in d and d['image_width']!=None:
self.cfg_pixel_width.set(d['image_width']).wait()
if 'image_height' in d and d['image_height']!=None:
self.cfg_pixel_height.set(d['image_height']).wait()
if 'bit_depth' in d:
self.cfg_bit_depth.set(d['bit_depth']).wait()
if 'nr_writers' in d and d['nr_writers']!=None:
self.cfg_nr_writers.set(d['nr_writers']).wait()
if "image_width" in d and d["image_width"] != None:
self.cfg_pixel_width.set(d["image_width"]).wait()
if "image_height" in d and d["image_height"] != None:
self.cfg_pixel_height.set(d["image_height"]).wait()
if "bit_depth" in d:
self.cfg_bit_depth.set(d["bit_depth"]).wait()
if "nr_writers" in d and d["nr_writers"] != None:
self.cfg_nr_writers.set(d["nr_writers"]).wait()
# Run parameters
if 'num_points_total' in d:
self.num_images.set(d['num_points_total']).wait()
if "num_points_total" in d:
self.num_images.set(d["num_points_total"]).wait()
# Restart the DAQ if resolution changed
cfg = self.get_daq_config()
if cfg['image_pixel_height'] != self.cfg_pixel_height.get() or \
cfg['image_pixel_width'] != self.cfg_pixel_width.get() or \
cfg['bit_depth'] != self.cfg_bit_depth.get() or \
cfg['number_of_writers'] != self.cfg_nr_writers.get():
if (
cfg["image_pixel_height"] != self.cfg_pixel_height.get()
or cfg["image_pixel_width"] != self.cfg_pixel_width.get()
or cfg["bit_depth"] != self.cfg_bit_depth.get()
or cfg["number_of_writers"] != self.cfg_nr_writers.get()
):
# Stop if current status is not idle
if self.state() != "idle":
logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Update retrieved config
cfg['image_pixel_height'] = int(self.cfg_pixel_height.get())
cfg['image_pixel_width'] = int(self.cfg_pixel_width.get())
cfg['bit_depth'] = int(self.cfg_bit_depth.get())
cfg['number_of_writers'] = int(self.cfg_nr_writers.get())
cfg["image_pixel_height"] = int(self.cfg_pixel_height.get())
cfg["image_pixel_width"] = int(self.cfg_pixel_width.get())
cfg["bit_depth"] = int(self.cfg_bit_depth.get())
cfg["number_of_writers"] = int(self.cfg_nr_writers.get())
self.set_daq_config(cfg)
sleep(1)
self.get_daq_config(update=True)
def bluestage(self):
""" Stages the stdDAQ
"""Stages the stdDAQ
Opens a new connection to the stdDAQ, sends the start command with
the current configuration. It waits for the first reply and checks
it for obvious failures.
Opens a new connection to the stdDAQ, sends the start command with
the current configuration. It waits for the first reply and checks
it for obvious failures.
"""
# Can't stage into a running exposure
if self.state() != 'idle':
if self.state() != "idle":
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
# Must make sure that image size matches the data source
@@ -315,10 +337,13 @@ class StdDaqClient(PSIDeviceBase):
daq_img_h = self.cfg_pixel_height.get()
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
raise RuntimeError(f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})")
raise RuntimeError(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
)
else:
logger.warning(f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})")
logger.warning(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})"
)
file_path = self.file_path.get()
num_images = self.num_images.get()
@@ -327,7 +352,12 @@ class StdDaqClient(PSIDeviceBase):
# New connection
self._wsclient = self.connect()
message = {"command": "start", "path": file_path, "file_prefix": file_prefix, "n_image": num_images, }
message = {
"command": "start",
"path": file_path,
"file_prefix": file_prefix,
"n_image": num_images,
}
reply = self.message(message)
if reply is not None:
@@ -338,8 +368,10 @@ class StdDaqClient(PSIDeviceBase):
# Give it more time to reconfigure
if reply["status"] in ("rejected"):
# FIXME: running exposure is a nogo
if reply['reason'] == "driver is busy!":
raise RuntimeError(f"[{self.name}] Start stdDAQ command rejected: already running")
if reply["reason"] == "driver is busy!":
raise RuntimeError(
f"[{self.name}] Start stdDAQ command rejected: already running"
)
else:
# Give it more time to consolidate
sleep(1)
@@ -348,16 +380,18 @@ class StdDaqClient(PSIDeviceBase):
print(f"[{self.name}] Started stdDAQ in: {reply['status']}")
return
raise RuntimeError(f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}")
raise RuntimeError(
f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}"
)
def blueunstage(self):
""" Unstages the stdDAQ
"""Unstages the stdDAQ
Opens a new connection to the stdDAQ, sends the stop command and
waits for the idle state.
Opens a new connection to the stdDAQ, sends the stop command and
waits for the idle state.
"""
ii = 0
while ii<10:
while ii < 10:
# Stop the DAQ (will close connection) - reply is always "success"
self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False)
@@ -371,7 +405,7 @@ class StdDaqClient(PSIDeviceBase):
if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply)
if reply["status"] in ("idle", "error"):
# Only 'idle' state accepted
print(f"DAQ stopped on try {ii}")
@@ -388,6 +422,7 @@ class StdDaqClient(PSIDeviceBase):
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'."""
def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"]
return result
@@ -396,40 +431,35 @@ class StdDaqClient(PSIDeviceBase):
return status
def get_daq_config(self, update=False) -> dict:
"""Read the current configuration from the DAQ
"""
r = requests.get(
self.rest_url.get() + '/api/config/get',
params={'user': "ioc"},
timeout=2)
"""Read the current configuration from the DAQ"""
r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2)
if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
cfg = r.json()
if update:
self.cfg_detector_name.set(cfg['detector_name']).wait()
self.cfg_detector_type.set(cfg['detector_type']).wait()
self.cfg_bit_depth.set(cfg['bit_depth']).wait()
self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
self.cfg_nr_writers.set(cfg['number_of_writers']).wait()
self.cfg_detector_name.set(cfg["detector_name"]).wait()
self.cfg_detector_type.set(cfg["detector_type"]).wait()
self.cfg_bit_depth.set(cfg["bit_depth"]).wait()
self.cfg_pixel_height.set(cfg["image_pixel_height"]).wait()
self.cfg_pixel_width.set(cfg["image_pixel_width"]).wait()
self.cfg_nr_writers.set(cfg["number_of_writers"]).wait()
return cfg
def set_daq_config(self, config, settle_time=1):
"""Write a full configuration to the DAQ
"""
url = self.rest_url.get() + '/api/config/set'
"""Write a full configuration to the DAQ"""
url = self.rest_url.get() + "/api/config/set"
r = requests.post(
url,
params={"user": "ioc"},
json=config,
timeout=2,
headers={"Content-Type": "application/json"}
)
url,
params={"user": "ioc"},
json=config,
timeout=2,
headers={"Content-Type": "application/json"},
)
if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
# Wait for service to restart (and connect to make sure)
#sleep(settle_time)
# sleep(settle_time)
self.connect()
return r.json()
@@ -437,20 +467,24 @@ class StdDaqClient(PSIDeviceBase):
"""Combine the stddaq written files in a given folder in an interleaved
h5 virtual dataset
"""
url = self.rest_url.get() + '/api/h5/create_interleaved_vds'
url = self.rest_url.get() + "/api/h5/create_interleaved_vds"
file_path = self.file_path.get()
file_prefix = self.file_prefix.get()
r = requests.post(
url,
params = {'user': 'ioc'},
json = {'base_path': file_path, 'file_prefix': file_prefix, 'output_file': file_prefix.rstrip('_') + '.h5'},
timeout = 2,
headers = {'Content-type': 'application/json'}
params={"user": "ioc"},
json={
"base_path": file_path,
"file_prefix": file_prefix,
"output_file": file_prefix.rstrip("_") + ".h5",
},
timeout=2,
headers={"Content-type": "application/json"},
)
def nuke(self, restarttime=5):
""" Reconfigures the stdDAQ to restart the services. This causes
"""Reconfigures the stdDAQ to restart the services. This causes
systemd to kill the current DAQ service and restart it with the same
configuration. Which might corrupt the currently written file...
"""
@@ -459,18 +493,20 @@ class StdDaqClient(PSIDeviceBase):
sleep(restarttime)
def state(self) -> str | None:
""" Querry the current system status"""
"""Querry the current system status"""
try:
wsclient = self.connect()
wsclient.send(json.dumps({'command': 'status'}))
wsclient.send(json.dumps({"command": "status"}))
r = wsclient.recv(timeout=1)
r = json.loads(r)
return r['status']
return r["status"]
except ConnectionRefusedError:
raise
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":
daq = StdDaqClient(name="daq", ws_url="ws://sls-daq-001:8080", rest_url="http://sls-daq-001:5000")
daq = StdDaqClient(
name="daq", ws_url="ws://sls-daq-001:8080", rest_url="http://sls-daq-001:5000"
)
daq.wait_for_connection()