From e2122fb3a052547573cd0aae53454318e7f2b94d Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 26 Feb 2025 13:43:39 +0100 Subject: [PATCH] import get_experiments - in case of localhost, remove .psi.ch from normalized host name + fix import in t.py + allow boolean as selection criterium --- influx.py | 4 ++++ seinflux.py | 66 ++++++++++++++++++++++++++++++++++++----------------- streams.py | 7 +++--- t.py | 2 +- 4 files changed, 53 insertions(+), 26 deletions(-) diff --git a/influx.py b/influx.py index 6e21d1d..7d2dbd0 100644 --- a/influx.py +++ b/influx.py @@ -338,6 +338,10 @@ class InfluxDBWrapper: fixed_tags[key] = crit dropcols.append(key) crit = f'"{crit}"' + elif isinstance(crit, bool): + crit = 'true' if crit else 'false' + fixed_tags[key] = crit + dropcols.append(key) elif isinstance(crit, (int, float)): fixed_tags[key] = crit dropcols.append(key) diff --git a/seinflux.py b/seinflux.py index bf2e9a2..7e9282f 100644 --- a/seinflux.py +++ b/seinflux.py @@ -206,25 +206,47 @@ class SEHistory(InfluxDBWrapper): def get_experiments(self, start=None, end=None, **tags): """get experiments (periods with the same device/stream/instrument combination) - :param start, end: the time period - :return: list of tuple(, , ) + :param start: start of time period + :param end: end of time period + :return: list of tuple((, ), ('instrument' or 'stream', ), dict of ) """ interval = 1 gap = 600 + eternity = 1e10 entries = {} - end = time.time() + 60 - for rows, key, (tags, _, _) in self.query_gen( - start, end, _measurement='_stream_', _field='on', interval=interval, - stream=None, device=None, instrument=None): - stream = tags.get('stream') - instrument = tags.get('instrument') - device = tags.get('device') - elist = entries.setdefault(instrument or stream, []) - for row in rows: + if start is None: + previous = {} + else: + previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval, + stream=None, device=None, instrument=None, single=1) + if end is None: + nextrow = {} + else: + nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, + stream=None, device=None, instrument=None, single=-1) + start, end = abs_range(start, end) + inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, + stream=None, device=None, instrument=None) + for key, single in previous.items(): + if key in inperiod: + inperiod[key].insert(0, tuple(single)) + else: + inperiod[key] = Table(rows=[tuple(single)], **single.__dict__) + + for key, table in inperiod.items(): + nextvalue = nextrow.get(key) + if nextvalue: + print('N', key, nextvalue, table.tags) + if nextvalue and not nextvalue[1]: + table.append(tuple(nextvalue)) + stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] + key = ('instrument', instrument) if instrument else ('stream', stream) + elist = entries.setdefault(key, []) + for row in table: elist.append(row[:2] + (stream, device)) result = [] - for ins, rows in entries.items(): + for key, rows in entries.items(): rows.sort() current = {} # dict of [, , ] chunks = [current] @@ -237,25 +259,27 @@ class SEHistory(InfluxDBWrapper): continue current = {} chunks.append(current) - current[stream] = [device or stream, ts, end] + current[stream] = [device or stream, ts, eternity] else: prev = current.get(stream) if prev: prev[2] = ts - prevchange = 0 + prevexpt = [0, 0] prevdevices = {} # dict of for chunk in chunks: if chunk: devices = {k: v[0] for k, v in chunk.items() if v[0]} - start = min(t[1] for t in chunk.values()) - if start > prevchange + gap or any(v != devices.get(k) for k, v in prevdevices.items()): - prevchange = start + beg = min(t[1] for t in chunk.values()) + if beg > prevexpt[0] + gap or any(v != devices.get(k) for k, v in prevdevices.items()): prevdevices = devices - result.append((start, max(t[2] for t in chunk.values()), ins, devices)) + if prevexpt[1] > beg: + prevexpt[1] = beg # shorten previous + prevexpt = [beg, max(t[2] for t in chunk.values()), key, devices] + result.append(prevexpt) result.sort() - for start, end, ins, devices in result: - print(' .. '.join(time.strftime('%Y-%m-%d-%H:%M', time.localtime(t)) for t in (start, end)), - ins, devices) + for expt in result: + if expt[-1] == eternity: + expt[-1] = time.time() return result def set_instrument(self, stream, value, ts=None, guess=True, **tags): diff --git a/streams.py b/streams.py index b025757..3a37f88 100644 --- a/streams.py +++ b/streams.py @@ -34,10 +34,9 @@ def short_hostname(host): host = socket.gethostbyaddr(host)[0] if host == 'localhost': host = socket.gethostname() - else: - match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) - if match: - host = match.group(1) + (match.group(2) or '') + match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) + if match: + host = match.group(1) + (match.group(2) or '') return host diff --git a/t.py b/t.py index b69926d..cc6a618 100644 --- a/t.py +++ b/t.py @@ -1,7 +1,7 @@ import time import math import numpy as np -from sehistory.sehistory import SEHistory +from seinflux import SEHistory from influx import RegExp DAY = 24 * 3600