moved zmq logic into ZMQSocketsAccumulator

This commit is contained in:
2025-10-15 12:45:11 +02:00
parent d8cdc17860
commit 1de99bf512
2 changed files with 27 additions and 15 deletions

View File

@@ -1,11 +1,9 @@
import argparse
import zmq
from utils import FileHandler
from zmqsocks import ZMQSocketsAccumulator
FLAGS = 0
OUTPUT_DIR_NAME = "/gpfs/photonics/swissfel/buffer/dap/data"
@@ -22,14 +20,8 @@ def main():
def accumulate(_accumulator_host, accumulator_port): #TODO: accumulator_host is not used
zmq_context = zmq.Context(io_threads=4)
poller = zmq.Poller()
accumulator_socket = zmq_context.socket(zmq.PULL)
accumulator_socket.bind(f"tcp://*:{accumulator_port}")
poller.register(accumulator_socket, zmq.POLLIN)
def accumulate(accumulator_host, accumulator_port):
zmq_socks = ZMQSocketsAccumulator(accumulator_host, accumulator_port)
run_name_before = None
output_dap = FileHandler()
@@ -37,13 +29,11 @@ def accumulate(_accumulator_host, accumulator_port): #TODO: accumulator_host is
n_frames_received = 0
while True:
events = dict(poller.poll(10)) # check for worker output every 0.01 seconds
if accumulator_socket not in events:
if not zmq_socks.has_data():
output_dap.flush() # may be too intensive
continue
results = accumulator_socket.recv_json(FLAGS)
results = zmq_socks.get_data()
n_frames_received += 1
detector = results.get("detector_name", "")

View File

@@ -5,6 +5,28 @@ import zmq
FLAGS = 0
class ZMQSocketsAccumulator:
def __init__(self, _accumulator_host, accumulator_port): #TODO: accumulator_host is not used
zmq_context = zmq.Context(io_threads=4)
self.poller = poller = zmq.Poller()
# receive from workers:
self.accumulator_socket = accumulator_socket = zmq_context.socket(zmq.PULL)
accumulator_socket.bind(f"tcp://*:{accumulator_port}")
poller.register(accumulator_socket, zmq.POLLIN)
def has_data(self):
events = dict(self.poller.poll(10)) # check for worker output every 0.01 seconds
return (self.accumulator_socket in events)
def get_data(self):
return self.accumulator_socket.recv_json(FLAGS)
class ZMQSocketsWorker:
def __init__(self, backend_address, accumulator_host, accumulator_port, visualisation_host, visualisation_port):