first prototype
This commit is contained in:
5
scilog/__init__.py
Normal file
5
scilog/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
|
||||
from .scicat import SciCat
|
||||
from .scilog import SciLog
|
||||
|
||||
|
5
scilog/autherror.py
Normal file
5
scilog/autherror.py
Normal file
@ -0,0 +1,5 @@
|
||||
|
||||
class AuthError(Exception):
|
||||
pass
|
||||
|
||||
|
44
scilog/config.py
Normal file
44
scilog/config.py
Normal file
@ -0,0 +1,44 @@
|
||||
from pathlib import Path
|
||||
import json
|
||||
|
||||
|
||||
class Config(dict):
|
||||
|
||||
def __init__(self, fname, folder=None):
|
||||
if folder is not None:
|
||||
folder = Path(folder)
|
||||
else:
|
||||
folder = Path.home()
|
||||
self.fname = folder / fname
|
||||
content = self._load()
|
||||
super().update(content)
|
||||
|
||||
def __setitem__(self, name, value):
|
||||
self.update(**{name: value})
|
||||
|
||||
def update(self, **kwargs):
|
||||
super().update(**kwargs)
|
||||
self._save()
|
||||
|
||||
def _load(self):
|
||||
fn = self.fname
|
||||
if fn.exists():
|
||||
return json_load(fn)
|
||||
else:
|
||||
return {}
|
||||
|
||||
def _save(self):
|
||||
json_save(self, self.fname)
|
||||
|
||||
|
||||
|
||||
def json_save(what, filename, *args, indent=4, sort_keys=True, **kwargs):
|
||||
with open(filename, "w") as f:
|
||||
json.dump(what, f, *args, indent=indent, sort_keys=sort_keys, **kwargs)
|
||||
|
||||
def json_load(filename, *args, **kwargs):
|
||||
with open(filename, "r") as f:
|
||||
return json.load(f, *args, **kwargs)
|
||||
|
||||
|
||||
|
11
scilog/mkfilt.py
Normal file
11
scilog/mkfilt.py
Normal file
@ -0,0 +1,11 @@
|
||||
import json
|
||||
|
||||
|
||||
def make_filter(**kwargs):
|
||||
items = [{k: v} for k, v in kwargs.items()]
|
||||
filt = {"where": {"and": items}}
|
||||
filt = json.dumps(filt)
|
||||
return {"filter": filt}
|
||||
|
||||
|
||||
|
69
scilog/scicat.py
Normal file
69
scilog/scicat.py
Normal file
@ -0,0 +1,69 @@
|
||||
import getpass
|
||||
from .config import Config
|
||||
from .utils import post_request, get_request
|
||||
from .autherror import AuthError
|
||||
|
||||
|
||||
AUTH_HEADERS = {
|
||||
"Content-type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
|
||||
|
||||
class SciCat:
|
||||
|
||||
def __init__(self, address):
|
||||
self.address = address.rstrip("/")
|
||||
self._token = None
|
||||
self.config = Config(".scicat-tokens")
|
||||
|
||||
def __repr__(self):
|
||||
return f"SciCat @ {self.address}"
|
||||
|
||||
@property
|
||||
def proposals(self):
|
||||
url = self.address + "/proposals"
|
||||
headers = self.auth_headers
|
||||
return get_request(url, headers=headers)
|
||||
|
||||
|
||||
@property
|
||||
def auth_headers(self):
|
||||
headers = AUTH_HEADERS.copy()
|
||||
headers["Authorization"] = self.token
|
||||
return headers
|
||||
|
||||
@property
|
||||
def token(self):
|
||||
username = getpass.getuser()
|
||||
password = getpass.getpass(prompt=f"SciCat password for {username}: ")
|
||||
token = self._token
|
||||
if token is None:
|
||||
try:
|
||||
token = self.config[username]
|
||||
except KeyError:
|
||||
token = self.authenticate(username, password)
|
||||
self.config[username] = self._token = token
|
||||
return token
|
||||
|
||||
def authenticate(self, username, password):
|
||||
url = self.address + "/users/login"
|
||||
auth_payload = {
|
||||
"username": username,
|
||||
"password": password
|
||||
}
|
||||
res = post_request(url, auth_payload, AUTH_HEADERS)
|
||||
try:
|
||||
token = res["id"]
|
||||
except KeyError as e:
|
||||
raise SciCatAuthError(res) from e
|
||||
else:
|
||||
return token
|
||||
|
||||
|
||||
|
||||
class SciCatAuthError(AuthError):
|
||||
pass
|
||||
|
||||
|
||||
|
77
scilog/scilog.py
Normal file
77
scilog/scilog.py
Normal file
@ -0,0 +1,77 @@
|
||||
import getpass
|
||||
from .config import Config
|
||||
from .utils import post_request, get_request
|
||||
from .autherror import AuthError
|
||||
from .mkfilt import make_filter
|
||||
|
||||
|
||||
AUTH_HEADERS = {
|
||||
"Content-type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
|
||||
|
||||
class SciLog:
|
||||
|
||||
def __init__(self, address):
|
||||
self.address = address.rstrip("/")
|
||||
self._token = None
|
||||
self.config = Config(".scilog-tokens")
|
||||
|
||||
def __repr__(self):
|
||||
return f"SciLog @ {self.address}"
|
||||
|
||||
|
||||
def get_snippets(self, **kwargs):
|
||||
url = self.address + "/basesnippets"
|
||||
params = make_filter(**kwargs)
|
||||
headers = self.auth_headers
|
||||
return get_request(url, params=params, headers=headers)
|
||||
|
||||
def post_snippet(self, **kwargs):
|
||||
url = self.address + "/basesnippets"
|
||||
payload = kwargs
|
||||
headers = self.auth_headers
|
||||
return post_request(url, payload=payload, headers=headers)
|
||||
|
||||
|
||||
@property
|
||||
def auth_headers(self):
|
||||
headers = AUTH_HEADERS.copy()
|
||||
headers["Authorization"] = self.token
|
||||
return headers
|
||||
|
||||
@property
|
||||
def token(self):
|
||||
username = getpass.getuser()
|
||||
token = self._token
|
||||
if token is None:
|
||||
try:
|
||||
token = self.config[username]
|
||||
except KeyError:
|
||||
password = getpass.getpass(prompt=f"SciLog password for {username}: ")
|
||||
token = self.authenticate(username, password)
|
||||
self.config[username] = self._token = token
|
||||
return token
|
||||
|
||||
def authenticate(self, username, password):
|
||||
url = self.address + "/users/login"
|
||||
auth_payload = {
|
||||
"principal": username,
|
||||
"password": password
|
||||
}
|
||||
res = post_request(url, auth_payload, AUTH_HEADERS)
|
||||
try:
|
||||
token = "Bearer " + res["token"]
|
||||
except KeyError as e:
|
||||
raise SciLogAuthError(res) from e
|
||||
else:
|
||||
return token
|
||||
|
||||
|
||||
|
||||
class SciLogAuthError(AuthError):
|
||||
pass
|
||||
|
||||
|
||||
|
20
scilog/utils.py
Normal file
20
scilog/utils.py
Normal file
@ -0,0 +1,20 @@
|
||||
import requests
|
||||
|
||||
|
||||
#TODO: add params/payload and response validation
|
||||
|
||||
|
||||
def get_request(url, params=None, headers=None, timeout=10):
|
||||
response = requests.get(url, params=params, headers=headers, timeout=timeout, verify=False).json()
|
||||
return response
|
||||
|
||||
def post_request(url, payload=None, headers=None, timeout=10):
|
||||
response = requests.post(url, json=payload, headers=headers, timeout=timeout, verify=False).json()
|
||||
return response
|
||||
|
||||
|
||||
def typename(obj):
|
||||
return type(obj).__name__
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user