Starting to look good

This commit is contained in:
gac-x05la
2025-03-20 17:46:51 +01:00
parent 49e8c64433
commit 32b976f9d6
5 changed files with 89 additions and 114 deletions

View File

@@ -127,6 +127,8 @@ gfcam:
backend_url: 'http://sls-daq-001:8080' backend_url: 'http://sls-daq-001:8080'
auto_soft_enable: true auto_soft_enable: true
std_daq_live: 'tcp://129.129.95.111:20000' std_daq_live: 'tcp://129.129.95.111:20000'
std_daq_ws: 'ws://129.129.95.111:8080'
std_daq_rest: 'http://129.129.95.111:5000'
deviceTags: deviceTags:
- camera - camera
- trigger - trigger

View File

@@ -56,9 +56,9 @@ class GigaFrostBase(Device):
) )
# DAQ parameters # DAQ parameters
file_path = Cpt(Signal, kind=Kind.config, value="") file_path = Cpt(Signal, kind=Kind.config, value="/gpfs/test/test-beamline")
file_prefix = Cpt(Signal, kind=Kind.config, value="") file_prefix = Cpt(Signal, kind=Kind.config, value="scan_")
num_images = Cpt(Signal, kind=Kind.config, value=1) num_images = Cpt(Signal, kind=Kind.config, value=1000)
num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0) num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0)
# GF specific interface # GF specific interface

View File

@@ -16,7 +16,6 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from tomcat_bec.devices.gigafrost.gigafrost_base import GigaFrostBase from tomcat_bec.devices.gigafrost.gigafrost_base import GigaFrostBase
from tomcat_bec.devices.gigafrost.std_daq_client import ( from tomcat_bec.devices.gigafrost.std_daq_client import (
StdDaqClient, StdDaqClient,
StdDaqConfigPartial,
StdDaqStatus, StdDaqStatus,
) )
@@ -73,7 +72,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
"arm", "arm",
"disarm", "disarm",
] ]
_initialized = False
# Placeholders for stdDAQ and livestream clients # Placeholders for stdDAQ and livestream clients
backend = None backend = None
@@ -150,12 +148,8 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
# Stop acquisition # Stop acquisition
self.disarm() self.disarm()
# if self.backend is not None:
# backend_config = StdDaqConfigPartial(**d)
# self.backend.configure(backend_config)
# If Bluesky style configure # If Bluesky style configure
if d is not None: if d:
# Commonly changed settings # Commonly changed settings
if "exposure_num_burst" in d: if "exposure_num_burst" in d:
self.num_exposures.set(d["exposure_num_burst"]).wait() self.num_exposures.set(d["exposure_num_burst"]).wait()
@@ -181,9 +175,24 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if "acq_mode" in d: if "acq_mode" in d:
self.set_acquisition_mode(d["acq_mode"]) self.set_acquisition_mode(d["acq_mode"])
# Commit parameters # Commit parameters to GigaFrost
self.set_param.set(1).wait() self.set_param.set(1).wait()
# Backend stdDAQ configuration
if d and self.backend is not None:
daq_update = {}
if "image_height" in d:
daq_update['image_pixel_height'] = d["image_height"]
if "image_width" in d:
daq_update['image_pixel_width'] = d["image_width"]
if "bit_depth" in d:
daq_update['bit_depth'] = d["bit_depth"]
if "number_of_writers" in d:
daq_update['number_of_writers'] = d["number_of_writers"]
if daq_update:
self.backend.set_config(daq_update, force=False)
def set_acquisition_mode(self, acq_mode): def set_acquisition_mode(self, acq_mode):
"""Set acquisition mode """Set acquisition mode
@@ -485,6 +494,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
super().destroy() super().destroy()
def _on_preview_update(self, img:np.ndarray, header: dict): def _on_preview_update(self, img:np.ndarray, header: dict):
"""Send preview stream and update frame index counter"""
self.num_images_counter.put(header['frame'], force=True) self.num_images_counter.put(header['frame'], force=True)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
@@ -570,20 +580,31 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if self.sync_flag.value == 0: if self.sync_flag.value == 0:
self.sync_swhw.set(1).wait() self.sync_swhw.set(1).wait()
# stdDAQ backend parameters
num_points = ( num_points = (
1 1
* scan_args.get("steps", 1) * scan_args.get("steps", 1)
* scan_args.get("exp_burst", 1) * scan_args.get("exp_burst", 1)
* scan_args.get("repeats", 1) * scan_args.get("repeats", 1)
* scan_args.get("burst_at_each_point", 1)
) )
self.num_images.set(num_points).wait() self.num_images.set(num_points).wait()
if "daq_file_path" in scan_args and scan_args["daq_file_path"] is not None:
self.file_path.set(scan_args['daq_file_path']).wait()
if "daq_file_prefix" in scan_args and scan_args["daq_file_prefix"] is not None:
self.file_prefix.set(scan_args['daq_file_prefix']).wait()
if "daq_num_images" in scan_args and scan_args["daq_num_images"] is not None:
self.num_images.set(scan_args['daq_num_images']).wait()
# Start stdDAQ preview
if self.live_preview is not None:
self.live_preview.start()
def on_unstage(self) -> DeviceStatus | None: def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device.""" """Called while unstaging the device."""
# Switch to idle # Switch to idle
self.disarm() self.disarm()
if self.backend is not None: if self.backend is not None:
logger.info(f"StdDaq status on unstage: {self.backend.status}") logger.info(f"StdDaq status before unstage: {self.backend.status}")
self.backend.stop() self.backend.stop()
def on_pre_scan(self) -> DeviceStatus | None: def on_pre_scan(self) -> DeviceStatus | None:
@@ -644,8 +665,8 @@ if __name__ == "__main__":
name="gf2", name="gf2",
backend_url="http://xbl-daq-28:8080", backend_url="http://xbl-daq-28:8080",
auto_soft_enable=True, auto_soft_enable=True,
# std_daq_ws="ws://129.129.95.111:8080", std_daq_ws="ws://129.129.95.111:8080",
# std_daq_rest="http://129.129.95.111:5000", std_daq_rest="http://129.129.95.111:5000",
# std_daq_live='tcp://129.129.95.111:20000', std_daq_live='tcp://129.129.95.111:20000',
) )
gf.wait_for_connection() gf.wait_for_connection()

View File

@@ -3,16 +3,14 @@ from __future__ import annotations
import copy import copy
import enum import enum
import json import json
import queue
import threading import threading
import time import time
import traceback import traceback
from typing import TYPE_CHECKING, Callable, Literal from typing import TYPE_CHECKING
import requests import requests
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import StatusBase from ophyd import StatusBase
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typeguard import typechecked from typeguard import typechecked
from websockets import State from websockets import State
from websockets.exceptions import WebSocketException from websockets.exceptions import WebSocketException
@@ -48,88 +46,19 @@ class StdDaqStatus(str, enum.Enum):
WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image" WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image"
class StdDaqConfig(BaseModel):
"""
Configuration for the StdDAQ
"""
detector_name: str
detector_type: str
n_modules: int
bit_depth: int
image_pixel_height: int
image_pixel_width: int
start_udp_port: int
writer_user_id: int
max_number_of_forwarders_spawned: int
use_all_forwarders: bool
module_sync_queue_size: int
number_of_writers: int
module_positions: dict
ram_buffer_gb: float
delay_filter_timeout: float
live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]]
model_config = ConfigDict(extra="ignore")
@model_validator(mode="before")
@classmethod
def resolve_aliases(cls, values):
if "roix" in values:
values["image_pixel_height"] = values.pop("roiy")
if "roiy" in values:
values["image_pixel_width"] = values.pop("roix")
return values
class StdDaqConfigPartial(BaseModel):
"""
Partial configuration for the StdDAQ.
"""
detector_name: str | None = None
detector_type: str | None = None
n_modules: int | None = None
bit_depth: int | None = None
image_pixel_height: int | None = Field(default=None, alias="roiy")
image_pixel_width: int | None = Field(default=None, alias="roix")
start_udp_port: int | None = None
writer_user_id: int | None = None
max_number_of_forwarders_spawned: int | None = None
use_all_forwarders: bool | None = None
module_sync_queue_size: int | None = None
number_of_writers: int | None = None
module_positions: dict | None = None
ram_buffer_gb: float | None = None
delay_filter_timeout: float | None = None
live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]] | None = None
model_config = ConfigDict(extra="ignore")
class StdDaqWsResponse(BaseModel):
"""
Response from the StdDAQ websocket
"""
status: StdDaqStatus
reason: str | None = None
model_config = ConfigDict(extra="allow")
class StdDaqClient: class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"] USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset", "_status"]
_ws_client: ws.ClientConnection | None = None _ws_client: ws.ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED _status: StdDaqStatus = StdDaqStatus.UNDEFINED
_status_timestamp: float | None = None
_ws_recv_mutex = threading.Lock() _ws_recv_mutex = threading.Lock()
_ws_update_thread: threading.Thread | None = None _ws_update_thread: threading.Thread | None = None
_shutdown_event = threading.Event() _shutdown_event = threading.Event()
_ws_idle_event = threading.Event() _ws_idle_event = threading.Event()
_daq_is_running = threading.Event() _daq_is_running = threading.Event()
_config: StdDaqConfig | None = None _config: dict | None = None
_status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {} _status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {}
def __init__(self, parent: Device, ws_url: str, rest_url: str): def __init__(self, parent: Device, ws_url: str, rest_url: str):
@@ -170,7 +99,7 @@ class StdDaqClient:
@typechecked @typechecked
def start( def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True
) -> StatusBase: ) -> StatusBase | None:
"""Start acquisition on the StdDAQ. """Start acquisition on the StdDAQ.
Args: Args:
@@ -196,10 +125,11 @@ class StdDaqClient:
self._ws_client.send(json.dumps(message)) self._ws_client.send(json.dumps(message))
if wait: if wait:
status.wait(timeout=timeout) status.wait(timeout=timeout)
return None
return status return status
@typechecked @typechecked
def stop(self, timeout: float = 5, wait=True) -> StatusBase: def stop(self, timeout: float = 5, wait=True, stop_cmd="stop") -> StatusBase | None:
"""Stop acquisition on the StdDAQ. """Stop acquisition on the StdDAQ.
Args: Args:
@@ -213,11 +143,12 @@ class StdDaqClient:
logger.info(f"Stopping StdDaq backend. Current status: {self.status}") logger.info(f"Stopping StdDaq backend. Current status: {self.status}")
status = StatusBase() status = StatusBase()
self.add_status_callback(status, success=["idle"], error=["error"]) self.add_status_callback(status, success=["idle"], error=["error"])
message = {"command": "stop"} message = {"command": stop_cmd}
self._ws_client.send(json.dumps(message)) self._ws_client.send(json.dumps(message))
if wait: if wait:
status.wait(timeout=timeout) status.wait(timeout=timeout)
return None
return status return status
def get_config(self, timeout: float = 2) -> dict: def get_config(self, timeout: float = 2) -> dict:
@@ -244,14 +175,23 @@ class StdDaqClient:
timeout (float): timeout for the request timeout (float): timeout for the request
""" """
old_config = self.get_config() old_config = self.get_config()
new_config = copy.deepcopy(self._config.update(config)) if update else config if update:
cfg = copy.deepcopy(self._config)
cfg.update(config)
new_config = cfg
else:
new_config = config
# Escape unnecesary restarts # Escape unnecesary restarts
if not force and new_config == old_config: if not force and new_config == old_config:
return return
if not new_config:
return
self._pre_restart() self._pre_restart()
# new_jason = json.dumps(new_config)
logger.warning(new_config)
response = requests.post( response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout
) )
@@ -269,6 +209,7 @@ class StdDaqClient:
def _post_restart(self): def _post_restart(self):
"""Start monitor after a restart""" """Start monitor after a restart"""
time.sleep(2)
self.wait_for_connection() self.wait_for_connection()
self._daq_is_running.set() self._daq_is_running.set()
@@ -354,23 +295,29 @@ class StdDaqClient:
This is a persistent monitor that updates the status and calls attached This is a persistent monitor that updates the status and calls attached
callbacks. It also handles stdDAQ restarts and reconnection by itself. callbacks. It also handles stdDAQ restarts and reconnection by itself.
""" """
while not self._shutdown_event.is_set(): if self._ws_recv_mutex.locked():
self._wait_for_server_running() return
try:
msg = self._ws_client.recv(timeout=0.1) with self._ws_recv_mutex:
except TimeoutError: while not self._shutdown_event.is_set():
continue self._wait_for_server_running()
except WebSocketException: try:
content = traceback.format_exc() msg = self._ws_client.recv(timeout=0.1)
# TODO: this is expected to happen on every reconfiguration msg_timestamp = time.time()
logger.warning(f"Websocket connection closed unexpectedly: {content}") except TimeoutError:
self.wait_for_connection() continue
continue except WebSocketException:
msg = json.loads(msg) content = traceback.format_exc()
if self._status != msg["status"]: # TODO: this is expected to happen on every reconfiguration
logger.info(f"stdDAQ state transition by: {msg}") logger.warning(f"Websocket connection closed unexpectedly: {content}")
self._status = msg["status"] self.wait_for_connection()
self._run_status_callbacks() continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.info(f"stdDAQ state transition by: {msg}")
self._status = msg["status"]
self._status_timestamp = msg_timestamp
self._run_status_callbacks()
def _run_status_callbacks(self): def _run_status_callbacks(self):
""" """
@@ -399,7 +346,9 @@ class StdDaqClient:
# Automatically connect to microXAS testbench if directly invoked # Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__": if __name__ == "__main__":
# pylint: disable=disallowed-name,too-few-public-methods
class foo: class foo:
"""Dummy"""
name="bar" name="bar"
daq = StdDaqClient( daq = StdDaqClient(

View File

@@ -13,13 +13,14 @@ ZMQ_TOPIC_FILTER = b""
class StdDaqPreview: class StdDaqPreview:
USER_ACCESS = ["start", "stop", "image"] USER_ACCESS = ["start", "stop", "image", "frameno"]
_socket = None _socket = None
_zmq_thread = None _zmq_thread = None
_monitor_mutex = threading.Lock() _monitor_mutex = threading.Lock()
_shutdown_event = threading.Event() _shutdown_event = threading.Event()
_throttle = 0.2 _throttle = 0.2
image = None image = None
frameno = None
def __init__(self, url: str, cb: Callable): def __init__(self, url: str, cb: Callable):
self.url = url self.url = url
@@ -68,8 +69,7 @@ class StdDaqPreview:
with self._monitor_mutex: with self._monitor_mutex:
# Open a new connection # Open a new connection
if self._socket is None: self.connect()
self.connect()
try: try:
# Run the monitor loop # Run the monitor loop
@@ -93,6 +93,7 @@ class StdDaqPreview:
finally: finally:
# Stop receiving incoming data # Stop receiving incoming data
self._socket.close() self._socket.close()
logger.warning("Detached live_preview monitoring")
def _parse_data(self, data): def _parse_data(self, data):
# Length and throtling checks # Length and throtling checks
@@ -119,4 +120,6 @@ class StdDaqPreview:
f"Live update: frame {header['frame']}\tShape: {header['shape']}\t" f"Live update: frame {header['frame']}\tShape: {header['shape']}\t"
f"Mean: {np.mean(image):.3f}" f"Mean: {np.mean(image):.3f}"
) )
self.image = image
self.frameno = header['frame']
self._on_update_callback(image, header) self._on_update_callback(image, header)