Starting to look good

This commit is contained in:
gac-x05la
2025-03-20 17:46:51 +01:00
parent 49e8c64433
commit 32b976f9d6
5 changed files with 89 additions and 114 deletions

View File

@@ -127,6 +127,8 @@ gfcam:
backend_url: 'http://sls-daq-001:8080'
auto_soft_enable: true
std_daq_live: 'tcp://129.129.95.111:20000'
std_daq_ws: 'ws://129.129.95.111:8080'
std_daq_rest: 'http://129.129.95.111:5000'
deviceTags:
- camera
- trigger

View File

@@ -56,9 +56,9 @@ class GigaFrostBase(Device):
)
# DAQ parameters
file_path = Cpt(Signal, kind=Kind.config, value="")
file_prefix = Cpt(Signal, kind=Kind.config, value="")
num_images = Cpt(Signal, kind=Kind.config, value=1)
file_path = Cpt(Signal, kind=Kind.config, value="/gpfs/test/test-beamline")
file_prefix = Cpt(Signal, kind=Kind.config, value="scan_")
num_images = Cpt(Signal, kind=Kind.config, value=1000)
num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0)
# GF specific interface

View File

@@ -16,7 +16,6 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from tomcat_bec.devices.gigafrost.gigafrost_base import GigaFrostBase
from tomcat_bec.devices.gigafrost.std_daq_client import (
StdDaqClient,
StdDaqConfigPartial,
StdDaqStatus,
)
@@ -73,7 +72,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
"arm",
"disarm",
]
_initialized = False
# Placeholders for stdDAQ and livestream clients
backend = None
@@ -150,12 +148,8 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
# Stop acquisition
self.disarm()
# if self.backend is not None:
# backend_config = StdDaqConfigPartial(**d)
# self.backend.configure(backend_config)
# If Bluesky style configure
if d is not None:
if d:
# Commonly changed settings
if "exposure_num_burst" in d:
self.num_exposures.set(d["exposure_num_burst"]).wait()
@@ -181,9 +175,24 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if "acq_mode" in d:
self.set_acquisition_mode(d["acq_mode"])
# Commit parameters
# Commit parameters to GigaFrost
self.set_param.set(1).wait()
# Backend stdDAQ configuration
if d and self.backend is not None:
daq_update = {}
if "image_height" in d:
daq_update['image_pixel_height'] = d["image_height"]
if "image_width" in d:
daq_update['image_pixel_width'] = d["image_width"]
if "bit_depth" in d:
daq_update['bit_depth'] = d["bit_depth"]
if "number_of_writers" in d:
daq_update['number_of_writers'] = d["number_of_writers"]
if daq_update:
self.backend.set_config(daq_update, force=False)
def set_acquisition_mode(self, acq_mode):
"""Set acquisition mode
@@ -485,6 +494,7 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
super().destroy()
def _on_preview_update(self, img:np.ndarray, header: dict):
"""Send preview stream and update frame index counter"""
self.num_images_counter.put(header['frame'], force=True)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
@@ -570,20 +580,31 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if self.sync_flag.value == 0:
self.sync_swhw.set(1).wait()
# stdDAQ backend parameters
num_points = (
1
* scan_args.get("steps", 1)
* scan_args.get("exp_burst", 1)
* scan_args.get("repeats", 1)
* scan_args.get("burst_at_each_point", 1)
)
self.num_images.set(num_points).wait()
if "daq_file_path" in scan_args and scan_args["daq_file_path"] is not None:
self.file_path.set(scan_args['daq_file_path']).wait()
if "daq_file_prefix" in scan_args and scan_args["daq_file_prefix"] is not None:
self.file_prefix.set(scan_args['daq_file_prefix']).wait()
if "daq_num_images" in scan_args and scan_args["daq_num_images"] is not None:
self.num_images.set(scan_args['daq_num_images']).wait()
# Start stdDAQ preview
if self.live_preview is not None:
self.live_preview.start()
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
# Switch to idle
self.disarm()
if self.backend is not None:
logger.info(f"StdDaq status on unstage: {self.backend.status}")
logger.info(f"StdDaq status before unstage: {self.backend.status}")
self.backend.stop()
def on_pre_scan(self) -> DeviceStatus | None:
@@ -644,8 +665,8 @@ if __name__ == "__main__":
name="gf2",
backend_url="http://xbl-daq-28:8080",
auto_soft_enable=True,
# std_daq_ws="ws://129.129.95.111:8080",
# std_daq_rest="http://129.129.95.111:5000",
# std_daq_live='tcp://129.129.95.111:20000',
std_daq_ws="ws://129.129.95.111:8080",
std_daq_rest="http://129.129.95.111:5000",
std_daq_live='tcp://129.129.95.111:20000',
)
gf.wait_for_connection()

View File

@@ -3,16 +3,14 @@ from __future__ import annotations
import copy
import enum
import json
import queue
import threading
import time
import traceback
from typing import TYPE_CHECKING, Callable, Literal
from typing import TYPE_CHECKING
import requests
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typeguard import typechecked
from websockets import State
from websockets.exceptions import WebSocketException
@@ -48,88 +46,19 @@ class StdDaqStatus(str, enum.Enum):
WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image"
class StdDaqConfig(BaseModel):
"""
Configuration for the StdDAQ
"""
detector_name: str
detector_type: str
n_modules: int
bit_depth: int
image_pixel_height: int
image_pixel_width: int
start_udp_port: int
writer_user_id: int
max_number_of_forwarders_spawned: int
use_all_forwarders: bool
module_sync_queue_size: int
number_of_writers: int
module_positions: dict
ram_buffer_gb: float
delay_filter_timeout: float
live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]]
model_config = ConfigDict(extra="ignore")
@model_validator(mode="before")
@classmethod
def resolve_aliases(cls, values):
if "roix" in values:
values["image_pixel_height"] = values.pop("roiy")
if "roiy" in values:
values["image_pixel_width"] = values.pop("roix")
return values
class StdDaqConfigPartial(BaseModel):
"""
Partial configuration for the StdDAQ.
"""
detector_name: str | None = None
detector_type: str | None = None
n_modules: int | None = None
bit_depth: int | None = None
image_pixel_height: int | None = Field(default=None, alias="roiy")
image_pixel_width: int | None = Field(default=None, alias="roix")
start_udp_port: int | None = None
writer_user_id: int | None = None
max_number_of_forwarders_spawned: int | None = None
use_all_forwarders: bool | None = None
module_sync_queue_size: int | None = None
number_of_writers: int | None = None
module_positions: dict | None = None
ram_buffer_gb: float | None = None
delay_filter_timeout: float | None = None
live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]] | None = None
model_config = ConfigDict(extra="ignore")
class StdDaqWsResponse(BaseModel):
"""
Response from the StdDAQ websocket
"""
status: StdDaqStatus
reason: str | None = None
model_config = ConfigDict(extra="allow")
class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"]
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset", "_status"]
_ws_client: ws.ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_status_timestamp: float | None = None
_ws_recv_mutex = threading.Lock()
_ws_update_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: StdDaqConfig | None = None
_config: dict | None = None
_status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {}
def __init__(self, parent: Device, ws_url: str, rest_url: str):
@@ -170,7 +99,7 @@ class StdDaqClient:
@typechecked
def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True
) -> StatusBase:
) -> StatusBase | None:
"""Start acquisition on the StdDAQ.
Args:
@@ -196,10 +125,11 @@ class StdDaqClient:
self._ws_client.send(json.dumps(message))
if wait:
status.wait(timeout=timeout)
return None
return status
@typechecked
def stop(self, timeout: float = 5, wait=True) -> StatusBase:
def stop(self, timeout: float = 5, wait=True, stop_cmd="stop") -> StatusBase | None:
"""Stop acquisition on the StdDAQ.
Args:
@@ -213,11 +143,12 @@ class StdDaqClient:
logger.info(f"Stopping StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["idle"], error=["error"])
message = {"command": "stop"}
message = {"command": stop_cmd}
self._ws_client.send(json.dumps(message))
if wait:
status.wait(timeout=timeout)
return None
return status
def get_config(self, timeout: float = 2) -> dict:
@@ -244,14 +175,23 @@ class StdDaqClient:
timeout (float): timeout for the request
"""
old_config = self.get_config()
new_config = copy.deepcopy(self._config.update(config)) if update else config
if update:
cfg = copy.deepcopy(self._config)
cfg.update(config)
new_config = cfg
else:
new_config = config
# Escape unnecesary restarts
if not force and new_config == old_config:
return
if not new_config:
return
self._pre_restart()
# new_jason = json.dumps(new_config)
logger.warning(new_config)
response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout
)
@@ -269,6 +209,7 @@ class StdDaqClient:
def _post_restart(self):
"""Start monitor after a restart"""
time.sleep(2)
self.wait_for_connection()
self._daq_is_running.set()
@@ -354,23 +295,29 @@ class StdDaqClient:
This is a persistent monitor that updates the status and calls attached
callbacks. It also handles stdDAQ restarts and reconnection by itself.
"""
while not self._shutdown_event.is_set():
self._wait_for_server_running()
try:
msg = self._ws_client.recv(timeout=0.1)
except TimeoutError:
continue
except WebSocketException:
content = traceback.format_exc()
# TODO: this is expected to happen on every reconfiguration
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.info(f"stdDAQ state transition by: {msg}")
self._status = msg["status"]
self._run_status_callbacks()
if self._ws_recv_mutex.locked():
return
with self._ws_recv_mutex:
while not self._shutdown_event.is_set():
self._wait_for_server_running()
try:
msg = self._ws_client.recv(timeout=0.1)
msg_timestamp = time.time()
except TimeoutError:
continue
except WebSocketException:
content = traceback.format_exc()
# TODO: this is expected to happen on every reconfiguration
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.info(f"stdDAQ state transition by: {msg}")
self._status = msg["status"]
self._status_timestamp = msg_timestamp
self._run_status_callbacks()
def _run_status_callbacks(self):
"""
@@ -399,7 +346,9 @@ class StdDaqClient:
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":
# pylint: disable=disallowed-name,too-few-public-methods
class foo:
"""Dummy"""
name="bar"
daq = StdDaqClient(

View File

@@ -13,13 +13,14 @@ ZMQ_TOPIC_FILTER = b""
class StdDaqPreview:
USER_ACCESS = ["start", "stop", "image"]
USER_ACCESS = ["start", "stop", "image", "frameno"]
_socket = None
_zmq_thread = None
_monitor_mutex = threading.Lock()
_shutdown_event = threading.Event()
_throttle = 0.2
image = None
frameno = None
def __init__(self, url: str, cb: Callable):
self.url = url
@@ -68,8 +69,7 @@ class StdDaqPreview:
with self._monitor_mutex:
# Open a new connection
if self._socket is None:
self.connect()
self.connect()
try:
# Run the monitor loop
@@ -93,6 +93,7 @@ class StdDaqPreview:
finally:
# Stop receiving incoming data
self._socket.close()
logger.warning("Detached live_preview monitoring")
def _parse_data(self, data):
# Length and throtling checks
@@ -119,4 +120,6 @@ class StdDaqPreview:
f"Live update: frame {header['frame']}\tShape: {header['shape']}\t"
f"Mean: {np.mean(image):.3f}"
)
self.image = image
self.frameno = header['frame']
self._on_update_callback(image, header)