diff --git a/plot_channels b/plot_channels new file mode 100755 index 0000000..5185634 Binary files /dev/null and b/plot_channels differ diff --git a/plot_channels.py b/plot_channels.py new file mode 100644 index 0000000..17c7daa --- /dev/null +++ b/plot_channels.py @@ -0,0 +1,291 @@ +""" Plot channels from BS (beam synchronous) or PV (EPICS) sources to the terminal. """ + +from typing import Tuple, Optional +from itertools import cycle, islice +import re +import time + +from collections import deque +import click + +import numpy as np + +from bsread import dispatcher, source + +import plotext as plt +import epics + + +def clean_name(fname): + """Returns the filename with all special characters but / . _ - # + replaced with an underscore. + + """ + cleaned_name = re.sub(r"[^a-zA-Z0-9\/\,\#\._-]", "_", fname) + return cleaned_name + + +class ChannelData: + """ + Collection of timestamps and values of a channel in a deque with a maximum length. + Additionally allows to save the data (peridodically) to a file. + + Args: + name (str): The name of the channel. + save_file_prefix (str, optional): The prefix for the save file name. Defaults to None. + window_length (int, optional): The maximum length of the data window. Defaults to 500. + + """ + + def __init__(self, name: str, save_file_prefix=None, window_length=500): + self.name = name + self.window_length = window_length + self._x, self._y = deque(), deque() + self.save_file_prefix = save_file_prefix + + if self.save_file_prefix is not None: + self.save = True + else: + self.save = False + + if self.save: + self.x_save, self.y_save = deque(), deque() + + save_file_prefix = clean_name(save_file_prefix) + self.save_fname = clean_name(f"{save_file_prefix}_{self.name}.dat") + + with open(self.save_fname, "w", encoding="utf-8") as f: + header = f"timestamp {self.name}\n" + f.write(header) + + @property + def x(self): + return np.array(self._x) + + @property + def y(self): + return np.array(self._y) + + def append(self, x, y): + """ + Append a new data point to the channel. + + Args: + x: The x value. + y: The y value. + + """ + self._x.append(x) + self._y.append(y) + + if len(self._x) > self.window_length: + self._x.popleft() + self._y.popleft() + + if self.save: + self.x_save.append(x) + self.y_save.append(y) + + def trigger_save(self): + """ + Trigger the save operation to save the data to a file. + """ + if self.save: + out = np.zeros((len(self.x_save), 2)) + out[:, 0] = self.x_save + out[:, 1] = self.y_save + + self.x_save, self.y_save = deque(), deque() + + with open(self.save_fname, "a", encoding="utf-8") as f: + np.savetxt( + f, + out, + fmt="%.14g", + ) + + def __str__(self) -> str: + return f"Channel:{self.name}" + + def __repr__(self) -> str: + return f"Channel({self.name})" + + +def yield_bs_data(channel_names: list[str]): + """Yields BS data from the given channels with timestamp.""" + if len(channel_names) == 0: + return None + + with source(channels=channel_names) as stream: + while True: + message = stream.receive() + # messages contains: + # pulse_id = message.data.pulse_id + # global_timestamp = message.data.global_timestamp, + # the global timestamp in seconds since 1970 for this pulse_id + # global_timestamp_offset = message.data.global_timestamp_offset, the ns part of the timestamp? + + # channel_data = message.data.data['channel_name'] + # message.data.data['SINSB0-'].timestamp # timestamp of the IOC + # message.data.data['SINSB0-'].timestamp_offset # timestamp offset (ns) of the IOC + + timestamp = message.data.global_timestamp + message.data.global_timestamp_offset / 1000_000_000 + channel_data = {"pulse_id": {"value": message.data.pulse_id, "timestamp": timestamp}} + + for key, value in message.data.data.items(): + channel_data[key] = { + "value": value.value, + "timestamp": value.timestamp + value.timestamp_offset / 1000_000_000, + } + + yield channel_data + + +def yield_pv_data(channel_names: list[str]): + """Yields PV data from the given channels with timestamp.""" + + pvs = {channel: epics.PV(channel) for channel in channel_names} + channel_data = {} + + while True: + for channel, pv in pvs.items(): + data = pv.get_with_metadata() + channel_data[channel] = {"value": data["value"], "timestamp": data["timestamp"]} + + yield channel_data + + +def roundrobin(*iterables): + "Visit input iterables in a cycle until each is exhausted." + # roundrobin('ABC', 'D', 'EF') → A D E B F C + # Algorithm credited to George Sakkis, posted as recipe in itertools + iterators = map(iter, iterables) + for num_active in range(len(iterables), 0, -1): + iterators = cycle(islice(iterators, num_active)) + yield from map(next, iterators) + + +def yield_data(channel_names: list, only_pv=False): + """Obtains data as a generator from either BS or PV sources. + Decides based on availability, uses BS data as default. + + Returns: channel_data as dict (e.g. channel_data[channel_name]['value', 'timestamp']) + """ + bs_channels = dispatcher.get_current_channels() + + # separate into BS and PV channels + + if only_pv: + pv_list = channel_names + bs_list = [] + else: + bs_list = [ch for ch in channel_names if ch in bs_channels] + pv_list = [ch for ch in channel_names if ch not in bs_list] + + for channel_data in roundrobin(yield_bs_data(bs_list), yield_pv_data(pv_list)): + yield channel_data + + +def collect_data(channels: dict): + """Collects data from the given channels and yields the updated data dictionary.""" + + # typical channel_data: + # {'pulse_id': {'value': 21289615168, 'timestamp': 1717427820.5366151}, + # 'SINSB01-RLLE-DSP:PHASE-VS': {'value': -121.776505, 'timestamp': 1717418177.347}} + + channel_names = list(channels.keys()) + + for update in yield_data(channel_names): + for name, entry in update.items(): + channels[name].append(entry["timestamp"], entry["value"]) + + yield channels + + +def plot(channels, start_time=0, in_subplots=False, marker="hd"): + """Plots channels on the console.""" + + # plt.clf() + plt.limit_size(False, False) + plt.theme("pro") + + plt.xlabel("Timestamp") + plt.ylabel("Value") + + num_channels = len(channels) + + if in_subplots: + plt.subplots(num_channels, 1) + plt.main().plot_size(plt.tw(), plt.th() - 1) + plt.clear_data() + + for i, channel in enumerate(channels.values()): + if in_subplots: + plt.subplot(i + 1, 1) + + plt.plot(channel.x - start_time, channel.y, label=channel.name, marker=marker) + + plt.show() + + +def runner( + channels: dict, acquire_interval=0.1, plot_interval=1, in_subplots=False, abort=False, relative_time=True +): + """ + Loop for data collection and plotting. + acquire_interval: interval in seconds for data acquisition (needs to be faster than 100 Hz, + e.g. smaller than 0.01 s to capture all BS data). + """ + + if relative_time: + start_time = time.time() + else: + start_time = 0 + + collector = collect_data(channels) # initialize generator + + last_collect_time = time.time() + last_plot_time = time.time() + + while not abort: + now = time.time() + elapsed_collect = now - last_collect_time + elapsed_plot = now - last_plot_time + + if elapsed_collect > acquire_interval: + channels = collector.send(None) + last_collect_time = time.time() + + if elapsed_plot > plot_interval: # default: 1 Hz plot updates and save interval + plot(channels, in_subplots=in_subplots, start_time=start_time) + + for channel in channels.values(): + channel.trigger_save() + + last_plot_time = time.time() + + sleep_time = max(0, acquire_interval - elapsed_collect) + + time.sleep(sleep_time) # should be faster than 100 Hz for BS data + + +@click.command() +@click.argument("channel_names", nargs=-1) +@click.option("--acquire-interval", default=0.1, help="Interval in seconds for data collection.") +@click.option("--plot-interval", default=1, help="Interval in seconds for plot update.") +@click.option("--save-prefix", default=None, help="Prefix for the save filenames.") +@click.option("--in-subplots", default=False, type=bool, help="Whether to plot in subplots.") +def main( + channel_names: Tuple[str, ...], + acquire_interval=0.1, + plot_interval=1, + save_prefix: Optional[str] = None, + in_subplots=False, +): + """ Plot channels from BS (beam synchronous) or PV (EPICS) sources. """ + channels = {channel: ChannelData(channel, save_file_prefix=save_prefix) for channel in channel_names} + runner(channels, acquire_interval=float(acquire_interval), plot_interval=plot_interval, in_subplots=in_subplots) + + +if __name__ == "__main__": + main()