wip
Some checks failed
CI for superxas_bec / test (push) Failing after 36s
CI for superxas_bec / test (pull_request) Failing after 36s

This commit is contained in:
2025-11-12 14:38:47 +01:00
parent b6cc3417e3
commit cfe376bfac
3 changed files with 100 additions and 18 deletions

View File

@@ -11,7 +11,7 @@ import enum
import threading
import time
import traceback
from typing import Any, Literal
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from bec_lib.logger import bec_logger
@@ -20,6 +20,7 @@ from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import AsyncSignal, CompareStatus, PreviewSignal, TransitionStatus
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin, ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
@@ -31,6 +32,10 @@ from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface impor
)
from superxas_bec.devices.timepix.utils import AndStatusWithList
if TYPE_CHECKING:
from bec_lib.messages import DevicePreviewMessage
logger = bec_logger.logger
# pylint: disable=redefined-outer-name
@@ -103,11 +108,21 @@ class EXPOSUREMODE(int, enum.Enum):
TRIGGER_WIDTH = 1
class DATASOURCE(int, enum.Enum):
"""Data source for AD Epics backend for Timepix."""
NONE = 0
PREVIEW = 1
IMAGE = 2
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class TimePixControl(ADBase):
"""Interface for the TimePix EPICS control of the TimePix detector."""
cam = Cpt(ASItpxCam, "cam1:")
image = Cpt(ImagePlugin, "image1:")
hdf = Cpt(HDF5Plugin, "HDF1:")
# latest hdf5 plugin
# latest image plugin
@@ -121,6 +136,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""
_DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector
USER_ACCESS = ["troin", "troistep", "get_pixel_map", "set_pixel_map"]
# TODO Check names with beamline team, async signals current receive a nested data name structure
xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000)
@@ -186,6 +202,11 @@ class Timepix(PSIDeviceBase, TimePixControl):
self._n_energy_points = 3
self._troistep = 1
self._troin = 5000
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_rate = 1 # Poll rate in Hz
self._pv_timeout = 5
self._readout_time = 2.1e-3 # 2.1ms readout time to ensure readout is >2ms, required from ASI serval server..
self.r_lock = threading.RLock() # Lock to access the message buffer safely
@@ -193,6 +214,36 @@ class Timepix(PSIDeviceBase, TimePixControl):
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# logger.info(f"Running poll loop for {self.name}..")
value = self.image1.array_data.get()
if value is None:
continue
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
last_image: DevicePreviewMessage = self.preview.get()
# logger.info(f"Preview image for {self.name} has shape {data.shape}")
if last_image is not None:
if np.array_equal(data, last_image.data):
# No update if image is the same, ~2.5ms on 2400x2400 image (6M)
logger.debug(
f"Pilatus preview image for {self.name} is the same as last one, not updating."
)
continue
logger.debug(f"Setting preview datsa for {self.name}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
###
def msg_buffer_callback(
self,
@@ -272,6 +323,16 @@ class Timepix(PSIDeviceBase, TimePixControl):
}
self.xes_info.put(msg_info)
### User ACCESS methods
def get_pixel_map(self) -> dict:
"""Get the current pixel map as a dictionary."""
return self._pixel_map.model_dump()
def set_pixel_map(self, pixel_map: dict) -> None:
"""Set the pixel map from a dictionary."""
self._pixel_map = PixelMap.model_validate(pixel_map)
@property
def n_energy_points(self) -> int:
"""Energy points for the TimePix detector."""
@@ -340,6 +401,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.trigger_mode.set(TRIGGERMODE.INTERNAL).wait(timeout=self._pv_timeout)
self.cam.trigger_source.set(TRIGGERSOURCE.HDMI1_1).wait(timeout=self._pv_timeout)
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
self.image1.unique_id.set(1).wait(timeout=self._pv_timeout)
# Prepare backend for TimePixFly
self.backend.on_connected()
@@ -374,6 +436,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout)
self.cam.num_images.set(num_images).wait(timeout=self._pv_timeout)
self.cam.raw_enable.set(1).wait(timeout=self._pv_timeout)
self.cam.data_source.set(DATASOURCE.IMAGE).wait(timeout=self._pv_timeout)
# -------------------------
# Prepare TimePixFly
@@ -437,26 +500,38 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_camera = TransitionStatus(
self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
)
status_backend_on_trigger = DeviceStatus(self)
status_backend_on_trigger.add_callback(self._trigger_callback)
status_backend_collect_started = DeviceStatus(self)
self.cancel_on_stop(status_camera)
status = self.backend.on_trigger()
status.wait(timeout=5) # Wait until backend trigger is done
# Add Collect callback
# Backend ready for connection, now start camera
status = StatusBase()
self.backend.timepix_fly_client.add_status_callback(
status_backend_collect_started,
success=[TimePixFlyStatus.COLLECT],
status=status,
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
# Start on trigger on backend
status_backend_on_trigger = self.backend.on_trigger(status=status_backend_on_trigger)
status = AndStatusWithList(
status_list=[status_camera, status_backend_on_trigger, status_backend_collect_started],
device=self,
)
self.cancel_on_stop(status)
return status
return_status = status_camera & status
self.cam.acquire.put(1)
return return_status
# # Add Collect callback
# self.backend.timepix_fly_client.add_status_callback(
# status_backend_collect_started,
# success=[TimePixFlyStatus.COLLECT],
# error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
# )
# # Start on trigger on backend
# status_backend_on_trigger = self.backend.on_trigger(status=status_backend_on_trigger)
# status = AndStatusWithList(
# status_list=[status_camera, status_backend_on_trigger, status_backend_collect_started],
# device=self,
# )
# self.cancel_on_stop(status)
# return status
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
@@ -486,6 +561,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_destroy(self):
"""Cleanup method to stop the device and clean up resources."""
self.cam.acquire.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
@@ -496,7 +572,7 @@ if __name__ == "__main__": # pragma: no cover
timepix = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="P4-0017.psi.ch:8452",
backend_rest_url="P4-0017.psi.ch:8452", # "P4-0017.psi.ch:8452",
hostname="x10da-bec-001.psi.ch",
)
try:

View File

@@ -14,6 +14,7 @@ import json
import signal
import socket
import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Callable, Tuple
@@ -385,6 +386,8 @@ class TimepixFlyBackend:
while not self._data_thread_shutdown_event.is_set(): # Shutdown event
try:
# blocks until connected or timeout reached
time.sleep(0.5)
logger.info("Waiting for connection from timepix_fly backend...")
conn, addr = self._socket_server.accept()
except socket.timeout:
continue # Timeout is okay, continue
@@ -456,6 +459,7 @@ class TimepixFlyBackend:
logger.debug(
"TimePixFlyBackend: Resetting message buffer after processing EndFrame message."
)
logger.debug(f"Messages in buffer: {len(self.__msg_buffer)}")
self.__msg_buffer.clear()
def run_msg_callbacks(self):
@@ -519,7 +523,7 @@ if __name__ == "__main__": # pragma: no cover
print("TimepixFlyBackend staged with configuration and pixel map.")
for ii in range(5):
print(f"Starting scan {ii + 1}...;")
# time.sleep(1)
time.sleep(1)
status_1 = timepix.on_trigger()
# print("TimepixFlyBackend pre-scan started.")
status_1.wait(timeout=10)

View File

@@ -309,6 +309,7 @@ class TimepixFlyClient:
Returns:
Any: The parsed response if a model is provided, else the raw response.
"""
logger.debug(f"Sending GET request to TimePix server: {get_cmd}")
response = requests.get(f"http://{self.rest_url}/{get_cmd}", timeout=self._timeout)
response.raise_for_status() # Raise an error for bad responses
if get_response_model is not None:
@@ -334,6 +335,7 @@ class TimepixFlyClient:
Returns:
Any: The parsed response if a model is provided, else None.
"""
logger.debug(f"Sending PUT request to TimePix server: {put_cmd} with value: {value}")
response = requests.put(
f"http://{self.rest_url}/{put_cmd}", json=value, timeout=self._timeout
)