From 5d5eef11f68e98dbb4808dcfd2e4519c15f60d23 Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 30 Apr 2025 13:10:53 +0200 Subject: [PATCH] Fixed threading mutex --- .../device_configs/microxas_test_bed.yaml | 38 +++++++++---------- tomcat_bec/devices/gigafrost/pcoedgecamera.py | 16 +++----- .../devices/gigafrost/std_daq_client.py | 32 +++++++++------- 3 files changed, 44 insertions(+), 42 deletions(-) diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 96905cb..c932527 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -151,25 +151,25 @@ es1_ddaq: # softwareTrigger: true -# gfcam: -# description: GigaFrost camera client -# deviceClass: tomcat_bec.devices.GigaFrostCamera -# deviceConfig: -# prefix: 'X02DA-CAM-GF2:' -# backend_url: 'http://sls-daq-001:8080' -# auto_soft_enable: true -# std_daq_live: 'tcp://129.129.95.111:20000' -# std_daq_ws: 'ws://129.129.95.111:8080' -# std_daq_rest: 'http://129.129.95.111:5000' -# deviceTags: -# - camera -# - trigger -# - gfcam -# enabled: true -# onFailure: buffer -# readOnly: false -# readoutPriority: monitored -# softwareTrigger: true +gfcam: + description: GigaFrost camera client + deviceClass: tomcat_bec.devices.GigaFrostCamera + deviceConfig: + prefix: 'X02DA-CAM-GF2:' + backend_url: 'http://sls-daq-001:8080' + auto_soft_enable: true + std_daq_live: 'tcp://129.129.95.111:20000' + std_daq_ws: 'ws://129.129.95.111:8080' + std_daq_rest: 'http://129.129.95.111:5000' + deviceTags: + - camera + - trigger + - gfcam + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: monitored + softwareTrigger: true # gfdaq: # description: GigaFrost stdDAQ client diff --git a/tomcat_bec/devices/gigafrost/pcoedgecamera.py b/tomcat_bec/devices/gigafrost/pcoedgecamera.py index f8461c2..6d815ef 100644 --- a/tomcat_bec/devices/gigafrost/pcoedgecamera.py +++ b/tomcat_bec/devices/gigafrost/pcoedgecamera.py @@ -178,8 +178,10 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): """ if acq_mode in ["default", "step"]: # NOTE: Trigger duration requires a consumer - self.bufferStoreMode.set("Recorder").wait() - # self.file_format.set("ZEROMQ").wait() + self.bufferStoreMode.set("FIFO Buffer").wait() + if acq_mode in ["stream"]: + # NOTE: Trigger duration requires a consumer + self.bufferStoreMode.set("FIFO Buffer").wait() else: raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}") @@ -355,16 +357,10 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase): # status.wait() # Not sure if it always sends the first batch of images or the newest - def wait_bufferreset(*, old_value, value, timestamp, **_): - return (value < old_value) or (value == 0) - - self.buffer_clear.set(1).wait() - status = SubscriptionStatus(self.buffer_used, wait_bufferreset, timeout=5) - status.wait() - - t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get() + self.buffer_clear.set(1, settle_time=0.1).wait() # Wait until the buffer fills up with enough images + t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get() def wait_acquisition(*, value, timestamp, **_): num_target = self.file_savestop.get() # logger.warning(f"{value} of {num_target}") diff --git a/tomcat_bec/devices/gigafrost/std_daq_client.py b/tomcat_bec/devices/gigafrost/std_daq_client.py index 56cca38..f98684a 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_client.py +++ b/tomcat_bec/devices/gigafrost/std_daq_client.py @@ -48,17 +48,13 @@ class StdDaqStatus(str, enum.Enum): class StdDaqClient: - USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset", "_status"] + USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset"] _ws_client: ws.ClientConnection | None = None _count: int = 0 - _status: StdDaqStatus = StdDaqStatus.UNDEFINED + _status: str = "undefined" _status_timestamp: float | None = None - _ws_recv_mutex = threading.Lock() _ws_monitor_thread: threading.Thread | None = None - _shutdown_event = threading.Event() - _ws_idle_event = threading.Event() - _daq_is_running = threading.Event() _config: dict | None = None _status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {} @@ -66,12 +62,18 @@ class StdDaqClient: self.parent = parent self.ws_url = ws_url self.rest_url = rest_url - self._daq_is_running.set() + self.name = self.parent.name if self.parent is not None else "None" + # Must be here otherwise they're static (shared between class instances) + self._ws_recv_mutex = threading.Lock() + self._shutdown_event = threading.Event() + self._ws_idle_event = threading.Event() + self._daq_is_running = threading.Event() # Connect to WS interface and start status monitoring self.wait_for_connection() + self._daq_is_running.set() self._ws_monitor_thread = threading.Thread( - target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True + target=self._ws_monitor_loop, name=f"{self.name}_ws_monitor", daemon=True ) self._ws_monitor_thread.start() @@ -84,7 +86,7 @@ class StdDaqClient: @property def count(self) -> int: - """ Get the recorded frame count""" + """Get the recorded frame count""" return self._count def add_status_callback( @@ -122,7 +124,9 @@ class StdDaqClient: status = StatusBase() # NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown, nut might be botched overwrite protection # Changing file_prefix often solves the problem, but still allows overwrites - self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected", "idle"]) + self.add_status_callback( + status, success=["waiting_for_first_image"], error=["rejected", "idle"] + ) message = { "command": "start", "path": file_path, @@ -316,7 +320,7 @@ class StdDaqClient: callbacks. It also handles stdDAQ restarts and reconnection by itself. """ if self._ws_recv_mutex.locked(): - logger.warning("stdDAQ WS monitor loop already locked") + logger.warning(f"[{self.name}] stdDAQ WS monitor loop already locked") return with self._ws_recv_mutex: @@ -335,10 +339,12 @@ class StdDaqClient: continue msg = json.loads(msg) if self._status != msg["status"]: - logger.warning(f"stdDAQ state transition: {self._status} --> {msg}") + logger.warning( + f"[{self.name}] stdDAQ state transition: {self._status} --> {msg['status']}" + ) if msg["status"] == "recording": self._count = msg.get("count", 0) - # Update status and run callbacks + # Update status and run callbacks self._status = msg["status"] self._status_timestamp = msg_timestamp self._run_status_callbacks()