diff --git a/dap/worker.py b/dap/worker.py
index 2260e92..ad27ef2 100644
--- a/dap/worker.py
+++ b/dap/worker.py
@@ -35,7 +35,6 @@ def prepare_radial_profile(data, center, keep_pixels=None):


 def main():
-
     parser = argparse.ArgumentParser()

     parser.add_argument("--backend", default=None, help="backend address")
@@ -46,26 +45,26 @@ def main():
     parser.add_argument("--peakfinder_parameters", default=None, help="json file with peakfinder parameters")
     parser.add_argument("--skip_frames_rate", default=1, type=int, help="send to streamvis each of skip_frames_rate frames")

-    args = parser.parse_args()
+    clargs = parser.parse_args()

-    if args.backend:
-        BACKEND_ADDRESS = args.backend
+    if clargs.backend:
+        BACKEND_ADDRESS = clargs.backend
     else:
         raise SystemExit("no backend address defined")

-    FA_HOST_ACCUMULATE = args.accumulator
-    FA_PORT_ACCUMULATE = args.accumulator_port
-    FA_HOST_VISUALISATION = args.visualisation
-    FA_PORT_VISUALISATION = args.visualisation_port
+    FA_HOST_ACCUMULATE = clargs.accumulator
+    FA_PORT_ACCUMULATE = clargs.accumulator_port
+    FA_HOST_VISUALISATION = clargs.visualisation
+    FA_PORT_VISUALISATION = clargs.visualisation_port

-    skip_frames_rate = args.skip_frames_rate
+    skip_frames_rate = clargs.skip_frames_rate

     peakfinder_parameters = {}
     peakfinder_parameters_time = -1
-    if args.peakfinder_parameters is not None and os.path.exists(args.peakfinder_parameters):
-        with open(args.peakfinder_parameters, "r") as read_file:
+    if clargs.peakfinder_parameters is not None and os.path.exists(clargs.peakfinder_parameters):
+        with open(clargs.peakfinder_parameters, "r") as read_file:
             peakfinder_parameters = json.load(read_file)
-        peakfinder_parameters_time = os.path.getmtime(args.peakfinder_parameters)
+        peakfinder_parameters_time = os.path.getmtime(clargs.peakfinder_parameters)

     pulseid = 0

@@ -85,10 +84,10 @@ def main():
         poller.register(backend_socket, zmq.POLLIN)

         accumulator_socket = zmq_context.socket(zmq.PUSH)
-        accumulator_socket.connect('tcp://%s:%s' % (FA_HOST_ACCUMULATE, FA_PORT_ACCUMULATE) )
+        accumulator_socket.connect(f"tcp://{FA_HOST_ACCUMULATE}:{FA_PORT_ACCUMULATE}")

         visualisation_socket = zmq_context.socket(zmq.PUB)
-        visualisation_socket.connect('tcp://%s:%s' % (FA_HOST_VISUALISATION, FA_PORT_VISUALISATION) )
+        visualisation_socket.connect(f"tcp://{FA_HOST_VISUALISATION}:{FA_PORT_VISUALISATION}")

         # in case of problem with communication to visualisation, keep in 0mq buffer only few messages
         visualisation_socket.set_hwm(10)
@@ -112,62 +111,62 @@ def main():
             # check if peakfinder parameters changed and then re-read it
             try:
                 if peakfinder_parameters_time > 0:
-                    new_time = os.path.getmtime(args.peakfinder_parameters)
+                    new_time = os.path.getmtime(clargs.peakfinder_parameters)
                     if ( new_time - peakfinder_parameters_time ) > 2.0:
                         old_peakfinder_parameters = peakfinder_parameters
                         sleep(0.5)
-                        with open(args.peakfinder_parameters, "r") as read_file:
+                        with open(clargs.peakfinder_parameters, "r") as read_file:
                             peakfinder_parameters = json.load(read_file)
                         peakfinder_parameters_time = new_time
                         center_radial_integration = None
                         if worker == 0:
-                            print(f'({pulseid}) update peakfinder parameters {old_peakfinder_parameters}', flush=True)
-                            print(f' --> {peakfinder_parameters}', flush=True)
+                            print(f"({pulseid}) update peakfinder parameters {old_peakfinder_parameters}", flush=True)
+                            print(f" --> {peakfinder_parameters}", flush=True)
                             print("",flush=True)
             except Exception as e:
-                print(f'({pulseid}) problem ({e}) to read peakfinder parameters file, worker : {worker}', flush=True)
print(f"({pulseid}) problem ({e}) to read peakfinder parameters file, worker : {worker}", flush=True) events = dict(poller.poll(2000)) # check every 2 seconds in each worker if backend_socket in events: metadata = backend_socket.recv_json(FLAGS) image = backend_socket.recv(FLAGS, copy=False, track=False) - image = np.frombuffer(image, dtype=metadata['type']).reshape(metadata['shape']) + image = np.frombuffer(image, dtype=metadata["type"]).reshape(metadata["shape"]) results = copy(metadata) - if results['shape'][0] == 2 and results['shape'][1] == 2: + if results["shape"][0] == 2 and results["shape"][1] == 2: continue - pulseid = results.get('pulse_id', 0) + pulseid = results.get("pulse_id", 0) results.update(peakfinder_parameters) - detector = results.get('detector_name', "") + detector = results.get("detector_name", "") - results['laser_on'] = False - results['number_of_spots'] = 0 - results['is_hit_frame'] = False + results["laser_on"] = False + results["number_of_spots"] = 0 + results["is_hit_frame"] = False daq_rec = results.get("daq_rec",0) - event_laser = bool((daq_rec>>16)&1) - event_darkshot = bool((daq_rec>>17)&1) - event_fel = bool((daq_rec>>18)&1) - event_ppicker = bool((daq_rec>>19)&1) + event_laser = bool((daq_rec >> 16) & 1) + event_darkshot = bool((daq_rec >> 17) & 1) + event_fel = bool((daq_rec >> 18) & 1) + event_ppicker = bool((daq_rec >> 19) & 1) # if not event_darkshot: - results['laser_on'] = event_laser + results["laser_on"] = event_laser # Filter only ppicker events, if requested; skipping all other events - select_only_ppicker_events = results.get('select_only_ppicker_events', 0) + select_only_ppicker_events = results.get("select_only_ppicker_events", 0) if select_only_ppicker_events and not event_ppicker: continue # special settings for p20270, only few shots were opened by pulse-picker # if detector in ["JF06T32V02"]: # if event_ppicker: -# results['number_of_spots'] = 50 -# results['is_hit_frame'] = True +# results["number_of_spots"] = 50 +# results["is_hit_frame"] = True - double_pixels = results.get('double_pixels', "mask") + double_pixels = results.get("double_pixels", "mask") pedestal_file_name = metadata.get("pedestal_name", None) if pedestal_file_name is not None and pedestal_file_name != pedestal_file_name_saved: @@ -182,18 +181,18 @@ def main(): if ju_stream_adapter.handler.pedestal_file is None or ju_stream_adapter.handler.pedestal_file == "": continue - data=np.ascontiguousarray(data) + data = np.ascontiguousarray(data) - # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters(and/or pedestal file) are changed + # starting from ju 3.3.1 pedestal file is cached in library, re-calculated only if parameters (and/or pedestal file) are changed id_pixel_mask_1 = id(pixel_mask_corrected) - pixel_mask_corrected=ju_stream_adapter.handler.get_pixel_mask(geometry=True, gap_pixels=True, double_pixels=double_pixels) + pixel_mask_corrected = ju_stream_adapter.handler.get_pixel_mask(geometry=True, gap_pixels=True, double_pixels=double_pixels) id_pixel_mask_2 = id(pixel_mask_corrected) if id_pixel_mask_1 != id_pixel_mask_2: keep_pixels = None r_radial_integration = None if pixel_mask_corrected is not None: - #pixel_mask_corrected=np.ascontiguousarray(pixel_mask_corrected) + #pixel_mask_corrected = np.ascontiguousarray(pixel_mask_corrected) pixel_mask_pf = np.ascontiguousarray(pixel_mask_corrected).astype(np.int8, copy=False) else: @@ -262,9 +261,9 @@ def main(): if pixel_mask_corrected is not None: data_s = copy(image) 
                     saturated_pixels_coordinates = ju_stream_adapter.handler.get_saturated_pixels(data_s, mask=True, geometry=True, gap_pixels=True, double_pixels=double_pixels)
-                    results['saturated_pixels'] = len(saturated_pixels_coordinates[0])
-                    results['saturated_pixels_x'] = saturated_pixels_coordinates[1].tolist()
-                    results['saturated_pixels_y'] = saturated_pixels_coordinates[0].tolist()
+                    results["saturated_pixels"] = len(saturated_pixels_coordinates[0])
+                    results["saturated_pixels_x"] = saturated_pixels_coordinates[1].tolist()
+                    results["saturated_pixels_y"] = saturated_pixels_coordinates[0].tolist()

                 # pump probe analysis
                 do_radial_integration = results.get("do_radial_integration", 0)
@@ -276,18 +275,18 @@ def main():
                     if keep_pixels is None and pixel_mask_pf is not None:
                         keep_pixels = pixel_mask_pf!=0
                     if center_radial_integration is None:
-                        center_radial_integration = [results['beam_center_x'], results['beam_center_y']]
+                        center_radial_integration = [results["beam_center_x"], results["beam_center_y"]]
                         r_radial_integration = None
                     if r_radial_integration is None:
                         r_radial_integration, nr_radial_integration = prepare_radial_profile(data_copy_1, center_radial_integration, keep_pixels)
-                        r_min_max = [int(np.min(r_radial_integration)), int(np.max(r_radial_integration))+1]
+                        r_min_max = [int(np.min(r_radial_integration)), int(np.max(r_radial_integration)) + 1]

-                    apply_threshold = results.get('apply_threshold', 0)
+                    apply_threshold = results.get("apply_threshold", 0)

-                    if (apply_threshold != 0) and all ( k in results for k in ('threshold_min', 'threshold_max')):
-                        threshold_min = float(results['threshold_min'])
-                        threshold_max = float(results['threshold_max'])
+                    if apply_threshold != 0 and all(k in results for k in ("threshold_min", "threshold_max")):
+                        threshold_min = float(results["threshold_min"])
+                        threshold_max = float(results["threshold_max"])
                         data_copy_1[data_copy_1 < threshold_min] = np.nan
                         if threshold_max > threshold_min:
                             data_copy_1[data_copy_1 > threshold_max] = np.nan
@@ -297,89 +296,93 @@ def main():
                     silent_region_min = results.get("radial_integration_silent_min", None)
                     silent_region_max = results.get("radial_integration_silent_max", None)

-                    if ( silent_region_min is not None and silent_region_max is not None and
-                         silent_region_max > silent_region_min and
-                         silent_region_min > r_min_max[0] and silent_region_max < r_min_max[1] ):
+                    if (
+                        silent_region_min is not None and
+                        silent_region_max is not None and
+                        silent_region_max > silent_region_min and
+                        silent_region_min > r_min_max[0] and
+                        silent_region_max < r_min_max[1]
+                    ):
                         integral_silent_region = np.sum(rp[silent_region_min:silent_region_max])
-                        rp = rp/integral_silent_region
-                        results['radint_normalised'] = [silent_region_min, silent_region_max]
+                        rp = rp / integral_silent_region
+                        results["radint_normalised"] = [silent_region_min, silent_region_max]

-                    results['radint_I'] = list(rp[r_min_max[0]:])
-                    results['radint_q'] = r_min_max
+                    results["radint_I"] = list(rp[r_min_max[0]:])
+                    results["radint_q"] = r_min_max

                 #copy image to work with peakfinder, just in case
-                d=np.copy(data)
+                d = np.copy(data)

-                # make all masked pixels values nan's
+                # make all masked pixels values nans
                 if pixel_mask_pf is not None:
                     d[pixel_mask_pf != 1] = np.nan

-                apply_threshold = results.get('apply_threshold', 0)
-                threshold_value_choice = results.get('threshold_value', "NaN")
+                apply_threshold = results.get("apply_threshold", 0)
+                threshold_value_choice = results.get("threshold_value", "NaN")
                 threshold_value = 0 if threshold_value_choice == "0" else np.nan
-                if (apply_threshold != 0) and all ( k in results for k in ('threshold_min', 'threshold_max')):
-                    threshold_min = float(results['threshold_min'])
-                    threshold_max = float(results['threshold_max'])
+                if apply_threshold != 0 and all(k in results for k in ("threshold_min", "threshold_max")):
+                    threshold_min = float(results["threshold_min"])
+                    threshold_max = float(results["threshold_max"])
                     d[d < threshold_min] = threshold_value
                     if threshold_max > threshold_min:
                         d[d > threshold_max] = threshold_value

                 # if roi calculation request is present, make it
-                roi_x1 = results.get('roi_x1', [])
-                roi_x2 = results.get('roi_x2', [])
-                roi_y1 = results.get('roi_y1', [])
-                roi_y2 = results.get('roi_y2', [])
+                roi_x1 = results.get("roi_x1", [])
+                roi_x2 = results.get("roi_x2", [])
+                roi_y1 = results.get("roi_y1", [])
+                roi_y2 = results.get("roi_y2", [])

                 if len(roi_x1) > 0 and len(roi_x1) == len(roi_x2) and len(roi_x1) == len(roi_y1) and len(roi_x1) == len(roi_y2):
-                    roi_results = [0]*len(roi_x1)
-                    roi_results_normalised = [0]*len(roi_x1)
+                    roi_results = [0] * len(roi_x1)
+                    roi_results_normalised = [0] * len(roi_x1)

                     if pixel_mask_pf is not None:
-                        results['roi_intensities_x'] = []
-                        results['roi_intensities_proj_x'] = []
+                        results["roi_intensities_x"] = []
+                        results["roi_intensities_proj_x"] = []

                         for iRoi in range(len(roi_x1)):
-                            data_roi = np.copy(d[roi_y1[iRoi]:roi_y2[iRoi],roi_x1[iRoi]:roi_x2[iRoi]])
+                            data_roi = np.copy(d[roi_y1[iRoi]:roi_y2[iRoi], roi_x1[iRoi]:roi_x2[iRoi]])

                             roi_results[iRoi] = np.nansum(data_roi)
                             if threshold_value_choice == "NaN":
-                                roi_results_normalised[iRoi] = roi_results[iRoi]/((roi_y2[iRoi]-roi_y1[iRoi])*(roi_x2[iRoi]-roi_x1[iRoi]))
+                                roi_results_normalised[iRoi] = roi_results[iRoi] / ((roi_y2[iRoi] - roi_y1[iRoi]) * (roi_x2[iRoi] - roi_x1[iRoi]))
                             else:
                                 roi_results_normalised[iRoi] = np.nanmean(data_roi)

-                            results['roi_intensities_x'].append([roi_x1[iRoi],roi_x2[iRoi]])
-                            results['roi_intensities_proj_x'].append(np.nansum(data_roi,axis=0).tolist())
+                            results["roi_intensities_x"].append([roi_x1[iRoi], roi_x2[iRoi]])
+                            results["roi_intensities_proj_x"].append(np.nansum(data_roi, axis=0).tolist())

-                        results['roi_intensities'] = [ float(r) for r in roi_results]
-                        results['roi_intensities_normalised'] = [ float(r) for r in roi_results_normalised ]
+                        results["roi_intensities"] = [float(r) for r in roi_results]
+                        results["roi_intensities_normalised"] = [float(r) for r in roi_results_normalised]

                 # SPI analysis
                 do_spi_analysis = results.get("do_spi_analysis", 0)

-                if (do_spi_analysis != 0) and 'roi_intensities_normalised' in results and len(results['roi_intensities_normalised']) >= 2:
+                if do_spi_analysis != 0 and "roi_intensities_normalised" in results and len(results["roi_intensities_normalised"]) >= 2:

-                    if 'spi_limit' in results and len(results['spi_limit']) == 2:
+                    if "spi_limit" in results and len(results["spi_limit"]) == 2:

                         number_of_spots = 0
-                        if results['roi_intensities_normalised'][0] >= results['spi_limit'][0]:
+                        if results["roi_intensities_normalised"][0] >= results["spi_limit"][0]:
                             number_of_spots += 25
-                        if results['roi_intensities_normalised'][1] >= results['spi_limit'][1]:
+                        if results["roi_intensities_normalised"][1] >= results["spi_limit"][1]:
                             number_of_spots += 50
-                        results['number_of_spots'] = number_of_spots
+                        results["number_of_spots"] = number_of_spots
                         if number_of_spots > 0:
-                            results['is_hit_frame'] = True
+                            results["is_hit_frame"] = True

                 # in case all needed parameters are present, make peakfinding
-                do_peakfinder_analysis = results.get('do_peakfinder_analysis', 0)
-                if (do_peakfinder_analysis != 0) and pixel_mask_pf is not None and all ( k in results for k in ('beam_center_x', 'beam_center_y', 'hitfinder_min_snr', 'hitfinder_min_pix_count', 'hitfinder_adc_thresh')):
-                    x_beam = results['beam_center_x'] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5
-                    y_beam = results['beam_center_y'] - 0.5 # to coordinates where position of first pixel/point is 0.5, 0.5
-                    hitfinder_min_snr = results['hitfinder_min_snr']
-                    hitfinder_min_pix_count = int(results['hitfinder_min_pix_count'])
-                    hitfinder_adc_thresh = results['hitfinder_adc_thresh']
+                do_peakfinder_analysis = results.get("do_peakfinder_analysis", 0)
+                if do_peakfinder_analysis != 0 and pixel_mask_pf is not None and all(k in results for k in ("beam_center_x", "beam_center_y", "hitfinder_min_snr", "hitfinder_min_pix_count", "hitfinder_adc_thresh")):
+                    x_beam = results["beam_center_x"] - 0.5  # to coordinates where position of first pixel/point is 0.5, 0.5
+                    y_beam = results["beam_center_y"] - 0.5  # to coordinates where position of first pixel/point is 0.5, 0.5
+                    hitfinder_min_snr = results["hitfinder_min_snr"]
+                    hitfinder_min_pix_count = int(results["hitfinder_min_pix_count"])
+                    hitfinder_adc_thresh = results["hitfinder_adc_thresh"]

                     asic_ny, asic_nx = d.shape
                     nasics_y, nasics_x = 1, 1
@@ -395,61 +398,63 @@ def main():
                     y, x = np.indices(d.shape)
                     pix_r = np.sqrt((x-x_beam)**2 + (y-y_beam)**2)

-                    (peak_list_x, peak_list_y, peak_list_value) = peakfinder_8(max_num_peaks,
-                                                                               d.astype(np.float32),
-                                                                               maskPr.astype(np.int8),
-                                                                               pix_r.astype(np.float32),
-                                                                               asic_nx, asic_ny,
-                                                                               nasics_x, nasics_y,
-                                                                               hitfinder_adc_thresh,
-                                                                               hitfinder_min_snr,
-                                                                               hitfinder_min_pix_count,
-                                                                               hitfinder_max_pix_count,
-                                                                               hitfinder_local_bg_radius)
+                    peak_list_x, peak_list_y, peak_list_value = peakfinder_8(
+                        max_num_peaks,
+                        d.astype(np.float32),
+                        maskPr.astype(np.int8),
+                        pix_r.astype(np.float32),
+                        asic_nx, asic_ny,
+                        nasics_x, nasics_y,
+                        hitfinder_adc_thresh,
+                        hitfinder_min_snr,
+                        hitfinder_min_pix_count,
+                        hitfinder_max_pix_count,
+                        hitfinder_local_bg_radius
+                    )

                     number_of_spots = len(peak_list_x)
-                    results['number_of_spots'] = number_of_spots
+                    results["number_of_spots"] = number_of_spots

                     if number_of_spots != 0:
-                        results['spot_x'] = [-1.0] * number_of_spots
-                        results['spot_y'] = [-1.0] * number_of_spots
-                        results['spot_intensity'] = copy(peak_list_value)
+                        results["spot_x"] = [-1.0] * number_of_spots
+                        results["spot_y"] = [-1.0] * number_of_spots
+                        results["spot_intensity"] = copy(peak_list_value)
                         for i in range(number_of_spots):
-                            results['spot_x'][i] = peak_list_x[i]+0.5
-                            results['spot_y'][i] = peak_list_y[i]+0.5
+                            results["spot_x"][i] = peak_list_x[i] + 0.5
+                            results["spot_y"][i] = peak_list_y[i] + 0.5
                     else:
-                        results['spot_x'] = []
-                        results['spot_y'] = []
-                        results['spot_intensity'] = []
+                        results["spot_x"] = []
+                        results["spot_y"] = []
+                        results["spot_intensity"] = []

-                    npeaks_threshold_hit = results.get('npeaks_threshold_hit', 15)
+                    npeaks_threshold_hit = results.get("npeaks_threshold_hit", 15)

                     if number_of_spots >= npeaks_threshold_hit:
-                        results['is_hit_frame'] = True
+                        results["is_hit_frame"] = True

                 forceSendVisualisation = False
                 if data.dtype != np.uint16:
-                    apply_threshold = results.get('apply_threshold', 0)
-                    apply_aggregation = results.get('apply_aggregation', 0)
+                    apply_threshold = results.get("apply_threshold", 0)
+                    apply_aggregation = results.get("apply_aggregation", 0)
                     if apply_aggregation == 0:
                         data_summed = None
                         n_aggregated_images = 1
-                    if (apply_threshold != 0) or (apply_aggregation != 0 ):
-                        if (apply_threshold != 0) and all ( k in results for k in ('threshold_min', 'threshold_max')):
-                            threshold_min = float(results['threshold_min'])
-                            threshold_max = float(results['threshold_max'])
+                    if apply_threshold != 0 or apply_aggregation != 0:
+                        if apply_threshold != 0 and all(k in results for k in ("threshold_min", "threshold_max")):
+                            threshold_min = float(results["threshold_min"])
+                            threshold_max = float(results["threshold_max"])
                             data[data < threshold_min] = 0.0
                             if threshold_max > threshold_min:
                                 data[data > threshold_max] = 0.0

-                        if (apply_aggregation != 0 ) and 'aggregation_max' in results:
+                        if apply_aggregation != 0 and "aggregation_max" in results:
                             if data_summed is not None:
                                 data += data_summed
                                 n_aggregated_images += 1
                             data_summed = data.copy()
                             data_summed[data == -np.nan] = -np.nan
-                            results['aggregated_images'] = n_aggregated_images
-                            results['worker'] = worker
-                            if n_aggregated_images >= results['aggregation_max']:
+                            results["aggregated_images"] = n_aggregated_images
+                            results["worker"] = worker
+                            if n_aggregated_images >= results["aggregation_max"]:
                                 forceSendVisualisation = True
                                 data_summed = None
                                 n_aggregated_images = 1
@@ -458,31 +463,31 @@ def main():
                 else:
                     data = image

-                results['type'] = str(data.dtype)
-                results['shape'] = data.shape
+                results["type"] = str(data.dtype)
+                results["shape"] = data.shape

-                frame_number = metadata['frame']
+                frame_number = metadata["frame"]

                 # accumulator_socket.send_json(results, FLAGS)

-                if (apply_aggregation != 0 ) and 'aggregation_max' in results:
+                if apply_aggregation != 0 and "aggregation_max" in results:
                     if forceSendVisualisation:
                         visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
                         visualisation_socket.send(data, FLAGS, copy=True, track=True)
                     else:
-                        data = np.empty((2,2), dtype=np.uint16)
-                        results['type'] = str(data.dtype)
-                        results['shape'] = data.shape
+                        data = np.empty((2, 2), dtype=np.uint16)
+                        results["type"] = str(data.dtype)
+                        results["shape"] = data.shape
                         visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
                         visualisation_socket.send(data, FLAGS, copy=True, track=True)
                 else:
-                    if results['is_good_frame'] and (results['is_hit_frame'] or randint(1, skip_frames_rate) == 1):
+                    if results["is_good_frame"] and (results["is_hit_frame"] or randint(1, skip_frames_rate) == 1):
                         visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
                         visualisation_socket.send(data, FLAGS, copy=True, track=True)
                     else:
-                        data = np.empty((2,2), dtype=np.uint16)
-                        results['type'] = str(data.dtype)
-                        results['shape'] = data.shape
+                        data = np.empty((2, 2), dtype=np.uint16)
+                        results["type"] = str(data.dtype)
+                        results["shape"] = data.shape
                         visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
                         visualisation_socket.send(data, FLAGS, copy=True, track=True)
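
Reviewer note, outside the patch: the untouched context line `data_summed[data == -np.nan] = -np.nan` in the aggregation branch is a no-op. `-np.nan` is still `nan`, and NaN never compares equal to anything, so the boolean mask is all False and nothing is assigned. A minimal standalone sketch of the pitfall, with `np.isnan` as the correct test (plain NumPy, independent of this codebase):

```python
import numpy as np

data = np.array([1.0, np.nan, 3.0])

print(data == -np.nan)   # [False False False] -- nan == nan is always False
print(np.isnan(data))    # [False  True False] -- the correct way to locate nans
```

Since `data += data_summed` already propagates NaN through addition, the line could likely be dropped; if the intent was to explicitly re-mark masked pixels, something like `data_summed[np.isnan(data)] = np.nan` would at least select the intended elements.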
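A second aside, on the thinning logic in the final `else` branch: assuming the `randint` here is `random.randint` (inclusive on both ends), `randint(1, skip_frames_rate) == 1` holds with probability 1/skip_frames_rate, so `--skip_frames_rate N` forwards every hit frame and roughly one in N good non-hit frames to streamvis. A quick standalone check of that rate:

```python
from random import randint

skip_frames_rate = 10
trials = 100_000

# count how often the worker's send condition fires for non-hit frames
sent = sum(randint(1, skip_frames_rate) == 1 for _ in range(trials))
print(sent / trials)  # ~0.1, i.e. about 1 / skip_frames_rate
```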