wip overwrite AndStatus

This commit is contained in:
2025-08-29 10:12:32 +02:00
parent 46b9a7ed72
commit 4e047f7f4a
2 changed files with 51 additions and 16 deletions

View File

@@ -16,7 +16,8 @@ from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import AndStatus, AsyncSignal, CompareStatus, TransitionStatus
from ophyd_devices import AndStatus as _AndStatus
from ophyd_devices import AsyncSignal, CompareStatus, TransitionStatus
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
@@ -28,6 +29,45 @@ from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface impor
PixelMap,
)
class AndStatus(_AndStatus):
"""Custom AndStatus for TimePix detector."""
def __init__(self, left: StatusBase | DeviceStatus, right: StatusBase | DeviceStatus, **kwargs):
self.left = left
self.right = right
super().__init__(**kwargs)
self._trace_attributes["left"] = self.left._trace_attributes
self._trace_attributes["right"] = self.right._trace_attributes
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
with self.left._lock:
with self.right._lock:
l_success = self.left.success
r_success = self.right.success
l_done = self.left.done
r_done = self.right.done
# At least one is done.
# If it failed, do not wait for the second one.
if (not l_success) and l_done:
self.set_exception(self.left.exception())
elif (not r_success) and r_done:
self.set_exception(self.right.exception())
elif l_success and r_success and l_done and r_done:
# Both are done, successfully.
self.set_finished()
# Else one is done, successfully, and we wait for #2,
# when this function will be called again.
self.left.add_callback(inner)
self.right.add_callback(inner)
logger = bec_logger.logger
# pylint: disable=redefined-outer-name

View File

@@ -67,7 +67,7 @@ class TimepixFlyBackend:
hostname = socket.getfqdn()
self.hostname = hostname
self.socket_port = socket_port # Use 0 as default to let the OS choose an available port
self.msg_buffer = []
self.__msg_buffer = []
self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {}
self._status_objects: list[StatusBase] = []
self._decoder = json.JSONDecoder()
@@ -115,7 +115,6 @@ class TimepixFlyBackend:
"""
# Ensure that the message buffer is empty, should never contain data from previous scan
# But to be sure, it should definitely be resetted before starting a new scan
self.reset_message_buffer()
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
@@ -191,7 +190,6 @@ class TimepixFlyBackend:
def on_unstage(self):
"""Hook for on_unstage logic."""
self.reset_message_buffer()
def on_destroy(self):
"""Hook for on_destroy logic."""
@@ -216,7 +214,6 @@ class TimepixFlyBackend:
"""Hook for on_stop logic."""
self.stop_all_status_objects()
self.timepix_fly_client.stop_running_collection()
self.reset_message_buffer()
####################################################
########## Custom Methods for the Backend ##########
@@ -278,10 +275,6 @@ class TimepixFlyBackend:
else:
logger.warning(f"Callback with UUID {cb_id} not found.")
def reset_message_buffer(self):
"""Reset the message buffer."""
self.msg_buffer.clear()
def start_data_server(self) -> StatusBase:
"""
Start the data server to receive data from the Timepix Fly backend over a socket connection.
@@ -365,6 +358,9 @@ class TimepixFlyBackend:
logger.error(f"Error accepting connection: {content}")
continue
logger.debug(f"Connection accepted from {addr} for timepix_fly backend.")
if self.__msg_buffer:
logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}")
self.__msg_buffer.clear()
conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv
with conn:
while not self._data_thread_shutdown_event.is_set():
@@ -406,7 +402,7 @@ class TimepixFlyBackend:
logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}")
return # TODO should this raise, or only log error as of now?
self.msg_buffer.append(obj)
self.__msg_buffer.append(obj)
if obj.get("type", "") == "EndFrame":
try:
# If the EndFrame message is received, run the callbacks
@@ -415,21 +411,21 @@ class TimepixFlyBackend:
content = traceback.format_exc()
logger.error(f"Error in msg callbacks with error msg: {content}")
msgs_in_buffer = "".join(
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.msg_buffer]
[f"{msg['type']} with keys {msg.keys()} \n" for msg in self.__msg_buffer]
)
logger.debug(f"TimePixFlyBackend: Messages in buffer: {msgs_in_buffer}")
finally: # Make sure to always reset the message buffer after processing
logger.debug(
"TimePixFlyBackend: Resetting message buffer after processing EndFrame message."
)
self.reset_message_buffer()
self.__msg_buffer.clear()
def run_msg_callbacks(self):
"""Run callbacks if EndFrame message is received."""
# TODO
start_frame = self.msg_buffer[0]
end_frame = self.msg_buffer[-1]
data_frames = self.msg_buffer[1:-1]
start_frame = self.__msg_buffer[0]
end_frame = self.__msg_buffer[-1]
data_frames = self.__msg_buffer[1:-1]
for cb, kwd in self.callbacks.values():
cb(start_frame, data_frames, end_frame, **kwd)
@@ -494,7 +490,6 @@ if __name__ == "__main__": # pragma: no cover
status_2 = timepix.on_complete()
status_2.wait(timeout=10)
print("TimepixFlyBackend scan completed.")
timepix.reset_message_buffer()
print(
f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
)