diff --git a/csaxs_bec/device_configs/flomni_config.yaml b/csaxs_bec/device_configs/flomni_config.yaml index 0e39c95..0d1ddc3 100644 --- a/csaxs_bec/device_configs/flomni_config.yaml +++ b/csaxs_bec/device_configs/flomni_config.yaml @@ -431,6 +431,17 @@ flomni_temphum: onFailure: buffer readOnly: false readoutPriority: baseline +# ############################################################ +# ########## OMNY / flOMNI / LamNI fast shutter ############## +# ############################################################ +omnyfsh: + description: omnyfsh connects to read fast shutter at X12 if in that network + deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter + deviceConfig: {} + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: baseline ############################################################ #################### GUI Signals ########################### ############################################################ @@ -441,4 +452,13 @@ omny_xray_gui: enabled: true onFailure: buffer readOnly: false - readoutPriority: on_request \ No newline at end of file + readoutPriority: on_request + +calculated_signal: + description: Calculated signal from alignment for fit + deviceClass: ophyd_devices.ComputedSignal + deviceConfig: + compute_method: "def just_rand():\n return 42" + enabled: true + readOnly: false + readoutPriority: baseline \ No newline at end of file diff --git a/csaxs_bec/devices/omny/shutter.py b/csaxs_bec/devices/omny/shutter.py new file mode 100644 index 0000000..3ffd7e6 --- /dev/null +++ b/csaxs_bec/devices/omny/shutter.py @@ -0,0 +1,82 @@ +import time +import socket +from ophyd import Component as Cpt +from ophyd import Device +from ophyd import EpicsSignal + + +class OMNYFastEpicsShutterError(Exception): + pass + + +def _detect_host_pv(): + """Detect host subnet and return appropriate PV name.""" + try: + hostname = socket.gethostname() + local_ip = socket.gethostbyname(hostname) + if local_ip.startswith("129.129.122."): + return "X12SA-ES1-TTL:OUT_01" + else: + return "XOMNYI-XEYE-DUMMYSHUTTER:0" + except Exception as ex: + print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.") + return "XOMNYI-XEYE-DUMMYSHUTTER:0" + + +class OMNYFastEpicsShutter(Device): + """ + Fast EPICS shutter with automatic PV selection based on host subnet. + """ + + USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"] + SUB_VALUE = "value" + _default_sub = SUB_VALUE + + # PV is detected dynamically at import time + shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True) + + def __init__(self, prefix="", *, name, **kwargs): + super().__init__(prefix, name=name, **kwargs) + self.shutter.subscribe(self._emit_value) + + def _emit_value(self, **kwargs): + timestamp = kwargs.pop("timestamp", time.time()) + self.wait_for_connection() + self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) + + # ----------------------------------------------------- + # User-facing shutter control functions + # ----------------------------------------------------- + + def fshopen(self): + """Open the fast shutter.""" + try: + self.shutter.put(1, wait=True) + except Exception as ex: + raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}") + + def fshclose(self): + """Close the fast shutter.""" + try: + self.shutter.put(0, wait=True) + except Exception as ex: + raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}") + + def fshstatus(self): + """Return the fast shutter status (0=closed, 1=open).""" + try: + return self.shutter.get() + except Exception as ex: + raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}") + + def fshinfo(self): + """Print information about which EPICS PV channel is being used.""" + pvname = self.shutter.pvname + print(f"Fast shutter connected to EPICS channel: {pvname}") + return pvname + + def help(self): + """Display available user methods.""" + print("Available methods:") + for method in self.USER_ACCESS: + print(f" - {method}")