refactored scanning code

This commit is contained in:
2020-03-30 14:21:44 +00:00
parent 0cbd394ea0
commit aa07c3496e
3 changed files with 130 additions and 97 deletions
+1
View File
@@ -1,3 +1,4 @@
from pathlib import Path
class RunFilenameGenerator:
+96 -78
View File
@@ -2,109 +2,127 @@ from pathlib import Path
import numpy as np
from .scansimple import ScanSimple
from ..devices_general.adjustable import DummyAdjustable
from .runname import RunFilenameGenerator
from ..devices.general.adjustable import DummyAdjustable
def make_dir(p):
p = Path(p)
if p.exists():
return
printable = p.absolute().as_posix()
print(f"Path \"{printable}\" does not exist, will try to create it...")
#TODO:
# p.mkdir(parents=True)
# p.chmod(0o775)
def make_positions(start, end, n):
return np.linspace(start, end, n + 1)
class Scanner:
def __init__(self, data_base_dir="", scan_info_dir="", default_counters=[], checker=None, scan_directories=False):
self.data_base_dir = data_base_dir
scan_info_dir = Path(scan_info_dir)
if not scan_info_dir.exists():
print(f"Path {scan_info_dir.absolute().as_posix()} does not exist, will try to create it...")
scan_info_dir.mkdir(parents=True)
print(f"Tried to create {scan_info_dir.absolute().as_posix()}")
scan_info_dir.chmod(0o775)
print(f"Tried to change permissions to 775")
self.scan_info_dir = scan_info_dir
self.default_counters = default_counters
self.checker = checker
self.scan_directories = scan_directories
make_dir(scan_info_dir)
self.filename_generator = RunFilenameGenerator(scan_info_dir)
for counter in default_counters:
if counter._default_file_path is not None:
data_dir = Path(counter._default_file_path + self.data_base_dir)
if not data_dir.exists():
print(f"Path {data_dir.absolute().as_posix()} does not exist, will try to create it...")
data_dir.mkdir(parents=True)
print(f"Tried to create {data_dir.absolute().as_posix()}")
data_dir.chmod(0o775)
print(f"Tried to change permissions to 775")
default_path = counter.default_path
if default_path is None:
continue
data_dir = default_path + data_base_dir
make_dir(data_dir)
self.scan_info_dir = scan_info_dir
self.filename_generator = RunFilenameGenerator(self.scan_info_dir)
self._default_counters = default_counters
self.checker = checker
self._scan_directories = scan_directories
def ascan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
values = [[tp] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
def make_scan(self, adjustables, positions, n_pulses, filename, counters=[], start_immediately=True, step_info=None):
print(adjustables)#, adjustables.shape, adjustables.dtype)
print(list(positions))#, positions.shape, positions.dtype)
return
filename = self.filename_generator.get_nextrun_filename(filename)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
counters = self.default_counters
s = ScanSimple(adjustables, positions, counters, filename, Npulses=n_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self.scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def a2scan(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions0 = np.linspace(start0_pos, end0_pos, N_intervals + 1)
positions1 = np.linspace(start1_pos, end1_pos, N_intervals + 1)
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable0, adjustable1], values, self.counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def rscan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
current = adjustable.get_current_value()
values = [[tp + current] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def ascan(self, adjustable, start_pos, end_pos, n_intervals, *args, **kwargs):
adjustables = [adjustable]
positions = make_positions(start_pos, end_pos, n_intervals)
positions = zip(positions)
return self.make_scan(adjustables, positions, *args, **kwargs)
def a2scan(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, n_intervals, *args, **kwargs):
adjustables = [adjustable0, adjustable1]
positions0 = make_positions(start0_pos, end0_pos, n_intervals)
positions1 = make_positions(start1_pos, end1_pos, n_intervals)
positions = zip(positions0, positions1)
return self.make_scan(adjustables, positions, *args, **kwargs)
def rscan(self, adjustable, start_pos, end_pos, n_intervals, *args, **kwargs):
adjustables = [adjustable]
positions = make_positions(start_pos, end_pos, n_intervals)
positions += adjustable.get_current_value()
positions = zip(positions)
return self.make_scan(adjustables, positions, *args, **kwargs)
def dscan(self, *args, **kwargs):
print("Warning: dscan will be deprecated for rscan unless someone explains what it stands for in spec!")
return self.rscan(*args, **kwargs)
def ascanList(self, adjustable, posList, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = posList
values = [[tp] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def a2scanList(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions0 = np.linspace(start0_pos, end0_pos, N_intervals + 1)
positions1 = posList
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable0, adjustable1], values, self.counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def ascan_list(self, adjustable, positions, *args, **kwargs):
adjustables = [adjustable]
def acquire(self, N_pulses, N_repetitions=1, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = zip(positions)
return self.make_scan(adjustables, positions, *args, **kwargs)
def a2scan_list(self, adjustable0, positions0, adjustable1, positions1, *args, **kwargs):
adjustables = [adjustable0, adjustable1]
positions = zip(positions0, positions1)
return self.make_scan(adjustables, positions, *args, **kwargs)
def acquire(self, n_intervals, *args, **kwargs):
adjustable = DummyAdjustable()
adjustables = [adjustable]
positions = list(range(N_repetitions))
values = [[tp] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
positions = range(n_intervals)
positions = zip(positions)
return self.make_scan(adjustables, positions, *args, **kwargs)
+33 -19
View File
@@ -38,9 +38,10 @@ class ScanSimple:
print(f"Scan info in file {self.scan_info_filename}.")
for adj in self.adjustables:
tv = adj.get_current_value()
self.initial_values.append(adj.get_current_value())
self.initial_values.append(tv)
print("Initial value of %s : %g" % (adj.name, tv))
def get_filename(self, stepNo, Ndigits=4):
fina = os.path.join(self.basepath, Path(self.fina).stem)
if self._scan_directories:
@@ -48,6 +49,8 @@ class ScanSimple:
fina += "_step%04d" % stepNo
return fina
def doNextStep(self, step_info=None, verbose=True):
# for call in self.callbacks_start_step:
# call()
@@ -64,74 +67,85 @@ class ScanSimple:
if not len(self.values_todo) > 0:
return False
values_step = self.values_todo[0]
if verbose:
print("Starting scan step %d of %d" % (self.nextStep + 1, len(self.values_todo) + len(self.values_done)))
ms = []
fina = self.get_filename(self.nextStep)
for adj, tv in zip(self.adjustables, values_step):
ms.append(adj.set_target_value(tv))
tm = adj.set_target_value(tv)
ms.append(tm)
for tm in ms:
tm.wait()
readbacks_step = []
for adj in self.adjustables:
readbacks_step.append(adj.get_current_value())
tv = adj.get_current_value()
readbacks_step.append(tv)
if verbose:
print("Moved variables, now starting acquisition")
filenames = []
acs = []
for ctr in self.counterCallers:
acq = ctr.acquire(file_name=fina, Npulses=self.pulses_per_step)
filenames.extend(acq.file_names)
acs.append(acq)
for ta in acs:
ta.wait()
if verbose:
print("Done with acquisition")
if self.checker:
if not self.checker.stop_and_analyze():
return True
if callable(step_info):
tstepinfo = step_info()
else:
tstepinfo = step_info
self.values_done.append(self.values_todo.pop(0))
vals = self.values_todo.pop(0)
self.values_done.append(vals)
self.readbacks.append(readbacks_step)
self.appendScanInfo(values_step, readbacks_step, step_files=filenames, step_info=tstepinfo)
self.writeScanInfo()
self.nextStep += 1
return True
def appendScanInfo(self, values_step, readbacks_step, step_files=None, step_info=None):
self.scan_info["scan_values"].append(values_step)
self.scan_info["scan_readbacks"].append(readbacks_step)
self.scan_info["scan_files"].append(step_files)
self.scan_info["scan_step_info"].append(step_info)
def writeScanInfo(self):
with open(self.scan_info_filename, "w") as f:
json.dump(self.scan_info, f, indent=4, sort_keys=True)
def scanAll(self, step_info=None):
done = False
try:
while not done:
done = not self.doNextStep(step_info=step_info)
except:
tb = traceback.format_exc()
else:
tb = "Ended all steps without interruption."
finally:
print(tb)
if input("Move back to initial values? (y/n)")[0] == "y":
self.changeToInitialValues()
while self.doNextStep(step_info=step_info):
pass
print("Ended all steps without interruption.")
if input("Move back to initial values? (y/n)")[0] == "y":
self.changeToInitialValues()
def changeToInitialValues(self):
c = []
for adj, iv in zip(self.adjustables, self.initial_values):
c.append(adj.set_target_value(iv))
return c
adj.set_target_value(iv)