This commit is contained in:
gac-x05la
2025-03-19 09:52:34 +01:00
parent c9a2ce0dc5
commit 20dcd1849a

View File

@@ -122,17 +122,19 @@ class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"]
_ws_client: ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_ws_update_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: StdDaqConfig | None = None
def __init__(self, parent: Device, ws_url: str, rest_url: str):
self.parent = parent
self.ws_url = ws_url
self.rest_url = rest_url
self.ws_client: ClientConnection | None = None
self._status: StdDaqStatus = StdDaqStatus.UNDEFINED
self._ws_update_thread: threading.Thread | None = None
self._shutdown_event = threading.Event()
self._ws_idle_event = threading.Event()
self._daq_is_running = threading.Event()
self._config: StdDaqConfig | None = None
self._status_callbacks: dict[
str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]
] = {}
@@ -199,7 +201,7 @@ class StdDaqClient:
message = {"command": "stop"}
return self._send_queue.put(message)
def get_config(self, cached=False, timeout: float = 2) -> dict:
def get_config(self, timeout: float = 2) -> dict:
"""
Get the current configuration of the StdDAQ.
@@ -210,16 +212,14 @@ class StdDaqClient:
Returns:
StdDaqConfig: configuration of the StdDAQ
"""
if cached and self._config is not None:
return self._config
response = requests.get(
self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
)
response.raise_for_status()
self._config = StdDaqConfig(**response.json())
return self._config.model_dump()
self._config = response.json()
return self._config
def set_config(self, config: StdDaqConfig | dict, timeout: float = 2) -> None:
def set_config(self, config: dict, timeout: float = 2, update: bool = True) -> None:
"""
Set the configuration of the StdDAQ. This will overwrite the current configuration.
@@ -227,20 +227,16 @@ class StdDaqClient:
config (StdDaqConfig | dict): configuration to set
timeout (float): timeout for the request
"""
if not isinstance(config, StdDaqConfig):
config = StdDaqConfig(**config)
out = config.model_dump(exclude_none=True)
if not out:
logger.info(
"The provided config does not contain relevant values for the StdDaq. Skipping set_config."
)
return
if self._config is None:
self.get_config()
if update:
self._config.update(config)
config = copy.deepcopy(self._config)
self._pre_restart()
response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=out, timeout=timeout
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=config, timeout=timeout
)
response.raise_for_status()
@@ -248,38 +244,17 @@ class StdDaqClient:
self._post_restart()
def _pre_restart(self):
"""Stop monitor before restart"""
self._daq_is_running.clear()
self._ws_idle_event.wait()
if self.ws_client is not None:
self.ws_client.close()
if self._ws_client is not None:
self._ws_client.close()
def _post_restart(self):
"""Start monitor after a restart"""
self.wait_for_connection()
self._daq_is_running.set()
def update_config(self, config: StdDaqConfigPartial | dict, timeout: float = 2) -> None:
"""
Update the configuration of the StdDAQ. This will update the current configuration.
Args:
config (StdDaqConfigPartial | dict): configuration to update
timeout (float): timeout for the request
"""
if not isinstance(config, StdDaqConfigPartial):
config = StdDaqConfigPartial(**config)
patch_config_dict = config.model_dump(exclude_none=True)
if not patch_config_dict:
return
current_config = copy.deepcopy(self.get_config())
new_config = copy.deepcopy(current_config)
new_config.update(patch_config_dict)
if current_config == new_config:
return
self.set_config(StdDaqConfig(**new_config), timeout=timeout)
def reset(self, min_wait: float = 5) -> None:
"""
Reset the StdDAQ.
@@ -299,10 +274,10 @@ class StdDaqClient:
"""
start_time = time.time()
while True:
if self.ws_client is not None and self.ws_client.state == State.OPEN:
if self._ws_client is not None and self._ws_client.state == State.OPEN:
return
try:
self.ws_client = connect(self.ws_url)
self._ws_client = connect(self.ws_url)
break
except ConnectionRefusedError as exc:
if time.time() - start_time > timeout:
@@ -351,9 +326,9 @@ class StdDaqClient:
"""
if self._ws_update_thread is not None:
self._ws_update_thread.join()
if self.ws_client is not None:
self.ws_client.close()
self.ws_client = None
if self._ws_client is not None:
self._ws_client.close()
self._ws_client = None
def _wait_for_server_running(self):
"""
@@ -367,18 +342,18 @@ class StdDaqClient:
self._ws_idle_event.set()
def _ws_send_and_receive(self):
if not self.ws_client:
if not self._ws_client:
self.wait_for_connection()
try:
try:
msg = self._send_queue.get(block=False)
logger.trace(f"Sending to stddaq ws: {msg}")
self.ws_client.send(json.dumps(msg))
self._ws_client.send(json.dumps(msg))
logger.trace(f"Sent to stddaq ws: {msg}")
except queue.Empty:
pass
try:
recv_msgs = self.ws_client.recv(timeout=0.1)
recv_msgs = self._ws_client.recv(timeout=0.1)
except TimeoutError:
return
logger.trace(f"Received from stddaq ws: {recv_msgs}")