Files
seweb/seaweb.py
l_samenv ac542b1694 improve time axis labels
+ change poll interval to 1 sec
2024-09-25 07:51:43 +02:00

1116 lines
38 KiB
Python
Executable File

#!/usr/bin/env python
import sys
sys.path.append("./lib")
# Make sure your gevent version is >= 1.0
import gevent
import gevent.pywsgi
import gevent.queue
import flask
import time
import pprint
import random
import time
from datetime import date, datetime
from collections import deque
import sys
import socket
import tcp_lineserver
import uuid
import seagraph
import traceback
import logging
import circularlog
from gevent.lock import RLock
import os
import signal
from influxgraph import InfluxGraph
from influxdb import InfluxDB, InfluxDataGetter
try: import simplejson as json
except ImportError: import json
def guess_mimetype(filename):
if filename.endswith('.js'):
mimetype = 'text/javascript'
elif filename.endswith('.css'):
mimetype = 'text/css'
elif filename.endswith('.ico'):
mimetype = 'image/x-icon'
elif filename.endswith(".png"):
mimetype = "image/png"
else:
mimetype = 'text/html'
return mimetype
#class SeawebException(Exception):
# pass
# SSE 'protocol' is described here: http://mzl.la/UPFyxY
def to_json_sse(msg):
txt = json.dumps(msg, separators=(',',': '))
# print 'MSG(size='+str(len(txt))+')', txt[:256]
return 'data: %s\n\n' % txt
app = flask.Flask(__name__)
update_rider = circularlog.Rider("upd")
@app.route('/update')
def get_update(path=None):
# Client Adress: socket.getfqdn(flask.request.remote_addr)
client = instrument.newClient()
client.remote_info = circularlog.strtm() + " " + socket.getfqdn(flask.request.remote_addr.split(':')[-1])
@flask.stream_with_context
def generator():
logging.info('UPDATE %s %s', client.id, socket.getfqdn(flask.request.remote_addr.split(':')[-1]))
#msg = dict(type='id', id=client.id, title=instrument.title);
#yield to_json_sse(msg)
msg = dict(type='id', id=client.id, instrument=instrument.title, device=instrument.device)
yield to_json_sse(msg)
try:
lastmsg = time.time()
while True:
if client.info() == "":
print(time.time()-lastmsg)
messages = client.poll()
for msg in messages:
update_rider.put('-', repr(msg))
yield to_json_sse(msg)
if messages:
lastmsg = time.time()
gevent.sleep(1)
else:
if time.time() > lastmsg + 30:
if not client.info():
raise GeneratorExit("no activity")
logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info()))
yield to_json_sse(dict(type='heartbeat'))
lastmsg = time.time()
else:
gevent.sleep(0.5)
except (GeneratorExit, tcp_lineserver.Disconnected) as e:
logging.info("except clause %r", repr(e))
logging.info('CLOSED %s', client.id)
print('CLOSE client')
instrument.remove(client)
pass
except Exception as e:
logging.info('error')
logging.error('%s', traceback.format_exc())
instrument.remove(client)
#msg = dict(type='error',error=traceback.format_exc())
#yield to_json_sse(msg)
resp = flask.Response(generator(), mimetype='text/event-stream')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/circular')
def dump_circular():
circularlog.log()
return "log"
@app.route('/clients')
def show_clients():
result = ""
for id in instrument.clients:
c = instrument.clients[id]
result += c.remote_info + " " + "; ".join(c.info()) + "<br>"
return result
@app.route('/export')
def export():
args = flask.request.args
kwargs = dict((k, args.get(k)) for k in args)
path = flask.request.path
logging.info('GET %s %s', path, repr(kwargs))
try:
id = kwargs.pop('id')
client = instrument.clients[id]
bytes = client.w_export(**kwargs)
return flask.send_file(
bytes,
as_attachment=True,
download_name='export.tsv',
mimetype='text/tab-separated-values'
)
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
resp = flask.Response(json.dumps(msg), mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/getblock')
@app.route('/updateblock')
@app.route('/sendcommand')
@app.route('/console')
@app.route('/graph')
@app.route('/updategraph')
@app.route('/gettime')
@app.route('/getvars', methods=["GET", "POST"])
def reply():
args = flask.request.values
kwargs = dict((k, args.get(k)) for k in args)
path = flask.request.path
logging.info('GET %s %s', path, repr(kwargs))
try:
id = kwargs.pop('id')
client = instrument.clients[id]
msg = getattr(client, "w_" + path[1:])(**kwargs)
except Exception as e:
logging.error('%s', traceback.format_exc())
circularlog.log()
msg = dict(type='error', request=path[1:], error=repr(e))
resp = flask.Response(json.dumps(msg), mimetype='application/json')
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
@app.route('/test/<file>')
def subdir_test_file(file):
gevent.sleep(2)
resp = flask.send_file("client/test/"+file, mimetype=guess_mimetype(file))
return resp
@app.route('/components/curves_settings_popup/color_selector/<file>')
@app.route('/components/curves_settings_popup/<file>')
@app.route('/components/action_entry/<file>')
@app.route('/components/export_popup/<file>')
@app.route('/components/dates_popup/<file>')
@app.route('/components/menu_popup/<file>')
@app.route('/components/help_popup/<file>')
@app.route('/components/help_entry/<file>')
@app.route('/components/control/<file>')
@app.route('/components/divider/<file>')
@app.route('/components/states_indicator/dates/<file>')
@app.route('/res/<file>')
@app.route('/jsFiles/<file>')
@app.route('/cssFiles/<file>')
@app.route('/externalFiles/<file>')
def subdir_file(file):
subdir = "/".join(flask.request.path.split("/")[1:-1])
resp = flask.send_file("client/" + subdir+"/"+file, mimetype=guess_mimetype(file))
#resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
return resp
@app.route('/externalFiles/maps/<file>.map')
def replace_by_empty(file):
return ""
@app.route('/')
def default():
return general_file('SEAWebClient.html')
@app.route('/<file>')
def general_file(file):
subdir = "client/"
resp = flask.send_file(subdir+file, mimetype=guess_mimetype(file))
#resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
return resp
def hostport_split(hostport):
h = hostport.split(':')
return (h[0], int(h[1]))
def sea_request_reply(socket, command, tmo=5):
t = 0
# wait for at most 60 seconds for not beeing busy
while socket.busy:
if t >= 60:
logging.error('still busy at %s (before command %s)', getattr(socket, "name", "noname"), command)
socket.busy = False
else:
gevent.sleep(0.1)
t += 0.1
if t > 5:
logging.warning('unusual wait time %.4g (before command %s)', t, command)
#print(command)
socket.busy = True
socket.send_line("fulltransact "+command)
data = []
dumpdata = []
t = 0
while True:
while socket.busy:
line = socket.get_line()
if line != None:
t = 0
break
if t >= tmo:
socket.busy = False
logging.error('timeout on command %s (%s)', command, getattr(socket, "name", "noname"))
socket.reconnect()
raise Exception("timeout")
gevent.sleep(0.1)
t += 0.1
else:
logging.error('interrupted command %s (%s)', command, getattr(socket, "name", "noname"))
socket.reconnect()
raise Exception("timeout")
dumpdata.append(line)
if line == 'TRANSACTIONFINISHED':
break
elif line.startswith('TRANSACTIONSTART '):
data = []
else:
data.append(line)
if t>2:
logging.info('DUMPDATA %.4g %s', t, '|'.join(dumpdata))
socket.busy = False
return data
class SeaGroup:
def __init__(self):
self.version = 0
self.components = []
self.grouptitle = "untitled"
self.lastpoll = 0
self.lastreq = 0
self.empty_values = {}
class Instrument:
def remove(self, client):
try:
del self.clients[client.id]
except KeyError:
logger.warning('client already removed %s', client.id)
def register(self, client):
self.clients[client.id] = client
return client
class SeaInstrument(Instrument):
# convert SEA layout tag like "-W" to more meaningful name.
# the code: 0: modifier, 1: enum name, 2: input element
tags = {
'-W': ('width', 0),
'-T': ('title', 0),
'-H': ('tooltip', 0),
'-V': ('value', 0),
'-S': ('style', 0),
'-D': ('div', 0),
'-r': ('enum_name', 1),
'-R': ('enum', 2),
'-I': ('input', 2),
'-i': ('rdonly', 2),
'-L': ('rdonly', 2),
'-G': ('group', 2),
'-C': ('checkbox', 2),
'-B': ('pushbutton', 2),
'-l': ('link', 0),
'-E': ('end', 2),
}
def __init__(self, inst_name, instrument_config):
self.host_port = hostport_split(instrument_config['sea'])
self.inst_name = inst_name
self.title = inst_name
self.clients = {}
self.logger_dir = instrument_config.get('logger_dir','')
test_day = instrument_config.get('test_day', None)
self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
self.seaspy = tcp_lineserver.LineClient(self.host_port, ['Spy 007'], True, ridername='spy')
self.seaspy.busy = False
self.seaspy.name = "SEA seaspy"
self.seacmd = None
self.last_client_remove = time.time()
self.history = deque(maxlen=1000)
self.sea_lock = RLock()
self.init()
gevent.Greenlet.spawn(self.checkconnections)
def init(self):
self.values = {}
self.groups = {}
with self.sea_lock:
self.device = sea_request_reply(self.seaspy, "samenv name")[0] # first line
self.consolepos = 0
self.timeStamp = None
self.history.clear()
self.lastcmd = None
def checkconnections(self):
while True:
if len(self.clients) == 0 and self.seaspy.connected:
if time.time() > self.last_client_remove + 30:
logging.info("close SEA connections")
self.history.clear()
self.seaspy.close()
if self.seacmd:
self.seacmd.close()
gevent.sleep(10)
def newClient(self):
if not self.seaspy.connected:
self.init()
return self.register(SeaClient())
def remove(self, client):
Instrument.remove(self, client)
self.last_client_remove = time.time()
def findgroup(self, path):
'get a group from sea and store it'
if not path in self.groups:
self.groups[path] = SeaGroup()
self.groups[path].lastpoll = 0
self.groups[path].lastreq = time.time()
self.poll_groups([path])
return self.groups[path]
def poll_groups(self, paths):
'polls values and components of requested groups'
for path in list(paths):
gobj = self.groups[path]
now = time.time()
if now < gobj.lastpoll + 0.5:
# print 'too fast', path
continue # do not poll before 500 ms have passed
gobj.lastreq = now
gobj.lastpoll = now
try:
with self.sea_lock:
data = sea_request_reply(self.seaspy, 'getgroup '+path)
except Exception as e:
logging.error('ERROR (getgroup %s) %s', path, traceback.format_exc())
continue
components = []
values = {}
grouptitle = None
within_enum = False
item = {}
olditem = {}
for line in data:
(key, type) = self.tags.get(line[0:2], ('',-1))
if type < 0:
continue
if type == 0: # modifier
item[key] = line[2:]
continue
if within_enum and type >= 2:
enum['enum_names'] = enum_names
name = enum['name']
self.values[name] = enum.get('value', '')
values[name] = None #NEW
del enum['value']
components.append(enum)
del enum
within_enum = False
if type == 1:
item['value'] = line[2:]
enum_names.append(item)
item = {}
continue
if key == 'enum':
enum = item
item = {}
within_enum = True
enum['type'] = key
enum['name'] = line[2:]
enum_names = []
continue
if key == 'pushbutton':
if line[2] != ' ':
continue # skip special buttons
line = line[1:] # skip space
try:
item['value'] = item['title']
item['title'] = ' '
# was olditem.get('title',olditem['name'])
except:
pass
if key == 'end':
continue
if key == 'group':
if grouptitle == None:
grouptitle = item.get('title', line[2:])
item = {}
continue
item['type'] = key
name = line[2:]
try:
self.values[name] = item['value']
values[name] = None
del item['value']
except KeyError:
pass
item['name'] = name
components.append(item)
olditem = item
item = {}
if gobj.components != components:
gobj.components = components
gobj.empty_values = values
gobj.version += 1
#print 'changed',path, gobj.version
#print 'FROM', gobj.components
#print 'TO ', components
if grouptitle != None:
gobj.grouptitle = grouptitle
def make_seacmd(self):
global port
if not self.seacmd:
self.seacmd = tcp_lineserver.LineClient(self.host_port,
['seauser seaser', 'fulltransact config listen 1', 'fulltransact commandlog tail 200'],
True, ridername='cmd')
self.seacmd.name = "SEA user"
self.seacmd.connect()
def console(self):
self.make_seacmd()
def addconsole(self, msg):
self.history.append(msg)
self.consolepos += 1
def pollconsole(self):
if not self.seacmd:
return
while True:
line = self.seacmd.get_line()
if line == None:
return None
if (line.startswith('Deleting connection')
or line.startswith('Accepted connection')
or (line.startswith('User ') and ' privilege' in line)
or line.startswith('Change of Authorisation')
or line.startswith('fulltransact config ')
or line.startswith('UserRights = ')
or line.startswith('fulltransact status')
or line == 'OK' or line == 'OK.'
or line.startswith('fulltransact commandlog tail')
or line.startswith('Login OK')
or line.startswith('TRANSACTIONSTART commandlog tail ')
):
pass
elif line.startswith('TRANSACTIONSTART'):
if self.lastcmd != None:
self.addconsole(('command', self.lastcmd, self.lastid))
self.lastcmd = None
elif line == 'TRANSACTIONFINISHED':
self.lastid = 0
elif line.startswith('fulltransact '):
type = 'command'
self.addconsole(('command', line[13:], 0))
elif line.startswith('==='):
self.timeStamp = line
elif line > " ":
if self.timeStamp:
self.addconsole(('reply', self.timeStamp, self.lastid))
self.timeStamp = None
type = 'reply'
self.addconsole(('reply', line, self.lastid))
def getconsole(self, startindex, id):
idx = min(len(self.history), self.consolepos - startindex) # distance from end
messages = []
for i in range(-idx,0):
typ, line, hid = self.history[i]
messages.append(dict(type=typ, line=line, origin=('self' if hid==id else 'other')))
return self.consolepos, messages
def command(self, command, id):
self.make_seacmd()
self.seacmd.send_line('fulltransact '+command)
self.lastid = id
self.lastcmd = command
class SeaInfluxInstrument(SeaInstrument):
def __init__(self, inst_name, instrument_config):
super().__init__(inst_name, instrument_config)
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp()))
def newClient(self):
if not self.seaspy.connected:
self.init()
return self.register(SeaInfluxClient())
class InfluxInstrument(Instrument):
def __init__(self, instr_name):
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
self.clients = {}
self.title = instr_name
self.device = self.influx_data_getter.get_device_name(int(datetime.now().timestamp()))
def newClient(self):
return self.register(InfluxClient())
class SeaGraph:
HISTORICAL = 0
ACTUAL = 1
LIVE = 2
def __init__(self):
self.livemode = self.HISTORICAL
self.time = [0, 0]
self.lastvalues = {}
self.variables = []
def strip_future(self, result):
'strip future points (happens only on dummy test_day)'
# if self.livemode == self.LIVE:
for c in result.values():
while c:
lastt, lastx = c[-1]
if lastt <= self.time[1]:
break
c.pop()
def complete_to_end(self, result, endtime):
for var, c in result.items():
if c:
lastt, lastx = c[-1]
if lastt < endtime:
c.append((endtime, lastx))
self.lastvalues[var] = (endtime, lastx)
def w_graph(self, variables, time="-1800,0"):
"""get given curves
variables: comma separated list of variables to get
time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time
"""
time = [float(t) for t in time.split(',')]
self.last_t = 0
start, end, now = seagraph.get_abs_time(time + [0])
self.time = [start, end]
variables = variables.split(',')
self.livemode = self.ACTUAL if end >= now else self.HISTORICAL
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
self.scanner = seagraph.NumericScanner(instrument.logger_dir, instrument.test_day)
#result = self.scanner.get_message(self.variables, self.time)
#self.time[0] = self.time[1]
result = self.scanner.get_message(variables, self.time, show_empty=True)
self.strip_future(result)
#for var in ('treg.set.reg', 'mf'):
# curve = result.get(var,[(0,0)])
# print(var, curve[0][0] - now, curve[-1][0] - now, curve)
self.complete_to_end(result, end)
self.time[0] = self.time[1]
# reduction not yet implemented
return dict(type='graph-draw', reduced=False, graph=result)
def w_gettime(self, time):
"""parse time (using server time)
time: comma separated time range (beg,end) values < 1 year are treated as relative to the current time
"""
time = [float(t) for t in time.split(',')]
return dict(type='time', time= seagraph.get_abs_time(time))
def w_getvars(self, time):
"""get the curves available at given time (unix timestamp as string)
"""
time = [float(t) for t in time.split(',')]
scanner = seagraph.VarsScanner(instrument.logger_dir, instrument.test_day)
result = dict(type='var_list')
result['blocks'] = list(scanner.get_message(time[-1]).values())
# updates the self.variables attribute to keep track of the available variables
self.variables = [variable["name"] for block in result['blocks'] for variable in block["curves"]]
return result
def w_updategraph(self):
"""update live values - seems not to work"""
logging.info("UPD GRAPH %d", self.livemode)
if self.livemode == self.HISTORICAL:
return dict(type='accept-graph', live=False)
else:
self.livemode = self.LIVE
return dict(type='accept-graph', live=True)
#self.livemode = self.LIVE
#return dict(type='accept-graph', live=True)
def graphpoll(self):
if self.livemode == self.LIVE:
self.time[1], = seagraph.get_abs_time([0])
else:
self.time[1] = self.time[0] # do not update
if self.time[1] > self.time[0]:
result = self.scanner.get_message(self.variables, self.time, show_empty=False)
self.strip_future(result)
if int(self.time[1] / 60) != int(self.time[0] / 60):
# update unchanged values
for var, (lastt, lastx) in self.lastvalues.items():
if var not in result:
result[var] = [(self.time[1], lastx)]
self.time[0] = self.time[1]
if len(result) > 0:
return dict(type='graph-update', reduced=False, time=self.time[1], graph=result)
return None
class SeaParams:
def __init__(self):
self.group_version = {}
self.group_values = {}
self.values = {}
self.consolepos = 0
self.id = uuid.uuid4().hex[0:15]
# SeaGraph.__init__(self)
self.queue = []
def poll(self):
messages = self.queue
self.queue = []
updates = []
# group updates
instrument.poll_groups(self.group_version.keys())
for path, gv in self.group_version.items():
if path in self.group_values:
gobj = instrument.groups[path]
if gv != gobj.version:
logging.info('redraw: %s client: %d instr: %d', path, gv, gobj.version)
self.group_version[path] = gobj.version
messages.append(dict(type='redraw', path=path))
break
else:
values = self.group_values[path]
for name, client_value in values.items():
inst_value = instrument.values.get(name, None)
if client_value != inst_value:
values[name] = inst_value
updates.append({'name': name, 'value': inst_value})
if len(updates) > 0:
messages.append(dict(type='update', updates=updates))
# console messages
instrument.pollconsole()
self.consolepos, msg = instrument.getconsole(self.consolepos, self.id)
messages.extend(msg)
# graph messages
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
def info(self):
return self.group_version.keys()
def w_getblock(self, path):
gobj = instrument.findgroup(path.split(',')[-1])
# self.group_version[path] = gobj.version
# simplify: allow only one group per client
self.group_version = {path: gobj.version}
logging.info('getblock %s %d', path, gobj.version)
return dict(type='draw', title=gobj.grouptitle, path=path, components=gobj.components)
def w_updateblock(self, path):
gobj = instrument.findgroup(path)
logging.info('make active %s', path)
#if not path in self.group_values:
self.group_values[path] = gobj.empty_values.copy()
return dict(type='accept-block')
def w_console(self):
self.consolepos = 0
instrument.console()
return dict(type='accept-console')
def w_sendcommand(self, command):
instrument.command(command, self.id)
return dict(type='accept-command')
class InfluxParams:
"""Class with dummy routes, in case client side is started with the right part init commands"""
def __init__(self):
self.id = uuid.uuid4().hex[0:15]
self.queue = []
def poll(self):
messages = self.queue
self.queue = []
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
def info(self):
return ["na"]
def w_getblock(self, path):
return dict(type='draw', title="graph", path=path, components=[])
def w_updateblock(self, path):
return dict(type='accept-block')
def w_console(self):
return dict(type='accept-console')
def w_sendcommand(self, command):
return dict(type='accept-command')
class SeaClient(SeaParams, SeaGraph):
def __init__(self):
SeaParams.__init__(self)
SeaGraph.__init__(self)
class SeaInfluxClient(SeaParams, InfluxGraph):
def __init__(self):
SeaParams.__init__(self)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
class InfluxClient(InfluxParams, InfluxGraph):
def __init__(self):
InfluxParams.__init__(self)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
class DummyClient(SeaGraph):
asynch = set(('id','update','redraw','command','reply','graph-update','graph-redraw'))
def __init__(self, host_port):
self.linesocket = tcp_lineserver.LineClient(host_port)
self.id = uuid.uuid4().hex[0:15]
self.linesocket.send_line(json.dumps(dict(type='init', id=self.id)))
self.queue = []
self.syncreply = []
SeaGraph.__init__(self)
def cmd_reply(self, command, replytype=None, tmo=5):
self.linesocket.send_line(json.dumps(command))
t = 0
while True:
if self.syncreply:
msg = self.syncreply.pop(0)
break
line = self.linesocket.get_line()
if line != None:
msg = json.loads(line)
if msg['type'] in self.asynch:
t = 0
# print 'PUSH',msg, replytype
self.queue.append(msg)
else:
break
if t >= tmo:
# print 'TIMEOUT'
raise Exception("timeout")
gevent.sleep(0.1)
t += 0.1
if replytype and msg['type'] != replytype:
logging.error('REPLY MISMATCH %s %s <> %s' , command, replytype, msg['type'])
return msg
def w_getblock(self, path):
return self.cmd_reply(dict(type='getblock', path=path, id=self.id), 'draw')
def w_updateblock(self, path):
return self.cmd_reply(dict(type='updateblock', path=path, id=self.id), 'accept-block')
def w_console(self):
return self.cmd_reply(dict(type='console', id=self.id), 'accept-console')
def w_sendcommand(self, command):
return self.cmd_reply(dict(type='sendcommand', command=command, id=self.id), 'accept-command')
def poll(self):
if self.queue:
messages = self.queue
self.queue = []
return messages
line = self.linesocket.get_line()
messages = []
if line:
msg = json.loads(line)
if msg['type'] in self.asynch:
messages.append(msg)
else:
self.syncreply.append(msg)
# graph messages
msg = self.graphpoll()
if msg:
messages.append(msg)
return messages
def info(self):
return ["na"]
class DummyInstrument(Instrument):
def __init__(self, inst_name, instrument_config):
self.instrument_config = instrument_config
self.host_port = hostport_split(instrument_config['hostport'])
self.logger_dir = instrument_config.get('logger_dir', '')
test_day = instrument_config.get('test_day', None)
self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
self.title = inst_name
self.clients = {}
def newClient(self):
return self.register(DummyClient(self.host_port))
class SecopMsg:
def __init__(self, line):
self.par = None
self.value = None
sl = line.split(' ')
if len(sl[0].split(',')) > 1:
self.type = 'idn'
self.value = line
else:
self.type = sl[0]
if len(sl) > 1:
self.par = sl[1]
if len(sl) > 2:
self.value = json.loads(' '.join(sl[2:]))
self.asynch = self.type in ('update', 'error_update')
def __repr__(self):
value = repr(self.value)
if len(value) > 50:
value = value[:50] + '...'
return "SecopMsg('%s %s %s')" % (self.type, self.par, value)
def SecopEncode(cmd, par=None, value=None):
line = cmd
if par:
line += " " + par
if value:
line += " " + json.dumps(value)
return line
def convert_par(module, name, par):
result = dict(type='input', name=module+":"+name, title=name)
if par.get('readonly', True):
result['type'] = 'rdonly'
else:
result['command'] = 'change %s:%s' % (module, name)
if par['datainfo']['type'] == 'enum':
result['enum_names'] = [dict(title=k, value=v) for k, v in par['datainfo']['members'].items()]
result['type'] = 'enum'
elif par['datainfo']['type'] == 'bool':
result['type'] = 'checkbox'
return result
def convert_event(messages):
if isinstance(messages, SecopMsg):
messages = [messages]
updates = []
for msg in messages:
if msg['type'] == 'update':
updates.append(dict(name=msg.par, value=msg.value[0]))
# updates.append(dict(name=msg.par, value=str(msg.value[0])))
return [dict(type='update', updates=updates)]
class SecopClient:
prio_par = ["value", "status", "target"]
hide_par = ["baseclass", "class", "pollinterval"]
skip_par = ["status2"]
def __init__(self, host_port):
self.linesocket = tcp_lineserver.LineClient(host_port)
self.id = uuid.uuid4().hex[0:15]
self.queue = []
self.syncreply = []
self.consolequeue = []
#self.out = open("debug.txt", "w")
#self.out = sys.stdout
self.out = None
idn = self.cmd_reply("*IDN?", "idn")
self.idn = idn.value
self.description = self.cmd_reply("describe", "describing").value
def cmd_reply(self, command, replytype, tmo=5):
self.replytype = replytype
if self.out: self.out.write(">"+command+"\n")
self.consolequeue.append(dict(type='command',line=command,origin='self'))
self.linesocket.send_line(command)
t = 0
while True:
if self.syncreply:
msg = self.syncreply.pop(0)
break
line = self.linesocket.get_line()
if line != None:
self.consolequeue.append(dict(type='reply',line=line,origin='other'))
if self.out: self.out.write("<"+line+"\n")
msg = SecopMsg(line)
#print '<', msg['type'], msg.par
if msg.asynch and replytype != msg['type'] + "=" + msg.par:
t = 0
self.queue.append(msg)
else:
break
if t >= tmo:
#print 'TIMEOUT'
raise Exception("timeout")
gevent.sleep(0.1)
t += 0.1
#print 'REPLY', msg['type'], msg.par, json.dumps(msg.value)[0:50]
if not replytype.startswith(msg['type']):
logging.error('REPLY MISMATCH %s <> %s', replytype, '<>', repr(msg))
self.replytype = ""
return msg
def w_getblock(self, path):
path = path.split(',')[-1]
if path == "main":
components = []
for name, m in self.description["modules"].items():
#components.append(convert_par(name, 'value', m['parameters']['value']))
components.append(dict(type='rdlink', name=name+':value', title=name))
#print components
return dict(type='draw', path='main', title='modules', components=components)
else:
module = self.description['modules'][path]
parameters = module["parameters"]
components = []
for name in SecopClient.skip_par:
if name in parameters:
parameters.pop(name)
for name in SecopClient.prio_par:
if name in parameters:
components.append(convert_par(path, name, parameters.pop(name)))
components1 = []
for name in SecopClient.hide_par:
if name in parameters:
components1.append(convert_par(path, name, parameters.pop(name)))
for name, p in parameters.items():
components.append(convert_par(path, name, parameters[name]))
components.extend(components1)
return dict(type='draw', path=path, title=path, components=components)
def w_updateblock(self, path):
self.cmd_reply("activate", "active")
return dict(type='accept-block')
def w_console(self):
return dict(type='accept-console')
def w_sendcommand(self, command):
#print 'COMMAND', command
cmd = "change " + command
return self.cmd_reply(cmd, 'event ' + command.split(' ')[0])
def poll(self):
if self.consolequeue:
messages = self.consolequeue
self.consolequeue = []
return messages
if self.queue:
messages = convert_event(self.queue)
self.queue = []
return messages
line = self.linesocket.get_line()
if line:
self.consolequeue.append(dict(type='reply',line=line,origin='other'))
if self.out: self.out.write("<"+line+"\n")
msg = SecopMsg(line)
if msg.asynch and self.replytype != msg['type'] + "=" + msg.par:
return convert_event(SecopMsg(line))
self.syncreply.append(msg)
return []
def info(self):
return ["na"]
class SecopInstrument(Instrument):
def __init__(self, inst_name, instrument_config):
self.instrument_config = instrument_config
self.host_port = hostport_split(instrument_config['hostport'])
self.logger_dir = instrument_config.get('logger_dir', '')
test_day = instrument_config.get('test_day', None)
self.test_day = [int(x) for x in test_day.split('-')] if test_day else None
self.title = inst_name
self.clients = {}
def newClient(self):
return self.register(SecopClient(self.host_port))
class Logger(object):
def __init__(self, logpath):
self.terminal = sys.stdout
self.log = open(logpath, "a")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
pass
def handle_pdb(sig, frame):
import pdb
print('PDB')
pdb.Pdb().set_trace(frame)
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, handle_pdb)
print('PID', os.getpid())
if len(sys.argv) > 3:
instrument_config = {}
for arg in sys.argv[1:]:
split = arg.split('=')
instrument_config[split[0]] = '='.join(split[1:])
port = int(instrument_config['port'])
inst_name = instrument_config['instrument']
else:
# take config from instruments.json
try:
port = int(sys.argv[1])
except IndexError:
port = 5000
try:
inst_name = sys.argv[2]
except IndexError:
inst_name = 'seadummy'
with open('instruments.json') as f:
instrument_list = json.load(f)
instrument_config = instrument_list[inst_name]
logging.basicConfig(filename='log/%s.log' % inst_name, filemode='w', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
# sys.stdout = Logger(instrument_config.get('logger_dir', '.') + '/seaweb_stdout.txt')
# print '-' * 80
type = instrument_config.get('type', 'sea')
if type == 'sea':
instrument = SeaInstrument(inst_name, instrument_config)
elif type == 'influxsea':
instrument = SeaInfluxInstrument(inst_name, instrument_config)
elif type == 'influx':
instrument = InfluxInstrument(inst_name)
elif type == 'dummy':
instrument = DummyInstrument(inst_name, instrument_config)
elif type == 'secop':
instrument = SecopInstrument(inst_name, instrument_config)
else:
raise TypeError('bad instrument type')
app.debug = True
#server = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
server = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))
server.serve_forever()
# Then visit http://localhost:5000/test for a test