Merge branch 'refactor' into 'main'
refactor

See merge request sf-daq/dap!1
dap/.gitignore (new file, vendored, 162 lines)
@@ -0,0 +1,162 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
dap/algos/__init__.py
@@ -1,4 +1,12 @@
-from .radprof import prepare_radial_profile, radial_profile
+from .addmask import calc_apply_additional_mask
+from .aggregation import calc_apply_aggregation
+from .jfdata import JFData
+from .mask import calc_mask_pixels
+from .peakfind import calc_peakfinder_analysis
+from .radprof import calc_radial_integration
+from .roi import calc_roi
+from .spiana import calc_spi_analysis
+from .thresh import calc_apply_threshold
dap/algos/addmask.py (new file, 64 lines)
@@ -0,0 +1,64 @@
#TODO: find a better way to handle this

def calc_apply_additional_mask(results, pixel_mask_pf):
    apply_additional_mask = results.get("apply_additional_mask", False)
    if not apply_additional_mask:
        return

    detector_name = results.get("detector_name", None)
    if not detector_name:
        return

    if detector_name == "JF06T08V05":
        # edge pixels
        pixel_mask_pf[0:1030, 1100] = 0
        pixel_mask_pf[0:1030, 1613] = 0
        pixel_mask_pf[0:1030, 1650] = 0

        pixel_mask_pf[67, 0:1063] = 0

        pixel_mask_pf[67:1097, 513] = 0
        pixel_mask_pf[67:1097, 550] = 0
        pixel_mask_pf[67:1097, 1063] = 0

        pixel_mask_pf[1029, 1100:1614] = 0
        pixel_mask_pf[1029, 1650:2163] = 0

        pixel_mask_pf[1039, 1168:1682] = 0
        pixel_mask_pf[1039, 1718:2230] = 0

        pixel_mask_pf[1039:2069, 1168] = 0
        pixel_mask_pf[1039:2069, 1681] = 0
        pixel_mask_pf[1039:2069, 1718] = 0

        pixel_mask_pf[1096, 0:513] = 0
        pixel_mask_pf[1096, 550:1064] = 0

        pixel_mask_pf[1106, 68:582] = 0
        pixel_mask_pf[1106, 618:1132] = 0

        pixel_mask_pf[1106:2136, 581] = 0
        pixel_mask_pf[1106:2136, 618] = 0
        pixel_mask_pf[1106:2136, 1131] = 0

        pixel_mask_pf[2068, 1168:2232] = 0

        # first bad region in left bottom inner module
        pixel_mask_pf[842:1097, 669:671] = 0

        # second bad region in left bottom inner module
        pixel_mask_pf[1094, 620:807] = 0

        # vertical line in upper left bottom module
        pixel_mask_pf[842:1072, 87:90] = 0

        # horizontal line?
        pixel_mask_pf[1794, 1503:1550] = 0

    elif detector_name == "JF17T16V01":
        # mask module 11
        pixel_mask_pf[2619:3333, 1577:2607] = 0
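Illustrative usage sketch (assuming the dap/ directory is on sys.path, as worker.py does): calc_apply_additional_mask mutates the given mask array in place and only acts when both apply_additional_mask and a known detector_name are present in results.

import numpy as np
from algos import calc_apply_additional_mask

# JF17T16V01 masked region reaches row 3332 and column 2606, so use a large dummy mask
pixel_mask_pf = np.ones((3334, 2608), dtype=np.int8)
results = {"apply_additional_mask": True, "detector_name": "JF17T16V01"}

calc_apply_additional_mask(results, pixel_mask_pf)  # changes pixel_mask_pf in place
print(pixel_mask_pf[2700, 2000])  # 0: inside the masked "module 11" region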
dap/algos/aggregation.py (new file, 59 lines)
@@ -0,0 +1,59 @@
from .mask import calc_mask_pixels
from .thresh import threshold


def calc_apply_aggregation(results, data, pixel_mask_pf, aggregator):
    # last round was ready, restart
    if aggregator.is_ready():
        aggregator.reset()

    calc_apply_threshold(results, data) # changes data in place
    data = calc_aggregate(results, data, aggregator)
    calc_mask_pixels(data, pixel_mask_pf) # changes data in place

    aggregator.nmax = results.get("aggregation_max")
    aggregation_is_ready = aggregator.is_ready()

    return data, aggregation_is_ready



#TODO: this is duplicated in calc_apply_threshold and calc_radial_integration
def calc_apply_threshold(results, data):
    apply_threshold = results.get("apply_threshold", False)
    if not apply_threshold:
        return

    for k in ("threshold_min", "threshold_max"):
        if k not in results:
            return

    threshold_min = float(results["threshold_min"])
    threshold_max = float(results["threshold_max"])

    threshold(data, threshold_min, threshold_max, 0)



def calc_aggregate(results, data, aggregator):
    apply_aggregation = results.get("apply_aggregation", False)
    if not apply_aggregation:
        aggregator.reset()
        return data

    if "aggregation_max" not in results:
        aggregator.reset()
        return data

    aggregator += data

    data = aggregator.data
    n_aggregated_images = aggregator.counter

    results["aggregated_images"] = n_aggregated_images
    results["worker"] = 1 #TODO: keep this for backwards compatibility?

    return data
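Illustrative sketch of the calc_apply_aggregation contract (assuming dap/ is on sys.path): with apply_aggregation and aggregation_max set in results, the returned array is the running sum held by the Aggregator, and the ready flag becomes True once aggregation_max frames have been added.

import numpy as np
from algos import calc_apply_aggregation
from utils import Aggregator

aggregator = Aggregator()
results = {"apply_aggregation": True, "aggregation_max": 3}

for i in range(3):
    frame = np.full((4, 4), float(i + 1))
    summed, ready = calc_apply_aggregation(results, frame, None, aggregator)

print(results["aggregated_images"], ready)  # 3 True
print(summed[0, 0])                         # 6.0 == 1 + 2 + 3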
dap/algos/jfdata.py (new file, 72 lines)
@@ -0,0 +1,72 @@
import numpy as np

import jungfrau_utils as ju

from .addmask import calc_apply_additional_mask


class JFData:

    def __init__(self):
        self.ju_stream_adapter = ju.StreamAdapter()
        self.pedestal_name_saved = None
        self.id_pixel_mask_corrected = None
        self.pixel_mask_pf = None


    def ensure_current_pixel_mask(self, pedestal_name):
        if pedestal_name is None:
            return

        new_pedestal_name = pedestal_name
        old_pedestal_name = self.pedestal_name_saved
        if new_pedestal_name == old_pedestal_name:
            return

        self.refresh_pixel_mask()
        self.pedestal_name_saved = pedestal_name


    def refresh_pixel_mask(self):
        pixel_mask_current = self.ju_stream_adapter.handler.pixel_mask
        self.ju_stream_adapter.handler.pixel_mask = pixel_mask_current


    def process(self, image, metadata, double_pixels):
        data = self.ju_stream_adapter.process(image, metadata, double_pixels=double_pixels)

        # pedestal and gain files are loaded in process(), this check needs to be afterwards
        if not self.ju_stream_adapter.handler.can_convert():
            return None

        data = np.ascontiguousarray(data)
        return data


    def get_pixel_mask(self, results, double_pixels):
        pixel_mask_corrected = self.ju_stream_adapter.handler.get_pixel_mask(double_pixels=double_pixels)
        if pixel_mask_corrected is None:
            self.id_pixel_mask_corrected = None
            self.pixel_mask_pf = None
            return None

        # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters (and/or pedestal file) have changed
        new_id_pixel_mask_corrected = id(pixel_mask_corrected)
        old_id_pixel_mask_corrected = self.id_pixel_mask_corrected
        if new_id_pixel_mask_corrected == old_id_pixel_mask_corrected:
            return self.pixel_mask_pf

        pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected)
        calc_apply_additional_mask(results, pixel_mask_pf) # changes pixel_mask_pf in place

        self.id_pixel_mask_corrected = new_id_pixel_mask_corrected
        self.pixel_mask_pf = pixel_mask_pf

        return pixel_mask_pf


    def get_saturated_pixels(self, image, double_pixels):
        return self.ju_stream_adapter.handler.get_saturated_pixels(image, double_pixels=double_pixels)
dap/algos/mask.py (new file, 11 lines)
@@ -0,0 +1,11 @@
import numpy as np


def calc_mask_pixels(data, pixel_mask_pf):
    if pixel_mask_pf is None:
        return

    data[~pixel_mask_pf] = np.nan
dap/algos/peakfind.py (new file, 67 lines)
@@ -0,0 +1,67 @@
from copy import copy

import numpy as np

from peakfinder8_extension import peakfinder_8


def calc_peakfinder_analysis(results, data, pixel_mask_pf):
    do_peakfinder_analysis = results.get("do_peakfinder_analysis", False)
    if not do_peakfinder_analysis:
        return

    if pixel_mask_pf is None:
        return

    for k in ("beam_center_x", "beam_center_y", "hitfinder_min_snr", "hitfinder_min_pix_count", "hitfinder_adc_thresh"):
        if k not in results:
            return

    # to coordinates where position of first pixel/point is (0.5, 0.5)
    x_beam = results["beam_center_x"] - 0.5
    y_beam = results["beam_center_y"] - 0.5

    hitfinder_min_snr = results["hitfinder_min_snr"]
    hitfinder_min_pix_count = int(results["hitfinder_min_pix_count"])
    hitfinder_adc_thresh = results["hitfinder_adc_thresh"]

    asic_ny, asic_nx = data.shape
    nasics_y, nasics_x = 1, 1
    hitfinder_max_pix_count = 100
    max_num_peaks = 10000

    # usually don't need to change this value, rather robust
    hitfinder_local_bg_radius = 20.

    y, x = np.indices(data.shape)
    pix_r = np.sqrt((x - x_beam)**2 + (y - y_beam)**2)

    peak_list_x, peak_list_y, peak_list_value = peakfinder_8(
        max_num_peaks,
        data.astype(np.float32),
        pixel_mask_pf.astype(np.int8),
        pix_r.astype(np.float32),
        asic_nx, asic_ny,
        nasics_x, nasics_y,
        hitfinder_adc_thresh,
        hitfinder_min_snr,
        hitfinder_min_pix_count,
        hitfinder_max_pix_count,
        hitfinder_local_bg_radius
    )

    number_of_spots = len(peak_list_x)
    results["number_of_spots"] = number_of_spots

    # to coordinates where position of first pixel/point is (1, 1)
    results["spot_x"] = [x + 0.5 for x in peak_list_x]
    results["spot_y"] = [y + 0.5 for y in peak_list_y]
    results["spot_intensity"] = copy(peak_list_value) #TODO: why is this copy needed?

    npeaks_threshold_hit = results.get("npeaks_threshold_hit", 15)

    if number_of_spots >= npeaks_threshold_hit:
        results["is_hit_frame"] = True
dap/algos/radprof.py
@@ -1,23 +1,86 @@
 import numpy as np

-def radial_profile(data, r, nr, keep_pixels=None):
-    if keep_pixels is not None:
-        tbin = np.bincount(r, data[keep_pixels].ravel())
-    else:
-        tbin = np.bincount(r, data.ravel())
-    radialprofile = tbin / nr
-    return radialprofile
-
-def prepare_radial_profile(data, center, keep_pixels=None):
-    y, x = np.indices((data.shape))
-    r = np.sqrt((x - center[0])**2 + (y - center[1])**2)
-    if keep_pixels is not None:
-        r = r[keep_pixels].astype(int).ravel()
-    else:
-        r = r.astype(np.int).ravel()
-    nr = np.bincount(r)
-    return r, nr
+from .thresh import threshold
+from .utils import npmemo
+
+
+def calc_radial_integration(results, data, pixel_mask_pf):
+    do_radial_integration = results.get("do_radial_integration", False)
+    if not do_radial_integration:
+        return
+
+    center_x = results["beam_center_x"]
+    center_y = results["beam_center_y"]
+
+    rad, norm = prepare_radial_profile(data.shape, center_x, center_y, pixel_mask_pf)
+
+    r_min = min(rad)
+    r_max = max(rad) + 1
+
+    data = calc_apply_threshold(results, data)
+
+    rp = radial_profile(data, rad, norm, pixel_mask_pf)
+
+    silent_min = results.get("radial_integration_silent_min", None)
+    silent_max = results.get("radial_integration_silent_max", None)
+
+    if (
+        silent_min is not None and
+        silent_max is not None and
+        #TODO: skipping entirely is a guess, but not obvious -- better to ensure the order min < max by switching them if needed
+        silent_max > silent_min and
+        silent_min > r_min and
+        silent_max < r_max
+    ):
+        silent_region = rp[silent_min:silent_max]
+        integral_silent_region = np.sum(silent_region)
+        rp = rp / integral_silent_region
+        results["radint_normalised"] = [silent_min, silent_max]
+
+    results["radint_I"] = rp[r_min:].tolist() #TODO: why not stop at r_max?
+    results["radint_q"] = [r_min, r_max]
+
+
+
+@npmemo
+def prepare_radial_profile(shape, x0, y0, keep_pixels):
+    y, x = np.indices(shape)
+    rad = np.sqrt((x - x0)**2 + (y - y0)**2)
+    if keep_pixels is not None:
+        rad = rad[keep_pixels]
+    rad = rad.astype(int).ravel()
+    norm = np.bincount(rad)
+    return rad, norm
+
+
+def radial_profile(data, rad, norm, keep_pixels):
+    if keep_pixels is not None:
+        data = data[keep_pixels]
+    data = data.ravel()
+    tbin = np.bincount(rad, data)
+    rp = tbin / norm
+    return rp
+
+
+
+#TODO: this is duplicated in calc_apply_threshold and calc_force_send
+def calc_apply_threshold(results, data):
+    apply_threshold = results.get("apply_threshold", False)
+    if not apply_threshold:
+        return data
+
+    for k in ("threshold_min", "threshold_max"):
+        if k not in results:
+            return data
+
+    data = data.copy() # do the following in-place changes on a copy
+
+    threshold_min = float(results["threshold_min"])
+    threshold_max = float(results["threshold_max"])
+
+    threshold(data, threshold_min, threshold_max, np.nan)
+
+    return data
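Illustrative sketch (assuming dap/ is on sys.path and the reconstructed signatures above) of the bincount-based radial profile that the new prepare_radial_profile / radial_profile pair implements: pixel radii are binned to integers once (and memoized via npmemo), and each profile is then a ratio of two bincounts.

import numpy as np
from algos.radprof import prepare_radial_profile, radial_profile

data = np.random.random((64, 64)).astype(np.float32)
keep_pixels = np.ones(data.shape, dtype=bool)   # keep everything

# integer-binned radii relative to a beam center at (32, 32)
rad, norm = prepare_radial_profile(data.shape, 32.0, 32.0, keep_pixels)
rp = radial_profile(data, rad, norm, keep_pixels)

print(rp.shape)      # one value per integer radius bin
print(float(rp[0]))  # mean intensity of the innermost bin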
dap/algos/roi.py (new file, 56 lines)
@@ -0,0 +1,56 @@
import numpy as np


def calc_roi(results, data, pixel_mask_pf):
    #TODO: why is this checked here?
    if pixel_mask_pf is None:
        return

    for k in ("roi_x1", "roi_x2", "roi_y1", "roi_y2"):
        if k not in results:
            return

    roi_x1 = results["roi_x1"]
    roi_x2 = results["roi_x2"]
    roi_y1 = results["roi_y1"]
    roi_y2 = results["roi_y2"]

    if len(roi_x1) == 0:
        return

    if not (len(roi_x1) == len(roi_x2) == len(roi_y1) == len(roi_y2)):
        return

    threshold_value = results.get("threshold_value", "NaN")

    roi_intensities = []
    roi_intensities_normalised = []
    roi_intensities_x = []
    roi_intensities_proj_x = []

    for ix1, ix2, iy1, iy2 in zip(roi_x1, roi_x2, roi_y1, roi_y2):
        data_roi = data[iy1:iy2, ix1:ix2]

        roi_sum = np.nansum(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized

        if threshold_value == "NaN":
            roi_area = (ix2 - ix1) * (iy2 - iy1)
            roi_sum_norm = roi_sum / roi_area
        else:
            roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized

        roi_indices_x = [ix1, ix2]
        roi_proj_x = np.nansum(data_roi, axis=0).tolist()

        roi_intensities.append(roi_sum)
        roi_intensities_normalised.append(roi_sum_norm)
        roi_intensities_x.append(roi_indices_x)
        roi_intensities_proj_x.append(roi_proj_x)

    results["roi_intensities"] = roi_intensities
    results["roi_intensities_normalised"] = roi_intensities_normalised
    results["roi_intensities_x"] = roi_intensities_x
    results["roi_intensities_proj_x"] = roi_intensities_proj_x
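Illustrative sketch (assuming dap/ is on sys.path): calc_roi expects parallel lists of ROI corner coordinates in results and fills the roi_intensities* keys.

import numpy as np
from algos import calc_roi

data = np.arange(16, dtype=np.float32).reshape(4, 4)
pixel_mask_pf = np.ones_like(data, dtype=bool)

results = {"roi_x1": [0], "roi_x2": [2], "roi_y1": [0], "roi_y2": [2]}
calc_roi(results, data, pixel_mask_pf)

print(results["roi_intensities"])             # sum of the 2x2 ROI: 0 + 1 + 4 + 5 = 10
print(results["roi_intensities_normalised"])  # 10 / 4 pixels = 2.5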
dap/algos/spiana.py (new file, 32 lines)
@@ -0,0 +1,32 @@
def calc_spi_analysis(results):
    do_spi_analysis = results.get("do_spi_analysis", False)
    if not do_spi_analysis:
        return

    for k in ("spi_limit", "roi_intensities_normalised"):
        if k not in results:
            return

    spi_limit = results["spi_limit"]
    roi_intensities_normalised = results["roi_intensities_normalised"]

    if len(spi_limit) != 2:
        return

    if len(roi_intensities_normalised) < 2:
        return

    number_of_spots = 0
    if roi_intensities_normalised[0] >= spi_limit[0]:
        number_of_spots += 25
    if roi_intensities_normalised[1] >= spi_limit[1]:
        number_of_spots += 50

    results["number_of_spots"] = number_of_spots

    if number_of_spots > 0:
        results["is_hit_frame"] = True
dap/algos/thresh.py (new file, 34 lines)
@@ -0,0 +1,34 @@
import numpy as np


#TODO: this is duplicated in calc_radial_integration and calc_force_send
def calc_apply_threshold(results, data):
    apply_threshold = results.get("apply_threshold", False)
    if not apply_threshold:
        return

    for k in ("threshold_min", "threshold_max"):
        if k not in results:
            return

    threshold_value_choice = results.get("threshold_value", "NaN")
    threshold_value = 0 if threshold_value_choice == "0" else np.nan #TODO

    threshold_min = float(results["threshold_min"])
    threshold_max = float(results["threshold_max"])

    threshold(data, threshold_min, threshold_max, threshold_value)



def threshold(data, vmin, vmax, replacement):
    """
    threshold data in place by replacing values < vmin and values > vmax with replacement
    """
    data[data < vmin] = replacement
    #TODO: skipping max is a guess, but not obvious/symmetric -- better to ensure the order min < max by switching them if needed
    if vmax > vmin:
        data[data > vmax] = replacement
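Illustrative sketch (assuming dap/ is on sys.path): threshold() replaces out-of-range values in place, and the upper bound is only applied when vmax > vmin, as noted in the TODO above.

import numpy as np
from algos.thresh import threshold

data = np.array([0.5, 1.5, 2.5, 3.5])
threshold(data, 1.0, 3.0, np.nan)  # in place
print(data)  # [nan 1.5 2.5 nan]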
dap/algos/utils/__init__.py (new file, 4 lines)
@@ -0,0 +1,4 @@
from .npmemo import npmemo
dap/algos/utils/npmemo.py (new file, 87 lines)
@@ -0,0 +1,87 @@
import functools
#import hashlib
import numpy as np


def npmemo(func):
    """
    numpy array aware memoizer with size limit
    """
    maxsize = 10
    order = []
    cache = {}

    @functools.wraps(func)
    def wrapper(*args):
        key = make_key(args)
        try:
            return cache[key]
        except KeyError:
            if len(cache) >= maxsize:
                oldest = order.pop(0)
                cache.pop(oldest)
            order.append(key)
            cache[key] = res = func(*args)
            return res

    # wrapper.cache = cache
    return wrapper


def make_key(args):
    return tuple(make_key_entry(i) for i in args)

def make_key_entry(x):
    if isinstance(x, np.ndarray):
        return np_array_hash(x)
    return x

def np_array_hash(arr):
    # return id(arr) # this has been used so far
    res = arr.tobytes()
    # res = hashlib.sha256(res).hexdigest() # if tobytes was too large, we could hash it
    # res = (arr.shape, res) # tobytes does not take shape into account
    return res




if __name__ == "__main__":
    @npmemo
    def expensive(arr, offset):
        print("recalc", arr, offset)
        return np.dot(arr, arr) + offset

    def test(arr, offset):
        print("first")
        res1 = expensive(arr, offset)
        print("second")
        res2 = expensive(arr, offset)
        print()
        assert np.array_equal(res1, res2)

    arrays = (
        [1, 2, 3],
        [1, 2, 3, 4],
        [1, 2, 3, 4]
    )

    offsets = (
        2,
        2,
        5
    )

    for a, o in zip(arrays, offsets):
        a = np.array(a)
        test(a, o)

    for a, o in zip(arrays, offsets):
        a = np.array(a)
        test(a, o)

    # print(expensive.cache)
dap/utils/__init__.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from .aggregator import Aggregator
from .bits import read_bit
from .bufjson import BufferedJSON
from .randskip import randskip
dap/utils/aggregator.py (new file, 32 lines)
@@ -0,0 +1,32 @@
class Aggregator:

    def __init__(self):
        self.reset()

    def reset(self):
        self.data = None
        self.counter = 0
        self.nmax = None

    def add(self, item):
        if self.data is None:
            self.data = item.copy()
            self.counter = 1
        else:
            self.data += item
            self.counter += 1
        return self

    __iadd__ = add

    def is_ready(self):
        if self.nmax is None:
            return False
        return (self.counter >= self.nmax)

    def __repr__(self):
        return f"{self.data!r} # ({self.counter} / {self.nmax})"
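Illustrative sketch (assuming dap/ is on sys.path) of the accumulate / is_ready cycle that calc_apply_aggregation drives:

import numpy as np
from utils import Aggregator

agg = Aggregator()
agg.nmax = 2

agg += np.ones((2, 2))
print(agg.counter, agg.is_ready())   # 1 False

agg += np.ones((2, 2))
print(agg.counter, agg.is_ready())   # 2 True
print(agg.data)                      # [[2. 2.] [2. 2.]]

agg.reset()
print(agg.counter, agg.nmax)         # 0 None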
dap/utils/bits.py (new file, 9 lines)
@@ -0,0 +1,9 @@
def read_bit(bits, n):
    """
    read the n-th bit from bits as boolean
    """
    return bool((bits >> n) & 1)
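Illustrative sketch (assuming dap/ is on sys.path): read_bit is used in worker.py to decode the daq_rec event bits (laser = bit 16, darkshot = 17, fel = 18, ppicker = 19).

from utils import read_bit

daq_rec = (1 << 16) | (1 << 19)   # laser and pulse-picker bits set
print(read_bit(daq_rec, 16))      # True  (laser)
print(read_bit(daq_rec, 17))      # False (darkshot)
print(read_bit(daq_rec, 19))      # True  (ppicker)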
dap/utils/bufjson.py (new file, 50 lines)
@@ -0,0 +1,50 @@
import json
import os
from time import sleep


class BufferedJSON:

    def __init__(self, fname):
        self.fname = fname
        self.last_time = self.get_time()
        self.last_data = self.get_data()


    def load(self):
        current_time = self.get_time()
        time_delta = current_time - self.last_time
        if time_delta <= 2: #TODO: is that a good time?
            return self.last_data

        #TODO: logging for change?
        sleep(0.5) #TODO: why?
        current_data = self.get_data()
        self.last_time = current_time
        self.last_data = current_data
        return current_data


    def get_time(self):
        if not self.exists():
            return -1
        return os.path.getmtime(self.fname)

    def get_data(self, *args, **kwargs):
        if not self.exists():
            return {}
        return json_load(self.fname, *args, **kwargs)

    def exists(self):
        if not self.fname:
            return False
        return os.path.exists(self.fname)



def json_load(filename, *args, **kwargs):
    with open(filename, "r") as f:
        return json.load(f, *args, **kwargs)
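Illustrative sketch (assuming dap/ is on sys.path): BufferedJSON re-reads the file only when its mtime is more than 2 seconds newer than the last read, otherwise it returns the cached dictionary; a missing or empty filename yields {}.

import json
import tempfile
from utils import BufferedJSON

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"beam_center_x": 1210.0}, f)
    fname = f.name

params = BufferedJSON(fname)
print(params.load())               # cached content of the file
print(BufferedJSON(None).load())   # {} when no file is configured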
dap/utils/randskip.py (new file, 39 lines)
@@ -0,0 +1,39 @@
from random import randint


def randskip(skip_rate):
    return (randint(1, skip_rate) != 1)


# from randint docs:
# randint(a, b)
#     Return random integer in range [a, b], including both end points.

# thus:
# randskip(1)   -> False 100% of times (never skip)
# randskip(10)  -> False 10% of times (skip 90%)
# randskip(100) -> False 1% of times (skip 99%)

#TODO: does this actually make sense?
# the following seems much clearer:


#from random import random

#def randskip(percentage):
#    """
#    Return True percentage % of times
#    Return False (100 - percentage) % of times
#    """
#    return 100 * random() < percentage


## thus:
# randskip(0)   -> False 100% of times (never skip)
# randskip(1)   -> False 99% of times (skip 1%)
# randskip(10)  -> False 90% of times (skip 10%)
# randskip(99)  -> False 1% of times (skip 99%)
# randskip(100) -> False 0% of times (always skip)
dap/worker.py (469 lines)
@@ -1,20 +1,10 @@
 import argparse
-import json
-import os
-from copy import copy
-from random import randint
-from time import sleep
-
-import jungfrau_utils as ju
 import numpy as np
-import zmq
-from peakfinder8_extension import peakfinder_8
-
-from algos import prepare_radial_profile, radial_profile
-
-
-FLAGS = 0
+
+from algos import calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData
+from utils import Aggregator, BufferedJSON, randskip, read_bit
+from zmqsocks import ZMQSockets


 def main():

@@ -46,434 +36,111 @@ def main():
 def work(backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port, fn_peakfinder_parameters, skip_frames_rate):
-    peakfinder_parameters = {}
-    peakfinder_parameters_time = -1
-    if fn_peakfinder_parameters is not None and os.path.exists(fn_peakfinder_parameters):
-        with open(fn_peakfinder_parameters, "r") as read_file:
-            peakfinder_parameters = json.load(read_file)
-        peakfinder_parameters_time = os.path.getmtime(fn_peakfinder_parameters)
+    bj_peakfinder_parameters = BufferedJSON(fn_peakfinder_parameters)

-    pulseid = 0
+    jfdata = JFData()

-    ju_stream_adapter = ju.StreamAdapter()
+    zmq_socks = ZMQSockets(backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port)

-    zmq_context = zmq.Context(io_threads=4)
-    poller = zmq.Poller()
+    aggregator = Aggregator()

-    # all the normal workers
-    worker = 1
-
-    # receive from backend:
-    backend_socket = zmq_context.socket(zmq.PULL)
-    backend_socket.connect(backend_address)
-
-    poller.register(backend_socket, zmq.POLLIN)
-
-    accumulator_socket = zmq_context.socket(zmq.PUSH)
-    accumulator_socket.connect(f"tcp://{accumulator_host}:{accumulator_port}")
-
-    visualisation_socket = zmq_context.socket(zmq.PUB)
-    visualisation_socket.connect(f"tcp://{visualisation_host}:{visualisation_port}")
-
-    # in case of problem with communication to visualisation, keep in 0mq buffer only few messages
-    visualisation_socket.set_hwm(10)
-
-    keep_pixels = None
-    r_radial_integration = None
-    center_radial_integration = None
-
-    results = {}
-
-    pedestal_file_name_saved = None
-
-    pixel_mask_corrected = None
-    pixel_mask_pf = None
-
-    n_aggregated_images = 1
-    data_summed = None
-
     while True:
+        if not zmq_socks.has_data():
+            continue
+
+        raw_image, metadata = zmq_socks.get_data()
+
+        if metadata["shape"] == [2, 2]: # this is used as marker for empty images
+            continue
+
+        pulse_id = metadata.get("pulse_id", 0)

         # check if peakfinder parameters changed and then re-read it
         try:
-            if peakfinder_parameters_time > 0:
-                new_time = os.path.getmtime(fn_peakfinder_parameters)
-                time_delta = new_time - peakfinder_parameters_time
-                if time_delta > 2.0:
-                    old_peakfinder_parameters = peakfinder_parameters
-                    sleep(0.5)
-                    with open(fn_peakfinder_parameters, "r") as read_file:
-                        peakfinder_parameters = json.load(read_file)
-                    peakfinder_parameters_time = new_time
-                    center_radial_integration = None
-                    if worker == 0:
-                        print(f"({pulseid}) update peakfinder parameters {old_peakfinder_parameters}", flush=True)
-                        print(f" --> {peakfinder_parameters}", flush=True)
-                        print("",flush=True)
+            peakfinder_parameters = bj_peakfinder_parameters.load()
         except Exception as e:
-            print(f"({pulseid}) problem ({e}) to read peakfinder parameters file, worker : {worker}", flush=True)
+            print(f"({pulse_id}) cannot read peakfinder parameters file: {e}", flush=True) #TODO: logging?

-        events = dict(poller.poll(2000)) # check every 2 seconds in each worker
-        if backend_socket not in events:
-            continue
-
-        metadata = backend_socket.recv_json(FLAGS)
-        image = backend_socket.recv(FLAGS, copy=False, track=False)
-        image = np.frombuffer(image, dtype=metadata["type"]).reshape(metadata["shape"])
-
-        results = copy(metadata)
-        if results["shape"][0] == 2 and results["shape"][1] == 2:
-            continue
-
-        pulseid = results.get("pulse_id", 0)
+        results = metadata.copy()
         results.update(peakfinder_parameters)

-        detector = results.get("detector_name", "")
-
         results["laser_on"] = False
         results["number_of_spots"] = 0
         results["is_hit_frame"] = False

         daq_rec = results.get("daq_rec", 0)
-        event_laser = bool((daq_rec >> 16) & 1)
-        event_darkshot = bool((daq_rec >> 17) & 1)
-#        event_fel = bool((daq_rec >> 18) & 1)
-        event_ppicker = bool((daq_rec >> 19) & 1)
-
-        if not event_darkshot:
-            results["laser_on"] = event_laser
+        event_laser = read_bit(daq_rec, 16)
+        event_darkshot = read_bit(daq_rec, 17)
+#        event_fel = read_bit(daq_rec, 18)
+        event_ppicker = read_bit(daq_rec, 19)
+
+        results["laser_on"] = event_laser and not event_darkshot

-        # Filter only ppicker events, if requested; skipping all other events
+        # if requested, filter on ppicker events by skipping other events
         select_only_ppicker_events = results.get("select_only_ppicker_events", False)
         if select_only_ppicker_events and not event_ppicker:
             continue

         # special settings for p20270, only few shots were opened by pulse-picker
#        if detector in ["JF06T32V02"]:
#            if event_ppicker:
#                results["number_of_spots"] = 50
#                results["is_hit_frame"] = True

+        pedestal_name = metadata.get("pedestal_name", None)
+        jfdata.ensure_current_pixel_mask(pedestal_name)
+
         double_pixels = results.get("double_pixels", "mask")

-        pedestal_file_name = metadata.get("pedestal_name", None)
-        if pedestal_file_name is not None and pedestal_file_name != pedestal_file_name_saved:
-            pixel_mask_current = ju_stream_adapter.handler.pixel_mask
-            ju_stream_adapter.handler.pixel_mask = pixel_mask_current
-            pedestal_file_name_saved = pedestal_file_name
-
-        data = ju_stream_adapter.process(image, metadata, double_pixels=double_pixels)
-
-        # pedestal file is not in stream, skip this frame
-        if ju_stream_adapter.handler.pedestal_file is None or ju_stream_adapter.handler.pedestal_file == "":
+        image = jfdata.process(raw_image, metadata, double_pixels)
+
+        if image is None:
             continue

-        data = np.ascontiguousarray(data)
+        pixel_mask_pf = jfdata.get_pixel_mask(results, double_pixels)

-        # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters (and/or pedestal file) are changed
-        id_pixel_mask_1 = id(pixel_mask_corrected)
-        pixel_mask_corrected = ju_stream_adapter.handler.get_pixel_mask(geometry=True, gap_pixels=True, double_pixels=double_pixels)
-        id_pixel_mask_2 = id(pixel_mask_corrected)
-
-        if id_pixel_mask_1 != id_pixel_mask_2:
-            keep_pixels = None
-            r_radial_integration = None
-            if pixel_mask_corrected is not None:
-                #pixel_mask_corrected = np.ascontiguousarray(pixel_mask_corrected)
-                pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected).astype(np.int8, copy=False)
-            else:
-                pixel_mask_pf = None
-
-        # add additional mask at the edge of modules for JF06T08
-        apply_additional_mask = results.get("apply_additional_mask", False)
-        if detector == "JF06T08V04" and apply_additional_mask:
-            # edge pixels
-            pixel_mask_pf[67:1097,1063] = 0
-            pixel_mask_pf[0:1030, 1100] = 0
-            pixel_mask_pf[1106:2136, 1131] = 0
-            pixel_mask_pf[1039:2069, 1168] = 0
-            pixel_mask_pf[1039:2069, 1718] = 0
-            pixel_mask_pf[1039:2069, 1681] = 0
-            pixel_mask_pf[1106:2136, 618] = 0
-            pixel_mask_pf[1106:2136, 581] = 0
-            pixel_mask_pf[67:1097,513] = 0
-            pixel_mask_pf[67:1097, 550] = 0
-            pixel_mask_pf[0:1030, 1650] = 0
-            pixel_mask_pf[0:1030, 1613] = 0
-            pixel_mask_pf[1106, 68:582] = 0
-            pixel_mask_pf[1096, 550:1064] = 0
-            pixel_mask_pf[1106, 618:1132] = 0
-            pixel_mask_pf[1029, 1100:1614] = 0
-            pixel_mask_pf[1039, 1168:1682] = 0
-            pixel_mask_pf[1039, 1718:2230] = 0
-            pixel_mask_pf[1096, 0:513] = 0
-            pixel_mask_pf[1029, 1650:2163] = 0
-            pixel_mask_pf[2068, 1168:2232] = 0
-            pixel_mask_pf[67,0:1063] = 0
-
-            #bad region in left bottom inner module
-            pixel_mask_pf[842:1097, 669:671] = 0
-
-            #second bad region in left bottom inner module
-            pixel_mask_pf[1094, 620:807] = 0
-
-            # vertical line in upper left bottom module
-            pixel_mask_pf[842:1072, 87:90] = 0
-
-            pixel_mask_pf[1794, 1503:1550] = 0
-
-        if detector == "JF17T16V01" and apply_additional_mask:
-            # mask module 11
-            pixel_mask_pf[2619:3333,1577:2607] = 0
-
-        if pixel_mask_corrected is not None:
-            data_s = copy(image)
-            saturated_pixels_coordinates = ju_stream_adapter.handler.get_saturated_pixels(data_s, mask=True, geometry=True, gap_pixels=True, double_pixels=double_pixels)
-            results["saturated_pixels"] = len(saturated_pixels_coordinates[0])
-            results["saturated_pixels_x"] = saturated_pixels_coordinates[1].tolist()
-            results["saturated_pixels_y"] = saturated_pixels_coordinates[0].tolist()
-
-        # pump probe analysis
-        do_radial_integration = results.get("do_radial_integration", False)
-        if do_radial_integration:
-            data_copy_1 = np.copy(data)
-
-            if keep_pixels is None and pixel_mask_pf is not None:
-                keep_pixels = pixel_mask_pf!=0
-            if center_radial_integration is None:
-                center_radial_integration = [results["beam_center_x"], results["beam_center_y"]]
-                r_radial_integration = None
-            if r_radial_integration is None:
-                r_radial_integration, nr_radial_integration = prepare_radial_profile(data_copy_1, center_radial_integration, keep_pixels)
-                r_min_max = [int(np.min(r_radial_integration)), int(np.max(r_radial_integration)) + 1]
-
-            apply_threshold = results.get("apply_threshold", False)
-            if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")):
-                threshold_min = float(results["threshold_min"])
-                threshold_max = float(results["threshold_max"])
-                data_copy_1[data_copy_1 < threshold_min] = np.nan
-                if threshold_max > threshold_min:
-                    data_copy_1[data_copy_1 > threshold_max] = np.nan
-
-            rp = radial_profile(data_copy_1, r_radial_integration, nr_radial_integration, keep_pixels)
-
-            silent_region_min = results.get("radial_integration_silent_min", None)
-            silent_region_max = results.get("radial_integration_silent_max", None)
-
-            if (
-                silent_region_min is not None and
-                silent_region_max is not None and
-                silent_region_max > silent_region_min and
-                silent_region_min > r_min_max[0] and
-                silent_region_max < r_min_max[1]
-            ):
-                integral_silent_region = np.sum(rp[silent_region_min:silent_region_max])
-                rp = rp / integral_silent_region
-                results["radint_normalised"] = [silent_region_min, silent_region_max]
-
-            results["radint_I"] = list(rp[r_min_max[0]:])
-            results["radint_q"] = r_min_max
-
-        #copy image to work with peakfinder, just in case
-        d = np.copy(data)
-
-        # make all masked pixels values nans
-        if pixel_mask_pf is not None:
-            d[pixel_mask_pf != 1] = np.nan
-
-        apply_threshold = results.get("apply_threshold", False)
-        threshold_value_choice = results.get("threshold_value", "NaN")
-        threshold_value = 0 if threshold_value_choice == "0" else np.nan
-        if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")):
-            threshold_min = float(results["threshold_min"])
-            threshold_max = float(results["threshold_max"])
-            d[d < threshold_min] = threshold_value
-            if threshold_max > threshold_min:
-                d[d > threshold_max] = threshold_value
-
-        # if roi calculation request is present, make it
-        roi_x1 = results.get("roi_x1", [])
-        roi_x2 = results.get("roi_x2", [])
-        roi_y1 = results.get("roi_y1", [])
-        roi_y2 = results.get("roi_y2", [])
-
-        if len(roi_x1) > 0 and len(roi_x1) == len(roi_x2) and len(roi_x1) == len(roi_y1) and len(roi_x1) == len(roi_y2):
-            roi_results = [0] * len(roi_x1)
-            roi_results_normalised = [0] * len(roi_x1)
-
-            if pixel_mask_pf is not None:
-                results["roi_intensities_x"] = []
-                results["roi_intensities_proj_x"] = []
-
-                for iRoi in range(len(roi_x1)):
-                    data_roi = np.copy(d[roi_y1[iRoi]:roi_y2[iRoi], roi_x1[iRoi]:roi_x2[iRoi]])
-
-                    roi_results[iRoi] = np.nansum(data_roi)
-                    if threshold_value_choice == "NaN":
-                        roi_results_normalised[iRoi] = roi_results[iRoi] / ((roi_y2[iRoi] - roi_y1[iRoi]) * (roi_x2[iRoi] - roi_x1[iRoi]))
-                    else:
-                        roi_results_normalised[iRoi] = np.nanmean(data_roi)
-
-                    results["roi_intensities_x"].append([roi_x1[iRoi], roi_x2[iRoi]])
-                    results["roi_intensities_proj_x"].append(np.nansum(data_roi,axis=0).tolist())
-
-                results["roi_intensities"] = [float(r) for r in roi_results]
-                results["roi_intensities_normalised"] = [float(r) for r in roi_results_normalised]
-
-        # SPI analysis
-        do_spi_analysis = results.get("do_spi_analysis", False)
-        if do_spi_analysis and "roi_intensities_normalised" in results and len(results["roi_intensities_normalised"]) >= 2:
-            if "spi_limit" in results and len(results["spi_limit"]) == 2:
-                number_of_spots = 0
-                if results["roi_intensities_normalised"][0] >= results["spi_limit"][0]:
-                    number_of_spots += 25
-                if results["roi_intensities_normalised"][1] >= results["spi_limit"][1]:
-                    number_of_spots += 50
-
-                results["number_of_spots"] = number_of_spots
-                if number_of_spots > 0:
-                    results["is_hit_frame"] = True
-
-        # in case all needed parameters are present, make peakfinding
-        do_peakfinder_analysis = results.get("do_peakfinder_analysis", False)
-        if do_peakfinder_analysis and pixel_mask_pf is not None and all(k in results for k in ("beam_center_x", "beam_center_y", "hitfinder_min_snr", "hitfinder_min_pix_count", "hitfinder_adc_thresh")):
-            x_beam = results["beam_center_x"] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5
-            y_beam = results["beam_center_y"] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5
-            hitfinder_min_snr = results["hitfinder_min_snr"]
-            hitfinder_min_pix_count = int(results["hitfinder_min_pix_count"])
-            hitfinder_adc_thresh = results["hitfinder_adc_thresh"]
-
-            asic_ny, asic_nx = d.shape
-            nasics_y, nasics_x = 1, 1
-            hitfinder_max_pix_count = 100
-            max_num_peaks = 10000
-
-            # usually don't need to change this value, rather robust
-            hitfinder_local_bg_radius= 20.
-
-            # in case of further modification with the mask, make a new one, independent from real mask
-            maskPr = np.copy(pixel_mask_pf)
-
-            y, x = np.indices(d.shape)
-            pix_r = np.sqrt((x-x_beam)**2 + (y-y_beam)**2)
-
-            peak_list_x, peak_list_y, peak_list_value = peakfinder_8(
-                max_num_peaks,
-                d.astype(np.float32),
-                maskPr.astype(np.int8),
-                pix_r.astype(np.float32),
-                asic_nx, asic_ny,
-                nasics_x, nasics_y,
-                hitfinder_adc_thresh,
-                hitfinder_min_snr,
-                hitfinder_min_pix_count,
-                hitfinder_max_pix_count,
-                hitfinder_local_bg_radius
-            )
+        saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels)
+        results["saturated_pixels"] = len(saturated_pixels_x)
+        results["saturated_pixels_x"] = saturated_pixels_x.tolist()
+        results["saturated_pixels_y"] = saturated_pixels_y.tolist()

-            number_of_spots = len(peak_list_x)
-            results["number_of_spots"] = number_of_spots
-            if number_of_spots != 0:
-                results["spot_x"] = [-1.0] * number_of_spots
-                results["spot_y"] = [-1.0] * number_of_spots
-                results["spot_intensity"] = copy(peak_list_value)
-                for i in range(number_of_spots):
-                    results["spot_x"][i] = peak_list_x[i] + 0.5
-                    results["spot_y"][i] = peak_list_y[i] + 0.5
-            else:
-                results["spot_x"] = []
-                results["spot_y"] = []
-                results["spot_intensity"] = []
+        calc_radial_integration(results, image, pixel_mask_pf)

-            npeaks_threshold_hit = results.get("npeaks_threshold_hit", 15)
+        pfimage = image.copy() #TODO: is this copy needed?

-            if number_of_spots >= npeaks_threshold_hit:
-                results["is_hit_frame"] = True
+        calc_mask_pixels(pfimage, pixel_mask_pf) # changes pfimage in place
+        calc_apply_threshold(results, pfimage) # changes pfimage in place
+        calc_roi(results, pfimage, pixel_mask_pf)
+        calc_spi_analysis(results)
+        calc_peakfinder_analysis(results, pfimage, pixel_mask_pf)

-        forceSendVisualisation = False
-        if data.dtype != np.uint16:
-            apply_threshold = results.get("apply_threshold", False)
-            apply_aggregation = results.get("apply_aggregation", False)
-            if not apply_aggregation:
-                data_summed = None
-                n_aggregated_images = 1
-            if apply_threshold or apply_aggregation:
-                if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")):
-                    threshold_min = float(results["threshold_min"])
-                    threshold_max = float(results["threshold_max"])
-                    data[data < threshold_min] = 0.0
-                    if threshold_max > threshold_min:
-                        data[data > threshold_max] = 0.0
-                if apply_aggregation and "aggregation_max" in results:
-                    if data_summed is not None:
-                        data += data_summed
-                        n_aggregated_images += 1
-                    data_summed = data.copy()
-                    data_summed[data == -np.nan] = -np.nan
-                    results["aggregated_images"] = n_aggregated_images
-                    results["worker"] = worker
-                    if n_aggregated_images >= results["aggregation_max"]:
-                        forceSendVisualisation = True
-                        data_summed = None
-                        n_aggregated_images = 1
-                data[pixel_mask_pf == 0] = np.NaN
-        else:
-            data = image
+        # ???
+        image, aggregation_is_ready = calc_apply_aggregation(results, image, pixel_mask_pf, aggregator)

-        results["type"] = str(data.dtype)
-        results["shape"] = data.shape
+        results["type"] = str(image.dtype)
+        results["shape"] = image.shape

-        accumulator_socket.send_json(results, FLAGS)
+        zmq_socks.send_accumulator(results)

-        if apply_aggregation and "aggregation_max" in results:
-            if forceSendVisualisation:
-                visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
-                visualisation_socket.send(data, FLAGS, copy=True, track=True)
-            else:
-                data = np.empty((2, 2), dtype=np.uint16)
-                results["type"] = str(data.dtype)
-                results["shape"] = data.shape
-                visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
-                visualisation_socket.send(data, FLAGS, copy=True, track=True)
-        else:
-            if results["is_good_frame"] and (results["is_hit_frame"] or randint(1, skip_frames_rate) == 1):
-                visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
-                visualisation_socket.send(data, FLAGS, copy=True, track=True)
-            else:
-                data = np.empty((2, 2), dtype=np.uint16)
-                results["type"] = str(data.dtype)
-                results["shape"] = data.shape
-                visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
-                visualisation_socket.send(data, FLAGS, copy=True, track=True)
+        apply_aggregation = results.get("apply_aggregation", False)
+        aggregation_is_enabled = (apply_aggregation and "aggregation_max" in results)
+        aggregation_is_enabled_but_not_ready = (aggregation_is_enabled and not aggregation_is_ready)
+
+        is_bad_frame = (not results["is_good_frame"])
+
+        # hits are sent at full rate, but no-hits are sent at reduced frequency
+        is_no_hit_frame = (not results["is_hit_frame"])
+        random_skip = randskip(skip_frames_rate)
+        is_no_hit_frame_and_skipped = (is_no_hit_frame and random_skip)
+
+        if aggregation_is_enabled_but_not_ready or is_bad_frame or is_no_hit_frame_and_skipped:
+            image = np.empty((2, 2), dtype=np.uint16)
+            results["type"] = str(image.dtype)
+            results["shape"] = image.shape
+
+        zmq_socks.send_visualisation(results, image)
dap/zmqsocks.py (new file, 49 lines)
@@ -0,0 +1,49 @@
import numpy as np
import zmq


FLAGS = 0


class ZMQSockets:

    def __init__(self, backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port):
        zmq_context = zmq.Context(io_threads=4)
        self.poller = poller = zmq.Poller()

        # receive from backend:
        self.backend_socket = backend_socket = zmq_context.socket(zmq.PULL)
        backend_socket.connect(backend_address)

        poller.register(backend_socket, zmq.POLLIN)

        self.accumulator_socket = accumulator_socket = zmq_context.socket(zmq.PUSH)
        accumulator_socket.connect(f"tcp://{accumulator_host}:{accumulator_port}")

        self.visualisation_socket = visualisation_socket = zmq_context.socket(zmq.PUB)
        visualisation_socket.connect(f"tcp://{visualisation_host}:{visualisation_port}")

        # in case of problem with communication to visualisation, keep in 0mq buffer only few messages
        visualisation_socket.set_hwm(10)


    def has_data(self):
        events = dict(self.poller.poll(2000)) # check every 2 seconds in each worker
        return (self.backend_socket in events)

    def get_data(self):
        metadata = self.backend_socket.recv_json(FLAGS)
        image = self.backend_socket.recv(FLAGS, copy=False, track=False)
        image = np.frombuffer(image, dtype=metadata["type"]).reshape(metadata["shape"])
        return image, metadata


    def send_accumulator(self, results):
        self.accumulator_socket.send_json(results, FLAGS)

    def send_visualisation(self, results, data):
        self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
        self.visualisation_socket.send(data, FLAGS, copy=True, track=True)
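Illustrative sketch of the receiving side of send_accumulator (host and port here are placeholders, not values from this repository): the accumulator would bind a PULL socket on the port that ZMQSockets connects its PUSH socket to.

import zmq

context = zmq.Context()
sink = context.socket(zmq.PULL)
sink.bind("tcp://*:9999")  # placeholder port; must match accumulator_port given to ZMQSockets

while True:
    results = sink.recv_json()  # one JSON results document per processed frame
    print(results.get("number_of_spots", 0), results.get("is_hit_frame", False))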