diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 96905cb..c932527 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -151,25 +151,25 @@ es1_ddaq: # softwareTrigger: true -# gfcam: -# description: GigaFrost camera client -# deviceClass: tomcat_bec.devices.GigaFrostCamera -# deviceConfig: -# prefix: 'X02DA-CAM-GF2:' -# backend_url: 'http://sls-daq-001:8080' -# auto_soft_enable: true -# std_daq_live: 'tcp://129.129.95.111:20000' -# std_daq_ws: 'ws://129.129.95.111:8080' -# std_daq_rest: 'http://129.129.95.111:5000' -# deviceTags: -# - camera -# - trigger -# - gfcam -# enabled: true -# onFailure: buffer -# readOnly: false -# readoutPriority: monitored -# softwareTrigger: true +gfcam: + description: GigaFrost camera client + deviceClass: tomcat_bec.devices.GigaFrostCamera + deviceConfig: + prefix: 'X02DA-CAM-GF2:' + backend_url: 'http://sls-daq-001:8080' + auto_soft_enable: true + std_daq_live: 'tcp://129.129.95.111:20000' + std_daq_ws: 'ws://129.129.95.111:8080' + std_daq_rest: 'http://129.129.95.111:5000' + deviceTags: + - camera + - trigger + - gfcam + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: monitored + softwareTrigger: true # gfdaq: # description: GigaFrost stdDAQ client diff --git a/tomcat_bec/devices/gigafrost/pcoedgecamera.py b/tomcat_bec/devices/gigafrost/pcoedgecamera.py index f8461c2..af9db5e 100644 --- a/tomcat_bec/devices/gigafrost/pcoedgecamera.py +++ b/tomcat_bec/devices/gigafrost/pcoedgecamera.py @@ -9,16 +9,17 @@ import numpy as np from ophyd.status import SubscriptionStatus, DeviceStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from bec_lib.logger import bec_logger + from tomcat_bec.devices.gigafrost.pcoedge_base import PcoEdgeBase from tomcat_bec.devices.gigafrost.std_daq_preview import StdDaqPreview from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqStatus -from bec_lib.logger import bec_logger - logger = bec_logger.logger +# pylint: disable=too-many-instance-attributes class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): """Ophyd baseclass for Helge camera IOCs @@ -55,19 +56,13 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): destructive. to prevent this, we don't have EPICS preview """ - # pylint: disable=too-many-instance-attributes - USER_ACCESS = [ - "complete", - "backend", - "live_preview", - "arm", - "disarm", - ] + USER_ACCESS = ["complete", "backend", "live_preview", "arm", "disarm"] # Placeholders for stdDAQ and livestream clients backend = None live_preview = None + # pylint: disable=too-many-arguments def __init__( self, prefix="", @@ -106,7 +101,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): else: logger.error("No stdDAQ stream address provided, launching without preview!") - def configure(self, d: dict = {}) -> tuple: + def configure(self, d: dict = None) -> tuple: """Configure the base Helge camera device Parameters as 'd' dictionary @@ -178,8 +173,10 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): """ if acq_mode in ["default", "step"]: # NOTE: Trigger duration requires a consumer - self.bufferStoreMode.set("Recorder").wait() - # self.file_format.set("ZEROMQ").wait() + self.bufferStoreMode.set("FIFO Buffer").wait() + if acq_mode in ["stream"]: + # NOTE: Trigger duration requires a consumer + self.bufferStoreMode.set("FIFO Buffer").wait() else: raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}") @@ -213,7 +210,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): self.acquire.set("Idle").wait() # Data streaming is stopped by setting the max index to 0 - # FIXME: This might interrupt data transfer + # FIXME: This will interrupt data transfer self.file_savestop.set(0).wait() def destroy(self): @@ -224,7 +221,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): def _on_preview_update(self, img: np.ndarray, header: dict): """Send preview stream and update frame index counter""" - # FIXME: There's also a recorded images counter provided by the stdDAQ writer + # FIXME: There's also a recorded images counter provided by the stdDAQ writer self.num_images_counter.put(header["frame"], force=True) self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) @@ -278,16 +275,16 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): d["exposure_time_ms"] = scan_args["exp_time"] if "exp_period" in scan_args and scan_args["exp_period"] is not None: d["exposure_period_ms"] = scan_args["exp_period"] - if 'exp_burst' in scan_args and scan_args['exp_burst'] is not None: - d['exposure_num_burst'] = scan_args['exp_burst'] + if "exp_burst" in scan_args and scan_args["exp_burst"] is not None: + d["exposure_num_burst"] = scan_args["exp_burst"] if "acq_time" in scan_args and scan_args["acq_time"] is not None: d["exposure_time_ms"] = scan_args["acq_time"] if "acq_period" in scan_args and scan_args["acq_period"] is not None: d["exposure_period_ms"] = scan_args["acq_period"] - if 'acq_burst' in scan_args and scan_args['acq_burst'] is not None: - d['exposure_num_burst'] = scan_args['acq_burst'] - if 'acq_mode' in scan_args and scan_args['acq_mode'] is not None: - d['acq_mode'] = scan_args['acq_mode'] + if "acq_burst" in scan_args and scan_args["acq_burst"] is not None: + d["exposure_num_burst"] = scan_args["acq_burst"] + if "acq_mode" in scan_args and scan_args["acq_mode"] is not None: + d["acq_mode"] = scan_args["acq_mode"] # elif self.scaninfo.scan_type == "step": # d['acq_mode'] = "default" if "pco_store_mode" in scan_args and scan_args["pco_store_mode"] is not None: @@ -355,16 +352,11 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): # status.wait() # Not sure if it always sends the first batch of images or the newest - def wait_bufferreset(*, old_value, value, timestamp, **_): - return (value < old_value) or (value == 0) - - self.buffer_clear.set(1).wait() - status = SubscriptionStatus(self.buffer_used, wait_bufferreset, timeout=5) - status.wait() - - t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get() + self.buffer_clear.set(1, settle_time=0.1).wait() # Wait until the buffer fills up with enough images + t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get() + def wait_acquisition(*, value, timestamp, **_): num_target = self.file_savestop.get() # logger.warning(f"{value} of {num_target}") @@ -385,7 +377,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): # against values from the previous cycle, i.e. pass automatically. t_start = time.time() - def wait_sending(*, old_value, value, timestamp, **kwargs): + def wait_sending(*, old_value, value, timestamp, **_): t_elapsed = timestamp - t_start # logger.warning(f"{old_value}\t{value}\t{t_elapsed}") return old_value == 1 and value == 0 and t_elapsed > 0 diff --git a/tomcat_bec/devices/gigafrost/std_daq_client.py b/tomcat_bec/devices/gigafrost/std_daq_client.py index 56cca38..881c05c 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_client.py +++ b/tomcat_bec/devices/gigafrost/std_daq_client.py @@ -46,19 +46,17 @@ class StdDaqStatus(str, enum.Enum): WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image" +# pylint: disable=too-many-instance-attributes class StdDaqClient: + """Standalone stdDAQ client class""" - USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset", "_status"] + USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset"] _ws_client: ws.ClientConnection | None = None _count: int = 0 - _status: StdDaqStatus = StdDaqStatus.UNDEFINED + _status: str = "undefined" _status_timestamp: float | None = None - _ws_recv_mutex = threading.Lock() _ws_monitor_thread: threading.Thread | None = None - _shutdown_event = threading.Event() - _ws_idle_event = threading.Event() - _daq_is_running = threading.Event() _config: dict | None = None _status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {} @@ -66,12 +64,18 @@ class StdDaqClient: self.parent = parent self.ws_url = ws_url self.rest_url = rest_url - self._daq_is_running.set() + self.name = self.parent.name if self.parent is not None else "None" + # Must be here otherwise they're static (shared between class instances) + self._ws_recv_mutex = threading.Lock() + self._shutdown_event = threading.Event() + self._ws_idle_event = threading.Event() + self._daq_is_running = threading.Event() # Connect to WS interface and start status monitoring self.wait_for_connection() + self._daq_is_running.set() self._ws_monitor_thread = threading.Thread( - target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True + target=self._ws_monitor_loop, name=f"{self.name}_ws_monitor", daemon=True ) self._ws_monitor_thread.start() @@ -84,16 +88,17 @@ class StdDaqClient: @property def count(self) -> int: - """ Get the recorded frame count""" + """Get the recorded frame count""" return self._count def add_status_callback( self, status: DeviceStatus, success: list[StdDaqStatus], error: list[StdDaqStatus] ): """ - Add a DeviceStatus callback for the StdDAQ. The status will be updated when the StdDAQ status changes and - set to finished when the status matches one of the specified success statuses and to exception when the status - matches one of the specified error statuses. + Add a DeviceStatus callback for the StdDAQ. The status will be updated + when the StdDAQ status changes and set to finished when the status + matches one of the specified success statuses and to exception when the + status matches one of the specified error statuses. Args: status (DeviceStatus): DeviceStatus object @@ -104,7 +109,12 @@ class StdDaqClient: @typechecked def start( - self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True + self, + file_path: str, + file_prefix: str, + num_images: int, + timeout: float = 20, + wait: bool = True, ) -> StatusBase | None: """Start acquisition on the StdDAQ. @@ -120,9 +130,12 @@ class StdDaqClient: self.wait_for_connection() status = StatusBase() - # NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown, nut might be botched overwrite protection - # Changing file_prefix often solves the problem, but still allows overwrites - self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected", "idle"]) + # NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown, + # Might be botched overwrite protection (solved by changing file_prefix) + # In previous versions there was also a mutex ownership problem + self.add_status_callback( + status, success=["waiting_for_first_image"], error=["rejected", "idle"] + ) message = { "command": "start", "path": file_path, @@ -316,7 +329,7 @@ class StdDaqClient: callbacks. It also handles stdDAQ restarts and reconnection by itself. """ if self._ws_recv_mutex.locked(): - logger.warning("stdDAQ WS monitor loop already locked") + logger.warning(f"[{self.name}] stdDAQ WS monitor loop already locked") return with self._ws_recv_mutex: @@ -335,10 +348,12 @@ class StdDaqClient: continue msg = json.loads(msg) if self._status != msg["status"]: - logger.warning(f"stdDAQ state transition: {self._status} --> {msg}") + logger.warning( + f"[{self.name}] stdDAQ state transition: {self._status} --> {msg['status']}" + ) if msg["status"] == "recording": self._count = msg.get("count", 0) - # Update status and run callbacks + # Update status and run callbacks self._status = msg["status"] self._status_timestamp = msg_timestamp self._run_status_callbacks() diff --git a/tomcat_bec/devices/gigafrost/std_daq_preview.py b/tomcat_bec/devices/gigafrost/std_daq_preview.py index 03884d0..8c4e6f1 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_preview.py +++ b/tomcat_bec/devices/gigafrost/std_daq_preview.py @@ -14,11 +14,11 @@ ZMQ_TOPIC_FILTER = b"" class StdDaqPreview: + """Standalone stdDAQ preview class""" + USER_ACCESS = ["start", "stop", "image", "frameno"] _socket = None _zmq_thread = None - _monitor_mutex = threading.Lock() - _shutdown_event = threading.Event() _throttle = 0.2 image = None frameno = None @@ -26,6 +26,9 @@ class StdDaqPreview: def __init__(self, url: str, cb: Callable): self.url = url self._on_update_callback = cb + # Must be here otherwise they're static (shared between class instances) + self._monitor_mutex = threading.Lock() + self._shutdown_event = threading.Event() def connect(self): """Connect to te StDAQs PUB-SUB streaming interface @@ -45,6 +48,7 @@ class StdDaqPreview: self._socket.connect(self.url) def start(self): + """Start the preview thread""" # Only one consumer thread if self._zmq_thread: self.stop() @@ -56,6 +60,7 @@ class StdDaqPreview: self._zmq_thread.start() def stop(self): + """Stop the preview and disconnect from ZMQ stream""" self._shutdown_event.set() if self._zmq_thread: self._zmq_thread.join() @@ -124,5 +129,5 @@ class StdDaqPreview: f"Mean: {np.mean(image):.3f}" ) self.image = image - self.frameno = header['frame'] + self.frameno = header["frame"] self._on_update_callback(image, header)