#!/usr/bin/env python
"""Retroactively produce a stand-output HDF5 file from already-written data files.

Walks every "run*-*" folder under a base directory, reads each run's scan
metadata and first acquisition, samples the configured channels from the
first scan step, and collects one record per run into a pandas DataFrame.
"""

from collections import defaultdict
from pathlib import Path

import pandas as pd
from tqdm import tqdm

from sfdata import SFScanInfo, SFDataFiles

from utils import cprint, json_load


def process(base, channels):
    """Collect one summary record per run folder under *base*.

    base:     base folder containing "run*-*" directories (str or Path).
    channels: mapping of output column name -> data channel name,
              as produced by read_channels_file().

    Returns a list of dicts, one per run, each holding run number, filename,
    timestamp, pulse count, scan parameters (for multi-step runs), and one
    sampled value per requested channel.
    """
    base = Path(base)
    dirs = base.glob("run*-*")

    collected = []
    for d in tqdm(sorted(dirs)):
        run, name = parse_run_name(d.name)

        fn_scan = d / "meta" / "scan.json"
        scan = SFScanInfo(fn_scan)
        n = len(scan)

        info = scan.info
        adj_ids = info["scan_parameters"]["Id"]
        first_adj_id = adj_ids[0]

        # A single-step run is only treated as "static" if the scanned
        # adjustable is the dummy placeholder; otherwise warn and keep
        # treating it as a (degenerate) scan.
        typ = "scan"
        if n == 1:
            if first_adj_id.lower() == "dummy":
                typ = "static"
            else:
                cprint(f'run {run} is single step (i.e., static) but adjustable is "{first_adj_id}" (and not some variant of "dummy"), will treat as scan', color="red")

        # request_time of the first acquisition serves as the run timestamp
        fn_acq = d / "meta" / "acq0001.json"
        acq = json_load(fn_acq)
        timestamp = acq["request_time"]

        n_pulses = parse_n_pulses(scan)

        entries = {
            "run": run,
            "filename": name,
            "timeStamp": timestamp,
            "n_pulses": n_pulses
        }

        if typ == "scan":
            # single adjustable is stored as a scalar, several as a list
            entries["scanned_adjs"] = first_adj_id if len(adj_ids) == 1 else adj_ids
            res = parse_scan(run, scan)
            entries.update(res)

        # sample the requested channels from the first step only
        step = scan[0]
        res = read_data(step, channels)
        entries.update(res)

        tqdm.write(str(entries))
        collected.append(entries)

    return collected


def parse_run_name(name):
    """Split a folder name "runNNN-some-name" into (NNN as int, "some-name")."""
    run, name = name.split("-", 1)
    assert run.startswith("run")
    run = run[len("run"):]
    run = int(run)
    return run, name


def parse_n_pulses(scan):
    """Return the pulse count of the first step (stop ID minus start ID)."""
    pids = scan.info["pulseIds"]
    first_pids = pids[0]
    pid_start, pid_stop = first_pids
    n_pulses = pid_stop - pid_start
    return n_pulses


def parse_scan(run, scan):
    """Return scan-range summary: {"v_min", "v_max", "n_steps"}."""
    start, stop, n_steps = parse_rbks_best_effort(run, scan)
    res = {
        "v_min": start,
        "v_max": stop,
        "n_steps": n_steps
    }
    return res


def parse_rbks_best_effort(run, scan):
    """Parse the scan readbacks; fall back to the set values on any error.

    Deliberate best-effort: the broad except is intentional so that a
    malformed readback list never aborts processing — the failure is
    reported in red and the set values are used instead.
    """
    try:
        return parse_rbks(scan.readbacks)
    except Exception as e:
        cprint(run, "Could not parse readbacks, will use set values, because of:", e, color="red")
        return parse_rbks(scan.values)


def parse_rbks(rbks):
    """Return (min, max, count) of a sequence of scan positions."""
    start = min(rbks)
    stop = max(rbks)
    nsteps = len(rbks)
    return start, stop, nsteps


def read_data(step, channels):
    """Read one value per requested channel from a scan step.

    channels maps column name -> channel name; for each channel the first
    value of the first pulse is taken ([0][0]).
    """
    res = {}
    for col_name, ch_name in channels.items():
        val = step[ch_name][0][0]
        res[col_name] = val
    return res


def dump(d, fn, key="data"):
    """Write records *d* as a DataFrame to HDF5 file *fn* (also prints it)."""
    df = pd.DataFrame.from_records(d)
    print(df)
    # key passed by keyword: positional args other than the path are
    # deprecated in pandas 2.x and removed in pandas 3.0
    df.to_hdf(fn, key=key)


def read_channels_file(fn):
    """Parse an ascii channels file into {column name: channel name}.

    Each line is either "column = channel" or just "channel" (used for
    both sides); "#" starts a comment and blank lines are skipped.
    """
    res = {}
    with open(fn) as f:
        for line in f:
            line = line.split("#")[0].strip()
            if not line:
                continue
            if "=" in line:
                left, right = line.split("=", 1)
            else:
                left = right = line
            left = left.strip()
            right = right.strip()
            # print(left, right)
            res[left] = right
    return res


if __name__ == "__main__":
    import argparse

    desc = "Retroactively produce a stand output hdf5 from the written data files"
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=desc)
    parser.add_argument("base", help="base folder (e.g., /sf/instrument/data/p12345/raw/)")
    parser.add_argument("-o", "--output", help="output file name, if not specified no output is written")
    parser.add_argument("-c", "--channels", help="channels file name, ascii file where each line is: column name = channel name", default="channels.txt")

    clargs = parser.parse_args()
    # print(clargs)
    # raise SystemExit

    chans = read_channels_file(clargs.channels)
    print("", "channels:", chans, "", sep="\n")

    coll = process(clargs.base, chans)

    df = pd.DataFrame.from_records(coll)
    cprint("", "result:", df, "", sep="\n", color="green")

    if clargs.output:
        output = clargs.output
        # ensure the output file carries the .h5 extension
        ext = ".h5"
        if not output.endswith(ext):
            output += ext
        df.to_hdf(output, key="data")