Verifying scan compatibility

This commit is contained in:
gac-x05la
2025-04-17 13:01:22 +02:00
parent 9a878db49a
commit 2d23c5e071
4 changed files with 95 additions and 48 deletions

View File

@@ -132,6 +132,25 @@ es1_ddaq:
# readoutPriority: monitored
# softwareTrigger: true
gfcam:
description: GigaFrost camera client
deviceClass: tomcat_bec.devices.GigaFrostCamera
deviceConfig:
prefix: 'X02DA-CAM-GF2:'
backend_url: 'http://sls-daq-001:8080'
auto_soft_enable: true
deviceTags:
- camera
- trigger
- gfcam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# gfcam:
# description: GigaFrost camera client
# deviceClass: tomcat_bec.devices.GigaFrostCamera
@@ -182,6 +201,23 @@ es1_ddaq:
# readoutPriority: monitored
# softwareTrigger: false
pcocam:
description: PCO.edge camera client
deviceClass: tomcat_bec.devices.PcoEdge5M
deviceConfig:
prefix: 'X02DA-CCDCAM2:'
deviceTags:
- camera
- trigger
- pcocam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# pcocam:
# description: PCO.edge camera client
# deviceClass: tomcat_bec.devices.PcoEdge5M

View File

@@ -116,38 +116,44 @@ class aa1AxisDriveDataCollection(PSIDeviceBase, Device):
def on_stage(self) -> None:
"""Configuration and staging"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs["ddc_trigger"]
if "ddc_num_points" in scanargs:
d["num_points_total"] = scanargs["ddc_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs["steps"] is not None:
num_points *= scanargs["steps"]
points_valid = True
elif "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
elif "repeats" in scanargs and scanargs["repeats"] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scan_args:
d["ddc_trigger"] = scan_args["ddc_trigger"]
if "ddc_num_points" in scan_args:
d["num_points_total"] = scan_args["ddc_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scan_args and scan_args["steps"] is not None:
num_points *= scan_args["steps"]
points_valid = True
if "exp_burst" in scan_args and scan_args["exp_burst"] is not None:
num_points *= scan_args["exp_burst"]
points_valid = True
if "repeats" in scan_args and scan_args["repeats"] is not None:
num_points *= scan_args["repeats"]
points_valid = True
if "burst_at_each_point" in scan_args and scan_args["burst_at_each_point"] is not None:
num_points *= scan_args["burst_at_each_point"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.name}] Configuring with:\n{d}")
if d:
self.configure(d=d)
# Stage the data collection if not in internally launced mode
# NOTE: Scripted scans start acquiring from the scrits
if self.scaninfo.scan_type not in ("script", "scripted"):
if "scan_type" in scan_args and scan_args["scan_type"] in ("scripted", "script"):
self.arm()
# Reset readback
self.reset()

View File

@@ -257,22 +257,24 @@ class aa1AxisPsoDistance(AerotechPsoBase):
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
if "pso_distance" in scan_args:
d["pso_distance"] = scan_args["pso_distance"]
if "pso_wavemode" in scan_args:
d["pso_wavemode"] = scan_args["pso_wavemode"]
if "pso_w_pulse" in scan_args:
d["pso_w_pulse"] = scan_args["pso_w_pulse"]
if "pso_t_pulse" in scan_args:
d["pso_t_pulse"] = scan_args["pso_t_pulse"]
if "pso_n_pulse" in scan_args:
d["pso_n_pulse"] = scan_args["pso_n_pulse"]
# Perform bluesky-style configuration
if d:
# logger.info(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# Stage the PSO distance module

View File

@@ -129,16 +129,19 @@ class aa1Tasks(PSIDeviceBase, Device):
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if self.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
# if self.scan_info.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scan_args and scan_args["script_text"] is not None:
d["script_text"] = scan_args["script_text"]
if "script_file" in scan_args and scan_args["script_file"] is not None:
d["script_file"] = scan_args["script_file"]
if "script_task" in scan_args and scan_args["script_task"] is not None:
d["script_task"] = scan_args["script_task"]
# FIXME: The above should be exchanged with:
# d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config")