Run control works

This commit is contained in:
gac-x05la
2025-03-19 14:37:30 +01:00
parent 20dcd1849a
commit 49e8c64433
2 changed files with 80 additions and 84 deletions

View File

@@ -525,9 +525,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
"""
# Perform a full initialization of the GigaFrost
self.initialize_gigafrost()
# Connect to the stdDAQ backend
if self.backend is not None:
self.backend.connect()
def on_stage(self) -> DeviceStatus | None:
"""

View File

@@ -16,7 +16,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator
from typeguard import typechecked
from websockets import State
from websockets.exceptions import WebSocketException
from websockets.sync.client import ClientConnection, connect
import websockets.sync.client as ws
if TYPE_CHECKING: # pragma: no cover
from ophyd import Device, DeviceStatus
@@ -122,25 +122,29 @@ class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"]
_ws_client: ClientConnection | None = None
_ws_client: ws.ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_ws_recv_mutex = threading.Lock()
_ws_update_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: StdDaqConfig | None = None
_status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {}
def __init__(self, parent: Device, ws_url: str, rest_url: str):
self.parent = parent
self.ws_url = ws_url
self.rest_url = rest_url
self._status_callbacks: dict[
str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]
] = {}
self._send_queue = queue.Queue()
self._daq_is_running.set()
# Connect to WS interface and start status monitoring
self.wait_for_connection()
self._ws_monitor_thread = threading.Thread(
target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True
)
self._ws_monitor_thread.start()
@property
def status(self) -> StdDaqStatus:
"""
@@ -167,50 +171,62 @@ class StdDaqClient:
def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True
) -> StatusBase:
"""
Start acquisition on the StdDAQ.
"""Start acquisition on the StdDAQ.
Args:
file_path (str): path to save the files
file_prefix (str): prefix of the files
num_images (int): number of images to acquire
timeout (float): timeout for the request
Returns:
status (StatusBase): Ophyd status object with attached monitor
"""
# Ensure connection
self.wait_for_connection()
logger.info(f"Starting StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["waiting_for_first_image"], error=[])
self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected"])
message = {
"command": "start",
"path": file_path,
"file_prefix": file_prefix,
"n_image": num_images,
}
self._send_queue.put(message)
self._ws_client.send(json.dumps(message))
if wait:
return status.wait(timeout=timeout)
status.wait(timeout=timeout)
return status
def stop(self):
"""
Stop acquisition on the StdDAQ.
@typechecked
def stop(self, timeout: float = 5, wait=True) -> StatusBase:
"""Stop acquisition on the StdDAQ.
Args:
timeout (float): timeout for the request
Returns:
status (StatusBase): Ophyd status object with attached monitor
"""
# Ensure connection
self.wait_for_connection()
logger.info(f"Stopping StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["idle"], error=["error"])
message = {"command": "stop"}
return self._send_queue.put(message)
self._ws_client.send(json.dumps(message))
if wait:
status.wait(timeout=timeout)
return status
def get_config(self, timeout: float = 2) -> dict:
"""
Get the current configuration of the StdDAQ.
"""Get the current configuration of the StdDAQ.
Args:
cached (bool): whether to use the cached configuration
timeout (float): timeout for the request
Returns:
StdDaqConfig: configuration of the StdDAQ
config (dict): configuration of the StdDAQ
"""
response = requests.get(
self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
@@ -219,7 +235,7 @@ class StdDaqClient:
self._config = response.json()
return self._config
def set_config(self, config: dict, timeout: float = 2, update: bool = True) -> None:
def set_config(self, config: dict, timeout: float = 2, update: bool = True, force: bool=True) -> None:
"""
Set the configuration of the StdDAQ. This will overwrite the current configuration.
@@ -227,16 +243,17 @@ class StdDaqClient:
config (StdDaqConfig | dict): configuration to set
timeout (float): timeout for the request
"""
if self._config is None:
self.get_config()
if update:
self._config.update(config)
config = copy.deepcopy(self._config)
old_config = self.get_config()
new_config = copy.deepcopy(self._config.update(config)) if update else config
# Escape unnecesary restarts
if not force and new_config == old_config:
return
self._pre_restart()
response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=config, timeout=timeout
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout
)
response.raise_for_status()
@@ -277,7 +294,7 @@ class StdDaqClient:
if self._ws_client is not None and self._ws_client.state == State.OPEN:
return
try:
self._ws_client = connect(self.ws_url)
self._ws_client = ws.connect(self.ws_url)
break
except ConnectionRefusedError as exc:
if time.time() - start_time > timeout:
@@ -310,16 +327,6 @@ class StdDaqClient:
)
response.raise_for_status()
def connect(self):
"""
Connect to the StdDAQ. This method should be called after the client is created. It will
launch a background thread to exchange data with the StdDAQ.
"""
self._ws_update_thread = threading.Thread(
target=self._ws_update_loop, name=f"{self.parent.name}_stddaq_ws_loop", daemon=True
)
self._ws_update_thread.start()
def shutdown(self):
"""
Shutdown the StdDAQ client.
@@ -341,55 +348,36 @@ class StdDaqClient:
break
self._ws_idle_event.set()
def _ws_send_and_receive(self):
if not self._ws_client:
self.wait_for_connection()
try:
try:
msg = self._send_queue.get(block=False)
logger.trace(f"Sending to stddaq ws: {msg}")
self._ws_client.send(json.dumps(msg))
logger.trace(f"Sent to stddaq ws: {msg}")
except queue.Empty:
pass
try:
recv_msgs = self._ws_client.recv(timeout=0.1)
except TimeoutError:
return
logger.trace(f"Received from stddaq ws: {recv_msgs}")
if recv_msgs is not None:
self._on_received_ws_message(recv_msgs)
except WebSocketException:
content = traceback.format_exc()
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
def _ws_monitor_loop(self):
"""Loop to update the status property of the StdDAQ.
def _ws_update_loop(self):
"""
Loop to update the status property of the StdDAQ.
This is a persistent monitor that updates the status and calls attached
callbacks. It also handles stdDAQ restarts and reconnection by itself.
"""
while not self._shutdown_event.is_set():
self._wait_for_server_running()
self._ws_send_and_receive()
def _on_received_ws_message(self, msg: str):
"""
Handle a message received from the StdDAQ.
"""
try:
data = StdDaqWsResponse(**json.loads(msg))
except Exception:
content = traceback.format_exc()
logger.warning(f"Failed to decode websocket message: {content}")
return
self._status = data.status
self._run_status_callbacks()
try:
msg = self._ws_client.recv(timeout=0.1)
except TimeoutError:
continue
except WebSocketException:
content = traceback.format_exc()
# TODO: this is expected to happen on every reconfiguration
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.info(f"stdDAQ state transition by: {msg}")
self._status = msg["status"]
self._run_status_callbacks()
def _run_status_callbacks(self):
"""
Update the DeviceStatus objects based on the current status of the StdDAQ.
If the status matches one of the success or error statuses, the DeviceStatus object will be set to finished
or exception, respectively and removed from the list of callbacks.
If the status matches one of the success or error statuses, the DeviceStatus
object will be set to finished or exception, respectively and removed from
the list of callbacks.
"""
status = self._status
@@ -406,3 +394,14 @@ class StdDaqClient:
for cb in completed_callbacks:
self._status_callbacks.pop(id(cb))
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":
class foo:
name="bar"
daq = StdDaqClient(
parent=foo(), ws_url='ws://129.129.95.111:8080', rest_url='http://129.129.95.111:5000'
)