Files
cristallina_analysis_package/src/cristallina/SEA_GraphClient.py

192 lines
6.0 KiB
Python

"""client for SEA GraphServer
Usage:
# open a client to host 'samenv', port 8764
# the port number for the graph server may be retrived by
# the command 'sea list' on the samenv machine
client = GraphClient('samenv:8764')
# get one curve
tlist, vlist, period = client.get_curves(start, end, name)
or
# get all important curves
curves = client.get_curves(start, end)
# where:
start, end: interval (unix time, as retrieved from time.time())
name: the name of a curve (if no name or a list of names
is given, the result is a curves dict)
curves: dict <name> of [tlist, vlist, period]
tlist: time axis (unix time)
vlist: values (y-axis)
period: the expected resolution (a hint for graphic clients,
saying that for a time step t(n) - t(n-1) significantly
bigger than period, an additional point should be added
at t(n) - period)
"""
import socket
import time
FINISH = b'\nTRANSACTIONFINISHED'
START = b'TRANSACTIONSTART'
FMIN = min(FINISH[1:])
FMAX = max(FINISH[1:])
def expect_reply(sock, expected):
while expected:
got = sock.recv(8192)
if not expected.startswith(got):
raise ValueError('expected %r but got %r' % (expected, got))
expected = expected[len(got):]
def raw_sics_client(hostport, login):
if ':' in hostport:
host, port = hostport.split(':')
hostport = (host, int(port))
sock = socket.create_connection(hostport, timeout=3)
bbuf = b''
expect_reply(sock, b'OK\n')
sock.sendall(login.encode('latin-1') + b'\n')
expect_reply(sock, b'Login OK\n')
request = yield None
while True:
sock.sendall(b'fulltransact %s\n' % request.encode('latin-1'))
try:
reply = sock.recv(8192)
if not reply:
sock.close()
return
except socket.timeout:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
raise
before, tag, after = reply.partition(FINISH)
if tag:
result = bbuf + before
elif FMIN <= reply[0] <= FMAX: # the FINISH tag may have been cut
bbuf, tag, after = (bbuf + reply).partition(FINISH)
if not tag:
continue
result = bbuf
else:
bbuf += before
continue
bbuf = after[1:]
before, tag, result = result.rpartition(START)
if tag:
before, nl, result = result.partition(b'\n')
try:
request = yield result.decode('latin-1')
except GeneratorExit:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
return
def sics_client(hostport, command=None, login='Spy 007'):
sics = raw_sics_client(hostport, login)
next(sics)
if command is None:
return sics
result = sics.send(command)
sics.close()
return result
class GraphClient:
def __init__(self, hostport):
self.sc = sics_client(hostport)
def close(self):
self.sc.close()
def get_raw(self, start, end, *args):
"""get raw curves (values as text)"""
arglist = ' '.join(args)
try:
reply = self.sc.send(f'graph {start} {end} {arglist}')
except StopIteration:
raise ConnectionError('connection closed')
lines = reply.split('\n')
curve = None
result = {}
t = 0
for line in lines[1:]: # skip first line
if line.startswith('*'):
spl = line[1:].split()
key = spl[0]
if key in ('0', '1'):
break
tlist = []
vlist = []
curve = [tlist, vlist, 1]
if len(spl) >= 3 and spl[1] == 'period':
curve[2] = float(spl[2])
result[key] = curve
t = 0
else:
tdif, _, value = line.partition(' ')
try:
t += float(tdif)
except ValueError:
print(lines)
tlist.append(t)
vlist.append(value)
return result
def get_names(self, start, end=None):
"""get names and properties of curves configured to be display on SEA GUI graphics"""
end = start if end is None else end
result = self.get_raw(start, end, 'text', 'vars')
curves = {}
for vlist in result['vars'][1]: # text values
for item in vlist.split():
item = item.split('|')
curves[item[0]] = item[1:] + [''] * (4 - len(item))
return curves
def get_curves(self, start, end, name=None, none_value=None, nmax=None):
"""get curves
start, end: interval (unix time, as retrieved from time.time())
non positive values are taken relative to the current time
name: a single name or a list of names or None to get all curves (as shown in the SEA GUI)
none_value: replacement when no value is defined
nmax: max. number of points per curve
when name is a string, returns [tlist, vlist, period]
when name is None or a list of strings (names) returns a dict <name> of [tlist, vlist, period]
tlist: time axis (unix time)
vlist: values (y-axis)
period: the expected resolution (a hint for graphic clients)
"""
if isinstance(name, str):
names = [name]
elif name is None:
names = self.get_names(start, end)
else: # assume names is a list of strings
names = name
args = ['np', str(nmax)] + names if nmax else names
result = self.get_raw(start, end, *args)
for key, curve in result.items():
vlist = curve[1]
for i, v in enumerate(vlist):
try:
vlist[i] = float(v)
except ValueError:
vlist[i] = none_value
return result[name] if isinstance(name, str) else result