diff --git a/pxiii_bec/bec_widgets/auto_updates.py b/pxiii_bec/bec_widgets/auto_updates.py index ccecd00..003bd31 100644 --- a/pxiii_bec/bec_widgets/auto_updates.py +++ b/pxiii_bec/bec_widgets/auto_updates.py @@ -1,100 +1,71 @@ -from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo +from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates + +from bec_lib.messages import ScanStatusMessage from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError class PlotUpdate(AutoUpdates): - create_default_dock = True - enabled = True - _scan_msg = None - def do_update(self, msg): - """Save the original scan message for future use""" - self._scan_msg = msg - return super().do_update(msg) + ####################################################################### + ################# GUI Callbacks ####################################### + ####################################################################### - # def simple_line_scan(self, info: ScanInfo) -> None: - # """ - # Simple line scan. - # """ - # dev_x = info.scan_report_devices[0] - # dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device) - # if not dev_y: - # return - # self.figure.clear_all() - # plt = self.figure.plot(dev_x, dev_y) - # plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y) - - def plot_handler(self, info: ScanInfo) -> None: - """Simple keyword handler for 'plot' - - This simple keyword handler looks for the keyword 'plot' in the scan arguments. - This allows the user to explictly specify the desired data source. Useful for alignment - scans. + def on_start(self) -> None: """ - print(info.scan_report_devices) + Procedure to run when the auto updates are enabled. + """ + self.start_default_dock() - dev_x = info.scan_report_devices[0] - if "kwargs" in self._scan_msg.info: - det = self._scan_msg.info["kwargs"].get("plot", None) - else: - det = None - if not det: - return + def on_stop(self) -> None: + """ + Procedure to run when the auto updates are disabled. + """ - plt1 = self.get_default_figure() - # clear_all() will throw RPCResponseTimeoutError - try: - plt1.clear_all() - except RPCResponseTimeoutError: - pass + def on_scan_open(self, msg: ScanStatusMessage) -> None: + """ + Procedure to run when a scan starts. - # plot() will throw RPCResponseTimeoutError - try: - if len(info.scan_report_devices) == 2: - # 2D plot - dev_x = info.scan_report_devices[0] - dev_y = info.scan_report_devices[1] - plt1.plot( + Args: + msg (ScanStatusMessage): The scan status message. + """ + if msg.scan_name == "line_scan" and msg.scan_report_devices: + return self.simple_line_scan(msg) + if msg.scan_name == "grid_scan" and msg.scan_report_devices: + return self.simple_grid_scan(msg) + + dev_x = msg.scan_report_devices[0] + if "kwargs" in msg.request_inputs: + dev_y = msg.request_inputs["kwargs"].get("plot", None) + if dev_y is not None: + # Set the dock to the waveform widget + wf = self.set_dock_to_widget("Waveform") + + # Clear the waveform widget and plot the data + wf.clear_all() + wf.plot( x_name=dev_x, y_name=dev_y, - z_name=det, - title=f"Scan {info.scan_number}", + label=f"Scan {msg.info.scan_number} - {dev_y}", + title=f"Scan {msg.info.scan_number}", x_label=dev_x, y_label=dev_y, - z_label=det, ) - elif len(info.scan_report_devices) == 1: - # 1D plot - dev_x = info.scan_report_devices[0] - plt1.plot( - x_name=dev_x, - y_name=det, - title=f"Scan {info.scan_number}", - x_label=dev_x, - y_label=det, - ) - else: - # Default is 1D - dev_x = info.scan_report_devices[0] - plt1.plot( - x_name=dev_x, - y_name=det, - title=f"Scan {info.scan_number}", - x_label=dev_x, - y_label=det, - ) - except RPCResponseTimeoutError: - pass - # plt1.add_dap(dev_x, det, dap="LinearModel") + elif msg.scan_report_devices: + return self.best_effort(msg) + return None - def handler(self, info: ScanInfo) -> None: - """Dock configuration handler""" - # EXAMPLES: - # if info.scan_name == "line_scan" and info.scan_report_devices: - # self.simple_line_scan(info) - # return - # if info.scan_name == "grid_scan" and info.scan_report_devices: - # self.run_grid_scan_update(info) - # return - super().handler(info) - self.plot_handler(info) + def on_scan_closed(self, msg: ScanStatusMessage) -> None: + """ + Procedure to run when a scan ends. + + Args: + msg (ScanStatusMessage): The scan status message. + """ + + def on_scan_abort(self, msg: ScanStatusMessage) -> None: + """ + Procedure to run when a scan is aborted. + + Args: + msg (ScanStatusMessage): The scan status message. + """ \ No newline at end of file diff --git a/pxiii_bec/device_configs/x06da_device_config.yaml b/pxiii_bec/device_configs/x06da_device_config.yaml index 3221db5..5f2ecb4 100644 --- a/pxiii_bec/device_configs/x06da_device_config.yaml +++ b/pxiii_bec/device_configs/x06da_device_config.yaml @@ -152,7 +152,7 @@ fi1_try: dccm_theta1: description: Monochromator pitch 1 deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'} + deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'} onFailure: buffer enabled: true readoutPriority: monitored @@ -179,7 +179,7 @@ dccm_diode_bottom: dccm_theta2: description: Monochromator pitch 2 deviceClass: ophyd.EpicsMotor - deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'} + deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'} onFailure: buffer enabled: true readoutPriority: monitored diff --git a/pxiii_bec/devices/A3200.py b/pxiii_bec/devices/A3200.py index 1241d1c..4e9941e 100644 --- a/pxiii_bec/devices/A3200.py +++ b/pxiii_bec/devices/A3200.py @@ -175,76 +175,72 @@ class AerotechAbrStage(PSIDeviceBase, Device): NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything. """ - - logger.warning(f"Configuring {self.scaninfo.scan_msg.info['scan_name']} on ABR") - d = {} - if self.scan_info.scan_type in ("measure", "measurement", "fly"): - # FIXME: I don't care about how we fish out config parameters from scan info - scan_args = { - **self.scan_info.msg.request_inputs["inputs"], - **self.scan_info.msg.request_inputs["kwargs"], - **self.scan_info.msg.scan_parameters, - } - scanname = self.scan_info.scan_msg.info["scan_name"] + # FIXME: I don't care about how we fish out config parameters from scan info + scan_args = { + **self.scan_info.msg.request_inputs["inputs"], + **self.scan_info.msg.request_inputs["kwargs"], + **self.scan_info.msg.scan_parameters, + } + scanname = self.scan_info.msg.scan_name - if scanname in ( - "standardscan", - "helicalscan", - "helicalscan1", - "helicalscan2", - "helicalscan3", - ): - d["scan_command"] = AbrCmd.MEASURE_STANDARD - d["var_1"] = scan_args["start"] - d["var_2"] = scan_args["range"] - d["var_3"] = scan_args["move_time"] - d["var_4"] = scan_args.get("ready_rate", 500) - d["var_5"] = 0 - d["var_6"] = 0 - d["var_7"] = 0 - # d["var_8"] = 0 - # d["var_9"] = 0 - if scanname in ("verticallinescan", "vlinescan"): - d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN - d["var_1"] = scan_args["range"] / scan_args["steps"] - d["var_2"] = scan_args["steps"] - d["var_3"] = scan_args["exp_time"] - d["var_4"] = 0 - d["var_5"] = 0 - d["var_6"] = 0 - d["var_7"] = 0 - # d["var_8"] = 0 - # d["var_9"] = 0 - if scanname in ("screeningscan"): - d["scan_command"] = AbrCmd.SCREENING - d["var_1"] = scan_args["start"] - d["var_2"] = scan_args["oscrange"] - d["var_3"] = scan_args["exp_time"] - d["var_4"] = scan_args["range"] / scan_args["steps"] - d["var_5"] = scan_args["steps"] - d["var_6"] = scan_args.get("delta", 0.5) - d["var_7"] = 0 - # d["var_8"] = 0 - # d["var_9"] = 0 - if scanname in ("rasterscan", "rastersimplescan"): - d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE - d["var_1"] = scan_args["exp_time"] - d["var_2"] = scan_args["range_x"] / scan_args["steps_x"] - d["var_3"] = scan_args["range_y"] / scan_args["steps_y"] - d["var_4"] = scan_args["steps_x"] - d["var_5"] = scan_args["steps_y"] - d["var_6"] = 0 - d["var_7"] = 0 - # d["var_8"] = 0 - # d["var_9"] = 0 + if scanname in ( + "standardscan", + "helicalscan", + "helicalscan1", + "helicalscan2", + "helicalscan3", + ): + d["scan_command"] = AbrCmd.MEASURE_STANDARD + d["var_1"] = scan_args["start"] + d["var_2"] = scan_args["range"] + d["var_3"] = scan_args["move_time"] + d["var_4"] = scan_args.get("ready_rate", 500) + d["var_5"] = 0 + d["var_6"] = 0 + d["var_7"] = 0 + # d["var_8"] = 0 + # d["var_9"] = 0 + if scanname in ("verticallinescan", "vlinescan"): + d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN + d["var_1"] = scan_args["range"] / scan_args["steps"] + d["var_2"] = scan_args["steps"] + d["var_3"] = scan_args["exp_time"] + d["var_4"] = 0 + d["var_5"] = 0 + d["var_6"] = 0 + d["var_7"] = 0 + # d["var_8"] = 0 + # d["var_9"] = 0 + if scanname in ("screeningscan"): + d["scan_command"] = AbrCmd.SCREENING + d["var_1"] = scan_args["start"] + d["var_2"] = scan_args["oscrange"] + d["var_3"] = scan_args["exp_time"] + d["var_4"] = scan_args["range"] / scan_args["steps"] + d["var_5"] = scan_args["steps"] + d["var_6"] = scan_args.get("delta", 0.5) + d["var_7"] = 0 + # d["var_8"] = 0 + # d["var_9"] = 0 + if scanname in ("rasterscan", "rastersimplescan"): + d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE + d["var_1"] = scan_args["exp_time"] + d["var_2"] = scan_args["range_x"] / scan_args["steps_x"] + d["var_3"] = scan_args["range_y"] / scan_args["steps_y"] + d["var_4"] = scan_args["steps_x"] + d["var_5"] = scan_args["steps_y"] + d["var_6"] = 0 + d["var_7"] = 0 + # d["var_8"] = 0 + # d["var_9"] = 0 - # Reconfigure if got a valid scan config - if len(d) > 0: - self.configure(d) + # Reconfigure if got a valid scan config + if len(d) > 0: + self.configure(d) - # Stage the ABR stage - self.arm() + # Stage the ABR stage + self.arm() def on_unstage(self): """Unstage the ABR controller"""