From 9208ff993d4e07dc967f7337a78ef4f6a4d447b9 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Thu, 16 Jan 2025 09:36:26 +0100 Subject: [PATCH] Initial commit --- feeder.py | 21 ++++ influx.py | 335 ++++++++++++++++++++++++++++++++++++++++++++++++++ nicoscache.py | 171 ++++++++++++++++++++++++++ secop.py | 140 +++++++++++++++++++++ t.py | 93 ++++++++++++++ 5 files changed, 760 insertions(+) create mode 100644 feeder.py create mode 100644 influx.py create mode 100644 nicoscache.py create mode 100644 secop.py create mode 100644 t.py diff --git a/feeder.py b/feeder.py new file mode 100644 index 0000000..214dd95 --- /dev/null +++ b/feeder.py @@ -0,0 +1,21 @@ +from streams import Stream, EventGenerator +from nicoscache import NicosStream +from secop import ScanStream, ScanReply +from influx import testdb + + +def main(): + e = EventGenerator(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) + e.return_on_wait = False + db = testdb() + errors = {} + while 1: + for (obj, param), value, error, ts, tags in e: + meas = f'{obj}.{param}' + db.write_float(meas, 'float', value, ts, **tags) + if error and error != errors.get(meas): + errors[meas] = error + db.write_string(meas, 'error', error, ts, **tags) + print('---') + +main() diff --git a/influx.py b/influx.py new file mode 100644 index 0000000..aa97fca --- /dev/null +++ b/influx.py @@ -0,0 +1,335 @@ +# ***************************************************************************** +# Copyright (c) 2024 ff by the module authors +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Konstantin Kholostov +# +# ***************************************************************************** +import re +import time +from math import floor, ceil, copysign +from datetime import datetime +import pandas as pd + +from influxdb_client import InfluxDBClient, BucketRetentionRules, Point +from influxdb_client.client.write_api import SYNCHRONOUS as write_option + +OFFSET, SCALE = pd.to_datetime(['1970-01-01 00:00:00', '1970-01-01 00:00:01']).astype(int) +DAY = 24 * 3600 +# write_precision from digits after decimal point +TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3 + + +def identity(v): + return v + + +def negnull2none(v): + """converts -0.0 to None, returns argument for everything else, also strings""" + return None if v == 0 and copysign(1, v) == -1 else v + + +def wildcard_filter(key, names): + patterns = [] + for name in names: + patterns.append('[^.]*'.join(re.escape(v) for v in name.split('*'))) + pattern = '|'.join(patterns) + return f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)' + + +class CurveDict(dict): + def __missing__(self, key): + return [] + + +class NamedTuple(tuple): + """for our purpose improved version of collection.namedtuple + + - names may be any string, but when not an identifer, no attribute access + - dict like access with [] + - customized converter (or validator) function for initialization + + Usage: + + MyNamedTuple1 = NamedTuple('a', 'b') + x = MyNamedTuple1(1, 'y') + assert x == (1, 'y') == (x.a, x.b) + MyNamedTuple2 = NamedTuple(a=str, b=float) + y = MyNamedTuple2(10, b='2') + assert y == ('10', 2) == (y['a'], y['b']) + """ + _indices = None + _converters = None + + def __new__(cls, *args, **kwds): + if cls is NamedTuple: + return cls.getcls(dict({a: identity for a in args}, **kwds)) + values = dict(zip(cls._converters, args), **kwds) + elements = [] + for key, cvt in cls._converters.items(): + try: + elements.append(cvt(values[key])) + except KeyError: + elements.append(None) + return super().__new__(cls, elements) + + def __getitem__(self, key): + try: + return tuple(self)[key] + except Exception: + return tuple(self)[self._indices[key]] + + @classmethod + def getcls(cls, converters): + attributes = {'_converters': converters, + '_indices': {k: i for i, k in enumerate(converters)}} + for idx, name in enumerate(converters): + if name.isidentifier(): + attributes[name] = property(lambda s, i=idx: s[i]) + return type('NamedTuple', (cls,), attributes) + + +def abs_range(start=None, stop=None, interval=None): + now = time.time() + if start is None: + start = int(now - 32 * DAY) + elif start < 366 * DAY: + start = int(now + start) + if stop is None: + stop = int(now + DAY) + elif stop < 366 * DAY: + stop = ceil(now + stop) + return start, stop + + +def round_range(start, stop, interval=None): + interval = max(1, interval or 0) + start = floor(floor(start) // interval * interval) + stop = ceil(ceil(stop // interval) * interval) + return start, stop + + +class InfluxDBWrapper: + """Wrapper for InfluxDB API 2.0. + + based on nicos.services.cache.database.influxdb + (work of Konstantin Kholostov ) + """ + + def __init__(self, url, token, org, bucket, create=False, watch_interval=300): + self._watch_interval = watch_interval + self._update_queue = [] + self._url = url + self._token = token + self._org = org + self._bucket = bucket + self._client = InfluxDBClient(url=self._url, token=self._token, + org=self._org) + self._write_api_write = self._client.write_api(write_options=write_option).write + self._deadline = 0 + self._active_streams = {} + self.set_time_precision(3) + self.add_new_bucket(self._bucket, create) + + def set_time_precision(self, digits): + self.timedig = max(0, min(digits, 9)) + self._write_precision = TIME_PRECISION[self.timedig] + + def to_timestamp(self, timevalue): + return round(timevalue.timestamp(), self.timedig) + + def disconnect(self): + for stream, last in self._active_streams.items(): + self._update_queue.append(Point('_streams_') + .time(last, write_precision=self._write_precision) + .field('interval', 0).tag('stream', stream)) + self.flush() + self._client.close() + + def get_bucket_names(self): + bucket_names = [] + buckets = self._client.buckets_api().find_buckets().buckets + for bucket in buckets: + bucket_names.append(bucket.name) + return bucket_names + + def add_new_bucket(self, bucket_name, create): + if bucket_name not in self.get_bucket_names(): + if not create: + raise ValueError(f'unknown bucket {bucket_name}') + retention_rules = BucketRetentionRules(type='expire', + every_seconds=0) + self._client.buckets_api().create_bucket( + bucket_name=bucket_name, + retention_rules=retention_rules, org=self._org) + + def get_measurements(self): + return [r['_value'] for t in self._client.query_api().query(f""" + import "influxdata/influxdb/schema" + schema.measurements(bucket: "{self._bucket}")""") for r in t] + + def delete_measurement(self, measurement): + delete_api = self._client.delete_api() + delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z', f'_measurement="{measurement}"', + bucket=self._bucket, org=self._org) + + def delete_all_measurements(self): + all = self.get_measurements() + for meas in all: + self.get_measurements(meas) + print('deleted', all) + + def flush(self): + points = self._update_queue + if points: + self._update_queue = [] + self._write_api_write(bucket=self._bucket, record=points) + + def query(self, start=None, stop=None, interval=None, last=False, **tags): + """Returns queried data as InfluxDB tables + + :param start: start time. default is a month ago + :param stop: end time, default is tomorrow at the same time + :param interval: is set an aggregation filter will be applied. This will + return only the latest values for a time interval in seconds. + :param last: when True, only the last value within the interval is returned + (for any existing combinations of tags!) + :param tags: selection criteria: + =None + return records independent of the tag. the value will be contained in the result dicts key + =[, , ...] + return records with given tag from the list. the value is contained in the result dicts key + >=: + return only records with given tag matching . the value is not part of the results key + = + where is a callable. return tag as value in returned rows. is used for value conversion + + :return: a dict of list of > + where and are NamedTuple + """ + self.flush() + start, stop = round_range(*abs_range(start, stop), interval) + msg = [f'from(bucket:"{self._bucket}")', + f'|> range(start: {start}, stop: {stop})'] + + columns = {'_time': self.to_timestamp, '_value': negnull2none} + keynames = [] + for key, crit in tags.items(): + if crit is None: + keynames.append(key) + continue + if callable(crit): + columns[key] = crit + continue + if isinstance(crit, str): + if '*' in crit: + keynames.append(key) + msg.append(wildcard_filter(key, [crit])) + continue + crit = f'"{crit}"' + elif not isinstance(crit, (int, float)): + try: + keynames.append(key) + msg.append(wildcard_filter(key, crit)) + continue + except Exception: + raise ValueError(f'illegal value for {key}: {crit}') + msg.append(f'|> filter(fn:(r) => r.{key} == {crit})') + + if last: + msg.append('|> last(column: "_time")') + if interval: + msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)') + if columns is not None: + msg.append(f'''|> keep(columns:["{'","'.join(list(columns) + keynames)}"])''') + msg = '\n'.join(msg) + print(msg) + tables = self._client.query_api().query(msg) + result = CurveDict() + keycls = NamedTuple(*keynames) + colcls = NamedTuple(**columns) + for table in tables: + key = None + for rec in table: + print(rec.values) + if key is None: + key = keycls(**rec.values) + result.setdefault(key, []) + data = colcls(**rec.values) + result[key].append(data) + result[key].sort(key=lambda v: v[0]) + print('---') + return result + + def curves(self, start=None, stop=None, measurement=None, field='float', interval=None, + add_prev=True, **tags): + + for key, val in zip(('_measurement', '_field'), (measurement, field)): + tags.setdefault(key, val) + start, stop = abs_range(start, stop) + rstart, rstop = round_range(start, stop, interval) + if rstart < rstop: + result = self.query(rstart, rstop, interval, **tags) + else: + result = {} + if add_prev: + prev = self.query(rstart - DAY, rstart, last=True, **tags) + for key, prev in prev.items(): + curve = result.get(key) + first = prev[-1] + if first[1] is not None: + if curve: + if first[0] < curve[0][0]: + curve.insert(0, first) + else: + result[key] = [first] + return result + + def write(self, measurement, field, value, ts, **tags): + self._active_streams[tags.get('stream')] = ts + if ts > self._deadline: + dl = ts // self._watch_interval * self._watch_interval + for stream, last in self._active_streams.items(): + self._update_queue.append( + Point('_streams_') + .time(datetime.utcfromtimestamp(last), write_precision=self._write_precision) + .field('interval', self._watch_interval).tag('stream', stream)) + self._active_streams.clear() + self._deadline = dl + self._watch_interval + point = Point(measurement).field(f'{field}', value) + if ts: + point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision) + for key, val in tags.items(): + point.tag(key, val) + self._update_queue.append(point) + if len(self._update_queue) > 0: # 100 + self.flush() + + def write_float(self, measurement, field, value, ts, **tags): + # make sure value is float + value = -0.0 if value is None else float(value) + self.write(measurement, field, value, ts, **tags) + + def write_string(self, measurement, field, value, ts, **tags): + # make sure value is string + value = '' if value is None else str(value) + self.write(measurement, field, value, ts, **tags) + + +def testdb(): + token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" + return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test') diff --git a/nicoscache.py b/nicoscache.py new file mode 100644 index 0000000..f502f1d --- /dev/null +++ b/nicoscache.py @@ -0,0 +1,171 @@ +import re +from ast import literal_eval +from streams import Stream +from secop import EnumConvert + + +OP_TELL = '=' +OP_ASK = '?' +OP_WILDCARD = '*' +OP_SUBSCRIBE = ':' +OP_UNSUBSCRIBE = '|' +OP_TELLOLD = '!' +OP_LOCK = '$' +OP_REWRITE = '~' + +OP_LOCK_LOCK = '+' +OP_LOCK_UNLOCK = '-' + +# put flags between key and op... +FLAG_NO_STORE = '#' + +# end/sync special token +END_MARKER = '###' +SYNC_MARKER = '#sync#' +PING = '#ping#' + +# Time constant +CYCLETIME = 0.1 + + +opkeys = OP_TELL + OP_ASK + OP_WILDCARD + OP_SUBSCRIBE + OP_UNSUBSCRIBE + \ + OP_TELLOLD + OP_LOCK + OP_REWRITE + +# regular expression matching a cache protocol message +msg_pattern = re.compile(r''' + ^ (?: + \s* (?P