From 24ff8a252e537b09a454e44e24aac9b62a25b19c Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Thu, 16 Jan 2025 17:01:14 +0100 Subject: [PATCH] feat(scan_status): added support for request inputs and scan parameters --- bec_lib/bec_lib/messages.py | 4 +-- .../bec_server/scan_server/scan_assembler.py | 29 +++++++++++++++++++ .../bec_server/scan_server/scan_worker.py | 7 +++-- bec_server/bec_server/scan_server/scans.py | 23 +++++++++++---- 4 files changed, 53 insertions(+), 10 deletions(-) diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index 47612c9c..249d78a8 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -167,7 +167,7 @@ class ScanStatusMessage(BECMessage): user_metadata (dict, optional): User metadata readout_priority (dict[Literal["monitored", "baseline", "async", "continuous", "on_request"], list[str]], optional): Readout priority scan_parameters (dict[Literal["exp_time", "frames_per_trigger", "settling_time", "readout_time"] | str, Any], optional): Scan parameters such as exposure time, frames per trigger, settling time, readout time - scan_input (dict[Literal["arg_bundles", "inputs", "kwargs"], Any], optional): Scan input + request_inputs (dict[Literal["arg_bundle", "inputs", "kwargs"], Any], optional): Scan input info (dict): Dictionary containing additional information about the scan timestamp (float, optional): Timestamp of the message. Defaults to time.time() @@ -197,7 +197,7 @@ class ScanStatusMessage(BECMessage): dict[Literal["exp_time", "frames_per_trigger", "settling_time", "readout_time"] | str, Any] | None ) = None - scan_input: dict[Literal["arg_bundles", "inputs", "kwargs"], Any] | None = None + request_inputs: dict[Literal["arg_bundle", "inputs", "kwargs"], Any] | None = None info: dict timestamp: float = Field(default_factory=time.time) diff --git a/bec_server/bec_server/scan_server/scan_assembler.py b/bec_server/bec_server/scan_server/scan_assembler.py index 02918d89..87c95c9e 100644 --- a/bec_server/bec_server/scan_server/scan_assembler.py +++ b/bec_server/bec_server/scan_server/scan_assembler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import traceback from typing import TYPE_CHECKING @@ -64,6 +65,33 @@ class ScanAssembler: try: args = unpack_scan_args(msg.content.get("parameter", {}).get("args", [])) kwargs = msg.content.get("parameter", {}).get("kwargs", {}) + + cls_input_args = [ + name + for name, val in inspect.signature(scan_cls).parameters.items() + if val.default == inspect.Parameter.empty and name != "kwargs" + ] + + request_inputs = {} + if scan_cls.arg_bundle_size["bundle"] > 0: + request_inputs["arg_bundle"] = args + request_inputs["inputs"] = {} + request_inputs["kwargs"] = kwargs + else: + request_inputs["arg_bundle"] = [] + request_inputs["inputs"] = {} + request_inputs["kwargs"] = {} + for ii, key in enumerate(args): + request_inputs["inputs"][cls_input_args[ii]] = key + + for key in kwargs: + if key in cls_input_args: + request_inputs["inputs"][key] = kwargs[key] + + for key, val in kwargs.items(): + if key not in cls_input_args: + request_inputs["kwargs"][key] = val + scan_instance = scan_cls( *args, device_manager=self.device_manager, @@ -71,6 +99,7 @@ class ScanAssembler: metadata=msg.metadata, instruction_handler=self.parent.queue_manager.instruction_handler, scan_id=scan_id, + request_inputs=request_inputs, **kwargs, ) return scan_instance diff --git a/bec_server/bec_server/scan_server/scan_worker.py b/bec_server/bec_server/scan_server/scan_worker.py index e99a7864..ffd9284c 100644 --- a/bec_server/bec_server/scan_server/scan_worker.py +++ b/bec_server/bec_server/scan_server/scan_worker.py @@ -188,10 +188,11 @@ class ScanWorker(threading.Thread): "frames_per_trigger": active_rb.scan.frames_per_trigger, "settling_time": active_rb.scan.settling_time, "readout_time": active_rb.scan.readout_time, - "acquisition_config": active_rb.scan.acquisition_config, "scan_report_devices": active_rb.scan.scan_report_devices, "monitor_sync": active_rb.scan.monitor_sync, "num_points": num_points, + "scan_parameters": active_rb.scan.scan_parameters, + "request_inputs": active_rb.scan.request_inputs, } ) self.current_scan_info["scan_msgs"] = [ @@ -252,9 +253,11 @@ class ScanWorker(threading.Thread): user_metadata=self.current_scan_info.get("user_metadata"), readout_priority=self.current_scan_info.get("readout_priority"), scan_parameters=self.current_scan_info.get("scan_parameters"), - scan_input=self.current_scan_info.get("scan_input"), + request_inputs=self.current_scan_info.get("request_inputs"), info=self.current_scan_info, ) + if msg.readout_priority != self.current_scan_info.get("readout_priority"): + raise RuntimeError("Readout priority mismatch") expire = None if status in ["open", "paused"] else 1800 pipe = self.device_manager.connector.pipeline() self.device_manager.connector.set( diff --git a/bec_server/bec_server/scan_server/scans.py b/bec_server/bec_server/scan_server/scans.py index 15e7c05c..e634faea 100644 --- a/bec_server/bec_server/scan_server/scans.py +++ b/bec_server/bec_server/scan_server/scans.py @@ -205,6 +205,7 @@ class RequestBase(ABC): instruction_handler: InstructionHandler = None, scan_id: str = None, return_to_start: bool = False, + request_inputs: dict = None, **kwargs, ) -> None: super().__init__() @@ -239,6 +240,7 @@ class RequestBase(ABC): device_msg_callback=self.device_msg_metadata, shutdown_event=self._shutdown_event, ) + self.request_inputs = request_inputs @property def scan_report_devices(self): @@ -369,7 +371,6 @@ class ScanBase(RequestBase, PathOptimizerMixin): parameter: dict = None, exp_time: float = 0, readout_time: float = 0, - acquisition_config: dict = None, settling_time: float = 0, relative: bool = False, burst_at_each_point: int = 1, @@ -393,7 +394,6 @@ class ScanBase(RequestBase, PathOptimizerMixin): self.point_id = 0 self.exp_time = exp_time self.readout_time = readout_time - self.acquisition_config = acquisition_config self.settling_time = settling_time self.relative = relative self.burst_at_each_point = burst_at_each_point @@ -414,10 +414,21 @@ class ScanBase(RequestBase, PathOptimizerMixin): if self.scan_name == "": raise ValueError("scan_name cannot be empty") - if acquisition_config is None or "default" not in acquisition_config: - self.acquisition_config = { - "default": {"exp_time": self.exp_time, "readout_time": self.readout_time} - } + self.scan_parameters = { + "exp_time": self.exp_time, + "frames_per_trigger": self.frames_per_trigger, + "settling_time": self.settling_time, + "readout_time": self.readout_time, + "optim_trajectory": self.optim_trajectory, + "return_to_start": self.return_to_start, + "relative": self.relative, + } + + self.scan_parameters.update(**kwargs) + self.scan_parameters.pop("device_manager", None) + self.scan_parameters.pop("instruction_handler", None) + self.scan_parameters.pop("scan_id", None) + self.scan_parameters.pop("request_inputs", None) @property def monitor_sync(self):