diff --git a/README.md b/README.md index edb0f89..227ba92 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,48 @@ options: * `'spot_x/spot_y/spot_intensity': 3*list[float]` - Provides coordinates and intensity of the identified peaks within the frame. * `'is_hit_frame': True/False` - Marks whether a frame qualifies as a hit based on the number of identified peaks exceeding the defined threshold. + * **streakfinder Algorithm** + + This algorithm is using [streak-finder package](https://github.com/simply-nicky/streak_finder) - a connection-based streak finding algorithm for convergent beam diffraction patterns. + + Input parameters: + * `'do_snr': 1/0` - Specifies whether to do whitefield and std correction, if selected - changes image and mask in place. + * `'cbd_std_data_file': str` - hdf5 data file containing pre-calculated std. + * `'cbd_std_dataset': str` - [optional] dataset containing pre-calculated std, defaults to `'entry/crystallography/std'`. + * `'cbd_whitefield_data_file': str` - hdf5 data file containing pre-calculated white field. + * `'cbd_whitefield_dataset': str` - [optional] dataset containing pre-calculated white field, defaults to `'entry/crystallography/whitefield'`. + * `'cbd_mask_data_file': str` - [optional] hdf5 data file containing pre-calculated mask. + * `'cbd_mask_dataset': str` - [optional] dataset containing pre-calculated mask, defaults to `'entry/instrument/detector/mask'`. + * `'cbd_scale_whitefield': 1/0` - Specifies whether to scale whitefield to signal, useful if intensity jumps. + + * `'do_streakfinder_analysis': 1/0` - Specifies whether to execute the streak-finder algorithm. + * `'cbd_peak_structure_radius': int` - Connectivity structure radius for *peaks* detection. + * `'cbd_peak_structure_rank': int` - Connectivity structure rank for *peaks* detection. + * `'cbd_peak_vmin': float` - Peak threshold. All peaks with values lower than ``cbd_peak_vmin`` are discarded. + * `'cbd_npts': int` - Support size threshold. The support structure is a connected set of pixels which + value is above the threshold ``cbd_peak_vmin``. A peak is discarded is the size of support + set is lower than ``cbd_npts``. + * `'cbd_streak_structure_radius': int` - Connectivity structure radius for *streaks* detection. + * `'cbd_streak_structure_rank': int` - Connectivity structure rank for *streaks* detection. + * `'cbd_streak_vmin': float` - Streak threshold. All streaks with values lower than ``cbd_vmin`` are discarded. + * `'cbd_min_size': float` - Minimum number of linelets required in a detected streak. + * `'cbd_xtol': float` - Distance threshold. A new linelet is added to a streak if it's distance to the + streak is no more than ``cbd_xtol``. + * `'cbd_nfa': 1` - Number of false alarms, allowed number of unaligned points in a streak. + * `'cbd_num_threads': int` - Number of threads to use for peak finder algorithm. + * `'cbd_negative_handler': 'mask'/'zero'/'shift'/''` - [optional] Method to handle negative values in converted frames, defaults to `''` (do not handle). + * `'cbd_min_hit_streaks': int` - [optional] Minimum number of discovered streaks to categorize frame as a hit, defaults to 5. + * `'cbd_mask_rois': list[(int, int, int, int)]` - [optional] list of `(y_min, y_max, x_min, x_max)` coordinates of ROIs to mask out during peak finding. + * `'cbd_crop_roi': [int, int, int, int]` - [optional] run streak finder on a cropped image, e.g. one quadrant, for purpose of spedup. + + Algorithm Output: + * `'number_of_streaks': int` - Indicates the count of identified streaks. + * `'streak_lengths': list[float]` - Provides the lengths of identified streaks. + * `'bragg_counts': list[float]` - Provides the intensity sum within identified streaks. + * `'streaks': 4*list[float]` - Provides coordinates of the identified streaks: x0, y0, x1, y1. + * `'is_hit_frame': True/False` - Marks whether a frame qualifies as a hit based on the number of identified streaks. + * `'cbd_error': str` - An error message in case the streak finder failed on one of it's stages. + * **Radial Profile Integration** This algorithm integrates pixel intensities radially based on defined parameters. @@ -197,6 +239,16 @@ options: Use the `'apply_additional_mask': 0/1` - Input flag to enable this functionality. + * **Additional Mask from file** + + Alternative to previous additional masking, mask data is read from specified file. NumPy and HDF5 formats are supported. + Input parameters: + * `'apply_additional_mask': 1/0` - Input flag to enable this functionality. + * `'mask_file': str` - Path to the hdf5 or npy file with mask data. + * `'mask_dataset': str` [Optional] - Name of the dataset containing mask in the hdf5 file, default is `"mask_data"`. + Algorithm Output: + * `'mask_from_file_applied': 1/0` - Indicates whether the algorithm ran successfully. + * **Filter based on pulse picker information** If the event propagation capability is accessible for the detector and the pulse picker information is correctly configured for propagation, the filtration based on pulse picker information becomes feasible by using the @@ -238,7 +290,59 @@ Algorithms use input parameters specified in a JSON file provided to worker.py ( } ``` +Example JSON for Convergent-Beam Diffraction Streak-Finder: + ```json + { + "beam_center_x": 1119.0, + "beam_center_y": 1068.0, + "detector_distance": 0.092, + "do_peakfinder_analysis": 0, + "beam_energy": 11993.610318642704, + "apply_threshold": 0, + "threshold_min": 0, + "threshold_max": 35, + "apply_aggregation": 0, + "aggregation_max": 2, + "double_pixels": "mask", + "detector_rate": 100, + "do_radial_integration": 0, + "do_spi_analysis": 0, + "threshold_value": "NaN", + "select_only_ppicker_events": 0, + "disabled_modules": [], + "roi_x1": [], + "roi_y1": [], + "roi_x2": [], + "roi_y2": [], + "": 1, + "mask_file": "/sf/jungfrau/config/additional_mask/JF07T32V02.h5", + "mask_dataset": "mask_data", + "do_snr": 0, + "cbd_std_data_file": "/sf/instrument/exp/00m_mustermann/res/aux_data/streakfinder_metadata.h5", + "cbd_std_dataset": "entry/crystallography/std", + "cbd_whitefield_data_file": "/sf/bernina/exp/00m_mustermann/res/aux_data/streakfinder_metadata.h5", + "cbd_whitefield_dataset": "entry/crystallography/whitefield", + "cbd_mask_data_file": "/sf/bernina/exp/00m_mustermann/res/aux_data/JF_mask.h5", + "cbd_mask_dataset": "mask_data", + "cbd_scale_whitefield": 0, + "do_streakfinder_analysis": 1, + "cbd_negative_handler": "zero", + "cbd_peak_structure_radius": 2, + "cbd_peak_structure_rank": 2, + "cbd_peak_vmin": 50, + "cbd_npts": 10, + "cbd_streak_structure_radius": 6, + "cbd_streak_structure_rank": 4, + "cbd_xtol": 2.0, + "cbd_streak_vmin": 30, + "cbd_min_size": 25, + "cbd_nfa": 1, + "cbd_num_threads": 32, + "cbd_crop_roi": [0, 2107, 0, 2216] +} + ``` # Acknowledgment Special thanks to Valerio Mariani for providing the cython implementation of peakfinder8. +Special thanks to Nikolai Ivanov for providing the C++ implementation of streak-finder as well as Lisa Dorofeeva for integrating it. diff --git a/dap/algos/__init__.py b/dap/algos/__init__.py index 2b5f92e..d5e8f0b 100644 --- a/dap/algos/__init__.py +++ b/dap/algos/__init__.py @@ -1,5 +1,6 @@ from .addmask import calc_apply_additional_mask +from .addmaskfile import calc_apply_additional_mask_from_file from .aggregation import calc_apply_aggregation from .jfdata import JFData from .mask import calc_mask_pixels @@ -7,6 +8,7 @@ from .peakfind import calc_peakfinder_analysis from .radprof import calc_radial_integration from .roi import calc_roi from .spiana import calc_spi_analysis +from .streakfind import calc_streakfinder_analysis from .thresh import calc_apply_threshold diff --git a/dap/algos/addmaskfile.py b/dap/algos/addmaskfile.py new file mode 100644 index 0000000..4f82ee5 --- /dev/null +++ b/dap/algos/addmaskfile.py @@ -0,0 +1,36 @@ +import h5py +import numpy as np + + +DEFAULT_MASK_DATASET = "mask_data" + + +def calc_apply_additional_mask_from_file(results, pixel_mask_pf): + apply_additional_mask = results.get("apply_additional_mask_from_file", False) + if not apply_additional_mask: + return + + mask_file = results.get("mask_file", None) + if not mask_file: + return + mask_dataset = results.get("mask_dataset", DEFAULT_MASK_DATASET) + + # Support for hdf5 and npy + if mask_file.endswith(".npy"): + try: + mask = np.load(mask_file).astype(np.bool) + except Exception as error: + results["mask_error"] = f"Error loading mask data from NumPy file {mask_file}:\n{error}" + return + else: + try: + with h5py.File(mask_file, "r") as h5f: + mask = h5f[mask_dataset][:].astype(np.bool) + except Exception as error: + results["mask_error"] = f"Error loading mask from hdf5 file {mask_file}:\n{error}" + return + + try: + np.multiply(pixel_mask_pf, mask, out=pixel_mask_pf) + except Exception as error: + results["mask_error"] = f"Error applying additional mask from file {mask_file}:\n{error}" diff --git a/dap/algos/jfdata.py b/dap/algos/jfdata.py index bd2fa1e..1a578bf 100644 --- a/dap/algos/jfdata.py +++ b/dap/algos/jfdata.py @@ -3,6 +3,7 @@ import numpy as np import jungfrau_utils as ju from .addmask import calc_apply_additional_mask +from .addmaskfile import calc_apply_additional_mask_from_file class JFData: @@ -58,6 +59,7 @@ class JFData: pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected) calc_apply_additional_mask(results, pixel_mask_pf) # changes pixel_mask_pf in place + calc_apply_additional_mask_from_file(results, pixel_mask_pf) # further changes pixel_mask_pf in place self.id_pixel_mask_corrected = new_id_pixel_mask_corrected self.pixel_mask_pf = pixel_mask_pf diff --git a/dap/algos/streakfind.py b/dap/algos/streakfind.py new file mode 100644 index 0000000..fc91a28 --- /dev/null +++ b/dap/algos/streakfind.py @@ -0,0 +1,209 @@ +""" +Streak Finder algorithm implemented by CFEL Chapman group + +Requires Convergent beam streak finder package installed: + +https://github.com/simply-nicky/streak_finder/ +(note g++ 11 required for building, numpy 2+ required) +""" +import h5py +import numpy as np +from streak_finder import CrystData +from streak_finder.label import Structure2D + +DEFAULT_NUM_THREADS = 16 +DEFAULT_MIN_HIT_STREAKS = 5 + + +def _handle_negative_values(data, mask, handler: str): + if not handler or np.all(data>=0): + return + if handler == "shift": + # Shift to min=0 + data -= np.min(data) + elif handler == "mask": + mask[data<0] = np.nan + elif handler == "zero": + data[data<0] = 0 + + +def calc_streakfinder_analysis(results, data, pf_pixel_mask): + do_snr = results.get("do_snr", False) + do_streakfinder_analysis = results.get("do_streakfinder_analysis", False) + if not do_snr and not do_streakfinder_analysis: + return data + + negative_val_handler = results.get("cbd_negative_handler", "") + _handle_negative_values(data, pf_pixel_mask, negative_val_handler) + + try: + cryst_data = _generate_cryst_data(results, data, pf_pixel_mask) + except Exception as error: # Broad exception - we don't want to break anything here + results["cbd_error"] = f"Error processing CBD data:\n{error}" + return data + + if do_snr: + # Changes data and mask in-place + data = cryst_data.snr[0].copy() + + try: + _calc_streakfinder_analysis(results, cryst_data) + except Exception as error: # Broad exception - we don't want to break anything here + results["cbd_error"] = f"Error processing CBD data:\n{error}" + + return data + +def _generate_cryst_data(results, data, pf_pixel_mask) -> CrystData: + params_required = [ + "cbd_whitefield_data_file", + "cbd_std_data_file", + "cbd_scale_whitefield", # Bool + ] + + if not all(param in results.keys() for param in params_required): + raise ValueError(f"ERROR: Not enough parameters for CBD correction. Skipping\n" + f"{params_required=}") + + whitefield_data_file = results["cbd_whitefield_data_file"] + std_data_file = results["cbd_std_data_file"] + scale_whitefield = results["cbd_scale_whitefield"] + + # Using CXI Store specification as default + whitefield_dataset = results.get("cbd_whitefield_dataset", "entry/crystallography/whitefield") + std_dataset = results.get("cbd_std_dataset", "entry/crystallography/std") + + num_threads = results.get("cbd_num_threads", DEFAULT_NUM_THREADS) + + with h5py.File(whitefield_data_file, "r") as hf: + whitefield = hf[whitefield_dataset][:] + + with h5py.File(std_data_file, "r") as hf: + std = hf[std_dataset][:] + + mask_data_file = results.get("cbd_mask_data_file", None) + if mask_data_file is None: + mask = pf_pixel_mask + else: + mask_dataset = results.get("cbd_mask_dataset", "entry/instrument/detector/mask") + with h5py.File(mask_data_file, "r") as hf: + mask = hf[mask_dataset][:].astype(np.bool) + mask *= pf_pixel_mask + + data = CrystData( + data=data[np.newaxis, :], + mask=mask, + std=std, + whitefield=whitefield + ) + if scale_whitefield: + data = data.scale_whitefield(method='median', num_threads=num_threads) + + data = data.update_snr() + return data + +def _calc_streakfinder_analysis(results, cryst_data: CrystData): + do_streakfinder_analysis = results.get("do_streakfinder_analysis", False) + if not do_streakfinder_analysis: + return + + params_required = [ + "cbd_peak_structure_radius", + "cbd_peak_structure_rank", + "cbd_streak_structure_radius", + "cbd_streak_structure_rank", + "cbd_peak_vmin", + "cbd_streak_vmin", + "cbd_min_size", + "cbd_npts", + "cbd_xtol", + "cbd_nfa", + "cbd_num_threads", + ] + + if not all(param in results.keys() for param in params_required): + print(f"ERROR: Not enough parameters for streak finder analysis. Skipping.\n" + f"{params_required=}") + return + + peak_structure_radius = results["cbd_peak_structure_radius"] # peak + peak_structure_rank = results["cbd_peak_structure_rank"] + streak_structure_radius = results["cbd_streak_structure_radius"] # streak + streak_structure_rank = results["cbd_streak_structure_rank"] + peak_vmin = results["cbd_peak_vmin"] # peak + streak_vmin = results["cbd_streak_vmin"] # streak + min_size = results["cbd_min_size"] + npts = results["cbd_npts"] + xtol = results["cbd_xtol"] + nfa = results["cbd_nfa"] + num_threads = results["cbd_num_threads"] + + min_hit_streaks = results.get("cbd_min_hit_streaks", DEFAULT_MIN_HIT_STREAKS) + + x_center = results.get("beam_center_x", None) + y_center = results.get("beam_center_y", None) + + mask_rois = results.get("cbd_mask_rois", []) # list of [y_min, y_max, x_min, x_max] + + for mask_roi in mask_rois: + cryst_data = cryst_data.mask_region(mask_roi) + + crop_roi = results.get("cbd_crop_roi", None) # [y_min, y_max, x_min, x_max] + + if crop_roi is not None: + crop_roi_t = [crop_roi[2], crop_roi[3], crop_roi[0], crop_roi[1]]# y0, y1, x0, x1 + cryst_data = cryst_data.crop(crop_roi_t) + + peaks_structure = Structure2D(peak_structure_radius, peak_structure_rank) + streaks_structure = Structure2D(streak_structure_radius, streak_structure_rank) + + + det_obj = cryst_data.streak_detector(streaks_structure) + peaks = det_obj.detect_peaks(peak_vmin, npts, peaks_structure, num_threads) + detected = det_obj.detect_streaks(peaks, xtol, streak_vmin, min_size, nfa=nfa, + num_threads=num_threads) + + if isinstance(detected, list): + detected = detected[0] + + if not detected.streaks: + results["number_of_streaks"] = 0 + results["is_hit_frame"] = False + results["streaks"] = [] + results["streak_lengths"] = [] + results["bragg_counts"] = [] + return + + streaks = det_obj.to_streaks(detected) + detected_streaks = np.array(detected.streaks) + streak_lines = streaks.lines + + # Adjust to crop region + if crop_roi is not None: + shift = [crop_roi[0], crop_roi[2], crop_roi[0], crop_roi[2]] + streak_lines = streak_lines + shift + + if x_center is not None and y_center is not None: + if crop_roi is not None: + x_center -= crop_roi[0] + y_center -= crop_roi[2] + streaks_mask = streaks.concentric_only(x_center, y_center) + streak_lines = streak_lines[streaks_mask] + detected_streaks = detected_streaks[streaks_mask] + + streak_lengths = np.sqrt( + np.pow((streak_lines[..., 2] - streak_lines[..., 0]), 2) + + np.pow((streak_lines[..., 2] - streak_lines[..., 0]), 2) + ).tolist() + + streak_lines = streak_lines.T + _, number_of_streaks = streak_lines.shape + + list_result = streak_lines.tolist() # arr(4, n_lines); 0coord x0, y0, x1, y1 + + bragg_counts = [streak.total_mass() for streak in detected_streaks] + + results["number_of_streaks"] = number_of_streaks + results["is_hit_frame"] = (number_of_streaks > min_hit_streaks) + results["streaks"] = list_result + results["streak_lengths"] = streak_lengths + results["bragg_counts"] = bragg_counts diff --git a/dap/worker.py b/dap/worker.py index d32f0d4..23a4de5 100644 --- a/dap/worker.py +++ b/dap/worker.py @@ -2,7 +2,10 @@ import argparse import numpy as np -from algos import calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData +from algos import ( + calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, + calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData +) from utils import Aggregator, BufferedJSON, randskip, read_bit from zmqsocks import ZMQSockets @@ -115,6 +118,11 @@ def work(backend_address, accumulator_host, accumulator_port, visualisation_host calc_peakfinder_analysis(results, pfimage, pixel_mask_pf) # ??? + + # Streak finder processing for convergent-beam diffraction experiments + # changes image and mask in place if do_snr=True in parameters file + image = calc_streakfinder_analysis(results, image, pixel_mask_pf) + image, aggregation_is_ready = calc_apply_aggregation(results, image, pixel_mask_pf, aggregator) results["type"] = str(image.dtype)