mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-20 08:20:02 +02:00
Merge pull request #99 from tiqi-group/cleanup/removes_deprecated_code
Cleanup/removes deprecated code
This commit is contained in:
commit
4f1cc4787d
@ -1,8 +1,7 @@
|
||||
import inspect
|
||||
import logging
|
||||
import warnings
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, get_type_hints
|
||||
from typing import Any, get_type_hints
|
||||
|
||||
import rpyc # type: ignore[import-untyped]
|
||||
|
||||
@ -15,20 +14,12 @@ from pydase.observer_pattern.observable.observable import (
|
||||
from pydase.utils.helpers import (
|
||||
convert_arguments_to_hinted_types,
|
||||
get_class_and_instance_attributes,
|
||||
get_object_attr_from_path_list,
|
||||
is_property_attribute,
|
||||
parse_list_attr_and_index,
|
||||
update_value_if_changed,
|
||||
)
|
||||
from pydase.utils.serializer import (
|
||||
Serializer,
|
||||
generate_serialized_data_paths,
|
||||
get_nested_dict_by_path,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -51,16 +42,6 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
if not hasattr(self, "_autostart_tasks"):
|
||||
self._autostart_tasks = {}
|
||||
|
||||
filename = kwargs.pop("filename", None)
|
||||
if filename is not None:
|
||||
warnings.warn(
|
||||
"The 'filename' argument is deprecated and will be removed in a future "
|
||||
"version. Please pass the 'filename' argument to `pydase.Server`.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
self._filename: str | Path = filename
|
||||
|
||||
self.__check_instance_classes()
|
||||
self._initialised = True
|
||||
|
||||
@ -125,27 +106,6 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
):
|
||||
self.__warn_if_not_observable(attr_value)
|
||||
|
||||
def __set_attribute_based_on_type( # noqa: PLR0913
|
||||
self,
|
||||
target_obj: Any,
|
||||
attr_name: str,
|
||||
attr: Any,
|
||||
value: Any,
|
||||
index: int | None,
|
||||
path_list: list[str],
|
||||
) -> None:
|
||||
if isinstance(attr, Enum):
|
||||
update_value_if_changed(target_obj, attr_name, attr.__class__[value])
|
||||
elif isinstance(attr, list) and index is not None:
|
||||
update_value_if_changed(attr, index, value)
|
||||
elif isinstance(attr, DataService) and isinstance(value, dict):
|
||||
for key, v in value.items():
|
||||
self.update_DataService_attribute([*path_list, attr_name], key, v)
|
||||
elif callable(attr):
|
||||
process_callable_attribute(attr, value["args"])
|
||||
else:
|
||||
update_value_if_changed(target_obj, attr_name, value)
|
||||
|
||||
def _rpyc_getattr(self, name: str) -> Any:
|
||||
if name.startswith("_"):
|
||||
# disallow special and private attributes
|
||||
@ -166,71 +126,6 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
# allow all other attributes
|
||||
setattr(self, name, value)
|
||||
|
||||
def write_to_file(self) -> None:
|
||||
"""
|
||||
Serialize the DataService instance and write it to a JSON file.
|
||||
|
||||
This method is deprecated and will be removed in a future version.
|
||||
Service persistence is handled by `pydase.Server` now, instead.
|
||||
"""
|
||||
|
||||
warnings.warn(
|
||||
"'write_to_file' is deprecated and will be removed in a future version. "
|
||||
"Service persistence is handled by `pydase.Server` now, instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if hasattr(self, "_state_manager"):
|
||||
self._state_manager.save_state()
|
||||
|
||||
def load_DataService_from_JSON( # noqa: N802
|
||||
self, json_dict: dict[str, Any]
|
||||
) -> None:
|
||||
warnings.warn(
|
||||
"'load_DataService_from_JSON' is deprecated and will be removed in a "
|
||||
"future version. "
|
||||
"Service persistence is handled by `pydase.Server` now, instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
# Traverse the serialized representation and set the attributes of the class
|
||||
serialized_class = self.serialize()["value"]
|
||||
for path in generate_serialized_data_paths(json_dict):
|
||||
nested_json_dict = get_nested_dict_by_path(json_dict, path)
|
||||
value = nested_json_dict["value"]
|
||||
value_type = nested_json_dict["type"]
|
||||
|
||||
nested_class_dict = get_nested_dict_by_path(serialized_class, path)
|
||||
class_value_type = nested_class_dict.get("type", None)
|
||||
if class_value_type == value_type:
|
||||
class_attr_is_read_only = nested_class_dict["readonly"]
|
||||
if class_attr_is_read_only:
|
||||
logger.debug(
|
||||
"Attribute '%s' is read-only. Ignoring value from JSON "
|
||||
"file...",
|
||||
path,
|
||||
)
|
||||
continue
|
||||
# Split the path into parts
|
||||
parts = path.split(".")
|
||||
attr_name = parts[-1]
|
||||
|
||||
# Convert dictionary into Quantity
|
||||
if class_value_type == "Quantity":
|
||||
value = u.convert_to_quantity(value)
|
||||
|
||||
self.update_DataService_attribute(parts[:-1], attr_name, value)
|
||||
else:
|
||||
logger.info(
|
||||
"Attribute type of '%s' changed from '%s' to "
|
||||
"'%s'. Ignoring value from JSON file...",
|
||||
path,
|
||||
value_type,
|
||||
class_value_type,
|
||||
)
|
||||
|
||||
def serialize(self) -> dict[str, dict[str, Any]]:
|
||||
"""
|
||||
Serializes the instance into a dictionary, preserving the structure of the
|
||||
@ -249,37 +144,3 @@ class DataService(rpyc.Service, AbstractDataService):
|
||||
dict: The serialized instance.
|
||||
"""
|
||||
return Serializer.serialize_object(self)
|
||||
|
||||
def update_DataService_attribute( # noqa: N802
|
||||
self,
|
||||
path_list: list[str],
|
||||
attr_name: str,
|
||||
value: Any,
|
||||
) -> None:
|
||||
warnings.warn(
|
||||
"'update_DataService_attribute' is deprecated and will be removed in a "
|
||||
"future version. "
|
||||
"Service state management is handled by `pydase.data_service.state_manager`"
|
||||
"now, instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
# If attr_name corresponds to a list entry, extract the attr_name and the index
|
||||
attr_name, index = parse_list_attr_and_index(attr_name)
|
||||
# Traverse the object according to the path parts
|
||||
target_obj = get_object_attr_from_path_list(self, path_list)
|
||||
|
||||
# If the attribute is a property, change it using the setter without getting the
|
||||
# property value (would otherwise be bad for expensive getter methods)
|
||||
if is_property_attribute(target_obj, attr_name):
|
||||
setattr(target_obj, attr_name, value)
|
||||
return
|
||||
|
||||
attr = get_object_attr_from_path_list(target_obj, [attr_name])
|
||||
if attr is None:
|
||||
return
|
||||
|
||||
self.__set_attribute_based_on_type(
|
||||
target_obj, attr_name, attr, value, index, path_list
|
||||
)
|
||||
|
@ -177,8 +177,6 @@ class Server:
|
||||
self.servers: dict[str, asyncio.Future[Any]] = {}
|
||||
self.executor: ThreadPoolExecutor | None = None
|
||||
self._state_manager = StateManager(self._service, filename)
|
||||
if getattr(self._service, "_filename", None) is not None:
|
||||
self._service._state_manager = self._state_manager
|
||||
self._observer = DataServiceObserver(self._state_manager)
|
||||
self._state_manager.load_state()
|
||||
|
||||
|
@ -5,7 +5,6 @@ from typing import Any
|
||||
import pydase
|
||||
import pydase.components
|
||||
import pydase.units as u
|
||||
import pytest
|
||||
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.data_service.state_manager import (
|
||||
StateManager,
|
||||
@ -251,16 +250,6 @@ def test_load_state(tmp_path: Path, caplog: LogCaptureFixture) -> None:
|
||||
assert "'my_slider.step_size' changed to '2.0'" in caplog.text
|
||||
|
||||
|
||||
def test_filename_warning(tmp_path: Path, caplog: LogCaptureFixture) -> None:
|
||||
file = tmp_path / "test_state.json"
|
||||
|
||||
with pytest.warns(DeprecationWarning):
|
||||
service = Service(filename=str(file))
|
||||
StateManager(service=service, filename=str(file))
|
||||
|
||||
assert f"Overwriting filename {str(file)!r} with {str(file)!r}." in caplog.text
|
||||
|
||||
|
||||
def test_filename_error(caplog: LogCaptureFixture) -> None:
|
||||
service = Service()
|
||||
manager = StateManager(service=service)
|
||||
|
@ -1,9 +1,10 @@
|
||||
from typing import Any
|
||||
|
||||
import pydase
|
||||
import pydase.units as u
|
||||
from pydase.data_service.data_service import DataService
|
||||
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.data_service.state_manager import StateManager
|
||||
from pydase.data_service.state_manager import StateManager, load_state
|
||||
from pytest import LogCaptureFixture
|
||||
|
||||
|
||||
@ -99,7 +100,10 @@ def test_autoconvert_offset_to_baseunit() -> None:
|
||||
def test_loading_from_json(caplog: LogCaptureFixture) -> None:
|
||||
"""This function tests if the quantity read from the json description is actually
|
||||
passed as a quantity to the property setter."""
|
||||
JSON_DICT = {
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
serialization_dict = {
|
||||
"some_unit": {
|
||||
"type": "Quantity",
|
||||
"value": {"magnitude": 10.0, "unit": "A"},
|
||||
@ -118,14 +122,17 @@ def test_loading_from_json(caplog: LogCaptureFixture) -> None:
|
||||
return self._unit
|
||||
|
||||
@some_unit.setter
|
||||
@load_state
|
||||
def some_unit(self, value: u.Quantity) -> None:
|
||||
assert isinstance(value, u.Quantity)
|
||||
self._unit = value
|
||||
|
||||
service_instance = ServiceClass()
|
||||
state_manager = StateManager(service_instance)
|
||||
DataServiceObserver(state_manager)
|
||||
|
||||
service_instance.load_DataService_from_JSON(JSON_DICT)
|
||||
fp = tempfile.NamedTemporaryFile("w+")
|
||||
json.dump(serialization_dict, fp)
|
||||
fp.seek(0)
|
||||
|
||||
pydase.Server(service_instance, filename=fp.name)
|
||||
|
||||
assert "'some_unit' changed to '10.0 A'" in caplog.text
|
||||
|
Loading…
x
Reference in New Issue
Block a user