From bb92aadf09a7e4d7efef1c25660a2472dfea93a3 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Wed, 11 Dec 2024 18:40:40 +0100 Subject: [PATCH] added first version of a broker request status monitor --- slic/core/acquisition/broker/requeststatus.py | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 slic/core/acquisition/broker/requeststatus.py diff --git a/slic/core/acquisition/broker/requeststatus.py b/slic/core/acquisition/broker/requeststatus.py new file mode 100644 index 000000000..2525568cd --- /dev/null +++ b/slic/core/acquisition/broker/requeststatus.py @@ -0,0 +1,117 @@ +import json +from datetime import datetime +from threading import Thread + +from pika import BlockingConnection, ConnectionParameters + + +DEFAULT_BROKER_URL = "sf-daq" +STATUS_EXCHANGE = "status" + + +class RequestStatus: + + ENTRIES = ( + "announced", + "started", + "success", + "failure", + "ghosts" + ) + + + def __init__(self, broker_url=DEFAULT_BROKER_URL): + self.broker_url = broker_url + + for i in self.ENTRIES: + setattr(self, i, {}) + + self.thread = thread = Thread(target=self._run) + thread.daemon = True + thread.start() + + + def clear(self): + for i in self.ENTRIES: + getattr(self, i).clear() + + + def _run(self): + connection = BlockingConnection(ConnectionParameters(self.broker_url)) + + channel = connection.channel() + channel.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout") + + queue = channel.queue_declare(queue="", exclusive=True).method.queue + channel.queue_bind(queue=queue, exchange=STATUS_EXCHANGE) + channel.basic_consume(queue, self._on_status, auto_ack=True) + + try: + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() + + + def _on_status(self, _channel, _method_frame, header_frame, body): + correlation_id = header_frame.correlation_id + headers = header_frame.headers + timestamp = header_frame.timestamp + + body = body.decode() + request = json.loads(body) + + action = headers["action"] + + timestamp = datetime.fromtimestamp(timestamp / 1e9) + + key = correlation_id.split("-", 1)[0] + + #TODO: nicely pack the information + data = { + "timestamp": timestamp, + "headers": headers, + "request": request + } + + if action == "write_request": + self.announced[key] = data + elif action == "write_start": + if key in self.announced: + self.announced.pop(key) + self.started[key] = data + else: + self.ghosts[key] = data + elif action == "write_finished": + if key in self.started: + self.started.pop(key) + self.success[key] = data + else: + self.ghosts[key] = data + elif action == "write_rejected": + if key in self.started: + self.started.pop(key) + self.failure[key] = data + else: + self.ghosts[key] = data + + + def __repr__(self): + header = "Request Status:" + underline = "-" * len(header) + res = [header, underline] + + maxlen = max(len(i) for i in self.ENTRIES) + entries = {f"{i}:".ljust(maxlen+1): len(getattr(self, i)) for i in self.ENTRIES} + res += [f"{k} {v}" for k, v in entries.items()] + + res = "\n".join(res) + return res + + + + + +if __name__ == "__main__": + rs = RequestStatus() + +