mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-22 01:00:02 +02:00
adds ProxyList methods
This commit is contained in:
parent
507f286963
commit
fbada6d818
@ -1,9 +1,11 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from collections.abc import Iterable
|
||||
from copy import copy
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import socketio # type: ignore
|
||||
from typing_extensions import SupportsIndex
|
||||
|
||||
from pydase.utils.serialization.deserializer import Deserializer, loads
|
||||
from pydase.utils.serialization.serializer import dump
|
||||
@ -52,6 +54,139 @@ class ProxyList(list[Any]):
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
def append(self, __object: Any) -> None:
|
||||
full_access_path = f"{self._parent_path}.append"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([__object]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
def clear(self) -> None:
|
||||
full_access_path = f"{self._parent_path}.clear"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
def extend(self, __iterable: Iterable[Any]) -> None:
|
||||
full_access_path = f"{self._parent_path}.extend"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([__iterable]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
def insert(self, __index: SupportsIndex, __object: Any) -> None:
|
||||
full_access_path = f"{self._parent_path}.insert"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([__index, __object]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
def pop(self, __index: SupportsIndex = -1) -> Any:
|
||||
full_access_path = f"{self._parent_path}.pop"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([__index]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
return ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
return None
|
||||
|
||||
def remove(self, __value: Any) -> None:
|
||||
full_access_path = f"{self._parent_path}.remove"
|
||||
|
||||
async def set_result() -> Any:
|
||||
return await self._sio.call(
|
||||
"trigger_method",
|
||||
{
|
||||
"access_path": full_access_path,
|
||||
"args": dump([__value]),
|
||||
"kwargs": dump({}),
|
||||
},
|
||||
)
|
||||
|
||||
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
|
||||
set_result(),
|
||||
loop=self._loop,
|
||||
).result()
|
||||
if result is not None:
|
||||
ProxyLoader.loads_proxy(
|
||||
serialized_object=result, sio_client=self._sio, loop=self._loop
|
||||
)
|
||||
|
||||
|
||||
class ProxyClassMixin:
|
||||
def __init__(self) -> None:
|
||||
|
Loading…
x
Reference in New Issue
Block a user