diff --git a/example_scilog.py b/example_scilog.py index 6a48e2b..74f093b 100755 --- a/example_scilog.py +++ b/example_scilog.py @@ -15,23 +15,24 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) from scilog import SciLog +from scilog import Basesnippet, Paragraph -url = "https://lnode2.psi.ch/api/v1" +tmp = Basesnippet() +tmp.id = "2" + +# url = "https://lnode2.psi.ch/api/v1" +url = "http://[::1]:3000/" log = SciLog(url) #print(log.token) -loc = log.get_snippets(title="location", ownerGroup="admin") +logbooks = log.get_logbooks(ownerGroup=pgroup) -assert len(loc) == 1 -loc_id = loc[0]["id"] -print(loc_id) +assert len(logbooks) == 1 +logbook = logbooks[0] +print(logbook) -lb = log.get_snippets(snippetType="logbook", ownerGroup=pgroup) +log.select_logbook(logbook) -assert len(lb) == 1 -lb_id = lb[0]["id"] -print(lb_id) - -res = log.post_snippet(snippetType="paragraph", ownerGroup=pgroup, parentId=lb_id, textcontent="
from python
") +res = log.send_message("from python
") print(res) snips = log.get_snippets(snippetType="paragraph", ownerGroup=pgroup) diff --git a/scilog/__init__.py b/scilog/__init__.py index 0fd2f3d..b3dbd1c 100644 --- a/scilog/__init__.py +++ b/scilog/__init__.py @@ -1,5 +1,5 @@ from .scicat import SciCat from .scilog import SciLog - +from .scilog import Basesnippet, Paragraph diff --git a/scilog/authclient.py b/scilog/authclient.py index f4273aa..a74d55c 100644 --- a/scilog/authclient.py +++ b/scilog/authclient.py @@ -4,13 +4,13 @@ from .config import Config from .utils import typename -AUTH_HEADERS = { +HEADER_JSON = { "Content-type": "application/json", "Accept": "application/json" } -class AuthClient(ABC): +class AuthMixin(ABC): def __init__(self, address): self.address = address.rstrip("/") @@ -27,14 +27,11 @@ class AuthClient(ABC): def authenticate(self, username, password): raise NotImplementedError - @property - def auth_headers(self): - headers = AUTH_HEADERS.copy() - headers["Authorization"] = self.token - return headers - @property def token(self): + return self._retrieve_token() + + def _retrieve_token(self): username = getpass.getuser() token = self._token if token is None: @@ -45,6 +42,7 @@ class AuthClient(ABC): password = getpass.getpass(prompt=f"{tn} password for {username}: ") token = self.authenticate(username, password) self.config[username] = self._token = token + return token diff --git a/scilog/config.py b/scilog/config.py index bcb69a8..bd5a3ef 100644 --- a/scilog/config.py +++ b/scilog/config.py @@ -1,6 +1,6 @@ from pathlib import Path import json - +import os class Config(dict): @@ -29,6 +29,9 @@ class Config(dict): def _save(self): json_save(self, self.fname) + def delete(self): + print(self.fname) + os.remove(self.fname) diff --git a/scilog/httpclient.py b/scilog/httpclient.py new file mode 100644 index 0000000..9d79ff3 --- /dev/null +++ b/scilog/httpclient.py @@ -0,0 +1,84 @@ +import requests +import functools +import json +from abc import ABC, abstractmethod + +from .authclient import AuthMixin, AuthError, HEADER_JSON + +def authenticated(func): + @functools.wraps(func) + def authenticated_call(*args, **kwargs): + if not issubclass(type(args[0]), HttpClient): + raise AttributeError("First argument must be an instance of HttpClient") + if "headers" in kwargs: + kwargs["headers"]["Authorization"] = args[0].token + else: + kwargs["headers"] = {} + kwargs["headers"]["Authorization"] = args[0].token + + return func(*args, **kwargs) + return authenticated_call + +class HttpClient(AuthMixin): + def __init__(self, address): + self.address = address + self._verify_certificate = True + super().__init__(address) + + def authenticate(self, username, password): + auth_payload = { + "principal": username, + "password": password + } + res = self._login(auth_payload, HEADER_JSON) + try: + token = "Bearer " + res["token"] + except KeyError as e: + raise AuthError(res) from e + else: + return token + + @authenticated + def get_request(self, url, params=None, headers=None, timeout=10): + response = requests.get(url, params=params, headers=headers, timeout=timeout, verify=self._verify_certificate) + if response.ok: + return response.json() + else: + if response.reason == "Unauthorized": + self.config.delete() + self._retrieve_token() + raise response.raise_for_status() + else: + raise response.raise_for_status() + + + @authenticated + def post_request(self, url, payload=None, headers=None, timeout=10): + response = requests.post(url, json=payload, headers=headers, timeout=timeout, verify=self._verify_certificate).json() + return response + + def _login(self, payload=None, headers=None, timeout=10): + return requests.post(self.address + "/users/login", json=payload, headers=headers, timeout=timeout, verify=self._verify_certificate).json() + + def typename(self, obj): + return type(obj).__name__ + + @staticmethod + def make_filter(where:dict=None, limit:int=0, skip:int=0, fields:dict=None, include:dict=None, order:list=None): + filt = dict() + if where is not None: + items = [{k: v} for k, v in where.items()] + filt["where"] = {"and": items} + if limit > 0: + filt["limit"] = limit + if skip > 0: + filt["skip"] = skip + if fields is not None: + filt["fields"] = include + if order is not None: + filt["order"] = order + filt = json.dumps(filt) + return {"filter": filt} + + + diff --git a/scilog/scicat.py b/scilog/scicat.py index 91f21db..c06fad7 100644 --- a/scilog/scicat.py +++ b/scilog/scicat.py @@ -1,8 +1,8 @@ -from .authclient import AuthClient, AuthError, AUTH_HEADERS +from .authclient import AuthMixin, AuthError, HEADER_JSON from .utils import post_request, get_request -class SciCat(AuthClient): +class SciCat(AuthMixin): def authenticate(self, username, password): url = self.address + "/users/login" @@ -10,7 +10,7 @@ class SciCat(AuthClient): "username": username, "password": password } - res = post_request(url, auth_payload, AUTH_HEADERS) + res = post_request(url, auth_payload, HEADER_JSON) try: token = res["id"] except KeyError as e: diff --git a/scilog/scilog.py b/scilog/scilog.py index d9109a3..461d157 100644 --- a/scilog/scilog.py +++ b/scilog/scilog.py @@ -1,41 +1,90 @@ -from .authclient import AuthClient, AuthError, AUTH_HEADERS -from .utils import post_request, get_request +from __future__ import annotations + +from .authclient import AuthMixin, AuthError, HEADER_JSON from .mkfilt import make_filter +from .httpclient import HttpClient +from .snippet import Snippet +from typing import TypeVar, Union, List, Type, get_type_hints +import functools -class SciLog(AuthClient): +class Basesnippet(Snippet): + def __init__(self): + super().__init__() + self.set_properties( + id=str, + parentId=str, + ownerGroup=str, + accessGroups=list, + snippetType=str, + isPrivate=bool, + createdAt=str, + createdBy=str, + updatedAt=str, + updateBy=str, + subsnippets=List[type(Basesnippet)], + tags=List[str], + dashboardName=str, + files=str, + location=str, + defaultOrder=int, + linkType=str, + versionable=bool, + deleted=bool) + - def authenticate(self, username, password): - url = self.address + "/users/login" - auth_payload = { - "principal": username, - "password": password - } - res = post_request(url, auth_payload, AUTH_HEADERS) - try: - token = "Bearer " + res["token"] - except KeyError as e: - raise SciLogAuthError(res) from e - else: - return token +class Paragraph(Basesnippet): + def __init__(self): + super().__init__() + self.set_properties(textcontent=str, isMessage=str) + + +class SciLogRestAPI(HttpClient): + def __init__(self, url): + super().__init__(url) + self._verify_certificate = False + + +class SciLog(): + + def __init__(self, url="https://lnode2.psi.ch/api/v1"): + self.http_client = SciLogRestAPI(url) + self.logbook_id = None + self.owner_group = None + + def select_logbook(self, logbook:type(Basesnippet)): + self.logbook_id = logbook.id + self.owner_group = logbook.ownerGroup def get_snippets(self, **kwargs): - url = self.address + "/basesnippets" - params = make_filter(**kwargs) - headers = self.auth_headers - return get_request(url, params=params, headers=headers) + url = self.http_client.address + "/basesnippets" + params = self.http_client.make_filter(where=kwargs) + headers = HEADER_JSON.copy() + return self.http_client.get_request(url, params=params, headers=headers) + + def send_message(self, msg, **kwargs): + url = self.http_client.address + "/basesnippets" + payload = kwargs + kwargs["textcontent"] = msg + headers = HEADER_JSON.copy() + return self.http_client.post_request(url, payload=payload, headers=headers) def post_snippet(self, **kwargs): - url = self.address + "/basesnippets" + url = self.http_client.address + "/basesnippets" payload = kwargs - headers = self.auth_headers - return post_request(url, payload=payload, headers=headers) + headers = HEADER_JSON.copy() + return self.http_client.post_request(url, payload=payload, headers=headers) + def get_logbooks(self, **kwargs): + url = self.http_client.address + "/basesnippets" + snippet = Basesnippet() + snippet.import_dict(kwargs) + snippet.snippetType = "logbook" + params = self.http_client.make_filter(where=snippet.to_dict(include_none=False)) + headers = HEADER_JSON.copy() + return Basesnippet.from_http_response(self.http_client.get_request(url, params=params, headers=headers)) class SciLogAuthError(AuthError): pass - - - diff --git a/scilog/snippet.py b/scilog/snippet.py new file mode 100644 index 0000000..1bde29b --- /dev/null +++ b/scilog/snippet.py @@ -0,0 +1,72 @@ +from typing import Type, get_type_hints, TypeVar +import functools + +def typechecked(func): + @functools.wraps(func) + def typechecked_call(*args, **kwargs): + func_types = get_type_hints(func) + for index, key in enumerate(func_types.keys()): + if key != "return": + assert func_types[key] == type(args[index+1]), f"{repr(func)} expected to receive input of type {func_types[key].__name__} but received {type(args[index+1]).__name__}" + return func(*args, **kwargs) + return typechecked_call + +def property_maker(cls, name, type_name): + storage_name = '_' + name + + @property + def prop(self) -> type_name: + return getattr(self, storage_name) + + @prop.setter + @typechecked + def prop(self, value: type_name) -> None: + setattr(self, storage_name, value) + + return prop + +class Snippet(dict): + def __init__(self, **kwargs): + self._properties = [] + self.set_properties(**kwargs) + + def set_properties(self, **kwargs): + for key, value in kwargs.items(): + storage_name = '_' + key + setattr(Snippet, storage_name, None) + setattr(Snippet, key, property_maker(self, key, value)) + self._properties.append(key) + + def to_dict(self, include_none=True): + if include_none: + return {key: getattr(self, key) for key in self._properties } + else: + return {key: getattr(self, key) for key in self._properties if getattr(self, key) is not None} + + def import_dict(self, properties): + for key in properties.keys(): + setattr(self, key, properties[key]) + + @classmethod + def from_dict(cls, properties): + tmp = cls() + tmp.import_dict(properties) + return tmp + + def __str__(self): + return f"{type(self).__name__}" + + @classmethod + def from_http_response(cls, response): + if type(response)==list: + return [cls.from_dict(resp) for resp in response] + else: + return cls.from_dict(response) + + + + +if __name__ == "__main__": + tmp = Snippet(id=str, textcontent=str, defaultOrder=int) + print(tmp.id) + tmp.id = 2 \ No newline at end of file