Files
spear/wait_4_msg.py
2025-09-12 16:20:30 +02:00

154 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python
import copy
import json
from datetime import datetime
from functools import partial
from logzero import logger
from pika import BlockingConnection, ConnectionParameters
from stand.client import Client
from helpers import check_pgroup
# Name of the exchange that wait_for_message declares (type "fanout") and
# binds its consumer queue to.
STATUS_EXCHANGE = "status"
def wait_for_message(args):
    """Consume messages from the status exchange until interrupted.

    Opens a blocking connection to the broker at ``args.broker_url``,
    declares the fanout ``STATUS_EXCHANGE``, binds an exclusive queue
    (declared with an empty name) to it and hands every delivery to
    ``on_status`` with ``args`` bound as a keyword argument.

    The call blocks inside the consume loop; a keyboard interrupt is logged
    at debug level and stops consuming, after which the channel and the
    connection are closed by their context managers.
    """
    connection_params = ConnectionParameters(args.broker_url)
    callback = partial(on_status, args=args)

    with BlockingConnection(connection_params) as conn, conn.channel() as channel:
        channel.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout")

        declared = channel.queue_declare(queue="", exclusive=True)
        queue_name = declared.method.queue
        channel.queue_bind(queue=queue_name, exchange=STATUS_EXCHANGE)

        # auto_ack: deliveries are acknowledged without an explicit ack here.
        channel.basic_consume(queue_name, callback, auto_ack=True)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            logger.debug("keyboard interrupt, exiting")
            channel.stop_consuming()
def on_status(_chn, _method_frame, header_frame, body, args):
    """Add a table row for every matching, successful request message.

    Intended as the consumer callback for the status exchange, with ``args``
    bound by the caller. A message is turned into a row only when it carries
    all required fields and passes ``msg_filter`` for the configured
    endstation / pgroup. Messages that are malformed or incomplete are
    skipped instead of raising, so that one bad message cannot terminate
    the consume loop.

    Args:
        _chn: Channel the message was delivered on (unused).
        _method_frame: Delivery method frame (unused).
        header_frame: Message properties; ``headers`` and ``timestamp`` are
            read from it.
        body: Message payload as bytes, expected to hold a JSON object.
        args: Namespace providing ``endstation``, ``pgroup`` and
            ``stand_cli``.

    Raises:
        SystemExit: If a keyboard interrupt arrives while the row is added.
        Exception: Any error raised by ``add_row`` is logged and re-raised.
    """
    # ``headers`` may be None when the publisher attached none; treat that
    # like an empty header table so the guards below can reject the message.
    header = header_frame.headers or {}

    try:
        request = json.loads(body.decode())
    except ValueError:
        # Covers both undecodable bytes and invalid JSON.
        logger.warning("ignoring status message with undecodable body")
        return
    if not isinstance(request, dict):
        return

    # NOTE(review): plain print of the headers of "processing" messages,
    # kept from the original -- looks like debugging output; confirm it is
    # still wanted.
    if request.get("writer_type") == "processing":
        print(header)

    metadata = request.get("metadata")
    apo_meta = request.get("apo_meta")
    if (
        header.get("action") is None
        or request.get("writer_type") is None
        or not isinstance(apo_meta, dict)
        or not isinstance(metadata, dict)
    ):
        return

    if (
        metadata.get("general/user") is None
        or metadata.get("general/instrument") is None
        or apo_meta.get("table") is None
        # Both keys are indexed when the row is built below.
        or "run_number" not in apo_meta
        or "acq_number" not in apo_meta
    ):
        return

    try:
        req_pgroup = int(metadata["general/user"])
    except (TypeError, ValueError):
        logger.warning(
            "ignoring status message with non-numeric pgroup %r",
            metadata["general/user"],
        )
        return

    if not msg_filter(
        action=header["action"],
        req_endstation=metadata["general/instrument"],
        endstation=args.endstation,
        req_pgroup=req_pgroup,
        pgroup=args.pgroup,
    ):
        return

    t_stamp = header_frame.timestamp
    if t_stamp is None:
        logger.warning("ignoring status message without a timestamp")
        return

    row = {
        "pgroup": metadata["general/user"],
        "run_number": apo_meta["run_number"],
        "acq_number": apo_meta["acq_number"],
        # The timestamp is divided by 1e9 before conversion, i.e. it is
        # treated as nanoseconds; the result is formatted in local time.
        "t_stamp": datetime.fromtimestamp(t_stamp / 1000000000).strftime("%Y-%m-%d %H:%M:%S"),
    }
    # Entries of the message's table are merged last, so they take
    # precedence over the four keys set above.
    row.update(apo_meta["table"])

    try:
        add_row(args, row)
    except KeyboardInterrupt as exc:
        logger.debug("keyboard interrupt, exiting")
        raise SystemExit from exc
    except Exception:
        logger.exception("failed to add row")
        raise
def msg_filter(action=None, req_endstation=None, endstation=None, req_pgroup=None, pgroup=None):
    """Decide whether a status message should produce a table row.

    A message qualifies when its action is ``"request_success"`` and it
    comes from the requested endstation. If ``pgroup`` is truthy the
    message's pgroup must match it as well; a falsy ``pgroup`` disables
    that check.
    """
    if action != "request_success" or req_endstation != endstation:
        return False
    if not pgroup:
        return True
    return req_pgroup == pgroup
def add_row(args, dic):
    """Send ``dic`` to the stand client, one keyword argument per entry.

    ``args.stand_cli`` is the client object; its ``add_row`` return value
    is discarded.
    """
    client = args.stand_cli
    client.add_row(**dic)
def get_host(endstation):
    """Return the host name mapped to ``endstation``, or None if unknown."""
    hosts_by_endstation = dict(
        alvra="saresa-vcons-01.psi.ch",
        bernina="saresb-vcons-01.psi.ch",
        cristallina="saresc-vcons-01.psi.ch",
        diavolezza="satesd-vcons-01.psi.ch",
        maloja="satese-vcons-01.psi.ch",
        furka="satesf-vcons-01.psi.ch",
    )
    try:
        return hosts_by_endstation[endstation]
    except KeyError:
        return None
if __name__ == "__main__":
    # Command-line entry point: parse the options, build the stand client
    # and block in wait_for_message until interrupted.
    import argparse

    ENDSTATIONS = [
        "alvra",
        "bernina",
        "cristallina",
        "diavolezza",
        "maloja",
        "furka"
    ] # yapf: disable

    description = ""
    parser = argparse.ArgumentParser(description=description)
    # Retitle argparse's built-in group (which holds -h/--help); the
    # script's own options live in the two groups created below.
    parser._optionals.title = "flag arguments"
    required = parser.add_argument_group("required")
    optional = parser.add_argument_group("optional")

    required.add_argument(
        "--endstation",
        "-e",
        choices=ENDSTATIONS,
        required=True,
        help="Endstation running the table",
    )
    optional.add_argument(
        "--pgroup",
        "-p",
        help="""
        Pgroup number, given as string in format 'pXXXXX', where X is a digit,
        to make sure right data is processed""",
    )
    optional.add_argument("--broker-url", default="sf-daq-11", help="RabbitMQ broker URL")
    optional.add_argument("--stand-host", default="endstation", help="Host for stand")
    optional.add_argument("--stand-port", default="9090", help="Port for stand")

    args = parser.parse_args()

    # NOTE(review): check_pgroup comes from the project's helpers module and
    # is not visible here; presumably it validates the 'pXXXXX' format --
    # confirm how it treats None.
    args.pgroup = check_pgroup(args.pgroup)
    # --pgroup is optional and msg_filter skips the pgroup comparison for a
    # falsy value, so only strip the leading letter and convert to int when
    # a pgroup is actually present (slicing None would raise TypeError).
    if args.pgroup is not None:
        args.pgroup = int(args.pgroup[1:])

    # The literal default "endstation" means: use the host mapped to the
    # selected endstation.
    if args.stand_host == "endstation":
        args.stand_host = get_host(args.endstation)

    args.stand_cli = Client(host=args.stand_host, port=args.stand_port)
    wait_for_message(args)