chapman start

This commit is contained in:
2025-07-14 22:34:09 +02:00
parent 9dce2eb353
commit 07e73cb472
14 changed files with 5483 additions and 1186 deletions
+275 -45
View File
@@ -4,12 +4,15 @@ import shutil
from threading import Thread
import time
import traceback
import colorama
import numpy as np
import requests
from pathlib import Path
from time import sleep
from eco.utilities import NumpyEncoder
from eco.elements.protocols import Adjustable
from eco.utilities.utilities import foo_get_kwargs
from ..epics.detector import DetectorPvDataStream
from ..epics.utilities_epics import Monitor
from epics import PV
@@ -18,7 +21,7 @@ from ..elements.assembly import Assembly
from ..utilities.path_alias import PathAlias
import inputimeout
from IPython import get_ipython
from os.path import relpath
class Daq(Assembly):
def __init__(
@@ -39,6 +42,9 @@ class Daq(Assembly):
rate_multiplicator=None,
name=None,
namespace=None,
checker=None,
run_table=None,
pulse_picker=None,
elog = None,
):
super().__init__(name=name)
@@ -74,15 +80,38 @@ class Daq(Assembly):
self._detectors_event_code = detectors_event_code
self.name = name
self.namespace = namespace
self.checker = checker
self.run_table=run_table
self.pulse_picker = pulse_picker
self._default_file_path = None
if not rate_multiplicator == "auto":
print(
"warning: rate multiplicator automatically determined from event_master!"
)
self.callbacks_start_scan = [self.check_counters_for_scan, self.append_start_status_to_scan, self.count_run_number_up_and_attach_to_scan, self.scan_message_to_elog, self.append_scan_monitors]
self.callbacks_start_step = [self.copy_aliases_to_scan]
self.callbacks_end_step = [self.copy_scan_info_to_raw]
self.callbacks_end_scan = [self.append_status_to_scan_and_store, self.copy_scan_info_to_raw, self.end_scan_monitors]
self.callbacks_start_scan = [
self.check_counters_for_scan,
self.init_namespace,
self.append_start_status_to_scan,
self.count_run_number_up_and_attach_to_scan,
self.scan_message_to_elog,
self._create_runtable_metadata_append_status_to_runtable,
self.append_scan_monitors]
self.callbacks_start_step = [
self.copy_aliases_to_scan,
self.check_checker_before_step,
self.pulse_picker_action_start_step,
]
self.callbacks_step_counting = []
self.callbacks_end_step = [
self.pulse_picker_action_end_step,
self.copy_scan_info_to_raw,
self.check_checker_after_step
]
self.callbacks_end_scan = [
self.append_status_to_scan_and_store,
self.copy_scan_info_to_raw,
self.end_scan_monitors,
]
self.elog = elog
@property
@@ -112,14 +141,14 @@ class Daq(Assembly):
if scan:
acq_pars = {
"scan_info": {
"scan_name": scan.description,
"scan_name": scan.description(),
"scan_values": scan.values_current_step,
"scan_readbacks": scan.readbacks_current_step,
"scan_step_info": {
"step_number": scan.next_step + 1,
},
"name": [adj.name for adj in scan.adjustables],
"expected_total_number_of_steps": scan.number_of_steps,
"expected_total_number_of_steps": scan.number_of_steps(),
},
"run_number": scan.daq_run_number,
"user_tag": "usertag",
@@ -165,7 +194,28 @@ class Daq(Assembly):
stop_id=self.running[ix]["start_id"] + Npulses - 1, acq_ix=ix, wait=wait
)
def start(self, label=None, **kwargs):
def start(self, label=None, scan=None, **kwargs):
if scan:
acq_pars = {
"scan_info": {
"scan_name": scan.description(),
"scan_values": scan.values_current_step,
"scan_readbacks": scan.readbacks_current_step,
"scan_step_info": {
"step_number": scan.next_step + 1,
},
"name": [adj.name for adj in scan.adjustables],
"expected_total_number_of_steps": scan.number_of_steps(),
},
"run_number": scan.daq_run_number,
"user_tag": "usertag",
}
kwargs.update(acq_pars)
kwargs['channels_JF'] = kwargs.get('channels_JF',self.channels["channels_JF"].get_current_value())
kwargs['channels_BS'] = kwargs.get('channels_BS',self.channels["channels_BS"].get_current_value())
kwargs['channels_BSCAM'] = kwargs.get('channels_BSCAM',self.channels["channels_BSCAM"].get_current_value())
kwargs['channels_CA'] = kwargs.get('channels_CA',self.channels["channels_CA"].get_current_value())
starttime_local = time.time()
while self.pulse_id._pv.get_timevars() is None:
time.sleep(0.02)
@@ -178,6 +228,8 @@ class Daq(Assembly):
}
acq_pars.update(kwargs)
self.running.append(acq_pars)
if scan:
scan.daq_current_acquisition_index = self.running.index(acq_pars)
return self.running.index(acq_pars)
def stop(
@@ -187,19 +239,68 @@ class Daq(Assembly):
label=None,
wait=True,
wait_cycle_sleep=0.01,
scan=None,
):
if not stop_id:
stop_id = int(self.pulse_id.get_current_value())
if scan:
acq_ix = scan.daq_current_acquisition_index
if not acq_ix:
acq_ix = -1
acq_pars = self.running.pop(acq_ix)
acq_pars["stop_id"] = stop_id
label = acq_pars.pop("label")
# if scan:
# tmp = scan.info()
# tmp['daq_pars'] = acq_pars
# scan.info()
if wait:
while int(self.pulse_id.get_current_value()) < stop_id:
sleep(wait_cycle_sleep)
return self.retrieve(**acq_pars)
response = self.retrieve(**acq_pars)
# print(response)
if scan and not scan.daq_run_number==int(response["run_number"]):
raise Exception(
f"Run number mismatch: scan {scan.daq_run_number} != response {int(response['run_number'])}"
)
# correct file names to relative paths
if scan:
run_directory = list(
Path(f"/sf/bernina/data/{self.pgroup}/raw").glob(f"run{scan.daq_run_number:04d}*")
)[0].as_posix()
response['files'] = [relpath(file, run_directory) for file in response['files']]
return response
# if scan:
# response = self.acquire_pulses(
# Npulses,
# # directory_relative=Path(file_name).parents[0],
# wait=True,
# channels_JF=self.channels["channels_JF"].get_current_value(),
# channels_BS=self.channels["channels_BS"].get_current_value(),
# channels_BSCAM=self.channels["channels_BSCAM"].get_current_value(),
# channels_CA=self.channels["channels_CA"].get_current_value(),
# **acq_pars,
# )
# acquisition.acquisition_kwargs.update({"file_names": response["files"]})
# if scan and not scan.daq_run_number==int(response["run_number"]):
# raise Exception(
# f"Run number mismatch: scan {scan.daq_run_number} != response {int(response['run_number'])}"
# )
# for key, val in acquisition.acquisition_kwargs.items():
# acquisition.__dict__[key] = val
def retrieve(
self,
@@ -356,8 +457,37 @@ class Daq(Assembly):
json={"pgroup": pgroup, "run_number": run_number, "files": file_names},
)
def pulse_picker_action_start_step(
self,
scan,
do_pulse_picker_action=True,
**kwargs,
):
if not self.pulse_picker:
return
if not do_pulse_picker_action:
return
self.pulse_picker.open(verbose=False)
def pulse_picker_action_end_step(
self,
scan,
do_pulse_picker_action=True,
**kwargs,
):
if not self.pulse_picker:
return
if not do_pulse_picker_action:
return
self.pulse_picker.close(verbose=False)
def check_counters_for_scan(
self, scan, channels_to_check=["channels_BSCAM", "channels_JF"], timeout=3
self, scan, channels_to_check=["channels_BSCAM", "channels_JF"], channels_check_timeout=3, **kwargs
):
if not set(self.channels.keys()).intersection(set(channels_to_check)):
return
@@ -367,8 +497,8 @@ class Daq(Assembly):
print(f"{nam} : {chs.get_current_value()}")
try:
o = inputimeout.inputimeout(
prompt=f"Press Ctrl-c to abort, Return to continue, or wait {timeout} seconds",
timeout=timeout,
prompt=f"Press Ctrl-c to abort, Return to continue, or wait {channels_check_timeout} seconds",
timeout=channels_check_timeout,
)
except inputimeout.TimeoutOccurred:
print("... timed out, continuing with selection.")
@@ -409,6 +539,10 @@ class Daq(Assembly):
# except Exception as e:
# print(f"Error to set dap configuration {e}")
def init_namespace(self,scan=None, init_required_namespace_components_only=True, append_status_info=True):
if append_status_info:
self.namespace.init_all(silent=False, required_only=init_required_namespace_components_only)
def append_start_status_to_scan(self,scan=None, append_status_info=True):
if not append_status_info:
return
@@ -416,6 +550,53 @@ class Daq(Assembly):
stat = {"status_run_start": namespace_status}
scan.namespace_status = stat
def _create_runtable_metadata_append_status_to_runtable(
self,
scan,
append_status_info=True):
print("run_table appending run")
runno = scan.daq_run_number
metadata = {
"type": "scan",
"name": scan.description.get_current_value(),
"scan_info_file": '',
}
for n, adj in enumerate(scan.adjustables):
nname = None
adj_pvname = None
if hasattr(adj, "Id"):
adj_pvname = adj.Id
if hasattr(adj, "name"):
nname = adj.name
metadata.update(
{
f"scan_dim_{n}": nname,
f"from_dim_{n}": scan.values_todo.get_current_value()[0][n],
f"to_dim_{n}": scan.values_todo.get_current_value()[-1][n],
f"pvname_dim_{n}": adj_pvname,
}
)
if np.mean(np.diff(scan.pulses_per_step)) < 1:
pulses_per_step = scan.pulses_per_step[0]
else:
pulses_per_step = scan.pulses_per_step
metadata.update(
{
"steps": len(scan.values_todo.get_current_value()),
"pulses_per_step": pulses_per_step,
"counters": scan.counters_names.get_current_value(),
"scan_command": scan.scan_command.get_current_value(),
}
)
t_start_rt = time.time()
try:
self.run_table.append_run(runno, metadata=metadata, d=scan.namespace_status["status_run_start"]["status"])
except:
print("WARNING: issue adding data to run table")
print(f"Runtable appending took: {time.time()-t_start_rt:.3f} s")
def copy_scan_info_to_raw(self,scan, **kwargs):
t_start = time.time()
@@ -426,22 +607,12 @@ class Daq(Assembly):
runno = self.get_last_run_number()
# get data that should come later from api or similar.
run_directory = list(
Path(f"/sf/bernina/data/{self.pgroup}/raw").glob(f"run{runno:04d}*")
)[0].as_posix()
# run_directory = list(
# Path(f"/sf/bernina/data/{self.pgroup}/raw").glob(f"run{runno:04d}*")
# )[0].as_posix()
# Get scan info from scan
si = scan.scan_info
# correct some data in there (relative paths for now)
from os.path import relpath
newfiles = []
for files in si["scan_files"]:
newfiles.append([relpath(file, run_directory) for file in files])
si["scan_files"] = newfiles
# save temporary file and send them to raw
pgroup = self.pgroup
@@ -483,14 +654,13 @@ class Daq(Assembly):
# )
def append_status_to_scan_and_store(self,
scan, append_status_info=True, **kwargs
):
if not append_status_info:
return
if not len(scan.values_done)>0:
if not len(scan.values_done())>0:
return
namespace_status = self.namespace.get_status(base=None)
@@ -530,9 +700,40 @@ class Daq(Assembly):
# print(response.json())
# print("###############################")
scan.scan_info["scan_parameters"]["status"] = "aux/status.json"
def check_checker_before_step(self,scan, **kwargs):
# self.
if self.checker:
first_check = time.time()
checker_unhappy = False
print('')
while not self.checker.check_now():
print(
colorama.Fore.RED
+ f"Condition checker is not happy, waiting for OK conditions since {time.time()-first_check:5.1f} seconds."
+ colorama.Fore.RESET,
# end="\r",
)
sleep(1)
checker_unhappy = True
if checker_unhappy:
print(
colorama.Fore.RED
+ f"Condition checker was not happy and waiting for {time.time()-first_check:5.1f} seconds."
+ colorama.Fore.RESET
)
self.checker.clear_and_start_counting()
def check_checker_after_step(self,scan, **kwargs):
if self.checker:
if not self.checker.stop_and_analyze():
scan._current_step_ok = False
def copy_aliases_to_scan(self, scan, force=False, **kwargs):
if force or (len(scan.values_done) == 1):
def copy_aliases_to_scan(self, scan, send_aliases_now=False, **kwargs):
if send_aliases_now or (len(scan.values_done()) == 1):
namespace_aliases = self.namespace.alias.get_all()
if hasattr(scan, "daq_run_number"):
runno = scan.daq_run_number
@@ -583,30 +784,28 @@ class Daq(Assembly):
# print("################################")
scan.scan_info["scan_parameters"]["aliases"] = "aux/aliases.json"
def scan_message_to_elog(self, scan=None):
def scan_message_to_elog(self, scan=None, **kwargs):
# def _create_metadata_structure_start_scan(
# scan, run_table=run_table, elog=elog, append_status_info=True, **kwargs
# ):
runno = scan.daq_run_number
message_string = f"#### DAQ run {runno}"
if scan.description:
message_string += f': {scan.description}\n'
if scan.description():
message_string += f': {scan.description()}\n'
else:
message_string += f'\n'
try:
scan_command = get_ipython().user_ns["In"][-1]
message_string += "`" + scan_command + "`\n"
except:
print("Could not retrieve ipython scan command!")
elog_ids = scan.status_to_elog(message_string,auto_title=False)
scan._elog_id = elog_ids[1]
# message_string += "`" + metadata["scan_info_file"] + "`\n"
try:
elog_ids = self.elog.post(
message_string,
Title=f'Run {runno}: {scan.description}',
text_encoding="markdown",
)
scan._elog_id = elog_ids[1]
# try:
# elog_ids = self.elog.post(
# message_string,
# Title=f'Run {runno}: {scan.description()}',
# text_encoding="markdown",
# )
# metadata.update({"elog_message_id": scan._elog_id})
# metadata.update(
# {"elog_post_link": scan._elog.elogs[1]._log._url + str(scan._elog_id)}
@@ -632,12 +831,12 @@ class Daq(Assembly):
scan.daq_monitors[tname] = Monitor(adj.pvname)
except Exception:
print(f"Could not add CA monitor for {tname}")
traceback.print_exc()
# traceback.print_exc()
try:
rname = adj.readback.alias.get_full_name()
except Exception:
print("no readback configured")
traceback.print_exc()
# traceback.print_exc()
try:
scan.daq_monitors[rname] = Monitor(adj.readback.pvname)
except Exception:
@@ -690,6 +889,37 @@ class Daq(Assembly):
)
print(f"Status: {response.json()['status']} Message: {response.json()['message']}")
def get_callback_keywords(self):
kws_all = set([])
for cb in self.callbacks_start_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_start_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_step_counting:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_end_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_end_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
return kws_all
# scan.monitors = None
# def run_table_stuff(self):
+462 -226
View File
@@ -1,3 +1,5 @@
from datetime import datetime
from itertools import product
from numbers import Number
import os
import json
@@ -7,8 +9,10 @@ import traceback
from pathlib import Path
import colorama
from eco.elements.detector import DetectorGet, DetectorMemory
from eco.elements.protocols import Adjustable
from eco.utilities.utilities import NumpyEncoder, foo_get_kwargs
from ..elements.adjustable import DummyAdjustable
from ..elements.adjustable import AdjustableMemory, DummyAdjustable
from IPython import get_ipython
from .daq_client import Daq
from eco.elements.assembly import Assembly
@@ -32,7 +36,7 @@ class RunList(Assembly):
def get_run_list(self): ...
class StepScan:
class StepScan(Assembly):
def __init__(
self,
adjustables,
@@ -41,82 +45,111 @@ class StepScan:
description='',
Npulses=100,
basepath="",
# scan_in1fo_dir="",
settling_time=0,
# checker=None,
# scan_directories=False,
callbacks_start_scan=[],
callbacks_start_step=[],
callbacks_step_counting=[],
callbacks_end_step=[],
callbacks_end_scan=[],
# checker_sleep_time=2,
return_at_end="question",
# run_number=None,
return_at_end="timeout",
gridspecs = None,
elog=None,
name='current_scan',
**kwargs_callbacks,
):
# if np.any([char in fina for char in inval_chars]):
# raise ScanNameError
self.number_of_steps = len(values)
super().__init__(name=name)
self._append(DetectorMemory, datetime.now().strftime('%Y-%M-%d %H:%M:%S'), name='start_time')
self._description = description
self._append(DetectorGet, lambda : self._description, name='description')
self.adjustables = adjustables
self._append(DetectorGet, lambda : self._get_names(self.adjustables), name='adjustables_names')
self.counters = counters
self._append(DetectorGet, lambda : self._get_names(self.counters), name='counters_names')
self._append(DetectorMemory, len(values), name='number_of_steps')
try:
scan_command = get_ipython().user_ns["In"][-1]
except:
scan_command = "unknown"
self._append(DetectorMemory, scan_command, name='scan_command')
if not isinstance(Npulses, Number):
if not len(Npulses) == len(values):
raise ValueError("steps for Number of pulses and values must match!")
self.pulses_per_step = Npulses
else:
self.pulses_per_step = [Npulses] * len(values)
self.adjustables = adjustables
self.values_todo = values
self.values_done = []
self._values_todo = values
self._append(DetectorGet, lambda : self._values_todo, name='values_todo', is_display=False)
self._values_done = []
self._append(DetectorGet, lambda : self._values_done, name='values_done', is_display=False)
self._append(DetectorMemory, gridspecs, name='grid_specs', is_display=False)
self.pulses_done = []
self.readbacks = []
self.counters = counters
self.settling_time = settling_time
self._settling_time = settling_time
self._append(DetectorGet, lambda : self._settling_time, name='settling_time', is_display=False)
self.next_step = 0
self.description = description
# self.scan_info_dir = scan_info_dir
anames = []
for ta in adjustables:
try:
anames.append(ta.alias.get_full_name())
except:
anames.append(ta.name)
self.scan_info = {
"scan_parameters": {
"name": anames,
"name": self.adjustables_names.get_current_value(),
"grid_specs": self.grid_specs.get_current_value(),
# "Id": [ta.Id if hasattr(ta, "Id") else "noId" for ta in adjustables],
},
"scan_description": self.description,
"scan_description": self._description,
"scan_values_all": values,
"scan_values": [],
"scan_readbacks": [],
"scan_files": [],
"scan_step_info": [],
}
# self.scan_info_filename = os.path.join(self.scan_info_dir, fina)
# self._scan_directories = scan_directories
# self.checker = checker
self.initial_values = []
self._append(DetectorGet, lambda : self.scan_info, name='info', is_display=False)
initial_values = []
for adj in self.adjustables:
tv = adj.get_current_value()
initial_values.append(adj.get_current_value())
print("Initial value of %s : %g" % (adj.name, tv))
self._append(DetectorMemory, initial_values, name='initial_values', is_display=False)
self.return_at_end = return_at_end
# self._checker_sleep_time = checker_sleep_time
self._elog = elog
# self.run_number = run_number
self.remaining_tasks = []
self.callbacks_start_scan = callbacks_start_scan
self.callbacks_start_step = callbacks_start_step
self.callbacks_step_counting = callbacks_step_counting
self.callbacks_end_step = callbacks_end_step
self.callbacks_end_scan = callbacks_end_scan
self.callbacks_kwargs = kwargs_callbacks
# print(f"Scan info in file {self.scan_info_filename}.")
for adj in self.adjustables:
tv = adj.get_current_value()
self.initial_values.append(adj.get_current_value())
print("Initial value of %s : %g" % (adj.name, tv))
self.run_callbacks_start_scan()
self._have_run_callbacks_start_scan = False
def _get_names(self, elements):
"""Get the names of the elements."""
names = []
for el in elements:
if hasattr(el, "alias"):
names.append(el.alias.get_full_name())
elif hasattr(el, "name"):
names.append(el.name)
else:
names.append("unknown")
return names
def run_callbacks_start_scan(self):
if self.callbacks_start_scan:
for caller in self.callbacks_start_scan:
@@ -133,6 +166,21 @@ class StepScan:
if hasattr(ctr, "callbacks_start_step") and ctr.callbacks_start_step:
for tcb in ctr.callbacks_start_step:
tcb(self, **self.callbacks_kwargs)
def run_callbacks_step_counting(self):
if self.callbacks_step_counting:
for caller in self.callbacks_step_counting:
caller(self, **self.callbacks_kwargs)
for ctr in self.counters:
if hasattr(ctr, "callbacks_step_counting") and ctr.callbacks_step_counting:
for tcb in ctr.callbacks_step_counting:
tcb(self, **self.callbacks_kwargs)
def has_callbacks_step_counting(self):
if self.callbacks_step_counting:
return True
for ctr in self.counters:
if hasattr(ctr, "callbacks_step_counting") and ctr.callbacks_step_counting:
return True
return False
def run_callbacks_end_step(self):
if self.callbacks_end_step:
for caller in self.callbacks_end_step:
@@ -160,16 +208,16 @@ class StepScan:
# return fina
def do_next_step(self, step_info=None, verbose=True):
self._current_step_ok = True
t_step_start = time()
self.run_callbacks_start_step()
dt_callbacks_step_start = time()-t_step_start
if not len(self.values_todo) > 0:
if not len(self._values_todo) > 0:
return False
self.values_current_step = self.values_todo[0]
statstr = "Step %d of %d" % (self.next_step + 1, len(self.values_todo) + len(self.values_done))
self.values_current_step = self._values_todo[0]
statstr = "Step %d of %d" % (self.next_step + 1, len(self._values_todo) + len(self._values_done))
# fina = self.get_filename(self.nextStep)
@@ -182,7 +230,7 @@ class StepScan:
dt_adj = time()-t_adj_start
# settling
sleep(self.settling_time)
sleep(self._settling_time)
# counters
t_ctr_start = time()
@@ -206,44 +254,40 @@ class StepScan:
statstr += ' ; Ctrs '
acs = []
for ctr in self.counters:
# if isinstance(ctr, Daq):
# acq_pars = {
# "scan_info": {
# "scan_name": self.description,
# "scan_values": values_step,
# "scan_readbacks": readbacks_step,
# "scan_step_info": {
# "step_number": self.nextStep + 1,
# },
# "name": [adj.name for adj in self.adjustables],
# "expected_total_number_of_steps": len(self.values_todo)
# + len(self.values_done),
# },
# "run_number": self.run_number,
# "user_tag": self.fina,
# }
# acq = ctr.acquire(
# file_name=fina, Npulses=self.pulses_per_step[0], acq_pars=acq_pars
# )
# else:
acq = ctr.acquire(scan=self, Npulses=self.pulses_per_step[0])
acs.append(acq)
try:
if hasattr(ctr, "name"):
statstr += f"{ctr.name}, "
except:
pass
filenames = []
for ta in acs:
ta.wait()
filenames.extend(ta.file_names)
if not self.has_callbacks_step_counting():
acs = []
for ctr in self.counters:
acq = ctr.acquire(scan=self, Npulses=self.pulses_per_step[0])
acs.append(acq)
try:
if hasattr(ctr, "name"):
statstr += f"{ctr.name}, "
except:
pass
filenames = []
for ta in acs:
ta.wait()
filenames.extend(ta.file_names)
else:
acs = []
for ctr in self.counters:
ctr.start(scan=self)
try:
if hasattr(ctr, "name"):
statstr += f"{ctr.name}, "
except:
pass
self.run_callbacks_step_counting()
filenames = []
for ctr in self.counters:
resp = ctr.stop(scan=self)
filenames.extend(resp["files"])
statstr = statstr[:-2] + ' done.'
print(statstr, end='\n')
dt_ctr = time() - t_ctr_start
sleep(.003)
sleep(.003) # from display debugging, maybe unnecessary.
@@ -253,10 +297,10 @@ class StepScan:
# if not self.checker.stop_and_analyze():
# return True
if callable(step_info):
tstepinfo = step_info()
tstepinfo = step_info.get_current_value()
else:
tstepinfo = {}
self.values_done.append(self.values_todo.pop(0))
self._values_done.append(self._values_todo.pop(0))
self.pulses_done.append(self.pulses_per_step.pop(0))
self.readbacks.append(self.readbacks_current_step)
@@ -271,14 +315,20 @@ class StepScan:
"callbacks_step_end": dt_callbacks_step_end,
}
gridspecs = self.grid_specs.get_current_value()
if gridspecs:
tstepinfo['grid_index'] = gridspecs['index_plan'][self.next_step]
self.appendScanInfo(
self.values_current_step, self.readbacks_current_step, step_files=filenames, step_info=tstepinfo
)
# self.writeScanInfo()
self.next_step += 1
return True
if self._current_step_ok:
self.next_step += 1
return True
else:
return False
def appendScanInfo(
self, values_step, readbacks_step, step_files=None, step_info=None
@@ -301,6 +351,11 @@ class StepScan:
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_step_counting:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_end_step:
kws = foo_get_kwargs(cb)
if kws:
@@ -324,6 +379,12 @@ class StepScan:
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_step_counting"):
for cb in ctr.callbacks_step_counting:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_end_step"):
for cb in ctr.callbacks_end_step:
kws = foo_get_kwargs(cb)
@@ -365,8 +426,11 @@ class StepScan:
f.truncate()
def scan_all(self, step_info=None):
if not self._have_run_callbacks_start_scan:
self.run_callbacks_start_scan()
self._have_run_callbacks_start_scan = True
done = False
steps_remaining = len(self.values_todo)
steps_remaining = len(self._values_todo)
with Progress() as self._progress:
pr_task = self._progress.add_task(
"[green]Scanning...", total=steps_remaining
@@ -391,6 +455,7 @@ class StepScan:
print("Changing back to value(s) before scan.")
for ch in chs:
ch.wait()
elif self.return_at_end == "timeout":
timeout = 10
try:
@@ -424,12 +489,15 @@ class StepScan:
def changeToInitialValues(self):
c = []
for adj, iv in zip(self.adjustables, self.initial_values):
for adj, iv in zip(self.adjustables, self.initial_values()):
c.append(adj.set_target_value(iv))
return c
class Scans:
class Scans(Assembly):
"""Convenience class to initiate typical scans with some default parameters using the base StepScan and others."""
def __init__(
self,
@@ -440,14 +508,19 @@ class Scans:
# scan_directories=False,
callbacks_start_scan=[],
callbacks_start_step=[],
callbacks_step_counting=[],
callbacks_end_step=[],
callbacks_end_scan=[],
# run_table=None,
elog=None,
name='scans',
):
super().__init__(name=name)
# self._run_table = run_table
self.callbacks_start_scan = callbacks_start_scan
self.callbacks_start_step = callbacks_start_step
self.callbacks_step_counting = callbacks_step_counting
self.callbacks_end_step = callbacks_end_step
self.callbacks_end_scan = callbacks_end_scan
# self.data_base_dir = data_base_dir
@@ -476,9 +549,79 @@ class Scans:
# self.scan_info_dir = scan_info_dir
# self.filename_generator = RunFilenameGenerator(self.scan_info_dir)
self._default_counters = default_counters
self._append(DetectorGet, self._get_counter_names, name='default_counters_names')
self._append(DetectorMemory, 'none since session start', name='acquiring_scan')
# self.checker = checker
# self._scan_directories = scan_directories
self._elog = elog
def _get_counter_names(self):
"""Get the names of the default counters."""
return [tc.name for tc in self._default_counters]
def get_callback_keywords(self):
kws_all = set([])
for cb in self.callbacks_start_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_start_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_step_counting:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_end_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for cb in self.callbacks_end_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
for ctr in self._default_counters:
if hasattr(ctr, "callbacks_start_scan"):
for cb in ctr.callbacks_start_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_start_step"):
for cb in ctr.callbacks_start_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_step_counting"):
for cb in ctr.callbacks_step_counting:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_end_step"):
for cb in ctr.callbacks_end_step:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
if hasattr(ctr, "callbacks_end_scan"):
for cb in ctr.callbacks_end_scan:
kws = foo_get_kwargs(cb)
if kws:
kws_all.update(set(kws))
print(cb.__name__, "has keywords:", kws)
return kws_all
def acquire(
self,
@@ -488,13 +631,11 @@ class Scans:
counters=[],
start_immediately=True,
settling_time=0,
step_info=None,
return_at_end=True,
# checker="default",
step_info=None,
**kwargs_callbacks,
):
adjustable = DummyAdjustable()
positions = list(range(N_repetitions))
values = [[tp] for tp in positions]
# file_name = self.filename_generator.get_nextrun_filename(file_name)
@@ -516,11 +657,13 @@ class Scans:
callbacks_end_scan=self.callbacks_end_scan,
elog=self._elog,
return_at_end=return_at_end,
name='acquiring_scan',
**kwargs_callbacks,
)
self._append(s,name='acquiring_scan', overwrite=True)
if start_immediately:
s.scan_all(step_info=step_info)
return s
# return s
def ascan(
self,
@@ -529,70 +672,25 @@ class Scans:
end_pos,
N_intervals,
N_pulses,
file_name="",
counters=[],
checker="default",
start_immediately=True,
step_info=None,
return_at_end="question",
settling_time=0,
**kwargs_callbacks,
):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
values = [[tp] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
run_number = self.filename_generator.get_nextrun_number()
if not counters:
counters = self._default_counters
if checker == "default":
checker = self.checker
s = StepScan(
[adjustable],
values,
counters,
file_name,
Npulses=N_pulses,
basepath=self.data_base_dir,
scan_info_dir=self.scan_info_dir,
checker=self.checker,
settling_time=settling_time,
scan_directories=self._scan_directories,
return_at_end=return_at_end,
callbacks_start_scan=self.callbacks_start_scan,
callbacks_start_step=self.callbacks_start_step,
callbacks_end_step=self.callbacks_end_step,
callbacks_end_scan=self.callbacks_end_scan,
run_table=self._run_table,
elog=self._elog,
run_number=run_number,
**kwargs_callbacks,
)
if start_immediately:
s.scan_all(step_info=step_info)
return s
def ascan_position_list(
self,
adjustable,
position_list,
N_pulses,
description="",
counters=[],
# checker="default",
start_immediately=True,
return_at_end="timeout",
settling_time=0,
step_info=None,
return_at_end="question",
**kwargs_callbacks,
):
positions = position_list
if type(N_intervals) is float:
print('Interval size defined as float, interpreting as interval size.')
positions = np.arange(start_pos, N_intervals, end_pos)
elif type(N_intervals) is int:
print('Interval size defined as int, interpreting as number of intervals.')
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
values = [[tp] for tp in positions]
# description = self.filename_generator.get_nextrun_filename(description)
# run_number = self.filename_generator.get_nextrun_number()
if not counters:
counters = self._default_counters
# if checker == "default":
# checker = self.checker
s = StepScan(
[adjustable],
values,
@@ -606,11 +704,55 @@ class Scans:
callbacks_end_step=self.callbacks_end_step,
callbacks_end_scan=self.callbacks_end_scan,
elog=self._elog,
name='acquiring_scan',
**kwargs_callbacks,
)
self._append(s,name='acquiring_scan', overwrite=True)
if start_immediately:
s.scan_all(step_info=step_info)
return s
# return s
def ascan_position_list(
self,
adjustable,
position_list,
N_pulses,
description="",
counters=[],
# checker="default",
start_immediately=True,
settling_time=0,
step_info=None,
return_at_end="timeout",
name='acquiring_scan',
**kwargs_callbacks,
):
positions = position_list
values = [[tp] for tp in positions]
if not counters:
counters = self._default_counters
s = StepScan(
[adjustable],
values,
counters=counters,
description=description,
Npulses=N_pulses,
settling_time=settling_time,
return_at_end=return_at_end,
callbacks_start_scan=self.callbacks_start_scan,
callbacks_start_step=self.callbacks_start_step,
callbacks_end_step=self.callbacks_end_step,
callbacks_end_scan=self.callbacks_end_scan,
elog=self._elog,
name='acquiring_scan',
**kwargs_callbacks,
)
self._append(s,name='acquiring_scan', overwrite=True)
if start_immediately:
s.scan_all(step_info=step_info)
# return s
def dscan(
@@ -620,49 +762,112 @@ class Scans:
end_pos,
N_intervals,
N_pulses,
file_name="",
description="",
counters=[],
checker="default",
start_immediately=True,
settling_time=0,
step_info=None,
return_at_end="question",
return_at_end="timeout",
**kwargs_callbacks,
):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
"""Differential scan, i.e. the adjustable is moved to the start position and then moved in steps of the interval size."""
if type(N_intervals) is float:
print('Interval size defined as float, interpreting as interval size.')
positions = np.arange(start_pos, N_intervals, end_pos)
elif type(N_intervals) is int:
print('Interval size defined as int, interpreting as number of intervals.')
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
current = adjustable.get_current_value()
values = [[tp + current] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
run_number = self.filename_generator.get_nextrun_number()
if not counters:
counters = self._default_counters
if checker == "default":
checker = self.checker
s = StepScan(
[adjustable],
values,
counters,
file_name,
Npulses=N_pulses,
basepath=self.data_base_dir,
scan_info_dir=self.scan_info_dir,
checker=self.checker,
scan_directories=self._scan_directories,
description=description,
return_at_end=return_at_end,
settling_time=settling_time,
callbacks_start_scan=self.callbacks_start_scan,
callbacks_start_step=self.callbacks_start_step,
callbacks_end_step=self.callbacks_end_step,
callbacks_end_scan=self.callbacks_end_scan,
run_table=self._run_table,
elog=self._elog,
run_number=run_number,
name='acquiring_scan',
**kwargs_callbacks,
)
self._append(s,name='acquiring_scan', overwrite=True, status=True)
if start_immediately:
s.scan_all(step_info=step_info)
return s
# return s
def snakescan(
    self,
    adjustable_slow,
    step_interval,
    Nrows,
    adjustable_fast,
    interval,
    description="",
    counters=[],
    start_immediately=True,
    settling_time=0,
    step_info=None,
    return_at_end="timeout",
    **kwargs_callbacks,
):
    """Relative 2D "snake" scan.

    Starting from the current positions of both adjustables, the slow
    adjustable is stepped ``Nrows`` times by ``step_interval``.  For each row
    the fast adjustable starts alternately at its current position or at
    current position + ``interval`` (the ``i % 2`` term below), and during the
    counting phase of each step it is driven to the opposite end of that
    interval, producing the back-and-forth "snake" pattern.

    NOTE(review): Npulses is hard-coded to 1 below; presumably the
    acquisition of the fast sweep happens while ``counting_function`` moves
    the fast axis -- confirm against StepScan's step-counting semantics.
    The created scan is appended as ``acquiring_scan`` but not returned.
    """
    # Both axes move relative to wherever they currently are.
    adj_slow_start = adjustable_slow.get_current_value()
    adj_fast_start = adjustable_fast.get_current_value()
    print('Snakescan is relative, starting from here: %s, %s' % (adj_slow_start, adj_fast_start))
    # Per-row start positions: slow axis advances monotonically, fast axis
    # alternates between the two ends of the interval (even/odd rows).
    start_positions = [
        [adj_slow_start + step_interval * i, adj_fast_start + (i%2)*interval ]
        for i in range(Nrows)]
    def counting_function(scan):
        # Called during the counting phase of every step: sweep the fast
        # axis to whichever end of the interval is farther away.
        cv = adjustable_fast.get_current_value()
        print(cv)
        if abs(cv - adj_fast_start) < abs(cv - adj_fast_start - interval):
            print('moving to interval')
            adjustable_fast.set_target_value(adj_fast_start +interval).wait()
        else:
            print('moving back')
            adjustable_fast.set_target_value(adj_fast_start).wait()
    if not counters:
        counters = self._default_counters
    s = StepScan(
        [adjustable_slow, adjustable_fast],
        start_positions,
        counters,
        Npulses=1,
        description=description,
        return_at_end=return_at_end,
        settling_time=settling_time,
        callbacks_start_scan=self.callbacks_start_scan,
        callbacks_start_step=self.callbacks_start_step,
        callbacks_step_counting=[counting_function],
        callbacks_end_step=self.callbacks_end_step,
        callbacks_end_scan=self.callbacks_end_scan,
        elog=self._elog,
        name='acquiring_scan',
        **kwargs_callbacks,
    )
    self._append(s,name='acquiring_scan', overwrite=True)
    if start_immediately:
        s.scan_all(step_info=step_info)
    # return s
def a2scan(
self,
adjustable0,
@@ -685,7 +890,7 @@ class Scans:
positions1 = np.linspace(start1_pos, end1_pos, N_intervals + 1)
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
if not counters:
counters = self._default_counters
counters = self.default_counters.get_current_value()
if checker == "default":
checker = self.checker
s = StepScan(
@@ -705,78 +910,86 @@ class Scans:
run_table=self._run_table,
elog=self._elog,
return_at_end=return_at_end,
name='acquiring_scan',
**kwargs_callbacks,
)
self._append(s,name='acquiring_scan', overwrite=True)
if start_immediately:
s.scan_all(step_info=step_info)
return s
# return s
def meshscan(
    self,
    *adj_specs,
    scanning_order='last_fastest',
    N_pulses=None,
    description="",
    counters=[],
    start_immediately=True,
    return_at_end="timeout",
    settling_time=0,
    step_info=None,
    **kwargs_callbacks,
):
    """
    Mesh scan, i.e. a scan on a multi-dimensional grid of positions.

    Each element of ``adj_specs`` is a sequence ``(adjustable, *step_spec)``
    where ``step_spec`` is understood by ``interpret_step_specification``
    (either ``(start, stop, N_intervals)`` or an explicit position list).
    Entries whose first element is not an Adjustable are silently skipped.

    Parameters
    ----------
    scanning_order : 'last_fastest' (default) moves the last adjustable
        fastest; 'first_fastest' moves the first one fastest.  The historical
        misspelling 'fist_fastst' is still accepted for backward
        compatibility.  Any other value now raises ValueError (previously it
        fell through to an UnboundLocalError).

    Returns
    -------
    The created StepScan instance.
    """
    adjustables = []
    positions = []
    for adj_spec in adj_specs:
        adj = adj_spec[0]
        spec = adj_spec[1:]
        if isinstance(adj, Adjustable):
            adjustables.append(adj)
            positions.append(interpret_step_specification(spec))
    shape = [len(tp) for tp in positions]
    if scanning_order == 'last_fastest':
        index_plan = list(product(*[range(n) for n in shape]))
    elif scanning_order in ('first_fastest', 'fist_fastst'):
        # Iterate with the first axis fastest: build the product with the
        # axis order reversed, then flip each index tuple back.
        index_plan = [tc[::-1] for tc in product(*[range(n) for n in shape][::-1])]
    else:
        raise ValueError(
            "scanning_order must be 'last_fastest' or 'first_fastest', "
            f"got {scanning_order!r}"
        )
    # Expand index tuples into the per-step list of target positions.
    values = []
    for ixs in index_plan:
        values.append([tp[ti] for ti, tp in zip(ixs, positions)])
    gridspecs = {
        'shape': shape,
        'positions': positions,
        'index_plan': index_plan,
    }
    if not counters:
        counters = self._default_counters
    s = StepScan(
        adjustables,
        values,
        counters=counters,
        Npulses=N_pulses,
        description=description,
        return_at_end=return_at_end,
        settling_time=settling_time,
        callbacks_start_scan=self.callbacks_start_scan,
        callbacks_start_step=self.callbacks_start_step,
        callbacks_end_step=self.callbacks_end_step,
        callbacks_end_scan=self.callbacks_end_scan,
        elog=self._elog,
        gridspecs=gridspecs,
        name='acquiring_scan',
        **kwargs_callbacks,
    )
    self._append(s, name='acquiring_scan', overwrite=True)
    if start_immediately:
        s.scan_all(step_info=step_info)
    return s
# def rscan(self, *args, **kwargs):
# print(
# "Warning: This is not implemented, should be reflectivity scan. \n for relative/differential scan please use dscan ."
# )
# # return self.rscan(*args, **kwargs)
# def a2scanList(
# self,
# adjustable0,
# start0_pos,
# end0_pos,
# adjustable1,
# start1_pos,
# end1_pos,
# N_intervals,
# N_pulses,
# file_name=None,
# counters=[],
# checker="default",
# start_immediately=True,
# step_info=None,
# return_at_end="question",
# **kwargs_callbacks,
# ):
# positions0 = np.linspace(start0_pos, end0_pos, N_intervals + 1)
# positions1 = np.linspace(start1_pos, end1_pos, N_intervals + 1)
# # self.prefix
# # + f"{runno:{self.Ndigits}0d}"
# # + self.separator
# # + "*."
# # + self.suffix
# values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
# if not counters:
# counters = self._default_counters
# if checker == "default":
# checker = self.checker
# s = Scan(
# [adjustable0, adjustable1],
# values,
# self.counters,
# file_name,
# Npulses=N_pulses,
# basepath=self.data_base_dir,
# scan_info_dir=self.scan_info_dir,
# checker=self.checker,
# scan_directories=self._scan_directories,
# return_at_end=return_at_end,
# callbacks_start_scan=self.callbacks_start_scan,
# callbacks_start_step=self.callbacks_start_step,
# callbacks_end_step=self.callbacks_end_step,
# callbacks_end_scan=self.callbacks_end_scan,
# run_table=self._run_table,
# elog=self._elog,
# **kwargs_callbacks,
# )
# if start_immediately:
# s.scan_all(step_info=step_info)
# return s
class RunFilenameGenerator:
@@ -833,3 +1046,26 @@ class RunFilenameGenerator:
+ "."
+ self.suffix
)
def interpret_step_specification(spec):
    """Interpret a step-position specification into an array/list of positions.

    Accepted forms:
      * three numbers ``(start, stop, N)``:
          - ``N`` an int: number of intervals, yielding ``N + 1`` points via
            ``np.linspace(start, stop, N + 1)``;
          - ``N`` a float: interval (step) size, yielding
            ``np.arange(start, stop, N)``;
      * a single iterable (not a string): taken verbatim as the position list.

    Raises
    ------
    Exception for any other specification (including strings).
    """
    # normal linear scan: (start, stop, N_intervals)
    if len(spec) == 3 and all(isinstance(ta, Number) for ta in spec):
        start_pos, end_pos, N_intervals = spec
        if isinstance(N_intervals, float):
            print('Interval size defined as float, interpreting as interval size.')
            # BUGFIX: arguments were previously passed as
            # np.arange(start, N_intervals, end_pos), i.e. stop and step swapped.
            return np.arange(start_pos, end_pos, N_intervals)
        elif isinstance(N_intervals, int):
            print('Interval size defined as int, interpreting as number of intervals.')
            return np.linspace(start_pos, end_pos, N_intervals + 1)
        else:
            # Previously fell through to a NameError on 'positions'.
            raise Exception(
                "Third element of step specification must be an int (number of "
                "intervals) or a float (interval size)."
            )
    elif len(spec) == 1 and np.iterable(spec[0]):
        if type(spec[0]) is str:
            raise Exception(
                "Step position specification is a string; expected 3 numbers "
                "or a list of positions."
            )
        return spec[0]
    else:
        raise Exception(
            "Step position specification is not understood, should be 3 numbers or a list of positions."
        )
+218 -855
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+7 -1
View File
@@ -1573,6 +1573,8 @@ class SmaractRecord(Assembly):
alias_fields={},
backlash_definition=False,
expect_bad_limits=True,
preferred_home_direction='forward',
**kwargs,
):
super().__init__(name=name)
# self.settings.append(self)
@@ -1580,6 +1582,7 @@ class SmaractRecord(Assembly):
self.pvname = pvname
self._motor = _Motor(pvname)
self._append(AdjustableMemory,preferred_home_direction, name=preferred_home_direction)
self._elog = elog
for an, af in alias_fields.items():
self.alias.append(
@@ -1709,7 +1712,10 @@ class SmaractRecord(Assembly):
self.set_limits(-abs_set_value, abs_set_value)
def home(self):
self.home_forward(1)
if self.preferred_home_direction.get_current_value() == "forward" or self.preferred_home_direction.get_current_value() is None or self.preferred_home_direction.get_current_value():
self.home_forward(1)
elif self.preferred_home_direction.get_current_value() == "reverse" or self.preferred_home_direction.get_current_value() is False:
self.home_reverse(1)
time.sleep(0.1)
while not self.flags.is_homed.get_current_value():
time.sleep(0.1)
+11 -2
View File
@@ -15,6 +15,7 @@ from eco.aliases import Alias
from eco.devices_general.utilities import Changer
from eco.elements import memory
from eco.utilities.keypress import KeyPress
# from .assembly import Assembly
@@ -422,13 +423,19 @@ def _keywordChecker(kw_key_list_tups):
@tweak_option
@value_property
class AdjustableMemory:
def __init__(self, value=0, name="adjustable_memory"):
def __init__(self, value=0, name="adjustable_memory", return_deep_copy=True):
self.name = name
self.alias = Alias(name)
self.current_value = value
self._return_deep_copy = return_deep_copy
if memory.global_memory_dir:
self.memory = memory.Memory(self)
def get_current_value(self):
return deepcopy(self.current_value)
if self._return_deep_copy:
return deepcopy(self.current_value)
else:
return self.current_value
def set_target_value(self, value, hold=False):
def changer(value):
@@ -485,6 +492,8 @@ class AdjustableFS:
self._write_value(default_value)
self.alias = Alias(name)
self.max_read_period = max_read_period
# if memory.global_memory_dir:
# self.memory = memory.Memory(self)
self.name = name
def get_current_value(self):
+64 -22
View File
@@ -39,6 +39,17 @@ class Collection:
def get_list(self):
return self._list
# Desired new way, in order to keep old containers and allow them to be replaced "on top" of a structure.
# causes other issues from recursion, bigger issue...
# def get_list(self):
# ls = []
# for item in self._list:
# if hasattr(item, f"{self.name}") and isinstance(item.__dict__[self.name],Collection):
# ls.append(item.__dict__[self.name].get_list())
# else:
# ls.append(ls)
# return ls
def append(self, obj, recursive=True, force=False):
if force:
@@ -67,6 +78,13 @@ class Collection:
def pop_item(self, item):
return self.pop(self.index(item))
def pop_obj_children(self, obj):
o = []
for it in obj.__dict__[self.name].get_list():
if (it in self._list):
o.append(self.pop_item(it))
return o
def __call__(self):
return self.get_list()
@@ -118,9 +136,28 @@ class Assembly:
is_alias=True,
view_toplevel_only=True,
call_obj=True,
append_property_with_name=False,
# append_property_with_name=False,
overwrite=False,
**kwargs,
):
if overwrite:
if name in self.__dict__:
old = self.__dict__[name]
for collection in [
self.settings_collection,
self.status_collection,
]:
if isinstance(old,Assembly):
collection.pop_obj_children(old)
else:
if old in collection.get_list():
collection.pop_item(old)
self.display_collection.pop_item(old)
self.alias.pop_object(old.alias)
# del self.__dict__[name]
if isinstance(foo_obj_init, Adjustable) and not isclass(foo_obj_init):
# adj_copy = copy.copy(foo_obj_init)
adj_copy = foo_obj_init
@@ -138,23 +175,23 @@ class Assembly:
# except:
# print(f'object {name} / {foo_obj_init} not initialized with name/parent')
# self.__dict__[name] = foo_obj_init(*args, **kwargs)
if append_property_with_name:
if isinstance(self.__dict__[name], Adjustable):
self.__class__.__dict__[append_property_with_name] = property(
self.__dict__[name].get_current_value,
lambda val: self.__dict__[name].set_target_value(val).wait(),
)
elif isinstance(self.__dict__[name], Detector):
self.__class__.__dict__[append_property_with_name] = property(
self.__dict__[name].get_current_value,
)
# if append_property_with_name:
# if isinstance(self.__dict__[name], Adjustable):
# self.__class__.__dict__[append_property_with_name] = property(
# self.__dict__[name].get_current_value,
# lambda val: self.__dict__[name].set_target_value(val).wait(),
# )
# elif isinstance(self.__dict__[name], Detector):
# self.__class__.__dict__[append_property_with_name] = property(
# self.__dict__[name].get_current_value,
# )
if is_setting == "auto":
is_setting = isinstance(self.__dict__[name], Adjustable)
# if is_setting == "auto":
# is_setting = isinstance(self.__dict__[name], Adjustable)
if is_setting:
self.settings_collection.append(self.__dict__[name], recursive=True)
if is_status == "auto":
is_status = isinstance(self.__dict__[name], Detector)
# if is_status == "auto":
# is_status = isinstance(self.__dict__[name], Detector)
if is_status:
self.status_collection.append(self.__dict__[name], recursive=True)
if is_display:
@@ -358,7 +395,7 @@ class Assembly:
return s
def get_display_str(
self, tablefmt="simple", maxcolwidths=[None, 50, None, None, None]
self, tablefmt="simple", with_base_name=False, maxcolwidths=[None, 50, None, None, None]
):
main_name = self.name
stats = self.display_collection()
@@ -366,7 +403,7 @@ class Assembly:
tab = []
for to in stats:
name = to.alias.get_full_name(base=self)
is_adjustable = isinstance(to, Adjustable)
is_detector = isinstance(to, Detector)
typechar = ""
@@ -396,9 +433,14 @@ class Assembly:
if value is None:
value = ""
tab.append(
[".".join([main_name, name]), value, unit, typechar, description]
)
if with_base_name:
tab.append(
[".".join([main_name, name]), value, unit, typechar, description]
)
else:
tab.append(
[ name, value, unit, typechar, description]
)
if tab:
s = tabulate(tab, tablefmt=tablefmt, maxcolwidths=maxcolwidths)
else:
@@ -437,7 +479,7 @@ class Assembly:
json.dump(stat, f, indent=4, cls=NumpyEncoder)
files.append(filepath)
elog.post(
return elog.post(
message,
*files,
text_encoding="html",
@@ -445,7 +487,7 @@ class Assembly:
# tags=[],
def __repr__(self):
label = self.alias.get_full_name() + " status\n"
label = self.alias.get_full_name() + " display\n"
return label + self.get_display_str()
# def _wait_for_initialisation(self, timeout=2):
+24 -1
View File
@@ -1,4 +1,5 @@
from eco.elements.adjustable import AdjustableMemory, default_representation
from copy import deepcopy
from eco.elements.adjustable import AdjustableMemory, default_representation, spec_convenience
from eco.elements.assembly import Assembly
from eco.aliases import Alias
import time
@@ -91,3 +92,25 @@ class DetectorGet:
if self._cache_get_seconds:
self._get_cache = (ts, value)
return value
@call_convenience
@value_property
class DetectorMemory:
    """Detector-like holder of a static in-memory value.

    Counterpart of AdjustableMemory for read-only use: exposes only
    ``get_current_value`` (plus the call/value conveniences added by the
    class decorators).
    """

    def __init__(self, value=0, name="detector_memory", return_deep_copy=True):
        # value: the stored payload returned by get_current_value.
        # return_deep_copy: when True (default), readers get a deep copy so
        # they cannot mutate the stored value through the returned object.
        self.name = name
        self.alias = Alias(name)
        self.current_value = value
        self._return_deep_copy = return_deep_copy

    def get_current_value(self):
        """Return the stored value (deep-copied unless disabled at init)."""
        if self._return_deep_copy:
            return deepcopy(self.current_value)
        else:
            return self.current_value

    def __repr__(self):
        name = self.name
        cv = self.get_current_value()
        s = f"{name} at value: {cv}" + "\n"
        return s
+55 -25
View File
@@ -1,6 +1,7 @@
import json
import importlib
from pathlib import Path
from eco.elements.adjustable import AdjustableFS, AdjustableMemory
from eco.elements.protocols import InitialisationWaitable
import sys
from time import sleep, time
@@ -21,6 +22,7 @@ from threading import Thread
from tqdm import tqdm
from rich import progress
from inspect import signature
from simple_term_menu import TerminalMenu
import traceback
@@ -322,7 +324,7 @@ class IsInitialisingError(Exception):
class Namespace(Assembly):
def __init__(self, name=None, root_module=None, alias_namespace=None):
def __init__(self, name=None, root_module=None, alias_namespace=None, required_names=[], required_names_directory=None):
super().__init__(name)
# self.name = name
self.lazy_items = {}
@@ -338,6 +340,11 @@ class Namespace(Assembly):
self._initializing = []
self.root_module = root_module
self.alias_namespace = alias_namespace
if required_names_directory:
self._append(AdjustableFS,required_names_directory,name='required_names', is_setting=True, is_display=True)
else:
self._append(AdjustableMemory,[], name='required_names', is_setting=True, is_display=True)
@property
def initialisation_times_sorted(self):
@@ -383,6 +390,18 @@ class Namespace(Assembly):
except ValueError:
pass
def select_required_names(self):
    """Interactively select which namespace items are "required".

    Opens a terminal multi-select menu over all known names, preselecting
    the currently required ones, and stores the chosen set via
    ``self.required_names`` (an adjustable, called to read and to write).
    Aborting the menu (no selection) leaves the current selection untouched.
    """
    terminal_menu = TerminalMenu(
        self.all_names,
        multi_select=True,
        show_multi_select_hint=True,
        preselected_entries=list(self.required_names()),
        title="Select required names for namespace %s" % self.name,
    )
    # show() returns the selected indices; the entry strings are what we
    # actually persist (indices kept for clarity of the TerminalMenu API).
    selected_indices = terminal_menu.show()
    selected_names = terminal_menu.chosen_menu_entries
    if selected_names:
        self.required_names(list(selected_names))
@@ -425,6 +444,7 @@ class Namespace(Assembly):
def init_all(
self,
required_only=True,
verbose=False,
raise_errors=False,
print_summary=True,
@@ -441,6 +461,20 @@ class Namespace(Assembly):
print(
f"WARNING - previously hard failed items are NOT initialized:\n{self.failed_names} "
)
if required_only:
if not self.required_names():
print(
f"WARNING - No required names defined in namespace {self.name}, initializing all items!"
)
else:
names_to_init = (
self.all_names - self.initialized_names - set(exclude_names)
) & set(self.required_names())
else:
names_to_init = self.all_names - self.initialized_names - set(exclude_names)
if silent:
self.silently_initializing = True
print(
@@ -453,7 +487,7 @@ class Namespace(Assembly):
self.exc_init.submit(
self.init_name, name, verbose=verbose, raise_errors=raise_errors
)
for name in (self.all_names - set(exclude_names))
for name in names_to_init
]
self.exc_init.shutdown(wait=True)
self.exc_init = ThreadPoolExecutor(max_workers=1)
@@ -462,22 +496,22 @@ class Namespace(Assembly):
self.init_name, name, verbose=verbose, raise_errors=raise_errors
)
for name in (
self.all_names - self.initialized_names - set(exclude_names)
names_to_init
)
]
self.exc_init.shutdown(wait=True)
self.silently_initializing = False
if giveup_failed:
failed_names = self.lazy_names
failed_names = names_to_init.intersection(self.lazy_names)
for k in failed_names:
self.failed_items[k] = self.lazy_items.pop(k)
if print_summary:
print(
f"Initialized {len(self.initialized_names)} of {len(self.all_names)}."
f"Initialized {len(self.initialized_names & names_to_init)} of {len(names_to_init)}."
)
print(
"Failed objects: "
+ ", ".join(self.lazy_names.union(self.failed_names))
+ ", ".join(self.failed_names & names_to_init)
)
print(f"Initialisation took {time()-starttime} seconds")
@@ -492,13 +526,11 @@ class Namespace(Assembly):
lambda name: self.init_name(
name, verbose=verbose, raise_errors=raise_errors
),
self.all_names
- self.initialized_names
- set(exclude_names),
names_to_init,
),
description="Initializing ...",
total=len(
self.all_names - self.initialized_names - set(exclude_names)
names_to_init
),
transient=True,
)
@@ -512,13 +544,11 @@ class Namespace(Assembly):
lambda name: self.init_name(
name, verbose=verbose, raise_errors=raise_errors
),
self.all_names
- self.initialized_names
- set(exclude_names),
names_to_init,
),
description="Initializing ...",
total=len(
self.all_names - self.initialized_names - set(exclude_names)
names_to_init
),
transient=True,
)
@@ -526,18 +556,18 @@ class Namespace(Assembly):
# )
# # )
if giveup_failed:
failed_names = self.lazy_names
for k in failed_names:
self.failed_items[k] = self.lazy_items.pop(k)
failed_names = names_to_init.intersection(self.lazy_names)
for k in failed_names:
self.failed_items[k] = self.lazy_items.pop(k)
if print_summary:
print(
f"Initialized {len(self.initialized_names)} of {len(self.all_names)}."
)
print(
"Failed objects: "
+ ", ".join(self.lazy_names.union(self.failed_names))
)
print(f"Initialisation took {time()-starttime} seconds")
print(
f"Initialized {len(self.initialized_names & names_to_init)} of {len(names_to_init)}."
)
print(
"Failed objects: "
+ ", ".join(self.failed_names & names_to_init)
)
print(f"Initialisation took {time()-starttime} seconds")
if (not silent) and print_times:
try:
+13 -3
View File
@@ -21,7 +21,7 @@ class RuntableGsheet:
def require_worksheets(self):
tls = [tmp.title for tmp in self._spreadsheet.worksheets()]
for title in [self._wstitle_available_keys, self._wstitle_run_table]:
for title in [self._wstitle_run_table,self._wstitle_available_keys]:
if not title in tls:
self._spreadsheet.add_worksheet(title,1,1)
@@ -37,11 +37,15 @@ class RuntableGsheet:
ncolstot = 0
for i,k in enumerate(keys):
ti = np.unravel_index(i, shape, order='C')
if self._remove_leading:
tk = k.split(self._remove_leading)[1]
else:
tk = k
cells.append(
gspread.Cell(
ti[1]+rng['startRowIndex']+1,
ti[0]+rng['startColumnIndex']+1,
k.split(self._remove_leading)[1]))
tk))
nrowstot = int(max(nrowstot,ti[1]+rng['startRowIndex']+1))
ncolstot = int(max(ncolstot,ti[0]+rng['startColumnIndex']+1))
@@ -61,9 +65,13 @@ class RuntableGsheet:
set_cells= []
nrowstot = 0
ncolstot = 0
test_keys = [tk.split(self._remove_leading)[1] for tk in table.keys()]
# test_keys = [tk.split(self._remove_leading)[1] for tk in table.keys()]
test_keys = [tk for tk in table.keys()]
for cell in cell_list:
tstr = cell.value
if not tstr:
continue
if not isinstance(tstr,str):
continue
tstr = tstr.split(self._name_delimiter)[0].strip()
@@ -71,6 +79,8 @@ class RuntableGsheet:
if tstr in test_keys:
set_vals = table[tstr]
for n,set_val in enumerate(set_vals):
if np.isnan(set_val):
set_val='nan'
set_cells.append(gspread.Cell(cell.row + n + 1, cell.col, set_val))
nrowstot = int(max(nrowstot,cell.row + n + 1))
ncolstot = int(max(ncolstot,cell.col))
+946
View File
@@ -0,0 +1,946 @@
from oauth2client.service_account import ServiceAccountCredentials
from pandas import DataFrame
import pandas as pd
import warnings
from eco.utilities.runtable_gsheet import RuntableGsheet
from ..elements.adjustable import AdjustableFS
from ..elements.memory import Memory
from subprocess import call
from eco.utilities.config import Proxy
from eco.bernina import namespace
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
import timeit
import os
from pathlib import Path
from epics import PV
import numpy as np
import gspread
import gspread_dataframe as gd
import gspread_formatting as gf
import gspread_formatting.dataframe as gf_dataframe
from datetime import datetime
import xlwt
import openpyxl
from ..devices_general.pv_adjustable import PvRecord
from epics import caget
import eco
import threading
pd.options.display.max_rows = 100
pd.options.display.max_columns = 50
pd.options.display.max_colwidth = 50
pd.options.display.width = None
pd.set_option("display.float_format", lambda x: "%.5g" % x)
class Gsheet_API:
    """Helper for pushing run-table data to a Google spreadsheet via gspread.

    Maintains a pickled mapping (exp_id -> spreadsheet key) on disk, creates
    the per-experiment spreadsheet on demand, and uploads run/position rows
    to the "runtable" and "positions" worksheets.
    """

    def __init__(
        self,
        keydf_fname,
        cred_fname,
        exp_id,
        exp_path,
        gsheet_key_path,
    ):
        ### credentials and settings for uploading to gspread ###
        self._scope = [
            "https://spreadsheets.google.com/feeds",
            "https://www.googleapis.com/auth/drive",
        ]
        # BUGFIX: cred_fname used to be ignored in favor of a hard-coded
        # path; prefer the argument, keep the historical path as fallback.
        self._credentials = ServiceAccountCredentials.from_json_keyfile_name(
            cred_fname or "/sf/bernina/config/src/python/gspread/pandas_push",
            self._scope,
        )
        self.gc = gspread.authorize(self._credentials)
        self._keydf_fname = keydf_fname
        # Default key filter used by the upload helpers.
        self.keys = (
            "metadata gps jet energy las_inc delay lxt pulse_id att_self att_fe_self"
        )
        self._key_df = DataFrame()
        # Persisted selection of which dotted-key groups to upload.
        self.gsheet_keys = AdjustableFS(
            gsheet_key_path,
            name="gsheet_keys",
            default_value="metadata thc gps xrd att att_usd kb",
        )
        self.init_runtable(exp_id)

    def create_rt_spreadsheet(self, exp_id):
        """Create a fresh run-table spreadsheet for exp_id and return it."""
        self.gc = gspread.authorize(self._credentials)
        spreadsheet = self.gc.create(
            title=f"run_table_{exp_id}", folder_id="1F7DgF0HW1O71nETpfrTvQ35lRZCs5GvH"
        )
        spreadsheet.add_worksheet("runtable", 10, 10)
        spreadsheet.add_worksheet("positions", 10, 10)
        # Remove the default sheet gspread creates alongside the new ones.
        ws = spreadsheet.get_worksheet(0)
        spreadsheet.del_worksheet(ws)
        return spreadsheet

    def _append_to_gspread_key_df(self, gspread_key_df):
        """Append a new (exp_id -> key) row and persist the pickle.

        BUGFIX: when the pickle did not yet exist, the new entry was dropped
        and an empty frame written instead; the entry is now appended in
        both cases.
        """
        if os.path.exists(self._keydf_fname):
            self._key_df = pd.read_pickle(self._keydf_fname)
        self._key_df = pd.concat([self._key_df, gspread_key_df])
        self._key_df.to_pickle(self._keydf_fname)

    def init_runtable(self, exp_id):
        """Look up the spreadsheet key for exp_id, creating it interactively."""
        if os.path.exists(self._keydf_fname):
            self._key_df = pd.read_pickle(self._keydf_fname)
        if self._key_df is not None and str(exp_id) in self._key_df.index:
            spreadsheet_key = self._key_df["keys"][f"{exp_id}"]
        else:
            f_create = str(
                input(
                    f"No google spreadsheet id found for experiment {exp_id}. Create new run_table spreadsheet? (y/n) "
                )
            )
            if f_create == "y":
                print("creating")
                spreadsheet = self.create_rt_spreadsheet(exp_id=exp_id)
                print("created")
                gspread_key_df = DataFrame(
                    {"keys": [spreadsheet.id]},
                    index=[f"{exp_id}"],
                )
                print(gspread_key_df)
                spreadsheet_key = spreadsheet.id
                self._append_to_gspread_key_df(gspread_key_df)
            else:
                # Previously fell through to a NameError on spreadsheet_key.
                raise RuntimeError(
                    f"No google spreadsheet available for experiment {exp_id}."
                )
        self._spreadsheet_key = spreadsheet_key

    def upload_rt(self, worksheet="runtable", keys=None, df=None):
        """
        This function uploads all entries of which "type" contains "scan" to the worksheet positions.
        keys takes a string of keys separated by a space, e.g. 'gps xrd las'. All columns, which contain
        any of these strings are uploaded. keys = None defaults to self.keys. keys = '' returns all columns
        """
        self.gc = gspread.authorize(self._credentials)
        ws = self.gc.open_by_key(self._spreadsheet_key).worksheet(worksheet)
        upload_df = df[df["metadata.type"].str.contains("scan", na=False)]
        gd.set_with_dataframe(ws, upload_df, include_index=True, col=2)
        gf_dataframe.format_with_dataframe(
            ws, upload_df, include_index=True, include_column_header=True, col=2
        )

    def upload_pos(self, worksheet="positions", keys=None, df=None):
        """
        This function uploads all entries with "type == pos" to the worksheet positions.
        keys takes a list of strings. All columns, which contain any of these strings are uploaded.
        keys = None defaults to self.keys. keys = [] returns all columns
        """
        self.gc = gspread.authorize(self._credentials)
        ws = self.gc.open_by_key(self._spreadsheet_key).worksheet(worksheet)
        upload_df = df[df["metadata.type"].str.contains("pos", na=False)]
        gd.set_with_dataframe(ws, upload_df, include_index=True, col=2)
        gf_dataframe.format_with_dataframe(
            ws, upload_df, include_index=True, include_column_header=True, col=2
        )

    def _upload_all(self, df):
        # Best-effort upload: failures are reported but must not kill the
        # calling (background) thread. Narrowed from a bare except.
        try:
            self.upload_rt(df=df)
            self.upload_pos(df=df)
        except Exception:
            print(
                f"Uploading of runtable to gsheet https://docs.google.com/spreadsheets/d/{self._spreadsheet_key}/ failed. Run run_table.upload_rt() for error traceback"
            )

    def upload_all(self, df):
        """Upload run table and positions in a background thread."""
        rt = threading.Thread(target=self._upload_all, kwargs={"df": df})
        rt.start()
class Container:
    """Hierarchical, attribute-style view onto a run-table DataFrame.

    Column names are dotted paths (e.g. "dev.subdev.value"); a Container
    represents one prefix level and lazily exposes the next level as child
    Containers (wrapped in Proxy to avoid eager recursion).
    """

    def __init__(self, df, name=""):
        # df: the run-table DataFrame (project subclass with a .load()
        #     method -- TODO confirm); name: dotted prefix including the
        #     trailing "." for non-root containers, "" for the root.
        self._cols = df.columns
        self._top_level_name = name
        self._df = df
        self.__dir__()  # pre-create child containers

    def _slice_df(self):
        """Return the sub-DataFrame of columns under this container's prefix."""
        self._df.load()  # refresh backing data from storage
        next_level_names = self._get_next_level_names()
        try:
            if len(next_level_names) == 0:
                # Leaf container: the prefix minus its trailing "." is itself
                # a column name.
                columns_to_keep = [self._top_level_name[:-1]]
            else:
                columns_to_keep = [
                    f"{self._top_level_name}{n}"
                    for n in next_level_names
                    if f"{self._top_level_name}{n}" in self._cols
                ]
            sdf = self._df[columns_to_keep]
        except Exception:
            # Columns may have changed since __init__; fall back to an empty
            # frame with the expected column names. (Narrowed from bare except.)
            sdf = pd.DataFrame(columns=next_level_names)
        return sdf

    def _get_next_level_names(self):
        """Unique names one dot-level below this container's prefix."""
        if len(self._top_level_name) == 0:
            next_level_names = np.unique(
                np.array([n.split(".")[0] for n in self._cols])
            )
        else:
            next_level_names = np.unique(
                np.array(
                    [
                        n.split(self._top_level_name)[1].split(".")[0]
                        for n in self._cols
                        if n[: len(self._top_level_name)] == self._top_level_name
                    ]
                )
            )
        return next_level_names

    def _create_lazy_container(self, n):
        # Factory closure for Proxy: defers child construction until access.
        def cr():
            return Container(df=self._df, name=self._top_level_name + n + ".")

        return cr

    def _create_first_level_container(self, names, lazy=True):
        """Attach child containers (lazy Proxies by default) as attributes."""
        for n in names:
            if lazy:
                self.__dict__[n] = Proxy(self._create_lazy_container(n))
            else:
                self.__dict__[n] = Container(
                    self._df, name=self._top_level_name + n + "."
                )

    def to_dataframe(self, full_name=True, next_level=False):
        """Return this level's columns; strip the prefix when full_name=False."""
        df = self._slice_df()
        if not full_name:
            coln = pd.Index([k.split(self._top_level_name)[1] for k in df.columns])
            df.columns = coln
        return df

    def recall(self, key, next_level=True, get_status=True):
        """Restore device settings from a stored run-table row.

        Collects the stored values for ``key`` (recursively over child
        containers when next_level=True), maps the prefix back to the device
        object via name2obj, and replays the values through Memory.recall.
        """
        sr = self[key]
        if next_level:
            srs = [self.__dict__[k][key] for k in self._get_next_level_names()]
            srs.insert(0, sr)
            sr = self._concatenate_srs(srs)
        idxn = pd.Index([k.split(self._top_level_name)[1] for k in sr.index])
        sr.index = idxn
        dev = name2obj(self._df.devices, self._top_level_name)
        memory = Memory(obj=dev, memory_dir="")
        if get_status:
            try:
                # Use the device's own status structure to pick setting keys.
                mem = {
                    tk: {ak: sr[ak] for ak in tv.keys() if ak in sr.index}
                    for tk, tv in dev.get_status().items()
                }
            except Exception:
                mem = {"settings": {k: v for k, v in sr.items() if not "readback" in k}}
        else:
            mem = {"settings": {k: v for k, v in sr.items() if not "readback" in k}}
        memory.recall(input_obj=mem)

    def _concatenate_dfs(self, dfs):
        dfc = dfs[0]
        for df in dfs[1:]:
            dfc = dfc.join(df)
        return dfc

    def _concatenate_srs(self, srs):
        # BUGFIX: Series.append was removed in pandas 2.0; use pd.concat.
        src = srs[0]
        for sr in srs[1:]:
            if isinstance(sr, pd.Series):
                src = pd.concat([src, sr])
        return src

    def __dir__(self):
        # Creating the child containers here makes them tab-completable.
        next_level_names = self._get_next_level_names()
        to_create = np.array(
            [n for n in next_level_names if not n in self.__dict__.keys()]
        )
        directory = list(next_level_names)
        directory.extend(["to_dataframe", "recall"])
        self._create_first_level_container(to_create)
        return directory

    def __repr__(self):
        # pd.options.display.width = os.get_terminal_size().columns
        return self._slice_df().T.__repr__()

    def _repr_html_(self):
        sdf = self._slice_df()
        if hasattr(sdf, "_repr_html_"):
            return sdf.T._repr_html_()
        else:
            return None

    def __getitem__(self, key):
        # Row selection by index label(s); tuples are treated as label lists.
        if type(key) is tuple:
            key = list(key)
        df = self._slice_df().loc[key]
        if hasattr(df, "T"):
            df = df.T
        return df
class Run_Table2:
    """Run-table front end.

    Wraps a ``Run_Table_DataFrame`` (local pickle persistence) and, when all
    Google credentials are supplied, mirrors the table to a Google sheet.
    Per-device column groups are exposed lazily as attributes (see
    ``__dir__``).
    """

    def __init__(
        self,
        data=None,
        exp_id="no_exp_id",
        exp_path="run_data/runtable/",
        keydf_fname=None,
        cred_fname=None,
        devices=None,
        name=None,
        gsheet_key_path=None,
        parse=True,
    ):
        """
        Args:
            data: initial table data or path to a pickled DataFrame.
            exp_id: experiment (pgroup) identifier used in the pickle filename.
            exp_path: directory where the pickle file is stored.
            keydf_fname, cred_fname, gsheet_key_path: Google-sheet access
                files; the sheet mirror is enabled only if all three are given.
            devices: module (or dotted module name) parsed for adjustables.
            name: display name.
            parse: whether ``devices`` is parsed for adjustables on first append.
        """
        self._data = Run_Table_DataFrame(
            data=data,
            exp_id=exp_id,
            exp_path=exp_path,
            devices=devices,
            name=name,
            parse=parse,
        )
        # The Google-sheet mirror needs all three credential files.
        if np.all([k is not None for k in [keydf_fname, cred_fname, gsheet_key_path]]):
            self._google_sheet_api = Gsheet_API(
                keydf_fname,
                cred_fname,
                exp_id,
                exp_path,
                gsheet_key_path,
            )
            self.channels_gsheet = self._google_sheet_api.gsheet_keys
            self._rt_gsheet = RuntableGsheet(
                self._google_sheet_api.gc.open_by_key(
                    self._google_sheet_api._spreadsheet_key
                )
            )
            # self._rt_gsheet.require_worksheets()
        else:
            self._google_sheet_api = None
        # Side effect: creates the lazy per-device attributes.
        self.__dir__()  # why necessary to run here?

    def update(self):
        """Push the current keys and data to the Google-sheet mirror."""
        self._rt_gsheet.set_available_keys(self._data.df.keys())
        self._rt_gsheet.fill_run_table_data(self._data.df)

    def append_run(
        self,
        runno,
        metadata,
        d=None,
        wait=False,
    ):
        """Append a run in a background thread.

        Args:
            runno: run number used as row index.
            metadata: dict stored under ``metadata.*`` columns.
            d: optional pre-collected values keyed ``bernina.<adjustable>``.
            wait: block until the append (and upload) has finished.
        """
        d = {} if d is None else d  # avoid shared mutable default
        ar = threading.Thread(
            target=self._append_run,
            args=(
                runno,
                metadata,
            ),
            kwargs={"d": d},
        )
        ar.start()
        if wait:
            ar.join()

    def _append_run(
        self,
        runno,
        metadata,
        d=None,
    ):
        # Append locally, then mirror to the Google sheet if configured.
        d = {} if d is None else d
        self._data.append_run(runno, metadata, d=d)
        if self._google_sheet_api is not None:
            df = self._reduce_df()
            self._google_sheet_api._upload_all(df=df)
            # self._rt_gsheet.set_available_keys(self._data.df.keys())
            self._rt_gsheet.fill_run_table_data(self._data.df)

    def append_pos(
        self,
        name,
        wait=False,
    ):
        """Append a named position snapshot in a background thread."""
        ar = threading.Thread(target=self._append_pos, args=(name,))
        ar.start()
        if wait:
            ar.join()

    def _append_pos(
        self,
        name,
    ):
        self._data.append_pos(name)
        if self._google_sheet_api is not None:
            df = self._reduce_df()
            self._google_sheet_api._upload_all(df=df)

    def to_dataframe(self):
        """Return the run table as a plain pandas DataFrame."""
        return DataFrame(self._data)

    ###### diagnostic and convenience functions ######
    def from_pgroup(self, pgroup):
        """
        returns a run_table instance of the specified pgroup
        note: this does neither replace the current run_table nor switch the automatic appending of data to a new run_table or pgroup
        usage: run_table_pxxx = run_table.from_pgroup('pxxx')
        """
        return Run_Table2(
            data=f"/sf/bernina/data/{pgroup}/res/run_table/{pgroup}_runtable.pkl"
        )

    def check_timeouts(self, include_bad_adjustables=True, plot=True, repeats=1):
        """Time adjustable readouts; see ``Run_Table_DataFrame.check_timeouts``."""
        return self._data.check_timeouts(
            include_bad_adjustables=include_bad_adjustables, plot=plot, repeats=repeats
        )

    def _reduce_df(
        self,
        keys=None,
    ):
        """Join the dataframes of all objects addressed by the gsheet keys."""
        if keys is None:
            # NOTE(review): gsheet_keys is read as a plain attribute in
            # __init__ but called here — confirm it is a method returning
            # the space-separated key string.
            keys = self._google_sheet_api.gsheet_keys()
        dfs = []
        for key in keys.split(" "):
            d = self
            add = True
            # Walk the dotted key through instance attributes.
            for k in key.split("."):
                if k in d.__dict__.keys():
                    d = d.__dict__[k]
                else:
                    add = False
            if all([hasattr(d, "to_dataframe"), add]):
                dfs.append(d.to_dataframe())
        dfc = self._concatenate_dfs(dfs)
        return dfc

    def _concatenate_dfs(self, dfs):
        # Left-join all frames onto the first one.
        dfc = dfs[0]
        for df in dfs[1:]:
            dfc = dfc.join(df)
        return dfc

    def _create_lazy_container(self, dev):
        # Factory returning a zero-argument constructor for a per-device
        # Container (used by the lazy Proxy attributes).
        def cr():
            return Container(df=self._data, name=dev + ".")

        return cr

    def __dir__(self):
        # Create (lazy) per-device attributes so tab completion shows them.
        lazy = True
        devs = np.unique(np.array([n.split(".")[0] for n in self._data.columns]))
        for dev in devs:
            if dev not in self.__dict__.keys():
                if lazy:
                    self.__dict__[dev] = Proxy(self._create_lazy_container(dev))
                else:
                    self.__dict__[dev] = Container(df=self._data, name=dev + ".")
        directory = self.__dict__.keys()
        return directory

    def __str__(self):
        # List devices grouped alphabetically by their first letter.
        devs = np.unique(np.array([n.split(".")[0] for n in self._data.columns]))
        devs = np.array([dev for dev in devs if 0 < len(dev)])
        devs_abc = np.array([dev[0] for dev in devs])
        devs_dict = {abc: devs[devs_abc == abc] for abc in np.unique(devs_abc)}
        devs_str = ""
        for key, value in devs_dict.items():
            devs_str = devs_str + f"{key.capitalize()}\n"
            for val in value:
                devs_str = devs_str + f"{val}\n"
            devs_str = devs_str + f"\n"
        return devs_str

    def __repr__(self):
        return self.__str__()
class Run_Table_DataFrame(DataFrame):
def __init__(
self,
data=None,
exp_id=None,
exp_path=None,
devices=None,
name=None,
parse=True,
):
if type(data) is str:
data = pd.read_pickle(data)
super().__init__(data=data)
### Load devices to parse for adjustables ###
if type(devices) == str:
import importlib
devices = importlib.import_module(devices)
self.devices = devices
self.name = name
self.fname = exp_path + f"{exp_id}_runtable.pkl"
self.load()
self.parse = parse
### dicts holding adjustables and bad (not connected) adjustables ###
self.adjustables = {}
self.bad_adjustables = {}
self.ids_parsed = {}
self.ids_bad = []
###parsing options
self._parse_exclude_keys = "status_indicators settings_collection status_indicators_collection presets memory _elog _currentChange _flags __ alias namespace scan MasterEventSystem _motor Alias".split(
" "
)
self._parse_exclude_class_types = (
"__ alias namespace scan MasterEventSystem _motor Alias Collection".split(
" "
)
)
self._adj_exclude_class_types = (
"__ alias namespace scan MasterEventSystem _motor Alias".split(" ")
)
self.key_order = "metadata gps xrd midir env_thc temperature1_rbk temperature2_rbk time name gps gps_hex thc ocb eos las lxt phase_shifter mono att att_fe slit_und slit_switch slit_att slit_kb slit_cleanup pulse_id mono_energy_rbk att_transmission att_fe_transmission"
pd.options.display.max_rows = 100
pd.options.display.max_columns = 50
pd.set_option("display.float_format", lambda x: "%.5g" % x)
# def _get_values(self):
# is_connected = np.array([pv.connected for pv in self._pvs.values()])
# filtered_dict = {key: pv.value for key, pv in self._pvs.items() if pv.connected}
# return filtered_dict
@property
def df(self):
return self
@df.setter
def df(self, data):
super().__init__(data)
@df.deleter
def df(self):
return
def _remove_duplicates(self):
self.df = self[~self.index.duplicated(keep="last")]
def save(self):
data_dir = Path(os.path.dirname(self.fname))
if not data_dir.exists():
print(
f"Path {data_dir.absolute().as_posix()} does not exist, will create it..."
)
data_dir.mkdir(parents=True)
print(f"Tried to create {data_dir.absolute().as_posix()}")
data_dir.chmod(0o775)
print(f"Tried to change permissions to 775")
pd.DataFrame(self).to_pickle(self.fname + "tmp")
call(["mv", self.fname + "tmp", self.fname])
def load(self):
if os.path.exists(self.fname):
self.df = pd.read_pickle(self.fname)
def _append_run(self, runno, metadata={}, d={}, wait=False):
if wait:
self.append_run(runno, metadata=metadata, d=d)
else:
ar = threading.Thread(
target=self.append_run,
args=[
runno,
],
kwargs={"metadata": metadata, "d": d},
)
ar.start()
def append_run(
self,
runno,
metadata={
"type": "ascan",
"name": "phi scan (001)",
"scan_motor": "phi",
"from": 1,
"to": 2,
"steps": 51,
},
d={},
):
self.load()
if np.all([len(self.ids_parsed) == 0, self.parse]):
self._parse_parent()
dat = self._get_adjustable_values(d=d)
dat["metadata.time"] = datetime.now()
dat.update({"metadata."+k:v for k, v in metadata.items()})
values = np.array(
list(dat.values()), dtype=object
)
index = np.array(
list(dat.keys())
)
run_df = DataFrame([values], columns=index, index=[runno])
# deprecated: self.df = self.append(run_df)
self.df = pd.concat([self.df, run_df])
self._remove_duplicates()
# self.order_df()
self.save()
def append_pos(self, name="", d={}):
self.load()
if np.all([len(self.ids_parsed) == 0, self.parse]):
self._parse_parent()
try:
posno = (
int(self[self["metadata.type"] == "pos"].index[-1].split("p")[1]) + 1
)
except:
posno = 0
dat = self._get_adjustable_values(d=d)
dat.update({"metadata.time": datetime.now(), "metadata.name": name, "metadata.type": "pos"})
values = np.array(
list(dat.values()), dtype=object
)
index = np.array(
list(dat.keys())
)
pos_df = DataFrame([values], columns=index, index=[f"p{posno}"])
# deprecated: self.df = self.append(pos_df)
self.df = pd.concat([self.df, pos_df])
self._remove_duplicates()
# self.order_df()
self.save()
def _get_adjustable_values(self, silent=False, d={}, by_id=True, multiindex=False):
"""
This function gets the values of all adjustables in good adjustables and raises an error, when an adjustable is not connected anymore
"""
dat = {}
if self.parse:
for aid, adict in self.ids_parsed.items():
if aid in self.ids_bad:
continue
if not "value" in adict.keys():
continue
## try getting the value from the dict passed from the status
v = None
for name in adict["names"]:
if "bernina." + name in d.keys():
v = d["bernina." + name]
break
if v is None:
try:
v = adict["value"].get_current_value()
except:
print(
f"run_table: getting value of {adict['names']} failed, adding it to list of bad adjustables run_table._data.ids_bad"
)
self.ids_bad.append(aid)
continue
for name in adict["names"]:
dat[name]=v
else:
if len(d)==0:
st = namespace.get_status(base=None)
d = st["status"]
d.update(st["settings"])
dat = {k[len(k.split(".")[0])+1:] : v for k,v in d.items()}
return dat
def _get_all_adjustables(
self, device, adj_prefix=None, parent_name=None, verbose=False, exclude_keys=[], adjustable_exclude_class_types=[], foo_get_current_value="get_current_value"
):
if verbose:
print(f"\nparsing children of {parent_name}")
print("parent_name", parent_name)
print("device.name", device.name)
print("adj_prefix", adj_prefix)
if adj_prefix is not None:
name = ".".join([adj_prefix, device.name])
else:
name = device.name
if id(device) in self.ids_parsed.keys():
if "ids" in self.ids_parsed[id(device)].keys():
if len(self.ids_parsed[id(device)]["ids"]) > 0:
for adj_id in self.ids_parsed[id(device)]["ids"]:
adj_name = self.ids_parsed[adj_id]["name"]
# if parent_name == name:
# k = adj_name
# else:
k = ".".join([name, adj_name])
self.ids_parsed[adj_id]["names_parent"].append(name)
# self.ids_parsed[adj_id]["names"].append(".".join([parent_name, k]))
self.ids_parsed[adj_id]["names"].append(k)
return
for key in device.__dict__.keys():
if ~np.any([s in key for s in exclude_keys]):
value = device.__dict__[key]
if np.all(
[
~np.any(
[
s in str(type(value))
for s in adjustable_exclude_class_types
]
),
hasattr(value, foo_get_current_value),
]
):
## create device entry only if it has adjustables
if id(device) in self.ids_parsed.keys():
if not "ids" in self.ids_parsed[id(device)].keys():
self.ids_parsed[id(device)]["ids"] = []
else:
self.ids_parsed.update({id(device): {"ids": []}})
k = ".".join([name, key])
if id(value) in self.ids_parsed.keys():
if "value" in self.ids_parsed[id(value)]:
self.ids_parsed[id(device)]["ids"].append(id(value))
self.ids_parsed[id(value)]["names_parent"].append(name)
self.ids_parsed[id(value)]["names"].append(k)
continue
self.ids_parsed[id(device)]["ids"].append(id(value))
self.ids_parsed[id(value)] = {}
self.ids_parsed[id(value)]["names_parent"] = [name]
self.ids_parsed[id(value)]["name"] = key
self.ids_parsed[id(value)]["names"] = [k]
self.ids_parsed[id(value)]["value"] = value
#if parent_name == name:
## only a fix to record get_current_values() of top level devices
#if hasattr(device, foo_get_current_value):
# ## create device entry only if it has adjustables
# if id(device) in self.ids_parsed.keys():
# if not "ids" in self.ids_parsed[id(device)].keys():
# self.ids_parsed[id(device)]["ids"] = []
# else:
# self.ids_parsed.update({id(device): {"ids": []}})
# self.ids_parsed[id(device)]["ids"].append(id(device))
# self.ids_parsed[id(device)]["names_parent"] = [name]
# self.ids_parsed[id(device)]["name"] = "value"
# self.ids_parsed[id(device)]["names"] = [".".join([name, "value"])]
# self.ids_parsed[id(device)]["value"] = device
def _parse_child_instances(
self, parent_class, adj_prefix=None, parent_name=None, verbose=False, exclude_keys=[], parse_exclude_class_types=[], adjustable_exclude_class_types=[], is_eco=True, foo_get_current_value="get_current_value"
):
# check if the parent_class was already parsed in its parents
if adj_prefix is not None:
if parent_class.name in adj_prefix:
return []
self._get_all_adjustables(
parent_class, adj_prefix, parent_name, verbose=verbose, exclude_keys=exclude_keys, adjustable_exclude_class_types=adjustable_exclude_class_types, foo_get_current_value=foo_get_current_value
)
if adj_prefix is not None:
adj_prefix = ".".join([adj_prefix, parent_class.name])
else:
adj_prefix = parent_class.name
sub_classes = []
for key in parent_class.__dict__.keys():
if ~np.any([s in key for s in exclude_keys]):
s_class = parent_class.__dict__[key]
reqs = []
if is_eco:
reqs = [
"eco" in str(s_class.__class__),
]
if np.all(
reqs +
[
hasattr(s_class, "__dict__"),
s_class.__hash__ is not None,
~np.any(
[
s in str(s_class.__class__)
for s in parse_exclude_class_types
]
),
]
):
if adj_prefix is None or ~np.any(
[
key == s
for s in ".".join([parent_name, adj_prefix]).split(".")
]
):
s_class_name = None
if hasattr(s_class, "name"):
s_class_name = s_class.name
if s_class_name == None:
s_class.name = key
sub_classes.append(s_class)
return set(sub_classes).union(
[
s
for c in sub_classes
for s in self._parse_child_instances(c, adj_prefix, parent_name, exclude_keys=exclude_keys, parse_exclude_class_types=parse_exclude_class_types, adjustable_exclude_class_types=adjustable_exclude_class_types, is_eco=is_eco, foo_get_current_value=foo_get_current_value)
]
)
def _parse_parent(self, parent=None, verbose=False, exclude_keys=[], parse_exclude_class_types=[], adjustable_exclude_class_types=[], is_eco=True, foo_get_current_value="get_current_value"):
if len(exclude_keys) == 0:
exclude_keys = self._parse_exclude_keys
if len(parse_exclude_class_types) == 0:
parse_exclude_class_types = self._parse_exclude_class_types
if len(adjustable_exclude_class_types) == 0:
adjustable_exclude_class_types = self._adj_exclude_class_types
self.ids_parsed = {}
if parent == None:
parent = self.devices
self.ids_parsed[id(parent)] = {"ids": []}
for key in parent.__dict__.keys():
try:
if ~np.any([s in key for s in exclude_keys]):
s_class = parent.__dict__[key]
reqs = []
if is_eco:
reqs = [
"eco" in str(s_class.__class__),
]
if np.all(
reqs +
[
hasattr(s_class, "__dict__"),
s_class.__hash__ is not None,
~np.any(
[
s in str(s_class.__class__)
for s in parse_exclude_class_types
]
),
]
):
s_class_name = None
if hasattr(s_class, "name"):
s_class_name = s_class.name
if s_class_name == None:
s_class.name = key
self._parse_child_instances(
s_class, parent_name=key, verbose=verbose, exclude_keys=exclude_keys, parse_exclude_class_types=parse_exclude_class_types, adjustable_exclude_class_types=adjustable_exclude_class_types, is_eco=is_eco, foo_get_current_value=foo_get_current_value
)
except Exception as e:
print(e)
print(key)
self._check_adjustables()
def _check_adjustables(self, check_for_current_none_values=True, by_id=True):
self.ids_bad = []
for aid, adict in self.ids_parsed.items():
if "value" in adict.keys():
try:
v = adict["value"].get_current_value()
except Exception as e:
self.ids_bad.append(aid)
print(
f"get_current_value() method of {adict['names']} failed with {e}"
)
continue
if check_for_current_none_values and v is None:
self.ids_bad.append(aid)
def _orderlist(self, mylist, key_order, orderlist=None):
key_order = key_order.split(" ")
if orderlist == None:
index = np.concatenate(
[np.where(np.array(mylist) == k)[0] for k in key_order if k in mylist]
)
else:
index = np.concatenate(
[
np.where(np.array(orderlist) == k)[0]
for k in key_order
if k in orderlist
]
)
curidx = np.arange(len(mylist))
newidx = np.append(index, np.delete(curidx, index))
return [mylist[n] for n in newidx]
def order_df(self, key_order=None):
"""
This function orders the columns of the stored dataframe by the given key_order.
key_order is a string with consecutive keys such as 'name type pulse_id. It defaults to self.key_order'
"""
if key_order is None:
key_order = self.key_order
devs = [item[0] for item in list(self.columns)]
self.df = self[self._orderlist(list(self.columns), key_order, orderlist=devs)]
#### diagnostic and convenience functions ####
def check_timeouts(
self, include_bad_adjustables=True, repeats=1, plot=True, verbose=True
):
if np.all([len(self.ids_parsed) == 0, self.parse]):
self._parse_parent(verbose=verbose)
ts = []
devs = []
def get_dev_adjs(dev):
for k, adj in dev.items():
val = adj.get_current_value()
for k, dev in self.good_adjustables.items():
def func(dev=dev):
return get_dev_adjs(dev)
t = timeit.timeit(func, number=repeats)
ts.append(float(t))
devs.append(k)
print(k, t)
idx = np.argsort(ts)
self.times = [np.array(devs)[idx], np.array(ts)[idx]]
print("recorded adjustable results stored in run_table._data.times")
if include_bad_adjustables:
for k, dev in self.bad_adjustables.items():
def func(dev=dev):
return get_dev_adjs(dev)
t = timeit.timeit(func, number=repeats)
ts.append(float(t))
devs.append(k)
print(k, t)
idx = np.argsort(ts)
print(
"rejected timed out adjustable results stored in run_table._data.times_rejected"
)
self.times_rejected = [np.array(devs)[idx], np.array(ts)[idx]]
if plot:
import pylab as plt
fig, ax = plt.subplots(1)
if include_bad_adjustables:
plt.barh(
self.times_rejected[0],
self.times_rejected[1],
color="red",
label="rejected adjustables",
)
plt.barh(
self.times[0],
self.times[1],
label="recorded adjustables",
color="seagreen",
)
plt.xlabel("time (s)")
plt.legend()
def name2obj(obj_parent, name, delimiter="."):
    """Resolve a dotted attribute path starting at *obj_parent*.

    ``name`` may be a delimiter-separated string or an iterable of parts.
    Empty parts and the literal ``"self"`` are skipped, so ``"self.x.y"``
    and ``"x.y"`` resolve to the same object.  Lookup goes through
    ``__dict__``, i.e. instance attributes only.
    """
    parts = name.split(delimiter) if type(name) is str else name
    target = obj_parent
    for part in parts:
        if part and part != "self":
            target = target.__dict__[part]
    return target
+1 -1
View File
@@ -48,7 +48,7 @@ class KbVer(Assembly):
self._append(
AdjustableVirtual,
[self.bend1, self.bend2],
lambda b1, b2: float(np.diff([b1, b2])),
lambda b1, b2: np.diff([b1, b2])[0],
lambda mn: self._get_benders_set_diff(mn),
name="bender_diff",
unit="mm",
+1 -1
View File
@@ -67,7 +67,7 @@ class OffsetMirrorsBernina(Assembly):
self._append(
AdjustableVirtual,
[self.mirr1.rz, self.mirr2.rz],
lambda b1, b2: float(np.diff([b1, b2])),
lambda b1, b2: np.diff([b1, b2])[0],
lambda mn: self._set_diff_2adj(mn, self.mirr1.rz, self.mirr2.rz),
name="rz_diff",
unit="mrad",
+6 -4
View File
@@ -107,15 +107,17 @@ class XrayPulsePicker(Assembly):
else:
return "unknown"
def open(self):
def open(self,verbose=True):
self.evr_output_enable.set_target_value(1).wait()
#self._evrsrc.put(62)
print("Opened Pulse Picker")
if verbose:
print("Opened Pulse Picker")
def close(self):
def close(self,verbose=True):
self.evr_output_enable.set_target_value(0).wait()
#self._evrsrc.put(62)
print("Closed Pulse Picker")
if verbose:
print("Closed Pulse Picker")
def set_target_value(self,value,hold=False):
if value: