diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 67d74e3..85b159b 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -127,6 +127,8 @@ gfcam: backend_url: 'http://sls-daq-001:8080' auto_soft_enable: true std_daq_live: 'tcp://129.129.95.111:20000' + std_daq_ws: 'ws://129.129.95.111:8080' + std_daq_rest: 'http://129.129.95.111:5000' deviceTags: - camera - trigger diff --git a/tomcat_bec/devices/gigafrost/gigafrost_base.py b/tomcat_bec/devices/gigafrost/gigafrost_base.py index 3bc105f..bfe6a94 100644 --- a/tomcat_bec/devices/gigafrost/gigafrost_base.py +++ b/tomcat_bec/devices/gigafrost/gigafrost_base.py @@ -56,9 +56,9 @@ class GigaFrostBase(Device): ) # DAQ parameters - file_path = Cpt(Signal, kind=Kind.config, value="") - file_prefix = Cpt(Signal, kind=Kind.config, value="") - num_images = Cpt(Signal, kind=Kind.config, value=1) + file_path = Cpt(Signal, kind=Kind.config, value="/gpfs/test/test-beamline") + file_prefix = Cpt(Signal, kind=Kind.config, value="scan_") + num_images = Cpt(Signal, kind=Kind.config, value=1000) num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0) # GF specific interface diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 86ef637..3fa4082 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -16,7 +16,6 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from tomcat_bec.devices.gigafrost.gigafrost_base import GigaFrostBase from tomcat_bec.devices.gigafrost.std_daq_client import ( StdDaqClient, - StdDaqConfigPartial, StdDaqStatus, ) @@ -73,7 +72,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): "arm", "disarm", ] - _initialized = False # Placeholders for stdDAQ and livestream clients backend = None @@ -150,12 +148,8 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): # Stop acquisition self.disarm() - # if self.backend is not None: - # backend_config = StdDaqConfigPartial(**d) - # self.backend.configure(backend_config) - # If Bluesky style configure - if d is not None: + if d: # Commonly changed settings if "exposure_num_burst" in d: self.num_exposures.set(d["exposure_num_burst"]).wait() @@ -181,9 +175,24 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): if "acq_mode" in d: self.set_acquisition_mode(d["acq_mode"]) - # Commit parameters + # Commit parameters to GigaFrost self.set_param.set(1).wait() + # Backend stdDAQ configuration + if d and self.backend is not None: + daq_update = {} + if "image_height" in d: + daq_update['image_pixel_height'] = d["image_height"] + if "image_width" in d: + daq_update['image_pixel_width'] = d["image_width"] + if "bit_depth" in d: + daq_update['bit_depth'] = d["bit_depth"] + if "number_of_writers" in d: + daq_update['number_of_writers'] = d["number_of_writers"] + + if daq_update: + self.backend.set_config(daq_update, force=False) + def set_acquisition_mode(self, acq_mode): """Set acquisition mode @@ -485,6 +494,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): super().destroy() def _on_preview_update(self, img:np.ndarray, header: dict): + """Send preview stream and update frame index counter""" self.num_images_counter.put(header['frame'], force=True) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) @@ -570,20 +580,31 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): if self.sync_flag.value == 0: self.sync_swhw.set(1).wait() + # stdDAQ backend parameters num_points = ( 1 * scan_args.get("steps", 1) * scan_args.get("exp_burst", 1) * scan_args.get("repeats", 1) + * scan_args.get("burst_at_each_point", 1) ) self.num_images.set(num_points).wait() + if "daq_file_path" in scan_args and scan_args["daq_file_path"] is not None: + self.file_path.set(scan_args['daq_file_path']).wait() + if "daq_file_prefix" in scan_args and scan_args["daq_file_prefix"] is not None: + self.file_prefix.set(scan_args['daq_file_prefix']).wait() + if "daq_num_images" in scan_args and scan_args["daq_num_images"] is not None: + self.num_images.set(scan_args['daq_num_images']).wait() + # Start stdDAQ preview + if self.live_preview is not None: + self.live_preview.start() def on_unstage(self) -> DeviceStatus | None: """Called while unstaging the device.""" # Switch to idle self.disarm() if self.backend is not None: - logger.info(f"StdDaq status on unstage: {self.backend.status}") + logger.info(f"StdDaq status before unstage: {self.backend.status}") self.backend.stop() def on_pre_scan(self) -> DeviceStatus | None: @@ -644,8 +665,8 @@ if __name__ == "__main__": name="gf2", backend_url="http://xbl-daq-28:8080", auto_soft_enable=True, - # std_daq_ws="ws://129.129.95.111:8080", - # std_daq_rest="http://129.129.95.111:5000", - # std_daq_live='tcp://129.129.95.111:20000', + std_daq_ws="ws://129.129.95.111:8080", + std_daq_rest="http://129.129.95.111:5000", + std_daq_live='tcp://129.129.95.111:20000', ) gf.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/std_daq_client.py b/tomcat_bec/devices/gigafrost/std_daq_client.py index 3ba5610..d72a502 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_client.py +++ b/tomcat_bec/devices/gigafrost/std_daq_client.py @@ -3,16 +3,14 @@ from __future__ import annotations import copy import enum import json -import queue import threading import time import traceback -from typing import TYPE_CHECKING, Callable, Literal +from typing import TYPE_CHECKING import requests from bec_lib.logger import bec_logger from ophyd import StatusBase -from pydantic import BaseModel, ConfigDict, Field, model_validator from typeguard import typechecked from websockets import State from websockets.exceptions import WebSocketException @@ -48,88 +46,19 @@ class StdDaqStatus(str, enum.Enum): WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image" -class StdDaqConfig(BaseModel): - """ - Configuration for the StdDAQ - """ - - detector_name: str - detector_type: str - n_modules: int - bit_depth: int - image_pixel_height: int - image_pixel_width: int - start_udp_port: int - writer_user_id: int - max_number_of_forwarders_spawned: int - use_all_forwarders: bool - module_sync_queue_size: int - number_of_writers: int - module_positions: dict - ram_buffer_gb: float - delay_filter_timeout: float - live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]] - - model_config = ConfigDict(extra="ignore") - - @model_validator(mode="before") - @classmethod - def resolve_aliases(cls, values): - if "roix" in values: - values["image_pixel_height"] = values.pop("roiy") - if "roiy" in values: - values["image_pixel_width"] = values.pop("roix") - return values - - -class StdDaqConfigPartial(BaseModel): - """ - Partial configuration for the StdDAQ. - """ - - detector_name: str | None = None - detector_type: str | None = None - n_modules: int | None = None - bit_depth: int | None = None - image_pixel_height: int | None = Field(default=None, alias="roiy") - image_pixel_width: int | None = Field(default=None, alias="roix") - start_udp_port: int | None = None - writer_user_id: int | None = None - max_number_of_forwarders_spawned: int | None = None - use_all_forwarders: bool | None = None - module_sync_queue_size: int | None = None - number_of_writers: int | None = None - module_positions: dict | None = None - ram_buffer_gb: float | None = None - delay_filter_timeout: float | None = None - live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]] | None = None - - model_config = ConfigDict(extra="ignore") - - -class StdDaqWsResponse(BaseModel): - """ - Response from the StdDAQ websocket - """ - - status: StdDaqStatus - reason: str | None = None - - model_config = ConfigDict(extra="allow") - - class StdDaqClient: - USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"] + USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset", "_status"] _ws_client: ws.ClientConnection | None = None _status: StdDaqStatus = StdDaqStatus.UNDEFINED + _status_timestamp: float | None = None _ws_recv_mutex = threading.Lock() _ws_update_thread: threading.Thread | None = None _shutdown_event = threading.Event() _ws_idle_event = threading.Event() _daq_is_running = threading.Event() - _config: StdDaqConfig | None = None + _config: dict | None = None _status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {} def __init__(self, parent: Device, ws_url: str, rest_url: str): @@ -170,7 +99,7 @@ class StdDaqClient: @typechecked def start( self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True - ) -> StatusBase: + ) -> StatusBase | None: """Start acquisition on the StdDAQ. Args: @@ -196,10 +125,11 @@ class StdDaqClient: self._ws_client.send(json.dumps(message)) if wait: status.wait(timeout=timeout) + return None return status @typechecked - def stop(self, timeout: float = 5, wait=True) -> StatusBase: + def stop(self, timeout: float = 5, wait=True, stop_cmd="stop") -> StatusBase | None: """Stop acquisition on the StdDAQ. Args: @@ -213,11 +143,12 @@ class StdDaqClient: logger.info(f"Stopping StdDaq backend. Current status: {self.status}") status = StatusBase() self.add_status_callback(status, success=["idle"], error=["error"]) - message = {"command": "stop"} + message = {"command": stop_cmd} self._ws_client.send(json.dumps(message)) if wait: status.wait(timeout=timeout) + return None return status def get_config(self, timeout: float = 2) -> dict: @@ -244,14 +175,23 @@ class StdDaqClient: timeout (float): timeout for the request """ old_config = self.get_config() - new_config = copy.deepcopy(self._config.update(config)) if update else config + if update: + cfg = copy.deepcopy(self._config) + cfg.update(config) + new_config = cfg + else: + new_config = config # Escape unnecesary restarts if not force and new_config == old_config: return + if not new_config: + return self._pre_restart() + # new_jason = json.dumps(new_config) + logger.warning(new_config) response = requests.post( self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout ) @@ -269,6 +209,7 @@ class StdDaqClient: def _post_restart(self): """Start monitor after a restart""" + time.sleep(2) self.wait_for_connection() self._daq_is_running.set() @@ -354,23 +295,29 @@ class StdDaqClient: This is a persistent monitor that updates the status and calls attached callbacks. It also handles stdDAQ restarts and reconnection by itself. """ - while not self._shutdown_event.is_set(): - self._wait_for_server_running() - try: - msg = self._ws_client.recv(timeout=0.1) - except TimeoutError: - continue - except WebSocketException: - content = traceback.format_exc() - # TODO: this is expected to happen on every reconfiguration - logger.warning(f"Websocket connection closed unexpectedly: {content}") - self.wait_for_connection() - continue - msg = json.loads(msg) - if self._status != msg["status"]: - logger.info(f"stdDAQ state transition by: {msg}") - self._status = msg["status"] - self._run_status_callbacks() + if self._ws_recv_mutex.locked(): + return + + with self._ws_recv_mutex: + while not self._shutdown_event.is_set(): + self._wait_for_server_running() + try: + msg = self._ws_client.recv(timeout=0.1) + msg_timestamp = time.time() + except TimeoutError: + continue + except WebSocketException: + content = traceback.format_exc() + # TODO: this is expected to happen on every reconfiguration + logger.warning(f"Websocket connection closed unexpectedly: {content}") + self.wait_for_connection() + continue + msg = json.loads(msg) + if self._status != msg["status"]: + logger.info(f"stdDAQ state transition by: {msg}") + self._status = msg["status"] + self._status_timestamp = msg_timestamp + self._run_status_callbacks() def _run_status_callbacks(self): """ @@ -399,7 +346,9 @@ class StdDaqClient: # Automatically connect to microXAS testbench if directly invoked if __name__ == "__main__": + # pylint: disable=disallowed-name,too-few-public-methods class foo: + """Dummy""" name="bar" daq = StdDaqClient( diff --git a/tomcat_bec/devices/gigafrost/std_daq_preview.py b/tomcat_bec/devices/gigafrost/std_daq_preview.py index 6ea8a80..fcb1696 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_preview.py +++ b/tomcat_bec/devices/gigafrost/std_daq_preview.py @@ -13,13 +13,14 @@ ZMQ_TOPIC_FILTER = b"" class StdDaqPreview: - USER_ACCESS = ["start", "stop", "image"] + USER_ACCESS = ["start", "stop", "image", "frameno"] _socket = None _zmq_thread = None _monitor_mutex = threading.Lock() _shutdown_event = threading.Event() _throttle = 0.2 image = None + frameno = None def __init__(self, url: str, cb: Callable): self.url = url @@ -68,8 +69,7 @@ class StdDaqPreview: with self._monitor_mutex: # Open a new connection - if self._socket is None: - self.connect() + self.connect() try: # Run the monitor loop @@ -93,6 +93,7 @@ class StdDaqPreview: finally: # Stop receiving incoming data self._socket.close() + logger.warning("Detached live_preview monitoring") def _parse_data(self, data): # Length and throtling checks @@ -119,4 +120,6 @@ class StdDaqPreview: f"Live update: frame {header['frame']}\tShape: {header['shape']}\t" f"Mean: {np.mean(image):.3f}" ) + self.image = image + self.frameno = header['frame'] self._on_update_callback(image, header)