From 60fa98ef74a35b715c4d3fb786d2b7dce8e5cc5f Mon Sep 17 00:00:00 2001 From: Alexander Steppke Date: Sun, 29 Jan 2023 16:07:36 +0100 Subject: [PATCH] restructured analysis, now can perform any np ufunc operation --- src/cristallina/analysis.py | 176 ++++++++++++++++++++---------------- 1 file changed, 97 insertions(+), 79 deletions(-) diff --git a/src/cristallina/analysis.py b/src/cristallina/analysis.py index fce2b18..c78916d 100644 --- a/src/cristallina/analysis.py +++ b/src/cristallina/analysis.py @@ -13,25 +13,26 @@ from .utils import ROI memory = None + def setup_cachedirs(pgroup=None, cachedir=None): - """ + """ Sets the path to a persistent cache directory either from the given p-group (e.g. "p20841") or an explicitly given directory. If heuristics fail we use "/tmp" as a non-persistent alternative. """ - + global memory if cachedir is not None: # explicit directory given, use this choice memory = Memory(cachedir, verbose=0, compress=2) - return + return try: if pgroup is None: pgroup_no = utils.heuristic_extract_pgroup() else: - parts = re.split(r'(\d.*)', pgroup) # ['p', '2343', ''] + parts = re.split(r"(\d.*)", pgroup) # ['p', '2343', ''] pgroup_no = parts[-2] cachedir = f"/das/work/units/cristallina/p{pgroup_no}/cachedir" except KeyError as e: @@ -43,9 +44,79 @@ def setup_cachedirs(pgroup=None, cachedir=None): except PermissionError as e: cachedir = "/tmp" memory = Memory(cachedir, verbose=0, compress=2) - + + setup_cachedirs() + +@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes +def perform_image_calculations( + fileset, + channel="JF16T03V01", + alignment_channels=None, + batch_size=10, + roi: ROI = None, + preview=False, + operations=["sum"], +): + """ + Performs one or more calculations ("sum", "mean" or "std") for a given region of interest (roi) + for an image channel from a fileset (e.g. "run0352/data/acq0001.*.h5" or step.fnames from a SFScanInfo object). + + Allows alignment, i.e. reducing only to a common subset with other channels. + + Calculations are performed in batches to reduce maximum memory requirements. + + Preview only applies calculation to first batch and returns. + + Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]}) + with the given channel values for each pulse and corresponding pulse id. + """ + + possible_operations = { + "sum": ["intensity", np.sum], + "mean": ["mean", np.mean], + "std": ["mean", np.std], + } + + with SFDataFiles(*fileset) as data: + if alignment_channels is not None: + channels = [channel] + [ch for ch in alignment_channels] + else: + channels = [channel] + + subset = data[channels] + + subset.drop_missing() + + Images = subset[channel] + + res = defaultdict(list) + res["roi"] = repr(roi) + + for image_slice in Images.in_batches(batch_size): + + index_slice, im = image_slice + + if roi is None: + im_ROI = im[:] + else: + im_ROI = im[:, roi.rows, roi.cols] + + # iterate over all operations + for op in operations: + label, func = possible_operations[op] + res[f"{channel}_{label}"].extend(func(im_ROI, axis=(1, 2))) + + res["pids"].extend(Images.pids[index_slice]) + + # only return first batch + if preview: + break + + return res + + @memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes def sum_images( fileset, @@ -62,47 +133,24 @@ def sum_images( Allows alignment, i.e. reducing only to a common subset with other channels. Summation is performed in batches to reduce maximum memory requirements. - + Preview only sums and returns the first batch. - Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]}) + Returns a dictionary ({"JF16T03V01_intensity":[11, 18, 21, 55, ...]}) with the given channel intensity for each pulse and corresponding pulse id. """ - with SFDataFiles(*fileset) as data: - if alignment_channels is not None: - channels = [channel] + [ch for ch in alignment_channels] - else: - channels = [channel] + return perform_image_calculations( + fileset, + channel=channel, + alignment_channels=alignment_channels, + batch_size=batch_size, + roi=roi, + preview=preview, + operations=["sum"], + ) - subset = data[channels] - subset.drop_missing() - - Images = subset[channel] - - res = defaultdict(list) - res["roi"] = repr(roi) - - for image_slice in Images.in_batches(batch_size): - - index_slice, im = image_slice - - if roi is None: - im_ROI = im[:] - else: - im_ROI = im[:, roi.rows, roi.cols] - - res[f"{channel}_intensity"].extend(np.sum(im_ROI, axis=(1, 2))) - res["pids"].extend(Images.pids[index_slice]) - - # only return first batch - if preview: - break - - return res - -@memory.cache(ignore=["batch_size"]) # we ignore batch_size for caching purposes def get_contrast_images( fileset, channel="JF16T03V01", @@ -112,45 +160,15 @@ def get_contrast_images( preview=False, ): """ - Sums a given region of interest (roi) for an image channel from a - given fileset (e.g. "run0352/data/acq0001.*.h5"). - - Allows alignment, i.e. reducing only to a common subset with other channels. - - Summation is performed in batches, preview only sums and returns the first batch. + See perform_image_calculations. Here calculates mean and standard deviation for a given set of images. """ - with SFDataFiles(*fileset) as data: - if alignment_channels is not None: - channels = [channel] + [ch for ch in alignment_channels] - else: - channels = [channel] - - subset = data[channels] - - subset.drop_missing() - - Images = subset[channel] - - res = defaultdict(list) - res["roi"] = repr(roi) - - for image_slice in Images.in_batches(batch_size): - - index_slice, im = image_slice - - if roi is None: - im_ROI = im[:] - else: - im_ROI = im[:, roi.rows, roi.cols] - - res[f"{channel}_mean"].extend(np.mean(im_ROI, axis=(1, 2))) - res[f"{channel}_std"].extend(np.std(im_ROI, axis=(1, 2))) - - res["pids"].extend(Images.pids[index_slice]) - - # only return first batch - if preview: - break - - return res + return perform_image_calculations( + fileset, + channel=channel, + alignment_channels=alignment_channels, + batch_size=batch_size, + roi=roi, + preview=preview, + operations=["mean", "std"], + )