merge newer version of scan code

This commit is contained in:
2020-03-27 22:12:23 +00:00
parent baa6be88f3
commit 0cbd394ea0
3 changed files with 118 additions and 21 deletions
+28
View File
@@ -0,0 +1,28 @@
class RunFilenameGenerator:
def __init__(self, path, prefix="run", Ndigits=4, separator="_", suffix="json"):
self.separator = separator
self.prefix = prefix
self.Ndigits = Ndigits
self.path = Path(path)
self.suffix = suffix
def get_existing_runnumbers(self):
pattern = self.prefix + self.Ndigits * "[0-9]" + self.separator + "*." + self.suffix
fl = self.path.glob(pattern)
fl = [tf for tf in fl if tf.is_file()]
runnos = [int(tf.name.split(self.prefix)[1].split(self.separator)[0]) for tf in fl]
return runnos
def get_nextrun_filename(self, name):
runnos = self.get_existing_runnumbers()
if runnos:
runno = max(runnos) + 1
else:
runno = 0
return self.prefix + "{{:0{:d}d}}".format(self.Ndigits).format(runno) + self.separator + name + "." + self.suffix
+67 -11
View File
@@ -1,51 +1,107 @@
from pathlib import Path
import numpy as np
from .scansimple import ScanSimple
from ..devices_general.adjustable import DummyAdjustable
class Scanner:
def __init__(self, data_base_dir="", scan_info_dir="", default_counters=[], checker=None, scan_directories=False):
self.data_base_dir = data_base_dir
scan_info_dir = Path(scan_info_dir)
if not scan_info_dir.exists():
print(f"Path {scan_info_dir.absolute().as_posix()} does not exist, will try to create it...")
scan_info_dir.mkdir(parents=True)
print(f"Tried to create {scan_info_dir.absolute().as_posix()}")
scan_info_dir.chmod(0o775)
print(f"Tried to change permissions to 775")
for counter in default_counters:
if counter._default_file_path is not None:
data_dir = Path(counter._default_file_path + self.data_base_dir)
if not data_dir.exists():
print(f"Path {data_dir.absolute().as_posix()} does not exist, will try to create it...")
data_dir.mkdir(parents=True)
print(f"Tried to create {data_dir.absolute().as_posix()}")
data_dir.chmod(0o775)
print(f"Tried to change permissions to 775")
self.scan_info_dir = scan_info_dir
self.filename_generator = RunFilenameGenerator(self.scan_info_dir)
self._default_counters = default_counters
self.checker = checker
self._scan_directories = scan_directories
def ascan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, start_immediately=True, step_info=None):
def ascan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
values = [[tp] for tp in positions]
s = ScanSimple([adjustable], values, self._default_counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def a2scan(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, N_intervals, N_pulses, file_name=None, start_immediately=True, step_info=None):
def a2scan(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions0 = np.linspace(start0_pos, end0_pos, N_intervals + 1)
positions1 = np.linspace(start1_pos, end1_pos, N_intervals + 1)
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, position1)]
s = ScanSimple([adjustable0, adjustable1], values, self._default_counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable0, adjustable1], values, self.counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def rscan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, start_immediately=True):
def rscan(self, adjustable, start_pos, end_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = np.linspace(start_pos, end_pos, N_intervals + 1)
current = adjustable.get_current_value()
values = [[tp + current] for tp in positions]
s = ScanSimple([adjustable], values, self._default_counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll()
s.scanAll(step_info=step_info)
return s
def dscan(self, *args, **kwargs):
print("Warning: dscan will be deprecated for rscan unless someone explains what it stands for in spec!")
return self.rscan(*args, **kwargs)
def ascanList(self, adjustable, posList, N_pulses, file_name=None, start_immediately=True, step_info=None):
positions = posList.astype(np.float)
def ascanList(self, adjustable, posList, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions = posList
values = [[tp] for tp in positions]
s = ScanSimple([adjustable], values, self._default_counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def a2scanList(self, adjustable0, start0_pos, end0_pos, adjustable1, start1_pos, end1_pos, N_intervals, N_pulses, file_name=None, counters=[], start_immediately=True, step_info=None):
positions0 = np.linspace(start0_pos, end0_pos, N_intervals + 1)
positions1 = posList
values = [[tp0, tp1] for tp0, tp1 in zip(positions0, positions1)]
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable0, adjustable1], values, self.counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
def acquire(self, N_pulses, N_repetitions=1, file_name=None, counters=[], start_immediately=True, step_info=None):
adjustable = DummyAdjustable()
positions = list(range(N_repetitions))
values = [[tp] for tp in positions]
file_name = self.filename_generator.get_nextrun_filename(file_name)
if not counters:
counters = self._default_counters
s = ScanSimple([adjustable], values, counters, file_name, Npulses=N_pulses, basepath=self.data_base_dir, scan_info_dir=self.scan_info_dir, checker=self.checker, scan_directories=self._scan_directories)
if start_immediately:
s.scanAll(step_info=step_info)
return s
+23 -10
View File
@@ -1,11 +1,12 @@
import os
import json
import traceback
import colorama
class ScanSimple:
def __init__(self, adjustables, values, counterCallers, fina, Npulses=100, basepath="", scan_info_dir="", checker=None, scan_directories=False):
def __init__(self, adjustables, values, counterCallers, fina, Npulses=100, basepath="", scan_info_dir="", checker=None, scan_directories=False, callbackStartStep=None, checker_sleep_time=0.2):
self.Nsteps = len(values)
self.pulses_per_step = Npulses
self.adjustables = adjustables
@@ -20,7 +21,7 @@ class ScanSimple:
self.scan_info = {
"scan_parameters": {
"name": [ta.name for ta in adjustables],
"Id": [ta.Id for ta in adjustables]
"Id": [ta.Id if hasattr(ta, "Id") else "noId" for ta in adjustables]
},
"scan_values_all": values,
"scan_values": [],
@@ -29,27 +30,37 @@ class ScanSimple:
"scan_step_info": []
}
self.scan_info_filename = os.path.join(self.scan_info_dir, fina)
self.scan_info_filename += "_scan_info.json"
# self.scan_info_filename += "_scan_info.json"
self._scan_directories = scan_directories
self.checker = checker
self.initial_values = []
self._checker_sleep_time = checker_sleep_time
print(f"Scan info in file {self.scan_info_filename}.")
for adj in self.adjustables:
tv = adj.get_current_value()
self.initial_values.append(adj.get_current_value())
print("Initial value of %s : %g" % (adj.name, tv))
def get_filename(self, stepNo, Ndigits=4):
fina = os.path.join(self.basepath, self.fina)
fina = os.path.join(self.basepath, Path(self.fina).stem)
if self._scan_directories:
fina = os.path.join(fina, self.fina)
fina += "_step%04d" % stepNo
return fina
def doNextStep(self, step_info=None, verbose=True):
# for call in self.callbacks_start_step:
# call()
if self.checker:
while not self.checker.check():
print("Condition checker is not happy, waiting for OK conditions.")
self.checker.sleep()
first_check = time()
checker_unhappy = False
while not self.checker.check_now():
print(colorama.Fore.RED + f"Condition checker is not happy, waiting for OK conditions since {time()-first_check:5.1f} seconds." + colorama.Fore.RESET, end="\r")
sleep(self._checker_sleep_time)
checker_unhappy = True
if checker_unhappy:
print(colorama.Fore.RED + f"Condition checker was not happy and waiting for {time()-first_check:5.1f} seconds." + colorama.Fore.RESET)
self.checker.clear_and_start_counting()
if not len(self.values_todo) > 0:
return False
@@ -59,7 +70,7 @@ class ScanSimple:
ms = []
fina = self.get_filename(self.nextStep)
for adj, tv in zip(self.adjustables, values_step):
ms.append(adj.changeTo(tv))
ms.append(adj.set_target_value(tv))
for tm in ms:
tm.wait()
readbacks_step = []
@@ -79,7 +90,7 @@ class ScanSimple:
print("Done with acquisition")
if self.checker:
if not self.checker.check():
if not self.checker.stop_and_analyze():
return True
if callable(step_info):
tstepinfo = step_info()
@@ -113,11 +124,13 @@ class ScanSimple:
tb = "Ended all steps without interruption."
finally:
print(tb)
if input("Move back to initial values? (y/n)")[0] == "y":
self.changeToInitialValues()
def changeToInitialValues(self):
c = []
for adj, iv in zip(self.adjustables, self.initial_values):
c.append(adj.changeTo(iv))
c.append(adj.set_target_value(iv))
return c