1 Commits

Author SHA1 Message Date
gac bernina 32f33f9f9c test for better lazy init 2026-05-26 21:27:48 +02:00
2 changed files with 306 additions and 157 deletions
+203 -84
View File
@@ -19,7 +19,8 @@ from importlib import import_module
from lazy_object_proxy import Proxy as Proxy_orig from lazy_object_proxy import Proxy as Proxy_orig
from tabulate import tabulate from tabulate import tabulate
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread from dataclasses import dataclass, field
from threading import Event, Thread, local
from tqdm import tqdm from tqdm import tqdm
from rich import progress from rich import progress
from inspect import signature from inspect import signature
@@ -113,7 +114,7 @@ def format_manual_instantiation(
] ]
call_text = f"{obj_name}({', '.join(call_parts)})" call_text = f"{obj_name}({', '.join(call_parts)})"
return f"For manual instantiation copy/paste: {import_stmt}; {call_text}" return f"Manual instantiation attempt: {import_stmt}; {call_text}"
def append_manual_context(exc, manual_context): def append_manual_context(exc, manual_context):
@@ -176,7 +177,7 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
ind = [ta.name == tca["name"] for tca in config_all].index(True) ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp.append( outp.append(
initFromConfigList( initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy config_all[ind : ind + 1], config_all, lazy=lazy
) )
) )
elif isinstance(ta, dict) or isinstance(ta, list): elif isinstance(ta, dict) or isinstance(ta, list):
@@ -190,9 +191,9 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
if ta.name in dict_all.keys(): if ta.name in dict_all.keys():
outp[tk] = dict_all[ta.name] outp[tk] = dict_all[ta.name]
else: else:
ind = [tk.name == tca["name"] for tca in config_all].index(True) ind = [ta.name == tca["name"] for tca in config_all].index(True)
outp[tk] = initFromConfigList( outp[tk] = initFromConfigList(
config_list[ind : ind + 1], config_all, lazy=lazy config_all[ind : ind + 1], config_all, lazy=lazy
) )
elif isinstance(ta, dict) or isinstance(ta, list): elif isinstance(ta, dict) or isinstance(ta, list):
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy) outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
@@ -356,6 +357,21 @@ class IsInitialisingError(Exception):
pass pass
@dataclass
class LazyNode:
    """Bookkeeping record for one lazily initialised namespace item.

    ``Namespace.append_obj`` creates one node per lazy item; ``_ensure_node``
    starts the initialisation and ``_run_node`` fills in the outcome.
    """

    # Name the item is registered under.
    name: str
    # Callable that builds the object, or - when ``module_name`` is set - the
    # attribute name that is looked up on the imported module.
    factory: object
    # Positional arguments as passed to ``append_obj``; they are resolved only
    # when the node is run.
    args: tuple
    # Keyword arguments as passed to ``append_obj``; resolved when the node runs.
    kwargs: dict
    # Module to import ``factory`` from; ``None`` means ``factory`` is used as is.
    module_name: str = None
    # Names of other items referenced by ``args``/``kwargs``.
    dependencies: set = field(default_factory=set)
    # One of "pending", "running", "done" or "failed".
    state: str = "pending"
    # The constructed object once ``state`` is "done", otherwise ``None``.
    result: object = None
    # The exception that made the node fail, otherwise ``None``.
    exception: Exception = None
    # Set when a run of this node has finished (successfully or not).
    event: Event = field(default_factory=Event)
    # ``time()`` value recorded when the run was started.
    start_time: float = 0.0
class Namespace(Assembly): class Namespace(Assembly):
def __init__( def __init__(
self, self,
@@ -368,6 +384,7 @@ class Namespace(Assembly):
super().__init__(name) super().__init__(name)
# self.name = name # self.name = name
self.lazy_items = {} self.lazy_items = {}
self.lazy_nodes = {}
self.initialized_items = {} self.initialized_items = {}
self.failed_items = {} self.failed_items = {}
self.failed_items_excpetion = {} self.failed_items_excpetion = {}
@@ -378,6 +395,7 @@ class Namespace(Assembly):
self.names_without_alias = [] self.names_without_alias = []
self._initializing = [] self._initializing = []
self._thread_local = local()
self.root_module = root_module self.root_module = root_module
self.alias_namespace = alias_namespace self.alias_namespace = alias_namespace
if required_names_directory: if required_names_directory:
@@ -430,6 +448,10 @@ class Namespace(Assembly):
names = self.failed_names names = self.failed_names
for name in names: for name in names:
self.lazy_items[name] = self.failed_items.pop(name) self.lazy_items[name] = self.failed_items.pop(name)
self.lazy_nodes[name].state = "pending"
self.lazy_nodes[name].result = None
self.lazy_nodes[name].exception = None
self.lazy_nodes[name].event.clear()
try: try:
self.failed_items_excpetion.pop(name) self.failed_items_excpetion.pop(name)
except KeyError: except KeyError:
@@ -821,59 +843,134 @@ class Namespace(Assembly):
aliases_out = aliases aliases_out = aliases
return aliases, has_no_aliases return aliases, has_no_aliases
def _collect_dependencies_from_value(self, value):
    """Return the set of item names referenced by ``value``.

    A ``NamespaceComponent`` contributes its ``obj_name`` and a ``Component``
    its ``name``. Lists, tuples and the values of dicts are searched
    recursively; any other value contributes nothing.
    """
    if isinstance(value, NamespaceComponent):
        return {value.obj_name}
    if isinstance(value, Component):
        return {value.name}

    # Pick the children to recurse into; dict keys are never inspected.
    if isinstance(value, (list, tuple)):
        children = value
    elif isinstance(value, dict):
        children = value.values()
    else:
        return set()

    found = set()
    for child in children:
        found.update(self._collect_dependencies_from_value(child))
    return found
def _collect_dependencies(self, *args, **kwargs):
    """Return the names referenced anywhere in the given call arguments.

    Positional arguments are inspected first, then keyword argument values;
    the per-value work is done by ``_collect_dependencies_from_value``.
    """
    names = set()
    for candidate in (*args, *kwargs.values()):
        names.update(self._collect_dependencies_from_value(candidate))
    return names
def _resolve_factory_inputs(self, *args, lazy=False, **kwargs):
    """Resolve every factory argument through ``_resolve_factory_input``.

    Returns:
        A ``(list, dict)`` pair holding the resolved positional and keyword
        arguments, in the order they were given.
    """
    positional = [
        self._resolve_factory_input(item, lazy=lazy) for item in args
    ]
    keyword = {
        key: self._resolve_factory_input(item, lazy=lazy)
        for key, item in kwargs.items()
    }
    return positional, keyword
def _resolve_factory_input(self, value, lazy=False):
    """Replace component references inside ``value`` by objects or proxies.

    With ``lazy=True`` a reference becomes a ``Proxy`` that fetches the
    object on first use; otherwise the referenced item is materialised right
    away. Lists, tuples and dicts are rebuilt with their members resolved the
    same way; every other value is returned unchanged.
    """
    if isinstance(value, NamespaceComponent):
        if not lazy:
            return self._materialize_dependency(value.obj_name)
        # Bind the component as a default so the proxy keeps its own reference.
        return Proxy(lambda value=value: value.get())

    if isinstance(value, Component):
        if not lazy:
            return self._materialize_dependency(value.name)
        return Proxy(lambda name=value.name: self._get_ready_object(name))

    def resolve(item):
        return self._resolve_factory_input(item, lazy=lazy)

    if isinstance(value, list):
        return list(map(resolve, value))
    if isinstance(value, tuple):
        return tuple(map(resolve, value))
    if isinstance(value, dict):
        return {key: resolve(item) for key, item in value.items()}
    return value
def _get_ready_object(self, name):
    """Return the object registered as ``name``, initialising it if needed.

    Raises:
        Exception: if ``name`` is neither an initialised nor a lazy item.
    """
    if name in self.initialized_names:
        return self.initialized_items[name]
    if name not in self.lazy_names:
        raise Exception(f"Name {name} is not initialized!")
    return self._ensure_node(name)
def _materialize_dependency(self, name):
    """Return the real object for ``name``, building it now if it is lazy.

    If a lazy proxy was registered for ``name`` before the build, it is put
    back into ``lazy_items`` afterwards.

    Raises:
        Exception: if ``name`` is neither an initialised nor a lazy item.
    """
    if name in self.initialized_names:
        return self.initialized_items[name]
    if name in self.lazy_names:
        # Must be read before the node runs, see the restore step below.
        saved_proxy = self.lazy_items.get(name)
        instance = self._ensure_node(name)
        if saved_proxy is not None:
            self.lazy_items[name] = saved_proxy
        return instance
    raise Exception(f"Name {name} is not initialized!")
def _ensure_node(self, name):
node = self.lazy_nodes[name]
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
if node.state == "pending":
node.state = "running"
node.start_time = time()
node.event.clear()
worker = Thread(target=self._run_node, args=(name,), daemon=True)
worker.start()
node.event.wait()
if node.state == "done":
return node.result
if node.state == "failed":
raise node.exception
raise RuntimeError(f"Lazy node {name} did not complete cleanly")
def _run_node(self, name):
node = self.lazy_nodes[name]
stack = getattr(self._thread_local, "stack", [])
try:
if name in stack:
raise RuntimeError(f"Cyclic lazy dependency detected for {name}")
stack.append(name)
self._thread_local.stack = stack
for dep_name in node.dependencies:
self._ensure_node(dep_name)
obj_maker = node.factory
if node.module_name:
obj_maker = getattr(import_module(node.module_name), node.factory)
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*node.args, lazy=True, **node.kwargs
) )
accepts_name = "name" in signature(obj_maker).parameters accepts_name = "name" in signature(obj_maker).parameters
manual_context = format_manual_instantiation( manual_context = format_manual_instantiation(
@@ -882,7 +979,7 @@ class Namespace(Assembly):
kwargs_resolved, kwargs_resolved,
name=name, name=name,
accepts_name=accepts_name, accepts_name=accepts_name,
module_name=module_name, module_name=node.module_name,
) )
try: try:
if accepts_name: if accepts_name:
@@ -892,28 +989,26 @@ class Namespace(Assembly):
**kwargs_resolved, **kwargs_resolved,
) )
else: else:
obj_initialized = obj_maker( obj_initialized = obj_maker(*args_resolved, **kwargs_resolved)
*args_resolved,
**kwargs_resolved,
)
except Exception as e: except Exception as e:
append_manual_context(e, manual_context) append_manual_context(e, manual_context)
self.failed_items[name] = self.lazy_items.pop(name) self.failed_items[name] = self.lazy_items.pop(name)
self.failed_items_excpetion[name] = e self.failed_items_excpetion[name] = e
self._initializing.pop(self._initializing.index(name)) node.state = "failed"
raise node.exception = e
node.result = None
return
self.initialized_items[name] = obj_initialized
self.initialisation_times_lazy[name] = time() - node.start_time
try: try:
self.initialized_items[name] = self.lazy_items.pop(name) self.lazy_items.pop(name)
except KeyError: except KeyError:
self.initialized_items[name] = self.failed_items.pop(name) pass
self._initializing.pop(self._initializing.index(name)) node.state = "done"
# if name in self.initialisation_times_lazy.keys(): node.result = obj_initialized
# self.initialisation_times_lazy[name] += time() - starttime node.exception = None
# else:
self.initialisation_times_lazy[name] = (
time() - self._initialisation_start_time[name]
)
if hasattr(obj_initialized, "alias"): if hasattr(obj_initialized, "alias"):
self._append( self._append(
obj_initialized, obj_initialized,
@@ -931,28 +1026,62 @@ class Namespace(Assembly):
except Exception as e: except Exception as e:
print(f'could not init alias {ta["alias"]}') print(f'could not init alias {ta["alias"]}')
print("error message", e) print("error message", e)
# traceback.print_tb(e)
else: else:
self.names_without_alias.append(name) self.names_without_alias.append(name)
return obj_initialized return obj_initialized
except Exception as e:
node.state = "failed"
node.exception = e
node.result = None
return
finally:
node.event.set()
stack.pop()
self._thread_local.stack = stack
obj_lazy = Proxy(init_local) def append_obj(
self,
obj_factory,
*args,
lazy=False,
name=None,
module_name=None,
init_timeout=30,
**kwargs,
):
self.failed_items.pop(name, None)
self.failed_items_excpetion.pop(name, None)
self.initialized_items.pop(name, None)
self.lazy_items.pop(name, None)
self.lazy_nodes.pop(name, None)
if lazy:
self.lazy_nodes[name] = LazyNode(
name=name,
factory=obj_factory,
args=args,
kwargs=kwargs,
module_name=module_name,
dependencies=self._collect_dependencies(*args, **kwargs),
)
obj_lazy = Proxy(partial(self._ensure_node, name))
self.lazy_items[name] = obj_lazy self.lazy_items[name] = obj_lazy
if self.root_module: if self.root_module:
sys.modules[self.root_module].__dict__[name] = obj_lazy sys.modules[self.root_module].__dict__[name] = obj_lazy
return obj_lazy return obj_lazy
else:
starttime = time() starttime = time()
args, kwargs = replace_NamespaceComponents(*args, **kwargs) args_resolved, kwargs_resolved = self._resolve_factory_inputs(
*args, lazy=False, **kwargs
)
if module_name: if module_name:
obj_maker = getattr(import_module(module_name), obj_factory) obj_maker = getattr(import_module(module_name), obj_factory)
else: else:
obj_maker = obj_factory obj_maker = obj_factory
try: try:
obj = obj_maker(*args, name=name, **kwargs) obj = obj_maker(*args_resolved, name=name, **kwargs_resolved)
except TypeError: except TypeError:
obj = obj_maker(*args, **kwargs) obj = obj_maker(*args_resolved, **kwargs_resolved)
self.initialized_items[name] = obj self.initialized_items[name] = obj
self.initialisation_times_lazy[name] = time() - starttime self.initialisation_times_lazy[name] = time() - starttime
if self.root_module: if self.root_module:
@@ -988,18 +1117,8 @@ class Namespace(Assembly):
def append_obj_from_config(self, cnf, lazy=False): def append_obj_from_config(self, cnf, lazy=False):
module_name, obj_factory = cnf["type"].split(":") module_name, obj_factory = cnf["type"].split(":")
args = [] args = list(cnf["args"])
for targ in cnf["args"]: kwargs = dict(cnf["kwargs"])
if isinstance(targ, Component):
args.append(self.get_obj(targ.name))
else:
args.append(targ)
kwargs = {}
for tk, tv in cnf["kwargs"].items():
if isinstance(tv, Component):
kwargs[tk] = self.get_obj(tv.name)
else:
kwargs[tk] = tv
if "lazy" in cnf.keys(): if "lazy" in cnf.keys():
lazy = cnf["lazy"] lazy = cnf["lazy"]
+31 -1
View File
@@ -1,6 +1,6 @@
import pytest import pytest
from eco.utilities.config import Namespace from eco.utilities.config import Component, Namespace
class BadThing: class BadThing:
@@ -8,6 +8,16 @@ class BadThing:
raise ValueError("boom") raise ValueError("boom")
class DependencyThing:
    """Minimal test object that only records the name it was created with."""

    def __init__(self, name=None):
        self.name = name
class NeedsDependency:
    """Test object that keeps a reference to the dependency it was given."""

    def __init__(self, dependency, name=None):
        # ``name`` is accepted but not stored.
        self.dependency = dependency
def test_lazy_init_failure_includes_manual_instantiation_string(): def test_lazy_init_failure_includes_manual_instantiation_string():
ns = Namespace(name="test") ns = Namespace(name="test")
ns.append_obj(BadThing, 1, lazy=True, name="bad") ns.append_obj(BadThing, 1, lazy=True, name="bad")
@@ -26,3 +36,23 @@ def test_lazy_init_failure_includes_manual_instantiation_string():
"BadThing(1, name='bad')" "BadThing(1, name='bad')"
) )
assert ns.failed_items_excpetion["bad"].args[-1] == manual assert ns.failed_items_excpetion["bad"].args[-1] == manual
def test_append_obj_from_config_resolves_component_dependencies_eagerly_for_non_lazy_nodes():
    """A non-lazy config item receives the built object of a lazy dependency.

    The dependency is registered lazily; the dependent item is then added from
    a config dict with ``lazy`` set to ``False``. The object handed to its
    constructor must be a ``DependencyThing`` that is not the object returned
    by ``get_obj`` for the dependency.
    """
    namespace = Namespace(name="test")
    namespace.append_obj(DependencyThing, lazy=True, name="dependency")

    config = {
        "type": f"{__name__}:NeedsDependency",
        "name": "needs_dependency",
        "args": [Component("dependency")],
        "kwargs": {},
        "lazy": False,
    }
    namespace.append_obj_from_config(config)

    built = namespace.initialized_items["needs_dependency"]
    injected = built.dependency
    assert isinstance(injected, DependencyThing)
    assert injected is not namespace.get_obj("dependency")
    assert injected.name == "dependency"