All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
import json
|
|
import uuid
|
|
from time import sleep, time_ns
|
|
|
|
from pika import BasicProperties, BlockingConnection, ConnectionParameters
|
|
|
|
from apocalypse import broker_config
|
|
|
|
|
|
def prepare_msg(chn, data, file_name):
    """
    Build the status message for one output file and publish it.

    chn(pika.adapters.blocking_connection.BlockingChannel):
        A RabbitMQ channel object through which the message will be published.
    data(dict):
        Request description; handed to setup_msg() to build headers and body.
    file_name:
        Output file the message refers to; handed to setup_msg().
    """
    msg_headers, payload = setup_msg(data, file_name)

    # NOTE(review): time_ns() yields nanoseconds, while the AMQP timestamp
    # property is conventionally in seconds - confirm what consumers expect.
    msg_properties = BasicProperties(
        headers=msg_headers,
        correlation_id=str(uuid.uuid4()),
        timestamp=time_ns(),
    )

    # The routing key is left empty.
    chn.basic_publish(
        exchange=broker_config.STATUS_EXCHANGE,
        routing_key="",
        body=payload,
        properties=msg_properties,
    )
|
|
|
|
|
|
def setup_msg(data, file_name):
    """
    Build the headers and the JSON-encoded body of a "write_finished" message.

    data(dict):
        Request description. The keys "request_time", "endstation" and
        "pgroup" are read; every entry is forwarded in the body.
        The dict itself is not modified.
    file_name:
        Output file the message refers to; stored as str(file_name).

    Returns a (headers, body) tuple, where headers is a dict and body is the
    JSON document encoded to bytes.

    Raises KeyError if data is a dict lacking one of the keys read above.
    """
    output_file = str(file_name)

    # TODO: "source" - apo or not?
    headers = {
        "action": "write_finished",
        "source": "apocalypse",
        "message": None,
        "routing_key": "",
        "file": output_file,
    }

    metadata = {
        "general/created": data["request_time"],  # TODO: change it or keep?
        "general/instrument": data["endstation"],
        "general/process": __name__,
        # Skips the first character of pgroup and keeps the following five.
        "general/user": data["pgroup"][1:6],
    }

    # Build a new dict rather than updating `data` in place, so the caller's
    # dict is left untouched. Key order matches an in-place update().
    # TODO: "writer_type" - apo or what? How to figure out which buffer?
    body = {
        **data,
        "writer_type": "apocalypse",
        "output_file": output_file,
        "metadata": metadata,
    }

    return headers, json.dumps(body).encode()
|
|
|
|
|
|
def send_msg(args):
    """
    Publish one message per output file to the status exchange.

    args:
        Object providing the attributes
        broker_url - passed to pika's ConnectionParameters,
        files_meta - sequence of paths to JSON metadata files,
        files      - for each metadata file (same index), the output files
                     to announce.

    Each metadata file must contain a "beamline" entry; it is forwarded
    under the key "endstation".
    """
    params = ConnectionParameters(args.broker_url)
    with BlockingConnection(params) as connection:
        with connection.channel() as chn:
            chn.exchange_declare(exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout")
            queue = chn.queue_declare(queue="", exclusive=True).method.queue
            chn.queue_bind(queue=queue, exchange=broker_config.STATUS_EXCHANGE)

            for idx, name in enumerate(args.files_meta):
                # Parsed once per metadata file, and only when there is at
                # least one output file to announce for it.
                meta = None
                for file_name in args.files[idx]:
                    if meta is None:
                        with open(name, "r", encoding="utf-8") as f:
                            meta = json.load(f)
                        meta["endstation"] = meta.pop("beamline")

                    # Pass a copy so every message starts from the same metadata.
                    prepare_msg(chn, dict(meta), file_name)

                    # NOTE(review): pause after each message - presumably to
                    # throttle consumers; confirm the intent.
                    sleep(5)
|