cleanup
This commit is contained in:
@ -12,55 +12,58 @@ OUTPUT_DIR_NAME = "/gpfs/photonics/swissfel/buffer/dap/data"
|
|||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
parser.add_argument("--accumulator", default="localhost", help="name of host where accumulator works")
|
parser.add_argument("--accumulator_host", default="localhost")
|
||||||
parser.add_argument("--accumulator_port", default=13002, type=int, help="accumulator port")
|
parser.add_argument("--accumulator_port", default=13002, type=int)
|
||||||
|
|
||||||
args = parser.parse_args()
|
clargs = parser.parse_args()
|
||||||
|
|
||||||
# FA_HOST_ACCUMULATE = args.accumulator
|
accumulate(clargs.accumulator_host, clargs.accumulator_port)
|
||||||
FA_PORT_ACCUMULATE = args.accumulator_port
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def accumulate(_accumulator_host, accumulator_port): #TODO: accumulator_host is not used
|
||||||
zmq_context = zmq.Context(io_threads=4)
|
zmq_context = zmq.Context(io_threads=4)
|
||||||
poller = zmq.Poller()
|
poller = zmq.Poller()
|
||||||
|
|
||||||
# Accumulator
|
|
||||||
accumulator_socket = zmq_context.socket(zmq.PULL)
|
accumulator_socket = zmq_context.socket(zmq.PULL)
|
||||||
accumulator_socket.bind(f"tcp://*:{FA_PORT_ACCUMULATE}")
|
accumulator_socket.bind(f"tcp://*:{accumulator_port}")
|
||||||
|
|
||||||
poller.register(accumulator_socket, zmq.POLLIN)
|
poller.register(accumulator_socket, zmq.POLLIN)
|
||||||
|
|
||||||
n_frames_received = 0
|
|
||||||
|
|
||||||
run_name_before = "very_first_start"
|
run_name_before = "very_first_start"
|
||||||
fNameOutput = f"{OUTPUT_DIR_NAME}/{run_name_before}.dap"
|
fNameOutput = f"{OUTPUT_DIR_NAME}/{run_name_before}.dap"
|
||||||
if not os.path.isdir(os.path.dirname(fNameOutput)):
|
if not os.path.isdir(os.path.dirname(fNameOutput)):
|
||||||
os.makedirs(os.path.dirname(fNameOutput))
|
os.makedirs(os.path.dirname(fNameOutput))
|
||||||
outputDap = open(fNameOutput, "a")
|
outputDap = open(fNameOutput, "a")
|
||||||
|
|
||||||
|
n_frames_received = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
events = dict(poller.poll(10)) # in accumulator check for worker output every 0.01 seconds
|
events = dict(poller.poll(10)) # in accumulator check for worker output every 0.01 seconds
|
||||||
|
|
||||||
if accumulator_socket in events:
|
if accumulator_socket not in events:
|
||||||
results = accumulator_socket.recv_json(FLAGS)
|
|
||||||
n_frames_received += 1
|
|
||||||
|
|
||||||
pulse_id = results.get("pulse_id", 0)
|
|
||||||
run_name = str(pulse_id//10000*10000)
|
|
||||||
detector = results.get("detector_name", "")
|
|
||||||
|
|
||||||
if run_name != run_name_before:
|
|
||||||
run_name_before = run_name
|
|
||||||
outputDap.close()
|
|
||||||
fNameOutput = f"{OUTPUT_DIR_NAME}/{detector}/{run_name_before}.dap"
|
|
||||||
if not os.path.isdir(os.path.dirname(fNameOutput)):
|
|
||||||
os.makedirs(os.path.dirname(fNameOutput))
|
|
||||||
outputDap = open(fNameOutput, "a")
|
|
||||||
|
|
||||||
pr_rois = results.get("roi_intensities", [])
|
|
||||||
print(pulse_id, results.get("is_good_frame", -1), results.get("is_hit_frame", False), results.get("number_of_spots", -1), results.get("laser_on", False), *pr_rois, file=outputDap)
|
|
||||||
|
|
||||||
else:
|
|
||||||
outputDap.flush() # may be too intensive
|
outputDap.flush() # may be too intensive
|
||||||
|
continue
|
||||||
|
|
||||||
|
results = accumulator_socket.recv_json(FLAGS)
|
||||||
|
n_frames_received += 1
|
||||||
|
|
||||||
|
pulse_id = results.get("pulse_id", 0)
|
||||||
|
run_name = str(pulse_id//10000*10000)
|
||||||
|
detector = results.get("detector_name", "")
|
||||||
|
|
||||||
|
if run_name != run_name_before:
|
||||||
|
run_name_before = run_name
|
||||||
|
outputDap.close()
|
||||||
|
fNameOutput = f"{OUTPUT_DIR_NAME}/{detector}/{run_name_before}.dap"
|
||||||
|
if not os.path.isdir(os.path.dirname(fNameOutput)):
|
||||||
|
os.makedirs(os.path.dirname(fNameOutput))
|
||||||
|
outputDap = open(fNameOutput, "a")
|
||||||
|
|
||||||
|
pr_rois = results.get("roi_intensities", [])
|
||||||
|
print(pulse_id, results.get("is_good_frame", -1), results.get("is_hit_frame", False), results.get("number_of_spots", -1), results.get("laser_on", False), *pr_rois, file=outputDap)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user