Fitting and pneumatic valve

This commit is contained in:
gac-x06da
2025-02-06 18:04:01 +01:00
committed by mohacsi_i
parent 9ea249fff7
commit 963b775200
6 changed files with 126 additions and 53 deletions

View File

@@ -441,17 +441,17 @@ samstream:
readoutPriority: async
readOnly: false
softwareTrigger: false
samimg:
description: Sample camera image from EPICS
deviceClass: pxiii_bec.devices.NDArrayPreview
deviceConfig:
prefix: 'X06DA-SAMCAM:image1:'
deviceTags:
- detector
enabled: true
readoutPriority: async
readOnly: false
softwareTrigger: false
# samimg:
# description: Sample camera image from EPICS
# deviceClass: pxiii_bec.devices.NDArrayPreview
# deviceConfig:
# prefix: 'X06DA-SAMCAM:image1:'
# deviceTags:
# - detector
# enabled: true
# readoutPriority: async
# readOnly: false
# softwareTrigger: false
bstop_pneum:
@@ -492,8 +492,8 @@ bstop_z:
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', put_complete: true}
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -511,7 +511,7 @@ bstop_diode:
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', put_complete: true}
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'normal', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -519,8 +519,8 @@ frontlight:
softwareTrigger: false
backlight:
description: Backlight reflector
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', put_complete: true}
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored

View File

@@ -5,6 +5,7 @@ Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
import types
from collections import OrderedDict
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus
@@ -195,6 +196,18 @@ class A3200Axis(PVPositioner):
raise
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
"""Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)

View File

@@ -0,0 +1,43 @@
from ophyd import EpicsSignal
from ophyd.status import SubscriptionStatus
class PneumaticValve(EpicsSignal):
"""Wrapper around EpicsSignal to wait until reaching target
NOTE: The SET and GET states do not match exactly
"""
def set(self, value, *, timeout=5, settle_time=0.1):
"""Overloaded settet that waits for target state"""
# Lazy hardcoded state lookup
target = 1 if value in (1, "Measure") else 2
# Define wait until the busy flag goes high
def on_target(*, value, **_):
return bool(value == target)
# Subscribe in advance and wait for update
status = SubscriptionStatus(self, on_target, timeout=timeout, settle_time=0.1)
# Set value to start movement
super().set(value, settle_time=settle_time).wait()
return status
def check_value(self, value):
"""Input validation"""
if value not in (0, 1, "Measure", "Park"):
raise ValueError(f"Unsupported pneumatic valve target {value}")
return super().check_value(value)
if __name__ == "__main__":
pneum = PneumaticValve(
read_pv="X06DA-ES-BS:GET-POS",
write_pv="X06DA-ES-BS:SET-POS",
auto_monitor=True,
put_complete=True,
name="bspump",
)
pneum.wait_for_connection()

View File

@@ -10,6 +10,7 @@ import time
from threading import Thread, Lock
import requests
from requests.adapters import HTTPAdapter, Retry
from collections import OrderedDict
from ophyd import Component, Kind, Signal, PVPositioner
from ophyd.status import SubscriptionStatus
@@ -209,6 +210,18 @@ class SmarGonAxis(PVPositioner):
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def _pos_changed(self, timestamp=None, value=None, **kwargs):
pass

View File

@@ -11,3 +11,4 @@ from .SmarGonB import SmarGonAxis as SmarGonAxisB
from .StdDaqPreview import StdDaqPreviewDetector
from .NDArrayPreview import NDArrayPreview
from .SamCamDetector import SamCamDetector
from .PneumaticValve import PneumaticValve

View File

@@ -1,58 +1,61 @@
from bec_widgets.cli.client_utils import BECGuiClient
def bl_check_beam():
"""Check beamline status before scan"""
return True
def ascan(
motor,
scan_start,
scan_end,
steps,
exp_time,
datasource,
**kwargs
):
def ascan(motor, scan_start, scan_end, steps, exp_time, datasource, visual=True, **kwargs):
"""Demo step scan with plotting
This is a small BEC user-space demo step scan. It tries to be a
This is a simple user-space demo step scan with the BEC. It be a
standard BEC scan, while still setting up the environment.
Example:
--------
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
"""
# Dummy method to check beamline status
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
# # GUI setup
# # Get or create gui
gui = bec.gui
print(gui.windows)
# Use existing main window
# window = gui.windows['main'].widget
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Create scan specific window
window = None
for _, val in gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = gui.new("CurrentScan")
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget('BECWaveformWidget')
plt1.plot(x_name=motor, y_name=datasource)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
gui.show()
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
if 'relative' in kwargs:
del kwargs['relative']
scans.line_scan(motor, scan_start, scan_end, steps=steps, exp_time=exp_time, relative=False, **kwargs)
if "relative" in kwargs:
del kwargs["relative"]
s = scans.line_scan(
motor, scan_start, scan_end, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
if visual:
fit = plt1.get_dap_params()
else:
fit = bec.dap.LinearModel.fit(s, motor.name, motor.name, datasource.name, datasource.name)
# fit = bec.dap.LinearModel(motor, datasource)
# If fitting via GUI
# firt_par = plt1.get_dap_params()
return s, fit
# # Some basic fit
# dkey = datasource.full_name
# datapoints = bec.history[-1].devices[dkey].read()[dkey]['value']
# positions