Files
plot_channels/plot_channels.py
T
2024-06-04 18:25:30 +02:00

292 lines
8.9 KiB
Python

""" Plot channels from BS (beam synchronous) or PV (EPICS) sources to the terminal. """
from typing import Tuple, Optional
from itertools import cycle, islice
import re
import time
from collections import deque
import click
import numpy as np
from bsread import dispatcher, source
import plotext as plt
import epics
def clean_name(fname):
    """Return ``fname`` with disallowed characters replaced by underscores.

    ASCII letters, digits and the punctuation characters ``/ , # . _ -`` are
    kept as they are; every other character is replaced, one for one, with
    an underscore.
    """
    disallowed = re.compile(r"[^a-zA-Z0-9\/\,\#\._-]")
    return disallowed.sub("_", fname)
class ChannelData:
    """
    Rolling window of timestamps and values of a single channel.

    The newest ``window_length`` samples are kept in deques for plotting.
    When a save prefix is given, every appended sample is additionally
    buffered and written (periodically, whenever ``trigger_save`` is called)
    to a text file.

    Args:
        name (str): The name of the channel.
        save_file_prefix (str, optional): The prefix for the save file name.
            Defaults to None, in which case nothing is saved.
        window_length (int, optional): The maximum length of the data window. Defaults to 500.
    """

    def __init__(self, name: str, save_file_prefix=None, window_length=500):
        self.name = name
        self.window_length = window_length
        self._x, self._y = deque(), deque()
        self.save_file_prefix = save_file_prefix
        # Saving is enabled exactly when a prefix was supplied.
        self.save = save_file_prefix is not None
        if self.save:
            self.x_save, self.y_save = deque(), deque()
            save_file_prefix = clean_name(save_file_prefix)
            self.save_fname = clean_name(f"{save_file_prefix}_{self.name}.dat")
            # Mode "w" truncates an existing file of the same name; the
            # header line names the two columns written by trigger_save.
            with open(self.save_fname, "w", encoding="utf-8") as f:
                f.write(f"timestamp {self.name}\n")

    @property
    def x(self):
        """The x values currently in the window, as a numpy array."""
        return np.array(self._x)

    @property
    def y(self):
        """The y values currently in the window, as a numpy array."""
        return np.array(self._y)

    def append(self, x, y):
        """
        Append a new data point to the channel.

        The oldest point is dropped once the window holds more than
        ``window_length`` points. If saving is enabled the point is also
        queued for the next ``trigger_save``.

        Args:
            x: The x value.
            y: The y value.
        """
        self._x.append(x)
        self._y.append(y)
        if len(self._x) > self.window_length:
            self._x.popleft()
            self._y.popleft()
        if self.save:
            self.x_save.append(x)
            self.y_save.append(y)

    def trigger_save(self):
        """
        Append all samples buffered since the last save to the save file.

        Does nothing when saving is disabled or when no samples are pending.
        The buffers are emptied only after the write has completed, so a
        failed write leaves the samples queued for the next attempt.
        """
        if not self.save or not self.x_save:
            return
        # Two float columns: timestamp, value.
        out = np.zeros((len(self.x_save), 2))
        out[:, 0] = self.x_save
        out[:, 1] = self.y_save
        with open(self.save_fname, "a", encoding="utf-8") as f:
            np.savetxt(f, out, fmt="%.14g")
        self.x_save, self.y_save = deque(), deque()

    def __str__(self) -> str:
        return f"Channel:{self.name}"

    def __repr__(self) -> str:
        return f"Channel({self.name})"
def yield_bs_data(channel_names: list[str]):
    """Yield one dict of timestamped values per message received from the BS stream.

    Each yielded dict maps a name to ``{"value": ..., "timestamp": ...}``.
    It always contains a ``"pulse_id"`` entry stamped with the message's
    global timestamp, followed by one entry per channel in the message,
    stamped with that channel's own timestamp. Yields nothing at all when
    ``channel_names`` is empty.
    """
    if not channel_names:
        return
    ns_per_second = 1000_000_000
    with source(channels=channel_names) as stream:
        while True:
            payload = stream.receive().data
            # Fields read from the message payload:
            #   payload.pulse_id
            #   payload.global_timestamp         seconds since 1970 for this pulse_id
            #   payload.global_timestamp_offset  presumably the ns part of the timestamp
            #   payload.data[channel_name]       per-channel value with .timestamp and
            #                                    .timestamp_offset (ns) of the IOC
            global_time = payload.global_timestamp + payload.global_timestamp_offset / ns_per_second
            channel_data = {"pulse_id": {"value": payload.pulse_id, "timestamp": global_time}}
            channel_data.update(
                {
                    key: {
                        "value": item.value,
                        "timestamp": item.timestamp + item.timestamp_offset / ns_per_second,
                    }
                    for key, item in payload.data.items()
                }
            )
            yield channel_data
def yield_pv_data(channel_names: list[str]):
    """Yields PV data from the given channels with timestamp.

    One ``epics.PV`` is created per channel up front. Every iteration polls
    all of them and yields a new dict mapping channel name to
    ``{"value": ..., "timestamp": ...}``. A PV whose read returns ``None``
    is left out of that iteration's dict instead of raising, so a dropped
    channel does not stop the generator and no stale sample is repeated.
    The generator never terminates on its own; with an empty
    ``channel_names`` it yields empty dicts.
    """
    pvs = {channel: epics.PV(channel) for channel in channel_names}
    while True:
        channel_data = {}
        for channel, pv in pvs.items():
            data = pv.get_with_metadata()
            if data is None:
                # No answer from this PV for this poll (e.g. not connected).
                continue
            channel_data[channel] = {"value": data["value"], "timestamp": data["timestamp"]}
        yield channel_data
def roundrobin(*iterables):
    """Visit input iterables in a cycle until each is exhausted.

    roundrobin('ABC', 'D', 'EF') → A D E B F C

    This is the itertools round-robin recipe (algorithm credited to George
    Sakkis), written in its explicit try/except form.
    """
    remaining = len(iterables)
    pending = cycle(map(iter, iterables))
    while remaining:
        try:
            for iterator in pending:
                yield next(iterator)
        except StopIteration:
            # The iterator that just ran dry is the last one the cycle
            # handed out; taking the next ``remaining`` entries therefore
            # keeps exactly the iterators that are still alive.
            remaining -= 1
            pending = cycle(islice(pending, remaining))
def yield_data(channel_names: list, only_pv=False):
    """Obtains data as a generator from either BS or PV sources.

    Decides based on availability, uses BS data as default: a channel that
    the dispatcher currently lists is read from the BS stream, every other
    channel is read as an EPICS PV. With ``only_pv=True`` all channels are
    read as PVs and the dispatcher is not contacted at all.

    Returns: channel_data as dict (e.g. channel_data[channel_name]['value', 'timestamp']),
    alternating between the BS and the PV generator.
    """
    if only_pv:
        bs_list, pv_list = [], channel_names
    else:
        # NOTE(review): the membership test below assumes
        # get_current_channels() returns plain channel-name strings —
        # confirm against the bsread dispatcher API.
        bs_channels = dispatcher.get_current_channels()
        bs_list, pv_list = [], []
        # separate into BS and PV channels
        for ch in channel_names:
            if ch in bs_channels:
                bs_list.append(ch)
            else:
                pv_list.append(ch)
    yield from roundrobin(yield_bs_data(bs_list), yield_pv_data(pv_list))
def collect_data(channels: dict):
    """Collects data from the given channels and yields the updated data dictionary.

    ``channels`` maps channel name to an object with an
    ``append(timestamp, value)`` method. After every update received from
    ``yield_data`` the same ``channels`` dict is yielded back. Entries in an
    update whose name is not a key of ``channels`` are ignored.
    """
    # typical update:
    # {'pulse_id': {'value': 21289615168, 'timestamp': 1717427820.5366151},
    #  'SINSB01-RLLE-DSP:PHASE-VS': {'value': -121.776505, 'timestamp': 1717418177.347}}
    channel_names = list(channels)
    for update in yield_data(channel_names):
        for name, entry in update.items():
            channel = channels.get(name)
            if channel is None:
                # BS updates always carry a "pulse_id" entry that was not
                # requested as a channel; indexing channels[name] directly
                # would raise KeyError for it.
                continue
            channel.append(entry["timestamp"], entry["value"])
        yield channels
def plot(channels, start_time=0, in_subplots=False, marker="hd"):
    """Draw all channels to the terminal with plotext.

    ``channels`` maps names to objects exposing ``x``, ``y`` and ``name``.
    ``start_time`` is subtracted from every x value before plotting. With
    ``in_subplots`` each channel gets its own row; otherwise all channels
    share one plot. ``marker`` is passed through to plotext.
    """
    plt.limit_size(False, False)
    plt.theme("pro")
    plt.xlabel("Timestamp")
    plt.ylabel("Value")
    if in_subplots:
        # One row per channel, a single column.
        plt.subplots(len(channels), 1)
    # Use the full terminal width and all but one line of its height.
    plt.main().plot_size(plt.tw(), plt.th() - 1)
    plt.clear_data()
    for row, series in enumerate(channels.values(), start=1):
        if in_subplots:
            plt.subplot(row, 1)
        plt.plot(series.x - start_time, series.y, label=series.name, marker=marker)
    plt.show()
def runner(
    channels: dict, acquire_interval=0.1, plot_interval=1, in_subplots=False, abort=False, relative_time=True
):
    """
    Loop for data collection and plotting.

    acquire_interval: interval in seconds for data acquisition (needs to be faster than 100 Hz,
        e.g. smaller than 0.01 s to capture all BS data).
    plot_interval: interval in seconds between plot updates; buffered samples are
        saved at the same moments.
    in_subplots: passed through to ``plot``.
    abort: checked as the loop condition. It is never modified inside the loop, so
        once started the loop only ends through an exception (e.g. Ctrl-C).
    relative_time: if true, x values are plotted relative to the time this function
        was called; otherwise absolute timestamps are plotted.

    Samples that are still buffered when the loop ends, for whatever reason, are
    flushed to the save files before the function returns or the exception
    propagates.
    """
    start_time = time.time() if relative_time else 0
    collector = collect_data(channels)  # initialize generator
    last_collect_time = time.time()
    last_plot_time = time.time()
    try:
        while not abort:
            now = time.time()
            elapsed_collect = now - last_collect_time
            elapsed_plot = now - last_plot_time
            if elapsed_collect > acquire_interval:
                channels = next(collector)
                last_collect_time = time.time()
            if elapsed_plot > plot_interval:  # default: 1 Hz plot updates and save interval
                plot(channels, in_subplots=in_subplots, start_time=start_time)
                for channel in channels.values():
                    channel.trigger_save()
                last_plot_time = time.time()
            # Sleep only for what is left of the current acquire interval.
            sleep_time = max(0, acquire_interval - elapsed_collect)
            time.sleep(sleep_time)  # should be faster than 100 Hz for BS data
    finally:
        # Without this, everything collected since the last plot tick would
        # be lost when the loop is interrupted.
        for channel in channels.values():
            channel.trigger_save()
# Both intervals are declared as float explicitly: click otherwise infers the
# option type from the default, and an integer default of 1 made
# "--plot-interval 0.5" fail with "not a valid integer".
@click.command()
@click.argument("channel_names", nargs=-1)
@click.option("--acquire-interval", default=0.1, type=float, help="Interval in seconds for data collection.")
@click.option("--plot-interval", default=1.0, type=float, help="Interval in seconds for plot update.")
@click.option("--save-prefix", default=None, help="Prefix for the save filenames.")
@click.option("--in-subplots", default=False, type=bool, help="Whether to plot in subplots.")
def main(
    channel_names: Tuple[str, ...],
    acquire_interval=0.1,
    plot_interval=1,
    save_prefix: Optional[str] = None,
    in_subplots=False,
):
    """ Plot channels from BS (beam synchronous) or PV (EPICS) sources. """
    # One ChannelData per requested channel; a save prefix (if any) enables
    # writing each channel to its own file.
    channels = {channel: ChannelData(channel, save_file_prefix=save_prefix) for channel in channel_names}
    runner(
        channels,
        acquire_interval=float(acquire_interval),
        plot_interval=float(plot_interval),
        in_subplots=in_subplots,
    )


if __name__ == "__main__":
    main()