332 lines
9.8 KiB
Python
332 lines
9.8 KiB
Python
#!/usr/bin/env -S uv run --script
|
|
#
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = [
|
|
# "pillow",
|
|
# "scilog",
|
|
# ]
|
|
# ///
|
|
|
|
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
from PIL import Image, UnidentifiedImageError
|
|
|
|
from scilog import SciLog, LogbookMessage
|
|
|
|
|
|
def get_image_size(file_path):
|
|
"""
|
|
|
|
Args:
|
|
file_path:
|
|
|
|
Returns:
|
|
|
|
Raises:
|
|
OSError: If the file does not exist.
|
|
PIL.Image.UnidentifiedImageError: If the image is not recognized.
|
|
"""
|
|
|
|
with Image.open(file_path) as img:
|
|
return img.size
|
|
|
|
|
|
re_single_nbsp = re.compile(r"(?<!;) (?!&)")
|
|
re_rep_pre_spc = re.compile(r" {2,}")
|
|
|
|
|
|
def replace_pre_spaces(s: str) -> str:
|
|
"""
|
|
Replace multiple spaces with the corresponding number of entities.
|
|
|
|
Args:
|
|
s: string
|
|
|
|
Returns: string
|
|
|
|
"""
|
|
def nbsp(mo):
|
|
return " " * len(mo.group())
|
|
|
|
return re_rep_pre_spc.sub(nbsp, s)
|
|
|
|
|
|
class ScilogIngestor:
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._author: str = ""
|
|
self._pgroup: str = ""
|
|
self._logbook: str = ""
|
|
self._message: str = ""
|
|
self._tags: List[str] = []
|
|
self._attachments: List[str] = []
|
|
self._url: str = ""
|
|
self._user: str = ""
|
|
self._password: str = ""
|
|
self._location: str = ""
|
|
self._scilog: Optional[SciLog] = None
|
|
|
|
def prepare(self):
|
|
pass
|
|
|
|
@staticmethod
|
|
def _adjust_image_size(file_info):
|
|
try:
|
|
filepath = file_info['filepath']
|
|
except KeyError:
|
|
return
|
|
try:
|
|
w, h = get_image_size(filepath)
|
|
except (OSError, UnidentifiedImageError):
|
|
return
|
|
else:
|
|
# limit image size
|
|
while w > 400:
|
|
w //= 2
|
|
file_info['style'] = {'width': f'{w}px', 'height': ''}
|
|
|
|
def convert_plain(self, mesg: str) -> str:
|
|
"""
|
|
convert plain-text entry to HTML
|
|
|
|
- Convert plain text message to HTML paragraph with <tt> formatting.
|
|
- Replace line feeds by <br> tags.
|
|
- Replace tabs and spaces.
|
|
|
|
Args:
|
|
mesg:
|
|
|
|
Returns: string
|
|
"""
|
|
|
|
mesg = mesg.replace("&", "&").replace("<", "<").replace(">", ">")
|
|
mesg = mesg.replace("\n", "<br>")
|
|
mesg = mesg.expandtabs(8)
|
|
mesg = replace_pre_spaces(mesg)
|
|
mesg = "<p><tt>" + mesg + "</tt></p>"
|
|
|
|
return mesg
|
|
|
|
@staticmethod
|
|
def is_pgroup(value: str) -> str:
|
|
value = value.lower()
|
|
if len(value) == 6 and value[0] in {"e", "p"} and int(value[1:]):
|
|
return "p" + value[1:]
|
|
else:
|
|
return ""
|
|
|
|
@property
|
|
def scilog(self) -> SciLog:
|
|
"""
|
|
Return a SciLog instance.
|
|
|
|
Returns: SciLog
|
|
"""
|
|
|
|
if self._scilog is None:
|
|
options = {"username": self._user,
|
|
"password": self._password}
|
|
url = self._url
|
|
self._scilog = SciLog(url, options=options)
|
|
|
|
return self._scilog
|
|
|
|
def ingest_message(self):
|
|
"""
|
|
ingest message into scilog
|
|
|
|
this method ingests the current message into scilog.
|
|
|
|
"""
|
|
|
|
log = self.scilog
|
|
|
|
try:
|
|
logbook_name = self._logbook
|
|
except KeyError:
|
|
lb_filter = {"ownerGroup": self._pgroup, "deleted": False}
|
|
else:
|
|
lb_filter = {"ownerGroup": self._pgroup, "name": logbook_name, "deleted": False}
|
|
|
|
logbooks = log.get_logbooks(where=lb_filter, limit=10)
|
|
try:
|
|
logbook = logbooks[0]
|
|
except IndexError:
|
|
raise ValueError(f"no logbook found for {lb_filter}")
|
|
else:
|
|
if len(logbooks) > 1:
|
|
raise ValueError(f"multiple logbooks found for {lb_filter}")
|
|
else:
|
|
log.select_logbook(logbook)
|
|
|
|
msg = LogbookMessage()
|
|
if self._message[0] == '<':
|
|
msg.add_text(self._message)
|
|
else:
|
|
for line in self._message.split('\n'):
|
|
msg.add_text(line)
|
|
for att in self._attachments:
|
|
msg.add_file(att)
|
|
file_info = msg._content.files[-1]
|
|
self._adjust_image_size(file_info)
|
|
if self._tags:
|
|
msg.add_tag(self._tags)
|
|
log.send_logbook_message(msg)
|
|
|
|
def load_attributes_from_file(self, attributes_file):
|
|
"""
|
|
Load attributes, tags and attachment paths from a file.
|
|
|
|
Each line of the file declares a key:value pair.
|
|
The keys are:
|
|
- author: (required) Value is the e-mail address of the author.
|
|
- pgroup: (required) Value is the p-group of the logbook.
|
|
- logbook: (required) Value is the name of the logbook.
|
|
- location: (not used) Value is the name of the location. Currently not used.
|
|
- tag: (optional) Value is a tag to be added to the snippet. Can occur multiple times.
|
|
Duplicates are ignored. Spaces and commas are removed.
|
|
- attachment: (optional) Value is the path of an attachment. Can occur multiple times.
|
|
Duplicates are ignored.
|
|
|
|
:param attributes_file:
|
|
:return:
|
|
"""
|
|
|
|
unique_tags = set()
|
|
unique_attachments = set()
|
|
|
|
with open(attributes_file, "rt", encoding="utf8") as f:
|
|
for line in f.readlines():
|
|
try:
|
|
key, val = line.split(":", 1)
|
|
key = key.strip()
|
|
val = val.strip()
|
|
except ValueError:
|
|
continue
|
|
|
|
if key == "tag":
|
|
val = val.replace(" ", "")
|
|
val = val.replace(",", "")
|
|
if val not in unique_tags:
|
|
self._tags.append(val)
|
|
unique_tags.add(val)
|
|
elif key == "attachment":
|
|
if val not in unique_attachments:
|
|
self._attachments.append(val)
|
|
unique_attachments.add(val)
|
|
elif key == "author":
|
|
self._author = val
|
|
elif key == "pgroup":
|
|
self._pgroup = val
|
|
elif key == "location":
|
|
self._location = val
|
|
elif key == "logbook":
|
|
self._logbook = val
|
|
|
|
def load_message_from_file(self, message_file):
|
|
"""
|
|
Load message from file
|
|
|
|
If the file starts with a `<`, the content is assumed to be HTML.
|
|
Otherwise, it is assumed to be plain text and will be tagged and escaped.
|
|
|
|
:param message_file:
|
|
:return:
|
|
"""
|
|
|
|
with open(message_file, "rt", encoding="utf8") as f:
|
|
self._message = "\n".join(f.readlines())
|
|
|
|
def load_credentials_from_file(self, credentials_file):
|
|
"""
|
|
Load the credentials from a file.
|
|
|
|
The file must contain three lines, each in the form key:value.
|
|
The keys are `url`, `user`, and `password`.
|
|
The URL must start with `https://` and end with `/api/v1`.
|
|
|
|
:param credentials_file: path of the credentials file.
|
|
If None, it is read from `.scilog.cred` the home directory.
|
|
Make sure to protect the credentials from unauthorized access!
|
|
:return: None
|
|
"""
|
|
|
|
if credentials_file is None:
|
|
credentials_file = Path.home() / ".scilog.cred"
|
|
if not Path(credentials_file).exists():
|
|
raise ValueError("Missing credentials")
|
|
with open(credentials_file, "rt", encoding="utf8") as f:
|
|
for line in f.readlines():
|
|
try:
|
|
key, val = line.split(":", 1)
|
|
key = key.strip()
|
|
val = val.strip()
|
|
except ValueError:
|
|
continue
|
|
|
|
if key == "user":
|
|
self._user = val
|
|
elif key == "password":
|
|
self._password = val
|
|
elif key == "url":
|
|
self._url = val
|
|
|
|
def validate(self):
|
|
"""
|
|
Perform a number of basic validity checks on the attributes.
|
|
|
|
:return:
|
|
:raise ValueError if a problem is found.
|
|
"""
|
|
|
|
if not re.match(r"[^@]+@[^@]+\.[^@]+", self._author):
|
|
raise ValueError("Invalid email address.")
|
|
if not self._user or not self._password:
|
|
raise ValueError("Invalid credentials.")
|
|
if not self.is_pgroup(self._pgroup):
|
|
raise ValueError("Invalid pgroup.")
|
|
if not self._logbook:
|
|
raise ValueError("Empty logbook name.")
|
|
if not self._message:
|
|
raise ValueError("Empty message.")
|
|
if not re.match(r"https://[a-zA-Z0-9]+\.[a-zA-Z0-9.]+(:[0-9]+)?/api/v1", self._url):
|
|
raise ValueError("Invalid URL.")
|
|
|
|
def run(self):
|
|
self.validate()
|
|
self.ingest_message()
|
|
|
|
|
|
def main(attributes_file, message_file, credentials_file):
|
|
ingestor = ScilogIngestor()
|
|
ingestor.load_attributes_from_file(attributes_file)
|
|
ingestor.load_message_from_file(message_file)
|
|
ingestor.load_credentials_from_file(credentials_file)
|
|
ingestor.run()
|
|
print("Message successfully transmitted")
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
description="Simple file interface to ingest an entry into a SciLog logbook",
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
|
)
|
|
|
|
parser.add_argument("-c", "--credentials", default=None, help="credentials file, UTF-8 encoded")
|
|
parser.add_argument("attributes", help="attributes file, UTF-8 encoded")
|
|
parser.add_argument("message", help="message file, UTF-8 encoded")
|
|
|
|
clargs = parser.parse_args()
|
|
|
|
return clargs
|
|
|
|
|
|
if __name__ == "__main__":
|
|
clargs = parse_args()
|
|
main(clargs.attributes, clargs.message, clargs.credentials)
|