From 5d2a3ed26bc8fa565ceae38c3c32ffae1f3ba1ef Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Mon, 12 Feb 2024 12:52:59 +0100 Subject: [PATCH] top to bottom function order for exectuable scripts --- sf_daq_broker/writer/start.py | 291 +++++++++++++++++----------------- 1 file changed, 145 insertions(+), 146 deletions(-) diff --git a/sf_daq_broker/writer/start.py b/sf_daq_broker/writer/start.py index 8d50167..2e4e2df 100644 --- a/sf_daq_broker/writer/start.py +++ b/sf_daq_broker/writer/start.py @@ -20,44 +20,130 @@ _logger = logging.getLogger("broker_writer") -def audit_failed_write_request(write_request): - original_output_file = write_request.get("output_file", None) - if not original_output_file: - return +def run(): + parser = argparse.ArgumentParser(description="data writer") - output_file = original_output_file + ".err" + parser.add_argument("--broker_url", default=broker_config.DEFAULT_BROKER_URL, help="RabbitMQ broker URL") + parser.add_argument("--log_level", default="INFO", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help="log level") + parser.add_argument("--writer_id", default=1, type=int, help="writer ID") + parser.add_argument("--writer_type", default=0, type=int, choices=range(4), help="writer type (0: epics/BS/camera; 1: detector retrieve; 2: detector conversion; 3: pedestal)") + + clargs = parser.parse_args() + + stream_handler = logging.StreamHandler() + stream_handler.setLevel(clargs.log_level) + formatter = logging.Formatter(f"[%(levelname)s] (broker_writer_{clargs.writer_id}_{clargs.writer_type}) %(message)s") + stream_handler.setFormatter(formatter) + + _logger.setLevel(clargs.log_level) + _logger.addHandler(stream_handler) + + for logger_name in ["data_api", "data_api3"]: + logger_data_api = logging.getLogger(logger_name) + logger_data_api.setLevel(clargs.log_level) + logger_data_api.addHandler(stream_handler) + + # make message-broker less noisy in logs + logging.getLogger("pika").setLevel(logging.WARNING) + + _logger.info("Writer started. Waiting for requests.") + + start_service(broker_url=clargs.broker_url, writer_type=clargs.writer_type) + + +def start_service(broker_url, writer_type=0): + connection = BlockingConnection(ConnectionParameters(broker_url)) + channel = connection.channel() + + channel.exchange_declare(exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout") + channel.exchange_declare(exchange=broker_config.REQUEST_EXCHANGE, exchange_type="topic") + + #TODO: should introduce named writer_types / enum? + + ROUTING_KEYS = { + 1: broker_config.DETECTOR_RETRIEVE_ROUTE, + 2: broker_config.DETECTOR_CONVERSION_ROUTE, + 3: broker_config.DETECTOR_PEDESTAL_ROUTE + } + + REQUEST_QUEUES = { + 1: broker_config.DETECTOR_RETRIEVE_QUEUE, + 2: broker_config.DETECTOR_CONVERSION_QUEUE, + 3: broker_config.DETECTOR_PEDESTAL_QUEUE + } + + routing_key = ROUTING_KEYS.get(writer_type, broker_config.DEFAULT_ROUTE) + request_queue = REQUEST_QUEUES.get(writer_type, broker_config.DEFAULT_QUEUE) + + channel.queue_declare(queue=request_queue, auto_delete=True) + channel.queue_bind(queue=request_queue, exchange=broker_config.REQUEST_EXCHANGE, routing_key=routing_key) + channel.basic_qos(prefetch_count=1) + + broker_client = BrokerClient(broker_url=broker_url) + + on_broker_message_cb = partial(on_broker_message, connection=connection, broker_client=broker_client) + channel.basic_consume(request_queue, on_broker_message_cb) try: - current_time = datetime.now().strftime(config.AUDIT_FILE_TIME_FORMAT) - - with open(output_file, "w") as audit_file: - pretty_write_request = json_obj_to_str(write_request) - audit_file.write(f"[{current_time}] {pretty_write_request}") - - except Exception: - _logger.exception(f"Error while trying to write request {write_request} to file {output_file}.") + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() -def wait_for_delay(request_timestamp, writer_type): - if request_timestamp is None: - return +def on_broker_message(channel, method_frame, _header_frame, body, connection, broker_client): + try: + request = json_str_to_obj(body.decode()) - time_to_wait = config.BSDATA_RETRIEVAL_DELAY - if writer_type == broker_config.TAG_DETECTOR_RETRIEVE: - time_to_wait = config.DETECTOR_RETRIEVAL_DELAY + output_file = request.get("output_file", None) + update_status(channel, body, "write_start", output_file) -# # should not come here in this case, since request_timestamp is None -# if writer_type == broker_config.TAG_PEDESTAL or writer_type != broker_config.TAG_POWER_ON: -# time_to_wait = 0 + def process_async(): + try: + process_request(request, broker_client) - current_timestamp = time() - # sleep time = target sleep time - time that has already passed. - adjusted_retrieval_delay = time_to_wait - (current_timestamp - request_timestamp) - adjusted_retrieval_delay = max(adjusted_retrieval_delay, 0) + except Exception as e: + _logger.exception("Error while trying to write a requested data.") + callback = partial(reject_request, channel, method_frame, body, output_file, e) - _logger.debug(f"Request timestamp={request_timestamp}, current_timestamp={current_timestamp}, adjusted_retrieval_delay={adjusted_retrieval_delay}.") - _logger.info(f"Sleeping for {adjusted_retrieval_delay} seconds before continuing.") - sleep(adjusted_retrieval_delay) + else: + callback = partial(confirm_request, channel, method_frame, body, output_file) + + connection.add_callback_threadsafe(callback) + + thread = Thread(target=process_async) + thread.daemon = True + thread.start() + + except Exception as e: + _logger.exception("Error while trying to write a requested data.") + reject_request(channel, method_frame, body, output_file, e) + + +def confirm_request(channel, method_frame, body, output_file): + channel.basic_ack(delivery_tag=method_frame.delivery_tag) + update_status(channel, body, "write_finished", output_file) + + +def reject_request(channel, method_frame, body, output_file, e): + channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False) + update_status(channel, body, "write_rejected", output_file, str(e)) + + +def update_status(channel, body, action, file, message=None): + status_header = { + "action": action, + "source": "bsread_writer", + "routing_key": "*", + "file": file, + "message": message + } + + channel.basic_publish( + exchange=broker_config.STATUS_EXCHANGE, + properties=BasicProperties(headers=status_header), + routing_key="", + body=body + ) def process_request(request, broker_client): @@ -197,130 +283,44 @@ def process_request(request, broker_client): logger_data_api.removeHandler(file_handler) -def update_status(channel, body, action, file, message=None): - status_header = { - "action": action, - "source": "bsread_writer", - "routing_key": "*", - "file": file, - "message": message - } +def wait_for_delay(request_timestamp, writer_type): + if request_timestamp is None: + return - channel.basic_publish( - exchange=broker_config.STATUS_EXCHANGE, - properties=BasicProperties(headers=status_header), - routing_key="", - body=body - ) + time_to_wait = config.BSDATA_RETRIEVAL_DELAY + if writer_type == broker_config.TAG_DETECTOR_RETRIEVE: + time_to_wait = config.DETECTOR_RETRIEVAL_DELAY + +# # should not come here in this case, since request_timestamp is None +# if writer_type == broker_config.TAG_PEDESTAL or writer_type != broker_config.TAG_POWER_ON: +# time_to_wait = 0 + + current_timestamp = time() + # sleep time = target sleep time - time that has already passed. + adjusted_retrieval_delay = time_to_wait - (current_timestamp - request_timestamp) + adjusted_retrieval_delay = max(adjusted_retrieval_delay, 0) + + _logger.debug(f"Request timestamp={request_timestamp}, current_timestamp={current_timestamp}, adjusted_retrieval_delay={adjusted_retrieval_delay}.") + _logger.info(f"Sleeping for {adjusted_retrieval_delay} seconds before continuing.") + sleep(adjusted_retrieval_delay) -def confirm_request(channel, method_frame, body, output_file): - channel.basic_ack(delivery_tag=method_frame.delivery_tag) - update_status(channel, body, "write_finished", output_file) +def audit_failed_write_request(write_request): + original_output_file = write_request.get("output_file", None) + if not original_output_file: + return - -def reject_request(channel, method_frame, body, output_file, e): - channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False) - update_status(channel, body, "write_rejected", output_file, str(e)) - - -def on_broker_message(channel, method_frame, _header_frame, body, connection, broker_client): - try: - request = json_str_to_obj(body.decode()) - - output_file = request.get("output_file", None) - update_status(channel, body, "write_start", output_file) - - def process_async(): - try: - process_request(request, broker_client) - - except Exception as e: - _logger.exception("Error while trying to write a requested data.") - callback = partial(reject_request, channel, method_frame, body, output_file, e) - - else: - callback = partial(confirm_request, channel, method_frame, body, output_file) - - connection.add_callback_threadsafe(callback) - - thread = Thread(target=process_async) - thread.daemon = True - thread.start() - - except Exception as e: - _logger.exception("Error while trying to write a requested data.") - reject_request(channel, method_frame, body, output_file, e) - - -def start_service(broker_url, writer_type=0): - connection = BlockingConnection(ConnectionParameters(broker_url)) - channel = connection.channel() - - channel.exchange_declare(exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout") - channel.exchange_declare(exchange=broker_config.REQUEST_EXCHANGE, exchange_type="topic") - - #TODO: should introduce named writer_types / enum? - - ROUTING_KEYS = { - 1: broker_config.DETECTOR_RETRIEVE_ROUTE, - 2: broker_config.DETECTOR_CONVERSION_ROUTE, - 3: broker_config.DETECTOR_PEDESTAL_ROUTE - } - - REQUEST_QUEUES = { - 1: broker_config.DETECTOR_RETRIEVE_QUEUE, - 2: broker_config.DETECTOR_CONVERSION_QUEUE, - 3: broker_config.DETECTOR_PEDESTAL_QUEUE - } - - routing_key = ROUTING_KEYS.get(writer_type, broker_config.DEFAULT_ROUTE) - request_queue = REQUEST_QUEUES.get(writer_type, broker_config.DEFAULT_QUEUE) - - channel.queue_declare(queue=request_queue, auto_delete=True) - channel.queue_bind(queue=request_queue, exchange=broker_config.REQUEST_EXCHANGE, routing_key=routing_key) - channel.basic_qos(prefetch_count=1) - - broker_client = BrokerClient(broker_url=broker_url) - - on_broker_message_cb = partial(on_broker_message, connection=connection, broker_client=broker_client) - channel.basic_consume(request_queue, on_broker_message_cb) + output_file = original_output_file + ".err" try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() + current_time = datetime.now().strftime(config.AUDIT_FILE_TIME_FORMAT) + with open(output_file, "w") as audit_file: + pretty_write_request = json_obj_to_str(write_request) + audit_file.write(f"[{current_time}] {pretty_write_request}") -def run(): - parser = argparse.ArgumentParser(description="data writer") - - parser.add_argument("--broker_url", default=broker_config.DEFAULT_BROKER_URL, help="RabbitMQ broker URL") - parser.add_argument("--log_level", default="INFO", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help="log level") - parser.add_argument("--writer_id", default=1, type=int, help="writer ID") - parser.add_argument("--writer_type", default=0, type=int, choices=range(4), help="writer type (0: epics/BS/camera; 1: detector retrieve; 2: detector conversion; 3: pedestal)") - - clargs = parser.parse_args() - - stream_handler = logging.StreamHandler() - stream_handler.setLevel(clargs.log_level) - formatter = logging.Formatter(f"[%(levelname)s] (broker_writer_{clargs.writer_id}_{clargs.writer_type}) %(message)s") - stream_handler.setFormatter(formatter) - - _logger.setLevel(clargs.log_level) - _logger.addHandler(stream_handler) - - for logger_name in ["data_api", "data_api3"]: - logger_data_api = logging.getLogger(logger_name) - logger_data_api.setLevel(clargs.log_level) - logger_data_api.addHandler(stream_handler) - - # make message-broker less noisy in logs - logging.getLogger("pika").setLevel(logging.WARNING) - - _logger.info("Writer started. Waiting for requests.") - - start_service(broker_url=clargs.broker_url, writer_type=clargs.writer_type) + except Exception: + _logger.exception(f"Error while trying to write request {write_request} to file {output_file}.") @@ -330,4 +330,3 @@ if __name__ == "__main__": run() -