At least no exceptions

This commit is contained in:
gac-x06da
2025-05-14 13:01:41 +02:00
committed by mohacsi_i
parent e38269b96f
commit e77f9af9ca
3 changed files with 119 additions and 152 deletions

View File

@@ -1,100 +1,71 @@
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_lib.messages import ScanStatusMessage
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
class PlotUpdate(AutoUpdates):
create_default_dock = True
enabled = True
_scan_msg = None
def do_update(self, msg):
"""Save the original scan message for future use"""
self._scan_msg = msg
return super().do_update(msg)
#######################################################################
################# GUI Callbacks #######################################
#######################################################################
# def simple_line_scan(self, info: ScanInfo) -> None:
# """
# Simple line scan.
# """
# dev_x = info.scan_report_devices[0]
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
# if not dev_y:
# return
# self.figure.clear_all()
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def plot_handler(self, info: ScanInfo) -> None:
"""Simple keyword handler for 'plot'
This simple keyword handler looks for the keyword 'plot' in the scan arguments.
This allows the user to explictly specify the desired data source. Useful for alignment
scans.
def on_start(self) -> None:
"""
print(info.scan_report_devices)
Procedure to run when the auto updates are enabled.
"""
self.start_default_dock()
dev_x = info.scan_report_devices[0]
if "kwargs" in self._scan_msg.info:
det = self._scan_msg.info["kwargs"].get("plot", None)
else:
det = None
if not det:
return
def on_stop(self) -> None:
"""
Procedure to run when the auto updates are disabled.
"""
plt1 = self.get_default_figure()
# clear_all() will throw RPCResponseTimeoutError
try:
plt1.clear_all()
except RPCResponseTimeoutError:
pass
def on_scan_open(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan starts.
# plot() will throw RPCResponseTimeoutError
try:
if len(info.scan_report_devices) == 2:
# 2D plot
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
plt1.plot(
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
dev_x = msg.scan_report_devices[0]
if "kwargs" in msg.request_inputs:
dev_y = msg.request_inputs["kwargs"].get("plot", None)
if dev_y is not None:
# Set the dock to the waveform widget
wf = self.set_dock_to_widget("Waveform")
# Clear the waveform widget and plot the data
wf.clear_all()
wf.plot(
x_name=dev_x,
y_name=dev_y,
z_name=det,
title=f"Scan {info.scan_number}",
label=f"Scan {msg.info.scan_number} - {dev_y}",
title=f"Scan {msg.info.scan_number}",
x_label=dev_x,
y_label=dev_y,
z_label=det,
)
elif len(info.scan_report_devices) == 1:
# 1D plot
dev_x = info.scan_report_devices[0]
plt1.plot(
x_name=dev_x,
y_name=det,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=det,
)
else:
# Default is 1D
dev_x = info.scan_report_devices[0]
plt1.plot(
x_name=dev_x,
y_name=det,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=det,
)
except RPCResponseTimeoutError:
pass
# plt1.add_dap(dev_x, det, dap="LinearModel")
elif msg.scan_report_devices:
return self.best_effort(msg)
return None
def handler(self, info: ScanInfo) -> None:
"""Dock configuration handler"""
# EXAMPLES:
# if info.scan_name == "line_scan" and info.scan_report_devices:
# self.simple_line_scan(info)
# return
# if info.scan_name == "grid_scan" and info.scan_report_devices:
# self.run_grid_scan_update(info)
# return
super().handler(info)
self.plot_handler(info)
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan ends.
Args:
msg (ScanStatusMessage): The scan status message.
"""
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan is aborted.
Args:
msg (ScanStatusMessage): The scan status message.
"""

View File

@@ -152,7 +152,7 @@ fi1_try:
dccm_theta1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -179,7 +179,7 @@ dccm_diode_bottom:
dccm_theta2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'}
onFailure: buffer
enabled: true
readoutPriority: monitored

View File

@@ -175,76 +175,72 @@ class AerotechAbrStage(PSIDeviceBase, Device):
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
logger.warning(f"Configuring {self.scaninfo.scan_msg.info['scan_name']} on ABR")
d = {}
if self.scan_info.scan_type in ("measure", "measurement", "fly"):
# FIXME: I don't care about how we fish out config parameters from scan info
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
scanname = self.scan_info.scan_msg.info["scan_name"]
# FIXME: I don't care about how we fish out config parameters from scan info
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
scanname = self.scan_info.msg.scan_name
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["range"]
d["var_3"] = scan_args["move_time"]
d["var_4"] = scan_args.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scan_args["range"] / scan_args["steps"]
d["var_2"] = scan_args["steps"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["oscrange"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = scan_args["range"] / scan_args["steps"]
d["var_5"] = scan_args["steps"]
d["var_6"] = scan_args.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scan_args["exp_time"]
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
d["var_4"] = scan_args["steps_x"]
d["var_5"] = scan_args["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["range"]
d["var_3"] = scan_args["move_time"]
d["var_4"] = scan_args.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scan_args["range"] / scan_args["steps"]
d["var_2"] = scan_args["steps"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["oscrange"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = scan_args["range"] / scan_args["steps"]
d["var_5"] = scan_args["steps"]
d["var_6"] = scan_args.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scan_args["exp_time"]
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
d["var_4"] = scan_args["steps_x"]
d["var_5"] = scan_args["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.configure(d)
# Reconfigure if got a valid scan config
if len(d) > 0:
self.configure(d)
# Stage the ABR stage
self.arm()
# Stage the ABR stage
self.arm()
def on_unstage(self):
"""Unstage the ABR controller"""