""" Plot channels from BS (beam synchronous) or PV (EPICS) sources to the terminal. """

from typing import Tuple, Optional
from itertools import cycle, islice
import re
import time
import asyncio

from collections import deque

import click

import numpy as np

from bsread import dispatcher, source

import plotext as plt

import epics

def clean_name(fname):
    """Return *fname* with every character that is not alphanumeric or one of
    ``/ , # . _ -`` replaced by an underscore.

    Used to turn channel names and file prefixes into safe filenames.
    """
    return re.sub(r"[^a-zA-Z0-9\/\,\#\._-]", "_", fname)
|
|
|
|
|
|
class ChannelData:
    """
    Rolling window of (timestamp, value) samples for one channel.

    Samples live in deques capped at ``window_length`` entries, so the oldest
    sample is discarded automatically once the window is full.  When a
    ``save_file_prefix`` is given, every appended sample is additionally
    buffered and can be flushed (periodically) to a text file with
    :meth:`trigger_save`.

    Args:
        name (str): The name of the channel.
        save_file_prefix (str, optional): Prefix for the save file name.
            ``None`` (the default) disables saving.
        window_length (int, optional): Maximum number of samples kept for
            plotting. Defaults to 500.
    """

    def __init__(self, name: str, save_file_prefix: Optional[str] = None, window_length: int = 500):
        self.name = name
        self.window_length = window_length
        # maxlen makes the deque drop its oldest entry on overflow, so
        # append() needs no manual trimming.
        self._x = deque(maxlen=window_length)
        self._y = deque(maxlen=window_length)
        self.save_file_prefix = save_file_prefix
        self.save = save_file_prefix is not None

        if self.save:
            # Unbounded buffers holding everything since the last flush.
            self.x_save, self.y_save = deque(), deque()

            save_file_prefix = clean_name(save_file_prefix)
            self.save_fname = clean_name(f"{save_file_prefix}_{self.name}.dat")

            # Truncate any previous file and write the header line.
            with open(self.save_fname, "w", encoding="utf-8") as f:
                header = f"timestamp {self.name}\n"
                f.write(header)

    @property
    def x(self):
        """Timestamps of the current window as a numpy array."""
        return np.array(self._x)

    @property
    def y(self):
        """Values of the current window as a numpy array."""
        return np.array(self._y)

    def append(self, x, y):
        """
        Append a new data point to the channel.

        Args:
            x: The x value (timestamp).
            y: The y value.
        """
        self._x.append(x)
        self._y.append(y)

        if self.save:
            self.x_save.append(x)
            self.y_save.append(y)

    def trigger_save(self):
        """
        Flush the buffered samples to the save file.

        No-op when saving is disabled; the buffers are emptied after writing.
        """
        if self.save:
            out = np.zeros((len(self.x_save), 2))
            out[:, 0] = self.x_save
            out[:, 1] = self.y_save

            self.x_save, self.y_save = deque(), deque()

            with open(self.save_fname, "a", encoding="utf-8") as f:
                np.savetxt(
                    f,
                    out,
                    fmt="%.14g",
                )

    def __str__(self) -> str:
        return f"Channel:{self.name}"

    def __repr__(self) -> str:
        return f"Channel({self.name})"
|
|
|
|
|
|
def yield_bs_data(channel_names: list[str]):
    """Generator over the BS stream: yields one dict per pulse mapping
    channel name -> {"value": ..., "timestamp": ...} (plus a "pulse_id" entry).

    Yields nothing when *channel_names* is empty.
    """
    if not channel_names:
        return

    with source(channels=channel_names) as stream:
        while True:
            message = stream.receive()
            data = message.data

            # Relevant message fields:
            #   data.pulse_id                  -- pulse id of this message
            #   data.global_timestamp          -- seconds since 1970 for this pulse
            #   data.global_timestamp_offset   -- nanosecond part of that timestamp
            #   data.data[name].timestamp / .timestamp_offset -- per-channel IOC time

            pulse_time = data.global_timestamp + data.global_timestamp_offset / 1000_000_000
            result = {"pulse_id": {"value": data.pulse_id, "timestamp": pulse_time}}

            for name, sample in data.data.items():
                result[name] = {
                    "value": sample.value,
                    "timestamp": sample.timestamp + sample.timestamp_offset / 1000_000_000,
                }

            yield result
|
|
|
|
|
|
def yield_pv_data(channel_names: list[str]):
    """Yields PV data from the given channels with timestamp.

    Creates one ``epics.PV`` per channel once, then on every iteration polls
    each PV and yields a dict mapping channel name to
    ``{"value": ..., "timestamp": ...}``.

    Channels whose PV is currently disconnected (``get_with_metadata``
    returns ``None``) are skipped for that iteration instead of raising.
    """
    pvs = {channel: epics.PV(channel) for channel in channel_names}

    while True:
        # Build a fresh dict each round so a consumer that keeps a reference
        # to a previously yielded dict does not see it mutated later.
        channel_data = {}

        for channel, pv in pvs.items():
            data = pv.get_with_metadata()
            if data is None:
                # PV disconnected / timed out; try again on the next round.
                continue
            channel_data[channel] = {"value": data["value"], "timestamp": data["timestamp"]}

        yield channel_data
|
|
|
|
|
|
def roundrobin(*iterables):
    """Visit the input iterables in a cycle until each one is exhausted.

    roundrobin('ABC', 'D', 'EF') -> A D E B F C
    """
    # Keep the live iterators in a queue; an exhausted iterator is simply
    # not re-enqueued, so the remaining ones keep rotating.
    pending = deque(iter(it) for it in iterables)
    while pending:
        iterator = pending.popleft()
        try:
            item = next(iterator)
        except StopIteration:
            continue  # this iterable is done; drop it from the rotation
        yield item
        pending.append(iterator)
|
|
|
|
|
|
def yield_data(channel_names: list, only_pv=False):
    """Obtains data as a generator from either BS or PV sources.

    Channels currently known to the BS dispatcher are read from the BS
    stream; all remaining ones are read via EPICS channel access.  With
    ``only_pv=True`` every channel is forced through EPICS.

    Returns: channel_data as dict (e.g. channel_data[channel_name]['value', 'timestamp'])
    """
    bs_channels = dispatcher.get_current_channels()

    # Separate the requested channels into BS and PV sources.
    if only_pv:
        pv_list = channel_names
        bs_list = []
    else:
        bs_list = [ch for ch in channel_names if ch in bs_channels]
        pv_list = [ch for ch in channel_names if ch not in bs_list]

    # Alternate between both sources; each source yields full update dicts.
    yield from roundrobin(yield_bs_data(bs_list), yield_pv_data(pv_list))
|
|
|
|
|
|
def collect_data(channels: dict):
    """Collects data from the given channels and yields the updated data dictionary.

    Args:
        channels: mapping of channel name -> ChannelData.

    Yields:
        The same ``channels`` dict after appending the latest samples.
    """

    # typical update dict:
    # {'pulse_id': {'value': 21289615168, 'timestamp': 1717427820.5366151},
    #  'SINSB01-RLLE-DSP:PHASE-VS': {'value': -121.776505, 'timestamp': 1717418177.347}}

    channel_names = list(channels.keys())

    for update in yield_data(channel_names):
        for name, entry in update.items():
            # BS updates carry extra keys (e.g. 'pulse_id') that were never
            # requested; skip anything without a ChannelData instead of
            # raising KeyError.
            channel = channels.get(name)
            if channel is not None:
                channel.append(entry["timestamp"], entry["value"])

        yield channels
|
|
|
|
|
|
def plot(channels, start_time=0, in_subplots=False, marker="hd"):
    """Plots channels on the console.

    Args:
        channels: mapping of channel name -> ChannelData.
        start_time: value subtracted from every timestamp (0 keeps absolute time).
        in_subplots: if True, draw one subplot per channel; otherwise one shared plot.
        marker: plotext marker style.
    """

    # plt.clf()
    plt.limit_size(False, False)  # let the figure use the full terminal size
    plt.theme("pro")

    plt.xlabel("Timestamp")
    plt.ylabel("Value")

    num_channels = len(channels)

    if in_subplots:
        # One subplot row per channel; keep one terminal line free below the plot.
        plt.subplots(num_channels, 1)
        plt.main().plot_size(plt.tw(), plt.th() - 1)

    # Drop the data of the previous frame before re-plotting.
    plt.clear_data()

    for i, channel in enumerate(channels.values()):
        if in_subplots:
            plt.subplot(i + 1, 1)  # plotext subplots are 1-indexed

        plt.plot(channel.x - start_time, channel.y, label=channel.name, marker=marker)

    plt.show()
|
|
|
|
|
|
async def runner(
    channels: dict, acquire_interval=0.1, plot_interval=1, in_subplots=False, abort=False, relative_time=True
):
    """
    Main acquisition and plotting loop.

    Pulls fresh samples roughly every ``acquire_interval`` seconds and
    redraws the plot (flushing the save buffers) every ``plot_interval``
    seconds.  ``acquire_interval`` must be short enough for the stream rate
    (e.g. below 0.01 s to capture every pulse of 100 Hz BS data).
    """

    start_time = time.time() if relative_time else 0

    collector = collect_data(channels)  # generator producing the updated channel dict

    t_collect = time.time()
    t_plot = time.time()

    while not abort:
        now = time.time()
        since_collect = now - t_collect
        since_plot = now - t_plot

        if since_collect > acquire_interval:
            channels = next(collector)
            t_collect = time.time()

        if since_plot > plot_interval:  # default: 1 Hz plot updates and save interval
            plot(channels, in_subplots=in_subplots, start_time=start_time)

            # Flush the save buffers at the (slower) plot rate.
            for channel in channels.values():
                channel.trigger_save()

            t_plot = time.time()

        # Yield to the event loop until the next acquisition is due
        # (should stay faster than 100 Hz for BS data).
        await asyncio.sleep(max(0, acquire_interval - since_collect))
|
|
|
|
|
|
@click.command()
@click.argument("channel_names", nargs=-1)
@click.option("--acquire-interval", default=0.1, help="Interval in seconds for data collection.")
# Default must be a float: click infers the option type from the default,
# and an int default would reject fractional plot intervals (e.g. 0.5).
@click.option("--plot-interval", default=1.0, help="Interval in seconds for plot update.")
@click.option("--save-prefix", default=None, help="Prefix for the save filenames.")
@click.option("--in-subplots", default=False, type=bool, help="Whether to plot in subplots.")
def main(
    channel_names: Tuple[str, ...],
    acquire_interval: float = 0.1,
    plot_interval: float = 1.0,
    save_prefix: Optional[str] = None,
    in_subplots: bool = False,
):
    """ Plot channels from BS (beam synchronous) or PV (EPICS) sources. """
    channels = {channel: ChannelData(channel, save_file_prefix=save_prefix) for channel in channel_names}
    asyncio.run(
        runner(
            channels,
            acquire_interval=acquire_interval,
            plot_interval=plot_interval,
            in_subplots=in_subplots,
        )
    )
|
|
|
|
|
|
|
|
# Script entry point: click parses sys.argv and invokes main().
if __name__ == "__main__":
    main()
|