first commit

This commit is contained in:
2024-06-09 13:01:58 +02:00
commit cba54b0046
14 changed files with 516 additions and 0 deletions

162
.gitignore vendored Normal file
View File

@ -0,0 +1,162 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

2
README.md Normal file
View File

@ -0,0 +1,2 @@
# smash

2
recipients.cfg Normal file
View File

@ -0,0 +1,2 @@
sven.augustin@psi.ch
0123456789 # not actually a phone number

6
rules.cfg Normal file
View File

@ -0,0 +1,6 @@
MTEST:COUNTER1 > 0.5 # test
MTEST:COUNTER2 < 0.5
MTEST:ASTRING1 != "a string"
MTEST:ASTRING2 == something else

40
smash.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
"""
SMS Mail Alarm Sending Helper
"""
from datetime import datetime
from smash.epicsutils import make_alarms, main_loop
from smash.notifier import Notifier
from smash.recipients import read_recipients_file
from smash.rules import read_rules_file
fn = "recipients.cfg"
recipients = read_recipients_file(fn)
fn = "rules.cfg"
rules = read_rules_file(fn)
n = Notifier()
n.add(print)
#n.add_many(recipients)
print(n)
print()
def cb(pvname=None, value=None, comparison=None, trip_point=None, timestamp=None, **kwargs):
dtts = datetime.fromtimestamp(timestamp)
n.notify(f"[{dtts}] {pvname}: {value} {comparison} {trip_point}")
alarms = make_alarms(rules, cb)
#print(alarms)
main_loop()

0
smash/__init__.py Normal file
View File

26
smash/cfgfile.py Normal file
View File

@ -0,0 +1,26 @@
def parse_string(cfg):
lines = cfg.split("\n")
yield from parse_lines(lines)
def parse_file(fn):
with open(fn) as f:
yield from parse_lines(f)
def parse_lines(lines):
for line in lines:
line = remove_comments(line)
line = line.strip()
if not line:
continue
yield line
def remove_comments(line):
return line.split("#", 1)[0]

42
smash/epicsutils.py Normal file
View File

@ -0,0 +1,42 @@
import epics
def make_alarms(rules, callback, alert_delay=1):
rules = get_pvs(rules) # this is optional and creates all PVs at once
#check_connections(rules)
alarms = []
for pvname, comparison, trip_point in rules:
print("creating alarm:", pvname, comparison, trip_point)
a = epics.Alarm(
pvname = pvname, # this can also be a PV object
comparison = comparison,
trip_point = trip_point,
callback = callback,
alert_delay = alert_delay
)
alarms.append(a)
print()
return alarms
def get_pvs(rules):
return [(epics.get_pv(n), c, t) for n, c, t in rules]
def check_connections(rules):
for pv, _c, _t in rules:
state = pv.wait_for_connection(timeout=0.1)
if not state:
print(pv)
def main_loop():
while True:
epics.poll()

23
smash/itemize.py Normal file
View File

@ -0,0 +1,23 @@
def itemize(iterable, header=None, bullet="-"):
if not bullet.endswith(" "):
bullet += " "
lines = [bullet + str(i) for i in iterable]
if header:
header = format_header(header)
lines = [header] + lines
return "\n".join(lines)
def format_header(msg, line="-"):
msg += ":"
line = line * len(msg)
msg += "\n" + line
return msg

71
smash/notifier.py Normal file
View File

@ -0,0 +1,71 @@
from time import sleep
from .sendmail import sendmail
from .sendsms import sendsms
from .itemize import itemize
class Notifier:
def __init__(self, title="slic notification", wait_time=0.1):
self.title = title
self.wait_time = wait_time
self.funcs = set()
self.mails = set()
self.phones = set()
def add_many(self, recipients):
"""
the recipients sequence may contain email addresses, phone numbers and functions
"""
for i in recipients:
self.add(i)
def add(self, recipient):
"""
recipient may be an email address, a phone number or a function
"""
if callable(recipient):
self.funcs.add(recipient)
elif "@" in str(recipient):
self.mails.add(recipient)
else:
self.phones.add(recipient)
def clear(self):
self.funcs.clear()
self.mails.clear()
self.phones.clear()
def notify(self, message, title=None):
title = title or self.title
for f in self.funcs:
f(message)
sleep(self.wait_time)
for m in self.mails:
sendmail(m, subject=title, body=message)
sleep(self.wait_time)
for p in self.phones:
sendsms(p, message)
sleep(self.wait_time)
def __repr__(self):
func_names = (f.__name__ for f in self.funcs)
printable_funcs = itemize(func_names, header="Functions")
printable_mails = itemize(self.mails, header="Mail Adresses")
printable_phones = itemize(self.phones, header="Phone Numbers")
res = (printable_funcs, printable_mails, printable_phones)
return "\n\n".join(res)

8
smash/recipients.py Normal file
View File

@ -0,0 +1,8 @@
from .cfgfile import parse_file, parse_string
read_recipients_file = parse_file
read_recipients_string = parse_string

43
smash/rules.py Normal file
View File

@ -0,0 +1,43 @@
import ast
import epics
from .cfgfile import parse_file, parse_string
COMPARISONS = sorted(epics.Alarm.ops)
PRINTABLE_COMPARISONS = ", ".join(COMPARISONS)
def read_rules_file(fn):
for line in parse_file(fn):
yield parse_line(line)
def read_rules_string(string):
for line in parse_string(string):
yield parse_line(line)
def parse_line(line):
pieces = line.split(maxsplit=2)
pieces = [i.strip() for i in pieces]
pvname, comparison, trip_point = pieces
check_comparison(comparison)
trip_point = parse_trip_point(trip_point)
return pvname, comparison, trip_point
def check_comparison(comparison):
if comparison not in COMPARISONS:
raise ValueError(f'comparison {comparison} is not known: {PRINTABLE_COMPARISONS}')
def parse_trip_point(trip_point):
#TODO: enforce quotation marks or not?
try:
return ast.literal_eval(trip_point)
except (ValueError, SyntaxError):
# raise ValueError(f'trip point {trip_point} cannot be parsed') from e
return trip_point # use the original value as string

79
smash/sendmail.py Normal file
View File

@ -0,0 +1,79 @@
import getpass
import subprocess
from email.mime.text import MIMEText
OUTPUT_DIVIDER = "The message was:"
OUTPUT_DIVIDER_BAR = "=" * len(OUTPUT_DIVIDER)
def sendmail(to_addr, from_addr=None, subject=None, body=None):
msg = Message(to_addr, from_addr=from_addr, subject=subject, body=body)
msg.send()
return msg
class Message:
def __init__(self, to_addr, from_addr=None, subject=None, body=None):
self.to_addr = to_addr
self.from_addr = from_addr
self.subject = subject
self.body = body
def send(self):
msg = self.encode()
_run_sendmail(msg)
def encode(self, *args, **kwargs):
return self.wrap().as_bytes(*args, **kwargs)
def __repr__(self):
return self.wrap().as_string()
def wrap(self):
from_addr = self.from_addr
if from_addr is None:
from_addr = getpass.getuser()
body = self.body
if body is None: # here, None does not work!
body = ""
msg = MIMEText(body)
msg["To"] = self.to_addr
msg["From"] = from_addr
msg["Subject"] = self.subject
return msg
def _run_sendmail(msg):
cmd = ("sendmail", "-t", "-oi")
res = subprocess.run(cmd, input=msg, stderr=subprocess.PIPE)
try:
res.check_returncode()
except subprocess.CalledProcessError as e:
raise SendMailError(res.returncode, res.stderr, msg) from e
class SendMailError(Exception):
def __init__(self, err_code, err_msg, email):
self.err_code = err_code
self.err_msg = err_msg = err_msg.decode().strip()
self.email = email = email.decode()
err = [
f"error code {err_code}: {err_msg}\n",
OUTPUT_DIVIDER,
OUTPUT_DIVIDER_BAR,
email
]
err = "\n".join(err)
super().__init__(err)

12
smash/sendsms.py Normal file
View File

@ -0,0 +1,12 @@
from .sendmail import sendmail
SMS_GATEWAY_ADDRESS = "swissphone-gateway.com"
def sendsms(phone_number, text):
to_addr = f"{phone_number}@{SMS_GATEWAY_ADDRESS}"
return sendmail(to_addr, body=text)