Files
sf_databuffer/bufferutils.py

337 lines
11 KiB
Python

import argparse
import re
import json
import logging
import requests
from io import StringIO
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
# format='[%(asctime)s] %(levelname)-8s %(name)-12s %(message)s'
format='%(levelname)-8s %(message)s'
)
base_directory = Path(".")
upload_url = "https://dispatcher-api.psi.ch/sf/configuration/upload"
# upload_url = "http://localhost:1234"
def _remove_comments(text):
""" remove c-style comments.
text: blob of text with comments (can include newlines)
returns: text with comments removed
# Stolen from https://www.saltycrane.com/blog/2007/11/remove-c-comments-python/
"""
pattern = r"""
## --------- COMMENT ---------
/\* ## Start of /* ... */ comment
[^*]*\*+ ## Non-* followed by 1-or-more *'s
( ##
[^/*][^*]*\*+ ##
)* ## 0-or-more things which don't start with /
## but do end with '*'
/ ## End of /* ... */ comment
| ## -OR- various things which aren't comments:
( ##
## ------ " ... " STRING ------
" ## Start of " ... " string
( ##
\\. ## Escaped char
| ## -OR-
[^"\\] ## Non "\ characters
)* ##
" ## End of " ... " string
| ## -OR-
##
## ------ ' ... ' STRING ------
' ## Start of ' ... ' string
( ##
\\. ## Escaped char
| ## -OR-
[^'\\] ## Non '\ characters
)* ##
' ## End of ' ... ' string
| ## -OR-
##
## ------ ANYTHING ELSE -------
. ## Anything other char
[^/"'\\]* ## Chars which doesn't start a comment, string
) ## or escape
"""
regex = re.compile(pattern, re.VERBOSE | re.MULTILINE | re.DOTALL)
non_comments = [m.group(2) for m in regex.finditer(text) if m.group(2)]
return "".join(non_comments)
def _load_file(file_path):
with open(file_path) as file_h:
text = file_h.read()
text = _remove_comments(text)
config = json.loads(text)
return config
def remove_labels(sources):
"""
Remove labels from the source config
:param sources:
:return: source config without labels
"""
new_sources = []
for source in sources["sources"]:
source.pop('labels', None)
new_sources.append(source)
return {"sources": new_sources}
def remove_labeled_source(sources, label):
"""
Remove (a) source(s) with a specific label
:param sources:
:param label:
:return: source config without the source(s) with the given label
"""
return {"sources": [x for x in sources["sources"] if "labels" not in x or (label not in x['labels'])]}
def remove_image_source(sources):
return {"sources": [x for x in sources["sources"] if "backend" not in x or x['backend'] != "sf-imagebuffer"]}
def get_labels(sources):
"""
Retrieve all used labels in the source configurations
:param sources: source config
:return: list of available labels
"""
labels = set([item for x in sources["sources"] if "labels" in x for item in x["labels"]])
return labels
def get_labeled_sources(sources, label):
"""
Get source(s) with the given label
:param sources:
:param label:
:return: list of source config that contains label
"""
return [x for x in sources["sources"] if "labels" in x and label in x['labels']]
def get_image_sources(sources):
"""
Get image source(s)
:param sources:
:return: list of source config that are images
"""
return [x for x in sources["sources"] if "backend" in x and x['backend'] == "sf-imagebuffer"]
def read_files(files_dir, file_type):
"""
Read sources or policies files
:param files_dir:
:param file_type: "sources" or "policies"
:return:
"""
sources = []
for file in files_dir.iterdir():
logging.debug(f"Read file: {file}")
config = _load_file(file)
sources.extend(config[file_type])
return {file_type: sources}
def upload_sources_and_policies(sources, policies):
"""
Upload sources and policies definition to the data/imagebuffer
:param sources: sources to upload
:param policies: policies to upload
:return:
"""
upload_files = [("files", ("all.sources", StringIO(json.dumps(sources)))),
("files", ("all.policies", StringIO(json.dumps(policies))))]
logging.info(f"Upload {len(sources['sources'])} sources and {len(policies['policies'])} policies")
test_response = requests.post(upload_url, files=upload_files)
if test_response.ok:
logging.info("Upload completed successfully!")
else:
logging.error(f"Upload failed!\n{test_response.text}")
def update_channel_cache():
# Update channel cache of data-api
logging.info("Update channel cache")
response = requests.post("https://data-api.psi.ch/sf/channels/config", json={"reload": "true"})
if response.ok:
logging.info("Channel cache updated successfully!")
else:
logging.error(f"Updating channel cache failed!\n{response.text}")
def main():
parser = argparse.ArgumentParser(description="Utility commands to work with the databuffer",
formatter_class=argparse.RawTextHelpFormatter,
epilog=f"--------\n\n"
f"Examples:\n"
"bufferutils upload\n"
"bufferutils restart --label <label>\n"
)
subparsers = parser.add_subparsers(title='command',
description='valid commands',
dest='command',
help='command to execute')
parser_upload = subparsers.add_parser('upload',
help="upload configuration",
formatter_class=argparse.RawTextHelpFormatter)
parser_restart = subparsers.add_parser('restart',
help="restart a source",
formatter_class=argparse.RawTextHelpFormatter)
parser_restart.add_argument('-l',
'--label',
default=None,
help="label that identifies the source(s) to restart")
parser_stop = subparsers.add_parser('stop',
help="stop a source",
formatter_class=argparse.RawTextHelpFormatter)
parser_stop.add_argument('-l',
'--label',
default=None,
help="label that identifies the source(s) to stop")
parser_stop.add_argument('-t',
'--type',
default=None,
help="type of to stop")
parser_list = subparsers.add_parser('list',
help="list",
formatter_class=argparse.RawTextHelpFormatter)
parser_list.add_argument('--label',
action="store_true",
help="list labels")
arguments = parser.parse_args()
#
# UPLOAD
if arguments.command == 'upload':
policies = read_files(base_directory / Path("policies"), "policies")
sources = read_files(base_directory / Path("sources"), "sources")
# Just to make sure that the additional labels entry does not break the backend - remove it
# sources = remove_labels(sources)
# print(json.dumps(sources))
upload_sources_and_policies(sources, policies)
# Update channels chache
update_channel_cache()
#
# RESTART
elif arguments.command == 'restart':
if arguments.label:
label = arguments.label
logging.info(f"Restart: {label}")
policies = read_files(base_directory / Path("policies"), "policies")
sources = read_files(base_directory / Path("sources"), "sources")
sources_new = sources.copy()
# Only for debugging purposes
labeled_sources = get_labeled_sources(sources_new, label)
for s in labeled_sources:
logging.info(f"Restarting {s['stream']}")
sources_new = remove_labeled_source(sources_new, label)
# Stopping the removed source(s)
upload_sources_and_policies(sources_new, policies)
# Starting the source(s) again
upload_sources_and_policies(sources, policies)
# There is no need to update the channel chache as nothing really changed
else:
logging.warning("Not yet implemented")
parser_restart.print_usage()
#
# STOP
elif arguments.command == 'stop':
if arguments.label:
label = arguments.label
logging.info(f"Stop: {label}")
policies = read_files(base_directory / Path("policies"), "policies")
sources = read_files(base_directory / Path("sources"), "sources")
# Only for debugging purposes
labeled_sources = get_labeled_sources(sources, label)
for s in labeled_sources:
logging.info(f"Stop {s['stream']}")
sources_new = remove_labeled_source(sources, label)
# Stopping the removed source(s)
upload_sources_and_policies(sources_new, policies)
elif arguments.type:
type = arguments.type
if type != "image":
logging.warning(f"Type {type} currently not supported")
return
logging.info(f"Stop: {type}")
policies = read_files(base_directory / Path("policies"), "policies")
sources = read_files(base_directory / Path("sources"), "sources")
# Only for debugging purposes
image_sources = get_image_sources(sources)
for s in image_sources:
logging.info(f"Stop {s['stream']}")
sources_new = remove_image_source(sources)
# Stopping the removed source(s)
upload_sources_and_policies(sources_new, policies)
else:
logging.warning("Not yet implemented")
parser_stop.print_usage()
#
# LIST
elif arguments.command == 'list':
if arguments.label:
sources = read_files(base_directory / Path("sources"), "sources")
for label in get_labels(sources):
print(label)
else:
logging.warning("Not yet implemented")
parser_list.print_usage()
else:
parser.print_usage()
return -1
if __name__ == '__main__':
main()