mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-20 00:10:03 +02:00
feat: adding support for python 3.8, 3.9
This commit is contained in:
parent
55ab705542
commit
f948605b58
4
.flake8
4
.flake8
@ -1,8 +1,8 @@
|
||||
[flake8]
|
||||
ignore = E501,W503,FS003,F403,F405,E203
|
||||
ignore = E501,W503,FS003,F403,F405,E203,UNT001
|
||||
include = src
|
||||
max-line-length = 88
|
||||
max-doc-length = 88
|
||||
max-complexity = 7
|
||||
max-expression-complexity = 5.5
|
||||
use_class_attributes_order_strict_mode=True
|
||||
use_class_attributes_order_strict_mode=True
|
||||
|
47
.github/workflows/python-package.yml
vendored
47
.github/workflows/python-package.yml
vendored
@ -5,37 +5,36 @@ name: Python package
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "main" ]
|
||||
branches: ['main']
|
||||
pull_request:
|
||||
branches: [ "main" ]
|
||||
branches: ['main']
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ["3.10", "3.11"]
|
||||
python-version: ['3.8', '3.9', '3.10', '3.11']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v3
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install poetry
|
||||
poetry install
|
||||
- name: Lint with flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
poetry run flake8 src/pydase --count --show-source --statistics
|
||||
- name: Test with pytest
|
||||
run: |
|
||||
poetry run pytest
|
||||
- name: Test with pyright
|
||||
run: |
|
||||
poetry run pyright src/pydase
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v3
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install poetry
|
||||
poetry install
|
||||
- name: Lint with flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
poetry run flake8 src/pydase --count --show-source --statistics
|
||||
- name: Test with pytest
|
||||
run: |
|
||||
poetry run pytest
|
||||
- name: Test with pyright
|
||||
run: |
|
||||
poetry run pyright src/pydase
|
||||
|
1814
poetry.lock
generated
1814
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,2 +1,3 @@
|
||||
[virtualenvs]
|
||||
in-project = true
|
||||
prefer-active-python = true
|
||||
|
@ -8,7 +8,7 @@ packages = [{ include = "pydase", from = "src" }]
|
||||
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.10"
|
||||
python = "^3.8"
|
||||
rpyc = "^5.3.1"
|
||||
loguru = "^0.7.0"
|
||||
fastapi = "^0.100.0"
|
||||
@ -17,7 +17,7 @@ toml = "^0.10.2"
|
||||
python-socketio = "^5.8.0"
|
||||
websockets = "^11.0.3"
|
||||
confz = "^2.0.0"
|
||||
pint = "^0.22"
|
||||
pint = "^0.21"
|
||||
pillow = "^10.0.0"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
|
@ -1,7 +1,7 @@
|
||||
import base64
|
||||
import io
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING, Optional, Union
|
||||
from urllib.request import urlopen
|
||||
|
||||
import PIL.Image
|
||||
@ -29,7 +29,7 @@ class Image(DataService):
|
||||
def format(self) -> str:
|
||||
return self._format
|
||||
|
||||
def load_from_path(self, path: Path | str) -> None:
|
||||
def load_from_path(self, path: Union[Path, str]) -> None:
|
||||
with PIL.Image.open(path) as image:
|
||||
self._load_from_PIL(image)
|
||||
|
||||
@ -68,7 +68,7 @@ class Image(DataService):
|
||||
else:
|
||||
logger.error("Image format is 'None'. Skipping...")
|
||||
|
||||
def _get_image_format_from_bytes(self, value_: bytes) -> str | None:
|
||||
def _get_image_format_from_bytes(self, value_: bytes) -> Union[str, None]:
|
||||
image_data = base64.b64decode(value_)
|
||||
# Create a writable memory buffer for the image
|
||||
image_buffer = io.BytesIO(image_data)
|
||||
|
@ -1,4 +1,4 @@
|
||||
from typing import Any, Literal
|
||||
from typing import Any, Literal, Union
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@ -39,11 +39,11 @@ class NumberSlider(DataService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: float | int = 0,
|
||||
value: Union[float, int] = 0,
|
||||
min: float = 0.0,
|
||||
max: float = 100.0,
|
||||
step_size: float | int = 1.0,
|
||||
type: Literal["int"] | Literal["float"] = "float",
|
||||
step_size: Union[float, int] = 1.0,
|
||||
type: Union[Literal["int"], Literal["float"]] = "float",
|
||||
) -> None:
|
||||
if type not in {"float", "int"}:
|
||||
logger.error(f"Unknown type '{type}'. Using 'float'.")
|
||||
|
@ -1,9 +1,9 @@
|
||||
from typing import Literal
|
||||
from typing import Literal, Union
|
||||
|
||||
from confz import BaseConfig, EnvSource
|
||||
|
||||
|
||||
class OperationMode(BaseConfig): # type: ignore
|
||||
environment: Literal["development"] | Literal["production"] = "development"
|
||||
environment: Union[Literal["development"], Literal["production"]] = "development"
|
||||
|
||||
CONFIG_SOURCES = EnvSource(allow=["ENVIRONMENT"])
|
||||
|
@ -1,8 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING, Any
|
||||
import sys
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
from typing import Callable # noqa
|
||||
else:
|
||||
from collections.abc import Callable
|
||||
|
||||
from typing import TYPE_CHECKING, Any, Union
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@ -206,8 +212,8 @@ class CallbackManager:
|
||||
|
||||
def __register_recursive_parameter_callback(
|
||||
self,
|
||||
obj: "AbstractDataService | DataServiceList",
|
||||
callback: Callable[[str | int, Any], None],
|
||||
obj: Union["AbstractDataService", DataServiceList],
|
||||
callback: Callable[[Union[str, int], Any], None],
|
||||
) -> None:
|
||||
"""
|
||||
Register callback to a DataService or DataServiceList instance and its nested
|
||||
@ -222,7 +228,7 @@ class CallbackManager:
|
||||
if isinstance(obj, DataServiceList):
|
||||
# emits callback when item in list gets reassigned
|
||||
obj.add_callback(callback=callback)
|
||||
obj_list: DataServiceList | list[AbstractDataService] = obj
|
||||
obj_list: Union[DataServiceList, list[AbstractDataService]] = obj
|
||||
else:
|
||||
obj_list = [obj]
|
||||
|
||||
@ -337,7 +343,7 @@ class CallbackManager:
|
||||
|
||||
# Create and register a callback for the object
|
||||
# only emit the notification when the call was registered by the root object
|
||||
callback: Callable[[str, dict[str, Any] | None], None] = (
|
||||
callback: Callable[[str, Union[dict[str, Any], None]], None] = (
|
||||
lambda name, status: obj._callback_manager.emit_notification(
|
||||
parent_path=parent_path, name=name, value=status
|
||||
)
|
||||
|
@ -3,7 +3,7 @@ import inspect
|
||||
import json
|
||||
import os
|
||||
from enum import Enum
|
||||
from typing import Any, Optional, cast, get_type_hints
|
||||
from typing import Any, Dict, List, Optional, cast, get_type_hints
|
||||
|
||||
import rpyc
|
||||
from loguru import logger
|
||||
@ -28,7 +28,7 @@ from pydase.utils.warnings import (
|
||||
)
|
||||
|
||||
|
||||
def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any:
|
||||
def process_callable_attribute(attr: Any, args: Dict[str, Any]) -> Any:
|
||||
converted_args_or_error_msg = convert_arguments_to_hinted_types(
|
||||
args, get_type_hints(attr)
|
||||
)
|
||||
@ -94,7 +94,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
attr: Any,
|
||||
value: Any,
|
||||
index: Optional[int],
|
||||
path_list: list[str],
|
||||
path_list: List[str],
|
||||
) -> None:
|
||||
if isinstance(attr, Enum):
|
||||
update_value_if_changed(target_obj, attr_name, attr.__class__[value])
|
||||
@ -135,7 +135,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
with open(self._filename, "r") as f:
|
||||
# Load JSON data from file and update class attributes with these
|
||||
# values
|
||||
self.load_DataService_from_JSON(cast(dict[str, Any], json.load(f)))
|
||||
self.load_DataService_from_JSON(cast(Dict[str, Any], json.load(f)))
|
||||
|
||||
def write_to_file(self) -> None:
|
||||
"""
|
||||
@ -153,7 +153,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
'Skipping "write_to_file"...'
|
||||
)
|
||||
|
||||
def load_DataService_from_JSON(self, json_dict: dict[str, Any]) -> None:
|
||||
def load_DataService_from_JSON(self, json_dict: Dict[str, Any]) -> None:
|
||||
# Traverse the serialized representation and set the attributes of the class
|
||||
serialized_class = self.serialize()
|
||||
for path in generate_paths_from_DataService_dict(json_dict):
|
||||
@ -189,7 +189,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
f'"{class_value_type}". Ignoring value from JSON file...'
|
||||
)
|
||||
|
||||
def serialize(self) -> dict[str, dict[str, Any]]: # noqa
|
||||
def serialize(self) -> Dict[str, Dict[str, Any]]: # noqa
|
||||
"""
|
||||
Serializes the instance into a dictionary, preserving the structure of the
|
||||
instance.
|
||||
@ -218,7 +218,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
Returns:
|
||||
dict: The serialized instance.
|
||||
"""
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
result: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
# Get the dictionary of the base class
|
||||
base_set = set(type(super()).__dict__)
|
||||
@ -295,7 +295,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
sig = inspect.signature(value)
|
||||
|
||||
# Store parameters and their anotations in a dictionary
|
||||
parameters: dict[str, Optional[str]] = {}
|
||||
parameters: Dict[str, Optional[str]] = {}
|
||||
for k, v in sig.parameters.items():
|
||||
annotation = v.annotation
|
||||
if annotation is not inspect._empty:
|
||||
@ -356,7 +356,7 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
|
||||
def update_DataService_attribute(
|
||||
self,
|
||||
path_list: list[str],
|
||||
path_list: List[str],
|
||||
attr_name: str,
|
||||
value: Any,
|
||||
) -> None:
|
||||
|
@ -1,5 +1,11 @@
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
import sys
|
||||
|
||||
if sys.version_info < (3, 10):
|
||||
from typing import Callable
|
||||
else:
|
||||
from collections.abc import Callable
|
||||
|
||||
from typing import Any, List, Union
|
||||
|
||||
from pydase.utils.warnings import (
|
||||
warn_if_instance_class_does_not_inherit_from_DataService,
|
||||
@ -30,11 +36,11 @@ class DataServiceList(list):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args: list[Any],
|
||||
callback: list[Callable[[int, Any], None]] | None = None,
|
||||
*args: List[Any],
|
||||
callback: Union[List[Callable[[int, Any], None]], None] = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
self.callbacks: list[Callable[[int, Any], None]] = []
|
||||
self.callbacks: List[Callable[[int, Any], None]] = []
|
||||
if isinstance(callback, list):
|
||||
self.callbacks = callback
|
||||
|
||||
|
@ -2,9 +2,15 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
from collections.abc import Callable
|
||||
import sys
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
from typing import Callable # noqa
|
||||
else:
|
||||
from collections.abc import Callable
|
||||
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Any, TypedDict
|
||||
from typing import TYPE_CHECKING, Any, TypedDict, Union
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@ -82,7 +88,7 @@ class TaskManager:
|
||||
"""
|
||||
|
||||
self.task_status_change_callbacks: list[
|
||||
Callable[[str, dict[str, Any] | None], Any]
|
||||
Callable[[str, Union[dict[str, Any], None]], Any]
|
||||
] = []
|
||||
"""A list of callback functions to be invoked when the status of a task (start
|
||||
or stop) changes."""
|
||||
|
@ -5,7 +5,7 @@ import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from types import FrameType
|
||||
from typing import Any, Optional, Protocol, TypedDict
|
||||
from typing import Any, Dict, List, Optional, Protocol, Type, TypedDict, Union
|
||||
|
||||
import uvicorn
|
||||
from loguru import logger
|
||||
@ -79,9 +79,9 @@ class AdditionalServer(TypedDict):
|
||||
it's instantiated.
|
||||
"""
|
||||
|
||||
server: type[AdditionalServerProtocol]
|
||||
server: Type[AdditionalServerProtocol]
|
||||
port: int
|
||||
kwargs: dict[str, Any]
|
||||
kwargs: Dict[str, Any]
|
||||
|
||||
|
||||
class Server:
|
||||
@ -163,8 +163,8 @@ class Server:
|
||||
enable_rpc: bool = True,
|
||||
enable_web: bool = True,
|
||||
use_forking_server: bool = False,
|
||||
web_settings: dict[str, Any] = {},
|
||||
additional_servers: list[AdditionalServer] = [],
|
||||
web_settings: Dict[str, Any] = {},
|
||||
additional_servers: List[AdditionalServer] = [],
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
self._service = service
|
||||
@ -179,9 +179,9 @@ class Server:
|
||||
self._rpc_server_type = ForkingServer if use_forking_server else ThreadedServer
|
||||
self._additional_servers = additional_servers
|
||||
self.should_exit = False
|
||||
self.servers: dict[str, asyncio.Future[Any]] = {}
|
||||
self.executor: ThreadPoolExecutor | None = None
|
||||
self._info: dict[str, Any] = {
|
||||
self.servers: Dict[str, asyncio.Future[Any]] = {}
|
||||
self.executor: Union[ThreadPoolExecutor, None] = None
|
||||
self._info: Dict[str, Any] = {
|
||||
"name": self._service.get_service_name(),
|
||||
"version": __version__,
|
||||
"rpc_port": self._rpc_port,
|
||||
@ -386,7 +386,7 @@ class Server:
|
||||
self.should_exit = True
|
||||
|
||||
def custom_exception_handler(
|
||||
self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
|
||||
self, loop: asyncio.AbstractEventLoop, context: Dict[str, Any]
|
||||
) -> None:
|
||||
# if any background task creates an unhandled exception, shut down the entire
|
||||
# loop. It's possible we don't want to do this, maybe make this optional in the
|
||||
|
@ -1,5 +1,5 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, TypedDict
|
||||
from typing import Any, Dict, TypedDict, Union
|
||||
|
||||
import socketio
|
||||
from fastapi import FastAPI
|
||||
@ -47,10 +47,10 @@ class WebAPI:
|
||||
def __init__( # noqa: CFQ002
|
||||
self,
|
||||
service: DataService,
|
||||
frontend: str | Path | None = None,
|
||||
css: str | Path | None = None,
|
||||
frontend: Union[str, Path, None] = None,
|
||||
css: Union[str, Path, None] = None,
|
||||
enable_CORS: bool = True,
|
||||
info: dict[str, Any] = {},
|
||||
info: Dict[str, Any] = {},
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
):
|
||||
@ -107,11 +107,11 @@ class WebAPI:
|
||||
return self.service.get_service_name()
|
||||
|
||||
@app.get("/info")
|
||||
def info() -> dict[str, Any]:
|
||||
def info() -> Dict[str, Any]:
|
||||
return self.info
|
||||
|
||||
@app.get("/service-properties")
|
||||
def service_properties() -> dict[str, Any]:
|
||||
def service_properties() -> Dict[str, Any]:
|
||||
return self.service.serialize()
|
||||
|
||||
app.mount(
|
||||
|
@ -1,4 +1,4 @@
|
||||
from typing import TypedDict
|
||||
from typing import TypedDict, Union
|
||||
|
||||
import pint
|
||||
|
||||
@ -10,12 +10,12 @@ Unit = units.Unit
|
||||
|
||||
|
||||
class QuantityDict(TypedDict):
|
||||
magnitude: int | float
|
||||
magnitude: Union[int, float]
|
||||
unit: str
|
||||
|
||||
|
||||
def convert_to_quantity(
|
||||
value: QuantityDict | float | int | Quantity, unit: str = ""
|
||||
value: Union[QuantityDict, float, int, Quantity], unit: str = ""
|
||||
) -> Quantity:
|
||||
"""
|
||||
Convert a given value into a pint.Quantity object with the specified unit.
|
||||
@ -47,7 +47,7 @@ def convert_to_quantity(
|
||||
will be unitless.
|
||||
"""
|
||||
|
||||
if isinstance(value, int | float):
|
||||
if isinstance(value, (int, float)):
|
||||
quantity = float(value) * Unit(unit)
|
||||
elif isinstance(value, dict):
|
||||
quantity = float(value["magnitude"]) * Unit(value["unit"])
|
||||
|
@ -1,13 +1,13 @@
|
||||
import re
|
||||
from itertools import chain
|
||||
from typing import Any, Optional, cast
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union, cast
|
||||
|
||||
from loguru import logger
|
||||
|
||||
STANDARD_TYPES = ("int", "float", "bool", "str", "Enum", "NoneType", "Quantity")
|
||||
|
||||
|
||||
def get_class_and_instance_attributes(obj: object) -> dict[str, Any]:
|
||||
def get_class_and_instance_attributes(obj: Any) -> Dict[str, Any]:
|
||||
"""Dictionary containing all attributes (both instance and class level) of a
|
||||
given object.
|
||||
|
||||
@ -22,7 +22,7 @@ def get_class_and_instance_attributes(obj: object) -> dict[str, Any]:
|
||||
return attrs
|
||||
|
||||
|
||||
def get_object_attr_from_path(target_obj: Any, path: list[str]) -> Any:
|
||||
def get_object_attr_from_path(target_obj: Any, path: List[str]) -> Any:
|
||||
"""
|
||||
Traverse the object tree according to the given path.
|
||||
|
||||
@ -57,7 +57,7 @@ def get_object_attr_from_path(target_obj: Any, path: list[str]) -> Any:
|
||||
|
||||
def generate_paths_from_DataService_dict(
|
||||
data: dict, parent_path: str = ""
|
||||
) -> list[str]:
|
||||
) -> List[str]:
|
||||
"""
|
||||
Recursively generate paths from a dictionary representing a DataService object.
|
||||
|
||||
@ -126,7 +126,9 @@ def generate_paths_from_DataService_dict(
|
||||
return paths
|
||||
|
||||
|
||||
def extract_dict_or_list_entry(data: dict[str, Any], key: str) -> dict[str, Any] | None:
|
||||
def extract_dict_or_list_entry(
|
||||
data: Dict[str, Any], key: str
|
||||
) -> Union[Dict[str, Any], None]:
|
||||
"""
|
||||
Extract a nested dictionary or list entry based on the provided key.
|
||||
|
||||
@ -178,7 +180,7 @@ def extract_dict_or_list_entry(data: dict[str, Any], key: str) -> dict[str, Any]
|
||||
else:
|
||||
logger.error(f"Invalid index format in key: {key}")
|
||||
|
||||
current_data: dict[str, Any] | list[dict[str, Any]] | None = data.get(
|
||||
current_data: Union[Dict[str, Any], List[Dict[str, Any]], None] = data.get(
|
||||
attr_name, None
|
||||
)
|
||||
if not isinstance(current_data, dict):
|
||||
@ -197,14 +199,14 @@ def extract_dict_or_list_entry(data: dict[str, Any], key: str) -> dict[str, Any]
|
||||
# When the attribute is a class instance, the attributes are nested in the
|
||||
# "value" key
|
||||
if current_data["type"] not in STANDARD_TYPES:
|
||||
current_data = cast(dict[str, Any], current_data.get("value", None)) # type: ignore
|
||||
current_data = cast(Dict[str, Any], current_data.get("value", None)) # type: ignore
|
||||
assert isinstance(current_data, dict)
|
||||
|
||||
return current_data
|
||||
|
||||
|
||||
def get_nested_value_from_DataService_by_path_and_key(
|
||||
data: dict[str, Any], path: str, key: str = "value"
|
||||
data: Dict[str, Any], path: str, key: str = "value"
|
||||
) -> Any:
|
||||
"""
|
||||
Get the value associated with a specific key from a dictionary given a path.
|
||||
@ -250,8 +252,8 @@ def get_nested_value_from_DataService_by_path_and_key(
|
||||
"""
|
||||
|
||||
# Split the path into parts
|
||||
parts: list[str] = re.split(r"\.", path) # Split by '.'
|
||||
current_data: dict[str, Any] | None = data
|
||||
parts: List[str] = re.split(r"\.", path) # Split by '.'
|
||||
current_data: Union[Dict[str, Any], None] = data
|
||||
|
||||
for part in parts:
|
||||
if current_data is None:
|
||||
@ -263,8 +265,8 @@ def get_nested_value_from_DataService_by_path_and_key(
|
||||
|
||||
|
||||
def convert_arguments_to_hinted_types(
|
||||
args: dict[str, Any], type_hints: dict[str, Any]
|
||||
) -> dict[str, Any] | str:
|
||||
args: Dict[str, Any], type_hints: Dict[str, Any]
|
||||
) -> Union[Dict[str, Any], str]:
|
||||
"""
|
||||
Convert the given arguments to their types hinted in the type_hints dictionary.
|
||||
|
||||
@ -306,7 +308,7 @@ def convert_arguments_to_hinted_types(
|
||||
|
||||
|
||||
def update_value_if_changed(
|
||||
target: Any, attr_name_or_index: str | int, new_value: Any
|
||||
target: Any, attr_name_or_index: Union[str, int], new_value: Any
|
||||
) -> None:
|
||||
"""
|
||||
Updates the value of an attribute or a list element on a target object if the new
|
||||
@ -342,7 +344,7 @@ def update_value_if_changed(
|
||||
logger.error(f"Incompatible arguments: {target}, {attr_name_or_index}.")
|
||||
|
||||
|
||||
def parse_list_attr_and_index(attr_string: str) -> tuple[str, Optional[int]]:
|
||||
def parse_list_attr_and_index(attr_string: str) -> Tuple[str, Optional[int]]:
|
||||
"""
|
||||
Parses an attribute string and extracts a potential list attribute name and its
|
||||
index.
|
||||
@ -381,7 +383,7 @@ def parse_list_attr_and_index(attr_string: str) -> tuple[str, Optional[int]]:
|
||||
return attr_name, index
|
||||
|
||||
|
||||
def get_component_class_names() -> list[str]:
|
||||
def get_component_class_names() -> List[str]:
|
||||
"""
|
||||
Returns the names of the component classes in a list.
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
import logging
|
||||
import sys
|
||||
from types import FrameType
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
import loguru
|
||||
import rpyc
|
||||
@ -21,7 +21,7 @@ class InterceptHandler(logging.Handler):
|
||||
return
|
||||
|
||||
# Get corresponding Loguru level if it exists.
|
||||
level: int | str
|
||||
level: Union[int, str]
|
||||
try:
|
||||
level = loguru.logger.level(record.levelname).name
|
||||
except ValueError:
|
||||
|
@ -1,4 +1,10 @@
|
||||
from collections.abc import Generator
|
||||
import sys
|
||||
|
||||
if sys.version_info < (3, 10):
|
||||
from typing import Generator
|
||||
else:
|
||||
from collections.abc import Generator
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
Loading…
x
Reference in New Issue
Block a user