diff --git a/dap/accumulator.py b/dap/accumulator.py index fbc51f6..7dd18b3 100644 --- a/dap/accumulator.py +++ b/dap/accumulator.py @@ -1,6 +1,8 @@ import argparse -from utils import FileHandler +from bsread.sender import Sender + +from utils import FileHandler, Sorter from zmqsocks import ZMQSocketsAccumulator, make_address @@ -12,20 +14,27 @@ def main(): parser.add_argument("--accumulator_host", default="*") parser.add_argument("--accumulator_port", type=int, default=13000) + parser.add_argument("--bsread_port", type=int, default=None) #TODO: is the host needed? clargs = parser.parse_args() accumulator_addr = make_address(clargs.accumulator_host, clargs.accumulator_port) - accumulate(accumulator_addr) + accumulate(accumulator_addr, clargs.bsread_port) -def accumulate(accumulator_addr): +def accumulate(accumulator_addr, bsread_port): zmq_socks = ZMQSocketsAccumulator(accumulator_addr) output = FileHandler() + sorter = Sorter() + + if bsread_port: + sender = Sender(port=bsread_port) + sender.open() + while True: if not zmq_socks.has_data(): output.flush() # may be too intensive @@ -48,6 +57,15 @@ def accumulate(accumulator_addr): output.write(pulse_id, res_is_good_frame, res_is_hit_frame, res_number_of_spots, res_laser_on, *res_roi_intensities) + if not bsread_port: + continue + + sorter.add(pulse_id, results) + ready = sorter.flush_ready() + for i in ready: + data = {f"{detector}:{k}": v for k, v in i.items()} + sender.send(data=data, pulse_id=pulse_id) #TODO: is there a timestamp in the input? +