Fitting and pneumatic valve
This commit is contained in:
@@ -441,17 +441,17 @@ samstream:
|
||||
readoutPriority: async
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
samimg:
|
||||
description: Sample camera image from EPICS
|
||||
deviceClass: pxiii_bec.devices.NDArrayPreview
|
||||
deviceConfig:
|
||||
prefix: 'X06DA-SAMCAM:image1:'
|
||||
deviceTags:
|
||||
- detector
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
# samimg:
|
||||
# description: Sample camera image from EPICS
|
||||
# deviceClass: pxiii_bec.devices.NDArrayPreview
|
||||
# deviceConfig:
|
||||
# prefix: 'X06DA-SAMCAM:image1:'
|
||||
# deviceTags:
|
||||
# - detector
|
||||
# enabled: true
|
||||
# readoutPriority: async
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
|
||||
|
||||
bstop_pneum:
|
||||
@@ -492,8 +492,8 @@ bstop_z:
|
||||
softwareTrigger: false
|
||||
bstop_pneum:
|
||||
description: Beamstop pneumatic
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', put_complete: true}
|
||||
deviceClass: pxiii_bec.devices.PneumaticValve
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', auto_monitor: true, put_complete: true}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -511,7 +511,7 @@ bstop_diode:
|
||||
frontlight:
|
||||
description: Microscope frontlight
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', put_complete: true}
|
||||
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'normal', put_complete: true}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
@@ -519,8 +519,8 @@ frontlight:
|
||||
softwareTrigger: false
|
||||
backlight:
|
||||
description: Backlight reflector
|
||||
deviceClass: ophyd.EpicsSignal
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', put_complete: true}
|
||||
deviceClass: pxiii_bec.devices.PneumaticValve
|
||||
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', auto_monitor: true, put_complete: true}
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
|
||||
@@ -5,6 +5,7 @@ Created on Tue Jun 11 11:28:38 2024
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
import types
|
||||
from collections import OrderedDict
|
||||
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
|
||||
from ophyd.status import Status, MoveStatus
|
||||
|
||||
@@ -195,6 +196,18 @@ class A3200Axis(PVPositioner):
|
||||
raise
|
||||
return status
|
||||
|
||||
def describe(self):
|
||||
"""Workaround to schema expected by the BEC"""
|
||||
d = super().describe()
|
||||
d[str(self.name)] = d[f"{self.name}_readback"]
|
||||
return d
|
||||
|
||||
def read(self) -> OrderedDict[str, dict]:
|
||||
"""Workaround to schema expected by the BEC"""
|
||||
d = super().read()
|
||||
d[str(self.name)] = d[f"{self.name}_readback"]
|
||||
return d
|
||||
|
||||
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
|
||||
"""Exposes the ophyd move command through BEC abstraction"""
|
||||
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
|
||||
|
||||
43
pxiii_bec/devices/PneumaticValve.py
Normal file
43
pxiii_bec/devices/PneumaticValve.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from ophyd import EpicsSignal
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
|
||||
class PneumaticValve(EpicsSignal):
|
||||
"""Wrapper around EpicsSignal to wait until reaching target
|
||||
|
||||
NOTE: The SET and GET states do not match exactly
|
||||
"""
|
||||
|
||||
def set(self, value, *, timeout=5, settle_time=0.1):
|
||||
"""Overloaded settet that waits for target state"""
|
||||
# Lazy hardcoded state lookup
|
||||
target = 1 if value in (1, "Measure") else 2
|
||||
|
||||
# Define wait until the busy flag goes high
|
||||
def on_target(*, value, **_):
|
||||
return bool(value == target)
|
||||
|
||||
# Subscribe in advance and wait for update
|
||||
status = SubscriptionStatus(self, on_target, timeout=timeout, settle_time=0.1)
|
||||
|
||||
# Set value to start movement
|
||||
super().set(value, settle_time=settle_time).wait()
|
||||
|
||||
return status
|
||||
|
||||
def check_value(self, value):
|
||||
"""Input validation"""
|
||||
if value not in (0, 1, "Measure", "Park"):
|
||||
raise ValueError(f"Unsupported pneumatic valve target {value}")
|
||||
return super().check_value(value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pneum = PneumaticValve(
|
||||
read_pv="X06DA-ES-BS:GET-POS",
|
||||
write_pv="X06DA-ES-BS:SET-POS",
|
||||
auto_monitor=True,
|
||||
put_complete=True,
|
||||
name="bspump",
|
||||
)
|
||||
pneum.wait_for_connection()
|
||||
@@ -10,6 +10,7 @@ import time
|
||||
from threading import Thread, Lock
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter, Retry
|
||||
from collections import OrderedDict
|
||||
from ophyd import Component, Kind, Signal, PVPositioner
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
@@ -209,6 +210,18 @@ class SmarGonAxis(PVPositioner):
|
||||
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
|
||||
return status
|
||||
|
||||
def describe(self):
|
||||
"""Workaround to schema expected by the BEC"""
|
||||
d = super().describe()
|
||||
d[str(self.name)] = d[f"{self.name}_readback"]
|
||||
return d
|
||||
|
||||
def read(self) -> OrderedDict[str, dict]:
|
||||
"""Workaround to schema expected by the BEC"""
|
||||
d = super().read()
|
||||
d[str(self.name)] = d[f"{self.name}_readback"]
|
||||
return d
|
||||
|
||||
def _pos_changed(self, timestamp=None, value=None, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
@@ -11,3 +11,4 @@ from .SmarGonB import SmarGonAxis as SmarGonAxisB
|
||||
from .StdDaqPreview import StdDaqPreviewDetector
|
||||
from .NDArrayPreview import NDArrayPreview
|
||||
from .SamCamDetector import SamCamDetector
|
||||
from .PneumaticValve import PneumaticValve
|
||||
|
||||
@@ -1,58 +1,61 @@
|
||||
from bec_widgets.cli.client_utils import BECGuiClient
|
||||
|
||||
|
||||
def bl_check_beam():
|
||||
"""Check beamline status before scan"""
|
||||
return True
|
||||
|
||||
|
||||
def ascan(
|
||||
motor,
|
||||
scan_start,
|
||||
scan_end,
|
||||
steps,
|
||||
exp_time,
|
||||
datasource,
|
||||
**kwargs
|
||||
):
|
||||
def ascan(motor, scan_start, scan_end, steps, exp_time, datasource, visual=True, **kwargs):
|
||||
"""Demo step scan with plotting
|
||||
|
||||
This is a small BEC user-space demo step scan. It tries to be a
|
||||
This is a simple user-space demo step scan with the BEC. It be a
|
||||
standard BEC scan, while still setting up the environment.
|
||||
|
||||
Example:
|
||||
--------
|
||||
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
|
||||
"""
|
||||
# Dummy method to check beamline status
|
||||
if not bl_check_beam():
|
||||
raise RuntimeError("Beamline is not in ready state")
|
||||
|
||||
# # GUI setup
|
||||
# # Get or create gui
|
||||
gui = bec.gui
|
||||
print(gui.windows)
|
||||
# Use existing main window
|
||||
# window = gui.windows['main'].widget
|
||||
if visual:
|
||||
# Get or create scan specific window
|
||||
window = None
|
||||
for _, val in bec.gui.windows.items():
|
||||
if val.title == "CurrentScan":
|
||||
window = val.widget
|
||||
window.clear_all()
|
||||
if window is None:
|
||||
window = bec.gui.new("CurrentScan")
|
||||
|
||||
# Create scan specific window
|
||||
window = None
|
||||
for _, val in gui.windows.items():
|
||||
if val.title == "CurrentScan":
|
||||
window = val.widget
|
||||
window.clear_all()
|
||||
if window is None:
|
||||
window = gui.new("CurrentScan")
|
||||
|
||||
dock = window.add_dock(f"ScanDisplay {motor}")
|
||||
plt1 = dock.add_widget('BECWaveformWidget')
|
||||
plt1.plot(x_name=motor, y_name=datasource)
|
||||
plt1.set_x_label(motor)
|
||||
plt1.set_y_label(datasource)
|
||||
gui.show()
|
||||
# Draw a simploe plot in the window
|
||||
dock = window.add_dock(f"ScanDisplay {motor}")
|
||||
plt1 = dock.add_widget("BECWaveformWidget")
|
||||
plt1.plot(x_name=motor, y_name=datasource)
|
||||
plt1.set_x_label(motor)
|
||||
plt1.set_y_label(datasource)
|
||||
plt1.add_dap(motor, datasource, dap="LinearModel")
|
||||
window.show()
|
||||
|
||||
print("Handing over to 'scans.line_scan'")
|
||||
if 'relative' in kwargs:
|
||||
del kwargs['relative']
|
||||
scans.line_scan(motor, scan_start, scan_end, steps=steps, exp_time=exp_time, relative=False, **kwargs)
|
||||
if "relative" in kwargs:
|
||||
del kwargs["relative"]
|
||||
s = scans.line_scan(
|
||||
motor, scan_start, scan_end, steps=steps, exp_time=exp_time, relative=False, **kwargs
|
||||
)
|
||||
|
||||
if visual:
|
||||
fit = plt1.get_dap_params()
|
||||
else:
|
||||
fit = bec.dap.LinearModel.fit(s, motor.name, motor.name, datasource.name, datasource.name)
|
||||
|
||||
# fit = bec.dap.LinearModel(motor, datasource)
|
||||
|
||||
# If fitting via GUI
|
||||
# firt_par = plt1.get_dap_params()
|
||||
|
||||
return s, fit
|
||||
|
||||
# # Some basic fit
|
||||
# dkey = datasource.full_name
|
||||
# datapoints = bec.history[-1].devices[dkey].read()[dkey]['value']
|
||||
# positions
|
||||
|
||||
Reference in New Issue
Block a user