From 7f8dd3a4bb031d2696f5694d919b927a35de84ff Mon Sep 17 00:00:00 2001 From: Istvan Mohacsi Date: Mon, 22 Jul 2024 17:59:11 +0200 Subject: [PATCH] Daili commit coming to shape --- .gitignore | 1 + tomcat_bec/devices/gigafrost/gfconstants.py | 6 + .../devices/gigafrost/gigafrostcamera.py | 696 +++++++++++++++ .../devices/gigafrost/gigafrostclient.py | 840 ++++-------------- 4 files changed, 893 insertions(+), 650 deletions(-) create mode 100644 tomcat_bec/devices/gigafrost/gigafrostcamera.py diff --git a/.gitignore b/.gitignore index f4c73aa..4c2ec57 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ share/python-wheels/ *.egg-info/ .installed.cfg *.egg +*.bak MANIFEST # PyInstaller diff --git a/tomcat_bec/devices/gigafrost/gfconstants.py b/tomcat_bec/devices/gigafrost/gfconstants.py index 536c9df..f7786bf 100644 --- a/tomcat_bec/devices/gigafrost/gfconstants.py +++ b/tomcat_bec/devices/gigafrost/gfconstants.py @@ -9,6 +9,12 @@ Created on Thu Jun 27 17:28:43 2024 from enum import Enum +gf_valid_enable_modes = ("soft", "external", "soft+ext", "always") +gf_valid_exposure_modes = ("external", "timer", "soft") +gf_valid_trigger_modes = ("auto", "external", "timer", "soft") +gf_valid_fix_nframe_modes = ("off", "start", "end", "start+end") + + # STATUS class GfStatus(Enum): """Operation states for GigaFrost Ophyd device""" diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py new file mode 100644 index 0000000..a63b217 --- /dev/null +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -0,0 +1,696 @@ +# -*- coding: utf-8 -*- +""" +GigaFrost camera class module + +Created on Thu Jun 27 17:28:43 2024 + +@author: mohacsi_i +""" +import sys +from time import sleep +from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind, DeviceStatus +from ophyd.device import Staged + +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) + +try: + import gfconstants as const +except ModuleNotFoundError: + import tomcat_bec.devices.gigafrost.gfconstants as const + +try: + from gfutils import extend_header_table +except ModuleNotFoundError: + from tomcat_bec.devices.gigafrost.gfutils import extend_header_table + + +class GigaFrostCameraMixin(CustomDetectorMixin): + """Mixin class to setup TOMCAT specific implementations of the detector. + + This class will be called by the custom_prepare_cls attribute of the detector class. + """ + def _define_backend_ip(self): + if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 + return const.BE3_NORTH_IP, const.BE3_SOUTH_IP + elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: + return const.BE999_NORTH_IP, const.BE999_SOUTH_IP + else: + raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") + + def _define_backend_mac(self): + if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 + return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC + elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: + return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC + else: + raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") + + def _set_udp_header_table(self): + """Set the communication parameters for the camera module""" + self.parent.cfgConnectionParam.set(self._build_udp_header_table()).wait() + + def _build_udp_header_table(self): + """Build the header table for the communication""" + udp_header_table = [] + + for i in range(0, 64, 1): + for j in range(0, 8, 1): + dest_port = 2000 + 8 * i + j + source_port = 3000 + j + if j < 4: + extend_header_table( + udp_header_table, self.parent.macSouth, self.parent.ipSouth, dest_port, source_port + ) + else: + extend_header_table( + udp_header_table, self.parent.macNorth, self.parent.ipNorth, dest_port, source_port + ) + + return udp_header_table + + + def on_init(self) -> None: + """Initialize the camera, set channel values""" + ## Stop acquisition + self.parent.cmdStartCamera.set(0).wait() + + ### set entry to UDP table + # number of UDP ports to use + self.parent.cfgUdpNumPorts.set(2).wait() + # number of images to send to each UDP port before switching to next + self.parent.cfgUdpNumFrames.set(5).wait() + # offset in UDP table - where to find the first entry + self.parent.cfgUdpHtOffset.set(0).wait() + # activate changes + self.parent.cmdWriteService.set(1).wait() + + # Configure software triggering if needed + if self.parent._auto_soft_enable: + # trigger modes + self.parent.cfgCntStartBit.set(1).wait() + self.parent.cfgCntEndBit.set(0).wait() + + # set modes + self.parent.enable_mode = "soft" + self.parent.trigger_mode = "auto" + self.parent.exposure_mode = "timer" + + # line swap - on for west, off for east + self.parent.cfgLineSwapSW.set(1).wait() + self.parent.cfgLineSwapNW.set(1).wait() + self.parent.cfgLineSwapSE.set(0).wait() + self.parent.cfgLineSwapNE.set(0).wait() + + # Commit parameters + self.parent.cmdSetParam.set(1).wait() + + # Initialize data backend + n, s = self._define_backend_ip() + self.parent.ipNorth.put(n, force=True) + self.parent.ipSouth.put(s, force=True) + n, s = self._define_backend_mac() + self.parent.macNorth.put(n, force=True) + self.parent.macSouth.put(s, force=True) + # Set udp header table + self._set_udp_header_table() + + self.parent.state.put(const.GfStatus.INIT, force=True) + return super().on_init() + + + + def on_stage(self) -> None: + """ + Specify actions to be executed during stage in preparation for a scan. + self.parent.scaninfo already has all current parameters for the upcoming scan. + + In case the backend service is writing data on disk, this step should include publishing + a file_event and file_message to BEC to inform the system where the data is written to. + + IMPORTANT: + It must be safe to assume that the device is ready for the scan + to start immediately once this function is finished. + """ + if self.parent.infoBusyFlag.value: + raise RuntimeError("Camera is already busy, unstage it first!") + # Switch to acquiring + self.parent.cmdStartCamera.set(1).wait() + self.parent.state.put(const.GfStatus.ACQUIRING, force=True) + # Gigafrost can finish a run without explicit unstaging + self.parent._staged = Staged.no + + def on_unstage(self) -> None: + """ + Specify actions to be executed during unstage. + + This step should include checking if the acqusition was successful, + and publishing the file location and file event message, + with flagged done to BEC. + """ + # Switch to idle + self.parent.cmdStartCamera.set(0).wait() + if self.parent.autoSoftEnable.get(): + self.cmdSoftEnable.set(0).wait() + self.parent.state.put(const.GfStatus.STOPPED, force=True) + + def on_stop(self) -> None: + """ + Specify actions to be executed during stop. + This must also set self.parent.stopped to True. + + This step should include stopping the detector and backend service. + """ + return self.on_unstage() + + def on_trigger(self) -> None | DeviceStatus: + """ + Specify actions to be executed upon receiving trigger signal. + Return a DeviceStatus object or None + """ + status = DeviceStatus(self.parent) + + # Soft triggering based on operation mode + if self.parent.autoSoftEnable.get() and self.parent.trigger_mode=='auto' and self.parent.enable_mode=='soft': + # BEC teststand operation mode: posedge of SoftEnable if Started + self.parent.cmdSoftEnable.set(0).wait() + self.parent.cmdSoftEnable.set(1).wait() + sleep_time = self.parent.cfgFramerate.value*self.parent.cfgCntNum.value*0.001+0.050 + # There's no status readback from the camera, so we just wait + sleep(sleep_time) + print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr) + else: + self.parent.cmdSoftTrigger.set(1).wait() + status.set_finished() + + return status + + +class GigaFrostCamera(PSIDetectorBase): + """Ophyd device class to control Gigafrost cameras at Tomcat + + The actual hardware is implemented by an IOC based on an old fork of Helge's + cameras. This means that the camera behaves differently than the SF cameras + in particular it provides even less feedback about it's internal progress. + Helge will update the GigaFrost IOC after working beamline. + The ophyd class is based on the 'gfclient' package and has a lot of Tomcat + specific additions. It does behave differently though, as ophyd swallows the + errors from failed PV writes. + + Parameters + ---------- + use_soft_enable : bool + Flag to use the camera's soft enable (default: False) + backend_url : str + Backend url address necessary to set up the camera's udp header. + (default: http://xbl-daq-23:8080) + + Bugs: + ---------- + FRAMERATE : Ignored in soft trigger mode, period becomes 2xexposure time + """ + # pylint: disable=too-many-instance-attributes + + custom_prepare_cls = GigaFrostCameraMixin + + + + infoBusyFlag = Component(EpicsSignalRO, "BUSY_STAT", auto_monitor=True) + infoSyncFlag = Component(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True) + cmdSyncHw = Component(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True) + cmdStartCamera = Component(EpicsSignal, "START_CAM", put_complete=True) + cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True) + + # UDP header + cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config) + cfgUdpNumFrames = Component(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config) + cfgUdpHtOffset = Component(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config) + cmdWriteService = Component(EpicsSignal, "WRITE_SRV.PROC", put_complete=True) + + # Standard camera configs + cfgExposure = Component(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgFramerate = Component(EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgScanId = Component(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgCntNum = Component(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config) + cfgCorrMode = Component(EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config) + + # Software signals + cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True) + cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True) + cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True) + + # Trigger configuration PVs + cfgCntStartBit = Component( + EpicsSignal, + "CNT_STARTBIT_RBV", + write_pv="CNT_STARTBIT", + put_complete=True, + kind=Kind.config, + ) + cfgCntEndBit = Component( + EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config + ) + # Enable modes + cfgTrigEnableExt = Component( + EpicsSignal, + "MODE_ENBL_EXT_RBV", + write_pv="MODE_ENBL_EXT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigEnableSoft = Component( + EpicsSignal, + "MODE_ENBL_SOFT_RBV", + write_pv="MODE_ENBL_SOFT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigEnableAuto = Component( + EpicsSignal, + "MODE_ENBL_AUTO_RBV", + write_pv="MODE_ENBL_AUTO", + put_complete=True, + kind=Kind.config, + ) + cfgTrigVirtEnable = Component( + EpicsSignal, + "MODE_ENBL_EXP_RBV", + write_pv="MODE_ENBL_EXP", + put_complete=True, + kind=Kind.config, + ) + # Trigger modes + cfgTrigExt = Component( + EpicsSignal, + "MODE_TRIG_EXT_RBV", + write_pv="MODE_TRIG_EXT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigSoft = Component( + EpicsSignal, + "MODE_TRIG_SOFT_RBV", + write_pv="MODE_TRIG_SOFT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigTimer = Component( + EpicsSignal, + "MODE_TRIG_TIMER_RBV", + write_pv="MODE_TRIG_TIMER", + put_complete=True, + kind=Kind.config, + ) + cfgTrigAuto = Component( + EpicsSignal, + "MODE_TRIG_AUTO_RBV", + write_pv="MODE_TRIG_AUTO", + put_complete=True, + kind=Kind.config, + ) + # Exposure modes + cfgTrigExpExt = Component( + EpicsSignal, + "MODE_EXP_EXT_RBV", + write_pv="MODE_EXP_EXT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigExpSoft = Component( + EpicsSignal, + "MODE_EXP_SOFT_RBV", + write_pv="MODE_EXP_SOFT", + put_complete=True, + kind=Kind.config, + ) + cfgTrigExpTimer = Component( + EpicsSignal, + "MODE_EXP_TIMER_RBV", + write_pv="MODE_EXP_TIMER", + put_complete=True, + kind=Kind.config, + ) + + # Line swap selection + cfgLineSwapSW = Component(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config) + cfgLineSwapNW = Component(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config) + cfgLineSwapSE = Component(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config) + cfgLineSwapNE = Component(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config) + cfgConnectionParam = Component( + EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config + ) + + # HW settings as read only + cfgSyncFlag = Component(EpicsSignalRO, "PIXRATE", auto_monitor=True) + cfgTrigDelay = Component(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True) + cfgSyncoutDelay = Component(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True) + cfgOutputPolarity0 = Component(EpicsSignalRO, "BNC0_RBV", auto_monitor=True) + cfgOutputPolarity1 = Component(EpicsSignalRO, "BNC1_RBV", auto_monitor=True) + cfgOutputPolarity2 = Component(EpicsSignalRO, "BNC2_RBV", auto_monitor=True) + cfgOutputPolarity3 = Component(EpicsSignalRO, "BNC3_RBV", auto_monitor=True) + cfgInputPolarity1 = Component(EpicsSignalRO, "BNC4_RBV", auto_monitor=True) + cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True) + infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True) + + USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode"] + + autoSoftEnable = Component(Signal, auto_monitor=True, kind=Kind.config) + backendUrl = Component(Signal, auto_monitor=True, kind=Kind.config) + state = Component(Signal, auto_monitor=True, kind=Kind.config) + + def __init__( + self, + prefix="", + *, + name, + auto_soft_enable=False, + timeout=10, + backend_url=const.BE999_DAFL_CLIENT, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + **kwargs, + ): + # Additional parameters + self.autoSoftEnable._metadata["write_access"] = False + self.backendUrl._metadata["write_access"] = False + self.state._metadata["write_access"] = False + self.autoSoftEnable.put(auto_soft_enable, force=True) + self.backendUrl.put(backend_url, force=True) + self.state.put(const.GfStatus.NEW, force=True) + + # super() will call the mixin class + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + + def configure( + self, + nimages=10, + exposure=0.2, + period=1.0, + roix=2016, + roiy=2016, + scanid=0, + correction_mode=5, + ): + """Configure the next scan with the GigaFRoST camera + + Parameters + ---------- + nimages : int, optional + Number of images to be taken during each scan. Set to -1 for an + unlimited number of images (limited by the ringbuffer size and + backend speed). (default = 10) + exposure : float, optional + Exposure time [ms]. (default = 0.2) + period : float, optional + Exposure period [ms], ignored in soft trigger mode. (default = 1.0) + roix : int, optional + ROI size in the x-direction [pixels] (default = 2016) + roiy : int, optional + ROI size in the y-direction [pixels] (default = 2016) + scanid : int, optional + Scan identification number to be associated with the scan data + (default = 0) + correction_mode : int, optional + The correction to be applied to the imaging data. The following + modes are available (default = 5): + + * 0: Bypass. No corrections are applied to the data. + * 1: Send correction factor A instead of pixel values + * 2: Send correction factor B instead of pixel values + * 3: Send correction factor C instead of pixel values + * 4: Invert pixel values, but do not apply any linearity correction + * 5: Apply the full linearity correction + """ + # If Bluesky style configure + if isinstance(nimages, dict): + d = nimages.copy() + nimages = d.get('nimages', 10) + exposure = d.get('exposure', exposure) + period = d.get('period', period) + roix = d.get('roix', roix) + roiy = d.get('roiy', roiy) + scanid = d.get('scanid', scanid) + correction_mode = d.get('correction_mode', correction_mode) + + # Stop acquisition + self.cmdStartCamera.set(0).wait() + if self._auto_soft_enable: + self.cmdSoftEnable.set(0).wait() + + # change settings + self.cfgExposure.set(exposure).wait() + self.cfgFramerate.set(period).wait() + self.cfgRoiX.set(roix).wait() + self.cfgRoiY.set(roiy).wait() + self.cfgScanId.set(scanid).wait() + self.cfgCntNum.set(nimages).wait() + self.cfgCorrMode.set(correction_mode).wait() + + # Commit parameter + self.cmdSetParam.set(1).wait() + self.state.set(const.GfStatus.CONFIGURED, force=True) + + @property + def exposure_mode(self): + """Returns the current exposure mode of the GigaFRost camera. + + Returns + ------- + exp_mode : {'external', 'timer', 'soft'} + The camera's active exposure mode. + If more than one mode is active at the same time, it returns None. + """ + mode_soft = self.cfgTrigExpSoft.get() + mode_timer = self.cfgTrigExpTimer.get() + mode_external = self.cfgTrigExpExt.get() + if mode_soft and not mode_timer and not mode_external: + return "soft" + elif not mode_soft and mode_timer and not mode_external: + return "timer" + elif not mode_soft and not mode_timer and mode_external: + return "external" + else: + return None + + @exposure_mode.setter + def exposure_mode(self, exp_mode): + """Apply the exposure mode for the GigaFRoST camera. + + Parameters + ---------- + exp_mode : {'external', 'timer', 'soft'} + The exposure mode to be set. + """ + if exp_mode not in const.gf_valid_exposure_modes: + raise ValueError( + f"Invalid exposure mode! Valid modes are:\n" "{const.gf_valid_exposure_modes}" + ) + + if exp_mode == "external": + self.cfgTrigExpExt.set(1).wait() + self.cfgTrigExpSoft.set(0).wait() + self.cfgTrigExpTimer.set(0).wait() + elif exp_mode == "timer": + self.cfgTrigExpExt.set(0).wait() + self.cfgTrigExpSoft.set(0).wait() + self.cfgTrigExpTimer.set(1).wait() + elif exp_mode == "soft": + self.cfgTrigExpExt.set(0).wait() + self.cfgTrigExpSoft.set(1).wait() + self.cfgTrigExpTimer.set(0).wait() + # Commit parameters + self.cmdSetParam.set(1).wait() + + @property + def fix_nframes_mode(self): + """Return the current fixed number of frames mode of the GigaFRoST camera. + + Returns + ------- + fix_nframes_mode : {'off', 'start', 'end', 'start+end'} + The camera's active fixed number of frames mode. + """ + start_bit = self.cfgCntStartBit.get() + end_bit = self.cfgCntStartBit.get() + + if not start_bit and not end_bit: + return "off" + elif start_bit and not end_bit: + return "start" + elif not start_bit and end_bit: + return "end" + elif start_bit and end_bit: + return "start+end" + else: + return None + + @fix_nframes_mode.setter + def fix_nframes_mode(self, mode): + """Apply the fixed number of frames settings to the GigaFRoST camera. + + Parameters + ---------- + mode : {'off', 'start', 'end', 'start+end'} + The fixed number of frames mode to be applied. + """ + if mode not in const.gf_valid_fix_nframe_modes: + raise ValueError( + f"Invalid fixed number of frames mode! Valid modes are:\n{const.gf_valid_fix_nframe_modes}" + ) + + self._fix_nframes_mode = mode + if self._fix_nframes_mode == "off": + self.cfgCntStartBit.set(0).wait() + self.cfgCntEndBit.set(0).wait() + elif self._fix_nframes_mode == "start": + self.cfgCntStartBit.set(1).wait() + self.cfgCntEndBit.set(0).wait() + elif self._fix_nframes_mode == "end": + self.cfgCntStartBit.set(0).wait() + self.cfgCntEndBit.set(1).wait() + elif self._fix_nframes_mode == "start+end": + self.cfgCntStartBit.set(1).wait() + self.cfgCntEndBit.set(1).wait() + # Commit parameters + self.cmdSetParam.set(1).wait() + + @property + def trigger_mode(self): + """Method to detect the current trigger mode set in the GigaFRost camera. + + Returns + ------- + mode : {'auto', 'external', 'timer', 'soft'} + The camera's active trigger mode. If more than one mode is active + at the moment, None is returned. + """ + mode_auto = self.cfgTrigAuto.get() + mode_external = self.cfgTrigExt.get() + mode_timer = self.cfgTrigTimer.get() + mode_soft = self.cfgTrigSoft.get() + if mode_auto: + return "auto" + elif mode_soft: + return "soft" + elif mode_timer: + return "timer" + elif mode_external: + return "external" + else: + return None + + @trigger_mode.setter + def trigger_mode(self, mode): + """Set the trigger mode for the GigaFRoST camera. + + Parameters + ---------- + mode : {'auto', 'external', 'timer', 'soft'} + The GigaFRoST trigger mode. + """ + if mode not in const.gf_valid_trigger_modes: + raise ValueError( + "Invalid trigger mode! Valid modes are:\n" "{const.gf_valid_trigger_modes}" + ) + + if mode == "auto": + self.cfgTrigAuto.set(1).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(0).wait() + elif mode == "external": + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(1).wait() + elif mode == "timer": + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(1).wait() + self.cfgTrigExt.set(0).wait() + elif mode == "soft": + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(1).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(0).wait() + # Commit parameters + self.cmdSetParam.set(1).wait() + + @property + def enable_mode(self): + """Return the enable mode set in the GigaFRost camera. + + Returns + ------- + enable_mode: {'soft', 'external', 'soft+ext', 'always'} + The camera's active enable mode. + """ + mode_soft = self.cfgTrigEnableSoft.get() + mode_external = self.cfgTrigEnableExt.get() + mode_auto = self.cfgTrigEnableAuto.get() + if mode_soft and not mode_auto: + if mode_external: + return "soft+ext" + else: + return "soft" + elif mode_auto and not mode_soft and not mode_external: + return "always" + elif mode_external and not mode_soft and not mode_auto: + return "external" + else: + return None + + @enable_mode.setter + def enable_mode(self, mode): + """Apply the enable mode for the GigaFRoST camera. + + Parameters + ---------- + mode : {'soft', 'external', 'soft+ext', 'always'} + The enable mode to be applied. + """ + if mode not in const.gf_valid_enable_modes: + raise ValueError( + "Invalid enable mode! Valid modes are:\n" "{const.gf_valid_enable_modes}" + ) + + if mode == "soft": + self.cfgTrigEnableExt.set(0).wait() + self.cfgTrigEnableSoft.set(1).wait() + self.cfgTrigEnableAuto.set(0).wait() + elif mode == "external": + self.cfgTrigEnableExt.set(1).wait() + self.cfgTrigEnableSoft.set(0).wait() + self.cfgTrigEnableAuto.set(0).wait() + elif mode == "soft+ext": + self.cfgTrigEnableExt.set(1).wait() + self.cfgTrigEnableSoft.set(1).wait() + self.cfgTrigEnableAuto.set(0).wait() + elif mode == "always": + self.cfgTrigEnableExt.set(0).wait() + self.cfgTrigEnableSoft.set(0).wait() + self.cfgTrigEnableAuto.set(1).wait() + # Commit parameters + self.cmdSetParam.set(1).wait() + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + gf = GigaFrostCamera( + "X02DA-CAM-GF2:", name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True + ) + gf.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/gigafrostclient.py b/tomcat_bec/devices/gigafrost/gigafrostclient.py index 9c610c8..27f6df0 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostclient.py +++ b/tomcat_bec/devices/gigafrost/gigafrostclient.py @@ -11,18 +11,190 @@ from time import sleep from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind, DeviceStatus from ophyd.device import Staged -try: - import gfconstants as const -except ModuleNotFoundError: - import tomcat_bec.devices.gigafrost.gfconstants as const +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) + try: - from gfutils import extend_header_table + from StdDaqClient import StdDaqClient except ModuleNotFoundError: - from tomcat_bec.devices.gigafrost.gfutils import extend_header_table + from tomcat_bec.devices.gigafrost.StdDaqClient import StdDaqClient + +try: + from gfcamera import GigaFrostCamera +except ModuleNotFoundError: + from tomcat_bec.devices.gigafrost.gfcamera import GigaFrostCamera -class GigaFrostClient(Device): + + + + +class GigafrostClientMixin(CustomDetectorMixin): + """Mixin class to setup TOMCAT specific implementations of the detector. + + This class will be called by the custom_prepare_cls attribute of the detector class. + """ + + def _define_backend_ip(self): + if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 + return const.BE3_NORTH_IP, const.BE3_SOUTH_IP + elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: + return const.BE999_NORTH_IP, const.BE999_SOUTH_IP + else: + raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") + + def _define_backend_mac(self): + if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33 + return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC + elif self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT: + return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC + else: + raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") + + def _set_udp_header_table(self): + """Set the communication parameters for the camera module""" + self.parent.cfgConnectionParam.set(self._build_udp_header_table()).wait() + + def _build_udp_header_table(self): + """Build the header table for the communication""" + udp_header_table = [] + + for i in range(0, 64, 1): + for j in range(0, 8, 1): + dest_port = 2000 + 8 * i + j + source_port = 3000 + j + if j < 4: + extend_header_table( + udp_header_table, self.parent.macSouth, self.parent.ipSouth, dest_port, source_port + ) + else: + extend_header_table( + udp_header_table, self.parent.macNorth, self.parent.ipNorth, dest_port, source_port + ) + + return udp_header_table + + + def on_init(self) -> None: + """Initialize the camera, set channel values""" + ## Stop acquisition + self.parent.cmdStartCamera.set(0).wait() + + ### set entry to UDP table + # number of UDP ports to use + self.parent.cfgUdpNumPorts.set(2).wait() + # number of images to send to each UDP port before switching to next + self.parent.cfgUdpNumFrames.set(5).wait() + # offset in UDP table - where to find the first entry + self.parent.cfgUdpHtOffset.set(0).wait() + # activate changes + self.parent.cmdWriteService.set(1).wait() + + # Configure software triggering if needed + if self.parent._auto_soft_enable: + # trigger modes + self.parent.cfgCntStartBit.set(1).wait() + self.parent.cfgCntEndBit.set(0).wait() + + # set modes + self.parent.enable_mode = "soft" + self.parent.trigger_mode = "auto" + self.parent.exposure_mode = "timer" + + # line swap - on for west, off for east + self.parent.cfgLineSwapSW.set(1).wait() + self.parent.cfgLineSwapNW.set(1).wait() + self.parent.cfgLineSwapSE.set(0).wait() + self.parent.cfgLineSwapNE.set(0).wait() + + # Commit parameters + self.parent.cmdSetParam.set(1).wait() + + # Initialize data backend + n, s = self._define_backend_ip() + self.parent.ipNorth.put(n, force=True) + self.parent.ipSouth.put(s, force=True) + n, s = self._define_backend_mac() + self.parent.macNorth.put(n, force=True) + self.parent.macSouth.put(s, force=True) + # Set udp header table + self._set_udp_header_table() + + self.parent.state.put(const.GfStatus.INIT, force=True) + return super().on_init() + + + + def on_stage(self) -> None: + """ + Specify actions to be executed during stage in preparation for a scan. + self.parent.scaninfo already has all current parameters for the upcoming scan. + + In case the backend service is writing data on disk, this step should include publishing + a file_event and file_message to BEC to inform the system where the data is written to. + + IMPORTANT: + It must be safe to assume that the device is ready for the scan + to start immediately once this function is finished. + """ + if self.parent.infoBusyFlag.value: + raise RuntimeError("Camera is already busy, unstage it first!") + # Switch to acquiring + self.parent.cmdStartCamera.set(1).wait() + self.parent.state.put(const.GfStatus.ACQUIRING, force=True) + # Gigafrost can finish a run without explicit unstaging + self.parent._staged = Staged.no + + def on_unstage(self) -> None: + """ + Specify actions to be executed during unstage. + + This step should include checking if the acqusition was successful, + and publishing the file location and file event message, + with flagged done to BEC. + """ + # Switch to idle + self.parent.cmdStartCamera.set(0).wait() + if self.parent.autoSoftEnable.get(): + self.cmdSoftEnable.set(0).wait() + self.parent.state.put(const.GfStatus.STOPPED, force=True) + + def on_stop(self) -> None: + """ + Specify actions to be executed during stop. + This must also set self.parent.stopped to True. + + This step should include stopping the detector and backend service. + """ + return self.on_unstage() + + def on_trigger(self) -> None | DeviceStatus: + """ + Specify actions to be executed upon receiving trigger signal. + Return a DeviceStatus object or None + """ + status = DeviceStatus(self.parent) + + # Soft triggering based on operation mode + if self.parent.autoSoftEnable.get() and self.parent.trigger_mode=='auto' and self.parent.enable_mode=='soft': + # BEC teststand operation mode: posedge of SoftEnable if Started + self.parent.cmdSoftEnable.set(0).wait() + self.parent.cmdSoftEnable.set(1).wait() + sleep_time = self.parent.cfgFramerate.value*self.parent.cfgCntNum.value*0.001+0.050 + # There's no status readback from the camera, so we just wait + sleep(sleep_time) + print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr) + else: + self.parent.cmdSoftTrigger.set(1).wait() + status.set_finished() + + return status + + +class GigaFrostClient(PSIDetectorBase): """Ophyd device class to control Gigafrost cameras at Tomcat The actual hardware is implemented by an IOC based on an old fork of Helge's @@ -47,654 +219,22 @@ class GigaFrostClient(Device): """ # pylint: disable=too-many-instance-attributes - infoBusyFlag = Component(EpicsSignalRO, "BUSY_STAT", auto_monitor=True) - infoSyncFlag = Component(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True) - cmdSyncHw = Component(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True) - cmdStartCamera = Component(EpicsSignal, "START_CAM", put_complete=True) - cmdSetParam = Component(EpicsSignal, "SET_PARAM.PROC", put_complete=True) - - # UDP header - cfgUdpNumPorts = Component(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config) - cfgUdpNumFrames = Component(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config) - cfgUdpHtOffset = Component(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config) - cmdWriteService = Component(EpicsSignal, "WRITE_SRV.PROC", put_complete=True) - - # Standard camera configs - cfgExposure = Component(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgFramerate = Component(EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgRoiX = Component(EpicsSignal, "ROIX", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgRoiY = Component(EpicsSignal, "ROIY", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgScanId = Component(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgCntNum = Component(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config) - cfgCorrMode = Component(EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config) - - # Software signals - cmdSoftEnable = Component(EpicsSignal, "SOFT_ENABLE", put_complete=True) - cmdSoftTrigger = Component(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True) - cmdSoftExposure = Component(EpicsSignal, "SOFT_EXP", put_complete=True) - - # Trigger configuration PVs - cfgCntStartBit = Component( - EpicsSignal, - "CNT_STARTBIT_RBV", - write_pv="CNT_STARTBIT", - put_complete=True, - kind=Kind.config, - ) - cfgCntEndBit = Component( - EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config - ) - # Enable modes - cfgTrigEnableExt = Component( - EpicsSignal, - "MODE_ENBL_EXT_RBV", - write_pv="MODE_ENBL_EXT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigEnableSoft = Component( - EpicsSignal, - "MODE_ENBL_SOFT_RBV", - write_pv="MODE_ENBL_SOFT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigEnableAuto = Component( - EpicsSignal, - "MODE_ENBL_AUTO_RBV", - write_pv="MODE_ENBL_AUTO", - put_complete=True, - kind=Kind.config, - ) - cfgTrigVirtEnable = Component( - EpicsSignal, - "MODE_ENBL_EXP_RBV", - write_pv="MODE_ENBL_EXP", - put_complete=True, - kind=Kind.config, - ) - # Trigger modes - cfgTrigExt = Component( - EpicsSignal, - "MODE_TRIG_EXT_RBV", - write_pv="MODE_TRIG_EXT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigSoft = Component( - EpicsSignal, - "MODE_TRIG_SOFT_RBV", - write_pv="MODE_TRIG_SOFT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigTimer = Component( - EpicsSignal, - "MODE_TRIG_TIMER_RBV", - write_pv="MODE_TRIG_TIMER", - put_complete=True, - kind=Kind.config, - ) - cfgTrigAuto = Component( - EpicsSignal, - "MODE_TRIG_AUTO_RBV", - write_pv="MODE_TRIG_AUTO", - put_complete=True, - kind=Kind.config, - ) - # Exposure modes - cfgTrigExpExt = Component( - EpicsSignal, - "MODE_EXP_EXT_RBV", - write_pv="MODE_EXP_EXT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigExpSoft = Component( - EpicsSignal, - "MODE_EXP_SOFT_RBV", - write_pv="MODE_EXP_SOFT", - put_complete=True, - kind=Kind.config, - ) - cfgTrigExpTimer = Component( - EpicsSignal, - "MODE_EXP_TIMER_RBV", - write_pv="MODE_EXP_TIMER", - put_complete=True, - kind=Kind.config, - ) - - # Line swap selection - cfgLineSwapSW = Component(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config) - cfgLineSwapNW = Component(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config) - cfgLineSwapSE = Component(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config) - cfgLineSwapNE = Component(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config) - cfgConnectionParam = Component( - EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config - ) - - # HW settings as read only - cfgSyncFlag = Component(EpicsSignalRO, "PIXRATE", auto_monitor=True) - cfgTrigDelay = Component(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True) - cfgSyncoutDelay = Component(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True) - cfgOutputPolarity0 = Component(EpicsSignalRO, "BNC0_RBV", auto_monitor=True) - cfgOutputPolarity1 = Component(EpicsSignalRO, "BNC1_RBV", auto_monitor=True) - cfgOutputPolarity2 = Component(EpicsSignalRO, "BNC2_RBV", auto_monitor=True) - cfgOutputPolarity3 = Component(EpicsSignalRO, "BNC3_RBV", auto_monitor=True) - cfgInputPolarity1 = Component(EpicsSignalRO, "BNC4_RBV", auto_monitor=True) - cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True) - infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True) - - USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode"] - - def __init__( - self, - prefix="", - *, - name, - auto_soft_enable=False, - timeout=10, - backend_url=const.BE999_DAFL_CLIENT, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - **kwargs, - ): - super().__init__( - prefix=prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - self._auto_soft_enable = auto_soft_enable - self._timeout = timeout - self._backend_url = backend_url - - self.state = const.GfStatus.NEW - self._settings = None - self._north_mac, self._south_mac = self._define_backend_mac() - self._north_ip, self._south_ip = self._define_backend_ip() - - self._valid_enable_modes = ("soft", "external", "soft+ext", "always") - self._valid_exposure_modes = ("external", "timer", "soft") - self._valid_trigger_modes = ("auto", "external", "timer", "soft") - self._valid_fix_nframe_modes = ("off", "start", "end", "start+end") - - # Continue initialization - self.initialize() - - def initialize(self): - """Initialize the camera, set channel values""" - ## Stop acquisition - self.cmdStartCamera.set(0).wait() - - ### set entry to UDP table - # number of UDP ports to use - self.cfgUdpNumPorts.set(2).wait() - # number of images to send to each UDP port before switching to next - self.cfgUdpNumFrames.set(5).wait() - # offset in UDP table - where to find the first entry - self.cfgUdpHtOffset.set(0).wait() - # activate changes - self.cmdWriteService.set(1).wait() - - # Configure software triggering if needed - if self._auto_soft_enable: - # trigger modes - self.cfgCntStartBit.set(1).wait() - self.cfgCntEndBit.set(0).wait() - - # set modes - self.enable_mode = "soft" - self.trigger_mode = "auto" - self.exposure_mode = "timer" - - # line swap - on for west, off for east - self.cfgLineSwapSW.set(1).wait() - self.cfgLineSwapNW.set(1).wait() - self.cfgLineSwapSE.set(0).wait() - self.cfgLineSwapNE.set(0).wait() - - # Commit parameters - self.cmdSetParam.set(1).wait() - - self._set_udp_header_table() - self.state = const.GfStatus.INIT - - # sets the basic settings - self._settings = { - "backend_url": self._backend_url, - "auto_soft_enable": self._auto_soft_enable, - "ioc_name": self.prefix, - } - - def configure( - self, - nimages=10, - exposure=0.2, - period=1.0, - roix=2016, - roiy=2016, - scanid=0, - correction_mode=5, - ): - """Configure the next scan with the GigaFRoST camera - - Parameters - ---------- - nimages : int, optional - Number of images to be taken during each scan. Set to -1 for an - unlimited number of images (limited by the ringbuffer size and - backend speed). (default = 10) - exposure : float, optional - Exposure time [ms]. (default = 0.2) - period : float, optional - Exposure period [ms], ignored in soft trigger mode. (default = 1.0) - roix : int, optional - ROI size in the x-direction [pixels] (default = 2016) - roiy : int, optional - ROI size in the y-direction [pixels] (default = 2016) - scanid : int, optional - Scan identification number to be associated with the scan data - (default = 0) - correction_mode : int, optional - The correction to be applied to the imaging data. The following - modes are available (default = 5): - - * 0: Bypass. No corrections are applied to the data. - * 1: Send correction factor A instead of pixel values - * 2: Send correction factor B instead of pixel values - * 3: Send correction factor C instead of pixel values - * 4: Invert pixel values, but do not apply any linearity correction - * 5: Apply the full linearity correction - """ - # If Bluesky style configure - if isinstance(nimages, dict): - d = nimages.copy() - nimages = d.get('nimages', 10) - exposure = d.get('exposure', exposure) - period = d.get('period', period) - roix = d.get('roix', roix) - roiy = d.get('roiy', roiy) - scanid = d.get('scanid', scanid) - correction_mode = d.get('correction_mode', correction_mode) - - # Stop acquisition - self.cmdStartCamera.set(0).wait() - if self._auto_soft_enable: - self.cmdSoftEnable.set(0).wait() - - # change settings - self.cfgExposure.set(exposure).wait() - self.cfgFramerate.set(period).wait() - self.cfgRoiX.set(roix).wait() - self.cfgRoiY.set(roiy).wait() - self.cfgScanId.set(scanid).wait() - self.cfgCntNum.set(nimages).wait() - self.cfgCorrMode.set(correction_mode).wait() - - # Commit parameter - self.cmdSetParam.set(1).wait() - self.state = const.GfStatus.CONFIGURED - - self._settings = { - "nimages": nimages, - "exposure": exposure, - "frame_rate": period, - "roix": roix, - "roiy": roiy, - "scanid": scanid, - "correction_mode": correction_mode, - "backend_url": self._backend_url, - "auto_soft_enable": self._auto_soft_enable, - "ioc_name": self.name, - } + cam = Component(GigaFrostCamera, "", auto_monitor=True) + daq = Component(StdDaqClient, "", auto_monitor=True) def stage(self): - """Standard ophyd method to start an acquisition + px_daq_h = self.daq.config.cfg_image_pixel_height.get() + px_daq_w = self.daq.config.cfg_image_pixel_width.get() - In ophyd it still needs to be triggered to perfor an actual capture. - """ - if self.infoBusyFlag.value: - raise RuntimeError("Camera is already busy, unstage it first!") + px_gf_h = self.cam.cfgRoiX.get() + px_gf_y = self.cam.cfgRoiY.get() - # change to running - self.cmdStartCamera.set(1).wait() - # soft trigger on (DISABLED: use trigger() in ophyd) - #if self._auto_soft_enable: - # self.cmdSoftEnable.set(1).wait() - self.state = const.GfStatus.ACQUIRING - - # Gigafrost can finish a run without explicit unstaging - self._staged = Staged.no + if px_daq_h != px_gf_h or px_daq_w != px_gf_w: + raise RuntimeError(f"Different image size configured on GF and the DAQ") + return super().stage() - - def unstage(self): - """Standard ophyd method to finish an acquisition""" - # switch to idle - self.cmdStartCamera.set(0).wait() - if self._auto_soft_enable: - self.cmdSoftEnable.set(0).wait() - self.state = const.GfStatus.STOPPED - return super().unstage() - - def stop(self): - """Standard ophyd method to stop an acquisition""" - self.unstage() - - def trigger(self) -> DeviceStatus: - """Sends a software trigger and approximately waits to finnish""" - status = DeviceStatus(self) - - # Soft triggering based on operation mode - if self._auto_soft_enable and self.trigger_mode=='auto' and self.enable_mode=='soft': - # BEC teststand operation mode: posedge of SoftEnable if Started - self.cmdSoftEnable.set(0).wait() - self.cmdSoftEnable.set(1).wait() - sleep_time = self.cfgFramerate.value*self.cfgCntNum.value*0.001+0.050 - sleep(sleep_time) - print(f"[GF2] Slept for: {sleep_time} seconds", file=sys.stderr) - else: - self.cmdSoftTrigger.set(1).wait() - status.set_finished() - - return status - - @property - def exposure_mode(self): - """Returns the current exposure mode of the GigaFRost camera. - - Returns - ------- - exp_mode : {'external', 'timer', 'soft'} - The camera's active exposure mode. - If more than one mode is active at the same time, it returns None. - """ - mode_soft = self.cfgTrigExpSoft.get() - mode_timer = self.cfgTrigExpTimer.get() - mode_external = self.cfgTrigExpExt.get() - if mode_soft and not mode_timer and not mode_external: - return "soft" - elif not mode_soft and mode_timer and not mode_external: - return "timer" - elif not mode_soft and not mode_timer and mode_external: - return "external" - else: - return None - - @exposure_mode.setter - def exposure_mode(self, exp_mode): - """Apply the exposure mode for the GigaFRoST camera. - - Parameters - ---------- - exp_mode : {'external', 'timer', 'soft'} - The exposure mode to be set. - """ - if exp_mode not in self._valid_exposure_modes: - raise ValueError( - f"Invalid exposure mode! Valid modes are:\n" "{self._valid_exposure_modes}" - ) - - if exp_mode == "external": - self.cfgTrigExpExt.set(1).wait() - self.cfgTrigExpSoft.set(0).wait() - self.cfgTrigExpTimer.set(0).wait() - elif exp_mode == "timer": - self.cfgTrigExpExt.set(0).wait() - self.cfgTrigExpSoft.set(0).wait() - self.cfgTrigExpTimer.set(1).wait() - elif exp_mode == "soft": - self.cfgTrigExpExt.set(0).wait() - self.cfgTrigExpSoft.set(1).wait() - self.cfgTrigExpTimer.set(0).wait() - # Commit parameters - self.cmdSetParam.set(1).wait() - - @property - def fix_nframes_mode(self): - """Return the current fixed number of frames mode of the GigaFRoST camera. - - Returns - ------- - fix_nframes_mode : {'off', 'start', 'end', 'start+end'} - The camera's active fixed number of frames mode. - """ - start_bit = self.cfgCntStartBit.get() - end_bit = self.cfgCntStartBit.get() - - if not start_bit and not end_bit: - return "off" - elif start_bit and not end_bit: - return "start" - elif not start_bit and end_bit: - return "end" - elif start_bit and end_bit: - return "start+end" - else: - return None - - @fix_nframes_mode.setter - def fix_nframes_mode(self, mode): - """Apply the fixed number of frames settings to the GigaFRoST camera. - - Parameters - ---------- - mode : {'off', 'start', 'end', 'start+end'} - The fixed number of frames mode to be applied. - """ - if mode not in self._valid_fix_nframe_modes: - raise ValueError( - "Invalid fixed number of frames mode! " - "Valid modes are:\n{}".format(self._valid_fix_nframe_modes) - ) - - self._fix_nframes_mode = mode - if self._fix_nframes_mode == "off": - self.cfgCntStartBit.set(0).wait() - self.cfgCntEndBit.set(0).wait() - elif self._fix_nframes_mode == "start": - self.cfgCntStartBit.set(1).wait() - self.cfgCntEndBit.set(0).wait() - elif self._fix_nframes_mode == "end": - self.cfgCntStartBit.set(0).wait() - self.cfgCntEndBit.set(1).wait() - elif self._fix_nframes_mode == "start+end": - self.cfgCntStartBit.set(1).wait() - self.cfgCntEndBit.set(1).wait() - # Commit parameters - self.cmdSetParam.set(1).wait() - - @property - def trigger_mode(self): - """Method to detect the current trigger mode set in the GigaFRost camera. - - Returns - ------- - mode : {'auto', 'external', 'timer', 'soft'} - The camera's active trigger mode. If more than one mode is active - at the moment, None is returned. - """ - mode_auto = self.cfgTrigAuto.get() - mode_external = self.cfgTrigExt.get() - mode_timer = self.cfgTrigTimer.get() - mode_soft = self.cfgTrigSoft.get() - if mode_auto: - return "auto" - elif mode_soft: - return "soft" - elif mode_timer: - return "timer" - elif mode_external: - return "external" - else: - return None - - @trigger_mode.setter - def trigger_mode(self, mode): - """Set the trigger mode for the GigaFRoST camera. - - Parameters - ---------- - mode : {'auto', 'external', 'timer', 'soft'} - The GigaFRoST trigger mode. - """ - if mode not in self._valid_trigger_modes: - raise ValueError( - "Invalid trigger mode! Valid modes are:\n" "{self._valid_trigger_modes}" - ) - - if mode == "auto": - self.cfgTrigAuto.set(1).wait() - self.cfgTrigSoft.set(0).wait() - self.cfgTrigTimer.set(0).wait() - self.cfgTrigExt.set(0).wait() - elif mode == "external": - self.cfgTrigAuto.set(0).wait() - self.cfgTrigSoft.set(0).wait() - self.cfgTrigTimer.set(0).wait() - self.cfgTrigExt.set(1).wait() - elif mode == "timer": - self.cfgTrigAuto.set(0).wait() - self.cfgTrigSoft.set(0).wait() - self.cfgTrigTimer.set(1).wait() - self.cfgTrigExt.set(0).wait() - elif mode == "soft": - self.cfgTrigAuto.set(0).wait() - self.cfgTrigSoft.set(1).wait() - self.cfgTrigTimer.set(0).wait() - self.cfgTrigExt.set(0).wait() - # Commit parameters - self.cmdSetParam.set(1).wait() - - @property - def enable_mode(self): - """Return the enable mode set in the GigaFRost camera. - - Returns - ------- - enable_mode: {'soft', 'external', 'soft+ext', 'always'} - The camera's active enable mode. - """ - mode_soft = self.cfgTrigEnableSoft.get() - mode_external = self.cfgTrigEnableExt.get() - mode_auto = self.cfgTrigEnableAuto.get() - if mode_soft and not mode_auto: - if mode_external: - return "soft+ext" - else: - return "soft" - elif mode_auto and not mode_soft and not mode_external: - return "always" - elif mode_external and not mode_soft and not mode_auto: - return "external" - else: - return None - - @enable_mode.setter - def enable_mode(self, mode): - """Apply the enable mode for the GigaFRoST camera. - - Parameters - ---------- - mode : {'soft', 'external', 'soft+ext', 'always'} - The enable mode to be applied. - """ - if mode not in self._valid_enable_modes: - raise ValueError( - "Invalid enable mode! Valid modes are:\n" "{self._valid_enable_modes}" - ) - - if mode == "soft": - self.cfgTrigEnableExt.set(0).wait() - self.cfgTrigEnableSoft.set(1).wait() - self.cfgTrigEnableAuto.set(0).wait() - elif mode == "external": - self.cfgTrigEnableExt.set(1).wait() - self.cfgTrigEnableSoft.set(0).wait() - self.cfgTrigEnableAuto.set(0).wait() - elif mode == "soft+ext": - self.cfgTrigEnableExt.set(1).wait() - self.cfgTrigEnableSoft.set(1).wait() - self.cfgTrigEnableAuto.set(0).wait() - elif mode == "always": - self.cfgTrigEnableExt.set(0).wait() - self.cfgTrigEnableSoft.set(0).wait() - self.cfgTrigEnableAuto.set(1).wait() - # Commit parameters - self.cmdSetParam.set(1).wait() - - def get_state(self): - return self.state - - def get_south_mac(self): - return self._south_mac - - def get_north_mac(self): - return self._north_mac - - def get_north_ip(self): - return self._north_ip - - def get_south_ip(self): - return self._south_ip - - def get_backend_url(self): - """Method to read the configured backend URL""" - return self._backend_url - - def set_backend_ip(self, north, south): - """Method to manually set the backend ip""" - self._north_ip, self._south_ip = north, south - - def set_backend_mac(self, north, south): - """Method to manually set the backend mac""" - self._north_mac, self._south_mac = north, south - - def _build_udp_header_table(self): - """Build the header table for the communication""" - udp_header_table = [] - - for i in range(0, 64, 1): - for j in range(0, 8, 1): - dest_port = 2000 + 8 * i + j - source_port = 3000 + j - if j < 4: - extend_header_table( - udp_header_table, self._south_mac, self._south_ip, dest_port, source_port - ) - else: - extend_header_table( - udp_header_table, self._north_mac, self._north_ip, dest_port, source_port - ) - - return udp_header_table - - def _define_backend_ip(self): - if self._backend_url == const.BE3_DAFL_CLIENT: # xbl-daq-33 - return const.BE3_NORTH_IP, const.BE3_SOUTH_IP - elif self._backend_url == const.BE999_DAFL_CLIENT: - return const.BE999_NORTH_IP, const.BE999_SOUTH_IP - else: - raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") - - def _define_backend_mac(self): - if self._backend_url == const.BE3_DAFL_CLIENT: # xbl-daq-33 - return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC - elif self._backend_url == const.BE999_DAFL_CLIENT: - return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC - else: - raise RuntimeError(f"Backend not recognized. {(const.GF1, const.GF2, const.GF3)}") - - def _set_udp_header_table(self): - """Set the communication parameters for the camera module""" - self.cfgConnectionParam.set(self._build_udp_header_table()).wait() - + + # Automatically connect to MicroSAXS testbench if directly invoked if __name__ == "__main__":