mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-20 00:10:03 +02:00
web server: adding function handler
This commit is contained in:
parent
74576a5810
commit
b503a5018e
@ -1,6 +1,6 @@
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, TypedDict
|
||||
from typing import Any, TypedDict, get_type_hints
|
||||
|
||||
import socketio
|
||||
from fastapi import FastAPI
|
||||
@ -50,8 +50,8 @@ class WebAPI:
|
||||
else:
|
||||
sio = socketio.AsyncServer(async_mode="asgi")
|
||||
|
||||
@sio.on("frontend_update") # type: ignore
|
||||
def handle_frontend_update(sid: str, data: FrontendUpdate) -> None:
|
||||
@sio.event # type: ignore
|
||||
def frontend_update(sid: str, data: FrontendUpdate) -> Any:
|
||||
logger.debug(f"Received frontend update: {data}")
|
||||
attr = getattr(self.service, data["name"])
|
||||
if isinstance(attr, DataService):
|
||||
@ -60,6 +60,24 @@ class WebAPI:
|
||||
setattr(
|
||||
self.service, data["name"], attr.__class__[data["value"]["value"]]
|
||||
)
|
||||
elif callable(attr):
|
||||
args: dict[str, Any] = data["value"]["args"]
|
||||
type_hints = get_type_hints(attr)
|
||||
|
||||
# Convert arguments to their hinted types
|
||||
for arg_name, arg_value in args.items():
|
||||
if arg_name in type_hints:
|
||||
arg_type = type_hints[arg_name]
|
||||
if isinstance(arg_type, type):
|
||||
# Attempt to convert the argument to its hinted type
|
||||
try:
|
||||
args[arg_name] = arg_type(arg_value)
|
||||
except ValueError:
|
||||
msg = f"Failed to convert argument '{arg_name}' to type {arg_type.__name__}"
|
||||
logger.error(msg)
|
||||
return msg
|
||||
|
||||
return attr(**args)
|
||||
else:
|
||||
setattr(self.service, data["name"], data["value"])
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user