Files
cristallina_analysis_package/src/cristallina/analysis.py

480 lines
13 KiB
Python

import re
from collections import defaultdict
from typing import Optional
import numpy as np
from matplotlib import pyplot as plt
import lmfit
from sfdata import SFDataFiles, sfdatafile, SFScanInfo
import joblib
from joblib import Parallel, delayed, Memory
from . import utils
from .utils import ROI
def setup_cachedirs(pgroup=None, cachedir=None):
"""
Sets the path to a persistent cache directory either from the given p-group (e.g. "p20841")
or an explicitly given directory.
If heuristics fail we use "/tmp" as a non-persistent alternative.
"""
if cachedir is not None:
# explicit directory given, use this choice
memory = Memory(cachedir, verbose=0, compress=2)
return memory
try:
if pgroup is None:
pgroup_no = utils.heuristic_extract_pgroup()
else:
parts = re.split(r"(\d.*)", pgroup) # ['p', '2343', '']
pgroup_no = parts[-2]
cachedir = f"/das/work/units/cristallina/p{pgroup_no}/cachedir"
except KeyError as e:
print(e)
cachedir = "/das/work/units/cristallina/p19739/cachedir"
try:
memory = Memory(cachedir, verbose=0, compress=2)
except PermissionError as e:
cachedir = "/tmp"
memory = Memory(cachedir, verbose=0, compress=2)
return memory
memory = None
memory = setup_cachedirs()
@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes
def perform_image_calculations(
filesets,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: Optional[ROI] = None,
preview=False,
operations=["sum"],
):
"""
Performs one or more calculations ("sum", "mean" or "std") for a given region of interest (roi)
for an image channel from a fileset (e.g. ["run0352/data/acq0001.*.h5"] or step.fnames from a SFScanInfo object).
Allows alignment, i.e. reducing only to a common subset with other channels.
Calculations are performed in batches to reduce maximum memory requirements.
Preview only applies calculation to first batch and returns.
Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]})
with the given channel values for each pulse and corresponding pulse id.
"""
possible_operations = {
"sum": ["intensity", np.sum],
"mean": ["mean", np.mean],
"std": ["mean", np.std],
}
with SFDataFiles(*filesets) as data:
if alignment_channels is not None:
channels = [channel] + [ch for ch in alignment_channels]
else:
channels = [channel]
subset = data[channels]
subset.drop_missing()
Images = subset[channel]
res = defaultdict(list)
res["roi"] = repr(roi)
for image_slice in Images.in_batches(batch_size):
index_slice, im = image_slice
if roi is None:
im_ROI = im[:]
else:
im_ROI = im[:, roi.rows, roi.cols]
# iterate over all operations
for op in operations:
label, func = possible_operations[op]
res[f"{channel}_{label}"].extend(func(im_ROI, axis=(1, 2)))
res["pids"].extend(Images.pids[index_slice])
# only return first batch
if preview:
break
return res
@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes
def sum_images(
filesets,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: Optional[ROI] = None,
preview=False,
):
"""
Sums a given region of interest (roi) for an image channel from a
given fileset (e.g. "run0352/data/acq0001.*.h5" or step.fnames from a SFScanInfo object).
Allows alignment, i.e. reducing only to a common subset with other channels.
Summation is performed in batches to reduce maximum memory requirements.
Preview only sums and returns the first batch.
Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]})
with the given channel intensity for each pulse and corresponding pulse id.
"""
return perform_image_calculations(
filesets,
channel=channel,
alignment_channels=alignment_channels,
batch_size=batch_size,
roi=roi,
preview=preview,
operations=["sum"],
)
def get_contrast_images(
filesets,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: Optional[ROI] = None,
preview=False,
):
"""
See perform_image_calculations. Here calculates mean and standard deviation for a given set of images.
"""
return perform_image_calculations(
filesets,
channel=channel,
alignment_channels=alignment_channels,
batch_size=batch_size,
roi=roi,
preview=preview,
operations=["mean", "std"],
)
@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes
def perform_image_stack_sum(
filesets,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: Optional[ROI] = None,
preview=False,
# operations=["sum"],
lower_cutoff_threshold=None,
):
"""
Performs one or more calculations ("sum", "mean" or "std") for a given region of interest (roi)
for an image channel from a fileset (e.g. "run0352/data/acq0001.*.h5" or step.fnames from a SFScanInfo object).
Allows alignment, i.e. reducing only to a common subset with other channels.
Calculations are performed in batches to reduce maximum memory requirements.
Preview only applies calculation to first batch and returns.
Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]})
with the given channel values for each pulse and corresponding pulse id.
"""
possible_operations = {
"sum": ["intensity", np.sum],
"mean": ["mean", np.mean],
"std": ["mean", np.std],
}
with SFDataFiles(*filesets) as data:
if alignment_channels is not None:
channels = [channel] + [ch for ch in alignment_channels]
else:
channels = [channel]
subset = data[channels]
subset.drop_missing()
Images = subset[channel]
# create empty array for stack sum with right shape
im = Images[0]
if roi is None:
im_ROI = im[:]
else:
im_ROI = im[:, roi.rows, roi.cols]
summed = np.zeros(im_ROI[0].shape)
#for image_slice, slice2 in zip(Images.in_batches(batch_size), other.in_batches(batch_size)):
for image_slice in Images.in_batches(batch_size):
index_slice, im = image_slice
if roi is None:
im_ROI = im[:]
else:
im_ROI = im[:, roi.rows, roi.cols]
if lower_cutoff_threshold is not None:
im_ROI = np.clip(im_ROI, lower_cutoff_threshold, np.inf)
summed = summed + np.sum(im_ROI, axis=(0))
# only return first batch
if preview:
break
return summed
def fit_2d_gaussian(image, roi: Optional[ROI] = None, plot=False):
"""
2D Gaussian fit using LMFit for a given image and an optional region of interest.
plot=True as optional argument plots the fit results.
Returns the x, y coordinates of the center and the results object which contains
further fit statistics.
"""
# given an image and optional ROI
if roi is not None:
im = image[roi.rows, roi.cols]
else:
im = image
len_y, len_x = im.shape
y = np.arange(len_y)
x = np.arange(len_x)
x, y = np.meshgrid(x, y) # here now a 2D mesh
x, y = x.ravel(), y.ravel() # and all back into sequences of 1D arrays
z = im.ravel() # and this also as a 1D
model = lmfit.models.Gaussian2dModel()
params = model.guess(z.astype('float'), x.astype('float'), y.astype('float'))
result = model.fit(
z,
x=x,
y=y,
params=params,
method="leastsq",
verbose=False,
nan_policy=None,
max_nfev=None,
)
if roi is not None:
# convert back to original image coordinates
center_x = roi.left + result.params["centerx"]
center_y = roi.bottom + result.params["centery"]
else:
center_x = result.params["centerx"].value
center_y = result.params["centery"].value
if plot == True:
_plot_2d_gaussian_fit(im, z, model, result)
return center_x, center_y, result
def _plot_2d_gaussian_fit(im, z, model, result):
"""Plot helper function to use the current image data, model and fit result and
plots them together.
"""
from matplotlib import pyplot as plt
from scipy.interpolate import griddata
len_y, len_x = im.shape
X, Y = np.meshgrid(np.arange(len_x), np.arange(len_y))
Z = griddata((X.ravel(), Y.ravel()), z, (X, Y), method="linear", fill_value=np.nan)
fig, axs = plt.subplots(2, 2, figsize=(10, 10), layout="constrained")
for ax in axs.ravel():
ax.axis("equal")
ax.set_xlabel("x")
ax.set_ylabel("y")
vmax = np.max(Z)
ax = axs[0, 0]
art = ax.pcolormesh(X, Y, Z, vmin=0, vmax=vmax, shading="auto")
fig.colorbar(art, ax=ax, label="z", shrink=0.5)
ax.set_title("Data")
ax = axs[0, 1]
fit = model.func(X, Y, **result.best_values)
art = ax.pcolormesh(X, Y, fit, vmin=0, vmax=vmax, shading="auto")
fig.colorbar(art, ax=ax, label="z", shrink=0.5)
ax.set_title("Fit")
ax = axs[1, 0]
fit = model.func(X, Y, **result.best_values)
art = ax.pcolormesh(
X, Y, Z - fit, vmin=-0.05 * vmax, vmax=0.05 * vmax, cmap="gray", shading="auto"
)
fig.colorbar(art, ax=ax, label="z", shrink=0.5)
ax.set_title("Data - Fit")
ax = axs[1, 1]
fit = model.func(X, Y, **result.best_values)
art = ax.pcolormesh(X, Y, fit, vmin=0, vmax=vmax, shading="auto")
ax.contour(X, Y, fit, 8, colors="r", alpha=0.4)
fig.colorbar(art, ax=ax, label="z", shrink=0.5)
ax.set_title("Data & Fit")
fig.suptitle("2D Gaussian fit results")
def gaussian2d_rot_model(
x,
y=0.0,
amplitude=1.0,
centerx=0.0,
centery=0.0,
sigmax=1.0,
sigmay=1.0,
rotation=0,
background=0,
):
"""Returns a two-dimensional Gaussian model from lmfit with a rotation in radians around the center."""
sr = np.sin(rotation)
cr = np.cos(rotation)
center_x_rot = centerx * cr - centery * sr
center_y_rot = centerx * sr + centery * cr
x_rot = x * cr - y * sr
y_rot = x * sr + y * cr
return (
lmfit.models.gaussian2d(
x_rot,
y=y_rot,
amplitude=amplitude,
centerx=center_x_rot,
centery=center_y_rot,
sigmax=sigmax,
sigmay=sigmay,
)
+ background
)
def fit_2d_gaussian_rotated(
image,
roi=None,
plot=False,
vary_rotation=True,
vary_background=False,
):
"""
2D Gaussian fit with rotation using LMFit for a given image and an optional region of interest.
As the number of free parameters for this kind of fit is large issues with convergence appear often.
Here we first fit without rotation, use the obtained parameters as a starting guess.
plot = True as optional argument plots the fit results.
Returns the x, y coordinates of the center and the results object which contains
further fit statistics.
"""
# given an image and optional ROI
if roi is not None:
im = image[roi.rows, roi.cols]
else:
im = image
len_y, len_x = im.shape
y = np.arange(len_y)
x = np.arange(len_x)
x, y = np.meshgrid(x, y) # here now a 2D mesh
x, y = x.ravel(), y.ravel() # and all back into sequences of 1D arrays
z = im.ravel() # and this also as a 1D
mod = lmfit.Model(gaussian2d_rot_model, independent_vars=["x", "y"])
# Guess parameters, this is one possible approach
mod.set_param_hint("amplitude", value=np.max(z) * 0.75, min=0, vary=True)
mod.set_param_hint("centerx", value=np.mean(x) / 2, vary=True)
mod.set_param_hint("centery", value=np.mean(y) / 2, vary=True)
mod.set_param_hint("sigmax", value=np.mean(x) / 10, vary=True)
mod.set_param_hint("sigmay", value=np.mean(y) / 10, vary=True)
mod.set_param_hint("rotation", value=0.0, min=-np.pi / 2, max=np.pi / 2, vary=False)
mod.set_param_hint("background", value=0.0, vary=vary_background)
params = mod.make_params(verbose=False)
# first fit without rotation
result = mod.fit(
z,
x=x,
y=y,
params=params,
method="leastsq",
verbose=False,
nan_policy=None,
max_nfev=20,
)
# now refining with rotation
params = result.params
params["rotation"].set(vary=vary_rotation)
result = mod.fit(
z,
x=x,
y=y,
params=params,
method="leastsq",
verbose=False,
nan_policy=None,
max_nfev=None,
)
if roi is not None:
# convert back to original image coordinates
center_x = roi.left + result.params["centerx"]
center_y = roi.bottom + result.params["centery"]
else:
center_x = result.params["centerx"].value
center_y = result.params["centery"].value
if plot == True:
_plot_2d_gaussian_fit(im, z, mod, result)
return center_x, center_y, result