Trying to use pydantic-rdf

This commit is contained in:
Simone Baffelli
2025-09-11 16:30:04 +02:00
parent 8a7b849c00
commit b8b00a5ff5
4 changed files with 303 additions and 169 deletions

View File

@@ -9,6 +9,7 @@ authors = [
requires-python = ">=3.13"
dependencies = [
"pydantic>=2.11.7",
"pydantic-rdf>=0.2.0",
"pyld>=2.0.4",
"pyshacl>=0.30.1",
"rdflib-jsonld>=0.6.2",

View File

@@ -8,67 +8,69 @@ from pydantic import BaseModel
from rdflib import Node, Literal, URIRef, RDFS, OWL
class Type(BaseModel):
    """
    An RDFS class definition together with the properties attached to it.

    All fields are required. The ones typed ``| None`` accept ``None``, which
    the methods below treat the same as an empty list.
    """

    id: str
    type: str
    subclass_of: List[Union[str, "Type", ForwardRef["Type"]]] | None
    ontological_annotations: List[str] | None
    rdfs_property: List[TypeProperty] | None
    comment: str
    label: str

    def get_restrictions(self) -> list[Restriction]:
        """
        Get the restrictions that
        represent the properties of this type (RDFS:Class)

        One restriction with min and max cardinality 1 is built per property.
        The result is empty when ``rdfs_property`` is ``None`` or empty.
        """
        # Guard the iterable itself: a trailing `if self.rdfs_property` filter
        # is only evaluated once iteration has started, so None still raised.
        return [
            Restriction(property_type=prop.id, min_cardinality=1, max_cardinality=1)
            for prop in self.rdfs_property or []
        ]

    def resolve(self, registry: Registry) -> None:
        """
        Call ``resolve(registry)`` on every property of this type.

        Does nothing when ``rdfs_property`` is ``None`` or empty.
        """
        for prop in self.rdfs_property or []:
            prop.resolve(registry)

    def to_triples(self) -> Generator[tuple[Node, Node, Node]]:
        """
        Emits the type definition as a set of triples
        whose subject is a RDFS:Class

        Order of emission: the class typing triple, comment, label, one
        ``owl:equivalentClass`` triple per ontological annotation, the triples
        of every restriction, then the triples of every property (each copied
        with ``domain_includes`` set to this type's id).
        """
        subject = object_id(self.id)
        yield is_type(self.id, RDFS.Class)
        yield (subject, RDFS.comment, Literal(self.comment))
        yield (subject, RDFS.label, Literal(self.label))
        # `or []` keeps a None annotation/property list from raising TypeError.
        for cls in self.ontological_annotations or []:
            yield (subject, OWL.equivalentClass, URIRef(cls))
        for restriction in self.get_restrictions():
            yield from restriction.to_triples()
        for prop in self.rdfs_property or []:
            prop_with_domain = prop.model_copy(update=dict(domain_includes=[self.id]))
            yield from prop_with_domain.to_triples()
# def to_ro(self) -> RdfsClass:
# return RdfsClass(id=self.id,
# self_type="rdfs:Class",
# subclass_of=serialize_references(self.subclass_of),
# #rdfs_properties=[prop.to_ro() for prop in self.rdfs_property] if self.rdfs_property is not None else None,
# ontological_annotations=None)
# def to_ro(self):
# return RdfsClass(
# id=RoId(id=self.id),
# subclass_of=[RoId(id=i) for i in self.subclass_of if i] if self.subclass_of else [],
# ontological_annotations=
# equivalent_class=
# )
TypeProperty.model_rebuild()
# class Type(BaseModel):
# id: str
# type: str
# subclass_of: List[Union[str, "Type", ForwardRef["Type"]]] | None
# ontological_annotations: List[str] | None
# rdfs_property: List[TypeProperty] | None
# comment: str
# label: str
# def get_restrictions(self) -> list[Restriction]:
# """
# Get the restrictions that
# represent the properties of this type (RDFS:Class)
# """
# return [
# Restriction(property_type=prop.id, min_cardinality=1, max_cardinality=1)
# for prop in self.rdfs_property
# if self.rdfs_property
# ]
# def resolve(self, registry: Registry):
# print(f"Before: {self.rdfs_property}")
# for prop in self.rdfs_property:
# prop.resolve(registry)
# print(f"After: {self.rdfs_property}")
# def to_triples(self) -> Generator[Node]:
# """
# Emits the type definition as a set of triples
# whose subject is a RDFS:Class
# """
# yield is_type(self.id, RDFS.Class)
# yield (object_id(self.id), RDFS.comment, Literal(self.comment))
# yield (object_id(self.id), RDFS.label, Literal(self.label))
# annotations = [
# (object_id(self.id), OWL.equivalentClass, URIRef(cls))
# for cls in self.ontological_annotations
# ]
# for ann in annotations:
# yield ann
# for restriction in self.get_restrictions():
# yield from restriction.to_triples()
# for prop in self.rdfs_property:
# prop_with_domain = prop.model_copy(update=dict(domain_includes=[self.id]))
# yield from prop_with_domain.to_triples()
# # def to_ro(self) -> RdfsClass:
# # return RdfsClass(id=self.id,
# # self_type="rdfs:Class",
# # subclass_of=serialize_references(self.subclass_of),
# # #rdfs_properties=[prop.to_ro() for prop in self.rdfs_property] if self.rdfs_property is not None else None,
# # ontological_annotations=None)
# # def to_ro(self):
# # return RdfsClass(
# # id=RoId(id=self.id),
# # subclass_of=[RoId(id=i) for i in self.subclass_of if i] if self.subclass_of else [],
# # ontological_annotations=
# # equivalent_class=
# # )
# TypeProperty.model_rebuild()

View File

@@ -1,4 +1,6 @@
from typing import List, Optional, Union, TYPE_CHECKING
from enum import Enum
import itertools
from typing import Annotated, Iterable, List, Optional, Union, TYPE_CHECKING
from lib_ro_crate_schema.crate.rdf import SCHEMA, is_type, object_id
@@ -6,122 +8,236 @@ from lib_ro_crate_schema.crate.literal_type import LiteralType, to_rdf
from lib_ro_crate_schema.crate.registry import ForwardRef, Registry
from pydantic import BaseModel, Field, ValidationError, ValidationInfo, field_validator
from rdflib import URIRef, RDF, RDFS, Literal, OWL
from pydantic_rdf import BaseRdfModel, WithPredicate
from rdflib import BNode, Graph, Namespace, URIRef, RDF, RDFS, Literal, OWL, XSD
from pydantic import computed_field
if TYPE_CHECKING:
from lib_ro_crate_schema.crate.type import Type
class TypeProperty(BaseModel):
id: str
label: Optional[str] = None
comment: Optional[str] = None
_domain_includes: Optional[List[ForwardRef["Type"]]] = None # internal use only
range_includes: Optional[List[Union[LiteralType, ForwardRef["Type"], "Type"]]] = (
None
MY_NS = Namespace("ro-schema")
class LiteralType(Enum):
    """
    Literal datatypes that a property range may include.

    Each member's value is the datatype written as a prefixed name.
    """

    # XML Schema datatypes
    BOOLEAN = "xsd:boolean"
    INTEGER = "xsd:integer"
    DOUBLE = "xsd:double"
    DECIMAL = "xsd:decimal"
    FLOAT = "xsd:float"
    DATETIME = "xsd:dateTime"
    STRING = "xsd:string"

    # RDF datatype
    XML_LITERAL = "rdf:XMLLiteral"
class TypeProperty(BaseRdfModel):
    """
    RDF model of a property (``rdf:Property``) with a label and a range.
    """

    rdf_type = RDF.Property
    _rdf_namespace = RDF

    # Both fields are required (`Field(...)`); `label` may be passed as None.
    label: Annotated[str | None, WithPredicate(RDFS.label)] = Field(...)
    # The predicate is spelled `rangeIncludes` (lower camel case), matching
    # the `SCHEMA.rangeIncludes` triples emitted elsewhere in this module.
    range_includes: Annotated[
        list[Union[LiteralType, "Type"]], WithPredicate(SCHEMA.rangeIncludes)
    ] = Field(...)
class Restriction(BaseRdfModel):
    """
    RDF model of an ``owl:Restriction`` that bounds the cardinality of one
    property.
    """

    rdf_type = OWL.Restriction
    _rdf_namespace = MY_NS

    # All three fields are required.
    on_property: Annotated[
        TypeProperty,
        WithPredicate(OWL.onProperty),
    ] = Field(...)
    min_cardinality: Annotated[
        int,
        WithPredicate(OWL.minCardinality),
    ] = Field(...)
    max_cardinality: Annotated[
        int,
        WithPredicate(OWL.maxCardinality),
    ] = Field(...)
class InternalType(BaseRdfModel):
    """
    RDF model of a class (``rdfs:Class``): equivalent class, parent classes,
    label, comment and the restrictions attached to it.
    """

    rdf_type = RDFS.Class
    _rdf_namespace = MY_NS

    equivalent_class: Annotated[
        str | None,
        WithPredicate(OWL.equivalentClass),
    ] = Field(default=None)
    ontological_annotations: Optional[List[str]] = None
    subclass_of: Annotated[
        list["InternalType"],
        WithPredicate(RDFS.subClassOf),
    ] = Field(default=[])
    label: Annotated[
        str | None,
        WithPredicate(RDFS.label),
    ] = Field(None)
    comment: Annotated[
        str | None,
        WithPredicate(RDFS.comment),
    ] = Field(default=None)
    # NOTE(review): OWL.Restriction is also the rdf_type of the Restriction
    # model; confirm it is really the predicate intended for this link.
    restrictions: Annotated[
        list[Restriction],
        WithPredicate(OWL.Restriction),
    ] = Field(default=[])
@field_validator("range_includes", mode="before")
@classmethod
def wrap_forward_refs(
cls, v: Optional[List[Union[LiteralType, ForwardRef["Type"]]]]
):
"""
Allows the user-facing API to specify the forward reference as a string
"""
match v:
case None:
return v
case els:
values = []
for range_element in els:
match range_element:
case LiteralType():
values.append(range_element)
case ForwardRef(ref):
values.append(range_element)
case str(ref):
values.append(ForwardRef(ref=ref))
return values
@property
def domain_includes(self) -> Optional[List[str]]:
    """
    Read-only access to the private ``_domain_includes`` attribute.
    """
    # For serialization only
    domains = self._domain_includes
    return domains
class Type(BaseModel):
    """
    User-facing description of a type: its id, an optional equivalent class,
    parent types, label, comment and properties.
    """

    id: str
    # The default is None, so the annotation has to admit None as well.
    equivalent_class: str | None = Field(default=None)
    subclass_of: list["Type"] = Field(default=[])
    label: str | None = Field(default=None)
    comment: str | None = Field(default=None)
    properties: list[TypeProperty] = Field(default=[])
# @property
# def range_includes(self) -> Optional[List[str]]:
# # For serialization only
# return self._range_includes
def restrictions(self) -> list[Restriction]:
    """
    Build one ``Restriction`` per entry of ``self.properties``.

    Each restriction gets a fresh blank node as its URI and cardinality
    bounds of 0 (min) and 1 (max).
    """
    built: list[Restriction] = []
    for type_property in self.properties:
        built.append(
            Restriction(
                uri=BNode(),
                on_property=type_property,
                min_cardinality=0,
                max_cardinality=1,
            )
        )
    return built
def resolve(self, registry: Registry):
"""
Resolve all references to types
"""
from lib_ro_crate_schema.crate.type import Type
range_includes = []
domain_includes = []
for range_element in self.range_includes:
match range_element:
case Type() | LiteralType():
range_includes.append(range_element)
case ForwardRef():
print(range_element)
range_includes.append(registry.resolve(range_element))
case _:
raise TypeError(
f"Unsupported range_includes element: {range_element!r}"
)
for domain_element in self._domain_includes if self._domain_includes else []:
match domain_element:
case Type():
domain_includes.append(domain_element)
case ForwardRef():
domain_element.append(registry.resolve(domain_element))
case _:
raise TypeError(
f"Unsupported range_includes element: {domain_element!r}"
)
self._domain_includes = domain_includes
self.range_includes = range_includes
def to_internal(self) -> InternalType:
    """
    Convert this type into its ``InternalType`` RDF model.

    The id becomes the URI, parent types are converted recursively, and the
    label, comment, equivalent class and generated restrictions are copied
    over.
    """
    return InternalType(
        uri=self.id,
        subclass_of=[parent.to_internal() for parent in self.subclass_of],
        # label and comment each map to their own field; the comment used to
        # be passed as the label and was otherwise lost.
        label=self.label,
        comment=self.comment,
        equivalent_class=self.equivalent_class,
        restrictions=self.restrictions(),
    )
def _resolve_range_includes(self):
""" """
from lib_ro_crate_schema.crate.type import Type
resolved = []
if not self.range_includes:
return resolved
for range_element in self.range_includes:
match range_element:
case Type(id=tid):
resolved.append(object_id(tid))
case LiteralType():
resolved.append(to_rdf(range_element))
case str(ref):
resolved.append(URIRef(ref))
case _:
raise TypeError(
f"Unsupported range_includes element: {range_element!r}"
)
return resolved
def merge_graphs_from_lists(*graph_lists: Iterable[Graph]) -> Graph:
    """
    Merge every graph found in the given collections into one new graph.

    Args:
        *graph_lists: any number of iterables, each yielding ``Graph`` objects.

    Returns:
        A new ``Graph`` to which each input graph was added with ``+=``, in
        argument order. It is empty when no graphs are supplied.
    """
    merged = Graph()
    for graph in itertools.chain.from_iterable(graph_lists):
        merged += graph
    return merged
def to_triples(self, subject=None):
    """
    Yield the triples that describe this property.

    Args:
        subject: node to use as the subject of every triple; when ``None``,
            ``object_id(self.id)`` is used.

    Yields:
        The ``rdf:Property`` typing triple, then label and comment when set,
        one ``domainIncludes`` triple per domain, one ``rangeIncludes``
        triple per resolved range element, and one ``owl:equivalentClass``
        triple per ontological annotation.
    """
    subj = object_id(self.id) if subject is None else subject
    yield (subj, RDF.type, RDF.Property)
    if self.label:
        yield (subj, RDFS.label, Literal(self.label))
    if self.comment:
        yield (subj, RDFS.comment, Literal(self.comment))
    for d in self.domain_includes or []:
        yield (subj, SCHEMA.domainIncludes, URIRef(d))
    # The per-element debug print that used to sit in this loop was removed.
    for r in self._resolve_range_includes():
        yield (subj, SCHEMA.rangeIncludes, r)
    for r in self.ontological_annotations or []:
        yield (subj, OWL.equivalentClass, URIRef(r))
    # Add more as needed for data types, annotations, etc.
def to_rdfschema(type: BaseModel) -> Type:
def fromrdf_schnea(schema: SchemaFacade) -> List[Type]
class SchemaFacade(BaseModel):
    """
    A collection of ``Type`` definitions that can be exported as one RDF graph.
    """

    types: List[Type]

    def to_rdf(self) -> Graph:
        """
        Serialize every type to RDF and merge the results.

        Each type is converted with ``to_internal()`` and dumped with
        ``model_dump_rdf()``; the resulting graphs are combined by
        ``merge_graphs_from_lists``.
        """
        # The interactive breakpoint() that used to stop execution here was
        # removed.
        rdf_types: list[Graph] = [t.to_internal().model_dump_rdf() for t in self.types]
        return merge_graphs_from_lists(rdf_types)
# Ad-hoc example: build a small schema and print it as Turtle.
# NOTE(review): these statements appear to be module-level, so they would run
# (and print) on import - confirm that is intended.
# Parent type with no parents of its own.
t0 = Type(id="root", subclass_of=[])
# Two properties, each with a single literal range.
p1 = TypeProperty(uri="d", label="a", range_includes=[LiteralType.INTEGER])
p2 = TypeProperty(uri="d1", label="a1", range_includes=[LiteralType.XML_LITERAL])
# Type "c" has t0 as its parent and carries both properties.
t1 = Type(id="c", equivalent_class="a", subclass_of=[t0], properties=[p1, p2])
f1 = SchemaFacade(types=[t1])
g1 = f1.to_rdf()
print(g1.serialize(format="turtle"))
# class TypeProperty(BaseModel):
# id: str
# label: Optional[str] = None
# comment: Optional[str] = None
# _domain_includes: Optional[List[ForwardRef["Type"]]] = None # internal use only
# range_includes: Optional[List[Union[LiteralType, ForwardRef["Type"], "Type"]]] = (
# None
# )
# ontological_annotations: Optional[List[str]] = None
# @field_validator("range_includes", mode="before")
# @classmethod
# def wrap_forward_refs(
# cls, v: Optional[List[Union[LiteralType, ForwardRef["Type"]]]]
# ):
# """
# Allows the user-facing API to specify the forward reference as a string
# """
# match v:
# case None:
# return v
# case els:
# values = []
# for range_element in els:
# match range_element:
# case LiteralType():
# values.append(range_element)
# case ForwardRef(ref):
# values.append(range_element)
# case str(ref):
# values.append(ForwardRef(ref=ref))
# return values
# @property
# def domain_includes(self) -> Optional[List[str]]:
# # For serialization only
# return self._domain_includes
# # @property
# # def range_includes(self) -> Optional[List[str]]:
# # # For serialization only
# # return self._range_includes
# def resolve(self, registry: Registry):
# """
# Resolve all references to types
# """
# from lib_ro_crate_schema.crate.type import Type
# range_includes = []
# domain_includes = []
# for range_element in self.range_includes:
# match range_element:
# case Type() | LiteralType():
# range_includes.append(range_element)
# case ForwardRef():
# print(range_element)
# range_includes.append(registry.resolve(range_element))
# case _:
# raise TypeError(
# f"Unsupported range_includes element: {range_element!r}"
# )
# for domain_element in self._domain_includes if self._domain_includes else []:
# match domain_element:
# case Type():
# domain_includes.append(domain_element)
# case ForwardRef():
# domain_element.append(registry.resolve(domain_element))
# case _:
# raise TypeError(
# f"Unsupported range_includes element: {domain_element!r}"
# )
# self._domain_includes = domain_includes
# self.range_includes = range_includes
# def _resolve_range_includes(self):
# """ """
# from lib_ro_crate_schema.crate.type import Type
# resolved = []
# if not self.range_includes:
# return resolved
# for range_element in self.range_includes:
# match range_element:
# case Type(id=tid):
# resolved.append(object_id(tid))
# case LiteralType():
# resolved.append(to_rdf(range_element))
# case str(ref):
# resolved.append(URIRef(ref))
# case _:
# raise TypeError(
# f"Unsupported range_includes element: {range_element!r}"
# )
# return resolved
# def to_triples(self, subject=None):
# subj = object_id(self.id) if subject is None else subject
# yield (subj, RDF.type, RDF.Property)
# if self.label:
# yield (subj, RDFS.label, Literal(self.label))
# if self.comment:
# yield (subj, RDFS.comment, Literal(self.comment))
# if self.domain_includes:
# for d in self.domain_includes:
# yield (subj, SCHEMA.domainIncludes, URIRef(d))
# for r in self._resolve_range_includes():
# print(type(r), r)
# yield (subj, SCHEMA.rangeIncludes, r)
# if self.ontological_annotations:
# for r in self.ontological_annotations:
# yield (subj, OWL.equivalentClass, URIRef(r))
# # Add more as needed for data types, annotations, etc.

View File

@@ -137,6 +137,7 @@ version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "pydantic" },
{ name = "pydantic-rdf" },
{ name = "pyld" },
{ name = "pyshacl" },
{ name = "rdflib-jsonld" },
@@ -146,6 +147,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pydantic-rdf", specifier = ">=0.2.0" },
{ name = "pyld", specifier = ">=2.0.4" },
{ name = "pyshacl", specifier = ">=0.30.1" },
{ name = "rdflib-jsonld", specifier = ">=0.6.2" },
@@ -280,6 +282,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/9a/e73262f6c6656262b5fdd723ad90f518f579b7bc8622e43a942eec53c938/pydantic_core-2.33.2-cp313-cp313t-win_amd64.whl", hash = "sha256:c2fc0a768ef76c15ab9238afa6da7f69895bb5d1ee83aeea2e3509af4472d0b9", size = 1935777, upload-time = "2025-04-23T18:32:25.088Z" },
]
[[package]]
name = "pydantic-rdf"
version = "0.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pydantic" },
{ name = "rdflib" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9a/5a/1231b6e90cff4ddb6fa44c159593af737b45dc1d4738c63faa33ebe0e0be/pydantic_rdf-0.2.0.tar.gz", hash = "sha256:e1d9055cb6957f85957af6855fe7d99ded59025e2ac4fa35a5ca8e922ec9117f", size = 67412, upload-time = "2025-05-03T02:07:11.568Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/51/e4/ad85698626ec4d6d8ee3cd569a78191ef7ab1d6d729a346dadd6d445429f/pydantic_rdf-0.2.0-py3-none-any.whl", hash = "sha256:603b3b62b00970655c87bc4b7b3fa415679ffb585d4b8605127ecb4d20f1ad5c", size = 8831, upload-time = "2025-05-03T02:07:10.706Z" },
]
[[package]]
name = "pyld"
version = "2.0.4"