strip leading underscore from secop parameter names
+ other fixes and debugging prints
This commit is contained in:
@ -16,13 +16,14 @@ def main():
|
|||||||
fm = FrappyManager()
|
fm = FrappyManager()
|
||||||
fm.get_info()
|
fm.get_info()
|
||||||
host = socket.gethostname().split('.')[0]
|
host = socket.gethostname().split('.')[0]
|
||||||
for ins, procs in fm.get_procs().items():
|
cfginfo = {}
|
||||||
|
for ins, procs in fm.get_procs(cfginfo=cfginfo).items():
|
||||||
for service in procs:
|
for service in procs:
|
||||||
if service in procs:
|
if service in procs:
|
||||||
port = fm.info.get(ins, {}).get(service, {})
|
port = fm.info.get(ins, {}).get(service, {})
|
||||||
if port:
|
if port:
|
||||||
uri = f'{host}:{port}'
|
uri = f'{host}:{port}'
|
||||||
print('CREATE', uri)
|
print('CREATE', uri, cfginfo.get((ins, service)))
|
||||||
TrySecopConnect(uri)
|
TrySecopConnect(uri)
|
||||||
|
|
||||||
event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
|
event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
|
||||||
|
14
secop.py
14
secop.py
@ -54,7 +54,7 @@ class SecopStream(Stream):
|
|||||||
self.device = self.device[:-7]
|
self.device = self.device[:-7]
|
||||||
self.tags['device'] = self.device
|
self.tags['device'] = self.device
|
||||||
self.modules = self.descr['modules']
|
self.modules = self.descr['modules']
|
||||||
self.convert = {}
|
self.param_info = {}
|
||||||
self.tags_dict = TagsDict(self.tags)
|
self.tags_dict = TagsDict(self.tags)
|
||||||
for mod, moddesc in self.modules.items():
|
for mod, moddesc in self.modules.items():
|
||||||
for key in ('_original_id', 'original_id'):
|
for key in ('_original_id', 'original_id'):
|
||||||
@ -62,10 +62,12 @@ class SecopStream(Stream):
|
|||||||
if value:
|
if value:
|
||||||
self.tags_dict[mod] = dict(self.tags, device=value)
|
self.tags_dict[mod] = dict(self.tags, device=value)
|
||||||
break
|
break
|
||||||
for param, desc in moddesc['accessibles'].items():
|
parameters = moddesc['accessibles']
|
||||||
|
for param, desc in parameters.items():
|
||||||
dt = desc['datainfo']
|
dt = desc['datainfo']
|
||||||
if dt['type'] in ('double', 'int', 'enum'):
|
if dt['type'] in ('double', 'int', 'enum'):
|
||||||
self.convert[mod, param] = float
|
stripped = param[1:] if param.startswith('_') else param
|
||||||
|
self.param_info[mod, param] = float, (mod, param if stripped in parameters else stripped)
|
||||||
self.send('activate')
|
self.send('activate')
|
||||||
|
|
||||||
def ping(self):
|
def ping(self):
|
||||||
@ -81,9 +83,9 @@ class SecopStream(Stream):
|
|||||||
if match:
|
if match:
|
||||||
cmd, ident, data = match.groups()
|
cmd, ident, data = match.groups()
|
||||||
mod, _, param = ident.partition(':')
|
mod, _, param = ident.partition(':')
|
||||||
key = (mod, param or 'value')
|
cvt_key = self.param_info.get((mod, param or 'value'))
|
||||||
cvt = self.convert.get(key)
|
if cvt_key:
|
||||||
if cvt:
|
cvt, key = cvt_key
|
||||||
data = json.loads(data)
|
data = json.loads(data)
|
||||||
tags = self.tags_dict[key[0]]
|
tags = self.tags_dict[key[0]]
|
||||||
if cmd == 'error_update':
|
if cmd == 'error_update':
|
||||||
|
33
seinflux.py
33
seinflux.py
@ -218,7 +218,8 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
for row in table:
|
for row in table:
|
||||||
all_entries.setdefault(stream, []).append((row, table.tags))
|
all_entries.setdefault(stream, []).append((row, table.tags))
|
||||||
result = {}
|
result = {}
|
||||||
for stream, entries in all_entries.items():
|
for stream in sorted(all_entries):
|
||||||
|
entries = all_entries[stream]
|
||||||
current = None
|
current = None
|
||||||
instrument = None
|
instrument = None
|
||||||
for entry in sorted(entries, key=lambda e: e[0][0]):
|
for entry in sorted(entries, key=lambda e: e[0][0]):
|
||||||
@ -307,9 +308,8 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
else:
|
else:
|
||||||
device = None
|
device = None
|
||||||
devices.pop(device, None)
|
devices.pop(device, None)
|
||||||
devcombi = tuple(zip(*devices.items()))
|
devcombi = tuple(zip(*sorted(devices.items())))
|
||||||
if devcombi != combi:
|
if devcombi != combi:
|
||||||
print('D', devcombi, fmtime(ts))
|
|
||||||
if combi:
|
if combi:
|
||||||
prevend = min(ts, chunk[1])
|
prevend = min(ts, chunk[1])
|
||||||
if prevend - chunk[0] < gap:
|
if prevend - chunk[0] < gap:
|
||||||
@ -322,9 +322,7 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
# merge when started at the same day
|
# merge when started at the same day
|
||||||
chunk = chunks[-1][1]
|
chunk = chunks[-1][1]
|
||||||
chunk[1] = eternity
|
chunk[1] = eternity
|
||||||
print('EXTEND', devcombi, fmtime(ts))
|
|
||||||
else:
|
else:
|
||||||
print('APPEND', devcombi, fmtime(ts))
|
|
||||||
chunk = [ts, eternity]
|
chunk = [ts, eternity]
|
||||||
chunks.append(chunk)
|
chunks.append(chunk)
|
||||||
else:
|
else:
|
||||||
@ -369,13 +367,7 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
flag = bool(value)
|
flag = bool(value)
|
||||||
try:
|
try:
|
||||||
previns, prevts = self.get_instrument(stream, ts, **tags)
|
previns, prevts = self.get_instrument(stream, ts, **tags)
|
||||||
if prevts is None:
|
if prevts is not None and (previns is None or (ts or 1e10) < prevts):
|
||||||
if not flag:
|
|
||||||
return # no change
|
|
||||||
else:
|
|
||||||
if previns == value and flag:
|
|
||||||
return # no change
|
|
||||||
if previns is None or ts < prevts:
|
|
||||||
ts = prevts + 0.001
|
ts = prevts + 0.001
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'Exception in get_instrument {e!r}')
|
print(f'Exception in get_instrument {e!r}')
|
||||||
@ -384,5 +376,22 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
tags['instrument'] = value
|
tags['instrument'] = value
|
||||||
self._add_point('_stream_', 'on', flag, ts, tags)
|
self._add_point('_stream_', 'on', flag, ts, tags)
|
||||||
|
|
||||||
|
def remove_experiment(self, stream, ts=None, **tags):
|
||||||
|
if ts is not None:
|
||||||
|
ts += 1
|
||||||
|
reply = self.query(None, ts, _measurement='_stream_', _field='on',
|
||||||
|
stream=stream, single=1, **tags)
|
||||||
|
tagset = set(tags) | {'stream'}
|
||||||
|
tagset.discard('instrument')
|
||||||
|
for table in reply.values():
|
||||||
|
ts, flag = table[-1][:2]
|
||||||
|
if flag:
|
||||||
|
addtags = {k: v for k, v in table.tags.items()
|
||||||
|
if k not in {'instrument', '_measurement', '_field'}}
|
||||||
|
print(ts, addtags)
|
||||||
|
self._add_point('_stream_', 'on', False, ts + 0.001,
|
||||||
|
addtags)
|
||||||
|
self.flush()
|
||||||
|
|
||||||
def add_stream(self, value, tags, key, ts):
|
def add_stream(self, value, tags, key, ts):
|
||||||
self.set_instrument(key, value, ts, **tags)
|
self.set_instrument(key, value, ts, **tags)
|
||||||
|
@ -74,7 +74,7 @@ class Stream(Base):
|
|||||||
self.connect()
|
self.connect()
|
||||||
self.init(**kwds)
|
self.init(**kwds)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print('I', self.uri, repr(e))
|
print('FAIL', self.uri, repr(e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
@ -276,10 +276,12 @@ class EventStream:
|
|||||||
while 1:
|
while 1:
|
||||||
for stream in self.wait_ready(1):
|
for stream in self.wait_ready(1):
|
||||||
if not isinstance(stream, Stream):
|
if not isinstance(stream, Stream):
|
||||||
|
# stream is a UdpStream
|
||||||
for streamcls, uri, kwargs in stream.events():
|
for streamcls, uri, kwargs in stream.events():
|
||||||
stream = self.streams.get(uri)
|
stream = self.streams.get(uri)
|
||||||
if stream:
|
if stream:
|
||||||
stream.tags.update(kwargs)
|
stream.tags.update(kwargs)
|
||||||
|
print('update stream', uri, kwargs)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
self.streams[uri] = stream = streamcls(uri, **kwargs)
|
self.streams[uri] = stream = streamcls(uri, **kwargs)
|
||||||
@ -288,7 +290,7 @@ class EventStream:
|
|||||||
print('can not connect to', uri, repr(e), streamcls)
|
print('can not connect to', uri, repr(e), streamcls)
|
||||||
continue
|
continue
|
||||||
device = stream.tags.get('device')
|
device = stream.tags.get('device')
|
||||||
events.append(('stream', kwargs.get('instrument', '0'),
|
events.append(('stream', kwargs.get('instrument'),
|
||||||
{'device': device}, stream.uri, int(time.time())))
|
{'device': device}, stream.uri, int(time.time())))
|
||||||
for name, stream in self.streams.items():
|
for name, stream in self.streams.items():
|
||||||
try:
|
try:
|
||||||
|
Reference in New Issue
Block a user