diff --git a/sf_daq_broker/writer/detector_writer.py b/sf_daq_broker/writer/detector_writer.py index 016feb3..d00ca99 100644 --- a/sf_daq_broker/writer/detector_writer.py +++ b/sf_daq_broker/writer/detector_writer.py @@ -90,8 +90,7 @@ def detector_retrieve(request, output_file_detector): _logger.info("Finished retrieve from the buffer") if pedestal_run: - # sleep, to make sure h5 file is readable (strange but got problem rarely trying to read it) - sleep(60) + sleep(5) time_start = time() if detector in PEDESTAL_SPECIFIC: diff --git a/sf_daq_broker/writer/post_retrieve.py b/sf_daq_broker/writer/post_retrieve.py index 3e4019e..5cdaab7 100644 --- a/sf_daq_broker/writer/post_retrieve.py +++ b/sf_daq_broker/writer/post_retrieve.py @@ -1,11 +1,13 @@ import argparse import os import json +from glob import glob +import logging +from datetime import datetime + from sf_daq_broker.writer.bsread_writer import write_from_imagebuffer, write_from_databuffer_api3 from sf_daq_broker.utils import get_data_api_request -import logging from sf_daq_broker import config -from datetime import datetime #logger = logging.getLogger("data_api3") logger = logging.getLogger("broker_writer") @@ -64,25 +66,27 @@ data_request["channels"] = [{'name': ch, 'backend': config.IMAGE_BACKEND if ch.e run_number = run_info.get("run_number", 0) acquisition_number = run_info.get("acquisition_number", 0) -user_tag = run_info.get("user_tag_cleaned", None) parameters = None -if user_tag is not None: - run_dir_name = f'run{run_number:04}-{user_tag}' +list_data_directories_run = glob(f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/run{run_number:04}*') +if len(list_data_directories_run) != 1: + print(f"Ambiguous data directries : {list_data_directories_run}") + exit() +data_directory=list_data_directories_run[0] if source == "image": - output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/{run_dir_name}/data/acq{acquisition_number:04}.CAMERAS.h5.2' + output_file = f'{data_directory}/data/acq{acquisition_number:04}.CAMERAS.h5.2' write_from_imagebuffer(data_request, output_file, parameters) elif source == "data_api3": - output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/{run_dir_name}/data/acq{acquisition_number:04}.BSDATA.h5.2' + output_file = f'{data_directory}/data/acq{acquisition_number:04}.BSDATA.h5.2' write_from_databuffer_api3(data_request, output_file, parameters) else: - output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/run{run_number:04}/data/acq{acquisition_number:04}.PVCHANNELS.h5' + output_file = f'{data_directory}/data/acq{acquisition_number:04}.PVCHANNELS.h5' metadata = { "general/user": run_info["pgroup"], diff --git a/sf_daq_broker/writer/start.py b/sf_daq_broker/writer/start.py index ec11e69..5bacc42 100644 --- a/sf_daq_broker/writer/start.py +++ b/sf_daq_broker/writer/start.py @@ -10,7 +10,8 @@ from pika import BlockingConnection, ConnectionParameters, BasicProperties from sf_daq_broker import config, utils import sf_daq_broker.rabbitmq.config as broker_config -from sf_daq_broker.utils import get_data_api_request +from sf_daq_broker.rabbitmq.msg_broker_client import RabbitMqClient +from sf_daq_broker.utils import get_data_api_request, get_writer_request from sf_daq_broker.writer.bsread_writer import write_from_imagebuffer, write_from_databuffer_api3 from sf_daq_broker.detector.pedestal import take_pedestal from sf_daq_broker.writer.detector_writer import detector_retrieve @@ -64,7 +65,7 @@ def wait_for_delay(request_timestamp, writer_type): sleep(adjusted_retrieval_delay) -def process_request(request): +def process_request(request, broker_client): writer_type = request["writer_type"] channels = request.get("channels", None) @@ -108,7 +109,7 @@ def process_request(request): wait_for_delay(request_timestamp, writer_type) - _logger.info("Starting download.") + _logger.info("Starting payload.") start_time = time() @@ -122,8 +123,10 @@ def process_request(request): elif writer_type == broker_config.TAG_PEDESTAL: _logger.info("Doing pedestal.") + detectors = request.get("detectors", []) det_start_pulse_id, det_stop_pulse_id = take_pedestal(detectors_name=detectors, rate=request.get("rate_multiplicator", 1)) + # overwrite start/stop pulse_id's in run_info json file run_file_json = request.get("run_file_json", None) if run_file_json is not None: @@ -144,24 +147,28 @@ def process_request(request): "directory_name" : request.get("directory_name"), "request_time" : request.get("request_time", str(datetime.now())) } - + + broker_client.open() + for detector in detectors: request_det_retrieve["detector_name"] = detector request_det_retrieve["detectors"] = {} request_det_retrieve["detectors"][detector] = {} output_file_prefix = request.get("output_file_prefix", "/tmp/error") output_file_det = f'{output_file_prefix}.{detector}.h5' - try: - detector_retrieve(request_det_retrieve, output_file_det) - except Exception as ex: - _logger.exception("Error while trying to retrieve and convert pedestal data") - sleep(120) - try: - detector_retrieve(request_det_retrieve, output_file_det) - except Exception as ex2: - _logger.exception("(second attempt) Error while trying to retrieve and convert pedestal data") - - + run_log_file_det = run_log_file[:-4] + "." + detector + ".log" + + write_request = get_writer_request(writer_type=broker_config.TAG_DETECTOR_RETRIEVE, + channels=request_det_retrieve, + output_file=output_file_det, + metadata=None, + start_pulse_id=det_start_pulse_id, + stop_pulse_id=det_start_pulse_id, + run_log_file=run_log_file_det) + + broker_client.send(write_request, broker_config.TAG_DETECTOR_RETRIEVE) + + broker_client.close() elif writer_type == broker_config.TAG_POWER_ON: _logger.info("Power ON detector") @@ -222,7 +229,7 @@ def reject_request(channel, method_frame, body, output_file, e): update_status(channel, body, "write_rejected", output_file, str(e)) -def on_broker_message(channel, method_frame, header_frame, body, connection): +def on_broker_message(channel, method_frame, header_frame, body, connection, broker_client): try: request = json.loads(body.decode()) @@ -232,7 +239,7 @@ def on_broker_message(channel, method_frame, header_frame, body, connection): def process_async(): try: - process_request(request) + process_request(request, broker_client) except Exception as ex: _logger.exception("Error while trying to write a requested data.") @@ -282,7 +289,9 @@ def start_service(broker_url, writer_type=0): channel.basic_qos(prefetch_count=1) - on_broker_message_f = partial(on_broker_message, connection=connection) + broker_client = RabbitMqClient(broker_url=broker_url) + + on_broker_message_f = partial(on_broker_message, connection=connection, broker_client=broker_client) channel.basic_consume(request_queue, on_broker_message_f) try: