Using registry-based resolution of forward refs for a complete object model before serialisation

This commit is contained in:
Simone Baffelli
2025-08-20 13:26:39 +02:00
parent 163033539d
commit 6703637e3f
8 changed files with 346 additions and 81 deletions
@@ -1,4 +1,4 @@
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
from rdflib.graph import Node
from rdflib import URIRef, RDF, Literal
from lib_ro_crate_schema.crate.rdf import is_type, object_id
@@ -1,4 +1,5 @@
from typing import Protocol
from typing import Protocol, Self
from lib_ro_crate_schema.crate.registry import Registry
from rdflib import Graph
from rdflib import Node, URIRef, RDF, IdentifiedNode
from rdflib import Namespace
@@ -19,6 +20,13 @@ class RDFDeserializable[T](Protocol):
def from_rdf(cls, triples: list[Triple]): ...
class Resolvable(Protocol):
    """
    Structural interface for objects that can resolve their references.

    An implementer exposes ``resolve``, which receives the registry to look
    references up in and is annotated to return the implementer's own type.
    """

    def resolve(self, reg: Registry) -> Self:
        ...
def is_type(id: str, type: URIRef) -> Triple:
"""
Prepare a triple that asserts that something
@@ -0,0 +1,107 @@
from rdflib import Graph, RDF, RDFS, OWL, URIRef, Node
from lib_ro_crate_schema.crate.rdf import SCHEMA
from lib_ro_crate_schema.crate.type_property import TypeProperty
from typing import Dict, Any, Optional
from rdflib import Graph, RDF, RDFS, OWL, URIRef, Node
from lib_ro_crate_schema.crate.rdf import SCHEMA
from lib_ro_crate_schema.crate.type_property import TypeProperty
from typing import Dict, Any, Optional
from pydantic import BaseModel
def resolve_reference(ref: Optional[Node], cache: Dict[URIRef, Any]) -> Optional[Any]:
    """
    Look up a graph reference, deferring it when its target is not cached.

    Returns ``None`` for a ``None`` reference, the cached object when the URI
    is already a key of ``cache``, and otherwise a ``Ref`` wrapping the URI.

    Raises:
        TypeError: if ``ref`` is neither ``None`` nor a ``URIRef``.
    """
    if ref is None:
        return None
    if not isinstance(ref, URIRef):
        raise TypeError(f"Reference must be a URIRef or None, got {type(ref)}")
    if ref in cache:
        return cache[ref]
    # Not reconstructed yet: hand back a placeholder instead of failing.
    return Ref(uri=ref)
class Ref(BaseModel):
    """
    Placeholder for an entity that is to be resolved in a second pass.

    Only the entity's URI is stored, as a string.
    """

    uri: str
def get_subjects_by_type(graph: Graph, rdf_type: Node) -> set[Node]:
    """Collect the distinct subjects whose ``rdf:type`` is ``rdf_type``."""
    typed_subjects = graph.subjects(RDF.type, rdf_type)
    return set(typed_subjects)
def get_predicate_object_map(graph: Graph, subject: Node) -> Dict[URIRef, Node]:
    """
    Map each predicate of ``subject`` to its object.

    The result is a plain dict, so when a predicate occurs more than once for
    the subject only the last object yielded by the graph is kept.
    """
    mapping: Dict[URIRef, Node] = {}
    for predicate, obj in graph.predicate_objects(subject):
        mapping[predicate] = obj
    return mapping
def reconstruct_property(
    prop_subject: Node, props: Dict[URIRef, Node], cache: Dict[URIRef, Any]
) -> Dict[URIRef, Any]:
    """
    Rebuild one ``TypeProperty`` from its triples and store it in ``cache``.

    Args:
        prop_subject: Subject node of the property; must be a ``URIRef``.
        props: Predicate -> object map for that subject. Only the
            ``SCHEMA["domainIncludes"]`` entry is read here.
        cache: Objects reconstructed so far, keyed by URI. Mutated in place.

    Returns:
        The same ``cache`` object, now containing the new property under
        ``prop_subject``.

    Raises:
        TypeError: if ``prop_subject`` is not a ``URIRef``.
    """
    # Ensure prop_subject is a URIRef
    if not isinstance(prop_subject, URIRef):
        raise TypeError(f"prop_subject must be a URIRef, got {type(prop_subject)}")
    domain_ref: Optional[Node] = props.get(SCHEMA["domainIncludes"])
    # May be None, an already-cached object, or a Ref placeholder.
    domain_resolved = resolve_reference(domain_ref, cache)
    # The debugger trap that used to sit here was removed: it suspended every
    # reconstruction run waiting for interactive input.
    tp = TypeProperty(
        id=prop_subject,
        domain_includes=[domain_resolved] if domain_resolved else [],
    )
    cache[prop_subject] = tp
    return cache
def reconstruct_types(graph: Graph, cache: Dict[URIRef, Any]) -> Dict[URIRef, Any]:
    """
    Report every ``rdfs:Class`` subject found in ``graph``.

    Currently this only prints each class with its predicate/object map;
    ``cache`` is returned unchanged.
    """
    print("Reconstructing Classes:")
    class_subjects = get_subjects_by_type(graph, RDFS.Class)
    for subject in class_subjects:
        predicate_map = get_predicate_object_map(graph, subject)
        print(f" Class: {subject}, {predicate_map}")
        # TODO: Instantiate Type and assign properties from cache if needed
        # cache[subject] = Type(...)
    return cache
def reconstruct_properties(graph: Graph, cache: Dict[URIRef, Any]) -> Dict[URIRef, Any]:
    """
    Rebuild every ``rdf:Property`` subject of ``graph`` into ``cache``.

    Each property is printed and then handed to ``reconstruct_property``;
    the cache returned by the last call is the result.
    """
    print("Reconstructing Properties:")
    property_subjects = get_subjects_by_type(graph, RDF.Property)
    for subject in property_subjects:
        predicate_map = get_predicate_object_map(graph, subject)
        print(f" Property: {subject}, {predicate_map}")
        cache = reconstruct_property(subject, predicate_map, cache)
    return cache
def reconstruct_restrictions(
    graph: Graph, cache: Dict[URIRef, Any]
) -> Dict[URIRef, Any]:
    """
    Report every ``owl:Restriction`` subject found in ``graph``.

    Currently this only prints each restriction with its predicate/object
    map; ``cache`` is returned unchanged.
    """
    print("Reconstructing Restrictions:")
    restriction_subjects = get_subjects_by_type(graph, OWL.Restriction)
    for subject in restriction_subjects:
        predicate_map = get_predicate_object_map(graph, subject)
        print(f" Restriction: {subject}, {predicate_map}")
        # TODO: Instantiate Restriction and add to cache
    return cache
def reconstruct_metadata_entries(
    graph: Graph, cache: Dict[URIRef, Any]
) -> Dict[URIRef, Any]:
    """
    Placeholder pass for metadata entries.

    Prints a progress header and returns ``cache`` untouched; ``graph`` is
    not read yet.
    """
    print("Reconstructing Metadata Entries:")
    # TODO: Implement as needed
    return cache
def reconstruct(graph: Graph) -> Dict[URIRef, Any]:
    """
    Run all reconstruction passes over ``graph`` and return the object cache.

    Passes run in a fixed order: properties, types, restrictions, metadata
    entries. Each pass receives the cache produced by the previous one.
    """
    cache: Dict[URIRef, Any] = {}
    passes = (
        reconstruct_properties,
        reconstruct_types,
        reconstruct_restrictions,
        reconstruct_metadata_entries,
    )
    for run_pass in passes:
        cache = run_pass(graph, cache)
    # TODO: Second pass to resolve Ref objects
    return cache
@@ -0,0 +1,37 @@
from typing import TypeVar, Dict, Callable, Any
from pydantic import BaseModel
T = TypeVar("T")
R = TypeVar("R")
class ForwardRef[R](BaseModel):
"""
This internal class is used to mark
properties as forward refs to be resolved
"""
ref: str
class Registry[T]:
"""
A registry used for
forward reference resolution
"""
def __init__(self):
self._store: Dict[str, T] = {}
def register(self, key: str, value: T):
self._store[key] = value
def resolve(self, key: ForwardRef[T]) -> T:
return self._store.get(key.ref)
def clear(self):
self._store.clear()
type_registry = Registry[BaseModel]()
@@ -4,14 +4,16 @@ from typing import Generator, Literal
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.rdf import BASE, Triple, object_id
from lib_ro_crate_schema.crate.registry import Registry
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from pydantic import BaseModel
from lib_ro_crate_schema.crate.restriction import Restriction
from pydantic import BaseModel, Field, PrivateAttr
from lib_ro_crate_schema.crate.rdf import SCHEMA
from rdflib import RDFS, RDF, Graph
type TypeRegistry = dict[TypeProperty, list[Type]]
from lib_ro_crate_schema.crate.registry import ForwardRef
from typing import Any
from typing import List, Tuple
type TypeRegistry = List[Tuple[TypeProperty, Type]]
@@ -31,10 +33,54 @@ def types_to_triples(used_types: TypeRegistry) -> Generator[Triple, None, None]:
class SchemaFacade(BaseModel):
"""
`_registry` stores a registry of properties and types
to allow forward references to other types
"""
_registry: Registry[Type | TypeProperty | Restriction] = PrivateAttr(
default=Registry()
)
types: list[Type]
metadata_entries: list[MetadataEntry]
prefix: str = "base"
def model_post_init(self, context: Any) -> None:
    """
    Register every type and each of its properties for later resolution.

    Entries are keyed by their ``id`` in the private registry; afterwards
    the parent class's ``model_post_init`` is invoked with the same context.
    """
    for schema_type in self.types:
        self._registry.register(schema_type.id, schema_type)
        # rdfs_property may be None or empty; both mean "no properties".
        for type_property in schema_type.rdfs_property or ():
            self._registry.register(type_property.id, type_property)
    super().model_post_init(context)
def resolve_ref[T](self, ref: str | ForwardRef[T]) -> T:
"""
Resolve a reference (ForwardRef, str, or id) to the actual object using the registry.
"""
match ref:
case ForwardRef(ref=ref_id):
return self._registry.resolve(ref_id)
case str(ref_id):
return self._registry.resolve(ref_id)
case _:
return ref
def resolve_forward_refs(self):
    """
    Ask every type in ``self.types`` to resolve its references.

    Each type's ``resolve`` is called with this facade's private registry.
    """
    registry = self._registry
    for schema_type in self.types:
        schema_type.resolve(registry)
def to_triples(self) -> Generator[Triple, None, None]:
for p in self.types:
yield from p.to_triples()
@@ -1,6 +1,7 @@
from typing import List, Generator
from typing import List, Generator, Union
from lib_ro_crate_schema.crate.rdf import is_type, object_id
from lib_ro_crate_schema.crate.registry import ForwardRef, Registry
from .restriction import Restriction
from .type_property import TypeProperty
from pydantic import BaseModel
@@ -10,7 +11,7 @@ from rdflib import Node, Literal, URIRef, RDFS, OWL
class Type(BaseModel):
id: str
type: str
subclass_of: List[str] | None
subclass_of: List[Union[str, "Type", ForwardRef["Type"]]] | None
ontological_annotations: List[str] | None
rdfs_property: List[TypeProperty] | None
comment: str
@@ -27,6 +28,12 @@ class Type(BaseModel):
if self.rdfs_property
]
def resolve(self, registry: Registry):
    """
    Resolve the references of every property attached to this type.

    Args:
        registry: Registry handed to each property's ``resolve``.

    Returns:
        This instance, matching the ``-> Self`` signature declared by the
        ``Resolvable`` protocol.
    """
    print(f"Before: {self.rdfs_property}")
    # rdfs_property is declared as ``List[TypeProperty] | None``; iterating
    # None raised TypeError, so treat it as "no properties".
    for prop in self.rdfs_property or []:
        prop.resolve(registry)
    print(f"After: {self.rdfs_property}")
    return self
def to_triples(self) -> Generator[Node]:
"""
Emits the type definition as a set of triples
@@ -62,3 +69,6 @@ class Type(BaseModel):
# ontological_annotations=
# equivalent_class=
# )
TypeProperty.model_rebuild()
@@ -1,24 +1,113 @@
from typing import List, Optional, Union
from typing import List, Optional, Union, TYPE_CHECKING
from lib_ro_crate_schema.crate.rdf import SCHEMA, is_type, object_id
from lib_ro_crate_schema.crate.ro_constants import DOMAIN_IDENTIFIER
from .literal_type import LiteralType, to_rdf
from pydantic import BaseModel, Field
from rocrate.model import Person
from rocrate.model.contextentity import ContextEntity
from lib_ro_crate_schema.crate.literal_type import LiteralType, to_rdf
from lib_ro_crate_schema.crate.registry import ForwardRef, Registry
from pydantic import BaseModel, Field, ValidationError, ValidationInfo, field_validator
from rdflib import URIRef, RDF, RDFS, Literal, OWL
if TYPE_CHECKING:
from lib_ro_crate_schema.crate.type import Type
class TypeProperty(BaseModel):
id: str
label: Optional[str] = None
comment: Optional[str] = None
domain_includes: Optional[List[str]] = None
range_includes: Optional[List[str]] = None
range_includes_data_type: Optional[List[LiteralType]] = None
_domain_includes: Optional[List[ForwardRef["Type"]]] = None # internal use only
range_includes: Optional[List[Union[LiteralType, ForwardRef["Type"], "Type"]]] = (
None
)
ontological_annotations: Optional[List[str]] = None
@field_validator("range_includes", mode="before")
@classmethod
def wrap_forward_refs(
cls, v: Optional[List[Union[LiteralType, ForwardRef["Type"]]]]
):
"""
Allows the user-facing API to specify the forward reference as a string
"""
match v:
case None:
return v
case els:
values = []
for range_element in els:
match range_element:
case LiteralType():
values.append(range_element)
case ForwardRef(ref):
values.append(range_element)
case str(ref):
values.append(ForwardRef(ref=ref))
return values
@property
def domain_includes(self) -> Optional[List[str]]:
    """Read-only accessor for the private ``_domain_includes`` value (for serialization only)."""
    includes = self._domain_includes
    return includes
# @property
# def range_includes(self) -> Optional[List[str]]:
# # For serialization only
# return self._range_includes
def resolve(self, registry: Registry):
"""
Resolve all references to types
"""
from lib_ro_crate_schema.crate.type import Type
range_includes = []
domain_includes = []
for range_element in self.range_includes:
match range_element:
case Type() | LiteralType():
range_includes.append(range_element)
case ForwardRef():
print(range_element)
range_includes.append(registry.resolve(range_element))
case _:
raise TypeError(
f"Unsupported range_includes element: {range_element!r}"
)
for domain_element in self._domain_includes if self._domain_includes else []:
match domain_element:
case Type():
domain_includes.append(domain_element)
case ForwardRef():
domain_element.append(registry.resolve(domain_element))
case _:
raise TypeError(
f"Unsupported range_includes element: {domain_element!r}"
)
self._domain_includes = domain_includes
self.range_includes = range_includes
def _resolve_range_includes(self):
    """
    Convert ``range_includes`` into a list of RDF terms.

    ``Type`` entries become ``object_id(<type id>)``, ``LiteralType``
    entries go through ``to_rdf`` and plain strings are wrapped in
    ``URIRef``. An unset or empty ``range_includes`` yields an empty list.

    Raises:
        TypeError: if an element is of any other kind.
    """
    from lib_ro_crate_schema.crate.type import Type

    terms = []
    if not self.range_includes:
        return terms
    for element in self.range_includes:
        if isinstance(element, Type):
            term = object_id(element.id)
        elif isinstance(element, LiteralType):
            term = to_rdf(element)
        elif isinstance(element, str):
            term = URIRef(element)
        else:
            raise TypeError(
                f"Unsupported range_includes element: {element!r}"
            )
        terms.append(term)
    return terms
def to_triples(self, subject=None):
subj = object_id(self.id) if subject is None else subject
yield (subj, RDF.type, RDF.Property)
@@ -29,12 +118,9 @@ class TypeProperty(BaseModel):
if self.domain_includes:
for d in self.domain_includes:
yield (subj, SCHEMA.domainIncludes, URIRef(d))
if self.range_includes:
for r in self.range_includes:
yield (subj, SCHEMA.rangeIncludes, URIRef(r))
if self.range_includes_data_type:
for r in self.range_includes_data_type:
yield (subj, SCHEMA.rangeIncludes, to_rdf(r))
for r in self._resolve_range_includes():
print(type(r), r)
yield (subj, SCHEMA.rangeIncludes, r)
if self.ontological_annotations:
for r in self.ontological_annotations:
yield (subj, OWL.equivalentClass, URIRef(r))
@@ -1,33 +1,25 @@
from collections import defaultdict
# Utility functions for reconstruction
import json
from lib_ro_crate_schema.crate.rdf import BASE
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from rocrate.rocrate import ROCrate
from rdflib import Graph, RDF, RDFS, OWL, URIRef, Node
from lib_ro_crate_schema.crate.jsonld_utils import (
add_schema_to_crate,
)
from rdflib import Graph
from lib_ro_crate_schema.crate.jsonld_utils import add_schema_to_crate
from lib_ro_crate_schema.crate import reconstruction
def main():
has_name = TypeProperty(id="hasName", range_includes_data_type=[LiteralType.STRING])
has_name = TypeProperty(id="hasName", range_includes=[LiteralType.STRING])
has_identifier = TypeProperty(
id="hasIdentifier", range_includes_data_type=[LiteralType.STRING]
id="hasIdentifier", range_includes=[LiteralType.STRING]
)
creator_type = Type(
id="Creator",
type="Type",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[has_name, has_identifier],
comment="",
label="",
)
has_colleague = TypeProperty(id="hasColleague", range_includes=["Participant"])
participant_type = Type(
id="Participant",
@@ -39,10 +31,20 @@ def main():
label="",
)
creator_type = Type(
id="Creator",
type="Type",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[has_name, has_identifier, has_colleague],
comment="",
label="",
)
# Example MetadataEntry using property and type references (object and string)
creator_entry = MetadataEntry(
id="creator1",
types=[creator_type],
types=[creator_type, participant_type],
props={
"has_name": "John Author",
"has_identifier": "https://orcid.org/0000-0000-0000-0000",
@@ -52,10 +54,11 @@ def main():
participant_entry = MetadataEntry(
id="participant",
types=[participant_type],
types=[participant_type, creator_type],
props={
"hasName": "Karl Participant",
"hasIdentifier": "https://orcid.org/0000-0000-0000-0001",
"hasColleague": "creator1",
},
references={},
)
@@ -65,54 +68,22 @@ def main():
# properties=[has_name, has_identifier],
metadata_entries=[creator_entry, participant_entry],
)
#Resolve refs
schema.resolve_forward_refs()
breakpoint()
#Add it to a crate
crate = ROCrate()
crate.license = "a"
crate.name = "mtcrate"
crate.description = "test crate"
res = add_schema_to_crate(schema, crate)
schema_graph = schema.to_graph()
reconstruct(schema_graph)
#Serialise
print(json.dumps(res))
def reconstruct(graph: Graph) -> SchemaFacade:
    """
    Print the classes, properties and restrictions found in ``graph``.

    Nothing is instantiated yet: despite the return annotation, the function
    currently falls off the end and returns ``None``.
    """

    # Utility functions for reconstruction
    def get_subjects_by_type(graph: Graph, rdf_type: Node) -> set[Node]:
        """Return all subjects of a given rdf:type."""
        return set(graph.subjects(RDF.type, rdf_type))

    def get_predicate_object_map(graph: Graph, subject: Node) -> dict[URIRef, Node]:
        """Return a dict of predicate -> object for a given subject."""
        return {p: o for p, o in graph.predicate_objects(subject)}

    # Reconstruct in correct order: Classes, Properties, Restrictions, Metadata Entries
    print("Reconstructing Classes:")
    for class_subject in get_subjects_by_type(graph, RDFS.Class):
        props = get_predicate_object_map(graph, class_subject)
        print(f" Class: {class_subject}, {props}")
        # Here you would instantiate Type(...)
    print("Reconstructing Properties:")
    for prop_subject in get_subjects_by_type(graph, RDF.Property):
        props = get_predicate_object_map(graph, prop_subject)
        print(f" Property: {prop_subject}, {props}")
        # Here you would instantiate TypeProperty(...)
    print("Reconstructing Restrictions:")
    for restr_subject in get_subjects_by_type(graph, OWL.Restriction):
        props = get_predicate_object_map(graph, restr_subject)
        print(f" Restriction: {restr_subject}, {props}")
        # Here you would handle restrictions
    # Example: reconstructing metadata entries if you have a special type
    # for entry_subject in get_subjects_by_type(graph, PROFILE.MetadataEntry):
    #     props = get_predicate_object_map(graph, entry_subject)
    #     print(f" MetadataEntry: {entry_subject}, {props}")
    # The trailing debugger trap was removed so the script can run
    # unattended.
# Use the reconstruction module's main entry point
def reconstruct(graph: Graph):
    """Delegate to ``reconstruction.reconstruct`` and return its result."""
    result = reconstruction.reconstruct(graph)
    return result
if __name__ == "__main__":