1116 lines
38 KiB
Python
Executable File
1116 lines
38 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import sys

# must run before the imports below so that modules in ./lib are found
sys.path.append("./lib")

# standard library
import html
import logging
import os
import pprint
import random
import signal
import socket
import time
import traceback
import uuid
from collections import deque
from datetime import date, datetime

# third party -- make sure your gevent version is >= 1.0
import flask
import gevent
import gevent.pywsgi
import gevent.queue
from gevent.lock import RLock

try:
    import simplejson as json
except ImportError:
    import json

# project modules
import circularlog
import seagraph
import tcp_lineserver
from influxdb import InfluxDB, InfluxDataGetter
from influxgraph import InfluxGraph
|
|
|
|
def guess_mimetype(filename):
    """Return the MIME type to serve *filename* with, based on its suffix.

    Known suffixes are .js, .css, .ico and .png; anything else is served
    as 'text/html'.
    """
    known_suffixes = (
        ('.js', 'text/javascript'),
        ('.css', 'text/css'),
        ('.ico', 'image/x-icon'),
        ('.png', 'image/png'),
    )
    for suffix, mimetype in known_suffixes:
        if filename.endswith(suffix):
            return mimetype
    return 'text/html'
|
|
|
|
#class SeawebException(Exception):
|
|
# pass
|
|
|
|
# SSE 'protocol' is described here: http://mzl.la/UPFyxY
def to_json_sse(msg):
    """Encode *msg* as JSON and wrap it into one server-sent-event frame.

    The frame is the text 'data: <json>' followed by an empty line.
    """
    txt = json.dumps(msg, separators=(',', ': '))
    return 'data: %s\n\n' % txt
|
|
|
|
# the Flask application; all routes below are registered on it
app = flask.Flask(__name__)

# circular log rider named "upd"; get_update records every message pushed
# to a client with it
update_rider = circularlog.Rider("upd")
|
|
|
|
@app.route('/update')
def get_update(path=None):
    """Register a new client and stream its messages as server-sent events.

    The first event carries the client id, the instrument title and the
    device name.  Afterwards the client is polled in a loop and every
    message is sent as one event.  When nothing was sent for 30 seconds a
    heartbeat is emitted, or the stream is closed when the client reports
    no info at all.  The client is unregistered from the instrument when
    the stream ends, whatever the reason.
    """
    client = instrument.newClient()
    # client address: the last ':'-separated part of remote_addr, resolved
    # to a host name once and reused for the log line below
    remote_host = socket.getfqdn(flask.request.remote_addr.split(':')[-1])
    client.remote_info = circularlog.strtm() + " " + remote_host

    @flask.stream_with_context
    def generator():
        logging.info('UPDATE %s %s', client.id, remote_host)
        # not every instrument class in this file defines 'device'
        msg = dict(type='id', id=client.id, instrument=instrument.title,
                   device=getattr(instrument, 'device', ''))
        yield to_json_sse(msg)
        try:
            lastmsg = time.time()
            while True:
                messages = client.poll()
                for msg in messages:
                    update_rider.put('-', repr(msg))
                    yield to_json_sse(msg)
                if messages:
                    lastmsg = time.time()
                    gevent.sleep(1)
                elif time.time() > lastmsg + 30:
                    if not client.info():
                        raise GeneratorExit("no activity")
                    logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info()))
                    yield to_json_sse(dict(type='heartbeat'))
                    lastmsg = time.time()
                else:
                    gevent.sleep(0.5)
        except (GeneratorExit, tcp_lineserver.Disconnected) as e:
            logging.info("except clause %r", repr(e))
            logging.info('CLOSED %s', client.id)
            print('CLOSE client')
        except Exception:
            logging.info('error')
            logging.error('%s', traceback.format_exc())
        finally:
            # single removal point: also runs for exceptions that are not
            # caught above, so the client never stays registered
            instrument.remove(client)

    resp = flask.Response(generator(), mimetype='text/event-stream')
    resp.headers['Access-Control-Allow-Origin'] = '*'
    return resp
|
|
|
|
@app.route('/circular')
def dump_circular():
    """Trigger circularlog.log() and answer with the fixed text "log"."""
    circularlog.log()
    return "log"
|
|
|
|
@app.route('/clients')
def show_clients():
    """Return an HTML fragment with one line per connected client.

    Each line shows the remote info stored at connection time followed by
    the client's info entries, and ends with a <br> tag.
    """
    rows = []
    for client in list(instrument.clients.values()):
        text = client.remote_info + " " + "; ".join(client.info())
        # host names and info entries originate from the remote side:
        # escape them before embedding them in HTML
        rows.append(html.escape(text) + "<br>")
    return "".join(rows)
|
|
|
|
@app.route('/export')
def export():
    """Send the export of a client as a downloadable TSV file.

    The query argument 'id' selects the client; all remaining query
    arguments are passed as keyword arguments to its w_export method.
    On any failure a JSON error message is returned instead.
    """
    args = flask.request.args
    kwargs = dict((k, args.get(k)) for k in args)
    path = flask.request.path
    logging.info('GET %s %s', path, repr(kwargs))
    try:
        client_id = kwargs.pop('id')
        client = instrument.clients[client_id]
        content = client.w_export(**kwargs)
        return flask.send_file(
            content,
            as_attachment=True,
            download_name='export.tsv',
            mimetype='text/tab-separated-values'
        )
    except Exception as e:
        logging.error('%s', traceback.format_exc())
        circularlog.log()
        msg = dict(type='error', request=path[1:], error=repr(e))
        resp = flask.Response(json.dumps(msg), mimetype='application/json')
        resp.headers['Access-Control-Allow-Origin'] = '*'
        return resp
|
|
|
|
@app.route('/getblock')
@app.route('/updateblock')
@app.route('/sendcommand')
@app.route('/console')
@app.route('/graph')
@app.route('/updategraph')
@app.route('/gettime')
@app.route('/getvars', methods=["GET", "POST"])
def reply():
    """Dispatch a request to the 'w_<route>' method of the addressed client.

    The request value 'id' selects the client; all remaining request
    values are passed as keyword arguments.  The result of the method, or
    an error message when anything fails, is returned as JSON.
    """
    args = flask.request.values
    kwargs = dict((k, args.get(k)) for k in args)
    path = flask.request.path
    logging.info('GET %s %s', path, repr(kwargs))
    try:
        client_id = kwargs.pop('id')
        client = instrument.clients[client_id]
        # e.g. the route /getblock calls client.w_getblock
        msg = getattr(client, "w_" + path[1:])(**kwargs)
    except Exception as e:
        logging.error('%s', traceback.format_exc())
        circularlog.log()
        msg = dict(type='error', request=path[1:], error=repr(e))
    resp = flask.Response(json.dumps(msg), mimetype='application/json')
    resp.headers['Access-Control-Allow-Origin'] = '*'
    return resp
|
|
|
|
@app.route('/test/<file>')
def subdir_test_file(file):
    """Serve a file from client/test/ after a delay of 2 seconds."""
    gevent.sleep(2)
    resp = flask.send_file("client/test/"+file, mimetype=guess_mimetype(file))
    return resp
|
|
|
|
@app.route('/components/curves_settings_popup/color_selector/<file>')
@app.route('/components/curves_settings_popup/<file>')
@app.route('/components/action_entry/<file>')
@app.route('/components/export_popup/<file>')
@app.route('/components/dates_popup/<file>')
@app.route('/components/menu_popup/<file>')
@app.route('/components/help_popup/<file>')
@app.route('/components/help_entry/<file>')
@app.route('/components/control/<file>')
@app.route('/components/divider/<file>')
@app.route('/components/states_indicator/dates/<file>')
@app.route('/res/<file>')
@app.route('/jsFiles/<file>')
@app.route('/cssFiles/<file>')
@app.route('/externalFiles/<file>')
def subdir_file(file):
    """Serve a static file from the matching subdirectory of client/.

    The subdirectory is the request path without its last element.
    """
    subdir = "/".join(flask.request.path.split("/")[1:-1])
    resp = flask.send_file("client/" + subdir+"/"+file, mimetype=guess_mimetype(file))
    #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
    return resp
|
|
|
|
@app.route('/externalFiles/maps/<file>.map')
def replace_by_empty(file):
    """Answer requests for .map files with an empty body."""
    return ""
|
|
|
|
@app.route('/')
def default():
    """Serve the main page of the web client."""
    return general_file('SEAWebClient.html')
|
|
|
|
@app.route('/<file>')
def general_file(file):
    """Serve a file located directly in client/."""
    subdir = "client/"
    resp = flask.send_file(subdir+file, mimetype=guess_mimetype(file))
    #resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
    return resp
|
|
|
|
def hostport_split(hostport):
    """Split a 'host:port' string into a (host, port) tuple, port as int."""
    parts = hostport.split(':')
    return (parts[0], int(parts[1]))
|
|
|
|
def sea_request_reply(socket, command, tmo=5):
    """Send *command* as 'fulltransact' and return the reply lines.

    socket: line client offering send_line(), get_line(), reconnect() and
        a 'busy' attribute used as a simple in-use flag
    command: the command to send (without 'fulltransact')
    tmo: maximum time in seconds to wait for the next reply line

    Returns the lines received between the last 'TRANSACTIONSTART ...' line
    and the 'TRANSACTIONFINISHED' line.

    Raises Exception("timeout") when no line arrives within *tmo* seconds
    or when the busy flag was cleared while waiting for the reply; the
    socket is reconnected in both cases.
    """
    waited = 0
    # wait for at most 60 seconds for not being busy
    while socket.busy:
        if waited >= 60:
            logging.error('still busy at %s (before command %s)', getattr(socket, "name", "noname"), command)
            socket.busy = False
        else:
            gevent.sleep(0.1)
            waited += 0.1
    if waited > 5:
        logging.warning('unusual wait time %.4g (before command %s)', waited, command)
    socket.busy = True
    socket.send_line("fulltransact "+command)
    data = []
    dumpdata = []
    t = 0  # time waited for the current line
    longest_gap = 0  # longest time waited for any single line
    while True:
        while socket.busy:
            line = socket.get_line()
            if line is not None:
                # remember the gap before resetting the timer, otherwise
                # the slow-reply log below could never trigger
                longest_gap = max(longest_gap, t)
                t = 0
                break
            if t >= tmo:
                socket.busy = False
                logging.error('timeout on command %s (%s)', command, getattr(socket, "name", "noname"))
                socket.reconnect()
                raise Exception("timeout")
            gevent.sleep(0.1)
            t += 0.1
        else:
            # the busy flag was cleared by someone else while we were waiting
            logging.error('interrupted command %s (%s)', command, getattr(socket, "name", "noname"))
            socket.reconnect()
            raise Exception("timeout")
        dumpdata.append(line)
        if line == 'TRANSACTIONFINISHED':
            break
        elif line.startswith('TRANSACTIONSTART '):
            # drop anything received before the start of the transaction
            data = []
        else:
            data.append(line)
    if longest_gap > 2:
        logging.info('DUMPDATA %.4g %s', longest_gap, '|'.join(dumpdata))
    socket.busy = False
    return data
|
|
|
|
|
|
class SeaGroup:
    """Cached state of one SEA group (components, title, poll times)."""

    def __init__(self):
        self.version = 0  # incremented whenever the components change
        self.components = []
        self.grouptitle = "untitled"
        self.lastpoll = 0  # time of the last poll
        self.lastreq = 0  # time of the last request
        self.empty_values = {}  # value names of the group, all mapped to None
|
|
|
|
|
|
class Instrument:
    """Base class keeping the registry of connected clients.

    Subclasses have to provide the 'clients' dict (client id -> client).
    """

    def remove(self, client):
        """Unregister *client*; an already removed client is only logged."""
        try:
            del self.clients[client.id]
        except KeyError:
            # the logging module is used throughout this file; no 'logger'
            # object exists
            logging.warning('client already removed %s', client.id)

    def register(self, client):
        """Register *client* under its id and return it."""
        self.clients[client.id] = client
        return client
|
|
|
|
|
|
class SeaInstrument(Instrument):
    """Instrument backed by a SEA server.

    Two line connections are used: 'seaspy' for polling groups and
    'seacmd' (created on demand) for the command console.
    """

    # convert SEA layout tag like "-W" to more meaningful name.
    # the code: 0: modifier, 1: enum name, 2: input element
    tags = {
        '-W': ('width', 0),
        '-T': ('title', 0),
        '-H': ('tooltip', 0),
        '-V': ('value', 0),
        '-S': ('style', 0),
        '-D': ('div', 0),
        '-r': ('enum_name', 1),
        '-R': ('enum', 2),
        '-I': ('input', 2),
        '-i': ('rdonly', 2),
        '-L': ('rdonly', 2),
        '-G': ('group', 2),
        '-C': ('checkbox', 2),
        '-B': ('pushbutton', 2),
        '-l': ('link', 0),
        '-E': ('end', 2),
    }

    def __init__(self, inst_name, instrument_config):
        """Connect to the SEA server given by instrument_config['sea'].

        inst_name: name of the instrument, also used as title
        instrument_config: dict with the key 'sea' ("host:port") and the
            optional keys 'logger_dir' and 'test_day' ("y-m-d")
        """
        self.host_port = hostport_split(instrument_config['sea'])
        self.inst_name = inst_name
        self.title = inst_name
        self.clients = {}
        self.logger_dir = instrument_config.get('logger_dir', '')
        test_day = instrument_config.get('test_day', None)
        self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
        self.seaspy = tcp_lineserver.LineClient(self.host_port, ['Spy 007'], True, ridername='spy')
        self.seaspy.busy = False
        self.seaspy.name = "SEA seaspy"
        self.seacmd = None
        self.last_client_remove = time.time()
        self.history = deque(maxlen=1000)
        self.sea_lock = RLock()
        self.init()
        gevent.Greenlet.spawn(self.checkconnections)

    def init(self):
        """(Re)initialize the cached state and read the device name."""
        self.values = {}
        self.groups = {}
        with self.sea_lock:
            self.device = sea_request_reply(self.seaspy, "samenv name")[0]  # first line
        self.consolepos = 0
        self.timeStamp = None
        self.history.clear()
        self.lastcmd = None
        # pollconsole reads lastid even when no command was sent yet
        self.lastid = 0

    def checkconnections(self):
        """Close the SEA connections 30 s after the last client has left.

        Runs forever, checking every 10 seconds.
        """
        while True:
            if len(self.clients) == 0 and self.seaspy.connected:
                if time.time() > self.last_client_remove + 30:
                    logging.info("close SEA connections")
                    self.history.clear()
                    self.seaspy.close()
                    if self.seacmd:
                        self.seacmd.close()
            gevent.sleep(10)

    def newClient(self):
        """Create and register a new client, reinitializing when disconnected."""
        if not self.seaspy.connected:
            self.init()
        return self.register(SeaClient())

    def remove(self, client):
        """Unregister *client* and remember the time of the removal."""
        Instrument.remove(self, client)
        self.last_client_remove = time.time()

    def findgroup(self, path):
        'get a group from sea and store it'
        if path not in self.groups:
            self.groups[path] = SeaGroup()
        # NOTE(review): the nesting of the next three statements is inferred
        # (a fresh SeaGroup already has lastpoll == 0, so resetting it only
        # makes sense for existing groups) -- confirm against the original
        self.groups[path].lastpoll = 0  # force poll_groups to poll now
        self.groups[path].lastreq = time.time()
        self.poll_groups([path])
        return self.groups[path]

    def poll_groups(self, paths):
        'polls values and components of requested groups'
        for path in list(paths):
            gobj = self.groups[path]
            now = time.time()
            if now < gobj.lastpoll + 0.5:
                continue  # do not poll before 500 ms have passed
            gobj.lastreq = now
            gobj.lastpoll = now
            try:
                with self.sea_lock:
                    data = sea_request_reply(self.seaspy, 'getgroup '+path)
            except Exception:
                logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc())
                continue
            components = []
            values = {}
            grouptitle = None
            enum = None  # the enum component being collected, if any
            enum_names = []
            item = {}  # modifiers collected for the next element
            for line in data:
                key, kind = self.tags.get(line[0:2], ('', -1))
                if kind < 0:
                    continue
                if kind == 0:  # modifier
                    item[key] = line[2:]
                    continue
                if enum is not None and kind >= 2:
                    # any new element terminates the enum being collected
                    enum['enum_names'] = enum_names
                    name = enum['name']
                    # an enum without a value gets '' (must not raise)
                    self.values[name] = enum.pop('value', '')
                    values[name] = None
                    components.append(enum)
                    enum = None
                if kind == 1:  # enum name
                    item['value'] = line[2:]
                    enum_names.append(item)
                    item = {}
                    continue
                if key == 'enum':
                    enum = item
                    item = {}
                    enum['type'] = key
                    enum['name'] = line[2:]
                    enum_names = []
                    continue
                if key == 'pushbutton':
                    if line[2:3] != ' ':
                        continue  # skip special buttons
                    line = line[1:]  # skip space
                    if 'title' in item:
                        item['value'] = item['title']
                        item['title'] = ' '
                if key == 'end':
                    continue
                if key == 'group':
                    # NOTE(review): nesting inferred -- the first group line
                    # only defines the title, further group lines become
                    # components; confirm against the original
                    if grouptitle is None:
                        grouptitle = item.get('title', line[2:])
                        item = {}
                        continue
                item['type'] = key
                name = line[2:]
                if 'value' in item:
                    self.values[name] = item.pop('value')
                    values[name] = None
                item['name'] = name
                components.append(item)
                item = {}
            if gobj.components != components:
                gobj.components = components
                gobj.empty_values = values
                gobj.version += 1
            if grouptitle is not None:
                gobj.grouptitle = grouptitle

    def make_seacmd(self):
        """Create and connect the command connection if not yet done."""
        if not self.seacmd:
            self.seacmd = tcp_lineserver.LineClient(
                self.host_port,
                ['seauser seaser', 'fulltransact config listen 1', 'fulltransact commandlog tail 200'],
                True, ridername='cmd')
            self.seacmd.name = "SEA user"
            self.seacmd.connect()

    def console(self):
        """Make sure the command connection exists."""
        self.make_seacmd()

    def addconsole(self, msg):
        """Append a (type, line, client id) tuple to the console history."""
        self.history.append(msg)
        self.consolepos += 1

    def pollconsole(self):
        """Read all pending lines of the command connection into the history."""
        if not self.seacmd:
            return
        # lines starting with one of these are not shown on the console
        ignored = (
            'Deleting connection',
            'Accepted connection',
            'Change of Authorisation',
            'fulltransact config ',
            'UserRights = ',
            'fulltransact status',
            'fulltransact commandlog tail',
            'Login OK',
            'TRANSACTIONSTART commandlog tail ',
        )
        while True:
            line = self.seacmd.get_line()
            if line is None:
                return None
            if (line.startswith(ignored)
                    or (line.startswith('User ') and ' privilege' in line)
                    or line == 'OK' or line == 'OK.'):
                pass
            elif line.startswith('TRANSACTIONSTART'):
                if self.lastcmd is not None:
                    self.addconsole(('command', self.lastcmd, self.lastid))
                    self.lastcmd = None
            elif line == 'TRANSACTIONFINISHED':
                self.lastid = 0
            elif line.startswith('fulltransact '):
                self.addconsole(('command', line[13:], 0))
            elif line.startswith('==='):
                self.timeStamp = line
            elif line > " ":
                if self.timeStamp:
                    self.addconsole(('reply', self.timeStamp, self.lastid))
                    self.timeStamp = None
                self.addconsole(('reply', line, self.lastid))

    def getconsole(self, startindex, id):
        """Return the console messages added since position *startindex*.

        id: id of the asking client; messages caused by this client get
            origin 'self', all others 'other'

        Returns a tuple (new position, list of messages).
        """
        idx = min(len(self.history), self.consolepos - startindex)  # distance from end
        messages = []
        for i in range(-idx, 0):
            typ, line, hid = self.history[i]
            messages.append(dict(type=typ, line=line, origin=('self' if hid == id else 'other')))
        return self.consolepos, messages

    def command(self, command, id):
        """Send *command* on the command connection on behalf of client *id*."""
        self.make_seacmd()
        self.seacmd.send_line('fulltransact '+command)
        self.lastid = id
        self.lastcmd = command
|
|
|
|
|
|
class SeaInfluxInstrument(SeaInstrument):
    """SEA instrument whose device name and clients use InfluxDB data."""

    def __init__(self, inst_name, instrument_config):
        """Set up the SEA connection, then the InfluxDB data getter."""
        super().__init__(inst_name, instrument_config)
        self.db = InfluxDB()
        self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
        # replaces the device name read from SEA by the base class
        self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp()))

    def newClient(self):
        """Create and register a new client, reinitializing when disconnected."""
        if not self.seaspy.connected:
            self.init()
        return self.register(SeaInfluxClient())
|
|
|
|
class InfluxInstrument(Instrument):
    """Instrument offering InfluxDB data only (no SEA connection)."""

    def __init__(self, instr_name):
        """Set up the InfluxDB data getter for the instrument *instr_name*."""
        self.db = InfluxDB()
        # use the argument, not a module level variable of similar name
        self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
        self.clients = {}
        self.title = instr_name
        self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp()))

    def newClient(self):
        """Create and register a new client."""
        return self.register(InfluxClient())
|
|
|
|
class SeaGraph:
    """Graph part of a client: curves read with the seagraph scanners."""

    HISTORICAL = 0
    ACTUAL = 1
    LIVE = 2

    def __init__(self):
        self.livemode = self.HISTORICAL
        self.time = [0, 0]  # [start, end] of the range to be fetched next
        self.lastvalues = {}  # variable name -> (time, value), see complete_to_end
        self.variables = []  # names of available variables, set by w_getvars
        self.scanner = None  # created by w_graph

    def strip_future(self, result):
        'strip future points (happens only on dummy test_day)'
        for curve in result.values():
            # drop points from the end until the last one is not later
            # than the end of the time range
            while curve:
                lastt, lastx = curve[-1]
                if lastt <= self.time[1]:
                    break
                curve.pop()

    def complete_to_end(self, result, endtime):
        """Extend every non-empty curve to *endtime* with its last value."""
        for var, curve in result.items():
            if curve:
                lastt, lastx = curve[-1]
                if lastt < endtime:
                    curve.append((endtime, lastx))
                # NOTE(review): nesting inferred -- the last value is
                # remembered for every non-empty curve; confirm against
                # the original
                self.lastvalues[var] = (endtime, lastx)

    def w_graph(self, variables, time="-1800,0"):
        """get given curves

        variables: comma separated list of variables to get
        time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time
        """
        timerange = [float(t) for t in time.split(',')]
        self.last_t = 0
        start, end, now = seagraph.get_abs_time(timerange + [0])
        self.time = [start, end]
        variables = variables.split(',')
        self.livemode = self.ACTUAL if end >= now else self.HISTORICAL
        logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
        self.scanner = seagraph.NumericScanner(instrument.logger_dir, instrument.test_day)
        result = self.scanner.get_message(variables, self.time, show_empty=True)
        self.strip_future(result)
        self.complete_to_end(result, end)
        # the next update starts where this range ends
        self.time[0] = self.time[1]
        # reduction not yet implemented
        return dict(type='graph-draw', reduced=False, graph=result)

    def w_gettime(self, time):
        """parse time (using server time)

        time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time
        """
        timerange = [float(t) for t in time.split(',')]
        return dict(type='time', time=seagraph.get_abs_time(timerange))

    def w_getvars(self, time):
        """get the curves available at given time (unix timestamp as string)
        """
        timerange = [float(t) for t in time.split(',')]
        scanner = seagraph.VarsScanner(instrument.logger_dir, instrument.test_day)
        result = dict(type='var_list')
        result['blocks'] = list(scanner.get_message(timerange[-1]).values())
        # updates the self.variables attribute to keep track of the available variables
        self.variables = [variable["name"] for block in result['blocks'] for variable in block["curves"]]
        return result

    def w_updategraph(self):
        """update live values - seems not to work"""
        logging.info("UPD GRAPH %d", self.livemode)
        if self.livemode == self.HISTORICAL:
            return dict(type='accept-graph', live=False)
        self.livemode = self.LIVE
        return dict(type='accept-graph', live=True)

    def graphpoll(self):
        """Return a 'graph-update' message with new points, or None.

        New points are fetched in live mode only.
        """
        if self.livemode == self.LIVE:
            self.time[1], = seagraph.get_abs_time([0])
        else:
            self.time[1] = self.time[0]  # do not update
        if self.time[1] > self.time[0]:
            result = self.scanner.get_message(self.variables, self.time, show_empty=False)
            self.strip_future(result)
            if int(self.time[1] / 60) != int(self.time[0] / 60):
                # a new minute has started: update unchanged values
                for var, (lastt, lastx) in self.lastvalues.items():
                    if var not in result:
                        result[var] = [(self.time[1], lastx)]
            self.time[0] = self.time[1]
            if len(result) > 0:
                return dict(type='graph-update', reduced=False, time=self.time[1], graph=result)
        return None
|
|
|
|
class SeaParams:
    """Parameter and console part of a client connected to a SEA instrument.

    Meant to be combined with a graph class providing graphpoll().
    """

    def __init__(self):
        self.group_version = {}  # path -> group version known to the client
        self.group_values = {}  # path -> {value name: value known to the client}
        self.values = {}
        self.consolepos = 0  # console position already sent to the client
        self.id = uuid.uuid4().hex[0:15]
        self.queue = []

    def poll(self):
        """Collect all messages to be sent to the client.

        These are the queued messages, a redraw or value updates of the
        watched groups, new console lines and graph updates.
        """
        messages = self.queue
        self.queue = []
        updates = []
        # group updates
        instrument.poll_groups(self.group_version.keys())
        for path, gv in self.group_version.items():
            if path in self.group_values:
                gobj = instrument.groups[path]
                if gv != gobj.version:
                    logging.info('redraw: %s client: %d instr: %d', path, gv, gobj.version)
                    self.group_version[path] = gobj.version
                    messages.append(dict(type='redraw', path=path))
                    break
                # NOTE(review): this part is read as the else branch of the
                # version test above (not of the loop) -- confirm
                values = self.group_values[path]
                for name, client_value in values.items():
                    inst_value = instrument.values.get(name, None)
                    if client_value != inst_value:
                        values[name] = inst_value
                        updates.append({'name': name, 'value': inst_value})
        if len(updates) > 0:
            messages.append(dict(type='update', updates=updates))
        # console messages
        instrument.pollconsole()
        self.consolepos, msg = instrument.getconsole(self.consolepos, self.id)
        messages.extend(msg)

        # graph messages
        msg = self.graphpoll()
        if msg:
            messages.append(msg)
        return messages

    def info(self):
        """Return the paths of the groups watched by this client."""
        return self.group_version.keys()

    def w_getblock(self, path):
        """Return the 'draw' message of the group given by *path*.

        Only the last element of a comma separated path is looked up.
        """
        gobj = instrument.findgroup(path.split(',')[-1])
        # simplify: allow only one group per client
        self.group_version = {path: gobj.version}
        logging.info('getblock %s %d', path, gobj.version)
        return dict(type='draw', title=gobj.grouptitle, path=path, components=gobj.components)

    def w_updateblock(self, path):
        """Start sending value updates for the group *path*."""
        gobj = instrument.findgroup(path)
        logging.info('make active %s', path)
        # starting from empty values makes the next poll send all values
        self.group_values[path] = gobj.empty_values.copy()
        return dict(type='accept-block')

    def w_console(self):
        """Activate the console and restart it from the beginning."""
        self.consolepos = 0
        instrument.console()
        return dict(type='accept-console')

    def w_sendcommand(self, command):
        """Send *command* to the instrument on behalf of this client."""
        instrument.command(command, self.id)
        return dict(type='accept-command')
|
|
|
|
|
|
class InfluxParams:
    """Class with dummy routes, in case client side is started with the right part init commands"""

    def __init__(self):
        self.id = uuid.uuid4().hex[0:15]
        self.queue = []

    def poll(self):
        """Return the queued messages plus a graph update, if any.

        graphpoll() has to be provided by the graph class this class is
        combined with.
        """
        messages = self.queue
        self.queue = []
        msg = self.graphpoll()
        if msg:
            messages.append(msg)
        return messages

    def info(self):
        """Return a dummy info (no groups are watched)."""
        return ["na"]

    def w_getblock(self, path):
        """Return an empty 'draw' message for *path*."""
        return dict(type='draw', title="graph", path=path, components=[])

    def w_updateblock(self, path):
        """Accept the request without doing anything."""
        return dict(type='accept-block')

    def w_console(self):
        """Accept the request without doing anything."""
        return dict(type='accept-console')

    def w_sendcommand(self, command):
        """Accept the request without doing anything."""
        return dict(type='accept-command')
|
|
|
|
|
|
class SeaClient(SeaParams, SeaGraph):
    """Client with SEA parameters and graphs read by SeaGraph."""

    def __init__(self):
        SeaParams.__init__(self)
        SeaGraph.__init__(self)
|
|
|
|
|
|
class SeaInfluxClient(SeaParams, InfluxGraph):
    """Client with SEA parameters and graphs read by InfluxGraph."""

    def __init__(self):
        SeaParams.__init__(self)
        InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
|
|
|
|
class InfluxClient(InfluxParams, InfluxGraph):
    """Client with dummy parameter routes and graphs read by InfluxGraph."""

    def __init__(self):
        InfluxParams.__init__(self)
        InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
|
|
|
|
|
|
class DummyClient(SeaGraph):
    """Client forwarding all requests as JSON lines to a dummy server."""

    # message types which may arrive at any time, i.e. not as the reply
    # to a request
    asynch = set(('id', 'update', 'redraw', 'command', 'reply', 'graph-update', 'graph-redraw'))

    def __init__(self, host_port):
        """Connect to *host_port* and announce the id of this client."""
        self.linesocket = tcp_lineserver.LineClient(host_port)
        self.id = uuid.uuid4().hex[0:15]
        self.linesocket.send_line(json.dumps(dict(type='init', id=self.id)))
        self.queue = []  # asynchronous messages received while waiting for a reply
        self.syncreply = []  # replies received while polling
        SeaGraph.__init__(self)

    def cmd_reply(self, command, replytype=None, tmo=5):
        """Send *command* (JSON encoded) and return the decoded reply.

        Asynchronous messages arriving in between are queued for poll().
        A reply of another type than *replytype* is logged but returned
        nevertheless.

        Raises Exception("timeout") when no reply arrives within *tmo*
        seconds.
        """
        self.linesocket.send_line(json.dumps(command))
        t = 0
        while True:
            if self.syncreply:
                msg = self.syncreply.pop(0)
                break
            line = self.linesocket.get_line()
            if line is not None:
                msg = json.loads(line)
                if msg['type'] not in self.asynch:
                    break
                t = 0
                self.queue.append(msg)
            if t >= tmo:
                raise Exception("timeout")
            gevent.sleep(0.1)
            t += 0.1
        if replytype and msg['type'] != replytype:
            logging.error('REPLY MISMATCH %s %s <> %s', command, replytype, msg['type'])
        return msg

    def w_getblock(self, path):
        return self.cmd_reply(dict(type='getblock', path=path, id=self.id), 'draw')

    def w_updateblock(self, path):
        return self.cmd_reply(dict(type='updateblock', path=path, id=self.id), 'accept-block')

    def w_console(self):
        return self.cmd_reply(dict(type='console', id=self.id), 'accept-console')

    def w_sendcommand(self, command):
        return self.cmd_reply(dict(type='sendcommand', command=command, id=self.id), 'accept-command')

    def poll(self):
        """Return the messages to be sent to the client.

        Queued messages are returned first; otherwise at most one line is
        read from the server and a graph update is added, if any.
        """
        if self.queue:
            messages = self.queue
            self.queue = []
            return messages
        line = self.linesocket.get_line()
        messages = []
        if line:
            msg = json.loads(line)
            if msg['type'] in self.asynch:
                messages.append(msg)
            else:
                # keep it for the cmd_reply call waiting for it
                self.syncreply.append(msg)

        # graph messages
        msg = self.graphpoll()
        if msg:
            messages.append(msg)
        return messages

    def info(self):
        """Return a dummy info."""
        return ["na"]
|
|
|
|
class DummyInstrument(Instrument):
    """Instrument whose clients talk to a dummy server."""

    def __init__(self, inst_name, instrument_config):
        """Read the configuration.

        inst_name: name of the instrument, used as title
        instrument_config: dict with the key 'hostport' ("host:port") and
            the optional keys 'logger_dir' and 'test_day' ("y-m-d")
        """
        self.instrument_config = instrument_config
        self.host_port = hostport_split(instrument_config['hostport'])
        self.logger_dir = instrument_config.get('logger_dir', '')
        test_day = instrument_config.get('test_day', None)
        self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
        self.title = inst_name
        # the /update route reads instrument.device; no device name is
        # known here
        self.device = ''
        self.clients = {}

    def newClient(self):
        """Create and register a new client."""
        return self.register(DummyClient(self.host_port))
|
|
|
|
|
|
class SecopMsg:
    """One parsed SECoP line of the form '<type> [<par> [<json value>]]'.

    A line whose first word contains a comma is taken as the reply to an
    identification request: its type is 'idn' and its value the whole line.

    Attributes: type, par (None if missing), value (None if missing) and
    asynch (True for the types 'update' and 'error_update').
    """

    def __init__(self, line):
        self.par = None
        self.value = None
        sl = line.split(' ')
        if len(sl[0].split(',')) > 1:
            self.type = 'idn'
            self.value = line
        else:
            self.type = sl[0]
            if len(sl) > 1:
                self.par = sl[1]
            if len(sl) > 2:
                # the value may contain spaces
                self.value = json.loads(' '.join(sl[2:]))
        self.asynch = self.type in ('update', 'error_update')

    def __getitem__(self, key):
        """Allow msg['type'] as an alias of msg.type (used by the callers)."""
        return getattr(self, key)

    def __repr__(self):
        value = repr(self.value)
        if len(value) > 50:
            value = value[:50] + '...'
        return "SecopMsg('%s %s %s')" % (self.type, self.par, value)
|
|
|
|
|
|
def SecopEncode(cmd, par=None, value=None):
    """Build the line '<cmd> [<par> [<json value>]]'.

    The value is appended for every value but None, so that values like
    0, False or "" are transmitted, too.
    """
    line = cmd
    if par:
        line += " " + par
    if value is not None:
        line += " " + json.dumps(value)
    return line
|
|
|
|
|
|
def convert_par(module, name, par):
    """Convert the description of a SECoP parameter to a client component.

    module, name: module and parameter name; the component is named
        '<module>:<name>'
    par: description of the parameter, a dict with the key 'datainfo' and
        optionally 'readonly' (defaults to True)

    A readonly parameter gets the type 'rdonly'.  A writable one gets a
    'change' command and the type 'enum', 'checkbox' or 'input' depending
    on its datainfo type.
    """
    result = dict(type='input', name=module+":"+name, title=name)
    if par.get('readonly', True):
        result['type'] = 'rdonly'
    else:
        result['command'] = 'change %s:%s' % (module, name)
        # NOTE(review): nesting inferred -- the input type is refined for
        # writable parameters only; confirm against the original
        datatype = par['datainfo']['type']
        if datatype == 'enum':
            result['enum_names'] = [dict(title=k, value=v) for k, v in par['datainfo']['members'].items()]
            result['type'] = 'enum'
        elif datatype == 'bool':
            result['type'] = 'checkbox'
    return result
|
|
|
|
|
|
def convert_event(messages):
    """Convert SECoP update messages to one 'update' message for the client.

    messages: a SecopMsg or a list of them; messages of other types than
        'update' are ignored

    Returns a list containing a single message.
    """
    if isinstance(messages, SecopMsg):
        messages = [messages]
    # the first element of the value of an update message is the value
    # proper; messages are objects, so the type is read as an attribute
    updates = [dict(name=msg.par, value=msg.value[0])
               for msg in messages if msg.type == 'update']
    return [dict(type='update', updates=updates)]
|
|
|
|
|
|
class SecopClient:
    """Client talking to a SECoP server over a line connection."""

    prio_par = ["value", "status", "target"]  # parameters shown first
    hide_par = ["baseclass", "class", "pollinterval"]  # parameters shown last
    skip_par = ["status2"]  # parameters not shown

    def __init__(self, host_port):
        """Connect to *host_port* and read identification and description."""
        self.linesocket = tcp_lineserver.LineClient(host_port)
        self.id = uuid.uuid4().hex[0:15]
        self.queue = []  # asynchronous messages received while waiting for a reply
        self.syncreply = []  # replies received while polling
        self.consolequeue = []
        # set to a file like object for a dump of the traffic
        self.out = None
        idn = self.cmd_reply("*IDN?", "idn")
        self.idn = idn.value
        self.description = self.cmd_reply("describe", "describing").value

    def cmd_reply(self, command, replytype, tmo=5):
        """Send *command* and return the reply as a SecopMsg.

        Asynchronous messages arriving in between are queued for poll(),
        except the one matching *replytype* ('<type>=<par>').  A reply
        whose type does not match the start of *replytype* is logged but
        returned nevertheless.

        Raises Exception("timeout") when no reply arrives within *tmo*
        seconds.
        """
        self.replytype = replytype
        if self.out:
            self.out.write(">"+command+"\n")
        self.consolequeue.append(dict(type='command', line=command, origin='self'))
        self.linesocket.send_line(command)
        t = 0
        while True:
            if self.syncreply:
                msg = self.syncreply.pop(0)
                break
            line = self.linesocket.get_line()
            if line is not None:
                self.consolequeue.append(dict(type='reply', line=line, origin='other'))
                if self.out:
                    self.out.write("<"+line+"\n")
                msg = SecopMsg(line)
                # formatted instead of concatenated: par may be None
                if not (msg.asynch and replytype != "%s=%s" % (msg.type, msg.par)):
                    break
                t = 0
                self.queue.append(msg)
            if t >= tmo:
                raise Exception("timeout")
            gevent.sleep(0.1)
            t += 0.1
        if not replytype.startswith(msg.type):
            logging.error('REPLY MISMATCH %s <> %s', replytype, repr(msg))
        self.replytype = ""
        return msg

    def w_getblock(self, path):
        """Return the 'draw' message for *path*.

        'main' yields a link for each module; any other path is taken as a
        module name and yields the parameters of this module.  Only the
        last element of a comma separated path is used.
        """
        path = path.split(',')[-1]
        if path == "main":
            components = [dict(type='rdlink', name=name+':value', title=name)
                          for name in self.description["modules"]]
            return dict(type='draw', path='main', title='modules', components=components)
        module = self.description['modules'][path]
        # work on a copy: the cached description must stay complete for
        # the next request
        parameters = dict(module["parameters"])
        for name in SecopClient.skip_par:
            parameters.pop(name, None)
        components = [convert_par(path, name, parameters.pop(name))
                      for name in SecopClient.prio_par if name in parameters]
        hidden = [convert_par(path, name, parameters.pop(name))
                  for name in SecopClient.hide_par if name in parameters]
        for name, par in parameters.items():
            components.append(convert_par(path, name, par))
        components.extend(hidden)
        return dict(type='draw', path=path, title=path, components=components)

    def w_updateblock(self, path):
        """Activate updates on the server."""
        self.cmd_reply("activate", "active")
        return dict(type='accept-block')

    def w_console(self):
        """Accept the request without doing anything."""
        return dict(type='accept-console')

    def w_sendcommand(self, command):
        """Send *command* as a 'change' request and return the reply."""
        cmd = "change " + command
        return self.cmd_reply(cmd, 'event ' + command.split(' ')[0])

    def poll(self):
        """Return the messages to be sent to the client.

        Console messages are returned first, then queued updates;
        otherwise at most one line is read from the server.
        """
        if self.consolequeue:
            messages = self.consolequeue
            self.consolequeue = []
            return messages
        if self.queue:
            messages = convert_event(self.queue)
            self.queue = []
            return messages
        line = self.linesocket.get_line()
        if line:
            self.consolequeue.append(dict(type='reply', line=line, origin='other'))
            if self.out:
                self.out.write("<"+line+"\n")
            msg = SecopMsg(line)
            if msg.asynch and self.replytype != "%s=%s" % (msg.type, msg.par):
                return convert_event(msg)
            # keep it for the cmd_reply call waiting for it
            self.syncreply.append(msg)
        return []

    def info(self):
        """Return a dummy info."""
        return ["na"]
|
|
|
|
class SecopInstrument(Instrument):
    """Instrument whose clients talk to a SECoP server."""

    def __init__(self, inst_name, instrument_config):
        """Read the configuration.

        inst_name: name of the instrument, used as title
        instrument_config: dict with the key 'hostport' ("host:port") and
            the optional keys 'logger_dir' and 'test_day' ("y-m-d")
        """
        self.instrument_config = instrument_config
        self.host_port = hostport_split(instrument_config['hostport'])
        self.logger_dir = instrument_config.get('logger_dir', '')
        test_day = instrument_config.get('test_day', None)
        self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
        self.title = inst_name
        # the /update route reads instrument.device; no device name is
        # known here
        self.device = ''
        self.clients = {}

    def newClient(self):
        """Create and register a new client."""
        return self.register(SecopClient(self.host_port))
|
|
|
|
class Logger(object):
    """File like object duplicating everything written to stdout to a file."""

    def __init__(self, logpath):
        """Open *logpath* for appending; sys.stdout is kept as the terminal."""
        self.terminal = sys.stdout
        self.log = open(logpath, "a")

    def write(self, message):
        """Write *message* to the terminal and to the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both the terminal and the log file."""
        self.terminal.flush()
        self.log.flush()
|
|
|
|
|
|
def handle_pdb(sig, frame):
    """Signal handler entering the debugger at the interrupted frame."""
    import pdb
    print('PDB')
    pdb.Pdb().set_trace(frame)
|
|
|
|
|
|
if __name__ == '__main__':
    # usage:
    #   <script> [<port> [<instrument>]]   configuration from instruments.json
    #   <script> key=value ...             (more than 2 arguments) configuration
    #                                      from the arguments; 'port' and
    #                                      'instrument' are mandatory
    signal.signal(signal.SIGUSR1, handle_pdb)
    print('PID', os.getpid())

    if len(sys.argv) > 3:
        instrument_config = {}
        for arg in sys.argv[1:]:
            # the value may contain '=' itself
            key, _, value = arg.partition('=')
            instrument_config[key] = value
        port = int(instrument_config['port'])
        inst_name = instrument_config['instrument']
    else:
        # take config from instruments.json
        try:
            port = int(sys.argv[1])
        except IndexError:
            port = 5000
        try:
            inst_name = sys.argv[2]
        except IndexError:
            inst_name = 'seadummy'

        with open('instruments.json') as f:
            instrument_list = json.load(f)

        instrument_config = instrument_list[inst_name]
    logging.basicConfig(filename='log/%s.log' % inst_name, filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

    # sys.stdout = Logger(instrument_config.get('logger_dir', '.') + '/seaweb_stdout.txt')
    inst_type = instrument_config.get('type', 'sea')
    if inst_type == 'sea':
        instrument = SeaInstrument(inst_name, instrument_config)
    elif inst_type == 'influxsea':
        instrument = SeaInfluxInstrument(inst_name, instrument_config)
    elif inst_type == 'influx':
        instrument = InfluxInstrument(inst_name)
    elif inst_type == 'dummy':
        instrument = DummyInstrument(inst_name, instrument_config)
    elif inst_type == 'secop':
        instrument = SecopInstrument(inst_name, instrument_config)
    else:
        raise TypeError('bad instrument type')

    app.debug = True
    #server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
    server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))
    server.serve_forever()
    # Then visit http://localhost:5000/test for a test
|