commit 5eb5b31c29c699bfe13dcaa7998a76100bd7d1b5 Author: woznic_n <> Date: Tue Sep 9 13:31:08 2025 +0200 first commit diff --git a/pixi.toml b/pixi.toml new file mode 100644 index 0000000..282e5ad --- /dev/null +++ b/pixi.toml @@ -0,0 +1,16 @@ +[workspace] +authors = ["woznic_n"] +channels = ["conda-forge"] +name = "spear" +platforms = ["linux-64"] +version = "0.1.0" + +[tasks] + +[dependencies] +python = "==3.12" +pika = "*" +requests = "*" +logzero = "*" +pyyaml = "*" + diff --git a/wait_4_msg.py b/wait_4_msg.py new file mode 100755 index 0000000..fcec1e2 --- /dev/null +++ b/wait_4_msg.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +import copy +import json +import re +from datetime import datetime +from functools import partial + +from logzero import logger +from pika import BlockingConnection, ConnectionParameters +from stand.client import Client + +STATUS_EXCHANGE = "status" + + +def wait_for_message(args): + params = ConnectionParameters(args.broker_url) + with BlockingConnection(params) as connection: + with connection.channel() as chn: + chn.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout") + queue = chn.queue_declare(queue="", exclusive=True).method.queue + chn.queue_bind(queue=queue, exchange=STATUS_EXCHANGE) + chn.basic_consume(queue, partial(on_status, args=args), auto_ack=True) + + try: + chn.start_consuming() + except KeyboardInterrupt: + logger.debug("keyboard interrupt, exiting") + chn.stop_consuming() + + +def on_status(_chn, _method_frame, header_frame, body, args): + header = header_frame.headers + body = body.decode() + request = json.loads(body) + t_stamp = header_frame.timestamp + #print(header) + #print(body) + if ( + header.get("action") is None or request.get("writer_type") is None + or not isinstance(request.get("apo_meta"), dict) or request.get("metadata") is None + ): + return + if ( + request["metadata"].get("general/user") is None + or request["metadata"].get("general/instrument") is None + or request["apo_meta"].get("table") is None + ): + return + #if header["action"] == "request_success": + #print(header) + #print(body) + if msg_filter( + action=header["action"], + req_endstation=request["metadata"]["general/instrument"], + endstation=args.endstation, + req_pgroup=int(request["metadata"]["general/user"]), + pgroup=args.pgroup, + ): + row = { + "pgroup": request["metadata"]["general/user"], + "run_number": request["apo_meta"]["run_number"], + "acq_number": request["apo_meta"]["acq_number"], + "t_stamp": datetime.fromtimestamp(t_stamp / 1000000000).strftime("%Y-%m-%d %H:%M:%S"), + } + row.update(request["apo_meta"]["table"]) + try: + add_row(args, row) + except KeyboardInterrupt as exc: + logger.debug("keyboard interrupt, exiting") + raise SystemExit from exc + except Exception: + logger.exception("failed to add row") + raise + + +def msg_filter(action=None, req_endstation=None, endstation=None, req_pgroup=None, pgroup=None, ): + + result = action == "request_success" and req_endstation == endstation + + if pgroup: + result = result and req_pgroup == pgroup + + return result + + +def add_row(args, dic): + + args.stand_cli.add_row(**dic) + + +def get_host(endstation): + host_endstations = { + "alvra": "saresa-vcons-01.psi.ch", + "bernina": "saresb-vcons-01.psi.ch", + "cristallina": "saresc-vcons-01.psi.ch", + "diavolezza": "satesd-vcons-01.psi.ch", + "maloja": "satese-vcons-01.psi.ch", + "furka": "satesf-vcons-01.psi.ch" + } + return host_endstations.get(endstation) + + +def check_pgroup(pgroup): + if pgroup is None: + return + if re.match(r"^p\d{5}$", pgroup.casefold()): + return int(pgroup[1:]) + if re.match(r"^\d{5}$", pgroup): + return pgroup + msg = f"Pgroup {pgroup} supplied in incorrect format. Proper format 'pXXXXX', X is a digit" + logger.error(msg) + raise SystemExit(msg) + + +if __name__ == "__main__": + import argparse + + ENDSTATIONS = [ + "alvra", + "bernina", + "cristallina", + "diavolezza", + "maloja", + "furka" + ] # yapf: disable + + description = "" + parser = argparse.ArgumentParser(description=description) + parser._optionals.title = "flag arguments" + required = parser.add_argument_group("required") + optional = parser.add_argument_group("optional") + + required.add_argument( + "--endstation", + "-e", + choices=ENDSTATIONS, + required=True, + help="Endstation running the table", + ) + + optional.add_argument( + "--pgroup", + "-p", + help=""" + Pgroup number, given as string in format 'pXXXXX', where X is a digit, + to make sure right data is processed""", + ) + + optional.add_argument("--broker-url", default="sf-daq-11", help="RabbitMQ broker URL") + optional.add_argument("--stand-host", default="endstation", help="Host for stand") + optional.add_argument("--stand-port", default="9090", help="Port for stand") + + args = parser.parse_args() + + #keeping pgroup as int + args.pgroup = check_pgroup(args.pgroup) + if args.stand_host == "endstation": + args.stand_host = get_host(args.endstation) + + args.stand_cli = Client(host=args.stand_host, port=args.stand_port) + + wait_for_message(args)