#!/usr/bin/env python3
"""Profile and time a processing script against a set of example images.

The processing script must define a function named by ``PROC_FUNC_NAME``.
It is called once per example image; the run is measured with ``cProfile``
and ``timeit`` and the results are written below the ``prof/`` directory.
"""

#fn_proc = "spectrometer.py"
#fn_proc = "do_nothing.py"
#fn_imgs = "example_images.npy"

import argparse
import cProfile as profile
import importlib
import os
import pstats
import timeit
from datetime import datetime

import numpy as np


PROC_FUNC_NAME = "process_image"


def parse_clargs(argv=None):
    """Parse the command line (``sys.argv`` when *argv* is None).

    Kept in a function so that importing this module does not consume the
    importing program's command line.
    """
    parser = argparse.ArgumentParser(description="Profile a Processing Script")
    parser.add_argument("proc", help="processing script")
    parser.add_argument("imgs", help="example images as numpy dump")
    return parser.parse_args(argv)


def main(fn_proc, fn_imgs):
    """Profile and time the processing function from *fn_proc* on *fn_imgs*.

    Args:
        fn_proc: File name of the processing script (see ``proc_import``).
        fn_imgs: File name of the example images, loaded with ``np.load``.

    Results go to ``prof/<fn_proc>-<YYYYmmdd-HHMMSS>`` plus the suffixes
    ``.prof``, ``.result`` and ``.timing``.
    """
    proc = proc_import(fn_proc)
    imgs = np.load(fn_imgs)

    pulse_id = timestamp = x_axis = y_axis = None

    parameters = {  #TODO: this should be read from a json config, like in the pipeline
        "camera_name": "test",
        "roi_background": [0, 2048, 50, 350],
        "roi_signal": [0, 2048, 550, 850],
        "project_axis": 1
    }

    def test():
        # The workload under measurement: one call per example image.
        for img in imgs:
            proc(img.T, pulse_id, timestamp, x_axis, y_axis, parameters)

    # One call outside of the measurement, before profiling starts.
    proc(imgs[0].T, pulse_id, timestamp, x_axis, y_axis, parameters)

    # Deliberately NOT named `timestamp`: test() closes over that variable,
    # so rebinding it here would hand the date string to every measured call.
    run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    os.makedirs("prof", exist_ok=True)
    fn_base_res = f"prof/{fn_proc}-{run_stamp}"

    func_profile(test, fn_base_res)
    func_timeit(test, fn_base_res)


# profiling

def func_profile(func, fn):
    """Run *func* under cProfile, print the stats and save them.

    Writes the binary profile to ``fn + ".prof"`` and the printed statistics
    to ``fn + ".result"``.
    """
    p = profile.Profile()
    p.runcall(func)
    p.print_stats()

    fn_prof = fn + ".prof"
    p.dump_stats(fn_prof)

    fn_res = fn + ".result"
    write_stats(p, fn_res)


def write_stats(p, fn):
    """Write the statistics of profile *p* as text to the new file *fn*.

    The file is opened in exclusive mode ("x"), so an existing file is never
    overwritten; ``FileExistsError`` is raised instead.
    """
    with open(fn, "x") as f:
        s = pstats.Stats(p, stream=f)
        s.print_stats()


# timing

def func_timeit(func, fn, repeat=5, number=10):
    """Time *func* with ``timeit`` and report per-call durations.

    Args:
        func: Callable without arguments.
        fn: Base file name; results are written to ``fn + ".timing"``.
        repeat: Number of timing rounds.
        number: Number of calls of *func* per round.

    Prints a summary (see ``np_describe``) and writes the raw per-call times
    followed by the summary to the result file (exclusive creation, raises
    ``FileExistsError`` if it already exists).
    """
    t = timeit.Timer(func)
    res = t.repeat(repeat=repeat, number=number)
    res = np.array(res)
    res /= number  # timeit reports the total per round; convert to per call

    desc = np_describe(res)
    desc = printable_dict(desc)
    print(desc)

    fn_res = fn + ".timing"
    with open(fn_res, "x") as f:
        f.write(str(res.tolist()))
        f.write("\n\n")
        f.write(desc)


def np_describe(a):
    """Return a dict of summary statistics of *a*, keyed by function name.

    The keys are, in order: min, max, median, mean, var, std.
    """
    a = np.asarray(a)
    # builtin min/max (not np.min/np.max) keep the keys "min" and "max"
    aggrs = (min, max, np.median, np.mean, np.var, np.std)
    return {f.__name__: f(a) for f in aggrs}


def printable_dict(d, header=None):
    """Format the mapping *d* as aligned ``key: value`` lines.

    Values are aligned in one column, one space after the longest key. If
    *header* is truthy, it is formatted with ``format_header`` and put in
    front. The result always ends with a newline.
    """
    length = maxstrlen(d) + 1
    # A list, not a generator: the header is prepended below.
    lines = [
        "{}:{}{}".format(k, " " * (length - strlen(k)), v)
        for k, v in d.items()
    ]
    if header:
        lines.insert(0, format_header(header))
    return "\n".join(lines) + "\n"


def format_header(header, line="-"):
    """Return *header* underlined with the character *line*.

    NOTE(review): this helper was referenced by printable_dict but not
    defined in this file; the underline style is a minimal stand-in --
    confirm against the intended formatting.
    """
    header = str(header)
    return header + "\n" + line * len(header)


def maxstrlen(seq):
    """Return the length of the longest ``str()`` form in *seq* (0 if empty)."""
    seq = [str(i) for i in seq]
    return maxlen(seq)


def maxlen(seq):
    """Return the length of the longest item in *seq* (0 if empty)."""
    if not seq:  # max of empty sequence is a ValueError
        return 0
    return max(len(i) for i in seq)


def strlen(x):
    """Return the length of ``str(x)``."""
    return len(str(x))


# importing

def proc_import(fn):
    """Import the script *fn* and return its processing function.

    Raises:
        ValueError: If the module has no attribute named ``PROC_FUNC_NAME``.
    """
    proc = fn_import(fn)
    if not hasattr(proc, PROC_FUNC_NAME):
        msg = f"file \"{fn}\" does not contain processing function: {PROC_FUNC_NAME}"
        raise ValueError(msg)
    # Look up the same name that was just checked, not a hard-coded attribute.
    return getattr(proc, PROC_FUNC_NAME)


def fn_import(fn):
    """Import the module named by file name *fn* (a trailing ".py" is dropped).

    The name is handed to ``importlib.import_module``, so the module has to
    be importable from the current module search path.
    """
    mod = remove_suffix(fn, ".py")
    return importlib.import_module(mod)


def remove_suffix(string, suffix):
    """Return *string* without a trailing *suffix*, if it ends with it."""
    if string.endswith(suffix):
        string = string[:-len(suffix)]
    return string


if __name__ == "__main__":
    clargs = parse_clargs()
    main(clargs.proc, clargs.imgs)