diff --git a/dap/accumulator.py b/dap/accumulator.py index 16f1977..9be5745 100644 --- a/dap/accumulator.py +++ b/dap/accumulator.py @@ -1,8 +1,6 @@ import argparse -from bsread.sender import Sender, PUB - -from utils import FileHandler, Sorter +from utils import FileHandler, Sorter, make_bsread_sender, pack_bsread_data from zmqsocks import ZMQSocketsAccumulator, make_address @@ -32,8 +30,7 @@ def accumulate(accumulator_addr, bsread_port): sorter = Sorter() if bsread_port: - sender = Sender(port=bsread_port, block=False, mode=PUB) - sender.open() + sender = make_bsread_sender(bsread_port) while True: if not zmq_socks.has_data(): @@ -62,13 +59,7 @@ def accumulate(accumulator_addr, bsread_port): timestamp = tuple(results["timestamp"]) - data = {} - for k, v in results.items(): - if isinstance(v, bool): - v = int(v) - elif isinstance(v, list) and not v: - v = None - data[f"{detector}:{k}"] = v + data = pack_bsread_data(results, detector) sorter.add(pulse_id, (timestamp, data)) diff --git a/dap/utils/__init__.py b/dap/utils/__init__.py index cb78642..7447f85 100644 --- a/dap/utils/__init__.py +++ b/dap/utils/__init__.py @@ -1,5 +1,6 @@ from .aggregator import Aggregator +from .bsreadext import make_bsread_sender, pack_bsread_data from .bits import read_bit from .bufjson import BufferedJSON from .filehandler import FileHandler diff --git a/dap/utils/bsreadext.py b/dap/utils/bsreadext.py new file mode 100644 index 0000000..61cb85a --- /dev/null +++ b/dap/utils/bsreadext.py @@ -0,0 +1,21 @@ +from bsread.sender import Sender, PUB + + +def make_bsread_sender(bsread_port): + sender = Sender(port=bsread_port, block=False, mode=PUB) + sender.open() + return sender + + +def pack_bsread_data(orig, prefix): + data = {} + for k, v in orig.items(): + if isinstance(v, bool): + v = int(v) + elif isinstance(v, list) and not v: + v = None + data[f"{prefix}:{k}"] = v + return data + + +