#!/usr/bin/env python
"""Forward status messages from a RabbitMQ exchange to a stand table.

Messages published on the ``status`` fanout exchange are inspected and, for
every ``request_success`` message of the selected endstation (and optionally
the selected pgroup), one row is added through the stand client.
"""
import copy
import json
from datetime import datetime
from functools import partial

from logzero import logger
from pika import BlockingConnection, ConnectionParameters
from stand.client import Client

from helpers import check_pgroup

STATUS_EXCHANGE = "status"


def wait_for_message(args):
    """Consume messages from the status exchange until interrupted.

    Declares the fanout exchange, binds an exclusive queue (declared with an
    empty name) to it and hands every delivered message to ``on_status``.

    Args:
        args: Namespace providing ``broker_url`` plus everything
            ``on_status`` reads from it.
    """
    params = ConnectionParameters(args.broker_url)
    with BlockingConnection(params) as connection, connection.channel() as chn:
        chn.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout")
        queue = chn.queue_declare(queue="", exclusive=True).method.queue
        chn.queue_bind(queue=queue, exchange=STATUS_EXCHANGE)
        chn.basic_consume(queue, partial(on_status, args=args), auto_ack=True)
        try:
            chn.start_consuming()
        except KeyboardInterrupt:
            logger.debug("keyboard interrupt, exiting")
            chn.stop_consuming()


def on_status(_chn, _method_frame, header_frame, body, args):
    """Handle one status message and add a table row when it matches.

    Messages that cannot be parsed, or that lack any field needed for
    filtering, are ignored so that unrelated traffic on the fanout exchange
    cannot stop the consumer.

    Args:
        _chn: Channel the message arrived on (unused).
        _method_frame: Delivery information (unused).
        header_frame: Message properties; ``headers`` and ``timestamp`` are
            read from it.
        body: Raw message payload, expected to be a UTF-8 encoded JSON object.
        args: Namespace providing ``endstation``, ``pgroup`` and
            ``stand_cli``.

    Raises:
        SystemExit: If adding the row is interrupted from the keyboard.
        Exception: Any other error raised while adding the row is logged
            and re-raised.
    """
    # Messages published without headers carry None here.
    header = header_frame.headers or {}

    try:
        request = json.loads(body.decode())
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        logger.debug("ignoring status message with undecodable body")
        return
    if not isinstance(request, dict):
        return

    if request.get("writer_type") == "processing":
        logger.debug("processing message headers: %s", header)

    metadata = request.get("metadata")
    apo_meta = request.get("apo_meta")
    if (
        header.get("action") is None
        or request.get("writer_type") is None
        or not isinstance(apo_meta, dict)
        or not isinstance(metadata, dict)
    ):
        return

    if (
        metadata.get("general/user") is None
        or metadata.get("general/instrument") is None
        or apo_meta.get("table") is None
    ):
        return

    matches = msg_filter(
        action=header["action"],
        req_endstation=metadata["general/instrument"],
        endstation=args.endstation,
        req_pgroup=int(metadata["general/user"]),
        pgroup=args.pgroup,
    )
    if not matches:
        return

    # The timestamp is divided by 1e9 before conversion, i.e. it is treated
    # as nanoseconds since the epoch; the result is local, naive time.
    t_stamp = header_frame.timestamp
    row = {
        "pgroup": metadata["general/user"],
        "run_number": apo_meta["run_number"],
        "acq_number": apo_meta["acq_number"],
        "t_stamp": datetime.fromtimestamp(t_stamp / 1_000_000_000).strftime(
            "%Y-%m-%d %H:%M:%S"
        ),
    }
    row.update(apo_meta["table"])

    try:
        add_row(args, row)
    except KeyboardInterrupt as exc:
        logger.debug("keyboard interrupt, exiting")
        raise SystemExit from exc
    except Exception:
        logger.exception("failed to add row")
        raise


def msg_filter(
    action=None,
    req_endstation=None,
    endstation=None,
    req_pgroup=None,
    pgroup=None,
):
    """Tell whether a message should produce a table row.

    A message matches when its action is ``"request_success"`` and its
    endstation equals the configured one. The pgroup is compared only when
    a truthy ``pgroup`` is configured.
    """
    result = action == "request_success" and req_endstation == endstation
    if pgroup:
        result = result and req_pgroup == pgroup
    return result


def add_row(args, dic):
    """Add ``dic`` as one row via ``args.stand_cli.add_row``."""
    args.stand_cli.add_row(**dic)


def get_host(endstation):
    """Return the default stand host for ``endstation``, or None if unknown."""
    host_endstations = {
        "alvra": "saresa-vcons-01.psi.ch",
        "bernina": "saresb-vcons-01.psi.ch",
        "cristallina": "saresc-vcons-01.psi.ch",
        "diavolezza": "satesd-vcons-01.psi.ch",
        "maloja": "satese-vcons-01.psi.ch",
        "furka": "satesf-vcons-01.psi.ch",
    }
    return host_endstations.get(endstation)


if __name__ == "__main__":
    import argparse

    ENDSTATIONS = [
        "alvra",
        "bernina",
        "cristallina",
        "diavolezza",
        "maloja",
        "furka",
    ]

    # yapf: disable
    description = ""
    parser = argparse.ArgumentParser(description=description)
    parser._optionals.title = "flag arguments"
    required = parser.add_argument_group("required")
    optional = parser.add_argument_group("optional")

    required.add_argument(
        "--endstation",
        "-e",
        choices=ENDSTATIONS,
        required=True,
        help="Endstation running the table",
    )
    optional.add_argument(
        "--pgroup",
        "-p",
        help=(
            " Pgroup number, given as string in format 'pXXXXX', where X is a digit,"
            " to make sure right data is processed"
        ),
    )
    optional.add_argument("--broker-url", default="sf-daq-11", help="RabbitMQ broker URL")
    optional.add_argument("--stand-host", default="endstation", help="Host for stand")
    optional.add_argument("--stand-port", default="9090", help="Port for stand")

    args = parser.parse_args()

    args.pgroup = check_pgroup(args.pgroup)
    # Keep the pgroup as an int (leading letter stripped). The flag is
    # optional, so leave a missing value as None; msg_filter then skips the
    # pgroup comparison.
    # NOTE(review): assumes check_pgroup passes None through - confirm.
    if args.pgroup is not None:
        args.pgroup = int(args.pgroup[1:])

    # "endstation" is the placeholder default meaning: derive the host from
    # the chosen endstation.
    if args.stand_host == "endstation":
        args.stand_host = get_host(args.endstation)

    args.stand_cli = Client(host=args.stand_host, port=args.stand_port)

    wait_for_message(args)