rework of the server side

main change: the same server may be used for several instruments

- client classes are 'Interactors' dealing with the parameters etc.
- methods from history are added to the clients
+ improvements on the js client side
This commit is contained in:
l_samenv
2025-03-19 08:14:06 +01:00
parent b8ac8f8bb5
commit 958e472f3b
8 changed files with 445 additions and 292 deletions

View File

@ -3,20 +3,19 @@ import logging
import json
import io
import uuid
from configparser import ConfigParser
# from configparser import ConfigParser
from math import ceil
from sehistory.seinflux import SEHistory
from sehistory.seinflux import fmtime
from colors import assign_colors_to_curves
from chart_config import ChartConfig
from base import Instrument, get_abs_time
from secop import SecopClient, SecopInstrument
from base import get_abs_time, HandlerBase
def split_tags(tags):
return {k: v.split(',') for k, v in tags.items()}
class InfluxGraph:
class InfluxGraph(HandlerBase):
"""Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB.
Global constants :
@ -43,22 +42,37 @@ class InfluxGraph:
ACTUAL = 1
LIVE = 2
def __init__(self, instrument):
self.instrument = instrument
self.db = instrument.db
instrument_name = instrument.title
def __init__(self, server, instrument, device_name, tags):
"""create instance for retrieving history
:param db: a database client (SEInflux instance)
:param instrument: the name of anm instrument or None
:param streams: a stream or comma separated list of streams
:param devices: a device name ar a comma separated list of devices
:param device_name: (comma separated) device name for labelling
typically only one of the 3 last parameters are needed
if more are specified, all of them must be fulfilled
"""
super().__init__() # put methods w_... to handlers
self.handlers['graphpoll'] = self.graphpoll
self.server = server
self.db = server.db
# self.influx_data_getter = influx_data_getter
self.chart_configs = [ChartConfig("./config/generic.ini")]
try:
self.chart_configs.append(ChartConfig(f"./config/{instrument_name}.ini"))
except KeyError:
pass
self.instrument = instrument
self.device_name = device_name
if instrument: # TODO: should it not be better to have inifiles per device?
try:
self.chart_configs.append(ChartConfig(f"./config/{instrument}.ini"))
except KeyError:
pass
self.livemode = self.HISTORICAL
self.last_values = {} # dict <variable> of last known point (<time>, <value>)
self.last_time = {} # dict <stream> of last received time
self.last_minute = 0
self.last_update = 0 # time of last call with a result
self.tags = {} # tags for query (determines device and/or server)
self.tags = None
self.init_tags = tags
def w_graph(self, variables, time="-1800,0", interval=None):
"""Get the curves given by variables in the time range "time"
@ -84,14 +98,12 @@ class InfluxGraph:
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval:
interval = float(interval)
print('CURVES', start - now, end - now, self.tags)
result = self.db.curves(start, end, queried_variables, merge='_measurement',
interval=interval or None, **self.tags)
print('LEN', len(result))
self.update_last(result)
self.db.complete(result, self.last_time, 'stream')
self.last_minute = now // 60
return dict(type='graph-draw', graph={k: v for k, v in result.items()})
return dict(type='graph-draw', graph={k: result[k] for k in queried_variables if k in result})
def update_last(self, curve_dict):
"""update last values per variable and last time per stream"""
@ -118,7 +130,7 @@ class InfluxGraph:
return dict(type='time', time=get_abs_time(
[float(t) for t in time.split(',')]))
def w_getvars(self, time, userconfiguration=None, instrument=None, **tags):
def w_getvars(self, time, userconfiguration=None, **_):
"""Get the curve names available at a given point in time
with a possible user configuration on the client side.
@ -145,22 +157,17 @@ class InfluxGraph:
if userconfiguration is not None:
userconfiguration = json.loads(userconfiguration)
self.tags = split_tags(tags)
if instrument:
self.tags['stream'] = list(self.db.get_streams(instrument))
print('TAGS', self.tags)
if self.instrument:
streams, tags, self.device_name = self.server.lookup_streams(self.instrument, **self.init_tags)
self.tags = {**self.init_tags, **tags}
blocks = self.get_available_variables(start_time, end_time, self.chart_configs, userconfiguration)
device_name = self.tags.get('device', '<unknown>')
# initialize self.last_values to keep track of the available variables
self.last_values = {var["name"]: [0, None] for block in blocks for var in block["curves"]}
assign_colors_to_curves(blocks)
result = dict(type='var_list')
result['blocks'] = blocks
result['device'] = device_name
# print('DEVICE', device_name, tags)
# for block in blocks:
# print(block['tag'], [c['name'] for c in block['curves']])
return result
return {'type': 'var_list', 'blocks': blocks, 'device': self.device_name}
def get_available_variables(self, start_time, end_time, chart_configs=None, user_config=None):
"""Gets the available variables
@ -182,15 +189,16 @@ class InfluxGraph:
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] :
a list of dictionnaries, each one representing
a list of dicts, each one representing
a block of curves with their name, their label and their color to display,
grouped by their category if given or unit (in tag).
"""
if start_time == end_time:
start_time = end_time - 3600
result = self.db.curves(start_time, end_time, _measurement=None, _field='float', **self.tags)
result = self.db.curves(start_time, end_time, _measurement=None,
merge='_measurement', **self.tags)
assert all(c.key_names[0] == '_measurement' for c in result.values())
variables = {k[0] for k in result}
variables = {k: t.tags.get('unit') for k, t in result.items()}
config = {}
if chart_configs:
for chart_config in chart_configs:
@ -215,38 +223,59 @@ class InfluxGraph:
cat = '*'
elif not cat:
return
unit = unit or '1'
tag = cat.replace('*', unit)
grp = groups.get(tag)
if grp is None:
curves = []
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': curves}
crv_dict = {}
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': crv_dict}
else:
curves = grp['curves']
curves.append({'name': name, 'unit': unit, 'label': label or name})
crv_dict = grp['curves']
crv_dict[name] = {'name': name, 'unit': unit, 'label': label or name}
# treat variables in config first (in their order!)
result = {}
for key, cfg in config.items():
cat = cfg.pop('cat', None)
unit = cfg.get('unit', '1')
cfgunit = cfg.pop('unit', '')
if '.' in key:
if key in variables:
add_to_groups(key, cat, **cfg)
variables.discard(key)
unit = variables.pop(key, object)
if unit is not object:
add_to_groups(key, cat, cfgunit or unit, **cfg)
else:
var = f'{key}.value'
if var in variables:
unit = variables.pop(var, object)
if unit is not object:
label = cfg.pop('label', None) or key
add_to_groups(var, cat, label=label, **cfg)
variables.discard(var)
add_to_groups(var, cat, cfgunit or unit, label=label, **cfg)
var = f'{key}.target'
if var in variables:
unit = variables.pop(var, object)
if unit is not object:
cfg.pop('color', None)
add_to_groups(var, cat, **cfg)
variables.discard(var)
for var in variables:
add_to_groups(var)
return list(groups.values())
add_to_groups(var, cat, cfgunit or unit, **cfg)
for var, unit in variables.items():
add_to_groups(var, unit=unit)
# make order a bit more common
result = []
for key in ['K', 'T', 'W', 'ln/min'] + list(groups):
if key in groups:
group = groups.pop(key)
curve_dict = group['curves']
curves = []
# get first '.value' parameters and add targets if available
ordered_keys = [f'{m}.value' for m in ('tt', 'T', 'ts', 'Ts')]
for name in ordered_keys + list(curve_dict):
if name.endswith('.value'):
try:
curves.append(curve_dict.pop(name))
curves.append(curve_dict.pop(f'{name[:-6]}.target'))
except KeyError:
pass # skip not existing or already removed items
# add remaining curves
curves.extend(curve_dict.values())
print(key, curves)
group['curves'] = curves
result.append(group)
return result
def w_updategraph(self):
"""Set the current visualisation mode to LIVE if not in HISTORICAL mode.
@ -348,17 +377,18 @@ class InfluxGraph:
return dict(type='graph-update', time=last_time, graph=result)
return None
class InfluxInstrument(Instrument):
def __init__(self, instr_name, inst_config=None):
super().__init__()
self.db = InfluxDB()
# self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
self.title = instr_name
self.device = self.influx_data_getter.get_device_name(int(current_time()))
def new_client(self):
return self.register(InfluxClient(self))
# class InfluxInstrument(HandlerBase):
#
# def __init__(self, instr_name, inst_config=None):
# super().__init__()
# self.db = InfluxDB()
# # self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
# self.title = instr_name
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def new_client(self):
# return self.register(InfluxClient(self))
class InfluxParams:
@ -383,53 +413,50 @@ class InfluxParams:
return dict(type='accept-command')
class InfluxClient(InfluxParams, InfluxGraph):
def __init__(self, instrument):
InfluxParams.__init__(self)
InfluxGraph.__init__(self, instrument)
def poll(self):
messages = self.queue
self.queue = []
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
class SecopInfluxClient(SecopClient, InfluxGraph):
def __init__(self, instrument):
SecopClient.__init__(self, instrument)
InfluxGraph.__init__(self, instrument)
def poll(self):
messages = super().poll()
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
class SecopInfluxInstrument(SecopInstrument):
def __init__(self, inst_name, instrument_config):
super().__init__(inst_name, instrument_config)
config = ConfigParser()
config.optionxform = str
config.read("./config/influx.ini")
section = config["INFLUX"]
self.db = SEHistory()
# self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
# org=section["org"], bucket=section['bucket'])
# self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
def new_client(self):
return self.register(SecopInfluxClient(self))
def get_streams(self, timestamp=None):
return self.db.get_streams(None, timestamp)
def get_experiments(self, start=None, stop=None):
return self.db.get_experiments(start, stop)
# class InfluxClient(InfluxParams, InfluxGraph):
# def __init__(self, instrument):
# InfluxParams.__init__(self)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = self.queue
# self.queue = []
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxClient(SecopClient, InfluxGraph):
# def __init__(self, instrument):
# SecopClient.__init__(self, instrument)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = super().poll()
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxInstrument(SecopInstrument):
#
# def __init__(self, inst_name, instrument_config):
# super().__init__(inst_name, instrument_config)
# config = ConfigParser()
# config.optionxform = str
# config.read("./config/influx.ini")
# section = config["INFLUX"]
# self.db = SEHistory()
# # self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
# # org=section["org"], bucket=section['bucket'])
# # self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
# # self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def get_streams(self, timestamp=None):
# return self.db.get_streams(None, timestamp)
#
# def get_experiments(self, start=None, stop=None):
# return self.db.get_experiments(start, stop)