WIP
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
|
||||
|
||||
|
||||
class PlotUpdate(AutoUpdates):
|
||||
create_default_dock = True
|
||||
enabled = True
|
||||
_scan_msg = None
|
||||
|
||||
def do_update(self, msg):
|
||||
"""Save the original scan message for future use"""
|
||||
@@ -23,7 +25,7 @@ class PlotUpdate(AutoUpdates):
|
||||
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
|
||||
def keyword_handler(self, info: ScanInfo) -> None:
|
||||
""" Simple keyword handler
|
||||
"""Simple keyword handler
|
||||
|
||||
This simple keyword handler looks for the keyword 'datasource' in the scan arguments.
|
||||
This allows the user to explictly specify the desired data source. Useful for alignment
|
||||
@@ -36,26 +38,28 @@ class PlotUpdate(AutoUpdates):
|
||||
dev_y = None
|
||||
if not dev_y:
|
||||
return
|
||||
|
||||
|
||||
plt1 = self.get_default_figure()
|
||||
try:
|
||||
# This will throw RPCResponseTimeoutError
|
||||
plt1.clear_all()
|
||||
except Exception:
|
||||
except RPCResponseTimeoutError:
|
||||
pass
|
||||
try:
|
||||
# TODO: What about 2D scans?
|
||||
# This will throw RPCResponseTimeoutError
|
||||
plt1.plot(x_name=dev_x, y_name=dev_y)
|
||||
except Exception:
|
||||
except RPCResponseTimeoutError:
|
||||
pass
|
||||
try:
|
||||
# This will throw RPCResponseTimeoutError
|
||||
plt1.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
|
||||
except Exception:
|
||||
except RPCResponseTimeoutError:
|
||||
pass
|
||||
# plt1.add_dap(dev_x, dev_y, dap="LinearModel")
|
||||
|
||||
def handler(self, info: ScanInfo) -> None:
|
||||
"""Dock configuration handler"""
|
||||
# EXAMPLES:
|
||||
# if info.scan_name == "line_scan" and info.scan_report_devices:
|
||||
# self.simple_line_scan(info)
|
||||
@@ -65,4 +69,3 @@ class PlotUpdate(AutoUpdates):
|
||||
# return
|
||||
super().handler(info)
|
||||
self.keyword_handler(info)
|
||||
|
||||
|
||||
@@ -98,8 +98,8 @@ class AerotechAbrMixin(CustomPrepare):
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("verticallinescan", "vlinescan"):
|
||||
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
|
||||
d["var_1"] = scanargs["range"] / scanargs["steps"]
|
||||
@@ -109,8 +109,8 @@ class AerotechAbrMixin(CustomPrepare):
|
||||
d["var_5"] = 0
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("screeningscan"):
|
||||
d["scan_command"] = AbrCmd.SCREENING
|
||||
d["var_1"] = scanargs["start"]
|
||||
@@ -120,8 +120,8 @@ class AerotechAbrMixin(CustomPrepare):
|
||||
d["var_5"] = scanargs["steps"]
|
||||
d["var_6"] = scanargs.get("delta", 0.5)
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
if scanname in ("rasterscan", "rastersimplescan"):
|
||||
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
d["var_1"] = scanargs["exp_time"]
|
||||
@@ -131,8 +131,8 @@ class AerotechAbrMixin(CustomPrepare):
|
||||
d["var_5"] = scanargs["steps_y"]
|
||||
d["var_6"] = 0
|
||||
d["var_7"] = 0
|
||||
d["var_8"] = 0
|
||||
d["var_9"] = 0
|
||||
# d["var_8"] = 0
|
||||
# d["var_9"] = 0
|
||||
|
||||
# Reconfigure if got a valid scan config
|
||||
if len(d) > 0:
|
||||
|
||||
@@ -13,7 +13,8 @@ Created on Thu Jan 30 2025
|
||||
"""
|
||||
from ophyd import ADComponent
|
||||
from ophyd_devices.devices.areadetector.cam import GenICam
|
||||
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
|
||||
# from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
PSIDetectorBase,
|
||||
CustomDetectorMixin,
|
||||
@@ -25,6 +26,8 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
class SamCamSetup(CustomDetectorMixin):
|
||||
"""Simple camera mixin class, the SAMCAM is usually streaming"""
|
||||
|
||||
def on_stage(self):
|
||||
"""Just make sure it's running continously"""
|
||||
self.parent.cam.acquire.put(1, wait=True)
|
||||
@@ -40,7 +43,7 @@ class SamCamDetector(PSIDetectorBase):
|
||||
"""Sample camera device
|
||||
|
||||
The SAMCAM continously streams images to the GUI and sample alignment
|
||||
scripts via ZMQ.
|
||||
scripts via ZMQ.
|
||||
"""
|
||||
|
||||
custom_prepare_cls = SamCamSetup
|
||||
|
||||
@@ -8,9 +8,8 @@ The SmarGon axes are interfaced as positioners.
|
||||
|
||||
import time
|
||||
import threading
|
||||
from threading import Thread, Lock
|
||||
import requests
|
||||
from collections import OrderedDict
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter, Retry
|
||||
from ophyd import Component, Kind, Signal, PVPositioner
|
||||
from ophyd.status import SubscriptionStatus
|
||||
@@ -27,7 +26,7 @@ except ModuleNotFoundError:
|
||||
|
||||
# SmarGon contoller can't really handle multiple connections
|
||||
# Use this mutex to ensure one access at a time
|
||||
mutex = Lock()
|
||||
mutex = threading.Lock()
|
||||
|
||||
|
||||
class LimitedSmarGonSignal(Signal):
|
||||
|
||||
@@ -5,5 +5,4 @@ from .mx_measurements import (
|
||||
MeasureScreening,
|
||||
MeasureHelical,
|
||||
MeasureHelical2,
|
||||
|
||||
)
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
import bec
|
||||
import bec_lib.devicemanager.DeviceContainer as dev
|
||||
|
||||
|
||||
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):
|
||||
"""Demo step scan with plotting
|
||||
|
||||
@@ -58,6 +62,7 @@ def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visua
|
||||
s, motor.name, motor.name, datasource.name, datasource.name
|
||||
)
|
||||
|
||||
# Move to fitted maximum
|
||||
# TODO: Validate fitted position
|
||||
# TODO: Move to fitted maximum
|
||||
|
||||
return s, firt_par
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
import bec
|
||||
import bec_lib.devicemanager.DeviceContainer as dev
|
||||
|
||||
|
||||
def bl_check_beam():
|
||||
"""Check beamline status before scan"""
|
||||
return True
|
||||
@@ -59,23 +63,18 @@ def ascan(
|
||||
)
|
||||
|
||||
if visual:
|
||||
# If fitting via GUI
|
||||
# Fitting via GUI
|
||||
firt_par = plt1.get_dap_params()
|
||||
return s, firt_par
|
||||
else:
|
||||
# Fitting without GUI
|
||||
firt_par = bec.dap.LinearModel.fit(
|
||||
s, motor.name, motor.name, datasource.name, datasource.name
|
||||
)
|
||||
return s, firt_par
|
||||
# fit = bec.dap.LinearModel(motor, datasource)
|
||||
|
||||
# If fitting via GUI
|
||||
# firt_par = plt1.get_dap_params()
|
||||
|
||||
# return s, firt_par
|
||||
# #return s
|
||||
|
||||
# # Some basic fit
|
||||
# dkey = datasource.full_name
|
||||
# NOTE: s.scan.data == bec.history[-1]
|
||||
# datapoints = bec.history[-1].devices[dkey].read()[dkey]['value']
|
||||
# positions
|
||||
|
||||
return s, firt_par
|
||||
|
||||
Reference in New Issue
Block a user