import argparse import re import json import logging import requests from io import StringIO from pathlib import Path logging.basicConfig( level=logging.INFO, # format='[%(asctime)s] %(levelname)-8s %(name)-12s %(message)s' format='%(levelname)-8s %(message)s' ) base_directory = Path(".") upload_url = "https://dispatcher-api.psi.ch/sf-databuffer/configuration/upload" delete_url = "https://dispatcher-api.psi.ch/sf-databuffer/configuration/delete" # upload_url = "http://localhost:1234" def _remove_comments(text): """ remove c-style comments. text: blob of text with comments (can include newlines) returns: text with comments removed # Stolen from https://www.saltycrane.com/blog/2007/11/remove-c-comments-python/ """ pattern = r""" ## --------- COMMENT --------- /\* ## Start of /* ... */ comment [^*]*\*+ ## Non-* followed by 1-or-more *'s ( ## [^/*][^*]*\*+ ## )* ## 0-or-more things which don't start with / ## but do end with '*' / ## End of /* ... */ comment | ## -OR- various things which aren't comments: ( ## ## ------ " ... " STRING ------ " ## Start of " ... " string ( ## \\. ## Escaped char | ## -OR- [^"\\] ## Non "\ characters )* ## " ## End of " ... " string | ## -OR- ## ## ------ ' ... ' STRING ------ ' ## Start of ' ... ' string ( ## \\. ## Escaped char | ## -OR- [^'\\] ## Non '\ characters )* ## ' ## End of ' ... ' string | ## -OR- ## ## ------ ANYTHING ELSE ------- . ## Anything other char [^/"'\\]* ## Chars which doesn't start a comment, string ) ## or escape """ regex = re.compile(pattern, re.VERBOSE | re.MULTILINE | re.DOTALL) non_comments = [m.group(2) for m in regex.finditer(text) if m.group(2)] return "".join(non_comments) def _load_file(file_path): with open(file_path) as file_h: text = file_h.read() text = _remove_comments(text) config = json.loads(text) return config def remove_labels(sources): """ Remove labels from the source config :param sources: :return: source config without labels """ new_sources = [] for source in sources["sources"]: source.pop('labels', None) new_sources.append(source) return {"sources": new_sources} def remove_labeled_source(sources, label): """ Remove (a) source(s) with a specific label :param sources: :param label: :return: source config without the source(s) with the given label """ return {"sources": [x for x in sources["sources"] if "labels" not in x or (label not in x['labels'])]} def remove_backend_source(sources, backend): """ Remove sources from a given backend :param sources: :param backend: :return: list of sources excluding the sources from the specified backend """ return {"sources": [x for x in sources["sources"] if "backend" not in x or x['backend'] != backend]} def get_labels(sources): """ Retrieve all used labels in the source configurations :param sources: source config :return: list of available labels """ labels = set([item for x in sources["sources"] if "labels" in x for item in x["labels"]]) return labels def get_labeled_sources(sources, label): """ Get source(s) with the given label :param sources: :param label: :return: list of source config that contains label """ return [x for x in sources["sources"] if "labels" in x and label in x['labels']] def get_backend_sources(sources, backend): """ Get image source(s) :param sources: :return: list of source config that are images """ return [x for x in sources["sources"] if "backend" in x and x['backend'] == backend] def read_files(files_dir, file_type): """ Read sources or policies files :param files_dir: :param file_type: "sources" or "policies" :return: """ sources = [] for file in files_dir.iterdir(): logging.debug(f"Read file: {file}") config = _load_file(file) sources.extend(config[file_type]) return {file_type: sources} def update_sources_and_policies(sources, policies, update_type="upload"): """ Update sources and policies definition to the data/imagebuffer :param sources: sources to upload :param policies: policies to upload :param update_type: update type - upload or delete :return: """ upload_files = [("files", ("all.sources", StringIO(json.dumps(sources)))), ("files", ("all.policies", StringIO(json.dumps(policies))))] logging.info(f"Update - {update_type} {len(sources['sources'])} sources and {len(policies['policies'])} policies") if update_type == "upload": test_response = requests.post(upload_url, files=upload_files) elif update_type == "delete": test_response = requests.post(delete_url, files=upload_files) else: logging.error(f"Update type - {update_type} - is not supported") if test_response.ok: logging.info("Update completed successfully!") else: logging.error(f"Update failed!\n{test_response.text}") def update_channel_cache(): # Update channel cache of data-api logging.info("Update channel cache") response = requests.post("https://data-api.psi.ch/sf/channels/config", json={"reload": "true"}) if response.ok: logging.info("Channel cache updated successfully!") else: logging.error(f"Updating channel cache failed!\n{response.text}") def main(): parser = argparse.ArgumentParser(description="Utility commands to work with the databuffer", formatter_class=argparse.RawTextHelpFormatter, epilog=f"--------\n\n" f"Examples:\n" "bufferutils upload\n" "bufferutils restart --label