diff --git a/base.py b/base.py new file mode 100644 index 0000000..1008fe3 --- /dev/null +++ b/base.py @@ -0,0 +1,36 @@ +import time +import logging + + +class Logger(object): + def __init__(self, logpath): + self.terminal = sys.stdout + self.log = open(logpath, "a") + + def write(self, message): + self.terminal.write(message) + self.log.write(message) + + def flush(self): + pass + + +class Instrument: + def __init__(self): + pass + + def remove(self, client): + try: + del self.clients[client.id] + except KeyError: + logging.warning('client already removed %s', client.id) + + def register(self, client): + self.clients[client.id] = client + return client + + +def get_abs_time(times): + now = int(time.time() + 0.999) + oneyear = 365 * 24 * 3600 + return [t + now if t < oneyear else t for t in times] diff --git a/client/jsFiles/SEAWebClientCommunication.js b/client/jsFiles/SEAWebClientCommunication.js index 4ad9c49..7d3dcb9 100644 --- a/client/jsFiles/SEAWebClientCommunication.js +++ b/client/jsFiles/SEAWebClientCommunication.js @@ -208,7 +208,7 @@ function updateValues(message, src) { let elem = matches[j]; let type = elem.__ctype__; if (type == "rdonly" || type == "rdlink") { - let text = htmlEscape(value); + let text = htmlEscape(component.formatted); if (text) { elem.innerHTML = text; } diff --git a/config/generic.ini b/config/generic.ini index 66a37a0..d9a4e87 100644 --- a/config/generic.ini +++ b/config/generic.ini @@ -1,6 +1,8 @@ [chart] tt=unit:K tt.target=unit:K +tt.set_power=unit:W +tt.power=unit:W cc=- hemot.target=- mf=unit:T diff --git a/influxgraph.py b/influxgraph.py index b070783..e25fb56 100644 --- a/influxgraph.py +++ b/influxgraph.py @@ -4,24 +4,33 @@ from colors import assign_colors_to_curves import json import io from chart_config import ChartConfig +from base import Instrument + class InfluxGraph: """ - Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB. + Class implementing the logic of the different routes that are called by + the client to retrieve graph data with InfluxDB. Global constants : - HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the most recent point is not in the visualisation window (no live data is sent). - ACTUAL (int) : value that represents the "actual" visualization mode, wihch is an intermediate state used before going for live mode (the requested time window includes now) - LIVE (int) : value that represents the "live" visualization mode, meaning that new points are sent to the client. + HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the + most recent point is not in the visualisation window (no live data is sent). + ACTUAL (int) : value that represents the "actual" visualization mode, wihch is an intermediate + state used before going for live mode (the requested time window includes now) + LIVE (int) : value that represents the "live" visualization mode, meaning that new points are + sent to the client. Attributes : influx_data_getter (InfluxDataGetter) : the InfluxDataGetter instance that allows to get data out of InfluxDB. chart_configs ([ChartConfig]) : an array of chart configuration to apply when /getvars is called livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE. - end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query or update. - lastvalues ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values are tuples, where the first - value is the unix timestamp of the most recent value known for this variable, and the second value its corresponding value - variables ({(str):(str)}) : a dictionnary of the current available variables requested by the client. The key is the InfluxDB name of the curve, and the value is its label in the GUI. + end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query + or update. + lastvalues ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values + are tuples, where the first value is the unix timestamp of the most recent value known for this variable, + and the second value its corresponding value + variables ({(str):(str)}) : a dictionnary of the current available variables requested by the client. + The key is the InfluxDB name of the curve, and the value is its label in the GUI. """ HISTORICAL = 0 ACTUAL = 1 @@ -33,13 +42,12 @@ class InfluxGraph: self.livemode = self.HISTORICAL self.end_query = 0 self.lastvalues = {} - self.variables = {} # name:label - + self.variables = {} # name:label def get_abs_time(self, times): """ - Gets the absolute times for the given pontential relative times. If the given timestamps are less than one year, then the value is relative - and converted into an asbolute timestamps + Gets the absolute times for the given potential relative times. If a given timestamp is less than + one year, then the value is relative and converted into an absolute timestamp Parameters : times([(float)]) : an array of unix timestamps or relative duration (< 1 year) as floats @@ -58,7 +66,7 @@ class InfluxGraph: Parameters : result ({(str):[[(int),(float)]]}) : a dictionnary with the variable names as key, and an array of points, - which are an array containing the timestamp as their first value, and the y-value in float as their second one. + which are a tuple (timestamp, y-value as float) endtime (int) : the unix timestamp in seconds of the time we want to have data until """ for var, c in result.items(): @@ -75,12 +83,14 @@ class InfluxGraph: Parameters : variables (str) : a comma separataed value string of variable names (influx names) to retrieve - time (str) : a commma separated value string (range) of seconds. They are treated as relative from now if they are lesser than one year. + time (str) : a commma separated value string (range) of seconds. They are treated as relative from now + if they are lesser than one year. interval (str) : the interval (resolution) of the values to get (string in milliseconds) Returns : - {"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionnary with its "graph-draw" type (so it can be processed by the client), and a "graph" dictionnary with the variable names as key, and an array of points, - which are an array containing the timestamp as their first value, and the y-value in float as their second one. + {"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionnary with its "graph-draw" type + (so it can be processed by the client), and a "graph" dictionnary with the variable names as key, + and an array of points as a tuple (timestamp, y-value as float) """ time = [float(t) for t in time.split(',')] start, end, now = self.get_abs_time(time + [0]) @@ -103,11 +113,12 @@ class InfluxGraph: Called when the route /gettime is reached. Parameters : - time (str="-1800,0") : the given point in time represented by a string, which is a comma separated unix timestamp values list (in seconds). They are treated as relative from now if they are lesser than one year. + time (str="-1800,0") : the given point in time represented by a string, which is a comma separated unix + timestamp values list (in seconds). They are treated as relative from now if they are lesser than one year. Returns : - {"type":"time", "time":(int)} : a dictionnary with its "time" type (so the data can be processed by the client) and the server unix timestamp in seconds corresponding - to the time asked by the client + {"type":"time", "time":(int)} : a dictionnary with its "time" type (so the data can be processed by the + client) and the server unix timestamp in seconds corresponding to the time asked by the client """ time = [float(t) for t in time.split(',')] return dict(type='time', time= self.get_abs_time(time)) @@ -118,13 +129,17 @@ class InfluxGraph: Called when the route /getvars is reached. Parameters : - time (str) : the given point in time represented by a string, which is a unix timestamp in seconds. It is treated as relative from now if it is lesser than one year. + time (str) : the given point in time represented by a string, which is a unix timestamp in seconds. + It is treated as relative from now if it is lesser than one year. userconfiguration (str|None) : the JSON string representing the user configuration Returns : - {"type":"var_list", "device":(str), "blocks":[{"tag":(str),"unit":(str), "curves":[{"name":(str), "label":(str), "color":(str), "original_color":(str)}]}]} : - a dictionnary with its "var_list" type (so the data can be processed by the client), the device that was currently set at that time, and the available curves with the name of the internal variable, - the color to display for this curve, its original color in SEA, grouped by their tag (which is a category or unit if absent) and their unit (in "blocks") + {"type":"var_list", "device":(str), "blocks":[{"tag":(str),"unit":(str), "curves": + [{"name":(str), "label":(str), "color":(str), "original_color":(str)}]}]}: + a dictionnary with its "var_list" type (so the data can be processed by the client), the device that + was currently set at that time, and the available curves with the name of the internal variable, + the color to display for this curve, its original color in SEA, grouped by their tag (which is a + category or unit if absent) and their unit (in "blocks") """ time = [float(t) for t in time.split(',')] @@ -148,7 +163,8 @@ class InfluxGraph: Sets the current visualisation mode to LIVE if not in HISTORICAL mode. Called when the route /updategraph is reached. Returns : - {"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live" value telling if the server could change its visualization mode to live + {"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live" + value telling if the server could change its visualization mode to live """ logging.info("UPD GRAPH %d", self.livemode) if self.livemode == self.HISTORICAL: @@ -188,12 +204,16 @@ class InfluxGraph: def graphpoll(self): """ - Polls the last known values for all the available variables, and returns only those whose polled values are more recent than the most recent displayed one. - Every plain minute, all the variables are returned with a point having their last known value at the current timestamp to synchronize all the curves on the GUI. + Polls the last known values for all the available variables, and returns only those whose polled values + are more recent than the most recent displayed one. + Every plain minute, all the variables are returned with a point having their last known value at the current + timestamp to synchronize all the curves on the GUI. Returns : - {"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None : a dictionnary with its "graph-update" type - (so it can be processed by the client), and a "graph" dictionnary with the variable names as key, and an array of points, which are an array containing the timestamp + {"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None : + a dictionnary with its "graph-update" type + (so it can be processed by the client), and a "graph" dictionnary with the variable names as key, + and an array of points, which are an array containing the timestamp as their first value, and the y-value in float as their second one """ if self.livemode != self.LIVE: @@ -203,7 +223,8 @@ class InfluxGraph: result = self.influx_data_getter.poll_last_values(list(self.variables.keys()), self.lastvalues, now) for variable, values in list(result.items()): tlast = self.lastvalues.get(variable, (0,))[0] - # removes points older than the last known point (queries are in seconds and might return points already displayed) + # removes points older than the last known point + # (queries are in seconds and might return points already displayed) while values and values[0][0] <= tlast: values.pop(0) if values and values[-1][0] > tlast: @@ -219,3 +240,54 @@ class InfluxGraph: if len(result) > 0: return dict(type='graph-update', time=now, graph=result) return None + + +class InfluxInstrument(Instrument): + + def __init__(self, instr_name, inst_config=None): + self.db = InfluxDB() + self.influx_data_getter = InfluxDataGetter(self.db, inst_name) + self.clients = {} + self.title = instr_name + self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp())) + + def new_client(self): + return self.register(InfluxClient()) + + +class InfluxParams: + """Class with dummy routes, in case client side is started with the right part init commands""" + def __init__(self): + self.id = uuid.uuid4().hex[0:15] + self.queue = [] + + def poll(self): + messages = self.queue + self.queue = [] + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + def info(self): + return ["na"] + + def w_getblock(self, path): + return dict(type='draw', title="graph", path=path, components=[]) + + def w_updateblock(self, path): + return dict(type='accept-block') + + def w_console(self): + return dict(type='accept-console') + + def w_sendcommand(self, command): + return dict(type='accept-command') + + +class InfluxClient(InfluxParams, InfluxGraph): + def __init__(self): + InfluxParams.__init__(self) + InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title) + + diff --git a/seagraph.py b/seagraph.py deleted file mode 100644 index 2d4e4ab..0000000 --- a/seagraph.py +++ /dev/null @@ -1,230 +0,0 @@ -from datetime import date -import time -import sys -import os -import logging -import json -import numpy as np - -class PrettyFloat(float): - """saves bandwidth when converting to JSON - - a lot of numbers originally have a fixed (low) number of decimal digits - as the binary representation is not exact, it might happen, that a - lot of superfluous digits are transmitted: - - str(1/10*3) == '0.30000000000000004' - str(PrettyFloat(1/10*3)) == '0.3' - """ - def __repr__(self): - return '%.15g' % self - -#encode = "!#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~" - -def get_abs_time(times): - now = int(time.time() + 0.999) - oneyear = 365 * 24 * 3600 - return [t + now if t < oneyear else t for t in times] - -class Scanner(object): - def __init__(self, directory, test_day=None): - self.directory = directory - self.last_time = {} - self.test_day = test_day - - def scan(self, variable, timerange, result): - """return value is true when there are aditionnal points after the time range""" - start, to, now = get_abs_time(timerange + [0]) - old = None - t = 0 - for di in range(date.fromtimestamp(start).toordinal(), date.fromtimestamp(to).toordinal() + 1): - d = date.fromordinal(di) - year, mon, day = self.test_day if self.test_day else (d.year, d.month, d.day) - path = self.directory + "logger/%d/%s/%.2d-%.2d.log" % \ - (year, variable.lower(), mon, day) - try: - # logging.info("logger path %s", path) - with open(path) as f: - t0 = time.mktime((d.year, d.month, d.day, 0, 0, 0, 0, 0, -1)) - for line in f: - if line[0] != '#': - t = t0 + (int(line[0:2]) * 60 + int(line[3:5])) * 60 + int(line[6:8]) - if line[-1:] == '\n': - value = line[9:-1] - else: - value = line[9:] - if t < start: - old = value - else: - if old is not None: - self.next(start, old, result) - old = None - self.next(t, value, result) - if t > to: - break - except IOError: - # print(f'error reading {path}') - pass - if t < start: - #if t == 0: - # t = start - if old is not None: - self.next(t, old, result) - if t != self.last_time.get(variable,0): - self.last_time[variable] = t - return True - return False - -class NumericScanner(Scanner): - def __init__(self, *args, **kwargs): - Scanner.__init__(self, *args, **kwargs) - - def next(self, t, value, result): - try: - value = PrettyFloat(value) - except: - value = None - result.append([PrettyFloat(t), value]) - #self.value = value - #self.last = t - - def get_message(self, variables, timerange, show_empty=True): - self.dirty = False - result = {} - for var in variables: - self.last = 0 - curve = [] - if self.scan(var, timerange, curve): - self.dirty = True - if show_empty or len(curve) > 1: - result[var] = curve - return result - -class ColorMap(object): - ''' - ColorMap is using official CSS color names, with the exception of Green, as this - is defined differently with X11 colors than in SEA, and used heavily in config files. - Here Green is an alias to Lime (#00FF00) and MidGreen is #008000, which is called Green in CSS. - The function to_code is case insensitive and accepts also names with underscores. - The order is choosen by M. Zolliker for the SEA client, originally only the first 16 were used. - ''' - hex_name = (("#FFFFFF","White"), ("#FF0000","Red"), ("#00FF00","Lime"), ("#0000FF","Blue"), ("#FF00FF","Magenta"), - ("#FFFF00","Yellow"), ("#00FFFF","Cyan"), ("#000000","Black"), ("#FFA500","Orange"), ("#006400","DarkGreen"), - ("#9400D3","DarkViolet"), ("#A52A2A","Brown"), ("#87CEEB","SkyBlue"), ("#808080","Gray"), ("#FF69B4","HotPink"), - ("#FFFFE0","LightYellow"), ("#00FF7F","SpringGreen"), ("#000080","Navy"), ("#1E90FF","DodgerBlue"), - ("#9ACD32","YellowGreen"), ("#008B8B","DarkCyan"), ("#808000","Olive"), ("#DEB887","BurlyWood"), - ("#7B68EE","MediumSlateBlue"), ("#483D8B","DarkSlateBlue"), ("#98FB98","PaleGreen"), ("#FF1493","DeepPink"), - ("#FF6347","Tomato"), ("#32CD32","LimeGreen"), ("#DDA0DD","Plum"), ("#7FFF00","Chartreuse"), ("#800080","Purple"), - ("#00CED1","DarkTurquoise"), ("#8FBC8F","DarkSeaGreen"), ("#4682B4","SteelBlue"), ("#800000","Maroon"), - ("#3CB371","MediumSeaGreen"), ("#FF4500","OrangeRed"), ("#BA55D3","MediumOrchid"), ("#2F4F4F","DarkSlateGray"), - ("#CD853F","Peru"), ("#228B22","ForestGreen"), ("#48D1CC","MediumTurquoise"), ("#DC143C","Crimson"), - ("#D3D3D3","LightGray"), ("#ADFF2F","GreenYellow"), ("#7FFFD4","Aquamarine"), ("#BC8F8F","RosyBrown"), - ("#20B2AA","LightSeaGreen"), ("#C71585","MediumVioletRed"), ("#F0E68C","Khaki"), ("#6495ED","CornflowerBlue"), - ("#556B2F","DarkOliveGreen"), ("#CD5C5C","IndianRed "), ("#2E8B57","SeaGreen"), ("#F08080","LightCoral"), - ("#8A2BE2","BlueViolet"), ("#AFEEEE","PaleTurquoise"), ("#4169E1","RoyalBlue"), ("#0000CD","MediumBlue"), - ("#B8860B","DarkGoldenRod"), ("#00BFFF","DeepSkyBlue"), ("#FFC0CB","Pink"), ("#4B0082","Indigo "), ("#A0522D","Sienna"), - ("#FFD700","Gold"), ("#F4A460","SandyBrown"), ("#DAA520","GoldenRod"), ("#DA70D6","Orchid"), ("#E6E6FA","Lavender"), - ("#5F9EA0","CadetBlue"), ("#D2691E","Chocolate"), ("#66CDAA","MediumAquaMarine"), ("#6B8E23","OliveDrab"), - ("#A9A9A9","DarkGray"), ("#BDB76B","DarkKhaki"), ("#696969","DimGray"), ("#B0C4DE","LightSteelBlue"), - ("#191970","MidnightBlue"), ("#FFE4C4","Bisque"), ("#6A5ACD","SlateBlue"), ("#EE82EE","Violet"), - ("#8B4513","SaddleBrown"), ("#FF7F50","Coral"), ("#008000","MidGreen"), ("#DB7093","PaleVioletRed"), ("#C0C0C0","Silver"), - ("#E0FFFF","LightCyan"), ("#9370DB","MediumPurple"), ("#FF8C00","DarkOrange"), ("#00FA9A","MediumSpringGreen"), - ("#E9967A","DarkSalmon"), ("#778899","LightSlateGray"), ("#9932CC","DarkOrchid"), ("#EEE8AA","PaleGoldenRod"), - ("#F8F8FF","GhostWhite"), ("#FFA07A","LightSalmon"), ("#ADD8E6","LightBlue"), ("#D8BFD8","Thistle"), - ("#FFE4E1","MistyRose"), ("#FFDEAD","NavajoWhite"), ("#40E0D0","Turquoise"), ("#90EE90","LightGreen"), - ("#B22222","FireBrick"), ("#008080","Teal"), ("#F0FFF0","HoneyDew"), ("#FFFACD","LemonChiffon"), ("#FFF5EE","SeaShell"), - ("#F5F5DC","Beige"), ("#DCDCDC","Gainsboro"), ("#FA8072","Salmon"), ("#8B008B","DarkMagenta"), ("#FFB6C1","LightPink"), - ("#708090","SlateGray"), ("#87CEFA","LightSkyBlue"), ("#FFEFD5","PapayaWhip"), ("#D2B48C","Tan"), ("#FFFFF0","Ivory"), - ("#F0FFFF","Azure"), ("#F5DEB3","Wheat"), ("#00008B","DarkBlue"), ("#FFDAB9","PeachPuff"), ("#8B0000","DarkRed"), - ("#FAF0E6","Linen"), ("#B0E0E6","PowderBlue"), ("#FFE4B5","Moccasin"), ("#F5F5F5","WhiteSmoke"), ("#FFF8DC","Cornsilk"), - ("#FFFAFA","Snow"), ("#FFF0F5","LavenderBlush"), ("#FFEBCD","BlanchedAlmond"), ("#F0F8FF","AliceBlue"), - ("#FAEBD7","AntiqueWhite"), ("#FDF5E6","OldLace"), ("#FAFAD2","LightGoldenRodYellow"), ("#F5FFFA","MintCream"), - ("#FFFAF0","FloralWhite"), ("#7CFC00","LawnGreen"), ("#663399","RebeccaPurple")) - codes = {} - for i, pair in enumerate(hex_name): - codes[pair[0]] = i - low = pair[1].lower() - codes[low] = i - codes[low.replace("gray","grey")] = i - codes["green"] = 2 - codes["fuchsia"] = 4 - codes["aqua"] = 6 - - @staticmethod - def to_code(colortext): - try: - return int(colortext) - except ValueError: - return ColorMap.codes.get(colortext.lower().replace("_",""),-1) - - @staticmethod - def check_hex(code): - if not code.startswith("#"): - return None - if len(code) == 4: # convert short code to long code - code = code[0:2] + code[1:3] + code[2:4] + code[3] - if len(code) != 7: - return None - try: - int(code[1:]) # we have a valid hex color code - return code - except ValueError: - return None - - @staticmethod - def to_hex(code): - try: - return ColorMap.hex_name[code][0] - except IndexError: - return -1 - -class VarsScanner(Scanner): - colors = {"red":0} - def __init__(self, directory, test_day=None): - Scanner.__init__(self, directory, test_day=test_day) - logging.info('vars dir %s', directory) - - def next(self, t, value, result): - logging.info('vars %s', value) - for var in value.strip().split(" "): - vars = var.split("|") - if len(vars) == 1: - vars.append("") - if len(vars) == 2: - vars.append(vars[0]) - if len(vars) == 3: - vars.append("") - name, unit, label, color = vars - if not unit in result: - result[unit] = dict(tag = unit, unit = unit.split("_")[0], curves=[]) - result[unit]["curves"].append(dict(name=name, label=label, color=color)) - - def get_message(self, time): - # get last value only - result = {} - self.scan("vars", [time, time], result) - for unit in result: - color_set = set() - auto_curves = [] - for curve in result[unit]["curves"]: - col = curve["color"].strip() - c = ColorMap.to_code(col) - if c < 0: - valid = ColorMap.check_hex(col) - if valid: - curve["original_color"] = col - curve["color"] = valid - else: - auto_curves.append(curve) - curve["original_color"] = col + "?" - else: - color_set.add(c) - curve["original_color"] = col - curve["color"] = ColorMap.to_hex(c) - c = 1 # omit white - for curve in auto_curves: - while c in color_set: c += 1 # find unused color - curve["color"] = ColorMap.to_hex(c) - c += 1 - return result - diff --git a/seaweb.py b/seaweb.py deleted file mode 100755 index 8d27e92..0000000 --- a/seaweb.py +++ /dev/null @@ -1,1213 +0,0 @@ -#!/usr/bin/env python - -import sys -sys.path.append("./lib") -# Make sure your gevent version is >= 1.0 -import gevent -import gevent.pywsgi -import gevent.queue -import flask -import time -import pprint -import random -import time -from datetime import date, datetime -from collections import deque -import sys -import socket -import tcp_lineserver -import uuid -import seagraph -import traceback -import logging -import circularlog -from gevent.lock import RLock -import os -import signal - -from influxgraph import InfluxGraph - -from influxdb import InfluxDB, InfluxDataGetter - -try: import simplejson as json -except ImportError: import json - -def guess_mimetype(filename): - if filename.endswith('.js'): - mimetype = 'text/javascript' - elif filename.endswith('.css'): - mimetype = 'text/css' - elif filename.endswith('.ico'): - mimetype = 'image/x-icon' - elif filename.endswith(".png"): - mimetype = "image/png" - else: - mimetype = 'text/html' - return mimetype - -#class SeawebException(Exception): -# pass - -# SSE 'protocol' is described here: http://mzl.la/UPFyxY -def to_json_sse(msg): - txt = json.dumps(msg, separators=(',',': ')) - return 'data: %s\n\n' % txt - -app = flask.Flask(__name__) - -update_rider = circularlog.Rider("upd") -pollinterval = 0.2 - -@app.route('/update') -def get_update(path=None): - # Client Adress: socket.getfqdn(flask.request.remote_addr) - client = instrument.newClient() - client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1]) - - @flask.stream_with_context - def generator(): - logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1])) - #msg = dict(type='id', id=client.id, title=instrument.title); - #yield to_json_sse(msg) - msg = dict(type='id', id=client.id, instrument=instrument.title, device=instrument.device) - yield to_json_sse(msg) - try: - lastmsg = time.time() - while True: - if client.info() == "": - print(time.time()-lastmsg) - messages = client.poll() - for msg in messages: - update_rider.put('-', repr(msg)) - yield to_json_sse(msg) - if messages: - lastmsg = time.time() - else: - if time.time() > lastmsg + 30: - if not client.info(): - raise GeneratorExit("no activity") - logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info())) - yield to_json_sse(dict(type='heartbeat')) - lastmsg = time.time() - else: - gevent.sleep(pollinterval) - except (GeneratorExit, tcp_lineserver.Disconnected) as e: - logging.info("except clause %r", repr(e)) - logging.info('CLOSED %s', client.id) - print('CLOSE client') - instrument.remove(client) - pass - except Exception as e: - logging.info('error') - logging.error('%s', traceback.format_exc()) - instrument.remove(client) - #msg = dict(type='error',error=traceback.format_exc()) - #yield to_json_sse(msg) - - resp = flask.Response(generator(), mimetype='text/event-stream') - resp.headers['Access-Control-Allow-Origin'] = '*' - return resp - -@app.route('/circular') -def dump_circular(): - circularlog.log() - return "log" - -@app.route('/clients') -def show_clients(): - result = "" - for id in instrument.clients: - c = instrument.clients[id] - result += c.remote_info + " " + "; ".join(c.info()) + "
" - return result - -@app.route('/export') -def export(): - args = flask.request.args - kwargs = dict((k, args.get(k)) for k in args) - path = flask.request.path - logging.info('GET %s %s', path, repr(kwargs)) - try: - id = kwargs.pop('id') - client = instrument.clients[id] - bytes = client.w_export(**kwargs) - return flask.send_file( - bytes, - as_attachment=True, - download_name='export.tsv', - mimetype='text/tab-separated-values' - ) - - except Exception as e: - logging.error('%s', traceback.format_exc()) - circularlog.log() - msg = dict(type='error', request=path[1:], error=repr(e)) - logging.error('MSG: %r', msg) - resp = flask.Response(json.dumps(msg), mimetype='application/json') - resp.headers['Access-Control-Allow-Origin'] = '*' - return resp - -@app.route('/getblock') -@app.route('/updateblock') -@app.route('/sendcommand') -@app.route('/console') -@app.route('/graph') -@app.route('/updategraph') -@app.route('/gettime') -@app.route('/getvars', methods=["GET", "POST"]) -def reply(): - args = flask.request.values - kwargs = dict((k, args.get(k)) for k in args) - path = flask.request.path - logging.info('GET %s %s', path, repr(kwargs)) - try: - id = kwargs.pop('id') - client = instrument.clients[id] - msg = getattr(client, "w_" + path[1:])(**kwargs) - except Exception as e: - logging.error('%s', traceback.format_exc()) - circularlog.log() - msg = dict(type='error', request=path[1:], error=repr(e)) - resp = flask.Response(json.dumps(msg), mimetype='application/json') - resp.headers['Access-Control-Allow-Origin'] = '*' - return resp - -@app.route('/test/') -def subdir_test_file(file): - gevent.sleep(2) - resp = flask.send_file("client/test/"+file, mimetype=guess_mimetype(file)) - return resp - -@app.route('/components/curves_settings_popup/color_selector/') -@app.route('/components/curves_settings_popup/') -@app.route('/components/action_entry/') -@app.route('/components/export_popup/') -@app.route('/components/dates_popup/') -@app.route('/components/menu_popup/') -@app.route('/components/help_popup/') -@app.route('/components/help_entry/') -@app.route('/components/control/') -@app.route('/components/divider/') -@app.route('/components/states_indicator/dates/') -@app.route('/res/') -@app.route('/jsFiles/') -@app.route('/cssFiles/') -@app.route('/externalFiles/') -def subdir_file(file): - subdir = "/".join(flask.request.path.split("/")[1:-1]) - resp = flask.send_file("client/" + subdir+"/"+file, mimetype=guess_mimetype(file)) - #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" - return resp - -@app.route('/externalFiles/maps/.map') -def replace_by_empty(file): - return "" - -@app.route('/') -def default(): - return general_file('SEAWebClient.html') - -@app.route('/') -def general_file(file): - subdir = "client/" - try: - resp = flask.send_file(subdir+file, mimetype=guess_mimetype(file)) - except FileNotFoundError: - logging.warning('file %s not found', file) - return 'file not found' - #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" - return resp - -def hostport_split(hostport): - h = hostport.split(':') - return (h[0], int(h[1])) - -def sea_request_reply(socket, command, tmo=5): - t = 0 - # wait for at most 60 seconds for not beeing busy - while socket.busy: - if t >= 60: - logging.error('still busy at %s (before command %s)', getattr(socket, "name", "noname"), command) - socket.busy = False - else: - gevent.sleep(0.1) - t += 0.1 - if t > 5: - logging.warning('unusual wait time %.4g (before command %s)', t, command) - socket.busy = True - socket.send_line("fulltransact "+command) - data = [] - dumpdata = [] - t = 0 - while True: - while socket.busy: - line = socket.get_line() - if line != None: - t = 0 - break - if t >= tmo: - socket.busy = False - logging.error('timeout on command %s (%s)', command, getattr(socket, "name", "noname")) - socket.reconnect() - raise Exception("timeout") - gevent.sleep(0.1) - t += 0.1 - else: - logging.error('interrupted command %s (%s)', command, getattr(socket, "name", "noname")) - socket.reconnect() - raise Exception("timeout") - dumpdata.append(line) - if line == 'TRANSACTIONFINISHED': - break - elif line.startswith('TRANSACTIONSTART '): - data = [] - else: - data.append(line) - if t>2: - logging.info('DUMPDATA %.4g %s', t, '|'.join(dumpdata)) - socket.busy = False - return data - - -class SeaGroup: - def __init__(self): - self.version = 0 - self.components = [] - self.grouptitle = "untitled" - self.lastpoll = 0 - self.lastreq = 0 - self.empty_values = {} - - -class Instrument: - def remove(self, client): - try: - del self.clients[client.id] - except KeyError: - logger.warning('client already removed %s', client.id) - - def register(self, client): - self.clients[client.id] = client - return client - - -class SeaInstrument(Instrument): - # convert SEA layout tag like "-W" to more meaningful name. - # the code: 0: modifier, 1: enum name, 2: input element - tags = { - '-W': ('width', 0), - '-T': ('title', 0), - '-H': ('tooltip', 0), - '-V': ('value', 0), - '-S': ('style', 0), - '-D': ('div', 0), - '-r': ('enum_name', 1), - '-R': ('enum', 2), - '-I': ('input', 2), - '-i': ('rdonly', 2), - '-L': ('rdonly', 2), - '-G': ('group', 2), - '-C': ('checkbox', 2), - '-B': ('pushbutton', 2), - '-l': ('link', 0), - '-E': ('end', 2), - } - - def __init__(self, inst_name, instrument_config): - self.host_port = hostport_split(instrument_config['sea']) - self.inst_name = inst_name - self.title = inst_name - self.clients = {} - self.logger_dir = instrument_config.get('logger_dir','') - test_day = instrument_config.get('test_day', None) - self.test_day = [int(x) for x in test_day.split('-')] if test_day else None - self.seaspy = tcp_lineserver.LineClient(self.host_port, ['Spy 007'], True, ridername='spy') - self.seaspy.busy = False - self.seaspy.name = "SEA seaspy" - self.seacmd = None - self.last_client_remove = time.time() - self.history = deque(maxlen=1000) - self.sea_lock = RLock() - self.init() - gevent.Greenlet.spawn(self.checkconnections) - - def init(self): - self.values = {} - self.groups = {} - with self.sea_lock: - self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line - self.consolepos = 0 - self.timeStamp = None - self.history.clear() - self.lastcmd = None - - def checkconnections(self): - while True: - if len(self.clients) == 0 and self.seaspy.connected: - if time.time() > self.last_client_remove + 30: - logging.info("close SEA connections") - self.history.clear() - self.seaspy.close() - if self.seacmd: - self.seacmd.close() - gevent.sleep(10) - - def newClient(self): - if not self.seaspy.connected: - self.init() - return self.register(SeaClient()) - - def remove(self, client): - Instrument.remove(self, client) - self.last_client_remove = time.time() - - def findgroup(self, path): - 'get a group from sea and store it' - if not path in self.groups: - self.groups[path] = SeaGroup() - self.groups[path].lastpoll = 0 - self.groups[path].lastreq = time.time() - self.poll_groups([path]) - return self.groups[path] - - def poll_groups(self, paths): - 'polls values and components of requested groups' - for path in list(paths): - gobj = self.groups[path] - now = time.time() - if now < gobj.lastpoll + 0.5: - continue # do not poll before 500 ms have passed - gobj.lastreq = now - gobj.lastpoll = now - try: - with self.sea_lock: - data = sea_request_reply(self.seaspy, 'getgroup '+path) - except Exception as e: - logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc()) - continue - components = [] - values = {} - grouptitle = None - within_enum = False - item = {} - olditem = {} - for line in data: - (key, type) = self.tags.get(line[0:2], ('',-1)) - if type < 0: - continue - if type == 0: # modifier - item[key] = line[2:] - continue - if within_enum and type >= 2: - enum['enum_names'] = enum_names - name = enum['name'] - self.values[name] = enum.get('value', '') - values[name] = None #NEW - del enum['value'] - components.append(enum) - del enum - within_enum = False - if type == 1: - item['value'] = line[2:] - enum_names.append(item) - item = {} - continue - if key == 'enum': - enum = item - item = {} - within_enum = True - enum['type'] = key - enum['name'] = line[2:] - enum_names = [] - continue - if key == 'pushbutton': - if line[2] != ' ': - continue # skip special buttons - line = line[1:] # skip space - try: - item['value'] = item['title'] - item['title'] = ' ' - # was olditem.get('title',olditem['name']) - except: - pass - if key == 'end': - continue - if key == 'group': - if grouptitle == None: - grouptitle = item.get('title', line[2:]) - item = {} - continue - item['type'] = key - name = line[2:] - try: - self.values[name] = item['value'] - values[name] = None - del item['value'] - except KeyError: - pass - item['name'] = name - components.append(item) - olditem = item - item = {} - if gobj.components != components: - gobj.components = components - gobj.empty_values = values - gobj.version += 1 - if grouptitle != None: - gobj.grouptitle = grouptitle - - - def make_seacmd(self): - global port - if not self.seacmd: - self.seacmd = tcp_lineserver.LineClient(self.host_port, - ['seauser seaser', 'fulltransact config listen 1', 'fulltransact commandlog tail 200'], - True, ridername='cmd') - self.seacmd.name = "SEA user" - self.seacmd.connect() - - def console(self): - self.make_seacmd() - - def addconsole(self, msg): - self.history.append(msg) - self.consolepos += 1 - - def pollconsole(self): - if not self.seacmd: - return - while True: - line = self.seacmd.get_line() - if line == None: - return None - if (line.startswith('Deleting connection') - or line.startswith('Accepted connection') - or (line.startswith('User ') and ' privilege' in line) - or line.startswith('Change of Authorisation') - or line.startswith('fulltransact config ') - or line.startswith('UserRights = ') - or line.startswith('fulltransact status') - or line == 'OK' or line == 'OK.' - or line.startswith('fulltransact commandlog tail') - or line.startswith('Login OK') - or line.startswith('TRANSACTIONSTART commandlog tail ') - ): - pass - elif line.startswith('TRANSACTIONSTART'): - if self.lastcmd != None: - self.addconsole(('command', self.lastcmd, self.lastid)) - self.lastcmd = None - elif line == 'TRANSACTIONFINISHED': - self.lastid = 0 - elif line.startswith('fulltransact '): - type = 'command' - self.addconsole(('command', line[13:], 0)) - elif line.startswith('==='): - self.timeStamp = line - elif line > " ": - if self.timeStamp: - self.addconsole(('reply', self.timeStamp, self.lastid)) - self.timeStamp = None - type = 'reply' - self.addconsole(('reply', line, self.lastid)) - - def getconsole(self, startindex, id): - idx = min(len(self.history), self.consolepos - startindex) # distance from end - messages = [] - for i in range(-idx,0): - typ, line, hid = self.history[i] - messages.append(dict(type=typ, line=line, origin=('self' if hid==id else 'other'))) - return self.consolepos, messages - - def command(self, command, id): - self.make_seacmd() - self.seacmd.send_line('fulltransact '+command) - self.lastid = id - self.lastcmd = command - - -class SeaInfluxInstrument(SeaInstrument): - - def __init__(self, inst_name, instrument_config): - super().__init__(inst_name, instrument_config) - self.db = InfluxDB() - self.influx_data_getter = InfluxDataGetter(self.db, inst_name) - self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp())) - - def newClient(self): - if not self.seaspy.connected: - self.init() - return self.register(SeaInfluxClient()) - - -class InfluxInstrument(Instrument): - - def __init__(self, instr_name): - self.db = InfluxDB() - self.influx_data_getter = InfluxDataGetter(self.db, inst_name) - self.clients = {} - self.title = instr_name - self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp())) - - def newClient(self): - return self.register(InfluxClient()) - -class SeaGraph: - HISTORICAL = 0 - ACTUAL = 1 - LIVE = 2 - - def __init__(self): - self.livemode = self.HISTORICAL - self.time = [0, 0] - self.lastvalues = {} - self.variables = [] - - def strip_future(self, result): - 'strip future points (happens only on dummy test_day)' - # if self.livemode == self.LIVE: - for c in result.values(): - while c: - lastt, lastx = c[-1] - if lastt <= self.time[1]: - break - c.pop() - - def complete_to_end(self, result, endtime): - for var, c in result.items(): - if c: - lastt, lastx = c[-1] - if lastt < endtime: - c.append((endtime, lastx)) - self.lastvalues[var] = (endtime, lastx) - - def w_graph(self, variables, time="-1800,0"): - """get given curves - - variables: comma separated list of variables to get - time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time - """ - time = [float(t) for t in time.split(',')] - self.last_t = 0 - start, end, now = seagraph.get_abs_time(time + [0]) - self.time = [start, end] - variables = variables.split(',') - self.livemode = self.ACTUAL if end >= now else self.HISTORICAL - logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode) - self.scanner = seagraph.NumericScanner(instrument.logger_dir, instrument.test_day) - #result = self.scanner.get_message(self.variables, self.time) - #self.time[0] = self.time[1] - result = self.scanner.get_message(variables, self.time, show_empty=True) - self.strip_future(result) - #for var in ('treg.set.reg', 'mf'): - # curve = result.get(var,[(0,0)]) - # print(var, curve[0][0] - now, curve[-1][0] - now, curve) - self.complete_to_end(result, end) - self.time[0] = self.time[1] - # reduction not yet implemented - return dict(type='graph-draw', reduced=False, graph=result) - - def w_gettime(self, time): - """parse time (using server time) - time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time - """ - time = [float(t) for t in time.split(',')] - return dict(type='time', time= seagraph.get_abs_time(time)) - - def w_getvars(self, time): - """get the curves available at given time (unix timestamp as string) - """ - time = [float(t) for t in time.split(',')] - scanner = seagraph.VarsScanner(instrument.logger_dir, instrument.test_day) - result = dict(type='var_list') - result['blocks'] = list(scanner.get_message(time[-1]).values()) - # updates the self.variables attribute to keep track of the available variables - self.variables = [variable["name"] for block in result['blocks'] for variable in block["curves"]] - return result - - def w_updategraph(self): - """update live values - seems not to work""" - logging.info("UPD GRAPH %d", self.livemode) - if self.livemode == self.HISTORICAL: - return dict(type='accept-graph', live=False) - else: - self.livemode = self.LIVE - return dict(type='accept-graph', live=True) - #self.livemode = self.LIVE - #return dict(type='accept-graph', live=True) - - def graphpoll(self): - if self.livemode == self.LIVE: - self.time[1], = seagraph.get_abs_time([0]) - else: - self.time[1] = self.time[0] # do not update - if self.time[1] > self.time[0]: - result = self.scanner.get_message(self.variables, self.time, show_empty=False) - self.strip_future(result) - if int(self.time[1] / 60) != int(self.time[0] / 60): - # update unchanged values - for var, (lastt, lastx) in self.lastvalues.items(): - if var not in result: - result[var] = [(self.time[1], lastx)] - self.time[0] = self.time[1] - if len(result) > 0: - return dict(type='graph-update', reduced=False, time=self.time[1], graph=result) - return None - -class SeaParams: - - def __init__(self): - self.group_version = {} - self.group_values = {} - self.values = {} - self.consolepos = 0 - self.id = uuid.uuid4().hex[0:15] - # SeaGraph.__init__(self) - self.queue = [] - - def poll(self): - messages = self.queue - self.queue = [] - updates = [] - # group updates - instrument.poll_groups(self.group_version.keys()) - for path, gv in self.group_version.items(): - if path in self.group_values: - gobj = instrument.groups[path] - if gv != gobj.version: - logging.info('redraw: %s client: %d instr: %d', path, gv, gobj.version) - self.group_version[path] = gobj.version - messages.append(dict(type='redraw', path=path)) - break - else: - values = self.group_values[path] - for name, client_value in values.items(): - inst_value = instrument.values.get(name, None) - if client_value != inst_value: - values[name] = inst_value - updates.append({'name': name, 'value': inst_value}) - if len(updates) > 0: - messages.append(dict(type='update', updates=updates)) - # console messages - instrument.pollconsole() - self.consolepos, msg = instrument.getconsole(self.consolepos, self.id) - messages.extend(msg) - - # graph messages - msg = self.graphpoll() - if msg: - messages.append(msg) - return messages - - def info(self): - return self.group_version.keys() - - def w_getblock(self, path): - gobj = instrument.findgroup(path.split(',')[-1]) - # self.group_version[path] = gobj.version - # simplify: allow only one group per client - self.group_version = {path: gobj.version} - logging.info('getblock %s %d', path, gobj.version) - return dict(type='draw', title=gobj.grouptitle, path=path, components=gobj.components) - - def w_updateblock(self, path): - gobj = instrument.findgroup(path) - logging.info('make active %s', path) - #if not path in self.group_values: - self.group_values[path] = gobj.empty_values.copy() - return dict(type='accept-block') - - def w_console(self): - self.consolepos = 0 - instrument.console() - return dict(type='accept-console') - - def w_sendcommand(self, command): - instrument.command(command, self.id) - return dict(type='accept-command') - - -class InfluxParams: - """Class with dummy routes, in case client side is started with the right part init commands""" - def __init__(self): - self.id = uuid.uuid4().hex[0:15] - self.queue = [] - - def poll(self): - messages = self.queue - self.queue = [] - msg = self.graphpoll() - if msg: - messages.append(msg) - return messages - - def info(self): - return ["na"] - - def w_getblock(self, path): - return dict(type='draw', title="graph", path=path, components=[]) - - def w_updateblock(self, path): - return dict(type='accept-block') - - def w_console(self): - return dict(type='accept-console') - - def w_sendcommand(self, command): - return dict(type='accept-command') - - -class SeaClient(SeaParams, SeaGraph): - def __init__(self): - SeaParams.__init__(self) - SeaGraph.__init__(self) - - -class SeaInfluxClient(SeaParams, InfluxGraph): - def __init__(self): - SeaParams.__init__(self) - InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title) - - -class InfluxClient(InfluxParams, InfluxGraph): - def __init__(self): - InfluxParams.__init__(self) - InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title) - - -class DummyClient(SeaGraph): - asynch = set(('id','update','redraw','command','reply','graph-update','graph-redraw')) - - def __init__(self, host_port): - self.linesocket = tcp_lineserver.LineClient(host_port) - self.id = uuid.uuid4().hex[0:15] - self.linesocket.send_line(json.dumps(dict(type='init', id=self.id))) - self.queue = [] - self.syncreply = [] - SeaGraph.__init__(self) - - def cmd_reply(self, command, replytype=None, tmo=5): - self.linesocket.send_line(json.dumps(command)) - t = 0 - while True: - if self.syncreply: - msg = self.syncreply.pop(0) - break - line = self.linesocket.get_line() - if line != None: - msg = json.loads(line) - if msg['type'] in self.asynch: - t = 0 - self.queue.append(msg) - else: - break - if t >= tmo: - raise Exception("timeout") - gevent.sleep(0.1) - t += 0.1 - if replytype and msg['type'] != replytype: - logging.error('REPLY MISMATCH %s %s <> %s' , command, replytype, msg['type']) - return msg - - - def w_getblock(self, path): - return self.cmd_reply(dict(type='getblock', path=path, id=self.id), 'draw') - - def w_updateblock(self, path): - return self.cmd_reply(dict(type='updateblock', path=path, id=self.id), 'accept-block') - - def w_console(self): - return self.cmd_reply(dict(type='console', id=self.id), 'accept-console') - - def w_sendcommand(self, command): - return self.cmd_reply(dict(type='sendcommand', command=command, id=self.id), 'accept-command') - - def poll(self): - if self.queue: - messages = self.queue - self.queue = [] - return messages - line = self.linesocket.get_line() - messages = [] - if line: - msg = json.loads(line) - if msg['type'] in self.asynch: - messages.append(msg) - else: - self.syncreply.append(msg) - - # graph messages - msg = self.graphpoll() - if msg: - messages.append(msg) - return messages - - def info(self): - return ["na"] - -class DummyInstrument(Instrument): - - def __init__(self, inst_name, instrument_config): - self.instrument_config = instrument_config - self.host_port = hostport_split(instrument_config['hostport']) - self.logger_dir = instrument_config.get('logger_dir', '') - test_day = instrument_config.get('test_day', None) - self.test_day = [int(x) for x in test_day.split('-')] if test_day else None - self.title = inst_name - self.clients = {} - - def newClient(self): - return self.register(DummyClient(self.host_port)) - - -class SecopMsg(tuple): - asynch = False - - def __new__(cls, line): - sl = (line+' ').split(' ', 2) - if len(sl[0].split(',')) > 1: - return tuple.__new__(cls, ('idn', None, line)) - else: - typ, par, val = sl - val = val.strip() or None - if val: - val = json.loads(val) - if typ in ('update', 'error_update'): - cls = SecopAsyncMsg - return tuple.__new__(cls, (typ, par, val)) - - @property - def type(self): - return self[0] - - @property - def par(self): - return self[1] - - @property - def value(self): - return self[2] - - def __repr__(self): - value = repr(self.value) - if len(value) > 50: - value = value[:50] + '...' - return "SecopMsg('%s %s %s')" % (self.type, self.par, value) - - -class SecopAsyncMsg(SecopMsg): - asynch = True - - -def SecopEncode(cmd, par=None, value=None): - line = cmd - if par: - line += " " + par - if value: - line += " " + json.dumps(value) - return line - - -def convert_par(module, name, par): - result = dict(type='input', name=module+":"+name, title=name) - if par.get('readonly', True): - result['type'] = 'rdonly' - else: - result['command'] = 'change %s:%s' % (module, name) - if par['datainfo']['type'] == 'enum': - result['enum_names'] = [dict(title=k, value=v) for k, v in par['datainfo']['members'].items()] - result['type'] = 'enum' - elif par['datainfo']['type'] == 'bool': - result['type'] = 'checkbox' - return result - - -def convert_event(messages): - if isinstance(messages, SecopMsg): - messages = [messages] - updates = [] - for msg in messages: - if msg.type == 'update': - updates.append(dict(name=msg.par, value=msg.value[0])) - elif msg.type == 'error_update': - updates.append(dict(name=msg.par, error=f'{msg.value[0]} - {msg.value[1]}')) - # updates.append(dict(name=msg.par, value=str(msg.value[0]))) - return [dict(type='update', updates=updates)] - - -class SecNodeClient: - def __init__(self, host_port): - self.linesocket = tcp_lineserver.LineClient(hostport_split(host_port)) - self.consolequeue = [] - self.queue = [] - self.syncreply = [] - #self.out = open("debug.txt", "w") - #self.out = sys.stdout - self.out = None - idn = self.cmd_reply("*IDN?", "idn") - self.idn = idn.value - self.description = self.cmd_reply("describe", "describing").value - - def cmd_reply(self, command, replytype, tmo=5): - self.replytype = replytype - if self.out: self.out.write(">"+command+"\n") - self.consolequeue.append(dict(type='command',line=command,origin='self')) - self.linesocket.send_line(command) - t = 0 - delta = 0.001 - while True: - if self.syncreply: - msg = self.syncreply.pop(0) - break - line = self.linesocket.get_line() - if line is None: - if t >= tmo: - raise Exception("timeout") - gevent.sleep(delta) - t += delta - continue - if not line.startswith(('update', 'error_update')): - self.consolequeue.append(dict(type='reply',line=line,origin='other')) - if self.out: self.out.write("<"+line+"\n") - msg = SecopMsg(line) - if not msg.asynch: - break - self.queue.append(msg) - if not replytype.startswith(msg.type): - logging.error('REPLY MISMATCH %s <> %s', replytype, repr(msg)) - self.replytype = "" - return msg - - def poll(self): - if self.consolequeue: - messages = self.consolequeue - self.consolequeue = [] - return messages - messages = self.queue - self.queue = [] - while 1: - line = self.linesocket.get_line() - if not line: - break - if not line.startswith(('update', 'error_update')): - self.consolequeue.append(dict(type='reply',line=line,origin='other')) - if self.out: self.out.write("<"+line+"\n") - msg = SecopMsg(line) - if msg.asynch: # and self.replytype != msg['type'] + "=" + msg.par: - messages.append(msg) - else: - self.syncreply.append(msg) - break - if messages: - return convert_event(messages) - return [] - - - -class SecopClient: - prio_par = ["value", "status", "target"] - hide_par = ["baseclass", "class", "pollinterval"] - skip_par = ["status2"] - - def __init__(self, host_ports): - self.nodes = [] - self.node_map = {} - for host_port in host_ports.split(','): - node = SecNodeClient(host_port) - self.nodes.append(node) - for name, mod in node.description["modules"].items(): - self.node_map[name] = node - self.id = uuid.uuid4().hex[0:15] - - def w_getblock(self, path): - path = path.split(',')[-1] - if path == "main": - components = [] - for node in self.nodes: - for name, m in node.description["modules"].items(): - #components.append(convert_par(name, 'value', m['parameters']['value'])) - components.append(dict(type='rdlink', name=name+':value', title=name)) - #print components - return dict(type='draw', path='main', title='modules', components=components) - else: - node = self.node_map[path] - module = node.description['modules'][path] - logging.info('MP %r', path) - parameters = dict(module["accessibles"]) - components = [] - for name in SecopClient.skip_par: - if name in parameters: - parameters.pop(name) - for name in SecopClient.prio_par: - if name in parameters: - components.append(convert_par(path, name, parameters.pop(name))) - components1 = [] - for name in SecopClient.hide_par: - if name in parameters: - components1.append(convert_par(path, name, parameters.pop(name))) - for name, p in parameters.items(): - components.append(convert_par(path, name, parameters[name])) - components.extend(components1) - return dict(type='draw', path=path, title=path, components=components) - - def w_updateblock(self, path): - if path == 'main': - path = '' - for node in self.nodes: - node.cmd_reply(f"activate", "active") - else: - node = self.node_map[path] - node.cmd_reply(f"activate {path}", "active") - return dict(type='accept-block') - - def w_console(self): - return dict(type='accept-console') - - def w_sendcommand(self, command): - # TODO: if not obsolete: add node to command. now its always sending to the first node - logging.info('SENDCOMMAND %r', command) - if not command.strip(): - return dict(type='accept-command') - scmd = command.split(' ') - if scmd[1:2]: - module = scmd[1].split(':')[0] - node = self.node_map[module] - else: - node = self.nodes[0] - print('no module given, send command to first node', command) - if scmd[0] == 'change': - replytype = f'changed {scmd[1]}' - else: - replytype = None - # cmd = "change " + command - reply = node.cmd_reply(command, replytype) - return dict(type='accept-command', result=reply) - - def poll(self): - messages = [] - for node in self.nodes: - messages.extend(node.poll()) - return messages - - def info(self): - return ["na"] - - -class SecopInstrument(Instrument): - - def __init__(self, inst_name, instrument_config): - self.instrument_config = instrument_config - self.host_ports = instrument_config['hostport'] - self.logger_dir = instrument_config.get('logger_dir', '') - #test_day = instrument_config.get('test_day', None) - #self.test_day = [int(x) for x in test_day.split('-')] if test_day else None - self.title = inst_name - self.clients = {} - - def newClient(self): - return self.register(SecopClient(self.host_ports)) - - -class SecopInfluxClient(SecopClient, InfluxGraph): - def __init__(self): - SecopClient.__init__(self, instrument.host_ports) - InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title) - - def poll(self): - messages = super().poll() - msg = self.graphpoll() - if msg: - messages.append(msg) - return messages - - -class SecopInfluxInstrument(SecopInstrument): - - def __init__(self, inst_name, instrument_config): - super().__init__(inst_name, instrument_config) - self.db = InfluxDB() - self.influx_data_getter = InfluxDataGetter(self.db, inst_name) - self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp())) - - def newClient(self): - #if not self.seaspy.connected: - # self.init() - return self.register(SecopInfluxClient()) - - -class Logger(object): - def __init__(self, logpath): - self.terminal = sys.stdout - self.log = open(logpath, "a") - - def write(self, message): - self.terminal.write(message) - self.log.write(message) - - def flush(self): - pass - - -def handle_pdb(sig, frame): - import pdb - print('PDB') - pdb.Pdb().set_trace(frame) - - -def handle_term(sig, _): - server.stop() - server.close() - - -if __name__ == '__main__': - signal.signal(signal.SIGUSR1, handle_pdb) - signal.signal(signal.SIGTERM, handle_term) - print('PID', os.getpid()) - - if len(sys.argv) > 3: - instrument_config = {} - for arg in sys.argv[1:]: - split = arg.split('=') - instrument_config[split[0]] = '='.join(split[1:]) - port = int(instrument_config['port']) - inst_name = instrument_config['instrument'] - else: - # take config from instruments.json - try: - port = int(sys.argv[1]) - except IndexError: - port = 5000 - try: - inst_name = sys.argv[2] - except IndexError: - inst_name = 'seadummy' - - with open('instruments.json') as f: - instrument_list = json.load(f) - - instrument_config = instrument_list[inst_name] - logging.basicConfig(filename='log/%s.log' % inst_name, filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') - - # sys.stdout = Logger(instrument_config.get('logger_dir', '.') + '/seaweb_stdout.txt') - # print '-' * 80 - type = instrument_config.get('type', 'sea') - if type == 'sea': - instrument = SeaInstrument(inst_name, instrument_config) - elif type == 'influxsea': - instrument = SeaInfluxInstrument(inst_name, instrument_config) - elif type == 'influxsecop': - instrument = SecopInfluxInstrument(inst_name, instrument_config) - elif type == 'influx': - instrument = InfluxInstrument(inst_name) - elif type == 'dummy': - instrument = DummyInstrument(inst_name, instrument_config) - elif type == 'secop': - instrument = SecopInstrument(inst_name, instrument_config) - else: - raise TypeError('bad instrument type') - - app.debug = True - #server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt') - server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server')) - server.serve_forever() - # Then visit http://localhost:5000/test for a test diff --git a/secop.py b/secop.py new file mode 100644 index 0000000..6c1d0ec --- /dev/null +++ b/secop.py @@ -0,0 +1,189 @@ +import time +import logging +import uuid +from base import Instrument, get_abs_time +from influxdb import InfluxDB, InfluxDataGetter +from influxgraph import InfluxGraph +from frappy.client import SecopClient as SecNodeClient +from frappy.lib.enum import EnumMember +from frappy.datatypes import get_datatype + + +def convert_par(module, name, par): + result = dict(type='input', name=module+":"+name, title=name) + if par.get('readonly', True): + result['type'] = 'rdonly' + else: + result['command'] = 'change %s:%s' % (module, name) + if par['datainfo']['type'] == 'enum': + result['enum_names'] = [dict(title=k, value=v) for k, v in par['datainfo']['members'].items()] + result['type'] = 'enum' + elif par['datainfo']['type'] == 'bool': + result['type'] = 'checkbox' + return result + + +class SecopClient: + prio_par = ["value", "status", "target"] + hide_par = ["baseclass", "class", "pollinterval"] + skip_par = ["status2"] + + def __init__(self, instrument): + self.instrument = instrument + self.id = uuid.uuid4().hex[0:15] + self.module_updates = set() + self.param_updates = set() + self.updates = {} + + def w_getblock(self, path): + path = path.split(',')[-1] # TODO: why this? + if path == "main": + components = [dict(type='rdlink', name=name+':value', title=name) + for node in self.instrument.nodes for name in node.modules] + self.param_updates = {'value'} + return dict(type='draw', path='main', title='modules', components=components) + self.module_updates.add(path) # TODO: remove others? + node = self.instrument.node_map[path] + module = node.modules[path] + # logging.info('MP %r', path) + parameters = dict(module["parameters"]) + components = [] + for name in SecopClient.skip_par: + if name in parameters: + parameters.pop(name) + for name in SecopClient.prio_par: + if name in parameters: + components.append(convert_par(path, name, parameters.pop(name))) + components1 = [] + for name in SecopClient.hide_par: + if name in parameters: + components1.append(convert_par(path, name, parameters.pop(name))) + for name, p in parameters.items(): + components.append(convert_par(path, name, parameters[name])) + components.extend(components1) + return dict(type='draw', path=path, title=path, components=components) + + def updateItem(self, module, parameter, entry): + key = module, parameter + # print(key, entry) + if module in self.module_updates or parameter in self.param_updates: + name = f'{module}:{parameter}' + if entry.readerror: + item = {'name': name, 'error': str(entry.readerror)} + else: + item = {'name': name, 'value': str(entry), 'formatted': entry.formatted()} + # print(item) + self.updates[key] = item + + def w_updateblock(self, path): + if path == 'main': + path = '' + for node in self.instrument.nodes: + for modname in node.modules: + key = modname, 'value' + if key in node.cache: + self.updateItem(*key, node.cache[key]) + else: + node = self.instrument.node_map[path] + for param in node.modules[path]['parameters']: + key = path, param + if key in node.cache: + self.updateItem(*key, node.cache[key]) + return dict(type='accept-block') + + def w_console(self): + return dict(type='accept-console') + + def w_sendcommand(self, command): + logging.info('SENDCOMMAND %r', command) + if not command.strip(): + return dict(type='accept-command') + scmd = command.split(' ') + if scmd[0] != 'change': + scmd.insert(0, 'change') + # return dict(type='accept-command', error=f'do not know how to do {command!r}') + module, _, parameter = scmd[1].partition(':') + if not parameter: + parameter = 'target' + node = self.instrument.node_map[module] + result = node.setParameterFromString(module, parameter, scmd[2]) + return dict(type='accept-command') + + def w_gettime(self, time): + """parse time (using server time) + time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time + """ + time = [float(t) for t in time.split(',')] + return dict(type='time', time=get_abs_time(time)) + + def poll(self): + updates, self.updates = self.updates, {} + if not updates: + return [] + messages = [dict(type='update', updates=list(updates.values()))] + return messages + + def info(self): + return ["na"] + + +class SecopInstrument(Instrument): + + def __init__(self, inst_name, instrument_config): + super().__init__() + self.instrument_config = instrument_config + host_ports = instrument_config['hostport'] + self.logger_dir = instrument_config.get('logger_dir', '') + # test_day = instrument_config.get('test_day', None) + # self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.title = inst_name + self.device = 'UNDEFINED' + self.clients = {} + self.nodes = [] + self.node_map = {} + for host_port in host_ports.split(','): + node = SecNodeClient(host_port) + node.connect() + self.nodes.append(node) + for name, mod in node.modules.items(): + self.node_map[name] = node + + def register(self, client): + print('OPEN') + for node in self.nodes: + node.register_callback(None, client.updateItem) + return super().register(client) + + def remove(self, client): + print('REMOVE') + for node in self.nodes: + node.unregister_callback(None, client.updateItem) + super().remove(client) + + def new_client(self): + return self.register(SecopClient(self)) + + +class SecopInfluxClient(SecopClient, InfluxGraph): + def __init__(self, instrument): + SecopClient.__init__(self, instrument) + InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title) + + def poll(self): + messages = super().poll() + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + +class SecopInfluxInstrument(SecopInstrument): + + def __init__(self, inst_name, instrument_config): + super().__init__(inst_name, instrument_config) + self.db = InfluxDB() + self.influx_data_getter = InfluxDataGetter(self.db, inst_name) + self.device = self.influx_data_getter.get_device_name(int(time.time())) + + def new_client(self): + return self.register(SecopInfluxClient(self)) diff --git a/webserver.py b/webserver.py new file mode 100755 index 0000000..9dc3f5f --- /dev/null +++ b/webserver.py @@ -0,0 +1,268 @@ +from gevent import monkey +monkey.patch_all() +import sys +import time +import os +import signal +import socket +import traceback +import logging +import json +from collections import deque +from datetime import date, datetime +import tcp_lineserver +import gevent +import gevent.pywsgi +import gevent.queue +from gevent.lock import RLock +import flask +import pprint +import random +import uuid +import circularlog + + +def guess_mimetype(filename): + if filename.endswith('.js'): + mimetype = 'text/javascript' + elif filename.endswith('.css'): + mimetype = 'text/css' + elif filename.endswith('.ico'): + mimetype = 'image/x-icon' + elif filename.endswith(".png"): + mimetype = "image/png" + else: + mimetype = 'text/html' + return mimetype + + +class MyEncoder(json.JSONEncoder): + def default(self, obj): + try: + return super().default(obj) + except TypeError: + return int(obj) # try to convert SECoP Enum + + +# SSE 'protocol' is described here: https://bit.ly/UPFyxY +def to_json_sse(msg): + txt = json.dumps(msg, separators=(',', ': '), cls=MyEncoder) + logging.debug('data: %s', txt) + return 'data: %s\n\n' % txt + + +instrument = None +app = flask.Flask(__name__) + +update_rider = circularlog.Rider("upd") +pollinterval = 0.2 + + +@app.route('/update') +def get_update(path=None): + # Client Adress: socket.getfqdn(flask.request.remote_addr) + client = instrument.new_client() + client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1]) + + @flask.stream_with_context + def generator(): + logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1])) + # msg = dict(type='id', id=client.id, title=instrument.title); + # yield to_json_sse(msg) + msg = dict(type='id', id=client.id, instrument=instrument.title, device=instrument.device) + yield to_json_sse(msg) + try: + lastmsg = time.time() + while True: + if client.info() == "": + print(time.time()-lastmsg) + messages = client.poll() + for msg in messages: + update_rider.put('-', repr(msg)) + yield to_json_sse(msg) + if messages: + lastmsg = time.time() + else: + if time.time() > lastmsg + 30: + if not client.info(): + raise GeneratorExit("no activity") + logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info())) + yield to_json_sse(dict(type='heartbeat')) + lastmsg = time.time() + else: + gevent.sleep(pollinterval) + except (GeneratorExit, tcp_lineserver.Disconnected) as e: + logging.info("except clause %r", repr(e)) + logging.info('CLOSED %s', client.id) + print('CLOSE client') + instrument.remove(client) + pass + except Exception as e: + logging.info('error') + logging.error('%s', traceback.format_exc()) + instrument.remove(client) + # msg = dict(type='error',error=traceback.format_exc()) + # yield to_json_sse(msg) + + resp = flask.Response(generator(), mimetype='text/event-stream') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + + +@app.route('/circular') +def dump_circular(): + circularlog.log() + return "log" + + +@app.route('/clients') +def show_clients(): + result = "" + for id in instrument.clients: + c = instrument.clients[id] + result += c.remote_info + " " + "; ".join(c.info()) + "
" + return result + + +@app.route('/export') +def export(): + args = flask.request.args + kwargs = dict((k, args.get(k)) for k in args) + path = flask.request.path + logging.info('GET %s %s', path, repr(kwargs)) + try: + id = kwargs.pop('id') + client = instrument.clients[id] + bytes = client.w_export(**kwargs) + return flask.send_file( + bytes, + as_attachment=True, + download_name='export.tsv', + mimetype='text/tab-separated-values' + ) + + except Exception as e: + logging.error('%s', traceback.format_exc()) + circularlog.log() + msg = dict(type='error', request=path[1:], error=repr(e)) + logging.error('MSG: %r', msg) + resp = flask.Response(json.dumps(msg), mimetype='application/json') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + + +@app.route('/getblock') +@app.route('/updateblock') +@app.route('/sendcommand') +@app.route('/console') +@app.route('/graph') +@app.route('/updategraph') +@app.route('/gettime') +@app.route('/getvars', methods=["GET", "POST"]) +def reply(): + args = flask.request.values + kwargs = dict((k, args.get(k)) for k in args) + path = flask.request.path + logging.info('GET %s %r', path, kwargs) + try: + id = kwargs.pop('id') + client = instrument.clients[id] + msg = getattr(client, "w_" + path[1:])(**kwargs) + except Exception as e: + logging.error('%s', traceback.format_exc()) + circularlog.log() + msg = dict(type='error', request=path[1:], error=repr(e)) + logging.info('REPLY %s %r', path, msg) + resp = flask.Response(json.dumps(msg), mimetype='application/json') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + + +@app.route('/test/') +def subdir_test_file(file): + gevent.sleep(2) + resp = flask.send_file("client/test/"+file, mimetype=guess_mimetype(file)) + return resp + + +@app.route('/components/curves_settings_popup/color_selector/') +@app.route('/components/curves_settings_popup/') +@app.route('/components/action_entry/') +@app.route('/components/export_popup/') +@app.route('/components/dates_popup/') +@app.route('/components/menu_popup/') +@app.route('/components/help_popup/') +@app.route('/components/help_entry/') +@app.route('/components/control/') +@app.route('/components/divider/') +@app.route('/components/states_indicator/dates/') +@app.route('/res/') +@app.route('/jsFiles/') +@app.route('/cssFiles/') +@app.route('/externalFiles/') +def subdir_file(file): + subdir = "/".join(flask.request.path.split("/")[1:-1]) + resp = flask.send_file("client/" + subdir+"/"+file, mimetype=guess_mimetype(file)) + # resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + + +@app.route('/externalFiles/maps/.map') +def replace_by_empty(file): + return "" + + +@app.route('/') +def default(): + return general_file('SEAWebClient.html') + + +@app.route('/') +def general_file(file): + subdir = "client/" + try: + resp = flask.send_file(subdir+file, mimetype=guess_mimetype(file)) + except FileNotFoundError: + logging.warning('file %s not found', file) + return 'file not found' + # resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + + +def hostport_split(hostport): + h = hostport.split(':') + return (h[0], int(h[1])) + + +def handle_pdb(sig, frame): + import pdb + print('PDB') + pdb.Pdb().set_trace(frame) + + +def handle_term(sig, _): + server.stop() + server.close() + + +def main(cls, **config): + global instrument + if not config: + for arg in sys.argv[1:]: + key, _, value = arg.partition('=') + config[key] = value + port = int(config['port']) + inst_name = config['instrument'] + instrument = cls(inst_name, config) + + signal.signal(signal.SIGUSR1, handle_pdb) + signal.signal(signal.SIGTERM, handle_term) + + app.debug = True + + logging.basicConfig(filename='log/%s.log' % inst_name, filemode='w', level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s') + + #server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt') + server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server')) + server.serve_forever()