164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
from logging import getLogger
|
|
from cam_server.pipeline.data_processing import functions
|
|
from cam_server.utils import create_thread_pvs, epics_lock
|
|
from collections import deque
|
|
import json
|
|
import numpy as np
|
|
import scipy.signal
|
|
import numba
|
|
import time
|
|
from threading import Thread
|
|
|
|
numba.set_num_threads(4)
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
# Shared variables
|
|
channel_names = None
|
|
roi = [0, 0]
|
|
initialized = False
|
|
sent_pid = -1
|
|
nrows = 1
|
|
axis = None
|
|
buffer = deque(maxlen=5)
|
|
|
|
|
|
@numba.njit(parallel=False)
|
|
def get_spectrum(image, background):
|
|
"""Extracts a spectrum from the image by subtracting background."""
|
|
y, x = image.shape
|
|
profile = np.zeros(x, dtype=np.float64)
|
|
for i in numba.prange(y):
|
|
for j in range(x):
|
|
profile[j] += image[i, j] - background[i, j]
|
|
return profile
|
|
|
|
|
|
def update_PVs(buffer, output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name):
|
|
"""Updates EPICS PVs using buffered processing."""
|
|
[output_pv, center_pv, fwhm_pv, fit_rms_pv, fit_res_pv, com_pv, std_pv, res_pv] = create_thread_pvs(
|
|
[output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name]
|
|
)
|
|
|
|
while True:
|
|
time.sleep(0.1)
|
|
try:
|
|
rec = buffer.popleft()
|
|
except IndexError:
|
|
continue
|
|
try:
|
|
if output_pv and output_pv.connected and (rec[0] is not None):
|
|
output_pv.put(rec[0])
|
|
if center_pv and center_pv.connected and (rec[1] is not None):
|
|
center_pv.put(rec[1])
|
|
if fwhm_pv and fwhm_pv.connected and (rec[2] is not None):
|
|
fwhm_pv.put(rec[2])
|
|
if com_pv and com_pv.connected and (rec[3] is not None):
|
|
com_pv.put(rec[3])
|
|
if std_pv and std_pv.connected and (rec[4] is not None):
|
|
std_pv.put(rec[4])
|
|
if fit_rms_pv and fit_rms_pv.connected and (rec[5] is not None):
|
|
fit_rms_pv.put(rec[5])
|
|
if fit_res_pv and fit_res_pv.connected and (rec[6] is not None):
|
|
fit_res_pv.put(rec[6])
|
|
if res_pv and res_pv.connected and (rec[7] is not None):
|
|
res_pv.put(rec[7])
|
|
except Exception as e:
|
|
_logger.exception(f"Error updating channels: {e}")
|
|
|
|
|
|
def initialize(params):
|
|
"""Initializes processing and starts the PV update thread."""
|
|
global ymin_pv, ymax_pv, axis_pv, buffer, channel_names
|
|
|
|
camera_name = params["camera_name"]
|
|
output_pv_name = params["e_int_name"]
|
|
axis_pv_name = params["e_axis_name"]
|
|
center_pv_name = camera_name + ":FIT-COM"
|
|
fwhm_pv_name = camera_name + ":FIT-FWHM"
|
|
fit_rms_pv_name = camera_name + ":FIT-RMS"
|
|
fit_res_pv_name = camera_name + ":FIT-RES"
|
|
ymin_pv_name = camera_name + ":SPC_ROI_YMIN"
|
|
ymax_pv_name = camera_name + ":SPC_ROI_YMAX"
|
|
com_pv_name = camera_name + ":SPECT-COM"
|
|
std_pv_name = camera_name + ":SPECT-RMS"
|
|
res_pv_name = camera_name + ":SPECT-RES"
|
|
|
|
thread = Thread(target=update_PVs, args=(buffer, output_pv_name, center_pv_name, fwhm_pv_name, fit_rms_pv_name, fit_res_pv_name, com_pv_name, std_pv_name, res_pv_name))
|
|
thread.start()
|
|
|
|
channel_names = [ymin_pv_name, ymax_pv_name, axis_pv_name]
|
|
|
|
|
|
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
|
|
"""Processes the image and extracts spectrum features."""
|
|
try:
|
|
global roi, initialized, sent_pid, nrows, axis, buffer
|
|
|
|
if not initialized:
|
|
initialize(parameters)
|
|
initialized = True
|
|
[ymin_pv, ymax_pv, axis_pv] = create_thread_pvs(channel_names)
|
|
|
|
camera_name = parameters["camera_name"]
|
|
|
|
if ymin_pv and ymin_pv.connected:
|
|
roi[0] = ymin_pv.value
|
|
if ymax_pv and ymax_pv.connected:
|
|
roi[1] = ymax_pv.value
|
|
|
|
if axis_pv and axis_pv.connected:
|
|
axis = axis_pv.value
|
|
else:
|
|
axis = None
|
|
|
|
if axis is None or len(axis) < image.shape[1]:
|
|
_logger.warning("Invalid energy axis")
|
|
return None
|
|
|
|
axis = axis[:image.shape[1]]
|
|
processing_image = image.astype(np.float32) - np.float32(parameters["pixel_bkg"])
|
|
nrows, ncols = processing_image.shape
|
|
|
|
background_image = parameters.pop('background_data', None)
|
|
if isinstance(background_image, np.ndarray) and background_image.shape == processing_image.shape:
|
|
spectrum = get_spectrum(processing_image, background_image)
|
|
else:
|
|
spectrum = np.sum(processing_image, axis=0)
|
|
|
|
smoothed_spectrum = scipy.signal.savgol_filter(spectrum, 51, 3)
|
|
minimum, maximum = smoothed_spectrum.min(), smoothed_spectrum.max()
|
|
amplitude = maximum - minimum
|
|
skip = amplitude <= nrows * 1.5
|
|
|
|
offset, amplitude, center, sigma = functions.gauss_fit_psss(smoothed_spectrum[::2], axis[::2], offset=minimum, amplitude=amplitude, skip=skip, maxfev=10)
|
|
|
|
spectrum_com = np.sum(axis * smoothed_spectrum) / np.sum(smoothed_spectrum)
|
|
spectrum_std = np.sqrt(np.sum((axis - spectrum_com) ** 2 * smoothed_spectrum))
|
|
|
|
processed_data = {
|
|
camera_name + ":SPECTRUM_Y": spectrum,
|
|
camera_name + ":SPECTRUM_Y_SUM": np.sum(spectrum),
|
|
camera_name + ":SPECTRUM_X": axis,
|
|
camera_name + ":FIT-COM": np.float64(center),
|
|
camera_name + ":FIT-FWHM": np.float64(2.355 * sigma),
|
|
camera_name + ":FIT-RMS": np.float64(sigma),
|
|
camera_name + ":FIT-RES": (np.float64(2.355 * sigma) / np.float64(center)) * 1000,
|
|
camera_name + ":SPECT-COM": spectrum_com,
|
|
camera_name + ":SPECT-RMS": spectrum_std,
|
|
camera_name + ":SPECT-RES": (np.float64(2.355 * spectrum_std) / np.float64(spectrum_com)) * 1000,
|
|
}
|
|
|
|
if epics_lock.acquire(False):
|
|
try:
|
|
if pulse_id > sent_pid:
|
|
sent_pid = pulse_id
|
|
buffer.append((spectrum, center, 2.355 * sigma, spectrum_com, spectrum_std, sigma, (2.355 * sigma / center) * 1000, (2.355 * spectrum_std / spectrum_com) * 1000))
|
|
finally:
|
|
epics_lock.release()
|
|
|
|
return processed_data # Ensure the processed data is returned
|
|
except Exception as ex:
|
|
_logger.warning(str(ex))
|
|
return {}
|