diff --git a/dap/.gitignore b/dap/.gitignore new file mode 100644 index 0000000..82f9275 --- /dev/null +++ b/dap/.gitignore @@ -0,0 +1,162 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/dap/algos/__init__.py b/dap/algos/__init__.py index 064c713..2b5f92e 100644 --- a/dap/algos/__init__.py +++ b/dap/algos/__init__.py @@ -1,4 +1,12 @@ -from .radprof import prepare_radial_profile, radial_profile +from .addmask import calc_apply_additional_mask +from .aggregation import calc_apply_aggregation +from .jfdata import JFData +from .mask import calc_mask_pixels +from .peakfind import calc_peakfinder_analysis +from .radprof import calc_radial_integration +from .roi import calc_roi +from .spiana import calc_spi_analysis +from .thresh import calc_apply_threshold diff --git a/dap/algos/addmask.py b/dap/algos/addmask.py new file mode 100644 index 0000000..42c9842 --- /dev/null +++ b/dap/algos/addmask.py @@ -0,0 +1,64 @@ +#TODO: find a better way to handle this + +def calc_apply_additional_mask(results, pixel_mask_pf): + apply_additional_mask = results.get("apply_additional_mask", False) + if not apply_additional_mask: + return + + detector_name = results.get("detector_name", None) + if not detector_name: + return + + if detector_name == "JF06T08V05": + # edge pixels + pixel_mask_pf[0:1030, 1100] = 0 + pixel_mask_pf[0:1030, 1613] = 0 + pixel_mask_pf[0:1030, 1650] = 0 + + pixel_mask_pf[67, 0:1063] = 0 + + pixel_mask_pf[67:1097, 513] = 0 + pixel_mask_pf[67:1097, 550] = 0 + pixel_mask_pf[67:1097, 1063] = 0 + + pixel_mask_pf[1029, 1100:1614] = 0 + pixel_mask_pf[1029, 1650:2163] = 0 + + pixel_mask_pf[1039, 1168:1682] = 0 + pixel_mask_pf[1039, 1718:2230] = 0 + + pixel_mask_pf[1039:2069, 1168] = 0 + pixel_mask_pf[1039:2069, 1681] = 0 + pixel_mask_pf[1039:2069, 1718] = 0 + + pixel_mask_pf[1096, 0:513] = 0 + pixel_mask_pf[1096, 550:1064] = 0 + + pixel_mask_pf[1106, 68:582] = 0 + pixel_mask_pf[1106, 618:1132] = 0 + + pixel_mask_pf[1106:2136, 581] = 0 + pixel_mask_pf[1106:2136, 618] = 0 + pixel_mask_pf[1106:2136, 1131] = 0 + + pixel_mask_pf[2068, 1168:2232] = 0 + + # first bad region in left bottom inner module + pixel_mask_pf[842:1097, 669:671] = 0 + + # second bad region in left bottom inner module + pixel_mask_pf[1094, 620:807] = 0 + + # vertical line in upper left bottom module + pixel_mask_pf[842:1072, 87:90] = 0 + + # horizontal line? + pixel_mask_pf[1794, 1503:1550] = 0 + + + elif detector_name == "JF17T16V01": + # mask module 11 + pixel_mask_pf[2619:3333,1577:2607] = 0 + + + diff --git a/dap/algos/aggregation.py b/dap/algos/aggregation.py new file mode 100644 index 0000000..6281fbc --- /dev/null +++ b/dap/algos/aggregation.py @@ -0,0 +1,59 @@ +from .mask import calc_mask_pixels +from .thresh import threshold + + +def calc_apply_aggregation(results, data, pixel_mask_pf, aggregator): + # last round was ready, restart + if aggregator.is_ready(): + aggregator.reset() + + calc_apply_threshold(results, data) # changes data in place + data = calc_aggregate(results, data, aggregator) + calc_mask_pixels(data, pixel_mask_pf) # changes data in place + + aggregator.nmax = results.get("aggregation_max") + aggregation_is_ready = aggregator.is_ready() + + return data, aggregation_is_ready + + + +#TODO: this is duplicated in calc_apply_threshold and calc_radial_integration +def calc_apply_threshold(results, data): + apply_threshold = results.get("apply_threshold", False) + if not apply_threshold: + return + + for k in ("threshold_min", "threshold_max"): + if k not in results: + return + + threshold_min = float(results["threshold_min"]) + threshold_max = float(results["threshold_max"]) + + threshold(data, threshold_min, threshold_max, 0) + + + +def calc_aggregate(results, data, aggregator): + apply_aggregation = results.get("apply_aggregation", False) + if not apply_aggregation: + aggregator.reset() + return data + + if "aggregation_max" not in results: + aggregator.reset() + return data + + aggregator += data + + data = aggregator.data + n_aggregated_images = aggregator.counter + + results["aggregated_images"] = n_aggregated_images + results["worker"] = 1 #TODO: keep this for backwards compatibility? + + return data + + + diff --git a/dap/algos/jfdata.py b/dap/algos/jfdata.py new file mode 100644 index 0000000..bd2fa1e --- /dev/null +++ b/dap/algos/jfdata.py @@ -0,0 +1,72 @@ +import numpy as np + +import jungfrau_utils as ju + +from .addmask import calc_apply_additional_mask + + +class JFData: + + def __init__(self): + self.ju_stream_adapter = ju.StreamAdapter() + self.pedestal_name_saved = None + self.id_pixel_mask_corrected = None + self.pixel_mask_pf = None + + + def ensure_current_pixel_mask(self, pedestal_name): + if pedestal_name is None: + return + + new_pedestal_name = pedestal_name + old_pedestal_name = self.pedestal_name_saved + if new_pedestal_name == old_pedestal_name: + return + + self.refresh_pixel_mask() + self.pedestal_name_saved = pedestal_name + + + def refresh_pixel_mask(self): + pixel_mask_current = self.ju_stream_adapter.handler.pixel_mask + self.ju_stream_adapter.handler.pixel_mask = pixel_mask_current + + + def process(self, image, metadata, double_pixels): + data = self.ju_stream_adapter.process(image, metadata, double_pixels=double_pixels) + + # pedestal and gain files are loaded in process(), this check needs to be afterwards + if not self.ju_stream_adapter.handler.can_convert(): + return None + + data = np.ascontiguousarray(data) + return data + + + def get_pixel_mask(self, results, double_pixels): + pixel_mask_corrected = self.ju_stream_adapter.handler.get_pixel_mask(double_pixels=double_pixels) + if pixel_mask_corrected is None: + self.id_pixel_mask_corrected = None + self.pixel_mask_pf = None + return None + + # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters (and/or pedestal file) have changed + new_id_pixel_mask_corrected = id(pixel_mask_corrected) + old_id_pixel_mask_corrected = self.id_pixel_mask_corrected + if new_id_pixel_mask_corrected == old_id_pixel_mask_corrected: + return self.pixel_mask_pf + + pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected) + calc_apply_additional_mask(results, pixel_mask_pf) # changes pixel_mask_pf in place + + self.id_pixel_mask_corrected = new_id_pixel_mask_corrected + self.pixel_mask_pf = pixel_mask_pf + + return pixel_mask_pf + + + def get_saturated_pixels(self, image, double_pixels): + return self.ju_stream_adapter.handler.get_saturated_pixels(image, double_pixels=double_pixels) + + + diff --git a/dap/algos/mask.py b/dap/algos/mask.py new file mode 100644 index 0000000..7ba422f --- /dev/null +++ b/dap/algos/mask.py @@ -0,0 +1,11 @@ +import numpy as np + + +def calc_mask_pixels(data, pixel_mask_pf): + if pixel_mask_pf is None: + return + + data[~pixel_mask_pf] = np.nan + + + diff --git a/dap/algos/peakfind.py b/dap/algos/peakfind.py new file mode 100644 index 0000000..6667d87 --- /dev/null +++ b/dap/algos/peakfind.py @@ -0,0 +1,67 @@ +from copy import copy + +import numpy as np + +from peakfinder8_extension import peakfinder_8 + + +def calc_peakfinder_analysis(results, data, pixel_mask_pf): + do_peakfinder_analysis = results.get("do_peakfinder_analysis", False) + if not do_peakfinder_analysis: + return + + if pixel_mask_pf is None: + return + + for k in ("beam_center_x", "beam_center_y", "hitfinder_min_snr", "hitfinder_min_pix_count", "hitfinder_adc_thresh"): + if k not in results: + return + + # to coordinates where position of first pixel/point is (0.5, 0.5) + x_beam = results["beam_center_x"] - 0.5 + y_beam = results["beam_center_y"] - 0.5 + + hitfinder_min_snr = results["hitfinder_min_snr"] + hitfinder_min_pix_count = int(results["hitfinder_min_pix_count"]) + hitfinder_adc_thresh = results["hitfinder_adc_thresh"] + + asic_ny, asic_nx = data.shape + nasics_y, nasics_x = 1, 1 + hitfinder_max_pix_count = 100 + max_num_peaks = 10000 + + # usually don't need to change this value, rather robust + hitfinder_local_bg_radius = 20. + + y, x = np.indices(data.shape) + pix_r = np.sqrt((x - x_beam)**2 + (y - y_beam)**2) + + peak_list_x, peak_list_y, peak_list_value = peakfinder_8( + max_num_peaks, + data.astype(np.float32), + pixel_mask_pf.astype(np.int8), + pix_r.astype(np.float32), + asic_nx, asic_ny, + nasics_x, nasics_y, + hitfinder_adc_thresh, + hitfinder_min_snr, + hitfinder_min_pix_count, + hitfinder_max_pix_count, + hitfinder_local_bg_radius + ) + + number_of_spots = len(peak_list_x) + results["number_of_spots"] = number_of_spots + + # to coordinates where position of first pixel/point is (1, 1) + results["spot_x"] = [x + 0.5 for x in peak_list_x] + results["spot_y"] = [y + 0.5 for y in peak_list_y] + results["spot_intensity"] = copy(peak_list_value) #TODO: why is this copy needed? + + npeaks_threshold_hit = results.get("npeaks_threshold_hit", 15) + + if number_of_spots >= npeaks_threshold_hit: + results["is_hit_frame"] = True + + + diff --git a/dap/algos/radprof.py b/dap/algos/radprof.py index bad21f8..42bc268 100644 --- a/dap/algos/radprof.py +++ b/dap/algos/radprof.py @@ -1,23 +1,86 @@ import numpy as np +from .thresh import threshold +from .utils import npmemo -def radial_profile(data, r, nr, keep_pixels=None): + +def calc_radial_integration(results, data, pixel_mask_pf): + do_radial_integration = results.get("do_radial_integration", False) + if not do_radial_integration: + return + + center_x = results["beam_center_x"] + center_y = results["beam_center_y"] + + rad, norm = prepare_radial_profile(data.shape, center_x, center_y, pixel_mask_pf) + + r_min = min(rad) + r_max = max(rad) + 1 + + data = calc_apply_threshold(results, data) + + rp = radial_profile(data, rad, norm, pixel_mask_pf) + + silent_min = results.get("radial_integration_silent_min", None) + silent_max = results.get("radial_integration_silent_max", None) + + if ( + silent_min is not None and + silent_max is not None and + #TODO: skipping entirely is a guess, but not obvious -- better to ensure the order min < max by switching them if needed + silent_max > silent_min and + silent_min > r_min and + silent_max < r_max + ): + silent_region = rp[silent_min:silent_max] + integral_silent_region = np.sum(silent_region) + rp = rp / integral_silent_region + results["radint_normalised"] = [silent_min, silent_max] + + results["radint_I"] = rp[r_min:].tolist() #TODO: why not stop at r_max? + results["radint_q"] = [r_min, r_max] + + + +@npmemo +def prepare_radial_profile(shape, x0, y0, keep_pixels): + y, x = np.indices(shape) + rad = np.sqrt((x - x0)**2 + (y - y0)**2) if keep_pixels is not None: - tbin = np.bincount(r, data[keep_pixels].ravel()) - else: - tbin = np.bincount(r, data.ravel()) - radialprofile = tbin / nr - return radialprofile + rad = rad[keep_pixels] + rad = rad.astype(int).ravel() + norm = np.bincount(rad) + return rad, norm -def prepare_radial_profile(data, center, keep_pixels=None): - y, x = np.indices((data.shape)) - r = np.sqrt((x - center[0])**2 + (y - center[1])**2) + +def radial_profile(data, rad, norm, keep_pixels): if keep_pixels is not None: - r = r[keep_pixels].astype(int).ravel() - else: - r = r.astype(np.int).ravel() - nr = np.bincount(r) - return r, nr + data = data[keep_pixels] + data = data.ravel() + tbin = np.bincount(rad, data) + rp = tbin / norm + return rp + + + +#TODO: this is duplicated in calc_apply_threshold and calc_force_send +def calc_apply_threshold(results, data): + apply_threshold = results.get("apply_threshold", False) + if not apply_threshold: + return data + + for k in ("threshold_min", "threshold_max"): + if k not in results: + return data + + data = data.copy() # do the following in-place changes on a copy + + threshold_min = float(results["threshold_min"]) + threshold_max = float(results["threshold_max"]) + + threshold(data, threshold_min, threshold_max, np.nan) + + return data diff --git a/dap/algos/roi.py b/dap/algos/roi.py new file mode 100644 index 0000000..451416b --- /dev/null +++ b/dap/algos/roi.py @@ -0,0 +1,56 @@ +import numpy as np + + +def calc_roi(results, data, pixel_mask_pf): + #TODO: why is this checked here? + if pixel_mask_pf is None: + return + + for k in ("roi_x1", "roi_x2", "roi_y1", "roi_y2"): + if k not in results: + return + + roi_x1 = results["roi_x1"] + roi_x2 = results["roi_x2"] + roi_y1 = results["roi_y1"] + roi_y2 = results["roi_y2"] + + if len(roi_x1) == 0: + return + + if not (len(roi_x1) == len(roi_x2) == len(roi_y1) == len(roi_y2)): + return + + threshold_value = results.get("threshold_value", "NaN") + + roi_intensities = [] + roi_intensities_normalised = [] + roi_intensities_x = [] + roi_intensities_proj_x = [] + + for ix1, ix2, iy1, iy2 in zip(roi_x1, roi_x2, roi_y1, roi_y2): + data_roi = data[iy1:iy2, ix1:ix2] + + roi_sum = np.nansum(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized + + if threshold_value == "NaN": + roi_area = (ix2 - ix1) * (iy2 - iy1) + roi_sum_norm = roi_sum / roi_area + else: + roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized + + roi_indices_x = [ix1, ix2] + roi_proj_x = np.nansum(data_roi, axis=0).tolist() + + roi_intensities.append(roi_sum) + roi_intensities_normalised.append(roi_sum_norm) + roi_intensities_x.append(roi_indices_x) + roi_intensities_proj_x.append(roi_proj_x) + + results["roi_intensities"] = roi_intensities + results["roi_intensities_normalised"] = roi_intensities_normalised + results["roi_intensities_x"] = roi_intensities_x + results["roi_intensities_proj_x"] = roi_intensities_proj_x + + + diff --git a/dap/algos/spiana.py b/dap/algos/spiana.py new file mode 100644 index 0000000..593aaf7 --- /dev/null +++ b/dap/algos/spiana.py @@ -0,0 +1,32 @@ + +def calc_spi_analysis(results): + do_spi_analysis = results.get("do_spi_analysis", False) + if not do_spi_analysis: + return + + for k in ("spi_limit", "roi_intensities_normalised"): + if k not in results: + return + + spi_limit = results["spi_limit"] + roi_intensities_normalised = results["roi_intensities_normalised"] + + if len(spi_limit) != 2: + return + + if len(roi_intensities_normalised) < 2: + return + + number_of_spots = 0 + if roi_intensities_normalised[0] >= spi_limit[0]: + number_of_spots += 25 + if roi_intensities_normalised[1] >= spi_limit[1]: + number_of_spots += 50 + + results["number_of_spots"] = number_of_spots + + if number_of_spots > 0: + results["is_hit_frame"] = True + + + diff --git a/dap/algos/thresh.py b/dap/algos/thresh.py new file mode 100644 index 0000000..69da60e --- /dev/null +++ b/dap/algos/thresh.py @@ -0,0 +1,34 @@ +import numpy as np + + +#TODO: this is duplicated in calc_radial_integration and calc_force_send +def calc_apply_threshold(results, data): + apply_threshold = results.get("apply_threshold", False) + if not apply_threshold: + return + + for k in ("threshold_min", "threshold_max"): + if k not in results: + return + + threshold_value_choice = results.get("threshold_value", "NaN") + threshold_value = 0 if threshold_value_choice == "0" else np.nan #TODO + + threshold_min = float(results["threshold_min"]) + threshold_max = float(results["threshold_max"]) + + threshold(data, threshold_min, threshold_max, threshold_value) + + + +def threshold(data, vmin, vmax, replacement): + """ + threshold data in place by replacing values < vmin and values > vmax with replacement + """ + data[data < vmin] = replacement + #TODO: skipping max is a guess, but not obvious/symmetric -- better to ensure the order min < max by switching them if needed + if vmax > vmin: + data[data > vmax] = replacement + + + diff --git a/dap/algos/utils/__init__.py b/dap/algos/utils/__init__.py new file mode 100644 index 0000000..9efc8a4 --- /dev/null +++ b/dap/algos/utils/__init__.py @@ -0,0 +1,4 @@ + +from .npmemo import npmemo + + diff --git a/dap/algos/utils/npmemo.py b/dap/algos/utils/npmemo.py new file mode 100644 index 0000000..0711f3e --- /dev/null +++ b/dap/algos/utils/npmemo.py @@ -0,0 +1,87 @@ +import functools +#import hashlib +import numpy as np + + +def npmemo(func): + """ + numpy array aware memoizer with size limit + """ + maxsize = 10 + order = [] + cache = {} + + @functools.wraps(func) + def wrapper(*args): + key = make_key(args) + try: + return cache[key] + except KeyError: + if len(cache) >= maxsize: + oldest = order.pop(0) + cache.pop(oldest) + order.append(key) + cache[key] = res = func(*args) + return res + +# wrapper.cache = cache + return wrapper + + +def make_key(args): + return tuple(make_key_entry(i) for i in args) + +def make_key_entry(x): + if isinstance(x, np.ndarray): + return np_array_hash(x) + return x + +def np_array_hash(arr): +# return id(arr) # this has been used so far + res = arr.tobytes() +# res = hashlib.sha256(res).hexdigest() # if tobytes was too large, we could hash it +# res = (arr.shape, res) # tobytes does not take shape into account + return res + + + + + +if __name__ == "__main__": + @npmemo + def expensive(arr, offset): + print("recalc", arr, offset) + return np.dot(arr, arr) + offset + + def test(arr, offset): + print("first") + res1 = expensive(arr, offset) + print("second") + res2 = expensive(arr, offset) + print() + assert np.array_equal(res1, res2) + + arrays = ( + [1, 2, 3], + [1, 2, 3, 4], + [1, 2, 3, 4] + ) + + offsets = ( + 2, + 2, + 5 + ) + + for a, o in zip(arrays, offsets): + a = np.array(a) + test(a, o) + + for a, o in zip(arrays, offsets): + a = np.array(a) + test(a, o) + +# print(expensive.cache) + + + diff --git a/dap/utils/__init__.py b/dap/utils/__init__.py new file mode 100644 index 0000000..c195111 --- /dev/null +++ b/dap/utils/__init__.py @@ -0,0 +1,7 @@ + +from .aggregator import Aggregator +from .bits import read_bit +from .bufjson import BufferedJSON +from .randskip import randskip + + diff --git a/dap/utils/aggregator.py b/dap/utils/aggregator.py new file mode 100644 index 0000000..03f7ba8 --- /dev/null +++ b/dap/utils/aggregator.py @@ -0,0 +1,32 @@ + +class Aggregator: + + def __init__(self): + self.reset() + + def reset(self): + self.data = None + self.counter = 0 + self.nmax = None + + def add(self, item): + if self.data is None: + self.data = item.copy() + self.counter = 1 + else: + self.data += item + self.counter += 1 + return self + + __iadd__ = add + + def is_ready(self): + if self.nmax is None: + return False + return (self.counter >= self.nmax) + + def __repr__(self): + return f"{self.data!r} # ({self.counter} / {self.nmax})" + + + diff --git a/dap/utils/bits.py b/dap/utils/bits.py new file mode 100644 index 0000000..070d9ef --- /dev/null +++ b/dap/utils/bits.py @@ -0,0 +1,9 @@ + +def read_bit(bits, n): + """ + read the n-th bit from bits as boolean + """ + return bool((bits >> n) & 1) + + + diff --git a/dap/utils/bufjson.py b/dap/utils/bufjson.py new file mode 100644 index 0000000..7e6fcfe --- /dev/null +++ b/dap/utils/bufjson.py @@ -0,0 +1,50 @@ +import json +import os +from time import sleep + + +class BufferedJSON: + + def __init__(self, fname): + self.fname = fname + self.last_time = self.get_time() + self.last_data = self.get_data() + + + def load(self): + current_time = self.get_time() + time_delta = current_time - self.last_time + if time_delta <= 2: #TODO: is that a good time? + return self.last_data + + #TODO: logging for change? + sleep(0.5) #TODO: why? + current_data = self.get_data() + self.last_time = current_time + self.last_data = current_data + return current_data + + + def get_time(self): + if not self.exists(): + return -1 + return os.path.getmtime(self.fname) + + def get_data(self, *args, **kwargs): + if not self.exists(): + return {} + return json_load(self.fname, *args, **kwargs) + + def exists(self): + if not self.fname: + return False + return os.path.exists(self.fname) + + + +def json_load(filename, *args, **kwargs): + with open(filename, "r") as f: + return json.load(f, *args, **kwargs) + + + diff --git a/dap/utils/randskip.py b/dap/utils/randskip.py new file mode 100644 index 0000000..3870b14 --- /dev/null +++ b/dap/utils/randskip.py @@ -0,0 +1,39 @@ +from random import randint + + +def randskip(skip_rate): + return (randint(1, skip_rate) != 1) + + +# from randint docs: +# randint(a, b) +# Return random integer in range [a, b], including both end points. + +# thus: +# randskip(1) -> False 100% of times (never skip) +# randskip(10) -> False 10% of times (skip 90%) +# randskip(100) -> False 1% of times (skip 99%) + +#TODO: does this actually make sense? +# the following seems much clearer: + + +#from random import random + +#def randskip(percentage): +# """ +# Return True percentage % of times +# Return False (100 - percentage) % of times +# """ +# return 100 * random() < percentage + + +## thus: +# randskip(0) -> False 100% of times (never skip) +# randskip(1) -> False 99% of times (skip 1%) +# randskip(10) -> False 10% of times (skip 90%) +# randskip(99) -> False 1% of times (skip 99%) +# randskip(100) -> False 0% of times (always skip) + + + diff --git a/dap/worker.py b/dap/worker.py index 066d0f9..63fdca3 100644 --- a/dap/worker.py +++ b/dap/worker.py @@ -1,20 +1,10 @@ import argparse -import json -import os -from copy import copy -from random import randint -from time import sleep -import jungfrau_utils as ju import numpy as np -import zmq -from peakfinder8_extension import peakfinder_8 - -from algos import prepare_radial_profile, radial_profile - - -FLAGS = 0 +from algos import calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData +from utils import Aggregator, BufferedJSON, randskip, read_bit +from zmqsocks import ZMQSockets def main(): @@ -46,434 +36,111 @@ def main(): def work(backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port, fn_peakfinder_parameters, skip_frames_rate): - peakfinder_parameters = {} - peakfinder_parameters_time = -1 - if fn_peakfinder_parameters is not None and os.path.exists(fn_peakfinder_parameters): - with open(fn_peakfinder_parameters, "r") as read_file: - peakfinder_parameters = json.load(read_file) - peakfinder_parameters_time = os.path.getmtime(fn_peakfinder_parameters) + bj_peakfinder_parameters = BufferedJSON(fn_peakfinder_parameters) - pulseid = 0 + jfdata = JFData() - ju_stream_adapter = ju.StreamAdapter() + zmq_socks = ZMQSockets(backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port) - zmq_context = zmq.Context(io_threads=4) - poller = zmq.Poller() + aggregator = Aggregator() -# all the normal workers - worker = 1 - -# receive from backend: - backend_socket = zmq_context.socket(zmq.PULL) - backend_socket.connect(backend_address) - - poller.register(backend_socket, zmq.POLLIN) - - accumulator_socket = zmq_context.socket(zmq.PUSH) - accumulator_socket.connect(f"tcp://{accumulator_host}:{accumulator_port}") - - visualisation_socket = zmq_context.socket(zmq.PUB) - visualisation_socket.connect(f"tcp://{visualisation_host}:{visualisation_port}") - -# in case of problem with communication to visualisation, keep in 0mq buffer only few messages - visualisation_socket.set_hwm(10) - - keep_pixels = None - r_radial_integration = None - center_radial_integration = None - - results = {} - - pedestal_file_name_saved = None - - pixel_mask_corrected = None - pixel_mask_pf = None - - n_aggregated_images = 1 - data_summed = None while True: + if not zmq_socks.has_data(): + continue + + raw_image, metadata = zmq_socks.get_data() + + if metadata["shape"] == [2, 2]: # this is used as marker for empty images + continue + + + pulse_id = metadata.get("pulse_id", 0) -# check if peakfinder parameters changed and then re-read it try: - if peakfinder_parameters_time > 0: - new_time = os.path.getmtime(fn_peakfinder_parameters) - time_delta = new_time - peakfinder_parameters_time - if time_delta > 2.0: - old_peakfinder_parameters = peakfinder_parameters - sleep(0.5) - with open(fn_peakfinder_parameters, "r") as read_file: - peakfinder_parameters = json.load(read_file) - peakfinder_parameters_time = new_time - center_radial_integration = None - if worker == 0: - print(f"({pulseid}) update peakfinder parameters {old_peakfinder_parameters}", flush=True) - print(f" --> {peakfinder_parameters}", flush=True) - print("",flush=True) + peakfinder_parameters = bj_peakfinder_parameters.load() except Exception as e: - print(f"({pulseid}) problem ({e}) to read peakfinder parameters file, worker : {worker}", flush=True) + print(f"({pulse_id}) cannot read peakfinder parameters file: {e}", flush=True) #TODO: logging? - events = dict(poller.poll(2000)) # check every 2 seconds in each worker - if backend_socket not in events: - continue - metadata = backend_socket.recv_json(FLAGS) - image = backend_socket.recv(FLAGS, copy=False, track=False) - image = np.frombuffer(image, dtype=metadata["type"]).reshape(metadata["shape"]) - - results = copy(metadata) - if results["shape"][0] == 2 and results["shape"][1] == 2: - continue - - pulseid = results.get("pulse_id", 0) + results = metadata.copy() results.update(peakfinder_parameters) - detector = results.get("detector_name", "") - - results["laser_on"] = False results["number_of_spots"] = 0 results["is_hit_frame"] = False + daq_rec = results.get("daq_rec", 0) - event_laser = bool((daq_rec >> 16) & 1) - event_darkshot = bool((daq_rec >> 17) & 1) -# event_fel = bool((daq_rec >> 18) & 1) - event_ppicker = bool((daq_rec >> 19) & 1) + event_laser = read_bit(daq_rec, 16) + event_darkshot = read_bit(daq_rec, 17) +# event_fel = read_bit(daq_rec, 18) + event_ppicker = read_bit(daq_rec, 19) - if not event_darkshot: - results["laser_on"] = event_laser + results["laser_on"] = event_laser and not event_darkshot -# Filter only ppicker events, if requested; skipping all other events + # if requested, filter on ppicker events by skipping other events select_only_ppicker_events = results.get("select_only_ppicker_events", False) if select_only_ppicker_events and not event_ppicker: continue -# special settings for p20270, only few shots were opened by pulse-picker -# if detector in ["JF06T32V02"]: -# if event_ppicker: -# results["number_of_spots"] = 50 -# results["is_hit_frame"] = True + + pedestal_name = metadata.get("pedestal_name", None) + + jfdata.ensure_current_pixel_mask(pedestal_name) double_pixels = results.get("double_pixels", "mask") - pedestal_file_name = metadata.get("pedestal_name", None) - if pedestal_file_name is not None and pedestal_file_name != pedestal_file_name_saved: - pixel_mask_current = ju_stream_adapter.handler.pixel_mask - ju_stream_adapter.handler.pixel_mask = pixel_mask_current - pedestal_file_name_saved = pedestal_file_name + image = jfdata.process(raw_image, metadata, double_pixels) - data = ju_stream_adapter.process(image, metadata, double_pixels=double_pixels) - - # pedestal file is not in stream, skip this frame - if ju_stream_adapter.handler.pedestal_file is None or ju_stream_adapter.handler.pedestal_file == "": + if image is None: continue - data = np.ascontiguousarray(data) + pixel_mask_pf = jfdata.get_pixel_mask(results, double_pixels) - # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters (and/or pedestal file) are changed - id_pixel_mask_1 = id(pixel_mask_corrected) - pixel_mask_corrected = ju_stream_adapter.handler.get_pixel_mask(geometry=True, gap_pixels=True, double_pixels=double_pixels) - id_pixel_mask_2 = id(pixel_mask_corrected) - - if id_pixel_mask_1 != id_pixel_mask_2: - keep_pixels = None - r_radial_integration = None - if pixel_mask_corrected is not None: - #pixel_mask_corrected = np.ascontiguousarray(pixel_mask_corrected) - pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected).astype(np.int8, copy=False) - - else: - pixel_mask_pf = None - -# add additional mask at the edge of modules for JF06T08 - apply_additional_mask = results.get("apply_additional_mask", False) - if detector == "JF06T08V04" and apply_additional_mask: - # edge pixels - pixel_mask_pf[67:1097,1063] = 0 - pixel_mask_pf[0:1030, 1100] = 0 - - pixel_mask_pf[1106:2136, 1131] = 0 - pixel_mask_pf[1039:2069, 1168] = 0 - - pixel_mask_pf[1039:2069, 1718] = 0 - pixel_mask_pf[1039:2069, 1681] = 0 - - pixel_mask_pf[1106:2136, 618] = 0 - - pixel_mask_pf[1106:2136, 581] = 0 - - pixel_mask_pf[67:1097,513] = 0 - - pixel_mask_pf[67:1097, 550] = 0 - - pixel_mask_pf[0:1030, 1650] = 0 - - pixel_mask_pf[0:1030, 1613] = 0 - - pixel_mask_pf[1106, 68:582] = 0 - - pixel_mask_pf[1096, 550:1064] = 0 - pixel_mask_pf[1106, 618:1132] = 0 - - pixel_mask_pf[1029, 1100:1614] = 0 - pixel_mask_pf[1039, 1168:1682] = 0 - - pixel_mask_pf[1039, 1718:2230] = 0 - - pixel_mask_pf[1096, 0:513] = 0 - - pixel_mask_pf[1029, 1650:2163] = 0 - - pixel_mask_pf[2068, 1168:2232] = 0 - - pixel_mask_pf[67,0:1063] = 0 - - #bad region in left bottom inner module - pixel_mask_pf[842:1097, 669:671] = 0 - - #second bad region in left bottom inner module - pixel_mask_pf[1094, 620:807] = 0 - - # vertical line in upper left bottom module - pixel_mask_pf[842:1072, 87:90] = 0 - - pixel_mask_pf[1794, 1503:1550] = 0 - - if detector == "JF17T16V01" and apply_additional_mask: - # mask module 11 - pixel_mask_pf[2619:3333,1577:2607] = 0 - - if pixel_mask_corrected is not None: - data_s = copy(image) - saturated_pixels_coordinates = ju_stream_adapter.handler.get_saturated_pixels(data_s, mask=True, geometry=True, gap_pixels=True, double_pixels=double_pixels) - results["saturated_pixels"] = len(saturated_pixels_coordinates[0]) - results["saturated_pixels_x"] = saturated_pixels_coordinates[1].tolist() - results["saturated_pixels_y"] = saturated_pixels_coordinates[0].tolist() - -# pump probe analysis - do_radial_integration = results.get("do_radial_integration", False) - - if do_radial_integration: - - data_copy_1 = np.copy(data) - - if keep_pixels is None and pixel_mask_pf is not None: - keep_pixels = pixel_mask_pf!=0 - if center_radial_integration is None: - center_radial_integration = [results["beam_center_x"], results["beam_center_y"]] - r_radial_integration = None - if r_radial_integration is None: - r_radial_integration, nr_radial_integration = prepare_radial_profile(data_copy_1, center_radial_integration, keep_pixels) - r_min_max = [int(np.min(r_radial_integration)), int(np.max(r_radial_integration)) + 1] - - - apply_threshold = results.get("apply_threshold", False) - - if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")): - threshold_min = float(results["threshold_min"]) - threshold_max = float(results["threshold_max"]) - data_copy_1[data_copy_1 < threshold_min] = np.nan - if threshold_max > threshold_min: - data_copy_1[data_copy_1 > threshold_max] = np.nan - - rp = radial_profile(data_copy_1, r_radial_integration, nr_radial_integration, keep_pixels) - - silent_region_min = results.get("radial_integration_silent_min", None) - silent_region_max = results.get("radial_integration_silent_max", None) - - if ( - silent_region_min is not None and - silent_region_max is not None and - silent_region_max > silent_region_min and - silent_region_min > r_min_max[0] and - silent_region_max < r_min_max[1] - ): - - integral_silent_region = np.sum(rp[silent_region_min:silent_region_max]) - rp = rp / integral_silent_region - results["radint_normalised"] = [silent_region_min, silent_region_max] - - results["radint_I"] = list(rp[r_min_max[0]:]) - results["radint_q"] = r_min_max - - #copy image to work with peakfinder, just in case - d = np.copy(data) - - # make all masked pixels values nans if pixel_mask_pf is not None: - d[pixel_mask_pf != 1] = np.nan - - apply_threshold = results.get("apply_threshold", False) - threshold_value_choice = results.get("threshold_value", "NaN") - threshold_value = 0 if threshold_value_choice == "0" else np.nan - if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")): - threshold_min = float(results["threshold_min"]) - threshold_max = float(results["threshold_max"]) - d[d < threshold_min] = threshold_value - if threshold_max > threshold_min: - d[d > threshold_max] = threshold_value - - # if roi calculation request is present, make it - roi_x1 = results.get("roi_x1", []) - roi_x2 = results.get("roi_x2", []) - roi_y1 = results.get("roi_y1", []) - roi_y2 = results.get("roi_y2", []) - - if len(roi_x1) > 0 and len(roi_x1) == len(roi_x2) and len(roi_x1) == len(roi_y1) and len(roi_x1) == len(roi_y2): - roi_results = [0] * len(roi_x1) - roi_results_normalised = [0] * len(roi_x1) - - if pixel_mask_pf is not None: - - results["roi_intensities_x"] = [] - results["roi_intensities_proj_x"] = [] - - for iRoi in range(len(roi_x1)): - data_roi = np.copy(d[roi_y1[iRoi]:roi_y2[iRoi], roi_x1[iRoi]:roi_x2[iRoi]]) - - roi_results[iRoi] = np.nansum(data_roi) - if threshold_value_choice == "NaN": - roi_results_normalised[iRoi] = roi_results[iRoi] / ((roi_y2[iRoi] - roi_y1[iRoi]) * (roi_x2[iRoi] - roi_x1[iRoi])) - else: - roi_results_normalised[iRoi] = np.nanmean(data_roi) - - results["roi_intensities_x"].append([roi_x1[iRoi], roi_x2[iRoi]]) - results["roi_intensities_proj_x"].append(np.nansum(data_roi,axis=0).tolist()) - - results["roi_intensities"] = [float(r) for r in roi_results] - results["roi_intensities_normalised"] = [float(r) for r in roi_results_normalised ] - -# SPI analysis - do_spi_analysis = results.get("do_spi_analysis", False) - - if do_spi_analysis and "roi_intensities_normalised" in results and len(results["roi_intensities_normalised"]) >= 2: - - if "spi_limit" in results and len(results["spi_limit"]) == 2: - - number_of_spots = 0 - if results["roi_intensities_normalised"][0] >= results["spi_limit"][0]: - number_of_spots += 25 - if results["roi_intensities_normalised"][1] >= results["spi_limit"][1]: - number_of_spots += 50 - - results["number_of_spots"] = number_of_spots - if number_of_spots > 0: - results["is_hit_frame"] = True - -# in case all needed parameters are present, make peakfinding - do_peakfinder_analysis = results.get("do_peakfinder_analysis", False) - if do_peakfinder_analysis and pixel_mask_pf is not None and all(k in results for k in ("beam_center_x", "beam_center_y", "hitfinder_min_snr", "hitfinder_min_pix_count", "hitfinder_adc_thresh")): - x_beam = results["beam_center_x"] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5 - y_beam = results["beam_center_y"] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5 - hitfinder_min_snr = results["hitfinder_min_snr"] - hitfinder_min_pix_count = int(results["hitfinder_min_pix_count"]) - hitfinder_adc_thresh = results["hitfinder_adc_thresh"] - - asic_ny, asic_nx = d.shape - nasics_y, nasics_x = 1, 1 - hitfinder_max_pix_count = 100 - max_num_peaks = 10000 - - # usually don't need to change this value, rather robust - hitfinder_local_bg_radius= 20. - - # in case of further modification with the mask, make a new one, independent from real mask - maskPr = np.copy(pixel_mask_pf) - - y, x = np.indices(d.shape) - pix_r = np.sqrt((x-x_beam)**2 + (y-y_beam)**2) - - peak_list_x, peak_list_y, peak_list_value = peakfinder_8( - max_num_peaks, - d.astype(np.float32), - maskPr.astype(np.int8), - pix_r.astype(np.float32), - asic_nx, asic_ny, - nasics_x, nasics_y, - hitfinder_adc_thresh, - hitfinder_min_snr, - hitfinder_min_pix_count, - hitfinder_max_pix_count, - hitfinder_local_bg_radius - ) + saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels) + results["saturated_pixels"] = len(saturated_pixels_x) + results["saturated_pixels_x"] = saturated_pixels_x.tolist() + results["saturated_pixels_y"] = saturated_pixels_y.tolist() - number_of_spots = len(peak_list_x) - results["number_of_spots"] = number_of_spots - if number_of_spots != 0: - results["spot_x"] = [-1.0] * number_of_spots - results["spot_y"] = [-1.0] * number_of_spots - results["spot_intensity"] = copy(peak_list_value) - for i in range(number_of_spots): - results["spot_x"][i] = peak_list_x[i] + 0.5 - results["spot_y"][i] = peak_list_y[i] + 0.5 - else: - results["spot_x"] = [] - results["spot_y"] = [] - results["spot_intensity"] = [] + calc_radial_integration(results, image, pixel_mask_pf) - npeaks_threshold_hit = results.get("npeaks_threshold_hit", 15) + pfimage = image.copy() #TODO: is this copy needed? - if number_of_spots >= npeaks_threshold_hit: - results["is_hit_frame"] = True + calc_mask_pixels(pfimage, pixel_mask_pf) # changes pfimage in place + calc_apply_threshold(results, pfimage) # changes pfimage in place + calc_roi(results, pfimage, pixel_mask_pf) + calc_spi_analysis(results) + calc_peakfinder_analysis(results, pfimage, pixel_mask_pf) - forceSendVisualisation = False - if data.dtype != np.uint16: - apply_threshold = results.get("apply_threshold", False) - apply_aggregation = results.get("apply_aggregation", False) - if not apply_aggregation: - data_summed = None - n_aggregated_images = 1 - if apply_threshold or apply_aggregation: - if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")): - threshold_min = float(results["threshold_min"]) - threshold_max = float(results["threshold_max"]) - data[data < threshold_min] = 0.0 - if threshold_max > threshold_min: - data[data > threshold_max] = 0.0 - if apply_aggregation and "aggregation_max" in results: - if data_summed is not None: - data += data_summed - n_aggregated_images += 1 - data_summed = data.copy() - data_summed[data == -np.nan] = -np.nan - results["aggregated_images"] = n_aggregated_images - results["worker"] = worker - if n_aggregated_images >= results["aggregation_max"]: - forceSendVisualisation = True - data_summed = None - n_aggregated_images = 1 - data[pixel_mask_pf == 0] = np.NaN +# ??? + image, aggregation_is_ready = calc_apply_aggregation(results, image, pixel_mask_pf, aggregator) - else: - data = image - - results["type"] = str(data.dtype) - results["shape"] = data.shape + results["type"] = str(image.dtype) + results["shape"] = image.shape - accumulator_socket.send_json(results, FLAGS) + zmq_socks.send_accumulator(results) - if apply_aggregation and "aggregation_max" in results: - if forceSendVisualisation: - visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) - visualisation_socket.send(data, FLAGS, copy=True, track=True) - else: - data = np.empty((2, 2), dtype=np.uint16) - results["type"] = str(data.dtype) - results["shape"] = data.shape - visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) - visualisation_socket.send(data, FLAGS, copy=True, track=True) - else: - if results["is_good_frame"] and (results["is_hit_frame"] or randint(1, skip_frames_rate) == 1): - visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) - visualisation_socket.send(data, FLAGS, copy=True, track=True) - else: - data = np.empty((2, 2), dtype=np.uint16) - results["type"] = str(data.dtype) - results["shape"] = data.shape - visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) - visualisation_socket.send(data, FLAGS, copy=True, track=True) + + apply_aggregation = results.get("apply_aggregation", False) + aggregation_is_enabled = (apply_aggregation and "aggregation_max" in results) + aggregation_is_enabled_but_not_ready = (aggregation_is_enabled and not aggregation_is_ready) + + is_bad_frame = (not results["is_good_frame"]) + + # hits are sent at full rate, but no-hits are sent at reduced frequency + is_no_hit_frame = (not results["is_hit_frame"]) + random_skip = randskip(skip_frames_rate) + is_no_hit_frame_and_skipped = (is_no_hit_frame and random_skip) + + if aggregation_is_enabled_but_not_ready or is_bad_frame or is_no_hit_frame_and_skipped: + image = np.empty((2, 2), dtype=np.uint16) + results["type"] = str(image.dtype) + results["shape"] = image.shape + + zmq_socks.send_visualisation(results, image) diff --git a/dap/zmqsocks.py b/dap/zmqsocks.py new file mode 100644 index 0000000..be25cc8 --- /dev/null +++ b/dap/zmqsocks.py @@ -0,0 +1,49 @@ +import numpy as np +import zmq + + +FLAGS = 0 + + +class ZMQSockets: + + def __init__(self, backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port): + zmq_context = zmq.Context(io_threads=4) + self.poller = poller = zmq.Poller() + + # receive from backend: + self.backend_socket = backend_socket = zmq_context.socket(zmq.PULL) + backend_socket.connect(backend_address) + + poller.register(backend_socket, zmq.POLLIN) + + self.accumulator_socket = accumulator_socket = zmq_context.socket(zmq.PUSH) + accumulator_socket.connect(f"tcp://{accumulator_host}:{accumulator_port}") + + self.visualisation_socket = visualisation_socket = zmq_context.socket(zmq.PUB) + visualisation_socket.connect(f"tcp://{visualisation_host}:{visualisation_port}") + + # in case of problem with communication to visualisation, keep in 0mq buffer only few messages + visualisation_socket.set_hwm(10) + + + def has_data(self): + events = dict(self.poller.poll(2000)) # check every 2 seconds in each worker + return (self.backend_socket in events) + + def get_data(self): + metadata = self.backend_socket.recv_json(FLAGS) + image = self.backend_socket.recv(FLAGS, copy=False, track=False) + image = np.frombuffer(image, dtype=metadata["type"]).reshape(metadata["shape"]) + return image, metadata + + + def send_accumulator(self, results): + self.accumulator_socket.send_json(results, FLAGS) + + def send_visualisation(self, results, data): + self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) + self.visualisation_socket.send(data, FLAGS, copy=True, track=True) + + +