mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-22 02:20:02 +02:00
65 lines
2.1 KiB
Python
65 lines
2.1 KiB
Python
import argparse
import logging
import os
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point, WritePrecision, rest
from influxdb_client.client.write_api import SYNCHRONOUS

from bec_utils import RedisConnector, ServiceConfig, MessageEndpoints, BECMessage
|
|
|
|
# Command-line interface: a single optional --config argument whose default
# (an empty string) is shown in --help by ArgumentDefaultsHelpFormatter.
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--config", default="", help="path to the config file")
clargs = parser.parse_args()
config_path = clargs.config

# Service configuration built from the path given on the command line.
config = ServiceConfig(config_path)

# Root logger: INFO and above go to a log file that is truncated on every
# start ("w+"); an extra StreamHandler mirrors the records to the console.
logging.basicConfig(filename="influxdb-forwarder.log", filemode="w+", level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
|
|
|
|
|
|
def influxdb_ingest(msg, influxdb_writer=None, **kwargs):
    """Forward the signals of one device readback message to InfluxDB.

    The raw payload in ``msg.value`` is decoded with
    ``BECMessage.DeviceMessage.loads`` and every entry of its ``"signals"``
    content is written as an individual point: the signal name is used both
    as the measurement name and as the ``readback`` tag, and the signal's
    ``"value"`` entry is stored in the ``value`` field, timestamped with the
    current UTC time.

    Args:
        msg: Message object whose ``value`` attribute holds the serialized
            device message.
        influxdb_writer: Object exposing ``write(bucket, org, point)``; the
            script passes the InfluxDB write API here.
        **kwargs: Accepted for compatibility with the consumer callback
            signature; not used.

    The target ``bucket`` and ``org`` are read from the module-level
    variables of the same name. ``rest.ApiException`` raised while building
    or writing a point is logged and does not abort the remaining signals.
    """
    device_msg = BECMessage.DeviceMessage.loads(msg.value)
    signals = device_msg.content["signals"]
    for dev, data in signals.items():
        value = data.get("value")
        # Plain integers are stored as floats (presumably so the field keeps a
        # single numeric type in InfluxDB - confirm). bool is a subclass of
        # int and is deliberately left untouched, as in the original check.
        if isinstance(value, int) and not isinstance(value, bool):
            value = float(value)
        try:
            point = (
                Point(dev)
                .tag("readback", dev)
                .field("value", value)
                # Timezone-aware replacement for the deprecated utcnow().
                .time(datetime.now(timezone.utc), WritePrecision.NS)
            )
            influxdb_writer.write(bucket, org, point)
        except rest.ApiException as e:
            # Go through logging so the failure also lands in the log file
            # configured at the top of the script, not only on the console.
            logging.error("exception: %s", e.message)
|
|
|
|
|
|
# InfluxDB connection settings. Each value can be overridden through an
# environment variable; the previous hard-coded values remain the defaults so
# existing deployments keep working unchanged.
#
# SECURITY(review): the default token below is a real credential committed to
# the repository. It should be rotated in InfluxDB and the default removed once
# every deployment supplies INFLUXDB_V2_TOKEN.
# You can generate an API token from the "API Tokens Tab" in the UI
token = os.environ.get(
    "INFLUXDB_V2_TOKEN",
    "3fochbsfIA4A1ixmlD3okhVmbGQX5Sg9rhMicmiUFRYAVL7YTu0poLYCsq1bi4sOLO45fzlJkIbppTgJAlpjrA==",
)
org = os.environ.get("INFLUXDB_V2_ORG", "PSI")
bucket = os.environ.get("INFLUXDB_V2_BUCKET", "bec")
influxdb_url = os.environ.get("INFLUXDB_V2_URL", "http://129.129.195.115:8086")

# The client is used as a context manager so it is closed when the block exits.
with InfluxDBClient(url=influxdb_url, token=token, org=org) as client:
    print("started")

    # Synchronous writes: each write call is sent to the server immediately.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    connector = RedisConnector(bootstrap=config.redis)

    # Subscribe to the readback endpoint of every device ("*") and hand each
    # message to influxdb_ingest together with the write API.
    device_readback = connector.consumer(
        pattern=MessageEndpoints.device_readback("*"),
        cb=influxdb_ingest,
        influxdb_writer=write_api,
        threaded=False,
    )

    # The consumer was created with threaded=False, so it is polled explicitly
    # here, forever.
    while True:
        device_readback.poll_messages()
|