adds backend DeviceConnection component

This commit is contained in:
Mose Müller 2024-01-16 13:39:35 +01:00
parent 2c95a2496c
commit 4f71633c5e
2 changed files with 31 additions and 0 deletions

View File

@ -28,6 +28,7 @@ print(my_service.voltage.value) # Output: 5
"""
from pydase.components.coloured_enum import ColouredEnum
from pydase.components.connection import DeviceConnection
from pydase.components.image import Image
from pydase.components.number_slider import NumberSlider
@ -35,4 +36,5 @@ __all__ = [
"NumberSlider",
"Image",
"ColouredEnum",
"DeviceConnection",
]

View File

@ -0,0 +1,29 @@
import asyncio
from abc import ABC, abstractmethod
import pydase
class DeviceConnection(pydase.DataService, ABC):
def __init__(self) -> None:
super().__init__()
# self._autostart_tasks = {"_handle_connection": ()} # type: ignore
self._handle_connection_wait_time = 2.0
@abstractmethod
def connect(self) -> None:
"""Tries to connect to the abstracted device."""
...
@property
@abstractmethod
def available(self) -> bool:
"""Checks if the abstracted device is available."""
...
async def _handle_connection(self) -> None:
"""Tries reconnecting to the device if it is not available."""
while True:
if not self.available:
self.connect()
await asyncio.sleep(self._handle_connection_wait_time)