306 lines
9.8 KiB
Python
306 lines
9.8 KiB
Python
from logging import getLogger
|
|
from cam_server.pipeline.data_processing import functions
|
|
from cam_server.utils import epics_lock, create_thread_pvs
|
|
from collections import deque
|
|
import threading
|
|
import json
|
|
import numpy as np
|
|
import scipy.signal
|
|
import numba
|
|
import time
|
|
from threading import Thread
|
|
|
|
# Configure Numba to use multiple threads and parallelize
|
|
numba.set_num_threads(4)
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
# Shared state globals
|
|
global_roi = [0, 0]
|
|
last_roi = None
|
|
params_json = None
|
|
initialized = False
|
|
sent_pid = -1
|
|
buffer = deque(maxlen=5)
|
|
channel_pv_names = []
|
|
base_pv_names = []
|
|
all_pv_names = []
|
|
global_ravg_length = 100
|
|
# Running-average state
|
|
global ravg_buffers, ravg_sum_map, ravg_lock
|
|
ravg_buffers = {}
|
|
ravg_sum_map = {}
|
|
ravg_lock = threading.Lock()
|
|
# N-shot average state
|
|
global global_avg_n, avg_buffer, avg_sum
|
|
global_avg_n = 100
|
|
avg_buffer = deque()
|
|
avg_sum = None
|
|
# Cached PV handles
|
|
pv_handles = None
|
|
|
|
# Update thread function
|
|
def update_PVs():
|
|
"""Continuously pop and write to EPICS PVs using cached handles."""
|
|
global pv_handles, buffer
|
|
while True:
|
|
try:
|
|
entry = buffer.popleft()
|
|
except IndexError:
|
|
time.sleep(0.001)
|
|
continue
|
|
for pv, val in zip(pv_handles, entry):
|
|
if pv and pv.connected and val is not None:
|
|
pv.put(val)
|
|
|
|
|
|
def initialize(params):
|
|
"""Initialize PV names, caches, and start update thread."""
|
|
global channel_pv_names, base_pv_names, all_pv_names
|
|
global global_ravg_length, global_avg_n, avg_buffer, avg_sum, pv_handles
|
|
|
|
camera = params["camera_name"]
|
|
e_int = params["e_int_name"]
|
|
e_axis = params["e_axis_name"]
|
|
|
|
# Fit/result PV names
|
|
center_pv = f"{camera}:FIT-COM"
|
|
fwhm_pv = f"{camera}:FIT-FWHM"
|
|
fit_rms_pv = f"{camera}:FIT-RMS"
|
|
fit_res_pv = f"{camera}:FIT-RES"
|
|
fit_spec_pv = f"{camera}:FIT-SPECTRUM_Y"
|
|
|
|
# ROI PVs
|
|
ymin_pv = f"{camera}:SPC_ROI_YMIN"
|
|
ymax_pv = f"{camera}:SPC_ROI_YMAX"
|
|
channel_pv_names = [ymin_pv, ymax_pv, e_axis]
|
|
|
|
# Stats PVs
|
|
com_pv = f"{camera}:SPECT-COM"
|
|
std_pv = f"{camera}:SPECT-RMS"
|
|
skew_pv = f"{camera}:SPECT-SKEW"
|
|
iqr_pv = f"{camera}:SPECT-IQR"
|
|
res_pv = f"{camera}:SPECT-RES"
|
|
|
|
# Base PV list
|
|
base_pv_names = [
|
|
e_int, center_pv, fwhm_pv, fit_rms_pv,
|
|
fit_res_pv, fit_spec_pv, com_pv, std_pv, skew_pv, iqr_pv, res_pv
|
|
]
|
|
|
|
# Running-average PV names
|
|
global_ravg_length = params.get('RAVG_length', global_ravg_length)
|
|
exclude = {e_int, e_axis, f"{camera}:processing_parameters"}
|
|
ravg_base = [pv for pv in base_pv_names if pv not in exclude]
|
|
ravg_pv_names = [pv + '-RAVG' for pv in ravg_base]
|
|
|
|
# N-shot average settings
|
|
global_avg_n = params.get('avg_nshots', global_avg_n)
|
|
avg_buffer = deque(maxlen=global_avg_n)
|
|
avg_sum = None
|
|
avg_pv_names = [
|
|
f"{camera}:AVG-FIT-COM",
|
|
f"{camera}:AVG-FIT-FWHM",
|
|
f"{camera}:AVG-FIT-RMS",
|
|
f"{camera}:AVG-FIT-RES",
|
|
f"{camera}:AVG-SPECT-COM",
|
|
f"{camera}:AVG-SPECT-RMS",
|
|
f"{camera}:AVG-SPECT-SKEW",
|
|
f"{camera}:AVG-SPECT-IQR",
|
|
f"{camera}:AVG-SPECT-RES",
|
|
f"{camera}:AVG-SPECTRUM_Y",
|
|
f"{camera}:AVG-FIT-SPECTRUM_Y"
|
|
]
|
|
|
|
# All PVs
|
|
all_pv_names = base_pv_names + ravg_pv_names + avg_pv_names + [f"{camera}:processing_parameters"]
|
|
|
|
# Cache PV handles
|
|
pv_handles = create_thread_pvs(all_pv_names)
|
|
|
|
# Start update thread
|
|
thread = Thread(target=update_PVs, daemon=True)
|
|
thread.start()
|
|
|
|
|
|
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
|
|
"""
|
|
Fast processing: vectorized spectrum, incremental averages, cached PVs.
|
|
"""
|
|
global initialized, sent_pid, buffer
|
|
global global_roi, last_roi, params_json
|
|
global ravg_buffers, ravg_sum_map, ravg_lock
|
|
global avg_buffer, avg_sum, global_avg_n, all_pv_names
|
|
|
|
if not initialized:
|
|
initialize(parameters)
|
|
initialized = True
|
|
|
|
camera = parameters["camera_name"]
|
|
|
|
# Read ROI once and detect change
|
|
ymin, ymax = global_roi
|
|
new_ymin = parameters.get('roi_ymin', ymin)
|
|
new_ymax = parameters.get('roi_ymax', ymax)
|
|
if (new_ymin, new_ymax) != (ymin, ymax):
|
|
global_roi[:] = [new_ymin, new_ymax]
|
|
roi_changed = True
|
|
else:
|
|
roi_changed = False
|
|
|
|
# Preprocess image
|
|
img = image.astype(np.float32)
|
|
pixel_bkg = parameters.get("pixel_bkg", 0)
|
|
proc_img = img - pixel_bkg
|
|
nrows, _ = proc_img.shape
|
|
|
|
# Background subtraction
|
|
bg = parameters.pop('background_data', None)
|
|
if isinstance(bg, np.ndarray) and bg.shape == proc_img.shape:
|
|
proc_img -= bg.astype(np.float32)
|
|
|
|
# Crop ROI
|
|
ymin_i, ymax_i = int(global_roi[0]), int(global_roi[1])
|
|
if 0 <= ymin_i < ymax_i <= nrows:
|
|
proc_img = proc_img[ymin_i:ymax_i, :]
|
|
|
|
# Trim x_axis to match the cropped image width
|
|
axis = x_axis[:proc_img.shape[1]]
|
|
|
|
# Spectrum via vector sum
|
|
spectrum = np.sum(proc_img, axis=0)
|
|
# Smooth
|
|
smoothed = scipy.signal.savgol_filter(spectrum, 51, 3)
|
|
|
|
# Fit Gaussian
|
|
minimum, maximum = smoothed.min(), smoothed.max()
|
|
amplitude = maximum - minimum
|
|
skip = amplitude <= nrows * 1.5
|
|
offset, amp_fit, center, sigma = functions.gauss_fit_psss(
|
|
smoothed[::2],
|
|
axis[:len(smoothed)][::2],
|
|
offset=minimum,
|
|
amplitude=amplitude,
|
|
skip=skip,
|
|
maxfev=10
|
|
)
|
|
fit_spectrum = offset + amp_fit * np.exp(
|
|
-((axis[:len(smoothed)] - center)**2) / (2 * sigma**2)
|
|
)
|
|
|
|
# Compute stats
|
|
sm_norm = smoothed / np.sum(smoothed)
|
|
spect_com = np.dot(axis[:len(sm_norm)], sm_norm)
|
|
spect_std = np.sqrt(np.dot((axis[:len(sm_norm)] - spect_com)**2, sm_norm))
|
|
spect_skew = np.dot((axis[:len(sm_norm)] - spect_com)**3, sm_norm) / (spect_std**3)
|
|
cum = np.cumsum(sm_norm)
|
|
e25 = np.interp(0.25, cum, axis[:len(cum)])
|
|
e75 = np.interp(0.75, cum, axis[:len(cum)])
|
|
spect_iqr = e75 - e25
|
|
spect_sum = spectrum.sum()
|
|
|
|
# Original result
|
|
result = {
|
|
parameters["e_int_name"]: spectrum,
|
|
parameters["e_axis_name"]: axis,
|
|
f"{camera}:SPECTRUM_Y_SUM": spect_sum,
|
|
f"{camera}:FIT-COM": np.float64(center),
|
|
f"{camera}:FIT-FWHM": np.float64(2.355 * sigma),
|
|
f"{camera}:FIT-RMS": np.float64(sigma),
|
|
f"{camera}:FIT-RES": np.float64(2.355 * sigma / center * 1000),
|
|
f"{camera}:FIT-SPECTRUM_Y": fit_spectrum,
|
|
f"{camera}:SPECT-COM": spect_com,
|
|
f"{camera}:SPECT-RMS": spect_std,
|
|
f"{camera}:SPECT-SKEW": spect_skew,
|
|
f"{camera}:SPECT-IQR": spect_iqr,
|
|
f"{camera}:SPECT-RES": np.float64(spect_iqr / spect_com * 1000)
|
|
}
|
|
# Update JSON PV only on ROI change
|
|
if roi_changed:
|
|
params_json = json.dumps({"roi": global_roi})
|
|
result[f"{camera}:processing_parameters"] = params_json
|
|
|
|
# Running averages (thread-safe, incremental sum)
|
|
ravg_results = {}
|
|
exclude = {
|
|
parameters["e_int_name"],
|
|
parameters["e_axis_name"],
|
|
f"{camera}:processing_parameters"
|
|
}
|
|
with ravg_lock:
|
|
for pv in base_pv_names:
|
|
if pv not in exclude:
|
|
buf = ravg_buffers.setdefault(pv, deque(maxlen=global_ravg_length))
|
|
sum_val = ravg_sum_map.get(pv, 0.0)
|
|
if len(buf) == buf.maxlen:
|
|
old = buf.popleft()
|
|
sum_val -= old
|
|
buf.append(result[pv])
|
|
sum_val += result[pv]
|
|
ravg_sum_map[pv] = sum_val
|
|
ravg_results[pv + "-RAVG"] = sum_val / len(buf)
|
|
|
|
# N-shot average (incremental, no full-stack)
|
|
global avg_sum
|
|
if avg_sum is None:
|
|
avg_sum = np.zeros_like(spectrum)
|
|
if len(avg_buffer) == global_avg_n:
|
|
old = avg_buffer.popleft()
|
|
avg_sum -= old
|
|
avg_buffer.append(spectrum)
|
|
avg_sum += spectrum
|
|
avg_spectrum = avg_sum / len(avg_buffer)
|
|
|
|
# Fit and stats on avg_spectrum
|
|
sm_avg = scipy.signal.savgol_filter(avg_spectrum, 51, 3)
|
|
min_a, max_a = sm_avg.min(), sm_avg.max()
|
|
amp_a = max_a - min_a
|
|
skip_a = amp_a <= nrows * 1.5
|
|
offs_a, amp_fit_a, center_a, sigma_a = functions.gauss_fit_psss(
|
|
sm_avg[::2],
|
|
axis[:len(sm_avg)][::2],
|
|
offset=min_a,
|
|
amplitude=amp_a,
|
|
skip=skip_a,
|
|
maxfev=10
|
|
)
|
|
fit_avg_spectrum = offs_a + amp_fit_a * np.exp(
|
|
-((axis[:len(sm_avg)] - center_a)**2) / (2 * sigma_a**2)
|
|
)
|
|
sm_norm_a = sm_avg / np.sum(sm_avg)
|
|
spect_com_a = np.dot(axis[:len(sm_norm_a)], sm_norm_a)
|
|
spect_std_a = np.sqrt(np.dot((axis[:len(sm_norm_a)] - spect_com_a)**2, sm_norm_a))
|
|
spect_skew_a = np.dot((axis[:len(sm_norm_a)] - spect_com_a)**3, sm_norm_a) / (spect_std_a**3)
|
|
cum_a = np.cumsum(sm_norm_a)
|
|
e25_a = np.interp(0.25, cum_a, axis[:len(cum_a)])
|
|
e75_a = np.interp(0.75, cum_a, axis[:len(cum_a)])
|
|
spect_iqr_a = e75_a - e25_a
|
|
spect_res_a = spect_iqr_a / spect_com_a * 1000
|
|
|
|
avg_results = {
|
|
f"{camera}:AVG-FIT-COM": np.float64(center_a),
|
|
f"{camera}:AVG-FIT-FWHM": np.float64(2.355 * sigma_a),
|
|
f"{camera}:AVG-FIT-RMS": np.float64(sigma_a),
|
|
f"{camera}:AVG-FIT-RES": np.float64(2.355 * sigma_a / center_a * 1000),
|
|
f"{camera}:AVG-SPECT-COM": spect_com_a,
|
|
f"{camera}:AVG-SPECT-RMS": spect_std_a,
|
|
f"{camera}:AVG-SPECT-SKEW": spect_skew_a,
|
|
f"{camera}:AVG-SPECT-IQR": spect_iqr_a,
|
|
f"{camera}:AVG-SPECT-RES": np.float64(spect_res_a),
|
|
f"{camera}:AVG-SPECTRUM_Y": avg_spectrum,
|
|
f"{camera}:AVG-FIT-SPECTRUM_Y": fit_avg_spectrum
|
|
}
|
|
|
|
# Merge and queue for PV update
|
|
full = {**result, **ravg_results, **avg_results}
|
|
if epics_lock.acquire(False):
|
|
try:
|
|
if pulse_id > sent_pid:
|
|
sent_pid = pulse_id
|
|
buffer.append(tuple(full[pv] for pv in all_pv_names))
|
|
finally:
|
|
epics_lock.release()
|
|
|
|
return full
|
|
|