From 4e047f7f4aafd99e4e192b6eaa573c28eb895426 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 29 Aug 2025 10:12:32 +0200 Subject: [PATCH] wip overwrite AndStatus --- superxas_bec/devices/timepix/timepix.py | 42 ++++++++++++++++++- .../timepix_fly_client/timepix_fly_backend.py | 25 +++++------ 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/superxas_bec/devices/timepix/timepix.py b/superxas_bec/devices/timepix/timepix.py index 1535541..53360a2 100644 --- a/superxas_bec/devices/timepix/timepix.py +++ b/superxas_bec/devices/timepix/timepix.py @@ -16,7 +16,8 @@ from bec_lib.logger import bec_logger from ophyd import ADBase from ophyd import Component as Cpt from ophyd import DeviceStatus, Kind, StatusBase -from ophyd_devices import AndStatus, AsyncSignal, CompareStatus, TransitionStatus +from ophyd_devices import AndStatus as _AndStatus +from ophyd_devices import AsyncSignal, CompareStatus, TransitionStatus from ophyd_devices.devices.areadetector.cam import ASItpxCam from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from typeguard import typechecked @@ -28,6 +29,45 @@ from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface impor PixelMap, ) + +class AndStatus(_AndStatus): + """Custom AndStatus for TimePix detector.""" + + def __init__(self, left: StatusBase | DeviceStatus, right: StatusBase | DeviceStatus, **kwargs): + self.left = left + self.right = right + super().__init__(**kwargs) + self._trace_attributes["left"] = self.left._trace_attributes + self._trace_attributes["right"] = self.right._trace_attributes + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + with self.left._lock: + with self.right._lock: + l_success = self.left.success + r_success = self.right.success + l_done = self.left.done + r_done = self.right.done + + # At least one is done. + # If it failed, do not wait for the second one. + if (not l_success) and l_done: + self.set_exception(self.left.exception()) + elif (not r_success) and r_done: + self.set_exception(self.right.exception()) + + elif l_success and r_success and l_done and r_done: + # Both are done, successfully. + self.set_finished() + # Else one is done, successfully, and we wait for #2, + # when this function will be called again. + + self.left.add_callback(inner) + self.right.add_callback(inner) + + logger = bec_logger.logger # pylint: disable=redefined-outer-name diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index 259d976..144639d 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -67,7 +67,7 @@ class TimepixFlyBackend: hostname = socket.getfqdn() self.hostname = hostname self.socket_port = socket_port # Use 0 as default to let the OS choose an available port - self.msg_buffer = [] + self.__msg_buffer = [] self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {} self._status_objects: list[StatusBase] = [] self._decoder = json.JSONDecoder() @@ -115,7 +115,6 @@ class TimepixFlyBackend: """ # Ensure that the message buffer is empty, should never contain data from previous scan # But to be sure, it should definitely be resetted before starting a new scan - self.reset_message_buffer() status = StatusBase() self.cancel_on_stop(status) self.timepix_fly_client.add_status_callback( @@ -191,7 +190,6 @@ class TimepixFlyBackend: def on_unstage(self): """Hook for on_unstage logic.""" - self.reset_message_buffer() def on_destroy(self): """Hook for on_destroy logic.""" @@ -216,7 +214,6 @@ class TimepixFlyBackend: """Hook for on_stop logic.""" self.stop_all_status_objects() self.timepix_fly_client.stop_running_collection() - self.reset_message_buffer() #################################################### ########## Custom Methods for the Backend ########## @@ -278,10 +275,6 @@ class TimepixFlyBackend: else: logger.warning(f"Callback with UUID {cb_id} not found.") - def reset_message_buffer(self): - """Reset the message buffer.""" - self.msg_buffer.clear() - def start_data_server(self) -> StatusBase: """ Start the data server to receive data from the Timepix Fly backend over a socket connection. @@ -365,6 +358,9 @@ class TimepixFlyBackend: logger.error(f"Error accepting connection: {content}") continue logger.debug(f"Connection accepted from {addr} for timepix_fly backend.") + if self.__msg_buffer: + logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}") + self.__msg_buffer.clear() conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv with conn: while not self._data_thread_shutdown_event.is_set(): @@ -406,7 +402,7 @@ class TimepixFlyBackend: logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}") return # TODO should this raise, or only log error as of now? - self.msg_buffer.append(obj) + self.__msg_buffer.append(obj) if obj.get("type", "") == "EndFrame": try: # If the EndFrame message is received, run the callbacks @@ -415,21 +411,21 @@ class TimepixFlyBackend: content = traceback.format_exc() logger.error(f"Error in msg callbacks with error msg: {content}") msgs_in_buffer = "".join( - [f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer] + [f"{msg['type']} with keys {msg.keys()} \n" for msg in self.__msg_buffer] ) logger.debug(f"TimePixFlyBackend: Messages in buffer: {msgs_in_buffer}") finally: # Make sure to always reset the message buffer after processing logger.debug( "TimePixFlyBackend: Resetting message buffer after processing EndFrame message." ) - self.reset_message_buffer() + self.__msg_buffer.clear() def run_msg_callbacks(self): """Run callbacks if EndFrame message is received.""" # TODO - start_frame = self.msg_buffer[0] - end_frame = self.msg_buffer[-1] - data_frames = self.msg_buffer[1:-1] + start_frame = self.__msg_buffer[0] + end_frame = self.__msg_buffer[-1] + data_frames = self.__msg_buffer[1:-1] for cb, kwd in self.callbacks.values(): cb(start_frame, data_frames, end_frame, **kwd) @@ -494,7 +490,6 @@ if __name__ == "__main__": # pragma: no cover status_2 = timepix.on_complete() status_2.wait(timeout=10) print("TimepixFlyBackend scan completed.") - timepix.reset_message_buffer() print( f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames." )