# *****************************************************************************
# Copyright (c) 2024 ff by the module authors
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************

import re
import time
from pathlib import Path
from configparser import ConfigParser
from datetime import datetime, timezone
from math import floor, ceil

from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS

DAY = 24 * 3600
YEAR = 366 * DAY
# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3
UNDEF = '<undef>'

try:
    parse_time = datetime.fromisoformat
except AttributeError:
    # datetime.fromisoformat is only available from Python 3.7 on
    from dateutil.parser import parse as parse_time


def to_time(v):
    return parse_time(v).timestamp()


def to_iso(t):
    return datetime.fromtimestamp(t, timezone.utc).isoformat().replace('+00:00', 'Z')


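# Illustrative sketch (not part of the original module): to_time() / to_iso()
# convert between ISO 8601 strings and epoch seconds; the example timestamp is
# an assumption chosen for demonstration only.
def _example_time_conversion():
    # the '+00:00' form is accepted by both datetime.fromisoformat and dateutil
    t = to_time('2024-01-31T12:00:00+00:00')
    return t, to_iso(t)  # (1706702400.0, '2024-01-31T12:00:00Z')

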
class PrettyFloat(float):
    """saves bandwidth when converting to JSON

    A lot of numbers originally have a fixed (low) number of decimal digits.
    As the binary representation is not exact, a lot of superfluous digits
    might be transmitted otherwise:

        str(1/10*3) == '0.30000000000000004'
        str(PrettyFloat(1/10*3)) == '0.3'
    """
    def __new__(cls, value):
        # '-0' is used as a marker for an undefined value (see add_float below)
        return None if value == '-0' else super().__new__(cls, value)

    def __repr__(self):
        return '%.15g' % self


class Converters(dict):
    def __init__(self, datatypes):
        super().__init__((i, getattr(self, f"cvt_{d.split(':')[0]}"))
                         for i, d in enumerate(datatypes) if i > 2)

    def as_tuple(self, row):
        """get selected columns as tuple"""
        return tuple(f(row[i]) for i, f in self.items())

    cvt_double = staticmethod(PrettyFloat)

    @staticmethod
    def cvt_string(value):
        return value

    @staticmethod
    def cvt_long(value):
        return int(value)

    @staticmethod
    def cvt_dateTime(value):
        return to_time(value)

    @staticmethod
    def cvt_boolean(value):
        return value == 'true'

    cvt_unsigned_long = cvt_duration = cvt_long


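# Illustrative sketch (not part of the original module): Converters maps the
# index of each data column (index > 2, i.e. after the annotation, result and
# table columns of an annotated CSV stream) to a conversion function chosen
# from the '#datatype' header row. The header and row below are assumptions.
def _example_converters():
    conv = Converters(['#datatype', 'string', 'long', 'dateTime:RFC3339', 'double'])
    row = ['', '_result', '0', '2024-01-31T12:00:00+00:00', '1.5']
    return conv.as_tuple(row)  # (1706702400.0, 1.5)

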
class Table(list):
    """a list of tuples with meta info"""
    def __init__(self, tags={}, key_names=(), column_names=(), rows=None):
        super().__init__()
        self.tags = tags
        self.key_names = key_names
        self.column_names = column_names
        if rows:
            self[:] = rows

    def to_csv_rows(self, timeoffset=0, sep='\t', none='none', float_format='%.15g'):
        for row in self:
            result = ['%.15g' % (row[0] - timeoffset)]
            for value in row[1:]:
                try:
                    result.append(float_format % value)
                except TypeError:
                    if value is None:
                        result.append(none)
                    else:
                        result.append(str(value).replace(sep, ' '))
            yield sep.join(result)


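# Illustrative sketch (not part of the original module): a Table is a plain
# list of rows plus meta info; to_csv_rows() renders it line by line with the
# time in the first column. The tags and rows below are assumptions.
def _example_table():
    table = Table({'device': 'demo'}, ('device',), ('_time', '_value'),
                  rows=[(10.0, 1.5), (20.0, None)])
    return list(table.to_csv_rows(timeoffset=10, none=''))  # ['0\t1.5', '10\t']

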
class Single(Table):
    """a single row of a table, as a list with meta info"""
    def __init__(self, tags={}, key_names=(), column_names=(), rows=None):
        super().__init__(tags, key_names, column_names)
        if rows:
            single_row, = rows
            self[:] = single_row


def summarize_tags(curves, remove_multiple=False):
    """summarize tags

    :param curves: list of curves (type Table)
    :param remove_multiple: True: drop tags with non-unique values
    :return: dict <key> of comma separated values
    """
    result = {}
    for curve in curves:
        for k, v in curve.tags.items():
            result.setdefault(k, set()).add(str(v))
    if remove_multiple:
        return {k: ','.join(v) for k, v in result.items() if len(v) == 1}
    return {k: ','.join(v) for k, v in result.items()}


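# Illustrative sketch (not part of the original module): tags with a single
# value over all curves are kept as is, tags with several values are joined
# (or dropped with remove_multiple=True). The tag values are assumptions.
def _example_summarize_tags():
    t1 = Table({'device': 'demo', 'stream': 'a'})
    t2 = Table({'device': 'demo', 'stream': 'b'})
    # first: {'device': 'demo', 'stream': 'a,b'} (order of joined values may vary)
    # second: {'device': 'demo'}
    return summarize_tags([t1, t2]), summarize_tags([t1, t2], True)

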
class RegExp(str):
    """indicates that this string should be treated as a regular expression

    Usage: RegExp(<pattern>)

    when used in InfluxDBWrapper.query, the Go regexp syntax applies!
    """


def append_wildcard_filter(msg, key, names):
    patterns = []
    for pattern in names:
        if isinstance(pattern, RegExp):
            patterns.append(pattern)
        else:
            # treat the name as a glob pattern: '*' matches any substring
            # patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*')))
            patterns.append('.*'.join(re.escape(v) for v in pattern.split('*')))
    if patterns:
        pattern = '|'.join(patterns)
        msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)')


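# Illustrative sketch (not part of the original module): glob-like names are
# turned into a single Flux regex filter line appended to the query lines.
def _example_wildcard_filter():
    msg = []
    append_wildcard_filter(msg, '_measurement', ['t.value', 'mf.*'])
    # msg is now ['|> filter(fn:(r) => r._measurement =~ /^(t\\.value|mf\\..*)$/)']
    return msg

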
class CurveDict(dict):
    def __missing__(self, key):
        return []


def abs_range(start=None, stop=None):
    """convert start/stop to absolute epoch seconds

    values smaller than one YEAR are treated as relative to now
    (start=None: since ever, stop=None: one year in the future)
    """
    now = time.time()
    if start is None:  # since ever
        start = 0
    elif start < YEAR:
        start = int(now + start)
    if stop is None:
        stop = int(now + YEAR)
    elif stop < YEAR:
        stop = ceil(now + stop)
    return start, stop


def round_range(start, stop, interval=None):
    """round start down and stop up to multiples of interval (default: 1 s)"""
    interval = max(1, interval or 0)
    start = floor(floor(start) // interval * interval)
    stop = ceil(ceil(stop) / interval) * interval
    return start, stop


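# Illustrative sketch (not part of the original module): small values are
# interpreted as offsets from now, and the resulting range is then aligned to
# the aggregation interval. The numbers below are assumptions.
def _example_ranges():
    start, stop = abs_range(-3600, 0)             # the last hour, in epoch seconds
    return round_range(start, stop, interval=60)  # aligned to full minutes

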
class InfluxDBWrapper:
    """Wrapper for InfluxDB API 2.0.

    based on nicos.services.cache.database.influxdb
    (work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
    """
    _update_queue = None
    _write_api_write = None

    def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'):
        """initialize

        :param uri: the uri for the influx DB or a name to look up in ~/.sehistory
        :param token: the token
        :param org: the organisation
        :param bucket: the bucket name
        :param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
        """
        if ':' in uri:
            args = uri, token, org, bucket
        else:
            parser = ConfigParser()
            parser.optionxform = str
            parser.read([Path('~').expanduser() / '.sehistory'])
            section = parser[uri]
            args = [section[k] for k in ('uri', 'token', 'org', 'bucket')]
        self._url, self._token, self._org, self._bucket = args
        self._client = InfluxDBClient(url=self._url, token=self._token,
                                      org=self._org)
        if access != 'readonly':
            self.enable_write_access()
        self._deadline = 0
        self.set_time_precision(3)
        self.add_new_bucket(self._bucket, access == 'create')
        self._write_buffer = []
        self._alias = {}
        print('InfluxDBWrapper', self._url, self._org, self._bucket)

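    # Illustrative sketch (not part of the original module): the wrapper is
    # created either with an explicit uri/token/org/bucket or with a section
    # name to be looked up in ~/.sehistory. The uri and section name below are
    # assumptions for demonstration only.
    #
    #     db = InfluxDBWrapper('http://localhost:8086', token, 'my-org', 'my-bucket')
    #     db = InfluxDBWrapper('myhistory')  # [myhistory] section in ~/.sehistory
    #     ...
    #     db.disconnect()
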
    def enable_write_access(self):
        self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write

    def set_time_precision(self, digits):
        self.timedig = max(0, min(digits, 9))
        self._write_precision = TIME_PRECISION[self.timedig]

    def disconnect(self):
        self.flush()
        self._client.close()

    def get_bucket_names(self):
        bucket_names = []
        buckets = self._client.buckets_api().find_buckets().buckets
        for bucket in buckets:
            bucket_names.append(bucket.name)
        return bucket_names

    def add_new_bucket(self, bucket_name, create):
        if bucket_name not in self.get_bucket_names():
            if not create:
                raise ValueError(f'unknown bucket {bucket_name}')
            retention_rules = BucketRetentionRules(type='expire',
                                                   every_seconds=0)
            self._client.buckets_api().create_bucket(
                bucket_name=bucket_name,
                retention_rules=retention_rules, org=self._org)

    def get_measurements(self):
        return [r['_value'] for t in self._client.query_api().query(f"""
            import "influxdata/influxdb/schema"
            schema.measurements(bucket: "{self._bucket}")""") for r in t]

    def delete_measurement(self, measurement, start=None, stop=None):
        delete_api = self._client.delete_api()
        start, stop = abs_range(start, stop)
        if stop is None:
            stop = time.time() + DAY
        delete_api.delete(to_iso(start), to_iso(stop), f'_measurement="{measurement}"',
                          bucket=self._bucket, org=self._org)

    def delete_all_measurements(self, measurements=None, start=0, stop=None):
        if measurements is None:
            measurements = self.get_measurements()
        for meas in measurements:
            self.delete_measurement(meas, start, stop)

    def _get_rows(self, reader, as_tuple, first_row):
        row = first_row
        tableno = row[2]
        try:
            while 1:
                if row[0]:
                    first_row[:] = row
                    return
                if row[2] != tableno:
                    # table id changed: new table, store first row for next call
                    first_row[:] = row
                    return
                yield as_tuple(row)
                row = next(reader)
                if not row:
                    raise ValueError('EMPTY')
        except StopIteration:
            first_row.clear()  # indicate end of data

    # query the database

    def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
        """Returns queried data as InfluxDB tables

        :param start: start time (default: since ever)
        :param stop: end time (default: eternity = 1 year in the future)
        :param interval: if set, an aggregation filter will be applied. This will
            return only the latest values per time interval in seconds.
        :param single: when True (or 1), only the last value within the interval is returned
            (for any existing combinations of tags!)
            single=-1: return the first value instead
        :param columns: if given, return only these columns (in addition to '_time' and '_value')
        :param tags: selection criteria:

            <tag>=None
                return records independent of this tag.
                the obtained value will be contained in the result dict's key
            <tag>=[<value1>, <value2>, ...]
                return records where the tag is one from the list.
                the obtained value is contained in the result dict's key
            <tag>=<value>
                return only records with the given tag matching <value>.
                the obtained value is contained in the result dict's key only
                if the value is an instance of RegExp or when it contains an asterisk ('*')

        :return: a dict <tuple of key values> of <Table instance>

            Table is an extension of list, with some meta info
        """
        result = {}
        for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags):
            if single:
                result[key] = Single(*props, rows=rows)
            else:
                table = Table(*props, rows=rows)
                table.sort()
                result[key] = table
        return result

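    # Illustrative sketch (not part of the original module): a typical query
    # for the last day of numeric values; the measurement pattern and tag are
    # assumptions for demonstration only.
    #
    #     curves = db.query(-86400, _measurement='*.value', _field='float',
    #                       device=None)
    #     for key, table in curves.items():
    #         print(key, table.tags, table.column_names, len(table))
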
    def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
        """Returns queried data as a generator

        argument description: see query method

        :return: an iterator of (rows, key, (tags, key_names, column_names))

        remark: rows not consumed in between iteration steps are lost;
        using this generator version reduces memory usage
        """
        self.flush()
        start, stop = round_range(*abs_range(start, stop))
        msg = [f'from(bucket:"{self._bucket}")',
               f'|> range(start: {start}, stop: {stop})']

        keylist = []
        dropcols = ['_start', '_stop']
        fixed_tags = {}
        for key, crit in tags.items():
            if crit is None:
                keylist.append(key)
                continue
            if isinstance(crit, str):
                if isinstance(crit, RegExp) or '*' in crit:
                    keylist.append(key)
                    append_wildcard_filter(msg, key, [crit])
                    continue
                fixed_tags[key] = crit
                dropcols.append(key)
                crit = f'"{crit}"'
            elif isinstance(crit, (int, float)):
                fixed_tags[key] = crit
                dropcols.append(key)
            else:
                try:
                    keylist.append(key)
                    append_wildcard_filter(msg, key, crit)
                    continue
                except Exception:
                    raise ValueError(f'illegal value for {key}: {crit}')
            msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
        if single:
            if single < 0:
                msg.append('|> first(column: "_time")')
            else:
                msg.append('|> last(column: "_time")')
        if interval:
            msg.append(f'|> aggregateWindow(every: {interval:g}s, fn: last, createEmpty: false)')
        if columns is None:
            msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
        else:
            columns = ['_time', '_value'] + list(columns)
            msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''')

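        # Illustrative sketch (not part of the original module): for a call like
        # query(-3600, interval=60, _measurement='*.value', _field='float'),
        # the assembled Flux pipeline looks roughly like
        #
        #   from(bucket:"...")
        #   |> range(start: ..., stop: ...)
        #   |> filter(fn:(r) => r._measurement =~ /^(.*\.value)$/)
        #   |> filter(fn:(r) => r._field == "float")
        #   |> aggregateWindow(every: 60s, fn: last, createEmpty: false)
        #   |> drop(columns:["_start","_stop","_field"])
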
        msg = '\n'.join(msg)
        # print(msg)
        self.msg = msg

        try:
            reader = self._client.query_api().query_csv(msg)
        except Exception:
            print(msg)
            raise

        try:
            row = next(reader)
        except StopIteration:
            return
        converters = key_dict = table_properties = None  # make IDE happy
        while 1:
            header = {}
            if row[0]:  # this is a header
                header[row[0]] = row
                for row in reader:
                    if row:
                        if not row[0]:
                            break
                        header[row[0]] = row
                else:
                    return  # this should not happen
                # we are now at the row with the column names
                column_names = row
                converters = Converters(header['#datatype'])
                group = header['#group']
                keys = {k: None for k in keylist}

                for col, (name, grp) in enumerate(zip(column_names, group)):
                    if grp != 'true':
                        continue
                    if columns is None or name in keys:
                        keys[name] = col, converters.pop(col)
                none_keys = [k for k, v in keys.items() if v is None]
                if none_keys:
                    for k in none_keys:
                        keys.pop(k)
                    # break
                row = next(reader)
            # we are at the first data row
            key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
            column_keys = tuple(column_names[i] for i in converters)
            table_properties = {**fixed_tags, **key_dict}, tuple(keys), column_keys
            key = tuple(key_dict.values())
            row = list(row)  # copy row, as it will be modified
            rows = self._get_rows(reader, converters.as_tuple, row)
            yield rows, key, table_properties
            # consume unused rows
            consumed = sum(1 for _ in rows)
            if consumed:
                print('skip', consumed, 'rows')
            if not row:  # reader is at end
                return

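    # Illustrative sketch (not part of the original module): query_gen() yields
    # one row generator per result table without building large lists; rows
    # must be consumed before advancing to the next table.
    #
    #     for rows, key, (tags, key_names, col_names) in db.query_gen(
    #             -3600, _measurement='*.value', _field='float'):
    #         for point in rows:  # e.g. (timestamp, value)
    #             ...
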
    def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds):
        """get curves

        :param start: start time (default: since ever)
        :param stop: end time (default: eternity = 1 year in the future)
        :param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
        :param field: default 'float' (only numeric curves)
        :param interval: if given, the result is binned
        :param add_prev: amount of time to look back for the last previous point (default: 1 hr)
        :param add_end: whether to add an endpoint at stop time (default: False)
        :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases:
            one key (str): the name of the final key. result will be a dict <str> of <Table>
            a tuple of keys: the names of the final key elements. result: dict <tuple> of <Table>
        :param pivot: sort values into columns of one big table
        :param kwds: further selection criteria
        :return: a dict <key or tuple of key values> of <Table> or <Single>

            where <Table> is a list of tuples with some meta info (table.tags, table.column_names)
            and <Single> is a list (a single row of a table) with the same meta info

            when field='float' (the default), the returned values are either floats or None
        """
        tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None}
        tags.update(kwds)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, columns=None, **tags)
            # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags)
        else:
            result = {}
        start_row = {}
        if add_prev:
            prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
            for key, first in prev_data.items():
                curve = result.get(key)
                if first[1] is not None:
                    if curve:
                        if first[0] < curve[0][0]:
                            if pivot:
                                curve.insert(0, (rstart,) + tuple(first[1:]))
                                # start_row.setdefault(key[1:], {})[key[0]] = first[1]
                            else:
                                curve.insert(0, tuple(first))
                    else:
                        result[key] = table = Table(first.tags, first.key_names, first.column_names)
                        table.append(tuple(first))
        if add_end:
            self.complete(result, stop)
        if merge:
            single_merge = isinstance(merge, str)
            if single_merge:
                merge = [merge]
            rows = []
            common_tags = summarize_tags(result.values(), True)
            col_keys = {}
            col_info = {}
            for key, curve in result.items():
                merge_tags = {k: curve.tags.get(k, '') for k in merge}
                for k, v in zip(curve.key_names, key):
                    merge_tags.setdefault(k, v)
                merge_key = tuple(zip(*merge_tags.items()))  # (<keys tuple>, <values tuple>)
                info = col_info.get(merge_key)
                if info is None:
                    col_idx = len(col_keys) + 1
                    col_keys[col_idx] = merge_key
                    col_info[merge_key] = info = [col_idx, []]
                else:
                    col_idx = info[0]
                info[1].append(curve)
                assert curve.column_names[1] == '_value'
                for row in curve:
                    rows.append((row[0], row[1], col_idx))
                # merged.append((merge_key, curve))
            rows.sort(key=lambda x: x[0])
            if pivot:
                header = []
                for merge_key, (col_idx, curves) in col_info.items():
                    tags = summarize_tags(curves)
                    primary = tags.pop(merge[0])
                    header.append(' '.join([primary] + [f"{k}={v}" for k, v in tags.items() if k not in common_tags]))
                result = Table(common_tags, (), ('_time',) + tuple(header))
                values = [0] + [None] * len(col_keys)
                for row in rows:
                    col_nr = row[2]
                    values[col_nr] = row[1]
                    if row[0] > values[0]:
                        values[0] = row[0]
                        result.append(tuple(values))
                    elif row[0] < values[0]:
                        raise ValueError(f'{rstart} {values[0]} {row[0]}')
            else:
                result = {}
                by_idx = {}
                for merge_key, (col_idx, curves) in col_info.items():
                    tags = summarize_tags(curves)
                    primary = tags[merge[0]]
                    table = Table(tags, merge_key[0], ('_time', primary))
                    result[primary if single_merge else merge_key[1][:len(merge)]] = table
                    by_idx[col_idx] = table
                for row in rows:
                    by_idx[row[2]].append((row[0], row[1]))
        return result

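    # Illustrative sketch (not part of the original module): typical uses of
    # curves(); the measurement name is an assumption for demonstration only.
    #
    #     per_curve = db.curves(-3600, measurement='t.value')
    #     merged = db.curves(-3600, merge='_measurement', pivot=True)
    #
    # 'per_curve' maps key tuples to Table objects; 'merged' is one big Table
    # with a '_time' column followed by one column per merged curve.
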
    @staticmethod
    def complete(curve_dict, end_time=0, tag='stream'):
        """complete to end_time

        if end_time is not given, it is the max timestamp within the same stream
        """
        end_time_dict = {}
        if not end_time:
            for curve in curve_dict.values():
                key = curve.tags.get(tag)
                end_time_dict[key] = max(end_time_dict.get(key, 0), curve[-1][0])
        for curve in curve_dict.values():
            if len(curve):
                tlast, value = curve[-1]
                etime = end_time_dict.get(curve.tags.get(tag), end_time)
                if value is not None and tlast < etime:
                    curve.append((etime, value))

    def export(self, start, stop, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags):
        result = self.curves(start, stop, measurement, field, interval, add_prev, add_end,
                             merge=('_measurement', 'device', 'stream'), pivot=True, **tags)
        if timeoffset is None:
            timeoffset = int(start)
        result.tags.pop('_field', None)
        rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"]
        rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names))
        rows.extend(result.to_csv_rows(timeoffset, none=none))
        rows.append('')
        return '\n'.join(rows)

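    # Illustrative sketch (not part of the original module): export() returns a
    # tab separated text block, roughly of the form
    #
    #   # device=demo stream=demo.psi.ch:5000
    #   # col 0 _time
    #   # col 1 t.value
    #   # col 2 t.target
    #   0\t1.5\t2
    #   60\t1.6\t2
    #
    # with times relative to timeoffset (by default the requested start).
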
    # write to the database

    def _add_point(self, measurement, field, value, ts, tags):
        point = Point(measurement).field(f'{field}', value)
        if ts:
            point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
        for key, val in tags.items():
            point.tag(key, val)
        self._write_buffer.append(point)

    def write(self, measurement, field, value, ts, **tags):
        """add point and flush"""
        self._add_point(measurement, field, value, ts, tags)
        self.flush()

    def flush(self):
        """flush the write buffer"""
        points = self._write_buffer
        self._write_buffer = []
        if points:
            try:
                self._write_api_write(bucket=self._bucket, record=points)
            except TypeError:
                if self._write_api_write is None:
                    raise PermissionError('no write access - need access="write"') from None
                raise

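    # Illustrative sketch (not part of the original module): writing needs
    # access='write' (or 'create'); the measurement, field and tag below are
    # assumptions for demonstration only.
    #
    #     db = InfluxDBWrapper('http://localhost:8086', token, 'my-org',
    #                          'my-bucket', access='write')
    #     db.write('t.value', 'float', 1.5, time.time(), device='demo')
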
    # TODO: move these sehistory related methods to a subclass

    def add_float(self, value, key, tags, ts):
        # None is stored as -0.0 (converted back to None by PrettyFloat)
        self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags)

    def add_error(self, value, key, tags, ts):
        self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)

    def get_instrument(self, stream, ts=None, **tags):
        """get the instrument assigned to the given stream

        :return: (instrument, timestamp of the last entry) or (None, None)
        """
        if ts is None:
            ts = int(time.time())
        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
                           stream=stream, single=1, **tags)
        if reply:
            entry = sorted(reply.values(), key=lambda r: r[0])[-1]
            return entry.tags.get('instrument'), entry[0]
        return None, None

    def get_streams(self, instrument=None, ts=None, **tags):
        """get streams for one or all instruments

        :param instrument: None when looking for all instruments
        :param ts: the time or None when now
        :return: dict <stream> of <instrument> ('0' when the instrument is not known)
        """
        if ts is None:
            ts = int(time.time())
        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
                           single=1, instrument=instrument, **tags)
        all_entries = {}
        for entry in reply.values():
            all_entries.setdefault(entry.tags.get('stream'), []).append(entry)
        result = {}
        for stream, entries in all_entries.items():
            entry = sorted(entries, key=lambda r: r[0])[-1]
            if entry[1]:  # on=True
                result[stream] = entry.tags.get('instrument', '0')
        return result

    def set_instrument(self, stream, value, ts=None, guess=True, **tags):
        """set stream and instrument on or off

        :param stream: the uri of the stream
        :param value: instrument, "0" when unknown or None when switching to off
        :param ts: the time or None when now
        :param guess: when the instrument is undefined, take it from the previous entry
        """
        if ts is None:
            ts = time.time()
        prev, prevts = self.get_instrument(stream, ts, **tags)
        if prevts is not None:
            if prev in (None, '0'):
                ts = prevts + 0.001
            else:
                if value == '0' and guess:
                    value = prev
                if ts < prevts:
                    ts = prevts + 0.001
        tags['stream'] = stream
        if value:
            tags['instrument'] = value
            flag = True
        else:
            tags['instrument'] = prev or '0'
            flag = False
        self._add_point('_stream_', 'on', flag, ts, tags)

    def add_stream(self, value, tags, key, ts):
        self.set_instrument(key, value, ts, **tags)


def testdb():
    token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
    return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test')


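# Illustrative sketch (not part of the original module): a minimal round trip
# against a test database like the one returned by testdb(); access='write'
# and the measurement name are assumptions for demonstration only.
def _example_round_trip():
    db = InfluxDBWrapper('http://pc16392:8086', '<token>', 'linse', 'curve-test',
                         access='write')
    db.write('demo.value', 'float', 1.5, time.time(), device='demo')
    curves = db.curves(-60, measurement='demo.value')
    db.disconnect()
    return curves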