Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 32f33f9f9c |
+203
-84
@@ -19,7 +19,8 @@ from importlib import import_module
|
|||||||
from lazy_object_proxy import Proxy as Proxy_orig
|
from lazy_object_proxy import Proxy as Proxy_orig
|
||||||
from tabulate import tabulate
|
from tabulate import tabulate
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from threading import Thread
|
from dataclasses import dataclass, field
|
||||||
|
from threading import Event, Thread, local
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
from rich import progress
|
from rich import progress
|
||||||
from inspect import signature
|
from inspect import signature
|
||||||
@@ -113,7 +114,7 @@ def format_manual_instantiation(
|
|||||||
]
|
]
|
||||||
call_text = f"{obj_name}({', '.join(call_parts)})"
|
call_text = f"{obj_name}({', '.join(call_parts)})"
|
||||||
|
|
||||||
return f"For manual instantiation copy/paste: {import_stmt}; {call_text}"
|
return f"Manual instantiation attempt: {import_stmt}; {call_text}"
|
||||||
|
|
||||||
|
|
||||||
def append_manual_context(exc, manual_context):
|
def append_manual_context(exc, manual_context):
|
||||||
@@ -176,7 +177,7 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
|
|||||||
ind = [ta.name == tca["name"] for tca in config_all].index(True)
|
ind = [ta.name == tca["name"] for tca in config_all].index(True)
|
||||||
outp.append(
|
outp.append(
|
||||||
initFromConfigList(
|
initFromConfigList(
|
||||||
config_list[ind : ind + 1], config_all, lazy=lazy
|
config_all[ind : ind + 1], config_all, lazy=lazy
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif isinstance(ta, dict) or isinstance(ta, list):
|
elif isinstance(ta, dict) or isinstance(ta, list):
|
||||||
@@ -190,9 +191,9 @@ def replaceComponent(inp, dict_all, config_all, lazy=False):
|
|||||||
if ta.name in dict_all.keys():
|
if ta.name in dict_all.keys():
|
||||||
outp[tk] = dict_all[ta.name]
|
outp[tk] = dict_all[ta.name]
|
||||||
else:
|
else:
|
||||||
ind = [tk.name == tca["name"] for tca in config_all].index(True)
|
ind = [ta.name == tca["name"] for tca in config_all].index(True)
|
||||||
outp[tk] = initFromConfigList(
|
outp[tk] = initFromConfigList(
|
||||||
config_list[ind : ind + 1], config_all, lazy=lazy
|
config_all[ind : ind + 1], config_all, lazy=lazy
|
||||||
)
|
)
|
||||||
elif isinstance(ta, dict) or isinstance(ta, list):
|
elif isinstance(ta, dict) or isinstance(ta, list):
|
||||||
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
|
outp[tk] = replaceComponent(ta, dict_all, config_all, lazy=lazy)
|
||||||
@@ -356,6 +357,21 @@ class IsInitialisingError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class LazyNode:
|
||||||
|
name: str
|
||||||
|
factory: object
|
||||||
|
args: tuple
|
||||||
|
kwargs: dict
|
||||||
|
module_name: str = None
|
||||||
|
dependencies: set = field(default_factory=set)
|
||||||
|
state: str = "pending"
|
||||||
|
result: object = None
|
||||||
|
exception: Exception = None
|
||||||
|
event: Event = field(default_factory=Event)
|
||||||
|
start_time: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
class Namespace(Assembly):
|
class Namespace(Assembly):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -368,6 +384,7 @@ class Namespace(Assembly):
|
|||||||
super().__init__(name)
|
super().__init__(name)
|
||||||
# self.name = name
|
# self.name = name
|
||||||
self.lazy_items = {}
|
self.lazy_items = {}
|
||||||
|
self.lazy_nodes = {}
|
||||||
self.initialized_items = {}
|
self.initialized_items = {}
|
||||||
self.failed_items = {}
|
self.failed_items = {}
|
||||||
self.failed_items_excpetion = {}
|
self.failed_items_excpetion = {}
|
||||||
@@ -378,6 +395,7 @@ class Namespace(Assembly):
|
|||||||
|
|
||||||
self.names_without_alias = []
|
self.names_without_alias = []
|
||||||
self._initializing = []
|
self._initializing = []
|
||||||
|
self._thread_local = local()
|
||||||
self.root_module = root_module
|
self.root_module = root_module
|
||||||
self.alias_namespace = alias_namespace
|
self.alias_namespace = alias_namespace
|
||||||
if required_names_directory:
|
if required_names_directory:
|
||||||
@@ -430,6 +448,10 @@ class Namespace(Assembly):
|
|||||||
names = self.failed_names
|
names = self.failed_names
|
||||||
for name in names:
|
for name in names:
|
||||||
self.lazy_items[name] = self.failed_items.pop(name)
|
self.lazy_items[name] = self.failed_items.pop(name)
|
||||||
|
self.lazy_nodes[name].state = "pending"
|
||||||
|
self.lazy_nodes[name].result = None
|
||||||
|
self.lazy_nodes[name].exception = None
|
||||||
|
self.lazy_nodes[name].event.clear()
|
||||||
try:
|
try:
|
||||||
self.failed_items_excpetion.pop(name)
|
self.failed_items_excpetion.pop(name)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
@@ -821,59 +843,134 @@ class Namespace(Assembly):
|
|||||||
aliases_out = aliases
|
aliases_out = aliases
|
||||||
return aliases, has_no_aliases
|
return aliases, has_no_aliases
|
||||||
|
|
||||||
def append_obj(
|
def _collect_dependencies_from_value(self, value):
|
||||||
self,
|
if isinstance(value, NamespaceComponent):
|
||||||
obj_factory,
|
return {value.obj_name}
|
||||||
*args,
|
if isinstance(value, Component):
|
||||||
lazy=False,
|
return {value.name}
|
||||||
name=None,
|
if isinstance(value, list):
|
||||||
module_name=None,
|
deps = set()
|
||||||
init_timeout=30,
|
for item in value:
|
||||||
**kwargs,
|
deps |= self._collect_dependencies_from_value(item)
|
||||||
):
|
return deps
|
||||||
|
if isinstance(value, tuple):
|
||||||
|
deps = set()
|
||||||
|
for item in value:
|
||||||
|
deps |= self._collect_dependencies_from_value(item)
|
||||||
|
return deps
|
||||||
|
if isinstance(value, dict):
|
||||||
|
deps = set()
|
||||||
|
for item in value.values():
|
||||||
|
deps |= self._collect_dependencies_from_value(item)
|
||||||
|
return deps
|
||||||
|
return set()
|
||||||
|
|
||||||
|
def _collect_dependencies(self, *args, **kwargs):
|
||||||
|
dependencies = set()
|
||||||
|
for value in args:
|
||||||
|
dependencies |= self._collect_dependencies_from_value(value)
|
||||||
|
for value in kwargs.values():
|
||||||
|
dependencies |= self._collect_dependencies_from_value(value)
|
||||||
|
return dependencies
|
||||||
|
|
||||||
|
def _resolve_factory_inputs(self, *args, lazy=False, **kwargs):
|
||||||
|
resolved_args = []
|
||||||
|
for value in args:
|
||||||
|
resolved_args.append(self._resolve_factory_input(value, lazy=lazy))
|
||||||
|
|
||||||
|
resolved_kwargs = {}
|
||||||
|
for key, value in kwargs.items():
|
||||||
|
resolved_kwargs[key] = self._resolve_factory_input(value, lazy=lazy)
|
||||||
|
|
||||||
|
return resolved_args, resolved_kwargs
|
||||||
|
|
||||||
|
def _resolve_factory_input(self, value, lazy=False):
|
||||||
|
if isinstance(value, NamespaceComponent):
|
||||||
if lazy:
|
if lazy:
|
||||||
|
return Proxy(lambda value=value: value.get())
|
||||||
|
return self._materialize_dependency(value.obj_name)
|
||||||
|
|
||||||
def init_local():
|
if isinstance(value, Component):
|
||||||
|
if lazy:
|
||||||
|
return Proxy(lambda name=value.name: self._get_ready_object(name))
|
||||||
|
return self._materialize_dependency(value.name)
|
||||||
|
|
||||||
if name in self.failed_names:
|
if isinstance(value, list):
|
||||||
tmpexc = self.failed_items_excpetion
|
return [self._resolve_factory_input(item, lazy=lazy) for item in value]
|
||||||
if isinstance(tmpexc[name], BaseException):
|
|
||||||
raise tmpexc[name]
|
|
||||||
else:
|
|
||||||
raise IsInitialisingError(
|
|
||||||
f"{name} failed previously to initialize."
|
|
||||||
)
|
|
||||||
|
|
||||||
if name in self._initializing:
|
if isinstance(value, tuple):
|
||||||
self._init_priority[name] += 1
|
return tuple(self._resolve_factory_input(item, lazy=lazy) for item in value)
|
||||||
while name in self._initializing:
|
|
||||||
if (
|
|
||||||
time() - self._initialisation_start_time[name]
|
|
||||||
) <= init_timeout:
|
|
||||||
sleep(5)
|
|
||||||
else:
|
|
||||||
# print(f'{name} waiting init since {time()-self._initialisation_start_time[name]} s')
|
|
||||||
# sleep(5)
|
|
||||||
# # passfailed_items_excpetion
|
|
||||||
self._initializing.pop(self._initializing.index(name))
|
|
||||||
raise IsInitialisingError(
|
|
||||||
f"NB: {name} initialization timed out!"
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
if isinstance(value, dict):
|
||||||
self._initializing.append(name)
|
return {
|
||||||
self._init_priority[name] = 0
|
key: self._resolve_factory_input(item, lazy=lazy)
|
||||||
self._initialisation_start_time[name] = time()
|
for key, item in value.items()
|
||||||
|
}
|
||||||
|
|
||||||
# args, kwargs = replace_NamespaceComponents(*args, **kwargs)
|
return value
|
||||||
|
|
||||||
if module_name:
|
def _get_ready_object(self, name):
|
||||||
obj_maker = getattr(import_module(module_name), obj_factory)
|
if name in self.initialized_names:
|
||||||
else:
|
return self.initialized_items[name]
|
||||||
obj_maker = obj_factory
|
if name in self.lazy_names:
|
||||||
|
return self._ensure_node(name)
|
||||||
|
raise Exception(f"Name {name} is not initialized!")
|
||||||
|
|
||||||
args_resolved, kwargs_resolved = replace_NamespaceComponents(
|
def _materialize_dependency(self, name):
|
||||||
*args, **kwargs
|
if name in self.initialized_names:
|
||||||
|
return self.initialized_items[name]
|
||||||
|
|
||||||
|
if name not in self.lazy_names:
|
||||||
|
raise Exception(f"Name {name} is not initialized!")
|
||||||
|
|
||||||
|
proxy = self.lazy_items.get(name)
|
||||||
|
obj = self._ensure_node(name)
|
||||||
|
if proxy is not None:
|
||||||
|
self.lazy_items[name] = proxy
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def _ensure_node(self, name):
|
||||||
|
node = self.lazy_nodes[name]
|
||||||
|
if node.state == "done":
|
||||||
|
return node.result
|
||||||
|
if node.state == "failed":
|
||||||
|
raise node.exception
|
||||||
|
|
||||||
|
if node.state == "pending":
|
||||||
|
node.state = "running"
|
||||||
|
node.start_time = time()
|
||||||
|
node.event.clear()
|
||||||
|
worker = Thread(target=self._run_node, args=(name,), daemon=True)
|
||||||
|
worker.start()
|
||||||
|
|
||||||
|
node.event.wait()
|
||||||
|
|
||||||
|
if node.state == "done":
|
||||||
|
return node.result
|
||||||
|
if node.state == "failed":
|
||||||
|
raise node.exception
|
||||||
|
|
||||||
|
raise RuntimeError(f"Lazy node {name} did not complete cleanly")
|
||||||
|
|
||||||
|
def _run_node(self, name):
|
||||||
|
node = self.lazy_nodes[name]
|
||||||
|
stack = getattr(self._thread_local, "stack", [])
|
||||||
|
try:
|
||||||
|
if name in stack:
|
||||||
|
raise RuntimeError(f"Cyclic lazy dependency detected for {name}")
|
||||||
|
|
||||||
|
stack.append(name)
|
||||||
|
self._thread_local.stack = stack
|
||||||
|
|
||||||
|
for dep_name in node.dependencies:
|
||||||
|
self._ensure_node(dep_name)
|
||||||
|
|
||||||
|
obj_maker = node.factory
|
||||||
|
if node.module_name:
|
||||||
|
obj_maker = getattr(import_module(node.module_name), node.factory)
|
||||||
|
|
||||||
|
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
|
||||||
|
*node.args, lazy=True, **node.kwargs
|
||||||
)
|
)
|
||||||
accepts_name = "name" in signature(obj_maker).parameters
|
accepts_name = "name" in signature(obj_maker).parameters
|
||||||
manual_context = format_manual_instantiation(
|
manual_context = format_manual_instantiation(
|
||||||
@@ -882,7 +979,7 @@ class Namespace(Assembly):
|
|||||||
kwargs_resolved,
|
kwargs_resolved,
|
||||||
name=name,
|
name=name,
|
||||||
accepts_name=accepts_name,
|
accepts_name=accepts_name,
|
||||||
module_name=module_name,
|
module_name=node.module_name,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
if accepts_name:
|
if accepts_name:
|
||||||
@@ -892,28 +989,26 @@ class Namespace(Assembly):
|
|||||||
**kwargs_resolved,
|
**kwargs_resolved,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
obj_initialized = obj_maker(
|
obj_initialized = obj_maker(*args_resolved, **kwargs_resolved)
|
||||||
*args_resolved,
|
|
||||||
**kwargs_resolved,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
append_manual_context(e, manual_context)
|
append_manual_context(e, manual_context)
|
||||||
self.failed_items[name] = self.lazy_items.pop(name)
|
self.failed_items[name] = self.lazy_items.pop(name)
|
||||||
self.failed_items_excpetion[name] = e
|
self.failed_items_excpetion[name] = e
|
||||||
self._initializing.pop(self._initializing.index(name))
|
node.state = "failed"
|
||||||
raise
|
node.exception = e
|
||||||
|
node.result = None
|
||||||
|
return
|
||||||
|
|
||||||
|
self.initialized_items[name] = obj_initialized
|
||||||
|
self.initialisation_times_lazy[name] = time() - node.start_time
|
||||||
try:
|
try:
|
||||||
self.initialized_items[name] = self.lazy_items.pop(name)
|
self.lazy_items.pop(name)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.initialized_items[name] = self.failed_items.pop(name)
|
pass
|
||||||
self._initializing.pop(self._initializing.index(name))
|
node.state = "done"
|
||||||
# if name in self.initialisation_times_lazy.keys():
|
node.result = obj_initialized
|
||||||
# self.initialisation_times_lazy[name] += time() - starttime
|
node.exception = None
|
||||||
# else:
|
|
||||||
self.initialisation_times_lazy[name] = (
|
|
||||||
time() - self._initialisation_start_time[name]
|
|
||||||
)
|
|
||||||
if hasattr(obj_initialized, "alias"):
|
if hasattr(obj_initialized, "alias"):
|
||||||
self._append(
|
self._append(
|
||||||
obj_initialized,
|
obj_initialized,
|
||||||
@@ -931,28 +1026,62 @@ class Namespace(Assembly):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'could not init alias {ta["alias"]}')
|
print(f'could not init alias {ta["alias"]}')
|
||||||
print("error message", e)
|
print("error message", e)
|
||||||
# traceback.print_tb(e)
|
|
||||||
else:
|
else:
|
||||||
self.names_without_alias.append(name)
|
self.names_without_alias.append(name)
|
||||||
return obj_initialized
|
return obj_initialized
|
||||||
|
except Exception as e:
|
||||||
|
node.state = "failed"
|
||||||
|
node.exception = e
|
||||||
|
node.result = None
|
||||||
|
return
|
||||||
|
finally:
|
||||||
|
node.event.set()
|
||||||
|
stack.pop()
|
||||||
|
self._thread_local.stack = stack
|
||||||
|
|
||||||
obj_lazy = Proxy(init_local)
|
def append_obj(
|
||||||
|
self,
|
||||||
|
obj_factory,
|
||||||
|
*args,
|
||||||
|
lazy=False,
|
||||||
|
name=None,
|
||||||
|
module_name=None,
|
||||||
|
init_timeout=30,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
self.failed_items.pop(name, None)
|
||||||
|
self.failed_items_excpetion.pop(name, None)
|
||||||
|
self.initialized_items.pop(name, None)
|
||||||
|
self.lazy_items.pop(name, None)
|
||||||
|
self.lazy_nodes.pop(name, None)
|
||||||
|
|
||||||
|
if lazy:
|
||||||
|
self.lazy_nodes[name] = LazyNode(
|
||||||
|
name=name,
|
||||||
|
factory=obj_factory,
|
||||||
|
args=args,
|
||||||
|
kwargs=kwargs,
|
||||||
|
module_name=module_name,
|
||||||
|
dependencies=self._collect_dependencies(*args, **kwargs),
|
||||||
|
)
|
||||||
|
obj_lazy = Proxy(partial(self._ensure_node, name))
|
||||||
self.lazy_items[name] = obj_lazy
|
self.lazy_items[name] = obj_lazy
|
||||||
if self.root_module:
|
if self.root_module:
|
||||||
sys.modules[self.root_module].__dict__[name] = obj_lazy
|
sys.modules[self.root_module].__dict__[name] = obj_lazy
|
||||||
return obj_lazy
|
return obj_lazy
|
||||||
|
|
||||||
else:
|
|
||||||
starttime = time()
|
starttime = time()
|
||||||
args, kwargs = replace_NamespaceComponents(*args, **kwargs)
|
args_resolved, kwargs_resolved = self._resolve_factory_inputs(
|
||||||
|
*args, lazy=False, **kwargs
|
||||||
|
)
|
||||||
if module_name:
|
if module_name:
|
||||||
obj_maker = getattr(import_module(module_name), obj_factory)
|
obj_maker = getattr(import_module(module_name), obj_factory)
|
||||||
else:
|
else:
|
||||||
obj_maker = obj_factory
|
obj_maker = obj_factory
|
||||||
try:
|
try:
|
||||||
obj = obj_maker(*args, name=name, **kwargs)
|
obj = obj_maker(*args_resolved, name=name, **kwargs_resolved)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
obj = obj_maker(*args, **kwargs)
|
obj = obj_maker(*args_resolved, **kwargs_resolved)
|
||||||
self.initialized_items[name] = obj
|
self.initialized_items[name] = obj
|
||||||
self.initialisation_times_lazy[name] = time() - starttime
|
self.initialisation_times_lazy[name] = time() - starttime
|
||||||
if self.root_module:
|
if self.root_module:
|
||||||
@@ -988,18 +1117,8 @@ class Namespace(Assembly):
|
|||||||
|
|
||||||
def append_obj_from_config(self, cnf, lazy=False):
|
def append_obj_from_config(self, cnf, lazy=False):
|
||||||
module_name, obj_factory = cnf["type"].split(":")
|
module_name, obj_factory = cnf["type"].split(":")
|
||||||
args = []
|
args = list(cnf["args"])
|
||||||
for targ in cnf["args"]:
|
kwargs = dict(cnf["kwargs"])
|
||||||
if isinstance(targ, Component):
|
|
||||||
args.append(self.get_obj(targ.name))
|
|
||||||
else:
|
|
||||||
args.append(targ)
|
|
||||||
kwargs = {}
|
|
||||||
for tk, tv in cnf["kwargs"].items():
|
|
||||||
if isinstance(tv, Component):
|
|
||||||
kwargs[tk] = self.get_obj(tv.name)
|
|
||||||
else:
|
|
||||||
kwargs[tk] = tv
|
|
||||||
if "lazy" in cnf.keys():
|
if "lazy" in cnf.keys():
|
||||||
lazy = cnf["lazy"]
|
lazy = cnf["lazy"]
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from eco.utilities.config import Namespace
|
from eco.utilities.config import Component, Namespace
|
||||||
|
|
||||||
|
|
||||||
class BadThing:
|
class BadThing:
|
||||||
@@ -8,6 +8,16 @@ class BadThing:
|
|||||||
raise ValueError("boom")
|
raise ValueError("boom")
|
||||||
|
|
||||||
|
|
||||||
|
class DependencyThing:
|
||||||
|
def __init__(self, name=None):
|
||||||
|
self.name = name
|
||||||
|
|
||||||
|
|
||||||
|
class NeedsDependency:
|
||||||
|
def __init__(self, dependency, name=None):
|
||||||
|
self.dependency = dependency
|
||||||
|
|
||||||
|
|
||||||
def test_lazy_init_failure_includes_manual_instantiation_string():
|
def test_lazy_init_failure_includes_manual_instantiation_string():
|
||||||
ns = Namespace(name="test")
|
ns = Namespace(name="test")
|
||||||
ns.append_obj(BadThing, 1, lazy=True, name="bad")
|
ns.append_obj(BadThing, 1, lazy=True, name="bad")
|
||||||
@@ -26,3 +36,23 @@ def test_lazy_init_failure_includes_manual_instantiation_string():
|
|||||||
"BadThing(1, name='bad')"
|
"BadThing(1, name='bad')"
|
||||||
)
|
)
|
||||||
assert ns.failed_items_excpetion["bad"].args[-1] == manual
|
assert ns.failed_items_excpetion["bad"].args[-1] == manual
|
||||||
|
|
||||||
|
|
||||||
|
def test_append_obj_from_config_resolves_component_dependencies_eagerly_for_non_lazy_nodes():
|
||||||
|
ns = Namespace(name="test")
|
||||||
|
ns.append_obj(DependencyThing, lazy=True, name="dependency")
|
||||||
|
|
||||||
|
ns.append_obj_from_config(
|
||||||
|
{
|
||||||
|
"type": f"{__name__}:NeedsDependency",
|
||||||
|
"name": "needs_dependency",
|
||||||
|
"args": [Component("dependency")],
|
||||||
|
"kwargs": {},
|
||||||
|
"lazy": False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
root = ns.initialized_items["needs_dependency"]
|
||||||
|
assert isinstance(root.dependency, DependencyThing)
|
||||||
|
assert root.dependency is not ns.get_obj("dependency")
|
||||||
|
assert root.dependency.name == "dependency"
|
||||||
|
|||||||
Reference in New Issue
Block a user