From d66254c4cd1295a0471a206affc8e8fc0707cf6d Mon Sep 17 00:00:00 2001 From: Simone Baffelli Date: Thu, 18 Sep 2025 13:55:52 +0200 Subject: [PATCH] First rough version --- .../crate/type_property.py | 243 ++++++++++++++++-- 1 file changed, 218 insertions(+), 25 deletions(-) diff --git a/0.2.x/lib/python/lib-ro-crate-schema/src/lib_ro_crate_schema/crate/type_property.py b/0.2.x/lib/python/lib-ro-crate-schema/src/lib_ro_crate_schema/crate/type_property.py index e58b4ea..3e43295 100644 --- a/0.2.x/lib/python/lib-ro-crate-schema/src/lib_ro_crate_schema/crate/type_property.py +++ b/0.2.x/lib/python/lib-ro-crate-schema/src/lib_ro_crate_schema/crate/type_property.py @@ -1,16 +1,36 @@ +from __future__ import annotations + from enum import Enum import itertools -from typing import Annotated, Iterable, List, Optional, Union, TYPE_CHECKING +from typing import Annotated, Any, Iterable, List, Optional, Union, TYPE_CHECKING from lib_ro_crate_schema.crate.rdf import SCHEMA, is_type, object_id from lib_ro_crate_schema.crate.literal_type import LiteralType, to_rdf from lib_ro_crate_schema.crate.registry import ForwardRef, Registry -from pydantic import BaseModel, Field, ValidationError, ValidationInfo, field_validator, create_model +from pydantic import ( + AnyUrl, + BaseModel, + Field, + ValidationError, + ValidationInfo, + field_validator, + create_model, +) from pydantic_rdf import BaseRdfModel, WithPredicate from rdflib import BNode, Graph, Namespace, URIRef, RDF, RDFS, Literal, OWL, XSD, SDO +import re +from urllib.parse import urlparse +from datetime import datetime +from decimal import Decimal +from typing import Annotated, Optional, Iterable + +from pydantic import create_model +from pydantic_rdf import BaseRdfModel, WithPredicate +from rdflib import URIRef + from pydantic import computed_field @@ -185,31 +205,204 @@ class Type(BaseModel): ) -def generate_model +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + + +def _safe_field_name(iri: str) -> str: + """ + Make a safe Python identifier from an IRI: + - prefer fragment; else last path segment + - replace non-word chars with '_' + - prefix 'f_' if empty or starts with a digit + - preserve camelCase (no forced snake_case) + """ + parsed = urlparse(iri) + candidate = parsed.fragment or parsed.path.rsplit("/", 1)[-1] + candidate = re.sub(r"\W", "_", candidate) + if not candidate or candidate[0].isdigit(): + candidate = f"f_{candidate}" + return candidate + + +def _python_type_for_range(rng) -> type: + """ + Map your model's range types to Python types expected by pydantic-rdf. + - LiteralType -> Python scalar + - Type -> URIRef (object property) + """ + match rng: + # Literal ranges + case LiteralType.BOOLEAN: + return bool + case LiteralType.INTEGER: + return int + case LiteralType.DOUBLE: + return float + case LiteralType.DECIMAL: + return Decimal + case LiteralType.FLOAT: + return float + case LiteralType.DATETIME: + return datetime + case LiteralType.STRING: + return str + case LiteralType.XML_LITERAL: + return str # or a custom XML wrapper + + # Object range (points to another resource of some Type) + case Type(): + return URIRef + + case _: + raise TypeError(f"Unsupported range: {rng!r}") + + +def _union_type_for_ranges(ranges: list[LiteralType | Type]) -> type: + """ + Build a PEP 604 union (A | B | ...) from the allowed ranges. + """ + ts = tuple(_python_type_for_range(r) for r in ranges) + base = ts[0] + for t in ts[1:]: + base = base | t + return base + + +def _cardinality_for_prop(t: Type, prop: PropertyType) -> tuple[int, Optional[int]]: + """ + Extract (min, max) from your Type.restrictions(). Defaults to (0, 1). + """ + for r in t.restrictions(): + # r.on_property is an RdfPropertyType; compare by URI string + if str(r.on_property.uri) == str(prop.id): + return r.min_cardinality, r.max_cardinality + return 0, 1 + + +def _maybe_sequence_type(base_t: type, min_c: int, max_c: Optional[int]) -> type: + """ + If cardinality allows multiple values, use list[base_t]. + """ + if max_c is None or max_c > 1 or min_c > 1: + return list[base_t] + return base_t + + +def _maybe_optional(base_t: type, min_c: int) -> type: + """ + Make Optional[...] when min=0 and not already a list[...] type. + """ + match base_t: + case list(x): + return base_t + case _: + return base_t | None if min_c == 0 else base_t + + +def build_entry_model_for_type(t: Type) -> type[BaseRdfModel]: + """ + Create a BaseRdfModel subclass whose fields correspond to the properties + of the given Type, each annotated with WithPredicate(URIRef(prop.id)). + """ + cls_name = _safe_field_name(t.id) + "Entry" + + # shell + Base = create_model( # type: ignore[call-arg] + cls_name, + __base__=BaseRdfModel, + __module__=__name__, + ) + + # fix rdf:type at class level as expected by pydantic-rdf + setattr(Base, "rdf_type", URIRef(t.id)) + + # build fields + fields: dict[str, tuple[type, object]] = {} + for prop in t.properties: + base_t = _union_type_for_ranges(prop.range_includes) + min_c, max_c = _cardinality_for_prop(t, prop) + base_t = _maybe_sequence_type(base_t, min_c, max_c) + base_t = _maybe_optional(base_t, min_c) + + annotated_t = Annotated[base_t, WithPredicate(URIRef(prop.id))] + fields[_safe_field_name(prop.id)] = (annotated_t, None) + + # finalize subclass with attached fields + return create_model( # type: ignore[call-arg] + cls_name, + __base__=Base, + __module__=__name__, + **fields, + ) + + +# --------------------------------------------------------------------------- +# factory +# --------------------------------------------------------------------------- + class MetadataEntry(BaseModel): - id: str + """ + High-level, schema-driven entry: + - id: IRI of the node + - type: Type (with properties) + - properties: values keyed by property IRI, label, or safe field name + """ + + id: Union[AnyUrl, str] = Field(...) type: Type - properties: dict[str, str | int | float | bool] + properties: dict[str, dict | int | str | float] = Field(default_factory=dict) + @field_validator("id", mode="before") + @classmethod + def _normalize_id(cls, v: Any) -> str: + # Accept AnyUrl, URIRef, str + match v: + case URIRef(): + return str(v) + case _: + return str(v) + + # Convenience API def to_internal(self) -> BaseRdfModel: - return RdfMetadataEntry.from_external(self) + """Build the concrete BaseRdfModel instance (flattened triples).""" + return RdfMetadataEntryFactory.from_external(self) + + def to_graph(self, g: Graph | None = None) -> Graph: + """Serialize directly to an rdflib Graph.""" + g = g or Graph() + self.to_rdf().to_graph(g) + return g -create_model(__base__= BaseRdfModel) -# class RdfMetadataEntry(BaseRdfModel): -# rdf_type: URIRef -# properties: Annotated(dict[str, str], WithPredicate()) -# _rdf_namespace = MY_NS -# @classmethod -# def from_external(cls: type["RdfMetadataEntry"], external: MetadataEntry): -# breakpoint() +class RdfMetadataEntryFactory: + """ + Turn a high-level MetadataEntry into a concrete BaseRdfModel instance + with flattened RDF predicates (no nested dict). + """ -# return cls( -# rdf_type=external.type.to_internal().uri, -# uri=external.id, -# **external.properties, -# ) + @staticmethod + def from_external(entry: MetadataEntry) -> BaseRdfModel: + Model = build_entry_model_for_type(entry.type) + + # accept incoming keys as exact IRI, label, or sanitized field name + def _value_for(prop: PropertyType): + for k in (prop.id, prop.label, _safe_field_name(prop.id)): + if k is None: + continue + if (val := entry.properties.get(k)) is not None: + return val + return None + + kwargs = { + _safe_field_name(prop.id): v + for prop in entry.type.properties + if (v := _value_for(prop)) is not None + } + + return Model(uri=entry.id, **kwargs) def merge_graphs_from_lists(*graph_lists: Iterable[list[Graph]]) -> Graph: @@ -233,13 +426,13 @@ class SchemaFacade(BaseModel): return merged -t0 = Type(id="root", subclass_of=[]) -p1 = PropertyType(id="d", label="a", range_includes=[LiteralType.INTEGER]) -p2 = PropertyType(id="d1", label="a1", range_includes=[LiteralType.XML_LITERAL]) -t1 = Type(id="c", equivalent_class="a", subclass_of=[t0], properties=[p1, p2]) -md = MetadataEntry(id="a", type=t1, properties={"d": "a", "d1": "a"}) +t0 = Type(id="Object", subclass_of=[]) +p1 = PropertyType(id="count", label="count", range_includes=[LiteralType.INTEGER]) +p2 = PropertyType(id="name", label="name", range_includes=[LiteralType.STRING]) +t1 = Type(id="MyType", equivalent_class="a", subclass_of=[t0], properties=[p1, p2]) +md = MetadataEntry(id="a", type=t1, properties={"count": 3, "name": "e"}) f1 = SchemaFacade(types=[t1], entries=[md]) g1 = f1.to_rdf() -print(g1.serialize(format="turtle")) +print(g1.serialize(format="json-ld"))