Files
dap/dap/worker.py

168 lines
5.5 KiB
Python

import argparse
from time import time_ns
import numpy as np
from logzero import logger as log
from algos import (
calc_apply_aggregation, calc_apply_threshold, calc_custom, calc_mask_pixels, calc_peakfinder_analysis,
calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData
)
from utils import Aggregator, BufferedJSON, make_bsread_timestamp, randskip, read_bit
from zmqsocks import ZMQSocketsWorker, make_address
def main():
    """Parse the command-line options and start the worker loop.

    The backend host and port are mandatory. The accumulator and
    visualisation endpoints default to localhost on ports 13000 and 12000.
    Each host/port pair is turned into an address with make_address() and
    passed to work() together with the config file name and the skip_frames
    setting.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--backend_host", required=True)
    # parsed as int like the other two ports, so that a malformed value
    # is rejected by argparse instead of ending up inside the address
    parser.add_argument("--backend_port", type=int, required=True)

    parser.add_argument("--accumulator_host", default="localhost")
    parser.add_argument("--accumulator_port", type=int, default=13000)

    parser.add_argument("--visualisation_host", default="localhost")
    parser.add_argument("--visualisation_port", type=int, default=12000)

    parser.add_argument("--config_file", default=None, help="json file with configuration parameters")
    parser.add_argument("--skip_frames", type=int, default=1, help="skip (approximately) skip_frames %% of pulses")

    clargs = parser.parse_args()

    backend_addr = make_address(clargs.backend_host, clargs.backend_port)
    accumulator_addr = make_address(clargs.accumulator_host, clargs.accumulator_port)
    visualisation_addr = make_address(clargs.visualisation_host, clargs.visualisation_port)

    work(
        backend_addr,
        accumulator_addr,
        visualisation_addr,
        clargs.config_file,
        clargs.skip_frames,
    )
def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_frames):
    """Process incoming images forever and publish the analysis results.

    For every received image the current configuration is merged into a copy
    of the image metadata (the "results" dict), the image is processed via
    JFData and run through the calc_* analysis steps, and then

    - the results are sent to the accumulator, and
    - the results plus the image are sent to the visualisation. The image is
      replaced by an empty 2x2 marker image if aggregation is enabled but not
      ready, if the frame is not a good frame, or if it is a no-hit frame
      selected for skipping by randskip(skip_frames).

    Frames are dropped without sending anything when they carry the 2x2
    empty-image marker shape, when ppicker selection is requested and the
    ppicker event bit is not set, or when JFData.process() returns None.

    If the configuration cannot be loaded, the error is logged and the most
    recently loaded configuration is used instead (an empty one if none has
    been loaded successfully yet).

    Args:
        backend_addr: first address handed to ZMQSocketsWorker.
        accumulator_addr: second address handed to ZMQSocketsWorker.
        visualisation_addr: third address handed to ZMQSocketsWorker.
        fn_config: name of the configuration file, handed to BufferedJSON.
        skip_frames: handed to randskip() once per processed frame.

    This function does not return.
    """
    config_file = BufferedJSON(fn_config)

    jfdata = JFData()

    zmq_socks = ZMQSocketsWorker(backend_addr, accumulator_addr, visualisation_addr)

    aggregator = Aggregator()

    # last successfully loaded configuration; start empty so that a failing
    # first load does not leave the name undefined further down
    config = {}

    while True:
        if not zmq_socks.has_data():
            continue

        raw_image, metadata = zmq_socks.get_data()
        timestamp = time_ns()

        if metadata["shape"] == [2, 2]: # this is used as marker for empty images
            continue

        pulse_id = metadata.get("pulse_id", 0)

        try:
            config = config_file.load()
        except Exception:
            # keep going with the previous config; only Exception is caught
            # so that KeyboardInterrupt/SystemExit can still stop the worker
            log.exception(f"({pulse_id}) cannot read config file: {config_file.fname}")

        # config entries override metadata entries of the same name
        results = metadata.copy()
        results.update(config)

        results["timestamp"] = make_bsread_timestamp(timestamp, pulse_id)

        # defaults, in place before the analysis steps run
        results["number_of_spots"] = 0
        results["is_hit_frame"] = False

        daq_rec = results.get("daq_rec", 0)
        event_laser = read_bit(daq_rec, 16)
        event_darkshot = read_bit(daq_rec, 17)
        # event_fel = read_bit(daq_rec, 18)
        event_ppicker = read_bit(daq_rec, 19)

        results["laser_on"] = event_laser and not event_darkshot

        # if requested, filter on ppicker events by skipping other events
        select_only_ppicker_events = results.get("select_only_ppicker_events", False)
        if select_only_ppicker_events and not event_ppicker:
            continue

        pedestal_name = metadata.get("pedestal_name", None)
        jfdata.ensure_current_pixel_mask(pedestal_name)

        double_pixels = results.get("double_pixels", "mask")

        image = jfdata.process(raw_image, metadata, double_pixels)

        if image is None:
            continue

        pixel_mask = jfdata.get_pixel_mask(results, double_pixels)

        # saturated pixels are only reported when a pixel mask is available
        if pixel_mask is not None:
            saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels)
            results["saturated_pixels"] = len(saturated_pixels_x)
            results["saturated_pixels_x"] = saturated_pixels_x
            results["saturated_pixels_y"] = saturated_pixels_y

        calc_radial_integration(results, image, pixel_mask)

        # the steps below work on a copy, the unmodified image is used again further down
        pfimage = image.copy() #TODO: is this copy needed?

        calc_mask_pixels(pfimage, pixel_mask) # changes pfimage in place
        calc_apply_threshold(results, pfimage) # changes pfimage in place
        calc_roi(results, pfimage)
        calc_spi_analysis(results, pfimage)
        calc_peakfinder_analysis(results, pfimage, pixel_mask)
        calc_custom(results, pfimage, pixel_mask)

        # streak finder for convergent-beam diffraction experiments
        # changes image and pixel_mask in place if do_snr=True in parameters file
        image = calc_streakfinder_analysis(results, image, pixel_mask)

        image, aggregation_is_ready = calc_apply_aggregation(results, image, pixel_mask, aggregator)

        results["type"] = str(image.dtype)
        results["shape"] = image.shape

        zmq_socks.send_accumulator(results)

        apply_aggregation = results.get("apply_aggregation", False)
        aggregation_is_enabled = (apply_aggregation and "aggregation_max" in results)
        aggregation_is_enabled_but_not_ready = (aggregation_is_enabled and not aggregation_is_ready)

        is_bad_frame = (not results["is_good_frame"])

        # hits are sent at full rate, but no-hits are sent at reduced frequency
        is_no_hit_frame = (not results["is_hit_frame"])
        random_skip = randskip(skip_frames)
        is_no_hit_frame_and_skipped = (is_no_hit_frame and random_skip)

        if aggregation_is_enabled_but_not_ready or is_bad_frame or is_no_hit_frame_and_skipped:
            # replace the image by the 2x2 empty-image marker; the results are still sent
            image = np.empty((2, 2), dtype=np.uint16)
            results["type"] = str(image.dtype)
            results["shape"] = image.shape

        zmq_socks.send_visualisation(results, image)
# start the worker only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()