diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/feeder.py b/feeder.py index bb690bf..8dabab9 100644 --- a/feeder.py +++ b/feeder.py @@ -2,13 +2,13 @@ import sys from streams import EventStream from nicoscache import NicosStream from secop import ScanStream, ScanReply, send_fake_udp -from influx import InfluxDBWrapper +from . import SEHistory def main(): # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) egen = EventStream(ScanReply(), ScanStream()) - db = InfluxDBWrapper('linse-c') + db = SEHistory(access='write') db.enable_write_access() event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} diff --git a/influx.py b/influx.py index 04ed9cb..6e21d1d 100644 --- a/influx.py +++ b/influx.py @@ -21,8 +21,6 @@ # ***************************************************************************** import re import time -from pathlib import Path -from configparser import ConfigParser from datetime import datetime, timezone from math import floor, ceil @@ -31,6 +29,7 @@ from influxdb_client.client.write_api import SYNCHRONOUS DAY = 24 * 3600 YEAR = 366 * DAY +SINCE_EVER = YEAR + DAY # write_precision from digits after decimal point TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3 UNDEF = '' @@ -98,9 +97,9 @@ class Converters(dict): class Table(list): """a list of tuples with meta info""" - def __init__(self, tags={}, key_names=(), column_names=(), rows=None): + def __init__(self, tags=None, key_names=(), column_names=(), rows=None): super().__init__() - self.tags = tags + self.tags = tags or {} self.key_names = key_names self.column_names = column_names if rows: @@ -122,29 +121,13 @@ class Table(list): class Single(Table): """a single row of a table, as a list with meta info""" - def __init__(self, tags={}, key_names=(), column_names=(), rows=None): + def __init__(self, tags=None, key_names=(), column_names=(), rows=None): super().__init__(tags, key_names, column_names) if rows: single_row, = rows self[:] = single_row -def summarize_tags(curves, remove_multiple=False): - """summarize tags - - :param curves: list of curves (type Table) - :param remove_multiple: True: remove non-unique values - :return: dict of comma separated values - """ - result = {} - for curve in curves: - for k, v in curve.tags.items(): - result.setdefault(k, set()).add(str(v)) - if remove_multiple: - return {k: ','.join(v) for k, v in result.items() if len(v) == 1} - return {k: ','.join(v) for k, v in result.items()} - - class RegExp(str): """indicates, tht this string should be treated as regexp @@ -174,7 +157,7 @@ class CurveDict(dict): def abs_range(start=None, stop=None): now = time.time() if start is None: # since ever - start = 0 + start = SINCE_EVER elif start < YEAR: start = int(now + start) if stop is None: @@ -203,23 +186,14 @@ class InfluxDBWrapper: def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'): """initialize - :param uri: the uri for the influx DB or a name to look up in ~/.sehistory - :param token: the token + :param uri: the uri for the influx DB or a filepath for a config file + :param token: the token or the section in the config file :param org: the organisation :param bucket: the bucket name :param access: 'readonly', 'write' (RW) or 'create' (incl. RW) """ - if ':' in uri: - args = uri, token, org, bucket - else: - parser = ConfigParser() - parser.optionxform = str - parser.read([Path('~').expanduser() / '.sehistory']) - section = parser[uri] - args = [section[k] for k in ('uri', 'token', 'org', 'bucket')] - self._url, self._token, self._org, self._bucket =args - self._client = InfluxDBClient(url=self._url, token=self._token, - org=self._org) + self._url, self._token, self._org, self._bucket = uri, token, org, bucket + self._client = InfluxDBClient(url=uri, token=token, org=org) if access != 'readonly': self.enable_write_access() self._deadline = 0 @@ -228,7 +202,7 @@ class InfluxDBWrapper: self._write_buffer = [] self._alias = {} print('InfluxDBWrapper', self._url, self._org, self._bucket) - self.debug_reply = False + self.debug = False def enable_write_access(self): self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write @@ -296,7 +270,6 @@ class InfluxDBWrapper: except StopIteration: first_row.clear() # indicate end of data - # query the database def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): @@ -390,7 +363,8 @@ class InfluxDBWrapper: msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''') msg = '\n'.join(msg) - # print(msg) + if self.debug: + print(msg) self.msg = msg try: @@ -399,7 +373,7 @@ class InfluxDBWrapper: print(msg) raise - if self.debug_reply: + if self.debug: def readdebug(reader): for row in reader: print(row) @@ -409,7 +383,7 @@ class InfluxDBWrapper: row = next(reader) except StopIteration: return - converters = key_dict = table_properties = None # make IDE happy + converters = keys = column_names = None # make IDE happy while 1: header = {} if row[0]: # this is a header @@ -453,141 +427,6 @@ class InfluxDBWrapper: if not row: # reader is at end return - def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', - interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds): - """get curves - - :param start: start time (default: since ever) - :param stop: end time (default: eternity = 1 year in the future) - :param measurement: '.' (default: ['*.value', '*.target']) - :param field: default 'float' (only numeric curves) - :param interval: if given, the result is binned - :param add_prev: amount of time to look back for the last previous point (default: 1 hr) - :param add_end: whether to add endpoint at stop time (default: False) - :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases: - one key (str): the name of final key. result will be a dict of of - a tuple of keys: the names of the final key elements. result: dict of of Table - :param pivot: sort values in to columns of one big table - :param kwds: further selection criteria - :return: a dict of
or - - where
is a list of tuples with some meta info (table.tags, table.column_names) - and is a list (a single row of a table) with the same meta info - - when _field='float' (the default), the returned values are either a floats or None - """ - tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None} - tags.update(kwds) - start, stop = abs_range(start, stop) - rstart, rstop = round_range(start, stop, interval) - if rstart < rstop: - result = self.query(rstart, rstop, interval, columns=None, **tags) - # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags) - else: - result = {} - start_row = {} - if add_prev: - prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) - for key, first in prev_data.items(): - curve = result.get(key) - if first[1] is not None: - if curve: - if first[0] < curve[0][0]: - if pivot: - curve.insert(0, (rstart,) + tuple(first[1:])) - # start_row.setdefault(key[1:], {})[key[0]] = first[1] - else: - curve.insert(0, tuple(first)) - else: - result[key] = table = Table(first.tags, first.key_names, first.column_names) - table.append(tuple(first)) - if add_end: - self.complete(result, stop) - if merge: - single_merge = isinstance(merge, str) - if single_merge: - merge = [merge] - rows = [] - common_tags = summarize_tags(result.values(), True) - col_keys = {} - col_info = {} - for key, curve in result.items(): - merge_tags = {k: curve.tags.get(k, '') for k in merge} - for k, v in zip(curve.key_names, key): - merge_tags.setdefault(k, v) - merge_key = tuple(zip(*merge_tags.items())) # (, ) - info = col_info.get(merge_key) - if info is None: - col_idx = len(col_keys) + 1 - col_keys[col_idx] = merge_key - col_info[merge_key] = info = [col_idx, []] - else: - col_idx = info[0] - info[1].append(curve) - assert curve.column_names[1] == '_value' - for row in curve: - rows.append((row[0], row[1], col_idx)) - # merged.append((merge_key, curve)) - rows.sort(key=lambda x: x[0]) - if pivot: - header = [] - for merge_key, (col_idx, curves) in col_info.items(): - tags = summarize_tags(curves) - primary = tags.pop(merge[0]) - header.append(' '.join([primary] + [f"{k}={v}" for k, v in tags.items() if k not in common_tags])) - result = Table(common_tags, (), ('_time',) + tuple(header)) - values = [0] + [None] * len(col_keys) - for row in rows: - col_nr = row[2] - values[col_nr] = row[1] - if row[0] > values[0]: - values[0] = row[0] - result.append(tuple(values)) - elif row[0] < values[0]: - raise ValueError(f'{rstart} {values[0]} {row[0]}') - else: - result = {} - by_idx = {} - for merge_key, (col_idx, curves) in col_info.items(): - tags = summarize_tags(curves) - primary = tags[merge[0]] - table = Table(tags, merge_key[0], ('_time', primary)) - result[primary if single_merge else merge_key[1][:len(merge)]] = table - by_idx[col_idx] = table - for row in rows: - by_idx[row[2]].append((row[0], row[1])) - return result - - @staticmethod - def complete(curve_dict, end_time=0, tag=None): - """complete to end_time - - if tag is given, end_time is a dict of - """ - if tag is None: - end_time_dict = {} - else: - end_time_dict, end_time = end_time, 0 - for curve in curve_dict.values(): - if len(curve): - tlast, value = curve[-1] - etime = end_time_dict.get(curve.tags.get(tag), end_time) - if value is not None and tlast < etime: - curve.append((etime, value)) - - def export(self, start, stop, measurement=('*.value', '*.target'), field='float', - interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags): - result = self.curves(start, stop, measurement, field, interval, add_prev, add_end, - merge=('_measurement', 'device', 'stream'), pivot=True, **tags) - if timeoffset is None: - timeoffset = int(start) - result.tags.pop('_field', None) - rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"] - rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names)) - rows.extend(result.to_csv_rows(timeoffset, none=none)) - rows.append('') - return '\n'.join(rows) - # write to the database def _add_point(self, measurement, field, value, ts, tags): @@ -614,75 +453,3 @@ class InfluxDBWrapper: if self._write_api_write is None: raise PermissionError('no write access - need access="write"') from None raise - - # TODO: move these sehistory related methods to a subclass - def add_float(self, value, key, tags, ts): - self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags) - - def add_error(self, value, key, tags, ts): - self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) - - def get_instrument(self, stream, ts=None, **tags): - if ts is None: - ts = int(time.time()) - reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', - stream=stream, single=1, **tags) - if reply: - entry = sorted(reply.values(), key=lambda r: r[0])[-1] - return entry.tags.get('instrument'), entry[0] - return None, None - - def get_streams(self, instrument=None, ts=None, **tags): - """get streams for one or all instruments - - :param instrument: None when looking for all instruments - :param ts: the time or None when now - :return: dict of or '0' when instrument is not known - """ - if ts is None: - ts = int(time.time()) - reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', - single=1, instrument=instrument, **tags) - all_entries = {} - for entry in reply.values(): - all_entries.setdefault(entry.tags.get('stream'), []).append(entry) - result = {} - for stream, entries in all_entries.items(): - entry = sorted(entries, key=lambda r: r[0])[-1] - if entry[1]: # on=True - result[stream] = entry.tags - return result - - def set_instrument(self, stream, value, ts=None, guess=True, **tags): - """set stream and instrument on or off - - :param stream: the uri of the stream - :param value: instrument, "0" when unknown or None when switching to off - :param ts: the time or None when now - :param guess: when instrument is undefined, take from previous - """ - prev, prevts = self.get_instrument(stream, ts, **tags) - if prevts is not None: - if prev in (None, '0'): - ts = prevts + 0.001 - else: - if value == '0' and guess: - value = prev - if ts < prevts: - ts = prevts + 0.001 - tags['stream'] = stream - if value: - tags['instrument'] = value - flag = True - else: - tags['instrument'] = prev or '0' - flag = False - self._add_point('_stream_', 'on', flag, ts, tags) - - def add_stream(self, value, tags, key, ts): - self.set_instrument(key, value, ts, **tags) - - -def testdb(): - token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" - return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test') diff --git a/secop.py b/secop.py index 30dcbe8..f84a474 100644 --- a/secop.py +++ b/secop.py @@ -122,8 +122,8 @@ class UdpStream(Base): continue if kind == 'for_other_node': uri = msg.pop('uri') - if 'device' not in msg: - msg['device'] = uri.split('://', 1)[-1].split(':')[0] + # if 'device' not in msg: + # msg['device'] = uri.split('://', 1)[-1].split(':')[0] kwargs = msg elif kind == 'node': uri = f"{addr[0]}:{msg['port']}" diff --git a/seinflux.py b/seinflux.py new file mode 100644 index 0000000..bf2e9a2 --- /dev/null +++ b/seinflux.py @@ -0,0 +1,288 @@ +import time +from pathlib import Path +from configparser import ConfigParser +from influx import InfluxDBWrapper, abs_range, round_range, Table + + +def summarize_tags(curves, remove_multiple=False): + """summarize tags + + :param curves: list of curves (type Table) + :param remove_multiple: True: remove non-unique values + :return: dict of comma separated values + """ + result = {} + for curve in curves: + for k, v in curve.tags.items(): + result.setdefault(k, set()).add(str(v)) + if remove_multiple: + return {k: ','.join(v) for k, v in result.items() if len(v) == 1} + return {k: ','.join(v) for k, v in result.items()} + + +class SEHistory(InfluxDBWrapper): + def __init__(self, dbname=None, instrument=None, access='readonly'): + self.instrument = instrument + parser = ConfigParser() + parser.optionxform = str + parser.read([Path('~/.sehistory').expanduser()]) + section = parser[dbname] if dbname else parser[parser.sections()[0]] + super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access) + + def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', + interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds): + """get curves + + :param start: start time (default: since ever) + :param stop: end time (default: eternity = 1 year in the future) + :param measurement: '.' (default: ['*.value', '*.target']) + :param field: default 'float' (only numeric curves) + :param interval: if given, the result is binned + :param add_prev: amount of time to look back for the last previous point (default: 1 hr) + :param add_end: whether to add endpoint at stop time (default: False) + :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases: + one key (str): the name of final key. result will be a dict of of
+ a tuple of keys: the names of the final key elements. result: dict of of Table + :param pivot: sort values in to columns of one big table + :param kwds: further selection criteria + :return: a dict of
or + + where
is a list of tuples with some meta info (table.tags, table.column_names) + and is a list (a single row of a table) with the same meta info + + when _field='float' (the default), the returned values are either a floats or None + """ + tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None} + tags.update(kwds) + start, stop = abs_range(start, stop) + rstart, rstop = round_range(start, stop, interval) + if rstart < rstop: + result = self.query(rstart, rstop, interval, columns=None, **tags) + # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags) + else: + result = {} + start_row = {} + if add_prev: + prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) + for key, first in prev_data.items(): + curve = result.get(key) + if first[1] is not None: + if curve: + if first[0] < curve[0][0]: + if pivot: + curve.insert(0, (rstart,) + tuple(first[1:])) + # start_row.setdefault(key[1:], {})[key[0]] = first[1] + else: + curve.insert(0, tuple(first)) + else: + result[key] = table = Table(first.tags, first.key_names, first.column_names) + table.append(tuple(first)) + if add_end: + self.complete(result, stop) + if merge: + single_merge = isinstance(merge, str) + if single_merge: + merge = [merge] + rows = [] + common_tags = summarize_tags(result.values(), True) + col_keys = {} + col_info = {} + for key, curve in result.items(): + merge_tags = {k: curve.tags.get(k, '') for k in merge} + for k, v in zip(curve.key_names, key): + merge_tags.setdefault(k, v) + merge_key = tuple(zip(*merge_tags.items())) # (, ) + info = col_info.get(merge_key) + if info is None: + col_idx = len(col_keys) + 1 + col_keys[col_idx] = merge_key + col_info[merge_key] = info = [col_idx, []] + else: + col_idx = info[0] + info[1].append(curve) + assert curve.column_names[1] == '_value' + for row in curve: + rows.append((row[0], row[1], col_idx)) + # merged.append((merge_key, curve)) + rows.sort(key=lambda x: x[0]) + if pivot: + header = [] + for merge_key, (col_idx, curves) in col_info.items(): + tags = summarize_tags(curves) + primary = tags.pop(merge[0], None) + primary = [primary] if primary else [] + header.append(' '.join(primary + [f"{k}={v}" for k, v in tags.items() if k not in common_tags])) + result = Table(common_tags, (), ('_time',) + tuple(header)) + values = [0] + [None] * len(col_keys) + for row in rows: + col_nr = row[2] + values[col_nr] = row[1] + if row[0] > values[0]: + values[0] = row[0] + result.append(tuple(values)) + elif row[0] < values[0]: + raise ValueError(f'{rstart} {values[0]} {row[0]}') + else: + result = {} + by_idx = {} + for merge_key, (col_idx, curves) in col_info.items(): + tags = summarize_tags(curves) + print('TAGS', tags) + primary = tags.get(merge[0], '_value') + table = Table(tags, merge_key[0], ('_time', primary)) + result[primary if single_merge else merge_key[1][:len(merge)]] = table + by_idx[col_idx] = table + for row in rows: + by_idx[row[2]].append((row[0], row[1])) + return result + + @staticmethod + def complete(curve_dict, end_time=0, tag=None): + """complete to end_time + + if tag is given, end_time is a dict of + """ + if tag is None: + end_time_dict = {} + else: + end_time_dict, end_time = end_time, 0 + for curve in curve_dict.values(): + if len(curve): + tlast, value = curve[-1] + etime = end_time_dict.get(curve.tags.get(tag), end_time) + if value is not None and tlast < etime: + curve.append((etime, value)) + + def export(self, start, stop, measurement=('*.value', '*.target'), field='float', + interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags): + result = self.curves(start, stop, measurement, field, interval, add_prev, add_end, + merge=('_measurement', 'device', 'stream'), pivot=True, **tags) + if timeoffset is None: + timeoffset = int(start) + result.tags.pop('_field', None) + rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"] + rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names)) + rows.extend(result.to_csv_rows(timeoffset, none=none)) + rows.append('') + return '\n'.join(rows) + + def add_float(self, value, key, tags, ts): + self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags) + + def add_error(self, value, key, tags, ts): + self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) + + def get_instrument(self, stream, ts=None, **tags): + if ts is None: + ts = int(time.time()) + reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', + stream=stream, single=1, **tags) + if reply: + entry = sorted(reply.values(), key=lambda r: r[0])[-1] + return entry.tags.get('instrument', self.instrument), entry[0] + return None, None + + def get_streams(self, instrument=None, ts=None, **tags): + """get streams for one or all instruments + + :param instrument: None when looking for all instruments + :param ts: the time or None when now + :return: dict of or '0' when instrument is not known + """ + if ts is None: + ts = int(time.time()) + reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', + single=1, instrument=instrument, **tags) + all_entries = {} + for entry in reply.values(): + all_entries.setdefault(entry.tags.get('stream'), []).append(entry) + result = {} + for stream, entries in all_entries.items(): + entry = sorted(entries, key=lambda r: r[0])[-1] + if entry[1]: # on=True + result[stream] = entry.tags + return result + + def get_experiments(self, start=None, end=None, **tags): + """get experiments (periods with the same device/stream/instrument combination) + + :param start, end: the time period + :return: list of tuple(, , ) + """ + + interval = 1 + gap = 600 + entries = {} + end = time.time() + 60 + for rows, key, (tags, _, _) in self.query_gen( + start, end, _measurement='_stream_', _field='on', interval=interval, + stream=None, device=None, instrument=None): + stream = tags.get('stream') + instrument = tags.get('instrument') + device = tags.get('device') + elist = entries.setdefault(instrument or stream, []) + for row in rows: + elist.append(row[:2] + (stream, device)) + result = [] + for ins, rows in entries.items(): + rows.sort() + current = {} # dict of [, , ] + chunks = [current] + for ts, flag, stream, device in rows: + if flag: + prev = current.get(stream) + if prev: + if device == prev[0] and prev[1] + gap < ts < prev[2] + gap: + prev[2] = ts + continue + current = {} + chunks.append(current) + current[stream] = [device or stream, ts, end] + else: + prev = current.get(stream) + if prev: + prev[2] = ts + prevchange = 0 + prevdevices = {} # dict of + for chunk in chunks: + if chunk: + devices = {k: v[0] for k, v in chunk.items() if v[0]} + start = min(t[1] for t in chunk.values()) + if start > prevchange + gap or any(v != devices.get(k) for k, v in prevdevices.items()): + prevchange = start + prevdevices = devices + result.append((start, max(t[2] for t in chunk.values()), ins, devices)) + result.sort() + for start, end, ins, devices in result: + print(' .. '.join(time.strftime('%Y-%m-%d-%H:%M', time.localtime(t)) for t in (start, end)), + ins, devices) + return result + + def set_instrument(self, stream, value, ts=None, guess=True, **tags): + """set stream and instrument on or off + + :param stream: the uri of the stream + :param value: instrument, "0" when unknown or None when switching to off + :param ts: the time or None when now + :param guess: when instrument is undefined, take from previous + """ + prev, prevts = self.get_instrument(stream, ts, **tags) + if prevts is not None: + if prev in (None, '0'): + ts = prevts + 0.001 + else: + if value == '0' and guess: + value = prev + if ts < prevts: + ts = prevts + 0.001 + tags['stream'] = stream + if value: + tags['instrument'] = value + flag = True + else: + tags['instrument'] = prev or '0' + flag = False + self._add_point('_stream_', 'on', flag, ts, tags) + + def add_stream(self, value, tags, key, ts): + self.set_instrument(key, value, ts, **tags) diff --git a/streams.py b/streams.py index 05cb8ff..c0ff36d 100644 --- a/streams.py +++ b/streams.py @@ -32,9 +32,12 @@ def short_hostname(host): - treat case where -129129xxxx is appended """ host = socket.gethostbyaddr(host)[0] - match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) - if match: - host = match.group(1) + (match.group(2) or '') + if host == 'localhost': + host = socket.gethostname() + else: + match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) + if match: + host = match.group(1) + (match.group(2) or '') return host @@ -246,8 +249,9 @@ class EventStream: # note: a stream with buffered content might not be ready to emit any event, because # of filtering - def __init__(self, *udp, **streams): + def __init__(self, *udp, instrument=None, **streams): self.streams = streams + self.instrument = instrument self.udp = {v.socket.fileno(): v for v in udp} def wait_ready(self, timeout): diff --git a/t.py b/t.py index 80ec610..b69926d 100644 --- a/t.py +++ b/t.py @@ -1,13 +1,14 @@ import time import math import numpy as np -from influx import InfluxDBWrapper, RegExp +from sehistory.sehistory import SEHistory +from influx import RegExp DAY = 24 * 3600 # token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" -db = InfluxDBWrapper('linse-c') +db = SEHistory(access='write') print(""" qry([start], [stop], [interval=...,] [last=True,] [columns=[...],] [=, ] ...)