Files
tst/script/startup_c.py
2025-10-23 16:33:17 +02:00

140 lines
4.3 KiB
Python

###################################################################################################
#Utilities
###################################################################################################
def get_setting(name=None):
    """Read one persisted script setting, or all of them.

    Args:
        name (str, optional): name of the setting to look up.

    Returns:
        The result of Context.getSetting(name) when a name is given (documented as the
        setting value as a string, or None if the setting is undefined).
        When name is None, the result of Context.getSettings() (a map with all settings).
    """
    if name is None:
        # No specific setting requested: hand back the complete settings map.
        return Context.getSettings()
    return Context.getSetting(name)
def set_setting(name, value):
    """Store a persisted script setting.

    Args:
        name (str): name of the setting.
        value (obj): new value; documented as being converted to a string, with None
            removing the setting.

    Returns:
        None.
    """
    # Persistence is delegated entirely to the Context object.
    Context.setSetting(name, value)
def exec_cmd(cmd, stderr_raise_ex = True):
    """Execute a command in a child process and return its decoded standard output.

    The exit status of the process is not checked; only the content of stderr can
    trigger an error.

    Args:
        cmd (str or list of str): program and parameters, passed unchanged to
            subprocess.run (no shell is requested).
        stderr_raise_ex (bool, optional): if true, stderr is captured on its own pipe and
            any non-empty stderr text raises an exception. If false, stderr is merged
            into stdout and is part of the returned text.

    Returns:
        str: UTF-8 decoded output of the command process.

    Raises:
        Exception: if stderr_raise_ex is true and the command wrote anything to stderr;
            the exception message is the decoded stderr text.
    """
    import subprocess

    capture_stderr = bool(stderr_raise_ex)
    result = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE if capture_stderr else subprocess.STDOUT,
    )
    output = result.stdout.decode('utf-8')
    # When stderr is redirected into stdout, result.stderr is None, so it may only be
    # decoded if it was captured separately.
    if capture_stderr:
        err = result.stderr.decode('utf-8')
        if err != "":
            raise Exception(err)
    return output
def bsget(channel, modulo=1, offset=0, timeout = 5.0):
    """Read values from a bsread stream, using the default provider.

    Args:
        channel (str or list of str): channel name(s).
        modulo (int, optional): stream modulo.
        offset (int, optional): stream offset.
        timeout (float, optional): stream timeout in seconds (converted to whole
            milliseconds before being handed to the stream).

    Returns:
        A single value if channel was given as a string, otherwise the list of values
        returned for all requested channels.
    """
    names = to_list(channel)
    timeout_ms = int(timeout * 1000)
    values = Stream.readChannels(names, modulo, offset, timeout_ms)
    # A single channel name yields a bare value rather than a one-element list.
    return values[0] if is_string(channel) else values
def flatten(data):
    """Flatten one level of multi-dimensional or nested data.

    Args:
        data (tuple, array, List or Array): input data.

    Returns:
        The input itself when it is an array whose typecode does not start with '['
        (presumably a one-dimensional array -- confirm against is_array), otherwise an
        iterator chaining the elements of each item of data.
    """
    import itertools

    # Arrays that are not arrays-of-arrays have nothing to flatten.
    if is_array(data) and not data.typecode.startswith('['):
        return data
    return itertools.chain(*data)
def frange_gen(start, finish, step):
    """Yield values from start towards finish (inclusive) in increments of step.

    A non-negative step counts upwards while the current value is <= finish; a negative
    step counts downwards while the current value is >= finish. Each value is obtained
    by repeatedly adding step to the previous one.
    """
    current = start
    if step >= 0.0:
        while current <= finish:
            yield current
            current += step
    elif step < 0.0:
        while current >= finish:
            yield current
            current += step
def frange(start, finish, step, enforce_finish = False, inclusive_finish = False):
    """Create a list with a range of float values (a float equivalent to "range").

    Args:
        start (float): start of range.
        finish (float): end of range.
        step (float): step size; converted to float before use.
        enforce_finish (boolean, optional): adds finish as the final element if the list
            does not already end with it.
        inclusive_finish (boolean, optional): if false, a last element equal to finish is
            dropped, so finish is exclusive (like in "range").

    Returns:
        list

    Raises:
        ValueError: if step is zero while start <= finish, because the range would never
            terminate.
    """
    step = float(step)
    if step == 0.0 and start <= finish:
        # frange_gen would keep yielding start forever and list() would never return.
        raise ValueError("frange() step must not be zero")
    values = list(frange_gen(start, finish, step))
    if values:
        if not inclusive_finish and values[-1] == finish:
            del values[-1]
        # Dropping the exclusive finish may have emptied the list; treat that like
        # "does not end with finish" instead of indexing into an empty list.
        if enforce_finish and (not values or values[-1] != finish):
            values.append(finish)
    return values
def notify(subject, text, attachments = None, to=None):
    """Send an email message.

    Args:
        subject (str): message subject.
        text (str): message body.
        attachments (list of str, optional): list of files to be attached (expansion
            tokens are allowed).
        to (list of str, optional): recipients. If None, uses the recipients defined in
            mail.properties.

    Returns:
        None
    """
    # Both optional arguments are passed through to_list before delivery.
    attachment_list = to_list(attachments)
    recipients = to_list(to)
    Context.notify(subject, text, attachment_list, recipients)
def expand_path(path, timestamp=-1):
    """Expand the tokens contained in a path.

    Args:
        path (str): path name.
        timestamp (int): timestamp used for the expansion; if not defined (-1), uses now.

    Returns:
        Expanded path name, as produced by Setup.expandPath.
    """
    expanded = Setup.expandPath(path, timestamp)
    return expanded
def send_event(name, value=True):
    """Send an interpreter event, which is propagated as a SSE.

    Args:
        name (str): event name.
        value (Object): event value.
    """
    interpreter = get_interpreter()
    interpreter.sendEvent(name, value)