"""
Plot channels from BS (beam synchronous) or PV (EPICS) sources to the terminal.
"""

from typing import Tuple, Optional
from itertools import cycle, islice
import re
import time
import asyncio
from collections import deque

import click
import numpy as np
from bsread import dispatcher, source
import epics

try:
    from nicegui import ui, app, context, run

    _NICEGUI_AVAILABLE = True
except ImportError as e:
    # Remember the failure so that main() can really fall back to the console plot.
    _NICEGUI_AVAILABLE = False
    print("nicegui not available, falling back to console plot.")
    print(e)

from plots import ConsolePlot, PlotlyPlot

# Conversion factor for the nanosecond offsets delivered with BS timestamps.
_NS_PER_S = 1_000_000_000

# Channels whose name starts with this prefix are served with random test data.
_TEST_CHANNEL_PREFIX = "TEST_CHANNEL"


def clean_name(fname):
    """Return fname with every character replaced by an underscore, except
    letters, digits and the characters / , # . _ -
    """
    cleaned_name = re.sub(r"[^a-zA-Z0-9\/\,\#\._-]", "_", fname)
    return cleaned_name


class ChannelData:
    """
    Collection of timestamps and values of a channel in a deque with a maximum length.
    Additionally allows to save the data (periodically) to a file.

    If a save file prefix is given, the file "<prefix>_<name>.dat" (passed through
    clean_name) is created or truncated on construction and a header line is written.

    Args:
        name (str): The name of the channel.
        save_file_prefix (str, optional): The prefix for the save file name. Defaults to None.
        window_length (int, optional): The maximum length of the data window. Defaults to 500.
    """

    def __init__(self, name: str, save_file_prefix=None, window_length=500):
        self.name = name
        self.window_length = window_length
        self._x, self._y = deque(), deque()

        self.save_file_prefix = save_file_prefix
        self.save = self.save_file_prefix is not None

        if self.save:
            # samples collected since the last trigger_save()
            self.x_save, self.y_save = deque(), deque()
            save_file_prefix = clean_name(save_file_prefix)
            self.save_fname = clean_name(f"{save_file_prefix}_{self.name}.dat")
            with open(self.save_fname, "w", encoding="utf-8") as f:
                header = f"timestamp {self.name}\n"
                f.write(header)

    @property
    def x(self):
        """The x values (timestamps) of the current window as a numpy array."""
        return np.array(self._x)

    @property
    def y(self):
        """The y values of the current window as a numpy array."""
        return np.array(self._y)

    def append(self, x, y):
        """
        Append a new data point to the channel.

        Args:
            x: The x value.
            y: The y value.
        """
        self._x.append(x)
        self._y.append(y)

        # Trim in a loop (instead of a single pop) so that a window_length that
        # was reduced after construction is honored as well.
        while len(self._x) > self.window_length:
            self._x.popleft()
            self._y.popleft()

        if self.save:
            self.x_save.append(x)
            self.y_save.append(y)

    def trigger_save(self):
        """
        Append all data points collected since the last save to the save file.

        Does nothing if saving is disabled or no new data points are pending.
        """
        if not self.save or not self.x_save:
            return

        out = np.zeros((len(self.x_save), 2))
        out[:, 0] = self.x_save
        out[:, 1] = self.y_save
        self.x_save, self.y_save = deque(), deque()

        with open(self.save_fname, "a", encoding="utf-8") as f:
            np.savetxt(
                f,
                out,
                fmt="%.14g",
            )

    def __str__(self) -> str:
        return f"Channel:{self.name}"

    def __repr__(self) -> str:
        return f"Channel({self.name})"


def yield_bs_data(channel_names: list[str]):
    """Yields BS data from the given channels with timestamp.

    Each yielded dict maps a channel name to {"value": ..., "timestamp": ...}
    and additionally contains the key "pulse_id". Yields nothing if no channel
    names are given.
    """
    if not channel_names:
        return

    with source(channels=channel_names) as stream:
        while True:
            message = stream.receive()
            # messages contains:
            # pulse_id = message.data.pulse_id
            # global_timestamp = message.data.global_timestamp,
            #   the global timestamp in seconds since 1970 for this pulse_id
            # global_timestamp_offset = message.data.global_timestamp_offset,
            #   the ns part of the timestamp?
            # channel_data = message.data.data['channel_name']
            # message.data.data['SINSB0-'].timestamp  # timestamp of the IOC
            # message.data.data['SINSB0-'].timestamp_offset  # timestamp offset (ns) of the IOC
            timestamp = message.data.global_timestamp + message.data.global_timestamp_offset / _NS_PER_S
            channel_data = {"pulse_id": {"value": message.data.pulse_id, "timestamp": timestamp}}

            for key, value in message.data.data.items():
                channel_data[key] = {
                    "value": value.value,
                    "timestamp": value.timestamp + value.timestamp_offset / _NS_PER_S,
                }

            yield channel_data


def yield_pv_data(channel_names: list[str]):
    """Yields PV data from the given channels with timestamp.

    Yields nothing if no channel names are given, so that an unused PV source
    does not take up acquisition slots in roundrobin().
    """
    if not channel_names:
        return

    pvs = {channel: epics.PV(channel) for channel in channel_names}

    while True:
        # Build a fresh dict per poll so that a PV without data in this poll is
        # left out instead of repeating its previous sample.
        channel_data = {}
        for channel, pv in pvs.items():
            data = pv.get_with_metadata()
            if data is None:
                # NOTE(review): pyepics appears to return None while a PV is
                # disconnected or the get timed out - skip it for this poll.
                continue
            channel_data[channel] = {"value": data["value"], "timestamp": data["timestamp"]}

        yield channel_data


def yield_test_data(channel_names: list[str]):
    """Yields test data (uniform random values) for the given channels with timestamp.

    Yields nothing if no channel names are given, so that an unused test source
    does not take up acquisition slots in roundrobin().
    """
    import random

    if not channel_names:
        return

    while True:
        channel_data = {
            channel: {"value": random.random(), "timestamp": time.time()} for channel in channel_names
        }
        yield channel_data


def roundrobin(*iterables):
    "Visit input iterables in a cycle until each is exhausted."
    # roundrobin('ABC', 'D', 'EF') → A D E B F C
    # Algorithm credited to George Sakkis, posted as recipe in itertools
    iterators = map(iter, iterables)
    for num_active in range(len(iterables), 0, -1):
        iterators = cycle(islice(iterators, num_active))
        yield from map(next, iterators)


def _current_bs_channel_names() -> set:
    """Returns the names of the channels currently offered by the BS dispatcher."""
    names = set()
    for entry in dispatcher.get_current_channels():
        # NOTE(review): the dispatcher seems to describe each channel as a dict
        # with a "name" key; plain strings are accepted too - confirm against
        # the bsread version in use.
        names.add(entry.get("name") if isinstance(entry, dict) else entry)
    return names


def yield_data(channel_names: list, only_pv=False):
    """Obtains data as a generator from either BS or PV sources.
    Decides based on availability, uses BS data as default.

    Channels starting with "TEST_CHANNEL" are served with random test data.
    The passed list is not modified.

    Returns: channel_data as dict (e.g. channel_data[channel_name]['value', 'timestamp'])
    """
    # separate into BS, PV and test channels
    test_list = [ch for ch in channel_names if ch.startswith(_TEST_CHANNEL_PREFIX)]
    real_channels = [ch for ch in channel_names if not ch.startswith(_TEST_CHANNEL_PREFIX)]

    if only_pv or not real_channels:
        # No need to ask the dispatcher (a network request) in this case.
        pv_list = real_channels
        bs_list = []
    else:
        bs_channels = _current_bs_channel_names()
        bs_list = [ch for ch in real_channels if ch in bs_channels]
        pv_list = [ch for ch in real_channels if ch not in bs_channels]

    yield from roundrobin(yield_bs_data(bs_list), yield_pv_data(pv_list), yield_test_data(test_list))


def collect_data(channels: dict):
    """Collects data from the given channels and yields the updated data dictionary.

    Args:
        channels: dict mapping channel names to ChannelData objects; each update
            is appended to the matching ChannelData in place.
    """
    # typical channel_data:
    # {'pulse_id': {'value': 21289615168, 'timestamp': 1717427820.5366151},
    # 'SINSB01-RLLE-DSP:PHASE-VS': {'value': -121.776505, 'timestamp': 1717418177.347}}
    channel_names = list(channels.keys())

    for update in yield_data(channel_names):
        for name, entry in update.items():
            channel = channels.get(name)
            if channel is None:
                # e.g. the "pulse_id" entry of BS messages when it was not requested
                continue
            channel.append(entry["timestamp"], entry["value"])

        yield channels


async def runner(
    channels: dict,
    plot_interface,
    acquire_interval=0.1,
    plot_interval=1,
    in_subplots=False,
    abort=False,
    relative_time=True,
):
    """
    Loop for data collection and plotting.

    acquire_interval: interval in seconds for data acquisition (needs to be faster than 100 Hz,
        e.g. smaller than 0.01 s to capture all BS data).
    plot_interval: interval in seconds for plot updates and for saving the data.
    relative_time: if True the current time is passed to the plot as start time, otherwise 0.

    Returns when there is no data source at all; pending data is saved when the
    loop is left (also on cancellation).
    """
    start_time = time.time() if relative_time else 0

    collector = collect_data(channels)  # initialize generator
    last_collect_time = time.time()
    last_plot_time = time.time()

    plot = plot_interface

    try:
        while not abort:
            now = time.time()
            elapsed_collect = now - last_collect_time
            elapsed_plot = now - last_plot_time

            if elapsed_collect > acquire_interval:
                try:
                    channels = next(collector)
                except StopIteration:
                    # No channels to read from; a StopIteration must not leave
                    # the coroutine (it would surface as a RuntimeError).
                    break
                last_collect_time = time.time()

            if elapsed_plot > plot_interval:
                # default: 1 Hz plot updates and save interval
                plot.update_plot(channels, in_subplots=in_subplots, start_time=start_time)

                for channel in channels.values():
                    channel.trigger_save()

                last_plot_time = time.time()

            sleep_time = max(0, acquire_interval - elapsed_collect)
            await asyncio.sleep(sleep_time)  # should be faster than 100 Hz for BS data
    finally:
        # Do not lose the samples collected since the last save.
        for channel in channels.values():
            channel.trigger_save()


@click.command()
@click.argument("channel_names", nargs=-1)
@click.option("--acquire-interval", default=0.1, type=float, help="Interval in seconds for data collection.")
@click.option("--plot-interval", default=1.0, type=float, help="Interval in seconds for plot update.")
@click.option("--save-prefix", default=None, help="Prefix for the save filenames.")
@click.option("--in-subplots", default=False, type=bool, help="Whether to plot in subplots.")
@click.option(
    "--web", is_flag=True, default=False, type=bool, help="Whether to use the local web interface as a GUI."
)
def main(
    channel_names: Tuple[str, ...],
    acquire_interval=0.1,
    plot_interval=1,
    save_prefix: Optional[str] = None,
    in_subplots=False,
    web=False,
):
    """Plot channels from BS (beam synchronous) or PV (EPICS) sources."""
    # NOTE: the docstring above is shown as the --help text, keep it short.
    channels = {channel: ChannelData(channel, save_file_prefix=save_prefix) for channel in channel_names}
    start_time = time.time()

    runner_options = {
        "acquire_interval": float(acquire_interval),
        "plot_interval": float(plot_interval),
        "in_subplots": in_subplots,
    }

    if web and not _NICEGUI_AVAILABLE:
        # The import failure was already reported at import time.
        click.echo("Web interface requested but nicegui is not installed, using the console plot.", err=True)
        web = False

    if not web:
        # console plot version
        plot = ConsolePlot(channels, start_time=start_time, in_subplots=in_subplots)
        asyncio.run(runner(channels, plot, **runner_options))
    else:
        # nicegui version
        plot = PlotlyPlot(channels, start_time=start_time, in_subplots=in_subplots)

        async def run_async():
            await runner(channels, plot, **runner_options)

        app.on_startup(run_async)
        ui.run(reload=False, port=8004)


if __name__ == "__main__":
    main()