From 1de99bf512cf1ebd057dc63faa520ee6339f1aaf Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Wed, 15 Oct 2025 12:45:11 +0200 Subject: [PATCH] moved zmq logic into ZMQSocketsAccumulator --- dap/accumulator.py | 20 +++++--------------- dap/zmqsocks.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/dap/accumulator.py b/dap/accumulator.py index 0edf1d4..43e4a88 100644 --- a/dap/accumulator.py +++ b/dap/accumulator.py @@ -1,11 +1,9 @@ import argparse -import zmq - from utils import FileHandler +from zmqsocks import ZMQSocketsAccumulator -FLAGS = 0 OUTPUT_DIR_NAME = "/gpfs/photonics/swissfel/buffer/dap/data" @@ -22,14 +20,8 @@ def main(): -def accumulate(_accumulator_host, accumulator_port): #TODO: accumulator_host is not used - zmq_context = zmq.Context(io_threads=4) - poller = zmq.Poller() - - accumulator_socket = zmq_context.socket(zmq.PULL) - accumulator_socket.bind(f"tcp://*:{accumulator_port}") - - poller.register(accumulator_socket, zmq.POLLIN) +def accumulate(accumulator_host, accumulator_port): + zmq_socks = ZMQSocketsAccumulator(accumulator_host, accumulator_port) run_name_before = None output_dap = FileHandler() @@ -37,13 +29,11 @@ def accumulate(_accumulator_host, accumulator_port): #TODO: accumulator_host is n_frames_received = 0 while True: - events = dict(poller.poll(10)) # check for worker output every 0.01 seconds - - if accumulator_socket not in events: + if not zmq_socks.has_data(): output_dap.flush() # may be too intensive continue - results = accumulator_socket.recv_json(FLAGS) + results = zmq_socks.get_data() n_frames_received += 1 detector = results.get("detector_name", "") diff --git a/dap/zmqsocks.py b/dap/zmqsocks.py index 0d134d2..13ad67c 100644 --- a/dap/zmqsocks.py +++ b/dap/zmqsocks.py @@ -5,6 +5,28 @@ import zmq FLAGS = 0 +class ZMQSocketsAccumulator: + + def __init__(self, _accumulator_host, accumulator_port): #TODO: accumulator_host is not used + zmq_context = zmq.Context(io_threads=4) + self.poller = poller = zmq.Poller() + + # receive from workers: + self.accumulator_socket = accumulator_socket = zmq_context.socket(zmq.PULL) + accumulator_socket.bind(f"tcp://*:{accumulator_port}") + + poller.register(accumulator_socket, zmq.POLLIN) + + + def has_data(self): + events = dict(self.poller.poll(10)) # check for worker output every 0.01 seconds + return (self.accumulator_socket in events) + + def get_data(self): + return self.accumulator_socket.recv_json(FLAGS) + + + class ZMQSocketsWorker: def __init__(self, backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port):