Files
apocalypse/res/send_redo_msg.py
woznic_n 177d8777c1
All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
first commit
2025-09-08 18:53:14 +02:00

77 lines
2.3 KiB
Python

#!/usr/bin/env python
import json
import uuid
from time import sleep, time_ns
from pika import BasicProperties, BlockingConnection, ConnectionParameters
from apocalypse import broker_config
def prepare_msg(chn, data, file_name):
    """Build a status message for *file_name* and publish it on *chn*.

    The headers and body come from ``setup_msg(data, file_name)``. The message
    is published to ``broker_config.STATUS_EXCHANGE`` with an empty routing
    key and a freshly generated UUID4 correlation id.

    Args:
        chn (pika.adapters.blocking_connection.BlockingChannel):
            A RabbitMQ channel object through which the message will be
            published.
        data: Request description, forwarded unchanged to ``setup_msg``.
        file_name: Output file the message refers to, forwarded to
            ``setup_msg``.
    """
    msg_headers, payload = setup_msg(data, file_name)
    props = BasicProperties(
        headers=msg_headers,
        correlation_id=str(uuid.uuid4()),
        # NOTE(review): this is a nanosecond value; AMQP defines the timestamp
        # property in seconds -- confirm consumers expect nanoseconds.
        timestamp=time_ns(),
    )
    chn.basic_publish(
        exchange=broker_config.STATUS_EXCHANGE,
        routing_key="",
        body=payload,
        properties=props,
    )
def setup_msg(data, file_name):
    """Build the headers and JSON body of a ``write_finished`` status message.

    Args:
        data (dict): Request description. Must provide the keys
            ``"request_time"``, ``"endstation"`` and ``"pgroup"``. Every entry
            is copied into the message body. The dict itself is left
            untouched.
        file_name: Output file the message refers to. It is stored as
            ``str(file_name)`` in both the headers and the body.

    Returns:
        tuple[dict, bytes]: The message headers and the body, serialized as
        JSON and encoded to bytes.

    Raises:
        KeyError: If *data* lacks one of the required keys.
        TypeError: If *data* holds a value that ``json.dumps`` cannot
            serialize.
    """
    output_file = str(file_name)

    headers = {
        "action": "write_finished",
        # TODO(review): original author was unsure whether "apocalypse" is the
        # right source label here.
        "source": "apocalypse",
        "message": None,
        "routing_key": "",
        "file": output_file,
    }

    metadata = {
        # TODO(review): open question from the original author -- keep the
        # request time here or replace it with a new creation time?
        "general/created": data["request_time"],
        "general/instrument": data["endstation"],
        "general/process": __name__,
        # Characters 1..5 of the pgroup, i.e. the first character is dropped.
        "general/user": data["pgroup"][1:6],
    }

    # Build a new dict instead of updating ``data`` in place so the caller's
    # dict is not modified. Key order matches the former in-place update.
    # TODO(review): original author was unsure about the writer type and how
    # to determine which buffer applies.
    body = {
        **data,
        "writer_type": "apocalypse",
        "output_file": output_file,
        "metadata": metadata,
    }
    return headers, json.dumps(body).encode()
def send_msg(args):
    """Publish one status message per output file listed in *args*.

    Connects to the broker at ``args.broker_url``, declares the fanout
    exchange ``broker_config.STATUS_EXCHANGE`` and then, for each metadata
    file, publishes a message via ``prepare_msg`` for every associated output
    file.

    Args:
        args: Object with the attributes

            * ``broker_url`` -- passed to ``ConnectionParameters``.
            * ``files_meta`` -- sequence of paths to JSON metadata files. Each
              file must contain a ``"beamline"`` key, which is renamed to
              ``"endstation"`` before the message is built.
            * ``files`` -- sequence indexed in parallel with ``files_meta``;
              ``files[i]`` is an iterable of output file names described by
              ``files_meta[i]``.

    Raises:
        IndexError: If ``args.files`` has fewer entries than
            ``args.files_meta``.
        KeyError: If a metadata file has no ``"beamline"`` entry.
        OSError: If a metadata file cannot be opened.
    """
    params = ConnectionParameters(args.broker_url)
    with BlockingConnection(params) as connection:
        with connection.channel() as chn:
            exchange = broker_config.STATUS_EXCHANGE
            chn.exchange_declare(exchange=exchange, exchange_type="fanout")
            # Server-named exclusive queue bound to the exchange; nothing in
            # this function consumes from it.
            queue = chn.queue_declare(queue="", exclusive=True).method.queue
            chn.queue_bind(queue=queue, exchange=exchange)

            for idx, name in enumerate(args.files_meta):
                file_names = args.files[idx]

                # Parse the metadata once per metadata file rather than once
                # per output file; the content is the same for all of them.
                with open(name, "r", encoding="utf-8") as f:
                    meta = json.load(f)
                meta["endstation"] = meta.pop("beamline")

                for file_name in file_names:
                    # Each message gets its own top-level copy so that any
                    # in-place changes made while building one message cannot
                    # leak into the next.
                    prepare_msg(chn, dict(meta), file_name)
                    # NOTE(review): pause after every published message;
                    # confirm this per-message pacing is the intended
                    # placement of the delay.
                    sleep(5)