from logging import getLogger from cam_server.pipeline.data_processing import functions from cam_server.utils import create_thread_pvs, epics_lock from collections import deque import json import numpy as np import scipy.signal import numba import time import sys from threading import Thread # Configure Numba to use multiple threads numba.set_num_threads(4) _logger = getLogger(__name__) # Shared state globals global_roi = [0, 0] initialized = False sent_pid = -1 buffer = deque(maxlen=5) channel_pv_names = None base_pv_names = [] all_pv_names = [] global_ravg_length = 100 ravg_buffers = {} # Configuration for rolling-average of statistics # Configuration for N-shot average spectrum global_avg_length = 100 avg_buffer = None avg_pv_names = [] @numba.njit(parallel=False) def get_spectrum(image, background): """Compute background-subtracted spectrum via row-wise summation.""" y, x = image.shape profile = np.zeros(x, dtype=np.float64) for i in numba.prange(y): for j in range(x): profile[j] += image[i, j] - background[i, j] return profile def update_PVs(buffer, *pv_names): pvs = create_thread_pvs(list(pv_names)) while True: time.sleep(0.1) try: rec = buffer.popleft() except IndexError: continue for pv, val in zip(pvs, rec): if pv and pv.connected and (val is not None): try: pv.put(val) except Exception as e: _logger.error(f"Error writing to {pv.pvname}: {e}") def initialize(params): """Initialize PV names, running-average settings, N-shot average, and launch update thread.""" global channel_pv_names, base_pv_names, all_pv_names, global_ravg_length global global_avg_length, avg_buffer, avg_pv_names camera = params["camera_name"] e_int = params["e_int_name"] e_axis = params["e_axis_name"] # Fit/result PV names center_pv = f"{camera}:FIT-COM" fwhm_pv = f"{camera}:FIT-FWHM" fit_rms_pv = f"{camera}:FIT-RMS" fit_res_pv = f"{camera}:FIT-RES" fit_spec_pv = f"{camera}:FIT-SPECTRUM_Y" # ROI PVs for dynamic read ymin_pv = f"{camera}:SPC_ROI_YMIN" ymax_pv = f"{camera}:SPC_ROI_YMAX" axis_pv = e_axis channel_pv_names = [ymin_pv, ymax_pv, axis_pv] # Spectrum statistical PV names com_pv = f"{camera}:SPECT-COM" std_pv = f"{camera}:SPECT-RMS" skew_pv = f"{camera}:SPECT-SKEW" iqr_pv = f"{camera}:SPECT-IQR" res_pv = f"{camera}:SPECT-RES" # Base PVs for update thread (order matters) base_pv_names = [ e_int, center_pv, fwhm_pv, fit_rms_pv, fit_res_pv, fit_spec_pv, com_pv, std_pv, skew_pv, iqr_pv, res_pv ] # Running-average configuration global_ravg_length = params.get('RAVG_length', global_ravg_length) exclude = {e_int, e_axis, f"{camera}:processing_parameters"} ravg_base = [pv for pv in base_pv_names if pv not in exclude] ravg_pv_names = [pv + '-RAVG' for pv in ravg_base] # N-shot average configuration global_avg_length = params.get('avg_nshots', global_avg_length) avg_buffer = deque(maxlen=global_avg_length) # Define PVs for N-shot average statistics and spectra avg_pv_names = [ f"{camera}:AVG-FIT-COM", f"{camera}:AVG-FIT-FWHM", f"{camera}:AVG-FIT-RMS", f"{camera}:AVG-FIT-RES", f"{camera}:AVG-SPECT-COM", f"{camera}:AVG-SPECT-RMS", f"{camera}:AVG-SPECT-SKEW", f"{camera}:AVG-SPECT-IQR", f"{camera}:AVG-SPECT-RES", f"{camera}:AVG-SPECTRUM_Y", f"{camera}:AVG-FIT-SPECTRUM_Y" ] global ARRAY_PVS ARRAY_PVS = set([ f"{camera}:SPECTRUM_Y", f"{camera}:FIT-SPECTRUM_Y", f"{camera}:AVG-SPECTRUM_Y", f"{camera}:AVG-FIT-SPECTRUM_Y", f"{camera}:FIT-SPECTRUM_Y-RAVG" # add any others here ]) # All PVs (original + running average + N-shot average) all_pv_names = base_pv_names + ravg_pv_names + avg_pv_names # Start background thread for PV updates thread = Thread(target=update_PVs, args=(buffer, *all_pv_names), daemon=True) thread.start() def _pv_safe(val, pvname): if pvname in ARRAY_PVS: # array PV: must always send a 1D numpy array if isinstance(val, np.ndarray): if val.ndim == 1: return val.astype(np.float64) elif val.ndim == 0: # Scalar array, wrap as 1-element return np.array([val.item()], dtype=np.float64) else: raise ValueError(f"{pvname}: expected 1D array, got shape {val.shape}") elif isinstance(val, (list, tuple)): return np.array(val, dtype=np.float64) elif isinstance(val, (float, int, np.generic)): return np.array([val], dtype=np.float64) else: raise TypeError(f"{pvname}: expected array, got {type(val)}") else: # scalar PV: must always send float if isinstance(val, np.ndarray): if val.size == 1: return float(val.item()) else: raise ValueError(f"{pvname}: expected scalar, got array of size {val.size}") elif isinstance(val, (np.generic, float, int)): return float(val) else: raise TypeError(f"{pvname}: expected scalar, got {type(val)}") def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None): """ Main entrypoint: subtract background, crop ROI, smooth, fit Gaussian, compute metrics, N-shot average, queue PV updates (with running averages). Returns a dict of processed PV values (original and average channels). """ global initialized, sent_pid, channel_pv_names global global_ravg_length, ravg_buffers, avg_buffer try: if not initialized: initialize(parameters) initialized = True camera = parameters["camera_name"] # Dynamic ROI and axis PV read ymin_pv, ymax_pv, axis_pv = create_thread_pvs(channel_pv_names) if ymin_pv and ymin_pv.connected: global_roi[0] = ymin_pv.value if ymax_pv and ymax_pv.connected: global_roi[1] = ymax_pv.value if not (axis_pv and axis_pv.connected): _logger.warning("Energy axis not connected") return None axis = axis_pv.value[:image.shape[1]] # Preprocess image proc_img = image.astype(np.float32) - np.float32(parameters.get("pixel_bkg", 0)) nrows, _ = proc_img.shape # Background image bg_img = parameters.pop('background_data', None) bg_img = bg_img.astype(np.float32) if isinstance(bg_img, np.ndarray) and bg_img.shape == proc_img.shape else None # Crop ROI ymin, ymax = map(int, global_roi) if 0 <= ymin < ymax <= nrows: proc_img = proc_img[ymin:ymax, :] if bg_img is not None: bg_img = bg_img[ymin:ymax, :] # Extract spectrum and fitted spectrum spectrum = get_spectrum(proc_img, bg_img) if bg_img is not None else np.sum(proc_img, axis=0) smoothed = scipy.signal.savgol_filter(spectrum, 51, 3) minimum, maximum = smoothed.min(), smoothed.max() amplitude = maximum - minimum skip = amplitude <= nrows * 1.5 offset, amp_fit, center, sigma = functions.gauss_fit_psss( smoothed[::2], axis[::2], offset=minimum, amplitude=amplitude, skip=skip, maxfev=10 ) # Reconstruct fitted curve fit_spectrum = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2)) #fit_spectrum = np.abs(fit_spectrum) # Moments sm_norm = smoothed / np.sum(smoothed) spect_com = np.sum(axis * sm_norm) spect_std = np.sqrt(np.sum((axis - spect_com)**2 * sm_norm)) spect_skew = np.sum((axis - spect_com)**3 * sm_norm) / (spect_std**3) cum = np.cumsum(sm_norm); e25 = np.interp(0.25, cum, axis); e75 = np.interp(0.75, cum, axis) spect_iqr = e75 - e25; spect_sum = np.sum(spectrum) # Original result result = { parameters["e_int_name"]: spectrum, parameters["e_axis_name"]: axis, f"{camera}:SPECTRUM_Y_SUM": spect_sum, f"{camera}:FIT-COM": np.float64(center), f"{camera}:FIT-FWHM": np.float64(2.355 * sigma), f"{camera}:FIT-RMS": np.float64(sigma), f"{camera}:FIT-RES": np.float64(2.355 * sigma / center * 1000), f"{camera}:FIT-SPECTRUM_Y": np.float64(fit_spectrum), f"{camera}:SPECT-COM": spect_com, f"{camera}:SPECT-RMS": spect_std, f"{camera}:SPECT-SKEW": spect_skew, f"{camera}:SPECT-IQR": spect_iqr, f"{camera}:SPECT-RES": np.float64(spect_iqr / spect_com * 1000), f"{camera}:processing_parameters": json.dumps({"roi": global_roi}) } # Rolling averages exclude = {parameters["e_int_name"], parameters["e_axis_name"], f"{camera}:processing_parameters"} ravg_results = {} for base_pv in (pv for pv in base_pv_names if pv not in exclude): buf = ravg_buffers.setdefault(base_pv, deque(maxlen=global_ravg_length)) buf.append(result[base_pv]) ravg_results[f"{base_pv}-RAVG"] = np.mean(buf) # N-shot average and average fitted spectrum avg_buffer.append(spectrum) avg_spectrum = np.mean(np.stack(avg_buffer), axis=0) avg_spectrum = np.abs(avg_spectrum) fit_avg = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2)) # using avg fit params below sm_avg = scipy.signal.savgol_filter(avg_spectrum, 51, 3) min_a, max_a = sm_avg.min(), sm_avg.max() amp_a = max_a - min_a; skip_a = amp_a <= nrows * 1.5 offs_a, amp_fit_a, center_a, sigma_a = functions.gauss_fit_psss( sm_avg[::2], axis[::2], offset=min_a, amplitude=amp_a, skip=skip_a, maxfev=10 ) fit_avg_spectrum = offs_a + amp_fit_a * np.exp(-((axis - center_a)**2) / (2 * sigma_a**2)) fit_avg_spectrum = np.abs(fit_avg_spectrum) # Average moments sm_norm_a = sm_avg / np.sum(sm_avg) spect_com_a = np.sum(axis * sm_norm_a) spect_std_a = np.sqrt(np.sum((axis - spect_com_a)**2 * sm_norm_a)) spect_skew_a = np.sum((axis - spect_com_a)**3 * sm_norm_a) / (spect_std_a**3) cum_a = np.cumsum(sm_norm_a); e25_a = np.interp(0.25, cum_a, axis); e75_a = np.interp(0.75, cum_a, axis) spect_iqr_a = e75_a - e25_a spect_res_a = spect_iqr_a / spect_com_a * 1000 avg_results = { f"{camera}:AVG-FIT-COM": np.float64(center), f"{camera}:AVG-FIT-FWHM": np.float64(2.355 * sigma_a), f"{camera}:AVG-FIT-RMS": np.float64(sigma_a), f"{camera}:AVG-FIT-RES": np.float64(2.355 * sigma_a / center_a * 1000), f"{camera}:AVG-SPECT-COM": spect_com, f"{camera}:AVG-SPECT-RMS": spect_std_a, f"{camera}:AVG-SPECT-SKEW": spect_skew_a, f"{camera}:AVG-SPECT-IQR": spect_iqr_a, f"{camera}:AVG-SPECT-RES": np.float64(spect_res_a), f"{camera}:AVG-SPECTRUM_Y": avg_spectrum, f"{camera}:AVG-FIT-SPECTRUM_Y": fit_avg_spectrum } # Merge and queue full_results = {**result, **ravg_results, **avg_results} if epics_lock.acquire(False): try: if pulse_id > sent_pid: sent_pid = pulse_id #entry = tuple(full_results.get(pv) for pv in all_pv_names) entry = tuple(_pv_safe(full_results.get(pv), pv) for pv in all_pv_names) buffer.append(entry) finally: epics_lock.release() return full_results except Exception as ex: _logger.warning("Processing error: %s", ex) return {}