commit 172042e73191de0e5f3844968b608bc234513afa Author: l_samenv Date: Fri Dec 4 08:54:24 2020 +0100 Initial commit diff --git a/circularlog.py b/circularlog.py new file mode 100644 index 0000000..a8d410c --- /dev/null +++ b/circularlog.py @@ -0,0 +1,51 @@ +import collections +import time +import logging + +circular = collections.deque(maxlen=80) +lastrider = None + +def strtm(t=None): + if t is None: + t = time.time() + tm = time.localtime(t) + return time.strftime("%H:%M:%S",tm)[0:7] + ("%5.3f" % (t % 10)) + +class Rider(object): + def __init__(self, name): + self.name = name + self.lastprompt = None + self.cnt = 0 + self.lastline = None + self.lasttime = 0 + + def putlast(self): + global lastrider, circular + if self.lasttime != 0: + circular.append("...") + circular.append("%s %s %s %s" % (strtm(self.lasttime), self.name, self.lastprompt, self.lastline)) + self.cnt = 0 + self.lasttime = 0 + + def put(self, prompt, line): + global lastrider, circular + now = time.time() + if prompt != self.lastprompt or lastrider != self: + if lastrider: + lastrider.putlast() + self.lastprompt = prompt + lastrider = self + else: + if self.cnt >= 4: + self.lastline = line + self.lasttime = now + return + self.cnt += 1 + circular.append("%s %s %s %s" % (strtm(now), self.name, prompt, line)) + +def log(): + if lastrider: + lastrider.putlast() + for line in circular: + logging.info("%s", line) + circular.clear() diff --git a/dummy.py b/dummy.py new file mode 100755 index 0000000..3fc16ff --- /dev/null +++ b/dummy.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python + +import json +import sys +import copy +from collections import OrderedDict, deque +import tcp_lineserver as lineserver +import uuid +import traceback + +class Group(object): + def __init__(self, name): + self.name = name + self.title = name + self.components = [] + +class Component(object): + def __init__(self, name, g): + self.name = name + self.group = g + self.value = None + self.properties = {} + +init_values = ['device name', 'device stick_name'] + +def set_value(c, value): + c.value = value + print 'set',c.name,c.value,c.group + for n in init_values: + if c.name == n: + init(cindex['device name'].value) + break + for client in clients.values(): + client.send_msg(dict(type='update', updates=[dict(name=c.name, value=c.value)])) + + +def history_add(item): + history.append(item) + id, is_cmd, line = item + for cid, client in console_clients.items(): + client.send_msg(dict( + type = 'command' if is_cmd else 'reply', + origin = 'self' if cid == id else 'other', + line = line + )) + +class ClientType: + GROUP = 1 + CONSOLE = 2 + MAINUPDATE = 3 + +class DummyHandler(lineserver.LineHandler): + def __init__(self, *args, **kwargs): + lineserver.LineHandler.__init__(self, *args, **kwargs) + self.id = None + + def send_msg(self, msg): + print '<', msg + self.send_line(json.dumps(msg)) + + def handle_line(self, line): + print '>', line + msg = json.loads(line) + msgtype = msg['type'] + if msgtype == 'init': + id = msg.get('id', '0') + if self.id != msg['id']: + self.id = msg['id'] + clients[self.id] = self + self.title = "" + global main_update + main_update.append(self) + #self.send_msg({'type': 'accept-mainupdate'}) + init(cindex['device name'].value) + elif msgtype == 'getblock': + path = msg['path'] + grp = path.split(",")[-1] + g = groups[grp] + clist = [] + for c in g.components: + cdict = copy.deepcopy(c.properties) + cdict['name'] = c.name + clist.append(cdict) + self.send_msg(dict(type='draw', path=path, title=g.title, components=clist)) + elif msgtype == 'updateblock': + path = msg['path'] + grp = path.split(",")[-1] + g = groups[grp] + clist = [] + updates = [] + for c in g.components: + if c.properties.get('type','') != 'group': + updates.append(dict(name=c.name, value=c.value)) + self.send_msg(dict(type='accept-block')) + self.send_msg(dict(type='update', updates=updates)) + elif msgtype == 'console': + print '*** make console ',self.id,self + self.send_msg(dict(type='accept-console')) + console_clients[self.id] = self + for item in history: + id, is_cmd, line = item + self.send_msg(dict( + type = 'command' if is_cmd else 'reply', + origin = 'self' if self.id == id else 'other', + line = line + )) + elif msgtype == 'sendcommand': + cmd = msg['command'].split(' ') + history_add((self.id, True, msg['command'])) + if len(cmd) == 1: + name = cmd[0] + c = cindex.get(name, None) + val = None + else: + name = cmd[0] + " " + cmd[1] + try: + c = cindex[name] + except KeyError: + try: + c = cindex[cmd[0]] + except KeyError: + c = None + else: + name = cmd[0] + val = " ".join(cmd[1:]) + else: + if len(cmd) == 2: + val = None + else: + val = " ".join(cmd[2:]) + if c == None: + history_add((self.id, False, "ERROR: " + name + " not found")) + elif val == None: + history_add((self.id, False, name + " = " + c.value)) + else: + set_value(c, val) + history_add((self.id, False, "OK: " + name + " = " + c.value)) + self.send_msg(dict(type = 'accept-command')) + + def handle_close(self): + try: + print 'close client', self + del clients[self.id] + except KeyError: + print 'can not remove client' + try: + del console_clients[self.id] + print '*** removed console client',self.id,self + except KeyError: + print '*** console client already closed',self.id,self + self.close() + +def init(device): + global cindex, groups, actual_device + if device != actual_device: + print 'INIT', actual_device, device + actual_device = device + cindex = {} # component index + groups = {} + groupsfile = device + ".json" + with open(groupsfile) as fil: + groupdict = json.load(fil) + for grp in groupdict: + gdict = groupdict[grp] + g = find_group(grp) + g.title = gdict.get('title', grp) + for cdict in gdict['components']: + cname = cdict['name'] + c = Component(cname, g) + g.components.append(c) + cindex[cname] = c + c.properties = dict((k, v) for k, v in cdict.items() if k != 'name' and k != 'value') + c.value = cdict.get('value','') + if c.properties.get('type','') == 'group': + gg = find_group(cname) + if 'title' in cdict: + if gg.title == gg.name: + gg.title = cdict['title'] + else: + c.properties['title'] = gg.title + devlist = [cindex[n].value for n in init_values] + device_title = "DUMMY " + "/".join(devlist) + print 'ENDINIT', device_title + for m in main_update: + if m.title != device_title: + m.title=device_title + m.send_msg(dict(type='id', id=m.id, title=device_title)) + +def find_group(grp): + if grp in groups: + g = groups[grp] + else: + g = Group(grp) + groups[grp] = g + return g + +if __name__ == "__main__": + console_clients = {} + clients = {} + cindex = {} # component index + groups = {} + history = deque(maxlen=50) + console_id = 0 + main_update = [] + actual_device = '' + + try: + device = sys.argv[2] + except IndexError: + device = "dummy1" + try: + port = int(sys.argv[1]) + except IndexError: + port = 5001 + + init(device) + + server = lineserver.LineServer('localhost', port, DummyHandler) + server.loop() diff --git a/histgraph.py b/histgraph.py new file mode 100644 index 0000000..775ce36 --- /dev/null +++ b/histgraph.py @@ -0,0 +1,150 @@ +import time +import sys +if sys.version_info >= (3,6): + Dict = dict +else: + from collections import OrderedDict as Dict + + +class PrettyFloat(float): + def __repr__(self): + return '%.15g' % self + + +def get_abs_time(*times): + now = int(time.time() + 0.999) + oneyear = 365 * 24 * 3600 + return tuple(t + now if t < oneyear else t for t in times) + + +class ColorMap(object): + ''' + ColorMap is using official CSS color names, with the exception of Green, as this + is defined differently with X11 colors than in SEA, and used heavily in config files. + Here Green is an alias to Lime (#00FF00) and MidGreen is #008000, which is called Green in CSS. + The function to_code is case insensitive and accepts also names with underscores. + The order is choosen by M. Zolliker for the SEA client, originally only the first 16 were used. + ''' + hex_name = (("#FFFFFF","White"), ("#FF0000","Red"), ("#00FF00","Lime"), ("#0000FF","Blue"), ("#FF00FF","Magenta"), + ("#FFFF00","Yellow"), ("#00FFFF","Cyan"), ("#000000","Black"), ("#FFA500","Orange"), ("#006400","DarkGreen"), + ("#9400D3","DarkViolet"), ("#A52A2A","Brown"), ("#87CEEB","SkyBlue"), ("#808080","Gray"), ("#FF69B4","HotPink"), + ("#FFFFE0","LightYellow"), ("#00FF7F","SpringGreen"), ("#000080","Navy"), ("#1E90FF","DodgerBlue"), + ("#9ACD32","YellowGreen"), ("#008B8B","DarkCyan"), ("#808000","Olive"), ("#DEB887","BurlyWood"), + ("#7B68EE","MediumSlateBlue"), ("#483D8B","DarkSlateBlue"), ("#98FB98","PaleGreen"), ("#FF1493","DeepPink"), + ("#FF6347","Tomato"), ("#32CD32","LimeGreen"), ("#DDA0DD","Plum"), ("#7FFF00","Chartreuse"), ("#800080","Purple"), + ("#00CED1","DarkTurquoise"), ("#8FBC8F","DarkSeaGreen"), ("#4682B4","SteelBlue"), ("#800000","Maroon"), + ("#3CB371","MediumSeaGreen"), ("#FF4500","OrangeRed"), ("#BA55D3","MediumOrchid"), ("#2F4F4F","DarkSlateGray"), + ("#CD853F","Peru"), ("#228B22","ForestGreen"), ("#48D1CC","MediumTurquoise"), ("#DC143C","Crimson"), + ("#D3D3D3","LightGray"), ("#ADFF2F","GreenYellow"), ("#7FFFD4","Aquamarine"), ("#BC8F8F","RosyBrown"), + ("#20B2AA","LightSeaGreen"), ("#C71585","MediumVioletRed"), ("#F0E68C","Khaki"), ("#6495ED","CornflowerBlue"), + ("#556B2F","DarkOliveGreen"), ("#CD5C5C","IndianRed "), ("#2E8B57","SeaGreen"), ("#F08080","LightCoral"), + ("#8A2BE2","BlueViolet"), ("#AFEEEE","PaleTurquoise"), ("#4169E1","RoyalBlue"), ("#0000CD","MediumBlue"), + ("#B8860B","DarkGoldenRod"), ("#00BFFF","DeepSkyBlue"), ("#FFC0CB","Pink"), ("#4B0082","Indigo "), ("#A0522D","Sienna"), + ("#FFD700","Gold"), ("#F4A460","SandyBrown"), ("#DAA520","GoldenRod"), ("#DA70D6","Orchid"), ("#E6E6FA","Lavender"), + ("#5F9EA0","CadetBlue"), ("#D2691E","Chocolate"), ("#66CDAA","MediumAquaMarine"), ("#6B8E23","OliveDrab"), + ("#A9A9A9","DarkGray"), ("#BDB76B","DarkKhaki"), ("#696969","DimGray"), ("#B0C4DE","LightSteelBlue"), + ("#191970","MidnightBlue"), ("#FFE4C4","Bisque"), ("#6A5ACD","SlateBlue"), ("#EE82EE","Violet"), + ("#8B4513","SaddleBrown"), ("#FF7F50","Coral"), ("#008000","MidGreen"), ("#DB7093","PaleVioletRed"), ("#C0C0C0","Silver"), + ("#E0FFFF","LightCyan"), ("#9370DB","MediumPurple"), ("#FF8C00","DarkOrange"), ("#00FA9A","MediumSpringGreen"), + ("#E9967A","DarkSalmon"), ("#778899","LightSlateGray"), ("#9932CC","DarkOrchid"), ("#EEE8AA","PaleGoldenRod"), + ("#F8F8FF","GhostWhite"), ("#FFA07A","LightSalmon"), ("#ADD8E6","LightBlue"), ("#D8BFD8","Thistle"), + ("#FFE4E1","MistyRose"), ("#FFDEAD","NavajoWhite"), ("#40E0D0","Turquoise"), ("#90EE90","LightGreen"), + ("#B22222","FireBrick"), ("#008080","Teal"), ("#F0FFF0","HoneyDew"), ("#FFFACD","LemonChiffon"), ("#FFF5EE","SeaShell"), + ("#F5F5DC","Beige"), ("#DCDCDC","Gainsboro"), ("#FA8072","Salmon"), ("#8B008B","DarkMagenta"), ("#FFB6C1","LightPink"), + ("#708090","SlateGray"), ("#87CEFA","LightSkyBlue"), ("#FFEFD5","PapayaWhip"), ("#D2B48C","Tan"), ("#FFFFF0","Ivory"), + ("#F0FFFF","Azure"), ("#F5DEB3","Wheat"), ("#00008B","DarkBlue"), ("#FFDAB9","PeachPuff"), ("#8B0000","DarkRed"), + ("#FAF0E6","Linen"), ("#B0E0E6","PowderBlue"), ("#FFE4B5","Moccasin"), ("#F5F5F5","WhiteSmoke"), ("#FFF8DC","Cornsilk"), + ("#FFFAFA","Snow"), ("#FFF0F5","LavenderBlush"), ("#FFEBCD","BlanchedAlmond"), ("#F0F8FF","AliceBlue"), + ("#FAEBD7","AntiqueWhite"), ("#FDF5E6","OldLace"), ("#FAFAD2","LightGoldenRodYellow"), ("#F5FFFA","MintCream"), + ("#FFFAF0","FloralWhite"), ("#7CFC00","LawnGreen"), ("#663399","RebeccaPurple")) + codes = {} + for i, pair in enumerate(hex_name): + codes[pair[0]] = i + low = pair[1].lower() + codes[low] = i + codes[low.replace("gray","grey")] = i + codes["green"] = 2 + codes["fuchsia"] = 4 + codes["aqua"] = 6 + + @staticmethod + def to_code(colortext): + try: + return int(colortext) + except ValueError: + return ColorMap.codes.get(colortext.lower().replace("_",""),-1) + + @staticmethod + def check_hex(code): + if not code.startswith("#"): + return None + if len(code) == 4: # convert short code to long code + code = code[0:2] + code[1:3] + code[2:4] + code[3] + if len(code) != 7: + return None + try: + int(code[1:]) # we have a valid hex color code + return code + except ValueError: + return None + + @staticmethod + def to_hex(code): + try: + return ColorMap.hex_name[code][0] + except IndexError: + return -1 + + + +def get_vars(main, time): + result = {} + + time, = get_abs_time(time) + # get last value only + curves = main.get_curves(['$vars'], (time, time)) + for _, value in curves['$vars'].get(): + for var in value.split(): + vars = var.split("|") + if len(vars) == 1: + vars.append("") + if len(vars) == 2: + vars.append(vars[0]) + if len(vars) == 3: + vars.append("") + name, unit, label, color = vars + if not unit in result: + result[unit] = dict(tag = unit, unit = unit.split("_")[0], curves=Dict()) + result[unit]["curves"][name] = dict(name=name, label=label, color=color) + + for unit, curvegroup in result.items(): + color_set = set() + auto_curves = [] + curve_list = list(curvegroup["curves"].values()) + curvegroup['curves'] = curve_list + for curve in curve_list: + col = curve["color"].strip() + c = ColorMap.to_code(col) + if c < 0: + valid = ColorMap.check_hex(col) + if valid: + curve["original_color"] = col + curve["color"] = valid + else: + auto_curves.append(curve) + curve["original_color"] = col + "?" + else: + color_set.add(c) + curve["original_color"] = col + curve["color"] = ColorMap.to_hex(c) + c = 1 # omit white + for curve in auto_curves: + while c in color_set: c += 1 # find unused color + curve["color"] = ColorMap.to_hex(c) + c += 1 + return result + + +def get_curves(main, keys, timerange, show_empty=True): + curves = main.get_curves(keys, get_abs_time(*timerange), maxpoints=500) + return {k: c.for_json() for k, c in curves.items()} diff --git a/seagraph.py b/seagraph.py new file mode 100644 index 0000000..d2be8c9 --- /dev/null +++ b/seagraph.py @@ -0,0 +1,219 @@ +from datetime import date +import time +import sys +import os +import logging +import json +import numpy as np + +class PrettyFloat(float): + def __repr__(self): + return '%.15g' % self + +#encode = "!#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~" + +def get_abs_time(times): + now = int(time.time() + 0.999) + oneyear = 365 * 24 * 3600 + return [t + now if t < oneyear else t for t in times] + +class Scanner(object): + def __init__(self, directory, test_day=None): + self.directory = directory + self.last_time = {} + self.test_day = test_day + + def scan(self, variable, timerange, result): + start, to, now = get_abs_time(timerange + [0]) + old = None + t = 0 + for di in range(date.fromtimestamp(start).toordinal(), date.fromtimestamp(to).toordinal() + 1): + d = date.fromordinal(di) + year, mon, day = self.test_day if self.test_day else (d.year, d.month, d.day) + path = self.directory + "logger/%d/%s/%.2d-%.2d.log" % \ + (year, variable.lower(), mon, day) + try: + # logging.info("logger path %s", path) + with open(path) as f: + t0 = time.mktime((d.year, d.month, d.day, 0, 0, 0, 0, 0, -1)) + for line in f: + if line[0] != '#': + t = t0 + (int(line[0:2]) * 60 + int(line[3:5])) * 60 + int(line[6:8]) + if line[-1:] == '\n': + value = line[9:-1] + else: + value = line[9:] + if t < start: + old = value + else: + if old is not None: + self.next(start, old, result) + old = None + self.next(t, value, result) + if t > to: + break + except IOError: + pass + if t < start: + #if t == 0: + # t = start + if old is not None: + self.next(t, old, result) + if t != self.last_time.get(variable,0): + self.last_time[variable] = t + return True + return False + +class NumericScanner(Scanner): + def __init__(self, *args, **kwargs): + Scanner.__init__(self, *args, **kwargs) + + def next(self, t, value, result): + try: + value = PrettyFloat(value) + except: + value = None + result.append([PrettyFloat(t), value]) + #self.value = value + #self.last = t + + def get_message(self, variables, timerange, show_empty=True): + self.dirty = False + result = {} + for var in variables: + self.last = 0 + curve = [] + if self.scan(var, timerange, curve): + self.dirty = True + if show_empty or len(curve) > 1: + result[var] = curve + return result + +class ColorMap(object): + ''' + ColorMap is using official CSS color names, with the exception of Green, as this + is defined differently with X11 colors than in SEA, and used heavily in config files. + Here Green is an alias to Lime (#00FF00) and MidGreen is #008000, which is called Green in CSS. + The function to_code is case insensitive and accepts also names with underscores. + The order is choosen by M. Zolliker for the SEA client, originally only the first 16 were used. + ''' + hex_name = (("#FFFFFF","White"), ("#FF0000","Red"), ("#00FF00","Lime"), ("#0000FF","Blue"), ("#FF00FF","Magenta"), + ("#FFFF00","Yellow"), ("#00FFFF","Cyan"), ("#000000","Black"), ("#FFA500","Orange"), ("#006400","DarkGreen"), + ("#9400D3","DarkViolet"), ("#A52A2A","Brown"), ("#87CEEB","SkyBlue"), ("#808080","Gray"), ("#FF69B4","HotPink"), + ("#FFFFE0","LightYellow"), ("#00FF7F","SpringGreen"), ("#000080","Navy"), ("#1E90FF","DodgerBlue"), + ("#9ACD32","YellowGreen"), ("#008B8B","DarkCyan"), ("#808000","Olive"), ("#DEB887","BurlyWood"), + ("#7B68EE","MediumSlateBlue"), ("#483D8B","DarkSlateBlue"), ("#98FB98","PaleGreen"), ("#FF1493","DeepPink"), + ("#FF6347","Tomato"), ("#32CD32","LimeGreen"), ("#DDA0DD","Plum"), ("#7FFF00","Chartreuse"), ("#800080","Purple"), + ("#00CED1","DarkTurquoise"), ("#8FBC8F","DarkSeaGreen"), ("#4682B4","SteelBlue"), ("#800000","Maroon"), + ("#3CB371","MediumSeaGreen"), ("#FF4500","OrangeRed"), ("#BA55D3","MediumOrchid"), ("#2F4F4F","DarkSlateGray"), + ("#CD853F","Peru"), ("#228B22","ForestGreen"), ("#48D1CC","MediumTurquoise"), ("#DC143C","Crimson"), + ("#D3D3D3","LightGray"), ("#ADFF2F","GreenYellow"), ("#7FFFD4","Aquamarine"), ("#BC8F8F","RosyBrown"), + ("#20B2AA","LightSeaGreen"), ("#C71585","MediumVioletRed"), ("#F0E68C","Khaki"), ("#6495ED","CornflowerBlue"), + ("#556B2F","DarkOliveGreen"), ("#CD5C5C","IndianRed "), ("#2E8B57","SeaGreen"), ("#F08080","LightCoral"), + ("#8A2BE2","BlueViolet"), ("#AFEEEE","PaleTurquoise"), ("#4169E1","RoyalBlue"), ("#0000CD","MediumBlue"), + ("#B8860B","DarkGoldenRod"), ("#00BFFF","DeepSkyBlue"), ("#FFC0CB","Pink"), ("#4B0082","Indigo "), ("#A0522D","Sienna"), + ("#FFD700","Gold"), ("#F4A460","SandyBrown"), ("#DAA520","GoldenRod"), ("#DA70D6","Orchid"), ("#E6E6FA","Lavender"), + ("#5F9EA0","CadetBlue"), ("#D2691E","Chocolate"), ("#66CDAA","MediumAquaMarine"), ("#6B8E23","OliveDrab"), + ("#A9A9A9","DarkGray"), ("#BDB76B","DarkKhaki"), ("#696969","DimGray"), ("#B0C4DE","LightSteelBlue"), + ("#191970","MidnightBlue"), ("#FFE4C4","Bisque"), ("#6A5ACD","SlateBlue"), ("#EE82EE","Violet"), + ("#8B4513","SaddleBrown"), ("#FF7F50","Coral"), ("#008000","MidGreen"), ("#DB7093","PaleVioletRed"), ("#C0C0C0","Silver"), + ("#E0FFFF","LightCyan"), ("#9370DB","MediumPurple"), ("#FF8C00","DarkOrange"), ("#00FA9A","MediumSpringGreen"), + ("#E9967A","DarkSalmon"), ("#778899","LightSlateGray"), ("#9932CC","DarkOrchid"), ("#EEE8AA","PaleGoldenRod"), + ("#F8F8FF","GhostWhite"), ("#FFA07A","LightSalmon"), ("#ADD8E6","LightBlue"), ("#D8BFD8","Thistle"), + ("#FFE4E1","MistyRose"), ("#FFDEAD","NavajoWhite"), ("#40E0D0","Turquoise"), ("#90EE90","LightGreen"), + ("#B22222","FireBrick"), ("#008080","Teal"), ("#F0FFF0","HoneyDew"), ("#FFFACD","LemonChiffon"), ("#FFF5EE","SeaShell"), + ("#F5F5DC","Beige"), ("#DCDCDC","Gainsboro"), ("#FA8072","Salmon"), ("#8B008B","DarkMagenta"), ("#FFB6C1","LightPink"), + ("#708090","SlateGray"), ("#87CEFA","LightSkyBlue"), ("#FFEFD5","PapayaWhip"), ("#D2B48C","Tan"), ("#FFFFF0","Ivory"), + ("#F0FFFF","Azure"), ("#F5DEB3","Wheat"), ("#00008B","DarkBlue"), ("#FFDAB9","PeachPuff"), ("#8B0000","DarkRed"), + ("#FAF0E6","Linen"), ("#B0E0E6","PowderBlue"), ("#FFE4B5","Moccasin"), ("#F5F5F5","WhiteSmoke"), ("#FFF8DC","Cornsilk"), + ("#FFFAFA","Snow"), ("#FFF0F5","LavenderBlush"), ("#FFEBCD","BlanchedAlmond"), ("#F0F8FF","AliceBlue"), + ("#FAEBD7","AntiqueWhite"), ("#FDF5E6","OldLace"), ("#FAFAD2","LightGoldenRodYellow"), ("#F5FFFA","MintCream"), + ("#FFFAF0","FloralWhite"), ("#7CFC00","LawnGreen"), ("#663399","RebeccaPurple")) + codes = {} + for i, pair in enumerate(hex_name): + codes[pair[0]] = i + low = pair[1].lower() + codes[low] = i + codes[low.replace("gray","grey")] = i + codes["green"] = 2 + codes["fuchsia"] = 4 + codes["aqua"] = 6 + + @staticmethod + def to_code(colortext): + try: + return int(colortext) + except ValueError: + return ColorMap.codes.get(colortext.lower().replace("_",""),-1) + + @staticmethod + def check_hex(code): + if not code.startswith("#"): + return None + if len(code) == 4: # convert short code to long code + code = code[0:2] + code[1:3] + code[2:4] + code[3] + if len(code) != 7: + return None + try: + int(code[1:]) # we have a valid hex color code + return code + except ValueError: + return None + + @staticmethod + def to_hex(code): + try: + return ColorMap.hex_name[code][0] + except IndexError: + return -1 + +class VarsScanner(Scanner): + colors = {"red":0} + def __init__(self, directory, test_day=None): + Scanner.__init__(self, directory, test_day=test_day) + logging.info('vars dir %s', directory) + + def next(self, t, value, result): + logging.info('vars %s', value) + for var in value.strip().split(" "): + vars = var.split("|") + if len(vars) == 1: + vars.append("") + if len(vars) == 2: + vars.append(vars[0]) + if len(vars) == 3: + vars.append("") + name, unit, label, color = vars + if not unit in result: + result[unit] = dict(tag = unit, unit = unit.split("_")[0], curves=[]) + result[unit]["curves"].append(dict(name=name, label=label, color=color)) + + def get_message(self, time): + # get last value only + result = {} + self.scan("vars", [time, time], result) + for unit in result: + color_set = set() + auto_curves = [] + for curve in result[unit]["curves"]: + col = curve["color"].strip() + c = ColorMap.to_code(col) + if c < 0: + valid = ColorMap.check_hex(col) + if valid: + curve["original_color"] = col + curve["color"] = valid + else: + auto_curves.append(curve) + curve["original_color"] = col + "?" + else: + color_set.add(c) + curve["original_color"] = col + curve["color"] = ColorMap.to_hex(c) + c = 1 # omit white + for curve in auto_curves: + while c in color_set: c += 1 # find unused color + curve["color"] = ColorMap.to_hex(c) + c += 1 + return result + diff --git a/seagraph_new.py b/seagraph_new.py new file mode 100644 index 0000000..1932d2d --- /dev/null +++ b/seagraph_new.py @@ -0,0 +1,232 @@ +from datetime import date +import time +import sys +import os +import logging +import json + +class PrettyFloat(float): + def __repr__(self): + return '%.15g' % self + +#encode = "!#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~" + +def get_abs_time(times): + now = int(time.time() + 0.999) + oneyear = 365 * 24 * 3600 + return [t + now if t < oneyear else t for t in times] + +class Scanner(object): + def __init__(self, directory, test_day=None): + self.directory = directory + self.last_time = {} + self.test_day = test_day + + def scan(self, variable, timerange, next_point, *args, maxpoints=1000): + """scan *variable* for *timerange* + + args is forwarded to self.next() and contains at least the + initialisation for the result + returns a tuple(, result) + is true when the last time on the variable has changed + """ + start, to, now = get_abs_time(timerange + [0]) + tresolution = (to - start) / float(maxpoints) if maxpoints else 0 + old = None + t = 0 + for di in range(date.fromtimestamp(start).toordinal(), date.fromtimestamp(to).toordinal() + 1): + d = date.fromordinal(di) + year, mon, day = self.test_day if self.test_day else (d.year, d.month, d.day) + path = self.directory + "logger/%d/%s/%.2d-%.2d.log" % \ + (year, variable.lower(), mon, day) + try: + # logging.info("logger path %s", path) + with open(path) as f: + t0 = time.mktime((d.year, d.month, d.day, 0, 0, 0, 0, 0, -1)) + for line in f: + if line[0] != '#': + t = t0 + (int(line[0:2]) * 60 + int(line[3:5])) * 60 + int(line[6:8]) + if line[-1:] == '\n': + value = line[9:-1] + else: + value = line[9:] + if t < start: + old = value + else: + if old is not None: + next_point(start, old, tresolution, *args) + old = None + self.next(t, value, result) + if t > to: + break + except IOError: + pass + if t < start: + #if t == 0: + # t = start + if old is not None: + self.next(t, old, result) + if t != self.last_time.get(variable,0): + self.last_time[variable] = t + return True, result + return False, result + +class NumCurve(object): + def __init__(self, tresolution): + self.result = [] + self.tresolution = tresolution + + def append(self, t, txtvalue): + try: + value = PrettyFloat(txtvalue) + except: + value = None + self.result.append((PrettyFloat(t), value)) + self.value = value + self.last = t + +class NumericScanner(Scanner): + def __init__(self, *args, **kwargs): + Scanner.__init__(self, *args, **kwargs) + + def get_message(self, variables, timerange, show_empty=True): + tresolution = timerange / x + + + + self.dirty = False + result = {} + for var in variables: + self.last = 0 + self.dirty, curve = self.scan(var, timerange, []): + if show_empty or len(curve) > 1: + result[var] = curve + return result + +class ColorMap(object): + ''' + ColorMap is using official CSS color names, with the exception of Green, as this + is defined differently with X11 colors than in SEA, and used heavily in config files. + Here Green is an alias to Lime (#00FF00) and MidGreen is #008000, which is called Green in CSS. + The function to_code is case insensitive and accepts also names with underscores. + The order is choosen by M. Zolliker for the SEA client, originally only the first 16 were used. + ''' + hex_name = (("#FFFFFF","White"), ("#FF0000","Red"), ("#00FF00","Lime"), ("#0000FF","Blue"), ("#FF00FF","Magenta"), + ("#FFFF00","Yellow"), ("#00FFFF","Cyan"), ("#000000","Black"), ("#FFA500","Orange"), ("#006400","DarkGreen"), + ("#9400D3","DarkViolet"), ("#A52A2A","Brown"), ("#87CEEB","SkyBlue"), ("#808080","Gray"), ("#FF69B4","HotPink"), + ("#FFFFE0","LightYellow"), ("#00FF7F","SpringGreen"), ("#000080","Navy"), ("#1E90FF","DodgerBlue"), + ("#9ACD32","YellowGreen"), ("#008B8B","DarkCyan"), ("#808000","Olive"), ("#DEB887","BurlyWood"), + ("#7B68EE","MediumSlateBlue"), ("#483D8B","DarkSlateBlue"), ("#98FB98","PaleGreen"), ("#FF1493","DeepPink"), + ("#FF6347","Tomato"), ("#32CD32","LimeGreen"), ("#DDA0DD","Plum"), ("#7FFF00","Chartreuse"), ("#800080","Purple"), + ("#00CED1","DarkTurquoise"), ("#8FBC8F","DarkSeaGreen"), ("#4682B4","SteelBlue"), ("#800000","Maroon"), + ("#3CB371","MediumSeaGreen"), ("#FF4500","OrangeRed"), ("#BA55D3","MediumOrchid"), ("#2F4F4F","DarkSlateGray"), + ("#CD853F","Peru"), ("#228B22","ForestGreen"), ("#48D1CC","MediumTurquoise"), ("#DC143C","Crimson"), + ("#D3D3D3","LightGray"), ("#ADFF2F","GreenYellow"), ("#7FFFD4","Aquamarine"), ("#BC8F8F","RosyBrown"), + ("#20B2AA","LightSeaGreen"), ("#C71585","MediumVioletRed"), ("#F0E68C","Khaki"), ("#6495ED","CornflowerBlue"), + ("#556B2F","DarkOliveGreen"), ("#CD5C5C","IndianRed "), ("#2E8B57","SeaGreen"), ("#F08080","LightCoral"), + ("#8A2BE2","BlueViolet"), ("#AFEEEE","PaleTurquoise"), ("#4169E1","RoyalBlue"), ("#0000CD","MediumBlue"), + ("#B8860B","DarkGoldenRod"), ("#00BFFF","DeepSkyBlue"), ("#FFC0CB","Pink"), ("#4B0082","Indigo "), ("#A0522D","Sienna"), + ("#FFD700","Gold"), ("#F4A460","SandyBrown"), ("#DAA520","GoldenRod"), ("#DA70D6","Orchid"), ("#E6E6FA","Lavender"), + ("#5F9EA0","CadetBlue"), ("#D2691E","Chocolate"), ("#66CDAA","MediumAquaMarine"), ("#6B8E23","OliveDrab"), + ("#A9A9A9","DarkGray"), ("#BDB76B","DarkKhaki"), ("#696969","DimGray"), ("#B0C4DE","LightSteelBlue"), + ("#191970","MidnightBlue"), ("#FFE4C4","Bisque"), ("#6A5ACD","SlateBlue"), ("#EE82EE","Violet"), + ("#8B4513","SaddleBrown"), ("#FF7F50","Coral"), ("#008000","MidGreen"), ("#DB7093","PaleVioletRed"), ("#C0C0C0","Silver"), + ("#E0FFFF","LightCyan"), ("#9370DB","MediumPurple"), ("#FF8C00","DarkOrange"), ("#00FA9A","MediumSpringGreen"), + ("#E9967A","DarkSalmon"), ("#778899","LightSlateGray"), ("#9932CC","DarkOrchid"), ("#EEE8AA","PaleGoldenRod"), + ("#F8F8FF","GhostWhite"), ("#FFA07A","LightSalmon"), ("#ADD8E6","LightBlue"), ("#D8BFD8","Thistle"), + ("#FFE4E1","MistyRose"), ("#FFDEAD","NavajoWhite"), ("#40E0D0","Turquoise"), ("#90EE90","LightGreen"), + ("#B22222","FireBrick"), ("#008080","Teal"), ("#F0FFF0","HoneyDew"), ("#FFFACD","LemonChiffon"), ("#FFF5EE","SeaShell"), + ("#F5F5DC","Beige"), ("#DCDCDC","Gainsboro"), ("#FA8072","Salmon"), ("#8B008B","DarkMagenta"), ("#FFB6C1","LightPink"), + ("#708090","SlateGray"), ("#87CEFA","LightSkyBlue"), ("#FFEFD5","PapayaWhip"), ("#D2B48C","Tan"), ("#FFFFF0","Ivory"), + ("#F0FFFF","Azure"), ("#F5DEB3","Wheat"), ("#00008B","DarkBlue"), ("#FFDAB9","PeachPuff"), ("#8B0000","DarkRed"), + ("#FAF0E6","Linen"), ("#B0E0E6","PowderBlue"), ("#FFE4B5","Moccasin"), ("#F5F5F5","WhiteSmoke"), ("#FFF8DC","Cornsilk"), + ("#FFFAFA","Snow"), ("#FFF0F5","LavenderBlush"), ("#FFEBCD","BlanchedAlmond"), ("#F0F8FF","AliceBlue"), + ("#FAEBD7","AntiqueWhite"), ("#FDF5E6","OldLace"), ("#FAFAD2","LightGoldenRodYellow"), ("#F5FFFA","MintCream"), + ("#FFFAF0","FloralWhite"), ("#7CFC00","LawnGreen"), ("#663399","RebeccaPurple")) + codes = {} + for i, pair in enumerate(hex_name): + codes[pair[0]] = i + low = pair[1].lower() + codes[low] = i + codes[low.replace("gray","grey")] = i + codes["green"] = 2 + codes["fuchsia"] = 4 + codes["aqua"] = 6 + + @staticmethod + def to_code(colortext): + try: + return int(colortext) + except ValueError: + return ColorMap.codes.get(colortext.lower().replace("_",""),-1) + + @staticmethod + def check_hex(code): + if not code.startswith("#"): + return None + if len(code) == 4: # convert short code to long code + code = code[0:2] + code[1:3] + code[2:4] + code[3] + if len(code) != 7: + return None + try: + int(code[1:]) # we have a valid hex color code + return code + except ValueError: + return None + + @staticmethod + def to_hex(code): + try: + return ColorMap.hex_name[code][0] + except IndexError: + return -1 + +class VarsScanner(Scanner): + colors = {"red":0} + def __init__(self, directory, test_day=None): + Scanner.__init__(self, directory, test_day=test_day) + logging.info('vars dir %s', directory) + + def next(self, t, value, result): + logging.info('vars %s', value) + for var in value.strip().split(" "): + vars = var.split("|") + if len(vars) == 1: + vars.append("") + if len(vars) == 2: + vars.append(vars[0]) + if len(vars) == 3: + vars.append("") + name, unit, label, color = vars + if not unit in result: + result[unit] = dict(tag = unit, unit = unit.split("_")[0], curves=[]) + result[unit]["curves"].append(dict(name=name, label=label, color=color)) + + def get_message(self, time): + # get last value only + _, result = self.scan("vars", [time, time], {}) + for unit, curvegroup in result.items(): + color_set = set() + auto_curves = [] + for curve in curvegroup["curves"]: + col = curve["color"].strip() + c = ColorMap.to_code(col) + if c < 0: + valid = ColorMap.check_hex(col) + if valid: + curve["original_color"] = col + curve["color"] = valid + else: + auto_curves.append(curve) + curve["original_color"] = col + "?" + else: + color_set.add(c) + curve["original_color"] = col + curve["color"] = ColorMap.to_hex(c) + c = 1 # omit white + for curve in auto_curves: + while c in color_set: c += 1 # find unused color + curve["color"] = ColorMap.to_hex(c) + c += 1 + return result + diff --git a/seaweb.py b/seaweb.py new file mode 100755 index 0000000..9db0b26 --- /dev/null +++ b/seaweb.py @@ -0,0 +1,965 @@ +#!/usr/bin/env python + +import sys +sys.path.append("./lib") +# Make sure your gevent version is >= 1.0 +import gevent +import gevent.pywsgi +import gevent.queue +import flask +import time +import pprint +import random +import time +from datetime import date +from collections import deque +import sys +import socket +import tcp_lineserver +import uuid +import seagraph +import traceback +import logging +import circularlog + +import os +import signal + + + +try: import simplejson as json +except ImportError: import json + +def guess_mimetype(filename): + if filename.endswith('.js'): + mimetype = 'text/javascript' + elif filename.endswith('.css'): + mimetype = 'text/css' + elif filename.endswith('.ico'): + mimetype = 'image/x-icon' + else: + mimetype = 'text/html' + return mimetype + +#class SeawebException(Exception): +# pass + +# SSE 'protocol' is described here: http://mzl.la/UPFyxY +def to_json_sse(msg): + txt = json.dumps(msg, separators=(',',': ')) + # print 'MSG(size='+str(len(txt))+')', txt[:256] + return 'data: %s\n\n' % txt + +app = flask.Flask(__name__) + +update_rider = circularlog.Rider("upd") + +@app.route('/update') +def get_update(path=None): + # Client Adress: socket.getfqdn(flask.request.remote_addr) + client = instrument.newClient() + client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1]) + + @flask.stream_with_context + def generator(): + logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1])) + #msg = dict(type='id', id=client.id, title=instrument.title); + #yield to_json_sse(msg) + try: + lastmsg = time.time() + while True: + if client.info() == "": + print(time.time()-lastmsg) + messages = client.poll() + for msg in messages: + update_rider.put('-', repr(msg)) + yield to_json_sse(msg) + if messages: + lastmsg = time.time() + else: + if time.time() > lastmsg + 30: + if not client.info(): + raise GeneratorExit("no activity") + logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info())) + yield to_json_sse(dict(type='heartbeat')) + lastmsg = time.time() + else: + gevent.sleep(0.5) + except (GeneratorExit, tcp_lineserver.Disconnected): + logging.info('CLOSED %s', client.id) + instrument.remove(client) + pass + except Exception as e: + logging.info('error') + logging.error('%s', traceback.format_exc()) + instrument.remove(client) + #msg = dict(type='error',error=traceback.format_exc()) + #yield to_json_sse(msg) + + resp = flask.Response(generator(), mimetype='text/event-stream') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + +@app.route('/circular') +def dump_circular(): + circularlog.log() + return "log" + +@app.route('/clients') +def show_clients(): + result = "" + for id in instrument.clients: + c = instrument.clients[id] + result += c.remote_info + " " + "; ".join(c.info()) + "
" + return result + +@app.route('/getblock') +@app.route('/updateblock') +@app.route('/sendcommand') +@app.route('/console') +@app.route('/graph') +@app.route('/updategraph') +@app.route('/gettime') +@app.route('/getvars') +def reply(): + args = flask.request.args + kwargs = dict((k, args.get(k)) for k in args) + path = flask.request.path + logging.info('GET %s %s', path, repr(kwargs)) + try: + id = kwargs.pop('id') + client = instrument.clients[id] + msg = getattr(client, "w_" + path[1:])(**kwargs) + except Exception as e: + logging.error('%s', traceback.format_exc()) + circularlog.log() + msg = dict(type='error', request=path[1:], error=repr(e)) + resp = flask.Response(json.dumps(msg), mimetype='application/json') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + +@app.route('/test/') +def subdir_test_file(file): + gevent.sleep(2) + try: + with open("client/test/"+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + return resp + +@app.route('/jsFiles/') +@app.route('/cssFiles/') +@app.route('/externalFiles/') +def subdir_file(file): + subdir = flask.request.path.split('/')[1] + try: + with open("client/" + subdir+"/"+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + +@app.route('/externalFiles/maps/.map') +def replace_by_empty(file): + return "" + +@app.route('/') +def default(): + return general_file('SEAWebClient.html') + +@app.route('/') +def general_file(file): + subdir = "client/" + try: + with open(subdir+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + +def hostport_split(hostport): + h = hostport.split(':') + return (h[0], int(h[1])) + +def sea_request_reply(socket, command, tmo=5): + t = 0 + # wait for at most 60 seconds for not beeing busy + while socket.busy: + if t >= 60: + logging.error('still busy at %s (before command %s)', getattr(socket, "name", "noname"), command) + socket.busy = False + else: + gevent.sleep(0.1) + t += 0.1 + if t > 5: + logging.warning('unusual wait time %.4g (before command %s)', t, command) + #print command + socket.busy = True + socket.send_line("fulltransact "+command) + data = [] + dumpdata = [] + t = 0 + while True: + while socket.busy: + line = socket.get_line() + if line != None: + t = 0 + break + if t >= tmo: + socket.busy = False + logging.error('timeout on command %s (%s)', command, getattr(socket, "name", "noname")) + socket.reconnect() + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + else: + logging.error('interrupted command %s (%s)', command, getattr(socket, "name", "noname")) + socket.reconnect() + raise Exception("timeout") + dumpdata.append(line) + if line == 'TRANSACTIONFINISHED': + break + elif line.startswith('TRANSACTIONSTART '): + data = [] + else: + data.append(line) + if t>2: + logging.info('DUMPDATA %.4g %s', t, '|'.join(dumpdata)) + socket.busy = False + return data + +class SeaGroup(object): + def __init__(self): + self.version = 0 + self.components = [] + self.grouptitle = "untitled" + self.lastpoll = 0 + self.lastreq = 0 + self.empty_values = {} + +class Instrument(object): + def remove(self, client): + try: + del self.clients[client.id] + except KeyError: + logger.warning('client already removed %s', client.id) + + def register(self, client): + self.clients[client.id] = client + return client + +class SeaInstrument(Instrument): + # convert SEA layout tag like "-W" to more meaningful name. + # the code: 0: modifier, 1: enum name, 2: input element + tags = { + '-W': ('width', 0), + '-T': ('title', 0), + '-H': ('tooltip', 0), + '-V': ('value', 0), + '-S': ('style', 0), + '-D': ('div', 0), + '-r': ('enum_name', 1), + '-R': ('enum', 2), + '-I': ('input', 2), + '-i': ('rdonly', 2), + '-L': ('rdonly', 2), + '-G': ('group', 2), + '-C': ('checkbox', 2), + '-B': ('pushbutton', 2), + '-l': ('link', 0), + '-E': ('end', 2), + } + + def __init__(self, inst_name, instrument_config): + self.host_port = hostport_split(instrument_config['sea']) + self.inst_name = inst_name + self.clients = {} + self.logger_dir = instrument_config.get('logger_dir','') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.seaspy = tcp_lineserver.LineClient(self.host_port, ['Spy 007'], True, ridername='spy') + self.seaspy.busy = False + self.seaspy.name = "SEA seaspy" + self.seacmd = None + self.last_client_remove = time.time() + self.history = deque(maxlen=1000) + self.init() + gevent.Greenlet.spawn(self.checkconnections) + + def init(self): + self.values = {} + self.groups = {} + self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line + self.consolepos = 0 + self.timeStamp = None + self.history.clear() + self.lastcmd = None + + def checkconnections(self): + while True: + if len(self.clients) == 0 and self.seaspy.connected: + if time.time() > self.last_client_remove + 30: + logging.info("close SEA connections") + self.history.clear() + self.seaspy.close() + if self.seacmd: + self.seacmd.close() + gevent.sleep(10) + + def newClient(self): + if not self.seaspy.connected: + self.init() + return self.register(SeaClient()) + + def remove(self, client): + Instrument.remove(self, client) + self.last_client_remove = time.time() + + def findgroup(self, path): + 'get a group from sea and store it' + if not path in self.groups: + self.groups[path] = SeaGroup() + self.groups[path].lastpoll = 0 + self.groups[path].lastreq = time.time() + self.poll_groups([path]) + return self.groups[path] + + def poll_groups(self, paths): + 'polls values and components of requested groups' + for path in paths: + gobj = self.groups[path] + now = time.time() + if now < gobj.lastpoll + 0.5: + # print 'too fast', path + continue # do not poll before 500 ms have passed + gobj.lastreq = now + gobj.lastpoll = now + try: + data = sea_request_reply(self.seaspy, 'getgroup '+path) + except Exception as e: + logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc()) + continue + components = [] + values = {} + grouptitle = None + within_enum = False + item = {} + olditem = {} + for line in data: + (key, type) = self.tags.get(line[0:2], ('',-1)) + if type < 0: + continue + if type == 0: # modifier + item[key] = line[2:] + continue + if within_enum and type >= 2: + enum['enum_names'] = enum_names + name = enum['name'] + self.values[name] = enum.get('value', '') + values[name] = None #NEW + del enum['value'] + components.append(enum) + del enum + within_enum = False + if type == 1: + item['value'] = line[2:] + enum_names.append(item) + item = {} + continue + if key == 'enum': + enum = item + item = {} + within_enum = True + enum['type'] = key + enum['name'] = line[2:] + enum_names = [] + continue + if key == 'pushbutton': + if line[2] != ' ': + continue # skip special buttons + line = line[1:] # skip space + try: + item['value'] = item['title'] + item['title'] = ' ' + # was olditem.get('title',olditem['name']) + except: + pass + if key == 'end': + continue + if key == 'group': + if grouptitle == None: + grouptitle = item.get('title', line[2:]) + item = {} + continue + item['type'] = key + name = line[2:] + try: + self.values[name] = item['value'] + values[name] = None + del item['value'] + except KeyError: + pass + item['name'] = name + components.append(item) + olditem = item + item = {} + if gobj.components != components: + gobj.components = components + gobj.empty_values = values + gobj.version += 1 + #print 'changed',path, gobj.version + #print 'FROM', gobj.components + #print 'TO ', components + if grouptitle != None: + gobj.grouptitle = grouptitle + + + def make_seacmd(self): + global port + if not self.seacmd: + self.seacmd = tcp_lineserver.LineClient(self.host_port, + ['seauser seaser', 'fulltransact config listen 1', 'fulltransact commandlog tail 200'], + True, ridername='cmd') + self.seacmd.name = "SEA user" + self.seacmd.connect() + + def console(self): + self.make_seacmd() + + def addconsole(self, msg): + self.history.append(msg) + self.consolepos += 1 + + def pollconsole(self): + if not self.seacmd: + return + while True: + line = self.seacmd.get_line() + if line == None: + return None + if (line.startswith('Deleting connection') + or line.startswith('Accepted connection') + or (line.startswith('User ') and ' privilege' in line) + or line.startswith('Change of Authorisation') + or line.startswith('fulltransact config ') + or line.startswith('UserRights = ') + or line.startswith('fulltransact status') + or line == 'OK' or line == 'OK.' + or line.startswith('fulltransact commandlog tail') + or line.startswith('Login OK') + or line.startswith('TRANSACTIONSTART commandlog tail ') + ): + pass + elif line.startswith('TRANSACTIONSTART'): + if self.lastcmd != None: + self.addconsole(('command', self.lastcmd, self.lastid)) + self.lastcmd = None + elif line == 'TRANSACTIONFINISHED': + self.lastid = 0 + elif line.startswith('fulltransact '): + type = 'command' + self.addconsole(('command', line[13:], 0)) + elif line.startswith('==='): + self.timeStamp = line + elif line > " ": + if self.timeStamp: + self.addconsole(('reply', self.timeStamp, self.lastid)) + self.timeStamp = None + type = 'reply' + self.addconsole(('reply', line, self.lastid)) + + def getconsole(self, startindex, id): + idx = min(len(self.history), self.consolepos - startindex) # distance from end + messages = [] + for i in range(-idx,0): + type, line, hid = self.history[i] + messages.append(dict(type=type, line=line, origin=('self' if hid==id else 'other'))) + return self.consolepos, messages + + def command(self, command, id): + self.make_seacmd() + self.seacmd.send_line('fulltransact '+command) + self.lastid = id + self.lastcmd = command + + +class SeaGraph(object): + HISTORICAL = 0 + ACTUAL = 1 + LIVE = 2 + + def __init__(self): + self.livemode = self.HISTORICAL + self.time = [0, 0] + self.lastvalues = {} + + def strip_future(self, result): + 'strip future points (happens only on dummy test_day)' + # if self.livemode == self.LIVE: + for c in result.values(): + while c: + lastt, lastx = c[-1] + if lastt <= self.time[1]: + break + c.pop() + + def complete_to_end(self, result, endtime): + for var, c in result.items(): + if c: + lastt, lastx = c[-1] + if lastt < endtime: + c.append((endtime, lastx)) + self.lastvalues[var] = (endtime, lastx) + + def w_graph(self, variables, time="-1800,0"): + time = [float(t) for t in time.split(',')] + self.last_t = 0 + start, end, now = seagraph.get_abs_time(time + [0]) + self.time = [start, end] + self.variables = variables.split(',') + self.livemode = self.ACTUAL if end >= now else self.HISTORICAL + logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode) + self.scanner = seagraph.NumericScanner(instrument.logger_dir, instrument.test_day) + #result = self.scanner.get_message(self.variables, self.time) + #self.time[0] = self.time[1] + result = self.scanner.get_message(self.variables, self.time, show_empty=True) + self.strip_future(result) + for var in ('treg.set.reg', 'mf'): + curve = result.get(var,[(0,0)]) + print(var, curve[0][0] - now, curve[-1][0] - now, curve) + self.complete_to_end(result, end) + self.time[0] = self.time[1] + # reduction not yet implemented + return dict(type='graph-draw', reduced=False, graph=result) + + def w_gettime(self, time): + time = [float(t) for t in time.split(',')] + return dict(type='time', time= seagraph.get_abs_time(time)) + + def w_getvars(self, time): + time = [float(t) for t in time.split(',')] + scanner = seagraph.VarsScanner(instrument.logger_dir, instrument.test_day) + result = dict(type='var_list') + result['blocks'] = list(scanner.get_message(time[-1]).values()) + return result + + def w_updategraph(self): + logging.info("UPD GRAPH %d", self.livemode) + if self.livemode == self.HISTORICAL: + return dict(type='accept-graph', live=False) + else: + self.livemode = self.LIVE + return dict(type='accept-graph', live=True) + #self.livemode = self.LIVE + #return dict(type='accept-graph', live=True) + + def graphpoll(self): + if self.livemode == self.LIVE: + self.time[1], = seagraph.get_abs_time([0]) + else: + self.time[1] = self.time[0] # do not update + if self.time[1] > self.time[0]: + result = self.scanner.get_message(self.variables, self.time, show_empty=False) + self.strip_future(result) + if int(self.time[1] / 60) != int(self.time[0] / 60): + # update unchanged values + for var, (lastt, lastx) in self.lastvalues.items(): + if var not in result: + result[var] = [(self.time[1], lastx)] + self.time[0] = self.time[1] + if len(result) > 0: + return dict(type='graph-update', reduced=False, time=self.time[1], graph=result) + return None + +class SeaClient(SeaGraph): + + def __init__(self): + self.group_version = {} + self.group_values = {} + self.values = {} + self.consolepos = 0 + self.id = uuid.uuid4().hex[0:15] + SeaGraph.__init__(self) + self.queue = [dict(type='id', id=self.id, instrument=instrument.inst_name, device=instrument.device)] + + def poll(self): + messages = self.queue + self.queue = [] + updates = [] + # group updates + instrument.poll_groups(self.group_version.keys()) + for path, gv in self.group_version.items(): + if path in self.group_values: + gobj = instrument.groups[path] + if gv != gobj.version: + logging.info('redraw: %s client: %d instr: %d', path, gv, gobj.version) + self.group_version[path] = gobj.version + messages.append(dict(type='redraw', path=path)) + break + else: + values = self.group_values[path] + for name, client_value in values.items(): + inst_value = instrument.values.get(name, None) + if client_value != inst_value: + values[name] = inst_value + updates.append({'name': name, 'value': inst_value}) + if len(updates) > 0: + messages.append(dict(type='update', updates=updates)) + # console messages + instrument.pollconsole() + self.consolepos, msg = instrument.getconsole(self.consolepos, self.id) + messages.extend(msg) + + # graph messages + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + def info(self): + return self.group_version.keys() + + def w_getblock(self, path): + gobj = instrument.findgroup(path.split(',')[-1]) + self.group_version[path] = gobj.version + logging.info('getblock %s %d', path, gobj.version) + return dict(type='draw', title=gobj.grouptitle, path=path, components=gobj.components) + + def w_updateblock(self, path): + gobj = instrument.findgroup(path) + logging.info('make active %s', path) + #if not path in self.group_values: + self.group_values[path] = gobj.empty_values.copy() + return dict(type='accept-block') + + def w_console(self): + self.consolepos = 0 + instrument.console() + return dict(type='accept-console') + + def w_sendcommand(self, command): + instrument.command(command, self.id) + return dict(type='accept-command') + + +class DummyClient(SeaGraph): + async = set(('id','update','redraw','command','reply','graph-update','graph-redraw')) + + def __init__(self, host_port): + self.linesocket = tcp_lineserver.LineClient(host_port) + self.id = uuid.uuid4().hex[0:15] + self.linesocket.send_line(json.dumps(dict(type='init', id=self.id))) + self.queue = [] + self.syncreply = [] + SeaGraph.__init__(self) + + def cmd_reply(self, command, replytype, tmo=5): + self.linesocket.send_line(json.dumps(command)) + t = 0 + while True: + if self.syncreply: + msg = self.syncreply.pop(0) + break + line = self.linesocket.get_line() + if line != None: + msg = json.loads(line) + if msg['type'] in self.async: + t = 0 + # print 'PUSH',msg, replytype + self.queue.append(msg) + else: + break + if t >= tmo: + # print 'TIMEOUT' + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + if msg['type'] != replytype: + logging.error('REPLY MISMATCH %s %s <> %s' , command, replytype, msg['type']) + return msg + + + def w_getblock(self, path): + return self.cmd_reply(dict(type='getblock', path=path, id=self.id), 'draw') + + def w_updateblock(self, path): + return self.cmd_reply(dict(type='updateblock', path=path, id=self.id), 'accept-block') + + def w_console(self): + return self.cmd_reply(dict(type='console', id=self.id), 'accept-console') + + def w_sendcommand(self, command): + return self.cmd_reply(dict(type='sendcommand', command=command, id=self.id), 'accept-command') + + def poll(self): + if self.queue: + messages = self.queue + self.queue = [] + return messages + line = self.linesocket.get_line() + messages = [] + if line: + msg = json.loads(line) + if msg['type'] in self.async: + messages.append(msg) + else: + self.syncreply.append(msg) + + # graph messages + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + def info(self): + return ["na"] + +class DummyInstrument(Instrument): + + def __init__(self, inst_name, instrument_config): + self.instrument_config = instrument_config + self.host_port = hostport_split(instrument_config['hostport']) + self.logger_dir = instrument_config.get('logger_dir', '') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.title = inst_name + self.clients = {} + + def newClient(self): + return self.register(DummyClient(self.host_port)) + +class SecopMsg: + def __init__(self, line): + self.par = None + self.value = None + sl = line.split(' ') + if len(sl[0].split(',')) > 1: + self.type = 'idn' + self.value = line + else: + self.type = sl[0] + if len(sl) > 1: + self.par = sl[1] + if len(sl) > 2: + self.value = json.loads(' '.join(sl[2:])) + self.async = self.type == 'event' + +def SecopEncode(cmd, par=None, value=None): + line = cmd + if par: + line += " " + par + if value: + line += " " + json.dumps(value) + return line + +def convert_par(module, name, par): + result = dict(type='input', name=module+":"+name, title=name) + if par['readonly']: + result['type']='rdonly' + #print result + return result + +def convert_event(messages): + if isinstance(messages, SecopMsg): + messages = [messages] + updates = [] + for msg in messages: + if msg['type'] == 'event': + updates.append(dict(name=msg.par, value=str(msg.value[0]))) + return [dict(type='update', updates=updates)] + +class SecopClient(object): + prio_par = ["value", "status", "target"] + hide_par = ["baseclass", "class", "pollinterval"] + skip_par = ["status2"] + + def __init__(self, host_port): + self.linesocket = tcp_lineserver.LineClient(host_port) + self.id = uuid.uuid4().hex[0:15] + self.queue = [] + self.syncreply = [] + self.consolequeue = [] + #self.out = open("debug.txt", "w") + #self.out = sys.stdout + self.out = None + idn = self.cmd_reply("*IDN?", "idn") + self.idn = idn.value + self.description = self.cmd_reply("describe", "describing").value + + def cmd_reply(self, command, replytype, tmo=5): + self.replytype = replytype + if self.out: self.out.write(">"+command+"\n") + self.consolequeue.append(dict(type='command',line=command,origin='self')) + self.linesocket.send_line(command) + t = 0 + while True: + if self.syncreply: + msg = self.syncreply.pop(0) + break + line = self.linesocket.get_line() + if line != None: + self.consolequeue.append(dict(type='reply',line=line,origin='other')) + if self.out: self.out.write("<"+line+"\n") + msg = SecopMsg(line) + #print '<', msg['type'], msg.par + if msg.async and replytype != msg['type'] + "=" + msg.par: + t = 0 + self.queue.append(msg) + else: + break + if t >= tmo: + #print 'TIMEOUT' + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + #print 'REPLY', msg['type'], msg.par, json.dumps(msg.value)[0:50] + if not replytype.startswith(msg['type']): + logging.error('REPLY MISMATCH %s <> %s', replytype, '<>', repr(msg)) + self.replytype = "" + return msg + + def w_getblock(self, path): + path = path.split(',')[-1] + if path == "main": + components = [] + for name, m in self.description["modules"].items(): + #components.append(convert_par(name, 'value', m['parameters']['value'])) + components.append(dict(type='rdlink', name=name+':value', title=name)) + #print components + return dict(type='draw', path='main', title='modules', components=components) + else: + module = self.description['modules'][path] + parameters = module["parameters"] + components = [] + for name in SecopClient.skip_par: + if name in parameters: + parameters.pop(name) + for name in SecopClient.prio_par: + if name in parameters: + components.append(convert_par(path, name, parameters.pop(name))) + components1 = [] + for name in SecopClient.hide_par: + if name in parameters: + components1.append(convert_par(path, name, parameters.pop(name))) + for name, p in parameters.items(): + components.append(convert_par(path, name, parameters[name])) + components.extend(components1) + return dict(type='draw', path=path, title=path, components=components) + + def w_updateblock(self, path): + self.cmd_reply("activate", "active") + return dict(type='accept-block') + + def w_console(self): + return dict(type='accept-console') + + def w_sendcommand(self, command): + #print 'COMMAND', command + cmd = "change " + command + return self.cmd_reply(cmd, 'event ' + command.split(' ')[0]) + + def poll(self): + if self.consolequeue: + messages = self.consolequeue + self.consolequeue = [] + return messages + if self.queue: + messages = convert_event(self.queue) + self.queue = [] + return messages + line = self.linesocket.get_line() + if line: + self.consolequeue.append(dict(type='reply',line=line,origin='other')) + if self.out: self.out.write("<"+line+"\n") + msg = SecopMsg(line) + if msg.async and self.replytype != msg['type'] + "=" + msg.par: + return convert_event(SecopMsg(line)) + self.syncreply.append(msg) + return [] + + def info(self): + return ["na"] + +class SecopInstrument(Instrument): + + def __init__(self, inst_name, instrument_config): + self.instrument_config = instrument_config + self.host_port = hostport_split(instrument_config['hostport']) + self.logger_dir = instrument_config.get('logger_dir', '') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.title = inst_name + self.clients = {} + + def newClient(self): + return self.register(SecopClient(self.host_port)) + +class Logger(object): + def __init__(self, logpath): + self.terminal = sys.stdout + self.log = open(logpath, "a") + + def write(self, message): + self.terminal.write(message) + self.log.write(message) + + def flush(self): + pass + +def handle_pdb(sig, frame): + import pdb + prin ('PDB') + pdb.Pdb().set_trace(frame) + + +if __name__ == '__main__': + signal.signal(signal.SIGUSR1, handle_pdb) + print('PID', os.getpid()) + + if len(sys.argv) > 3: + instrument_config = {} + for arg in sys.argv[1:]: + split = arg.split('=') + instrument_config[split[0]] = '='.join(split[1:]) + port = int(instrument_config['port']) + inst_name = instrument_config['instrument'] + else: + # take config from instruments.json + try: + port = int(sys.argv[1]) + except IndexError: + port = 5000 + try: + inst_name = sys.argv[2] + except IndexError: + inst_name = 'seadummy' + + with open('instruments.json') as f: + instrument_list = json.load(f) + + instrument_config = instrument_list[inst_name] + logging.basicConfig(filename=inst_name+".log", filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') + + # sys.stdout = Logger(instrument_config.get('logger_dir', '.') + '/seaweb_stdout.txt') + # print '-' * 80 + type = instrument_config.get('type', 'sea') + if type == 'sea': + instrument = SeaInstrument(inst_name, instrument_config) + elif type == 'dummy': + instrument = DummyInstrument(inst_name, instrument_config) + elif type == 'secop': + instrument = SecopInstrument(inst_name, instrument_config) + else: + raise TypeError('bad instrument type') + + app.debug = True + #server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt') + server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server')) + server.serve_forever() + # Then visit http://localhost:5000/test for a test diff --git a/seaweb_hist.py b/seaweb_hist.py new file mode 100755 index 0000000..e98e7e8 --- /dev/null +++ b/seaweb_hist.py @@ -0,0 +1,1014 @@ +#!/usr/bin/env python + +import sys +sys.path.append("../histreader") +# Make sure your gevent version is >= 1.0 +import gevent +import gevent.pywsgi +import gevent.queue +import flask +import time +import pprint +import random +import time +from datetime import date +from collections import deque +import sys +import socket +import tcp_lineserver +import uuid +#import seagraph +import traceback +import logging +import circularlog + +import os +import signal +from histgraph import get_vars, get_curves, get_abs_time +from histreader import MainCache +from frappyreader import FrappyReader + +try: import simplejson as json +except ImportError: import json + +def guess_mimetype(filename): + if filename.endswith('.js'): + mimetype = 'text/javascript' + elif filename.endswith('.css'): + mimetype = 'text/css' + elif filename.endswith('.ico'): + mimetype = 'image/x-icon' + else: + mimetype = 'text/html' + return mimetype + +#class SeawebException(Exception): +# pass + +# SSE 'protocol' is described here: http://mzl.la/UPFyxY +def to_json_sse(msg): + txt = json.dumps(msg, separators=(',',': ')) + # print 'MSG(size='+str(len(txt))+')', txt[:256] + return 'data: %s\n\n' % txt + +app = flask.Flask(__name__) + +update_rider = circularlog.Rider("upd") + +@app.route('/update') +def get_update(path=None): + # Client Adress: socket.getfqdn(flask.request.remote_addr) + client = instrument.newClient() + client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1]) + + @flask.stream_with_context + def generator(): + logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1])) + msg = dict(type='id', id=client.id, title=instrument.title, device=instrument.device); + yield to_json_sse(msg) + try: + lastmsg = time.time() + while True: + if client.info() == "": + print(time.time()-lastmsg) + messages = client.poll() + for msg in messages: + update_rider.put('-', repr(msg)) + yield to_json_sse(msg) + if messages: + lastmsg = time.time() + else: + if time.time() > lastmsg + 30: + if not client.info(): + raise GeneratorExit("no activity") + logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info())) + yield to_json_sse(dict(type='heartbeat')) + lastmsg = time.time() + else: + gevent.sleep(0.5) + except (GeneratorExit, tcp_lineserver.Disconnected): + logging.info('CLOSED %s', client.id) + instrument.remove(client) + pass + except Exception as e: + logging.info('error') + logging.error('%s', traceback.format_exc()) + instrument.remove(client) + #msg = dict(type='error',error=traceback.format_exc()) + #yield to_json_sse(msg) + + resp = flask.Response(generator(), mimetype='text/event-stream') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + +@app.route('/circular') +def dump_circular(): + circularlog.log() + return "log" + +@app.route('/clients') +def show_clients(): + result = "" + for id in instrument.clients: + c = instrument.clients[id] + result += c.remote_info + " " + "; ".join(c.info()) + "
" + return result + +@app.route('/getblock') +@app.route('/updateblock') +@app.route('/sendcommand') +@app.route('/console') +@app.route('/graph') +@app.route('/updategraph') +@app.route('/gettime') +@app.route('/getvars') +def reply(): + args = flask.request.args + kwargs = dict((k, args.get(k)) for k in args) + path = flask.request.path + logging.info('GET %s %s', path, repr(kwargs)) + try: + id = kwargs.pop('id') + client = instrument.clients[id] + msg = getattr(client, "w_" + path[1:])(**kwargs) + except Exception as e: + logging.error('%s', traceback.format_exc()) + # circularlog.log() + msg = dict(type='error', request=path[1:], error=repr(e)) + resp = flask.Response(json.dumps(msg), mimetype='application/json') + resp.headers['Access-Control-Allow-Origin'] = '*' + return resp + +@app.route('/test/') +def subdir_test_file(file): + gevent.sleep(2) + try: + with open("client/test/"+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + return resp + +@app.route('/jsFiles/') +@app.route('/cssFiles/') +@app.route('/externalFiles/') +def subdir_file(file): + subdir = flask.request.path.split('/')[1] + try: + with open("client/" + subdir+"/"+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + +@app.route('/externalFiles/maps/.map') +def replace_by_empty(file): + return "" + +@app.route('/') +def default(): + return general_file('SEAWebClient.html') + +@app.route('/') +def general_file(file): + subdir = "client/" + try: + with open(subdir+file, 'r') as content_file: + content = content_file.read() + except IOError: + flask.abort(404) + resp = flask.Response(content, mimetype=guess_mimetype(file)) + #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';" + return resp + +def hostport_split(hostport): + h = hostport.split(':') + return (h[0], int(h[1])) + +def sea_request_reply(socket, command, tmo=5): + t = 0 + # wait for at most 60 seconds for not beeing busy + while socket.busy: + if t >= 60: + logging.error('still busy at %s (before command %s)', getattr(socket, "name", "noname"), command) + socket.busy = False + else: + gevent.sleep(0.1) + t += 0.1 + if t > 5: + logging.warning('unusual wait time %.4g (before command %s)', t, command) + #print command + socket.busy = True + socket.send_line("fulltransact "+command) + data = [] + dumpdata = [] + t = 0 + while True: + while socket.busy: + line = socket.get_line() + if line != None: + t = 0 + break + if t >= tmo: + socket.busy = False + logging.error('timeout on command %s (%s)', command, getattr(socket, "name", "noname")) + socket.reconnect() + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + else: + logging.error('interrupted command %s (%s)', command, getattr(socket, "name", "noname")) + socket.reconnect() + raise Exception("timeout") + dumpdata.append(line) + if line == 'TRANSACTIONFINISHED': + break + elif line.startswith('TRANSACTIONSTART '): + data = [] + else: + data.append(line) + if t>2: + logging.info('DUMPDATA %.4g %s', t, '|'.join(dumpdata)) + socket.busy = False + return data + + +class SeaGroup: + def __init__(self): + self.version = 0 + self.components = [] + self.grouptitle = "untitled" + self.lastpoll = 0 + self.lastreq = 0 + self.empty_values = {} + + +class Instrument: + def remove(self, client): + try: + del self.clients[client.id] + except KeyError: + logger.warning('client already removed %s', client.id) + + def register(self, client): + self.clients[client.id] = client + return client + +class SeaInstrument(Instrument): + # convert SEA layout tag like "-W" to more meaningful name. + # the code: 0: modifier, 1: enum name, 2: input element + tags = { + '-W': ('width', 0), + '-T': ('title', 0), + '-H': ('tooltip', 0), + '-V': ('value', 0), + '-S': ('style', 0), + '-D': ('div', 0), + '-r': ('enum_name', 1), + '-R': ('enum', 2), + '-I': ('input', 2), + '-i': ('rdonly', 2), + '-L': ('rdonly', 2), + '-G': ('group', 2), + '-C': ('checkbox', 2), + '-B': ('pushbutton', 2), + '-l': ('link', 0), + '-E': ('end', 2), + } + + def __init__(self, inst_name, instrument_config): + self.host_port = hostport_split(instrument_config['sea']) + self.inst_name = inst_name + self.clients = {} + self.logger_dir = instrument_config.get('logger_dir','') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.seaspy = tcp_lineserver.LineClient(self.host_port, ['Spy 007'], True, ridername='spy') + self.seaspy.busy = False + self.seaspy.name = "SEA seaspy" + self.seacmd = None + self.last_client_remove = time.time() + self.history = deque(maxlen=1000) + self.init() + gevent.Greenlet.spawn(self.checkconnections) + + def init(self): + self.values = {} + self.groups = {} + self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line + self.consolepos = 0 + self.timeStamp = None + self.history.clear() + self.lastcmd = None + + def checkconnections(self): + while True: + if len(self.clients) == 0 and self.seaspy.connected: + if time.time() > self.last_client_remove + 30: + logging.info("close SEA connections") + self.history.clear() + self.seaspy.close() + if self.seacmd: + self.seacmd.close() + gevent.sleep(10) + + def newClient(self): + if not self.seaspy.connected: + self.init() + return self.register(SeaClient()) + + def remove(self, client): + Instrument.remove(self, client) + self.last_client_remove = time.time() + + def findgroup(self, path): + 'get a group from sea and store it' + if not path in self.groups: + self.groups[path] = SeaGroup() + self.groups[path].lastpoll = 0 + self.groups[path].lastreq = time.time() + self.poll_groups([path]) + return self.groups[path] + + def poll_groups(self, paths): + 'polls values and components of requested groups' + for path in paths: + gobj = self.groups[path] + now = time.time() + if now < gobj.lastpoll + 0.5: + # print 'too fast', path + continue # do not poll before 500 ms have passed + gobj.lastreq = now + gobj.lastpoll = now + try: + data = sea_request_reply(self.seaspy, 'getgroup '+path) + except Exception as e: + logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc()) + continue + components = [] + values = {} + grouptitle = None + within_enum = False + item = {} + olditem = {} + for line in data: + (key, type) = self.tags.get(line[0:2], ('',-1)) + if type < 0: + continue + if type == 0: # modifier + item[key] = line[2:] + continue + if within_enum and type >= 2: + enum['enum_names'] = enum_names + name = enum['name'] + self.values[name] = enum.get('value', '') + values[name] = None #NEW + del enum['value'] + components.append(enum) + del enum + within_enum = False + if type == 1: + item['value'] = line[2:] + enum_names.append(item) + item = {} + continue + if key == 'enum': + enum = item + item = {} + within_enum = True + enum['type'] = key + enum['name'] = line[2:] + enum_names = [] + continue + if key == 'pushbutton': + if line[2] != ' ': + continue # skip special buttons + line = line[1:] # skip space + try: + item['value'] = item['title'] + item['title'] = ' ' + # was olditem.get('title',olditem['name']) + except: + pass + if key == 'end': + continue + if key == 'group': + if grouptitle == None: + grouptitle = item.get('title', line[2:]) + item = {} + continue + item['type'] = key + name = line[2:] + try: + self.values[name] = item['value'] + values[name] = None + del item['value'] + except KeyError: + pass + item['name'] = name + components.append(item) + olditem = item + item = {} + if gobj.components != components: + gobj.components = components + gobj.empty_values = values + gobj.version += 1 + #print 'changed',path, gobj.version + #print 'FROM', gobj.components + #print 'TO ', components + if grouptitle != None: + gobj.grouptitle = grouptitle + + + def make_seacmd(self): + global port + if not self.seacmd: + self.seacmd = tcp_lineserver.LineClient(self.host_port, + ['seauser seaser', 'fulltransact config listen 1', 'fulltransact commandlog tail 200'], + True, ridername='cmd') + self.seacmd.name = "SEA user" + self.seacmd.connect() + + def console(self): + self.make_seacmd() + + def addconsole(self, msg): + self.history.append(msg) + self.consolepos += 1 + + def pollconsole(self): + if not self.seacmd: + return + while True: + line = self.seacmd.get_line() + if line == None: + return None + if (line.startswith('Deleting connection') + or line.startswith('Accepted connection') + or (line.startswith('User ') and ' privilege' in line) + or line.startswith('Change of Authorisation') + or line.startswith('fulltransact config ') + or line.startswith('UserRights = ') + or line.startswith('fulltransact status') + or line == 'OK' or line == 'OK.' + or line.startswith('fulltransact commandlog tail') + or line.startswith('Login OK') + or line.startswith('TRANSACTIONSTART commandlog tail ') + ): + pass + elif line.startswith('TRANSACTIONSTART'): + if self.lastcmd != None: + self.addconsole(('command', self.lastcmd, self.lastid)) + self.lastcmd = None + elif line == 'TRANSACTIONFINISHED': + self.lastid = 0 + elif line.startswith('fulltransact '): + type = 'command' + self.addconsole(('command', line[13:], 0)) + elif line.startswith('==='): + self.timeStamp = line + elif line > " ": + if self.timeStamp: + self.addconsole(('reply', self.timeStamp, self.lastid)) + self.timeStamp = None + type = 'reply' + self.addconsole(('reply', line, self.lastid)) + + def getconsole(self, startindex, id): + idx = min(len(self.history), self.consolepos - startindex) # distance from end + messages = [] + for i in range(-idx,0): + type, line, origin = self.history[i] + if origin == id: + origin = 'self' + elif origin != 'async': + origin = 'other' + messages.append(dict(type=type, line=line, origin=origin)) + return self.consolepos, messages + + def command(self, command, id): + self.make_seacmd() + self.seacmd.send_line('fulltransact '+command) + self.lastid = id + self.lastcmd = command + + +class SeaGraph: + HISTORICAL = 0 + ACTUAL = 1 + LIVE = 2 + + def __init__(self): + self.livemode = self.HISTORICAL + self.time = [0, 0] + self.lastvalues = {} + + def graphpoll(self): + if self.livemode == self.LIVE: + self.time[1], = get_abs_time(0) + else: + self.time[1] = self.time[0] # do not update + if self.time[1] > self.time[0]: + result = get_curves(main_cache, self.variables, self.time, show_empty=False) + self.strip_future(result) + if int(self.time[1] / 60) != int(self.time[0] / 60): + # update unchanged values + for var, (lastt, lastx) in self.lastvalues.items(): + if var not in result: + result[var] = [(self.time[1], lastx)] + self.time[0] = self.time[1] + if len(result) > 0: + return dict(type='graph-update', reduced=False, time=self.time[1], graph=result) + return None + + def complete_to_end(self, result, endtime): + for var, c in result.items(): + if len(c): + lastt, lastx = c[-1] + if lastt < endtime: + c.append((endtime, lastx)) + self.lastvalues[var] = (endtime, lastx) + + def strip_future(self, result): + """strip future points (happens only on dummy test_day)""" + # if self.livemode == self.LIVE: + for c in result.values(): + while len(c): + lastt, lastx = c[-1] + if lastt <= self.time[1]: + break + c.pop() + + def w_gettime(self, time): + result = get_abs_time(*[float(t) for t in time.split(',')]) + logging.info('GOTTIME %r', result) + return dict(type='time', time=result) + + def w_graph(self, variables, time="-1800,0"): + time = [float(t) for t in time.split(',')] + self.last_t = 0 + start, end, now = get_abs_time(*time, 0) + self.time = [start, end] + self.variables = variables.split(',') + self.livemode = self.ACTUAL if end >= now else self.HISTORICAL + logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode) + # self.scanner = seagraph.NumericScanner(instrument.logger_dir, instrument.test_day) + result = get_curves(main_cache, self.variables, (start, end)) + # result = self.scanner.get_message(self.variables, self.time, show_empty=True) + self.strip_future(result) + logging.info('VARIABLES: %r %r %r', self.variables, start-now, end-now) + for var, curve in list(result.items()): + logging.info(' %s %r len=%d', var, curve[-1][0] - curve[0][0], len(curve)) + self.complete_to_end(result, end) + self.time[0] = self.time[1] + # reduction not yet implemented + return dict(type='graph-draw', reduced=False, graph=result) + + def w_getvars(self, time): + time = [float(t) for t in time.split(',')] + # scanner = seagraph.VarsScanner(instrument.logger_dir, instrument.test_day) + result = dict(type='var_list') + result['blocks'] = list(get_vars(main_cache, time[-1]).values()) + logging.info('GotVARS %r', result) + return result + + def w_updategraph(self): + logging.info("UPD GRAPH %d", self.livemode) + if self.livemode == self.HISTORICAL: + return dict(type='accept-graph', live=False) + else: + self.livemode = self.LIVE + return dict(type='accept-graph', live=True) + #self.livemode = self.LIVE + #return dict(type='accept-graph', live=True) + + +class SeaClient(SeaGraph): + + def __init__(self): + self.group_version = {} + self.group_values = {} + self.values = {} + self.consolepos = 0 + self.id = uuid.uuid4().hex[0:15] + SeaGraph.__init__(self) + self.queue = [dict(type='id', id=self.id, instrument=instrument.inst_name, device=instrument.device)] + + def poll(self): + messages = self.queue + self.queue = [] + updates = [] + # group updates + instrument.poll_groups(self.group_version.keys()) + for path, gv in self.group_version.items(): + if path in self.group_values: + gobj = instrument.groups[path] + if gv != gobj.version: + logging.info('redraw: %s client: %d instr: %d', path, gv, gobj.version) + self.group_version[path] = gobj.version + messages.append(dict(type='redraw', path=path)) + break + else: + values = self.group_values[path] + for name, client_value in values.items(): + inst_value = instrument.values.get(name, None) + if client_value != inst_value: + values[name] = inst_value + updates.append({'name': name, 'value': inst_value}) + if len(updates) > 0: + messages.append(dict(type='update', updates=updates)) + # console messages + instrument.pollconsole() + self.consolepos, msg = instrument.getconsole(self.consolepos, self.id) + messages.extend(msg) + + # graph messages + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + def info(self): + return self.group_version.keys() + + def w_getblock(self, path): + gobj = instrument.findgroup(path.split(',')[-1]) + self.group_version[path] = gobj.version + logging.info('getblock %s %d', path, gobj.version) + return dict(type='draw', title=gobj.grouptitle, path=path, components=gobj.components) + + def w_updateblock(self, path): + gobj = instrument.findgroup(path) + logging.info('make active %s', path) + #if not path in self.group_values: + self.group_values[path] = gobj.empty_values.copy() + return dict(type='accept-block') + + def w_console(self): + self.consolepos = 0 + instrument.console() + return dict(type='accept-console') + + def w_sendcommand(self, command): + instrument.command(command, self.id) + return dict(type='accept-command') + + +class DummyClient(SeaGraph): + async = set(('id','update','redraw','command','reply','graph-update','graph-redraw')) + + def __init__(self, host_port): + self.linesocket = tcp_lineserver.LineClient(host_port) + self.id = uuid.uuid4().hex[0:15] + self.linesocket.send_line(json.dumps(dict(type='init', id=self.id))) + self.queue = [] + self.syncreply = [] + SeaGraph.__init__(self) + + def cmd_reply(self, command, replytype, tmo=5): + self.linesocket.send_line(json.dumps(command)) + t = 0 + while True: + if self.syncreply: + msg = self.syncreply.pop(0) + break + line = self.linesocket.get_line() + if line != None: + msg = json.loads(line) + if msg.type in self.async: + t = 0 + # print 'PUSH',msg, replytype + self.queue.append(msg) + else: + break + if t >= tmo: + # print 'TIMEOUT' + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + if msg.type != replytype: + logging.error('REPLY MISMATCH %s %s <> %s', command, replytype, msg.type) + return msg + + + def w_getblock(self, path): + return self.cmd_reply(dict(type='getblock', path=path, id=self.id), 'draw') + + def w_updateblock(self, path): + return self.cmd_reply(dict(type='updateblock', path=path, id=self.id), 'accept-block') + + def w_console(self): + return self.cmd_reply(dict(type='console', id=self.id), 'accept-console') + + def w_gettime(self, time): + pass + + def w_sendcommand(self, command): + return self.cmd_reply(dict(type='sendcommand', command=command, id=self.id), 'accept-command') + + def poll(self): + if self.queue: + messages = self.queue + self.queue = [] + return messages + line = self.linesocket.get_line() + messages = [] + if line: + msg = json.loads(line) + if msg.type in self.async: + messages.append(msg) + else: + self.syncreply.append(msg) + + # graph messages + msg = self.graphpoll() + if msg: + messages.append(msg) + return messages + + def info(self): + return ["na"] + +class DummyInstrument(Instrument): + + def __init__(self, inst_name, instrument_config): + self.instrument_config = instrument_config + self.host_port = hostport_split(instrument_config['hostport']) + self.logger_dir = instrument_config.get('logger_dir', '') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.title = inst_name + self.clients = {} + + def newClient(self): + return self.register(DummyClient(self.host_port)) + + +class SecopMsg: + def __init__(self, line): + self.par = None + self.value = None + sl = line.split(' ') + if len(sl[0].split(',')) > 1: + self.type = 'idn' + self.value = line + else: + self.type = sl[0] + if len(sl) > 1: + self.par = sl[1] + if len(sl) > 2: + self.value = json.loads(' '.join(sl[2:])) + self.async = self.type in ('update', 'error_update') + + def __repr__(self): + value = repr(self.value) + if len(value) > 50: + value = value[:50] + '...' + return "SecopMsg('%s %s %s')" % (self.type, self.par, value) + + +def SecopEncode(cmd, par=None, value=None): + line = cmd + if par: + line += " " + par + if value: + line += " " + json.dumps(value) + return line + + +def convert_par(module, name, par): + result = dict(type='input', name=module+":"+name, title=name) + if par.get('readonly', True): + result['type']='rdonly' + #print result + return result + + +def convert_event(messages): + if isinstance(messages, SecopMsg): + messages = [messages] + updates = [] + for msg in messages: + if msg.type == 'update': + updates.append(dict(name=msg.par, value=str(msg.value[0]))) + return [dict(type='update', updates=updates)] + + +class SecopClient(SeaGraph): + prio_par = ["value", "status", "target"] + hide_par = ["baseclass", "class", "pollinterval"] + skip_par = ["status2"] + + def __init__(self, host_port): + self.linesocket = tcp_lineserver.LineClient(host_port) + self.id = uuid.uuid4().hex[0:15] + self.queue = [] + self.syncreply = [] + self.consolequeue = [] + #self.out = open("debug.txt", "w") + #self.out = sys.stdout + self.out = None + SeaGraph.__init__(self) + idn = self.cmd_reply("*IDN?", "idn") + self.idn = idn.value + self.description = self.cmd_reply("describe", "describing").value + + def cmd_reply(self, command, replytype, tmo=5): + logging.info('COMMAND %r', command) + self.replytype = replytype + if self.out: self.out.write(">"+command+"\n") + self.consolequeue.append(dict(type='command',line=command,origin='self')) + self.linesocket.send_line(command) + t = 0 + while True: + if self.syncreply: + msg = self.syncreply.pop(0) + break + line = self.linesocket.get_line() + if line != None: + msg = SecopMsg(line) + if msg.async: + self.consolequeue.append(dict(type='reply',line=line,origin='async')) + else: + self.consolequeue.append(dict(type='reply',line=line,origin='other')) + if self.out: self.out.write("<"+line+"\n") + #print '<', msg.type, msg.par + if msg.async and replytype != msg.type + "=" + msg.par: + t = 0 + self.queue.append(msg) + else: + break + if t >= tmo: + #print 'TIMEOUT' + raise Exception("timeout") + gevent.sleep(0.1) + t += 0.1 + logging.info('REPLY %r', msg) + if msg.type.startswith('error_'): + return {} + if not replytype.startswith(msg.type): + logging.error('REPLY MISMATCH %s <> %s', replytype, repr(msg)) + self.replytype = "" + return msg + + def w_getblock(self, path): + logging.info('getblock %s', path) + path = path.split(',')[-1] + if path == "main" or path == "_overview": + components = [] + for name, m in self.description["modules"].items(): + #components.append(convert_par(name, 'value', m['parameters']['value'])) + components.append(dict(type='rdlink', name=name+':value', title=name)) + #print components + return dict(type='draw', path='main', title='modules', components=components) + else: + module = self.description['modules'][path] + parameters = module["accessibles"] + components = [] + for name in SecopClient.skip_par: + if name in parameters: + parameters.pop(name) + for name in SecopClient.prio_par: + if name in parameters: + components.append(convert_par(path, name, parameters.pop(name))) + components1 = [] + for name in SecopClient.hide_par: + if name in parameters: + components1.append(convert_par(path, name, parameters.pop(name))) + for name, p in parameters.items(): + components.append(convert_par(path, name, parameters[name])) + components.extend(components1) + return dict(type='draw', path=path, title=path, components=components) + + def w_updateblock(self, path): + logging.info('updateblock %s', path) + self.cmd_reply("activate", "active") + return dict(type='accept-block') + + def w_console(self): + return dict(type='accept-console') + + def w_sendcommand(self, command): + if not command: + return dict(type='accept-command') + cmd = "change " + command + return self.cmd_reply(cmd, 'changed ' + command.split(' ')[0]) + + def poll(self): + if self.consolequeue: + messages = self.consolequeue + self.consolequeue = [] + return messages + if self.queue: + messages = convert_event(self.queue) + self.queue = [] + return messages + line = self.linesocket.get_line() + # logging.info('poll %s', line) + if line: + if self.out: self.out.write("<"+line+"\n") + msg = SecopMsg(line) + if msg.async: # do not flood console with updates + self.consolequeue.append(dict(type='reply',line=line,origin='async')) + else: + self.consolequeue.append(dict(type='reply',line=line,origin='other')) + # logging.info('GOT MSG %r %r', msg.async, convert_event(SecopMsg(line))) + if msg.async: + return convert_event(SecopMsg(line)) + self.syncreply.append(msg) + return [] + + def info(self): + return ["na"] + + +class SecopInstrument(Instrument): + + def __init__(self, inst_name, instrument_config): + self.instrument_config = instrument_config + self.host_port = hostport_split(instrument_config['hostport']) + self.logger_dir = instrument_config.get('logger_dir', '') + test_day = instrument_config.get('test_day', None) + self.test_day = [int(x) for x in test_day.split('-')] if test_day else None + self.title = inst_name + self.clients = {} + + def newClient(self): + cl = SecopClient(self.host_port) + self.device = cl.description['equipment_id'] + logging.info('init done %s %s', self.host_port, self.device) + return self.register(cl) + + +class Logger: + def __init__(self, logpath): + self.terminal = sys.stdout + self.log = open(logpath, "a") + + def write(self, message): + self.terminal.write(message) + self.log.write(message) + + def flush(self): + pass + + +def handle_pdb(sig, frame): + import pdb + print('PDB') + pdb.Pdb().set_trace(frame) + + +if __name__ == '__main__': + signal.signal(signal.SIGUSR1, handle_pdb) + print('PID', os.getpid()) + + if len(sys.argv) > 3: + instrument_config = {} + for arg in sys.argv[1:]: + split = arg.split('=') + instrument_config[split[0]] = '='.join(split[1:]) + port = int(instrument_config['port']) + inst_name = instrument_config['instrument'] + else: + # take config from instruments.json + try: + port = int(sys.argv[1]) + except IndexError: + port = 5000 + try: + inst_name = sys.argv[2] + except IndexError: + inst_name = 'seadummy' + + with open('instruments.json') as f: + instrument_list = json.load(f) + + instrument_config = instrument_list[inst_name] + + frd = FrappyReader('/home/l_samenv/sea/%s' % inst_name, gevent=gevent) + main_cache = MainCache('/home/l_samenv/histreader/%s_hist' % inst_name, frd, gevent=gevent) + + # logging.basicConfig(filename=inst_name+".log", filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') + + # sys.stdout = Logger(instrument_config.get('logger_dir', '.') + '/seaweb_stdout.txt') + # print '-' * 80 + type = instrument_config.get('type', 'sea') + if type == 'sea': + instrument = SeaInstrument(inst_name, instrument_config) + elif type == 'dummy': + instrument = DummyInstrument(inst_name, instrument_config) + elif type == 'secop': + instrument = SecopInstrument(inst_name, instrument_config) + else: + raise TypeError('bad instrument type') + + app.debug = True + #server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt') + server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server')) + server.serve_forever() + # Then visit http://localhost:5000/test for a test diff --git a/tcp_lineserver.py b/tcp_lineserver.py new file mode 100644 index 0000000..78d171b --- /dev/null +++ b/tcp_lineserver.py @@ -0,0 +1,128 @@ +import asyncore +import socket +import errno +import re +import circularlog +import logging + +class LineHandler(asyncore.dispatcher_with_send): + + def __init__(self, sock): + self.buffer = b"" + asyncore.dispatcher_with_send.__init__(self, sock) + self.crlf = 0 + + def handle_read(self): + data = self.recv(8192) + if data: + parts = data.split(b"\n") + if len(parts) == 1: + self.buffer += data + else: + self.handle_line((self.buffer + parts[0]).decode('ascii')) + for part in parts[1:-1]: + if part[-1] == b"\r": + self.crlf = True + part = part[:-1] + else: + self.crlf = False + self.handle_line(part.decode('ascii')) + self.buffer = parts[-1] + + def send_line(self, line): + self.send(line.encode('ascii') + (b"\r\n" if self.crlf else b"\n")) + + def handle_line(self, line): + ''' + test: simple echo handler + ''' + self.send_line("> " + line) + +class LineServer(asyncore.dispatcher): + + def __init__(self, host, port, lineHandlerClass): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.listen(5) + self.lineHandlerClass = lineHandlerClass + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + print("Incoming connection from %s" % repr(addr)) + handler = self.lineHandlerClass(sock) + + def loop(self): + asyncore.loop() + +class Disconnected(Exception): + pass + +class LineClient(object): + + def __init__(self, host_port, announcement=None, filter_ascii=False, ridername="r"): + self.host_port = host_port + self.filter_ascii = filter_ascii + self.announcement = announcement + self.circular = circularlog.Rider(ridername) + self.connected = False + + def connect(self): + logging.info("connect to %s %s", "%s:%d" % self.host_port, getattr(self, 'name', '?')) + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect(self.host_port) + self.connected = True + self.buffer = [b""] + if self.announcement: + self.send_line('\n'.join(self.announcement)) + + def get_line(self): + if not self.connected: + logging.info("connect for get_line") + self.connect() + while len(self.buffer) <= 1: + self.socket.setblocking(0) + try: + data = self.socket.recv(1024) + except socket.error as e: + err = e.args[0] + if err == errno.EAGAIN or err == errno.EWOULDBLOCK: + return None + raise e + if data == "": + print(self.buffer, '<') + self.close() + raise Disconnected("disconnected") + self.socket.setblocking(1) + data = data.split(b'\n') + self.buffer[0] += data[0] + for p in data[1:]: + self.buffer.append(p) + line = self.buffer.pop(0).decode('ascii') + if len(line) > 0 and line[-1] == '\r': + line = line[0:-1] + self.circular.put("<", line) + # print '<', line + if self.filter_ascii: + # replace non ascii characters + line = re.sub(r'[^\x00-\x7E]+','?', line) + return line + + def send_line(self, line): + if not self.connected: + logging.info("connect for cmd: %s", line) + self.connect() + # print '>', line + self.circular.put(">", line) + self.socket.sendall(line.encode('ascii') + b'\n') + + def close(self): + self.socket.close() + self.connected = False + +if __name__ == "__main__": + server = LineServer("localhost", 9999, LineHandler) + server.loop()