From 161d0a58022a1fdf7ce765b301a4235534b0dd21 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 12 Aug 2020 11:34:58 +0200 Subject: [PATCH] Add BrokerClient class --- core-broker/core-broker/BrokerClient.py | 71 +++++++++++++++++++++++++ core-broker/core-broker/__init__.py | 0 2 files changed, 71 insertions(+) create mode 100644 core-broker/core-broker/BrokerClient.py create mode 100644 core-broker/core-broker/__init__.py diff --git a/core-broker/core-broker/BrokerClient.py b/core-broker/core-broker/BrokerClient.py new file mode 100644 index 0000000..2b676fe --- /dev/null +++ b/core-broker/core-broker/BrokerClient.py @@ -0,0 +1,71 @@ +import json + +from pika import BlockingConnection, ConnectionParameters, BasicProperties + + +class BrokerClient(object): + + REQUEST_EXCHANGE = "request" + STATUS_EXCHANGE = "status" + DEFAULT_BROKER_URL = "127.0.0.1" + + def __init__(self, broker_url=DEFAULT_BROKER_URL): + self.connection = BlockingConnection(ConnectionParameters(broker_url)) + self.channel = self.connection.channel() + + self.channel.exchange_declare(exchange=self.REQUEST_EXCHANGE, + exchange_type="topic") + + self.channel.exchange_declare(exchange=self.STATUS_EXCHANGE, + exchange_type="fanout") + + def close(self): + self.connection.close() + + def request_write(self, + output_prefix, + metadata=None, + detectors=None, + bsread_channels=None, + epics_pvs=None): + + routing_key = "." + + if detectors: + for detector in detectors: + routing_key += detector + "." + + if bsread_channels: + routing_key += "bsread" + "." + + if epics_pvs: + routing_key += "epics" + "." + + body_bytes = json.dumps({ + "output_prefix": output_prefix, + "metadata": metadata, + "detectors": detectors, + "bsread_channels": bsread_channels, + "epics_pvs": epics_pvs + }).encode() + + self.channel.basic_publish(exchange=self.REQUEST_EXCHANGE, + routing_key=routing_key, + body=body_bytes) + + status_header = { + "action": "write_request", + "source": "BrokerClient", + "routing_key": routing_key + } + + self.channel.basic_publish(exchange=self.STATUS_EXCHANGE, + properties=BasicProperties( + headers=status_header), + routing_key="", + body=body_bytes) + + +broker = BrokerClient() +broker.request_write("/tmp/test", epics_pvs=["test"]) +broker.close() diff --git a/core-broker/core-broker/__init__.py b/core-broker/core-broker/__init__.py new file mode 100644 index 0000000..e69de29