From 71a1e5d48803b0642a763cba568cab8cf8cf13c7 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Thu, 16 Oct 2025 20:58:53 +0200 Subject: [PATCH] make bsread timestamp already in worker; do not send timestamp(s) and pulse ID --- dap/accumulator.py | 15 ++++++--------- dap/worker.py | 4 ++-- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/dap/accumulator.py b/dap/accumulator.py index e366e7f..cc0e7b3 100644 --- a/dap/accumulator.py +++ b/dap/accumulator.py @@ -2,7 +2,7 @@ import argparse from bsread.sender import Sender, PUB -from utils import FileHandler, Sorter, make_bsread_timestamp +from utils import FileHandler, Sorter from zmqsocks import ZMQSocketsAccumulator, make_address @@ -60,9 +60,9 @@ def accumulate(accumulator_addr, bsread_port): if not bsread_port: continue + timestamp = results["timestamp"] + to_copy = ( - "pulse_id", - "timestamp", "frame", "is_good_frame", "number_of_spots", @@ -71,13 +71,10 @@ def accumulate(accumulator_addr, bsread_port): data = {f"{detector}:{k}": results[k] for k in to_copy} - sorter.add(pulse_id, data) + sorter.add(pulse_id, (timestamp, data)) ready = sorter.flush_ready() - for pulse_id, data in ready: - timestamp = results["timestamp"] - timestamp = make_bsread_timestamp(timestamp, pulse_id) - data[f"{detector}:bsread_timestamp_secs"], data[f"{detector}:bsread_timestamp_nanos"] = timestamp - sender.send(pulse_id=pulse_id, data=data, timestamp=timestamp) + for pulse_id, (timestamp, data) in ready: + sender.send(pulse_id=pulse_id, timestamp=timestamp, data=data) diff --git a/dap/worker.py b/dap/worker.py index 98794cd..67a63de 100644 --- a/dap/worker.py +++ b/dap/worker.py @@ -7,7 +7,7 @@ from algos import ( calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData ) -from utils import Aggregator, BufferedJSON, randskip, read_bit +from utils import Aggregator, BufferedJSON, make_bsread_timestamp, randskip, read_bit from zmqsocks import ZMQSocketsWorker, make_address @@ -74,7 +74,7 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_peakfinder_param results = metadata.copy() results.update(peakfinder_parameters) - results["timestamp"] = timestamp + results["timestamp"] = make_bsread_timestamp(timestamp, pulse_id) results["number_of_spots"] = 0 results["is_hit_frame"] = False