7 Commits

Author SHA1 Message Date
Unknown MX Person
d1844eaeba WIP 2025-03-25 17:42:55 +01:00
gac-x06da
d53ec4c14c WIP 2025-03-21 09:57:25 +01:00
gac-x06da
4091f11b0a WIP 2025-03-21 09:27:29 +01:00
gac-x06da
29ae5c196b WIP 2025-03-14 16:10:16 +01:00
gac-x06da
74521da7b3 Blacking 2025-03-14 12:40:23 +01:00
gac-x06da
a5f844b816 Keyword scans for 2D 2025-03-14 12:32:57 +01:00
gac-x06da
34d4d6ef8c Added AttributeError to updates 2025-03-14 12:32:57 +01:00
6 changed files with 185 additions and 34 deletions

View File

@@ -4,9 +4,22 @@ from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
class PlotUpdate(AutoUpdates):
create_default_dock = True
dock_name = "default_dock"
enabled = True
_scan_msg = None
# def __init__(self, gui: BECDockArea):
# super().__init__(gui)
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
self._default_dock = self.gui.add_dock(self.dock_name)
self._default_dock.add_widget("BECWaveformWidget")
self._default_fig = self._default_dock.widget_list[0]
def do_update(self, msg):
"""Save the original scan message for future use"""
self._scan_msg = msg
@@ -24,39 +37,128 @@ class PlotUpdate(AutoUpdates):
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def keyword_handler(self, info: ScanInfo) -> None:
"""Simple keyword handler
def plot_handler(self, info: ScanInfo) -> None:
"""Simple keyword handler for 'plot'
This simple keyword handler looks for the keyword 'datasource' in the scan arguments.
This simple keyword handler looks for the keyword 'plot' in the scan arguments.
This allows the user to explictly specify the desired data source. Useful for alignment
scans.
"""
print(info.scan_report_devices)
dev_x = info.scan_report_devices[0]
# Keyword lookup for 'plot' and 'fit'
if "kwargs" in self._scan_msg.info:
dev_y = self._scan_msg.info["kwargs"].get("datasource", None)
signals = self._scan_msg.info["kwargs"].get("plot", None)
fit = self._scan_msg.info["kwargs"].get("fit", None)
else:
dev_y = None
if not dev_y:
signals = None
fit = None
if not signals:
return
if isinstance(signals, str):
signals = [signals]
if isinstance(fit, str):
fit = [fit]
# try:
# self.gui.clear_all()
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_dock = self.gui.add_dock(self.dock_name)
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_fig = self._default_dock.add_widget("BECWaveformWidget")
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_fig = self._default_dock.widget_list[0]
# except RPCResponseTimeoutError:
# pass
# # dck1 = self._default_dock
# # dck1.clear_all()
plt1 = self.get_default_figure()
# # Clear figure
# try:
# old_data = yield plt1.get_all_data()
# except RPCResponseTimeoutError:
# old_data = [1,2,3,4,5,6,7,8,9]
# print(old_data)
# for _ in range(len(old_data)):
# try:
# plt1.remove_curve(-1)
# except RPCResponseTimeoutError:
# pass
# except Exception as ex:
# print(f"{ex}\t{type(ex)}")
# clear_all() will throw RPCResponseTimeoutError
# try:
# plt1.clear_all()
# except RPCResponseTimeoutError:
# pass
print(type(plt1), plt1)
print(f"Plotted signals: {signals}")
# plot() will throw RPCResponseTimeoutError
try:
# This will throw RPCResponseTimeoutError
plt1.clear_all()
if len(info.scan_report_devices) == 2:
# 2D plot
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
plt1.plot(
x_name=dev_x,
y_name=dev_y,
z_name=signals[0],
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
z_label=signals[0],
)
elif len(info.scan_report_devices) == 1:
# 1D plot
dev_x = info.scan_report_devices[0]
for sig in signals:
try:
plt1.plot(
x_name=dev_x,
y_name=sig,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=sig,
)
except RPCResponseTimeoutError:
pass
else:
# Default is 1D
dev_x = info.scan_report_devices[0]
for sig in signals:
try:
plt1.plot(
x_name=dev_x,
y_name=sig,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=sig,
)
except RPCResponseTimeoutError:
pass
except RPCResponseTimeoutError:
pass
try:
# TODO: What about 2D scans?
# This will throw RPCResponseTimeoutError
plt1.plot(x_name=dev_x, y_name=dev_y)
except RPCResponseTimeoutError:
pass
try:
# This will throw RPCResponseTimeoutError
plt1.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
except RPCResponseTimeoutError:
pass
# plt1.add_dap(dev_x, dev_y, dap="LinearModel")
if fit is not None and len(fit):
dev_x = info.scan_report_devices[0]
sig = signals[0]
plt1.add_dap(dev_x, sig, dap=fit[0])
def handler(self, info: ScanInfo) -> None:
"""Dock configuration handler"""
@@ -68,4 +170,4 @@ class PlotUpdate(AutoUpdates):
# self.run_grid_scan_update(info)
# return
super().handler(info)
self.keyword_handler(info)
self.plot_handler(info)

View File

@@ -1,3 +1,39 @@
sls_current:
description: SLS current
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'ARS07-DPCT-0100:CURR', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- ring
- fe
readOnly: true
softwareTrigger: false
vg0_press:
description: VG0 pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-VMCC-0000:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
abs_press:
description: Absorber pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-ABS1-VMCC-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
@@ -5,33 +41,41 @@ sldi_cenx:
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizex:
description: FE slit-diaphragm horizontal size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:GAPX'}
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_ceny:
description: FE slit-diaphragm vertical center
deviceClass: ophyd.EpicsMotor
deviceClass: ophyd_devices.EpicsMotorEC
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizey:
description: FE slit-diaphragm vertical size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:GAPY'}
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false

View File

@@ -102,6 +102,8 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
self.parent.array_counter.put(header["frame"], force=True)
self.parent.ndimensions.put(len(header["shape"]), force=True)
self.parent.array_size.put(header["shape"], force=True)
self.parent.array_average.put(np.mean(image), force=True)
# self.parent.array_data.put(data, force=True)
self.parent.shaped_image.put(image, force=True)
@@ -152,6 +154,7 @@ class StdDaqPreviewDetector(PSIDetectorBase):
url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
throttle = Component(Signal, value=0.25, kind=Kind.config)
# Streamed data status
array_average = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
array_counter = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
ndimensions = Component(Signal, kind=Kind.normal, metadata={"write_access": False})
array_size = Component(Signal, kind=Kind.normal, metadata={"write_access": False})

View File

@@ -1,5 +1,6 @@
import bec
import bec_lib.devicemanager.DeviceContainer as dev
# pylint: disable=undefined-variable
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):

View File

@@ -1,5 +1,5 @@
import bec
import bec_lib.devicemanager.DeviceContainer as dev
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
def bl_check_beam():
@@ -13,7 +13,7 @@ def ascan(
scan_end,
steps,
exp_time,
datasource=None,
plot=None,
visual=True,
relative=False,
**kwargs,
@@ -44,10 +44,10 @@ def ascan(
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.plot(x_name=motor, y_name=plot)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
plt1.set_y_label(plot)
plt1.add_dap(motor, plot, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
@@ -57,7 +57,7 @@ def ascan(
scan_end,
steps=steps,
exp_time=exp_time,
datasource=datasource,
plot=plot,
relative=relative,
**kwargs,
)
@@ -68,7 +68,7 @@ def ascan(
else:
# Fitting without GUI
firt_par = bec.dap.LinearModel.fit(
s, motor.name, motor.name, datasource.name, datasource.name
s, motor.name, motor.name, plot.name, plot.name
)
# # Some basic fit

View File

@@ -16,6 +16,7 @@ dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"bec_widgets",
"ophyd_devices",
"std_daq_client",
"rich",