Daili commit coming to shape

This commit is contained in:
Istvan Mohacsi
2024-07-22 17:59:11 +02:00
committed by mohacsi_i
parent 7c5df6d0b4
commit 7f8dd3a4bb
4 changed files with 893 additions and 650 deletions
+1
View File
@@ -40,6 +40,7 @@ share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
*.bak
MANIFEST
# PyInstaller
@@ -9,6 +9,12 @@ Created on Thu Jun 27 17:28:43 2024
from enum import Enum
gf_valid_enable_modes = ("soft", "external", "soft+ext", "always")
gf_valid_exposure_modes = ("external", "timer", "soft")
gf_valid_trigger_modes = ("auto", "external", "timer", "soft")
gf_valid_fix_nframe_modes = ("off", "start", "end", "start+end")
# STATUS
class GfStatus(Enum):
"""Operation states for GigaFrost Ophyd device"""
@@ -0,0 +1,696 @@
# -*- coding: utf-8 -*-
"""
GigaFrost camera class module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import sys
from time import sleep
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind, DeviceStatus
from ophyd.device import Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
try:
import gfconstants as const
except ModuleNotFoundError:
import tomcat_bec.devices.gigafrost.gfconstants as const
try:
from gfutils import extend_header_table
except ModuleNotFoundError:
from tomcat_bec.devices.gigafrost.gfutils import extend_header_table
class GigaFrostCameraMixin(CustomDetectorMixin):
"""Mixin class to setup TOMCAT specific implementations of the detector.
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
def _define_backend_ip(self):
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _define_backend_mac(self):
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _set_udp_header_table(self):
"""Set the communication parameters for the camera module"""
self.parent.cfgConnectionParam.set(self._build_udp_header_table()).wait()
def _build_udp_header_table(self):
"""Build the header table for the communication"""
udp_header_table = []
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table, self.parent.macSouth, self.parent.ipSouth, dest_port, source_port
)
else:
extend_header_table(
udp_header_table, self.parent.macNorth, self.parent.ipNorth, dest_port, source_port
)
return udp_header_table
def on_init(self) -> None:
"""Initialize the camera, set channel values"""
## Stop acquisition
self.parent.cmdStartCamera.set(0).wait()
### set entry to UDP table
# number of UDP ports to use
self.parent.cfgUdpNumPorts.set(2).wait()
# number of images to send to each UDP port before switching to next
self.parent.cfgUdpNumFrames.set(5).wait()
# offset in UDP table - where to find the first entry
self.parent.cfgUdpHtOffset.set(0).wait()
# activate changes
self.parent.cmdWriteService.set(1).wait()
# Configure software triggering if needed
if self.parent._auto_soft_enable:
# trigger modes
self.parent.cfgCntStartBit.set(1).wait()
self.parent.cfgCntEndBit.set(0).wait()
# set modes
self.parent.enable_mode = "soft"
self.parent.trigger_mode = "auto"
self.parent.exposure_mode = "timer"
# line swap - on for west, off for east
self.parent.cfgLineSwapSW.set(1).wait()
self.parent.cfgLineSwapNW.set(1).wait()
self.parent.cfgLineSwapSE.set(0).wait()
self.parent.cfgLineSwapNE.set(0).wait()
# Commit parameters
self.parent.cmdSetParam.set(1).wait()
# Initialize data backend
n, s = self._define_backend_ip()
self.parent.ipNorth.put(n, force=True)
self.parent.ipSouth.put(s, force=True)
n, s = self._define_backend_mac()
self.parent.macNorth.put(n, force=True)
self.parent.macSouth.put(s, force=True)
# Set udp header table
self._set_udp_header_table()
self.parent.state.put(const.GfStatus.INIT, force=True)
return super().on_init()
def on_stage(self) -> None:
"""
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
if self.parent.infoBusyFlag.value:
raise RuntimeError("Camera is already busy, unstage it first!")
# Switch to acquiring
self.parent.cmdStartCamera.set(1).wait()
self.parent.state.put(const.GfStatus.ACQUIRING, force=True)
# Gigafrost can finish a run without explicit unstaging
self.parent._staged = Staged.no
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
# Switch to idle
self.parent.cmdStartCamera.set(0).wait()
if self.parent.autoSoftEnable.get():
self.cmdSoftEnable.set(0).wait()
self.parent.state.put(const.GfStatus.STOPPED, force=True)
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
return self.on_unstage()
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
status = DeviceStatus(self.parent)
# Soft triggering based on operation mode
if self.parent.autoSoftEnable.get() and self.parent.trigger_mode=='auto' and self.parent.enable_mode=='soft':
# BEC teststand operation mode: posedge of SoftEnable if Started
self.parent.cmdSoftEnable.set(0).wait()
self.parent.cmdSoftEnable.set(1).wait()
sleep_time = self.parent.cfgFramerate.value*self.parent.cfgCntNum.value*0.001+0.050
# There's no status readback from the camera, so we just wait
sleep(sleep_time)
print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr)
else:
self.parent.cmdSoftTrigger.set(1).wait()
status.set_finished()
return status
class GigaFrostCamera(PSIDetectorBase):
"""Ophyd device class to control Gigafrost cameras at Tomcat
The actual hardware is implemented by an IOC based on an old fork of Helge's
cameras. This means that the camera behaves differently than the SF cameras
in particular it provides even less feedback about it's internal progress.
Helge will update the GigaFrost IOC after working beamline.
The ophyd class is based on the 'gfclient' package and has a lot of Tomcat
specific additions. It does behave differently though, as ophyd swallows the
errors from failed PV writes.
Parameters
----------
use_soft_enable : bool
Flag to use the camera's soft enable (default: False)
backend_url : str
Backend url address necessary to set up the camera's udp header.
(default: http://xbl-daq-23:8080)
Bugs:
----------
FRAMERATE : Ignored in soft trigger mode, period becomes 2xexposure time
"""
# pylint: disable=too-many-instance-attributes
custom_prepare_cls = GigaFrostCameraMixin
infoBusyFlag = Component(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
infoSyncFlag = Component(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
cmdSyncHw = Component(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True)
cmdStartCamera = Component(EpicsSignal, "START_CAM", put_complete=True)
cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True)
# UDP header
cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
cfgUdpNumFrames = Component(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
cfgUdpHtOffset = Component(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
cmdWriteService = Component(EpicsSignal, "WRITE_SRV.PROC", put_complete=True)
# Standard camera configs
cfgExposure = Component(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgFramerate = Component(EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgScanId = Component(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgCntNum = Component(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgCorrMode = Component(EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config)
# Software signals
cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True)
cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True)
cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True)
# Trigger configuration PVs
cfgCntStartBit = Component(
EpicsSignal,
"CNT_STARTBIT_RBV",
write_pv="CNT_STARTBIT",
put_complete=True,
kind=Kind.config,
)
cfgCntEndBit = Component(
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# Enable modes
cfgTrigEnableExt = Component(
EpicsSignal,
"MODE_ENBL_EXT_RBV",
write_pv="MODE_ENBL_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigEnableSoft = Component(
EpicsSignal,
"MODE_ENBL_SOFT_RBV",
write_pv="MODE_ENBL_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigEnableAuto = Component(
EpicsSignal,
"MODE_ENBL_AUTO_RBV",
write_pv="MODE_ENBL_AUTO",
put_complete=True,
kind=Kind.config,
)
cfgTrigVirtEnable = Component(
EpicsSignal,
"MODE_ENBL_EXP_RBV",
write_pv="MODE_ENBL_EXP",
put_complete=True,
kind=Kind.config,
)
# Trigger modes
cfgTrigExt = Component(
EpicsSignal,
"MODE_TRIG_EXT_RBV",
write_pv="MODE_TRIG_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigSoft = Component(
EpicsSignal,
"MODE_TRIG_SOFT_RBV",
write_pv="MODE_TRIG_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigTimer = Component(
EpicsSignal,
"MODE_TRIG_TIMER_RBV",
write_pv="MODE_TRIG_TIMER",
put_complete=True,
kind=Kind.config,
)
cfgTrigAuto = Component(
EpicsSignal,
"MODE_TRIG_AUTO_RBV",
write_pv="MODE_TRIG_AUTO",
put_complete=True,
kind=Kind.config,
)
# Exposure modes
cfgTrigExpExt = Component(
EpicsSignal,
"MODE_EXP_EXT_RBV",
write_pv="MODE_EXP_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigExpSoft = Component(
EpicsSignal,
"MODE_EXP_SOFT_RBV",
write_pv="MODE_EXP_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigExpTimer = Component(
EpicsSignal,
"MODE_EXP_TIMER_RBV",
write_pv="MODE_EXP_TIMER",
put_complete=True,
kind=Kind.config,
)
# Line swap selection
cfgLineSwapSW = Component(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
cfgLineSwapNW = Component(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
cfgLineSwapSE = Component(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
cfgLineSwapNE = Component(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
cfgConnectionParam = Component(
EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config
)
# HW settings as read only
cfgSyncFlag = Component(EpicsSignalRO, "PIXRATE", auto_monitor=True)
cfgTrigDelay = Component(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True)
cfgSyncoutDelay = Component(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True)
cfgOutputPolarity0 = Component(EpicsSignalRO, "BNC0_RBV", auto_monitor=True)
cfgOutputPolarity1 = Component(EpicsSignalRO, "BNC1_RBV", auto_monitor=True)
cfgOutputPolarity2 = Component(EpicsSignalRO, "BNC2_RBV", auto_monitor=True)
cfgOutputPolarity3 = Component(EpicsSignalRO, "BNC3_RBV", auto_monitor=True)
cfgInputPolarity1 = Component(EpicsSignalRO, "BNC4_RBV", auto_monitor=True)
cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True)
infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True)
USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode"]
autoSoftEnable = Component(Signal, auto_monitor=True, kind=Kind.config)
backendUrl = Component(Signal, auto_monitor=True, kind=Kind.config)
state = Component(Signal, auto_monitor=True, kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
auto_soft_enable=False,
timeout=10,
backend_url=const.BE999_DAFL_CLIENT,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
# Additional parameters
self.autoSoftEnable._metadata["write_access"] = False
self.backendUrl._metadata["write_access"] = False
self.state._metadata["write_access"] = False
self.autoSoftEnable.put(auto_soft_enable, force=True)
self.backendUrl.put(backend_url, force=True)
self.state.put(const.GfStatus.NEW, force=True)
# super() will call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
def configure(
self,
nimages=10,
exposure=0.2,
period=1.0,
roix=2016,
roiy=2016,
scanid=0,
correction_mode=5,
):
"""Configure the next scan with the GigaFRoST camera
Parameters
----------
nimages : int, optional
Number of images to be taken during each scan. Set to -1 for an
unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10)
exposure : float, optional
Exposure time [ms]. (default = 0.2)
period : float, optional
Exposure period [ms], ignored in soft trigger mode. (default = 1.0)
roix : int, optional
ROI size in the x-direction [pixels] (default = 2016)
roiy : int, optional
ROI size in the y-direction [pixels] (default = 2016)
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
* 0: Bypass. No corrections are applied to the data.
* 1: Send correction factor A instead of pixel values
* 2: Send correction factor B instead of pixel values
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
"""
# If Bluesky style configure
if isinstance(nimages, dict):
d = nimages.copy()
nimages = d.get('nimages', 10)
exposure = d.get('exposure', exposure)
period = d.get('period', period)
roix = d.get('roix', roix)
roiy = d.get('roiy', roiy)
scanid = d.get('scanid', scanid)
correction_mode = d.get('correction_mode', correction_mode)
# Stop acquisition
self.cmdStartCamera.set(0).wait()
if self._auto_soft_enable:
self.cmdSoftEnable.set(0).wait()
# change settings
self.cfgExposure.set(exposure).wait()
self.cfgFramerate.set(period).wait()
self.cfgRoiX.set(roix).wait()
self.cfgRoiY.set(roiy).wait()
self.cfgScanId.set(scanid).wait()
self.cfgCntNum.set(nimages).wait()
self.cfgCorrMode.set(correction_mode).wait()
# Commit parameter
self.cmdSetParam.set(1).wait()
self.state.set(const.GfStatus.CONFIGURED, force=True)
@property
def exposure_mode(self):
"""Returns the current exposure mode of the GigaFRost camera.
Returns
-------
exp_mode : {'external', 'timer', 'soft'}
The camera's active exposure mode.
If more than one mode is active at the same time, it returns None.
"""
mode_soft = self.cfgTrigExpSoft.get()
mode_timer = self.cfgTrigExpTimer.get()
mode_external = self.cfgTrigExpExt.get()
if mode_soft and not mode_timer and not mode_external:
return "soft"
elif not mode_soft and mode_timer and not mode_external:
return "timer"
elif not mode_soft and not mode_timer and mode_external:
return "external"
else:
return None
@exposure_mode.setter
def exposure_mode(self, exp_mode):
"""Apply the exposure mode for the GigaFRoST camera.
Parameters
----------
exp_mode : {'external', 'timer', 'soft'}
The exposure mode to be set.
"""
if exp_mode not in const.gf_valid_exposure_modes:
raise ValueError(
f"Invalid exposure mode! Valid modes are:\n" "{const.gf_valid_exposure_modes}"
)
if exp_mode == "external":
self.cfgTrigExpExt.set(1).wait()
self.cfgTrigExpSoft.set(0).wait()
self.cfgTrigExpTimer.set(0).wait()
elif exp_mode == "timer":
self.cfgTrigExpExt.set(0).wait()
self.cfgTrigExpSoft.set(0).wait()
self.cfgTrigExpTimer.set(1).wait()
elif exp_mode == "soft":
self.cfgTrigExpExt.set(0).wait()
self.cfgTrigExpSoft.set(1).wait()
self.cfgTrigExpTimer.set(0).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def fix_nframes_mode(self):
"""Return the current fixed number of frames mode of the GigaFRoST camera.
Returns
-------
fix_nframes_mode : {'off', 'start', 'end', 'start+end'}
The camera's active fixed number of frames mode.
"""
start_bit = self.cfgCntStartBit.get()
end_bit = self.cfgCntStartBit.get()
if not start_bit and not end_bit:
return "off"
elif start_bit and not end_bit:
return "start"
elif not start_bit and end_bit:
return "end"
elif start_bit and end_bit:
return "start+end"
else:
return None
@fix_nframes_mode.setter
def fix_nframes_mode(self, mode):
"""Apply the fixed number of frames settings to the GigaFRoST camera.
Parameters
----------
mode : {'off', 'start', 'end', 'start+end'}
The fixed number of frames mode to be applied.
"""
if mode not in const.gf_valid_fix_nframe_modes:
raise ValueError(
f"Invalid fixed number of frames mode! Valid modes are:\n{const.gf_valid_fix_nframe_modes}"
)
self._fix_nframes_mode = mode
if self._fix_nframes_mode == "off":
self.cfgCntStartBit.set(0).wait()
self.cfgCntEndBit.set(0).wait()
elif self._fix_nframes_mode == "start":
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
elif self._fix_nframes_mode == "end":
self.cfgCntStartBit.set(0).wait()
self.cfgCntEndBit.set(1).wait()
elif self._fix_nframes_mode == "start+end":
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(1).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def trigger_mode(self):
"""Method to detect the current trigger mode set in the GigaFRost camera.
Returns
-------
mode : {'auto', 'external', 'timer', 'soft'}
The camera's active trigger mode. If more than one mode is active
at the moment, None is returned.
"""
mode_auto = self.cfgTrigAuto.get()
mode_external = self.cfgTrigExt.get()
mode_timer = self.cfgTrigTimer.get()
mode_soft = self.cfgTrigSoft.get()
if mode_auto:
return "auto"
elif mode_soft:
return "soft"
elif mode_timer:
return "timer"
elif mode_external:
return "external"
else:
return None
@trigger_mode.setter
def trigger_mode(self, mode):
"""Set the trigger mode for the GigaFRoST camera.
Parameters
----------
mode : {'auto', 'external', 'timer', 'soft'}
The GigaFRoST trigger mode.
"""
if mode not in const.gf_valid_trigger_modes:
raise ValueError(
"Invalid trigger mode! Valid modes are:\n" "{const.gf_valid_trigger_modes}"
)
if mode == "auto":
self.cfgTrigAuto.set(1).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(0).wait()
elif mode == "external":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(1).wait()
elif mode == "timer":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(1).wait()
self.cfgTrigExt.set(0).wait()
elif mode == "soft":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(1).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(0).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def enable_mode(self):
"""Return the enable mode set in the GigaFRost camera.
Returns
-------
enable_mode: {'soft', 'external', 'soft+ext', 'always'}
The camera's active enable mode.
"""
mode_soft = self.cfgTrigEnableSoft.get()
mode_external = self.cfgTrigEnableExt.get()
mode_auto = self.cfgTrigEnableAuto.get()
if mode_soft and not mode_auto:
if mode_external:
return "soft+ext"
else:
return "soft"
elif mode_auto and not mode_soft and not mode_external:
return "always"
elif mode_external and not mode_soft and not mode_auto:
return "external"
else:
return None
@enable_mode.setter
def enable_mode(self, mode):
"""Apply the enable mode for the GigaFRoST camera.
Parameters
----------
mode : {'soft', 'external', 'soft+ext', 'always'}
The enable mode to be applied.
"""
if mode not in const.gf_valid_enable_modes:
raise ValueError(
"Invalid enable mode! Valid modes are:\n" "{const.gf_valid_enable_modes}"
)
if mode == "soft":
self.cfgTrigEnableExt.set(0).wait()
self.cfgTrigEnableSoft.set(1).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "external":
self.cfgTrigEnableExt.set(1).wait()
self.cfgTrigEnableSoft.set(0).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "soft+ext":
self.cfgTrigEnableExt.set(1).wait()
self.cfgTrigEnableSoft.set(1).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "always":
self.cfgTrigEnableExt.set(0).wait()
self.cfgTrigEnableSoft.set(0).wait()
self.cfgTrigEnableAuto.set(1).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
gf = GigaFrostCamera(
"X02DA-CAM-GF2:", name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True
)
gf.wait_for_connection()
+190 -650
View File
@@ -11,18 +11,190 @@ from time import sleep
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind, DeviceStatus
from ophyd.device import Staged
try:
import gfconstants as const
except ModuleNotFoundError:
import tomcat_bec.devices.gigafrost.gfconstants as const
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
try:
from gfutils import extend_header_table
from StdDaqClient import StdDaqClient
except ModuleNotFoundError:
from tomcat_bec.devices.gigafrost.gfutils import extend_header_table
from tomcat_bec.devices.gigafrost.StdDaqClient import StdDaqClient
try:
from gfcamera import GigaFrostCamera
except ModuleNotFoundError:
from tomcat_bec.devices.gigafrost.gfcamera import GigaFrostCamera
class GigaFrostClient(Device):
class GigafrostClientMixin(CustomDetectorMixin):
"""Mixin class to setup TOMCAT specific implementations of the detector.
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
def _define_backend_ip(self):
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _define_backend_mac(self):
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _set_udp_header_table(self):
"""Set the communication parameters for the camera module"""
self.parent.cfgConnectionParam.set(self._build_udp_header_table()).wait()
def _build_udp_header_table(self):
"""Build the header table for the communication"""
udp_header_table = []
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table, self.parent.macSouth, self.parent.ipSouth, dest_port, source_port
)
else:
extend_header_table(
udp_header_table, self.parent.macNorth, self.parent.ipNorth, dest_port, source_port
)
return udp_header_table
def on_init(self) -> None:
"""Initialize the camera, set channel values"""
## Stop acquisition
self.parent.cmdStartCamera.set(0).wait()
### set entry to UDP table
# number of UDP ports to use
self.parent.cfgUdpNumPorts.set(2).wait()
# number of images to send to each UDP port before switching to next
self.parent.cfgUdpNumFrames.set(5).wait()
# offset in UDP table - where to find the first entry
self.parent.cfgUdpHtOffset.set(0).wait()
# activate changes
self.parent.cmdWriteService.set(1).wait()
# Configure software triggering if needed
if self.parent._auto_soft_enable:
# trigger modes
self.parent.cfgCntStartBit.set(1).wait()
self.parent.cfgCntEndBit.set(0).wait()
# set modes
self.parent.enable_mode = "soft"
self.parent.trigger_mode = "auto"
self.parent.exposure_mode = "timer"
# line swap - on for west, off for east
self.parent.cfgLineSwapSW.set(1).wait()
self.parent.cfgLineSwapNW.set(1).wait()
self.parent.cfgLineSwapSE.set(0).wait()
self.parent.cfgLineSwapNE.set(0).wait()
# Commit parameters
self.parent.cmdSetParam.set(1).wait()
# Initialize data backend
n, s = self._define_backend_ip()
self.parent.ipNorth.put(n, force=True)
self.parent.ipSouth.put(s, force=True)
n, s = self._define_backend_mac()
self.parent.macNorth.put(n, force=True)
self.parent.macSouth.put(s, force=True)
# Set udp header table
self._set_udp_header_table()
self.parent.state.put(const.GfStatus.INIT, force=True)
return super().on_init()
def on_stage(self) -> None:
"""
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
if self.parent.infoBusyFlag.value:
raise RuntimeError("Camera is already busy, unstage it first!")
# Switch to acquiring
self.parent.cmdStartCamera.set(1).wait()
self.parent.state.put(const.GfStatus.ACQUIRING, force=True)
# Gigafrost can finish a run without explicit unstaging
self.parent._staged = Staged.no
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
# Switch to idle
self.parent.cmdStartCamera.set(0).wait()
if self.parent.autoSoftEnable.get():
self.cmdSoftEnable.set(0).wait()
self.parent.state.put(const.GfStatus.STOPPED, force=True)
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
return self.on_unstage()
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
status = DeviceStatus(self.parent)
# Soft triggering based on operation mode
if self.parent.autoSoftEnable.get() and self.parent.trigger_mode=='auto' and self.parent.enable_mode=='soft':
# BEC teststand operation mode: posedge of SoftEnable if Started
self.parent.cmdSoftEnable.set(0).wait()
self.parent.cmdSoftEnable.set(1).wait()
sleep_time = self.parent.cfgFramerate.value*self.parent.cfgCntNum.value*0.001+0.050
# There's no status readback from the camera, so we just wait
sleep(sleep_time)
print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr)
else:
self.parent.cmdSoftTrigger.set(1).wait()
status.set_finished()
return status
class GigaFrostClient(PSIDetectorBase):
"""Ophyd device class to control Gigafrost cameras at Tomcat
The actual hardware is implemented by an IOC based on an old fork of Helge's
@@ -47,654 +219,22 @@ class GigaFrostClient(Device):
"""
# pylint: disable=too-many-instance-attributes
infoBusyFlag = Component(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
infoSyncFlag = Component(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
cmdSyncHw = Component(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True)
cmdStartCamera = Component(EpicsSignal, "START_CAM", put_complete=True)
cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True)
# UDP header
cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
cfgUdpNumFrames = Component(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
cfgUdpHtOffset = Component(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
cmdWriteService = Component(EpicsSignal, "WRITE_SRV.PROC", put_complete=True)
# Standard camera configs
cfgExposure = Component(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgFramerate = Component(EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgScanId = Component(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgCntNum = Component(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config)
cfgCorrMode = Component(EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config)
# Software signals
cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True)
cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True)
cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True)
# Trigger configuration PVs
cfgCntStartBit = Component(
EpicsSignal,
"CNT_STARTBIT_RBV",
write_pv="CNT_STARTBIT",
put_complete=True,
kind=Kind.config,
)
cfgCntEndBit = Component(
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# Enable modes
cfgTrigEnableExt = Component(
EpicsSignal,
"MODE_ENBL_EXT_RBV",
write_pv="MODE_ENBL_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigEnableSoft = Component(
EpicsSignal,
"MODE_ENBL_SOFT_RBV",
write_pv="MODE_ENBL_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigEnableAuto = Component(
EpicsSignal,
"MODE_ENBL_AUTO_RBV",
write_pv="MODE_ENBL_AUTO",
put_complete=True,
kind=Kind.config,
)
cfgTrigVirtEnable = Component(
EpicsSignal,
"MODE_ENBL_EXP_RBV",
write_pv="MODE_ENBL_EXP",
put_complete=True,
kind=Kind.config,
)
# Trigger modes
cfgTrigExt = Component(
EpicsSignal,
"MODE_TRIG_EXT_RBV",
write_pv="MODE_TRIG_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigSoft = Component(
EpicsSignal,
"MODE_TRIG_SOFT_RBV",
write_pv="MODE_TRIG_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigTimer = Component(
EpicsSignal,
"MODE_TRIG_TIMER_RBV",
write_pv="MODE_TRIG_TIMER",
put_complete=True,
kind=Kind.config,
)
cfgTrigAuto = Component(
EpicsSignal,
"MODE_TRIG_AUTO_RBV",
write_pv="MODE_TRIG_AUTO",
put_complete=True,
kind=Kind.config,
)
# Exposure modes
cfgTrigExpExt = Component(
EpicsSignal,
"MODE_EXP_EXT_RBV",
write_pv="MODE_EXP_EXT",
put_complete=True,
kind=Kind.config,
)
cfgTrigExpSoft = Component(
EpicsSignal,
"MODE_EXP_SOFT_RBV",
write_pv="MODE_EXP_SOFT",
put_complete=True,
kind=Kind.config,
)
cfgTrigExpTimer = Component(
EpicsSignal,
"MODE_EXP_TIMER_RBV",
write_pv="MODE_EXP_TIMER",
put_complete=True,
kind=Kind.config,
)
# Line swap selection
cfgLineSwapSW = Component(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
cfgLineSwapNW = Component(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
cfgLineSwapSE = Component(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
cfgLineSwapNE = Component(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
cfgConnectionParam = Component(
EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config
)
# HW settings as read only
cfgSyncFlag = Component(EpicsSignalRO, "PIXRATE", auto_monitor=True)
cfgTrigDelay = Component(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True)
cfgSyncoutDelay = Component(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True)
cfgOutputPolarity0 = Component(EpicsSignalRO, "BNC0_RBV", auto_monitor=True)
cfgOutputPolarity1 = Component(EpicsSignalRO, "BNC1_RBV", auto_monitor=True)
cfgOutputPolarity2 = Component(EpicsSignalRO, "BNC2_RBV", auto_monitor=True)
cfgOutputPolarity3 = Component(EpicsSignalRO, "BNC3_RBV", auto_monitor=True)
cfgInputPolarity1 = Component(EpicsSignalRO, "BNC4_RBV", auto_monitor=True)
cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True)
infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True)
USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode"]
def __init__(
self,
prefix="",
*,
name,
auto_soft_enable=False,
timeout=10,
backend_url=const.BE999_DAFL_CLIENT,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self._auto_soft_enable = auto_soft_enable
self._timeout = timeout
self._backend_url = backend_url
self.state = const.GfStatus.NEW
self._settings = None
self._north_mac, self._south_mac = self._define_backend_mac()
self._north_ip, self._south_ip = self._define_backend_ip()
self._valid_enable_modes = ("soft", "external", "soft+ext", "always")
self._valid_exposure_modes = ("external", "timer", "soft")
self._valid_trigger_modes = ("auto", "external", "timer", "soft")
self._valid_fix_nframe_modes = ("off", "start", "end", "start+end")
# Continue initialization
self.initialize()
def initialize(self):
"""Initialize the camera, set channel values"""
## Stop acquisition
self.cmdStartCamera.set(0).wait()
### set entry to UDP table
# number of UDP ports to use
self.cfgUdpNumPorts.set(2).wait()
# number of images to send to each UDP port before switching to next
self.cfgUdpNumFrames.set(5).wait()
# offset in UDP table - where to find the first entry
self.cfgUdpHtOffset.set(0).wait()
# activate changes
self.cmdWriteService.set(1).wait()
# Configure software triggering if needed
if self._auto_soft_enable:
# trigger modes
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
# set modes
self.enable_mode = "soft"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
# line swap - on for west, off for east
self.cfgLineSwapSW.set(1).wait()
self.cfgLineSwapNW.set(1).wait()
self.cfgLineSwapSE.set(0).wait()
self.cfgLineSwapNE.set(0).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
self._set_udp_header_table()
self.state = const.GfStatus.INIT
# sets the basic settings
self._settings = {
"backend_url": self._backend_url,
"auto_soft_enable": self._auto_soft_enable,
"ioc_name": self.prefix,
}
def configure(
self,
nimages=10,
exposure=0.2,
period=1.0,
roix=2016,
roiy=2016,
scanid=0,
correction_mode=5,
):
"""Configure the next scan with the GigaFRoST camera
Parameters
----------
nimages : int, optional
Number of images to be taken during each scan. Set to -1 for an
unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10)
exposure : float, optional
Exposure time [ms]. (default = 0.2)
period : float, optional
Exposure period [ms], ignored in soft trigger mode. (default = 1.0)
roix : int, optional
ROI size in the x-direction [pixels] (default = 2016)
roiy : int, optional
ROI size in the y-direction [pixels] (default = 2016)
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
* 0: Bypass. No corrections are applied to the data.
* 1: Send correction factor A instead of pixel values
* 2: Send correction factor B instead of pixel values
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
"""
# If Bluesky style configure
if isinstance(nimages, dict):
d = nimages.copy()
nimages = d.get('nimages', 10)
exposure = d.get('exposure', exposure)
period = d.get('period', period)
roix = d.get('roix', roix)
roiy = d.get('roiy', roiy)
scanid = d.get('scanid', scanid)
correction_mode = d.get('correction_mode', correction_mode)
# Stop acquisition
self.cmdStartCamera.set(0).wait()
if self._auto_soft_enable:
self.cmdSoftEnable.set(0).wait()
# change settings
self.cfgExposure.set(exposure).wait()
self.cfgFramerate.set(period).wait()
self.cfgRoiX.set(roix).wait()
self.cfgRoiY.set(roiy).wait()
self.cfgScanId.set(scanid).wait()
self.cfgCntNum.set(nimages).wait()
self.cfgCorrMode.set(correction_mode).wait()
# Commit parameter
self.cmdSetParam.set(1).wait()
self.state = const.GfStatus.CONFIGURED
self._settings = {
"nimages": nimages,
"exposure": exposure,
"frame_rate": period,
"roix": roix,
"roiy": roiy,
"scanid": scanid,
"correction_mode": correction_mode,
"backend_url": self._backend_url,
"auto_soft_enable": self._auto_soft_enable,
"ioc_name": self.name,
}
cam = Component(GigaFrostCamera, "", auto_monitor=True)
daq = Component(StdDaqClient, "", auto_monitor=True)
def stage(self):
"""Standard ophyd method to start an acquisition
px_daq_h = self.daq.config.cfg_image_pixel_height.get()
px_daq_w = self.daq.config.cfg_image_pixel_width.get()
In ophyd it still needs to be triggered to perfor an actual capture.
"""
if self.infoBusyFlag.value:
raise RuntimeError("Camera is already busy, unstage it first!")
px_gf_h = self.cam.cfgRoiX.get()
px_gf_y = self.cam.cfgRoiY.get()
# change to running
self.cmdStartCamera.set(1).wait()
# soft trigger on (DISABLED: use trigger() in ophyd)
#if self._auto_soft_enable:
# self.cmdSoftEnable.set(1).wait()
self.state = const.GfStatus.ACQUIRING
# Gigafrost can finish a run without explicit unstaging
self._staged = Staged.no
if px_daq_h != px_gf_h or px_daq_w != px_gf_w:
raise RuntimeError(f"Different image size configured on GF and the DAQ")
return super().stage()
def unstage(self):
"""Standard ophyd method to finish an acquisition"""
# switch to idle
self.cmdStartCamera.set(0).wait()
if self._auto_soft_enable:
self.cmdSoftEnable.set(0).wait()
self.state = const.GfStatus.STOPPED
return super().unstage()
def stop(self):
"""Standard ophyd method to stop an acquisition"""
self.unstage()
def trigger(self) -> DeviceStatus:
"""Sends a software trigger and approximately waits to finnish"""
status = DeviceStatus(self)
# Soft triggering based on operation mode
if self._auto_soft_enable and self.trigger_mode=='auto' and self.enable_mode=='soft':
# BEC teststand operation mode: posedge of SoftEnable if Started
self.cmdSoftEnable.set(0).wait()
self.cmdSoftEnable.set(1).wait()
sleep_time = self.cfgFramerate.value*self.cfgCntNum.value*0.001+0.050
sleep(sleep_time)
print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr)
else:
self.cmdSoftTrigger.set(1).wait()
status.set_finished()
return status
@property
def exposure_mode(self):
"""Returns the current exposure mode of the GigaFRost camera.
Returns
-------
exp_mode : {'external', 'timer', 'soft'}
The camera's active exposure mode.
If more than one mode is active at the same time, it returns None.
"""
mode_soft = self.cfgTrigExpSoft.get()
mode_timer = self.cfgTrigExpTimer.get()
mode_external = self.cfgTrigExpExt.get()
if mode_soft and not mode_timer and not mode_external:
return "soft"
elif not mode_soft and mode_timer and not mode_external:
return "timer"
elif not mode_soft and not mode_timer and mode_external:
return "external"
else:
return None
@exposure_mode.setter
def exposure_mode(self, exp_mode):
"""Apply the exposure mode for the GigaFRoST camera.
Parameters
----------
exp_mode : {'external', 'timer', 'soft'}
The exposure mode to be set.
"""
if exp_mode not in self._valid_exposure_modes:
raise ValueError(
f"Invalid exposure mode! Valid modes are:\n" "{self._valid_exposure_modes}"
)
if exp_mode == "external":
self.cfgTrigExpExt.set(1).wait()
self.cfgTrigExpSoft.set(0).wait()
self.cfgTrigExpTimer.set(0).wait()
elif exp_mode == "timer":
self.cfgTrigExpExt.set(0).wait()
self.cfgTrigExpSoft.set(0).wait()
self.cfgTrigExpTimer.set(1).wait()
elif exp_mode == "soft":
self.cfgTrigExpExt.set(0).wait()
self.cfgTrigExpSoft.set(1).wait()
self.cfgTrigExpTimer.set(0).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def fix_nframes_mode(self):
"""Return the current fixed number of frames mode of the GigaFRoST camera.
Returns
-------
fix_nframes_mode : {'off', 'start', 'end', 'start+end'}
The camera's active fixed number of frames mode.
"""
start_bit = self.cfgCntStartBit.get()
end_bit = self.cfgCntStartBit.get()
if not start_bit and not end_bit:
return "off"
elif start_bit and not end_bit:
return "start"
elif not start_bit and end_bit:
return "end"
elif start_bit and end_bit:
return "start+end"
else:
return None
@fix_nframes_mode.setter
def fix_nframes_mode(self, mode):
"""Apply the fixed number of frames settings to the GigaFRoST camera.
Parameters
----------
mode : {'off', 'start', 'end', 'start+end'}
The fixed number of frames mode to be applied.
"""
if mode not in self._valid_fix_nframe_modes:
raise ValueError(
"Invalid fixed number of frames mode! "
"Valid modes are:\n{}".format(self._valid_fix_nframe_modes)
)
self._fix_nframes_mode = mode
if self._fix_nframes_mode == "off":
self.cfgCntStartBit.set(0).wait()
self.cfgCntEndBit.set(0).wait()
elif self._fix_nframes_mode == "start":
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
elif self._fix_nframes_mode == "end":
self.cfgCntStartBit.set(0).wait()
self.cfgCntEndBit.set(1).wait()
elif self._fix_nframes_mode == "start+end":
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(1).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def trigger_mode(self):
"""Method to detect the current trigger mode set in the GigaFRost camera.
Returns
-------
mode : {'auto', 'external', 'timer', 'soft'}
The camera's active trigger mode. If more than one mode is active
at the moment, None is returned.
"""
mode_auto = self.cfgTrigAuto.get()
mode_external = self.cfgTrigExt.get()
mode_timer = self.cfgTrigTimer.get()
mode_soft = self.cfgTrigSoft.get()
if mode_auto:
return "auto"
elif mode_soft:
return "soft"
elif mode_timer:
return "timer"
elif mode_external:
return "external"
else:
return None
@trigger_mode.setter
def trigger_mode(self, mode):
"""Set the trigger mode for the GigaFRoST camera.
Parameters
----------
mode : {'auto', 'external', 'timer', 'soft'}
The GigaFRoST trigger mode.
"""
if mode not in self._valid_trigger_modes:
raise ValueError(
"Invalid trigger mode! Valid modes are:\n" "{self._valid_trigger_modes}"
)
if mode == "auto":
self.cfgTrigAuto.set(1).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(0).wait()
elif mode == "external":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(1).wait()
elif mode == "timer":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(0).wait()
self.cfgTrigTimer.set(1).wait()
self.cfgTrigExt.set(0).wait()
elif mode == "soft":
self.cfgTrigAuto.set(0).wait()
self.cfgTrigSoft.set(1).wait()
self.cfgTrigTimer.set(0).wait()
self.cfgTrigExt.set(0).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
@property
def enable_mode(self):
"""Return the enable mode set in the GigaFRost camera.
Returns
-------
enable_mode: {'soft', 'external', 'soft+ext', 'always'}
The camera's active enable mode.
"""
mode_soft = self.cfgTrigEnableSoft.get()
mode_external = self.cfgTrigEnableExt.get()
mode_auto = self.cfgTrigEnableAuto.get()
if mode_soft and not mode_auto:
if mode_external:
return "soft+ext"
else:
return "soft"
elif mode_auto and not mode_soft and not mode_external:
return "always"
elif mode_external and not mode_soft and not mode_auto:
return "external"
else:
return None
@enable_mode.setter
def enable_mode(self, mode):
"""Apply the enable mode for the GigaFRoST camera.
Parameters
----------
mode : {'soft', 'external', 'soft+ext', 'always'}
The enable mode to be applied.
"""
if mode not in self._valid_enable_modes:
raise ValueError(
"Invalid enable mode! Valid modes are:\n" "{self._valid_enable_modes}"
)
if mode == "soft":
self.cfgTrigEnableExt.set(0).wait()
self.cfgTrigEnableSoft.set(1).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "external":
self.cfgTrigEnableExt.set(1).wait()
self.cfgTrigEnableSoft.set(0).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "soft+ext":
self.cfgTrigEnableExt.set(1).wait()
self.cfgTrigEnableSoft.set(1).wait()
self.cfgTrigEnableAuto.set(0).wait()
elif mode == "always":
self.cfgTrigEnableExt.set(0).wait()
self.cfgTrigEnableSoft.set(0).wait()
self.cfgTrigEnableAuto.set(1).wait()
# Commit parameters
self.cmdSetParam.set(1).wait()
def get_state(self):
return self.state
def get_south_mac(self):
return self._south_mac
def get_north_mac(self):
return self._north_mac
def get_north_ip(self):
return self._north_ip
def get_south_ip(self):
return self._south_ip
def get_backend_url(self):
"""Method to read the configured backend URL"""
return self._backend_url
def set_backend_ip(self, north, south):
"""Method to manually set the backend ip"""
self._north_ip, self._south_ip = north, south
def set_backend_mac(self, north, south):
"""Method to manually set the backend mac"""
self._north_mac, self._south_mac = north, south
def _build_udp_header_table(self):
"""Build the header table for the communication"""
udp_header_table = []
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table, self._south_mac, self._south_ip, dest_port, source_port
)
else:
extend_header_table(
udp_header_table, self._north_mac, self._north_ip, dest_port, source_port
)
return udp_header_table
def _define_backend_ip(self):
if self._backend_url == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
elif self._backend_url == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _define_backend_mac(self):
if self._backend_url == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
elif self._backend_url == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
else:
raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}")
def _set_udp_header_table(self):
"""Set the communication parameters for the camera module"""
self.cfgConnectionParam.set(self._build_udp_header_table()).wait()
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":