mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2025-04-20 08:20:03 +02:00
fixes ionizer interface (new pydase version)
This commit is contained in:
parent
a144edfb75
commit
b6eb0a49c7
@ -1,3 +1,4 @@
|
|||||||
|
import logging
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@ -5,22 +6,36 @@ import pydase
|
|||||||
import pydase.components
|
import pydase.components
|
||||||
import pydase.units as u
|
import pydase.units as u
|
||||||
import tiqi_rpc
|
import tiqi_rpc
|
||||||
from pydase.utils.helpers import get_object_attr_from_path
|
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||||
|
from pydase.utils.helpers import get_object_attr_from_path_list
|
||||||
|
|
||||||
from icon_service_base.ionizer_interface.rpc_interface import RPCInterface
|
from icon_service_base.ionizer_interface.rpc_interface import RPCInterface
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class IonizerServer:
|
class IonizerServer:
|
||||||
def __init__(
|
def __init__(
|
||||||
self, service: pydase.DataService, port: int, host: str, **kwargs: Any
|
self,
|
||||||
|
data_service_observer: DataServiceObserver,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
**kwargs: Any,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
self.data_service_observer = data_service_observer
|
||||||
|
self.service = self.data_service_observer.state_manager.service
|
||||||
self.server = tiqi_rpc.Server(
|
self.server = tiqi_rpc.Server(
|
||||||
RPCInterface(service, **kwargs),
|
RPCInterface(self.service, **kwargs),
|
||||||
host=host,
|
host=host,
|
||||||
port=port, # type: ignore
|
port=port, # type: ignore
|
||||||
)
|
)
|
||||||
|
|
||||||
def notify_Ionizer(parent_path: str, attr_name: str, value: Any) -> None:
|
self.data_service_observer.add_notification_callback(self.notify_Ionizer)
|
||||||
|
self.server.install_signal_handlers = lambda: None # type: ignore
|
||||||
|
|
||||||
|
def notify_Ionizer(
|
||||||
|
self, full_access_path: str, value: Any, cached_value: dict[str, Any]
|
||||||
|
) -> None:
|
||||||
"""This function notifies Ionizer about changed values.
|
"""This function notifies Ionizer about changed values.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -28,25 +43,26 @@ class IonizerServer:
|
|||||||
- attr_name (str): The name of the changed parameter.
|
- attr_name (str): The name of the changed parameter.
|
||||||
- value (Any): The value of the parameter.
|
- value (Any): The value of the parameter.
|
||||||
"""
|
"""
|
||||||
parent_path_list = parent_path.split(".")[1:] # without classname
|
parent_path_list, attr_name = (
|
||||||
name = ".".join([*parent_path_list, attr_name])
|
full_access_path.split(".")[:-1],
|
||||||
|
full_access_path.split(".")[-1],
|
||||||
|
) # without classname
|
||||||
if isinstance(value, Enum):
|
if isinstance(value, Enum):
|
||||||
value = value.value
|
value = value.value
|
||||||
if isinstance(value, u.Quantity):
|
if isinstance(value, u.Quantity):
|
||||||
value = value.m
|
value = value.m
|
||||||
if attr_name == "value":
|
if attr_name == "value":
|
||||||
parent_object = get_object_attr_from_path(service, parent_path_list)
|
parent_object = get_object_attr_from_path_list(
|
||||||
|
self.service, parent_path_list
|
||||||
|
)
|
||||||
if isinstance(parent_object, pydase.components.NumberSlider):
|
if isinstance(parent_object, pydase.components.NumberSlider):
|
||||||
# removes the "value" from name -> Ionizer does not know about the
|
# removes the "value" from name -> Ionizer does not know about the
|
||||||
# internals of NumberSlider
|
# internals of NumberSlider
|
||||||
name = ".".join(name.split(".")[:-1])
|
full_access_path = ".".join(full_access_path.split(".")[:-1])
|
||||||
|
|
||||||
return self.server._handler.notify( # type: ignore
|
return self.server._handler.notify( # type: ignore
|
||||||
{"name": name, "value": value}
|
{"name": full_access_path, "value": value}
|
||||||
)
|
)
|
||||||
|
|
||||||
service._callback_manager.add_notification_callback(notify_Ionizer)
|
|
||||||
self.server.install_signal_handlers = lambda: None # type: ignore
|
|
||||||
|
|
||||||
async def serve(self) -> None:
|
async def serve(self) -> None:
|
||||||
await self.server.serve()
|
await self.server.serve()
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
import inspect
|
import inspect
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Any, Optional
|
from typing import Any
|
||||||
|
|
||||||
from pydase import DataService
|
from pydase import DataService
|
||||||
from pydase.components import NumberSlider
|
from pydase.components import NumberSlider
|
||||||
from pydase.units import Quantity
|
from pydase.units import Quantity
|
||||||
from pydase.utils.helpers import get_object_attr_from_path
|
from pydase.utils.helpers import get_object_attr_from_path_list
|
||||||
from pydase.version import __version__
|
from pydase.version import __version__
|
||||||
|
|
||||||
|
|
||||||
@ -27,9 +27,7 @@ class RPCInterface:
|
|||||||
async def info(self) -> dict:
|
async def info(self) -> dict:
|
||||||
return self._info
|
return self._info
|
||||||
|
|
||||||
async def get_props(self, name: Optional[str] = None) -> dict[str, Any]:
|
async def get_props(self) -> dict[str, Any]:
|
||||||
if name is None:
|
|
||||||
return self._service.serialize()
|
|
||||||
return self._service.serialize()
|
return self._service.serialize()
|
||||||
|
|
||||||
async def get_param(self, full_access_path: str) -> Any:
|
async def get_param(self, full_access_path: str) -> Any:
|
||||||
@ -38,26 +36,27 @@ class RPCInterface:
|
|||||||
This method is called when Ionizer initilizes the Plugin or refreshes. The
|
This method is called when Ionizer initilizes the Plugin or refreshes. The
|
||||||
widgets need to store the full_access_path in their name attribute.
|
widgets need to store the full_access_path in their name attribute.
|
||||||
"""
|
"""
|
||||||
param = get_object_attr_from_path(self._service, full_access_path.split("."))
|
param = get_object_attr_from_path_list(
|
||||||
|
self._service, full_access_path.split(".")
|
||||||
|
)
|
||||||
if isinstance(param, NumberSlider):
|
if isinstance(param, NumberSlider):
|
||||||
return param.value
|
return param.value
|
||||||
elif isinstance(param, DataService):
|
if isinstance(param, DataService):
|
||||||
return param.serialize()
|
return param.serialize()
|
||||||
elif inspect.ismethod(param):
|
if inspect.ismethod(param):
|
||||||
# explicitly serialize any methods that will be returned
|
# explicitly serialize any methods that will be returned
|
||||||
full_access_path = param.__name__
|
full_access_path = param.__name__
|
||||||
args = inspect.signature(param).parameters
|
args = inspect.signature(param).parameters
|
||||||
return f"{full_access_path}({', '.join(args)})"
|
return f"{full_access_path}({', '.join(args)})"
|
||||||
elif isinstance(param, Enum):
|
if isinstance(param, Enum):
|
||||||
return param.value
|
return param.value
|
||||||
elif isinstance(param, Quantity):
|
if isinstance(param, Quantity):
|
||||||
return param.m
|
return param.m
|
||||||
else:
|
|
||||||
return param
|
return param
|
||||||
|
|
||||||
async def set_param(self, full_access_path: str, value: Any) -> None:
|
async def set_param(self, full_access_path: str, value: Any) -> None:
|
||||||
parent_path_list = full_access_path.split(".")[:-1]
|
parent_path_list = full_access_path.split(".")[:-1]
|
||||||
parent_object = get_object_attr_from_path(self._service, parent_path_list)
|
parent_object = get_object_attr_from_path_list(self._service, parent_path_list)
|
||||||
attr_name = full_access_path.split(".")[-1]
|
attr_name = full_access_path.split(".")[-1]
|
||||||
# I don't want to trigger the execution of a property getter as this might take
|
# I don't want to trigger the execution of a property getter as this might take
|
||||||
# a while when connecting to remote devices
|
# a while when connecting to remote devices
|
||||||
@ -80,7 +79,9 @@ class RPCInterface:
|
|||||||
|
|
||||||
async def remote_call(self, full_access_path: str, *args: Any) -> Any:
|
async def remote_call(self, full_access_path: str, *args: Any) -> Any:
|
||||||
full_access_path_list = full_access_path.split(".")
|
full_access_path_list = full_access_path.split(".")
|
||||||
method_object = get_object_attr_from_path(self._service, full_access_path_list)
|
method_object = get_object_attr_from_path_list(
|
||||||
|
self._service, full_access_path_list
|
||||||
|
)
|
||||||
return method_object(*args)
|
return method_object(*args)
|
||||||
|
|
||||||
async def emit(self, message: str) -> None:
|
async def emit(self, message: str) -> None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user