first potentially useful version

This commit is contained in:
2020-09-22 12:43:16 +02:00
parent 0245823e43
commit da81973bc0
22 changed files with 289 additions and 267 deletions

0
utils/__init__.py Normal file
View File

44
utils/alarms.py Normal file
View File

@ -0,0 +1,44 @@
STATUS = {
0: "no alarm",
1: "read",
2: "write",
3: "hihi",
4: "high",
5: "lolo",
6: "low",
7: "state",
8: "cos",
9: "comm",
10: "timeout",
11: "hw limit",
12: "calc",
13: "scan",
14: "link",
15: "soft",
16: "bad sub",
17: "udf",
18: "disable",
19: "simm",
20: "read access",
21: "write access"
}
SEVERITY = {
0: "no alarm",
1: "minor",
2: "major",
3: "invalid"
}
def message(status_code, severity_code):
status = STATUS.get(status_code, "unknown")
if status_code == severity_code == 0:
return status
severity = SEVERITY.get(severity_code, "unknown")
return f"{status} ({severity})"

13
utils/colors.py Normal file
View File

@ -0,0 +1,13 @@
from colorama import Fore
RED = Fore.RED
GREEN = Fore.GREEN
YELLOW = Fore.YELLOW
def colored(color, msg):
return color + str(msg) + Fore.RESET

16
utils/consts.py Normal file
View File

@ -0,0 +1,16 @@
from utils import colors
COL_NOT_CONNECTED = colors.RED
COL_SUCCESS = colors.GREEN
COL_ALARM = colors.YELLOW
MSG_NOT_CONNECTED = "did not connect"
MSG_SUCCESS = "OK"
SYM_GOOD = "👍"
SYM_BAD = "💔"

47
utils/df.py Normal file
View File

@ -0,0 +1,47 @@
import numpy as np
import pandas as pd
def compare_dfs(df1, df2):
df = pd.concat((df1, df2))
diff = df.groupby(level=0).agg(report_diff)
drop_empty(diff)
return diff
def report_diff(x):
return "" if equal(*x) else " {} | {}".format(*x) #TODO: color left and right differently
def equal(a, b):
return a == b or (np.isnan(a) and np.isnan(b))
def drop_empty(df):
replace_empty_nan(df)
drop_nan_cols(df)
drop_nan_rows(df)
replace_nan_empty(df)
def replace_empty_nan(df):
df.replace("", np.nan, inplace=True)
def replace_nan_empty(df):
df.replace(np.nan, "", inplace=True)
def drop_nan_cols(df):
df.dropna(axis="columns", how="all", inplace=True)
def drop_nan_rows(df):
df.dropna(axis="index", how="all", inplace=True)
def drop_col(df, name):
df.drop(name, axis="columns", inplace=True)
def count_true(bdf):
good = bdf[bdf]
ngood = len(good)
return ngood

47
utils/epics.py Normal file
View File

@ -0,0 +1,47 @@
import numpy as np
from .alarms import message
from .colors import colored
from .consts import COL_NOT_CONNECTED, COL_SUCCESS, COL_ALARM
from .consts import MSG_NOT_CONNECTED, MSG_SUCCESS
class DataGetter:
def __init__(self, timeout, silent):
self.timeout = timeout
self.silent = silent
def __call__(self, pv):
connected = pv.wait_for_connection(self.timeout)
if not connected:
value = np.nan
status = severity = -1
msg = MSG_NOT_CONNECTED
col = COL_NOT_CONNECTED
else:
value = pv.value
status = pv.status
severity = pv.severity
if status == 0 and severity == 0:
msg = MSG_SUCCESS
col = COL_SUCCESS
else:
msg = message(status, severity)
col = COL_ALARM
data = {
"connected": connected,
"value": value,
"status": status,
"severity": severity
}
if not self.silent:
msg = colored(col, msg)
print(pv.pvname, msg)
return data

20
utils/execute.py Normal file
View File

@ -0,0 +1,20 @@
from concurrent.futures import ThreadPoolExecutor
def parallel(func, targets, names=None):
with ThreadPoolExecutor() as executor:
results = executor.map(func, targets)
if names:
return dict(zip(names, results))
else:
return list(results)
def serial(func, targets, names=None):
if names:
return {n: func(t) for n, t in zip(names, targets)}
else:
return [func(t) for t in targets]

40
utils/fileio.py Normal file
View File

@ -0,0 +1,40 @@
import pandas as pd
def load_csv(fname):
fname = fix_file_ext(fname, "csv")
return pd.read_csv(fname, index_col=0, comment="#", float_precision="high")
def store_csv(df, fname, meta=None):
fname = fix_file_ext(fname, "csv")
with open(fname, "w") as f:
if meta is not None:
f.write(f"# {meta}\n")
df.to_csv(f)
def fix_file_ext(fn, ext):
if not ext.startswith("."):
ext = "." + ext
if not fn.endswith(ext):
fn += ext
return fn
def load_config(fname):
chans = set()
for line in read_lines(fname):
line = remove_comments(line).strip()
if not line:
continue
chans.add(line)
return sorted(chans)
def read_lines(fname):
with open(fname, "r") as f:
yield from f
def remove_comments(line, comment_char="#"):
return line.split(comment_char)[0]

11
utils/printing.py Normal file
View File

@ -0,0 +1,11 @@
from .consts import SYM_GOOD, SYM_BAD
def print_good(*args, **kwargs):
return print(SYM_GOOD, *args, **kwargs)
def print_bad(*args, **kwargs):
return print(SYM_BAD, *args, **kwargs)