GF camera part seems working

This commit is contained in:
gac-x05la
2025-03-17 16:54:47 +01:00
parent ab72fa3ffa
commit 1c5e4a691a
3 changed files with 343 additions and 359 deletions
@@ -38,18 +38,18 @@ femto_mean_curr:
readOnly: true
softwareTrigger: false
es1_roty:
readoutPriority: monitored
description: 'Test rotation stage'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X02DA-ES1-SMP1:ROTY
deviceTags:
- es1-sam
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
# es1_roty:
# readoutPriority: monitored
# description: 'Test rotation stage'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: X02DA-ES1-SMP1:ROTY
# deviceTags:
# - es1-sam
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
# es1_ismc:
# description: 'Automation1 iSMC interface'
@@ -66,12 +66,12 @@ es1_roty:
# es1_tasks:
# description: 'Automation1 task management interface'
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig:
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:TASK:'
# deviceTags:
# - es1
# enabled: true
# enabled: false
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
@@ -94,7 +94,7 @@ es1_roty:
# es1_ddaq:
# description: 'Automation1 position recording interface'
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
# deviceTags:
+129 -105
View File
@@ -7,57 +7,7 @@ from ophyd import Component as Cpt
from ophyd import Device, DynamicDeviceComponent, EpicsSignal, EpicsSignalRO, Kind, Signal
import tomcat_bec.devices.gigafrost.gfconstants as const
class GigaFrostSignalWithValidation(EpicsSignal):
"""
Custom EpicsSignal class that validates the value with the specified validator
before setting the value.
"""
def __init__(
self,
read_pv,
write_pv=None,
*,
put_complete=False,
string=False,
limits=False,
name=None,
validator=None,
**kwargs,
):
self._validator = validator
super().__init__(
read_pv,
write_pv,
put_complete=put_complete,
string=string,
limits=limits,
name=name,
**kwargs,
)
def check_value(self, value):
if self._validator is not None:
self._validator(value)
return super().check_value(value)
def check_image_width(value):
"""
The Gigafrost camera requires the image width to be a multiple of 48.
"""
if value % 48 != 0:
raise ValueError("Image width must be a multiple of 48")
def check_image_height(value):
"""
The Gigafrost camera requires the image height to be a multiple of 16.
"""
if value % 16 != 0:
raise ValueError("Image height must be a multiple of 16")
from tomcat_bec.devices.gigafrost.gfutils import extend_header_table
class GigaFrostBase(Device):
@@ -86,50 +36,38 @@ class GigaFrostBase(Device):
# pylint: disable=too-many-instance-attributes
busy_stat = Cpt(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
sync_flag = Cpt(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
sync_swhw = Cpt(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True, kind=Kind.omitted)
start_cam = Cpt(EpicsSignal, "START_CAM", put_complete=True, kind=Kind.omitted)
set_param = Cpt(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted)
acqmode = Cpt(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
# Standard camera configs
acquire = Cpt(EpicsSignal, "START_CAM", put_complete=True, kind=Kind.omitted)
acquire_time = Cpt(
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config
)
acquire_period = Cpt(
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config
)
num_exposures = Cpt(
EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config
)
array_size = DynamicDeviceComponent(
{
"array_size_x": (EpicsSignalRO, "ROIX", {"auto_monitor": True}),
"array_size_y": (EpicsSignalRO, "ROIY", {"auto_monitor": True}),
"array_size_x": (EpicsSignal, "ROIX", {"auto_monitor": True, "put_complete": True}),
"array_size_y": (EpicsSignal, "ROIY", {"auto_monitor": True, "put_complete": True}),
},
doc="Size of the array in the XY dimensions",
)
# UDP header
ports = Cpt(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
framenum = Cpt(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
ht_offset = Cpt(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
write_srv = Cpt(EpicsSignal, "WRITE_SRV.PROC", put_complete=True, kind=Kind.omitted)
# DAQ parameters
file_path = Cpt(Signal, kind=Kind.config, value="")
file_prefix = Cpt(Signal, kind=Kind.config, value="")
num_images = Cpt(Signal, kind=Kind.config, value=1)
# Standard camera configs
exposure = Cpt(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config)
framerate = Cpt(
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config
)
roix = Cpt(
GigaFrostSignalWithValidation,
"ROIX",
put_complete=True,
auto_monitor=True,
kind=Kind.config,
validator=check_image_width,
)
roiy = Cpt(
GigaFrostSignalWithValidation,
"ROIY",
put_complete=True,
auto_monitor=True,
kind=Kind.config,
validator=check_image_height,
)
# GF specific interface
acquire_block = Cpt(Signal, kind=Kind.config, value=0)
busy_stat = Cpt(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
sync_flag = Cpt(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
sync_swhw = Cpt(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True, kind=Kind.omitted)
set_param = Cpt(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted)
acqmode = Cpt(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
scan_id = Cpt(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
cnt_num = Cpt(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config)
corr_mode = Cpt(
EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config
)
@@ -139,6 +77,10 @@ class GigaFrostBase(Device):
soft_trig = Cpt(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted)
soft_exp = Cpt(EpicsSignal, "SOFT_EXP", put_complete=True)
###############################################################################################
# Automatically set modes on camera init
auto_soft_enable = Cpt(Signal, kind=Kind.config, metadata={"write_access": False})
###############################################################################################
# Enable schemes
# NOTE: 0 physical, 1 virtual (i.e. always running, but logs enable signal)
@@ -242,13 +184,6 @@ class GigaFrostBase(Device):
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# Line swap selection
ls_sw = Cpt(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
ls_nw = Cpt(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
ls_se = Cpt(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
ls_ne = Cpt(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
conn_parm = Cpt(EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config)
# HW settings as read only
pixrate = Cpt(EpicsSignalRO, "PIXRATE", auto_monitor=True, kind=Kind.config)
trig_delay = Cpt(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True, kind=Kind.config)
@@ -261,32 +196,121 @@ class GigaFrostBase(Device):
bnc5_rbv = Cpt(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config)
t_board = Cpt(EpicsSignalRO, "T_BOARD", auto_monitor=True)
auto_soft_enable = Cpt(Signal, kind=Kind.config)
backend_url = Cpt(Signal, kind=Kind.config)
### HW configuration parameters
# TODO: Only used at INIT, signals not needed
# UDP header configuration parameters
mac_north = Cpt(Signal, kind=Kind.config)
mac_south = Cpt(Signal, kind=Kind.config)
ip_north = Cpt(Signal, kind=Kind.config)
ip_south = Cpt(Signal, kind=Kind.config)
udp_backend_url = Cpt(Signal, kind=Kind.config, metadata={"write_access": False})
udp_ports = Cpt(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
udp_framenum = Cpt(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
udp_ht_offset = Cpt(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
udp_write_srv = Cpt(EpicsSignal, "WRITE_SRV.PROC", put_complete=True, kind=Kind.omitted)
conn_parm = Cpt(EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config)
file_path = Cpt(Signal, kind=Kind.config, value="")
file_prefix = Cpt(Signal, kind=Kind.config, value="")
num_images = Cpt(Signal, kind=Kind.config, value=1)
# Line swap selection
ls_sw = Cpt(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
ls_nw = Cpt(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
ls_se = Cpt(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
ls_ne = Cpt(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
# pylint: disable=protected-access
def _define_backend_ip(self):
"""Select backend IP address for UDP stream"""
if self.backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
if self.udp_backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
if self.backend_url.get() == const.BE999_DAFL_CLIENT:
if self.udp_backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
raise RuntimeError(f"Backend {self.backend_url.get()} not recognized.")
raise RuntimeError(f"Backend {self.udp_backend_url.get()} not recognized.")
def _define_backend_mac(self):
"""Select backend MAC address for UDP stream"""
if self.backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
if self.udp_backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
if self.backend_url.get() == const.BE999_DAFL_CLIENT:
if self.udp_backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
raise RuntimeError(f"Backend {self.backend_url.get()} not recognized.")
raise RuntimeError(f"Backend {self.udp_backend_url.get()} not recognized.")
def _build_udp_header_table(self):
"""Build the header table for the UDP communication"""
udp_header_table = []
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table,
self.mac_south.get(),
self.ip_south.get(),
dest_port,
source_port,
)
else:
extend_header_table(
udp_header_table,
self.mac_north.get(),
self.ip_north.get(),
dest_port,
source_port,
)
return udp_header_table
def initialize_gigafrost(self) -> None:
"""Initialize the camera, set channel values"""
# Stop acquisition
self.acquire.set(0).wait()
# set entry to UDP table
# number of UDP ports to use
self.udp_ports.set(2).wait()
# number of images to send to each UDP port before switching to next
self.udp_framenum.set(5).wait()
# offset in UDP table - where to find the first entry
self.udp_ht_offset.set(0).wait()
# activate changes
self.udp_write_srv.set(1).wait()
# Configure triggering if needed
if self.auto_soft_enable.get():
# Set modes
# self.fix_nframes_mode = "start"
self.cnt_startbit.set(1).wait()
self.cnt_endbit.set(0).wait()
# self.enable_mode = "soft"
self.mode_enbl_ext.set(0).wait()
self.mode_endbl_soft.set(1).wait()
self.mode_enbl_auto.set(0).wait()
# self.trigger_mode = "auto"
self.mode_trig_auto.set(1).wait()
self.mode_trig_soft.set(0).wait()
self.mode_trig_timer.set(0).wait()
self.mode_trig_ext.set(0).wait()
# self.exposure_mode = "timer"
self.mode_exp_ext.set(0).wait()
self.mode_exp_soft.set(0).wait()
self.mode_exp_timer.set(1).wait()
# line swap - on for west, off for east
self.ls_sw.set(1).wait()
self.ls_nw.set(1).wait()
self.ls_se.set(0).wait()
self.ls_ne.set(0).wait()
# Commit parameters
self.set_param.set(1).wait()
# Initialize data backend
n, s = self._define_backend_ip()
self.ip_north.put(n, force=True)
self.ip_south.put(s, force=True)
n, s = self._define_backend_mac()
self.mac_north.put(n, force=True)
self.mac_south.put(s, force=True)
# Set udp header table (data communication parameters)
self.conn_parm.set(self._build_udp_header_table()).wait()
+198 -238
View File
@@ -7,14 +7,12 @@ Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
from time import sleep
from typing import Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
import tomcat_bec.devices.gigafrost.gfconstants as const
from tomcat_bec.devices.gigafrost.gfutils import extend_header_table
from tomcat_bec.devices.gigafrost.gigafrost_base import GigaFrostBase
from tomcat_bec.devices.gigafrost.std_daq_client import (
StdDaqClient,
@@ -22,6 +20,8 @@ from tomcat_bec.devices.gigafrost.std_daq_client import (
StdDaqStatus,
)
import tomcat_bec.devices.gigafrost.gfconstants as const
from tomcat_bec.devices.gigafrost.std_daq_preview import StdDaqPreview
logger = bec_logger.logger
@@ -69,10 +69,16 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
"enable_mode",
"backend",
"acq_done",
"live_preview"
"live_preview",
"arm",
"disarm",
]
_initialized = False
# Placeholders for stdDAQ and livestream clients
backend = None
live_preview = None
def __init__(
self,
prefix="",
@@ -90,11 +96,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
std_daq_live: str | None = None,
**kwargs,
):
# Ugly hack to pass values before on_init()
self._signals_to_be_set = {}
self._signals_to_be_set["auto_soft_enable"] = auto_soft_enable
self._signals_to_be_set["backend_url"] = backend_url
# super() will call the mixin class
super().__init__(
prefix=prefix,
@@ -106,72 +107,79 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
scan_info=scan_info,
**kwargs,
)
# Configure the stdDAQ client
if std_daq_rest is None or std_daq_ws is None:
raise ValueError("Both std_daq_rest and std_daq_ws must be provided")
self.backend = StdDaqClient(parent=self, ws_url=std_daq_ws, rest_url=std_daq_rest)
self.live_preview = None
# raise ValueError("Both std_daq_rest and std_daq_ws must be provided")
logger.error("No stdDAQ address provided, launching without data backend!")
else:
self.backend = StdDaqClient(parent=self, ws_url=std_daq_ws, rest_url=std_daq_rest)
# Configure image preview
if std_daq_live is not None:
self.live_preview = StdDaqPreview(url=std_daq_live, cb=self._on_preview_update)
else:
logger.error("No stdDAQ stream address provided, launching without preview!")
# Configure camera backend
self.auto_soft_enable.put(auto_soft_enable, force=True)
self.udp_backend_url.put(backend_url, force=True)
def configure(self, d: dict = None):
"""Configure the next scan with the GigaFRoST camera
Parameters as 'd' dictionary
----------------------------
num_images : int, optional
Number of images to be taken during each scan. Set to -1 for an
unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10)
num_exposures : int, optional
Number of images to be taken during each scan. Set to -1 for unlimited
number of images (limited by the ringbuffer size and backend speed).
exposure_time_ms : float, optional
Exposure time [ms]. (default = 0.2)
Exposure time [ms].
exposure_period_ms : float, optional
Exposure period [ms], ignored in soft trigger mode. (default = 1.0)
Exposure period [ms], ignored in soft trigger mode.
image_width : int, optional
ROI size in the x-direction [pixels] (default = 2016)
ROI size in the x-direction [pixels] (max. 2016)
image_height : int, optional
ROI size in the y-direction [pixels] (default = 2016)
ROI size in the y-direction [pixels] (max. 2016)
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
* 0: Bypass. No corrections are applied to the data.
* 1: Send correction factor A instead of pixel values
* 2: Send correction factor B instead of pixel values
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
acq_mode : str, optional
Select one of the pre-configured trigger behavior
"""
# Stop acquisition
self.set_idle()
self.disarm()
backend_config = StdDaqConfigPartial(**d)
self.backend.update_config(backend_config)
# if self.backend is not None:
# backend_config = StdDaqConfigPartial(**d)
# self.backend.configure(backend_config)
# Update all specified ophyd signals
config = {}
for key in self.component_names:
val = d.get(key)
if val is not None:
config[key] = val
# If Bluesky style configure
if d is not None:
# Commonly changed settings
if "exposure_num_burst" in d:
self.num_exposures.set(d["exposure_num_burst"]).wait()
if "num_exposures" in d:
self.num_exposures.set(d["num_exposures"]).wait()
if "exposure_time_ms" in d:
self.acquire_time.set(d["exposure_time_ms"]).wait()
if "exposure_period_ms" in d:
self.acquire_period.set(d["exposure_period_ms"]).wait()
if "image_width" in d:
if d["image_width"] % 48 != 0:
raise RuntimeError(f"[{self.name}] image_width must be divisible by 48")
self.array_size.array_size_x.set(d["image_width"]).wait()
if "image_height" in d:
if d["image_height"] % 16 != 0:
raise RuntimeError(f"[{self.name}] image_height must be divisible by 16")
self.array_size.array_size_y.set(d["image_height"]).wait()
if d.get("exp_time", 0) > 0:
config["exposure"] = d["exp_time"] * 1000 # exposure time in ms
self.corr_mode.set(d.get("corr_mode", 5)).wait()
self.scan_id.set(d.get("scan_id", 0)).wait()
if "corr_mode" not in config:
config["corr_mode"] = 5
if "scan_id" not in config:
config["scan_id"] = 0
super().configure(config)
# If the acquisition mode is specified, set it
if "acq_mode" in d:
self.set_acquisition_mode(config["acq_mode"])
# If a pre-configured acquisition mode is specified, set it
if "acq_mode" in d:
self.set_acquisition_mode(d["acq_mode"])
# Commit parameters
self.set_param.set(1).wait()
@@ -187,10 +195,9 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
"""
if acq_mode == "default":
# NOTE: Trigger using software events via softEnable (actually works)
# NOTE: Software trigger via softEnable (actually works)
# Trigger parameters
self.fix_nframes_mode = "start"
# Switch to physical enable signal
self.mode_enbl_exp.set(0).wait()
@@ -236,7 +243,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}")
@property
def exposure_mode(self):
def exposure_mode(self) -> str | None:
"""Returns the current exposure mode of the GigaFRost camera.
Returns
@@ -258,35 +265,34 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
return None
@exposure_mode.setter
def exposure_mode(self, exp_mode):
def exposure_mode(self, mode):
"""Apply the exposure mode for the GigaFRoST camera.
Parameters
----------
exp_mode : {'external', 'timer', 'soft'}
mode : {'external', 'timer', 'soft'}
The exposure mode to be set.
"""
modes = {
"external": self.mode_exp_ext,
"timer": self.mode_exp_timer,
"soft": self.mode_exp_soft,
}
if exp_mode not in const.gf_valid_exposure_modes:
raise ValueError(
f"Invalid exposure mode! Valid modes are:\n{const.gf_valid_exposure_modes}"
)
for key, attr in modes.items():
# set the desired mode to 1, all others to 0
attr.set(int(key == exp_mode)).wait()
if mode == "external":
self.mode_exp_ext.set(1).wait()
self.mode_exp_soft.set(0).wait()
self.mode_exp_timer.set(0).wait()
elif mode == "timer":
self.mode_exp_ext.set(0).wait()
self.mode_exp_soft.set(0).wait()
self.mode_exp_timer.set(1).wait()
elif mode == "soft":
self.mode_exp_ext.set(0).wait()
self.mode_exp_soft.set(1).wait()
self.mode_exp_timer.set(0).wait()
else:
raise ValueError(f"Invalid exposure mode: {mode}!")
# Commit parameters
self.set_param.set(1).wait()
@property
def fix_nframes_mode(self) -> Literal["off", "start", "end", "start+end"] | None:
def fix_nframes_mode(self) -> str | None:
"""Return the current fixed number of frames mode of the GigaFRoST camera.
Returns
@@ -309,7 +315,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
return None
@fix_nframes_mode.setter
def fix_nframes_mode(self, mode: Literal["off", "start", "end", "start+end"]):
def fix_nframes_mode(self, mode: str):
"""Apply the fixed number of frames settings to the GigaFRoST camera.
Parameters
@@ -317,29 +323,26 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
mode : {'off', 'start', 'end', 'start+end'}
The fixed number of frames mode to be applied.
"""
self._fix_nframes_mode = mode
if self._fix_nframes_mode == "off":
if mode == "off":
self.cnt_startbit.set(0).wait()
self.cnt_endbit.set(0).wait()
elif self._fix_nframes_mode == "start":
elif mode == "start":
self.cnt_startbit.set(1).wait()
self.cnt_endbit.set(0).wait()
elif self._fix_nframes_mode == "end":
elif mode == "end":
self.cnt_startbit.set(0).wait()
self.cnt_endbit.set(1).wait()
elif self._fix_nframes_mode == "start+end":
elif mode == "start+end":
self.cnt_startbit.set(1).wait()
self.cnt_endbit.set(1).wait()
else:
raise ValueError(
f"Invalid fixed frame number mode! Valid modes are: {const.gf_valid_fix_nframe_modes}"
)
raise ValueError(f"Invalid fixed frame number mode: {mode}!")
# Commit parameters
self.set_param.set(1).wait()
@property
def trigger_mode(self) -> Literal["auto", "external", "timer", "soft"] | None:
def trigger_mode(self) -> str | None:
"""Method to detect the current trigger mode set in the GigaFRost camera.
Returns
@@ -364,34 +367,43 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
return None
@trigger_mode.setter
def trigger_mode(self, mode: Literal["auto", "external", "timer", "soft"]):
def trigger_mode(self, mode: str):
"""
Set the trigger mode for the GigaFRoST camera.
Args:
mode(str): The trigger mode to be set. Valid arguments are: ['auto', 'external', 'timer', 'soft']
Parameters
----------
mode : {'auto', 'external', 'timer', 'soft'}
The trigger mode to be set.
"""
modes = {
"auto": self.mode_trig_auto,
"soft": self.mode_trig_soft,
"timer": self.mode_trig_timer,
"external": self.mode_trig_ext,
}
if mode not in modes:
raise ValueError(
"Invalid trigger mode! Valid modes are: ['auto', 'external', 'timer', 'soft']"
)
for key, attr in modes.items():
# set the desired mode to 1, all others to 0
attr.set(int(key == mode)).wait()
if mode == "auto":
self.mode_trig_auto.set(1).wait()
self.mode_trig_soft.set(0).wait()
self.mode_trig_timer.set(0).wait()
self.mode_trig_ext.set(0).wait()
elif mode == "soft":
self.mode_trig_auto.set(0).wait()
self.mode_trig_soft.set(1).wait()
self.mode_trig_timer.set(0).wait()
self.mode_trig_ext.set(0).wait()
elif mode == "timer":
self.mode_trig_auto.set(0).wait()
self.mode_trig_soft.set(0).wait()
self.mode_trig_timer.set(1).wait()
self.mode_trig_ext.set(0).wait()
elif mode == "external":
self.mode_trig_auto.set(0).wait()
self.mode_trig_soft.set(0).wait()
self.mode_trig_timer.set(0).wait()
self.mode_trig_ext.set(1).wait()
else:
raise ValueError(f"Invalid trigger mode: {mode}!")
# Commit parameters
self.set_param.set(1).wait()
@property
def enable_mode(self) -> Literal["soft", "external", "soft+ext", "always"] | None:
def enable_mode(self) -> str | None:
"""Return the enable mode set in the GigaFRoST camera.
Returns
@@ -413,7 +425,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
return None
@enable_mode.setter
def enable_mode(self, mode: Literal["soft", "external", "soft+ext", "always"]):
def enable_mode(self, mode: str):
"""
Set the enable mode for the GigaFRoST camera.
@@ -425,27 +437,17 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
The GigaFRoST enable mode. Valid arguments are:
* 'soft':
The GigaFRoST enable signal is supplied through a software
signal
The GigaFRoST enable signal is supplied through a software signal
* 'external':
The GigaFRoST enable signal is supplied through an external TTL
gating signal from the rotaiton stage or some other control
unit
The GigaFRoST enable signal is supplied through an external TTL gating
signal from the rotaiton stage or some other control unit
* 'soft+ext':
The GigaFRoST enable signal can be supplied either via the
software signal or externally. The two signals are combined
with a logical OR gate.
The GigaFRoST enable signal can be supplied either via the software signal
or externally. The two signals are combined with a logical OR gate.
* 'always':
The GigaFRoST is always enabled.
CAUTION: This mode is not compatible with the fixed number of
frames modes!
CAUTION: This mode is not compatible with the fixed number of frames modes!
"""
if mode not in const.gf_valid_enable_modes:
raise ValueError(
f"Invalid enable mode {mode}! Valid modes are:\n{const.gf_valid_enable_modes}"
)
if mode == "soft":
self.mode_enbl_ext.set(0).wait()
self.mode_endbl_soft.set(1).wait()
@@ -462,115 +464,46 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.mode_enbl_ext.set(0).wait()
self.mode_endbl_soft.set(0).wait()
self.mode_enbl_auto.set(1).wait()
else:
raise ValueError(f"Invalid enable mode {mode}!")
# Commit parameters
self.set_param.set(1).wait()
def set_idle(self):
def arm(self) -> None:
"""Prepare the camera to accept triggers"""
self.acquire.set(1).wait()
def disarm(self):
"""Set the camera to idle state"""
self.start_cam.set(0).wait()
self.acquire.set(0).wait()
if self.auto_soft_enable.get():
self.soft_enable.set(0).wait()
def initialize_gigafrost(self) -> None:
"""Initialize the camera, set channel values"""
# Stop acquisition
self.start_cam.set(0).wait()
# set entry to UDP table
# number of UDP ports to use
self.ports.set(2).wait()
# number of images to send to each UDP port before switching to next
self.framenum.set(5).wait()
# offset in UDP table - where to find the first entry
self.ht_offset.set(0).wait()
# activate changes
self.write_srv.set(1).wait()
# Configure software triggering if needed
if self.auto_soft_enable.get():
# trigger modes
self.cnt_startbit.set(1).wait()
self.cnt_endbit.set(0).wait()
# set modes
self.enable_mode = "soft"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
# line swap - on for west, off for east
self.ls_sw.set(1).wait()
self.ls_nw.set(1).wait()
self.ls_se.set(0).wait()
self.ls_ne.set(0).wait()
# Commit parameters
self.set_param.set(1).wait()
# Initialize data backend
n, s = self._define_backend_ip()
self.ip_north.put(n, force=True)
self.ip_south.put(s, force=True)
n, s = self._define_backend_mac()
self.mac_north.put(n, force=True)
self.mac_south.put(s, force=True)
# Set udp header table
self.set_udp_header_table()
def set_udp_header_table(self):
"""Set the communication parameters for the camera module"""
self.conn_parm.set(self._build_udp_header_table()).wait()
def destroy(self):
self.backend.shutdown()
if self.backend is not None:
self.backend.shutdown()
super().destroy()
def _build_udp_header_table(self):
"""Build the header table for the UDP communication"""
udp_header_table = []
# def _on_preview_update(self, img:np.ndarray):
# self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table,
self.mac_south.get(),
self.ip_south.get(),
dest_port,
source_port,
)
else:
extend_header_table(
udp_header_table,
self.mac_north.get(),
self.ip_north.get(),
dest_port,
source_port,
)
# def acq_done(self) -> DeviceStatus:
# """
# Check if the acquisition is done. For the GigaFrost camera, this is
# done by checking the status of the backend as the camera does not
# provide any feedback about its internal state.
return udp_header_table
def _on_preview_update(self, img:np.ndarray):
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
def acq_done(self) -> DeviceStatus:
"""
Check if the acquisition is done. For the GigaFrost camera, this is
done by checking the status of the backend as the camera does not
provide any feedback about its internal state.
Returns:
DeviceStatus: The status of the acquisition
"""
status = DeviceStatus(self)
self.backend.add_status_callback(
status,
success=[StdDaqStatus.IDLE, StdDaqStatus.FILE_SAVED],
error=[StdDaqStatus.REJECTED, StdDaqStatus.ERROR],
)
return status
# Returns:
# DeviceStatus: The status of the acquisition
# """
# status = DeviceStatus(self)
# if self.backend is not None:
# self.backend.add_status_callback(
# status,
# success=[StdDaqStatus.IDLE, StdDaqStatus.FILE_SAVED],
# error=[StdDaqStatus.REJECTED, StdDaqStatus.ERROR],
# )
# return status
########################################
# Beamline Specific Implementations #
@@ -589,17 +522,11 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# TODO: check if this can be moved to the config file
# pylint: disable=protected-access
self.auto_soft_enable._metadata["write_access"] = False
self.backend_url._metadata["write_access"] = False
self.auto_soft_enable.put(self._signals_to_be_set["auto_soft_enable"], force=True)
self.backend_url.put(self._signals_to_be_set["backend_url"], force=True)
# Perform a full initialization of the GigaFrost
self.initialize_gigafrost()
self.backend.connect()
# Connect to the stdDAQ backend
if self.backend is not None:
self.backend.connect()
def on_stage(self) -> DeviceStatus | None:
"""
@@ -613,14 +540,31 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.unstage()
sleep(0.5)
scan_msg = self.scan_info.msg
# FIXME: I don't care about how we fish out config parameters from scan info
scan_args = {
**scan_msg.request_inputs["inputs"],
**scan_msg.request_inputs["kwargs"],
**scan_msg.scan_parameters,
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
self.configure(scan_args)
d = {}
if "image_width" in scan_args and scan_args["image_width"] is not None:
d["image_width"] = scan_args["image_width"]
if "image_height" in scan_args and scan_args["image_height"] is not None:
d["image_height"] = scan_args["image_height"]
if "exp_time" in scan_args and scan_args["exp_time"] is not None:
d["exposure_time_ms"] = scan_args["exp_time"]
if "acq_time" in scan_args and scan_args["acq_time"] is not None:
d["exposure_time_ms"] = scan_args["acq_time"]
if "acq_period" in scan_args and scan_args["acq_period"] is not None:
d["exposure_period_ms"] = scan_args["acq_period"]
if "exp_burst" in scan_args and scan_args["exp_burst"] is not None:
d["exposure_num_burst"] = scan_args["exp_burst"]
if "acq_mode" in scan_args and scan_args["acq_mode"] is not None:
d["acq_mode"] = scan_args["acq_mode"]
if d:
self.configure(d)
# Sync if out of sync
if self.sync_flag.value == 0:
@@ -637,19 +581,21 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
# Switch to idle
self.set_idle()
logger.info(f"StdDaq status on unstage: {self.backend.status}")
self.backend.stop()
self.disarm()
if self.backend is not None:
logger.info(f"StdDaq status on unstage: {self.backend.status}")
self.backend.stop()
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
# Switch to acquiring
self.backend.start(
file_path=self.file_path.get(),
file_prefix=self.file_prefix.get(),
num_images=self.num_images.get(),
)
self.start_cam.set(1).wait()
self.arm()
if self.backend is not None:
self.backend.start(
file_path=self.file_path.get(),
file_prefix=self.file_prefix.get(),
num_images=self.num_images.get(),
)
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
@@ -667,12 +613,20 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
# BEC teststand operation mode: posedge of SoftEnable if Started
self.soft_enable.set(0).wait()
self.soft_enable.set(1).wait()
if self.acquire_block.get():
wait_time = 0.2 + 0.001 * self.num_exposures.value * max(
self.acquire_time.value, self.acquire_period.value
)
logger.info(f"[{self.name}] Triggering blocks for {wait_time} seconds")
return DeviceStatus(self, done=True, success=True, settle_time=wait_time)
else:
self.soft_trig.set(1).wait()
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
return self.acq_done()
# return self.acq_done()
return None
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
@@ -685,6 +639,12 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
gf = GigaFrostCamera(
"X02DA-CAM-GF2:", name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True
"X02DA-CAM-GF2:",
name="gf2",
backend_url="http://xbl-daq-28:8080",
auto_soft_enable=True,
# std_daq_ws="ws://129.129.95.111:8080",
# std_daq_rest="http://129.129.95.111:5000",
# std_daq_live='tcp://129.129.95.111:20000',
)
gf.wait_for_connection()