from logging import getLogger from cam_server.pipeline.data_processing import functions from cam_server.utils import create_thread_pvs, epics_lock from collections import deque import json import numpy as np import scipy.signal import numba import time from threading import Thread numba.set_num_threads(4) _logger = getLogger(__name__) # Shared variables channel_names = None roi = [0, 0] initialized = False sent_pid = -1 nrows = 1 axis = None buffer = deque(maxlen=5) @numba.njit(parallel=False) def get_spectrum(image, background): """Extracts a spectrum from the image by subtracting background.""" y, x = image.shape profile = np.zeros(x, dtype=np.float64) for i in numba.prange(y): for j in range(x): profile[j] += image[i, j] - background[i, j] return profile def update_PVs(buffer, output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name): """Updates EPICS PVs using buffered processing.""" [output_pv, center_pv, fwhm_pv, fit_rms_pv, fit_res_pv, com_pv, std_pv, res_pv] = create_thread_pvs( [output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name] ) while True: time.sleep(0.1) try: rec = buffer.popleft() except IndexError: continue try: if output_pv and output_pv.connected and (rec[0] is not None): output_pv.put(rec[0]) if center_pv and center_pv.connected and (rec[1] is not None): center_pv.put(rec[1]) if fwhm_pv and fwhm_pv.connected and (rec[2] is not None): fwhm_pv.put(rec[2]) if com_pv and com_pv.connected and (rec[3] is not None): com_pv.put(rec[3]) if std_pv and std_pv.connected and (rec[4] is not None): std_pv.put(rec[4]) if fit_rms_pv and fit_rms_pv.connected and (rec[5] is not None): fit_rms_pv.put(rec[5]) if fit_res_pv and fit_res_pv.connected and (rec[6] is not None): fit_res_pv.put(rec[6]) if res_pv and res_pv.connected and (rec[7] is not None): res_pv.put(rec[7]) except Exception as e: _logger.exception(f"Error updating channels: {e}") def initialize(params): """Initializes processing and starts the PV update thread.""" global ymin_pv, ymax_pv, axis_pv, buffer, channel_names camera_name = params["camera_name"] output_pv_name = params["e_int_name"] axis_pv_name = params["e_axis_name"] center_pv_name = camera_name + ":FIT-COM" fwhm_pv_name = camera_name + ":FIT-FWHM" fit_rms_pv_name = camera_name + ":FIT-RMS" fit_res_pv_name = camera_name + ":FIT-RES" ymin_pv_name = camera_name + ":SPC_ROI_YMIN" ymax_pv_name = camera_name + ":SPC_ROI_YMAX" com_pv_name = camera_name + ":SPECT-COM" std_pv_name = camera_name + ":SPECT-RMS" res_pv_name = camera_name + ":SPECT-RES" thread = Thread(target=update_PVs, args=(buffer, output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name)) thread.start() channel_names = [ymin_pv_name, ymax_pv_name, axis_pv_name] def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None): """Processes the image and extracts spectrum features.""" try: global roi, initialized, sent_pid, nrows, axis, buffer if not initialized: initialize(parameters) initialized = True [ymin_pv, ymax_pv, axis_pv] = create_thread_pvs(channel_names) camera_name = parameters["camera_name"] if ymin_pv and ymin_pv.connected: roi[0] = ymin_pv.value if ymax_pv and ymax_pv.connected: roi[1] = ymax_pv.value if axis_pv and axis_pv.connected: axis = axis_pv.value else: axis = None if axis is None or len(axis) < image.shape[1]: _logger.warning("Invalid energy axis") return None axis = axis[:image.shape[1]] processing_image = image.astype(np.float32) - np.float32(parameters["pixel_bkg"]) nrows, ncols = processing_image.shape background_image = parameters.pop('background_data', None) if isinstance(background_image, np.ndarray) and background_image.shape == processing_image.shape: spectrum = get_spectrum(processing_image, background_image) else: spectrum = np.sum(processing_image, axis=0) smoothed_spectrum = scipy.signal.savgol_filter(spectrum, 51, 3) minimum, maximum = smoothed_spectrum.min(), smoothed_spectrum.max() amplitude = maximum - minimum skip = amplitude <= nrows * 1.5 offset, amplitude, center, sigma = functions.gauss_fit_psss(smoothed_spectrum[::2], axis[::2], offset=minimum, amplitude=amplitude, skip=skip, maxfev=10) spectrum_com = np.sum(axis * smoothed_spectrum) / np.sum(smoothed_spectrum) spectrum_std = np.sqrt(np.sum((axis - spectrum_com) ** 2 * smoothed_spectrum)) processed_data = { camera_name + ":SPECTRUM_Y": spectrum, camera_name + ":SPECTRUM_Y_SUM": np.sum(spectrum), camera_name + ":SPECTRUM_X": axis, camera_name + ":FIT-COM": np.float64(center), camera_name + ":FIT-FWHM": np.float64(2.355 * sigma), camera_name + ":FIT-RMS": np.float64(sigma), camera_name + ":FIT-RES": (np.float64(2.355 * sigma) / np.float64(center)) * 1000, camera_name + ":SPECT-COM": spectrum_com, camera_name + ":SPECT-RMS": spectrum_std, camera_name + ":SPECT-RES": (np.float64(2.355 * spectrum_std) / np.float64(spectrum_com)) * 1000, } if epics_lock.acquire(False): try: if pulse_id > sent_pid: sent_pid = pulse_id buffer.append((spectrum, center, 2.355 * sigma, spectrum_com, spectrum_std, sigma, (2.355 * sigma / center) * 1000, (2.355 * spectrum_std / spectrum_com) * 1000)) finally: epics_lock.release() return processed_data # Ensure the processed data is returned except Exception as ex: _logger.warning(str(ex)) return {}