diff --git a/csaxs_bec/device_configs/omny_config.yaml b/csaxs_bec/device_configs/omny_config.yaml index a0ae5c5..b275569 100644 --- a/csaxs_bec/device_configs/omny_config.yaml +++ b/csaxs_bec/device_configs/omny_config.yaml @@ -10,6 +10,17 @@ omny_samples: readOnly: false readoutPriority: baseline ############################################################ +##################### OMNY samples ######################### +############################################################ +omny_vcs: + description: OMNYVCS + deviceClass: csaxs_bec.devices.omny.omny_vcs.OMNYVCS + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline +############################################################ ##################### OMNY dewar ########################### ############################################################ omny_dewar: diff --git a/csaxs_bec/devices/omny/omny_vcs.py b/csaxs_bec/devices/omny/omny_vcs.py new file mode 100644 index 0000000..8f0561b --- /dev/null +++ b/csaxs_bec/devices/omny/omny_vcs.py @@ -0,0 +1,161 @@ +import time +import datetime + +from ophyd import Component as Cpt +from ophyd import Device +from ophyd import DynamicDeviceComponent as Dcpt +from ophyd import EpicsSignal +from prettytable import FRAME, PrettyTable +import numpy as np + +class OMNYVCSError(Exception): + pass + + +class OMNYVCS(Device): + USER_ACCESS = [ + "show_all", + "valves_in_measurement_position", + "help", + ] + SUB_VALUE = "value" + _default_sub = SUB_VALUE + + + + XOMNY_ES1_EXPMP1_VOLTAGE = Cpt( + EpicsSignal, name="XOMNY_ES1_EXPMP1_VOLTAGE", read_pv="XOMNY-ES1-EXPMP1:VOLTAGE" + ) + + XOMNY_ES1_EXPMP1_PRESSURE = Cpt( + EpicsSignal, name="XOMNY_ES1_EXPMP1_PRESSURE", read_pv="XOMNY-ES1-EXPMP1:PRESSURE" + ) + + XOMNY_ES1_PU2MT1_VOLTAGE = Cpt( + EpicsSignal, name="XOMNY_ES1_PU2MT1_VOLTAGE", read_pv="XOMNY-ES1-PU2MT1:VOLTAGE" + ) + + XOMNY_ES1_PU2MT1_PRESSURE = Cpt( + EpicsSignal, name="XOMNY_ES1_PU2MT1_PRESSURE", read_pv="XOMNY-ES1-PU2MT1:PRESSURE" + ) + + XOMNY_ES1_PU1MF1_VOLTAGE = Cpt( + EpicsSignal, name="XOMNY_ES1_PU1MF1_VOLTAGE", read_pv="XOMNY-ES1-PU1MF1:VOLTAGE" + ) + + XOMNY_ES1_PU1MF1_PRESSURE = Cpt( + EpicsSignal, name="XOMNY_ES1_PU1MF1_PRESSURE", read_pv="XOMNY-ES1-PU1MF1:PRESSURE" + ) + + XOMNY_ES1_LL1MF1_VOLTAGE = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1MF1_VOLTAGE", read_pv="XOMNY-ES1-LL1MF1:VOLTAGE" + ) + + XOMNY_ES1_LL1MF1_PRESSURE = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1MF1_PRESSURE", read_pv="XOMNY-ES1-LL1MF1:PRESSURE" + ) + + XOMNY_ES1_VV1MT1_VOLTAGE = Cpt( + EpicsSignal, name="XOMNY_ES1_VV1MT1_VOLTAGE", read_pv="XOMNY-ES1-VV1MT1:VOLTAGE" + ) + + XOMNY_ES1_VV1MT1_PRESSURE = Cpt( + EpicsSignal, name="XOMNY_ES1_VV1MT1_PRESSURE", read_pv="XOMNY-ES1-VV1MT1:PRESSURE" + ) + + XOMNY_ES1_EXPVG1_POSITION = Cpt( + EpicsSignal, name="XOMNY_ES1_EXPVG1_POSITION", read_pv="XOMNY-ES1-EXPVG1:POSITION" + ) + + XOMNY_ES1_WI2VG1_POSITION = Cpt( + EpicsSignal, name="XOMNY_ES1_WI2VG1_POSITION", read_pv="XOMNY-ES1-WI2VG1:POSITION" + ) + + XOMNY_ES1_EXPVG2_POSITION = Cpt( + EpicsSignal, name="XOMNY_ES1_EXPVG2_POSITION", read_pv="XOMNY-ES1-EXPVG2:POSITION" + ) + + XOMNY_ES1_LL1VG1_CLOSE = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1VG1_CLOSE", read_pv="XOMNY-ES1-LL1VG1:CLOSE" + ) + + XOMNY_ES1_LL1DK_VG_CLOSED = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1DK_VG_CLOSED", read_pv="XOMNY-ES1-LL1DK:VG_CLOSED" + ) + + XOMNY_ES1_LL1SH_VG_CLOSED = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1SH_VG_CLOSED", read_pv="XOMNY-ES1-LL1SH:VG_CLOSED" + ) + + XOMNY_ES1_LL1SH_MAN_POS_OK = Cpt( + EpicsSignal, name="XOMNY_ES1_LL1SH_MAN_POS_OK", read_pv="XOMNY-ES1-LL1SH:MAN_POS_OK" + ) + + + def __init__(self, prefix="", *, name, **kwargs): + super().__init__(prefix, name=name, **kwargs) + self.XOMNY_ES1_LL1SH_MAN_POS_OK.subscribe(self._emit_value) + + def _emit_value(self, **kwargs): + timestamp = kwargs.pop("timestamp", time.time()) + self.wait_for_connection() + self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) + + + def show_all(self): + red = "\x1b[91m" + white = "\x1b[0m" + + print("OMNY Vaccum Status") + if float(self.XOMNY_ES1_EXPMP1_VOLTAGE.get()) < 0.5: + print(red + " Main chamber: Sensor failure" + white) + else: + print(f" Main chamber: {float(self.XOMNY_ES1_EXPMP1_PRESSURE.get()):.2e} mbar") + + if float(self.XOMNY_ES1_PU2MT1_VOLTAGE.get()) < 0.5: + print(red + " Flight tube: Sensor failure" + white) + else: + print(f" Flight tube: {float(self.XOMNY_ES1_PU2MT1_PRESSURE.get()):.2e} mbar") + + if float(self.XOMNY_ES1_PU1MF1_VOLTAGE.get()) < 0.5: + print(red+" Beamline: Sensor failure"+white) + else: + print(f" Beamline: {float(self.XOMNY_ES1_PU1MF1_PRESSURE.get()):.2e} mbar") + + if float(self.XOMNY_ES1_LL1MF1_VOLTAGE.get()) < 0.5: + print(red+" LoadLock: Sensor failure"+white) + else: + print(f" LoadLock: {float(self.XOMNY_ES1_LL1MF1_PRESSURE.get()):.2e} mbar") + + if float(self.XOMNY_ES1_VV1MT1_VOLTAGE.get()) < 0.5: + print(red+" Pre-pump: Sensor failure"+white) + else: + print(f" Pre-pump: {float(self.XOMNY_ES1_VV1MT1_PRESSURE.get()):.2e} mbar") + + print("\nValve status") + print(f" Upstream gate valve: {str(self.XOMNY_ES1_EXPVG1_POSITION.get())}") +#printf (" Upstream window valve: %s\n", _ovcs_upstream_window_valve) + print(f" Downstream window bypass: {str(self.XOMNY_ES1_WI2VG1_POSITION.get())}") + print(f" Downstream gate valve: {str(self.XOMNY_ES1_EXPVG2_POSITION.get())}") + if self.valves_in_measurement_position(): + print("The OMNY valves are in the correct configuration to perform a measurement") + else: + print(red+"The valves of the OMNY vacuum system are not in the state for measurements."+white) + + + print("\nLoad Lock Status") + print(f" Chamber: {str(self.XOMNY_ES1_LL1VG1_CLOSE.get())}") + print(f" Dock: {str(self.XOMNY_ES1_LL1DK_VG_CLOSED.get())}") + print(f" Shuttle: {str(self.XOMNY_ES1_LL1SH_VG_CLOSED.get())}") + print(f" Manipulator status: {str(self.XOMNY_ES1_LL1SH_MAN_POS_OK.get())}") + + def valves_in_measurement_position(self): + if str(self.XOMNY_ES1_EXPVG1_POSITION.get()) == "OPEN" and str(self.XOMNY_ES1_WI2VG1_POSITION.get()) == "CLOSED" and str(self.XOMNY_ES1_EXPVG2_POSITION.get()) == "OPEN": + return True + else: + return False + + + def help(self): + print("Help for OMNY Vacuum System:") + print("This device shows an overview of the OMNY vacuum status. Use show_all()") \ No newline at end of file