1 Commits

Author SHA1 Message Date
gac bernina 32f33f9f9c test for better lazy init 2026-05-26 21:27:48 +02:00
2 changed files with 306 additions and 157 deletions
+275 -156
View File
@@ -19,7 +19,8 @@ from importlib import import_module
from lazy_object_proxy import Proxy as Proxy_orig
from tabulate import tabulate
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread
from dataclasses import dataclass, field
from threading import Event, Thread, local
from tqdm import tqdm
from rich import progress
from inspect import signature
@@ -113,7 +114,7 @@ def format_manual_instantiation(
]
call_text = f"{obj_name}({', '.join(call_parts)})"
return f"For manual instantiation copy/paste: {import_stmt}; {call_text}"
return f"Manual instantiation attempt: {import_stmt}; {call_text}"
def append_manual_context(exc, manual_context):
@@ -176,7 +177,7 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp.append(
initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy
config_all[ind : ind + 1], config_all, lazy=lazy
)
)
elif isinstance(ta, dict) or isinstance(ta, list):
@@ -190,9 +191,9 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
if ta.name in dict_all.keys():
outp[tk] = dict_all[ta.name]
else:
ind = [tk.name == tca["name"] for tca in config_all].index(True)
ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp[tk] = initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy
config_all[ind : ind + 1], config_all, lazy=lazy
)
elif isinstance(ta, dict) or isinstance(ta, list):
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
@@ -356,6 +357,21 @@ class IsInitialisingError(Exception):
pass
@dataclass
class LazyNode:
name: str
factory: object
args: tuple
kwargs: dict
module_name: str = None
dependencies: set = field(default_factory=set)
state: str = "pending"
result: object = None
exception: Exception = None
event: Event = field(default_factory=Event)
start_time: float = 0.0
class Namespace(Assembly):
def __init__(
self,
@@ -368,6 +384,7 @@ class Namespace(Assembly):
super().__init__(name)
# self.name = name
self.lazy_items = {}
self.lazy_nodes = {}
self.initialized_items = {}
self.failed_items = {}
self.failed_items_excpetion = {}
@@ -378,6 +395,7 @@ class Namespace(Assembly):
self.names_without_alias = []
self._initializing = []
self._thread_local = local()
self.root_module = root_module
self.alias_namespace = alias_namespace
if required_names_directory:
@@ -430,6 +448,10 @@ class Namespace(Assembly):
names = self.failed_names
for name in names:
self.lazy_items[name] = self.failed_items.pop(name)
self.lazy_nodes[name].state = "pending"
self.lazy_nodes[name].result = None
self.lazy_nodes[name].exception = None
self.lazy_nodes[name].event.clear()
try:
self.failed_items_excpetion.pop(name)
except KeyError:
@@ -821,6 +843,202 @@ class Namespace(Assembly):
aliases_out = aliases
return aliases, has_no_aliases
def _collect_dependencies_from_value(self, value):
if isinstance(value, NamespaceComponent):
return {value.obj_name}
if isinstance(value, Component):
return {value.name}
if isinstance(value, list):
deps = set()
for item in value:
deps |= self._collect_dependencies_from_value(item)
return deps
if isinstance(value, tuple):
deps = set()
for item in value:
deps |= self._collect_dependencies_from_value(item)
return deps
if isinstance(value, dict):
deps = set()
for item in value.values():
deps |= self._collect_dependencies_from_value(item)
return deps
return set()
def _collect_dependencies(self, *args, **kwargs):
dependencies = set()
for value in args:
dependencies |= self._collect_dependencies_from_value(value)
for value in kwargs.values():
dependencies |= self._collect_dependencies_from_value(value)
return dependencies
def _resolve_factory_inputs(self, *args, lazy=False, **kwargs):
resolved_args = []
for value in args:
resolved_args.append(self._resolve_factory_input(value, lazy=lazy))
resolved_kwargs = {}
for key, value in kwargs.items():
resolved_kwargs[key] = self._resolve_factory_input(value, lazy=lazy)
return resolved_args, resolved_kwargs
def _resolve_factory_input(self, value, lazy=False):
if isinstance(value, NamespaceComponent):
if lazy:
return Proxy(lambda value=value: value.get())
return self._materialize_dependency(value.obj_name)
if isinstance(value, Component):
if lazy:
return Proxy(lambda name=value.name: self._get_ready_object(name))
return self._materialize_dependency(value.name)
if isinstance(value, list):
return [self._resolve_factory_input(item, lazy=lazy) for item in value]
if isinstance(value, tuple):
return tuple(self._resolve_factory_input(item, lazy=lazy) for item in value)
if isinstance(value, dict):
return {
key: self._resolve_factory_input(item, lazy=lazy)
for key, item in value.items()
}
return value
def _get_ready_object(self, name):
if name in self.initialized_names:
return self.initialized_items[name]
if name in self.lazy_names:
return self._ensure_node(name)
raise Exception(f"Name {name} is not initialized!")
def _materialize_dependency(self, name):
if name in self.initialized_names:
return self.initialized_items[name]
if name not in self.lazy_names:
raise Exception(f"Name {name} is not initialized!")
proxy = self.lazy_items.get(name)
obj = self._ensure_node(name)
if proxy is not None:
self.lazy_items[name] = proxy
return obj
def _ensure_node(self, name):
node = self.lazy_nodes[name]
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
if node.state == "pending":
node.state = "running"
node.start_time = time()
node.event.clear()
worker = Thread(target=self._run_node, args=(name,), daemon=True)
worker.start()
node.event.wait()
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
raise RuntimeError(f"Lazy node {name} did not complete cleanly")
def _run_node(self, name):
node = self.lazy_nodes[name]
stack = getattr(self._thread_local, "stack", [])
try:
if name in stack:
raise RuntimeError(f"Cyclic lazy dependency detected for {name}")
stack.append(name)
self._thread_local.stack = stack
for dep_name in node.dependencies:
self._ensure_node(dep_name)
obj_maker = node.factory
if node.module_name:
obj_maker = getattr(import_module(node.module_name), node.factory)
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*node.args, lazy=True, **node.kwargs
)
accepts_name = "name" in signature(obj_maker).parameters
manual_context = format_manual_instantiation(
obj_maker,
args_resolved,
kwargs_resolved,
name=name,
accepts_name=accepts_name,
module_name=node.module_name,
)
try:
if accepts_name:
obj_initialized = obj_maker(
*args_resolved,
name=name,
**kwargs_resolved,
)
else:
obj_initialized = obj_maker(*args_resolved, **kwargs_resolved)
except Exception as e:
append_manual_context(e, manual_context)
self.failed_items[name] = self.lazy_items.pop(name)
self.failed_items_excpetion[name] = e
node.state = "failed"
node.exception = e
node.result = None
return
self.initialized_items[name] = obj_initialized
self.initialisation_times_lazy[name] = time() - node.start_time
try:
self.lazy_items.pop(name)
except KeyError:
pass
node.state = "done"
node.result = obj_initialized
node.exception = None
if hasattr(obj_initialized, "alias"):
self._append(
obj_initialized,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj_initialized, "alias"):
for ta in obj_initialized.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
else:
self.names_without_alias.append(name)
return obj_initialized
except Exception as e:
node.state = "failed"
node.exception = e
node.result = None
return
finally:
node.event.set()
stack.pop()
self._thread_local.stack = stack
def append_obj(
self,
obj_factory,
@@ -831,152 +1049,63 @@ class Namespace(Assembly):
init_timeout=30,
**kwargs,
):
self.failed_items.pop(name, None)
self.failed_items_excpetion.pop(name, None)
self.initialized_items.pop(name, None)
self.lazy_items.pop(name, None)
self.lazy_nodes.pop(name, None)
if lazy:
def init_local():
if name in self.failed_names:
tmpexc = self.failed_items_excpetion
if isinstance(tmpexc[name], BaseException):
raise tmpexc[name]
else:
raise IsInitialisingError(
f"{name} failed previously to initialize."
)
if name in self._initializing:
self._init_priority[name] += 1
while name in self._initializing:
if (
time() - self._initialisation_start_time[name]
) <= init_timeout:
sleep(5)
else:
# print(f'{name} waiting init since {time()-self._initialisation_start_time[name]} s')
# sleep(5)
# # passfailed_items_excpetion
self._initializing.pop(self._initializing.index(name))
raise IsInitialisingError(
f"NB: {name} initialization timed out!"
)
else:
self._initializing.append(name)
self._init_priority[name] = 0
self._initialisation_start_time[name] = time()
# args, kwargs = replace_NamespaceComponents(*args, **kwargs)
if module_name:
obj_maker = getattr(import_module(module_name), obj_factory)
else:
obj_maker = obj_factory
args_resolved, kwargs_resolved = replace_NamespaceComponents(
*args, **kwargs
)
accepts_name = "name" in signature(obj_maker).parameters
manual_context = format_manual_instantiation(
obj_maker,
args_resolved,
kwargs_resolved,
name=name,
accepts_name=accepts_name,
module_name=module_name,
)
try:
if accepts_name:
obj_initialized = obj_maker(
*args_resolved,
name=name,
**kwargs_resolved,
)
else:
obj_initialized = obj_maker(
*args_resolved,
**kwargs_resolved,
)
except Exception as e:
append_manual_context(e, manual_context)
self.failed_items[name] = self.lazy_items.pop(name)
self.failed_items_excpetion[name] = e
self._initializing.pop(self._initializing.index(name))
raise
try:
self.initialized_items[name] = self.lazy_items.pop(name)
except KeyError:
self.initialized_items[name] = self.failed_items.pop(name)
self._initializing.pop(self._initializing.index(name))
# if name in self.initialisation_times_lazy.keys():
# self.initialisation_times_lazy[name] += time() - starttime
# else:
self.initialisation_times_lazy[name] = (
time() - self._initialisation_start_time[name]
)
if hasattr(obj_initialized, "alias"):
self._append(
obj_initialized,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj_initialized, "alias"):
for ta in obj_initialized.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
# traceback.print_tb(e)
else:
self.names_without_alias.append(name)
return obj_initialized
obj_lazy = Proxy(init_local)
self.lazy_nodes[name] = LazyNode(
name=name,
factory=obj_factory,
args=args,
kwargs=kwargs,
module_name=module_name,
dependencies=self._collect_dependencies(*args, **kwargs),
)
obj_lazy = Proxy(partial(self._ensure_node, name))
self.lazy_items[name] = obj_lazy
if self.root_module:
sys.modules[self.root_module].__dict__[name] = obj_lazy
return obj_lazy
starttime = time()
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*args, lazy=False, **kwargs
)
if module_name:
obj_maker = getattr(import_module(module_name), obj_factory)
else:
starttime = time()
args, kwargs = replace_NamespaceComponents(*args, **kwargs)
if module_name:
obj_maker = getattr(import_module(module_name), obj_factory)
else:
obj_maker = obj_factory
try:
obj = obj_maker(*args, name=name, **kwargs)
except TypeError:
obj = obj_maker(*args, **kwargs)
self.initialized_items[name] = obj
self.initialisation_times_lazy[name] = time() - starttime
if self.root_module:
sys.modules[self.root_module].__dict__[name] = obj
if hasattr(obj, "alias"):
self._append(
obj,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj, "alias"):
for ta in obj.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
else:
self.names_without_alias.append(name)
return obj
obj_maker = obj_factory
try:
obj = obj_maker(*args_resolved, name=name, **kwargs_resolved)
except TypeError:
obj = obj_maker(*args_resolved, **kwargs_resolved)
self.initialized_items[name] = obj
self.initialisation_times_lazy[name] = time() - starttime
if self.root_module:
sys.modules[self.root_module].__dict__[name] = obj
if hasattr(obj, "alias"):
self._append(
obj,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj, "alias"):
for ta in obj.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
else:
self.names_without_alias.append(name)
return obj
def get_obj(self, name):
if name in self.lazy_names:
@@ -988,18 +1117,8 @@ class Namespace(Assembly):
def append_obj_from_config(self, cnf, lazy=False):
module_name, obj_factory = cnf["type"].split(":")
args = []
for targ in cnf["args"]:
if isinstance(targ, Component):
args.append(self.get_obj(targ.name))
else:
args.append(targ)
kwargs = {}
for tk, tv in cnf["kwargs"].items():
if isinstance(tv, Component):
kwargs[tk] = self.get_obj(tv.name)
else:
kwargs[tk] = tv
args = list(cnf["args"])
kwargs = dict(cnf["kwargs"])
if "lazy" in cnf.keys():
lazy = cnf["lazy"]
+31 -1
View File
@@ -1,6 +1,6 @@
import pytest
from eco.utilities.config import Namespace
from eco.utilities.config import Component, Namespace
class BadThing:
@@ -8,6 +8,16 @@ class BadThing:
raise ValueError("boom")
class DependencyThing:
def __init__(self, name=None):
self.name = name
class NeedsDependency:
def __init__(self, dependency, name=None):
self.dependency = dependency
def test_lazy_init_failure_includes_manual_instantiation_string():
ns = Namespace(name="test")
ns.append_obj(BadThing, 1, lazy=True, name="bad")
@@ -26,3 +36,23 @@ def test_lazy_init_failure_includes_manual_instantiation_string():
"BadThing(1, name='bad')"
)
assert ns.failed_items_excpetion["bad"].args[-1] == manual
def test_append_obj_from_config_resolves_component_dependencies_eagerly_for_non_lazy_nodes():
ns = Namespace(name="test")
ns.append_obj(DependencyThing, lazy=True, name="dependency")
ns.append_obj_from_config(
{
"type": f"{__name__}:NeedsDependency",
"name": "needs_dependency",
"args": [Component("dependency")],
"kwargs": {},
"lazy": False,
}
)
root = ns.initialized_items["needs_dependency"]
assert isinstance(root.dependency, DependencyThing)
assert root.dependency is not ns.get_obj("dependency")
assert root.dependency.name == "dependency"