w
All checks were successful
CI for superxas_bec / test (push) Successful in 37s
CI for superxas_bec / test (pull_request) Successful in 36s

This commit is contained in:
2025-11-30 20:30:14 +01:00
parent 6a9e522e45
commit a6749e4c2b

View File

@@ -19,6 +19,7 @@ from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO
from ophyd_devices import (
AsyncSignal,
CompareStatus,
@@ -155,12 +156,18 @@ def load_pixel_map_from_json(file_path: str) -> PixelMap:
return pixel_map
class ImagePlugin_Timepix(ImagePlugin_V35):
"""Custom Image Plugin for TimePix detector."""
unique_id = Cpt(EpicsSignalRO, "UniqueId_RBV", auto_monitor=True)
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class TimePixControl(ADBase):
"""Interface for the TimePix EPICS control of the TimePix detector."""
cam = Cpt(ASItpxCam, "cam1:")
image = Cpt(ImagePlugin_V35, "image1:")
image = Cpt(ImagePlugin_Timepix, "image1:")
hdf = Cpt(HDF5Plugin_V35, "HDF1:")
@@ -180,7 +187,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector
# TODO adapt detector shape here
_DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector
_DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector
USER_ACCESS = [
"troin",
"troistep",
@@ -225,10 +232,19 @@ class Timepix(PSIDeviceBase, TimePixControl):
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0, # TODO check the orientation
num_rotation_90=1, # TODO check the orientation
doc="Preview signal of the TimePix detector.",
)
spectra = Cpt(
AsyncSignal,
name="spectra",
ndim=1,
acquisition_group="monitored",
async_update={"type": "add", "max_shape": [None]},
doc="Spectra signal of the TimePix detector.",
)
file_event = Cpt(
FileEventSignal, name="file_event", doc="File event signal for TimePix detector."
)
@@ -276,9 +292,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_rate = (
5 # Image poll rate for preview updates in Hz (max 5 Hz to limit throughput)
)
# Image poll rate for preview updates in Hz (max 5 Hz to limit throughput)
self._poll_rate = 10
self._enable_xes = enable_xes
self._full_path = ""
self._unique_array_id = 0
@@ -529,7 +544,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
# ------------------
# Prepare file writing through AD HDF5 plugin
# -----------------
self.hdf.enable.set(1).wait(timeout=self._pv_timeout)
# TODO reenable HDF5 writing
# self.hdf.enable.set(1).wait(timeout=self._pv_timeout)
self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(timeout=self._pv_timeout)
self.hdf.auto_save.set(1).wait(timeout=self._pv_timeout)
self.hdf.file_template.set("%s%s").wait(timeout=self._pv_timeout)
@@ -548,6 +564,28 @@ class Timepix(PSIDeviceBase, TimePixControl):
logger.info(
f"TimePix detector {self.name} connected and initialized after {time.time() - start_time:.3f} seconds."
)
# Subscribe to new image updates
self.image.unique_id.subscribe(self._on_new_image_received)
def _on_new_image_received(self, value: int, old_value: int, **kwargs):
"""Callback for image unique ID updates to trigger preview update."""
if value == old_value:
return # No new image, or counter reset
try:
# Get new image data
width = self.image.array_size.width.get()
height = self.image.array_size.height.get()
array_data = self.image.array_data.get()
if array_data is None:
logger.info(f"No image data available for preview of {self.name}")
return
# Geometry correction for the image
data = np.sum(np.reshape(array_data, (height, width)), axis=1)
self.spectra.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error while updating preview for {self.name} on image update: {content}")
def on_stage(self) -> StatusBase | None:
"""Called while staging the device."""
@@ -570,7 +608,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
# TODO reenable HDF5 writing
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
self.hdf.num_capture.set(num_images).wait(5)