Streak Finder algorithm for CBD experiment #2

Merged
augustin_s merged 46 commits from ext-dorofe_e/dap:chapman into main 2025-07-14 11:18:07 +02:00
6 changed files with 362 additions and 1 deletions

104
README.md
View File

@@ -105,6 +105,48 @@ options:
* `'spot_x/spot_y/spot_intensity': 3*list[float]` - Provides coordinates and intensity of the identified peaks within the frame.
* `'is_hit_frame': True/False` - Marks whether a frame qualifies as a hit based on the number of identified peaks exceeding the defined threshold.
* **streakfinder Algorithm**
This algorithm is using [streak-finder package](https://github.com/simply-nicky/streak_finder) - a connection-based streak finding algorithm for convergent beam diffraction patterns.
Input parameters:
* `'do_snr': 1/0` - Specifies whether to do whitefield and std correction, if selected - changes image and mask in place.
* `'cbd_std_data_file': str` - hdf5 data file containing pre-calculated std.
* `'cbd_std_dataset': str` - [optional] dataset containing pre-calculated std, defaults to `'entry/crystallography/std'`.
* `'cbd_whitefield_data_file': str` - hdf5 data file containing pre-calculated white field.
* `'cbd_whitefield_dataset': str` - [optional] dataset containing pre-calculated white field, defaults to `'entry/crystallography/whitefield'`.
* `'cbd_mask_data_file': str` - [optional] hdf5 data file containing pre-calculated mask.
* `'cbd_mask_dataset': str` - [optional] dataset containing pre-calculated mask, defaults to `'entry/instrument/detector/mask'`.
* `'cbd_scale_whitefield': 1/0` - Specifies whether to scale whitefield to signal, useful if intensity jumps.
* `'do_streakfinder_analysis': 1/0` - Specifies whether to execute the streak-finder algorithm.
* `'cbd_peak_structure_radius': int` - Connectivity structure radius for *peaks* detection.
* `'cbd_peak_structure_rank': int` - Connectivity structure rank for *peaks* detection.
* `'cbd_peak_vmin': float` - Peak threshold. All peaks with values lower than ``cbd_peak_vmin`` are discarded.
* `'cbd_npts': int` - Support size threshold. The support structure is a connected set of pixels which
value is above the threshold ``cbd_peak_vmin``. A peak is discarded is the size of support
set is lower than ``cbd_npts``.
* `'cbd_streak_structure_radius': int` - Connectivity structure radius for *streaks* detection.
* `'cbd_streak_structure_rank': int` - Connectivity structure rank for *streaks* detection.
* `'cbd_streak_vmin': float` - Streak threshold. All streaks with values lower than ``cbd_vmin`` are discarded.
* `'cbd_min_size': float` - Minimum number of linelets required in a detected streak.
* `'cbd_xtol': float` - Distance threshold. A new linelet is added to a streak if it's distance to the
streak is no more than ``cbd_xtol``.
* `'cbd_nfa': 1` - Number of false alarms, allowed number of unaligned points in a streak.
* `'cbd_num_threads': int` - Number of threads to use for peak finder algorithm.
* `'cbd_negative_handler': 'mask'/'zero'/'shift'/''` - [optional] Method to handle negative values in converted frames, defaults to `''` (do not handle).
* `'cbd_min_hit_streaks': int` - [optional] Minimum number of discovered streaks to categorize frame as a hit, defaults to 5.
* `'cbd_mask_rois': list[(int, int, int, int)]` - [optional] list of `(y_min, y_max, x_min, x_max)` coordinates of ROIs to mask out during peak finding.
* `'cbd_crop_roi': [int, int, int, int]` - [optional] run streak finder on a cropped image, e.g. one quadrant, for purpose of spedup.
Algorithm Output:
* `'number_of_streaks': int` - Indicates the count of identified streaks.
* `'streak_lengths': list[float]` - Provides the lengths of identified streaks.
* `'bragg_counts': list[float]` - Provides the intensity sum within identified streaks.
* `'streaks': 4*list[float]` - Provides coordinates of the identified streaks: x0, y0, x1, y1.
* `'is_hit_frame': True/False` - Marks whether a frame qualifies as a hit based on the number of identified streaks.
* `'cbd_error': str` - An error message in case the streak finder failed on one of it's stages.
* **Radial Profile Integration**
This algorithm integrates pixel intensities radially based on defined parameters.
@@ -197,6 +239,16 @@ options:
Use the `'apply_additional_mask': 0/1` - Input flag to enable this functionality.
* **Additional Mask from file**
Alternative to previous additional masking, mask data is read from specified file. NumPy and HDF5 formats are supported.
Input parameters:
* `'apply_additional_mask': 1/0` - Input flag to enable this functionality.
* `'mask_file': str` - Path to the hdf5 or npy file with mask data.
* `'mask_dataset': str` [Optional] - Name of the dataset containing mask in the hdf5 file, default is `"mask_data"`.
Algorithm Output:
* `'mask_from_file_applied': 1/0` - Indicates whether the algorithm ran successfully.
* **Filter based on pulse picker information**
If the event propagation capability is accessible for the detector and the pulse picker information is correctly configured for propagation, the filtration based on pulse picker information becomes feasible by using the
@@ -238,7 +290,59 @@ Algorithms use input parameters specified in a JSON file provided to worker.py (
}
```
Example JSON for Convergent-Beam Diffraction Streak-Finder:
```json
{
"beam_center_x": 1119.0,
"beam_center_y": 1068.0,
"detector_distance": 0.092,
"do_peakfinder_analysis": 0,
"beam_energy": 11993.610318642704,
"apply_threshold": 0,
"threshold_min": 0,
"threshold_max": 35,
"apply_aggregation": 0,
"aggregation_max": 2,
"double_pixels": "mask",
"detector_rate": 100,
"do_radial_integration": 0,
"do_spi_analysis": 0,
"threshold_value": "NaN",
"select_only_ppicker_events": 0,
"disabled_modules": [],
"roi_x1": [],
"roi_y1": [],
"roi_x2": [],
"roi_y2": [],
"": 1,
"mask_file": "/sf/jungfrau/config/additional_mask/JF07T32V02.h5",
"mask_dataset": "mask_data",
"do_snr": 0,
"cbd_std_data_file": "/sf/instrument/exp/00m_mustermann/res/aux_data/streakfinder_metadata.h5",
"cbd_std_dataset": "entry/crystallography/std",
"cbd_whitefield_data_file": "/sf/bernina/exp/00m_mustermann/res/aux_data/streakfinder_metadata.h5",
"cbd_whitefield_dataset": "entry/crystallography/whitefield",
"cbd_mask_data_file": "/sf/bernina/exp/00m_mustermann/res/aux_data/JF_mask.h5",
"cbd_mask_dataset": "mask_data",
"cbd_scale_whitefield": 0,
"do_streakfinder_analysis": 1,
"cbd_negative_handler": "zero",
"cbd_peak_structure_radius": 2,
"cbd_peak_structure_rank": 2,
"cbd_peak_vmin": 50,
"cbd_npts": 10,
"cbd_streak_structure_radius": 6,
"cbd_streak_structure_rank": 4,
"cbd_xtol": 2.0,
"cbd_streak_vmin": 30,
"cbd_min_size": 25,
"cbd_nfa": 1,
"cbd_num_threads": 32,
"cbd_crop_roi": [0, 2107, 0, 2216]
}
```
# Acknowledgment
Special thanks to Valerio Mariani for providing the cython implementation of peakfinder8.
Special thanks to Nikolai Ivanov for providing the C++ implementation of streak-finder as well as Lisa Dorofeeva for integrating it.

View File

@@ -1,5 +1,6 @@
from .addmask import calc_apply_additional_mask
from .addmaskfile import calc_apply_additional_mask_from_file
from .aggregation import calc_apply_aggregation
from .jfdata import JFData
from .mask import calc_mask_pixels
@@ -7,6 +8,7 @@ from .peakfind import calc_peakfinder_analysis
from .radprof import calc_radial_integration
from .roi import calc_roi
from .spiana import calc_spi_analysis
from .streakfind import calc_streakfinder_analysis
from .thresh import calc_apply_threshold

36
dap/algos/addmaskfile.py Normal file
View File

@@ -0,0 +1,36 @@
import h5py
import numpy as np
DEFAULT_MASK_DATASET = "mask_data"
def calc_apply_additional_mask_from_file(results, pixel_mask_pf):
apply_additional_mask = results.get("apply_additional_mask_from_file", False)
if not apply_additional_mask:
return
mask_file = results.get("mask_file", None)
if not mask_file:
return
mask_dataset = results.get("mask_dataset", DEFAULT_MASK_DATASET)
# Support for hdf5 and npy
if mask_file.endswith(".npy"):
try:
mask = np.load(mask_file).astype(np.bool)
except Exception as error:
results["mask_error"] = f"Error loading mask data from NumPy file {mask_file}:\n{error}"
return
else:
try:
with h5py.File(mask_file, "r") as h5f:
mask = h5f[mask_dataset][:].astype(np.bool)
except Exception as error:
results["mask_error"] = f"Error loading mask from hdf5 file {mask_file}:\n{error}"
return
try:
np.multiply(pixel_mask_pf, mask, out=pixel_mask_pf)
except Exception as error:
results["mask_error"] = f"Error applying additional mask from file {mask_file}:\n{error}"

View File

@@ -3,6 +3,7 @@ import numpy as np
import jungfrau_utils as ju
from .addmask import calc_apply_additional_mask
from .addmaskfile import calc_apply_additional_mask_from_file
class JFData:
@@ -58,6 +59,7 @@ class JFData:
pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected)
calc_apply_additional_mask(results, pixel_mask_pf) # changes pixel_mask_pf in place
calc_apply_additional_mask_from_file(results, pixel_mask_pf) # further changes pixel_mask_pf in place
self.id_pixel_mask_corrected = new_id_pixel_mask_corrected
self.pixel_mask_pf = pixel_mask_pf

209
dap/algos/streakfind.py Normal file
View File

@@ -0,0 +1,209 @@
"""
Streak Finder algorithm implemented by CFEL Chapman group
Requires Convergent beam streak finder package installed:
https://github.com/simply-nicky/streak_finder/
(note g++ 11 required for building, numpy 2+ required)
"""
import h5py
import numpy as np
from streak_finder import CrystData
from streak_finder.label import Structure2D
DEFAULT_NUM_THREADS = 16
DEFAULT_MIN_HIT_STREAKS = 5
def _handle_negative_values(data, mask, handler: str):
if not handler or np.all(data>=0):
return
if handler == "shift":
# Shift to min=0
data -= np.min(data)
elif handler == "mask":
mask[data<0] = np.nan
elif handler == "zero":
data[data<0] = 0
def calc_streakfinder_analysis(results, data, pf_pixel_mask):
do_snr = results.get("do_snr", False)
do_streakfinder_analysis = results.get("do_streakfinder_analysis", False)
if not do_snr and not do_streakfinder_analysis:
return data
negative_val_handler = results.get("cbd_negative_handler", "")
_handle_negative_values(data, pf_pixel_mask, negative_val_handler)
try:
cryst_data = _generate_cryst_data(results, data, pf_pixel_mask)
except Exception as error: # Broad exception - we don't want to break anything here
results["cbd_error"] = f"Error processing CBD data:\n{error}"
return data
if do_snr:
# Changes data and mask in-place
data = cryst_data.snr[0].copy()
try:
_calc_streakfinder_analysis(results, cryst_data)
except Exception as error: # Broad exception - we don't want to break anything here
results["cbd_error"] = f"Error processing CBD data:\n{error}"
return data
def _generate_cryst_data(results, data, pf_pixel_mask) -> CrystData:
params_required = [
"cbd_whitefield_data_file",
"cbd_std_data_file",
"cbd_scale_whitefield", # Bool
]
if not all(param in results.keys() for param in params_required):
raise ValueError(f"ERROR: Not enough parameters for CBD correction. Skipping\n"
f"{params_required=}")
whitefield_data_file = results["cbd_whitefield_data_file"]
std_data_file = results["cbd_std_data_file"]
scale_whitefield = results["cbd_scale_whitefield"]
# Using CXI Store specification as default
whitefield_dataset = results.get("cbd_whitefield_dataset", "entry/crystallography/whitefield")
std_dataset = results.get("cbd_std_dataset", "entry/crystallography/std")
num_threads = results.get("cbd_num_threads", DEFAULT_NUM_THREADS)
with h5py.File(whitefield_data_file, "r") as hf:
whitefield = hf[whitefield_dataset][:]
with h5py.File(std_data_file, "r") as hf:
std = hf[std_dataset][:]
mask_data_file = results.get("cbd_mask_data_file", None)
if mask_data_file is None:
mask = pf_pixel_mask
else:
mask_dataset = results.get("cbd_mask_dataset", "entry/instrument/detector/mask")
with h5py.File(mask_data_file, "r") as hf:
mask = hf[mask_dataset][:].astype(np.bool)
mask *= pf_pixel_mask
data = CrystData(
data=data[np.newaxis, :],
mask=mask,
std=std,
whitefield=whitefield
)
if scale_whitefield:
data = data.scale_whitefield(method='median', num_threads=num_threads)
data = data.update_snr()
return data
def _calc_streakfinder_analysis(results, cryst_data: CrystData):
do_streakfinder_analysis = results.get("do_streakfinder_analysis", False)
if not do_streakfinder_analysis:
return
params_required = [
"cbd_peak_structure_radius",
"cbd_peak_structure_rank",
"cbd_streak_structure_radius",
"cbd_streak_structure_rank",
"cbd_peak_vmin",
"cbd_streak_vmin",
"cbd_min_size",
"cbd_npts",
"cbd_xtol",
"cbd_nfa",
"cbd_num_threads",
]
if not all(param in results.keys() for param in params_required):
print(f"ERROR: Not enough parameters for streak finder analysis. Skipping.\n"
f"{params_required=}")
return
peak_structure_radius = results["cbd_peak_structure_radius"] # peak
peak_structure_rank = results["cbd_peak_structure_rank"]
streak_structure_radius = results["cbd_streak_structure_radius"] # streak
streak_structure_rank = results["cbd_streak_structure_rank"]
peak_vmin = results["cbd_peak_vmin"] # peak
streak_vmin = results["cbd_streak_vmin"] # streak
min_size = results["cbd_min_size"]
npts = results["cbd_npts"]
xtol = results["cbd_xtol"]
nfa = results["cbd_nfa"]
num_threads = results["cbd_num_threads"]
min_hit_streaks = results.get("cbd_min_hit_streaks", DEFAULT_MIN_HIT_STREAKS)
x_center = results.get("beam_center_x", None)
y_center = results.get("beam_center_y", None)
mask_rois = results.get("cbd_mask_rois", []) # list of [y_min, y_max, x_min, x_max]
for mask_roi in mask_rois:
cryst_data = cryst_data.mask_region(mask_roi)
crop_roi = results.get("cbd_crop_roi", None) # [y_min, y_max, x_min, x_max]
if crop_roi is not None:
crop_roi_t = [crop_roi[2], crop_roi[3], crop_roi[0], crop_roi[1]]# y0, y1, x0, x1
cryst_data = cryst_data.crop(crop_roi_t)
peaks_structure = Structure2D(peak_structure_radius, peak_structure_rank)
streaks_structure = Structure2D(streak_structure_radius, streak_structure_rank)
det_obj = cryst_data.streak_detector(streaks_structure)
peaks = det_obj.detect_peaks(peak_vmin, npts, peaks_structure, num_threads)
detected = det_obj.detect_streaks(peaks, xtol, streak_vmin, min_size, nfa=nfa,
num_threads=num_threads)
if isinstance(detected, list):
detected = detected[0]
if not detected.streaks:
results["number_of_streaks"] = 0
results["is_hit_frame"] = False
results["streaks"] = []
results["streak_lengths"] = []
results["bragg_counts"] = []
return
streaks = det_obj.to_streaks(detected)
detected_streaks = np.array(detected.streaks)
streak_lines = streaks.lines
# Adjust to crop region
if crop_roi is not None:
shift = [crop_roi[0], crop_roi[2], crop_roi[0], crop_roi[2]]
streak_lines = streak_lines + shift
if x_center is not None and y_center is not None:
if crop_roi is not None:
x_center -= crop_roi[0]
y_center -= crop_roi[2]
streaks_mask = streaks.concentric_only(x_center, y_center)
streak_lines = streak_lines[streaks_mask]
detected_streaks = detected_streaks[streaks_mask]
streak_lengths = np.sqrt(
np.pow((streak_lines[..., 2] - streak_lines[..., 0]), 2) +
np.pow((streak_lines[..., 2] - streak_lines[..., 0]), 2)
).tolist()
streak_lines = streak_lines.T
_, number_of_streaks = streak_lines.shape
list_result = streak_lines.tolist() # arr(4, n_lines); 0coord x0, y0, x1, y1
bragg_counts = [streak.total_mass() for streak in detected_streaks]
results["number_of_streaks"] = number_of_streaks
results["is_hit_frame"] = (number_of_streaks > min_hit_streaks)
results["streaks"] = list_result
results["streak_lengths"] = streak_lengths
results["bragg_counts"] = bragg_counts

View File

@@ -2,7 +2,10 @@ import argparse
import numpy as np
from algos import calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData
from algos import (
calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis,
calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData
)
from utils import Aggregator, BufferedJSON, randskip, read_bit
from zmqsocks import ZMQSockets
@@ -115,6 +118,11 @@ def work(backend_address, accumulator_host, accumulator_port, visualisation_host
calc_peakfinder_analysis(results, pfimage, pixel_mask_pf)
# ???
# Streak finder processing for convergent-beam diffraction experiments
# changes image and mask in place if do_snr=True in parameters file
image = calc_streakfinder_analysis(results, image, pixel_mask_pf)
image, aggregation_is_ready = calc_apply_aggregation(results, image, pixel_mask_pf, aggregator)
results["type"] = str(image.dtype)