mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-20 00:10:03 +02:00
using __future__.annotations instead of quoted types
This commit is contained in:
parent
19f91b7cf3
commit
999a6016ff
@ -1,33 +1,36 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterable
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, SupportsIndex
|
||||
|
||||
from pydase.utils.helpers import parse_serialized_key
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
|
||||
from pydase.observer_pattern.observer.observer import Observer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ObservableObject(ABC):
|
||||
_list_mapping: ClassVar[dict[int, "_ObservableList"]] = {}
|
||||
_dict_mapping: ClassVar[dict[int, "_ObservableDict"]] = {}
|
||||
_list_mapping: ClassVar[dict[int, _ObservableList]] = {}
|
||||
_dict_mapping: ClassVar[dict[int, _ObservableDict]] = {}
|
||||
|
||||
def __init__(self) -> None:
|
||||
if not hasattr(self, "_observers"):
|
||||
self._observers: dict[str, list["ObservableObject | Observer"]] = {}
|
||||
self._observers: dict[str, list[ObservableObject | Observer]] = {}
|
||||
|
||||
def add_observer(
|
||||
self, observer: "ObservableObject | Observer", attr_name: str = ""
|
||||
self, observer: ObservableObject | Observer, attr_name: str = ""
|
||||
) -> None:
|
||||
if attr_name not in self._observers:
|
||||
self._observers[attr_name] = []
|
||||
if observer not in self._observers[attr_name]:
|
||||
self._observers[attr_name].append(observer)
|
||||
|
||||
def _remove_observer(self, observer: "ObservableObject", attribute: str) -> None:
|
||||
def _remove_observer(self, observer: ObservableObject, attribute: str) -> None:
|
||||
if attribute in self._observers:
|
||||
self._observers[attribute].remove(observer)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user