From 335d0a507803e88e77fe56f23cfb0a0a242de2ce Mon Sep 17 00:00:00 2001 From: l_samenv Date: Fri, 28 Mar 2025 11:37:11 +0100 Subject: [PATCH] strip leading underscore from secop parameter names + other fixes and debugging prints --- feeder.py | 5 +++-- secop.py | 14 ++++++++------ seinflux.py | 35 ++++++++++++++++++++++------------- streams.py | 6 ++++-- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/feeder.py b/feeder.py index eae3081..39b3d93 100644 --- a/feeder.py +++ b/feeder.py @@ -16,13 +16,14 @@ def main(): fm = FrappyManager() fm.get_info() host = socket.gethostname().split('.')[0] - for ins, procs in fm.get_procs().items(): + cfginfo = {} + for ins, procs in fm.get_procs(cfginfo=cfginfo).items(): for service in procs: if service in procs: port = fm.info.get(ins, {}).get(service, {}) if port: uri = f'{host}:{port}' - print('CREATE', uri) + print('CREATE', uri, cfginfo.get((ins, service))) TrySecopConnect(uri) event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} diff --git a/secop.py b/secop.py index d4d852f..7f2593d 100644 --- a/secop.py +++ b/secop.py @@ -54,7 +54,7 @@ class SecopStream(Stream): self.device = self.device[:-7] self.tags['device'] = self.device self.modules = self.descr['modules'] - self.convert = {} + self.param_info = {} self.tags_dict = TagsDict(self.tags) for mod, moddesc in self.modules.items(): for key in ('_original_id', 'original_id'): @@ -62,10 +62,12 @@ class SecopStream(Stream): if value: self.tags_dict[mod] = dict(self.tags, device=value) break - for param, desc in moddesc['accessibles'].items(): + parameters = moddesc['accessibles'] + for param, desc in parameters.items(): dt = desc['datainfo'] if dt['type'] in ('double', 'int', 'enum'): - self.convert[mod, param] = float + stripped = param[1:] if param.startswith('_') else param + self.param_info[mod, param] = float, (mod, param if stripped in parameters else stripped) self.send('activate') def ping(self): @@ -81,9 +83,9 @@ class SecopStream(Stream): if match: cmd, ident, data = match.groups() mod, _, param = ident.partition(':') - key = (mod, param or 'value') - cvt = self.convert.get(key) - if cvt: + cvt_key = self.param_info.get((mod, param or 'value')) + if cvt_key: + cvt, key = cvt_key data = json.loads(data) tags = self.tags_dict[key[0]] if cmd == 'error_update': diff --git a/seinflux.py b/seinflux.py index 6300492..455ccaf 100644 --- a/seinflux.py +++ b/seinflux.py @@ -218,7 +218,8 @@ class SEHistory(InfluxDBWrapper): for row in table: all_entries.setdefault(stream, []).append((row, table.tags)) result = {} - for stream, entries in all_entries.items(): + for stream in sorted(all_entries): + entries = all_entries[stream] current = None instrument = None for entry in sorted(entries, key=lambda e: e[0][0]): @@ -307,9 +308,8 @@ class SEHistory(InfluxDBWrapper): else: device = None devices.pop(device, None) - devcombi = tuple(zip(*devices.items())) + devcombi = tuple(zip(*sorted(devices.items()))) if devcombi != combi: - print('D', devcombi, fmtime(ts)) if combi: prevend = min(ts, chunk[1]) if prevend - chunk[0] < gap: @@ -322,9 +322,7 @@ class SEHistory(InfluxDBWrapper): # merge when started at the same day chunk = chunks[-1][1] chunk[1] = eternity - print('EXTEND', devcombi, fmtime(ts)) else: - print('APPEND', devcombi, fmtime(ts)) chunk = [ts, eternity] chunks.append(chunk) else: @@ -369,14 +367,8 @@ class SEHistory(InfluxDBWrapper): flag = bool(value) try: previns, prevts = self.get_instrument(stream, ts, **tags) - if prevts is None: - if not flag: - return # no change - else: - if previns == value and flag: - return # no change - if previns is None or ts < prevts: - ts = prevts + 0.001 + if prevts is not None and (previns is None or (ts or 1e10) < prevts): + ts = prevts + 0.001 except Exception as e: print(f'Exception in get_instrument {e!r}') tags['stream'] = stream @@ -384,5 +376,22 @@ class SEHistory(InfluxDBWrapper): tags['instrument'] = value self._add_point('_stream_', 'on', flag, ts, tags) + def remove_experiment(self, stream, ts=None, **tags): + if ts is not None: + ts += 1 + reply = self.query(None, ts, _measurement='_stream_', _field='on', + stream=stream, single=1, **tags) + tagset = set(tags) | {'stream'} + tagset.discard('instrument') + for table in reply.values(): + ts, flag = table[-1][:2] + if flag: + addtags = {k: v for k, v in table.tags.items() + if k not in {'instrument', '_measurement', '_field'}} + print(ts, addtags) + self._add_point('_stream_', 'on', False, ts + 0.001, + addtags) + self.flush() + def add_stream(self, value, tags, key, ts): self.set_instrument(key, value, ts, **tags) diff --git a/streams.py b/streams.py index dd00bd3..02c4478 100644 --- a/streams.py +++ b/streams.py @@ -74,7 +74,7 @@ class Stream(Base): self.connect() self.init(**kwds) except Exception as e: - print('I', self.uri, repr(e)) + print('FAIL', self.uri, repr(e)) raise def connect(self): @@ -276,10 +276,12 @@ class EventStream: while 1: for stream in self.wait_ready(1): if not isinstance(stream, Stream): + # stream is a UdpStream for streamcls, uri, kwargs in stream.events(): stream = self.streams.get(uri) if stream: stream.tags.update(kwargs) + print('update stream', uri, kwargs) else: try: self.streams[uri] = stream = streamcls(uri, **kwargs) @@ -288,7 +290,7 @@ class EventStream: print('can not connect to', uri, repr(e), streamcls) continue device = stream.tags.get('device') - events.append(('stream', kwargs.get('instrument', '0'), + events.append(('stream', kwargs.get('instrument'), {'device': device}, stream.uri, int(time.time()))) for name, stream in self.streams.items(): try: