first version of csaxs eps
This commit is contained in:
@@ -1 +1,15 @@
|
||||
############################################################
|
||||
|
||||
|
||||
|
||||
############################################################
|
||||
##################### EPS ##################################
|
||||
############################################################
|
||||
x12saEPS:
|
||||
description: X12SA EPS info and control
|
||||
deviceClass: csaxs_bec.devices.epics.eps.cSAXSEps
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
204
csaxs_bec/devices/epics/eps.py
Normal file
204
csaxs_bec/devices/epics/eps.py
Normal file
@@ -0,0 +1,204 @@
|
||||
|
||||
from ophyd import Device, Component as Cpt, EpicsSignal
|
||||
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# Registry: sections/channels
|
||||
# ---------------------------
|
||||
CHANNELS = {
|
||||
"Valves Frontend": [
|
||||
{"attr": "FEVVPG0000", "label": "FE-VVPG-0000", "pv": "X12SA-FE-VVPG-0000:PLC_OPEN"},
|
||||
{"attr": "FEVVPG1010", "label": "FE-VVPG-1010", "pv": "X12SA-FE-VVPG-1010:PLC_OPEN"},
|
||||
{"attr": "FEVVFV2010", "label": "FE-VVFV-2010", "pv": "X12SA-FE-VVFV-2010:PLC_OPEN"},
|
||||
{"attr": "FEVVPG2010", "label": "FE-VVPG-2010", "pv": "X12SA-FE-VVPG-2010:PLC_OPEN"},
|
||||
],
|
||||
|
||||
"Valves Optics Hutch": [
|
||||
{"attr": "OPVVPG1010", "label": "OP-VVPG-1010", "pv": "X12SA-OP-VVPG-1010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG2010", "label": "OP-VVPG-2010", "pv": "X12SA-OP-VVPG-2010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG3010", "label": "OP-VVPG-3010", "pv": "X12SA-OP-VVPG-3010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG3020", "label": "OP-VVPG-3020", "pv": "X12SA-OP-VVPG-3020:PLC_OPEN"},
|
||||
{"attr": "OPVVPG4010", "label": "OP-VVPG-4010", "pv": "X12SA-OP-VVPG-4010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG5010", "label": "OP-VVPG-5010", "pv": "X12SA-OP-VVPG-5010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG6010", "label": "OP-VVPG-6010", "pv": "X12SA-OP-VVPG-6010:PLC_OPEN"},
|
||||
{"attr": "OPVVPG7010", "label": "OP-VVPG-7010", "pv": "X12SA-OP-VVPG-7010:PLC_OPEN"},
|
||||
],
|
||||
|
||||
"Valves ES Hutch": [
|
||||
{"attr": "ESVVPG1010", "label": "ES-VVPG-1010", "pv": "X12SA-ES-VVPG-1010:PLC_OPEN"},
|
||||
],
|
||||
|
||||
"Shutters Frontend": [
|
||||
{"attr": "FEPSH1", "label": "FE-PSH1-EMLS-0010", "pv": "X12SA-FE-PSH1-EMLS-0010:OPEN"},
|
||||
{"attr": "FESTO1", "label": "FE-STO1-EMLS-0010", "pv": "X12SA-FE-STO1-EMLS-0010:OPEN"},
|
||||
],
|
||||
|
||||
"Shutters Endstation": [
|
||||
{"attr": "ESPSH17010", "label": "OP-PSH1-EMLS-7010", "pv": "X12SA-OP-PSH1-EMLS-7010:OPEN"},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class cSAXSEps(Device):
|
||||
# -------------------------------------------------------------------------
|
||||
# Cpt definitions (AUTO-GENERATED FROM CHANNELS: one per registry entry)
|
||||
# -------------------------------------------------------------------------
|
||||
USER_ACCESS = [
|
||||
"show_all",
|
||||
]
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
# Valves Frontend
|
||||
FEVVPG0000 = Cpt(EpicsSignal, name="FE-VVPG-0000", read_pv="X12SA-FE-VVPG-0000:PLC_OPEN")
|
||||
FEVVPG1010 = Cpt(EpicsSignal, name="FE-VVPG-1010", read_pv="X12SA-FE-VVPG-1010:PLC_OPEN")
|
||||
FEVVFV2010 = Cpt(EpicsSignal, name="FE-VVFV-2010", read_pv="X12SA-FE-VVFV-2010:PLC_OPEN")
|
||||
FEVVPG2010 = Cpt(EpicsSignal, name="FE-VVPG-2010", read_pv="X12SA-FE-VVPG-2010:PLC_OPEN")
|
||||
|
||||
# Valves Optics Hutch
|
||||
OPVVPG1010 = Cpt(EpicsSignal, name="OP-VVPG-1010", read_pv="X12SA-OP-VVPG-1010:PLC_OPEN")
|
||||
OPVVPG2010 = Cpt(EpicsSignal, name="OP-VVPG-2010", read_pv="X12SA-OP-VVPG-2010:PLC_OPEN")
|
||||
OPVVPG3010 = Cpt(EpicsSignal, name="OP-VVPG-3010", read_pv="X12SA-OP-VVPG-3010:PLC_OPEN")
|
||||
OPVVPG3020 = Cpt(EpicsSignal, name="OP-VVPG-3020", read_pv="X12SA-OP-VVPG-3020:PLC_OPEN")
|
||||
OPVVPG4010 = Cpt(EpicsSignal, name="OP-VVPG-4010", read_pv="X12SA-OP-VVPG-4010:PLC_OPEN")
|
||||
OPVVPG5010 = Cpt(EpicsSignal, name="OP-VVPG-5010", read_pv="X12SA-OP-VVPG-5010:PLC_OPEN")
|
||||
OPVVPG6010 = Cpt(EpicsSignal, name="OP-VVPG-6010", read_pv="X12SA-OP-VVPG-6010:PLC_OPEN")
|
||||
OPVVPG7010 = Cpt(EpicsSignal, name="OP-VVPG-7010", read_pv="X12SA-OP-VVPG-7010:PLC_OPEN")
|
||||
|
||||
# Valves ES Hutch
|
||||
ESVVPG1010 = Cpt(EpicsSignal, name="ES-VVPG-1010", read_pv="X12SA-ES-VVPG-1010:PLC_OPEN")
|
||||
|
||||
# Shutters Frontend
|
||||
FEPSH1 = Cpt(EpicsSignal, name="FE-PSH1-EMLS-0010", read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN")
|
||||
FESTO1 = Cpt(EpicsSignal, name="FE-STO1-EMLS-0010", read_pv="X12SA-FE-STO1-EMLS-0010:OPEN")
|
||||
|
||||
# Shutters Endstation
|
||||
ESPSH17010 = Cpt(EpicsSignal, name="OP-PSH1-EMLS-7010", read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN")
|
||||
|
||||
# ------------------------
|
||||
# Status / consistency API
|
||||
# ------------------------
|
||||
def show_all(self):
|
||||
|
||||
"""
|
||||
Print the status of all valves and shutters, grouped by section, driven by CHANNELS.
|
||||
- Green = OPEN (True), Red = CLOSED (False)
|
||||
- Missing attributes are shown as MISSING
|
||||
"""
|
||||
# ANSI colors
|
||||
red = "\x1b[91m"
|
||||
green = "\x1b[92m"
|
||||
white = "\x1b[0m"
|
||||
bold = "\x1b[1m"
|
||||
|
||||
def safe_get_attr_value(attr_name):
|
||||
obj = getattr(self, attr_name, None)
|
||||
if obj is None:
|
||||
return None
|
||||
try:
|
||||
return bool(obj.get())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def render_line(label, is_open):
|
||||
if is_open is None:
|
||||
return f" - {label:<24} {red}MISSING{white}"
|
||||
color = green if is_open else red
|
||||
status = "OPEN" if is_open else "CLOSED"
|
||||
return f" - {label:<24} {color}{status}{white}"
|
||||
|
||||
def section_header(title):
|
||||
print(f"\n{bold}{title}{white}")
|
||||
|
||||
def show_section(title, items):
|
||||
section_header(title)
|
||||
|
||||
values = [(item["label"], safe_get_attr_value(item["attr"])) for item in items]
|
||||
|
||||
# Summary
|
||||
vals_present = [v for (_, v) in values if v is not None]
|
||||
if not vals_present:
|
||||
print(" (no channels found on this device)")
|
||||
else:
|
||||
total = len(vals_present)
|
||||
open_cnt = sum(1 for v in vals_present if v)
|
||||
all_open = (total > 0 and open_cnt == total)
|
||||
if all_open:
|
||||
print(f"{green}All channels in this section are open.{white}")
|
||||
else:
|
||||
print(f"{red}Warning: Not all channels in this section are open. ({open_cnt}/{total} open){white}")
|
||||
|
||||
# Per-channel lines
|
||||
for label, val in values:
|
||||
print(render_line(label, val))
|
||||
|
||||
print("X12SA valve/shutter status")
|
||||
for section, items in CHANNELS.items():
|
||||
show_section(section, items)
|
||||
|
||||
def consistency_report(self, *, verbose=True):
|
||||
"""
|
||||
Checks for:
|
||||
- attributes listed in CHANNELS but missing on the device
|
||||
- duplicate PVs in CHANNELS (should be none in a real system)
|
||||
"""
|
||||
# Missing attributes
|
||||
missing_attrs = []
|
||||
# Duplicate PVs
|
||||
seen_pvs = {}
|
||||
duplicates = []
|
||||
|
||||
for section, items in CHANNELS.items():
|
||||
for it in items:
|
||||
# attr present?
|
||||
if getattr(self, it["attr"], None) is None:
|
||||
missing_attrs.append((section, it["attr"], it["label"], it["pv"]))
|
||||
|
||||
# PV uniqueness
|
||||
pv = it["pv"]
|
||||
if pv in seen_pvs:
|
||||
duplicates.append((pv, seen_pvs[pv], (section, it["attr"], it["label"])))
|
||||
else:
|
||||
seen_pvs[pv] = (section, it["attr"], it["label"])
|
||||
|
||||
if verbose:
|
||||
print("=== Consistency Report ===")
|
||||
if missing_attrs:
|
||||
print("\nMissing attributes on device (define matching Cpt):")
|
||||
for section, attr, label, pv in missing_attrs:
|
||||
print(f" - [{section}] {attr} ({label}) pv={pv}")
|
||||
else:
|
||||
print("\nNo missing attributes.")
|
||||
|
||||
if duplicates:
|
||||
print("\nDuplicate PVs in CHANNELS (fix registry; real system should have none):")
|
||||
for pv, first, dup in duplicates:
|
||||
fsec, fattr, flabel = first
|
||||
dsec, dattr, dlabel = dup
|
||||
print(f" - {pv}\n first: [{fsec}] {fattr} ({flabel})\n second: [{dsec}] {dattr} ({dlabel})")
|
||||
else:
|
||||
print("\nNo duplicate PVs.")
|
||||
|
||||
return {
|
||||
"missing_attrs": missing_attrs,
|
||||
"duplicate_pvs": duplicates,
|
||||
}
|
||||
|
||||
|
||||
# -------------------------------
|
||||
# Developer helper (optional): re-generate Cpt definitions from CHANNELS
|
||||
# -------------------------------
|
||||
def codegen_cpt_definitions():
|
||||
"""Print Cpt(...) definitions for all channels based on CHANNELS."""
|
||||
lines = []
|
||||
lines.append("# === AUTO-GENERATED FROM CHANNELS ===")
|
||||
for section, items in CHANNELS.items():
|
||||
lines.append(f"\n# {section}")
|
||||
for it in items:
|
||||
attr = it["attr"]
|
||||
label = it["label"]
|
||||
pv = it["pv"]
|
||||
lines.append(
|
||||
f'{attr} = Cpt(EpicsSignal, name="{label}", read_pv="{pv}")'
|
||||
)
|
||||
print("\n".join(lines))
|
||||
Reference in New Issue
Block a user