diff --git a/dap/algos/__init__.py b/dap/algos/__init__.py index 1ca0eec..f30eec4 100644 --- a/dap/algos/__init__.py +++ b/dap/algos/__init__.py @@ -1,6 +1,7 @@ from .addmask import calc_apply_additional_mask from .jfdata import JFData +from .forcesend import calc_force_send from .mask import calc_mask_pixels from .peakfind import calc_peakfinder_analysis from .radprof import calc_radial_integration diff --git a/dap/algos/forcesend.py b/dap/algos/forcesend.py new file mode 100644 index 0000000..f336523 --- /dev/null +++ b/dap/algos/forcesend.py @@ -0,0 +1,40 @@ +import numpy as np + + +def calc_force_send(results, data, pixel_mask_pf, image, n_aggregated_images, data_summed): + force_send_visualisation = False + if data.dtype != np.uint16: + apply_threshold = results.get("apply_threshold", False) + apply_aggregation = results.get("apply_aggregation", False) + if not apply_aggregation: + data_summed = None + n_aggregated_images = 1 + if apply_threshold or apply_aggregation: + if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")): + threshold_min = float(results["threshold_min"]) + threshold_max = float(results["threshold_max"]) + data[data < threshold_min] = 0.0 + if threshold_max > threshold_min: + data[data > threshold_max] = 0.0 + if apply_aggregation and "aggregation_max" in results: + if data_summed is not None: + data += data_summed + n_aggregated_images += 1 + data_summed = data.copy() + data_summed[data == -np.nan] = -np.nan #TODO: this does nothing + results["aggregated_images"] = n_aggregated_images + results["worker"] = 1 #TODO: keep this for backwards compatibility? + if n_aggregated_images >= results["aggregation_max"]: + force_send_visualisation = True + data_summed = None + n_aggregated_images = 1 + if pixel_mask_pf is not None: + data[~pixel_mask_pf] = np.nan + + else: + data = image + + return data, force_send_visualisation, n_aggregated_images, data_summed + + + diff --git a/dap/worker.py b/dap/worker.py index 1a402be..77cae96 100644 --- a/dap/worker.py +++ b/dap/worker.py @@ -3,7 +3,7 @@ from random import randint import numpy as np -from algos import calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData +from algos import calc_apply_threshold, calc_force_send, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, JFData from utils import BufferedJSON, read_bit from zmqsocks import ZMQSockets @@ -118,37 +118,7 @@ def work(backend_address, accumulator_host, accumulator_port, visualisation_host calc_peakfinder_analysis(results, pfdata, pixel_mask_pf) # ??? - force_send_visualisation = False - if data.dtype != np.uint16: - apply_threshold = results.get("apply_threshold", False) - apply_aggregation = results.get("apply_aggregation", False) - if not apply_aggregation: - data_summed = None - n_aggregated_images = 1 - if apply_threshold or apply_aggregation: - if apply_threshold and all(k in results for k in ("threshold_min", "threshold_max")): - threshold_min = float(results["threshold_min"]) - threshold_max = float(results["threshold_max"]) - data[data < threshold_min] = 0.0 - if threshold_max > threshold_min: - data[data > threshold_max] = 0.0 - if apply_aggregation and "aggregation_max" in results: - if data_summed is not None: - data += data_summed - n_aggregated_images += 1 - data_summed = data.copy() - data_summed[data == -np.nan] = -np.nan #TODO: this does nothing - results["aggregated_images"] = n_aggregated_images - results["worker"] = 1 #TODO: keep this for backwards compatibility? - if n_aggregated_images >= results["aggregation_max"]: - force_send_visualisation = True - data_summed = None - n_aggregated_images = 1 - if pixel_mask_pf is not None: - data[~pixel_mask_pf] = np.nan - - else: - data = image + data, force_send_visualisation, n_aggregated_images, data_summed = calc_force_send(results, data, pixel_mask_pf, image, n_aggregated_images, data_summed) results["type"] = str(data.dtype) results["shape"] = data.shape @@ -157,6 +127,8 @@ def work(backend_address, accumulator_host, accumulator_port, visualisation_host zmq_socks.send_accumulator(results) + apply_aggregation = results.get("apply_aggregation", False) + send_empty_cond1 = (apply_aggregation and "aggregation_max" in results and not force_send_visualisation) send_empty_cond2 = (not results["is_good_frame"] or not (results["is_hit_frame"] or randint(1, skip_frames_rate) == 1))