wip cleanup

This commit is contained in:
2025-09-11 17:06:36 +02:00
parent 9c051021ab
commit a21dd5c653
3 changed files with 279 additions and 145 deletions

View File

@@ -11,14 +11,13 @@ import enum
import threading
import time
import traceback
from typing import TYPE_CHECKING, Any, Literal
from typing import Any, Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind, StatusBase
from ophyd_devices import AndStatus as _AndStatus
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import AsyncSignal, CompareStatus, TransitionStatus
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -30,140 +29,7 @@ from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface impor
OtherConfigModel,
PixelMap,
)
class AndStatus(StatusBase):
"""Custom AndStatus for TimePix detector."""
def __init__(
self,
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
name: str | Device | None = None,
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
**kwargs,
):
self.left = left if isinstance(left, list) else [left]
if right is not None:
self.right = right if isinstance(right, list) else [right]
else:
self.right = []
self.all_statuses = self.left + self.right
if name is None:
name = "unname_status"
elif isinstance(name, Device):
name = name.name
else:
name = name
self.name = name
super().__init__(**kwargs)
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)
def __str__(self):
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, AndStatus):
if status in child:
return True
return False
# def set_exception(self, exc):
# with self._lock:
# super().set_exception(exc)
# for st in self.all_statuses:
# with st._lock:
# #TODO consider removing
# if not st.done:
# st.set_exception(
# RuntimeError(f"AndStatus exception on high-level status, caused by {exc}")
# )
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
# We have timed out. It's possible that set_finished() has already
# been called but we got here before the settle_time timer expired.
# And it's possible that in this space be between the above
# statement timing out grabbing the lock just below,
# set_exception(exc) has been called. Both of these possibilties
# are accounted for.
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
# Set the exception and mark the Status as done, unless
# set_exception(exc) was called externally before we grabbed
# the lock.
if self._exception is None:
exc = TimeoutError(
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
# No alternative but to log this. We can't supersede set_exception,
# and we have to continue and run the callbacks.
self.log.exception("%r encountered error during _settled()", self)
# Now we know whether or not we have succeed or failed, either by
# timeout above or by set_exception(exc), so we can set the Event that
# will mark this Status as done.
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
# The callbacks have access to self, from which they can distinguish
# success or failure.
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
from superxas_bec.devices.timepix.utils import AndStatusWithList
logger = bec_logger.logger
@@ -242,6 +108,8 @@ class TimePixControl(ADBase):
"""Interface for the TimePix EPICS control of the TimePix detector."""
cam = Cpt(ASItpxCam, "cam1:")
# latest hdf5 plugin
# latest image plugin
class Timepix(PSIDeviceBase, TimePixControl):
@@ -574,9 +442,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Start on trigger on backend
status_backend_on_trigger = self.backend.on_trigger(status=status_backend_on_trigger)
status = AndStatus(
[status_camera, status_backend_on_trigger, status_backend_collect_started],
name=f"{self.name}_trigger_status",
status = AndStatusWithList(
status_list=[status_camera, status_backend_on_trigger, status_backend_collect_started],
device=self,
)
self.cancel_on_stop(status)
return status
@@ -590,8 +458,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Add callback to the backend complete handling
status_backend = self.backend.on_complete(status=status_backend)
# Combine the statuses
complete_status = AndStatus(
[status_backend, status_detector], name=f"{self.name}_complete_status"
complete_status = AndStatusWithList(
status_list=[status_backend, status_detector], device=self
)
self.cancel_on_stop(complete_status)
return complete_status

View File

@@ -170,6 +170,32 @@ class TimepixFlyBackend:
self.timepix_fly_client.start()
return status
def on_trigger_finished(
self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
"""
Hook for on_trigger_finished logic. It adds a status callback based on the TimePixFlyStatus.
The backend needs to get into the CONFIG state again after a trigger is finished.
In practice, a full scan logic is happening during on trigger.
The status will be marked as finished/successful when the backend state
reaches CONFIG. If an exception state is reached, the status will be marked as failed.
Args:
status (StatusBase | DeviceStatus | None): The status object to track the operation.
If None, a new StatusBase object will be created.
Returns:
StatusBase | DeviceStatus: The status object that will be updated with the operation's result
"""
if status is None:
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
return status
def on_complete(
self, status: StatusBase | DeviceStatus | None = None
) -> StatusBase | DeviceStatus:
@@ -495,10 +521,13 @@ if __name__ == "__main__": # pragma: no cover
# print("TimepixFlyBackend pre-scan started.")
status_1.wait(timeout=10)
mock_server.start_acquisition()
# print("Acquisition started on mock server.")
status_2 = timepix.on_complete()
status_2 = timepix.on_trigger_finished()
status_2.wait(timeout=10)
# print("Acquisition started on mock server.")
print("TimepixFlyBackend scan completed.")
status = timepix.on_complete()
status.wait(timeout=10)
print(
f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
)
@@ -506,3 +535,4 @@ if __name__ == "__main__": # pragma: no cover
logger.error(f"Error during TimepixFlyBackend operation: {e}")
finally:
timepix.on_destroy()
print("TimepixFlyBackend destroyed.")

View File

@@ -1,4 +1,240 @@
# """Utility module for the Timepix detector."""
"""Utility module for the Timepix detector."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd import Device, DeviceStatus, StatusBase
class AndStatusWithList(DeviceStatus):
"""
Custom implementation of the AndStatus that combines the
option to add multiple statuses as a list, and in addition
allows for adding the Device as an object to access its
methods.
Args"""
def __init__(
self,
device: Device,
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
**kwargs,
):
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
super().__init__(device=device, **kwargs)
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
# TODO improve __repr__ and __str__
def __repr__(self):
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
def __str__(self):
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
for child in self.all_statuses:
if child == status:
return True
if isinstance(child, AndStatusWithList):
if status in child:
return True
return False
# TODO Check if this actually works....
def set_exception(self, exc):
with self._lock:
for st in self.all_statuses:
with st._lock:
if not st.done:
st.set_exception(exc)
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
if self._exception is None:
exc = TimeoutError(
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
self.log.exception("%r encountered error during _settled()", self)
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
class AndStatus(StatusBase):
"""Custom AndStatus for TimePix detector."""
def __init__(
self,
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
name: str | Device | None = None,
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
**kwargs,
):
self.left = left if isinstance(left, list) else [left]
if right is not None:
self.right = right if isinstance(right, list) else [right]
else:
self.right = []
self.all_statuses = self.left + self.right
if name is None:
name = "unname_status"
elif isinstance(name, Device):
name = name.name
else:
name = name
self.name = name
super().__init__(**kwargs)
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)
def __str__(self):
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, AndStatus):
if status in child:
return True
return False
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
# We have timed out. It's possible that set_finished() has already
# been called but we got here before the settle_time timer expired.
# And it's possible that in this space be between the above
# statement timing out grabbing the lock just below,
# set_exception(exc) has been called. Both of these possibilties
# are accounted for.
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
# Set the exception and mark the Status as done, unless
# set_exception(exc) was called externally before we grabbed
# the lock.
if self._exception is None:
exc = TimeoutError(
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
# No alternative but to log this. We can't supersede set_exception,
# and we have to continue and run the callbacks.
self.log.exception("%r encountered error during _settled()", self)
# Now we know whether or not we have succeed or failed, either by
# timeout above or by set_exception(exc), so we can set the Event that
# will mark this Status as done.
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
# The callbacks have access to self, from which they can distinguish
# success or failure.
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
# from __future__ import annotations