From 20dcd1849a9e104f60404ee6e7b4e44cfe54260c Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 19 Mar 2025 09:52:34 +0100 Subject: [PATCH] WIP --- .../devices/gigafrost/std_daq_client.py | 87 +++++++------------ 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/tomcat_bec/devices/gigafrost/std_daq_client.py b/tomcat_bec/devices/gigafrost/std_daq_client.py index e28e655..06e76f3 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_client.py +++ b/tomcat_bec/devices/gigafrost/std_daq_client.py @@ -122,17 +122,19 @@ class StdDaqClient: USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"] + _ws_client: ClientConnection | None = None + _status: StdDaqStatus = StdDaqStatus.UNDEFINED + _ws_update_thread: threading.Thread | None = None + _shutdown_event = threading.Event() + _ws_idle_event = threading.Event() + _daq_is_running = threading.Event() + _config: StdDaqConfig | None = None + def __init__(self, parent: Device, ws_url: str, rest_url: str): self.parent = parent self.ws_url = ws_url self.rest_url = rest_url - self.ws_client: ClientConnection | None = None - self._status: StdDaqStatus = StdDaqStatus.UNDEFINED - self._ws_update_thread: threading.Thread | None = None - self._shutdown_event = threading.Event() - self._ws_idle_event = threading.Event() - self._daq_is_running = threading.Event() - self._config: StdDaqConfig | None = None + self._status_callbacks: dict[ str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]] ] = {} @@ -199,7 +201,7 @@ class StdDaqClient: message = {"command": "stop"} return self._send_queue.put(message) - def get_config(self, cached=False, timeout: float = 2) -> dict: + def get_config(self, timeout: float = 2) -> dict: """ Get the current configuration of the StdDAQ. @@ -210,16 +212,14 @@ class StdDaqClient: Returns: StdDaqConfig: configuration of the StdDAQ """ - if cached and self._config is not None: - return self._config response = requests.get( self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout ) response.raise_for_status() - self._config = StdDaqConfig(**response.json()) - return self._config.model_dump() + self._config = response.json() + return self._config - def set_config(self, config: StdDaqConfig | dict, timeout: float = 2) -> None: + def set_config(self, config: dict, timeout: float = 2, update: bool = True) -> None: """ Set the configuration of the StdDAQ. This will overwrite the current configuration. @@ -227,20 +227,16 @@ class StdDaqClient: config (StdDaqConfig | dict): configuration to set timeout (float): timeout for the request """ - if not isinstance(config, StdDaqConfig): - config = StdDaqConfig(**config) - - out = config.model_dump(exclude_none=True) - if not out: - logger.info( - "The provided config does not contain relevant values for the StdDaq. Skipping set_config." - ) - return + if self._config is None: + self.get_config() + if update: + self._config.update(config) + config = copy.deepcopy(self._config) self._pre_restart() response = requests.post( - self.rest_url + "/api/config/set", params={"user": "ioc"}, json=out, timeout=timeout + self.rest_url + "/api/config/set", params={"user": "ioc"}, json=config, timeout=timeout ) response.raise_for_status() @@ -248,38 +244,17 @@ class StdDaqClient: self._post_restart() def _pre_restart(self): + """Stop monitor before restart""" self._daq_is_running.clear() self._ws_idle_event.wait() - if self.ws_client is not None: - self.ws_client.close() + if self._ws_client is not None: + self._ws_client.close() def _post_restart(self): + """Start monitor after a restart""" self.wait_for_connection() self._daq_is_running.set() - def update_config(self, config: StdDaqConfigPartial | dict, timeout: float = 2) -> None: - """ - Update the configuration of the StdDAQ. This will update the current configuration. - - Args: - config (StdDaqConfigPartial | dict): configuration to update - timeout (float): timeout for the request - """ - if not isinstance(config, StdDaqConfigPartial): - config = StdDaqConfigPartial(**config) - - patch_config_dict = config.model_dump(exclude_none=True) - if not patch_config_dict: - return - - current_config = copy.deepcopy(self.get_config()) - new_config = copy.deepcopy(current_config) - new_config.update(patch_config_dict) - if current_config == new_config: - return - - self.set_config(StdDaqConfig(**new_config), timeout=timeout) - def reset(self, min_wait: float = 5) -> None: """ Reset the StdDAQ. @@ -299,10 +274,10 @@ class StdDaqClient: """ start_time = time.time() while True: - if self.ws_client is not None and self.ws_client.state == State.OPEN: + if self._ws_client is not None and self._ws_client.state == State.OPEN: return try: - self.ws_client = connect(self.ws_url) + self._ws_client = connect(self.ws_url) break except ConnectionRefusedError as exc: if time.time() - start_time > timeout: @@ -351,9 +326,9 @@ class StdDaqClient: """ if self._ws_update_thread is not None: self._ws_update_thread.join() - if self.ws_client is not None: - self.ws_client.close() - self.ws_client = None + if self._ws_client is not None: + self._ws_client.close() + self._ws_client = None def _wait_for_server_running(self): """ @@ -367,18 +342,18 @@ class StdDaqClient: self._ws_idle_event.set() def _ws_send_and_receive(self): - if not self.ws_client: + if not self._ws_client: self.wait_for_connection() try: try: msg = self._send_queue.get(block=False) logger.trace(f"Sending to stddaq ws: {msg}") - self.ws_client.send(json.dumps(msg)) + self._ws_client.send(json.dumps(msg)) logger.trace(f"Sent to stddaq ws: {msg}") except queue.Empty: pass try: - recv_msgs = self.ws_client.recv(timeout=0.1) + recv_msgs = self._ws_client.recv(timeout=0.1) except TimeoutError: return logger.trace(f"Received from stddaq ws: {recv_msgs}")