Fixed threading mutex

This commit is contained in:
gac-x05la
2025-04-30 13:10:53 +02:00
parent b3e0a64bf1
commit 5d5eef11f6
3 changed files with 44 additions and 42 deletions

View File

@@ -151,25 +151,25 @@ es1_ddaq:
# softwareTrigger: true
# gfcam:
# description: GigaFrost camera client
# deviceClass: tomcat_bec.devices.GigaFrostCamera
# deviceConfig:
# prefix: 'X02DA-CAM-GF2:'
# backend_url: 'http://sls-daq-001:8080'
# auto_soft_enable: true
# std_daq_live: 'tcp://129.129.95.111:20000'
# std_daq_ws: 'ws://129.129.95.111:8080'
# std_daq_rest: 'http://129.129.95.111:5000'
# deviceTags:
# - camera
# - trigger
# - gfcam
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
gfcam:
description: GigaFrost camera client
deviceClass: tomcat_bec.devices.GigaFrostCamera
deviceConfig:
prefix: 'X02DA-CAM-GF2:'
backend_url: 'http://sls-daq-001:8080'
auto_soft_enable: true
std_daq_live: 'tcp://129.129.95.111:20000'
std_daq_ws: 'ws://129.129.95.111:8080'
std_daq_rest: 'http://129.129.95.111:5000'
deviceTags:
- camera
- trigger
- gfcam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# gfdaq:
# description: GigaFrost stdDAQ client

View File

@@ -178,8 +178,10 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
"""
if acq_mode in ["default", "step"]:
# NOTE: Trigger duration requires a consumer
self.bufferStoreMode.set("Recorder").wait()
# self.file_format.set("ZEROMQ").wait()
self.bufferStoreMode.set("FIFO Buffer").wait()
if acq_mode in ["stream"]:
# NOTE: Trigger duration requires a consumer
self.bufferStoreMode.set("FIFO Buffer").wait()
else:
raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}")
@@ -355,16 +357,10 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
# status.wait()
# Not sure if it always sends the first batch of images or the newest
def wait_bufferreset(*, old_value, value, timestamp, **_):
return (value < old_value) or (value == 0)
self.buffer_clear.set(1).wait()
status = SubscriptionStatus(self.buffer_used, wait_bufferreset, timeout=5)
status.wait()
t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get()
self.buffer_clear.set(1, settle_time=0.1).wait()
# Wait until the buffer fills up with enough images
t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get()
def wait_acquisition(*, value, timestamp, **_):
num_target = self.file_savestop.get()
# logger.warning(f"{value} of {num_target}")

View File

@@ -48,17 +48,13 @@ class StdDaqStatus(str, enum.Enum):
class StdDaqClient:
USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset", "_status"]
USER_ACCESS = ["status", "count", "start", "stop", "get_config", "set_config", "reset"]
_ws_client: ws.ClientConnection | None = None
_count: int = 0
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_status: str = "undefined"
_status_timestamp: float | None = None
_ws_recv_mutex = threading.Lock()
_ws_monitor_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: dict | None = None
_status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {}
@@ -66,12 +62,18 @@ class StdDaqClient:
self.parent = parent
self.ws_url = ws_url
self.rest_url = rest_url
self._daq_is_running.set()
self.name = self.parent.name if self.parent is not None else "None"
# Must be here otherwise they're static (shared between class instances)
self._ws_recv_mutex = threading.Lock()
self._shutdown_event = threading.Event()
self._ws_idle_event = threading.Event()
self._daq_is_running = threading.Event()
# Connect to WS interface and start status monitoring
self.wait_for_connection()
self._daq_is_running.set()
self._ws_monitor_thread = threading.Thread(
target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True
target=self._ws_monitor_loop, name=f"{self.name}_ws_monitor", daemon=True
)
self._ws_monitor_thread.start()
@@ -84,7 +86,7 @@ class StdDaqClient:
@property
def count(self) -> int:
""" Get the recorded frame count"""
"""Get the recorded frame count"""
return self._count
def add_status_callback(
@@ -122,7 +124,9 @@ class StdDaqClient:
status = StatusBase()
# NOTE: CREATING_FILE --> IDLE is a known error, the exact cause is unknown, nut might be botched overwrite protection
# Changing file_prefix often solves the problem, but still allows overwrites
self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected", "idle"])
self.add_status_callback(
status, success=["waiting_for_first_image"], error=["rejected", "idle"]
)
message = {
"command": "start",
"path": file_path,
@@ -316,7 +320,7 @@ class StdDaqClient:
callbacks. It also handles stdDAQ restarts and reconnection by itself.
"""
if self._ws_recv_mutex.locked():
logger.warning("stdDAQ WS monitor loop already locked")
logger.warning(f"[{self.name}] stdDAQ WS monitor loop already locked")
return
with self._ws_recv_mutex:
@@ -335,10 +339,12 @@ class StdDaqClient:
continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.warning(f"stdDAQ state transition: {self._status} --> {msg}")
logger.warning(
f"[{self.name}] stdDAQ state transition: {self._status} --> {msg['status']}"
)
if msg["status"] == "recording":
self._count = msg.get("count", 0)
# Update status and run callbacks
# Update status and run callbacks
self._status = msg["status"]
self._status_timestamp = msg_timestamp
self._run_status_callbacks()