diff --git a/ophyd_devices/devices/panda_box/panda_box.py b/ophyd_devices/devices/panda_box/panda_box.py index 7b4ec30..117d2b9 100644 --- a/ophyd_devices/devices/panda_box/panda_box.py +++ b/ophyd_devices/devices/panda_box/panda_box.py @@ -38,13 +38,14 @@ import threading import time import uuid from enum import StrEnum -from typing import TYPE_CHECKING, Any, Callable, TypeAlias, TypedDict +from typing import TYPE_CHECKING, Any, Callable, Tuple, TypeAlias, TypedDict import pandablocks.commands as pbc from bec_lib import bec_logger from ophyd import Component as Cpt from ophyd.status import WaitTimeoutError -from pandablocks.blocking import BlockingClient +from pandablocks.blocking import BlockingClient, _SocketHelper +from pandablocks.connections import DataConnection, NeedMoreDataError from pandablocks.responses import Data, EndData, FrameData, ReadyData, StartData from ophyd_devices import DynamicSignal, PSIDeviceBase, StatusBase @@ -166,6 +167,34 @@ class DataCallback(TypedDict): data_type: PandaState +### PandaBox Convenient Data Connection Wrapper: + + +class PandaBoxDataConnection: + """Context Manager to conveniently manage the connection to the PandaBox data streams.""" + + def __init__(self, host: str, scaled: bool = True, socket_timeout: float = 0.1): + self.host = host + self.scaled = scaled + self.socket_timeout = socket_timeout + self.data_socket = None + + def __enter__(self) -> Tuple[socket.socket, DataConnection]: + self.data_socket = _SocketHelper() + self.data_socket.connect(self.host, 8889) + + connection = DataConnection() + s = self.data_socket.socket + s.settimeout(self.socket_timeout) # close enough + s.sendall(connection.connect(self.scaled)) + return s, connection + + def __exit__(self, *args): + if self.data_socket is not None: + self.data_socket.close() + self.data_socket = None + + class PandaBox(PSIDeviceBase): """ Base class for PandaBox devices. Beamline integrations should inherit from this base class, @@ -431,25 +460,35 @@ class PandaBox(PSIDeviceBase): loop after receiving EndData, as this indicates the end of the acquisition. """ try: - with BlockingClient(self.host) as client: + with PandaBoxDataConnection(self.host, scaled=True, socket_timeout=0.1) as ( + data_socket, + data_connection, + ): while ( not self.data_thread_kill_event.is_set() and self.data_thread_run_event.is_set() ): try: - # Timeout is needed to periodically check if we should leave the loop. - # IMPORTANT: Keep scaled=True to receive the data properly scaled and not in - # raw format. If performance becomes an issue, this should be re-evaluated. - # One could directly connect to the Socket, however, this should not be relevant - # in the kHz range, maybe in the several MHz range. - for data in client.data(scaled=True, frame_timeout=0.1): + chunk = data_socket.recv(4096) + for data in data_connection.receive_bytes(chunk): if not self._run_data_readout_step(data): - return # finally executes still - except socket.timeout: - # Timeout is expected to happen, but we have to check if the polling loop should still be running, or if - # stop was called and we should exit the loop. + return # Received EndData, exit readout loop + except NeedMoreDataError: + # Expected for multiple small chunks, we continue readout loop continue + except socket.timeout: + # Timeout is okay, continue in loop and check if loop is stopped externally + continue + except Exception as e: # pylint: disable=broad-except + logger.error(f"Connection error: {e}. Closing connection.") + # Broad error should be logged, and then we exit and thereby stop the + # readout loop + return + if not chunk: + # Receiving an empty chunk means that the connection was closed from the PandaBox side. + return finally: - # Make sure to leave the PandaBox in a clean state. + # We run our clean up procedure, disarming, resetting data_thread_run_event and updating + # the PandaState to DISARMED. self._reset_panda() def _reset_panda(self) -> None: