#!/usr/bin/env python import warnings warnings.filterwarnings("error") import argparse parser = argparse.ArgumentParser() parser.add_argument("pvname") clargs = parser.parse_args() import numpy as np from collections import deque from time import sleep from functools import wraps from types import SimpleNamespace from matplotlib import pyplot as plt from epics import PV class PVCollection: def __init__(self, name, **kwargs): self.name = name # self.pvnames = SimpleNamespace(**kwargs) pvs = {k: PV(v) for k, v in kwargs.items()} for pv in pvs.values(): pv.wait_for_connection() self.pvs = SimpleNamespace(**pvs) def __str__(self): return self.name class Camera(PVCollection): def __init__(self, pvname): base = pvname.rsplit(":", 1)[0] super().__init__( base, image = pvname, width = base + ":WIDTH", height = base + ":HEIGHT" ) def get(self): data = self.pvs.image.get() data.shape = self.get_shape() return data def get_shape(self): width = self.pvs.width.get() height = self.pvs.height.get() shape = (width, height) return shape def add_callback(self, callback, **kwargs): @wraps(callback) def wrapper(*args, value=None, **kwargs): value.shape = self.get_shape() return callback(*args, value=value, **kwargs) return self.pvs.image.add_callback(callback=wrapper, with_ctrlvars=False, **kwargs) def disconnect(self): for pv in self.pvs.__dict__.values(): #TODO pv.disconnect() class Source(PV): def __init__(self, pvname, *args, **kwargs): self.name = pvname super().__init__(pvname, *args, **kwargs) self.wait_for_connection() class Animation: def __init__(self, source, plot_func): value = source.get() self.plot = plot_func(value) plt.suptitle(source.name) source.add_callback(self.update) plt.show() source.disconnect() def update(self, value=None, **kwargs): self.plot.set(value) plt.draw() class PlotScalar: def __init__(self, *args, **kwargs): self.fig, axs = plt.subplots(1, 2) self.ax_time, self.ax_hist = ax_time, ax_hist = axs length = 100 # init = [0] + [np.nan] * (length-2) + [0] # self.cache = cache = deque(init, maxlen=length) self.cache = cache = deque(maxlen=length) self.x = x = range(length) # lines = ax_time.plot(x, cache) lines = ax_time.plot([0], [0]) self.plot_time = lines[0] #TODO: wtf? # plot_hist = ax_hist.bar(np.arange(0, 1, 100), [0] * 100) # self.plot_hist = plot_hist def set(self, value): cache = self.cache cache.append(value) x = range(len(cache)) self.plot_time.set_data(x, cache) nums = np.nan_to_num(cache) # print(nums) if min(nums) != max(nums): self.ax_time.set_ylim(min(nums), max(nums)) self.ax_time.relim() self.ax_time.autoscale_view() nums = np.array(cache) nums = nums[np.isfinite(nums)] ys, xs = np.histogram(nums, bins="auto") # print(ys) # for rect, x, y in zip(self.plot_hist, xs, ys): # rect.set_x(x) # rect.set_height(y) width = (xs[1:] - xs[:-1]).mean() try: self.plot_hist.remove() color = self.plot_hist.patches[0].get_facecolor() except: color = None plot_hist = self.ax_hist.bar(xs[:-1], ys, color=color,align="edge", width=width) self.plot_hist = plot_hist n = len(xs) self.ax_hist.set_title(f"#bins: {n}") self.ax_hist.relim() self.ax_hist.autoscale_view() class Plot1D: def __init__(self, *args, **kwargs): lines = plt.plot(*args, **kwargs) self.plot = lines[0] #TODO: wtf? def set(self, value): self.plot.set_ydata(value) class Plot2D: def __init__(self, *args, **kwargs): self.plot = plt.imshow(*args, **kwargs) def set(self, value): self.plot.set_data(value) pvname = clargs.pvname if pvname.endswith(":FPICTURE"): print("Camera", pvname) src = Camera(pvname) fun = Plot2D else: print("Source", pvname) src = Source(pvname) print(src.nelm) if src.nelm == 0: raise SystemExit(f"{src}: {src.value}") if src.nelm == 1: print("Scalar") fun = PlotScalar else: print("1D") fun = Plot1D Animation(src, fun)