mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-06-06 13:30:41 +02:00
Merge pull request #123 from tiqi-group/36-feat-add-support-for-dictionaries
feat: adds support for dictionaries
This commit is contained in:
commit
46868743c7
@ -223,6 +223,7 @@ In `pydase`, components are fundamental building blocks that bridge the Python b
|
||||
- `int` and `float`: Manifested as the `NumberComponent`.
|
||||
- `bool`: Rendered as a `ButtonComponent`.
|
||||
- `list`: Each item displayed individually, named after the list attribute and its index.
|
||||
- `dict`: Each key-value pair displayed individually, named after the dictionary attribute and its key. **Note** that the dictionary keys must be strings.
|
||||
- `enum.Enum`: Presented as an `EnumComponent`, facilitating dropdown selection.
|
||||
|
||||
### Method Components
|
||||
|
42
frontend/src/components/DictComponent.tsx
Normal file
42
frontend/src/components/DictComponent.tsx
Normal file
@ -0,0 +1,42 @@
|
||||
import React, { useEffect, useRef } from 'react';
|
||||
import { DocStringComponent } from './DocStringComponent';
|
||||
import { SerializedValue, GenericComponent } from './GenericComponent';
|
||||
import { LevelName } from './NotificationsComponent';
|
||||
|
||||
type DictComponentProps = {
|
||||
value: Record<string, SerializedValue>;
|
||||
docString: string;
|
||||
isInstantUpdate: boolean;
|
||||
addNotification: (message: string, levelname?: LevelName) => void;
|
||||
id: string;
|
||||
};
|
||||
|
||||
export const DictComponent = React.memo((props: DictComponentProps) => {
|
||||
const { value, docString, isInstantUpdate, addNotification, id } = props;
|
||||
|
||||
const renderCount = useRef(0);
|
||||
const valueArray = Object.values(value);
|
||||
|
||||
useEffect(() => {
|
||||
renderCount.current++;
|
||||
}, [props]);
|
||||
|
||||
return (
|
||||
<div className={'listComponent'} id={id}>
|
||||
{process.env.NODE_ENV === 'development' && (
|
||||
<div>Render count: {renderCount.current}</div>
|
||||
)}
|
||||
<DocStringComponent docString={docString} />
|
||||
{valueArray.map((item) => {
|
||||
return (
|
||||
<GenericComponent
|
||||
key={item.full_access_path}
|
||||
attribute={item}
|
||||
isInstantUpdate={isInstantUpdate}
|
||||
addNotification={addNotification}
|
||||
/>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
);
|
||||
});
|
@ -14,6 +14,8 @@ import { LevelName } from './NotificationsComponent';
|
||||
import { getIdFromFullAccessPath } from '../utils/stringUtils';
|
||||
import { WebSettingsContext } from '../WebSettings';
|
||||
import { updateValue } from '../socket';
|
||||
import { DictComponent } from './DictComponent';
|
||||
import { parseFullAccessPath } from '../utils/stateUtils';
|
||||
|
||||
type AttributeType =
|
||||
| 'str'
|
||||
@ -21,7 +23,9 @@ type AttributeType =
|
||||
| 'float'
|
||||
| 'int'
|
||||
| 'Quantity'
|
||||
| 'None'
|
||||
| 'list'
|
||||
| 'dict'
|
||||
| 'method'
|
||||
| 'DataService'
|
||||
| 'DeviceConnection'
|
||||
@ -48,12 +52,37 @@ type GenericComponentProps = {
|
||||
addNotification: (message: string, levelname?: LevelName) => void;
|
||||
};
|
||||
|
||||
const getPathFromPathParts = (pathParts: string[]): string => {
|
||||
let path = '';
|
||||
for (const pathPart of pathParts) {
|
||||
if (!pathPart.startsWith('[') && path !== '') {
|
||||
path += '.';
|
||||
}
|
||||
path += pathPart;
|
||||
}
|
||||
return path;
|
||||
};
|
||||
|
||||
const createDisplayNameFromAccessPath = (fullAccessPath: string): string => {
|
||||
const displayNameParts = [];
|
||||
const parsedFullAccessPath = parseFullAccessPath(fullAccessPath);
|
||||
for (let i = parsedFullAccessPath.length - 1; i >= 0; i--) {
|
||||
const item = parsedFullAccessPath[i];
|
||||
displayNameParts.unshift(item);
|
||||
if (!item.startsWith('[')) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return getPathFromPathParts(displayNameParts);
|
||||
};
|
||||
|
||||
export const GenericComponent = React.memo(
|
||||
({ attribute, isInstantUpdate, addNotification }: GenericComponentProps) => {
|
||||
const { full_access_path: fullAccessPath } = attribute;
|
||||
const id = getIdFromFullAccessPath(fullAccessPath);
|
||||
const webSettings = useContext(WebSettingsContext);
|
||||
let displayName = fullAccessPath.split('.').at(-1);
|
||||
|
||||
let displayName = createDisplayNameFromAccessPath(fullAccessPath);
|
||||
|
||||
if (webSettings[fullAccessPath]) {
|
||||
if (webSettings[fullAccessPath].display === false) {
|
||||
@ -212,6 +241,16 @@ export const GenericComponent = React.memo(
|
||||
id={id}
|
||||
/>
|
||||
);
|
||||
} else if (attribute.type === 'dict') {
|
||||
return (
|
||||
<DictComponent
|
||||
value={attribute.value as Record<string, SerializedValue>}
|
||||
docString={attribute.doc}
|
||||
isInstantUpdate={isInstantUpdate}
|
||||
addNotification={addNotification}
|
||||
id={id}
|
||||
/>
|
||||
);
|
||||
} else if (attribute.type === 'Image') {
|
||||
return (
|
||||
<ImageComponent
|
||||
|
@ -328,9 +328,7 @@ export const NumberComponent = React.memo((props: NumberComponentProps) => {
|
||||
|
||||
useEffect(() => {
|
||||
// Set the cursor position after the component re-renders
|
||||
const inputElement = document.getElementsByName(
|
||||
fullAccessPath
|
||||
)[0] as HTMLInputElement;
|
||||
const inputElement = document.getElementsByName(id)[0] as HTMLInputElement;
|
||||
if (inputElement && cursorPosition !== null) {
|
||||
inputElement.setSelectionRange(cursorPosition, cursorPosition);
|
||||
}
|
||||
@ -352,7 +350,8 @@ export const NumberComponent = React.memo((props: NumberComponentProps) => {
|
||||
type="text"
|
||||
value={inputString}
|
||||
disabled={readOnly}
|
||||
name={fullAccessPath}
|
||||
onChange={() => {}}
|
||||
name={id}
|
||||
onKeyDown={handleKeyDown}
|
||||
onBlur={handleBlur}
|
||||
className={isInstantUpdate && !readOnly ? 'instantUpdate' : ''}
|
||||
|
@ -90,7 +90,7 @@ export const StringComponent = React.memo((props: StringComponentProps) => {
|
||||
</InputGroup.Text>
|
||||
<Form.Control
|
||||
type="text"
|
||||
name={fullAccessPath}
|
||||
name={id}
|
||||
value={inputString}
|
||||
disabled={readOnly}
|
||||
onChange={handleChange}
|
||||
|
@ -7,19 +7,129 @@ export type State = {
|
||||
doc: string | null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Splits a full access path into its atomic parts, separating attribute names, numeric
|
||||
* indices (including floating points), and string keys within indices.
|
||||
*
|
||||
* @param path The full access path string to be split into components.
|
||||
* @returns An array of components that make up the path, including attribute names,
|
||||
* numeric indices, and string keys as separate elements.
|
||||
*/
|
||||
export function parseFullAccessPath(path: string): string[] {
|
||||
// The pattern matches:
|
||||
// \w+ - Words
|
||||
// \[\d+\.\d+\] - Floating point numbers inside brackets
|
||||
// \[\d+\] - Integers inside brackets
|
||||
// \["[^"]*"\] - Double-quoted strings inside brackets
|
||||
// \['[^']*'\] - Single-quoted strings inside brackets
|
||||
const pattern = /\w+|\[\d+\.\d+\]|\[\d+\]|\["[^"]*"\]|\['[^']*'\]/g;
|
||||
const matches = path.match(pattern);
|
||||
|
||||
return matches ?? []; // Return an empty array if no matches found
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a serialized key and convert it to an appropriate type (number or string).
|
||||
*
|
||||
* @param serializedKey The serialized key, which might be enclosed in brackets and quotes.
|
||||
* @returns The processed key as a number or an unquoted string.
|
||||
*
|
||||
* Examples:
|
||||
* console.log(parseSerializedKey("attr_name")); // Outputs: attr_name (string)
|
||||
* console.log(parseSerializedKey("[123]")); // Outputs: 123 (number)
|
||||
* console.log(parseSerializedKey("[12.3]")); // Outputs: 12.3 (number)
|
||||
* console.log(parseSerializedKey("['hello']")); // Outputs: hello (string)
|
||||
* console.log(parseSerializedKey('["12.34"]')); // Outputs: "12.34" (string)
|
||||
* console.log(parseSerializedKey('["complex"]'));// Outputs: "complex" (string)
|
||||
*/
|
||||
function parseSerializedKey(serializedKey: string): string | number {
|
||||
// Strip outer brackets if present
|
||||
if (serializedKey.startsWith('[') && serializedKey.endsWith(']')) {
|
||||
serializedKey = serializedKey.slice(1, -1);
|
||||
}
|
||||
|
||||
// Strip quotes if the resulting string is quoted
|
||||
if (
|
||||
(serializedKey.startsWith("'") && serializedKey.endsWith("'")) ||
|
||||
(serializedKey.startsWith('"') && serializedKey.endsWith('"'))
|
||||
) {
|
||||
return serializedKey.slice(1, -1);
|
||||
}
|
||||
|
||||
// Try converting to a number if the string is not quoted
|
||||
const parsedNumber = parseFloat(serializedKey);
|
||||
if (!isNaN(parsedNumber)) {
|
||||
return parsedNumber;
|
||||
}
|
||||
|
||||
// Return the original string if it's not a valid number
|
||||
return serializedKey;
|
||||
}
|
||||
|
||||
function getOrCreateItemInContainer(
|
||||
container: Record<string | number, SerializedValue> | SerializedValue[],
|
||||
key: string | number,
|
||||
allowAddKey: boolean
|
||||
): SerializedValue {
|
||||
// Check if the key exists and return the item if it does
|
||||
if (key in container) {
|
||||
return container[key];
|
||||
}
|
||||
|
||||
// Handling the case where the key does not exist
|
||||
if (Array.isArray(container)) {
|
||||
// Handling arrays
|
||||
if (allowAddKey && key === container.length) {
|
||||
container.push(createEmptySerializedObject());
|
||||
return container[key];
|
||||
}
|
||||
throw new Error(`Index out of bounds: ${key}`);
|
||||
} else {
|
||||
// Handling objects
|
||||
if (allowAddKey) {
|
||||
container[key] = createEmptySerializedObject();
|
||||
return container[key];
|
||||
}
|
||||
throw new Error(`Key not found: ${key}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve an item from a container specified by the passed key. Add an item to the
|
||||
* container if allowAppend is set to True.
|
||||
*
|
||||
* @param container Either a dictionary or list of serialized objects.
|
||||
* @param key The key name or index (as a string) representing the attribute in the container.
|
||||
* @param allowAppend Whether to allow appending a new entry if the specified index is out of range by exactly one position.
|
||||
* @returns The serialized object corresponding to the specified key.
|
||||
* @throws SerializationPathError If the key is invalid or leads to an access error without append permissions.
|
||||
* @throws SerializationValueError If the expected structure is incorrect.
|
||||
*/
|
||||
function getContainerItemByKey(
|
||||
container: Record<string, SerializedValue> | SerializedValue[],
|
||||
key: string,
|
||||
allowAppend: boolean = false
|
||||
): SerializedValue {
|
||||
const processedKey = parseSerializedKey(key);
|
||||
|
||||
try {
|
||||
return getOrCreateItemInContainer(container, processedKey, allowAppend);
|
||||
} catch (error) {
|
||||
if (error instanceof RangeError) {
|
||||
throw new Error(`Index '${processedKey}': ${error.message}`);
|
||||
} else if (error instanceof Error) {
|
||||
throw new Error(`Key '${processedKey}': ${error.message}`);
|
||||
}
|
||||
throw error; // Re-throw if it's not a known error type
|
||||
}
|
||||
}
|
||||
|
||||
export function setNestedValueByPath(
|
||||
serializationDict: Record<string, SerializedValue>,
|
||||
path: string,
|
||||
serializedValue: SerializedValue
|
||||
): Record<string, SerializedValue> {
|
||||
const parentPathParts = path.split('.').slice(0, -1);
|
||||
const attrName = path.split('.').pop();
|
||||
|
||||
if (!attrName) {
|
||||
throw new Error('Invalid path');
|
||||
}
|
||||
|
||||
let currentSerializedValue: SerializedValue;
|
||||
const pathParts = parseFullAccessPath(path);
|
||||
const newSerializationDict: Record<string, SerializedValue> = JSON.parse(
|
||||
JSON.stringify(serializationDict)
|
||||
);
|
||||
@ -27,81 +137,36 @@ export function setNestedValueByPath(
|
||||
let currentDict = newSerializationDict;
|
||||
|
||||
try {
|
||||
for (const pathPart of parentPathParts) {
|
||||
currentSerializedValue = getNextLevelDictByKey(currentDict, pathPart, false);
|
||||
// @ts-expect-error The value will be of type SerializedValue as we are still
|
||||
// looping through the parent parts
|
||||
currentDict = currentSerializedValue['value'];
|
||||
for (let i = 0; i < pathParts.length - 1; i++) {
|
||||
const pathPart = pathParts[i];
|
||||
const nextLevelSerializedObject = getContainerItemByKey(
|
||||
currentDict,
|
||||
pathPart,
|
||||
false
|
||||
);
|
||||
currentDict = nextLevelSerializedObject['value'] as Record<
|
||||
string,
|
||||
SerializedValue
|
||||
>;
|
||||
}
|
||||
|
||||
currentSerializedValue = getNextLevelDictByKey(currentDict, attrName, true);
|
||||
const finalPart = pathParts[pathParts.length - 1];
|
||||
const finalObject = getContainerItemByKey(currentDict, finalPart, true);
|
||||
|
||||
Object.assign(finalObject, serializedValue);
|
||||
|
||||
Object.assign(currentSerializedValue, serializedValue);
|
||||
return newSerializationDict;
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
return currentDict;
|
||||
console.error(`Error occurred trying to change ${path}: ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
function getNextLevelDictByKey(
|
||||
serializationDict: Record<string, SerializedValue>,
|
||||
attrName: string,
|
||||
allowAppend: boolean = false
|
||||
): SerializedValue {
|
||||
const [key, index] = parseListAttrAndIndex(attrName);
|
||||
let currentDict: SerializedValue;
|
||||
|
||||
try {
|
||||
if (index !== null) {
|
||||
if (!serializationDict[key] || !Array.isArray(serializationDict[key]['value'])) {
|
||||
throw new Error(`Expected an array at '${key}', but found something else.`);
|
||||
}
|
||||
|
||||
if (index < serializationDict[key]['value'].length) {
|
||||
currentDict = serializationDict[key]['value'][index];
|
||||
} else if (allowAppend && index === serializationDict[key]['value'].length) {
|
||||
// Appending to list
|
||||
// @ts-expect-error When the index is not null, I expect an array
|
||||
serializationDict[key]['value'].push({});
|
||||
currentDict = serializationDict[key]['value'][index];
|
||||
} else {
|
||||
throw new Error(`Index out of range for '${key}[${index}]'.`);
|
||||
}
|
||||
} else {
|
||||
if (!serializationDict[key]) {
|
||||
throw new Error(`Key '${key}' not found.`);
|
||||
}
|
||||
currentDict = serializationDict[key];
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Error(`Error occurred trying to access '${attrName}': ${error}`);
|
||||
}
|
||||
|
||||
if (typeof currentDict !== 'object' || currentDict === null) {
|
||||
throw new Error(
|
||||
`Expected a dictionary at '${attrName}', but found type '${typeof currentDict}' instead.`
|
||||
);
|
||||
}
|
||||
|
||||
return currentDict;
|
||||
}
|
||||
|
||||
function parseListAttrAndIndex(attrString: string): [string, number | null] {
|
||||
let index: number | null = null;
|
||||
let attrName = attrString;
|
||||
|
||||
if (attrString.includes('[') && attrString.endsWith(']')) {
|
||||
const parts = attrString.split('[');
|
||||
attrName = parts[0];
|
||||
const indexPart = parts[1].slice(0, -1); // Removes the closing ']'
|
||||
|
||||
if (!isNaN(parseInt(indexPart))) {
|
||||
index = parseInt(indexPart);
|
||||
} else {
|
||||
console.error(`Invalid index format in key: ${attrString}`);
|
||||
}
|
||||
}
|
||||
|
||||
return [attrName, index];
|
||||
function createEmptySerializedObject(): SerializedValue {
|
||||
return {
|
||||
full_access_path: '',
|
||||
value: undefined,
|
||||
type: 'None',
|
||||
doc: null,
|
||||
readonly: false
|
||||
};
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pydase"
|
||||
version = "0.8.1"
|
||||
version = "0.8.2"
|
||||
description = "A flexible and robust Python library for creating, managing, and interacting with data services, with built-in support for web and RPC servers, and customizable features for diverse use cases."
|
||||
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
|
||||
readme = "README.md"
|
||||
|
@ -75,6 +75,37 @@ def update_value(
|
||||
)
|
||||
|
||||
|
||||
class ProxyDict(dict[str, Any]):
|
||||
def __init__(
|
||||
self,
|
||||
original_dict: dict[str, Any],
|
||||
parent_path: str,
|
||||
sio_client: socketio.AsyncClient,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> None:
|
||||
super().__init__(original_dict)
|
||||
self._parent_path = parent_path
|
||||
self._loop = loop
|
||||
self._sio = sio_client
|
||||
|
||||
def __setitem__(self, key: str, value: Any) -> None:
|
||||
observer_key = key
|
||||
if isinstance(key, str):
|
||||
observer_key = f'"{key}"'
|
||||
|
||||
full_access_path = f"{self._parent_path}[{observer_key}]"
|
||||
|
||||
update_value(self._sio, self._loop, full_access_path, value)
|
||||
|
||||
def pop(self, key: str) -> Any: # type: ignore
|
||||
"""Removes the element from the dictionary on the server. It does not return
|
||||
any proxy as the corresponding object on the server does not live anymore."""
|
||||
|
||||
full_access_path = f"{self._parent_path}.pop"
|
||||
|
||||
trigger_method(self._sio, self._loop, full_access_path, [key], {})
|
||||
|
||||
|
||||
class ProxyList(list[Any]):
|
||||
def __init__(
|
||||
self,
|
||||
@ -266,7 +297,17 @@ class ProxyLoader:
|
||||
sio_client: socketio.AsyncClient,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> Any:
|
||||
return loads(serialized_object)
|
||||
return ProxyDict(
|
||||
{
|
||||
key: ProxyLoader.loads_proxy(value, sio_client, loop)
|
||||
for key, value in cast(
|
||||
dict[str, SerializedObject], serialized_object["value"]
|
||||
).items()
|
||||
},
|
||||
parent_path=serialized_object["full_access_path"],
|
||||
sio_client=sio_client,
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_data_service_proxy(
|
||||
|
@ -73,7 +73,7 @@ class DataService(AbstractDataService):
|
||||
|
||||
if not issubclass(
|
||||
value_class,
|
||||
(int | float | bool | str | list | Enum | u.Quantity | Observable),
|
||||
(int | float | bool | str | list | dict | Enum | u.Quantity | Observable),
|
||||
):
|
||||
logger.warning(
|
||||
"Class '%s' does not inherit from DataService. This may lead to"
|
||||
|
@ -7,9 +7,10 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from pydase.data_service.data_service_cache import DataServiceCache
|
||||
from pydase.utils.helpers import (
|
||||
get_object_attr_from_path,
|
||||
get_object_by_path_parts,
|
||||
is_property_attribute,
|
||||
parse_list_attr_and_index,
|
||||
parse_full_access_path,
|
||||
parse_serialized_key,
|
||||
)
|
||||
from pydase.utils.serialization.deserializer import loads
|
||||
from pydase.utils.serialization.serializer import (
|
||||
@ -236,44 +237,32 @@ class StateManager:
|
||||
def __update_attribute_by_path(
|
||||
self, path: str, serialized_value: SerializedObject
|
||||
) -> None:
|
||||
parent_path, attr_name = ".".join(path.split(".")[:-1]), path.split(".")[-1]
|
||||
|
||||
# If attr_name corresponds to a list entry, extract the attr_name and the
|
||||
# index
|
||||
attr_name, index = parse_list_attr_and_index(attr_name)
|
||||
|
||||
# Update path to reflect the attribute without list indices
|
||||
path = f"{parent_path}.{attr_name}" if parent_path != "" else attr_name
|
||||
path_parts = parse_full_access_path(path)
|
||||
target_obj = get_object_by_path_parts(self.service, path_parts[:-1])
|
||||
|
||||
attr_cache_type = get_nested_dict_by_path(self.cache_value, path)["type"]
|
||||
|
||||
# Traverse the object according to the path parts
|
||||
target_obj = get_object_attr_from_path(self.service, parent_path)
|
||||
|
||||
# De-serialize the value
|
||||
if attr_cache_type in ("ColouredEnum", "Enum"):
|
||||
enum_attr = get_object_attr_from_path(target_obj, attr_name)
|
||||
enum_attr = get_object_by_path_parts(target_obj, [path_parts[-1]])
|
||||
# take the value of the existing enum class
|
||||
if serialized_value["type"] in ("ColouredEnum", "Enum"):
|
||||
try:
|
||||
setattr(
|
||||
target_obj,
|
||||
attr_name,
|
||||
enum_attr.__class__[serialized_value["value"]],
|
||||
)
|
||||
return
|
||||
value = enum_attr.__class__[serialized_value["value"]]
|
||||
except KeyError:
|
||||
# This error will arise when setting an enum from another enum class
|
||||
# In this case, we resort to loading the enum and setting it
|
||||
# directly
|
||||
pass
|
||||
|
||||
value = loads(serialized_value)
|
||||
|
||||
if attr_cache_type == "list":
|
||||
list_obj = get_object_attr_from_path(target_obj, attr_name)
|
||||
list_obj[index] = value
|
||||
value = loads(serialized_value)
|
||||
else:
|
||||
setattr(target_obj, attr_name, value)
|
||||
value = loads(serialized_value)
|
||||
|
||||
# set the value
|
||||
if isinstance(target_obj, list | dict):
|
||||
processed_key = parse_serialized_key(path_parts[-1])
|
||||
target_obj[processed_key] = value # type: ignore
|
||||
else:
|
||||
setattr(target_obj, path_parts[-1], value)
|
||||
|
||||
def __is_loadable_state_attribute(self, full_access_path: str) -> bool:
|
||||
"""Checks if an attribute defined by a dot-separated path should be loaded from
|
||||
@ -283,20 +272,17 @@ class StateManager:
|
||||
attributes default to being loadable.
|
||||
"""
|
||||
|
||||
parent_path, attr_name = (
|
||||
".".join(full_access_path.split(".")[:-1]),
|
||||
full_access_path.split(".")[-1],
|
||||
)
|
||||
parent_object = get_object_attr_from_path(self.service, parent_path)
|
||||
path_parts = parse_full_access_path(full_access_path)
|
||||
parent_object = get_object_by_path_parts(self.service, path_parts[:-1])
|
||||
|
||||
if is_property_attribute(parent_object, attr_name):
|
||||
prop = getattr(type(parent_object), attr_name)
|
||||
if is_property_attribute(parent_object, path_parts[-1]):
|
||||
prop = getattr(type(parent_object), path_parts[-1])
|
||||
has_decorator = has_load_state_decorator(prop)
|
||||
if not has_decorator:
|
||||
logger.debug(
|
||||
"Property '%s' has no '@load_state' decorator. "
|
||||
"Ignoring value from JSON file...",
|
||||
attr_name,
|
||||
path_parts[-1],
|
||||
)
|
||||
return has_decorator
|
||||
|
||||
@ -314,6 +300,6 @@ class StateManager:
|
||||
logger.debug(
|
||||
"Path %a could not be loaded. It does not correspond to an attribute of"
|
||||
" the class. Ignoring value from JSON file...",
|
||||
attr_name,
|
||||
path_parts[-1],
|
||||
)
|
||||
return False
|
||||
|
@ -1,13 +1,13 @@
|
||||
{
|
||||
"files": {
|
||||
"main.css": "/static/css/main.7ef670d5.css",
|
||||
"main.js": "/static/js/main.17005bcb.js",
|
||||
"main.js": "/static/js/main.57f8ec4c.js",
|
||||
"index.html": "/index.html",
|
||||
"main.7ef670d5.css.map": "/static/css/main.7ef670d5.css.map",
|
||||
"main.17005bcb.js.map": "/static/js/main.17005bcb.js.map"
|
||||
"main.57f8ec4c.js.map": "/static/js/main.57f8ec4c.js.map"
|
||||
},
|
||||
"entrypoints": [
|
||||
"static/css/main.7ef670d5.css",
|
||||
"static/js/main.17005bcb.js"
|
||||
"static/js/main.57f8ec4c.js"
|
||||
]
|
||||
}
|
@ -1 +1 @@
|
||||
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site displaying a pydase UI."/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><title>pydase App</title><script defer="defer" src="/static/js/main.17005bcb.js"></script><link href="/static/css/main.7ef670d5.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
|
||||
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site displaying a pydase UI."/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><title>pydase App</title><script defer="defer" src="/static/js/main.57f8ec4c.js"></script><link href="/static/css/main.7ef670d5.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
1
src/pydase/frontend/static/js/main.57f8ec4c.js.map
Normal file
1
src/pydase/frontend/static/js/main.57f8ec4c.js.map
Normal file
File diff suppressed because one or more lines are too long
@ -15,6 +15,7 @@ class Observable(ObservableObject):
|
||||
for k in set(type(self).__dict__)
|
||||
- set(Observable.__dict__)
|
||||
- set(self.__dict__)
|
||||
- {"__annotations__"}
|
||||
}
|
||||
for name, value in class_attrs.items():
|
||||
if isinstance(value, property) or callable(value):
|
||||
|
@ -3,6 +3,8 @@ from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterable
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, SupportsIndex
|
||||
|
||||
from pydase.utils.helpers import parse_serialized_key
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pydase.observer_pattern.observer.observer import Observer
|
||||
|
||||
@ -81,7 +83,7 @@ class ObservableObject(ABC):
|
||||
)
|
||||
observer._notify_change_start(extended_attr_path)
|
||||
|
||||
def _initialise_new_objects(self, attr_name_or_key: Any, value: Any) -> Any:
|
||||
def _initialise_new_objects(self, attr_name_or_key: str, value: Any) -> Any:
|
||||
new_value = value
|
||||
if isinstance(value, list):
|
||||
if id(value) in self._list_mapping:
|
||||
@ -93,14 +95,14 @@ class ObservableObject(ABC):
|
||||
self._list_mapping[id(value)] = new_value
|
||||
elif isinstance(value, dict):
|
||||
if id(value) in self._dict_mapping:
|
||||
# If the list `value` was already referenced somewhere else
|
||||
# If the dict `value` was already referenced somewhere else
|
||||
new_value = self._dict_mapping[id(value)]
|
||||
else:
|
||||
# convert the builtin list into a ObservableList
|
||||
new_value = _ObservableDict(original_dict=value)
|
||||
self._dict_mapping[id(value)] = new_value
|
||||
if isinstance(new_value, ObservableObject):
|
||||
new_value.add_observer(self, str(attr_name_or_key))
|
||||
new_value.add_observer(self, attr_name_or_key)
|
||||
return new_value
|
||||
|
||||
@abstractmethod
|
||||
@ -224,7 +226,7 @@ class _ObservableList(ObservableObject, list[Any]):
|
||||
return instance_attr_name
|
||||
|
||||
|
||||
class _ObservableDict(dict[str, Any], ObservableObject):
|
||||
class _ObservableDict(ObservableObject, dict[str, Any]):
|
||||
def __init__(
|
||||
self,
|
||||
original_dict: dict[str, Any],
|
||||
@ -233,24 +235,26 @@ class _ObservableDict(dict[str, Any], ObservableObject):
|
||||
ObservableObject.__init__(self)
|
||||
dict.__init__(self)
|
||||
for key, value in self._original_dict.items():
|
||||
super().__setitem__(key, self._initialise_new_objects(f"['{key}']", value))
|
||||
self.__setitem__(key, self._initialise_new_objects(f'["{key}"]', value))
|
||||
|
||||
def __setitem__(self, key: str, value: Any) -> None:
|
||||
if not isinstance(key, str):
|
||||
logger.warning("Converting non-string dictionary key %s to string.", key)
|
||||
key = str(key)
|
||||
raise ValueError(
|
||||
f"Invalid key type: {key} ({type(key).__name__}). In pydase services, "
|
||||
"dictionary keys must be strings."
|
||||
)
|
||||
|
||||
if hasattr(self, "_observers"):
|
||||
self._remove_observer_if_observable(f"['{key}']")
|
||||
value = self._initialise_new_objects(key, value)
|
||||
self._notify_change_start(f"['{key}']")
|
||||
self._remove_observer_if_observable(f'["{key}"]')
|
||||
value = self._initialise_new_objects(f'["{key}"]', value)
|
||||
self._notify_change_start(f'["{key}"]')
|
||||
|
||||
super().__setitem__(key, value)
|
||||
|
||||
self._notify_changed(f"['{key}']", value)
|
||||
self._notify_changed(f'["{key}"]', value)
|
||||
|
||||
def _remove_observer_if_observable(self, name: str) -> None:
|
||||
key = name[2:-2]
|
||||
key = str(parse_serialized_key(name))
|
||||
current_value = self.get(key, None)
|
||||
|
||||
if isinstance(current_value, ObservableObject):
|
||||
@ -262,3 +266,11 @@ class _ObservableDict(dict[str, Any], ObservableObject):
|
||||
if observer_attr_name != "":
|
||||
return f"{observer_attr_name}{instance_attr_name}"
|
||||
return instance_attr_name
|
||||
|
||||
def pop(self, key: str) -> Any: # type: ignore[override]
|
||||
self._remove_observer_if_observable(f'["{key}"]')
|
||||
|
||||
popped_item = super().pop(key)
|
||||
|
||||
self._notify_changed("", self)
|
||||
return popped_item
|
||||
|
@ -16,6 +16,7 @@ from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.server.web_server.sio_setup import (
|
||||
setup_sio_server,
|
||||
)
|
||||
from pydase.utils.helpers import get_path_from_path_parts, parse_full_access_path
|
||||
from pydase.utils.serialization.serializer import generate_serialized_data_paths
|
||||
from pydase.version import __version__
|
||||
|
||||
@ -131,8 +132,18 @@ class WebServer:
|
||||
if path in current_web_settings:
|
||||
continue
|
||||
|
||||
# Creating the display name by reversely looping through the path parts
|
||||
# until an item does not start with a square bracket, and putting the parts
|
||||
# back together again. This allows for display names like
|
||||
# >>> 'dict_attr["some.dotted.key"]'
|
||||
display_name_parts: list[str] = []
|
||||
for item in parse_full_access_path(path)[::-1]:
|
||||
display_name_parts.insert(0, item)
|
||||
if not item.startswith("["):
|
||||
break
|
||||
|
||||
current_web_settings[path] = {
|
||||
"displayName": path.split(".")[-1],
|
||||
"displayName": get_path_from_path_parts(display_name_parts),
|
||||
"display": True,
|
||||
}
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
import inspect
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
@ -25,3 +26,17 @@ def frontend(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
# Mark the function for frontend display.
|
||||
func._display_in_frontend = True # type: ignore
|
||||
return func
|
||||
|
||||
|
||||
def render_in_frontend(func: Callable[..., Any]) -> bool:
|
||||
"""Determines if the method should be rendered in the frontend.
|
||||
|
||||
It checks if the "@frontend" decorator was used or the method is a coroutine."""
|
||||
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return True
|
||||
|
||||
try:
|
||||
return func._display_in_frontend # type: ignore
|
||||
except AttributeError:
|
||||
return False
|
||||
|
@ -1,5 +1,6 @@
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from itertools import chain
|
||||
from typing import Any
|
||||
@ -7,6 +8,92 @@ from typing import Any
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_serialized_key(serialized_key: str) -> str | int | float:
|
||||
"""
|
||||
Parse a serialized key and convert it to an appropriate type (int, float, or str).
|
||||
|
||||
Args:
|
||||
serialized_key: str
|
||||
The serialized key, which might be enclosed in brackets and quotes.
|
||||
|
||||
Returns:
|
||||
int | float | str:
|
||||
The processed key as an integer, float, or unquoted string.
|
||||
|
||||
Examples:
|
||||
```python
|
||||
print(parse_serialized_key("attr_name")) # Outputs: attr_name (str)
|
||||
print(parse_serialized_key("[123]")) # Outputs: 123 (int)
|
||||
print(parse_serialized_key("[12.3]")) # Outputs: 12.3 (float)
|
||||
print(parse_serialized_key("['hello']")) # Outputs: hello (str)
|
||||
print(parse_serialized_key('["12.34"]')) # Outputs: 12.34 (str)
|
||||
print(parse_serialized_key('["complex"]')) # Outputs: complex (str)
|
||||
```
|
||||
"""
|
||||
|
||||
# Strip outer brackets if present
|
||||
if serialized_key.startswith("[") and serialized_key.endswith("]"):
|
||||
serialized_key = serialized_key[1:-1]
|
||||
|
||||
# Strip quotes if the resulting string is quoted
|
||||
if serialized_key.startswith(("'", '"')) and serialized_key.endswith(("'", '"')):
|
||||
return serialized_key[1:-1]
|
||||
|
||||
# Try converting to float or int if the string is not quoted
|
||||
try:
|
||||
return float(serialized_key) if "." in serialized_key else int(serialized_key)
|
||||
except ValueError:
|
||||
# Return the original string if it's not a valid number
|
||||
return serialized_key
|
||||
|
||||
|
||||
def parse_full_access_path(path: str) -> list[str]:
|
||||
"""
|
||||
Splits a full access path into its atomic parts, separating attribute names, numeric
|
||||
indices (including floating points), and string keys within indices.
|
||||
|
||||
Args:
|
||||
path: str
|
||||
The full access path string to be split into components.
|
||||
|
||||
Returns:
|
||||
list[str]
|
||||
A list of components that make up the path, including attribute names,
|
||||
numeric indices, and string keys as separate elements.
|
||||
"""
|
||||
# Matches:
|
||||
# \w+ - Words
|
||||
# \[\d+\.\d+\] - Floating point numbers inside brackets
|
||||
# \[\d+\] - Integers inside brackets
|
||||
# \["[^"]*"\] - Double-quoted strings inside brackets
|
||||
# \['[^']*'\] - Single-quoted strings inside brackets
|
||||
pattern = r'\w+|\[\d+\.\d+\]|\[\d+\]|\["[^"]*"\]|\[\'[^\']*\']'
|
||||
return re.findall(pattern, path)
|
||||
|
||||
|
||||
def get_path_from_path_parts(path_parts: list[str]) -> str:
|
||||
"""Creates the full access path from its atomic parts.
|
||||
|
||||
The reverse function is given by `parse_full_access_path`.
|
||||
|
||||
Args:
|
||||
path_parts: list[str]
|
||||
A list of components that make up the path, including attribute names,
|
||||
numeric indices and string keys enclosed in square brackets as separate
|
||||
elements.
|
||||
Returns:
|
||||
str
|
||||
The full access path corresponding to the path_parts.
|
||||
"""
|
||||
|
||||
path = ""
|
||||
for path_part in path_parts:
|
||||
if not path_part.startswith("[") and path != "":
|
||||
path += "."
|
||||
path += path_part
|
||||
return path
|
||||
|
||||
|
||||
def get_attribute_doc(attr: Any) -> str | None:
|
||||
"""This function takes an input attribute attr and returns its documentation
|
||||
string if it's different from the documentation of its type, otherwise,
|
||||
@ -30,6 +117,20 @@ def get_class_and_instance_attributes(obj: object) -> dict[str, Any]:
|
||||
return dict(chain(type(obj).__dict__.items(), obj.__dict__.items()))
|
||||
|
||||
|
||||
def get_object_by_path_parts(target_obj: Any, path_parts: list[str]) -> Any:
|
||||
for part in path_parts:
|
||||
if part.startswith("["):
|
||||
deserialized_part = parse_serialized_key(part)
|
||||
target_obj = target_obj[deserialized_part]
|
||||
else:
|
||||
try:
|
||||
target_obj = getattr(target_obj, part)
|
||||
except AttributeError:
|
||||
logger.debug("Attribute %a does not exist in the object.", part)
|
||||
return None
|
||||
return target_obj
|
||||
|
||||
|
||||
def get_object_attr_from_path(target_obj: Any, path: str) -> Any:
|
||||
"""
|
||||
Traverse the object tree according to the given path.
|
||||
@ -46,94 +147,8 @@ def get_object_attr_from_path(target_obj: Any, path: str) -> Any:
|
||||
Raises:
|
||||
ValueError: If a list index in the path is not a valid integer.
|
||||
"""
|
||||
path_list = path.split(".") if path != "" else []
|
||||
for part in path_list:
|
||||
try:
|
||||
# Try to split the part into attribute and index
|
||||
attr, index_str = part.split("[", maxsplit=1)
|
||||
index_str = index_str.replace("]", "")
|
||||
index = int(index_str)
|
||||
target_obj = getattr(target_obj, attr)[index]
|
||||
except ValueError:
|
||||
# No index, so just get the attribute
|
||||
target_obj = getattr(target_obj, part)
|
||||
except AttributeError:
|
||||
# The attribute doesn't exist
|
||||
logger.debug("Attribute % does not exist in the object.", part)
|
||||
return None
|
||||
return target_obj
|
||||
|
||||
|
||||
def update_value_if_changed(
|
||||
target: Any, attr_name_or_index: str | int, new_value: Any
|
||||
) -> None:
|
||||
"""
|
||||
Updates the value of an attribute or a list element on a target object if the new
|
||||
value differs from the current one.
|
||||
|
||||
This function supports updating both attributes of an object and elements of a list.
|
||||
|
||||
- For objects, the function first checks the current value of the attribute. If the
|
||||
current value differs from the new value, the function updates the attribute.
|
||||
|
||||
- For lists, the function checks the current value at the specified index. If the
|
||||
current value differs from the new value, the function updates the list element
|
||||
at the given index.
|
||||
|
||||
Args:
|
||||
target (Any):
|
||||
The target object that has the attribute or the list.
|
||||
attr_name_or_index (str | int):
|
||||
The name of the attribute or the index of the list element.
|
||||
new_value (Any):
|
||||
The new value for the attribute or the list element.
|
||||
"""
|
||||
|
||||
if isinstance(target, list) and isinstance(attr_name_or_index, int):
|
||||
if target[attr_name_or_index] != new_value:
|
||||
target[attr_name_or_index] = new_value
|
||||
elif isinstance(attr_name_or_index, str):
|
||||
# If the type matches and the current value is different from the new value,
|
||||
# update the attribute.
|
||||
if getattr(target, attr_name_or_index) != new_value:
|
||||
setattr(target, attr_name_or_index, new_value)
|
||||
else:
|
||||
logger.error("Incompatible arguments: %s, %s.", target, attr_name_or_index)
|
||||
|
||||
|
||||
def parse_list_attr_and_index(attr_string: str) -> tuple[str, int | None]:
|
||||
"""
|
||||
Parses an attribute string and extracts a potential list attribute name and its
|
||||
index.
|
||||
Logs an error if the index is not a valid digit.
|
||||
|
||||
Args:
|
||||
attr_string (str):
|
||||
The attribute string to parse. Can be a regular attribute name (e.g.,
|
||||
'attr_name') or a list attribute with an index (e.g., 'list_attr[2]').
|
||||
|
||||
Returns:
|
||||
tuple[str, Optional[int]]:
|
||||
A tuple containing the attribute name as a string and the index as an
|
||||
integer if present, otherwise None.
|
||||
|
||||
Examples:
|
||||
>>> parse_attribute_and_index('list_attr[2]')
|
||||
('list_attr', 2)
|
||||
>>> parse_attribute_and_index('attr_name')
|
||||
('attr_name', None)
|
||||
"""
|
||||
|
||||
index = None
|
||||
attr_name = attr_string
|
||||
if "[" in attr_string and attr_string.endswith("]"):
|
||||
attr_name, index_part = attr_string.split("[", 1)
|
||||
index_part = index_part.rstrip("]")
|
||||
if index_part.isdigit():
|
||||
index = int(index_part)
|
||||
else:
|
||||
logger.error("Invalid index format in key: %s", attr_name)
|
||||
return attr_name, index
|
||||
path_parts = parse_full_access_path(path)
|
||||
return get_object_by_path_parts(target_obj, path_parts)
|
||||
|
||||
|
||||
def get_component_classes() -> list[type]:
|
||||
@ -154,12 +169,12 @@ def get_data_service_class_reference() -> Any:
|
||||
|
||||
|
||||
def is_property_attribute(target_obj: Any, access_path: str) -> bool:
|
||||
parent_path, attr_name = (
|
||||
".".join(access_path.split(".")[:-1]),
|
||||
access_path.split(".")[-1],
|
||||
)
|
||||
target_obj = get_object_attr_from_path(target_obj, parent_path)
|
||||
return isinstance(getattr(type(target_obj), attr_name, None), property)
|
||||
path_parts = parse_full_access_path(access_path)
|
||||
target_obj = get_object_by_path_parts(target_obj, path_parts[:-1])
|
||||
|
||||
# don't have to check if target_obj is dict or list as their content cannot be
|
||||
# properties -> always return False then
|
||||
return isinstance(getattr(type(target_obj), path_parts[-1], None), property)
|
||||
|
||||
|
||||
def function_has_arguments(func: Callable[..., Any]) -> bool:
|
||||
@ -172,17 +187,3 @@ def function_has_arguments(func: Callable[..., Any]) -> bool:
|
||||
if len(parameters) > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def render_in_frontend(func: Callable[..., Any]) -> bool:
|
||||
"""Determines if the method should be rendered in the frontend.
|
||||
|
||||
It checks if the "@frontend" decorator was used or the method is a coroutine."""
|
||||
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return True
|
||||
|
||||
try:
|
||||
return func._display_in_frontend # type: ignore
|
||||
except AttributeError:
|
||||
return False
|
||||
|
@ -9,12 +9,13 @@ from typing import TYPE_CHECKING, Any, Literal, cast
|
||||
import pydase.units as u
|
||||
from pydase.data_service.abstract_data_service import AbstractDataService
|
||||
from pydase.data_service.task_manager import TaskStatus
|
||||
from pydase.utils.decorators import render_in_frontend
|
||||
from pydase.utils.helpers import (
|
||||
get_attribute_doc,
|
||||
get_component_classes,
|
||||
get_data_service_class_reference,
|
||||
parse_list_attr_and_index,
|
||||
render_in_frontend,
|
||||
parse_full_access_path,
|
||||
parse_serialized_key,
|
||||
)
|
||||
from pydase.utils.serialization.types import (
|
||||
DataServiceTypes,
|
||||
@ -166,10 +167,11 @@ class Serializer:
|
||||
def _serialize_dict(obj: dict[str, Any], access_path: str = "") -> SerializedDict:
|
||||
readonly = False
|
||||
doc = get_attribute_doc(obj)
|
||||
value = {
|
||||
key: Serializer.serialize_object(val, access_path=f'{access_path}["{key}"]')
|
||||
for key, val in obj.items()
|
||||
}
|
||||
value = {}
|
||||
for key, val in obj.items():
|
||||
value[key] = Serializer.serialize_object(
|
||||
val, access_path=f'{access_path}["{key}"]'
|
||||
)
|
||||
return {
|
||||
"full_access_path": access_path,
|
||||
"type": "dict",
|
||||
@ -301,7 +303,7 @@ def dump(obj: Any) -> SerializedObject:
|
||||
|
||||
|
||||
def set_nested_value_by_path(
|
||||
serialization_dict: dict[str, SerializedObject], path: str, value: Any
|
||||
serialization_dict: dict[Any, SerializedObject], path: str, value: Any
|
||||
) -> None:
|
||||
"""
|
||||
Set a value in a nested dictionary structure, which conforms to the serialization
|
||||
@ -322,23 +324,24 @@ def set_nested_value_by_path(
|
||||
serialized representation of the 'value' to the list.
|
||||
"""
|
||||
|
||||
parent_path_parts, attr_name = path.split(".")[:-1], path.split(".")[-1]
|
||||
current_dict: dict[str, SerializedObject] = serialization_dict
|
||||
path_parts = parse_full_access_path(path)
|
||||
current_dict: dict[Any, SerializedObject] = serialization_dict
|
||||
|
||||
try:
|
||||
for path_part in parent_path_parts:
|
||||
next_level_serialized_object = get_next_level_dict_by_key(
|
||||
for path_part in path_parts[:-1]:
|
||||
next_level_serialized_object = get_container_item_by_key(
|
||||
current_dict, path_part, allow_append=False
|
||||
)
|
||||
current_dict = cast(
|
||||
dict[str, SerializedObject], next_level_serialized_object["value"]
|
||||
dict[Any, SerializedObject],
|
||||
next_level_serialized_object["value"],
|
||||
)
|
||||
|
||||
next_level_serialized_object = get_next_level_dict_by_key(
|
||||
current_dict, attr_name, allow_append=True
|
||||
next_level_serialized_object = get_container_item_by_key(
|
||||
current_dict, path_parts[-1], allow_append=True
|
||||
)
|
||||
except (SerializationPathError, SerializationValueError, KeyError) as e:
|
||||
logger.error(e)
|
||||
logger.error("Error occured trying to change %a: %s", path, e)
|
||||
return
|
||||
|
||||
if next_level_serialized_object["type"] == "method": # state change of task
|
||||
@ -360,149 +363,186 @@ def set_nested_value_by_path(
|
||||
|
||||
|
||||
def get_nested_dict_by_path(
|
||||
serialization_dict: dict[str, SerializedObject],
|
||||
serialization_dict: dict[Any, SerializedObject],
|
||||
path: str,
|
||||
) -> SerializedObject:
|
||||
parent_path_parts, attr_name = path.split(".")[:-1], path.split(".")[-1]
|
||||
current_dict: dict[str, SerializedObject] = serialization_dict
|
||||
path_parts = parse_full_access_path(path)
|
||||
current_dict: dict[Any, SerializedObject] = serialization_dict
|
||||
|
||||
for path_part in parent_path_parts:
|
||||
next_level_serialized_object = get_next_level_dict_by_key(
|
||||
for path_part in path_parts[:-1]:
|
||||
next_level_serialized_object = get_container_item_by_key(
|
||||
current_dict, path_part, allow_append=False
|
||||
)
|
||||
current_dict = cast(
|
||||
dict[str, SerializedObject], next_level_serialized_object["value"]
|
||||
dict[Any, SerializedObject],
|
||||
next_level_serialized_object["value"],
|
||||
)
|
||||
return get_next_level_dict_by_key(current_dict, attr_name, allow_append=False)
|
||||
return get_container_item_by_key(current_dict, path_parts[-1], allow_append=False)
|
||||
|
||||
|
||||
def get_next_level_dict_by_key(
|
||||
serialization_dict: dict[str, SerializedObject],
|
||||
attr_name: str,
|
||||
def create_empty_serialized_object() -> SerializedObject:
|
||||
"""Create a new empty serialized object."""
|
||||
|
||||
return {
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
}
|
||||
|
||||
|
||||
def get_or_create_item_in_container(
|
||||
container: dict[Any, SerializedObject] | list[SerializedObject],
|
||||
key: Any,
|
||||
*,
|
||||
allow_add_key: bool,
|
||||
) -> SerializedObject:
|
||||
"""Ensure the key exists in the dictionary, append if necessary and allowed."""
|
||||
|
||||
try:
|
||||
return container[key]
|
||||
except IndexError:
|
||||
if allow_add_key and key == len(container):
|
||||
cast(list[SerializedObject], container).append(
|
||||
create_empty_serialized_object()
|
||||
)
|
||||
return container[key]
|
||||
raise
|
||||
except KeyError:
|
||||
if allow_add_key:
|
||||
container[key] = create_empty_serialized_object()
|
||||
return container[key]
|
||||
raise
|
||||
|
||||
|
||||
def get_container_item_by_key(
|
||||
container: dict[Any, SerializedObject] | list[SerializedObject],
|
||||
key: str,
|
||||
*,
|
||||
allow_append: bool = False,
|
||||
) -> SerializedObject:
|
||||
"""
|
||||
Retrieve a nested dictionary entry or list item from a data structure serialized
|
||||
with `pydase.utils.serializer.Serializer`.
|
||||
Retrieve an item from a container specified by the passed key. Add an item to the
|
||||
container if allow_append is set to True.
|
||||
|
||||
If specified keys or indexes do not exist, the function can append new elements to
|
||||
dictionaries and to lists if `allow_append` is True and the missing element is
|
||||
exactly the next sequential index (for lists).
|
||||
|
||||
Args:
|
||||
serialization_dict: The base dictionary representing serialized data.
|
||||
attr_name: The key name representing the attribute in the dictionary,
|
||||
e.g. 'list_attr[0]' or 'attr'
|
||||
allow_append: Flag to allow appending a new entry if `index` is out of range by
|
||||
one.
|
||||
container: dict[str, SerializedObject] | list[SerializedObject]
|
||||
The container representing serialized data.
|
||||
key: str
|
||||
The key name representing the attribute in the dictionary, which may include
|
||||
direct keys or indexes (e.g., 'attr_name', '["key"]' or '[0]').
|
||||
allow_append: bool
|
||||
Flag to allow appending a new entry if the specified index is out of range
|
||||
by exactly one position.
|
||||
|
||||
Returns:
|
||||
The dictionary or list item corresponding to the attribute and index.
|
||||
SerializedObject
|
||||
The dictionary or list item corresponding to the specified attribute and
|
||||
index.
|
||||
|
||||
Raises:
|
||||
SerializationPathError: If the path composed of `attr_name` and `index` is
|
||||
invalid or leads to an IndexError or KeyError.
|
||||
SerializationValueError: If the expected nested structure is not a dictionary.
|
||||
SerializationPathError:
|
||||
If the path composed of `attr_name` and any specified index is invalid, or
|
||||
leads to an IndexError or KeyError. This error is also raised if an attempt
|
||||
to access a nonexistent key or index occurs without permission to append.
|
||||
SerializationValueError:
|
||||
If the retrieval results in an object that is expected to be a dictionary
|
||||
but is not, indicating a mismatch between expected and actual serialized
|
||||
data structure.
|
||||
"""
|
||||
# Check if the key contains an index part like 'attr_name[<index>]'
|
||||
attr_name, index = parse_list_attr_and_index(attr_name)
|
||||
processed_key = parse_serialized_key(key)
|
||||
|
||||
try:
|
||||
if index is not None:
|
||||
next_level_serialized_object = cast(
|
||||
list[SerializedObject], serialization_dict[attr_name]["value"]
|
||||
)[index]
|
||||
else:
|
||||
next_level_serialized_object = serialization_dict[attr_name]
|
||||
except IndexError as e:
|
||||
if (
|
||||
index is not None
|
||||
and allow_append
|
||||
and index
|
||||
== len(cast(list[SerializedObject], serialization_dict[attr_name]["value"]))
|
||||
):
|
||||
# Appending to list
|
||||
cast(list[SerializedObject], serialization_dict[attr_name]["value"]).append(
|
||||
{
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
}
|
||||
)
|
||||
next_level_serialized_object = cast(
|
||||
list[SerializedObject], serialization_dict[attr_name]["value"]
|
||||
)[index]
|
||||
else:
|
||||
raise SerializationPathError(
|
||||
f"Error occured trying to change '{attr_name}[{index}]': {e}"
|
||||
)
|
||||
except KeyError:
|
||||
if not allow_append:
|
||||
raise SerializationPathError(
|
||||
f"Error occured trying to access the key '{attr_name}': it is either "
|
||||
"not present in the current dictionary or its value does not contain "
|
||||
"a 'value' key."
|
||||
)
|
||||
serialization_dict[attr_name] = {
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
}
|
||||
next_level_serialized_object = serialization_dict[attr_name]
|
||||
|
||||
if not isinstance(next_level_serialized_object, dict):
|
||||
raise SerializationValueError(
|
||||
f"Expected a dictionary at '{attr_name}', but found type "
|
||||
f"'{type(next_level_serialized_object).__name__}' instead."
|
||||
return get_or_create_item_in_container(
|
||||
container, processed_key, allow_add_key=allow_append
|
||||
)
|
||||
|
||||
return next_level_serialized_object
|
||||
except IndexError as e:
|
||||
raise SerializationPathError(f"Index '{processed_key}': {e}")
|
||||
except KeyError as e:
|
||||
raise SerializationPathError(f"Key '{processed_key}': {e}")
|
||||
|
||||
|
||||
def generate_serialized_data_paths(
|
||||
data: dict[str, Any], parent_path: str = ""
|
||||
def get_data_paths_from_serialized_object( # noqa: C901
|
||||
serialized_obj: SerializedObject,
|
||||
parent_path: str = "",
|
||||
) -> list[str]:
|
||||
"""
|
||||
Generate a list of access paths for all attributes in a dictionary representing
|
||||
data serialized with `pydase.utils.serializer.Serializer`, excluding those that are
|
||||
methods. This function handles nested structures, including lists, by generating
|
||||
paths for each element in the nested lists.
|
||||
Recursively extracts full access paths from a serialized object.
|
||||
|
||||
Args:
|
||||
data (dict[str, Any]): The dictionary representing serialized data, typically
|
||||
produced by `pydase.utils.serializer.Serializer`.
|
||||
parent_path (str, optional): The base path to prepend to the keys in the `data`
|
||||
dictionary to form the access paths. Defaults to an empty string.
|
||||
serialized_obj (SerializedObject):
|
||||
The dictionary representing the serialization of an object. Produced by
|
||||
`pydase.utils.serializer.Serializer`.
|
||||
|
||||
Returns:
|
||||
list[str]: A list of strings where each string is a dot-notation access path
|
||||
to an attribute in the serialized data. For list elements, the path includes
|
||||
the index in square brackets.
|
||||
list[str]:
|
||||
A list of strings, each representing a full access path in the serialized
|
||||
object.
|
||||
"""
|
||||
|
||||
paths: list[str] = []
|
||||
|
||||
if isinstance(serialized_obj["value"], list):
|
||||
for index, value in enumerate(serialized_obj["value"]):
|
||||
new_path = f"{parent_path}[{index}]"
|
||||
paths.append(new_path)
|
||||
if serialized_dict_is_nested_object(value):
|
||||
paths.extend(get_data_paths_from_serialized_object(value, new_path))
|
||||
|
||||
elif serialized_dict_is_nested_object(serialized_obj):
|
||||
for key, value in cast(
|
||||
dict[str, SerializedObject], serialized_obj["value"]
|
||||
).items():
|
||||
# Serialized dictionaries need to have a different new_path than nested
|
||||
# classes
|
||||
if serialized_obj["type"] == "dict":
|
||||
processed_key = key
|
||||
if isinstance(key, str):
|
||||
processed_key = f'"{key}"'
|
||||
new_path = f"{parent_path}[{processed_key}]"
|
||||
else:
|
||||
new_path = f"{parent_path}.{key}" if parent_path != "" else key
|
||||
|
||||
paths.append(new_path)
|
||||
if serialized_dict_is_nested_object(value):
|
||||
paths.extend(get_data_paths_from_serialized_object(value, new_path))
|
||||
|
||||
return paths
|
||||
|
||||
|
||||
def generate_serialized_data_paths(
|
||||
data: dict[str, SerializedObject],
|
||||
) -> list[str]:
|
||||
"""
|
||||
Recursively extracts full access paths from a serialized DataService class instance.
|
||||
|
||||
Args:
|
||||
data (dict[str, SerializedObject]):
|
||||
The value of the "value" key of a serialized DataService class instance.
|
||||
|
||||
Returns:
|
||||
list[str]:
|
||||
A list of strings, each representing a full access path in the serialized
|
||||
object.
|
||||
"""
|
||||
|
||||
paths: list[str] = []
|
||||
|
||||
for key, value in data.items():
|
||||
new_path = f"{parent_path}.{key}" if parent_path else key
|
||||
paths.append(new_path)
|
||||
paths.append(key)
|
||||
|
||||
if serialized_dict_is_nested_object(value):
|
||||
if isinstance(value["value"], list):
|
||||
for index, item in enumerate(value["value"]):
|
||||
indexed_key_path = f"{new_path}[{index}]"
|
||||
paths.append(indexed_key_path)
|
||||
if serialized_dict_is_nested_object(item):
|
||||
paths.extend(
|
||||
generate_serialized_data_paths(
|
||||
item["value"], indexed_key_path
|
||||
)
|
||||
)
|
||||
continue
|
||||
paths.extend(generate_serialized_data_paths(value["value"], new_path))
|
||||
paths.extend(get_data_paths_from_serialized_object(value, key))
|
||||
return paths
|
||||
|
||||
|
||||
def serialized_dict_is_nested_object(serialized_dict: SerializedObject) -> bool:
|
||||
return (
|
||||
serialized_dict["type"] != "Quantity"
|
||||
and isinstance(serialized_dict["value"], dict)
|
||||
) or isinstance(serialized_dict["value"], list)
|
||||
value = serialized_dict["value"]
|
||||
# We are excluding Quantity here as the value corresponding to the "value" key is
|
||||
# a dictionary of the form {"magnitude": ..., "unit": ...}
|
||||
return serialized_dict["type"] != "Quantity" and (isinstance(value, dict | list))
|
||||
|
@ -12,6 +12,8 @@ def pydase_client() -> Generator[pydase.Client, None, Any]:
|
||||
class SubService(pydase.DataService):
|
||||
name = "SubService"
|
||||
|
||||
subservice_instance = SubService()
|
||||
|
||||
class MyService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
@ -19,6 +21,10 @@ def pydase_client() -> Generator[pydase.Client, None, Any]:
|
||||
self._my_property = 12.1
|
||||
self.sub_service = SubService()
|
||||
self.list_attr = [1, 2]
|
||||
self.dict_attr = {
|
||||
"foo": subservice_instance,
|
||||
"dotted.key": subservice_instance,
|
||||
}
|
||||
|
||||
@property
|
||||
def my_property(self) -> float:
|
||||
@ -104,6 +110,18 @@ def test_list(pydase_client: pydase.Client) -> None:
|
||||
assert pydase_client.proxy.list_attr == []
|
||||
|
||||
|
||||
def test_dict(pydase_client: pydase.Client) -> None:
|
||||
pydase_client.proxy.dict_attr["foo"].name = "foo"
|
||||
assert pydase_client.proxy.dict_attr["foo"].name == "foo"
|
||||
assert pydase_client.proxy.dict_attr["dotted.key"].name == "foo"
|
||||
|
||||
# pop will not return anything as the server object was deleted
|
||||
assert pydase_client.proxy.dict_attr.pop("dotted.key") is None
|
||||
|
||||
# pop will remove the dictionary entry on the server
|
||||
assert list(pydase_client.proxy.dict_attr.keys()) == ["foo"]
|
||||
|
||||
|
||||
def test_tab_completion(pydase_client: pydase.Client) -> None:
|
||||
# Tab completion gets its suggestions from the __dir__ class method
|
||||
assert all(
|
||||
|
216
tests/observer_pattern/observable/test_observable_dict.py
Normal file
216
tests/observer_pattern/observable/test_observable_dict.py
Normal file
@ -0,0 +1,216 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from pydase.observer_pattern.observable import Observable
|
||||
from pydase.observer_pattern.observer import Observer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MyObserver(Observer):
|
||||
def on_change(self, full_access_path: str, value: Any) -> None:
|
||||
logger.info("'%s' changed to '%s'", full_access_path, value)
|
||||
|
||||
|
||||
def test_simple_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class MyObservable(Observable):
|
||||
dict_attr = {"first": "Hello"}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"] = "Ciao"
|
||||
instance.dict_attr["second"] = "World"
|
||||
|
||||
assert "'dict_attr[\"first\"]' changed to 'Ciao'" in caplog.text
|
||||
assert "'dict_attr[\"second\"]' changed to 'World'" in caplog.text
|
||||
|
||||
|
||||
def test_instance_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {"first": NestedObservable()}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"].name = "Ciao"
|
||||
|
||||
assert "'dict_attr[\"first\"].name' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
name = "Hello"
|
||||
|
||||
class MyObservable(Observable):
|
||||
dict_attr = {"first": NestedObservable()}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"].name = "Ciao"
|
||||
|
||||
assert "'dict_attr[\"first\"].name' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_nested_dict_instances(caplog: pytest.LogCaptureFixture) -> None:
|
||||
dict_instance = {"first": "Hello", "second": "World"}
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.nested_dict_attr = {"nested": dict_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.nested_dict_attr["nested"]["first"] = "Ciao"
|
||||
|
||||
assert "'nested_dict_attr[\"nested\"][\"first\"]' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_dict_in_list_instance(caplog: pytest.LogCaptureFixture) -> None:
|
||||
dict_instance = {"first": "Hello", "second": "World"}
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_in_list = [dict_instance]
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_in_list[0]["first"] = "Ciao"
|
||||
|
||||
assert "'dict_in_list[0][\"first\"]' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_list_in_dict_instance(caplog: pytest.LogCaptureFixture) -> None:
|
||||
list_instance: list[Any] = [1, 2, 3]
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.list_in_dict = {"some_list": list_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.list_in_dict["some_list"][0] = "Ciao"
|
||||
|
||||
assert "'list_in_dict[\"some_list\"][0]' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_key_type_error(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {1.0: 1.0}
|
||||
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
MyObservable()
|
||||
|
||||
assert (
|
||||
"Invalid key type: 1.0 (float). In pydase services, dictionary keys must be "
|
||||
"strings." in str(exc_info)
|
||||
)
|
||||
|
||||
|
||||
def test_removed_observer_on_class_dict_attr(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
nested_instance = NestedObservable()
|
||||
|
||||
class MyObservable(Observable):
|
||||
nested_attr = nested_instance
|
||||
changed_dict_attr = {"nested": nested_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.changed_dict_attr["nested"] = "Ciao"
|
||||
|
||||
assert "'changed_dict_attr[\"nested\"]' changed to 'Ciao'" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
assert nested_instance._observers == {
|
||||
'["nested"]': [],
|
||||
"nested_attr": [instance],
|
||||
}
|
||||
|
||||
instance.nested_attr.name = "Hi"
|
||||
|
||||
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
|
||||
assert "'changed_dict_attr[\"nested\"].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_removed_observer_on_instance_dict_attr(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
nested_instance = NestedObservable()
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.nested_attr = nested_instance
|
||||
self.changed_dict_attr = {"nested": nested_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.changed_dict_attr["nested"] = "Ciao"
|
||||
|
||||
assert "'changed_dict_attr[\"nested\"]' changed to 'Ciao'" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
assert nested_instance._observers == {
|
||||
'["nested"]': [],
|
||||
"nested_attr": [instance],
|
||||
}
|
||||
|
||||
instance.nested_attr.name = "Hi"
|
||||
|
||||
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
|
||||
assert "'changed_dict_attr[\"nested\"].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_dotted_dict_key(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {"dotted.key": 1.0}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["dotted.key"] = "Ciao"
|
||||
|
||||
assert "'dict_attr[\"dotted.key\"]' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_pop(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
nested_instance = NestedObservable()
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {"nested": nested_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
assert instance.dict_attr.pop("nested") == nested_instance
|
||||
assert nested_instance._observers == {'["nested"]': []}
|
||||
|
||||
assert f"'dict_attr' changed to '{instance.dict_attr}'" in caplog.text
|
@ -69,66 +69,6 @@ def test_class_object_list_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
assert "'list_attr[0].name' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_simple_instance_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {"first": "Hello"}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"] = "Ciao"
|
||||
instance.dict_attr["second"] = "World"
|
||||
|
||||
assert "'dict_attr['first']' changed to 'Ciao'" in caplog.text
|
||||
assert "'dict_attr['second']' changed to 'World'" in caplog.text
|
||||
|
||||
|
||||
def test_simple_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class MyObservable(Observable):
|
||||
dict_attr = {"first": "Hello"}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"] = "Ciao"
|
||||
instance.dict_attr["second"] = "World"
|
||||
|
||||
assert "'dict_attr['first']' changed to 'Ciao'" in caplog.text
|
||||
assert "'dict_attr['second']' changed to 'World'" in caplog.text
|
||||
|
||||
|
||||
def test_instance_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_attr = {"first": NestedObservable()}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"].name = "Ciao"
|
||||
|
||||
assert "'dict_attr['first'].name' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
name = "Hello"
|
||||
|
||||
class MyObservable(Observable):
|
||||
dict_attr = {"first": NestedObservable()}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_attr["first"].name = "Ciao"
|
||||
|
||||
assert "'dict_attr['first'].name' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_removed_observer_on_class_list_attr(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
name = "Hello"
|
||||
@ -152,35 +92,6 @@ def test_removed_observer_on_class_list_attr(caplog: pytest.LogCaptureFixture) -
|
||||
assert "'changed_list_attr[0].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_removed_observer_on_instance_dict_attr(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
nested_instance = NestedObservable()
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.nested_attr = nested_instance
|
||||
self.changed_dict_attr = {"nested": nested_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.changed_dict_attr["nested"] = "Ciao"
|
||||
|
||||
assert "'changed_dict_attr['nested']' changed to 'Ciao'" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
instance.nested_attr.name = "Hi"
|
||||
|
||||
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
|
||||
assert "'changed_dict_attr['nested'].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_removed_observer_on_instance_list_attr(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
@ -210,78 +121,6 @@ def test_removed_observer_on_instance_list_attr(
|
||||
assert "'changed_list_attr[0].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_removed_observer_on_class_dict_attr(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class NestedObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.name = "Hello"
|
||||
|
||||
nested_instance = NestedObservable()
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.nested_attr = nested_instance
|
||||
self.changed_dict_attr = {"nested": nested_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.changed_dict_attr["nested"] = "Ciao"
|
||||
|
||||
assert "'changed_dict_attr['nested']' changed to 'Ciao'" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
instance.nested_attr.name = "Hi"
|
||||
|
||||
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
|
||||
assert "'changed_dict_attr['nested'].name' changed to 'Hi'" not in caplog.text
|
||||
|
||||
|
||||
def test_nested_dict_instances(caplog: pytest.LogCaptureFixture) -> None:
|
||||
dict_instance = {"first": "Hello", "second": "World"}
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.nested_dict_attr = {"nested": dict_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.nested_dict_attr["nested"]["first"] = "Ciao"
|
||||
|
||||
assert "'nested_dict_attr['nested']['first']' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_dict_in_list_instance(caplog: pytest.LogCaptureFixture) -> None:
|
||||
dict_instance = {"first": "Hello", "second": "World"}
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.dict_in_list = [dict_instance]
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.dict_in_list[0]["first"] = "Ciao"
|
||||
|
||||
assert "'dict_in_list[0]['first']' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_list_in_dict_instance(caplog: pytest.LogCaptureFixture) -> None:
|
||||
list_instance: list[Any] = [1, 2, 3]
|
||||
|
||||
class MyObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.list_in_dict = {"some_list": list_instance}
|
||||
|
||||
instance = MyObservable()
|
||||
MyObserver(instance)
|
||||
instance.list_in_dict["some_list"][0] = "Ciao"
|
||||
|
||||
assert "'list_in_dict['some_list'][0]' changed to 'Ciao'" in caplog.text
|
||||
|
||||
|
||||
def test_list_append(caplog: pytest.LogCaptureFixture) -> None:
|
||||
class OtherObservable(Observable):
|
||||
def __init__(self) -> None:
|
||||
|
@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
import enum
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
from typing import Any, ClassVar
|
||||
|
||||
import pydase
|
||||
import pydase.units as u
|
||||
@ -13,8 +13,10 @@ from pydase.utils.serialization.serializer import (
|
||||
SerializationPathError,
|
||||
SerializedObject,
|
||||
dump,
|
||||
generate_serialized_data_paths,
|
||||
get_container_item_by_key,
|
||||
get_data_paths_from_serialized_object,
|
||||
get_nested_dict_by_path,
|
||||
get_next_level_dict_by_key,
|
||||
serialized_dict_is_nested_object,
|
||||
set_nested_value_by_path,
|
||||
)
|
||||
@ -27,6 +29,26 @@ class MyEnum(enum.Enum):
|
||||
FINISHED = "finished"
|
||||
|
||||
|
||||
class MySubclass(pydase.DataService):
|
||||
attr3 = 1.0
|
||||
list_attr: ClassVar[list[Any]] = [1.0, 1]
|
||||
some_quantity: u.Quantity = 1.0 * u.units.A
|
||||
|
||||
|
||||
class ServiceClass(pydase.DataService):
|
||||
attr1 = 1.0
|
||||
attr2 = MySubclass()
|
||||
enum_attr = MyEnum.RUNNING
|
||||
attr_list: ClassVar[list[Any]] = [0, 1, MySubclass()]
|
||||
dict_attr: ClassVar[dict[Any, Any]] = {"foo": 1.0, "bar": {"foo": "bar"}}
|
||||
|
||||
def my_task(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
service_instance = ServiceClass()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input, expected",
|
||||
[
|
||||
@ -378,7 +400,7 @@ def test_dict_serialization() -> None:
|
||||
|
||||
test_dict = {
|
||||
"int_key": 1,
|
||||
"float_key": 1.0,
|
||||
"1.0": 1.0,
|
||||
"bool_key": True,
|
||||
"Quantity_key": 1.0 * u.units.s,
|
||||
"DataService_key": MyClass(),
|
||||
@ -420,8 +442,8 @@ def test_dict_serialization() -> None:
|
||||
"type": "bool",
|
||||
"value": True,
|
||||
},
|
||||
"float_key": {
|
||||
"full_access_path": '["float_key"]',
|
||||
"1.0": {
|
||||
"full_access_path": '["1.0"]',
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
"type": "float",
|
||||
@ -468,22 +490,125 @@ def test_derived_data_service_serialization() -> None:
|
||||
|
||||
@pytest.fixture
|
||||
def setup_dict() -> dict[str, Any]:
|
||||
class MySubclass(pydase.DataService):
|
||||
attr3 = 1.0
|
||||
list_attr = [1.0, 1]
|
||||
|
||||
class ServiceClass(pydase.DataService):
|
||||
attr1 = 1.0
|
||||
attr2 = MySubclass()
|
||||
enum_attr = MyEnum.RUNNING
|
||||
attr_list = [0, 1, MySubclass()]
|
||||
|
||||
def my_task(self) -> None:
|
||||
pass
|
||||
|
||||
return ServiceClass().serialize()["value"] # type: ignore
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"serialized_object, attr_name, allow_append, expected",
|
||||
[
|
||||
(
|
||||
dump(service_instance)["value"],
|
||||
"attr1",
|
||||
False,
|
||||
{
|
||||
"doc": None,
|
||||
"full_access_path": "attr1",
|
||||
"readonly": False,
|
||||
"type": "float",
|
||||
"value": 1.0,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance.attr_list)["value"],
|
||||
"[0]",
|
||||
False,
|
||||
{
|
||||
"doc": None,
|
||||
"full_access_path": "[0]",
|
||||
"readonly": False,
|
||||
"type": "int",
|
||||
"value": 0,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance.attr_list)["value"],
|
||||
"[3]",
|
||||
True,
|
||||
{
|
||||
# we do not know the full_access_path of this entry within the
|
||||
# serialized object
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance.attr_list)["value"],
|
||||
"[3]",
|
||||
False,
|
||||
SerializationPathError,
|
||||
),
|
||||
(
|
||||
dump(service_instance.dict_attr)["value"],
|
||||
"['foo']",
|
||||
False,
|
||||
{
|
||||
"full_access_path": '["foo"]',
|
||||
"value": 1.0,
|
||||
"type": "float",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance.dict_attr)["value"],
|
||||
"['unset_key']",
|
||||
True,
|
||||
{
|
||||
# we do not know the full_access_path of this entry within the
|
||||
# serialized object
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance.dict_attr)["value"],
|
||||
"['unset_key']",
|
||||
False,
|
||||
SerializationPathError,
|
||||
),
|
||||
(
|
||||
dump(service_instance)["value"],
|
||||
"invalid_path",
|
||||
True,
|
||||
{
|
||||
# we do not know the full_access_path of this entry within the
|
||||
# serialized object
|
||||
"full_access_path": "",
|
||||
"value": None,
|
||||
"type": "None",
|
||||
"doc": None,
|
||||
"readonly": False,
|
||||
},
|
||||
),
|
||||
(
|
||||
dump(service_instance)["value"],
|
||||
"invalid_path",
|
||||
False,
|
||||
SerializationPathError,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_container_item_by_key(
|
||||
serialized_object: dict[str, Any], attr_name: str, allow_append: bool, expected: Any
|
||||
) -> None:
|
||||
if isinstance(expected, type) and issubclass(expected, Exception):
|
||||
with pytest.raises(expected):
|
||||
get_container_item_by_key(
|
||||
serialized_object, attr_name, allow_append=allow_append
|
||||
)
|
||||
else:
|
||||
nested_dict = get_container_item_by_key(
|
||||
serialized_object, attr_name, allow_append=allow_append
|
||||
)
|
||||
assert nested_dict == expected
|
||||
|
||||
|
||||
def test_update_attribute(setup_dict: dict[str, Any]) -> None:
|
||||
set_nested_value_by_path(setup_dict, "attr1", 15)
|
||||
assert setup_dict["attr1"]["value"] == 15
|
||||
@ -565,8 +690,8 @@ def test_update_invalid_list_index(
|
||||
) -> None:
|
||||
set_nested_value_by_path(setup_dict, "attr_list[10]", 30)
|
||||
assert (
|
||||
"Error occured trying to change 'attr_list[10]': list index "
|
||||
"out of range" in caplog.text
|
||||
"Error occured trying to change 'attr_list[10]': Index '10': list index out of "
|
||||
"range" in caplog.text
|
||||
)
|
||||
|
||||
|
||||
@ -580,26 +705,6 @@ def test_update_class_attribute_inside_list(setup_dict: dict[str, Any]) -> None:
|
||||
assert setup_dict["attr_list"]["value"][2]["value"]["attr3"]["value"] == 50 # noqa
|
||||
|
||||
|
||||
def test_get_next_level_attribute_nested_dict(setup_dict: dict[str, Any]) -> None:
|
||||
nested_dict = get_next_level_dict_by_key(setup_dict, "attr1")
|
||||
assert nested_dict == setup_dict["attr1"]
|
||||
|
||||
|
||||
def test_get_next_level_list_entry_nested_dict(setup_dict: dict[str, Any]) -> None:
|
||||
nested_dict = get_next_level_dict_by_key(setup_dict, "attr_list[0]")
|
||||
assert nested_dict == setup_dict["attr_list"]["value"][0]
|
||||
|
||||
|
||||
def test_get_next_level_invalid_path_nested_dict(setup_dict: dict[str, Any]) -> None:
|
||||
with pytest.raises(SerializationPathError):
|
||||
get_next_level_dict_by_key(setup_dict, "invalid_path")
|
||||
|
||||
|
||||
def test_get_next_level_invalid_list_index(setup_dict: dict[str, Any]) -> None:
|
||||
with pytest.raises(SerializationPathError):
|
||||
get_next_level_dict_by_key(setup_dict, "attr_list[10]")
|
||||
|
||||
|
||||
def test_get_attribute(setup_dict: dict[str, Any]) -> None:
|
||||
nested_dict = get_nested_dict_by_path(setup_dict, "attr1")
|
||||
assert nested_dict["value"] == 1.0
|
||||
@ -871,3 +976,89 @@ def test_dynamically_add_attributes(test_input: Any, expected: dict[str, Any]) -
|
||||
|
||||
set_nested_value_by_path(serialized_object, "new_attr", test_input)
|
||||
assert serialized_object == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"obj, expected",
|
||||
[
|
||||
(
|
||||
service_instance.attr2,
|
||||
[
|
||||
"attr3",
|
||||
"list_attr",
|
||||
"list_attr[0]",
|
||||
"list_attr[1]",
|
||||
"some_quantity",
|
||||
],
|
||||
),
|
||||
(
|
||||
service_instance.dict_attr,
|
||||
[
|
||||
'["foo"]',
|
||||
'["bar"]',
|
||||
'["bar"]["foo"]',
|
||||
],
|
||||
),
|
||||
(
|
||||
service_instance.attr_list,
|
||||
[
|
||||
"[0]",
|
||||
"[1]",
|
||||
"[2]",
|
||||
"[2].attr3",
|
||||
"[2].list_attr",
|
||||
"[2].list_attr[0]",
|
||||
"[2].list_attr[1]",
|
||||
"[2].some_quantity",
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_data_paths_from_serialized_object(obj: Any, expected: list[str]) -> None:
|
||||
assert get_data_paths_from_serialized_object(dump(obj=obj)) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"obj, expected",
|
||||
[
|
||||
(
|
||||
service_instance,
|
||||
[
|
||||
"attr1",
|
||||
"attr2",
|
||||
"attr2.attr3",
|
||||
"attr2.list_attr",
|
||||
"attr2.list_attr[0]",
|
||||
"attr2.list_attr[1]",
|
||||
"attr2.some_quantity",
|
||||
"attr_list",
|
||||
"attr_list[0]",
|
||||
"attr_list[1]",
|
||||
"attr_list[2]",
|
||||
"attr_list[2].attr3",
|
||||
"attr_list[2].list_attr",
|
||||
"attr_list[2].list_attr[0]",
|
||||
"attr_list[2].list_attr[1]",
|
||||
"attr_list[2].some_quantity",
|
||||
"dict_attr",
|
||||
'dict_attr["foo"]',
|
||||
'dict_attr["bar"]',
|
||||
'dict_attr["bar"]["foo"]',
|
||||
"enum_attr",
|
||||
"my_task",
|
||||
],
|
||||
),
|
||||
(
|
||||
service_instance.attr2,
|
||||
[
|
||||
"attr3",
|
||||
"list_attr",
|
||||
"list_attr[0]",
|
||||
"list_attr[1]",
|
||||
"some_quantity",
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_generate_serialized_data_paths(obj: Any, expected: list[str]) -> None:
|
||||
assert generate_serialized_data_paths(dump(obj=obj)["value"]) == expected
|
||||
|
@ -1,10 +1,113 @@
|
||||
from typing import Any
|
||||
|
||||
import pydase
|
||||
import pytest
|
||||
from pydase.utils.helpers import (
|
||||
get_object_by_path_parts,
|
||||
get_path_from_path_parts,
|
||||
is_property_attribute,
|
||||
parse_full_access_path,
|
||||
parse_serialized_key,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"serialized_key, expected",
|
||||
[
|
||||
("attr_name", "attr_name"),
|
||||
("[0]", 0),
|
||||
("[0.0]", 0.0),
|
||||
('["some_key"]', "some_key"),
|
||||
('["12.34"]', "12.34"),
|
||||
],
|
||||
)
|
||||
def test_parse_serialized_key(serialized_key: str, expected: str) -> None:
|
||||
assert parse_serialized_key(serialized_key) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"full_access_path, expected",
|
||||
[
|
||||
("attr_name", ["attr_name"]),
|
||||
("parent.attr_name", ["parent", "attr_name"]),
|
||||
("nested.parent.attr_name", ["nested", "parent", "attr_name"]),
|
||||
("nested.parent.attr_name", ["nested", "parent", "attr_name"]),
|
||||
("attr_name[0]", ["attr_name", "[0]"]),
|
||||
("parent.attr_name[0]", ["parent", "attr_name", "[0]"]),
|
||||
("attr_name[0][1]", ["attr_name", "[0]", "[1]"]),
|
||||
('attr_name[0]["some_key"]', ["attr_name", "[0]", '["some_key"]']),
|
||||
(
|
||||
'dict_attr["some_key"].attr_name["other_key"]',
|
||||
["dict_attr", '["some_key"]', "attr_name", '["other_key"]'],
|
||||
),
|
||||
("dict_attr[2.1]", ["dict_attr", "[2.1]"]),
|
||||
],
|
||||
)
|
||||
def test_parse_full_access_path(full_access_path: str, expected: list[str]) -> None:
|
||||
assert parse_full_access_path(full_access_path) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path_parts, expected",
|
||||
[
|
||||
(["attr_name"], "attr_name"),
|
||||
(["parent", "attr_name"], "parent.attr_name"),
|
||||
(["nested", "parent", "attr_name"], "nested.parent.attr_name"),
|
||||
(["nested", "parent", "attr_name"], "nested.parent.attr_name"),
|
||||
(["attr_name", "[0]"], "attr_name[0]"),
|
||||
(["parent", "attr_name", "[0]"], "parent.attr_name[0]"),
|
||||
(["attr_name", "[0]", "[1]"], "attr_name[0][1]"),
|
||||
(["attr_name", "[0]", '["some_key"]'], 'attr_name[0]["some_key"]'),
|
||||
(
|
||||
["dict_attr", '["some_key"]', "attr_name", '["other_key"]'],
|
||||
'dict_attr["some_key"].attr_name["other_key"]',
|
||||
),
|
||||
(["dict_attr", "[2.1]"], "dict_attr[2.1]"),
|
||||
],
|
||||
)
|
||||
def test_get_path_from_path_parts(path_parts: list[str], expected: str) -> None:
|
||||
assert get_path_from_path_parts(path_parts) == expected
|
||||
|
||||
|
||||
class SubService(pydase.DataService):
|
||||
name = "SubService"
|
||||
some_int = 1
|
||||
some_float = 1.0
|
||||
|
||||
|
||||
class MyService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.some_float = 1.0
|
||||
self.subservice = SubService()
|
||||
self.list_attr = [1.0, SubService()]
|
||||
self.dict_attr = {"foo": SubService(), "dotted.key": "float_as_key"}
|
||||
|
||||
|
||||
service_instance = MyService()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path_parts, expected",
|
||||
[
|
||||
(["some_float"], service_instance.some_float),
|
||||
(["subservice"], service_instance.subservice),
|
||||
(["list_attr", "[0]"], service_instance.list_attr[0]),
|
||||
(["list_attr", "[1]"], service_instance.list_attr[1]),
|
||||
(["dict_attr", '["foo"]'], service_instance.dict_attr["foo"]),
|
||||
(["dict_attr", '["foo"]', "name"], service_instance.dict_attr["foo"].name), # type: ignore
|
||||
(["dict_attr", '["dotted.key"]'], service_instance.dict_attr["dotted.key"]),
|
||||
],
|
||||
)
|
||||
def test_get_object_by_path_parts(path_parts: list[str], expected: Any) -> None:
|
||||
assert get_object_by_path_parts(service_instance, path_parts) == expected
|
||||
|
||||
|
||||
def test_get_object_by_path_parts_error(caplog: pytest.LogCaptureFixture) -> None:
|
||||
assert get_object_by_path_parts(service_instance, ["non_existent_attr"]) is None
|
||||
assert "Attribute 'non_existent_attr' does not exist in the object." in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"attr_name, expected",
|
||||
[
|
||||
@ -12,13 +115,29 @@ from pydase.utils.helpers import (
|
||||
("my_property", True),
|
||||
("my_method", False),
|
||||
("non_existent_attr", False),
|
||||
("nested_class_instance", False),
|
||||
("nested_class_instance.my_property", True),
|
||||
("list_attr", False),
|
||||
("list_attr[0]", False),
|
||||
("list_attr[0].my_property", True),
|
||||
("dict_attr", False),
|
||||
("dict_attr['foo']", False),
|
||||
("dict_attr['foo'].my_property", True),
|
||||
],
|
||||
)
|
||||
def test_is_property_attribute(attr_name: str, expected: bool) -> None:
|
||||
class NestedClass:
|
||||
@property
|
||||
def my_property(self) -> str:
|
||||
return "I'm a nested property"
|
||||
|
||||
# Test Suite
|
||||
class DummyClass:
|
||||
def __init__(self) -> None:
|
||||
self.regular_attribute = "I'm just an attribute"
|
||||
self.nested_class_instance = NestedClass()
|
||||
self.list_attr = [NestedClass()]
|
||||
self.dict_attr = {"foo": NestedClass()}
|
||||
|
||||
@property
|
||||
def my_property(self) -> str:
|
||||
|
Loading…
x
Reference in New Issue
Block a user