Files
camserver_sf/configuration/user_scripts/spectrometer.py
2023-08-17 10:57:35 +02:00

131 lines
3.6 KiB
Python

import sys
import json
from functools import lru_cache
import numpy as np
from logging import getLogger
_logger = getLogger(__name__)
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None):
image = image.astype(int)
camera_name = parameters["camera_name"]
background = parameters.get("background_data")
background_name = parameters.get("image_background")
background_mode = parameters.get("image_background_enable")
roi_signal = parameters.get("roi_signal")
roi_background = parameters.get("roi_background")
project_axis = parameters.get("project_axis", 0)
threshold = parameters.get("threshold")
roi_radial = parameters.get("roi_radial")
radial_x0 = parameters.get("radial_x0")
radial_y0 = parameters.get("radial_y0")
# maintain the structure of processing_parameters
background_shape = None
# maintain the structure of res
projection_signal = projection_background = None
projection_radial_bins = projection_radial_counts = None
try:
if background is not None:
background_shape = background.shape
image -= background
if threshold is not None:
image -= threshold
image[image < 0] = 0
if roi_signal is not None:
projection_signal = get_roi_projection(image, roi_signal, project_axis)
if roi_background is not None:
projection_background = get_roi_projection(image, roi_background, project_axis)
if roi_radial is not None:
projection_radial_bins, projection_radial_counts = get_radial_projection(image, roi_radial, radial_x0, radial_y0)
except Exception as e:
_logger.exception("Error processing pulse_id " + str(pulse_id) + ": " + str(e))
lineno = sys.exc_info()[2].tb_lineno
tn = type(e).__name__
status = f"Error in line number {lineno}: {tn}: {e}"
else:
status = "OK"
processing_parameters = {
"image_shape": image.shape,
"background_shape": background_shape,
"background_name": background_name,
"background_mode": background_mode,
"roi_signal": roi_signal,
"roi_background": roi_background,
"project_axis": project_axis,
"threshold": threshold,
"roi_radial": roi_radial,
"radial_x0": radial_x0,
"radial_y0": radial_y0,
"status": status
}
processing_parameters = json.dumps(processing_parameters)
res = {
camera_name + ".processing_parameters": processing_parameters,
camera_name + ".projection_signal": projection_signal,
camera_name + ".projection_background": projection_background,
camera_name + ".projection_radial_bins": projection_radial_bins,
camera_name + ".projection_radial_counts": projection_radial_counts
}
return res
def get_roi_projection(image, roi, axis):
x_start, x_stop, y_start, y_stop = roi
cropped = image[x_start:x_stop, y_start:y_stop]
project = cropped.mean(axis=axis)
return project
def get_radial_projection(image, roi, x0, y0):
x_start, x_stop, y_start, y_stop = roi
image = image[x_start:x_stop, y_start:y_stop]
r, norm, ur = calc_r_norm_ur(image.shape, x0, y0)
image = image.ravel()
count = np.bincount(r, image)
res = count / norm
return ur, res
@lru_cache
def calc_r_norm_ur(shape, x0, y0):
r = calc_r(shape, x0, y0)
norm = np.bincount(r)
ur = np.unique(r)
return r, norm, ur
def calc_r(shape, x0, y0):
y, x = np.indices(shape)
r = np.sqrt((x - x0)**2 + (y - y0)**2)
r = r.ravel().astype(int)
return r