Files
maloja/camcheck.py

117 lines
3.0 KiB
Python
Executable File

#!/photonics/home/gac-maloja/.conda/envs/mdaq/bin/python
#!/usr/bin/env python
import logging
class DisabledLogger():
def __enter__(self):
logging.disable(logging.CRITICAL)
def __exit__(self, exit_type, exit_value, exit_traceback):
logging.disable(logging.NOTSET)
#from collections import defaultdict
from datetime import datetime, timedelta
from bsread import dispatcher
import data_api as dapi
import epics
from colorama import Fore, Back, Style
true = Fore.GREEN + "" + Style.RESET_ALL
false = Fore.RED + "" + Style.RESET_ALL
cam_names = [
# "DOES-NOT-EXIST",
"SATES21-CAMS154-M1",
"SATES24-CAMS161-M1",
# "SATES21-PATT-M1" # renamed to below
"SATES21-CAMS-PATT1"
]
cam_names += [f"SATES21-CAMS154-GIGE{i+1}" for i in range(8)]
width = max(len(str(i)) for i in cam_names)
fpics = [i + ":FPICTURE" for i in cam_names]
pvs = [epics.get_pv(c) for c in fpics]
pv_states = [pv.wait_for_connection(timeout=0.5) for pv in pvs]
base_url = "https://dispatcher-api.psi.ch/sf-imagebuffer" # needed due to data/image buffer split (Jan 2022)
bs_chans = dispatcher.get_current_channels(base_url)
bs_chans = set(x["name"] for x in bs_chans)
bs_states = [(i in bs_chans) for i in fpics]
#data = defaultdict(list)
#for f in fpics:
# res = dapi.search(f)
# for i in res:
# if not i["channels"]:
# continue
# be = i["backend"]
# data[f].append(be)
# print(f, be)
now = datetime.now()
start = now - timedelta(minutes=1)
end = start + timedelta(seconds=1)
ib_chans = ["sf-imagebuffer/" + i for i in fpics] #TODO does this speed this up?
aggregation = dapi.Aggregation(aggregation_type="value", aggregations=["sum"], nr_of_bins=1)
with DisabledLogger():
try:
base_url = "https://data-api.psi.ch/sf-imagebuffer" # needed due to data/image buffer split (Jan 2022)
print("\nCheck with Simon to get the image buffer channel search fixed!\n")
ib_data = dapi.get_data(channels=ib_chans, start=start, end=end, aggregation=aggregation, base_url=base_url)
except RuntimeError as e:
print("dapi got", e)
ib_data = {}
def get_state(f):
try:
d = ib_data[f + ":sum"]
except KeyError:
try:
d = ib_data[f]
except KeyError:
return False
if len(d) != 1:
print(d)
return not all(d.isna())
ib_states = [get_state(f) for f in fpics]
head = ("name", "PV", "BS", "IB")
bool_width = 2
bool_line = "-" * bool_width
line = ["-" * width] + [bool_line] * 3
res = [head, line]
for cam, pv_state, bs_state, ib_state in zip(cam_names, pv_states, bs_states, ib_states):
pv_state = true if pv_state else false
bs_state = true if bs_state else false
ib_state = true if ib_state else false
new = (cam, pv_state, bs_state, ib_state)
res.append(new)
print()
for name, pv, bs, ib in res:
name = name.ljust(width)
# pv = pv.ljust(bool_width)
# bs = bs.ljust(bool_width)
# ib = ib.ljust(bool_width)
print(name, pv, bs, ib)