""" Plot channels from BS (beam synchronous) or PV (EPICS) sources to the terminal. """ from typing import Tuple, Optional from itertools import cycle, islice import re import time import random from collections import deque import asyncio import click import numpy as np from bsread import dispatcher, source import epics try: from nicegui import ui, app from plots import PlotlyPlot except ImportError as e: print("Nicegui not available, falling back to console plot.") print(e) from plots import ConsolePlot def clean_name(fname): """Returns the filename with all special characters but / . _ - # replaced with an underscore. """ cleaned_name = re.sub(r"[^a-zA-Z0-9\/\,\#\._-]", "_", fname) return cleaned_name class ChannelData: """ Collection of timestamps and values of a channel in a deque with a maximum length. Additionally allows to save the data (peridodically) to a file. Args: name (str): The name of the channel. save_file_prefix (str, optional): The prefix for the save file name. Defaults to None. window_length (int, optional): The maximum length of the data window. Defaults to 500. """ def __init__(self, name: str, save_file_prefix=None, window_length=500): self.name = name self.window_length = window_length self._x, self._y = deque(), deque() self.save_file_prefix = save_file_prefix if self.save_file_prefix is not None: self.save = True else: self.save = False if self.save: self.x_save, self.y_save = deque(), deque() save_file_prefix = clean_name(save_file_prefix) self.save_fname = clean_name(f"{save_file_prefix}_{self.name}.dat") with open(self.save_fname, "w", encoding="utf-8") as f: header = f"timestamp {self.name}\n" f.write(header) @property def x(self): return np.array(self._x) @property def y(self): return np.array(self._y) def append(self, x, y): """ Append a new data point to the channel. Args: x: The x value (timestamp). y: The y value. """ self._x.append(x) self._y.append(y) if len(self._x) > self.window_length: self._x.popleft() self._y.popleft() if self.save: self.x_save.append(x) self.y_save.append(y) def trigger_save(self): """ Trigger the save operation to save the data to a file. """ if self.save: out = np.zeros((len(self.x_save), 2)) out[:, 0] = self.x_save out[:, 1] = self.y_save self.x_save, self.y_save = deque(), deque() with open(self.save_fname, "a", encoding="utf-8") as f: np.savetxt( f, out, fmt="%.14g", ) def __str__(self) -> str: return f"Channel:{self.name}" def __repr__(self) -> str: return f"Channel({self.name})" def yield_bs_data(channel_names: list[str]): """Yields BS data from the given channels with timestamp.""" if len(channel_names) == 0: return None with source(channels=channel_names) as stream: while True: message = stream.receive() # messages contains: # pulse_id = message.data.pulse_id # global_timestamp = message.data.global_timestamp, # the global timestamp in seconds since 1970 for this pulse_id # global_timestamp_offset = message.data.global_timestamp_offset, the ns part of the timestamp? # channel_data = message.data.data['channel_name'] # message.data.data['SINSB0-'].timestamp # timestamp of the IOC # message.data.data['SINSB0-'].timestamp_offset # timestamp offset (ns) of the IOC timestamp = ( message.data.global_timestamp + message.data.global_timestamp_offset / 1000_000_000 ) channel_data = {"pulse_id": {"value": message.data.pulse_id, "timestamp": timestamp}} for key, value in message.data.data.items(): channel_data[key] = { "value": value.value, "timestamp": value.timestamp + value.timestamp_offset / 1000_000_000, } yield channel_data def yield_pv_data(channel_names: list[str]): """Yields PV data from the given channels with timestamp.""" pvs = {channel: epics.PV(channel) for channel in channel_names} channel_data = {} while True: for channel, pv in pvs.items(): data = pv.get_with_metadata() channel_data[channel] = {"value": data["value"], "timestamp": data["timestamp"]} yield channel_data def yield_test_data(channel_names: list[str]): """Yields test data from the given channels with timestamp.""" while True: channel_data = { channel: {"value": random.random(), "timestamp": time.time()} for channel in channel_names } yield channel_data def roundrobin(*iterables): """Visit input iterables in a cycle until each is exhausted.""" # roundrobin('ABC', 'D', 'EF') → A D E B F C # Algorithm credited to George Sakkis, posted as recipe in itertools iterators = map(iter, iterables) for num_active in range(len(iterables), 0, -1): iterators = cycle(islice(iterators, num_active)) yield from map(next, iterators) def yield_data(channel_names: list, only_pv=False): """Obtains data as a generator from either BS or PV sources. Decides based on availability, uses BS data as default. Returns: channel_data as dict (e.g. channel_data[channel_name]['value', 'timestamp']) """ bs_channels = dispatcher.get_current_channels() # separate into BS, PV and test channels test_list = [ch for ch in channel_names if ch.startswith("TEST_CHANNEL")] for test_ch in test_list: channel_names.remove(test_ch) if only_pv: pv_list = channel_names bs_list = [] else: bs_list = [ch for ch in channel_names if ch in bs_channels] pv_list = [ch for ch in channel_names if ch not in bs_list] for channel_data in roundrobin( yield_bs_data(bs_list), yield_pv_data(pv_list), yield_test_data(test_list) ): yield channel_data def collect_data(channels: dict): """Collects data from the given channels and respective sources and yields the updated data dictionary. """ # typical channel_data: # {'pulse_id': {'value': 21289615168, 'timestamp': 1717427820.5366151}, # 'SINSB01-RLLE-DSP:PHASE-VS': {'value': -121.776505, 'timestamp': 1717418177.347}} channel_names = list(channels.keys()) for update in yield_data(channel_names): for name, entry in update.items(): channels[name].append(entry["timestamp"], entry["value"]) yield channels async def runner( channels: dict, plot_interface: ConsolePlot | PlotlyPlot, acquire_interval=0.1, plot_interval=1, in_subplots=False, abort=False, relative_time=True, ): """ Loop for data collection and plotting. acquire_interval: interval in seconds for data acquisition (needs to be faster than 100 Hz, e.g. smaller than 0.01 s to capture all BS data). """ if relative_time: start_time = time.time() else: start_time = 0 collector = collect_data(channels) # initialize generator last_collect_time = time.time() last_plot_time = time.time() plot = plot_interface while not abort: now = time.time() elapsed_collect = now - last_collect_time elapsed_plot = now - last_plot_time if elapsed_collect > acquire_interval: channels = collector.send(None) last_collect_time = time.time() if elapsed_plot > plot_interval: # default: 1 Hz plot updates and save interval plot.update_plot(channels, in_subplots=in_subplots, start_time=start_time) for channel in channels.values(): channel.trigger_save() last_plot_time = time.time() sleep_time = max(0, acquire_interval - elapsed_collect) await asyncio.sleep(sleep_time) # should be faster than 100 Hz for BS data @click.command() @click.argument("channel_names", nargs=-1, type=str, required=True) @click.option("--acquire-interval", default=0.1, help="Interval in seconds for data collection.") @click.option("--plot-interval", default=1, help="Interval in seconds for plot update.") @click.option("--save-prefix", default=None, help="Prefix for the save filenames.") @click.option("--in-subplots", is_flag=True, default=False, type=bool, help="Enable plotting each channel in a subplot.") @click.option( "--relative-time", default=True, is_flag=True, type=bool, help="Use relative time since start of acquisition." ) @click.option( "--web", is_flag=True, default=False, type=bool, help="Whether to use the local web interface as a GUI.", ) def main( channel_names: Tuple[str, ...], acquire_interval=0.1, plot_interval=1, save_prefix: Optional[str] = None, in_subplots=False, relative_time=True, web=False, ): """Plot channel data from BS (beam synchronous) or PV (EPICS) sources. Use TEST_CHANNEL for test data. """ channels = { channel: ChannelData(channel, save_file_prefix=save_prefix) for channel in channel_names } if not web: # console plot interface plot = ConsolePlot(channels, in_subplots=in_subplots) asyncio.run( runner( channels, plot, acquire_interval=float(acquire_interval), plot_interval=plot_interval, in_subplots=in_subplots, relative_time=relative_time, ) ) else: # nicegui plotly interface plot = PlotlyPlot(channels, in_subplots=in_subplots) async def run_async(): await runner( channels, plot, acquire_interval=float(acquire_interval), plot_interval=plot_interval, in_subplots=in_subplots, relative_time=relative_time, ) app.on_startup(run_async) ui.run(reload=False, port=8004) if __name__ == "__main__": main()