Files
miniarchiver/script/local.py
gobbo_a 8837d77401
2024-05-23 08:55:33 +02:00

135 lines
4.7 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import java.time.format.DateTimeFormatter as DateTimeFormatter
import java.time.Instant as Instant
import java.time.LocalDateTime as LocalDateTime
import java.time.ZoneOffset as ZoneOffset
import org.jfree.chart.axis.DateAxis as DateAxis
import traceback
import ch.psi.pshell.epics.InvalidValueAction as InvalidValueAction
#with Channel("ABORF-A0-HVPS:I", monitored=False) as ch:
# print ch.read()
class Channel:
def __init__(self,channel_name, monitored, polling):
self.channel_name=channel_name
self.monitored= monitored
self.polling = polling
def __enter__(self):
self.channel=GenericChannel(self.channel_name,self.channel_name, True, InvalidValueAction.None)
self.channel.setMonitored(self.monitored)
self.channel.setPolling(self.polling)
#self.channel.setPolling(1000)
self.channel.initialize()
return self.channel
def __exit__(self, exc_type, exc_val, exc_tb):
self.channel.close()
#timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
def get_time_str(timestamp):
#instant = Instant.ofEpochMilli(timestamp)
#currentTime = LocalDateTime.ofInstant(instant, ZoneOffset.systemDefault())
#return currentTime.format(timeFormatter);
return str(timestamp)
ITEM_SEPARATOR = "; "
def init_archiver(f, cls = None):
if cls is None:
cls = "java.lang.Double"
f.write ("#Timestamp" + ITEM_SEPARATOR + "Value" + ITEM_SEPARATOR + "\n")
f.write ("#java.lang.Long" + ITEM_SEPARATOR + Class.getName(cls) + ITEM_SEPARATOR + "\n")
f.flush()
def write_archive(f, timestamp, value):
v = get_time_str(timestamp) + ITEM_SEPARATOR+ str(value) + ITEM_SEPARATOR
f.write (v+ "\n")
f.flush()
class ChannelListener (DeviceListener):
def __init__(self, f):
self.f = f
def onCacheChanged(self, ch, value, former, timestamp, valueChange):
if self.f.tell() == 0:
cls=ch.getElementType()
init_archiver(self.f, cls)
write_archive(self.f, timestamp, value)
def parse_archiver_config(metadata=True, file_name=None):
#path = expand_path("{script}/config")
path = "/sls/rf/data/SLS_Teststand/PShell/sls_archiver/config/RF"
ret = {}
if not file_name:
for file_name in os.listdir(path):
if file_name.endswith(".config"):
# If the file has the desired extension, add it to the list
ret.update(parse_archiver_config(metadata, path + "/" + file_name))
else:
with open(file_name, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('#'):
continue # Skip lines starting with '#'
parts = line.split() # Split the line by whitespace
if len(parts) >= 4:
def adjust(v):
try:
if v=="None":
return False
elif v=="Monitor":
return True
else:
return float(v)
except Exception as e:
print (e)
return None
name = parts[0]
if metadata or not "." in name:
v1 = adjust(parts[1])
v2 = adjust(parts[2])
v3 = adjust(parts[3])
ret[name] = [v1, v2, v3]
return ret
def parse_short_term_archiver_config(metadata):
ret = parse_archiver_config(metadata)
log("Initializing " + str(len(ret)) + " channels")
cfg = {name:v[0] for name,v in ret.items()}
log(str(cfg))
return cfg
def plot_channel(channel, day, title=None):
path="/"+day+"/archiver|/" + channel.replace(":", "_")
data =load_data(path)
tm = [i[0]for i in data]
val = [i[1]for i in data]
p=plot(val, name=channel + " [" + day + "]", xdata=tm, title=title)[0]
axis = DateAxis(None)
axis.setTickLabelPaint(p.getAxisTextColor())
p.getChart().getXYPlot().setDomainAxis(axis)
#channels = parse_short_term_archiver_config(False)
#for c in channels.keys():
# print c, caget(c)