This commit is contained in:
gac-x05la
2025-04-30 13:31:10 +02:00
parent 5d5eef11f6
commit f688cfca0f
3 changed files with 39 additions and 29 deletions

View File

@@ -9,16 +9,17 @@ import numpy as np
from ophyd.status import SubscriptionStatus, DeviceStatus from ophyd.status import SubscriptionStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib.logger import bec_logger
from tomcat_bec.devices.gigafrost.pcoedge_base import PcoEdgeBase from tomcat_bec.devices.gigafrost.pcoedge_base import PcoEdgeBase
from tomcat_bec.devices.gigafrost.std_daq_preview import StdDaqPreview from tomcat_bec.devices.gigafrost.std_daq_preview import StdDaqPreview
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqStatus from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqStatus
from bec_lib.logger import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
# pylint: disable=too-many-instance-attributes
class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
"""Ophyd baseclass for Helge camera IOCs """Ophyd baseclass for Helge camera IOCs
@@ -55,19 +56,13 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
destructive. to prevent this, we don't have EPICS preview destructive. to prevent this, we don't have EPICS preview
""" """
# pylint: disable=too-many-instance-attributes USER_ACCESS = ["complete", "backend", "live_preview", "arm", "disarm"]
USER_ACCESS = [
"complete",
"backend",
"live_preview",
"arm",
"disarm",
]
# Placeholders for stdDAQ and livestream clients # Placeholders for stdDAQ and livestream clients
backend = None backend = None
live_preview = None live_preview = None
# pylint: disable=too-many-arguments
def __init__( def __init__(
self, self,
prefix="", prefix="",
@@ -106,7 +101,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
else: else:
logger.error("No stdDAQ stream address provided, launching without preview!") logger.error("No stdDAQ stream address provided, launching without preview!")
def configure(self, d: dict = {}) -> tuple: def configure(self, d: dict = None) -> tuple:
"""Configure the base Helge camera device """Configure the base Helge camera device
Parameters as 'd' dictionary Parameters as 'd' dictionary
@@ -215,7 +210,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
self.acquire.set("Idle").wait() self.acquire.set("Idle").wait()
# Data streaming is stopped by setting the max index to 0 # Data streaming is stopped by setting the max index to 0
# FIXME: This might interrupt data transfer # FIXME: This will interrupt data transfer
self.file_savestop.set(0).wait() self.file_savestop.set(0).wait()
def destroy(self): def destroy(self):
@@ -226,7 +221,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
def _on_preview_update(self, img: np.ndarray, header: dict): def _on_preview_update(self, img: np.ndarray, header: dict):
"""Send preview stream and update frame index counter""" """Send preview stream and update frame index counter"""
# FIXME: There's also a recorded images counter provided by the stdDAQ writer # FIXME: There's also a recorded images counter provided by the stdDAQ writer
self.num_images_counter.put(header["frame"], force=True) self.num_images_counter.put(header["frame"], force=True)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
@@ -280,16 +275,16 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
d["exposure_time_ms"] = scan_args["exp_time"] d["exposure_time_ms"] = scan_args["exp_time"]
if "exp_period" in scan_args and scan_args["exp_period"] is not None: if "exp_period" in scan_args and scan_args["exp_period"] is not None:
d["exposure_period_ms"] = scan_args["exp_period"] d["exposure_period_ms"] = scan_args["exp_period"]
if 'exp_burst' in scan_args and scan_args['exp_burst'] is not None: if "exp_burst" in scan_args and scan_args["exp_burst"] is not None:
d['exposure_num_burst'] = scan_args['exp_burst'] d["exposure_num_burst"] = scan_args["exp_burst"]
if "acq_time" in scan_args and scan_args["acq_time"] is not None: if "acq_time" in scan_args and scan_args["acq_time"] is not None:
d["exposure_time_ms"] = scan_args["acq_time"] d["exposure_time_ms"] = scan_args["acq_time"]
if "acq_period" in scan_args and scan_args["acq_period"] is not None: if "acq_period" in scan_args and scan_args["acq_period"] is not None:
d["exposure_period_ms"] = scan_args["acq_period"] d["exposure_period_ms"] = scan_args["acq_period"]
if 'acq_burst' in scan_args and scan_args['acq_burst'] is not None: if "acq_burst" in scan_args and scan_args["acq_burst"] is not None:
d['exposure_num_burst'] = scan_args['acq_burst'] d["exposure_num_burst"] = scan_args["acq_burst"]
if 'acq_mode' in scan_args and scan_args['acq_mode'] is not None: if "acq_mode" in scan_args and scan_args["acq_mode"] is not None:
d['acq_mode'] = scan_args['acq_mode'] d["acq_mode"] = scan_args["acq_mode"]
# elif self.scaninfo.scan_type == "step": # elif self.scaninfo.scan_type == "step":
# d['acq_mode'] = "default" # d['acq_mode'] = "default"
if "pco_store_mode" in scan_args and scan_args["pco_store_mode"] is not None: if "pco_store_mode" in scan_args and scan_args["pco_store_mode"] is not None:
@@ -361,6 +356,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
# Wait until the buffer fills up with enough images # Wait until the buffer fills up with enough images
t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get() t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get()
def wait_acquisition(*, value, timestamp, **_): def wait_acquisition(*, value, timestamp, **_):
num_target = self.file_savestop.get() num_target = self.file_savestop.get()
# logger.warning(f"{value} of {num_target}") # logger.warning(f"{value} of {num_target}")
@@ -381,7 +377,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
# against values from the previous cycle, i.e. pass automatically. # against values from the previous cycle, i.e. pass automatically.
t_start = time.time() t_start = time.time()
def wait_sending(*, old_value, value, timestamp, **kwargs): def wait_sending(*, old_value, value, timestamp, **_):
t_elapsed = timestamp - t_start t_elapsed = timestamp - t_start
# logger.warning(f"{old_value}\t{value}\t{t_elapsed}") # logger.warning(f"{old_value}\t{value}\t{t_elapsed}")
return old_value == 1 and value == 0 and t_elapsed > 0 return old_value == 1 and value == 0 and t_elapsed > 0

View File

@@ -46,7 +46,9 @@ class StdDaqStatus(str, enum.Enum):
WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image" WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image"
# pylint: disable=too-many-instance-attributes
class StdDaqClient: class StdDaqClient:
"""Standalone stdDAQ client class"""
USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset"] USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset"]
@@ -93,9 +95,10 @@ class StdDaqClient:
self, status: DeviceStatus, success: list[StdDaqStatus], error: list[StdDaqStatus] self, status: DeviceStatus, success: list[StdDaqStatus], error: list[StdDaqStatus]
): ):
""" """
Add a DeviceStatus callback for the StdDAQ. The status will be updated when the StdDAQ status changes and Add a DeviceStatus callback for the StdDAQ. The status will be updated
set to finished when the status matches one of the specified success statuses and to exception when the status when the StdDAQ status changes and set to finished when the status
matches one of the specified error statuses. matches one of the specified success statuses and to exception when the
status matches one of the specified error statuses.
Args: Args:
status (DeviceStatus): DeviceStatus object status (DeviceStatus): DeviceStatus object
@@ -106,7 +109,12 @@ class StdDaqClient:
@typechecked @typechecked
def start( def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True self,
file_path: str,
file_prefix: str,
num_images: int,
timeout: float = 20,
wait: bool = True,
) -> StatusBase | None: ) -> StatusBase | None:
"""Start acquisition on the StdDAQ. """Start acquisition on the StdDAQ.
@@ -122,8 +130,9 @@ class StdDaqClient:
self.wait_for_connection() self.wait_for_connection()
status = StatusBase() status = StatusBase()
# NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown, nut might be botched overwrite protection # NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown,
# Changing file_prefix often solves the problem, but still allows overwrites # Might be botched overwrite protection (solved by changing file_prefix)
# In previous versions there was also a mutex ownership problem
self.add_status_callback( self.add_status_callback(
status, success=["waiting_for_first_image"], error=["rejected", "idle"] status, success=["waiting_for_first_image"], error=["rejected", "idle"]
) )

View File

@@ -14,11 +14,11 @@ ZMQ_TOPIC_FILTER = b""
class StdDaqPreview: class StdDaqPreview:
"""Standalone stdDAQ preview class"""
USER_ACCESS = ["start", "stop", "image", "frameno"] USER_ACCESS = ["start", "stop", "image", "frameno"]
_socket = None _socket = None
_zmq_thread = None _zmq_thread = None
_monitor_mutex = threading.Lock()
_shutdown_event = threading.Event()
_throttle = 0.2 _throttle = 0.2
image = None image = None
frameno = None frameno = None
@@ -26,6 +26,9 @@ class StdDaqPreview:
def __init__(self, url: str, cb: Callable): def __init__(self, url: str, cb: Callable):
self.url = url self.url = url
self._on_update_callback = cb self._on_update_callback = cb
# Must be here otherwise they're static (shared between class instances)
self._monitor_mutex = threading.Lock()
self._shutdown_event = threading.Event()
def connect(self): def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface """Connect to te StDAQs PUB-SUB streaming interface
@@ -45,6 +48,7 @@ class StdDaqPreview:
self._socket.connect(self.url) self._socket.connect(self.url)
def start(self): def start(self):
"""Start the preview thread"""
# Only one consumer thread # Only one consumer thread
if self._zmq_thread: if self._zmq_thread:
self.stop() self.stop()
@@ -56,6 +60,7 @@ class StdDaqPreview:
self._zmq_thread.start() self._zmq_thread.start()
def stop(self): def stop(self):
"""Stop the preview and disconnect from ZMQ stream"""
self._shutdown_event.set() self._shutdown_event.set()
if self._zmq_thread: if self._zmq_thread:
self._zmq_thread.join() self._zmq_thread.join()
@@ -124,5 +129,5 @@ class StdDaqPreview:
f"Mean: {np.mean(image):.3f}" f"Mean: {np.mean(image):.3f}"
) )
self.image = image self.image = image
self.frameno = header['frame'] self.frameno = header["frame"]
self._on_update_callback(image, header) self._on_update_callback(image, header)