dap/dap/accumulator.py
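"""DAP results accumulator.

Receives per-pulse analysis results over ZMQ, appends a per-pulse summary to
.dap files grouped by detector and by blocks of 10000 pulse IDs under
OUTPUT_DIR, and, when a bsread sender is configured, re-emits the results as a
bsread stream.
"""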

import argparse
from logzero import logger as log
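# FileHandler, Sorter and the bsread/ZMQ helpers imported below appear to be
# modules of this repository rather than third-party packages.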
from utils import FileHandler, Sorter, make_bsread_sender, pack_bsread_data
from zmqsocks import ZMQSocketsAccumulator, make_address

OUTPUT_DIR = "/gpfs/photonics/swissfel/buffer/dap/data"

# Result entries excluded from the data packed for the bsread stream
ENTRIES_TO_SKIP = [
    "custom_script",
    "detector_name",
    "gain_file",
    "htype",
    "pedestal_file",
    "pulse_id",
    "timestamp"
]


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument("--accumulator_host", default="*")
    parser.add_argument("--accumulator_port", type=int, default=13000)
    parser.add_argument("--bsread_host", default="*")
    parser.add_argument("--bsread_port", type=int, default=None)
    parser.add_argument("--bsread_window", type=int, default=1000)

    clargs = parser.parse_args()

    accumulator_addr = make_address(clargs.accumulator_host, clargs.accumulator_port)
    accumulate(accumulator_addr, clargs.bsread_host, clargs.bsread_port, clargs.bsread_window)
def accumulate(accumulator_addr, bsread_host, bsread_port, bsread_window):
    zmq_socks = ZMQSocketsAccumulator(accumulator_addr)
    output = FileHandler()
    sorter = Sorter(window=bsread_window)
    sender = make_bsread_sender(bsread_host, bsread_port)

    while True:
        if not zmq_socks.has_data():
            output.flush()  # may be too intensive
            continue

        results = zmq_socks.get_data()

        detector = results.get("detector_name", "")
        pulse_id = results.get("pulse_id", 0)

        # one output file per detector and per block of 10000 pulse IDs
        rounded_pulse_id = str(pulse_id // 10000 * 10000)
        fname = f"{OUTPUT_DIR}/{detector}/{rounded_pulse_id}.dap"
        output.switch(fname)

        res_is_good_frame = results.get("is_good_frame", -1)
        res_is_hit_frame = results.get("is_hit_frame", False)
        res_number_of_spots = results.get("number_of_spots", -1)
        res_laser_on = results.get("laser_on", False)
        res_roi_intensities = results.get("roi_intensities", [])

        output.write(pulse_id, res_is_good_frame, res_is_hit_frame, res_number_of_spots, res_laser_on, *res_roi_intensities)

        # without a bsread sender, only the file output is produced
        if not sender:
            continue

        timestamp = tuple(results["timestamp"])
        data = pack_bsread_data(results, detector, skip=ENTRIES_TO_SKIP)

        # queue the result; flush_ready() returns the entries that can be sent
        sorter.add(pulse_id, (timestamp, data))
        ready = sorter.flush_ready()

        for pulse_id, (timestamp, data) in ready:
            try:
                sender.send(pulse_id=pulse_id, timestamp=timestamp, data=data)
            except Exception:
                log.exception(f"{pulse_id} bsread failed to send: {data}")


if __name__ == "__main__":
    main()
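# Example invocation (a sketch; the bsread port below is a placeholder, not a
# value taken from any deployment configuration):
#
#   python accumulator.py --accumulator_port 13000 --bsread_port 9000 --bsread_window 1000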