Starting to rewise classes

This commit is contained in:
gac-x05la
2024-11-20 14:47:04 +01:00
committed by mohacsi_i
parent 43dbda8b12
commit 6d5b61eb87
11 changed files with 347 additions and 510 deletions

View File

@@ -138,6 +138,7 @@ gfcam:
auto_soft_enable: true
deviceTags:
- camera
- trigger
enabled: true
onFailure: buffer
readOnly: false

View File

@@ -10,7 +10,6 @@ from .aerotech import (
from .grashopper_tomcat import GrashopperTOMCAT
from .psimotor import EpicsMotorMR, EpicsMotorEC
from .gigafrost.gigafrostclient import GigaFrostClient
from .gigafrost.gigafrostcamera import GigaFrostCamera
from .gigafrost.stddaq_client import StdDaqClient
from .gigafrost.stddaq_preview import StdDaqPreviewDetector

View File

@@ -23,34 +23,33 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
# Fish out configuration from scaninfo (does not need to be full configuration)
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
for prefix in ["", alias + "_"]:
if f"{prefix}num_points_total" in scanargs:
d["num_points_total"] = scanargs[f"{prefix}num_points_total"]
if f"{prefix}ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs[f"{prefix}ddc_trigger"]
if f"{prefix}ddc_source0" in scanargs:
d["ddc_source0"] = scanargs[f"{prefix}ddc_source0"]
if f"{prefix}ddc_source1" in scanargs:
d["ddc_source1"] = scanargs[f"{prefix}ddc_source1"]
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs["ddc_trigger"]
if "steps" in scanargs and "exp_burst" in scanargs:
scan_steps = scanargs["steps"]
scan_burst = scanargs["exp_burst"]
d["num_points_total"] = (scan_steps+1) * scan_burst
elif "exp_burst" in scanargs:
d["num_points_total"] = scanargs["exp_burst"]
elif "steps" in scanargs:
d["num_points_total"] = scanargs["steps"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Stage the DDC distance module
# NOTE: This expects explicit device disabling when not configured
self.parent._switch.set("Start", settle_time=0.2).wait()
def on_unstage(self):
@@ -72,10 +71,14 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
# Configure the DDC with default internal triggers
ddc = aa1AxisPsoDistance(AA1_IOC_NAME+":ROTY:DDC:", name="ddc")
ddc.wait_for_connection()
ddc.configure(d={'npoints': 5000})
ddc.kickoff().wait()
ddc.configure(d={'num_points_total': 5000})
ddc.stage()
...
ret = yield from ddc.collect()
NOTE: Expected behavior is that the device is disabled when not in use,
i.e. there's avtive enable/disable management.
"""
# ########################################################################
@@ -107,17 +110,16 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
Aerotech API allows the simultaneous capture of two signals into the
limited amount of local DriveArray (2-16 MB/axis).
"""
num_points = int(d["num_points_total"])
ddc_trigger = d.get("ddc_trigger", DriveDataCaptureTrigger.PsoOutput)
ddc_source0 = d.get("ddc_source0", DriveDataCaptureInput.PrimaryFeedback)
ddc_source1 = d.get("ddc_source1", DriveDataCaptureInput.PositionCommand)
old = self.read_configuration()
self._input0.set(ddc_source0).wait()
self._input1.set(ddc_source1).wait()
self._trigger.set(ddc_trigger).wait()
self.npoints.set(num_points).wait()
if "num_points_total" in d:
self.npoints.set(d["num_points_total"]).wait()
if "ddc_trigger" in d:
self._trigger.set(d['ddc_trigger']).wait()
if "ddc_source0" in d:
self._input0.set(d['ddc_source0']).wait()
if "ddc_source1" in d:
self._input1.set(d['ddc_source1']).wait()
# Reset incremental readback
self._switch.set("ResetRB", settle_time=0.1).wait()

View File

@@ -14,38 +14,40 @@ logger = bec_logger.logger
class AerotechPsoDistanceMixin(CustomDeviceMixin):
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging"""
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
# Fish out configuration from scaninfo
# NOTE: Scans don't have to fully configure the device, but it is expected that it's
scanparam = self.parent.scaninfo.scan_msg.info
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
for prefix in ["", alias + "_"]:
if f"{prefix}pso_distance" in scanargs:
d["pso_distance"] = scanargs[f"{prefix}pso_distance"]
if f"{prefix}pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs[f"{prefix}pso_wavemode"]
if f"{prefix}pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs[f"{prefix}pso_w_pulse"]
if f"{prefix}pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs[f"{prefix}pso_t_pulse"]
if f"{prefix}pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs[f"{prefix}pso_n_pulse"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Stage the PSO distance module
if isinstance(self.parent._distance_value, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
@@ -81,83 +83,6 @@ class AerotechPsoDistanceMixin(CustomDeviceMixin):
return status
class AerotechPsoWindowMixin(CustomDeviceMixin):
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging"""
logger.warning(self.parent.scaninfo.__dict__)
# Fish out our configuration from scaninfo (via explicit or generic addressing)
prefix = self.parent.parent.name if self.parent.parent is not None else self.parent.name
d = {}
if hasattr(self.parent.scaninfo, prefix + "_pso_bounds"):
val = str(getattr(self.parent.scaninfo, prefix + "_pso_bounds"))
d["pso_bounds"] = val
if hasattr(self.parent.scaninfo, prefix + "_pso_wavemode"):
val = str(getattr(self.parent.scaninfo, prefix + "_pso_wavemode"))
d["pso_wavemode"] = val
if hasattr(self.parent.scaninfo, prefix + "_pso_windevent"):
val = str(getattr(self.parent.scaninfo, prefix + "_pso_windevent"))
d["pso_windevent"] = val
if hasattr(self.parent.scaninfo, prefix + "_pso_w_pulse"):
val = getattr(self.parent.scaninfo, prefix + "_pso_w_pulse")
d["pso_w_pulse"] = val
if hasattr(self.parent.scaninfo, prefix + "_pso_t_pulse"):
val = getattr(self.parent.scaninfo, prefix + "_pso_t_pulse")
d["pso_t_pulse"] = val
if hasattr(self.parent.scaninfo, prefix + "_pso_n_pulse"):
val = getattr(self.parent.scaninfo, prefix + "_pso_n_pulse")
d["pso_n_pulse"] = val
if hasattr(self.parent.scaninfo, "pso_bounds"):
val = str(getattr(self.parent.scaninfo, "pso_bounds"))
d["pso_bounds"] = val
if hasattr(self.parent.scaninfo, "pso_wavemode"):
val = str(getattr(self.parent.scaninfo, "pso_wavemode"))
d["pso_wavemode"] = val
if hasattr(self.parent.scaninfo, "pso_windevent"):
val = str(getattr(self.parent.scaninfo, "pso_windevent"))
d["pso_windevent"] = val
if hasattr(self.parent.scaninfo, "pso_w_pulse"):
val = getattr(self.parent.scaninfo, "pso_w_pulse")
d["pso_w_pulse"] = val
if hasattr(self.parent.scaninfo, "pso_t_pulse"):
val = getattr(self.parent.scaninfo, "pso_t_pulse")
d["pso_t_pulse"] = val
if hasattr(self.parent.scaninfo, "pso_n_pulse"):
val = getattr(self.parent.scaninfo, "pso_n_pulse")
d["pso_n_pulse"] = val
# Perform bluesky-style configuration
if len(d) > 0:
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
return
# Stage the PSO window module
self.winCounter.set(0).wait()
if self.outSource.get() in ["Window", 2]:
self.winOutput.set("On").wait()
else:
self.winEvents.set(self._eventMode).wait()
def on_unstage(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
class aa1AxisPsoBase(PSIDeviceBase):
"""Position Sensitive Output - Base class
@@ -382,69 +307,3 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
# status = DeviceStatus(self)
# status.set_finished()
# return status
class aa1AxisPsoWindow(aa1AxisPsoBase):
"""Position Sensitive Output - Window mode
This class provides convenience wrappers around the Aerotech API's PSO functionality in window
mode. It can either use the event-waveform concept or provide a direct window in/out output
signal on the output pin. The latter is well-suited for the generation of trigger enable
signals, while event mode allows finetuning of trigger waveform. So the simplified pipeline
structure are:
Genrator --> Event --> Waveform --> Output pin
Genrator --> Window output --> Output pin
NOTE: PSO module has 32 bit counters, and windows are defined in absolute coordinates. This
makes them prone to integer overflows.
"""
custom_prepare_cls = AerotechPsoWindowMixin
USER_ACCESS = ["configure", "prepare", "toggle"]
_eventMode = "Enter"
# ########################################################################
# PSO high level interface
def configure(self, d: dict = {}) -> tuple:
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
:param pso_bounds: The trigger window or the array of windows.
:param pso_wavemode: Waveform mode configuration, usually output/pulsed/toggled.
:param pso_windevent: Event mode configuration, usually Off/Enter/Exit/Both.
"""
pso_bounds = d["pso_bounds"]
pso_wavemode = str(d["pso_wavemode"])
pso_eventmode = d.get("pso_windevent", "Enter")
# Validate input parameters
if pso_wavemode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]:
raise RuntimeError(f"Unsupported window triggering mode: {pso_wavemode}")
if len(pso_bounds) % 2 == 1:
raise RuntimeError(
f"Window mode requires an even number of bounds, got: {len(pso_bounds)}"
)
self._eventMode = pso_eventmode
old = self.read_configuration()
# Configure the window module
# Set the window ranges (MUST be in start position)
if len(pso_bounds) == 2:
self.winCounter.set(0).wait()
self._winLower.set(pso_bounds[0]).wait()
self._winUpper.set(pso_bounds[1]).wait()
elif isinstance(pso_bounds, (np.ndarray, list, tuple)):
self.winCounter.set(0).wait()
self.winBoundsArr.set(pso_bounds).wait()
# Don't start triggering just yet
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Configure the pulsed/toggled/output waveform
super().configure(d)
new = self.read_configuration()
return (old, new)

View File

@@ -1,5 +1,5 @@
from time import sleep
import jinja2
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
@@ -20,7 +20,10 @@ class AerotechTasksMixin(CustomDeviceMixin):
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Tomcat might use multiple cameras with their own separate DAQ instances.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
@@ -29,28 +32,30 @@ class AerotechTasksMixin(CustomDeviceMixin):
alias = self.parent.parent.name if self.parent.parent is not None else self.parent.name
logger.warning(f"[{alias}] Scan parameters:\n{scanparam}")
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
for prefix in ["", alias + "_"]:
if f"{prefix}script_text" in scanargs:
d["script_text"] = scanargs[f"{prefix}script_text"]
if f"{prefix}script_file" in scanargs:
d["script_file"] = scanargs[f"{prefix}script_file"]
if f"{prefix}script_mode" in scanargs:
d["script_mode"] = scanargs[f"{prefix}script_mode"]
if f"{prefix}script_task" in scanargs:
d["script_task"] = scanargs[f"{prefix}script_task"]
if self.parent.scaninfo == "script":
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs:
d["script_task"] = scanargs["script_task"]
if self.parent.scaninfo == "subs":
# NOTE: But if we ask for substitutions, we need the filename
filename = scanargs["script_template"]
filesubs = scanargs
d["script_text"] = self.render_file(filename, filesubs)
if "script_task" in scanargs:
d["script_task"] = scanargs["script_task"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# The actual staging
settle_time = 0.2
if self.parent._is_configured:
@@ -75,6 +80,18 @@ class AerotechTasksMixin(CustomDeviceMixin):
"""Stop the currently selected task"""
self.parent.switch.set("Stop").wait()
def render_file(self, filename, filesubs):
"""Render AeroScript file with Jinja"""
# Load the test file
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
templatetext = f.read()
# Substitute jinja template
tm = jinja2.Template(templatetext)
filetext = tm.render(scan=filesubs)
return filetext
class aa1Tasks(PSIDeviceBase):
"""Task management API
@@ -132,30 +149,28 @@ class aa1Tasks(PSIDeviceBase):
# Unrolling the configuration dict
script_text = d.get("script_text", None)
script_file = d.get("script_file", None)
script_task = d.get("script_task", 4)
script_mode = d.get("script_mode", None)
# Validation
if script_task < 1 or script_task > 31:
raise RuntimeError(f"Invalid task index: {script_task}")
if (script_text is None) and (script_file is None):
raise RuntimeError("Task execution requires either AeroScript text or filename")
# Common operations
old = self.read_configuration()
self.taskIndex.set(script_task).wait()
self._text_to_execute = None
self.switch.set("Reset").wait()
if "script_task" in d:
if d['script_task'] < 1 or d['script_task'] > 31:
raise RuntimeError(f"Invalid task index: {d['script_task']}")
self.taskIndex.set(d['script_task']).wait()
if "script_mode" in d:
self._executeMode.set(d['script_mode']).wait()
# Choose the right execution mode
if (script_file is None) and (script_text not in [None, ""]):
# Direct command execution from string
print("Preparing for direct command execution")
logger.info(f"[{self.name}] Preparing for direct text command execution")
if script_mode is not None:
self._executeMode.set(script_mode).wait()
# Compile for syntax checking
self.taskIndex.set(script_task).wait()
self.fileName.set("foobar.ascript").wait()
self._fileWrite.set(script_text).wait()
self.switch.set("Load").wait()
@@ -176,7 +191,6 @@ class aa1Tasks(PSIDeviceBase):
elif (script_file is not None) and (script_text not in [None, ""]):
logger.info(f"[{self.name}] Preparing to execute text via file '{script_file}'")
# Execute text via intermediate file
self.taskIndex.set(script_task).wait()
self.fileName.set(script_file).wait()
self._fileWrite.set(script_text).wait()
self.switch.set("Load").wait()

View File

@@ -1,5 +1,5 @@
from .AerotechTasks import aa1Tasks
from .AerotechPso import aa1AxisPsoDistance, aa1AxisPsoWindow
from .AerotechPso import aa1AxisPsoDistance
from .AerotechDriveDataCollection import aa1AxisDriveDataCollection
from .AerotechAutomation1 import aa1Controller, aa1GlobalVariables, aa1GlobalVariableBindings, aa1AxisIo

View File

@@ -166,34 +166,24 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
d = {}
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
for prefix in ["", alias + "_"]:
if f'{prefix}image_width' in scanargs:
d['image_width'] = scanargs[f'{prefix}image_width']
if f'{prefix}image_height' in scanargs:
d['image_height'] = scanargs[f'{prefix}image_height']
if f'{prefix}exposure_num_burst' in scanargs:
d['exposure_num_burst'] = scanargs[f'{prefix}exposure_num_burst']
if f'{prefix}exposure_time_ms' in scanargs:
d['exposure_time_ms'] = scanargs[f'{prefix}exposure_time_ms']
if f'{prefix}exposure_period_ms' in scanargs:
d['exposure_period_ms'] = scanargs[f'{prefix}exposure_period_ms']
if f'{prefix}correction_mode' in scanargs:
d['correction_mode'] = scanargs[f'{prefix}correction_mode']
if f'{prefix}scanid' in scanargs:
d['scanid'] = scanargs[f'{prefix}scanid']
if f'{prefix}trigger_mode' in scanargs:
d['trigger_mode'] = scanargs[f'{prefix}trigger_mode']
if 'image_width' in scanargs:
d['image_width'] = scanargs['image_width']
if 'image_height' in scanargs:
d['image_height'] = scanargs['image_height']
if 'exp_time' in scanargs:
d['exposure_time_ms'] = 1000*scanargs['exp_time']
if 'exp_burst' in scanargs:
d['exposure_num_burst'] = scanargs['exp_burst']
if 'acq_mode' in scanargs:
d['acq_mode'] = scanargs['acq_mode']
elif self.parent.scaninfo.scan_type == "step":
d['acq_mode'] = "default"
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Sync if out of sync
if self.parent.infoSyncFlag.value == 0:
self.parent.cmdSyncHw.set(1).wait()
@@ -229,6 +219,8 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
if self.parent.infoBusyFlag.get() in (0, 'IDLE'):
raise RuntimeError('GigaFrost must be running before triggering')
logger.warning(f"[{self.parent.name}] SW triggering gigafrost")
# Soft triggering based on operation mode
if self.parent.autoSoftEnable.get() and self.parent.trigger_mode == 'auto' and self.parent.enable_mode == 'soft':
# BEC teststand operation mode: posedge of SoftEnable if Started
@@ -533,7 +525,6 @@ class GigaFrostCamera(PSIDetectorBase):
image_width = d.get('image_width', 2016)
image_height = d.get('image_height', 2016)
scanid = d.get('scanid', 0)
trigger_mode = d.get('trigger_mode', None)
correction_mode = d.get('correction_mode', 5)
# change settings
@@ -545,20 +536,25 @@ class GigaFrostCamera(PSIDetectorBase):
self.cfgCntNum.set(num_images).wait()
self.cfgCorrMode.set(correction_mode).wait()
if trigger_mode is not None:
self.set_trigger_mode(str(trigger_mode))
if 'acq_mode' in d:
self.set_acquisition_mode(d['acq_mode'])
# Commit parameter
self.cmdSetParam.set(1).wait()
def set_trigger_mode(self, trigger_mode):
"""
def set_acquisition_mode(self, acq_mode):
""" Set acquisition mode
Utility function to quickly select between pre-configured and tested
acquisition modes.
NOTE: The trigger input appears to be dead, it completely ignores the
supplied signal. Use external enable instead, that works!
"""
if trigger_mode == "default":
if acq_mode == "default":
# trigger modes
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
@@ -567,7 +563,17 @@ class GigaFrostCamera(PSIDetectorBase):
self.enable_mode = "soft"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
elif trigger_mode == "soft":
elif acq_mode in ["ext_enable", "external_enable"]:
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Trigger modes
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
# Set modes
self.enable_mode = "external"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
elif acq_mode == "soft":
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Set enable signal to always
@@ -586,7 +592,7 @@ class GigaFrostCamera(PSIDetectorBase):
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
elif trigger_mode in ["ext", "external"]:
elif acq_mode in ["ext", "external"]:
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Set enable signal to always
@@ -605,18 +611,8 @@ class GigaFrostCamera(PSIDetectorBase):
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
elif trigger_mode in ["ext_enable", "external_enable"]:
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Trigger modes
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
# Set modes
self.enable_mode = "external"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
else:
raise RuntimeError(f"Unsupported trigger mode: {trigger_mode}")
raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}")
@property
def exposure_mode(self):

View File

@@ -1,198 +0,0 @@
# -*- coding: utf-8 -*-
"""
GigaFrost client module that combines camera and DAQ
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
from ophyd import Component, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from . import gfconstants as const
from . import stddaq_client as stddaq
from . import gigafrostcamera as gfcam
class GigaFrostClientMixin(CustomDetectorMixin):
"""Mixin class to setup TOMCAT specific implementations of the detector.
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
def on_stage(self) -> None:
"""
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
# Gigafrost can finish a run without explicit unstaging
if self.parent._staged:
self.parent.unstage()
# self.parent.daq.stage()
# self.parent.cam.stage()
# def on_unstage(self) -> None:
# """
# Specify actions to be executed during unstage.
# This step should include checking if the acqusition was successful,
# and publishing the file location and file event message,
# with flagged done to BEC.
# """
# self.parent.cam.unstage()
# self.parent.daq.unstage()
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
return self.on_unstage()
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
return self.parent.cam.trigger()
class GigaFrostClient(PSIDetectorBase):
"""Ophyd device class to control Gigafrost cameras at Tomcat
The actual hardware is implemented by an IOC based on an old fork of Helge's
cameras. This means that the camera behaves differently than the SF cameras
in particular it provides even less feedback about it's internal progress.
Helge will update the GigaFrost IOC after working beamline.
The ophyd class is based on the 'gfclient' package and has a lot of Tomcat
specific additions. It does behave differently though, as ophyd swallows the
errors from failed PV writes.
Parameters
----------
use_soft_enable : bool
Flag to use the camera's soft enable (default: False)
backend_url : str
Backend url address necessary to set up the camera's udp header.
(default: http://xbl-daq-23:8080)
Usage:
----------
gf = GigaFrostClient(
"X02DA-CAM-GF2:", name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True,
daq_ws_url="ws://xbl-daq-29:8080", daq_rest_url="http://xbl-daq-29:5000"
)
Bugs:
----------
FRAMERATE : Ignored in soft trigger mode, period becomes 2xexposure time
"""
# pylint: disable=too-many-instance-attributes
custom_prepare_cls = GigaFrostClientMixin
USER_ACCESS = ["kickoff"]
cam = Component(gfcam.GigaFrostCamera, prefix="X02DA-CAM-GF2:", name="cam")
daq = Component(stddaq.StdDaqClient, name="daq")
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="",
*,
name,
auto_soft_enable=False,
backend_url=const.BE999_DAFL_CLIENT,
daq_ws_url="ws://localhost:8080",
daq_rest_url="http://localhost:5000",
kind=None,
**kwargs,
):
self.__class__.__dict__["cam"].kwargs['backend_url'] = backend_url
self.__class__.__dict__["cam"].kwargs['auto_soft_enable'] = auto_soft_enable
self.__class__.__dict__["daq"].kwargs['ws_url'] = daq_ws_url
self.__class__.__dict__["daq"].kwargs['rest_url'] = daq_rest_url
super().__init__(prefix=prefix, name=name, kind=kind, **kwargs)
def configure(self, d: dict = None):
"""Configure the next scan with the GigaFRoST camera and standard DAQ backend.
It also makes some simple checks for consistent configuration, but otherwise
status feedback is missing on both sides.
Parameters
----------
ntotal : int, optional
Total mumber of images to be taken by the DAQ during the whole scan.
Set to -1 for an unlimited number of images (limited by the
ringbuffer size and backend speed). (default = 10000)
nimages : int, optional
Number of images to be taken during each trigger (i.e. burst).
Maximum is 16777215 images. (default = 10)
exposure : float, optional
Exposure time, max 40 ms. [ms]. (default = 0.2)
period : float, optional
Exposure period [ms], ignored in soft trigger mode. (default = 1.0)
pixel_width : int, optional
Image size in the x-direction, must be multiple of 48 [pixels] (default = 2016)
pixel_height : int, optional
Image size in the y-direction, must be multiple of 16 [pixels] (default = 2016)
scanid : int, optional
Scan identification number to be associated with the scan data.
ToDo: This should be retrieved from the BEC. (default = 0)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
"""
# Unstage camera (reconfiguration will anyway stop camera)
super().unstage()
# If Bluesky style configure
old = self.read_configuration()
self.cam.configure(d)
self.daq.configure(d)
new = self.read_configuration()
return old, new
def stage(self):
""" Stages the current device and all sub-devices
"""
px_daq_h = self.daq.cfg_pixel_height.get()
px_daq_w = self.daq.cfg_pixel_width.get()
px_gf_w = self.cam.cfgRoiX.get()
px_gf_h = self.cam.cfgRoiY.get()
if px_daq_h != px_gf_h or px_daq_w != px_gf_w:
raise RuntimeError("Different image size configured on GF and the DAQ")
return super().stage()
def kickoff(self) -> DeviceStatus:
if not self._staged:
self.stage()
return DeviceStatus(self, done=True, success=True, settle_time=0.1)
# def trigger(self) -> DeviceStatus:
# """ Triggers the current device and all sub-devices, i.e. the camera.
# """
# status = super().trigger()
# return status
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
gf = GigaFrostClient(
"X02DA-CAM-GF2:", name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True,
daq_ws_url="ws://xbl-daq-29:8080", daq_rest_url="http://xbl-daq-29:5000"
)
gf.wait_for_connection()

View File

@@ -40,15 +40,19 @@ class StdDaqMixin(CustomDeviceMixin):
d = {}
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
for prefix in ["", alias + "_"]:
if f'{prefix}image_width' in scanargs:
d['image_width'] = scanargs[f'{prefix}image_width']
if f'{prefix}image_height' in scanargs:
d['image_height'] = scanargs[f'{prefix}image_height']
if f'{prefix}num_points_total' in scanargs:
d['num_points_total'] = scanargs[f'{prefix}num_points_total']
if f'{prefix}file_path' in scanargs:
d['file_path'] = scanargs[f'{prefix}file_path']
if 'image_width' in scanargs:
d['image_width'] = scanargs['image_width']
if 'image_height' in scanargs:
d['image_height'] = scanargs['image_height']
# NOTE: Scans don't have to fully configure the device
if "steps" in scanargs and "exp_burst" in scanargs:
scan_steps = scanargs["steps"]
scan_burst = scanargs["exp_burst"]
d["num_points_total"] = (scan_steps+1) * scan_burst
elif "exp_burst" in scanargs:
d["num_points_total"] = scanargs["exp_burst"]
elif "steps" in scanargs:
d["num_points_total"] = scanargs["steps"]
# Perform bluesky-style configuration
if len(d) > 0:
@@ -59,11 +63,6 @@ class StdDaqMixin(CustomDeviceMixin):
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Try to start a new run
file_path = self.parent.file_path.get()
num_images = self.parent.num_images.get()

View File

@@ -36,12 +36,16 @@ class TomcatStepScan(ScanBase):
"""
scan_name = "tomcatstepscan"
scan_type = "step"
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "exp_burst", "roix", "roiy"],
"Acquisition parameters": ["exp_time", "exp_burst", "image_width", "image_height"],
}
def update_scan_motors(self):
self.scan_motors = ["es1_roty"]
def _get_scan_motors(self):
self.scan_motors = ["es1_roty"]
@@ -53,31 +57,18 @@ class TomcatStepScan(ScanBase):
exp_time=0.005,
settling_time=0.2,
exp_burst=1,
roix=2016,
roiy=2016,
image_width=2016,
image_height=2016,
sync="event",
**kwargs,
):
# Converting generic kwargs to tomcat device configuration parameters
# Used by gigafrost
# FIXME: This should go to the device (maybe use the scanargs to identify itself)
kwargs["parameter"]["kwargs"]["exposure_time_ms"] = 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_period_ms"] = 2 * 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_num_burst"] = exp_burst
kwargs["parameter"]["kwargs"]["image_width"] = roix
kwargs["parameter"]["kwargs"]["image_height"] = roiy
# Used by stdDAQ and DDC
kwargs["parameter"]["kwargs"]["num_points_total"] = exp_burst * (steps + 1)
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
ddc_trigger = t_modes[sync]
kwargs["parameter"]["kwargs"]["ddc_trigger"] = ddc_trigger
# Use PSO trigger
kwargs["parameter"]["kwargs"]["pso_wavemode"] = "pulsed"
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
relative=False,
burst_at_each_point=1,
optim_trajectory=None,
**kwargs,
@@ -94,7 +85,26 @@ class TomcatStepScan(ScanBase):
"""Pre-calculate scan positions"""
for ii in range(self.scan_steps + 1):
self.positions.append(self.scan_start + ii * self.scan_stepsize)
# FIXME : override at_each_point
def _at_each_point(self, ind=None, pos=None):
""" Overriden at_each_point, using detector burst instaead of manual triggering"""
trigger_time = self.exp_time * self.burst_at_each_point
# yield from self.stubs.trigger(min_wait=trigger_time)
yield from self.stubs.trigger(group='trigger', point_id=self.point_id)
time.sleep(trigger_time)
time.sleep(self.settling_time)
yield from self.stubs.read(group="monitored", point_id=self.point_id, wait_group=None)
# self.point_id += 1
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class TomcatSnapNStep(AsyncFlyScanBase):
"""Simple software step scan forTomcat
@@ -108,13 +118,14 @@ class TomcatSnapNStep(AsyncFlyScanBase):
"""
scan_name = "tomcatsnapnstepscan"
arg_input = {"camera" : ScanArgType.DEVICE,
"exp_time" : ScanArgType.FLOAT}
arg_bundle_size= {"bundle": len(arg_input), "min": 1, "max": None}
scan_type = "scripted"
# arg_input = {"camera" : ScanArgType.DEVICE,
# "exp_time" : ScanArgType.FLOAT}
# arg_bundle_size= {"bundle": len(arg_input), "min": 1, "max": None}
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "exp_burst", "roix", "roiy"],
"Acquisition parameters": ["exp_time", "exp_burst", "image_width", "image_height"],
}
def _get_scan_motors(self):
@@ -128,20 +139,13 @@ class TomcatSnapNStep(AsyncFlyScanBase):
exp_time:float=0.005,
settling_time:float=0.2,
exp_burst:int=1,
roix:int=2016,
roiy:int=2016,
image_width:int=2016,
image_height:int=2016,
sync:str="event",
**kwargs,
):
# Converting generic kwargs to tomcat device configuration parameters
# Used by gigafrost
kwargs["parameter"]["kwargs"]["exposure_time_ms"] = 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_period_ms"] = 2 * 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_num_burst"] = exp_burst
kwargs["parameter"]["kwargs"]["image_width"] = roix
kwargs["parameter"]["kwargs"]["image_height"] = roiy
# Used by stdDAQ and DDC
kwargs["parameter"]["kwargs"]["num_points_total"] = exp_burst * (steps + 1)
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
ddc_trigger = t_modes[sync]
kwargs["parameter"]["kwargs"]["ddc_trigger"] = ddc_trigger
@@ -168,7 +172,6 @@ class TomcatSnapNStep(AsyncFlyScanBase):
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
relative=False,
burst_at_each_point=1,
optim_trajectory=None,
**kwargs,
@@ -250,6 +253,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
"""
scan_name = "tomcatsimplesequencescan"
scan_type = "scripted"
scan_report_hint = "table"
required_kwargs = ["scan_start", "gate_high", "gate_low"]
gui_config = {
@@ -259,8 +263,8 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
"gate_low",
"exp_time",
"exp_burst",
"roix",
"roiy",
"image_width",
"image_height",
"sync",
],
}
@@ -277,8 +281,8 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
repmode: str = "PosNeg",
exp_time: float = 0.005,
exp_burst: float = 180,
roix: int = 2016,
roiy: int = 2016,
image_width: int = 2016,
image_height: int = 2016,
sync: str = "pso",
**kwargs,
):
@@ -310,14 +314,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
raise RuntimeError(f"Unsupported repetition mode: {self.scan_repmode}")
# Converting generic kwargs to tomcat device configuration parameters
# Used by gigafrost
kwargs["parameter"]["kwargs"]["exposure_time_ms"] = 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_period_ms"] = 2 * 1000 * exp_time
kwargs["parameter"]["kwargs"]["exposure_num_burst"] = exp_burst
kwargs["parameter"]["kwargs"]["image_width"] = roix
kwargs["parameter"]["kwargs"]["image_height"] = roiy
# Used by stdDAQ and DDC
kwargs["parameter"]["kwargs"]["num_points_total"] = exp_burst * (self.scan_repnum + 1)
# Used by DDC
t_modes = {"pso": 0, "event": 1, "inp0": 2, "inp1": 4}
ddc_trigger = t_modes[sync]
kwargs["parameter"]["kwargs"]["ddc_trigger"] = ddc_trigger

View File

@@ -0,0 +1,168 @@
""" Demo scans for Tomcat at the microXAS test bench
"""
# def bl_check_beam():
# """Checks beamline status"""
# motor_enabled = bool(dev.es1_roty.motor_enable.get())
# result = motor_enabled
# return result
def dev_disable_all():
"""Disable all devices """
for d in dev:
d.enabled = False
def anotherstepscan(
scan_start,
scan_end,
steps,
exp_time=0.005,
exp_burst=5,
settling_time=0,
image_width=2016,
image_height=2016,
sync="event",
):
"""Demo step scan with GigaFrost
This is a small BEC user-space demo step scan at the microXAS testbench
using the gigafrost in software triggering mode. It tries to be a
standard BEC scan, while still setting up the environment.
Example:
--------
demostepscan(scan_start=-32, scan_end=148, steps=180, exp_time=0.005, exp_burst=5)
"""
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
dev.es1_tasks.enabled = False
dev.es1_psod.enabled = False
dev.es1_ddaq.enabled = True
dev.gfcam.enabled = True
dev.gfdaq.enabled = True
dev.daq_stream0.enabled = True
dev.daq_stream1.enabled = False
print("Handing over to 'scans.tomcatstepscan'")
scans.tomcatstepscan(
scan_start=scan_start,
scan_end=scan_end,
steps=steps,
exp_time=exp_time,
exp_burst=exp_burst,
relative=False,
image_width=image_width,
image_height=image_height,
settling_time=settling_time,
sync=sync
)
def anothersequencescan(
scan_start,
gate_high,
gate_low,
repeats=1,
repmode="PosNeg",
exp_time=0.005,
exp_burst=180,
image_width=2016,
image_height=2016,
sync="pso",
):
"""Demo sequence scan with GigaFrost
This is a small BEC user-space sequence scan at the microXAS testbench
triggering a customized scripted scan on the controller. The scan uses
a pre-written custom low-level sequence scan, so it is really minimal.
NOTE: It calls the AeroScript template version.
Example:
--------
>>> demosequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10)
"""
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
dev.es1_tasks.enabled = True
dev.es1_psod.enabled = False
dev.es1_ddaq.enabled = True
dev.gfcam.enabled = True
dev.gfdaq.enabled = True
dev.daq_stream0.enabled = True
dev.daq_stream1.enabled = False
print("Handing over to 'scans.sequencescan'")
scans.sequencescan(
scan_start,
gate_high,
gate_low,
repeats=repeats,
repmode=repmode,
exp_time=exp_time,
exp_burst=exp_burst,
image_width=image_width,
image_height=image_height,
sync=sync,
)
def anothersnapnstepscan(
scan_start,
scan_end,
steps,
exp_time=0.005,
exp_burst=180,
image_width=2016,
image_height=2016,
settling_time=0.1,
sync="pso",
):
"""Demo snapnstep scan with GigaFrost
This is a small BEC user-space sequence scan at the microXAS testbench
triggering a customized scripted scan on the controller. The scan uses
a pre-written custom low-level sequence scan, so it is really minimal.
NOTE: It calls the AeroScript template version.
Example:
--------
>>> demosequencescan(33, 180, 180, exp_time=0.005, exp_frames=1800, repeats=10)
"""
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
dev.es1_tasks.enabled = True
dev.es1_psod.enabled = False
dev.es1_ddaq.enabled = True
dev.gfcam.enabled = True
dev.gfdaq.enabled = True
dev.daq_stream0.enabled = True
dev.daq_stream1.enabled = False
print("Handing over to 'scans.tomcatsnapnstepscan'")
scans.tomcatsnapnstepscan(
scan_start,
scan_end,
steps,
exp_time=exp_time,
exp_burst=exp_burst,
image_width=image_width,
image_height=image_height,
settling_time=settling_time,
sync=sync,
)