This commit is contained in:
2025-08-15 09:20:28 +02:00
parent ae541fb185
commit ce9c6edc0d
4 changed files with 126 additions and 122 deletions

View File

@@ -7,12 +7,13 @@ is implemented via EPICS IOC.
import enum
import threading
import time
from typing import Any, Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import DeviceStatus, StatusBase
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import AndStatus, AsyncSignal, CompareStatus, TransitionStatus
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -109,19 +110,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch.
"""
tds_energies = Cpt(
_DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector
xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000)
xes_spectra = Cpt(AsyncSignal, name="xes_spectra", ndim=1, max_size=1000)
xes_info = Cpt(
AsyncSignal,
name="tds_signal",
ndim=2,
max_size=1000,
async_update={"type": "add", "max_shape": [None, None]},
)
tds_spectra = Cpt(
AsyncSignal,
name="tds_spectra",
ndim=1,
max_size=1000,
name="xes_info",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
signals=[("tds_period", Kind.normal), ("tds_total_events", Kind.normal)],
max_size=1000,
)
def __init__(
@@ -172,51 +172,68 @@ class Timepix(PSIDeviceBase, TimePixControl):
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
def _process_msg_buffer(self, status: DeviceStatus):
def msg_buffer_callback(
self,
start_frame: dict[
Literal[
"type", "Mode", "TRoiStart", "TRoiStep", "TRoiN" "NumEnergyPoints", "save_interval"
],
Any,
],
data_frames: list[
dict[
Literal["type", "period", "totalEvents", "TDSpectra", "beforeROI", "afterROI"], Any
]
],
end_frame: dict[Literal["type", "error"], Any],
# xes_data_signal: AsyncSignal,
# xes_spectra_signal: AsyncSignal,
# tds_period_signal: AsyncSignal,
# tds_total_events_signal: AsyncSignal,
):
"""
Callback to set the tds_signal when the trigger status is done.
If the status was not successful, the tds_signal is not set.
Callback method to be attached to the backend to process the message buffer. The callback expects
start_frame, data_frames, and end_frame as arguments. Additionally, one may pass extra kwargs that
will be passed to the callback function.
"""
# obj = status.__obj__
md = {"period_list": [], "total_event_list": []}
data = np.zeros((self.n_energy_points, self.troin))
# Discuss possibilities and expected behavior for the messages received in the msg_buffer.
with self.r_lock:
if status.done and status.success:
if len(self.backend.msg_buffer) <= 2:
logger.error(f"No data received in msg_buffer: {self.backend.msg_buffer}")
return
# Check start frame
start_frame = self.backend.msg_buffer[0]
if start_frame.get("type", "") != "StartFrame":
logger.error(f"First message in msg_buffer is not a StartFrame: {start_frame}")
return
# Check end frame
end_frame = self.backend.msg_buffer[-1]
if end_frame.get("type", "") != "EndFrame":
logger.error(f"Last message in msg_buffer is not an EndFrame: {end_frame}")
return
# Loop over XesData messages
for msg in self.backend.msg_buffer[1:-1]:
if msg.get("type", "") != "XesData":
logger.error(
f"Received unexpected message type: {msg.get('type', 'None')}, Excpected: XesData"
)
return
md["period_list"].append(msg["period"])
md["total_event_list"].append(msg["totalEvents"])
for i in range(self.n_energy_points):
data[i, :] += msg["TDSpectra"][i :: self.n_energy_points]
# Clear the buffer
self.backend.reset_message_buffer()
# Set the signals, if no data was received, the signals will be set as empty zero arrays.
self.tds_energies.put(
{self.tds_energies.name: {"value": data, "timestamp": time.time()}}, metadata=md
)
self.tds_spectra.put(
{self.tds_spectra.name: {"value": data.sum(axis=1), "timestamp": time.time()}},
metadata=md,
n_energy_points = start_frame["NumEnergyPoints"]
if n_energy_points != self._n_energy_points:
logger.error(
f"Number of energy points {n_energy_points} does not match expected {self._n_energy_points}."
)
# TODO should we return, continue or raise?
troin = start_frame["TRoiN"]
if troin != self._troin:
logger.error(f"Number of pixels {troin} does not match expected {self._troin}.")
# TODO should we return, continue or raise?
# Create data return arrays
xes_data = np.zeros((n_energy_points, troin), dtype=np.uint32)
tds_period = 0
tds_total_events = 0
if len(data_frames) == 0:
logger.info(
f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}"
)
else:
for msg in data_frames:
tds_period += msg["period"]
tds_total_events += msg["totalEvents"]
for ii in range(n_energy_points):
xes_data[ii, :] += msg["TDSpectra"][ii::n_energy_points]
# Put XES data
msg_data = {self.xes_data.name: {"value": xes_data, "timestamp": time.time()}}
self.xes_data.put(
msg_data, metadata={"type": "add", "max_shape": [None, n_energy_points, troin]}
)
# Put XES spectra
msg_spectra = {
self.xes_spectra.name: {"value": xes_data.sum(axis=1), "timestamp": time.time()}
}
self.xes_spectra.put(msg_spectra, metadata={"type": "add", "max_shape": [None, troin]})
# Put TDS info
msg_info = {"tds_period": tds_period, "tds_total_events": tds_total_events}
self.xes_info.put(msg_info)
@property
def n_energy_points(self) -> int:
@@ -287,6 +304,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Prepare backend for TimePixFly
self.backend.on_connected()
# Register the callback for processing data received by the backend
self.backend.add_callback(self.msg_buffer_callback)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -294,30 +313,20 @@ class Timepix(PSIDeviceBase, TimePixControl):
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
# currently hardcode acquire time.. -> discuss logic here
# TODO discuss if the acquire period should be equivalent to the exposure time,
# and acquire time always 2ms shorter than the acquire period.
# num_images is currently hardcoded, is this basically a burst frame at each point of a step scan?
exp_time = self.scan_info.msg.scan_parameters.get("exp_time", 1) # TODO remove hardcoded 1
exp_time = self.scan_info.msg.scan_parameters.get("exp_time", 0)
if exp_time - self._readout_time <= 0:
raise ValueError(
f"Exposure time {exp_time} must be greater than readout time {self._readout_time}."
)
num_images = self.scan_info.msg.scan_parameters.get(
"frames_per_trigger", 1
) # TODO remove hardcoded 1
num_images = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
self.cam.acquire_time.set(exp_time - self._readout_time).wait(timeout=self._pv_timeout)
self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout)
self.cam.num_images.set(num_images).wait(timeout=self._pv_timeout)
self.cam.raw_enable.set(1).wait(timeout=self._pv_timeout)
# TODO: can we inspect this from the backend instead!!
# -------------------------
# Prepare TimePixFly
# TODO RoiStep and TRoinN are those config parameters across many scans,
# or parameter that are configured similar to energypoints --> pixel_map?
other_config = OtherConfigModel(
TRoiStep=self.troistep,
TRoiN=self.troin,
@@ -335,7 +344,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_unstage(self) -> None:
"""Called while unstaging the device."""
# Camera to do?
self.backend.on_unstage()
def on_pre_scan(self) -> StatusBase:
@@ -353,27 +361,23 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""Trigger callback to start the acquisition."""
status.device.cam.acquire.put(1)
# TODO should this be handled in the backend instead?
# Reset the msg_buffer
with self.r_lock:
self.backend.reset_message_buffer()
# The detector is definitely ready to acquire data, so we can start the acquisition.
# and in ACQUIRESTATUS.DONE, this is ensured by on_pre_scan, and the callback status here.
# Detector will be ready to start, as either pre_scan or the status_camera from a previous
# trigger will ensure that the detector is in ACQUIRESTATUS.DONE state.
status_backend = DeviceStatus(self)
# Add callback that starts the acquisition on the detector
status_backend.add_callback(trigger_callback)
# Trigger the backend to start, then the callback will trigger the acquisition on the detector.
# This adds a status callback from the backend, which will resolve the status once the backend is finished.
# Prepare the backend, attach the status to the state of the backend
status_backend = self.backend.on_trigger(status=status_backend)
# Prepare the camera status that resolves when the camera is finished acquiring
status_camera = TransitionStatus(
self.cam.acquire_busy, [ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE]
)
# Combine the backend and camera status
status = AndStatus(status_backend, status_camera)
status.__obj__ = self # Set the device object for the status
status.add_callback(self._process_msg_buffer)
self.cancel_on_stop(status)
# NOTE, the callback to sent the data will always be called from the backend
# as it is attached via self.backend.add_callback() in on_connected.
return status
def on_complete(self) -> DeviceStatus | StatusBase | None:
@@ -460,9 +464,8 @@ if __name__ == "__main__": # pragma: no cover
print("Waiting for timepix to complete.")
status.wait(timeout=10)
print("Timepix scan completed.")
for ii, msg in enumerate(msgs):
print(f"Received {len(msg)} messages for trigger {ii}.")
print(f"")
for iii, msgi in enumerate(msgs):
print(f"Received {len(msgi)} messages for trigger {iii}.")
timepix.unstage()
print("Timepix unstaged.")
@@ -472,15 +475,3 @@ if __name__ == "__main__": # pragma: no cover
finally:
timepix.destroy()
print("Timepix destroyed.")
# Create a Timepix object
# timepix = Timepix(name="TimePixDetector", prefix="")
# timepix.on_connected()
# timepix.stage()
# timepix.pre_scan()
# mock_server.start_acquisition()
# time.sleep(5)
# timepix.complete()
# print(timepix._data_buffer)
# print(timepix._global_buffer[-100:])
# timepix.unstage()

View File

@@ -3,14 +3,12 @@
from __future__ import annotations
import atexit
import errno
import json
import signal
import socket
import threading
import time
import uuid
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Callable, Tuple
from bec_lib.logger import bec_logger
from ophyd import StatusBase
@@ -51,7 +49,7 @@ class TimepixFlyBackend:
self.hostname = hostname
self.socket_port = socket_port # Try using 0 to let the OS choose an available port
self.msg_buffer = []
self.callbacks: dict[str, Callable[[dict, list[dict], dict], None]] = {}
self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {}
self._status_objects: list[StatusBase] = []
self._decoder = json.JSONDecoder()
self._socket_server: socket.socket | None = None
@@ -101,7 +99,6 @@ class TimepixFlyBackend:
def on_pre_scan(self) -> None:
"""Called before the scan starts."""
pass
def on_trigger(
self, status: StatusBase | DeviceStatus | None = None
@@ -174,21 +171,24 @@ class TimepixFlyBackend:
logger.info(f"Cancelled status object: {status}")
self._status_objects.clear()
def add_callback(self, callback: callable) -> str:
def add_callback(self, callback: callable, kwd: dict | None = None) -> str:
"""
Add a callback to be called when an EndFrame message is received. Any raised exception
in the callback will be logged, but not raised.
Args:
callback (callable): The callback function to be called. The callback signature should be:
def callback(start_frame: dict, data_frames: list[dict], end_frame: dict) -> None:
def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, kwd) -> None:
where start_frame is the first message, data_frames is a list of all data frames,
and end_frame is the last message containing the EndFrame type.
and end_frame is the last message containing the EndFrame type, and kwd is a dictionary
with additional keyword arguments passed while registering the callback.
Returns:
str: A unique identifier for the callback.
"""
if kwd is None:
kwd = {}
cb_id = uuid.uuid4()
self.callbacks[cb_id] = callback
self.callbacks[cb_id] = (callback, kwd)
logger.info(f"Callback {callback.__name__} added with UUID {cb_id}.")
return str(cb_id)
@@ -291,6 +291,8 @@ class TimepixFlyBackend:
if obj.get("type", "") == "EndFrame":
# If the EndFrame message is received, run the callbacks
self.run_msg_callbacks()
# Clear the msg_buffer after processing
self.reset_message_buffer()
except json.JSONDecodeError:
logger.warning(f"Failed to decode JSON from buffer: {buffer}")
@@ -300,15 +302,16 @@ class TimepixFlyBackend:
end_frame = self.msg_buffer[-1]
data_frames = self.msg_buffer[1:-1]
# TODO
for cb in self.callbacks:
for cb, kwd in self.callbacks.values():
try:
cb(start_frame, data_frames, end_frame)
cb(start_frame, data_frames, end_frame, **kwd)
except Exception as e:
logger.error(f"Error in callback {cb}: {e}")
# Should this be allowed to raise?
if __name__ == "__main__": # pragma: no cover
import time
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
TimePixFlyMockServer,
)
@@ -338,15 +341,15 @@ if __name__ == "__main__": # pragma: no cover
timepix.on_stage(other_config=config, pixel_map=pixel_map)
print("TimepixFlyBackend staged with configuration and pixel map.")
for ii in range(10):
print(f"Starting scan {ii + 1}...; sleeping for 1 before start")
time.sleep(1)
status = timepix.on_trigger()
print("TimepixFlyBackend pre-scan started.")
status.wait(timeout=10)
print(f"Starting scan {ii + 1}...;")
# time.sleep(1)
status_1 = timepix.on_trigger()
# print("TimepixFlyBackend pre-scan started.")
status_1.wait(timeout=10)
mock_server.start_acquisition()
print("Acquisition started on mock server.")
status = timepix.on_complete()
status.wait(timeout=10)
# print("Acquisition started on mock server.")
status_2 = timepix.on_complete()
status_2.wait(timeout=10)
print("TimepixFlyBackend scan completed.")
for ii, msg in enumerate(timepix.msg_buffer):
print(f"Message {ii}: {msg.keys()}")

View File

@@ -36,6 +36,10 @@ logger = bec_logger.logger
SERVER_ADDRESS = "localhost:8452" # Default server address for TimePix REST API
class TimePixFlyConnectionError(Exception):
"""Exception raised when the TimePix detector cannot be connected to the server."""
# pylint: disable=arguments-differ
class TimePixStatusError(Exception):
"""Exception raised when the TimePix detector status was in an unexpected state."""
@@ -97,12 +101,16 @@ class TimepixFlyClient:
self.connect()
self.wait_for_connection(timeout=5)
except Exception as e:
except Exception:
content = traceback.format_exc()
logger.error(
f"Error while checking the state of the TimePix server: {e}. "
f"Error while checking the state of the TimePix server: {content}. "
f"Please check the server address and ensure the server is running."
)
raise e
# pylint: disable=raise-missing-from
raise TimePixFlyConnectionError(
f"Failed to properly connect to the TimePixFly service on {self.rest_url}. Please check logs for detailed error."
)
def stop_running_collection(self):
"""
@@ -210,11 +218,11 @@ class TimepixFlyClient:
Handle a message received from the StdDAQ.
"""
try:
logger.info(f"Received status update from TimePixFly: '{msg}'")
self._status = TimePixFlyStatus(msg)
logger.info(f"Received TimepixFly status: {self._status.value}")
except Exception:
content = traceback.format_exc()
logger.warning(f"Failed to decode websocket message: {content}")
logger.error(f"Failed to decode websocket message: {content}")
return
self._run_status_callbacks()
@@ -233,11 +241,11 @@ class TimepixFlyClient:
continue
if status in success:
dev_status.set_finished()
logger.info(f"Timepix status in succes is {status.value}")
logger.info(f"Status callback finished in succes: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
last_error = self.last_error()
logger.warning(
logger.error(
f"Timepix status in error is {status.value}, with last error: {last_error.message}"
)
dev_status.set_exception(

View File

@@ -43,5 +43,7 @@ class TimePixFlyMockServer:
try:
requests.get(f"http://{self.host}:{self.port}/measurement/start", timeout=0.2)
except requests.exceptions.RequestException:
pass # Ignore all exceptions as there is currently no return value for the request
self.add_log("Acquisition started on Timepix Fly mock server.")
self.add_log("Failed to start acquisition on Timepix Fly mock server.")
# Ignore all exceptions as there is currently no return value for the request
else:
self.add_log("Acquisition started on Timepix Fly mock server.")