Files
cristallina_analysis_package/src/cristallina/analysis.py

175 lines
4.7 KiB
Python

import re
from collections import defaultdict
import numpy as np
from sfdata import SFDataFiles, sfdatafile, SFScanInfo
import joblib
from joblib import Parallel, delayed, Memory
from . import utils
from .utils import ROI
memory = None
def setup_cachedirs(pgroup=None, cachedir=None):
"""
Sets the path to a persistent cache directory either from the given p-group (e.g. "p20841")
or an explicitly given directory.
If heuristics fail we use "/tmp" as a non-persistent alternative.
"""
global memory
if cachedir is not None:
# explicit directory given, use this choice
memory = Memory(cachedir, verbose=0, compress=2)
return
try:
if pgroup is None:
pgroup_no = utils.heuristic_extract_pgroup()
else:
parts = re.split(r"(\d.*)", pgroup) # ['p', '2343', '']
pgroup_no = parts[-2]
cachedir = f"/das/work/units/cristallina/p{pgroup_no}/cachedir"
except KeyError as e:
print(e)
cachedir = "/das/work/units/cristallina/p19739/cachedir"
try:
memory = Memory(cachedir, verbose=0, compress=2)
except PermissionError as e:
cachedir = "/tmp"
memory = Memory(cachedir, verbose=0, compress=2)
setup_cachedirs()
@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes
def perform_image_calculations(
fileset,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: ROI = None,
preview=False,
operations=["sum"],
):
"""
Performs one or more calculations ("sum", "mean" or "std") for a given region of interest (roi)
for an image channel from a fileset (e.g. "run0352/data/acq0001.*.h5" or step.fnames from a SFScanInfo object).
Allows alignment, i.e. reducing only to a common subset with other channels.
Calculations are performed in batches to reduce maximum memory requirements.
Preview only applies calculation to first batch and returns.
Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]})
with the given channel values for each pulse and corresponding pulse id.
"""
possible_operations = {
"sum": ["intensity", np.sum],
"mean": ["mean", np.mean],
"std": ["mean", np.std],
}
with SFDataFiles(*fileset) as data:
if alignment_channels is not None:
channels = [channel] + [ch for ch in alignment_channels]
else:
channels = [channel]
subset = data[channels]
subset.drop_missing()
Images = subset[channel]
res = defaultdict(list)
res["roi"] = repr(roi)
for image_slice in Images.in_batches(batch_size):
index_slice, im = image_slice
if roi is None:
im_ROI = im[:]
else:
im_ROI = im[:, roi.rows, roi.cols]
# iterate over all operations
for op in operations:
label, func = possible_operations[op]
res[f"{channel}_{label}"].extend(func(im_ROI, axis=(1, 2)))
res["pids"].extend(Images.pids[index_slice])
# only return first batch
if preview:
break
return res
@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes
def sum_images(
fileset,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: ROI = None,
preview=False,
):
"""
Sums a given region of interest (roi) for an image channel from a
given fileset (e.g. "run0352/data/acq0001.*.h5" or step.fnames from a SFScanInfo object).
Allows alignment, i.e. reducing only to a common subset with other channels.
Summation is performed in batches to reduce maximum memory requirements.
Preview only sums and returns the first batch.
Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]})
with the given channel intensity for each pulse and corresponding pulse id.
"""
return perform_image_calculations(
fileset,
channel=channel,
alignment_channels=alignment_channels,
batch_size=batch_size,
roi=roi,
preview=preview,
operations=["sum"],
)
def get_contrast_images(
fileset,
channel="JF16T03V01",
alignment_channels=None,
batch_size=10,
roi: ROI = None,
preview=False,
):
"""
See perform_image_calculations. Here calculates mean and standard deviation for a given set of images.
"""
return perform_image_calculations(
fileset,
channel=channel,
alignment_channels=alignment_channels,
batch_size=batch_size,
roi=roi,
preview=preview,
operations=["mean", "std"],
)