mirror of
https://github.com/paulscherrerinstitute/sf_daq_broker.git
synced 2026-05-17 15:26:48 +02:00
top to bottom function order for exectuable scripts
This commit is contained in:
+145
-146
@@ -20,44 +20,130 @@ _logger = logging.getLogger("broker_writer")
|
||||
|
||||
|
||||
|
||||
def audit_failed_write_request(write_request):
|
||||
original_output_file = write_request.get("output_file", None)
|
||||
if not original_output_file:
|
||||
return
|
||||
def run():
|
||||
parser = argparse.ArgumentParser(description="data writer")
|
||||
|
||||
output_file = original_output_file + ".err"
|
||||
parser.add_argument("--broker_url", default=broker_config.DEFAULT_BROKER_URL, help="RabbitMQ broker URL")
|
||||
parser.add_argument("--log_level", default="INFO", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help="log level")
|
||||
parser.add_argument("--writer_id", default=1, type=int, help="writer ID")
|
||||
parser.add_argument("--writer_type", default=0, type=int, choices=range(4), help="writer type (0: epics/BS/camera; 1: detector retrieve; 2: detector conversion; 3: pedestal)")
|
||||
|
||||
clargs = parser.parse_args()
|
||||
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(clargs.log_level)
|
||||
formatter = logging.Formatter(f"[%(levelname)s] (broker_writer_{clargs.writer_id}_{clargs.writer_type}) %(message)s")
|
||||
stream_handler.setFormatter(formatter)
|
||||
|
||||
_logger.setLevel(clargs.log_level)
|
||||
_logger.addHandler(stream_handler)
|
||||
|
||||
for logger_name in ["data_api", "data_api3"]:
|
||||
logger_data_api = logging.getLogger(logger_name)
|
||||
logger_data_api.setLevel(clargs.log_level)
|
||||
logger_data_api.addHandler(stream_handler)
|
||||
|
||||
# make message-broker less noisy in logs
|
||||
logging.getLogger("pika").setLevel(logging.WARNING)
|
||||
|
||||
_logger.info("Writer started. Waiting for requests.")
|
||||
|
||||
start_service(broker_url=clargs.broker_url, writer_type=clargs.writer_type)
|
||||
|
||||
|
||||
def start_service(broker_url, writer_type=0):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout")
|
||||
channel.exchange_declare(exchange=broker_config.REQUEST_EXCHANGE, exchange_type="topic")
|
||||
|
||||
#TODO: should introduce named writer_types / enum?
|
||||
|
||||
ROUTING_KEYS = {
|
||||
1: broker_config.DETECTOR_RETRIEVE_ROUTE,
|
||||
2: broker_config.DETECTOR_CONVERSION_ROUTE,
|
||||
3: broker_config.DETECTOR_PEDESTAL_ROUTE
|
||||
}
|
||||
|
||||
REQUEST_QUEUES = {
|
||||
1: broker_config.DETECTOR_RETRIEVE_QUEUE,
|
||||
2: broker_config.DETECTOR_CONVERSION_QUEUE,
|
||||
3: broker_config.DETECTOR_PEDESTAL_QUEUE
|
||||
}
|
||||
|
||||
routing_key = ROUTING_KEYS.get(writer_type, broker_config.DEFAULT_ROUTE)
|
||||
request_queue = REQUEST_QUEUES.get(writer_type, broker_config.DEFAULT_QUEUE)
|
||||
|
||||
channel.queue_declare(queue=request_queue, auto_delete=True)
|
||||
channel.queue_bind(queue=request_queue, exchange=broker_config.REQUEST_EXCHANGE, routing_key=routing_key)
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
|
||||
broker_client = BrokerClient(broker_url=broker_url)
|
||||
|
||||
on_broker_message_cb = partial(on_broker_message, connection=connection, broker_client=broker_client)
|
||||
channel.basic_consume(request_queue, on_broker_message_cb)
|
||||
|
||||
try:
|
||||
current_time = datetime.now().strftime(config.AUDIT_FILE_TIME_FORMAT)
|
||||
|
||||
with open(output_file, "w") as audit_file:
|
||||
pretty_write_request = json_obj_to_str(write_request)
|
||||
audit_file.write(f"[{current_time}] {pretty_write_request}")
|
||||
|
||||
except Exception:
|
||||
_logger.exception(f"Error while trying to write request {write_request} to file {output_file}.")
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
def wait_for_delay(request_timestamp, writer_type):
|
||||
if request_timestamp is None:
|
||||
return
|
||||
def on_broker_message(channel, method_frame, _header_frame, body, connection, broker_client):
|
||||
try:
|
||||
request = json_str_to_obj(body.decode())
|
||||
|
||||
time_to_wait = config.BSDATA_RETRIEVAL_DELAY
|
||||
if writer_type == broker_config.TAG_DETECTOR_RETRIEVE:
|
||||
time_to_wait = config.DETECTOR_RETRIEVAL_DELAY
|
||||
output_file = request.get("output_file", None)
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
# # should not come here in this case, since request_timestamp is None
|
||||
# if writer_type == broker_config.TAG_PEDESTAL or writer_type != broker_config.TAG_POWER_ON:
|
||||
# time_to_wait = 0
|
||||
def process_async():
|
||||
try:
|
||||
process_request(request, broker_client)
|
||||
|
||||
current_timestamp = time()
|
||||
# sleep time = target sleep time - time that has already passed.
|
||||
adjusted_retrieval_delay = time_to_wait - (current_timestamp - request_timestamp)
|
||||
adjusted_retrieval_delay = max(adjusted_retrieval_delay, 0)
|
||||
except Exception as e:
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
callback = partial(reject_request, channel, method_frame, body, output_file, e)
|
||||
|
||||
_logger.debug(f"Request timestamp={request_timestamp}, current_timestamp={current_timestamp}, adjusted_retrieval_delay={adjusted_retrieval_delay}.")
|
||||
_logger.info(f"Sleeping for {adjusted_retrieval_delay} seconds before continuing.")
|
||||
sleep(adjusted_retrieval_delay)
|
||||
else:
|
||||
callback = partial(confirm_request, channel, method_frame, body, output_file)
|
||||
|
||||
connection.add_callback_threadsafe(callback)
|
||||
|
||||
thread = Thread(target=process_async)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
except Exception as e:
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
reject_request(channel, method_frame, body, output_file, e)
|
||||
|
||||
|
||||
def confirm_request(channel, method_frame, body, output_file):
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
|
||||
|
||||
def reject_request(channel, method_frame, body, output_file, e):
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
|
||||
def update_status(channel, body, action, file, message=None):
|
||||
status_header = {
|
||||
"action": action,
|
||||
"source": "bsread_writer",
|
||||
"routing_key": "*",
|
||||
"file": file,
|
||||
"message": message
|
||||
}
|
||||
|
||||
channel.basic_publish(
|
||||
exchange=broker_config.STATUS_EXCHANGE,
|
||||
properties=BasicProperties(headers=status_header),
|
||||
routing_key="",
|
||||
body=body
|
||||
)
|
||||
|
||||
|
||||
def process_request(request, broker_client):
|
||||
@@ -197,130 +283,44 @@ def process_request(request, broker_client):
|
||||
logger_data_api.removeHandler(file_handler)
|
||||
|
||||
|
||||
def update_status(channel, body, action, file, message=None):
|
||||
status_header = {
|
||||
"action": action,
|
||||
"source": "bsread_writer",
|
||||
"routing_key": "*",
|
||||
"file": file,
|
||||
"message": message
|
||||
}
|
||||
def wait_for_delay(request_timestamp, writer_type):
|
||||
if request_timestamp is None:
|
||||
return
|
||||
|
||||
channel.basic_publish(
|
||||
exchange=broker_config.STATUS_EXCHANGE,
|
||||
properties=BasicProperties(headers=status_header),
|
||||
routing_key="",
|
||||
body=body
|
||||
)
|
||||
time_to_wait = config.BSDATA_RETRIEVAL_DELAY
|
||||
if writer_type == broker_config.TAG_DETECTOR_RETRIEVE:
|
||||
time_to_wait = config.DETECTOR_RETRIEVAL_DELAY
|
||||
|
||||
# # should not come here in this case, since request_timestamp is None
|
||||
# if writer_type == broker_config.TAG_PEDESTAL or writer_type != broker_config.TAG_POWER_ON:
|
||||
# time_to_wait = 0
|
||||
|
||||
current_timestamp = time()
|
||||
# sleep time = target sleep time - time that has already passed.
|
||||
adjusted_retrieval_delay = time_to_wait - (current_timestamp - request_timestamp)
|
||||
adjusted_retrieval_delay = max(adjusted_retrieval_delay, 0)
|
||||
|
||||
_logger.debug(f"Request timestamp={request_timestamp}, current_timestamp={current_timestamp}, adjusted_retrieval_delay={adjusted_retrieval_delay}.")
|
||||
_logger.info(f"Sleeping for {adjusted_retrieval_delay} seconds before continuing.")
|
||||
sleep(adjusted_retrieval_delay)
|
||||
|
||||
|
||||
def confirm_request(channel, method_frame, body, output_file):
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
def audit_failed_write_request(write_request):
|
||||
original_output_file = write_request.get("output_file", None)
|
||||
if not original_output_file:
|
||||
return
|
||||
|
||||
|
||||
def reject_request(channel, method_frame, body, output_file, e):
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
|
||||
def on_broker_message(channel, method_frame, _header_frame, body, connection, broker_client):
|
||||
try:
|
||||
request = json_str_to_obj(body.decode())
|
||||
|
||||
output_file = request.get("output_file", None)
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
def process_async():
|
||||
try:
|
||||
process_request(request, broker_client)
|
||||
|
||||
except Exception as e:
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
callback = partial(reject_request, channel, method_frame, body, output_file, e)
|
||||
|
||||
else:
|
||||
callback = partial(confirm_request, channel, method_frame, body, output_file)
|
||||
|
||||
connection.add_callback_threadsafe(callback)
|
||||
|
||||
thread = Thread(target=process_async)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
except Exception as e:
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
reject_request(channel, method_frame, body, output_file, e)
|
||||
|
||||
|
||||
def start_service(broker_url, writer_type=0):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout")
|
||||
channel.exchange_declare(exchange=broker_config.REQUEST_EXCHANGE, exchange_type="topic")
|
||||
|
||||
#TODO: should introduce named writer_types / enum?
|
||||
|
||||
ROUTING_KEYS = {
|
||||
1: broker_config.DETECTOR_RETRIEVE_ROUTE,
|
||||
2: broker_config.DETECTOR_CONVERSION_ROUTE,
|
||||
3: broker_config.DETECTOR_PEDESTAL_ROUTE
|
||||
}
|
||||
|
||||
REQUEST_QUEUES = {
|
||||
1: broker_config.DETECTOR_RETRIEVE_QUEUE,
|
||||
2: broker_config.DETECTOR_CONVERSION_QUEUE,
|
||||
3: broker_config.DETECTOR_PEDESTAL_QUEUE
|
||||
}
|
||||
|
||||
routing_key = ROUTING_KEYS.get(writer_type, broker_config.DEFAULT_ROUTE)
|
||||
request_queue = REQUEST_QUEUES.get(writer_type, broker_config.DEFAULT_QUEUE)
|
||||
|
||||
channel.queue_declare(queue=request_queue, auto_delete=True)
|
||||
channel.queue_bind(queue=request_queue, exchange=broker_config.REQUEST_EXCHANGE, routing_key=routing_key)
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
|
||||
broker_client = BrokerClient(broker_url=broker_url)
|
||||
|
||||
on_broker_message_cb = partial(on_broker_message, connection=connection, broker_client=broker_client)
|
||||
channel.basic_consume(request_queue, on_broker_message_cb)
|
||||
output_file = original_output_file + ".err"
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
current_time = datetime.now().strftime(config.AUDIT_FILE_TIME_FORMAT)
|
||||
|
||||
with open(output_file, "w") as audit_file:
|
||||
pretty_write_request = json_obj_to_str(write_request)
|
||||
audit_file.write(f"[{current_time}] {pretty_write_request}")
|
||||
|
||||
def run():
|
||||
parser = argparse.ArgumentParser(description="data writer")
|
||||
|
||||
parser.add_argument("--broker_url", default=broker_config.DEFAULT_BROKER_URL, help="RabbitMQ broker URL")
|
||||
parser.add_argument("--log_level", default="INFO", choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help="log level")
|
||||
parser.add_argument("--writer_id", default=1, type=int, help="writer ID")
|
||||
parser.add_argument("--writer_type", default=0, type=int, choices=range(4), help="writer type (0: epics/BS/camera; 1: detector retrieve; 2: detector conversion; 3: pedestal)")
|
||||
|
||||
clargs = parser.parse_args()
|
||||
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(clargs.log_level)
|
||||
formatter = logging.Formatter(f"[%(levelname)s] (broker_writer_{clargs.writer_id}_{clargs.writer_type}) %(message)s")
|
||||
stream_handler.setFormatter(formatter)
|
||||
|
||||
_logger.setLevel(clargs.log_level)
|
||||
_logger.addHandler(stream_handler)
|
||||
|
||||
for logger_name in ["data_api", "data_api3"]:
|
||||
logger_data_api = logging.getLogger(logger_name)
|
||||
logger_data_api.setLevel(clargs.log_level)
|
||||
logger_data_api.addHandler(stream_handler)
|
||||
|
||||
# make message-broker less noisy in logs
|
||||
logging.getLogger("pika").setLevel(logging.WARNING)
|
||||
|
||||
_logger.info("Writer started. Waiting for requests.")
|
||||
|
||||
start_service(broker_url=clargs.broker_url, writer_type=clargs.writer_type)
|
||||
except Exception:
|
||||
_logger.exception(f"Error while trying to write request {write_request} to file {output_file}.")
|
||||
|
||||
|
||||
|
||||
@@ -330,4 +330,3 @@ if __name__ == "__main__":
|
||||
run()
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user