diff --git a/superxas_bec/devices/timepix/timepix.py b/superxas_bec/devices/timepix/timepix.py index d3b8b62..a9f82d4 100644 --- a/superxas_bec/devices/timepix/timepix.py +++ b/superxas_bec/devices/timepix/timepix.py @@ -7,12 +7,13 @@ is implemented via EPICS IOC. import enum import threading import time +from typing import Any, Literal import numpy as np from bec_lib.logger import bec_logger from ophyd import ADBase from ophyd import Component as Cpt -from ophyd import DeviceStatus, StatusBase +from ophyd import DeviceStatus, Kind, StatusBase from ophyd_devices import AndStatus, AsyncSignal, CompareStatus, TransitionStatus from ophyd_devices.devices.areadetector.cam import ASItpxCam from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase @@ -109,19 +110,18 @@ class Timepix(PSIDeviceBase, TimePixControl): The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch. """ - tds_energies = Cpt( + _DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector + + xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000) + xes_spectra = Cpt(AsyncSignal, name="xes_spectra", ndim=1, max_size=1000) + + xes_info = Cpt( AsyncSignal, - name="tds_signal", - ndim=2, - max_size=1000, - async_update={"type": "add", "max_shape": [None, None]}, - ) - tds_spectra = Cpt( - AsyncSignal, - name="tds_spectra", - ndim=1, - max_size=1000, + name="xes_info", + ndim=0, async_update={"type": "add", "max_shape": [None]}, + signals=[("tds_period", Kind.normal), ("tds_total_events", Kind.normal)], + max_size=1000, ) def __init__( @@ -172,51 +172,68 @@ class Timepix(PSIDeviceBase, TimePixControl): name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) - def _process_msg_buffer(self, status: DeviceStatus): + def msg_buffer_callback( + self, + start_frame: dict[ + Literal[ + "type", "Mode", "TRoiStart", "TRoiStep", "TRoiN" "NumEnergyPoints", "save_interval" + ], + Any, + ], + data_frames: list[ + dict[ + Literal["type", "period", "totalEvents", "TDSpectra", "beforeROI", "afterROI"], Any + ] + ], + end_frame: dict[Literal["type", "error"], Any], + # xes_data_signal: AsyncSignal, + # xes_spectra_signal: AsyncSignal, + # tds_period_signal: AsyncSignal, + # tds_total_events_signal: AsyncSignal, + ): """ - Callback to set the tds_signal when the trigger status is done. - If the status was not successful, the tds_signal is not set. + Callback method to be attached to the backend to process the message buffer. The callback expects + start_frame, data_frames, and end_frame as arguments. Additionally, one may pass extra kwargs that + will be passed to the callback function. """ - # obj = status.__obj__ - md = {"period_list": [], "total_event_list": []} - data = np.zeros((self.n_energy_points, self.troin)) - # Discuss possibilities and expected behavior for the messages received in the msg_buffer. - with self.r_lock: - if status.done and status.success: - if len(self.backend.msg_buffer) <= 2: - logger.error(f"No data received in msg_buffer: {self.backend.msg_buffer}") - return - # Check start frame - start_frame = self.backend.msg_buffer[0] - if start_frame.get("type", "") != "StartFrame": - logger.error(f"First message in msg_buffer is not a StartFrame: {start_frame}") - return - # Check end frame - end_frame = self.backend.msg_buffer[-1] - if end_frame.get("type", "") != "EndFrame": - logger.error(f"Last message in msg_buffer is not an EndFrame: {end_frame}") - return - # Loop over XesData messages - for msg in self.backend.msg_buffer[1:-1]: - if msg.get("type", "") != "XesData": - logger.error( - f"Received unexpected message type: {msg.get('type', 'None')}, Excpected: XesData" - ) - return - md["period_list"].append(msg["period"]) - md["total_event_list"].append(msg["totalEvents"]) - for i in range(self.n_energy_points): - data[i, :] += msg["TDSpectra"][i :: self.n_energy_points] - # Clear the buffer - self.backend.reset_message_buffer() - # Set the signals, if no data was received, the signals will be set as empty zero arrays. - self.tds_energies.put( - {self.tds_energies.name: {"value": data, "timestamp": time.time()}}, metadata=md - ) - self.tds_spectra.put( - {self.tds_spectra.name: {"value": data.sum(axis=1), "timestamp": time.time()}}, - metadata=md, + n_energy_points = start_frame["NumEnergyPoints"] + if n_energy_points != self._n_energy_points: + logger.error( + f"Number of energy points {n_energy_points} does not match expected {self._n_energy_points}." + ) + # TODO should we return, continue or raise? + troin = start_frame["TRoiN"] + if troin != self._troin: + logger.error(f"Number of pixels {troin} does not match expected {self._troin}.") + # TODO should we return, continue or raise? + + # Create data return arrays + xes_data = np.zeros((n_energy_points, troin), dtype=np.uint32) + tds_period = 0 + tds_total_events = 0 + if len(data_frames) == 0: + logger.info( + f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}" + ) + else: + for msg in data_frames: + tds_period += msg["period"] + tds_total_events += msg["totalEvents"] + for ii in range(n_energy_points): + xes_data[ii, :] += msg["TDSpectra"][ii::n_energy_points] + # Put XES data + msg_data = {self.xes_data.name: {"value": xes_data, "timestamp": time.time()}} + self.xes_data.put( + msg_data, metadata={"type": "add", "max_shape": [None, n_energy_points, troin]} ) + # Put XES spectra + msg_spectra = { + self.xes_spectra.name: {"value": xes_data.sum(axis=1), "timestamp": time.time()} + } + self.xes_spectra.put(msg_spectra, metadata={"type": "add", "max_shape": [None, troin]}) + # Put TDS info + msg_info = {"tds_period": tds_period, "tds_total_events": tds_total_events} + self.xes_info.put(msg_info) @property def n_energy_points(self) -> int: @@ -287,6 +304,8 @@ class Timepix(PSIDeviceBase, TimePixControl): # Prepare backend for TimePixFly self.backend.on_connected() + # Register the callback for processing data received by the backend + self.backend.add_callback(self.msg_buffer_callback) def on_stage(self) -> DeviceStatus | StatusBase | None: """ @@ -294,30 +313,20 @@ class Timepix(PSIDeviceBase, TimePixControl): Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. """ - # currently hardcode acquire time.. -> discuss logic here - # TODO discuss if the acquire period should be equivalent to the exposure time, - # and acquire time always 2ms shorter than the acquire period. - # num_images is currently hardcoded, is this basically a burst frame at each point of a step scan? - exp_time = self.scan_info.msg.scan_parameters.get("exp_time", 1) # TODO remove hardcoded 1 + exp_time = self.scan_info.msg.scan_parameters.get("exp_time", 0) if exp_time - self._readout_time <= 0: raise ValueError( f"Exposure time {exp_time} must be greater than readout time {self._readout_time}." ) - num_images = self.scan_info.msg.scan_parameters.get( - "frames_per_trigger", 1 - ) # TODO remove hardcoded 1 + num_images = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) self.cam.acquire_time.set(exp_time - self._readout_time).wait(timeout=self._pv_timeout) self.cam.acquire_period.set(exp_time).wait(timeout=self._pv_timeout) self.cam.num_images.set(num_images).wait(timeout=self._pv_timeout) self.cam.raw_enable.set(1).wait(timeout=self._pv_timeout) - # TODO: can we inspect this from the backend instead!! # ------------------------- # Prepare TimePixFly - # TODO RoiStep and TRoinN are those config parameters across many scans, - # or parameter that are configured similar to energypoints --> pixel_map? - other_config = OtherConfigModel( TRoiStep=self.troistep, TRoiN=self.troin, @@ -335,7 +344,6 @@ class Timepix(PSIDeviceBase, TimePixControl): def on_unstage(self) -> None: """Called while unstaging the device.""" - # Camera to do? self.backend.on_unstage() def on_pre_scan(self) -> StatusBase: @@ -353,27 +361,23 @@ class Timepix(PSIDeviceBase, TimePixControl): """Trigger callback to start the acquisition.""" status.device.cam.acquire.put(1) - # TODO should this be handled in the backend instead? - # Reset the msg_buffer - with self.r_lock: - self.backend.reset_message_buffer() - # The detector is definitely ready to acquire data, so we can start the acquisition. - # and in ACQUIRESTATUS.DONE, this is ensured by on_pre_scan, and the callback status here. + # Detector will be ready to start, as either pre_scan or the status_camera from a previous + # trigger will ensure that the detector is in ACQUIRESTATUS.DONE state. status_backend = DeviceStatus(self) + # Add callback that starts the acquisition on the detector status_backend.add_callback(trigger_callback) - # Trigger the backend to start, then the callback will trigger the acquisition on the detector. - # This adds a status callback from the backend, which will resolve the status once the backend is finished. + # Prepare the backend, attach the status to the state of the backend status_backend = self.backend.on_trigger(status=status_backend) + # Prepare the camera status that resolves when the camera is finished acquiring status_camera = TransitionStatus( self.cam.acquire_busy, [ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE] ) - # Combine the backend and camera status status = AndStatus(status_backend, status_camera) - status.__obj__ = self # Set the device object for the status - status.add_callback(self._process_msg_buffer) self.cancel_on_stop(status) + # NOTE, the callback to sent the data will always be called from the backend + # as it is attached via self.backend.add_callback() in on_connected. return status def on_complete(self) -> DeviceStatus | StatusBase | None: @@ -460,9 +464,8 @@ if __name__ == "__main__": # pragma: no cover print("Waiting for timepix to complete.") status.wait(timeout=10) print("Timepix scan completed.") - for ii, msg in enumerate(msgs): - print(f"Received {len(msg)} messages for trigger {ii}.") - print(f"") + for iii, msgi in enumerate(msgs): + print(f"Received {len(msgi)} messages for trigger {iii}.") timepix.unstage() print("Timepix unstaged.") @@ -472,15 +475,3 @@ if __name__ == "__main__": # pragma: no cover finally: timepix.destroy() print("Timepix destroyed.") - - # Create a Timepix object - # timepix = Timepix(name="TimePixDetector", prefix="") - # timepix.on_connected() - # timepix.stage() - # timepix.pre_scan() - # mock_server.start_acquisition() - # time.sleep(5) - # timepix.complete() - # print(timepix._data_buffer) - # print(timepix._global_buffer[-100:]) - # timepix.unstage() diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index 67aab54..1d3fdf3 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -3,14 +3,12 @@ from __future__ import annotations import atexit -import errno import json import signal import socket import threading -import time import uuid -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Callable, Tuple from bec_lib.logger import bec_logger from ophyd import StatusBase @@ -51,7 +49,7 @@ class TimepixFlyBackend: self.hostname = hostname self.socket_port = socket_port # Try using 0 to let the OS choose an available port self.msg_buffer = [] - self.callbacks: dict[str, Callable[[dict, list[dict], dict], None]] = {} + self.callbacks: dict[str, Tuple[Callable[[dict, list[dict], dict, dict], None], dict]] = {} self._status_objects: list[StatusBase] = [] self._decoder = json.JSONDecoder() self._socket_server: socket.socket | None = None @@ -101,7 +99,6 @@ class TimepixFlyBackend: def on_pre_scan(self) -> None: """Called before the scan starts.""" - pass def on_trigger( self, status: StatusBase | DeviceStatus | None = None @@ -174,21 +171,24 @@ class TimepixFlyBackend: logger.info(f"Cancelled status object: {status}") self._status_objects.clear() - def add_callback(self, callback: callable) -> str: + def add_callback(self, callback: callable, kwd: dict | None = None) -> str: """ Add a callback to be called when an EndFrame message is received. Any raised exception in the callback will be logged, but not raised. Args: callback (callable): The callback function to be called. The callback signature should be: - def callback(start_frame: dict, data_frames: list[dict], end_frame: dict) -> None: + def callback(start_frame: dict, data_frames: list[dict], end_frame: dict, kwd) -> None: where start_frame is the first message, data_frames is a list of all data frames, - and end_frame is the last message containing the EndFrame type. + and end_frame is the last message containing the EndFrame type, and kwd is a dictionary + with additional keyword arguments passed while registering the callback. Returns: str: A unique identifier for the callback. """ + if kwd is None: + kwd = {} cb_id = uuid.uuid4() - self.callbacks[cb_id] = callback + self.callbacks[cb_id] = (callback, kwd) logger.info(f"Callback {callback.__name__} added with UUID {cb_id}.") return str(cb_id) @@ -291,6 +291,8 @@ class TimepixFlyBackend: if obj.get("type", "") == "EndFrame": # If the EndFrame message is received, run the callbacks self.run_msg_callbacks() + # Clear the msg_buffer after processing + self.reset_message_buffer() except json.JSONDecodeError: logger.warning(f"Failed to decode JSON from buffer: {buffer}") @@ -300,15 +302,16 @@ class TimepixFlyBackend: end_frame = self.msg_buffer[-1] data_frames = self.msg_buffer[1:-1] # TODO - for cb in self.callbacks: + for cb, kwd in self.callbacks.values(): try: - cb(start_frame, data_frames, end_frame) + cb(start_frame, data_frames, end_frame, **kwd) except Exception as e: logger.error(f"Error in callback {cb}: {e}") - # Should this be allowed to raise? if __name__ == "__main__": # pragma: no cover + import time + from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import ( TimePixFlyMockServer, ) @@ -338,15 +341,15 @@ if __name__ == "__main__": # pragma: no cover timepix.on_stage(other_config=config, pixel_map=pixel_map) print("TimepixFlyBackend staged with configuration and pixel map.") for ii in range(10): - print(f"Starting scan {ii + 1}...; sleeping for 1 before start") - time.sleep(1) - status = timepix.on_trigger() - print("TimepixFlyBackend pre-scan started.") - status.wait(timeout=10) + print(f"Starting scan {ii + 1}...;") + # time.sleep(1) + status_1 = timepix.on_trigger() + # print("TimepixFlyBackend pre-scan started.") + status_1.wait(timeout=10) mock_server.start_acquisition() - print("Acquisition started on mock server.") - status = timepix.on_complete() - status.wait(timeout=10) + # print("Acquisition started on mock server.") + status_2 = timepix.on_complete() + status_2.wait(timeout=10) print("TimepixFlyBackend scan completed.") for ii, msg in enumerate(timepix.msg_buffer): print(f"Message {ii}: {msg.keys()}") diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py index 4ca5575..f33e814 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py @@ -36,6 +36,10 @@ logger = bec_logger.logger SERVER_ADDRESS = "localhost:8452" # Default server address for TimePix REST API +class TimePixFlyConnectionError(Exception): + """Exception raised when the TimePix detector cannot be connected to the server.""" + + # pylint: disable=arguments-differ class TimePixStatusError(Exception): """Exception raised when the TimePix detector status was in an unexpected state.""" @@ -97,12 +101,16 @@ class TimepixFlyClient: self.connect() self.wait_for_connection(timeout=5) - except Exception as e: + except Exception: + content = traceback.format_exc() logger.error( - f"Error while checking the state of the TimePix server: {e}. " + f"Error while checking the state of the TimePix server: {content}. " f"Please check the server address and ensure the server is running." ) - raise e + # pylint: disable=raise-missing-from + raise TimePixFlyConnectionError( + f"Failed to properly connect to the TimePixFly service on {self.rest_url}. Please check logs for detailed error." + ) def stop_running_collection(self): """ @@ -210,11 +218,11 @@ class TimepixFlyClient: Handle a message received from the StdDAQ. """ try: - logger.info(f"Received status update from TimePixFly: '{msg}'") self._status = TimePixFlyStatus(msg) + logger.info(f"Received TimepixFly status: {self._status.value}") except Exception: content = traceback.format_exc() - logger.warning(f"Failed to decode websocket message: {content}") + logger.error(f"Failed to decode websocket message: {content}") return self._run_status_callbacks() @@ -233,11 +241,11 @@ class TimepixFlyClient: continue if status in success: dev_status.set_finished() - logger.info(f"Timepix status in succes is {status.value}") + logger.info(f"Status callback finished in succes: {status.value}") self._status_callbacks.pop(cb_id) elif status in error: last_error = self.last_error() - logger.warning( + logger.error( f"Timepix status in error is {status.value}, with last error: {last_error.message}" ) dev_status.set_exception( diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_mock_server.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_mock_server.py index a1dd1dc..7bbc6d1 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_mock_server.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_mock_server.py @@ -43,5 +43,7 @@ class TimePixFlyMockServer: try: requests.get(f"http://{self.host}:{self.port}/measurement/start", timeout=0.2) except requests.exceptions.RequestException: - pass # Ignore all exceptions as there is currently no return value for the request - self.add_log("Acquisition started on Timepix Fly mock server.") + self.add_log("Failed to start acquisition on Timepix Fly mock server.") + # Ignore all exceptions as there is currently no return value for the request + else: + self.add_log("Acquisition started on Timepix Fly mock server.")