Merge branch 'daniel' of https://gitlab.psi.ch/samenv/seweb into daniel

This commit is contained in:
Daniel
2025-03-19 09:51:19 +01:00
13 changed files with 821 additions and 485 deletions

77
base.py
View File

@ -1,14 +1,16 @@
import sys
import time
import logging
import uuid
ONEYEAR = 366 * 24 * 3600
def get_abs_time(times):
"""
Gets the absolute times for the given potential relative times. If a given timestamp is less than
one year, then the value is relative and converted into an absolute timestamp
"""Gets the absolute times for the given potential relative times.
If a given timestamp is less than one year, then the value is
relative (to now, rounded up to a full second) and converted
into an absolute timestamp
Parameters :
times([(float)]) : an array of unix timestamps or relative duration (< 1 year) as floats
@ -33,16 +35,63 @@ class Logger(object):
pass
class Instrument:
class HandlerBase:
def __init__(self):
self.clients = {}
self.handlers = {k[2:]: getattr(self, k) for k in dir(type(self)) if k.startswith('w_')}
def remove(self, client):
try:
del self.clients[client.id]
except KeyError:
logging.warning('client already removed %s', client.id)
def register(self, client):
self.clients[client.id] = client
return client
class Client(HandlerBase):
def __init__(self, server, streams, instrument_name, device_name):
super().__init__()
self.id = uuid.uuid4().hex[0:15]
self.nodes = {}
self.node_map = {}
if streams:
for uri in streams:
urisplit = uri.rsplit('://')
kind = urisplit[0] if len(urisplit) == 2 else 'secop'
node = server.interactor_classes[kind](uri, self.node_map)
self.nodes[uri] = node
self.server = server
self.instrument_name = instrument_name
self.device_name = device_name # do not know if this is needed
self.updates = {}
def poll(self):
updates = sum((n.get_updates() for n in self.nodes.values()), start=[])
result = [dict(type='update', updates=updates)] if updates else []
graph_updates = self.handlers.get('graphpoll', object)()
if graph_updates:
result.append(graph_updates)
return result
def w_getblock(self, path):
path = path.split(',')[-1] # TODO: why this?
if path == "main": # TODO: change to "-main-"?
components = []
for node in self.nodes.values():
node.add_main_components(components)
return dict(type='draw', path='main', title='modules', components=components)
node = self.node_map[path]
return dict(type='draw', path=path, title=path, components=node.get_components(path))
def w_updateblock(self, path):
if path == 'main': # TODO: change to "-main-"?
for node in self.nodes.values():
node.update_main()
else:
node = self.node_map[path]
node.update_params(path)
return dict(type='accept-block')
def w_console(self): # TODO: check if still used
return dict(type='accept-console')
def w_sendcommand(self, command):
for node in self.nodes.values():
if node.handle_command(command):
break
return dict(type='accept-command')
def info(self):
return ["na"]

View File

@ -1,4 +1,6 @@
from configparser import ConfigParser
class ChartConfig:
"""
Class that holds the chart section of a configuration file (for an instrument).
@ -6,14 +8,42 @@ class ChartConfig:
Attributes :
chart_config (Section) : the Section corresponding to the "chart" section in the given configuration file
"""
KEYS = ["cat", "color", "unit", "label"]
def __init__(self, path):
"""
Parameters :
path (str) : the path to the configuration file
"""
self.errors = {}
self.variables = {}
cfgp = ConfigParser(interpolation=None)
cfgp.optionxform = str
cfgp.read(path)
self.chart_config = cfgp["chart"]
section = cfgp["chart"]
for key, raw_value in section.items():
if len(key.split('.')) > 1:
self.errors[key] = f'illegal key: {key}'
continue
arguments = raw_value.split(",")
keyword_mode = False
config = {'cat': '*'}
for i, argument in enumerate(arguments):
argname, _, argvalue = argument.rpartition(':')
if argname:
keyword_mode = True
config[argname] = argvalue
else:
if keyword_mode:
self.errors[key] = f"positional arg after keywd arg: {key}={raw_value!r}"
else:
try:
if argvalue:
config[self.KEYS[i]] = argvalue
except Exception as e:
self.errors[key] = f"{e!r} in {key}={raw_value}"
self.variables[key] = config
def get_variable_parameter_config(self, key):
"""
@ -27,25 +57,4 @@ class ChartConfig:
The different options are in this dict if they are found in the chart section for the given key. Returns None if the key is not in the chart section,
or if there is a syntax problem for the given key.
"""
config = {}
positionnal = ["cat", "color", "unit"]
if key in self.chart_config.keys():
raw_value = self.chart_config[key]
arguments = raw_value.split(",")
keyword_mode = False
for i, argument in enumerate(arguments):
pieces = argument.split(":")
if len(pieces) == 2:
keyword_mode = True
if pieces[1] != "":
config[pieces[0]] = pieces[1]
else:
if not keyword_mode: #everything is going well
if pieces[0] != "":
config[positionnal[i]] = pieces[0]
else: #we cannot have a positionnal argument after a keyword argument
return None
return config
else:
return None
return self.variables.get(key)

View File

@ -17,9 +17,9 @@ function buildUpdateConnection() {
// Establishes server-sent-event-connection, which is used for all sorts of
// updates and exists as long as the client is running.
// Executed at programstart (see also SEAWebClientMain.js).
// Executed at program start (see also SEAWebClientMain.js).
var path = "http://" + hostPort + "/update";
var path = "http://" + hostPort + "/update?" + window.clientTags;
if (debugCommunication) {
console.log("%cto server (SSE): " + path,
"color:white;background:lightblue");
@ -29,8 +29,7 @@ function buildUpdateConnection() {
var src = new EventSource(path);
} catch (e) {
console.log(e)
alertify.prompt(
"NETWORK ERROR",
alertify.prompt("NETWORK ERROR",
"Failed to establish connection to data-server at the given address!"
+ "Try to enter HOST and PORT of the data-server manually!",
hostPort, function(evt, value) {
@ -50,9 +49,7 @@ function buildUpdateConnection() {
src.onerror = function(e) {
console.log(e);
console.log('EVTSRC error')
alertify
.prompt(
"NETWORK ERROR",
alertify.prompt("NETWORK ERROR",
"Failed to establish connection to data-server at the given address!"
+ "Try to enter HOST and PORT of the data-server manually!",
hostPort, function(evt, value) {
@ -306,7 +303,7 @@ function reqJSONPOST(s, url, parameters, successHandler, errorHandler) {
var xhr = typeof XMLHttpRequest != 'undefined' ? new XMLHttpRequest()
: new ActiveXObject('Microsoft.XMLHTTP');
if (debugCommunication) {
console.log("%cto server (reqJSON): %s",
console.log("%cto server (reqJSONPOST): %s",
"color:white;background:lightgreen", url);
}
xhr.open('post', url, true);
@ -408,8 +405,11 @@ function successHandler(s, message) {
begin = timeRange[0] - timeRange[1];
select.value = begin;
// Server-request for variable-list.*/
reqJSONPOST(0, "http://" + hostPort + "/getvars", "time=" + timeRange[1] + "&userconfiguration=" + JSON.stringify(getFormattedUserConfigurationFromLocalStorage()) + "&id="
+ clientID, successHandler, errorHandler);
console.log('TIME', timeRange)
reqJSONPOST(0, "http://" + hostPort + "/getvars",
"time=" + timeRange[0] + ',' + timeRange[1]
+ "&userconfiguration=" + JSON.stringify(getFormattedUserConfigurationFromLocalStorage())
+ "&id=" + clientID, successHandler, errorHandler);
break;
// Response to a "getvars"-server-request.
case "var_list":
@ -429,6 +429,7 @@ function successHandler(s, message) {
nextInitCommand();
}*/
// graphs.receivedVars(message.blocks);
document.getElementById("device").innerHTML = message.device
graphs.initGraphs(message.blocks);
nextInitCommand();
break;

View File

@ -371,10 +371,9 @@ function loadExportPopup(){
*/
function exportCallback(selectedVariables, startDateTimeMs, endDateTimeMs, nan, binning=null){
let binningParam = "None";
if (binning !== null)
binningParam = binning
let exportURL = "http://" + hostPort + "/export?time=" + startDateTimeMs/1000 + "," + endDateTimeMs/1000 + "&variables=" + selectedVariables + "&nan=" + nan + "&interval=" + binningParam + "&id=" + clientID
if (binning === null || binning == "None")
binning = "";
let exportURL = "http://" + hostPort + "/export?time=" + startDateTimeMs/1000 + "," + endDateTimeMs/1000 + "&variables=" + selectedVariables + "&nan=" + nan + "&interval=" + binning + "&id=" + clientID
let a = document.createElement('a');
a.href = exportURL
a.download = true
@ -417,7 +416,7 @@ let graphs = (function (){
let minTime, maxTime; // the queried time range
let lastTime = 0; // time of most recent data point
let resolution = undefined; // current window resolution (ms/pixel)
// let resolution = undefined; // current window resolution (ms/pixel)
let activateUpdateTimeout = undefined; // timeout for the activateUpdates function
let updateAutoTimeout = undefined; // timeout for the updateAuto function (used in onZoomCompleteCallback)
@ -542,12 +541,13 @@ let graphs = (function (){
varlist = vars_array[gindex];
let graph_elm = graph_elm_array[gindex];
timeDeltaAxis = maxTime - minTime
setResolution(timeDeltaAxis)
resolution = getResolution((maxTime - minTime) / 1000)
AJAX("http://" + hostPort + "/graph?time=" + minTime/1000 + "," + maxTime/1000 + "&variables=" + varlist + "&interval=" + resolution + "&id=" + clientID).getJSON().then(function(data){
//console.log('Graph', block, data)
AJAX("http://" + hostPort + "/graph?time=" + minTime/1000 + "," + maxTime/1000
+ "&variables=" + varlist
+ "&interval=" + resolution
+ "&id=" + clientID).getJSON().then(function(data){
// console.log('Graph', block, data);
let graph = new Graph(gindex, graph_elm, "Time", block.unit, block.tag, type);
graph_array[gindex] = graph;
@ -559,7 +559,6 @@ let graphs = (function (){
for(let e of data.graph[key]){
pdata.push({x: e[0]*1000, y: e[1]});
}
addDataset(gindex, key, pdata, dict[key])
// if(pdata.length > 0){
// addDataset(gindex, key, pdata, dict[key])
@ -777,10 +776,12 @@ let graphs = (function (){
max = max/1000;
}
timeDelta = currentMaxTime - currentMinTime
setResolution(timeDelta)
resolution = getResolution((currentMaxTime - currentMinTime) / 1000)
AJAX("http://" + hostPort + "/graph?time=" + min + ","+max+"&variables=" + variables() + "&interval=" + resolution + "&id=" + clientID).getJSON().then(function(data){
AJAX("http://" + hostPort + "/graph?time=" + min + ","+max
+"&variables=" + variables()
+ "&interval=" + resolution
+ "&id=" + clientID).getJSON().then(function(data){
for(let key in data.graph){
let pdata = [];
for(let e of data.graph[key]){
@ -893,8 +894,8 @@ let graphs = (function (){
* Sets the resolution of the viewing window in milliseconds
* @param {*} timeDelta - The difference between the maximum time and the minimum time of the window
*/
function setResolution(timeDelta){
resolution = Math.ceil((timeDelta / container.getBoundingClientRect().width))
function getResolution(timeDelta){
return Math.ceil((timeDelta / container.getBoundingClientRect().width))
}
/**
@ -915,7 +916,10 @@ let graphs = (function (){
msRightTimestampGetVars = dateTimestampMs + timeValueMs;
msRightTimestampGetGraph = dateTimestampMs + 24*60*60*1000;
AJAX("http://" + hostPort + "/getvars").postForm("time=" + msRightTimestampGetVars/1000 + "&userconfiguration=" + JSON.stringify(getFormattedUserConfigurationFromLocalStorage()) + "&id="+ clientID).then(function(data){
AJAX("http://" + hostPort + "/getvars").postForm(
"time=" + msRightTimestampGetVars/1000
+ "&userconfiguration=" + JSON.stringify(getFormattedUserConfigurationFromLocalStorage())
+ "&id="+ clientID).then(function(data){
blocks = data.blocks;
document.getElementById("device").innerHTML = data.device
maxTime = msRightTimestampGetGraph;
@ -989,7 +993,10 @@ let graphs = (function (){
window["wideGraphs"] = false; // will have no effect if hideRightPart is true
adjustGrid();
AJAX("http://" + hostPort + "/getvars").postForm("time=" + msRightTimestamp/1000 + "&userconfiguration=" + JSON.stringify(getFormattedUserConfigurationFromLocalStorage()) + "&id="+ clientID).then(function(data){
AJAX("http://" + hostPort + "/getvars").postForm(
"time=" + msRightTimestamp/1000 + "&userconfiguration="
+ JSON.stringify(getFormattedUserConfigurationFromLocalStorage())
+ "&id="+ clientID).then(function(data){
currentMaxTime = msRightTimestamp + 60000;
currentMinTime = msLeftTimestamp;
@ -1063,7 +1070,7 @@ let graphs = (function (){
currentMaxTime = maxTime;
currentMinTime = minTime;
}
AJAX("http://" + hostPort + "/gettime?time=-1800,0&id="+ clientID).getJSON().then(function(data){
AJAX("http://" + hostPort + "/gettime?time=" + window['timerange'] + "&id="+ clientID).getJSON().then(function(data){
startTime = data.time[1]*1000;
maxTime = startTime;
currentMaxTime = maxTime + 60000;
@ -1291,7 +1298,10 @@ let graphs = (function (){
function applySettingsCallback(userConfiguration){
cursorLine(null);
AJAX("http://" + hostPort + "/getvars").postForm("time=" + currentMaxTime/1000 + "&userconfiguration=" + JSON.stringify(userConfiguration) + "&id="+ clientID).then(function(data){
AJAX("http://" + hostPort + "/getvars").postForm(
"time=" + currentMaxTime/1000
+ "&userconfiguration=" + JSON.stringify(userConfiguration)
+ "&id="+ clientID).then(function(data){
blocks = data.blocks;
document.getElementById("device").innerHTML = data.device
maxTime = currentMaxTime;

View File

@ -90,6 +90,21 @@ new Settings()
.treat("hideRightPart", "hr", to_bool, false) //used to completely disable the right part
.treat("wideGraphs", "wg", to_bool, false) //used to toggle the size of the graphs part
.treat("showAsync", "sa", to_bool, false)
.treat("device", "dev", 0, "")
.treat("server", "srv", 0, "")
.treat("instrument", "instrument", 0, "")
.treat("timerange", "time", 0, "-1800,0")
if (window.instrument) {
window.clientTags = "&instrument=" + window.instrument;
} else {
let args = '';
if (window.server) { args += "&stream=" + window.server; }
if (window.device) { args += "&device=" + window.device; }
window.clientTags = args;
}
console.log('TAGS', window.clientTags);
function loadFirstBlocks() {
if (debug_main_daniel) {
@ -99,11 +114,19 @@ function loadFirstBlocks() {
if (showMain) pushInitCommand("getblock?path=main&", "main")
if (showConsole) pushInitCommand("console?", "console")
if (nColumns == 1) { // probably mobile phone}
if (showGraphics) pushInitCommand("gettime?time=-1800,0&", "graphics")
if (showGraphics) pushInitCommand("gettime?time=" + window.timerange + "&", "graphics")
if (showOverview) pushInitCommand("getblock?path=_overview&", "overview")
var goFS = document.getElementById('header');
goFS.addEventListener(
'click',
function () {
document.body.requestFullscreen();
},
false,
);
} else {
if (showOverview) pushInitCommand("getblock?path=_overview&", "overview")
if (showGraphics) pushInitCommand("gettime?time=-1800,0&", "graphics")
if (showGraphics) pushInitCommand("gettime?time=" + window.timerange + "&", "graphics")
// last is shown first
}
}
@ -156,7 +179,7 @@ window.onload = function() {
let crossElement = document.getElementById("close-cross");
if(window["hideRightPart"]){
if(window.hideRightPart){
document.body.removeChild(crossElement);
}else{
crossElement.onclick = function(){
@ -180,7 +203,7 @@ window.onload = function() {
elements[2].style.display = "none"; // hide parameters
}
}else{ // else it toggles the graphs window's size and triggers the adjustGrid()
window["wideGraphs"] = !window['wideGraphs'];
window.wideGraphs = ! window.wideGraphs;
adjustGrid();
}
}
@ -194,10 +217,9 @@ window.onload = function() {
// var homeButton = document.getElementById("home-icon");
// TODO : uncomment this code with the right URL to navigate to when the way to select the instrument will be decided.
// homeButton.onclick = function () {
// window.location = "http://" + location.hostname + ":8800/";
// };
homeButton.onclick = function () {
window.location = "/select_experiment";
};
buildUpdateConnection();
// if (location.hash) {
// console.log("hash in url", location.hash);

View File

@ -18,7 +18,7 @@ def assign_colors_to_curves(blocks):
auto_curves = []
for curve in block["curves"]:
col = curve["color"].strip()
col = curve.get("color", "").strip()
c = ColorMap.to_code(col)
if c < 0:
valid = ColorMap.check_hex(col)

View File

@ -20,4 +20,15 @@ T_sorb.target=-
T_still=unit:K,color:orange
dil=-
lev=unit:%,color:brown
lev.n2=*,unit:%,color:black
lev.n2=unit:%,color:black
hefill=-
ln2fill=-
hepump=-
hemot=-
nv=-
nv.flow=unit:ln/min
nv.flowtarget=unit:ln/min
nv.flowp=unit:ln/min
stickrot=unit:deg
tcoil1=*_coil,unit:K
tcoil2=*_coil,unit:K

View File

@ -1,4 +1,5 @@
[INFLUX]
url=http://localhost:8086
url=http://linse-a:8086
org=linse
bucket=curve-test
token=zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==

View File

@ -17,6 +17,7 @@ class InfluxDB:
def __init__(self):
config = ConfigParser()
config.optionxform = str
config.read("./config/influx.ini")
self._client = InfluxDBClient(url=config["INFLUX"]["url"], token=config["INFLUX"]["token"],
org=config["INFLUX"]["org"])
@ -84,35 +85,6 @@ class InfluxDataGetter:
# ----- PUBLIC METHODS
def get_available_variables_at_time(self, time, chart_configs = None, user_config = None):
"""
Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.
Here, a variable means : SECOP module name + parameter. By default, this method returns the parameters "value" and "target", unless the config files used in chart_configs or user_config indicates other directives.
Parameters :
time (int) : the unix timestamps in seconds of the point in time to get the variables at.
chart_configs ([ChartConfig] | None) : an array of objects, each holding a configuration file for the chart. Configurations are applied in the order of the list.
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) : the Python dict representing the user configuration, applied at the end. The key is <secop_module.parameter>.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionnaries, each one representing
a block of curves with their name, their label and their color to display, grouped by their category if given or unit (in tag).
"""
all_setup_info = self._get_all_setup_info_as_dict(time)
available_variables = self._extract_variables(all_setup_info)
if not chart_configs == None:
for chart_config in chart_configs:
available_variables = self._filter_params_with_config(available_variables, chart_config)
if not user_config == None:
available_variables = self._filter_params_with_user_config(available_variables, user_config)
available_variables = self._remove_variables_params_not_displayed(available_variables)
available_variables = self._remove_variables_params_wihout_param_float_and_split(available_variables, time)
res = self._group_variables_by_cat_unit(available_variables)
return res
def get_curves_in_timerange(self, variables, time, interval = None):
"""
Gets the curves for the given variables within a timerange.
@ -199,18 +171,17 @@ class InfluxDataGetter:
parameter = "value" if len(var_param) == 1 else var_param[1]
# we need to rename the "_time" column to simply "time" in case we want binning because of the comparison done later in the "binned points with same timestamp" process.
# chr(34) is the double quote char, because we cannot escape them in a f string
query = f"""
from(bucket: "{self._bucket}")
|> range(start: {times[0]}, stop: {times[1] + 1})
|> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
{"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval != "None" else ""}
{f'|> aggregateWindow(every: duration(v: {self._seconds_to_nanoseconds(interval)}), fn: last, createEmpty:false, timeDst:"binning_time")' if interval != 'None' else ''}
|> map(fn: (r) => ({{r with relative: ( float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"}) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
|> map(fn: (r) => ({{r with timestamp: float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"})) / 1000000000.0}}))
{"|> rename(columns: {_time:"+chr(34)+"time"+chr(34)+"})" if interval != "None" else ""}
{'|> rename(columns: {_time:"time"})' if interval != 'None' else ''}
|> drop(columns:["_start", "_stop", "_field"])
|> pivot(rowKey:["relative", "timestamp", "expired"{", "+chr(34)+"time"+chr(34) if interval != "None" else ""}], columnKey: ["_measurement"], valueColumn: "_value")
|> pivot(rowKey:["relative", "timestamp", "expired"{', "time"' if interval != "None" else ''}], columnKey: ["_measurement"], valueColumn: "_value")
"""
data_frame = self._db.query_data_frame(query)
@ -322,75 +293,6 @@ class InfluxDataGetter:
# ----- PRIVATE METHODS
def _get_all_setup_info_as_dict(self, time):
"""
Gets the value of the field setup_info in the measurements nicos/se_main, nicos/se_stick, nicos/se_addons as an array of Python dicts.
Takes the last setup_info dict (for each measurement) known at time.
Parameters
time (int) : the unix timestamps in seconds of the point in time to get the variables at.
Returns :
[{(str):((str), {...})}]: an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
"""
measurements = ["nicos/se_main", "nicos/se_stick", "nicos/se_addons"]
res = []
for measurement in measurements:
query = f"""
from(bucket: "{self._bucket}")
|> range(start: 0, stop: {time + 1})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => r._field == "setup_info")
|> last()
|> yield(name: "res")
"""
tables = self._db.query(query)
for table in tables:
for record in table.records:
res.append(ast.literal_eval(record.get_value()))
return res
def _extract_variables(self, all_setup_info_dict):
"""
Extracts relevant information out of the setup_info dict for each available variable in measurements nicos/se_main, nicos/se_stick, nicos/se_addons
Parameters :
all_setup_info_dict ([{(str):((str), {...})}]) : an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
Returns :
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category ("*" for value and target, "-" else), the color (empty for the moment)
and the unit ("1" if not available or empty), indexed by the name of the parameter.
"""
available_varirables = []
added_names = []
for setup_info_dict in all_setup_info_dict:
for (_, content) in setup_info_dict.items():
if content[0] != "nicos.devices.secop.devices.SecopDevice":
name = self._transform_secop_module_name_to_influx(content[1]["secop_module"])
if name not in added_names:
value_unit = "1" if (not "unit" in content[1].keys() or content[1]["unit"] == "") else content[1]["unit"]
variable = {
"name":name,
"label":content[1]["secop_module"],
"params":{"value":{"cat":"*", "color":"", "unit":value_unit}} # main value, shown by default
}
for param_name, param_content in content[1]["params_cfg"].items():
param_unit = "1" if (not "unit" in param_content.keys() or param_content["unit"] == "") else param_content["unit"]
variable["params"][param_name] = {
"cat":"*" if param_name == "target" else "-", # target is also shown by default, not the other parameters
"color":"",
"unit":param_unit
}
available_varirables.append(variable)
added_names.append(name)
return available_varirables
def _transform_secop_module_name_to_influx(self, secop_module_name):
"""
Transforms the name of the variable available in the setup_info dict into the Influx name.

View File

@ -1,19 +1,22 @@
import time
from time import time as current_time
import logging
import json
import io
import uuid
from influxdb import InfluxDB, InfluxDataGetter
# from configparser import ConfigParser
from math import ceil
from sehistory.seinflux import fmtime
from colors import assign_colors_to_curves
from chart_config import ChartConfig
from base import Instrument, get_abs_time
from secop import SecopClient, SecopInstrument
from base import get_abs_time, HandlerBase
class InfluxGraph:
"""
Class implementing the logic of the different routes that are called by
the client to retrieve graph data with InfluxDB.
def split_tags(tags):
return {k: v.split(',') for k, v in tags.items()}
class InfluxGraph(HandlerBase):
"""Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB.
Global constants :
HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the
@ -29,96 +32,114 @@ class InfluxGraph:
livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE.
end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query
or update.
lastvalues ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
last_values ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
are tuples, where the first value is the unix timestamp of the most recent value known for this variable,
and the second value its corresponding value
variables ({(str):(str)}) : a dictionnary of the current available variables requested by the client.
variables ({(str):(str)}) : a dictionary of the current available variables requested by the client.
The key is the InfluxDB name of the curve, and the value is its label in the GUI.
"""
HISTORICAL = 0
ACTUAL = 1
LIVE = 2
def __init__(self, influx_data_getter, instrument):
self.influx_data_getter = influx_data_getter
self.chart_configs = [ChartConfig("./config/generic.ini"), ChartConfig(f"./config/{instrument}.ini")]
def __init__(self, server, instrument, device_name, tags):
"""create instance for retrieving history
:param db: a database client (SEInflux instance)
:param instrument: the name of anm instrument or None
:param streams: a stream or comma separated list of streams
:param devices: a device name ar a comma separated list of devices
:param device_name: (comma separated) device name for labelling
typically only one of the 3 last parameters are needed
if more are specified, all of them must be fulfilled
"""
super().__init__() # put methods w_... to handlers
self.handlers['graphpoll'] = self.graphpoll
self.server = server
self.db = server.db
# self.influx_data_getter = influx_data_getter
self.chart_configs = [ChartConfig("./config/generic.ini")]
self.instrument = instrument
self.device_name = device_name
if instrument: # TODO: should it not be better to have inifiles per device?
try:
self.chart_configs.append(ChartConfig(f"./config/{instrument}.ini"))
except KeyError:
pass
self.livemode = self.HISTORICAL
self.end_query = 0
self.lastvalues = {}
self.variables = {} # name:label
def complete_to_end_and_feed_lastvalues(self, result, endtime):
"""
Completes the data until the last requested point in time by adding the last known y-value at the end point.
Also feeds self.lastvalues.
Parameters :
result ({(str):[[(int),(float)]]}) : a dictionnary with the variable names as key, and an array of points,
which are a tuple (timestamp, y-value as float)
endtime (int) : the unix timestamp in seconds of the time we want to have data until
"""
for var, c in result.items():
if c:
lastt, lastx = c[-1]
if lastt < endtime:
c.append((endtime, lastx))
self.lastvalues[var] = (lastt, lastx)
self.last_values = {} # dict <variable> of last known point (<time>, <value>)
self.last_time = {} # dict <stream> of last received time
self.last_minute = 0
self.last_update = 0 # time of last call with a result
self.tags = None
self.init_tags = tags
def w_graph(self, variables, time="-1800,0", interval=None):
"""
Gets the curves given by variables in the time range "time", spaced by "interval" if given (binning/resolution)
"""Get the curves given by variables in the time range "time"
spaced by "interval" if given (binning/resolution)
Called when the route /graph is reached.
Parameters :
variables (str) : a comma separataed value string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds. They are treated as relative from now
if they are lesser than one year.
interval (str) : the interval (resolution) of the values to get (string in milliseconds)
variables (str) : a comma separated string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds.
values < one year are treated as relative from now.
interval (str) : the interval (resolution) of the values to get (string in seconds)
Returns :
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionnary with its "graph-draw" type
(so it can be processed by the client), and a "graph" dictionnary with the variable names as key,
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionary with its "graph-draw" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points as a tuple (timestamp, y-value as float)
"""
time = [float(t) for t in time.split(',')]
start, end, now = get_abs_time(time + [0])
start, end, now = int(start), int(end), int(now)
queried_time_range = [start, end]
start, end, now = get_abs_time([float(t) for t in time.split(',')] + [0])
start, end, now = int(start), ceil(end), ceil(now)
queried_variables = variables.split(',')
self.livemode = self.ACTUAL if end+10 >= now else self.HISTORICAL
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval : interval = int(interval)
if interval:
interval = float(interval)
result = self.db.curves(start, end, queried_variables, merge='_measurement',
interval=interval or None, **self.tags)
self.update_last(result)
self.db.complete(result, self.last_time, 'stream')
self.last_minute = now // 60
return dict(type='graph-draw', graph={k: result[k] for k in queried_variables if k in result})
result = self.influx_data_getter.get_curves_in_timerange(queried_variables, queried_time_range, interval)
self.complete_to_end_and_feed_lastvalues(result, min(end, now))
self.end_query = end
return dict(type='graph-draw', graph=result)
def update_last(self, curve_dict):
"""update last values per variable and last time per stream"""
for key, curve in curve_dict.items():
stream = curve.tags.get('stream')
tlast, value = curve[-1]
self.last_values[key] = curve[-1]
self.last_time[stream] = max(self.last_time.get(stream, 0), tlast)
def w_gettime(self, time):
"""
Gets the server time for the give time.
"""Get the server time for the given time(range).
Called when the route /gettime is reached.
Parameters :
time (str="-1800,0") : the given point in time represented by a string, which is a comma separated unix
timestamp values list (in seconds). They are treated as relative from now if they are lesser than one year.
time (str="-1800,0") : the given point in time represented by a string,
which is a comma separated unix timestamp values list (in seconds).
values < one year are treated as relative from now.
Returns :
{"type":"time", "time":(int)} : a dictionnary with its "time" type (so the data can be processed by the
{"type":"time", "time":(int)} : a dictionary with its "time" type (so the data can be processed by the
client) and the server unix timestamp in seconds corresponding to the time asked by the client
"""
time = [float(t) for t in time.split(',')]
return dict(type='time', time=get_abs_time(time))
return dict(type='time', time=get_abs_time(
[float(t) for t in time.split(',')]))
def w_getvars(self, time, userconfiguration = None):
"""
Gets the curve names available at a given point in time, with a possible user configuration on the client side.
def w_getvars(self, time, userconfiguration=None, **_):
"""Get the curve names available at a given point in time
with a possible user configuration on the client side.
Called when the route /getvars is reached.
Parameters :
time (str) : the given point in time represented by a string, which is a unix timestamp in seconds.
It is treated as relative from now if it is lesser than one year.
values < one year are treated as relative from now.
Might also be a comma separated time range.
userconfiguration (str|None) : the JSON string representing the user configuration
Returns :
@ -130,25 +151,135 @@ class InfluxGraph:
category or unit if absent) and their unit (in "blocks")
"""
time = [float(t) for t in time.split(',')]
end_time = int(get_abs_time(time)[-1])
if not userconfiguration == None : userconfiguration = json.loads(userconfiguration)
blocks = self.influx_data_getter.get_available_variables_at_time(end_time, self.chart_configs, userconfiguration)
device_name = self.influx_data_getter.get_device_name(end_time)
# updates the self.variables attribute to keep track of the available variables
self.variables = {variable["name"]:variable["label"] for block in blocks for variable in block["curves"]}
time = get_abs_time([float(t) for t in time.split(',')])
start_time = int(time[0])
end_time = int(time[-1])
if userconfiguration is not None:
userconfiguration = json.loads(userconfiguration)
if self.instrument:
streams, tags, self.device_name = self.server.lookup_streams(self.instrument, **self.init_tags)
self.tags = {**self.init_tags, **tags}
blocks = self.get_available_variables(start_time, end_time, self.chart_configs, userconfiguration)
# initialize self.last_values to keep track of the available variables
self.last_values = {var["name"]: [0, None] for block in blocks for var in block["curves"]}
assign_colors_to_curves(blocks)
result = dict(type='var_list')
result['blocks'] = blocks
result['device'] = device_name
# print('DEVICE', device_name, tags)
# for block in blocks:
# print(block['tag'], [c['name'] for c in block['curves']])
return {'type': 'var_list', 'blocks': blocks, 'device': self.device_name}
def get_available_variables(self, start_time, end_time, chart_configs=None, user_config=None):
"""Gets the available variables
(those that we can have a value for since the device has been installed
on the instrument) at the given point in time.
Here, a variable means : SECOP module name + parameter.
By default, this method returns the parameters "value" and "target",
unless the config files used in chart_configs or user_config indicates other directives.
Parameters :
start_time, send_time (int) : the unix timestamps in seconds of the point in time to get the variables at.
chart_configs ([ChartConfig] | None) :
an array of objects, each holding a configuration file for the chart.
Configurations are applied in the order of the list.
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) :
the Python dict representing the user configuration, applied at the end.
The key is <secop_module.parameter>.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] :
a list of dicts, each one representing
a block of curves with their name, their label and their color to display,
grouped by their category if given or unit (in tag).
"""
if start_time == end_time:
start_time = end_time - 3600
result = self.db.curves(start_time, end_time, _measurement=None,
merge='_measurement', **self.tags)
assert all(c.key_names[0] == '_measurement' for c in result.values())
variables = {k: t.tags.get('unit') for k, t in result.items()}
config = {}
if chart_configs:
for chart_config in chart_configs:
for key, cfg in chart_config.variables.items():
config.setdefault(key, {}).update(cfg)
if user_config:
for key, cfg in user_config.items():
config.setdefault(key, {}).update(cfg)
groups = {}
def add_to_groups(name, cat=None, unit='1', color='', label=None):
if cat == '-':
return
if name.endswith('.value'):
if not cat:
cat = '*'
if not label:
label = name[:-6]
elif name.endswith('.target'):
if not cat:
cat = '*'
elif not cat:
return
unit = unit or '1'
tag = cat.replace('*', unit)
grp = groups.get(tag)
if grp is None:
crv_dict = {}
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': crv_dict}
else:
crv_dict = grp['curves']
crv_dict[name] = {'name': name, 'unit': unit, 'label': label or name}
# treat variables in config first (in their order!)
for key, cfg in config.items():
cat = cfg.pop('cat', None)
cfgunit = cfg.pop('unit', '')
if '.' in key:
unit = variables.pop(key, object)
if unit is not object:
add_to_groups(key, cat, cfgunit or unit, **cfg)
else:
var = f'{key}.value'
unit = variables.pop(var, object)
if unit is not object:
label = cfg.pop('label', None) or key
add_to_groups(var, cat, cfgunit or unit, label=label, **cfg)
var = f'{key}.target'
unit = variables.pop(var, object)
if unit is not object:
cfg.pop('color', None)
add_to_groups(var, cat, cfgunit or unit, **cfg)
for var, unit in variables.items():
add_to_groups(var, unit=unit)
# make order a bit more common
result = []
for key in ['K', 'T', 'W', 'ln/min'] + list(groups):
if key in groups:
group = groups.pop(key)
curve_dict = group['curves']
curves = []
# get first '.value' parameters and add targets if available
ordered_keys = [f'{m}.value' for m in ('tt', 'T', 'ts', 'Ts')]
for name in ordered_keys + list(curve_dict):
if name.endswith('.value'):
try:
curves.append(curve_dict.pop(name))
curves.append(curve_dict.pop(f'{name[:-6]}.target'))
except KeyError:
pass # skip not existing or already removed items
# add remaining curves
curves.extend(curve_dict.values())
print(key, curves)
group['curves'] = curves
result.append(group)
return result
def w_updategraph(self):
"""
Sets the current visualisation mode to LIVE if not in HISTORICAL mode.
"""Set the current visualisation mode to LIVE if not in HISTORICAL mode.
Called when the route /updategraph is reached.
Returns :
{"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live"
@ -161,7 +292,7 @@ class InfluxGraph:
self.livemode = self.LIVE
return dict(type='accept-graph', live=True)
def w_export(self, variables, time, nan, interval):
def w_export(self, variables, time, nan, interval, timeoffset=None):
"""
Returns the bytes of a dataframe with the curves given by variables in the time range "time"
Called when the route /export is reached.
@ -176,19 +307,15 @@ class InfluxGraph:
io.BytesIO : an BytesIO object containing the dataframe to retrieve
"""
time = [float(t) for t in time.split(',')]
start, end = get_abs_time(time)
start, end = int(start), int(end)
start, end = get_abs_time([float(t) for t in time.split(',')])
start, end = int(start), ceil(end)
queried_variables = variables.split(',')
if interval != "None" : interval = int(interval)
df = self.influx_data_getter.get_curves_data_frame(queried_variables, [start, end], interval, self.variables)
mem = io.BytesIO()
df.to_csv(mem, sep="\t", index=False, float_format="%.15g", na_rep=nan)
mem.seek(0)
return mem
interval = float(interval) if interval else None
timeoffset = None if timeoffset == 'now' else (timeoffset or 0)
result = self.db.export(start, end, queried_variables, timeoffset=timeoffset, none=nan, interval=interval,
**self.tags)
return io.BytesIO(result.encode('utf-8'))
def graphpoll(self):
"""
@ -199,48 +326,69 @@ class InfluxGraph:
Returns :
{"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None :
a dictionnary with its "graph-update" type
(so it can be processed by the client), and a "graph" dictionnary with the variable names as key,
a dictionary with its "graph-update" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points, which are an array containing the timestamp
as their first value, and the y-value in float as their second one
"""
if self.livemode != self.LIVE:
return None
now, = get_abs_time([0])
result = self.influx_data_getter.poll_last_values(list(self.variables.keys()), self.lastvalues, now)
for variable, values in list(result.items()):
tlast = self.lastvalues.get(variable, (0,))[0]
# removes points older than the last known point
# (queries are in seconds and might return points already displayed)
while values and values[0][0] <= tlast:
values.pop(0)
if values and values[-1][0] > tlast:
self.lastvalues[variable] = values[-1]
now = current_time()
if now < int(self.last_update) + 1.5:
# the server is only waiting after a None return
# this avoids to many queries with expected empty result
return None
last_time = int(min(self.last_time.values(), default=now-3600))
# if len(self.last_time) > 1:
# print('time_poll_jitter', max(self.last_time.values()) - min(self.last_time.values()))
prev_minute, self.last_minute = self.last_minute, now // 60
fullminute = prev_minute != self.last_minute
add_prev = 3600 if fullminute else 0
result = self.db.curves(last_time, None, list(self.last_values),
merge='_measurement', add_prev=add_prev, **self.tags)
to_remove = {}
for key, curve in result.items():
tlast = self.last_values.get(key, [0])[0]
# remove points older than the last known point. this might happen for different reasons:
# - queries are rounded to seconds
# - clocks of different streams might not be synched
l = len(curve)
for i, row in enumerate(curve):
if row[0] > tlast:
del curve[:i]
break
else:
del result[variable]
if int(now / 60) != int(self.end_query / 60):
# Update unchanged values every plain minute
for var, (_, lastx) in self.lastvalues.items():
if var not in result:
result[var] = [(now, lastx)]
self.end_query = now
if not fullminute:
to_remove[key] = l
self.update_last(result)
if fullminute:
self.db.complete(result, self.last_time, 'stream')
for key, length in to_remove.items():
curve = result[key]
if len(curve) > l:
del curve[:l]
else:
if fullminute:
print('R', key)
result.pop(key)
# print('poll', sum(len(c) for c in result.values()), self.last_time)
if len(result) > 0:
return dict(type='graph-update', time=now, graph=result)
self.last_update = now
return dict(type='graph-update', time=last_time, graph=result)
return None
class InfluxInstrument(Instrument):
def __init__(self, instr_name, inst_config=None):
super().__init__()
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
self.title = instr_name
self.device = self.influx_data_getter.get_device_name(int(time.time()))
def new_client(self):
return self.register(InfluxClient(self))
# class InfluxInstrument(HandlerBase):
#
# def __init__(self, instr_name, inst_config=None):
# super().__init__()
# self.db = InfluxDB()
# # self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
# self.title = instr_name
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def new_client(self):
# return self.register(InfluxClient(self))
class InfluxParams:
@ -265,41 +413,50 @@ class InfluxParams:
return dict(type='accept-command')
class InfluxClient(InfluxParams, InfluxGraph):
def __init__(self, instrument):
InfluxParams.__init__(self)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
def poll(self):
messages = self.queue
self.queue = []
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
class SecopInfluxClient(SecopClient, InfluxGraph):
def __init__(self, instrument):
SecopClient.__init__(self, instrument)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
def poll(self):
messages = super().poll()
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
class SecopInfluxInstrument(SecopInstrument):
def __init__(self, inst_name, instrument_config):
super().__init__(inst_name, instrument_config)
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
self.device = self.influx_data_getter.get_device_name(int(time.time()))
def new_client(self):
return self.register(SecopInfluxClient(self))
# class InfluxClient(InfluxParams, InfluxGraph):
# def __init__(self, instrument):
# InfluxParams.__init__(self)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = self.queue
# self.queue = []
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxClient(SecopClient, InfluxGraph):
# def __init__(self, instrument):
# SecopClient.__init__(self, instrument)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = super().poll()
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxInstrument(SecopInstrument):
#
# def __init__(self, inst_name, instrument_config):
# super().__init__(inst_name, instrument_config)
# config = ConfigParser()
# config.optionxform = str
# config.read("./config/influx.ini")
# section = config["INFLUX"]
# self.db = SEHistory()
# # self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
# # org=section["org"], bucket=section['bucket'])
# # self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
# # self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def get_streams(self, timestamp=None):
# return self.db.get_streams(None, timestamp)
#
# def get_experiments(self, start=None, stop=None):
# return self.db.get_experiments(start, stop)

View File

@ -1,9 +1,43 @@
#!/usr/bin/env python3
#!/usr/bin/env python
import sys
import pathlib
sys.path.insert(0, str((pathlib.Path(__file__) / '..').resolve()))
import webserver
from influxgraph import SecopInfluxInstrument
import argparse
import socket
from webserver import server
from base import Client
from influxgraph import InfluxGraph
from secop import SecopInteractor
from sehistory.seinflux import SEHistory
webserver.instrument = webserver.main(SecopInfluxInstrument)
def parseArgv(argv):
parser = argparse.ArgumentParser(
description="start a webserver for history and interaction",
)
# loggroup = parser.add_mutually_exclusive_group()
# loggroup.add_argument("-v", "--verbose",
# help="Output lots of diagnostic information",
# action='store_true', default=False)
# loggroup.add_argument("-q", "--quiet", help="suppress non-error messages",
# action='store_true', default=False)
parser.add_argument("port",
type=str,
help="port number to serve\n")
# parser.add_argument('-d',
# '--daemonize',
# action='store_true',
# help='Run as daemon',
# default=False)
parser.add_argument('-i',
'--instrument',
action='store',
help="instrument, if running on an instrument computer\n"
"if the value is HOST, take the host name as instrument name",
default=None)
return parser.parse_args(argv)
args = parseArgv(sys.argv[1:])
instrument = socket.gethostname().split('.')[0] if args.instrument == 'HOST' else args.instrument
server.run(int(args.port), SEHistory(), InfluxGraph, Client, single_instrument=instrument, secop=SecopInteractor)

163
secop.py
View File

@ -1,9 +1,8 @@
import logging
import uuid
from base import Instrument, get_abs_time
from frappy.client import SecopClient as SecNodeClient
from frappy.lib.enum import EnumMember
from frappy.datatypes import get_datatype
from base import HandlerBase
from frappy.client import SecopClient
# from frappy.lib.enum import EnumMember
# from frappy.datatypes import get_datatype
def convert_par(module, name, par):
@ -20,39 +19,38 @@ def convert_par(module, name, par):
return result
class SecopClient:
class SecopInteractor(SecopClient):
prio_par = ["value", "status", "target"]
hide_par = ["baseclass", "class", "pollinterval"]
skip_par = ["status2"]
def __init__(self, instrument):
self.instrument = instrument
self.id = uuid.uuid4().hex[0:15]
def __init__(self, uri, node_map):
super().__init__(uri)
self.module_updates = set()
self.param_updates = set()
self.updates = {}
def w_getblock(self, path):
path = path.split(',')[-1] # TODO: why this?
if path == "main":
components = [dict(type='rdlink', name=name+':value', title=name)
for node in self.instrument.nodes for name in node.modules]
self.param_updates = {'value'}
return dict(type='draw', path='main', title='modules', components=components)
def add_main_components(self, components):
# todo: treat non Readable classes correctly
components.extend(dict(type='rdlink', name=name + ':value', title=name)
for name in self.modules)
self.param_updates.add('value')
self.param_updates.add('status')
def get_components(self, path):
module = self.modules[path]
self.module_updates.add(path) # TODO: remove others?
node = self.instrument.node_map[path]
module = node.modules[path]
# logging.info('MP %r', path)
parameters = dict(module["parameters"])
components = []
for name in SecopClient.skip_par:
for name in SecopInteractor.skip_par:
if name in parameters:
parameters.pop(name)
for name in SecopClient.prio_par:
for name in SecopInteractor.prio_par:
if name in parameters:
components.append(convert_par(path, name, parameters.pop(name)))
components1 = []
for name in SecopClient.hide_par:
for name in SecopInteractor.hide_par:
if name in parameters:
components1.append(convert_par(path, name, parameters.pop(name)))
for name, p in parameters.items():
@ -72,27 +70,22 @@ class SecopClient:
# print(item)
self.updates[key] = item
def w_updateblock(self, path):
if path == 'main':
path = ''
for node in self.instrument.nodes:
for modname in node.modules:
def update_main(self):
cache = self.cache
for modname in self.modules:
key = modname, 'value'
if key in node.cache:
self.updateItem(*key, node.cache[key])
else:
node = self.instrument.node_map[path]
for param in node.modules[path]['parameters']:
if key in cache:
self.updateItem(*key, cache[key])
def update_params(self, path):
cache = self.cache
for param in self.modules[path]['parameters']:
key = path, param
if key in node.cache:
self.updateItem(*key, node.cache[key])
return dict(type='accept-block')
if key in cache:
self.updateItem(*key, cache[key])
def w_console(self):
return dict(type='accept-console')
def w_sendcommand(self, command):
logging.info('SENDCOMMAND %r', command)
def handle_command(self, command):
"""handle command if we can, else return False"""
if not command.strip():
return dict(type='accept-command')
if command.startswith('change '):
@ -101,62 +94,54 @@ class SecopClient:
module, _, parameter = modpar.partition(':')
if not parameter:
parameter = 'target'
node = self.instrument.node_map[module]
if module not in self.modules:
return False
logging.info('SENDCOMMAND %r', command)
try:
node.setParameterFromString(module, parameter, strvalue)
self.setParameterFromString(module, parameter, strvalue)
except Exception as e:
print(f"{e!r} converting {strvalue} to {node.modules[module]['parameters'][parameter]['datatype']}")
return dict(type='accept-command')
print(f"{e!r} converting {strvalue} to {self.modules[module]['parameters'][parameter]['datatype']}")
return True
def w_gettime(self, time):
"""parse time (using server time)
time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time
"""
time = [float(t) for t in time.split(',')]
return dict(type='time', time=get_abs_time(time))
def poll(self):
def get_updates(self):
updates, self.updates = self.updates, {}
if not updates:
return []
messages = [dict(type='update', updates=list(updates.values()))]
return messages
return list(updates.values())
def info(self):
return ["na"]
class SecopInstrument(Instrument):
def __init__(self, inst_name, instrument_config):
super().__init__()
self.instrument_config = instrument_config
host_ports = instrument_config['hostport']
self.logger_dir = instrument_config.get('logger_dir', '')
# test_day = instrument_config.get('test_day', None)
# self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
self.title = inst_name
self.device = 'UNDEFINED'
self.nodes = []
self.node_map = {}
for host_port in host_ports.split(','):
node = SecNodeClient(host_port)
node.connect()
self.nodes.append(node)
for name, mod in node.modules.items():
self.node_map[name] = node
def register(self, client):
print('OPEN')
for node in self.nodes:
node.register_callback(None, client.updateItem)
return super().register(client)
def remove(self, client):
print('REMOVE')
for node in self.nodes:
node.unregister_callback(None, client.updateItem)
super().remove(client)
def new_client(self):
return self.register(SecopClient(self))
# class SecopInstrument(HandlerBase):
#
# def __init__(self, inst_name, instrument_config):
# super().__init__()
# self.instrument_config = instrument_config
# host_ports = instrument_config['hostport']
# self.logger_dir = instrument_config.get('logger_dir', '')
# # test_day = instrument_config.get('test_day', None)
# # self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
# self.title = inst_name
# self.device = ''
# self.nodes = []
# self.node_map = {}
# for host_port in host_ports.split(','):
# node = SecopClient(host_port)
# node.connect()
# self.nodes.append(node)
# for name, mod in node.modules.items():
# self.node_map[name] = node
#
# def register(self, client):
# print('OPEN')
# for node in self.nodes:
# node.register_callback(None, client.updateItem)
# return super().register(client)
#
# def remove(self, client):
# print('REMOVE')
# for node in self.nodes:
# node.unregister_callback(None, client.updateItem)
# super().remove(client)
#
# def new_client(self):
# return self.register(SecopClient(self))

View File

@ -43,7 +43,91 @@ def to_json_sse(msg):
return 'data: %s\n\n' % txt
instrument = None
class Server:
"""singleton: created once in this module"""
interactor_classes = None
client_cls = None
history_cls = None
history = None
single_instrument = None
db = None
def __init__(self):
self.instruments = {}
self.clients = {}
def remove(self, client):
try:
del self.clients[client.id]
except KeyError:
logging.warning('client already removed %s', client.id)
def lookup_streams(self, instrument, stream=None, device=None):
if self.single_instrument:
instrument = self.single_instrument
if stream:
if isinstance(stream, str):
streams = stream.split(',') if stream else []
else:
streams = stream
else:
streams = []
device_names = devices = device.split(',') if device else []
tags = {}
if instrument:
# tags['instrument'] = instrument
stream_dict = self.db.get_streams(instrument, stream=list(streams), device=devices)
streams.extend((s for s in stream_dict if s not in streams))
if not devices:
device_names = list(filter(None, (t.get('device') for t in stream_dict.values())))
if streams:
tags['stream'] = streams[0] if len(streams) == 1 else streams
if devices:
tags['device'] = devices[0] if len(devices) == 1 else devices
return streams, tags, ','.join(device_names)
def register_client(self, instrument=None, stream=None, device=None):
streams, tags, device_name = self.lookup_streams(instrument, stream, device)
client = self.client_cls(self, streams, instrument or '', device_name)
history = self.history_cls(self, instrument, device_name, tags)
# history.db.debug = True
# all relevant methods of the history instance are saved in client.handlers
# so there is no reference needed to history anymore
client.handlers.update(history.handlers)
self.clients[client.id] = client
return client
def run(self, port, db, history_cls, client_cls, single_instrument=None, **interactor_classes):
self.single_instrument = single_instrument
self.db = db
self.history_cls = history_cls
self.client_cls = client_cls
self.interactor_classes = interactor_classes
app.debug = True
logging.basicConfig(filename='webserver.log', filemode='w', level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s')
# srv = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
srv = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))
def handle_term(sig, frame):
srv.stop()
srv.close()
signal.signal(signal.SIGTERM, handle_term)
# def handle_pdb(sig, frame):
# import pdb
# print('PDB')
# pdb.Pdb().set_trace(frame)
# signal.signal(signal.SIGUSR1, handle_pdb)
srv.serve_forever()
server = Server()
app = flask.Flask(__name__)
update_rider = circularlog.Rider("upd")
@ -51,9 +135,11 @@ pollinterval = 0.2
@app.route('/update')
def get_update(path=None):
def get_update(_=None):
# Client Adress: socket.getfqdn(flask.request.remote_addr)
client = instrument.new_client()
kwargs = {k: flask.request.values.get(k) for k in ('instrument', 'stream', 'device')}
client = server.register_client(**kwargs)
client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1])
@flask.stream_with_context
@ -61,7 +147,8 @@ def get_update(path=None):
logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1]))
# msg = dict(type='id', id=client.id, title=instrument.title);
# yield to_json_sse(msg)
msg = dict(type='id', id=client.id, instrument=instrument.title, device=instrument.device)
msg = dict(type='id', id=client.id, instrument=kwargs.get('instrument', '<unknown>'),
device=client.device_name)
yield to_json_sse(msg)
try:
lastmsg = time.time()
@ -87,12 +174,11 @@ def get_update(path=None):
logging.info("except clause %r", repr(e))
logging.info('CLOSED %s', client.id)
print('CLOSE client')
instrument.remove(client)
pass
server.remove(client)
except Exception as e:
logging.info('error')
logging.error('%s', traceback.format_exc())
instrument.remove(client)
server.remove(client)
# msg = dict(type='error',error=traceback.format_exc())
# yield to_json_sse(msg)
@ -110,8 +196,8 @@ def dump_circular():
@app.route('/clients')
def show_clients():
result = ""
for id in instrument.clients:
c = instrument.clients[id]
for id in server.clients:
c = server.clients[id]
result += c.remote_info + " " + "; ".join(c.info()) + "<br>"
return result
@ -124,9 +210,8 @@ def export():
logging.info('GET %s %s', path, repr(kwargs))
try:
id = kwargs.pop('id')
print('export')
client = instrument.clients[id]
bytes = client.w_export(**kwargs)
client = server.clients[id]
bytes = client.handlers['export'](**kwargs)
return flask.send_file(
bytes,
as_attachment=True,
@ -159,14 +244,19 @@ def reply():
logging.info('GET %s %r', path, kwargs)
try:
id = kwargs.pop('id')
client = instrument.clients[id]
msg = getattr(client, "w_" + path[1:])(**kwargs)
client = server.clients[id]
msg = client.handlers[path[1:]](**kwargs)
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
logging.info('REPLY %s %r', path, msg)
resp = flask.Response(json.dumps(msg), mimetype='application/json')
jsonmsg = json.dumps(msg)
if len(jsonmsg) < 120:
logging.info('REPLY %s %s', path, jsonmsg)
else:
logging.info('REPLY %s %s...', path, jsonmsg[:80])
logging.debug('REPLY %s %r', path, jsonmsg)
resp = flask.Response(jsonmsg, mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@ -207,9 +297,108 @@ def replace_by_empty(file):
@app.route('/')
def default():
if not any(flask.request.values.get(k) for k in ('instrument', 'server', 'device')):
if not server.single_instrument:
return select_experiment()
return general_file('SEAWebClient.html')
@app.route('/select_instrument')
def select_instrument():
out = ['''<html><body><table>
<style>
th {
text-align: left;
}
</style>
<tr><th>instrument</th><th colspan=99>devices</th></tr>''']
result = {}
for stream, tags in server.db.get_streams().items():
ins = tags.get('instrument', '0')
result.setdefault(ins, []).append((stream, tags.get('device')))
bare_streams = result.pop('0', [])
for ins, streams in result.items():
out.append(f'<tr><td><a href="/?ins={ins}">{ins}</a></td>')
out.extend(f'<td>{d or s}</td>' for s, d in streams)
out.append('</tr>')
for stream, device in bare_streams:
out.append(f'<tr><td><a href="/?srv={stream}">{stream}</a></td><td>{device}</td><tr>')
out.extend(['</table></body></html>', ''])
return '\n'.join(out)
@app.route('/select_experiment')
def select_experiment():
out = ['''<html><head>
<meta name="viewport"
content="width=device-width, initial-scale=1.0, minimum-scale=1.0, maximum-scale=1.0, user-scalable=no">
<style>
th {
text-align: left;
background-color: #cccccc;
}
a {
text-decoration: none;
}
</style></head>
<body><table>
''']
showtitle = 0
ONEMONTH = 30 * 24 * 3600
def title(text):
out.append(f'<tr><td colspan=2><b>{text}</b></td></tr>')
# TODO: sort this by (instrument / device) and list dates
# period format: Ymd..Ymd, Ymd (single date), Ymd..now, HM..now
try:
now = time.time()
timerange = flask.request.values.get('time')
if timerange == 'all':
starttime, endtime = None, None
elif timerange:
timerange = timerange.split(',')
starttime, endtime = [None if timerange[i] == '0' else int(timerange[i]) for i in (0, -1)]
else:
starttime, endtime = now - ONEMONTH, now
chunk_list = []
for key, chunk_dict in server.db.get_experiments(starttime, endtime).items():
for (streams, devices), chunks in chunk_dict.items():
chunk_list.extend((r[1], r[0], key, devices) for r in chunks)
chunk_list.sort(reverse=True)
for end, beg, key, devices in chunk_list:
today, begdate, enddate = (time.strftime("%Y-%m-%d", time.localtime(t)) for t in (now, beg, end))
args = ['='.join(key)]
if end > now:
if begdate == today:
daterange = f'since {time.strftime("%H:%M", time.localtime(beg))}'
else:
daterange = f'since {begdate}'
if showtitle == 0:
title('currently running')
showtitle = 1
else:
daterange = begdate if begdate == enddate else f'{begdate}...{enddate}'
if end > now - ONEMONTH:
if showtitle == 1:
title('older than 30 days')
showtitle = 2
print('A', args)
out.append(f'<tr><th><a href="/?{"&".join(args)}">{key[1]} / {" ".join(devices)}</a></th>')
out.append(f'<td>{daterange}</td></tr>')
if timerange:
out.append(f'<h3><a href="/select_experiment?time=all">earlier dates</a></h3><br>')
out.extend(['</table></body></html>', ''])
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
out = [f"ERROR {e!r}"]
return '\n'.join(out)
@app.route('/<file>')
def general_file(file):
subdir = "client/"
@ -225,37 +414,3 @@ def general_file(file):
def hostport_split(hostport):
h = hostport.split(':')
return (h[0], int(h[1]))
# def handle_pdb(sig, frame):
# import pdb
# print('PDB')
# pdb.Pdb().set_trace(frame)
def main(cls, **config):
global instrument
if not config:
for arg in sys.argv[1:]:
key, _, value = arg.partition('=')
config[key] = value
port = int(config['port'])
inst_name = config['instrument']
instrument = cls(inst_name, config)
# signal.signal(signal.SIGUSR1, handle_pdb)
def handle_term(sig, _):
server.stop()
server.close()
signal.signal(signal.SIGTERM, handle_term)
app.debug = True
logging.basicConfig(filename='log/%s.log' % inst_name, filemode='w', level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s')
#server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))
server.serve_forever()