Files
igor-public/pearl/scilog-ingest.py

332 lines
9.8 KiB
Python

#!/usr/bin/env -S uv run --script
#
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "pillow",
# "scilog",
# ]
# ///
import argparse
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from PIL import Image, UnidentifiedImageError
from scilog import SciLog, LogbookMessage
def get_image_size(file_path):
    """
    Return the pixel dimensions of an image file.

    Args:
        file_path: path of the image file, passed on to ``PIL.Image.open``.

    Returns:
        The ``size`` attribute of the opened image (callers in this file
        unpack it as ``width, height``).

    Raises:
        OSError: If the file does not exist.
        PIL.Image.UnidentifiedImageError: If the image is not recognized.
    """
    with Image.open(file_path) as image:
        dimensions = image.size
    return dimensions
# Matches an "&nbsp;" entity that is neither directly preceded by ";" nor
# directly followed by "&", i.e. one that does not touch another entity.
re_single_nbsp = re.compile(r"(?<!;)&nbsp;(?!&)")
# Matches a run of two or more consecutive space characters.
re_rep_pre_spc = re.compile(r" {2,}")


def replace_pre_spaces(s: str) -> str:
    """
    Turn every run of two or more spaces into as many ``&nbsp;`` entities.

    Single spaces are left untouched.

    Args:
        s: text to process.

    Returns:
        The text with each multi-space run replaced one-for-one by
        ``&nbsp;`` entities.
    """
    return re_rep_pre_spc.sub(lambda run: "&nbsp;" * len(run.group()), s)
class ScilogIngestor:
    """
    File-driven helper that posts one message into a SciLog logbook.

    Typical use: fill the instance with ``load_attributes_from_file``,
    ``load_message_from_file`` and ``load_credentials_from_file``, then call
    ``run``, which validates the collected values and sends the message.
    """

    def __init__(self):
        super().__init__()
        # values read from the attributes file
        self._author: str = ""
        self._pgroup: str = ""
        self._logbook: str = ""
        # message body read from the message file
        self._message: str = ""
        self._tags: List[str] = []
        self._attachments: List[str] = []
        # values read from the credentials file
        self._url: str = ""
        self._user: str = ""
        self._password: str = ""
        # read from the attributes file but not used anywhere else in this class
        self._location: str = ""
        # created lazily by the ``scilog`` property
        self._scilog: Optional["SciLog"] = None

    def prepare(self):
        """Placeholder; currently does nothing."""
        pass

    @staticmethod
    def _iter_key_values(path):
        """
        Yield stripped ``(key, value)`` pairs from a UTF-8 ``key:value`` file.

        Each line is split at its first colon; lines without a colon are
        skipped.

        Args:
            path: path of the text file to read.
        """
        with open(path, "rt", encoding="utf8") as f:
            for line in f:
                key, sep, val = line.partition(":")
                if not sep:
                    continue
                yield key.strip(), val.strip()

    @staticmethod
    def _adjust_image_size(file_info):
        """
        Limit the display width of an image attachment.

        The width of the image at ``file_info['filepath']`` is halved until it
        is at most 400 px and stored in ``file_info['style']``; the height
        entry is left empty. Entries without a ``filepath`` key, and files
        that cannot be opened or are not recognized as images, are left
        untouched.

        Args:
            file_info: mutable mapping describing one attached file.
        """
        try:
            filepath = file_info['filepath']
        except KeyError:
            return
        try:
            w, _ = get_image_size(filepath)
        except (OSError, UnidentifiedImageError):
            # not an image (or unreadable): nothing to resize
            return
        # limit image size
        while w > 400:
            w //= 2
        file_info['style'] = {'width': f'{w}px', 'height': ''}

    def convert_plain(self, mesg: str) -> str:
        """
        Convert a plain-text entry to HTML.

        - Expand tabs to 8-column tab stops.
        - Escape ``&``, ``<`` and ``>``.
        - Replace runs of spaces by ``&nbsp;`` entities.
        - Replace line feeds by ``<br>`` tags.
        - Wrap the result in ``<p><tt>...</tt></p>``.

        Args:
            mesg: plain-text message.

        Returns:
            The HTML fragment.
        """
        # Tabs must be expanded first: expandtabs() counts columns from the
        # last line break, so it has to see the original "\n" characters and
        # the un-escaped text (entities and "<br>" would shift the columns).
        mesg = mesg.expandtabs(8)
        mesg = mesg.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
        mesg = replace_pre_spaces(mesg)
        mesg = mesg.replace("\n", "<br>")
        return "<p><tt>" + mesg + "</tt></p>"

    @staticmethod
    def is_pgroup(value: str) -> str:
        """
        Check a group name and return it in normalized form.

        A valid name has six characters: the letter ``e`` or ``p`` (case
        insensitive) followed by five ASCII digits that are not all zero.

        Args:
            value: group name to check.

        Returns:
            ``"p"`` followed by the five digits if the name is valid,
            otherwise the empty string.
        """
        value = value.lower()
        digits = value[1:]
        if len(value) != 6 or value[0] not in {"e", "p"}:
            return ""
        # Require plain ASCII digits so that int() can neither raise nor
        # accept signs, underscores or surrounding whitespace.
        if not (digits.isascii() and digits.isdigit()):
            return ""
        if int(digits) == 0:
            return ""
        return "p" + digits

    @property
    def scilog(self) -> "SciLog":
        """
        Return the SciLog client, creating it on first access.

        The client is built from the stored URL, user name and password and
        cached for subsequent calls.

        Returns: SciLog
        """
        if self._scilog is None:
            options = {"username": self._user,
                       "password": self._password}
            self._scilog = SciLog(self._url, options=options)
        return self._scilog

    def ingest_message(self):
        """
        Ingest the current message into SciLog.

        The target logbook is looked up by owner group and, if a logbook name
        is set, by name. Exactly one non-deleted logbook must match.

        A message starting with ``<`` is added as a single text block; any
        other message is added line by line. Attachments and tags are added
        afterwards and the message is sent.

        Raises:
            ValueError: If no logbook or more than one logbook matches.
        """
        log = self.scilog

        # Filter by name only when a name is available; otherwise fall back to
        # the owner group alone.
        if self._logbook:
            lb_filter = {"ownerGroup": self._pgroup, "name": self._logbook, "deleted": False}
        else:
            lb_filter = {"ownerGroup": self._pgroup, "deleted": False}

        logbooks = log.get_logbooks(where=lb_filter, limit=10)
        try:
            logbook = logbooks[0]
        except IndexError:
            raise ValueError(f"no logbook found for {lb_filter}") from None
        if len(logbooks) > 1:
            raise ValueError(f"multiple logbooks found for {lb_filter}")
        log.select_logbook(logbook)

        msg = LogbookMessage()
        # startswith() is safe on an empty message, unlike indexing [0]
        if self._message.startswith('<'):
            msg.add_text(self._message)
        else:
            for line in self._message.split('\n'):
                msg.add_text(line)

        for att in self._attachments:
            msg.add_file(att)
            # NOTE(review): relies on LogbookMessage internals - presumably
            # add_file() appends the new file record to _content.files;
            # verify against the scilog package.
            file_info = msg._content.files[-1]
            self._adjust_image_size(file_info)

        if self._tags:
            msg.add_tag(self._tags)

        log.send_logbook_message(msg)

    def load_attributes_from_file(self, attributes_file):
        """
        Load attributes, tags and attachment paths from a file.

        Each line of the file declares a key:value pair. Lines without a
        colon and unknown keys are ignored.

        The keys are:
        - author: (required) Value is the e-mail address of the author.
        - pgroup: (required) Value is the p-group of the logbook.
        - logbook: (required) Value is the name of the logbook.
        - location: (not used) Value is the name of the location. Currently not used.
        - tag: (optional) Value is a tag to be added to the snippet. Can occur multiple times.
          Duplicates and empty values are ignored. Spaces and commas are removed.
        - attachment: (optional) Value is the path of an attachment. Can occur multiple times.
          Duplicates and empty values are ignored.

        Args:
            attributes_file: path of the attributes file, UTF-8 encoded.
        """
        # Seed with what is already stored so repeated calls stay duplicate-free.
        seen_tags = set(self._tags)
        seen_attachments = set(self._attachments)

        for key, val in self._iter_key_values(attributes_file):
            if key == "tag":
                val = val.replace(" ", "").replace(",", "")
                if val and val not in seen_tags:
                    self._tags.append(val)
                    seen_tags.add(val)
            elif key == "attachment":
                if val and val not in seen_attachments:
                    self._attachments.append(val)
                    seen_attachments.add(val)
            elif key == "author":
                self._author = val
            elif key == "pgroup":
                self._pgroup = val
            elif key == "location":
                self._location = val
            elif key == "logbook":
                self._logbook = val

    def load_message_from_file(self, message_file):
        """
        Load the message text from a file.

        The file content is stored unchanged. ``ingest_message`` later treats
        content starting with ``<`` as one block and everything else line by
        line.

        Args:
            message_file: path of the message file, UTF-8 encoded.
        """
        with open(message_file, "rt", encoding="utf8") as f:
            # read() keeps the original line breaks; joining readlines() with
            # "\n" would double every one of them.
            self._message = f.read()

    def load_credentials_from_file(self, credentials_file):
        """
        Load the credentials from a file.

        The file must contain three lines, each in the form key:value.
        The keys are `url`, `user`, and `password`.
        The URL must start with `https://` and end with `/api/v1`.

        Make sure to protect the credentials from unauthorized access!

        Args:
            credentials_file: path of the credentials file.
                If None, `.scilog.cred` in the home directory is read.

        Raises:
            ValueError: If the credentials file does not exist.
        """
        if credentials_file is None:
            credentials_file = Path.home() / ".scilog.cred"
        if not Path(credentials_file).exists():
            raise ValueError("Missing credentials")

        for key, val in self._iter_key_values(credentials_file):
            if key == "user":
                self._user = val
            elif key == "password":
                self._password = val
            elif key == "url":
                self._url = val

    def validate(self):
        """
        Perform a number of basic validity checks on the attributes.

        Raises:
            ValueError: If a problem is found.
        """
        if not re.match(r"[^@]+@[^@]+\.[^@]+", self._author):
            raise ValueError("Invalid email address.")
        if not self._user or not self._password:
            raise ValueError("Invalid credentials.")
        # NOTE(review): is_pgroup() returns a normalized name, but only its
        # truthiness is used here; self._pgroup is sent as loaded.
        if not self.is_pgroup(self._pgroup):
            raise ValueError("Invalid pgroup.")
        if not self._logbook:
            raise ValueError("Empty logbook name.")
        if not self._message:
            raise ValueError("Empty message.")
        # NOTE(review): re.match anchors only at the start, so text after
        # "/api/v1" is accepted.
        if not re.match(r"https://[a-zA-Z0-9]+\.[a-zA-Z0-9.]+(:[0-9]+)?/api/v1", self._url):
            raise ValueError("Invalid URL.")

    def run(self):
        """
        Validate the loaded data and send the message.

        Raises:
            ValueError: If validation or the logbook lookup fails.
        """
        self.validate()
        self.ingest_message()
def main(attributes_file, message_file, credentials_file):
    """
    Load all input files, send the message and report success.

    Args:
        attributes_file: path of the attributes file.
        message_file: path of the message file.
        credentials_file: path of the credentials file; passed on unchanged
            to ``ScilogIngestor.load_credentials_from_file``.
    """
    job = ScilogIngestor()
    job.load_attributes_from_file(attributes_file)
    job.load_message_from_file(message_file)
    job.load_credentials_from_file(credentials_file)
    job.run()
    print("Message successfully transmitted")
def parse_args():
    """
    Parse the command line.

    Returns:
        The argparse namespace with the attributes ``credentials`` (optional,
        default None), ``attributes`` and ``message``.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Simple file interface to ingest an entry into a SciLog logbook",
    )
    cli.add_argument("-c", "--credentials", default=None, help="credentials file, UTF-8 encoded")
    cli.add_argument("attributes", help="attributes file, UTF-8 encoded")
    cli.add_argument("message", help="message file, UTF-8 encoded")
    return cli.parse_args()
# Script entry point: parse the command line and ingest the message.
if __name__ == "__main__":
    clargs = parse_args()
    main(clargs.attributes, clargs.message, clargs.credentials)