154 lines
4.6 KiB
Python
Executable File
154 lines
4.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import copy
|
|
import json
|
|
from datetime import datetime
|
|
from functools import partial
|
|
|
|
from logzero import logger
|
|
from pika import BlockingConnection, ConnectionParameters
|
|
from stand.client import Client
|
|
|
|
from helpers import check_pgroup
|
|
|
|
STATUS_EXCHANGE = "status"
|
|
|
|
|
|
def wait_for_message(args):
    """Consume messages from the status exchange until interrupted.

    Opens a blocking connection using ``args.broker_url``, declares the
    fanout exchange named by ``STATUS_EXCHANGE``, binds an exclusive queue
    to it and dispatches every delivery to ``on_status`` with ``args``
    bound as a keyword argument. The call blocks inside
    ``start_consuming``; a ``KeyboardInterrupt`` stops consumption and lets
    the function return normally.
    """
    connection_params = ConnectionParameters(args.broker_url)
    handler = partial(on_status, args=args)

    with BlockingConnection(connection_params) as connection, connection.channel() as channel:
        channel.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout")

        # The queue is declared with an empty name; the name actually used
        # is read back from the declare reply.
        declared = channel.queue_declare(queue="", exclusive=True)
        queue_name = declared.method.queue

        channel.queue_bind(queue=queue_name, exchange=STATUS_EXCHANGE)
        channel.basic_consume(queue_name, handler, auto_ack=True)

        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            logger.debug("keyboard interrupt, exiting")
            channel.stop_consuming()
|
|
|
|
|
|
def on_status(_chn, _method_frame, header_frame, body, args):
    """Handle one delivery from the status exchange and record matching requests.

    The message is silently skipped (``None`` is returned) unless all of
    the following hold:

    * the body decodes to a JSON object,
    * the message headers are present and contain ``action``,
    * the payload has ``writer_type``, a dict ``metadata`` holding
      ``general/user`` and ``general/instrument``, and a dict ``apo_meta``
      holding ``table``,
    * ``general/user`` converts to ``int``,
    * ``msg_filter`` accepts the action, endstation and pgroup.

    For an accepted message a row is built from the pgroup, run number,
    acquisition number and formatted timestamp, extended with the entries
    of ``apo_meta["table"]``, and handed to ``add_row``.

    Args:
        _chn: Unused.
        _method_frame: Unused.
        header_frame: Message properties; ``headers`` and ``timestamp`` are read.
        body: Raw message payload as bytes.
        args: Namespace providing ``endstation`` and ``pgroup`` for
            filtering; forwarded unchanged to ``add_row``.

    Raises:
        KeyError: If an accepted message lacks ``run_number`` or
            ``acq_number`` in ``apo_meta``.
        SystemExit: If ``add_row`` is interrupted by ``KeyboardInterrupt``.
        Exception: Any other error from ``add_row`` is logged and re-raised.
    """
    header = header_frame.headers

    try:
        request = json.loads(body.decode())
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses: a payload that is not JSON text is not for us.
        return
    if not isinstance(request, dict):
        return

    # Dump the headers of messages coming from the processing writer.
    if request.get("writer_type") == "processing":
        print(header)

    # A message published without headers has ``headers`` set to None.
    if not isinstance(header, dict):
        return

    metadata = request.get("metadata")
    apo_meta = request.get("apo_meta")
    if (
        header.get("action") is None
        or request.get("writer_type") is None
        or not isinstance(apo_meta, dict)
        or not isinstance(metadata, dict)
    ):
        return

    user = metadata.get("general/user")
    instrument = metadata.get("general/instrument")
    table = apo_meta.get("table")
    if user is None or instrument is None or table is None:
        return

    # The pgroup is compared as an integer; a value that cannot be
    # converted marks the message as malformed, so skip it like the
    # other validation failures instead of crashing the consumer.
    try:
        req_pgroup = int(user)
    except (TypeError, ValueError):
        return

    if not msg_filter(
        action=header["action"],
        req_endstation=instrument,
        endstation=args.endstation,
        req_pgroup=req_pgroup,
        pgroup=args.pgroup,
    ):
        return

    # NOTE(review): the timestamp is divided by 1e9 before conversion,
    # presumably because it is given in nanoseconds - confirm with the sender.
    row = {
        "pgroup": user,
        "run_number": apo_meta["run_number"],
        "acq_number": apo_meta["acq_number"],
        "t_stamp": datetime.fromtimestamp(header_frame.timestamp / 1000000000).strftime("%Y-%m-%d %H:%M:%S"),
    }
    row.update(table)

    try:
        add_row(args, row)
    except KeyboardInterrupt as exc:
        logger.debug("keyboard interrupt, exiting")
        raise SystemExit from exc
    except Exception:
        logger.exception("failed to add row")
        raise
|
|
|
|
|
|
def msg_filter(action=None, req_endstation=None, endstation=None, req_pgroup=None, pgroup=None):
    """Decide whether a status message should be recorded.

    A message passes when ``action`` is ``"request_success"`` and
    ``req_endstation`` equals ``endstation``. If ``pgroup`` is truthy the
    message must additionally have ``req_pgroup`` equal to ``pgroup``; a
    falsy ``pgroup`` disables that check.

    Returns:
        ``True`` if the message passes every active check, else ``False``.
    """
    if action != "request_success" or req_endstation != endstation:
        return False

    # No pgroup configured: action and endstation alone decide.
    if not pgroup:
        return True

    return req_pgroup == pgroup
|
|
|
|
|
|
def add_row(args, dic):
    """Forward the entries of ``dic`` as keyword arguments to ``args.stand_cli.add_row``."""
    client = args.stand_cli
    client.add_row(**dic)
|
|
|
|
|
|
def get_host(endstation):
    """Return the host name mapped to ``endstation``, or ``None`` if it is not listed."""
    known_hosts = dict(
        alvra="saresa-vcons-01.psi.ch",
        bernina="saresb-vcons-01.psi.ch",
        cristallina="saresc-vcons-01.psi.ch",
        diavolezza="satesd-vcons-01.psi.ch",
        maloja="satese-vcons-01.psi.ch",
        furka="satesf-vcons-01.psi.ch",
    )
    try:
        return known_hosts[endstation]
    except KeyError:
        return None
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse the command line, build the stand client
    # and block consuming status messages.
    import argparse

    ENDSTATIONS = [
        "alvra",
        "bernina",
        "cristallina",
        "diavolezza",
        "maloja",
        "furka"
    ] # yapf: disable

    description = ""
    parser = argparse.ArgumentParser(description=description)
    # Retitle argparse's built-in option section (which holds -h/--help).
    parser._optionals.title = "flag arguments"
    required_group = parser.add_argument_group("required")
    optional_group = parser.add_argument_group("optional")

    required_group.add_argument(
        "--endstation", "-e",
        required=True,
        choices=ENDSTATIONS,
        help="Endstation running the table",
    )

    optional_group.add_argument(
        "--pgroup", "-p",
        help="""
        Pgroup number, given as string in format 'pXXXXX', where X is a digit,
        to make sure right data is processed""",
    )
    optional_group.add_argument("--broker-url", help="RabbitMQ broker URL", default="sf-daq-11")
    optional_group.add_argument("--stand-host", help="Host for stand", default="endstation")
    optional_group.add_argument("--stand-port", help="Port for stand", default="9090")

    args = parser.parse_args()

    # NOTE(review): the slice below assumes check_pgroup returns a
    # 'pXXXXX' string even when --pgroup is omitted - confirm in helpers.
    checked_pgroup = check_pgroup(args.pgroup)
    # Keep the pgroup as an int (leading letter dropped) so it can be
    # compared with the integer pgroup taken from incoming messages.
    args.pgroup = int(checked_pgroup[1:])

    # The literal "endstation" is a placeholder meaning "use the host
    # mapped to the selected endstation".
    if args.stand_host == "endstation":
        args.stand_host = get_host(args.endstation)

    args.stand_cli = Client(host=args.stand_host, port=args.stand_port)

    wait_for_message(args)
|