diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 29b7363..86ef637 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -525,9 +525,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): """ # Perform a full initialization of the GigaFrost self.initialize_gigafrost() - # Connect to the stdDAQ backend - if self.backend is not None: - self.backend.connect() def on_stage(self) -> DeviceStatus | None: """ diff --git a/tomcat_bec/devices/gigafrost/std_daq_client.py b/tomcat_bec/devices/gigafrost/std_daq_client.py index 06e76f3..3ba5610 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_client.py +++ b/tomcat_bec/devices/gigafrost/std_daq_client.py @@ -16,7 +16,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator from typeguard import typechecked from websockets import State from websockets.exceptions import WebSocketException -from websockets.sync.client import ClientConnection, connect +import websockets.sync.client as ws if TYPE_CHECKING: # pragma: no cover from ophyd import Device, DeviceStatus @@ -122,25 +122,29 @@ class StdDaqClient: USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"] - _ws_client: ClientConnection | None = None + _ws_client: ws.ClientConnection | None = None _status: StdDaqStatus = StdDaqStatus.UNDEFINED + _ws_recv_mutex = threading.Lock() _ws_update_thread: threading.Thread | None = None _shutdown_event = threading.Event() _ws_idle_event = threading.Event() _daq_is_running = threading.Event() _config: StdDaqConfig | None = None + _status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {} def __init__(self, parent: Device, ws_url: str, rest_url: str): self.parent = parent self.ws_url = ws_url self.rest_url = rest_url - - self._status_callbacks: dict[ - str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]] - ] = {} - self._send_queue = queue.Queue() self._daq_is_running.set() + # Connect to WS interface and start status monitoring + self.wait_for_connection() + self._ws_monitor_thread = threading.Thread( + target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True + ) + self._ws_monitor_thread.start() + @property def status(self) -> StdDaqStatus: """ @@ -167,50 +171,62 @@ class StdDaqClient: def start( self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True ) -> StatusBase: - """ - Start acquisition on the StdDAQ. + """Start acquisition on the StdDAQ. Args: file_path (str): path to save the files file_prefix (str): prefix of the files num_images (int): number of images to acquire timeout (float): timeout for the request + Returns: + status (StatusBase): Ophyd status object with attached monitor """ + # Ensure connection + self.wait_for_connection() + logger.info(f"Starting StdDaq backend. Current status: {self.status}") status = StatusBase() - self.add_status_callback(status, success=["waiting_for_first_image"], error=[]) + self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected"]) message = { "command": "start", "path": file_path, "file_prefix": file_prefix, "n_image": num_images, } - self._send_queue.put(message) + self._ws_client.send(json.dumps(message)) if wait: - return status.wait(timeout=timeout) - + status.wait(timeout=timeout) return status - def stop(self): - """ - Stop acquisition on the StdDAQ. + @typechecked + def stop(self, timeout: float = 5, wait=True) -> StatusBase: + """Stop acquisition on the StdDAQ. Args: timeout (float): timeout for the request + Returns: + status (StatusBase): Ophyd status object with attached monitor """ + # Ensure connection + self.wait_for_connection() + + logger.info(f"Stopping StdDaq backend. Current status: {self.status}") + status = StatusBase() + self.add_status_callback(status, success=["idle"], error=["error"]) message = {"command": "stop"} - return self._send_queue.put(message) + + self._ws_client.send(json.dumps(message)) + if wait: + status.wait(timeout=timeout) + return status def get_config(self, timeout: float = 2) -> dict: - """ - Get the current configuration of the StdDAQ. + """Get the current configuration of the StdDAQ. Args: - cached (bool): whether to use the cached configuration timeout (float): timeout for the request - Returns: - StdDaqConfig: configuration of the StdDAQ + config (dict): configuration of the StdDAQ """ response = requests.get( self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout @@ -219,7 +235,7 @@ class StdDaqClient: self._config = response.json() return self._config - def set_config(self, config: dict, timeout: float = 2, update: bool = True) -> None: + def set_config(self, config: dict, timeout: float = 2, update: bool = True, force: bool=True) -> None: """ Set the configuration of the StdDAQ. This will overwrite the current configuration. @@ -227,16 +243,17 @@ class StdDaqClient: config (StdDaqConfig | dict): configuration to set timeout (float): timeout for the request """ - if self._config is None: - self.get_config() - if update: - self._config.update(config) - config = copy.deepcopy(self._config) + old_config = self.get_config() + new_config = copy.deepcopy(self._config.update(config)) if update else config + + # Escape unnecesary restarts + if not force and new_config == old_config: + return self._pre_restart() response = requests.post( - self.rest_url + "/api/config/set", params={"user": "ioc"}, json=config, timeout=timeout + self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout ) response.raise_for_status() @@ -277,7 +294,7 @@ class StdDaqClient: if self._ws_client is not None and self._ws_client.state == State.OPEN: return try: - self._ws_client = connect(self.ws_url) + self._ws_client = ws.connect(self.ws_url) break except ConnectionRefusedError as exc: if time.time() - start_time > timeout: @@ -310,16 +327,6 @@ class StdDaqClient: ) response.raise_for_status() - def connect(self): - """ - Connect to the StdDAQ. This method should be called after the client is created. It will - launch a background thread to exchange data with the StdDAQ. - """ - self._ws_update_thread = threading.Thread( - target=self._ws_update_loop, name=f"{self.parent.name}_stddaq_ws_loop", daemon=True - ) - self._ws_update_thread.start() - def shutdown(self): """ Shutdown the StdDAQ client. @@ -341,55 +348,36 @@ class StdDaqClient: break self._ws_idle_event.set() - def _ws_send_and_receive(self): - if not self._ws_client: - self.wait_for_connection() - try: - try: - msg = self._send_queue.get(block=False) - logger.trace(f"Sending to stddaq ws: {msg}") - self._ws_client.send(json.dumps(msg)) - logger.trace(f"Sent to stddaq ws: {msg}") - except queue.Empty: - pass - try: - recv_msgs = self._ws_client.recv(timeout=0.1) - except TimeoutError: - return - logger.trace(f"Received from stddaq ws: {recv_msgs}") - if recv_msgs is not None: - self._on_received_ws_message(recv_msgs) - except WebSocketException: - content = traceback.format_exc() - logger.warning(f"Websocket connection closed unexpectedly: {content}") - self.wait_for_connection() + def _ws_monitor_loop(self): + """Loop to update the status property of the StdDAQ. - def _ws_update_loop(self): - """ - Loop to update the status property of the StdDAQ. + This is a persistent monitor that updates the status and calls attached + callbacks. It also handles stdDAQ restarts and reconnection by itself. """ while not self._shutdown_event.is_set(): self._wait_for_server_running() - self._ws_send_and_receive() - - def _on_received_ws_message(self, msg: str): - """ - Handle a message received from the StdDAQ. - """ - try: - data = StdDaqWsResponse(**json.loads(msg)) - except Exception: - content = traceback.format_exc() - logger.warning(f"Failed to decode websocket message: {content}") - return - self._status = data.status - self._run_status_callbacks() + try: + msg = self._ws_client.recv(timeout=0.1) + except TimeoutError: + continue + except WebSocketException: + content = traceback.format_exc() + # TODO: this is expected to happen on every reconfiguration + logger.warning(f"Websocket connection closed unexpectedly: {content}") + self.wait_for_connection() + continue + msg = json.loads(msg) + if self._status != msg["status"]: + logger.info(f"stdDAQ state transition by: {msg}") + self._status = msg["status"] + self._run_status_callbacks() def _run_status_callbacks(self): """ Update the DeviceStatus objects based on the current status of the StdDAQ. - If the status matches one of the success or error statuses, the DeviceStatus object will be set to finished - or exception, respectively and removed from the list of callbacks. + If the status matches one of the success or error statuses, the DeviceStatus + object will be set to finished or exception, respectively and removed from + the list of callbacks. """ status = self._status @@ -406,3 +394,14 @@ class StdDaqClient: for cb in completed_callbacks: self._status_callbacks.pop(id(cb)) + + + +# Automatically connect to microXAS testbench if directly invoked +if __name__ == "__main__": + class foo: + name="bar" + + daq = StdDaqClient( + parent=foo(), ws_url='ws://129.129.95.111:8080', rest_url='http://129.129.95.111:5000' + )