diff --git a/superxas_bec/devices/timepix/timepix.py b/superxas_bec/devices/timepix/timepix.py index 2f59780..dcda59d 100644 --- a/superxas_bec/devices/timepix/timepix.py +++ b/superxas_bec/devices/timepix/timepix.py @@ -318,7 +318,7 @@ class Timepix(PSIDeviceBase, TimePixControl): height = self.image.array_size.height.get() # Geometry correction for the image data = np.reshape(value, (height, width)) - logger.info(f"Setting preview data for {self.name} with shape {data.shape}") + logger.debug(f"Setting preview data for {self.name} with shape {data.shape}") self.preview.put(data) except Exception: # pylint: disable=broad-except content = traceback.format_exc() @@ -475,17 +475,6 @@ class Timepix(PSIDeviceBase, TimePixControl): """Set whether XES data acquisition is enabled.""" self._enable_xes = value self._enable_xes_settings(value) - # #TODO Update device manager config if available - # if self.device_manager is not None: - # dev_obj = self.device_manager.devices.get(self.name, None) - # if dev_obj is not None: - # cfg = dev_obj.get_device_config() - # if "enable_xes" in cfg and cfg["enable_xes"] != value: - # cfg["enable_xes"] = value - # dev_obj.set_device_config({"enable_xes": value}) - # logger.info( - # f"Updated 'enable_xes' to {value} in device manager for {self.name}" - # ) @property def pixel_map(self) -> PixelMap: @@ -632,10 +621,6 @@ class Timepix(PSIDeviceBase, TimePixControl): def on_stage(self) -> StatusBase | None: """Called while staging the device.""" - ### Clean last error on backend - last_error = self.backend.timepix_fly_client.last_error() - if last_error: - logger.info(f"TimeipxFly backend reports about last error: {last_error}") ### self.accumulated_data_e1 = None self.accumulated_data_e2 = None @@ -697,10 +682,10 @@ class Timepix(PSIDeviceBase, TimePixControl): def on_unstage(self) -> None: """Called while unstaging the device.""" - # TODO what should happen for unstage? Make sure that acquisition is not running? - # self.backend.on_unstage() - # self.cam.acquire.put(0) - # status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE) + if self.cam.acquire_busy.get() == ACQUIRESTATUS.ACQUIRING: + self.cam.acquire.put(0) + status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE) + status_camera.wait(timeout=self._pv_timeout) def on_pre_scan(self) -> StatusBase: """Called right before the scan starts on all devices automatically.""" @@ -725,7 +710,6 @@ class Timepix(PSIDeviceBase, TimePixControl): status_camera = TransitionStatus( self.cam.acquire_busy, [ACQUIRESTATUS.DONE, ACQUIRESTATUS.ACQUIRING, ACQUIRESTATUS.DONE] ) - img_counter = self.hdf.num_captured.get() status_backend = None # First we make sure that the backend reach 'config' state. This needs to happend before each trigger. if self.enable_xes is True: @@ -781,7 +765,7 @@ class Timepix(PSIDeviceBase, TimePixControl): 0, operation="!=", exception=RuntimeError( - "HDF5 write failed, please check with support if the IOC has sufficient permissions to write to disk." + f"HDF5 write failed on device {self.name} with file path {self._full_path}, please check with support if the IOC has sufficient permissions to write to disk." ), ) status_written_images = CompareStatus(self.hdf.num_captured, self._n_images) @@ -804,9 +788,7 @@ class Timepix(PSIDeviceBase, TimePixControl): def _complete_callback(self, status: CompareStatus) -> None: """Callback for when the device completes a scan.""" - if ( - self.hdf.enable.get() != "Enable" - ): # TODO: Not sure if we should support disabled file writing. + if self.hdf.enable.get() != "Enable": return if status.success: self.file_event.put( diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index b2ca509..585f2b3 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -107,7 +107,7 @@ class TimepixFlyBackend: # status = self.start_data_server() # status.wait(timeout=timeout / 2) - except Exception as exc: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except content = traceback.format_exc() logger.error(f"Error starting data server: {content}") # pylint: disable=raise-missing-from @@ -127,6 +127,10 @@ class TimepixFlyBackend: pixel_map (PixelMap): The pixel map for the Timepix Fly detector. """ time_started = time.time() + ### Clean last error on backend + last_error = self.timepix_fly_client.last_error() + if last_error: + logger.info(f"TimeipxFly backend reports about last error: {last_error}") status = StatusBase() self.cancel_on_stop(status) self.timepix_fly_client.add_status_callback( @@ -168,7 +172,6 @@ class TimepixFlyBackend: raise TimeoutError( f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}." ) - logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.") def on_trigger( self, status: StatusBase | DeviceStatus | None = None @@ -357,135 +360,6 @@ class TimepixFlyBackend: """ self._process_timepix_fly_msg(message.value.data) - def start_data_server(self) -> StatusBase: - """ - Start the data server to receive data from the Timepix Fly backend over a socket connection. - It will try to decypher the hostname through socket.getaddrinfo, and if multiple addresses - are found, it will use the first one. Please note that depending on the network configuration, - the hostname might not have the correct domain name attached, so it is recommended to specify - the hostname explicitly with domain name, e.g. 'x10da-bec-001.psi.ch'. - - The method creates a socket server that listens for incoming connections on the specified - hostname and port. It starts a thread that continuously receives data from the socket, - decodes the received JSON data, and processes it. The data is expected to be in JSON format, - with each message ending with a trailing byte "}\n". - - Returns: - StatusBase: A status object that indicates if the data server thread is ready to accept - connections. High level implementation should ensure that the data server is - started (status.wait(timeout=4)) before any data is sent from the backend. - """ - info = socket.getaddrinfo( - self.hostname, port=self.socket_port, family=socket.AF_INET, type=socket.SOCK_STREAM - ) - if len(info) == 0: - raise RuntimeError(f"Could not resolve hostname {self.hostname} for socket server.") - if len(info) > 1: - logger.info( - f"Multiple addresses found for {self.hostname}. Using the first one: {info[0]}" - ) - family, socktype, proto, _, sockaddr = info[0] - - self._socket_server = socket.create_server(sockaddr, family=family, backlog=1) - self._socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # Set the hostname and socket_port to the ones that was picked by the socket.getaddrinfo - self.hostname, self.socket_port = self._socket_server.getsockname() - logger.info( - f"Socket server started on {self.hostname}:{self.socket_port}. Waiting for connections." - ) - - # Create status object to return for the high level implementations - status = StatusBase() - - if self._data_thread is None or not self._data_thread.is_alive(): - self._data_thread_shutdown_event.clear() - self._data_thread = threading.Thread( - target=self._receive_data_on_socket, kwargs={"status": status} - ) - self._data_thread.start() - else: - raise TimepixFlyBackendException( - "Data server thread is already running on timepix_fly_backend." - ) - - return status - - def _receive_data_on_socket(self, status: StatusBase): - """ - Background loop running in a thread, that receives data from the - timepix fly backend over socket_server. The backend reconnects for every acquisition (trigger), - to this socket. Therefore, it is important to handle all connections and disconnections properly. - - The buffer variable stores a string stream of received data. Whenever a trailing byte "}\n" is found, - in the buffer, the buffer is split into chunks of received data and each chunk is decoded - as a JSON object. The decoded objects are then processed, and if an EndFrame message is received, - the registered callbacks are executed with the StartFrame, all DataFrames, and the EndFrame message. - """ - buffer = "" - self._socket_server.settimeout( - 0.1 - ) # Set short socket timeout to avoid blocking the thread loop - status.set_finished() # Indicate that the socket server is ready to accept connections - while not self._data_thread_shutdown_event.is_set(): # Shutdown event - try: - # blocks until connected or timeout reached - conn, addr = self._socket_server.accept() - except socket.timeout: - continue # Timeout is okay, continue - except Exception: # pylint: disable=broad-except - # Log error, check if shutdown event is set. - # Shutdown event should be set before socket_server.close() is called. - content = traceback.format_exc() - logger.error(f"Error accepting connection: {content}") - continue - logger.debug(f"Connection accepted from {addr} for timepix_fly backend.") - # Clear the message buffer before entering the loop. - if self.__msg_buffer: - logger.warning(f"Found messages in msg_buffer: {self.__msg_buffer}") - self.__msg_buffer.clear() - conn.settimeout(0.1) # Set timeout for connection to avoid blocking in recv - with conn: - while not self._data_thread_shutdown_event.is_set(): - try: - # What if we split the chunk - chunk = conn.recv(4096) # Adjust buffer size as needed - except socket.timeout: - # Timeout is okay, continue in loop - continue - except Exception as e: # pylint: disable=broad-except - logger.error(f"Connection error: {e}. Closing connection.") - # conn = None #TODO should we reset conn? - break - if not chunk: - # Receiving an empty chunk means the connection was closed - # conn = None #TODO should we reset conn? - break - buffer += chunk.decode("utf-8") - - # Check if trailing byte "}\n" present in buffer - buffer_chunks = buffer.split("}\n") - for entry in buffer_chunks[:-1]: - # Process all complete JSON objects in the buffer - self._decode_received_data(entry + "}") - # Keep the last incomplete chunk. - # If the buffer ended with "}\n", this will be an empty string. - buffer = buffer_chunks[-1] - - def _decode_received_data(self, buffer: str) -> None: - """ - Decode the received data from the socket. - - Args: - buffer (str): The JSON string received from the socket. - """ - try: - obj, _ = self._decoder.raw_decode(buffer) - except json.JSONDecodeError: - logger.error(f"TimePixFlyBackend: Failed to decode JSON from buffer: {buffer}") - return # TODO should this raise, or only log error as of now? - self._process_timepix_fly_msg(obj) - def _process_timepix_fly_msg(self, obj: dict) -> None: if obj.get("type", "") == "StartFrame": # Clear the message buffer when a new StartFrame is received, to avoid mixing messages from different acquisitions. @@ -499,7 +373,7 @@ class TimepixFlyBackend: if obj.get("type", "") == "EndFrame": try: # If the EndFrame message is received, run the callbacks - logger.debug(f"Running callbacks") + logger.debug("Running callbacks") self.run_msg_callbacks() except Exception: # pylint: disable=broad-except content = traceback.format_exc() @@ -529,7 +403,6 @@ class TimepixFlyBackend: if __name__ == "__main__": # pragma: no cover - import time from superxas_bec.devices.timepix.timepix_fly_client.test_utils.timepix_fly_mock_server import ( TimePixFlyMockServer,