Files
spear/wait_4_msg.py
2025-09-09 13:31:08 +02:00

164 lines
4.9 KiB
Python
Executable File

#!/usr/bin/env python
import copy
import json
import re
from datetime import datetime
from functools import partial
from logzero import logger
from pika import BlockingConnection, ConnectionParameters
from stand.client import Client
STATUS_EXCHANGE = "status"
def wait_for_message(args):
    """Consume the status exchange until interrupted from the keyboard.

    Connects to the broker named by ``args.broker_url``, declares the fanout
    exchange ``STATUS_EXCHANGE``, binds a fresh exclusive queue (name chosen
    by the broker) to it and dispatches every delivery to ``on_status`` with
    ``args`` bound as a keyword argument. Deliveries are consumed with
    ``auto_ack=True``.

    A ``KeyboardInterrupt`` raised while consuming is logged and stops the
    consumer; channel and connection are released when the ``with`` block
    exits.
    """
    broker = ConnectionParameters(args.broker_url)
    handler = partial(on_status, args=args)
    with BlockingConnection(broker) as connection, connection.channel() as chn:
        chn.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout")
        declared = chn.queue_declare(queue="", exclusive=True)
        status_queue = declared.method.queue
        chn.queue_bind(queue=status_queue, exchange=STATUS_EXCHANGE)
        chn.basic_consume(status_queue, handler, auto_ack=True)
        try:
            chn.start_consuming()
        except KeyboardInterrupt:
            logger.debug("keyboard interrupt, exiting")
            chn.stop_consuming()
def on_status(_chn, _method_frame, header_frame, body, args):
    """Turn one status-exchange message into a table row, if it qualifies.

    Consumer callback; ``args`` is the parsed command-line namespace bound by
    the caller. A message is recorded only when all of the following hold:

    * its AMQP headers are present and carry an ``action``;
    * its body is a JSON object with ``writer_type``, a ``metadata`` object
      holding ``general/user`` and ``general/instrument``, and an
      ``apo_meta`` object holding ``table``;
    * ``msg_filter`` accepts it for ``args.endstation`` / ``args.pgroup``;
    * it carries a timestamp plus ``apo_meta["run_number"]`` and
      ``apo_meta["acq_number"]``.

    Every other message is skipped rather than raising, so that a single
    malformed message on the shared exchange cannot stop the consumer.
    The row holds ``pgroup``, ``run_number``, ``acq_number`` and ``t_stamp``
    plus every entry of ``apo_meta["table"]`` and is handed to ``add_row``.

    Raises:
        SystemExit: if a ``KeyboardInterrupt`` arrives while adding the row.
        Exception: any other ``add_row`` failure is logged and re-raised.
    """
    header = header_frame.headers
    # Messages published without headers have no dict here (pika leaves the
    # attribute unset/None), so guard before calling .get on it.
    if not isinstance(header, dict) or header.get("action") is None:
        return

    try:
        request = json.loads(body.decode())
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        logger.warning("ignoring status message: body is not valid UTF-8 JSON")
        return
    if not isinstance(request, dict):
        return

    apo_meta = request.get("apo_meta")
    metadata = request.get("metadata")
    if (
        request.get("writer_type") is None
        or not isinstance(apo_meta, dict)
        or not isinstance(metadata, dict)
    ):
        return

    user = metadata.get("general/user")
    instrument = metadata.get("general/instrument")
    table = apo_meta.get("table")
    if user is None or instrument is None or table is None:
        return

    try:
        req_pgroup = int(user)
    except (TypeError, ValueError):
        logger.debug("ignoring status message: non-numeric general/user %r", user)
        return

    if not msg_filter(
        action=header["action"],
        req_endstation=instrument,
        endstation=args.endstation,
        req_pgroup=req_pgroup,
        pgroup=args.pgroup,
    ):
        return

    t_stamp = header_frame.timestamp
    if t_stamp is None or "run_number" not in apo_meta or "acq_number" not in apo_meta:
        logger.warning(
            "ignoring matching status message: timestamp, run_number or acq_number missing"
        )
        return

    row = {
        "pgroup": user,
        "run_number": apo_meta["run_number"],
        "acq_number": apo_meta["acq_number"],
        # NOTE(review): the division by 1e9 implies the publisher stores
        # nanoseconds since the epoch - confirm with the sender. The result
        # is formatted in this process's local timezone.
        "t_stamp": datetime.fromtimestamp(t_stamp / 1000000000).strftime("%Y-%m-%d %H:%M:%S"),
    }
    row.update(table)

    try:
        add_row(args, row)
    except KeyboardInterrupt as exc:
        logger.debug("keyboard interrupt, exiting")
        raise SystemExit from exc
    except Exception:
        logger.exception("failed to add row")
        raise
def msg_filter(action=None, req_endstation=None, endstation=None, req_pgroup=None, pgroup=None):
    """Decide whether a status message should be recorded.

    A message passes when its ``action`` is ``"request_success"`` and its
    endstation (``req_endstation``) equals the configured ``endstation``.
    When ``pgroup`` is truthy, the message's ``req_pgroup`` must additionally
    equal it; a falsy ``pgroup`` switches the pgroup check off.
    """
    station_ok = action == "request_success" and req_endstation == endstation
    return (station_ok and req_pgroup == pgroup) if pgroup else station_ok
def add_row(args, dic):
    """Forward ``dic`` as keyword arguments to ``args.stand_cli.add_row``.

    Each key of ``dic`` becomes one keyword argument of the client call.
    Nothing is returned.
    """
    stand_client = args.stand_cli
    stand_client.add_row(**dic)
def get_host(endstation):
    """Return the host name configured for ``endstation``.

    Looks the endstation name up in a fixed table and returns ``None`` when
    the name is not listed.
    """
    hosts_by_endstation = dict(
        alvra="saresa-vcons-01.psi.ch",
        bernina="saresb-vcons-01.psi.ch",
        cristallina="saresc-vcons-01.psi.ch",
        diavolezza="satesd-vcons-01.psi.ch",
        maloja="satese-vcons-01.psi.ch",
        furka="satesf-vcons-01.psi.ch",
    )
    if endstation in hosts_by_endstation:
        return hosts_by_endstation[endstation]
    return None
def check_pgroup(pgroup):
    """Validate a pgroup string and return its number as an ``int``.

    Accepted formats are ``pXXXXX`` (the prefix is matched
    case-insensitively) and the bare five digits ``XXXXX``.

    Returns:
        ``None`` when ``pgroup`` is ``None``, otherwise the five-digit
        number as ``int``.

    Raises:
        SystemExit: if ``pgroup`` is in neither accepted format; the message
            is logged as an error first.
    """
    if pgroup is None:
        return None
    # fullmatch (rather than match with "$") also rejects a trailing newline.
    found = re.fullmatch(r"p?(\d{5})", pgroup.casefold())
    if found:
        # Both spellings must yield an int: the value is later compared with
        # int(...) of the message's pgroup, and "12345" == 12345 is False.
        return int(found.group(1))
    msg = f"Pgroup {pgroup} supplied in incorrect format. Proper format 'pXXXXX', X is a digit"
    logger.error(msg)
    raise SystemExit(msg)
if __name__ == "__main__":
    # Script entry point: parse the command line, build the stand client and
    # block in wait_for_message until interrupted.
    import argparse

    ENDSTATIONS = ["alvra", "bernina", "cristallina", "diavolezza", "maloja", "furka"]

    description = ""
    parser = argparse.ArgumentParser(description=description)
    # Retitle argparse's built-in options group (the one holding -h/--help).
    parser._optionals.title = "flag arguments"
    required = parser.add_argument_group("required")
    optional = parser.add_argument_group("optional")

    required.add_argument(
        "--endstation", "-e",
        required=True,
        choices=ENDSTATIONS,
        help="Endstation running the table",
    )
    optional.add_argument(
        "--pgroup", "-p",
        help=(
            "\nPgroup number, given as string in format 'pXXXXX', where X is a digit,\n"
            "to make sure right data is processed"
        ),
    )
    optional.add_argument("--broker-url", help="RabbitMQ broker URL", default="sf-daq-11")
    optional.add_argument("--stand-host", help="Host for stand", default="endstation")
    optional.add_argument("--stand-port", help="Port for stand", default="9090")

    args = parser.parse_args()

    # Validate the pgroup once, before it is used to filter messages.
    args.pgroup = check_pgroup(args.pgroup)

    # "endstation" is a placeholder: resolve it to the host listed for the
    # selected endstation; any other value is used as given.
    args.stand_host = (
        get_host(args.endstation) if args.stand_host == "endstation" else args.stand_host
    )
    args.stand_cli = Client(host=args.stand_host, port=args.stand_port)

    wait_for_message(args)