added snippet interface and httpclient

This commit is contained in:
2021-05-31 21:18:25 +02:00
parent 6d207c1db9
commit a1856a68e5
8 changed files with 257 additions and 50 deletions

View File

@ -1,5 +1,5 @@
from .scicat import SciCat
from .scilog import SciLog
from .scilog import Basesnippet, Paragraph

View File

@ -4,13 +4,13 @@ from .config import Config
from .utils import typename
AUTH_HEADERS = {
HEADER_JSON = {
"Content-type": "application/json",
"Accept": "application/json"
}
class AuthClient(ABC):
class AuthMixin(ABC):
def __init__(self, address):
self.address = address.rstrip("/")
@ -27,14 +27,11 @@ class AuthClient(ABC):
def authenticate(self, username, password):
raise NotImplementedError
@property
def auth_headers(self):
headers = AUTH_HEADERS.copy()
headers["Authorization"] = self.token
return headers
@property
def token(self):
return self._retrieve_token()
def _retrieve_token(self):
username = getpass.getuser()
token = self._token
if token is None:
@ -45,6 +42,7 @@ class AuthClient(ABC):
password = getpass.getpass(prompt=f"{tn} password for {username}: ")
token = self.authenticate(username, password)
self.config[username] = self._token = token
return token

View File

@ -1,6 +1,6 @@
from pathlib import Path
import json
import os
class Config(dict):
@ -29,6 +29,9 @@ class Config(dict):
def _save(self):
json_save(self, self.fname)
def delete(self):
print(self.fname)
os.remove(self.fname)

84
scilog/httpclient.py Normal file
View File

@ -0,0 +1,84 @@
import requests
import functools
import json
from abc import ABC, abstractmethod
from .authclient import AuthMixin, AuthError, HEADER_JSON
def authenticated(func):
@functools.wraps(func)
def authenticated_call(*args, **kwargs):
if not issubclass(type(args[0]), HttpClient):
raise AttributeError("First argument must be an instance of HttpClient")
if "headers" in kwargs:
kwargs["headers"]["Authorization"] = args[0].token
else:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = args[0].token
return func(*args, **kwargs)
return authenticated_call
class HttpClient(AuthMixin):
def __init__(self, address):
self.address = address
self._verify_certificate = True
super().__init__(address)
def authenticate(self, username, password):
auth_payload = {
"principal": username,
"password": password
}
res = self._login(auth_payload, HEADER_JSON)
try:
token = "Bearer " + res["token"]
except KeyError as e:
raise AuthError(res) from e
else:
return token
@authenticated
def get_request(self, url, params=None, headers=None, timeout=10):
response = requests.get(url, params=params, headers=headers, timeout=timeout, verify=self._verify_certificate)
if response.ok:
return response.json()
else:
if response.reason == "Unauthorized":
self.config.delete()
self._retrieve_token()
raise response.raise_for_status()
else:
raise response.raise_for_status()
@authenticated
def post_request(self, url, payload=None, headers=None, timeout=10):
response = requests.post(url, json=payload, headers=headers, timeout=timeout, verify=self._verify_certificate).json()
return response
def _login(self, payload=None, headers=None, timeout=10):
return requests.post(self.address + "/users/login", json=payload, headers=headers, timeout=timeout, verify=self._verify_certificate).json()
def typename(self, obj):
return type(obj).__name__
@staticmethod
def make_filter(where:dict=None, limit:int=0, skip:int=0, fields:dict=None, include:dict=None, order:list=None):
filt = dict()
if where is not None:
items = [{k: v} for k, v in where.items()]
filt["where"] = {"and": items}
if limit > 0:
filt["limit"] = limit
if skip > 0:
filt["skip"] = skip
if fields is not None:
filt["fields"] = include
if order is not None:
filt["order"] = order
filt = json.dumps(filt)
return {"filter": filt}

View File

@ -1,8 +1,8 @@
from .authclient import AuthClient, AuthError, AUTH_HEADERS
from .authclient import AuthMixin, AuthError, HEADER_JSON
from .utils import post_request, get_request
class SciCat(AuthClient):
class SciCat(AuthMixin):
def authenticate(self, username, password):
url = self.address + "/users/login"
@ -10,7 +10,7 @@ class SciCat(AuthClient):
"username": username,
"password": password
}
res = post_request(url, auth_payload, AUTH_HEADERS)
res = post_request(url, auth_payload, HEADER_JSON)
try:
token = res["id"]
except KeyError as e:

View File

@ -1,41 +1,90 @@
from .authclient import AuthClient, AuthError, AUTH_HEADERS
from .utils import post_request, get_request
from __future__ import annotations
from .authclient import AuthMixin, AuthError, HEADER_JSON
from .mkfilt import make_filter
from .httpclient import HttpClient
from .snippet import Snippet
from typing import TypeVar, Union, List, Type, get_type_hints
import functools
class SciLog(AuthClient):
class Basesnippet(Snippet):
def __init__(self):
super().__init__()
self.set_properties(
id=str,
parentId=str,
ownerGroup=str,
accessGroups=list,
snippetType=str,
isPrivate=bool,
createdAt=str,
createdBy=str,
updatedAt=str,
updateBy=str,
subsnippets=List[type(Basesnippet)],
tags=List[str],
dashboardName=str,
files=str,
location=str,
defaultOrder=int,
linkType=str,
versionable=bool,
deleted=bool)
def authenticate(self, username, password):
url = self.address + "/users/login"
auth_payload = {
"principal": username,
"password": password
}
res = post_request(url, auth_payload, AUTH_HEADERS)
try:
token = "Bearer " + res["token"]
except KeyError as e:
raise SciLogAuthError(res) from e
else:
return token
class Paragraph(Basesnippet):
def __init__(self):
super().__init__()
self.set_properties(textcontent=str, isMessage=str)
class SciLogRestAPI(HttpClient):
def __init__(self, url):
super().__init__(url)
self._verify_certificate = False
class SciLog():
def __init__(self, url="https://lnode2.psi.ch/api/v1"):
self.http_client = SciLogRestAPI(url)
self.logbook_id = None
self.owner_group = None
def select_logbook(self, logbook:type(Basesnippet)):
self.logbook_id = logbook.id
self.owner_group = logbook.ownerGroup
def get_snippets(self, **kwargs):
url = self.address + "/basesnippets"
params = make_filter(**kwargs)
headers = self.auth_headers
return get_request(url, params=params, headers=headers)
url = self.http_client.address + "/basesnippets"
params = self.http_client.make_filter(where=kwargs)
headers = HEADER_JSON.copy()
return self.http_client.get_request(url, params=params, headers=headers)
def send_message(self, msg, **kwargs):
url = self.http_client.address + "/basesnippets"
payload = kwargs
kwargs["textcontent"] = msg
headers = HEADER_JSON.copy()
return self.http_client.post_request(url, payload=payload, headers=headers)
def post_snippet(self, **kwargs):
url = self.address + "/basesnippets"
url = self.http_client.address + "/basesnippets"
payload = kwargs
headers = self.auth_headers
return post_request(url, payload=payload, headers=headers)
headers = HEADER_JSON.copy()
return self.http_client.post_request(url, payload=payload, headers=headers)
def get_logbooks(self, **kwargs):
url = self.http_client.address + "/basesnippets"
snippet = Basesnippet()
snippet.import_dict(kwargs)
snippet.snippetType = "logbook"
params = self.http_client.make_filter(where=snippet.to_dict(include_none=False))
headers = HEADER_JSON.copy()
return Basesnippet.from_http_response(self.http_client.get_request(url, params=params, headers=headers))
class SciLogAuthError(AuthError):
pass

72
scilog/snippet.py Normal file
View File

@ -0,0 +1,72 @@
from typing import Type, get_type_hints, TypeVar
import functools
def typechecked(func):
@functools.wraps(func)
def typechecked_call(*args, **kwargs):
func_types = get_type_hints(func)
for index, key in enumerate(func_types.keys()):
if key != "return":
assert func_types[key] == type(args[index+1]), f"{repr(func)} expected to receive input of type {func_types[key].__name__} but received {type(args[index+1]).__name__}"
return func(*args, **kwargs)
return typechecked_call
def property_maker(cls, name, type_name):
storage_name = '_' + name
@property
def prop(self) -> type_name:
return getattr(self, storage_name)
@prop.setter
@typechecked
def prop(self, value: type_name) -> None:
setattr(self, storage_name, value)
return prop
class Snippet(dict):
def __init__(self, **kwargs):
self._properties = []
self.set_properties(**kwargs)
def set_properties(self, **kwargs):
for key, value in kwargs.items():
storage_name = '_' + key
setattr(Snippet, storage_name, None)
setattr(Snippet, key, property_maker(self, key, value))
self._properties.append(key)
def to_dict(self, include_none=True):
if include_none:
return {key: getattr(self, key) for key in self._properties }
else:
return {key: getattr(self, key) for key in self._properties if getattr(self, key) is not None}
def import_dict(self, properties):
for key in properties.keys():
setattr(self, key, properties[key])
@classmethod
def from_dict(cls, properties):
tmp = cls()
tmp.import_dict(properties)
return tmp
def __str__(self):
return f"{type(self).__name__}"
@classmethod
def from_http_response(cls, response):
if type(response)==list:
return [cls.from_dict(resp) for resp in response]
else:
return cls.from_dict(response)
if __name__ == "__main__":
tmp = Snippet(id=str, textcontent=str, defaultOrder=int)
print(tmp.id)
tmp.id = 2