This commit is contained in:
gac-x05la
2025-03-19 09:52:34 +01:00
parent c9a2ce0dc5
commit 20dcd1849a

View File

@@ -122,17 +122,19 @@ class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"] USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"]
_ws_client: ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_ws_update_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: StdDaqConfig | None = None
def __init__(self, parent: Device, ws_url: str, rest_url: str): def __init__(self, parent: Device, ws_url: str, rest_url: str):
self.parent = parent self.parent = parent
self.ws_url = ws_url self.ws_url = ws_url
self.rest_url = rest_url self.rest_url = rest_url
self.ws_client: ClientConnection | None = None
self._status: StdDaqStatus = StdDaqStatus.UNDEFINED
self._ws_update_thread: threading.Thread | None = None
self._shutdown_event = threading.Event()
self._ws_idle_event = threading.Event()
self._daq_is_running = threading.Event()
self._config: StdDaqConfig | None = None
self._status_callbacks: dict[ self._status_callbacks: dict[
str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]] str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]
] = {} ] = {}
@@ -199,7 +201,7 @@ class StdDaqClient:
message = {"command": "stop"} message = {"command": "stop"}
return self._send_queue.put(message) return self._send_queue.put(message)
def get_config(self, cached=False, timeout: float = 2) -> dict: def get_config(self, timeout: float = 2) -> dict:
""" """
Get the current configuration of the StdDAQ. Get the current configuration of the StdDAQ.
@@ -210,16 +212,14 @@ class StdDaqClient:
Returns: Returns:
StdDaqConfig: configuration of the StdDAQ StdDaqConfig: configuration of the StdDAQ
""" """
if cached and self._config is not None:
return self._config
response = requests.get( response = requests.get(
self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
) )
response.raise_for_status() response.raise_for_status()
self._config = StdDaqConfig(**response.json()) self._config = response.json()
return self._config.model_dump() return self._config
def set_config(self, config: StdDaqConfig | dict, timeout: float = 2) -> None: def set_config(self, config: dict, timeout: float = 2, update: bool = True) -> None:
""" """
Set the configuration of the StdDAQ. This will overwrite the current configuration. Set the configuration of the StdDAQ. This will overwrite the current configuration.
@@ -227,20 +227,16 @@ class StdDaqClient:
config (StdDaqConfig | dict): configuration to set config (StdDaqConfig | dict): configuration to set
timeout (float): timeout for the request timeout (float): timeout for the request
""" """
if not isinstance(config, StdDaqConfig): if self._config is None:
config = StdDaqConfig(**config) self.get_config()
if update:
out = config.model_dump(exclude_none=True) self._config.update(config)
if not out: config = copy.deepcopy(self._config)
logger.info(
"The provided config does not contain relevant values for the StdDaq. Skipping set_config."
)
return
self._pre_restart() self._pre_restart()
response = requests.post( response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=out, timeout=timeout self.rest_url + "/api/config/set", params={"user": "ioc"}, json=config, timeout=timeout
) )
response.raise_for_status() response.raise_for_status()
@@ -248,38 +244,17 @@ class StdDaqClient:
self._post_restart() self._post_restart()
def _pre_restart(self): def _pre_restart(self):
"""Stop monitor before restart"""
self._daq_is_running.clear() self._daq_is_running.clear()
self._ws_idle_event.wait() self._ws_idle_event.wait()
if self.ws_client is not None: if self._ws_client is not None:
self.ws_client.close() self._ws_client.close()
def _post_restart(self): def _post_restart(self):
"""Start monitor after a restart"""
self.wait_for_connection() self.wait_for_connection()
self._daq_is_running.set() self._daq_is_running.set()
def update_config(self, config: StdDaqConfigPartial | dict, timeout: float = 2) -> None:
"""
Update the configuration of the StdDAQ. This will update the current configuration.
Args:
config (StdDaqConfigPartial | dict): configuration to update
timeout (float): timeout for the request
"""
if not isinstance(config, StdDaqConfigPartial):
config = StdDaqConfigPartial(**config)
patch_config_dict = config.model_dump(exclude_none=True)
if not patch_config_dict:
return
current_config = copy.deepcopy(self.get_config())
new_config = copy.deepcopy(current_config)
new_config.update(patch_config_dict)
if current_config == new_config:
return
self.set_config(StdDaqConfig(**new_config), timeout=timeout)
def reset(self, min_wait: float = 5) -> None: def reset(self, min_wait: float = 5) -> None:
""" """
Reset the StdDAQ. Reset the StdDAQ.
@@ -299,10 +274,10 @@ class StdDaqClient:
""" """
start_time = time.time() start_time = time.time()
while True: while True:
if self.ws_client is not None and self.ws_client.state == State.OPEN: if self._ws_client is not None and self._ws_client.state == State.OPEN:
return return
try: try:
self.ws_client = connect(self.ws_url) self._ws_client = connect(self.ws_url)
break break
except ConnectionRefusedError as exc: except ConnectionRefusedError as exc:
if time.time() - start_time > timeout: if time.time() - start_time > timeout:
@@ -351,9 +326,9 @@ class StdDaqClient:
""" """
if self._ws_update_thread is not None: if self._ws_update_thread is not None:
self._ws_update_thread.join() self._ws_update_thread.join()
if self.ws_client is not None: if self._ws_client is not None:
self.ws_client.close() self._ws_client.close()
self.ws_client = None self._ws_client = None
def _wait_for_server_running(self): def _wait_for_server_running(self):
""" """
@@ -367,18 +342,18 @@ class StdDaqClient:
self._ws_idle_event.set() self._ws_idle_event.set()
def _ws_send_and_receive(self): def _ws_send_and_receive(self):
if not self.ws_client: if not self._ws_client:
self.wait_for_connection() self.wait_for_connection()
try: try:
try: try:
msg = self._send_queue.get(block=False) msg = self._send_queue.get(block=False)
logger.trace(f"Sending to stddaq ws: {msg}") logger.trace(f"Sending to stddaq ws: {msg}")
self.ws_client.send(json.dumps(msg)) self._ws_client.send(json.dumps(msg))
logger.trace(f"Sent to stddaq ws: {msg}") logger.trace(f"Sent to stddaq ws: {msg}")
except queue.Empty: except queue.Empty:
pass pass
try: try:
recv_msgs = self.ws_client.recv(timeout=0.1) recv_msgs = self._ws_client.recv(timeout=0.1)
except TimeoutError: except TimeoutError:
return return
logger.trace(f"Received from stddaq ws: {recv_msgs}") logger.trace(f"Received from stddaq ws: {recv_msgs}")