At least no exceptions
This commit is contained in:
@@ -1,100 +1,71 @@
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
|
||||
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
|
||||
|
||||
from bec_lib.messages import ScanStatusMessage
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
|
||||
|
||||
|
||||
class PlotUpdate(AutoUpdates):
|
||||
create_default_dock = True
|
||||
enabled = True
|
||||
_scan_msg = None
|
||||
|
||||
def do_update(self, msg):
|
||||
"""Save the original scan message for future use"""
|
||||
self._scan_msg = msg
|
||||
return super().do_update(msg)
|
||||
#######################################################################
|
||||
################# GUI Callbacks #######################################
|
||||
#######################################################################
|
||||
|
||||
# def simple_line_scan(self, info: ScanInfo) -> None:
|
||||
# """
|
||||
# Simple line scan.
|
||||
# """
|
||||
# dev_x = info.scan_report_devices[0]
|
||||
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
|
||||
# if not dev_y:
|
||||
# return
|
||||
# self.figure.clear_all()
|
||||
# plt = self.figure.plot(dev_x, dev_y)
|
||||
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def plot_handler(self, info: ScanInfo) -> None:
|
||||
"""Simple keyword handler for 'plot'
|
||||
|
||||
This simple keyword handler looks for the keyword 'plot' in the scan arguments.
|
||||
This allows the user to explictly specify the desired data source. Useful for alignment
|
||||
scans.
|
||||
def on_start(self) -> None:
|
||||
"""
|
||||
print(info.scan_report_devices)
|
||||
Procedure to run when the auto updates are enabled.
|
||||
"""
|
||||
self.start_default_dock()
|
||||
|
||||
dev_x = info.scan_report_devices[0]
|
||||
if "kwargs" in self._scan_msg.info:
|
||||
det = self._scan_msg.info["kwargs"].get("plot", None)
|
||||
else:
|
||||
det = None
|
||||
if not det:
|
||||
return
|
||||
def on_stop(self) -> None:
|
||||
"""
|
||||
Procedure to run when the auto updates are disabled.
|
||||
"""
|
||||
|
||||
plt1 = self.get_default_figure()
|
||||
# clear_all() will throw RPCResponseTimeoutError
|
||||
try:
|
||||
plt1.clear_all()
|
||||
except RPCResponseTimeoutError:
|
||||
pass
|
||||
def on_scan_open(self, msg: ScanStatusMessage) -> None:
|
||||
"""
|
||||
Procedure to run when a scan starts.
|
||||
|
||||
# plot() will throw RPCResponseTimeoutError
|
||||
try:
|
||||
if len(info.scan_report_devices) == 2:
|
||||
# 2D plot
|
||||
dev_x = info.scan_report_devices[0]
|
||||
dev_y = info.scan_report_devices[1]
|
||||
plt1.plot(
|
||||
Args:
|
||||
msg (ScanStatusMessage): The scan status message.
|
||||
"""
|
||||
if msg.scan_name == "line_scan" and msg.scan_report_devices:
|
||||
return self.simple_line_scan(msg)
|
||||
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
|
||||
return self.simple_grid_scan(msg)
|
||||
|
||||
dev_x = msg.scan_report_devices[0]
|
||||
if "kwargs" in msg.request_inputs:
|
||||
dev_y = msg.request_inputs["kwargs"].get("plot", None)
|
||||
if dev_y is not None:
|
||||
# Set the dock to the waveform widget
|
||||
wf = self.set_dock_to_widget("Waveform")
|
||||
|
||||
# Clear the waveform widget and plot the data
|
||||
wf.clear_all()
|
||||
wf.plot(
|
||||
x_name=dev_x,
|
||||
y_name=dev_y,
|
||||
z_name=det,
|
||||
title=f"Scan {info.scan_number}",
|
||||
label=f"Scan {msg.info.scan_number} - {dev_y}",
|
||||
title=f"Scan {msg.info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=dev_y,
|
||||
z_label=det,
|
||||
)
|
||||
elif len(info.scan_report_devices) == 1:
|
||||
# 1D plot
|
||||
dev_x = info.scan_report_devices[0]
|
||||
plt1.plot(
|
||||
x_name=dev_x,
|
||||
y_name=det,
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=det,
|
||||
)
|
||||
else:
|
||||
# Default is 1D
|
||||
dev_x = info.scan_report_devices[0]
|
||||
plt1.plot(
|
||||
x_name=dev_x,
|
||||
y_name=det,
|
||||
title=f"Scan {info.scan_number}",
|
||||
x_label=dev_x,
|
||||
y_label=det,
|
||||
)
|
||||
except RPCResponseTimeoutError:
|
||||
pass
|
||||
# plt1.add_dap(dev_x, det, dap="LinearModel")
|
||||
elif msg.scan_report_devices:
|
||||
return self.best_effort(msg)
|
||||
return None
|
||||
|
||||
def handler(self, info: ScanInfo) -> None:
|
||||
"""Dock configuration handler"""
|
||||
# EXAMPLES:
|
||||
# if info.scan_name == "line_scan" and info.scan_report_devices:
|
||||
# self.simple_line_scan(info)
|
||||
# return
|
||||
# if info.scan_name == "grid_scan" and info.scan_report_devices:
|
||||
# self.run_grid_scan_update(info)
|
||||
# return
|
||||
super().handler(info)
|
||||
self.plot_handler(info)
|
||||
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
|
||||
"""
|
||||
Procedure to run when a scan ends.
|
||||
|
||||
Args:
|
||||
msg (ScanStatusMessage): The scan status message.
|
||||
"""
|
||||
|
||||
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
|
||||
"""
|
||||
Procedure to run when a scan is aborted.
|
||||
|
||||
Args:
|
||||
msg (ScanStatusMessage): The scan status message.
|
||||
"""
|
||||
@@ -152,7 +152,7 @@ fi1_try:
|
||||
dccm_theta1:
|
||||
description: Monochromator pitch 1
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
|
||||
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -179,7 +179,7 @@ dccm_diode_bottom:
|
||||
dccm_theta2:
|
||||
description: Monochromator pitch 2
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
|
||||
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
|
||||
@@ -175,76 +175,72 @@ class AerotechAbrStage(PSIDeviceBase, Device):
|
||||
|
||||
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
|
||||
"""
|
||||
|
||||
logger.warning(f"Configuring {self.scaninfo.scan_msg.info['scan_name']} on ABR")
|
||||
|
||||
d = {}
|
||||
if self.scan_info.scan_type in ("measure", "measurement", "fly"):
|
||||
# FIXME: I don't care about how we fish out config parameters from scan info
|
||||
scan_args = {
|
||||
**self.scan_info.msg.request_inputs["inputs"],
|
||||
**self.scan_info.msg.request_inputs["kwargs"],
|
||||
**self.scan_info.msg.scan_parameters,
|
||||
}
|
||||
scanname = self.scan_info.scan_msg.info["scan_name"]
|
||||
# FIXME: I don't care about how we fish out config parameters from scan info
|
||||
scan_args = {
|
||||
**self.scan_info.msg.request_inputs["inputs"],
|
||||
**self.scan_info.msg.request_inputs["kwargs"],
|
||||
**self.scan_info.msg.scan_parameters,
|
||||
}
|
||||
scanname = self.scan_info.msg.scan_name
|
||||
|
||||
if scanname in (
|
||||
"standardscan",
|
||||
"helicalscan",
|
||||
"helicalscan1",
|
||||
"helicalscan2",
|
||||
"helicalscan3",
|
||||
):
|
||||
d["scan_command"] = AbrCmd.MEASURE_STANDARD
|
||||
d["var_1"] = scan_args["start"]
|
||||
d["var_2"] = scan_args["range"]
|
||||
d["var_3"] = scan_args["move_time"]
|
||||
d["var_4"] = scan_args.get("ready_rate", 500)
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("verticallinescan", "vlinescan"):
|
||||
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
|
||||
d["var_1"] = scan_args["range"] / scan_args["steps"]
|
||||
d["var_2"] = scan_args["steps"]
|
||||
d["var_3"] = scan_args["exp_time"]
|
||||
d["var_4"] = 0
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("screeningscan"):
|
||||
d["scan_command"] = AbrCmd.SCREENING
|
||||
d["var_1"] = scan_args["start"]
|
||||
d["var_2"] = scan_args["oscrange"]
|
||||
d["var_3"] = scan_args["exp_time"]
|
||||
d["var_4"] = scan_args["range"] / scan_args["steps"]
|
||||
d["var_5"] = scan_args["steps"]
|
||||
d["var_6"] = scan_args.get("delta", 0.5)
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("rasterscan", "rastersimplescan"):
|
||||
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
d["var_1"] = scan_args["exp_time"]
|
||||
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
|
||||
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
|
||||
d["var_4"] = scan_args["steps_x"]
|
||||
d["var_5"] = scan_args["steps_y"]
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in (
|
||||
"standardscan",
|
||||
"helicalscan",
|
||||
"helicalscan1",
|
||||
"helicalscan2",
|
||||
"helicalscan3",
|
||||
):
|
||||
d["scan_command"] = AbrCmd.MEASURE_STANDARD
|
||||
d["var_1"] = scan_args["start"]
|
||||
d["var_2"] = scan_args["range"]
|
||||
d["var_3"] = scan_args["move_time"]
|
||||
d["var_4"] = scan_args.get("ready_rate", 500)
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("verticallinescan", "vlinescan"):
|
||||
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
|
||||
d["var_1"] = scan_args["range"] / scan_args["steps"]
|
||||
d["var_2"] = scan_args["steps"]
|
||||
d["var_3"] = scan_args["exp_time"]
|
||||
d["var_4"] = 0
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("screeningscan"):
|
||||
d["scan_command"] = AbrCmd.SCREENING
|
||||
d["var_1"] = scan_args["start"]
|
||||
d["var_2"] = scan_args["oscrange"]
|
||||
d["var_3"] = scan_args["exp_time"]
|
||||
d["var_4"] = scan_args["range"] / scan_args["steps"]
|
||||
d["var_5"] = scan_args["steps"]
|
||||
d["var_6"] = scan_args.get("delta", 0.5)
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("rasterscan", "rastersimplescan"):
|
||||
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
d["var_1"] = scan_args["exp_time"]
|
||||
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
|
||||
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
|
||||
d["var_4"] = scan_args["steps_x"]
|
||||
d["var_5"] = scan_args["steps_y"]
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
|
||||
# Reconfigure if got a valid scan config
|
||||
if len(d) > 0:
|
||||
self.configure(d)
|
||||
# Reconfigure if got a valid scan config
|
||||
if len(d) > 0:
|
||||
self.configure(d)
|
||||
|
||||
# Stage the ABR stage
|
||||
self.arm()
|
||||
# Stage the ABR stage
|
||||
self.arm()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Unstage the ABR controller"""
|
||||
|
||||
Reference in New Issue
Block a user