1 Commits

Author SHA1 Message Date
gac bernina 32f33f9f9c test for better lazy init 2026-05-26 21:27:48 +02:00
2 changed files with 306 additions and 157 deletions
+275 -156
View File
@@ -19,7 +19,8 @@ from importlib import import_module
from lazy_object_proxy import Proxy as Proxy_orig from lazy_object_proxy import Proxy as Proxy_orig
from tabulate import tabulate from tabulate import tabulate
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread from dataclasses import dataclass, field
from threading import Event, Thread, local
from tqdm import tqdm from tqdm import tqdm
from rich import progress from rich import progress
from inspect import signature from inspect import signature
@@ -113,7 +114,7 @@ def format_manual_instantiation(
] ]
call_text = f"{obj_name}({', '.join(call_parts)})" call_text = f"{obj_name}({', '.join(call_parts)})"
return f"For manual instantiation copy/paste: {import_stmt}; {call_text}" return f"Manual instantiation attempt: {import_stmt}; {call_text}"
def append_manual_context(exc, manual_context): def append_manual_context(exc, manual_context):
@@ -176,7 +177,7 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
ind = [ta.name == tca["name"] for tca in config_all].index(True) ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp.append( outp.append(
initFromConfigList( initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy config_all[ind : ind + 1], config_all, lazy=lazy
) )
) )
elif isinstance(ta, dict) or isinstance(ta, list): elif isinstance(ta, dict) or isinstance(ta, list):
@@ -190,9 +191,9 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
if ta.name in dict_all.keys(): if ta.name in dict_all.keys():
outp[tk] = dict_all[ta.name] outp[tk] = dict_all[ta.name]
else: else:
ind = [tk.name == tca["name"] for tca in config_all].index(True) ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp[tk] = initFromConfigList( outp[tk] = initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy config_all[ind : ind + 1], config_all, lazy=lazy
) )
elif isinstance(ta, dict) or isinstance(ta, list): elif isinstance(ta, dict) or isinstance(ta, list):
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy) outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
@@ -356,6 +357,21 @@ class IsInitialisingError(Exception):
pass pass
@dataclass
class LazyNode:
name: str
factory: object
args: tuple
kwargs: dict
module_name: str = None
dependencies: set = field(default_factory=set)
state: str = "pending"
result: object = None
exception: Exception = None
event: Event = field(default_factory=Event)
start_time: float = 0.0
class Namespace(Assembly): class Namespace(Assembly):
def __init__( def __init__(
self, self,
@@ -368,6 +384,7 @@ class Namespace(Assembly):
super().__init__(name) super().__init__(name)
# self.name = name # self.name = name
self.lazy_items = {} self.lazy_items = {}
self.lazy_nodes = {}
self.initialized_items = {} self.initialized_items = {}
self.failed_items = {} self.failed_items = {}
self.failed_items_excpetion = {} self.failed_items_excpetion = {}
@@ -378,6 +395,7 @@ class Namespace(Assembly):
self.names_without_alias = [] self.names_without_alias = []
self._initializing = [] self._initializing = []
self._thread_local = local()
self.root_module = root_module self.root_module = root_module
self.alias_namespace = alias_namespace self.alias_namespace = alias_namespace
if required_names_directory: if required_names_directory:
@@ -430,6 +448,10 @@ class Namespace(Assembly):
names = self.failed_names names = self.failed_names
for name in names: for name in names:
self.lazy_items[name] = self.failed_items.pop(name) self.lazy_items[name] = self.failed_items.pop(name)
self.lazy_nodes[name].state = "pending"
self.lazy_nodes[name].result = None
self.lazy_nodes[name].exception = None
self.lazy_nodes[name].event.clear()
try: try:
self.failed_items_excpetion.pop(name) self.failed_items_excpetion.pop(name)
except KeyError: except KeyError:
@@ -821,6 +843,202 @@ class Namespace(Assembly):
aliases_out = aliases aliases_out = aliases
return aliases, has_no_aliases return aliases, has_no_aliases
def _collect_dependencies_from_value(self, value):
if isinstance(value, NamespaceComponent):
return {value.obj_name}
if isinstance(value, Component):
return {value.name}
if isinstance(value, list):
deps = set()
for item in value:
deps |= self._collect_dependencies_from_value(item)
return deps
if isinstance(value, tuple):
deps = set()
for item in value:
deps |= self._collect_dependencies_from_value(item)
return deps
if isinstance(value, dict):
deps = set()
for item in value.values():
deps |= self._collect_dependencies_from_value(item)
return deps
return set()
def _collect_dependencies(self, *args, **kwargs):
dependencies = set()
for value in args:
dependencies |= self._collect_dependencies_from_value(value)
for value in kwargs.values():
dependencies |= self._collect_dependencies_from_value(value)
return dependencies
def _resolve_factory_inputs(self, *args, lazy=False, **kwargs):
resolved_args = []
for value in args:
resolved_args.append(self._resolve_factory_input(value, lazy=lazy))
resolved_kwargs = {}
for key, value in kwargs.items():
resolved_kwargs[key] = self._resolve_factory_input(value, lazy=lazy)
return resolved_args, resolved_kwargs
def _resolve_factory_input(self, value, lazy=False):
if isinstance(value, NamespaceComponent):
if lazy:
return Proxy(lambda value=value: value.get())
return self._materialize_dependency(value.obj_name)
if isinstance(value, Component):
if lazy:
return Proxy(lambda name=value.name: self._get_ready_object(name))
return self._materialize_dependency(value.name)
if isinstance(value, list):
return [self._resolve_factory_input(item, lazy=lazy) for item in value]
if isinstance(value, tuple):
return tuple(self._resolve_factory_input(item, lazy=lazy) for item in value)
if isinstance(value, dict):
return {
key: self._resolve_factory_input(item, lazy=lazy)
for key, item in value.items()
}
return value
def _get_ready_object(self, name):
if name in self.initialized_names:
return self.initialized_items[name]
if name in self.lazy_names:
return self._ensure_node(name)
raise Exception(f"Name {name} is not initialized!")
def _materialize_dependency(self, name):
if name in self.initialized_names:
return self.initialized_items[name]
if name not in self.lazy_names:
raise Exception(f"Name {name} is not initialized!")
proxy = self.lazy_items.get(name)
obj = self._ensure_node(name)
if proxy is not None:
self.lazy_items[name] = proxy
return obj
def _ensure_node(self, name):
node = self.lazy_nodes[name]
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
if node.state == "pending":
node.state = "running"
node.start_time = time()
node.event.clear()
worker = Thread(target=self._run_node, args=(name,), daemon=True)
worker.start()
node.event.wait()
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
raise RuntimeError(f"Lazy node {name} did not complete cleanly")
def _run_node(self, name):
node = self.lazy_nodes[name]
stack = getattr(self._thread_local, "stack", [])
try:
if name in stack:
raise RuntimeError(f"Cyclic lazy dependency detected for {name}")
stack.append(name)
self._thread_local.stack = stack
for dep_name in node.dependencies:
self._ensure_node(dep_name)
obj_maker = node.factory
if node.module_name:
obj_maker = getattr(import_module(node.module_name), node.factory)
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*node.args, lazy=True, **node.kwargs
)
accepts_name = "name" in signature(obj_maker).parameters
manual_context = format_manual_instantiation(
obj_maker,
args_resolved,
kwargs_resolved,
name=name,
accepts_name=accepts_name,
module_name=node.module_name,
)
try:
if accepts_name:
obj_initialized = obj_maker(
*args_resolved,
name=name,
**kwargs_resolved,
)
else:
obj_initialized = obj_maker(*args_resolved, **kwargs_resolved)
except Exception as e:
append_manual_context(e, manual_context)
self.failed_items[name] = self.lazy_items.pop(name)
self.failed_items_excpetion[name] = e
node.state = "failed"
node.exception = e
node.result = None
return
self.initialized_items[name] = obj_initialized
self.initialisation_times_lazy[name] = time() - node.start_time
try:
self.lazy_items.pop(name)
except KeyError:
pass
node.state = "done"
node.result = obj_initialized
node.exception = None
if hasattr(obj_initialized, "alias"):
self._append(
obj_initialized,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj_initialized, "alias"):
for ta in obj_initialized.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
else:
self.names_without_alias.append(name)
return obj_initialized
except Exception as e:
node.state = "failed"
node.exception = e
node.result = None
return
finally:
node.event.set()
stack.pop()
self._thread_local.stack = stack
def append_obj( def append_obj(
self, self,
obj_factory, obj_factory,
@@ -831,152 +1049,63 @@ class Namespace(Assembly):
init_timeout=30, init_timeout=30,
**kwargs, **kwargs,
): ):
self.failed_items.pop(name, None)
self.failed_items_excpetion.pop(name, None)
self.initialized_items.pop(name, None)
self.lazy_items.pop(name, None)
self.lazy_nodes.pop(name, None)
if lazy: if lazy:
self.lazy_nodes[name] = LazyNode(
def init_local(): name=name,
factory=obj_factory,
if name in self.failed_names: args=args,
tmpexc = self.failed_items_excpetion kwargs=kwargs,
if isinstance(tmpexc[name], BaseException): module_name=module_name,
raise tmpexc[name] dependencies=self._collect_dependencies(*args, **kwargs),
else: )
raise IsInitialisingError( obj_lazy = Proxy(partial(self._ensure_node, name))
f"{name} failed previously to initialize."
)
if name in self._initializing:
self._init_priority[name] += 1
while name in self._initializing:
if (
time() - self._initialisation_start_time[name]
) <= init_timeout:
sleep(5)
else:
# print(f'{name} waiting init since {time()-self._initialisation_start_time[name]} s')
# sleep(5)
# # passfailed_items_excpetion
self._initializing.pop(self._initializing.index(name))
raise IsInitialisingError(
f"NB: {name} initialization timed out!"
)
else:
self._initializing.append(name)
self._init_priority[name] = 0
self._initialisation_start_time[name] = time()
# args, kwargs = replace_NamespaceComponents(*args, **kwargs)
if module_name:
obj_maker = getattr(import_module(module_name), obj_factory)
else:
obj_maker = obj_factory
args_resolved, kwargs_resolved = replace_NamespaceComponents(
*args, **kwargs
)
accepts_name = "name" in signature(obj_maker).parameters
manual_context = format_manual_instantiation(
obj_maker,
args_resolved,
kwargs_resolved,
name=name,
accepts_name=accepts_name,
module_name=module_name,
)
try:
if accepts_name:
obj_initialized = obj_maker(
*args_resolved,
name=name,
**kwargs_resolved,
)
else:
obj_initialized = obj_maker(
*args_resolved,
**kwargs_resolved,
)
except Exception as e:
append_manual_context(e, manual_context)
self.failed_items[name] = self.lazy_items.pop(name)
self.failed_items_excpetion[name] = e
self._initializing.pop(self._initializing.index(name))
raise
try:
self.initialized_items[name] = self.lazy_items.pop(name)
except KeyError:
self.initialized_items[name] = self.failed_items.pop(name)
self._initializing.pop(self._initializing.index(name))
# if name in self.initialisation_times_lazy.keys():
# self.initialisation_times_lazy[name] += time() - starttime
# else:
self.initialisation_times_lazy[name] = (
time() - self._initialisation_start_time[name]
)
if hasattr(obj_initialized, "alias"):
self._append(
obj_initialized,
name=name,
is_setting=True,
is_display="recursive",
call_obj=False,
)
if self.alias_namespace and hasattr(obj_initialized, "alias"):
for ta in obj_initialized.alias.get_all():
try:
self.alias_namespace.update(
ta["alias"], ta["channel"], ta["channeltype"]
)
except Exception as e:
print(f'could not init alias {ta["alias"]}')
print("error message", e)
# traceback.print_tb(e)
else:
self.names_without_alias.append(name)
return obj_initialized
obj_lazy = Proxy(init_local)
self.lazy_items[name] = obj_lazy self.lazy_items[name] = obj_lazy
if self.root_module: if self.root_module:
sys.modules[self.root_module].__dict__[name] = obj_lazy sys.modules[self.root_module].__dict__[name] = obj_lazy
return obj_lazy return obj_lazy
starttime = time()
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*args, lazy=False, **kwargs
)
if module_name:
obj_maker = getattr(import_module(module_name), obj_factory)
else: else:
starttime = time() obj_maker = obj_factory
args, kwargs = replace_NamespaceComponents(*args, **kwargs) try:
if module_name: obj = obj_maker(*args_resolved, name=name, **kwargs_resolved)
obj_maker = getattr(import_module(module_name), obj_factory) except TypeError:
else: obj = obj_maker(*args_resolved, **kwargs_resolved)
obj_maker = obj_factory self.initialized_items[name] = obj
try: self.initialisation_times_lazy[name] = time() - starttime
obj = obj_maker(*args, name=name, **kwargs) if self.root_module:
except TypeError: sys.modules[self.root_module].__dict__[name] = obj
obj = obj_maker(*args, **kwargs) if hasattr(obj, "alias"):
self.initialized_items[name] = obj self._append(
self.initialisation_times_lazy[name] = time() - starttime obj,
if self.root_module: name=name,
sys.modules[self.root_module].__dict__[name] = obj is_setting=True,
if hasattr(obj, "alias"): is_display="recursive",
self._append( call_obj=False,
obj, )
name=name, if self.alias_namespace and hasattr(obj, "alias"):
is_setting=True, for ta in obj.alias.get_all():
is_display="recursive", try:
call_obj=False, self.alias_namespace.update(
) ta["alias"], ta["channel"], ta["channeltype"]
if self.alias_namespace and hasattr(obj, "alias"): )
for ta in obj.alias.get_all(): except Exception as e:
try: print(f'could not init alias {ta["alias"]}')
self.alias_namespace.update( print("error message", e)
ta["alias"], ta["channel"], ta["channeltype"] else:
) self.names_without_alias.append(name)
except Exception as e: return obj
print(f'could not init alias {ta["alias"]}')
print("error message", e)
else:
self.names_without_alias.append(name)
return obj
def get_obj(self, name): def get_obj(self, name):
if name in self.lazy_names: if name in self.lazy_names:
@@ -988,18 +1117,8 @@ class Namespace(Assembly):
def append_obj_from_config(self, cnf, lazy=False): def append_obj_from_config(self, cnf, lazy=False):
module_name, obj_factory = cnf["type"].split(":") module_name, obj_factory = cnf["type"].split(":")
args = [] args = list(cnf["args"])
for targ in cnf["args"]: kwargs = dict(cnf["kwargs"])
if isinstance(targ, Component):
args.append(self.get_obj(targ.name))
else:
args.append(targ)
kwargs = {}
for tk, tv in cnf["kwargs"].items():
if isinstance(tv, Component):
kwargs[tk] = self.get_obj(tv.name)
else:
kwargs[tk] = tv
if "lazy" in cnf.keys(): if "lazy" in cnf.keys():
lazy = cnf["lazy"] lazy = cnf["lazy"]
+31 -1
View File
@@ -1,6 +1,6 @@
import pytest import pytest
from eco.utilities.config import Namespace from eco.utilities.config import Component, Namespace
class BadThing: class BadThing:
@@ -8,6 +8,16 @@ class BadThing:
raise ValueError("boom") raise ValueError("boom")
class DependencyThing:
def __init__(self, name=None):
self.name = name
class NeedsDependency:
def __init__(self, dependency, name=None):
self.dependency = dependency
def test_lazy_init_failure_includes_manual_instantiation_string(): def test_lazy_init_failure_includes_manual_instantiation_string():
ns = Namespace(name="test") ns = Namespace(name="test")
ns.append_obj(BadThing, 1, lazy=True, name="bad") ns.append_obj(BadThing, 1, lazy=True, name="bad")
@@ -26,3 +36,23 @@ def test_lazy_init_failure_includes_manual_instantiation_string():
"BadThing(1, name='bad')" "BadThing(1, name='bad')"
) )
assert ns.failed_items_excpetion["bad"].args[-1] == manual assert ns.failed_items_excpetion["bad"].args[-1] == manual
def test_append_obj_from_config_resolves_component_dependencies_eagerly_for_non_lazy_nodes():
ns = Namespace(name="test")
ns.append_obj(DependencyThing, lazy=True, name="dependency")
ns.append_obj_from_config(
{
"type": f"{__name__}:NeedsDependency",
"name": "needs_dependency",
"args": [Component("dependency")],
"kwargs": {},
"lazy": False,
}
)
root = ns.initialized_items["needs_dependency"]
assert isinstance(root.dependency, DependencyThing)
assert root.dependency is not ns.get_obj("dependency")
assert root.dependency.name == "dependency"