mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-06-01 07:48:26 +02:00
fix(panda-box): Refactor DataConnection to PandaBox
This commit is contained in:
@@ -38,13 +38,14 @@ import threading
|
||||
import time
|
||||
import uuid
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, TypedDict
|
||||
from typing import TYPE_CHECKING, Any, Callable, Tuple, TypeAlias, TypedDict
|
||||
|
||||
import pandablocks.commands as pbc
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from pandablocks.blocking import BlockingClient
|
||||
from pandablocks.blocking import BlockingClient, _SocketHelper
|
||||
from pandablocks.connections import DataConnection, NeedMoreDataError
|
||||
from pandablocks.responses import Data, EndData, FrameData, ReadyData, StartData
|
||||
|
||||
from ophyd_devices import DynamicSignal, PSIDeviceBase, StatusBase
|
||||
@@ -166,6 +167,34 @@ class DataCallback(TypedDict):
|
||||
data_type: PandaState
|
||||
|
||||
|
||||
### PandaBox Convenient Data Connection Wrapper:
|
||||
|
||||
|
||||
class PandaBoxDataConnection:
|
||||
"""Context Manager to conveniently manage the connection to the PandaBox data streams."""
|
||||
|
||||
def __init__(self, host: str, scaled: bool = True, socket_timeout: float = 0.1):
|
||||
self.host = host
|
||||
self.scaled = scaled
|
||||
self.socket_timeout = socket_timeout
|
||||
self.data_socket = None
|
||||
|
||||
def __enter__(self) -> Tuple[socket.socket, DataConnection]:
|
||||
self.data_socket = _SocketHelper()
|
||||
self.data_socket.connect(self.host, 8889)
|
||||
|
||||
connection = DataConnection()
|
||||
s = self.data_socket.socket
|
||||
s.settimeout(self.socket_timeout) # close enough
|
||||
s.sendall(connection.connect(self.scaled))
|
||||
return s, connection
|
||||
|
||||
def __exit__(self, *args):
|
||||
if self.data_socket is not None:
|
||||
self.data_socket.close()
|
||||
self.data_socket = None
|
||||
|
||||
|
||||
class PandaBox(PSIDeviceBase):
|
||||
"""
|
||||
Base class for PandaBox devices. Beamline integrations should inherit from this base class,
|
||||
@@ -431,25 +460,35 @@ class PandaBox(PSIDeviceBase):
|
||||
loop after receiving EndData, as this indicates the end of the acquisition.
|
||||
"""
|
||||
try:
|
||||
with BlockingClient(self.host) as client:
|
||||
with PandaBoxDataConnection(self.host, scaled=True, socket_timeout=0.1) as (
|
||||
data_socket,
|
||||
data_connection,
|
||||
):
|
||||
while (
|
||||
not self.data_thread_kill_event.is_set() and self.data_thread_run_event.is_set()
|
||||
):
|
||||
try:
|
||||
# Timeout is needed to periodically check if we should leave the loop.
|
||||
# IMPORTANT: Keep scaled=True to receive the data properly scaled and not in
|
||||
# raw format. If performance becomes an issue, this should be re-evaluated.
|
||||
# One could directly connect to the Socket, however, this should not be relevant
|
||||
# in the kHz range, maybe in the several MHz range.
|
||||
for data in client.data(scaled=True, frame_timeout=0.1):
|
||||
chunk = data_socket.recv(4096)
|
||||
for data in data_connection.receive_bytes(chunk):
|
||||
if not self._run_data_readout_step(data):
|
||||
return # finally executes still
|
||||
except socket.timeout:
|
||||
# Timeout is expected to happen, but we have to check if the polling loop should still be running, or if
|
||||
# stop was called and we should exit the loop.
|
||||
return # Received EndData, exit readout loop
|
||||
except NeedMoreDataError:
|
||||
# Expected for multiple small chunks, we continue readout loop
|
||||
continue
|
||||
except socket.timeout:
|
||||
# Timeout is okay, continue in loop and check if loop is stopped externally
|
||||
continue
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
logger.error(f"Connection error: {e}. Closing connection.")
|
||||
# Broad error should be logged, and then we exit and thereby stop the
|
||||
# readout loop
|
||||
return
|
||||
if not chunk:
|
||||
# Receiving an empty chunk means that the connection was closed from the PandaBox side.
|
||||
return
|
||||
finally:
|
||||
# Make sure to leave the PandaBox in a clean state.
|
||||
# We run our clean up procedure, disarming, resetting data_thread_run_event and updating
|
||||
# the PandaState to DISARMED.
|
||||
self._reset_panda()
|
||||
|
||||
def _reset_panda(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user