diff --git a/feeder.py b/feeder.py index 214dd95..95e47ed 100644 --- a/feeder.py +++ b/feeder.py @@ -1,21 +1,23 @@ -from streams import Stream, EventGenerator +from streams import EventStream from nicoscache import NicosStream from secop import ScanStream, ScanReply from influx import testdb def main(): - e = EventGenerator(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) - e.return_on_wait = False + # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) + egen = EventStream(ScanReply(), ScanStream()) db = testdb() - errors = {} - while 1: - for (obj, param), value, error, ts, tags in e: - meas = f'{obj}.{param}' - db.write_float(meas, 'float', value, ts, **tags) - if error and error != errors.get(meas): - errors[meas] = error - db.write_string(meas, 'error', error, ts, **tags) - print('---') + db.enable_write_access() + try: + while 1: + for event in egen.get_events(): + db.add_point(*event) + db.flush() + finally: + for event in egen.finish(): + db.add_point(*event) + db.disconnect() + main() diff --git a/influx.py b/influx.py index c9760d2..d4abaf4 100644 --- a/influx.py +++ b/influx.py @@ -16,33 +16,94 @@ # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: -# Konstantin Kholostov +# Markus Zolliker # # ***************************************************************************** import re import time -import threading -import queue -from math import floor, ceil, copysign from datetime import datetime -import pandas as pd +from math import floor, ceil from influxdb_client import InfluxDBClient, BucketRetentionRules, Point -from influxdb_client.client.write_api import SYNCHRONOUS as write_option +from influxdb_client.client.write_api import SYNCHRONOUS -OFFSET, SCALE = pd.to_datetime(['1970-01-01 00:00:00', '1970-01-01 00:00:01']).astype(int) DAY = 24 * 3600 # write_precision from digits after decimal point TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3 +try: + parse_time = datetime.fromisoformat +except AttributeError: + from dateutil.parser import parse as parse_time + + +def to_time(v): + return parse_time(v).timestamp() + + def identity(v): return v -def negnull2none(v): - """converts -0.0 to None, returns argument for everything else, also strings""" - return None if v == 0 and copysign(1, v) == -1 else v +def double(v): + return None if v == '-0' else float(v) + + +CONVERTER = { + 'string': identity, + 'long': int, + 'double': double, + 'unsigned_long': int, + 'duration': int, + 'dateTime:RFC3339': to_time, + 'dateTime:RFC3339Nano': to_time, + # 'base64Binary': base64.b64decode, +} + + +class NamedTuple(tuple): + """for our purpose improved version of collection.namedtuple + + - names may be any string, but when not an identifer, attribute access is not possible + - access by key with get ([ ] is for indexed access) + + Usage: + + MyNamedTuple = NamedTuple.make_class(('a', 'b')) + x = MyNamedTuple(('a', 2.0)) + assert x == ('a', 2.0) == (x.a, x.b) == (x.get('a'), x.get('b')) == (x[0], x[1]) + """ + keys = None + _idx_by_name = None + + def __new__(cls, keys): + """create NamedTuple class from keys + + :param keys: a sequence of names for the elements + """ + idxbyname = {n: i for i, n in enumerate(keys)} + attributes = {n: property(lambda s, idx=i: s[idx]) + for i, n in enumerate(keys) + if n.isidentifier() and not hasattr(cls, n)} + # clsname = '_'.join(attributes) + attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__, keys=tuple(keys)) + return type(f"NamedTuple", (cls,), attributes) + + def get(self, key, default=None, strict=False): + """get item by key + + :param key: the key + :param default: value to return when key does not exist + :param strict: raise KeyError when key does not exist and ignore default + :return: the value of requested element or default if the key does not exist + """ + try: + return self[self._idx_by_name[key]] + except KeyError: + if strict: + raise + return default class RegExp(str): @@ -53,14 +114,17 @@ class RegExp(str): """ -def wildcard_filter(key, names): +def append_wildcard_filter(msg, key, names): patterns = [] for pattern in names: if isinstance(pattern, RegExp): patterns.append(pattern) else: - patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*'))) - return f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)' + # patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*'))) + patterns.append('.*'.join(re.escape(v) for v in pattern.split('*'))) + if patterns: + pattern = '|'.join(patterns) + msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)') class CurveDict(dict): @@ -68,54 +132,7 @@ class CurveDict(dict): return [] -class NamedTuple(tuple): - """for our purpose improved version of collection.namedtuple - - - names may be any string, but when not an identifer, no attribute access - - dict like access with [] - - customized converter (or validator) function for initialization - - Usage: - - MyNamedTuple1 = NamedTuple('a', 'b') - x = MyNamedTuple1(1, 'y') - assert x == (1, 'y') == (x.a, x.b) - MyNamedTuple2 = NamedTuple(a=str, b=float) - y = MyNamedTuple2(10, b='2') - assert y == ('10', 2) == (y['a'], y['b']) - """ - _indices = None - _converters = None - - def __new__(cls, *args, **kwds): - if cls is NamedTuple: - return cls.getcls(dict({a: identity for a in args}, **kwds)) - values = dict(zip(cls._converters, args), **kwds) - elements = [] - for key, cvt in cls._converters.items(): - try: - elements.append(cvt(values[key])) - except KeyError: - elements.append(None) - return super().__new__(cls, elements) - - def __getitem__(self, key): - try: - return tuple(self)[key] - except Exception: - return tuple(self)[self._indices[key]] - - @classmethod - def getcls(cls, converters): - attributes = {'_converters': converters, - '_indices': {k: i for i, k in enumerate(converters)}} - for idx, name in enumerate(converters): - if name.isidentifier(): - attributes[name] = property(lambda s, i=idx: s[i]) - return type('NamedTuple', (cls,), attributes) - - -def abs_range(start=None, stop=None, interval=None): +def abs_range(start=None, stop=None): now = time.time() if start is None: start = int(now - 32 * DAY) @@ -142,64 +159,38 @@ class InfluxDBWrapper: (work of Konstantin Kholostov ) """ _update_queue = None - _write_thread = None + _write_api_write = None - def __init__(self, url, token, org, bucket, threaded=False, create=False, watch_interval=300): - self._watch_interval = watch_interval + def __init__(self, url, token, org, bucket, access='readonly'): + """initialize + + :param url: the url for the influx DB + :param token: the token + :param org: the organisation + :param bucket: the bucket name + :param access: 'readonly', 'write' (RW) or 'create' (incl. RW) + """ self._url = url self._token = token self._org = org self._bucket = bucket self._client = InfluxDBClient(url=self._url, token=self._token, org=self._org) - self._write_api_write = self._client.write_api(write_options=write_option).write + if access != 'readonly': + self.enable_write_access() self._deadline = 0 - self._active_streams = {} self.set_time_precision(3) - self.add_new_bucket(self._bucket, create) - if threaded: - self._update_queue = queue.Queue(100) - self._write_thread = threading.Thread(target=self._write_thread) + self.add_new_bucket(self._bucket, access == 'create') + self._write_buffer = [] - def _write(self, point): - if self._update_queue: - self._update_queue.put(point) - else: - self._write_api_write(bucket=self._bucket, record=[point]) - - def _write_thread(self): - while 1: - points = [self.write_queue.get()] - try: - while 1: - points.append(self.write_queue.get(False)) - except queue.Empty: - pass - self._write_api_write(bucket=self._bucket, record=points) - event = self._wait_complete - if event: - self._wait_complete = None - event.set() - - def flush(self): - if self._write_thread: - self._wait_complete = event = threading.Event() - event.wait() + def enable_write_access(self): + self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write def set_time_precision(self, digits): self.timedig = max(0, min(digits, 9)) self._write_precision = TIME_PRECISION[self.timedig] - def to_timestamp(self, timevalue): - return round(timevalue.timestamp(), self.timedig) - def disconnect(self): - for _ in range(10): - self._write_thread.join(1) - for stream, last in self._active_streams.items(): - self._write(Point('_streams_') - .time(last, write_precision=self._write_precision) - .field('interval', 0).tag('stream', stream)) self.flush() self._client.close() @@ -231,138 +222,209 @@ class InfluxDBWrapper: bucket=self._bucket, org=self._org) def delete_all_measurements(self): - all = self.get_measurements() - for meas in all: - self.get_measurements(meas) - print('deleted', all) + measurements = self.get_measurements() + for meas in measurements: + self.delete_measurement(meas) + print('deleted', measurements) - def query(self, start=None, stop=None, interval=None, last=False, **tags): + # query the database + + def query(self, start=None, stop=None, interval=None, last=False, columns=None, **tags): """Returns queried data as InfluxDB tables :param start: start time. default is a month ago :param stop: end time, default is tomorrow at the same time - :param interval: is set an aggregation filter will be applied. This will - return only the latest values for a time interval in seconds. + :param interval: if set an aggregation filter will be applied. This will + return only the latest values per time interval in seconds. :param last: when True, only the last value within the interval is returned (for any existing combinations of tags!) + :param columns: if given, return only these columns (in addition to '_time' and '_value') :param tags: selection criteria: =None - return records independent of the tag. the value will be contained in the result dicts key + return records independent of this tag. the value will be contained in the result dicts key =[, , ...] - return records with given tag from the list. the value is contained in the result dicts key + return records where tag is one from the list. the value is contained in the result dicts key >=: return only records with given tag matching . the value is not part of the results key - = - where is a callable. return tag as value in returned rows. is used for value conversion - :return: a dict of list of > + :return: a dict of list of where and are NamedTuple """ self.flush() - start, stop = round_range(*abs_range(start, stop), interval) + start, stop = round_range(*abs_range(start, stop)) msg = [f'from(bucket:"{self._bucket}")', f'|> range(start: {start}, stop: {stop})'] - columns = {'_time': self.to_timestamp, '_value': negnull2none} keynames = [] + dropcols = ['_start', '_stop'] for key, crit in tags.items(): if crit is None: keynames.append(key) continue - if callable(crit): - columns[key] = crit - continue if isinstance(crit, str): if isinstance(crit, RegExp) or '*' in crit: keynames.append(key) - msg.append(wildcard_filter(key, [crit])) + append_wildcard_filter(msg, key, [crit]) continue + dropcols.append(key) crit = f'"{crit}"' - elif not isinstance(crit, (int, float)): + elif isinstance(crit, (int, float)): + dropcols.append(key) + else: try: keynames.append(key) - msg.append(wildcard_filter(key, crit)) + append_wildcard_filter(msg, key, crit) continue except Exception: raise ValueError(f'illegal value for {key}: {crit}') msg.append(f'|> filter(fn:(r) => r.{key} == {crit})') - if last: msg.append('|> last(column: "_time")') if interval: msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)') - if columns is not None: - msg.append(f'''|> keep(columns:["{'","'.join(list(columns) + keynames)}"])''') + if columns is None: + msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''') + else: + columns = ['_time', '_value'] + list(columns) + msg.append(f'''|> keep(columns:["{'","'.join(columns + keynames)}"])''') + msg = '\n'.join(msg) print(msg) - tables = self._client.query_api().query(msg) - result = CurveDict() - keycls = NamedTuple(*keynames) - colcls = NamedTuple(**columns) - for table in tables: - key = None - for rec in table: - print(rec.values) - if key is None: - key = keycls(**rec.values) - result.setdefault(key, []) - data = colcls(**rec.values) - result[key].append(data) - result[key].sort(key=lambda v: v[0]) - print('---') + + reader = self._client.query_api().query_csv(msg) + sort = False + converters = None + group = None + column_names = None + key = None + result = {} + table = None + + for row in reader: + if not row: + continue + if row[0]: + if row[0] == '#datatype': + converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2} + column_names = None + elif row[0] == '#group': + group = row + continue + if column_names is None: + column_names = row + keys = {} + for col, (name, grp) in enumerate(zip(column_names, group)): + if grp != 'true': + continue + # if name in keynames or (columns is None and name not in dropcols): + if name in keynames or columns is None: + keys[col] = converters.pop(col) + else: + sort = True + valuecls = NamedTuple([row[i] for i in converters]) + keycls = NamedTuple([row[i] for i in keys]) + continue + if row[2] != table: + # new table, new key + table = row[2] + key = keycls(f(row[i]) for i, f in keys.items()) + if result.get(key) is None: + result[key] = [] + elif not sort: + # this should not happen + sort = True + + result[key].append(valuecls(f(row[i]) for i, f in converters.items())) + if last: + for key, table in result.items(): + result[key], = table + elif sort: + for table in result.values(): + table.sort() return result def curves(self, start=None, stop=None, measurement=None, field='float', interval=None, - add_prev=True, **tags): + add_prev=3600, add_end=False, **tags): + """get curves + :param start: start time (default: one month ago) + :param stop: end time (default: tomorrow) + :param measurement: '.' (default: ['*.value', '*.target']) + :param field: default 'float' (only numeric curves) + :param interval: if given, the result is binned + :param add_prev: amount of time to look back for the last previous point (default: 1 hr) + :param add_end: whether to add endpoint at stop time (default: False) + :param tags: further selection criteria + :return: a dict of list of + where and are NamedTuple + is (, ) + + when field='float' (the default), the returned values are either a floats or None + """ for key, val in zip(('_measurement', '_field'), (measurement, field)): tags.setdefault(key, val) start, stop = abs_range(start, stop) rstart, rstop = round_range(start, stop, interval) if rstart < rstop: - result = self.query(rstart, rstop, interval, **tags) + result = self.query(rstart, rstop, interval, columns=[], **tags) else: result = {} if add_prev: - prev = self.query(rstart - DAY, rstart, last=True, **tags) - for key, prev in prev.items(): + prev_data = self.query(rstart - add_prev, rstart, last=True, **tags) + for key, first in prev_data.items(): curve = result.get(key) - first = prev[-1] if first[1] is not None: if curve: if first[0] < curve[0][0]: curve.insert(0, first) else: result[key] = [first] + if add_end: + for key, curve in result.items(): + if curve: + last = list(curve[-1]) + if last[0] < stop: + last[0] = stop + curve.append(type(curve[-1])(last)) return result - def write(self, measurement, field, value, ts, **tags): - self._active_streams[tags.get('stream')] = ts - if ts > self._deadline: - dl = ts // self._watch_interval * self._watch_interval - for stream, last in self._active_streams.items(): - self._write( - Point('_streams_') - .time(datetime.utcfromtimestamp(last), write_precision=self._write_precision) - .field('interval', self._watch_interval).tag('stream', stream)) - self._active_streams.clear() - self._deadline = dl + self._watch_interval + # write to the database + + def _add_point(self, value, ts, measurement, field, tags): point = Point(measurement).field(f'{field}', value) if ts: point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision) for key, val in tags.items(): point.tag(key, val) - self._write(point) + self._write_buffer.append(point) - def write_float(self, measurement, field, value, ts, **tags): - # make sure value is float - value = -0.0 if value is None else float(value) - self.write(measurement, field, value, ts, **tags) + def write(self, measurement, field, value, ts, **tags): + """add point and flush""" + self._add_point(measurement, field, value, ts, tags) + self.flush() - def write_string(self, measurement, field, value, ts, **tags): - # make sure value is string - value = '' if value is None else str(value) - self.write(measurement, field, value, ts, **tags) + def flush(self): + """flush write buffer""" + points = self._write_buffer + self._write_buffer = [] + if points: + try: + self._write_api_write(bucket=self._bucket, record=points) + except TypeError as e: + if self._write_api_write is None: + raise PermissionError('no write access - need access="write"') from None + raise + + def add_point(self, isfloat, value, *args): + """add point to the buffer + + flush must be called in order to write the buffer + """ + if isfloat: + # make sure value is float + self._add_point(-0.0 if value is None else float(value), *args) + else: + self._add_point('' if value is None else str(value), *args) def testdb(): diff --git a/nicoscache.py b/nicoscache.py index d6c2a45..8768230 100644 --- a/nicoscache.py +++ b/nicoscache.py @@ -116,7 +116,10 @@ class NicosStream(Stream): def ping(self): self.send(f'{PING}{OP_ASK}') - def events(self, matchmsg=msg_pattern.match): + def get_tags(self, key): + return self.tags + + def event_generator(self, matchmsg=msg_pattern.match): if self.is_offline(): return events = {} @@ -168,4 +171,4 @@ class NicosStream(Stream): value = None error = 'error' cnt += 1 - yield key, value, error, ts, self.tags + yield key, value, error, ts, self.get_tags(key) diff --git a/secop.py b/secop.py index e8003c9..fde46ee 100644 --- a/secop.py +++ b/secop.py @@ -21,7 +21,6 @@ class SecopStream(Stream): def init(self): self._buffer = [] - print('send idn', 'tmo:', self.socket.timeout) self.send('*IDN?') resend = True messages = [] @@ -64,14 +63,18 @@ class SecopStream(Stream): def ping(self): self.send('ping') - def events(self): + def get_tags(self, key): + return dict(self.tags, device=self.original_id.get(key[0], self.device)) + + def event_generator(self): try: cnt = 0 for msg in self.get_lines(): match = UPDATE.match(msg) if match: - cmd, id, data = match.groups() - key = tuple(id.split(':')) + cmd, ident, data = match.groups() + mod, _, param = ident.partition(':') + key = (mod, param or 'value') cvt = self.convert.get(key) if cvt: data = json.loads(data) @@ -85,12 +88,10 @@ class SecopStream(Stream): ts = data[1].get('t', time.time()) value = cvt(data[0]) cnt += 1 - yield key, value, error, ts, dict( - self.tags, device=self.original_id.get(key[0], self.device)) + yield key, value, error, ts, self.get_tags(key) elif msg == 'active': # from now on, no more waiting self.notimeout() - #print('SECOP', self.uri, 'cnt=', cnt) except Exception as e: print(self.uri, repr(e)) diff --git a/streams.py b/streams.py index 24a6a5e..d00b0f9 100644 --- a/streams.py +++ b/streams.py @@ -4,6 +4,10 @@ import re from select import select +class StreamDead(Exception): + """raised when stream is dead""" + + def parse_uri(uri): scheme, _, hostport = uri.rpartition('://') scheme = scheme or 'tcp' @@ -22,6 +26,18 @@ class Base: select_dict = {} +def short_hostname(host): + """psi/lin/se special + + - treat case where -129129xxxx is appended + """ + host = socket.gethostbyaddr(host)[0] + match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) + if match: + host = match.group(1) + (match.group(2) or '') + return host + + class Stream(Base): _last_time = None dead = False @@ -41,11 +57,13 @@ class Stream(Base): self.timeout = timeout self.socket = None self.cache = {} - self.first_values = time.time() + self.errors = {} + self.start_time = time.time() + self.next_hour = (self.start_time // 3600 + 1) * 3600 + self.generator = self.event_generator() try: self.connect() self.init() - self.first_values = 0 except Exception as e: print(self.uri, repr(e)) raise @@ -56,11 +74,7 @@ class Stream(Base): self.settimeout(self.timeout) host, _, port = self.uri.partition(':') # try to convert uri to host name - host = socket.gethostbyaddr(host)[0] - # psi special: shorten name, in case a computer has several network connections - match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) - if match: - host = match.group(1) + (match.group(2) or '') + host = short_hostname(host) self.tags['stream'] = f'{host}:{port}' print(self.uri, '=', self.tags['stream'], 'connected') self._buffer = [] @@ -69,7 +83,7 @@ class Stream(Base): self._pinged = False def init(self): - pass + raise NotImplementedError def ping(self): raise NotImplementedError @@ -153,9 +167,7 @@ class Stream(Base): received = self.socket.recv(NETBUFSIZE) if not received: raise ConnectionAbortedError('connection closed by other end') - except TimeoutError: - break - except BlockingIOError: + except (TimeoutError, BlockingIOError): break except Exception: self._last_live = now @@ -174,8 +186,55 @@ class Stream(Base): if len(received) < NETBUFSIZE and self.timeout == 0: break + def event_generator(self): + raise NotImplementedError -class EventGenerator: + def get_tags(self, key): + """get tags for key""" + raise NotImplementedError + + def finish_events(self, events, end_time): + for key in list(self.cache): + self.cache.pop(key) + dbkey = '.'.join(key) + events.append((True, None, end_time, dbkey, 'float', self.tags)) + events.append((False, 'END', end_time, dbkey, 'error', self.tags)) + + def get_events(self, events, maxevents): + """get available events + + :param events: a list events to be appended to + :param maxevents: hint for max number of events to be joined + there might be more after a full hour or when the stream is dying + :return: True when maxevents is reached + """ + for key, value, error, ts, tags in self.generator: + ts = max(self.start_time, min(ts or INF, time.time())) + if ts >= self.next_hour: + ts_ = (ts // 3600) * 3600 + for key_, value_ in self.cache.items(): + events.append((True, value_, ts_, '.'.join(key_), 'float', self.get_tags(key_))) + for key_, error_ in self.errors.items(): + events.append((False, error_, ts_, '.'.join(key_), 'error', self.get_tags(key_))) + self.next_hour = ts_ + 3600 + if value != self.cache.get(key, None) or error != self.errors.get(key, None): + dbkey = '.'.join(key) + events.append((True, value, ts, dbkey, 'float', tags)) + self.cache[key] = value + if error and self.errors.get(key) != error: + events.append((False, error, ts, dbkey, 'error', tags)) + self.errors[key] = error + elif len(events) >= maxevents: + return True + else: + if self.dead: + self.finish_events(events, self.dead) + raise StreamDead() + self.generator = self.event_generator() + return False + + +class EventStream: return_on_wait = False # return_on_wait = True: stop generator when no more streams have buffered content # note: a stream with buffered content might not be ready to emit any event, because @@ -189,31 +248,38 @@ class EventGenerator: ready = select(Stream.select_dict, [], [], timeout)[0] return [Stream.select_dict[f] for f in ready] - def gen(self): + def get_events(self, maxevents=20): + """return events from all streams + + :param maxevents: hint for max number of events to be joined + there might be more after a full hour or when the stream is dying + :return: list of events + + wait for at least one event + """ + events = [] while 1: for stream in self.wait_ready(1): if not isinstance(stream, Stream): for streamcls, uri, *args in stream.events(): if uri not in self.streams: - print('ADD STREAM', uri, *args) + print('add stream', uri, *args) self.streams[uri] = streamcls(uri, *args) for name, stream in self.streams.items(): - if stream.dead: - print('REMOVE STREAM', name) - for key in stream.cache: - yield key, None, 'END', stream.dead, stream.tags + try: + if stream.get_events(events, maxevents): + return events + except StreamDead: self.streams.pop(name) - break - for key, value, error, ts, tags in stream.events(): - ts = max(stream.first_values, min(ts or INF, time.time())) - prev = stream.cache.get(key, None) - if (value, error) != prev: - yield key, value, error, ts, tags - stream.cache[key] = value, error - if self.return_on_wait and not self.wait_ready(0): - return - - def __iter__(self): - return self.gen() - + if events: + return events + if events: + return events + def finish(self): + events = [] + end_time = time.time() + for stream in self.streams.values(): + stream.close() + stream.finish_events(events, end_time) + return events diff --git a/t.py b/t.py index fdc415f..6871f9b 100644 --- a/t.py +++ b/t.py @@ -1,93 +1,49 @@ import time import numpy as np import pandas as pd -from influx import InfluxDBWrapper, NamedTuple +from influx import InfluxDBWrapper, NamedTuple, RegExp -NAN = float('nan') +DAY = 24 * 3600 token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" db = InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test') -print('read(dev, [param], [start], [end], [interval])') +print(""" +qry([start], [stop], [interval=...,] [last=True,] [columns=[...],] [=, ] ...) +crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [add_end=True,] [=, ] ...) +""") + +offset = (time.time() // 3600) * 3600 +result = {} -def read(dev=None, param='value_float', start=None, end=None, interval=None, offset=None, **tags): - now = time.time() - if start is None: - start = now - 1800 - elif start < 0: - start += now - measurement = dev - # measurement = f'nicos/{dev}' if dev else None - print('QUERY', measurement, param, start, end, interval) - tables = db.query(measurement, param, start, end, interval, **tags) - sec = time.time()-now - print(f'query took {sec:.3f} seconds') - now = time.time() - result = {} - if offset is None: - offset = start - elif offset == 'now': - offset = now - for table in tables: - for rec in table.records: - # value = None if rec['expired'] == 'True' else rec['_value'] - value = rec['_value'] - ts = rec['_time'].timestamp() - key = f"{rec['_measurement']}:{rec['_field']}" - # print(rec['expired']) - result.setdefault(key, []).append(((rec['_time'].timestamp() - offset), value)) - sec = time.time()-now - print(f'rearrange took {sec:.3f} seconds') - pts = sum(len(v) for v in result.values()) - print(pts, 'points', round(pts / sec / 1000), 'points/ms') - for curve, data in result.items(): - result[curve] = sorted(data) - return result - - -def summ(result): - for meas, curves in result.items(): - print(meas) - for field, curve in curves.items(): - timerange = '..'.join([time.strftime('%m-%d %H:%M:%S', time.localtime(curve[i][0])) for i in [0,-1]]) - print(' ', timerange, len(curve[:,0]), field) - - -def getlast(*args, **kwds): - for meas, params in db.getLast(*args, **kwds).items(): - print('---', meas) - for key, (value, error) in params.items(): - if key[0].startswith('?'): - continue - if error: - print(key, 'ERROR', error) +def prt(): + for i, (key, curve) in enumerate(result.items()): + if i > 5: + print('--- ...') + break + print('---', key, list(curve[0]._idx_by_name)) + n = len(curve) + if n > 7: + curves = [curve[:3], None, curve[-3:]] + else: + curves = [curve] + for crv in curves: + if crv is None: + print('...') else: - print(key, value) + for row in crv: + print(round(row[0] - offset, db.timedig), row[1:]) -def curves(*args, offset=0, **kwds): +def qry(*args, **kwds): + global result + result = db.query(*args, **kwds) + prt() + + +def crv(*args, **kwds): + global result result = db.curves(*args, **kwds) - for key, curve in result.items(): - print('---', key) - for row in curve: - print(round(row[0]-offset, db.timedig), row[1:]) - - -start = time.mktime((2024, 5, 30, 0, 0, 0, 0, 0, -1)) -end = time.mktime((2024, 6, 19, 0, 0, 0, 0, 0, -1)) -DAY = 24 * 3600 -now = time.time() - - -def prt(key, *args, offset=0, scale=1, **kwds): - for key, curve in db.getCurve(key, *args, **kwds).items(): - print('---', key) - for t, v in curve: - print(t, v) - - -def test(): - pass - + prt()