Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 32f33f9f9c |
+275
-156
@@ -19,7 +19,8 @@ from importlib import import_module
|
||||
from lazy_object_proxy import Proxy as Proxy_orig
|
||||
from tabulate import tabulate
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from threading import Thread
|
||||
from dataclasses import dataclass, field
|
||||
from threading import Event, Thread, local
|
||||
from tqdm import tqdm
|
||||
from rich import progress
|
||||
from inspect import signature
|
||||
@@ -113,7 +114,7 @@ def format_manual_instantiation(
|
||||
]
|
||||
call_text = f"{obj_name}({', '.join(call_parts)})"
|
||||
|
||||
return f"For manual instantiation copy/paste: {import_stmt}; {call_text}"
|
||||
return f"Manual instantiation attempt: {import_stmt}; {call_text}"
|
||||
|
||||
|
||||
def append_manual_context(exc, manual_context):
|
||||
@@ -176,7 +177,7 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
|
||||
ind = [ta.name == tca["name"] for tca in config_all].index(True)
|
||||
outp.append(
|
||||
initFromConfigList(
|
||||
config_list[ind : ind + 1], config_all, lazy=lazy
|
||||
config_all[ind : ind + 1], config_all, lazy=lazy
|
||||
)
|
||||
)
|
||||
elif isinstance(ta, dict) or isinstance(ta, list):
|
||||
@@ -190,9 +191,9 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
|
||||
if ta.name in dict_all.keys():
|
||||
outp[tk] = dict_all[ta.name]
|
||||
else:
|
||||
ind = [tk.name == tca["name"] for tca in config_all].index(True)
|
||||
ind = [ta.name == tca["name"] for tca in config_all].index(True)
|
||||
outp[tk] = initFromConfigList(
|
||||
config_list[ind : ind + 1], config_all, lazy=lazy
|
||||
config_all[ind : ind + 1], config_all, lazy=lazy
|
||||
)
|
||||
elif isinstance(ta, dict) or isinstance(ta, list):
|
||||
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
|
||||
@@ -356,6 +357,21 @@ class IsInitialisingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LazyNode:
|
||||
name: str
|
||||
factory: object
|
||||
args: tuple
|
||||
kwargs: dict
|
||||
module_name: str = None
|
||||
dependencies: set = field(default_factory=set)
|
||||
state: str = "pending"
|
||||
result: object = None
|
||||
exception: Exception = None
|
||||
event: Event = field(default_factory=Event)
|
||||
start_time: float = 0.0
|
||||
|
||||
|
||||
class Namespace(Assembly):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -368,6 +384,7 @@ class Namespace(Assembly):
|
||||
super().__init__(name)
|
||||
# self.name = name
|
||||
self.lazy_items = {}
|
||||
self.lazy_nodes = {}
|
||||
self.initialized_items = {}
|
||||
self.failed_items = {}
|
||||
self.failed_items_excpetion = {}
|
||||
@@ -378,6 +395,7 @@ class Namespace(Assembly):
|
||||
|
||||
self.names_without_alias = []
|
||||
self._initializing = []
|
||||
self._thread_local = local()
|
||||
self.root_module = root_module
|
||||
self.alias_namespace = alias_namespace
|
||||
if required_names_directory:
|
||||
@@ -430,6 +448,10 @@ class Namespace(Assembly):
|
||||
names = self.failed_names
|
||||
for name in names:
|
||||
self.lazy_items[name] = self.failed_items.pop(name)
|
||||
self.lazy_nodes[name].state = "pending"
|
||||
self.lazy_nodes[name].result = None
|
||||
self.lazy_nodes[name].exception = None
|
||||
self.lazy_nodes[name].event.clear()
|
||||
try:
|
||||
self.failed_items_excpetion.pop(name)
|
||||
except KeyError:
|
||||
@@ -821,6 +843,202 @@ class Namespace(Assembly):
|
||||
aliases_out = aliases
|
||||
return aliases, has_no_aliases
|
||||
|
||||
def _collect_dependencies_from_value(self, value):
|
||||
if isinstance(value, NamespaceComponent):
|
||||
return {value.obj_name}
|
||||
if isinstance(value, Component):
|
||||
return {value.name}
|
||||
if isinstance(value, list):
|
||||
deps = set()
|
||||
for item in value:
|
||||
deps |= self._collect_dependencies_from_value(item)
|
||||
return deps
|
||||
if isinstance(value, tuple):
|
||||
deps = set()
|
||||
for item in value:
|
||||
deps |= self._collect_dependencies_from_value(item)
|
||||
return deps
|
||||
if isinstance(value, dict):
|
||||
deps = set()
|
||||
for item in value.values():
|
||||
deps |= self._collect_dependencies_from_value(item)
|
||||
return deps
|
||||
return set()
|
||||
|
||||
def _collect_dependencies(self, *args, **kwargs):
|
||||
dependencies = set()
|
||||
for value in args:
|
||||
dependencies |= self._collect_dependencies_from_value(value)
|
||||
for value in kwargs.values():
|
||||
dependencies |= self._collect_dependencies_from_value(value)
|
||||
return dependencies
|
||||
|
||||
def _resolve_factory_inputs(self, *args, lazy=False, **kwargs):
|
||||
resolved_args = []
|
||||
for value in args:
|
||||
resolved_args.append(self._resolve_factory_input(value, lazy=lazy))
|
||||
|
||||
resolved_kwargs = {}
|
||||
for key, value in kwargs.items():
|
||||
resolved_kwargs[key] = self._resolve_factory_input(value, lazy=lazy)
|
||||
|
||||
return resolved_args, resolved_kwargs
|
||||
|
||||
def _resolve_factory_input(self, value, lazy=False):
|
||||
if isinstance(value, NamespaceComponent):
|
||||
if lazy:
|
||||
return Proxy(lambda value=value: value.get())
|
||||
return self._materialize_dependency(value.obj_name)
|
||||
|
||||
if isinstance(value, Component):
|
||||
if lazy:
|
||||
return Proxy(lambda name=value.name: self._get_ready_object(name))
|
||||
return self._materialize_dependency(value.name)
|
||||
|
||||
if isinstance(value, list):
|
||||
return [self._resolve_factory_input(item, lazy=lazy) for item in value]
|
||||
|
||||
if isinstance(value, tuple):
|
||||
return tuple(self._resolve_factory_input(item, lazy=lazy) for item in value)
|
||||
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
key: self._resolve_factory_input(item, lazy=lazy)
|
||||
for key, item in value.items()
|
||||
}
|
||||
|
||||
return value
|
||||
|
||||
def _get_ready_object(self, name):
|
||||
if name in self.initialized_names:
|
||||
return self.initialized_items[name]
|
||||
if name in self.lazy_names:
|
||||
return self._ensure_node(name)
|
||||
raise Exception(f"Name {name} is not initialized!")
|
||||
|
||||
def _materialize_dependency(self, name):
|
||||
if name in self.initialized_names:
|
||||
return self.initialized_items[name]
|
||||
|
||||
if name not in self.lazy_names:
|
||||
raise Exception(f"Name {name} is not initialized!")
|
||||
|
||||
proxy = self.lazy_items.get(name)
|
||||
obj = self._ensure_node(name)
|
||||
if proxy is not None:
|
||||
self.lazy_items[name] = proxy
|
||||
return obj
|
||||
|
||||
def _ensure_node(self, name):
|
||||
node = self.lazy_nodes[name]
|
||||
if node.state == "done":
|
||||
return node.result
|
||||
if node.state == "failed":
|
||||
raise node.exception
|
||||
|
||||
if node.state == "pending":
|
||||
node.state = "running"
|
||||
node.start_time = time()
|
||||
node.event.clear()
|
||||
worker = Thread(target=self._run_node, args=(name,), daemon=True)
|
||||
worker.start()
|
||||
|
||||
node.event.wait()
|
||||
|
||||
if node.state == "done":
|
||||
return node.result
|
||||
if node.state == "failed":
|
||||
raise node.exception
|
||||
|
||||
raise RuntimeError(f"Lazy node {name} did not complete cleanly")
|
||||
|
||||
def _run_node(self, name):
|
||||
node = self.lazy_nodes[name]
|
||||
stack = getattr(self._thread_local, "stack", [])
|
||||
try:
|
||||
if name in stack:
|
||||
raise RuntimeError(f"Cyclic lazy dependency detected for {name}")
|
||||
|
||||
stack.append(name)
|
||||
self._thread_local.stack = stack
|
||||
|
||||
for dep_name in node.dependencies:
|
||||
self._ensure_node(dep_name)
|
||||
|
||||
obj_maker = node.factory
|
||||
if node.module_name:
|
||||
obj_maker = getattr(import_module(node.module_name), node.factory)
|
||||
|
||||
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
|
||||
*node.args, lazy=True, **node.kwargs
|
||||
)
|
||||
accepts_name = "name" in signature(obj_maker).parameters
|
||||
manual_context = format_manual_instantiation(
|
||||
obj_maker,
|
||||
args_resolved,
|
||||
kwargs_resolved,
|
||||
name=name,
|
||||
accepts_name=accepts_name,
|
||||
module_name=node.module_name,
|
||||
)
|
||||
try:
|
||||
if accepts_name:
|
||||
obj_initialized = obj_maker(
|
||||
*args_resolved,
|
||||
name=name,
|
||||
**kwargs_resolved,
|
||||
)
|
||||
else:
|
||||
obj_initialized = obj_maker(*args_resolved, **kwargs_resolved)
|
||||
except Exception as e:
|
||||
append_manual_context(e, manual_context)
|
||||
self.failed_items[name] = self.lazy_items.pop(name)
|
||||
self.failed_items_excpetion[name] = e
|
||||
node.state = "failed"
|
||||
node.exception = e
|
||||
node.result = None
|
||||
return
|
||||
|
||||
self.initialized_items[name] = obj_initialized
|
||||
self.initialisation_times_lazy[name] = time() - node.start_time
|
||||
try:
|
||||
self.lazy_items.pop(name)
|
||||
except KeyError:
|
||||
pass
|
||||
node.state = "done"
|
||||
node.result = obj_initialized
|
||||
node.exception = None
|
||||
|
||||
if hasattr(obj_initialized, "alias"):
|
||||
self._append(
|
||||
obj_initialized,
|
||||
name=name,
|
||||
is_setting=True,
|
||||
is_display="recursive",
|
||||
call_obj=False,
|
||||
)
|
||||
if self.alias_namespace and hasattr(obj_initialized, "alias"):
|
||||
for ta in obj_initialized.alias.get_all():
|
||||
try:
|
||||
self.alias_namespace.update(
|
||||
ta["alias"], ta["channel"], ta["channeltype"]
|
||||
)
|
||||
except Exception as e:
|
||||
print(f'could not init alias {ta["alias"]}')
|
||||
print("error message", e)
|
||||
else:
|
||||
self.names_without_alias.append(name)
|
||||
return obj_initialized
|
||||
except Exception as e:
|
||||
node.state = "failed"
|
||||
node.exception = e
|
||||
node.result = None
|
||||
return
|
||||
finally:
|
||||
node.event.set()
|
||||
stack.pop()
|
||||
self._thread_local.stack = stack
|
||||
|
||||
def append_obj(
|
||||
self,
|
||||
obj_factory,
|
||||
@@ -831,152 +1049,63 @@ class Namespace(Assembly):
|
||||
init_timeout=30,
|
||||
**kwargs,
|
||||
):
|
||||
self.failed_items.pop(name, None)
|
||||
self.failed_items_excpetion.pop(name, None)
|
||||
self.initialized_items.pop(name, None)
|
||||
self.lazy_items.pop(name, None)
|
||||
self.lazy_nodes.pop(name, None)
|
||||
|
||||
if lazy:
|
||||
|
||||
def init_local():
|
||||
|
||||
if name in self.failed_names:
|
||||
tmpexc = self.failed_items_excpetion
|
||||
if isinstance(tmpexc[name], BaseException):
|
||||
raise tmpexc[name]
|
||||
else:
|
||||
raise IsInitialisingError(
|
||||
f"{name} failed previously to initialize."
|
||||
)
|
||||
|
||||
if name in self._initializing:
|
||||
self._init_priority[name] += 1
|
||||
while name in self._initializing:
|
||||
if (
|
||||
time() - self._initialisation_start_time[name]
|
||||
) <= init_timeout:
|
||||
sleep(5)
|
||||
else:
|
||||
# print(f'{name} waiting init since {time()-self._initialisation_start_time[name]} s')
|
||||
# sleep(5)
|
||||
# # passfailed_items_excpetion
|
||||
self._initializing.pop(self._initializing.index(name))
|
||||
raise IsInitialisingError(
|
||||
f"NB: {name} initialization timed out!"
|
||||
)
|
||||
|
||||
else:
|
||||
self._initializing.append(name)
|
||||
self._init_priority[name] = 0
|
||||
self._initialisation_start_time[name] = time()
|
||||
|
||||
# args, kwargs = replace_NamespaceComponents(*args, **kwargs)
|
||||
|
||||
if module_name:
|
||||
obj_maker = getattr(import_module(module_name), obj_factory)
|
||||
else:
|
||||
obj_maker = obj_factory
|
||||
|
||||
args_resolved, kwargs_resolved = replace_NamespaceComponents(
|
||||
*args, **kwargs
|
||||
)
|
||||
accepts_name = "name" in signature(obj_maker).parameters
|
||||
manual_context = format_manual_instantiation(
|
||||
obj_maker,
|
||||
args_resolved,
|
||||
kwargs_resolved,
|
||||
name=name,
|
||||
accepts_name=accepts_name,
|
||||
module_name=module_name,
|
||||
)
|
||||
try:
|
||||
if accepts_name:
|
||||
obj_initialized = obj_maker(
|
||||
*args_resolved,
|
||||
name=name,
|
||||
**kwargs_resolved,
|
||||
)
|
||||
else:
|
||||
obj_initialized = obj_maker(
|
||||
*args_resolved,
|
||||
**kwargs_resolved,
|
||||
)
|
||||
except Exception as e:
|
||||
append_manual_context(e, manual_context)
|
||||
self.failed_items[name] = self.lazy_items.pop(name)
|
||||
self.failed_items_excpetion[name] = e
|
||||
self._initializing.pop(self._initializing.index(name))
|
||||
raise
|
||||
|
||||
try:
|
||||
self.initialized_items[name] = self.lazy_items.pop(name)
|
||||
except KeyError:
|
||||
self.initialized_items[name] = self.failed_items.pop(name)
|
||||
self._initializing.pop(self._initializing.index(name))
|
||||
# if name in self.initialisation_times_lazy.keys():
|
||||
# self.initialisation_times_lazy[name] += time() - starttime
|
||||
# else:
|
||||
self.initialisation_times_lazy[name] = (
|
||||
time() - self._initialisation_start_time[name]
|
||||
)
|
||||
if hasattr(obj_initialized, "alias"):
|
||||
self._append(
|
||||
obj_initialized,
|
||||
name=name,
|
||||
is_setting=True,
|
||||
is_display="recursive",
|
||||
call_obj=False,
|
||||
)
|
||||
if self.alias_namespace and hasattr(obj_initialized, "alias"):
|
||||
for ta in obj_initialized.alias.get_all():
|
||||
try:
|
||||
self.alias_namespace.update(
|
||||
ta["alias"], ta["channel"], ta["channeltype"]
|
||||
)
|
||||
except Exception as e:
|
||||
print(f'could not init alias {ta["alias"]}')
|
||||
print("error message", e)
|
||||
# traceback.print_tb(e)
|
||||
else:
|
||||
self.names_without_alias.append(name)
|
||||
return obj_initialized
|
||||
|
||||
obj_lazy = Proxy(init_local)
|
||||
self.lazy_nodes[name] = LazyNode(
|
||||
name=name,
|
||||
factory=obj_factory,
|
||||
args=args,
|
||||
kwargs=kwargs,
|
||||
module_name=module_name,
|
||||
dependencies=self._collect_dependencies(*args, **kwargs),
|
||||
)
|
||||
obj_lazy = Proxy(partial(self._ensure_node, name))
|
||||
self.lazy_items[name] = obj_lazy
|
||||
if self.root_module:
|
||||
sys.modules[self.root_module].__dict__[name] = obj_lazy
|
||||
return obj_lazy
|
||||
|
||||
starttime = time()
|
||||
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
|
||||
*args, lazy=False, **kwargs
|
||||
)
|
||||
if module_name:
|
||||
obj_maker = getattr(import_module(module_name), obj_factory)
|
||||
else:
|
||||
starttime = time()
|
||||
args, kwargs = replace_NamespaceComponents(*args, **kwargs)
|
||||
if module_name:
|
||||
obj_maker = getattr(import_module(module_name), obj_factory)
|
||||
else:
|
||||
obj_maker = obj_factory
|
||||
try:
|
||||
obj = obj_maker(*args, name=name, **kwargs)
|
||||
except TypeError:
|
||||
obj = obj_maker(*args, **kwargs)
|
||||
self.initialized_items[name] = obj
|
||||
self.initialisation_times_lazy[name] = time() - starttime
|
||||
if self.root_module:
|
||||
sys.modules[self.root_module].__dict__[name] = obj
|
||||
if hasattr(obj, "alias"):
|
||||
self._append(
|
||||
obj,
|
||||
name=name,
|
||||
is_setting=True,
|
||||
is_display="recursive",
|
||||
call_obj=False,
|
||||
)
|
||||
if self.alias_namespace and hasattr(obj, "alias"):
|
||||
for ta in obj.alias.get_all():
|
||||
try:
|
||||
self.alias_namespace.update(
|
||||
ta["alias"], ta["channel"], ta["channeltype"]
|
||||
)
|
||||
except Exception as e:
|
||||
print(f'could not init alias {ta["alias"]}')
|
||||
print("error message", e)
|
||||
else:
|
||||
self.names_without_alias.append(name)
|
||||
return obj
|
||||
obj_maker = obj_factory
|
||||
try:
|
||||
obj = obj_maker(*args_resolved, name=name, **kwargs_resolved)
|
||||
except TypeError:
|
||||
obj = obj_maker(*args_resolved, **kwargs_resolved)
|
||||
self.initialized_items[name] = obj
|
||||
self.initialisation_times_lazy[name] = time() - starttime
|
||||
if self.root_module:
|
||||
sys.modules[self.root_module].__dict__[name] = obj
|
||||
if hasattr(obj, "alias"):
|
||||
self._append(
|
||||
obj,
|
||||
name=name,
|
||||
is_setting=True,
|
||||
is_display="recursive",
|
||||
call_obj=False,
|
||||
)
|
||||
if self.alias_namespace and hasattr(obj, "alias"):
|
||||
for ta in obj.alias.get_all():
|
||||
try:
|
||||
self.alias_namespace.update(
|
||||
ta["alias"], ta["channel"], ta["channeltype"]
|
||||
)
|
||||
except Exception as e:
|
||||
print(f'could not init alias {ta["alias"]}')
|
||||
print("error message", e)
|
||||
else:
|
||||
self.names_without_alias.append(name)
|
||||
return obj
|
||||
|
||||
def get_obj(self, name):
|
||||
if name in self.lazy_names:
|
||||
@@ -988,18 +1117,8 @@ class Namespace(Assembly):
|
||||
|
||||
def append_obj_from_config(self, cnf, lazy=False):
|
||||
module_name, obj_factory = cnf["type"].split(":")
|
||||
args = []
|
||||
for targ in cnf["args"]:
|
||||
if isinstance(targ, Component):
|
||||
args.append(self.get_obj(targ.name))
|
||||
else:
|
||||
args.append(targ)
|
||||
kwargs = {}
|
||||
for tk, tv in cnf["kwargs"].items():
|
||||
if isinstance(tv, Component):
|
||||
kwargs[tk] = self.get_obj(tv.name)
|
||||
else:
|
||||
kwargs[tk] = tv
|
||||
args = list(cnf["args"])
|
||||
kwargs = dict(cnf["kwargs"])
|
||||
if "lazy" in cnf.keys():
|
||||
lazy = cnf["lazy"]
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from eco.utilities.config import Namespace
|
||||
from eco.utilities.config import Component, Namespace
|
||||
|
||||
|
||||
class BadThing:
|
||||
@@ -8,6 +8,16 @@ class BadThing:
|
||||
raise ValueError("boom")
|
||||
|
||||
|
||||
class DependencyThing:
|
||||
def __init__(self, name=None):
|
||||
self.name = name
|
||||
|
||||
|
||||
class NeedsDependency:
|
||||
def __init__(self, dependency, name=None):
|
||||
self.dependency = dependency
|
||||
|
||||
|
||||
def test_lazy_init_failure_includes_manual_instantiation_string():
|
||||
ns = Namespace(name="test")
|
||||
ns.append_obj(BadThing, 1, lazy=True, name="bad")
|
||||
@@ -26,3 +36,23 @@ def test_lazy_init_failure_includes_manual_instantiation_string():
|
||||
"BadThing(1, name='bad')"
|
||||
)
|
||||
assert ns.failed_items_excpetion["bad"].args[-1] == manual
|
||||
|
||||
|
||||
def test_append_obj_from_config_resolves_component_dependencies_eagerly_for_non_lazy_nodes():
|
||||
ns = Namespace(name="test")
|
||||
ns.append_obj(DependencyThing, lazy=True, name="dependency")
|
||||
|
||||
ns.append_obj_from_config(
|
||||
{
|
||||
"type": f"{__name__}:NeedsDependency",
|
||||
"name": "needs_dependency",
|
||||
"args": [Component("dependency")],
|
||||
"kwargs": {},
|
||||
"lazy": False,
|
||||
}
|
||||
)
|
||||
|
||||
root = ns.initialized_items["needs_dependency"]
|
||||
assert isinstance(root.dependency, DependencyThing)
|
||||
assert root.dependency is not ns.get_obj("dependency")
|
||||
assert root.dependency.name == "dependency"
|
||||
|
||||
Reference in New Issue
Block a user