Another Cleanup

This commit is contained in:
Snowwpanda
2025-11-09 23:55:54 +01:00
parent f97d3c2904
commit c8342c2671
29 changed files with 0 additions and 4697 deletions

View File

@@ -1,76 +0,0 @@
# 🧪 RO-Crate Full Example Guide
**File:** `examples/full_example.py`
Comprehensive example demonstrating advanced RO-Crate features: chemical synthesis workflow, circular relationships, SHACL validation, and dynamic updates.
## 📊 **Data Model**
#### **OpenBIS Entities** (`http://openbis.org/`)
| Entity | Properties | Relationships |
|--------|------------|---------------|
| **Project** | code, name, description, created_date | → space |
| **Space** | name, description, created_date | → collections[] |
| **Collection** | name, sample_type, storage_conditions, created_date | _(leaf node)_ |
| **Equipment** | name, model, serial_number, created_date, configuration{} | → parent_equipment |
#### **Schema.org Entities** (`https://schema.org/`)
| Entity | Properties | Relationships |
|--------|------------|---------------|
| **Molecule** | name, **smiles**, molecular_weight, cas_number, created_date, experimental_notes | → contains_molecules[] |
| **Person** | name, orcid, email | → affiliation |
| **Organization** | name, country, website | _(referenced by Person)_ |
| **Publication** | title, doi, publication_date | → authors[], molecules[], equipment[], organization |
## ⚡ **Workflow: Setup → Experiment → Export**
**Created Entities:**
- 1 Project, 1 Space, 1 Collection, 2 Equipment (nested)
- 5 Molecules, 2 People, 1 Organization, 1 Publication
**Key Features:**
-**Circular Relationships**: Person ↔ Person colleagues (auto-resolved)
-**Mixed Namespaces**: OpenBIS + schema.org with auto-context
-**SHACL Validation**: 100% compliance with 150+ rules
-**Dynamic Updates**: Experiment modifies molecules + adds new product
## 🔧 **Key Technical Features**
### **1. Circular Relationship Resolution**
```python
# Automatic resolution of Person ↔ Person colleagues
sarah = Person(colleagues=[marcus])
marcus = Person(colleagues=[sarah])
# → SchemaFacade.resolve_placeholders() merges duplicates
```
### **2. Chemical Data with SMILES**
- Benzene: `c1ccccc1` → Toluene: `Cc1ccccc1` → Product: `(c1ccccc1).(Cc1ccccc1)`
### **3. Scale Metrics**
- **Entities**: 15 → 16 (after synthesis)
- **RDF Triples**: ~500 → ~530
- **SHACL Validation**: 100% compliance
## <20> **Usage**
```bash
PYTHONPATH=./src python examples/full_example.py
```
**Output:**
Initial Crate: `full_example_initial/`
Final Crate: `full_example_final/` including file [experimental_observations](examples/experimental_observations.csv)
## ✅ **Testing**
```bash
python -m pytest tests/ -v # Full suite (85 tests)
```
---
**Production-ready RO-Crate library with automatic relationship resolution, comprehensive validation, and modern architecture.**

View File

@@ -1,130 +0,0 @@
# PyPI Publishing Checklist
Use this checklist when you're ready to publish to PyPI.
## Pre-Publishing ✅
- [ ] All tests pass: `python run_all_tests.py`
- [ ] Examples run successfully: `python examples/decorator_example.py`
- [ ] Version updated in:
- [ ] `pyproject.toml` (line 3: `version = "X.Y.Z"`)
- [ ] `src/lib_ro_crate_schema/__init__.py` (line 25: `__version__ = "X.Y.Z"`)
- [ ] `src/lib_ro_crate_schema/crate/__init__.py` (line 4: `__version__ = "X.Y.Z"`)
- [ ] Changes committed to git
- [ ] Git tag created: `git tag -a vX.Y.Z -m "Release vX.Y.Z"`
## Build Package ✅
```bash
cd "c:\git\eln_interoperability\ro-crate-interoperability-profile\0.2.x\lib\python\lib-ro-crate-schema"
# Clean old builds
Remove-Item -Recurse -Force dist, build -ErrorAction SilentlyContinue
# Build
python -m build
# Verify
python -m twine check dist/*
```
- [ ] Build completed without errors
- [ ] Twine check passed
## Test on Test PyPI ✅
```bash
# Upload to Test PyPI
python -m twine upload --repository testpypi dist/*
```
When prompted:
- Username: `__token__`
- Password: `[Your Test PyPI API token]`
- [ ] Uploaded successfully
- [ ] Check the page: https://test.pypi.org/project/lib-ro-crate-schema/
### Test Installation
```bash
# Create test environment
python -m venv test_env
test_env\Scripts\activate
# Install from Test PyPI
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ lib-ro-crate-schema
# Test import
python -c "from lib_ro_crate_schema import SchemaFacade, ro_crate_schema; print('✅ Import successful!')"
```
- [ ] Installed without errors
- [ ] Import works correctly
- [ ] Basic functionality works
## Publish to PyPI ✅
**⚠️ This cannot be undone! Once published, you cannot upload the same version again.**
```bash
# Upload to production PyPI
python -m twine upload dist/*
```
When prompted:
- Username: `__token__`
- Password: `[Your PyPI API token]`
- [ ] Uploaded successfully
- [ ] Check the page: https://pypi.org/project/lib-ro-crate-schema/
- [ ] README renders correctly on PyPI
### Verify Installation
```bash
# Fresh environment
python -m venv verify_env
verify_env\Scripts\activate
# Install from PyPI
pip install lib-ro-crate-schema
# Test
python -c "from lib_ro_crate_schema import SchemaFacade; print('✅ Success!')"
```
- [ ] Installed from PyPI successfully
- [ ] All imports work
## Post-Publishing ✅
- [ ] Push git tag: `git push origin vX.Y.Z`
- [ ] Create GitHub release from tag
- [ ] Add release notes to GitHub release
- [ ] Update CHANGELOG (if you have one)
- [ ] Announce release (if applicable)
## Quick Commands
```bash
# All in one - Test PyPI
python -m build && python -m twine check dist/* && python -m twine upload --repository testpypi dist/*
# All in one - Production PyPI
python -m build && python -m twine check dist/* && python -m twine upload dist/*
```
## Need Help?
- **Test PyPI**: https://test.pypi.org/account/register/
- **PyPI**: https://pypi.org/account/register/
- **Full Guide**: See [PUBLISHING.md](PUBLISHING.md)
- **Package Docs**: See [README.md](README.md)
---
**Remember**:
- Always test on Test PyPI first!
- You cannot reupload the same version to PyPI
- Keep your API tokens secure

View File

@@ -1,163 +0,0 @@
# Quick Start Guide
Get started with `lib-ro-crate-schema` in 5 minutes!
## Installation
```bash
pip install lib-ro-crate-schema
```
## Your First RO-Crate
Create a file called `my_first_crate.py`:
```python
from lib_ro_crate_schema import SchemaFacade, ro_crate_schema, Field
from pydantic import BaseModel
# 1. Define your data model with decorators
@ro_crate_schema(ontology="https://schema.org/Person")
class Person(BaseModel):
name: str = Field(ontology="https://schema.org/name")
email: str = Field(ontology="https://schema.org/email")
# 2. Create some data
alice = Person(name="Alice Smith", email="alice@example.com")
# 3. Create and export an RO-Crate
facade = SchemaFacade()
facade.add_all_registered_models() # Register your models
facade.add_model_instance(alice, "person_001") # Add data
facade.write("my_first_crate") # Export!
print("✅ RO-Crate created in ./my_first_crate/")
```
Run it:
```bash
python my_first_crate.py
```
This creates a folder `my_first_crate/` containing:
- `ro-crate-metadata.json` - Your data and schema in JSON-LD format
- Proper RDF/OWL type definitions
- Schema.org vocabulary mappings
## Next Steps
### Add Files to Your Crate
```python
# Add a data file before writing
facade.add_file("data.csv",
name="Experimental Data",
description="Raw measurements")
facade.write("my_crate")
```
### Define Related Objects
```python
@ro_crate_schema(ontology="https://schema.org/Organization")
class Organization(BaseModel):
name: str = Field(ontology="https://schema.org/name")
@ro_crate_schema(ontology="https://schema.org/Person")
class Person(BaseModel):
name: str = Field(ontology="https://schema.org/name")
affiliation: Organization = Field(ontology="https://schema.org/affiliation")
# Create related objects
mit = Organization(name="MIT")
alice = Person(name="Alice", affiliation=mit)
# Export both
facade = SchemaFacade()
facade.add_all_registered_models()
facade.add_model_instance(mit, "org_001")
facade.add_model_instance(alice, "person_001")
facade.write("my_crate")
```
### Import and Modify Existing Crates
```python
from lib_ro_crate_schema import SchemaFacade
# Load existing crate
facade = SchemaFacade.from_ro_crate("existing_crate")
# Modify it
# (add more instances, files, etc.)
# Export modified version
facade.write("modified_crate")
```
## What Just Happened?
When you use `@ro_crate_schema`:
1. Your Pydantic model is registered as an RO-Crate type
2. Field annotations map to ontology properties (like Schema.org)
3. The library generates proper RDF/OWL definitions
4. Your data is packaged following the RO-Crate specification
## More Examples
Check out the `examples/` directory:
- `decorator_example.py` - More complex schemas
- `full_example.py` - Scientific workflow with files
- `minimal_import_example.py` - Working with existing crates
## Common Patterns
### Optional Fields
```python
from typing import Optional
@ro_crate_schema(ontology="https://schema.org/Person")
class Person(BaseModel):
name: str = Field(ontology="https://schema.org/name")
email: Optional[str] = Field(default=None, ontology="https://schema.org/email")
```
### Lists
```python
from typing import List
@ro_crate_schema(ontology="https://schema.org/Dataset")
class Dataset(BaseModel):
name: str = Field(ontology="https://schema.org/name")
authors: List[Person] = Field(ontology="https://schema.org/author")
```
### Dates and Times
```python
from datetime import datetime
@ro_crate_schema(ontology="https://schema.org/Event")
class Event(BaseModel):
name: str = Field(ontology="https://schema.org/name")
date: datetime = Field(ontology="https://schema.org/startDate")
```
## Need Help?
- **Full Documentation**: See [README.md](README.md)
- **API Reference**: Browse the [src/lib_ro_crate_schema/](src/lib_ro_crate_schema/) directory
- **Examples**: Check [examples/](examples/) for real-world usage
- **Issues**: Report bugs at [GitHub Issues](https://github.com/Snowwpanda/ro-crate-interoperability-profile/issues)
## Understanding RO-Crate
RO-Crate (Research Object Crate) is a standard for packaging research data with metadata. It uses:
- **JSON-LD**: Linked data format
- **Schema.org**: Standard vocabulary for describing things
- **RDF/OWL**: Semantic web technologies
This library makes it easy to create RO-Crates from Python without needing to understand all these technologies!
---
**Ready for more?** Check out the full [README.md](README.md) for advanced usage and API details.

View File

@@ -1,110 +0,0 @@
@startuml RO-Crate Architecture
!theme plain
skinparam backgroundColor white
skinparam componentStyle rectangle
package "Input Sources" as inputs {
[SHACL Schema\nConstraints] as shacl
[Pydantic Models\n@ro_crate_schema] as pymod
[Manual Schema\nDefinition] as manual
[Existing RO-Crate\nMetadata] as rocin
}
package "External Dependencies" as external {
[RDFLib\nRDF Graph Processing] as rdflib
[RO-Crate\nPython Library] as rocrate
[Pydantic\nData Validation] as pydantic
[JSON-LD\nLinked Data] as jsonld
}
package "Core Library Components" as core {
package "Schema Facade (Orchestrator)" as orchestrator {
[SchemaFacade\nMain API Controller] as sf
}
package "Schema Components" as components {
[Type\nRDFS Classes] as type
[TypeProperty\nRDFS Properties] as prop
[MetadataEntry\nRDF Instances] as meta
[Restriction\nConstraints] as rest
}
package "Registry & Discovery" as registry {
[SchemaRegistry\nDecorator System] as reg
[ForwardRefResolver\nReference Linking] as frr
}
package "JSON-LD Processing" as jsonld_proc {
[JSONLDUtils\nContext Generation] as jsonldutils
[Dynamic Context\nNamespace Detection] as ctx
}
package "RDF Processing" as rdf_proc {
[RDF Module\nTriple Generation] as rdfp
[RDF Graph\nConversion] as graph
}
}
package "API Interfaces" as apis {
[Python API\nadd_type(), get_entries()] as pyapi
[Java API Compatibility\naddType(), getEntries()] as japi
[Decorator API\n@ro_crate_schema] as decapi
}
package "Output Formats" as outputs {
[RO-Crate\nJSON-LD Files] as rocout
[RDF/Turtle\nSerialization] as ttlout
[Pure JSON-LD\nSchema Export] as jsonout
[Data Files\nAttachment] as fileout
}
package "Examples & Usage" as usage {
[Examples\nfull_example.py\nquickstart.py] as examples
[Test Suite\npytest Framework\n83 Tests] as tests
}
' Data Flow Connections
shacl --> sf
pymod --> reg
manual --> sf
rocin --> sf
reg --> sf
sf --> type
sf --> prop
sf --> meta
sf --> rest
type --> rdfp
prop --> rdfp
meta --> rdfp
rest --> rdfp
rdfp --> graph
graph --> jsonldutils
jsonldutils --> ctx
frr --> sf
sf --> pyapi
sf --> japi
reg --> decapi
sf --> rocout
graph --> ttlout
jsonldutils --> jsonout
sf --> fileout
pyapi --> examples
japi --> examples
decapi --> examples
sf --> tests
' External Dependencies
rdflib --> graph
rocrate --> sf
pydantic --> reg
jsonld --> jsonldutils
@enduml

View File

@@ -1,118 +0,0 @@
@startuml RO-Crate Core Classes
!theme plain
skinparam class {
BackgroundColor White
BorderColor Black
ArrowColor Black
}
package "Core Schema Objects" {
class SchemaFacade {
+types: List[Type]
+metadata_entries: List[MetadataEntry]
+standalone_properties: List[TypeProperty]
+standalone_restrictions: List[Restriction]
+prefix: str
--
+addType(type: Type)
+addEntry(entry: MetadataEntry)
+add_property_type(prop: TypeProperty)
+get_crate(): ROCrate
+from_ro_crate(path): SchemaFacade
+write(destination: str)
+to_json(): dict
}
class Type {
+id: str
+rdfs_property: List[TypeProperty]
+restrictions: List[Restriction]
+label: str
+comment: str
+sub_class_of: List[ForwardRef]
--
+to_triples(): Generator[Triple]
}
class TypeProperty {
+id: str
+range_includes: List[LiteralType]
+domain_includes: List[str]
+required: bool
+label: str
+comment: str
--
+to_triples(): Generator[Triple]
}
class MetadataEntry {
+id: str
+class_id: str
+properties: Dict[str, Any]
+label: str
+comment: str
--
+to_triples(): Generator[Triple]
}
class Restriction {
+id: str
+target_class: str
+target_property: str
+restriction_type: RestrictionType
+value: Any
--
+to_triples(): Generator[Triple]
}
}
package "Registry System" {
class SchemaRegistry {
+registered_models: Dict[str, TypeTemplate]
--
+register_model(name: str, template: TypeTemplate)
+get_model(name: str): TypeTemplate
+list_models(): List[str]
}
class TypeTemplate {
+name: str
+properties: List[TypePropertyTemplate]
+base_classes: List[str]
--
+to_type(): Type
}
}
package "Processing Utilities" {
class JSONLDUtils {
--
+get_context(graph: Graph): List
+add_schema_to_crate(facade: SchemaFacade, crate: ROCrate): ROCrate
}
class ForwardRefResolver {
--
+resolve_ref(ref: Union[ForwardRef, str]): Any
}
}
' Relationships
SchemaFacade ||--o{ Type : contains
SchemaFacade ||--o{ MetadataEntry : contains
SchemaFacade ||--o{ TypeProperty : "standalone properties"
SchemaFacade ||--o{ Restriction : "standalone restrictions"
Type ||--o{ TypeProperty : defines
Type ||--o{ Restriction : constraints
SchemaRegistry ||--o{ TypeTemplate : manages
TypeTemplate --> Type : generates
SchemaFacade --> JSONLDUtils : uses
SchemaFacade --> ForwardRefResolver : uses
SchemaFacade --> SchemaRegistry : accesses
@enduml

View File

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
"""
Focused test for circular import handling in RO-Crate schema.
This test specifically creates two people who are each other's colleagues
to verify how the system handles circular references during:
1. Schema creation
2. RDF serialization
3. JSON-LD export
4. Round-trip import/export
"""
import sys
import json
from pathlib import Path
from typing import List, Optional
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from pydantic import BaseModel
from lib_ro_crate_schema.crate.decorators import ro_crate_schema, Field
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
@ro_crate_schema(ontology="https://schema.org/Organization")
class SimpleOrganization(BaseModel):
"""Simple organization for testing"""
name: str = Field(ontology="https://schema.org/name")
country: str = Field(ontology="https://schema.org/addressCountry")
@ro_crate_schema(ontology="https://schema.org/Person")
class SimplePerson(BaseModel):
"""Person with circular colleague relationship"""
name: str = Field(ontology="https://schema.org/name")
email: str = Field(ontology="https://schema.org/email")
affiliation: SimpleOrganization = Field(ontology="https://schema.org/affiliation")
colleagues: List['SimplePerson'] = Field(default=[], ontology="https://schema.org/colleague")
def test_circular_imports():
"""Test circular colleague relationships"""
print("🧪 CIRCULAR IMPORT TEST")
print("=" * 50)
# Create organization
org = SimpleOrganization(
name="Test University",
country="Switzerland"
)
# Create two people without colleagues initially
alice = SimplePerson(
name="Dr. Alice Johnson",
email="alice@test.edu",
affiliation=org,
colleagues=[]
)
bob = SimplePerson(
name="Prof. Bob Smith",
email="bob@test.edu",
affiliation=org,
colleagues=[]
)
print(f"✅ Created Alice (colleagues: {len(alice.colleagues)})")
print(f"✅ Created Bob (colleagues: {len(bob.colleagues)})")
# Establish circular colleague relationship
alice = alice.model_copy(update={'colleagues': [bob]})
bob = bob.model_copy(update={'colleagues': [alice]})
print(f"\n🔄 Circular relationships established:")
print(f" Alice colleagues: {[c.name for c in alice.colleagues]}")
print(f" Bob colleagues: {[c.name for c in bob.colleagues]}")
# Test schema creation with circular refs
print(f"\n📊 Testing schema creation...")
facade = SchemaFacade()
facade.add_all_registered_models()
print(f" ✅ Schema created with {len(facade.types)} types")
# Add instances to facade
facade.add_model_instance(org, "test_org")
facade.add_model_instance(alice, "alice")
facade.add_model_instance(bob, "bob")
print(f" ✅ Added {len(facade.metadata_entries)} instances to facade")
# Test RDF generation
print(f"\n🕸️ Testing RDF generation...")
try:
graph = facade.to_graph()
print(f" ✅ Generated {len(graph)} RDF triples successfully")
except Exception as e:
print(f" ❌ RDF generation failed: {e}")
return False
# Test JSON-LD export
print(f"\n📄 Testing RO-Crate export...")
try:
import os
output_dir = "output_crates"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "circular_test")
facade.write(output_path, name="Circular Import Test",
description="Testing circular colleague relationships")
print(f" ✅ Exported to {output_path}")
except Exception as e:
print(f" ❌ Export failed: {e}")
return False
# Test round-trip import
print(f"\n🔄 Testing round-trip import...")
try:
imported_facade = SchemaFacade.from_ro_crate(output_path)
print(f" ✅ Imported {len(imported_facade.types)} types, {len(imported_facade.metadata_entries)} entries")
# Check if circular references are preserved
alice_entry = None
bob_entry = None
for entry in imported_facade.metadata_entries:
if entry.id == "alice":
alice_entry = entry
elif entry.id == "bob":
bob_entry = entry
if alice_entry and bob_entry:
print(f" ✅ Found Alice and Bob entries after import")
# Check if colleague relationships survived
alice_colleagues = alice_entry.properties.get('colleagues', [])
bob_colleagues = bob_entry.properties.get('colleagues', [])
print(f" Alice colleagues in imported data: {alice_colleagues}")
print(f" Bob colleagues in imported data: {bob_colleagues}")
else:
print(f" ⚠️ Could not find Alice/Bob entries after import")
except Exception as e:
print(f" ❌ Import failed: {e}")
return False
# Examine the actual JSON-LD structure
print(f"\n🔍 Examining generated JSON-LD structure...")
try:
with open(f"{output_path}/ro-crate-metadata.json", 'r') as f:
crate_data = json.load(f)
# Find Person entities
person_entities = []
for entity in crate_data.get("@graph", []):
if entity.get("@type") == "SimplePerson":
person_entities.append(entity)
print(f" Found {len(person_entities)} Person entities:")
for person in person_entities:
person_id = person.get("@id", "unknown")
person_name = person.get("base:name", "unknown")
colleagues = person.get("base:colleagues", "none")
print(f" - {person_id}: {person_name}")
print(f" Colleagues: {colleagues}")
except Exception as e:
print(f" ⚠️ Could not examine JSON-LD: {e}")
print(f"\n🎉 Circular import test completed!")
return True
if __name__ == "__main__":
test_circular_imports()

View File

@@ -1,185 +0,0 @@
"""
Example demonstrating the decorator-based model registration system.
"""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.decorators import ro_crate_schema, Field
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.schema_registry import get_schema_registry
# Example 1: Basic model with ontology annotations (required and optional fields)
@ro_crate_schema(ontology="https://schema.org/Person")
class Person(BaseModel):
"""A person in the research project"""
# Required fields (minCardinality: 1)
name: str = Field(ontology="https://schema.org/name", comment="Person's full name")
email: str = Field(ontology="https://schema.org/email", comment="Contact email address")
# Optional fields (minCardinality: 0)
orcid: Optional[str] = Field(default=None, ontology="https://orcid.org/", comment="ORCID identifier")
phone: Optional[str] = Field(default=None, ontology="https://schema.org/telephone", comment="Phone number")
affiliation: Optional[str] = Field(default=None, ontology="https://schema.org/affiliation", comment="Institution affiliation")
# Example 2: Model with relationships and mixed required/optional fields
@ro_crate_schema(ontology="https://schema.org/Dataset")
class Dataset(BaseModel):
"""A research dataset"""
# Required fields (minCardinality: 1)
title: str = Field(ontology="https://schema.org/name", comment="Dataset title")
description: str = Field(ontology="https://schema.org/description", comment="Dataset description")
authors: List[Person] = Field(ontology="https://schema.org/author", comment="Dataset authors")
created_date: datetime = Field(ontology="https://schema.org/dateCreated", comment="Creation date")
# Optional fields (minCardinality: 0)
keywords: Optional[List[str]] = Field(default=None, ontology="https://schema.org/keywords", comment="Research keywords")
version: Optional[str] = Field(default=None, ontology="https://schema.org/version", comment="Dataset version")
license: Optional[str] = Field(default=None, ontology="https://schema.org/license", comment="License information")
# Example 3: Model with institutional information
@ro_crate_schema(ontology="https://schema.org/Organization")
class Institution(BaseModel):
"""Research institution or organization"""
name: str = Field(ontology="https://schema.org/name", comment="Institution name")
country: str = Field(comment="Country where institution is located")
website: Optional[str] = Field(default=None, comment="Institution website")
def example_usage():
"""Demonstrate the complete workflow"""
print("=== Decorator-based RO-Crate Schema Generation ===")
print()
# 1. Show registered models (automatically registered by decorators)
registry = get_schema_registry()
print("Registered models:")
for model_name, type_template in registry.get_all_type_templates().items():
print(f" - {model_name}: {type_template.ontology}")
for prop_info in type_template.type_properties:
print(f" * {prop_info.name}: {prop_info.rdf_type} (ontology: {prop_info.ontology})")
print()
# 2. Create schema facade and add all registered models
facade = SchemaFacade()
facade.add_all_registered_models()
print(f"Schema contains {len(facade.types)} types:")
for type_obj in facade.types:
print(f" - {type_obj.id}: {type_obj.ontological_annotations}")
print()
# 3. Create model instances and add them as metadata
person1 = Person(
name="Dr. Jane Smith",
email="jane.smith@university.edu",
orcid="0000-0000-0000-0001"
)
person2 = Person(
name="Prof. John Doe",
email="john.doe@institute.org"
)
dataset = Dataset(
title="Climate Change Impact Study",
description="Analysis of climate data from 2000-2023",
authors=[person1, person2],
created_date=datetime(2024, 1, 15),
keywords=["climate", "environment", "data analysis"]
)
# Add instances as metadata entries
facade.add_model_instance(person1, "jane_smith")
facade.add_model_instance(person2, "john_doe")
facade.add_model_instance(dataset, "climate_study_2024")
print(f"Metadata contains {len(facade.metadata_entries)} entries:")
for entry in facade.metadata_entries:
print(f" - {entry.id} ({entry.class_id})")
print(f" Properties: {entry.properties}")
print(f" References: {entry.references}")
print()
# 4. Generate RDF graph
graph = facade.to_graph()
print(f"Generated RDF graph with {len(graph)} triples")
print()
print("Sample triples:")
for i, (s, p, o) in enumerate(graph):
if i < 10: # Show first 10 triples
print(f" {s} {p} {o}")
print()
# 5. Convert to RO-Crate
from lib_ro_crate_schema.crate.jsonld_utils import add_schema_to_crate
from rocrate.rocrate import ROCrate
import json
from pathlib import Path
print("🔄 Adding schema and metadata to RO-Crate...")
crate = ROCrate()
crate.name = "Decorator Example RO-Crate"
crate.description = "Generated using decorator-based schema registration"
final_crate = add_schema_to_crate(facade, crate)
# Get JSON representation by writing to temp directory
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
final_crate.write(temp_dir)
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
with open(metadata_file, 'r') as f:
final_crate_json = json.load(f)
# Save to file
output_path = Path("ro-crate-metadata.json")
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(final_crate_json, f, indent=2)
print(f"✅ RO-Crate saved to: {output_path.absolute()}")
print(f"📊 Total entities in @graph: {len(final_crate_json['@graph'])}")
print()
# Show entity types summary
entity_types = {}
for entity in final_crate_json["@graph"]:
entity_type = entity.get("@type", "Unknown")
if isinstance(entity_type, list):
for t in entity_type:
entity_types[t] = entity_types.get(t, 0) + 1
else:
entity_types[entity_type] = entity_types.get(entity_type, 0) + 1
print("📋 Entity types in RO-Crate:")
for entity_type, count in entity_types.items():
print(f" - {entity_type}: {count}")
print()
# Show context
context = final_crate_json["@context"]
print(f"🔗 RO-Crate @context: {context}")
print()
print("🎯 Key Features Demonstrated:")
print(" ✓ Pydantic models → RDFS schema")
print(" ✓ Ontology annotations (schema.org, ORCID)")
print(" ✓ Model instances → RDF metadata")
print(" ✓ Proper RO-Crate integration")
print(" ✓ JSON-LD context management")
print(" ✓ Schema embedding in ro-crate-metadata.json")
return facade, final_crate_json
if __name__ == "__main__":
example_usage()

View File

@@ -1,135 +0,0 @@
# Utility functions for reconstruction
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from rocrate.rocrate import ROCrate
from rdflib import Graph
from lib_ro_crate_schema.crate.jsonld_utils import add_schema_to_crate
# from lib_ro_crate_schema.crate import reconstruction # Not available
def main():
"""
Example demonstrating manual RO-Crate construction with automatic OWL restrictions.
When manually creating TypeProperty objects, you can specify required=True/False
to automatically generate OWL restrictions with appropriate cardinality constraints:
- required=True -> generates minCardinality: 1 (field is mandatory)
- required=False -> generates minCardinality: 0 (field is optional)
This ensures Java compatibility where OWL restrictions define field requirements.
"""
# Define properties with cardinality information
name = TypeProperty(
id="name",
range_includes=[LiteralType.STRING],
required=True, # This will generate minCardinality: 1
label="Full Name",
comment="The full name of the entity"
)
identifier = TypeProperty(
id="identifier",
range_includes=[LiteralType.STRING],
required=True, # This will generate minCardinality: 1
label="Identifier",
comment="Unique identifier for the entity"
)
colleague = TypeProperty(
id="colleague",
range_includes=["Participant"],
required=False, # This will generate minCardinality: 0 (optional)
label="Colleague",
comment="Optional colleague relationship"
)
participant_type = Type(
id="Participant",
type="Type",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[name, identifier],
comment="A participant in the research",
label="Participant",
)
creator_type = Type(
id="Creator",
type="Type",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[name, identifier, colleague],
comment="A creator of the research work",
label="Creator",
)
# Example MetadataEntry using new format with class_id and values
creator_entry = MetadataEntry(
id="creator1",
class_id="Creator",
values={
"name": "John Author",
"identifier": "https://orcid.org/0000-0000-0000-0000",
},
references={},
)
participant_entry = MetadataEntry(
id="participant",
class_id="Participant",
values={
"name": "Karl Participant",
"identifier": "https://orcid.org/0000-0000-0000-0001",
},
references={
"colleague": ["creator1"]
},
)
schema = SchemaFacade(
types=[creator_type, participant_type],
# properties=[has_name, has_identifier],
metadata_entries=[creator_entry, participant_entry],
)
#Resolve refs
schema.resolve_forward_refs()
#Add it to a crate
crate = ROCrate()
crate.license = "a"
crate.name = "mtcrate"
crate.description = "test crate"
crate = add_schema_to_crate(schema, crate)
#Serialise - write to temp dir and read back for JSON output
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
crate.write(temp_dir)
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
with open(metadata_file, 'r') as f:
res = json.load(f)
print(json.dumps(res))
# Write to file
import os
output_dir = "output_crates"
os.makedirs(output_dir, exist_ok=True)
crate_path = os.path.join(output_dir, "example_crate")
crate.write(crate_path)
# Use the reconstruction module's main entry point
def reconstruct(graph: Graph):
# return reconstruction.reconstruct(graph) # Not available
raise NotImplementedError("Reconstruction module not available")
if __name__ == "__main__":
main()

View File

@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""
Demonstration of exporting Pydantic models from SchemaFacade.
This example shows how to:
1. Create a schema with Type definitions
2. Export those Types as Pydantic model classes
3. Use the generated classes to create and validate instances
"""
import sys
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.restriction import Restriction
from lib_ro_crate_schema.crate.literal_type import LiteralType
from pydantic import BaseModel
from typing import List, Optional
def main():
print("🔧 RO-Crate Pydantic Export Demo")
print("=" * 50)
# Create SchemaFacade and add some types
# For this demo, we'll define two types: Person and Organization
# The ro-crate-schema will not be exported as crate, just used here for model generation
facade = SchemaFacade()
# Define Person type, starting with the properties and restrictions
person_name_prop = TypeProperty(
id="name",
label="Full Name",
comment="The complete name of the person",
range_includes=["http://www.w3.org/2001/XMLSchema#string"],
required=True
)
person_age_prop = TypeProperty(
id="age",
label="Age",
comment="Age in years",
range_includes=["http://www.w3.org/2001/XMLSchema#integer"],
required=False
)
person_emails_prop = TypeProperty(
id="emails",
label="Email Addresses",
comment="List of email addresses",
range_includes=["http://www.w3.org/2001/XMLSchema#string"],
required=False
)
# Create restrictions
person_name_restriction = Restriction(
property_type="name",
min_cardinality=1,
max_cardinality=1
)
person_age_restriction = Restriction(
property_type="age",
min_cardinality=0,
max_cardinality=1
)
person_emails_restriction = Restriction(
property_type="emails",
min_cardinality=0,
max_cardinality=None # Unbounded list
)
person_type = Type(
id="Person",
label="Person",
comment="Represents a person with personal information",
subclass_of=["https://schema.org/Person"],
rdfs_property=[person_name_prop, person_age_prop, person_emails_prop],
restrictions=[person_name_restriction, person_age_restriction, person_emails_restriction]
)
# Define Organization type, starting with properties and restrictions
org_name_prop = TypeProperty(
id="name",
label="Organization Name",
comment="The official name of the organization",
range_includes=["http://www.w3.org/2001/XMLSchema#string"],
required=True
)
org_employees_prop = TypeProperty(
id="employees",
label="Employees",
comment="People working for this organization",
range_includes=["Person"], # Reference to Person type
required=False
)
org_name_restriction = Restriction(
property_type="name",
min_cardinality=1,
max_cardinality=1
)
org_employees_restriction = Restriction(
property_type="employees",
min_cardinality=0,
max_cardinality=None # Unbounded list
)
organization_type = Type(
id="Organization",
label="Organization",
comment="Represents an organization or company",
subclass_of=["https://schema.org/Organization"],
rdfs_property=[org_name_prop, org_employees_prop],
restrictions=[org_name_restriction, org_employees_restriction]
)
# Add types to facade
facade.addType(person_type)
facade.addType(organization_type)
print("📋 Schema created with types:")
for type_def in facade.get_types():
print(f" - {type_def.id}: {type_def.comment}")
print("\n🏗️ Exporting Pydantic models...")
# Export individual model
print("\n1⃣ Export single model:")
PersonModel = facade.export_pydantic_model("Person")
print(f"Generated class: {PersonModel.__name__}")
print(f"Fields: {list(PersonModel.__annotations__.keys())}")
# Export all models
print("\n2⃣ Export all models:")
models = facade.export_all_pydantic_models()
print("Generated models:")
for name, model_class in models.items():
print(f" - {name}: {model_class.__name__}")
print(f" Fields: {list(model_class.__annotations__.keys())}")
print("\n✨ Testing generated models...")
# Test Person model
print("\n👤 Creating Person instances:")
try:
# Valid person with required field
person1 = PersonModel(name="Alice Johnson", age=30, emails=["alice@example.com", "alice.j@work.com"])
print(f"✅ Created person: {person1.name}, age {person1.age}")
print(f" Emails: {person1.emails}")
# Person with only required fields
person2 = PersonModel(name="Bob Smith")
print(f"✅ Created person: {person2.name} (minimal)")
# Test validation - missing required field
print("\n🔍 Testing validation:")
try:
invalid_person = PersonModel(age=25) # Missing required 'name'
print("❌ This should have failed!")
except Exception as e:
print(f"✅ Validation caught error: {e}")
except Exception as e:
print(f"❌ Error creating person: {e}")
# Test Organization model
print("\n🏢 Creating Organization instances:")
try:
OrganizationModel = models["Organization"]
# Note: For now, forward references to other models need to be handled carefully
# In a real implementation, you'd want to resolve these properly
person_as_dict = {"name": "Charlie Brown", "age": 28}
org = OrganizationModel(name="Acme Corporation", employees=[person1, person_as_dict])
print(f"✅ Created organization: {org.name} with employees {[emp.name for emp in org.employees]}")
# Test validation - employees must be person instances or dicts with the right fields
try:
invalid_org = OrganizationModel(name="Invalid Org", employees=["Not a person"])
print("❌ This should have failed!")
except Exception as e:
print(f"✅ Validation caught error: {e}")
# Test validation - employees missing name (required field) will fail
fake_person = {"firstname": "Fake", "lastname": "Person"}
try:
invalid_org = OrganizationModel(name="Invalid Org", employees=[fake_person])
print("❌ This should have failed!")
except Exception as e:
print(f"✅ Validation caught error: {e}")
except Exception as e:
print(f"❌ Error creating organization: {e}")
print("\n🎯 Model schemas:")
print("\nPerson model schema:")
try:
print(PersonModel.model_json_schema())
except Exception as e:
print(f"Schema generation error: {e}")
print("\n🎉 Pydantic export demo completed!")
print("\n💡 Key features demonstrated:")
print(" ✓ Export Type definitions as Pydantic model classes")
print(" ✓ Handle required vs optional fields from OWL restrictions")
print(" ✓ Support list fields (unbounded cardinality)")
print(" ✓ Map RDF types to Python types")
print(" ✓ Generate proper Pydantic validation")
print(" ✓ Preserve field metadata (descriptions)")
if __name__ == "__main__":
main()

View File

@@ -1,36 +0,0 @@
#!/usr/bin/env python3
"""
Minimal import example: Load external openBIS RO-Crate and print summary.
"""
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
# Import openBIS RO-Crate from external lib/example (kept outside for now)
crate_path = Path(__file__).parent.parent.parent.parent / "example" / "obenbis-one-publication" / "ro-crate-metadata.json"
facade = SchemaFacade.from_ro_crate(str(crate_path))
# Print summary
print(f"📁 Imported SchemaFacade with:")
print(f" - {len(facade.types)} RDFS Classes (types)")
print(f" - {len(facade.metadata_entries)} metadata entries")
print(f"\n📋 Types imported:")
for t in facade.types:
props = len(t.rdfs_property or [])
restrictions = len(t.get_restrictions())
print(f" - {t.id}: {props} properties, {restrictions} restrictions")
print(f"\n📦 Metadata entries:")
for entry in facade.metadata_entries[:5]: # Show first 5
print(f" - {entry.id} (type: {entry.class_id})")
print(f"\n🎯 Ready to use! You can now:")
print(f" - Export: facade.write('output-directory')")
print(f" - Add data: facade.addEntry(...)")
print(f" - Add types: facade.addType(...)")

View File

@@ -1,77 +0,0 @@
#!/usr/bin/env python3
"""
Test runner for RO-Crate Schema Library
"""
import sys
import subprocess
from pathlib import Path
def run_test(test_file):
"""Run a single test file and return success status"""
print(f"\n🧪 Running {test_file.name}")
print("=" * 60)
try:
result = subprocess.run([sys.executable, str(test_file)],
capture_output=False,
check=True,
cwd=test_file.parent)
print(f"{test_file.name} PASSED")
return True
except subprocess.CalledProcessError as e:
print(f"{test_file.name} FAILED (exit code: {e.returncode})")
return False
except Exception as e:
print(f"{test_file.name} ERROR: {e}")
return False
def main():
"""Run all tests"""
print("🚀 RO-Crate Schema Library Test Suite")
print("=" * 60)
# Find test directory
test_dir = Path(__file__).parent / "tests"
if not test_dir.exists():
print(f"❌ Test directory not found: {test_dir}")
return False
# Find all test files
test_files = list(test_dir.glob("test_*.py"))
if not test_files:
print(f"❌ No test files found in {test_dir}")
return False
print(f"📋 Found {len(test_files)} test files:")
for test_file in test_files:
print(f" - {test_file.name}")
# Run tests
results = []
for test_file in test_files:
success = run_test(test_file)
results.append((test_file.name, success))
# Summary
print("\n🎯 Test Results Summary")
print("=" * 60)
passed = sum(1 for _, success in results if success)
total = len(results)
for test_name, success in results:
status = "✅ PASS" if success else "❌ FAIL"
print(f" {test_name}: {status}")
print(f"\n📊 Overall: {passed}/{total} tests passed")
if passed == total:
print("🏆 ALL TESTS PASSED!")
return True
else:
print("💥 SOME TESTS FAILED!")
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -1,104 +0,0 @@
#!/usr/bin/env python3
"""
Interactive test runner for RO-Crate bidirectional system
"""
import sys
import subprocess
from pathlib import Path
def run_test(test_file, working_dir=None):
"""Run a test file with proper environment setup"""
import os
original_dir = Path.cwd()
try:
if working_dir:
Path(working_dir).mkdir(parents=True, exist_ok=True)
os.chdir(working_dir)
# Make test_file relative to the working directory if it's absolute
if working_dir and test_file.is_absolute():
try:
test_file = test_file.relative_to(working_dir)
except ValueError:
# If we can't make it relative, use the absolute path
pass
# Try to use uv if available, otherwise use regular python
try:
result = subprocess.run([
"uv", "run", "python", str(test_file)
], check=True, capture_output=False)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fallback to regular python
result = subprocess.run([
"python", str(test_file)
], check=True, capture_output=False)
return result.returncode == 0
except Exception as e:
print(f"❌ Error running {test_file}: {e}")
return False
finally:
os.chdir(original_dir)
def main():
print("🔬 RO-Crate Bidirectional Test Runner")
print("=====================================")
# Get the path to test folder
test_folder = Path(__file__).parent / "tests"
# Read in the tests dictionary
if not test_folder.exists():
print(f"❌ Test folder not found: {test_folder}")
sys.exit(1)
tests = {}
test_counter = 1
for test in test_folder.glob("test_*.py"):
test_name = test.stem.replace("test_", "").replace("_", " ").title()
tests[str(test_counter)] = (test_name, test, None)
test_counter += 1
print("\nAvailable tests:")
for key, (name, _, _) in tests.items():
print(f"{key}. {name}")
print()
choice = input("Select test (number) or press Enter for complete test: ").strip()
if not choice:
# Run script run_all_tests.py
script_path = Path(__file__).parent / "run_all_tests.py"
if script_path.exists():
print("\n🔄 Running all tests via run_all_tests.py...")
success = run_test(script_path)
if success:
print("\n✅ All tests completed successfully!")
else:
print("\n❌ Some tests failed!")
sys.exit(1)
print("\n🏁 Test execution completed!")
return
if choice in tests:
name, test_file, working_dir = tests[choice]
print(f"\n🔄 Running {name}...")
success = run_test(test_file, working_dir)
if success:
print(f"\n{name} completed successfully!")
else:
print(f"\n{name} failed!")
sys.exit(1)
else:
print("❌ Invalid choice. Running default complete test...")
run_test("test_complete_round_trip.py")
print("\n🏁 Test execution completed!")
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
# Test package for lib-ro-crate-schema

View File

@@ -1,324 +0,0 @@
# RO-Crate Schema SHACL Validation
# Updated for the modern Python lib-ro-crate-schema architecture
# Validates RDF output from the TypeProperty, Type, and MetadataEntry classes
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix schema: <https://schema.org/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix base: <http://example.com/> .
@prefix ex: <http://example.org/shapes#> .
# =====================================================
# RDFS CLASS DEFINITIONS (Type objects)
# =====================================================
ex:ClassDefinitionShape
a sh:NodeShape ;
sh:targetClass rdfs:Class ;
sh:name "RDFS Class Shape" ;
sh:description "Validates Type objects - RDFS class definitions with properties and restrictions" ;
# Must have rdfs:subClassOf (inheritance)
sh:property [
sh:path rdfs:subClassOf ;
sh:nodeKind sh:IRI ;
sh:minCount 1 ;
sh:message "Every rdfs:Class must have at least one rdfs:subClassOf relationship"
] ;
# Optional label and comment
sh:property [
sh:path rdfs:label ;
sh:datatype xsd:string ;
sh:maxCount 1 ;
sh:message "rdfs:label must be a single string literal"
] ;
sh:property [
sh:path rdfs:comment ;
sh:datatype xsd:string ;
sh:maxCount 1 ;
sh:message "rdfs:comment must be a single string literal"
] ;
# OWL restrictions (cardinality constraints)
sh:property [
sh:path owl:restriction ;
sh:class owl:Restriction ;
sh:message "owl:restriction must reference valid owl:Restriction objects"
] ;
# Optional equivalent classes
sh:property [
sh:path owl:equivalentClass ;
sh:nodeKind sh:IRI ;
sh:message "owl:equivalentClass must be IRIs"
] .
# =====================================================
# RDF PROPERTY DEFINITIONS (TypeProperty objects)
# =====================================================
ex:PropertyDefinitionShape
a sh:NodeShape ;
sh:targetClass rdf:Property ;
sh:name "RDF Property Shape" ;
sh:description "Validates TypeProperty objects - RDF property definitions with domain/range" ;
# Optional label and comment
sh:property [
sh:path rdfs:label ;
sh:datatype xsd:string ;
sh:maxCount 1 ;
sh:message "rdfs:label must be a single string literal"
] ;
sh:property [
sh:path rdfs:comment ;
sh:datatype xsd:string ;
sh:maxCount 1 ;
sh:message "rdfs:comment must be a single string literal"
] ;
# Domain includes (what classes can have this property)
sh:property [
sh:path schema:domainIncludes ;
sh:nodeKind sh:IRI ;
sh:message "schema:domainIncludes must reference valid class IRIs"
] ;
# Range includes (what types can be values)
sh:property [
sh:path schema:rangeIncludes ;
sh:nodeKind sh:IRI ;
sh:message "schema:rangeIncludes must reference valid type/class IRIs"
] ;
# Optional equivalent properties
sh:property [
sh:path owl:equivalentProperty ;
sh:nodeKind sh:IRI ;
sh:message "owl:equivalentProperty must be IRIs"
] .
# =====================================================
# OWL RESTRICTION DEFINITIONS (Cardinality constraints)
# =====================================================
ex:RestrictionDefinitionShape
a sh:NodeShape ;
sh:targetClass owl:Restriction ;
sh:name "OWL Restriction Shape" ;
sh:description "Validates cardinality restrictions generated from TypeProperty.required fields" ;
# Must reference a property
sh:property [
sh:path owl:onProperty ;
sh:class rdf:Property ;
sh:minCount 1 ;
sh:maxCount 1 ;
sh:message "owl:Restriction must have exactly one owl:onProperty referencing an rdf:Property"
] ;
# Must have at least one cardinality constraint
sh:or (
[
sh:property [
sh:path owl:minCardinality ;
sh:minCount 1 ;
sh:maxCount 1
]
]
[
sh:property [
sh:path owl:maxCardinality ;
sh:minCount 1 ;
sh:maxCount 1
]
]
[
sh:property [
sh:path owl:cardinality ;
sh:minCount 1 ;
sh:maxCount 1
]
]
) ;
# Cardinality values must be non-negative integers (0 or 1 in our system)
sh:property [
sh:path owl:minCardinality ;
sh:nodeKind sh:Literal ;
sh:in (0 1) ;
sh:message "minCardinality must be 0 (optional) or 1 (required)"
] ;
sh:property [
sh:path owl:maxCardinality ;
sh:nodeKind sh:Literal ;
sh:in (0 1) ;
sh:message "maxCardinality must be 0 (unbounded) or 1 (single value)"
] .
# =====================================================
# METADATA ENTRY INSTANCES (MetadataEntry objects)
# =====================================================
ex:InstanceShape
a sh:NodeShape ;
sh:name "Metadata Entry Instance Shape" ;
sh:description "Validates MetadataEntry instances - entities with properties and references" ;
# Target nodes that have a type but are not schema definitions
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this a ?type .
FILTER(
?type != rdfs:Class &&
?type != rdf:Property &&
?type != owl:Restriction &&
!STRSTARTS(STR(?type), "http://www.w3.org/") &&
!STRSTARTS(STR(?type), "https://schema.org/")
)
}
"""
] ;
# Must have exactly one type declaration
sh:property [
sh:path rdf:type ;
sh:minCount 1 ;
sh:message "Every metadata entry must have exactly one rdf:type"
] .
# =====================================================
# RANGE VALIDATION FOR COMMON XSD TYPES
# =====================================================
ex:StringPropertyShape
a sh:NodeShape ;
sh:name "String Property Validation" ;
sh:description "Validates properties with xsd:string range" ;
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this ?prop ?value .
?prop schema:rangeIncludes xsd:string .
FILTER(isLiteral(?value))
}
"""
] ;
sh:nodeKind sh:Literal ;
sh:datatype xsd:string .
ex:IntegerPropertyShape
a sh:NodeShape ;
sh:name "Integer Property Validation" ;
sh:description "Validates properties with xsd:integer range" ;
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this ?prop ?value .
?prop schema:rangeIncludes xsd:integer .
FILTER(isLiteral(?value))
}
"""
] ;
sh:nodeKind sh:Literal ;
sh:datatype xsd:integer .
# =====================================================
# REFERENCE VALIDATION (Object Properties)
# =====================================================
ex:ReferencePropertyShape
a sh:NodeShape ;
sh:name "Reference Property Validation" ;
sh:description "Validates reference properties that point to other entities" ;
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this ?prop ?target .
?prop schema:rangeIncludes ?rangeClass .
?target a ?targetType .
FILTER(
!isLiteral(?target) &&
?rangeClass != xsd:string &&
?rangeClass != xsd:integer &&
?rangeClass != xsd:dateTime &&
?rangeClass != xsd:boolean
)
}
"""
] ;
sh:nodeKind sh:IRI .
# =====================================================
# CONSISTENCY VALIDATION
# =====================================================
ex:DomainConsistencyShape
a sh:NodeShape ;
sh:name "Domain Consistency Validation" ;
sh:description "Ensures entities only use properties appropriate for their type" ;
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this a ?type .
?this ?prop ?value .
?prop schema:domainIncludes ?domain .
FILTER(?type != ?domain && ?type != rdfs:Class && ?type != rdf:Property && ?type != owl:Restriction)
}
"""
] ;
sh:sparql [
a sh:SPARQLConstraint ;
sh:message "Entity type must be compatible with property domain" ;
sh:select """
SELECT $this ?prop ?domain ?actualType WHERE {
$this a ?actualType .
$this ?prop ?value .
?prop schema:domainIncludes ?domain .
FILTER(?actualType != ?domain)
}
"""
] .
ex:RequiredPropertyShape
a sh:NodeShape ;
sh:name "Required Property Validation" ;
sh:description "Ensures required properties (minCardinality=1) are present" ;
sh:target [
a sh:SPARQLTarget ;
sh:select """
SELECT ?this WHERE {
?this a ?type .
?type owl:restriction ?restriction .
?restriction owl:minCardinality 1 .
?restriction owl:onProperty ?requiredProp .
FILTER NOT EXISTS { ?this ?requiredProp ?value }
}
"""
] ;
sh:sparql [
a sh:SPARQLConstraint ;
sh:message "Required property is missing" ;
sh:select """
SELECT $this ?requiredProp WHERE {
$this a ?type .
?type owl:restriction ?restriction .
?restriction owl:minCardinality 1 .
?restriction owl:onProperty ?requiredProp .
FILTER NOT EXISTS { $this ?requiredProp ?value }
}
"""
] .

View File

@@ -1,138 +0,0 @@
#!/usr/bin/env python3
"""
Simple test to see how unknown namespaces are handled by get_context function.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "src"))
from lib_ro_crate_schema.crate.jsonld_utils import get_context
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF, RDFS
def create_graph_with_unknown_namespaces():
"""Create an RDF graph with unknown namespaces."""
g = Graph()
# Add triples with unknown pokemon.org namespace
pokemon_ns = "http://pokemon.org/"
pikachu = URIRef(pokemon_ns + "pikachu")
pokemon_name = URIRef(pokemon_ns + "pokemonName")
electric_type = URIRef(pokemon_ns + "ElectricPokemon")
# Add some triples
g.add((pikachu, RDF.type, electric_type))
g.add((pikachu, pokemon_name, Literal("Pikachu")))
g.add((pokemon_name, RDF.type, RDF.Property))
g.add((pokemon_name, RDFS.label, Literal("Pokemon Name")))
# Add triples with another unknown namespace
villains_ns = "http://villains.org/"
team_rocket = URIRef(villains_ns + "team_rocket")
criminal_org = URIRef(villains_ns + "CriminalOrganization")
motto = URIRef(villains_ns + "motto")
g.add((team_rocket, RDF.type, criminal_org))
g.add((team_rocket, motto, Literal("Prepare for trouble!")))
# Also add some known namespaces for comparison
schema_name = URIRef("https://schema.org/name")
g.add((pikachu, schema_name, Literal("Pikachu the Electric Mouse")))
# Add example.com namespace (base namespace in predefined list)
example_person = URIRef("http://example.com/trainer")
example_name = URIRef("http://example.com/trainerName")
g.add((example_person, example_name, Literal("Ash Ketchum")))
g.add((example_name, RDF.type, RDF.Property))
return g
def main():
print("🔍 TESTING get_context() WITH UNKNOWN NAMESPACES")
print("=" * 55)
# Create graph with unknown namespaces
g = create_graph_with_unknown_namespaces()
print("📊 Graph Statistics:")
print(f" Total triples: {len(g)}")
print("\n🔍 URIs in the graph:")
all_uris = set()
for s, p, o in g:
for uri in [str(s), str(p), str(o)]:
if uri.startswith('http'):
all_uris.add(uri)
# Group by namespace
namespaces = {}
for uri in sorted(all_uris):
if 'pokemon.org' in uri:
namespaces.setdefault('pokemon.org', []).append(uri)
elif 'villains.org' in uri:
namespaces.setdefault('villains.org', []).append(uri)
elif 'schema.org' in uri:
namespaces.setdefault('schema.org', []).append(uri)
elif 'example.com' in uri:
namespaces.setdefault('example.com', []).append(uri)
else:
namespaces.setdefault('other', []).append(uri)
for ns, uris in namespaces.items():
print(f"\n {ns}:")
for uri in uris[:3]: # Show first 3
print(f" {uri}")
if len(uris) > 3:
print(f" ... and {len(uris) - 3} more")
# Test get_context function
print(f"\n🎯 Testing get_context() function:")
context = get_context(g)
print("📋 Generated Context:")
if isinstance(context, list):
for i, ctx_layer in enumerate(context):
if isinstance(ctx_layer, str):
print(f" Layer {i}: \"{ctx_layer}\"")
else:
print(f" Layer {i}:")
for prefix, uri in sorted(ctx_layer.items()):
print(f" \"{prefix}\": \"{uri}\"")
else:
print(f" Single context: {context}")
# Analyze what happened
print(f"\n🧪 Analysis:")
detected_namespaces = set()
if isinstance(context, list) and len(context) > 1:
for ctx in context[1:]:
if isinstance(ctx, dict):
detected_namespaces.update(ctx.values())
test_namespaces = [
('pokemon.org', 'http://pokemon.org/'),
('villains.org', 'http://villains.org/'),
('schema.org', 'https://schema.org/'),
('example.com', 'http://example.com/')
]
for ns_name, ns_uri in test_namespaces:
if ns_uri in detected_namespaces:
print(f"{ns_name}: DETECTED")
else:
print(f"{ns_name}: NOT DETECTED")
print(f"\n🎮 Conclusion:")
unknown_detected = any(ns in detected_namespaces for _, ns in test_namespaces[:2])
if unknown_detected:
print(f" 🎉 Unknown namespaces are automatically detected!")
else:
print(f" ❌ Unknown namespaces are NOT automatically detected")
print(f" ➡️ Only predefined namespaces in namespace_prefixes are recognized")
if __name__ == "__main__":
main()

View File

@@ -1,93 +0,0 @@
#!/usr/bin/env python3
"""
Test the enhanced @ro_crate_schema decorator with explicit id parameter.
"""
import sys
sys.path.append('src')
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.decorators import ro_crate_schema, Field
from pydantic import BaseModel
# Test the new 'id' parameter in the decorator
@ro_crate_schema(
id="CustomPerson",
ontology="https://schema.org/Person"
)
class PersonModel(BaseModel):
"""A person model with explicit ID different from class name"""
name: str = Field(ontology="https://schema.org/name")
email: str = Field(ontology="https://schema.org/email")
# Test without explicit ID (should default to class name)
@ro_crate_schema(ontology="https://schema.org/Dataset")
class DatasetModel(BaseModel):
"""A dataset model without explicit ID"""
title: str = Field(ontology="https://schema.org/name")
description: str = Field(ontology="https://schema.org/description")
def test_decorator_with_id():
print("🧪 Testing @ro_crate_schema decorator with explicit id parameter...")
# Create facade and add models
facade = SchemaFacade()
facade.add_all_registered_models()
print("\n📊 Registered types:")
for type_obj in facade.get_types():
print(f" - Type ID: '{type_obj.id}' (from class: {type_obj.__class__.__name__})")
# Verify that PersonModel got the custom ID "CustomPerson"
person_type = facade.get_type("CustomPerson")
dataset_type = facade.get_type("DatasetModel") # Should use class name
if person_type:
print(f"✅ Found PersonModel with custom ID: '{person_type.id}'")
else:
print("❌ PersonModel with custom ID not found")
if dataset_type:
print(f"✅ Found DatasetModel with default ID: '{dataset_type.id}'")
else:
print("❌ DatasetModel with default ID not found")
# Create instances and add them
person = PersonModel(name="Alice Johnson", email="alice@example.com")
dataset = DatasetModel(title="Test Dataset", description="A test dataset")
facade.add_model_instance(person, "alice")
facade.add_model_instance(dataset, "test_dataset")
print("\n📦 Metadata entries:")
for entry in facade.get_entries():
print(f" - {entry.id} (class_id: {entry.class_id})")
# Verify the entries use the correct type IDs
alice_entry = facade.get_entry("alice")
dataset_entry = facade.get_entry("test_dataset")
if alice_entry and alice_entry.class_id == "CustomPerson":
print("✅ Alice entry correctly references 'CustomPerson' type")
else:
print(f"❌ Alice entry has wrong class_id: {alice_entry.class_id if alice_entry else 'None'}")
if dataset_entry and dataset_entry.class_id == "DatasetModel":
print("✅ Dataset entry correctly references 'DatasetModel' type")
else:
print(f"❌ Dataset entry has wrong class_id: {dataset_entry.class_id if dataset_entry else 'None'}")
# Export and verify
print("\n💾 Testing RO-Crate export...")
import os
output_dir = "output_crates"
os.makedirs(output_dir, exist_ok=True)
test_output_path = os.path.join(output_dir, "test_decorator_id_output")
facade.write(test_output_path, name="Test ID Parameter")
print("✅ Export successful!")
print("\n🎉 Test completed successfully!")
if __name__ == "__main__":
test_decorator_with_id()

View File

@@ -1,76 +0,0 @@
#!/usr/bin/env python3
"""
Test the refactored get_crate method to ensure it works independently.
"""
import sys
sys.path.append('src')
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.restriction import Restriction
def test_get_crate_method():
print("🧪 Testing get_crate method...")
# Create a simple schema
facade = SchemaFacade()
# Add a simple type with a property
name_prop = TypeProperty(
id="name",
range_includes=["http://www.w3.org/2001/XMLSchema#string"],
required=True
)
person_type = Type(
id="Person",
rdfs_property=[name_prop],
comment="A person entity"
)
facade.addType(person_type)
# Add a metadata entry
person_entry = MetadataEntry(
id="john_doe",
class_id="Person",
properties={"name": "John Doe"}
)
facade.addEntry(person_entry)
# Test get_crate method
print("📦 Testing get_crate method...")
crate = facade.get_crate(
name="Test RO-Crate",
description="A test crate created using get_crate method"
)
print(f"✅ Created crate: {crate}")
print(f"✅ Crate name: {getattr(crate, 'name', 'Not set')}")
print(f"✅ Crate description: {getattr(crate, 'description', 'Not set')}")
# Test that the crate can be written
print("💾 Testing crate writing...")
import os
output_dir = "output_crates"
os.makedirs(output_dir, exist_ok=True)
test_get_crate_path = os.path.join(output_dir, "test_get_crate_output")
crate.write(test_get_crate_path)
print(f"✅ Crate written successfully to '{test_get_crate_path}'")
# Test that write method still works (using get_crate internally)
print("💾 Testing write method (should use get_crate internally)...")
test_write_path = os.path.join(output_dir, "test_write_output")
facade.write(test_write_path, name="Test via Write", description="Using write method")
print("✅ Write method works correctly")
print("🎉 All tests passed!")
if __name__ == "__main__":
test_get_crate_method()

View File

@@ -1,400 +0,0 @@
import unittest
import sys
import json
import tempfile
from pathlib import Path
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
class TestIntegrationExamples(unittest.TestCase):
"""Integration tests using real examples from the codebase"""
def setUp(self):
"""Set up paths to example files"""
self.test_dir = Path(__file__).parent
self.examples_dir = self.test_dir.parent.parent / "examples"
self.lib_dir = self.test_dir.parent
self.obenbis_crate = self.lib_dir.parent.parent / "example" / "obenbis-one-publication" / "ro-crate-metadata.json"
def test_examples_py_recreation(self):
"""Test recreating the example from examples.py"""
# Recreate the example schema from examples.py
name = TypeProperty(
id="name",
range_includes=[LiteralType.STRING],
required=True,
label="Full Name",
comment="The full name of the entity"
)
identifier = TypeProperty(
id="identifier",
range_includes=[LiteralType.STRING],
required=True,
label="Identifier",
comment="Unique identifier for the entity"
)
colleague = TypeProperty(
id="colleague",
range_includes=["Participant"],
required=False,
label="Colleague",
comment="Optional colleague relationship"
)
participant_type = Type(
id="Participant",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[name, identifier],
comment="A participant in the research",
label="Participant",
)
creator_type = Type(
id="Creator",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["http://purl.org/dc/terms/creator"],
rdfs_property=[name, identifier, colleague],
comment="A creator of the research work",
label="Creator",
)
creator_entry = MetadataEntry(
id="creator1",
class_id="Creator",
properties={
"name": "John Author",
"identifier": "https://orcid.org/0000-0000-0000-0000",
},
references={},
)
participant_entry = MetadataEntry(
id="participant",
class_id="Participant",
properties={
"name": "Karl Participant",
"identifier": "https://orcid.org/0000-0000-0000-0001",
},
references={
"colleague": ["creator1"]
},
)
schema = SchemaFacade(
types=[creator_type, participant_type],
metadata_entries=[creator_entry, participant_entry],
)
# Test the schema
self.assertEqual(len(schema.types), 2)
self.assertEqual(len(schema.metadata_entries), 2)
# Test types
creator = schema.get_type("Creator")
self.assertIsNotNone(creator)
self.assertEqual(creator.label, "Creator")
self.assertEqual(len(creator.rdfs_property), 3) # name, identifier, colleague
participant = schema.get_type("Participant")
self.assertIsNotNone(participant)
self.assertEqual(participant.label, "Participant")
self.assertEqual(len(participant.rdfs_property), 2) # name, identifier
# Test metadata entries
creator_md = schema.get_entry("creator1")
self.assertIsNotNone(creator_md)
self.assertEqual(creator_md.properties["name"], "John Author")
participant_md = schema.get_entry("participant")
self.assertIsNotNone(participant_md)
self.assertEqual(participant_md.references["colleague"], ["creator1"])
# Test triple generation
triples = list(schema.to_triples())
self.assertGreater(len(triples), 0)
# Test JSON generation
json_data = schema.to_json()
self.assertIn("@context", json_data)
self.assertIn("@graph", json_data)
def test_obenbis_import(self):
"""Test importing the OpenBIS one-publication RO-Crate"""
if not self.obenbis_crate.exists():
self.skipTest(f"OpenBIS example file not found at {self.obenbis_crate}")
# Import the OpenBIS RO-Crate
facade = SchemaFacade.from_ro_crate(self.obenbis_crate)
# Test that import was successful
self.assertIsNotNone(facade)
# Should have imported some types and/or metadata entries
total_items = len(facade.types) + len(facade.metadata_entries)
self.assertGreater(total_items, 0, "Should have imported some schema elements")
# Test that we can generate JSON-LD from imported data
json_data = facade.to_json()
self.assertIn("@context", json_data)
self.assertIn("@graph", json_data)
# Test that we can generate triples
triples = list(facade.to_triples())
self.assertGreater(len(triples), 0, "Should generate RDF triples")
print(f"Imported facade with {len(facade.types)} types and {len(facade.metadata_entries)} metadata entries")
# If we have types, test they have proper structure
if facade.types:
first_type = facade.types[0]
self.assertIsNotNone(first_type.id)
print(f"First imported type: {first_type.id}")
# If we have metadata entries, test they have proper structure
if facade.metadata_entries:
first_entry = facade.metadata_entries[0]
self.assertIsNotNone(first_entry.id)
self.assertIsNotNone(first_entry.class_id)
print(f"First imported entry: {first_entry.id} of type {first_entry.class_id}")
def test_obenbis_structure_analysis(self):
"""Test analyzing the structure of the OpenBIS RO-Crate"""
if not self.obenbis_crate.exists():
self.skipTest(f"OpenBIS example file not found at {self.obenbis_crate}")
# Read raw JSON to analyze structure
with open(self.obenbis_crate, 'r') as f:
crate_data = json.load(f)
self.assertIn("@graph", crate_data)
graph = crate_data["@graph"]
# Analyze what types of entities are in the crate
entity_types = {}
rdfs_classes = []
rdf_properties = []
owl_restrictions = []
metadata_entities = []
for item in graph:
item_type = item.get("@type", "Unknown")
item_id = item.get("@id", "")
if item_type == "rdfs:Class":
rdfs_classes.append(item_id)
elif item_type in ["rdf:Property", "rdfs:Property"]:
rdf_properties.append(item_id)
elif item_type == "owl:Restriction":
owl_restrictions.append(item_id)
elif item_id not in ["./", "ro-crate-metadata.json"]:
metadata_entities.append((item_id, item_type))
# Count entity types
if item_type in entity_types:
entity_types[item_type] += 1
else:
entity_types[item_type] = 1
print("\nOpenBIS RO-Crate structure analysis:")
print(f"Total entities: {len(graph)}")
print(f"RDFS Classes: {len(rdfs_classes)}")
print(f"RDF Properties: {len(rdf_properties)}")
print(f"OWL Restrictions: {len(owl_restrictions)}")
print(f"Metadata entities: {len(metadata_entities)}")
print("\nEntity type distribution:")
for entity_type, count in sorted(entity_types.items()):
print(f" {entity_type}: {count}")
# Test that the structure makes sense
self.assertGreater(len(graph), 0, "Should have entities in the graph")
if rdfs_classes:
print(f"\nSample RDFS Classes: {rdfs_classes[:5]}")
if rdf_properties:
print(f"Sample RDF Properties: {rdf_properties[:5]}")
if metadata_entities:
print(f"Sample Metadata Entities: {[f'{id} ({type})' for id, type in metadata_entities[:5]]}")
def test_create_minimal_example(self):
"""Test creating a minimal working example similar to examples.py"""
# Create a minimal Person schema
name_prop = TypeProperty(
id="name",
range_includes=[LiteralType.STRING],
required=True,
label="Name"
)
email_prop = TypeProperty(
id="email",
range_includes=[LiteralType.STRING],
required=False,
label="Email"
)
person_type = Type(
id="Person",
rdfs_property=[name_prop, email_prop],
label="Person",
comment="A person entity"
)
# Create a person instance
person_instance = MetadataEntry(
id="john_doe",
class_id="Person",
properties={
"name": "John Doe",
"email": "john@example.com"
}
)
# Create facade
facade = SchemaFacade(
types=[person_type],
metadata_entries=[person_instance]
)
# Test basic functionality
self.assertEqual(len(facade.types), 1)
self.assertEqual(len(facade.metadata_entries), 1)
# Test export to temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
facade.write(
temp_dir,
name="Minimal Example",
description="A minimal RO-Crate example",
license="CC0"
)
# Verify files were created
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
self.assertTrue(metadata_file.exists())
# Verify the JSON structure
with open(metadata_file, 'r') as f:
exported_data = json.load(f)
self.assertIn("@context", exported_data)
self.assertIn("@graph", exported_data)
# Check that our Person type and instance are included
graph = exported_data["@graph"]
person_class_found = any(
(item.get("@id") in ["Person", "base:Person", "http://example.com/Person"]) and item.get("@type") == "rdfs:Class"
for item in graph
)
self.assertTrue(person_class_found, "Should export Person class")
person_instance_found = any(
(item.get("@id") in ["john_doe", "base:john_doe", "http://example.com/john_doe"]) and
item.get("@type") in ["Person", "base:Person", "http://example.com/Person"]
for item in graph
)
self.assertTrue(person_instance_found, "Should export person instance")
print(f"\nMinimal example exported with {len(graph)} entities")
def test_complex_relationship_example(self):
"""Test creating example with complex relationships between entities"""
# Define properties
name_prop = TypeProperty(id="name", range_includes=[LiteralType.STRING], required=True)
title_prop = TypeProperty(id="title", range_includes=[LiteralType.STRING], required=True)
author_prop = TypeProperty(id="author", range_includes=["Person"], required=True)
publisher_prop = TypeProperty(id="publisher", range_includes=["Organization"], required=False)
# Define types
person_type = Type(
id="Person",
rdfs_property=[name_prop],
label="Person"
)
organization_type = Type(
id="Organization",
rdfs_property=[name_prop],
label="Organization"
)
article_type = Type(
id="Article",
rdfs_property=[title_prop, author_prop, publisher_prop],
label="Article"
)
# Create instances
author = MetadataEntry(
id="author1",
class_id="Person",
properties={"name": "Dr. Jane Smith"}
)
publisher = MetadataEntry(
id="pub1",
class_id="Organization",
properties={"name": "Academic Press"}
)
article = MetadataEntry(
id="article1",
class_id="Article",
properties={"title": "Advanced RO-Crate Techniques"},
references={
"author": ["author1"],
"publisher": ["pub1"]
}
)
# Create facade
facade = SchemaFacade(
types=[person_type, organization_type, article_type],
metadata_entries=[author, publisher, article]
)
# Test relationships
self.assertEqual(len(facade.types), 3)
self.assertEqual(len(facade.metadata_entries), 3)
# Test that references work correctly
article_entry = facade.get_entry("article1")
self.assertIn("author1", article_entry.references["author"])
self.assertIn("pub1", article_entry.references["publisher"])
# Test triple generation includes relationships
triples = list(facade.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should have triples linking article to author and publisher
author_ref_found = any(
"article1" in triple[0] and "author" in triple[1] and "author1" in triple[2]
for triple in triple_strs
)
self.assertTrue(author_ref_found, "Should generate author reference triple")
publisher_ref_found = any(
"article1" in triple[0] and "publisher" in triple[1] and "pub1" in triple[2]
for triple in triple_strs
)
self.assertTrue(publisher_ref_found, "Should generate publisher reference triple")
print(f"\nComplex relationship example generated {len(triples)} triples")
if __name__ == '__main__':
unittest.main()

View File

@@ -1,272 +0,0 @@
import unittest
import sys
from pathlib import Path
from datetime import datetime
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from rdflib import URIRef, RDF, Literal
class TestMetadataEntry(unittest.TestCase):
"""Test cases for the MetadataEntry class"""
def setUp(self):
"""Set up test fixtures"""
self.basic_entry = MetadataEntry(
id="basic_entry",
class_id="BasicClass"
)
self.complete_entry = MetadataEntry(
id="person1",
class_id="Person",
properties={
"name": "John Doe",
"age": 30,
"active": True
},
references={
"knows": ["person2", "person3"],
"worksFor": ["organization1"]
}
)
self.datetime_entry = MetadataEntry(
id="event1",
class_id="Event",
properties={
"title": "Important Meeting",
"startTime": datetime(2023, 12, 25, 14, 30, 0)
}
)
def test_metadata_entry_creation(self):
"""Test basic MetadataEntry object creation"""
self.assertEqual(self.basic_entry.id, "basic_entry")
self.assertEqual(self.basic_entry.class_id, "BasicClass")
self.assertEqual(self.basic_entry.properties, {})
self.assertEqual(self.basic_entry.references, {})
def test_complete_entry_properties(self):
"""Test entry with complete properties and references"""
self.assertEqual(self.complete_entry.id, "person1")
self.assertEqual(self.complete_entry.class_id, "Person")
# Check properties
self.assertEqual(self.complete_entry.properties["name"], "John Doe")
self.assertEqual(self.complete_entry.properties["age"], 30)
self.assertEqual(self.complete_entry.properties["active"], True)
# Check references
self.assertEqual(self.complete_entry.references["knows"], ["person2", "person3"])
self.assertEqual(self.complete_entry.references["worksFor"], ["organization1"])
def test_java_api_compatibility(self):
"""Test Java API compatibility methods"""
self.assertEqual(self.complete_entry.getId(), "person1")
self.assertEqual(self.complete_entry.getClassId(), "Person")
values = self.complete_entry.getValues()
self.assertEqual(values["name"], "John Doe")
self.assertEqual(values["age"], 30)
references = self.complete_entry.getReferences()
self.assertEqual(references["knows"], ["person2", "person3"])
# Test alias method
self.assertEqual(self.complete_entry.get_values(), self.complete_entry.properties)
def test_to_triples(self):
"""Test RDF triple generation"""
triples = list(self.complete_entry.to_triples())
# Should generate multiple triples
self.assertGreater(len(triples), 0)
# Convert to string representation for easier testing
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check for type declaration
type_triple_found = any("Person" in triple[2] for triple in triple_strs)
self.assertTrue(type_triple_found, "Should generate class type triple")
# Check for properties
name_triple_found = any("name" in triple[1] and "John Doe" in triple[2] for triple in triple_strs)
self.assertTrue(name_triple_found, "Should generate property triples")
age_triple_found = any("age" in triple[1] and "30" in triple[2] for triple in triple_strs)
self.assertTrue(age_triple_found, "Should generate age property triple")
def test_datetime_handling(self):
"""Test handling of datetime objects in properties"""
triples = list(self.datetime_entry.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Datetime should be converted to ISO format string
datetime_found = any("startTime" in triple[1] and "2023-12-25T14:30:00" in triple[2] for triple in triple_strs)
self.assertTrue(datetime_found, "Should convert datetime to ISO string")
def test_reference_triples(self):
"""Test reference generation in triples"""
triples = list(self.complete_entry.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check for reference triples (no Literal wrapper for references)
knows_ref_found = any("knows" in triple[1] and "person2" in triple[2] for triple in triple_strs)
self.assertTrue(knows_ref_found, "Should generate reference triples")
works_for_ref_found = any("worksFor" in triple[1] and "organization1" in triple[2] for triple in triple_strs)
self.assertTrue(works_for_ref_found, "Should generate worksFor reference")
def test_empty_entry_triples(self):
"""Test triple generation for entry with no properties or references"""
empty_entry = MetadataEntry(id="empty", class_id="EmptyClass")
triples = list(empty_entry.to_triples())
# Should at least generate the type declaration
self.assertGreater(len(triples), 0)
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
type_found = any("EmptyClass" in triple[2] for triple in triple_strs)
self.assertTrue(type_found, "Should generate type declaration even for empty entry")
def test_mixed_property_types(self):
"""Test entry with various property value types"""
mixed_entry = MetadataEntry(
id="mixed",
class_id="MixedType",
properties={
"string_prop": "text value",
"int_prop": 42,
"float_prop": 3.14,
"bool_prop": False,
"none_prop": None # Should be filtered out
}
)
triples = list(mixed_entry.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check each type is properly handled
string_found = any("string_prop" in triple[1] and "text value" in triple[2] for triple in triple_strs)
int_found = any("int_prop" in triple[1] and "42" in triple[2] for triple in triple_strs)
float_found = any("float_prop" in triple[1] and "3.14" in triple[2] for triple in triple_strs)
bool_found = any("bool_prop" in triple[1] and "false" in triple[2] for triple in triple_strs)
self.assertTrue(string_found, "Should handle string properties")
self.assertTrue(int_found, "Should handle integer properties")
self.assertTrue(float_found, "Should handle float properties")
self.assertTrue(bool_found, "Should handle boolean properties")
# None properties should not generate triples (filtered out in actual implementation)
none_found = any("none_prop" in triple[1] for triple in triple_strs)
# Note: The current implementation might include None values,
# but ideally they should be filtered out
def test_multiple_references_same_property(self):
"""Test property with multiple reference values"""
multi_ref_entry = MetadataEntry(
id="multi_ref",
class_id="MultiRef",
references={
"collaborator": ["person1", "person2", "person3"]
}
)
triples = list(multi_ref_entry.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should generate separate triples for each reference
collab1_found = any("collaborator" in triple[1] and "person1" in triple[2] for triple in triple_strs)
collab2_found = any("collaborator" in triple[1] and "person2" in triple[2] for triple in triple_strs)
collab3_found = any("collaborator" in triple[1] and "person3" in triple[2] for triple in triple_strs)
self.assertTrue(collab1_found, "Should generate triple for person1")
self.assertTrue(collab2_found, "Should generate triple for person2")
self.assertTrue(collab3_found, "Should generate triple for person3")
def test_id_and_class_id_validation(self):
"""Test that id and class_id are properly set and accessible"""
entry = MetadataEntry(id="test_id", class_id="TestClass")
# Direct access
self.assertEqual(entry.id, "test_id")
self.assertEqual(entry.class_id, "TestClass")
# Java API access
self.assertEqual(entry.getId(), "test_id")
self.assertEqual(entry.getClassId(), "TestClass")
def test_get_entry_as_compatibility(self):
"""Test the get_entry_as method for SchemaFacade compatibility"""
# This test verifies that MetadataEntry objects work with the new get_entry_as method
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from pydantic import BaseModel
from typing import Optional
# Create a simple test model
class TestPerson(BaseModel):
name: str
age: Optional[int] = None
active: Optional[bool] = None
# Create a facade and add our test entry
facade = SchemaFacade()
facade.addEntry(self.complete_entry)
# Test conversion to our test model
person_instance = facade.get_entry_as("person1", TestPerson)
self.assertIsNotNone(person_instance)
self.assertIsInstance(person_instance, TestPerson)
self.assertEqual(person_instance.name, "John Doe")
self.assertEqual(person_instance.age, 30)
self.assertEqual(person_instance.active, True)
# Test with non-existent entry
none_result = facade.get_entry_as("nonexistent", TestPerson)
self.assertIsNone(none_result)
def test_get_entry_as_with_references(self):
"""Test get_entry_as handling of references"""
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from pydantic import BaseModel
from typing import Optional, List
class TestOrganization(BaseModel):
name: str
class TestPersonWithRefs(BaseModel):
name: str
age: Optional[int] = None
knows: Optional[List[str]] = None # Keep as strings for this test
worksFor: Optional[str] = None # Single reference as string
# Create facade and add entries
facade = SchemaFacade()
facade.addEntry(self.complete_entry)
# Add a referenced organization entry
org_entry = MetadataEntry(
id="organization1",
class_id="Organization",
properties={"name": "Tech Corp"}
)
facade.addEntry(org_entry)
# Test conversion
person = facade.get_entry_as("person1", TestPersonWithRefs)
self.assertIsNotNone(person)
self.assertEqual(person.name, "John Doe")
self.assertEqual(person.age, 30)
self.assertEqual(person.knows, ["person2", "person3"]) # References as IDs
self.assertEqual(person.worksFor, "organization1") # Single reference as ID
if __name__ == '__main__':
unittest.main()

View File

@@ -1,209 +0,0 @@
"""
Test suite for Pydantic model export functionality in SchemaFacade.
"""
import unittest
import sys
from pathlib import Path
from typing import List, Optional
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.restriction import Restriction
from pydantic import BaseModel, ValidationError
class TestPydanticExport(unittest.TestCase):
"""Test Pydantic model export functionality"""
def setUp(self):
"""Set up test fixtures"""
self.facade = SchemaFacade()
# Create a simple Person type
person_name_prop = TypeProperty(
id="name",
label="Name",
comment="Person's name",
range_includes=["http://www.w3.org/2001/XMLSchema#string"],
required=True
)
person_age_prop = TypeProperty(
id="age",
label="Age",
comment="Age in years",
range_includes=["http://www.w3.org/2001/XMLSchema#integer"],
required=False
)
person_type = Type(
id="Person",
label="Person",
comment="A person",
rdfs_property=[person_name_prop, person_age_prop],
restrictions=[
Restriction(property_type="name", min_cardinality=1, max_cardinality=1),
Restriction(property_type="age", min_cardinality=0, max_cardinality=1)
]
)
self.facade.addType(person_type)
def test_export_single_model(self):
"""Test exporting a single model"""
PersonModel = self.facade.export_pydantic_model("Person")
# Check class properties
self.assertEqual(PersonModel.__name__, "Person")
self.assertIn("name", PersonModel.__annotations__)
self.assertIn("age", PersonModel.__annotations__)
# Test instance creation
person = PersonModel(name="Alice")
self.assertEqual(person.name, "Alice")
self.assertIsNone(person.age)
# Test validation
with self.assertRaises(ValidationError):
PersonModel() # Missing required 'name'
def test_export_all_models(self):
"""Test exporting all models"""
models = self.facade.export_all_pydantic_models()
self.assertIn("Person", models)
PersonModel = models["Person"]
# Test functionality
person = PersonModel(name="Bob", age=30)
self.assertEqual(person.name, "Bob")
self.assertEqual(person.age, 30)
def test_type_mapping(self):
"""Test RDF type to Python type mapping"""
# Test different data types
string_type = self.facade._rdf_type_to_python_type(["http://www.w3.org/2001/XMLSchema#string"])
self.assertEqual(string_type, str)
int_type = self.facade._rdf_type_to_python_type(["http://www.w3.org/2001/XMLSchema#integer"])
self.assertEqual(int_type, int)
bool_type = self.facade._rdf_type_to_python_type(["http://www.w3.org/2001/XMLSchema#boolean"])
self.assertEqual(bool_type, bool)
# Test schema.org types
schema_text = self.facade._rdf_type_to_python_type(["https://schema.org/Text"])
self.assertEqual(schema_text, str)
def test_field_requirements(self):
"""Test field requirement detection from restrictions"""
person_type = self.facade.get_type("Person")
# name should be required (minCardinality: 1)
self.assertTrue(self.facade._is_field_required(person_type, "name"))
# age should be optional (minCardinality: 0)
self.assertFalse(self.facade._is_field_required(person_type, "age"))
def test_list_fields(self):
"""Test list field detection"""
# Add a type with list property
list_prop = TypeProperty(
id="tags",
label="Tags",
range_includes=["http://www.w3.org/2001/XMLSchema#string"]
)
list_type = Type(
id="TaggedItem",
rdfs_property=[list_prop],
restrictions=[
Restriction(property_type="tags", min_cardinality=0, max_cardinality=None) # Unbounded
]
)
self.facade.addType(list_type)
# Test list detection
self.assertTrue(self.facade._is_field_list(list_type, "tags"))
# Export and test
TaggedModel = self.facade.export_pydantic_model("TaggedItem")
tagged = TaggedModel(tags=["tag1", "tag2"])
self.assertEqual(tagged.tags, ["tag1", "tag2"])
def test_forward_references(self):
"""Test forward references between models"""
# Add Organization type that references Person
org_name_prop = TypeProperty(
id="name",
label="Organization Name",
range_includes=["http://www.w3.org/2001/XMLSchema#string"]
)
org_members_prop = TypeProperty(
id="members",
label="Members",
range_includes=["Person"] # Forward reference
)
org_type = Type(
id="Organization",
rdfs_property=[org_name_prop, org_members_prop],
restrictions=[
Restriction(property_type="name", min_cardinality=1, max_cardinality=1),
Restriction(property_type="members", min_cardinality=0, max_cardinality=None)
]
)
self.facade.addType(org_type)
# Export all models (should handle forward references)
models = self.facade.export_all_pydantic_models()
# Test that both models were created
self.assertIn("Person", models)
self.assertIn("Organization", models)
# Test basic functionality (forward ref might not work perfectly but shouldn't crash)
OrgModel = models["Organization"]
org = OrgModel(name="Test Corp")
self.assertEqual(org.name, "Test Corp")
def test_nonexistent_type(self):
"""Test error handling for nonexistent types"""
with self.assertRaises(ValueError):
self.facade.export_pydantic_model("NonExistentType")
def test_custom_base_class(self):
"""Test using custom base class"""
class CustomBase(BaseModel):
custom_field: str = "default"
PersonModel = self.facade.export_pydantic_model("Person", base_class=CustomBase)
# Should inherit from custom base
self.assertTrue(issubclass(PersonModel, CustomBase))
# Should have both custom and schema fields
person = PersonModel(name="Test")
self.assertEqual(person.name, "Test")
self.assertEqual(person.custom_field, "default")
def test_field_metadata(self):
"""Test that field metadata is preserved"""
PersonModel = self.facade.export_pydantic_model("Person")
# Check model schema includes field descriptions
schema = PersonModel.model_json_schema()
self.assertIn("Person's name", schema["properties"]["name"]["description"])
self.assertIn("Age in years", schema["properties"]["age"]["description"])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,211 +0,0 @@
import unittest
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.restriction import Restriction
from rdflib import OWL, Literal, XSD
class TestRestriction(unittest.TestCase):
"""Test cases for the Restriction class"""
def setUp(self):
"""Set up test fixtures"""
self.basic_restriction = Restriction(property_type="testProperty")
self.complete_restriction = Restriction(
id="complete_restriction",
property_type="name",
min_cardinality=1,
max_cardinality=1
)
self.unbounded_restriction = Restriction(
property_type="tags",
min_cardinality=0,
max_cardinality=None # Unbounded
)
def test_restriction_creation(self):
"""Test basic Restriction object creation"""
self.assertEqual(self.basic_restriction.property_type, "testProperty")
self.assertIsNone(self.basic_restriction.min_cardinality)
self.assertIsNone(self.basic_restriction.max_cardinality)
self.assertIsNotNone(self.basic_restriction.id) # Auto-generated UUID
def test_restriction_with_cardinalities(self):
"""Test restriction with explicit cardinalities"""
self.assertEqual(self.complete_restriction.property_type, "name")
self.assertEqual(self.complete_restriction.min_cardinality, 1)
self.assertEqual(self.complete_restriction.max_cardinality, 1)
def test_unbounded_restriction(self):
"""Test restriction with unbounded max cardinality"""
self.assertEqual(self.unbounded_restriction.property_type, "tags")
self.assertEqual(self.unbounded_restriction.min_cardinality, 0)
self.assertIsNone(self.unbounded_restriction.max_cardinality)
def test_to_triples(self):
"""Test RDF triple generation"""
triples = list(self.complete_restriction.to_triples())
# Should generate multiple triples
self.assertGreater(len(triples), 0)
# Convert to string representation for easier testing
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check for essential triples
type_triple_found = any("Restriction" in triple[2] for triple in triple_strs)
self.assertTrue(type_triple_found, "Should generate owl:Restriction type triple")
on_property_found = any("onProperty" in triple[1] for triple in triple_strs)
self.assertTrue(on_property_found, "Should generate owl:onProperty triple")
min_card_found = any("minCardinality" in triple[1] for triple in triple_strs)
self.assertTrue(min_card_found, "Should generate owl:minCardinality triple")
max_card_found = any("maxCardinality" in triple[1] for triple in triple_strs)
self.assertTrue(max_card_found, "Should generate owl:maxCardinality triple")
def test_minimal_restriction_triples(self):
"""Test triple generation for restriction with no cardinalities"""
minimal = Restriction(property_type="minimal_prop")
triples = list(minimal.to_triples())
# Should at least generate type and onProperty triples
self.assertGreater(len(triples), 0)
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
type_found = any("Restriction" in triple[2] for triple in triple_strs)
self.assertTrue(type_found, "Should generate owl:Restriction type")
on_property_found = any("onProperty" in triple[1] for triple in triple_strs)
self.assertTrue(on_property_found, "Should generate owl:onProperty")
# Should NOT generate cardinality triples when they're None
min_card_found = any("minCardinality" in triple[1] for triple in triple_strs)
max_card_found = any("maxCardinality" in triple[1] for triple in triple_strs)
self.assertFalse(min_card_found, "Should not generate minCardinality when None")
self.assertFalse(max_card_found, "Should not generate maxCardinality when None")
def test_only_min_cardinality(self):
"""Test restriction with only min cardinality set"""
restriction = Restriction(
property_type="min_only",
min_cardinality=1
)
triples = list(restriction.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
min_card_found = any("minCardinality" in triple[1] for triple in triple_strs)
max_card_found = any("maxCardinality" in triple[1] for triple in triple_strs)
self.assertTrue(min_card_found, "Should generate minCardinality")
self.assertFalse(max_card_found, "Should not generate maxCardinality when None")
def test_only_max_cardinality(self):
"""Test restriction with only max cardinality set"""
restriction = Restriction(
property_type="max_only",
max_cardinality=5
)
triples = list(restriction.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
min_card_found = any("minCardinality" in triple[1] for triple in triple_strs)
max_card_found = any("maxCardinality" in triple[1] for triple in triple_strs)
self.assertFalse(min_card_found, "Should not generate minCardinality when None")
self.assertTrue(max_card_found, "Should generate maxCardinality")
def test_zero_cardinalities(self):
"""Test restriction with zero cardinalities (explicit zeros)"""
restriction = Restriction(
property_type="zero_test",
min_cardinality=0,
max_cardinality=0
)
triples = list(restriction.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Zero cardinalities should be included (different from None)
min_card_found = any("minCardinality" in triple[1] and "0" in triple[2] for triple in triple_strs)
max_card_found = any("maxCardinality" in triple[1] and "0" in triple[2] for triple in triple_strs)
self.assertTrue(min_card_found, "Should generate minCardinality=0")
self.assertTrue(max_card_found, "Should generate maxCardinality=0")
def test_common_restriction_patterns(self):
"""Test common restriction patterns used in RO-Crate schemas"""
# Required single value (exactly one)
required_single = Restriction(
property_type="title",
min_cardinality=1,
max_cardinality=1
)
# Optional single value (zero or one)
optional_single = Restriction(
property_type="description",
min_cardinality=0,
max_cardinality=1
)
# Required multiple values (one or more)
required_multiple = Restriction(
property_type="author",
min_cardinality=1,
max_cardinality=None
)
# Optional multiple values (zero or more)
optional_multiple = Restriction(
property_type="keywords",
min_cardinality=0,
max_cardinality=None
)
# Test each pattern generates appropriate triples
patterns = [required_single, optional_single, required_multiple, optional_multiple]
for restriction in patterns:
triples = list(restriction.to_triples())
self.assertGreater(len(triples), 0, f"Restriction {restriction.property_type} should generate triples")
# All should have type and onProperty
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
type_found = any("Restriction" in triple[2] for triple in triple_strs)
on_prop_found = any("onProperty" in triple[1] for triple in triple_strs)
self.assertTrue(type_found, f"Restriction {restriction.property_type} should have type")
self.assertTrue(on_prop_found, f"Restriction {restriction.property_type} should have onProperty")
def test_custom_id(self):
"""Test restriction with custom ID"""
custom_id = "Person_name_restriction"
restriction = Restriction(
id=custom_id,
property_type="name",
min_cardinality=1
)
self.assertEqual(restriction.id, custom_id)
triples = list(restriction.to_triples())
# The subject of triples should use the custom ID
subjects = set(str(triple[0]) for triple in triples)
custom_id_used = any(custom_id in subject for subject in subjects)
self.assertTrue(custom_id_used, "Should use custom ID in triples")
if __name__ == '__main__':
unittest.main()

View File

@@ -1,397 +0,0 @@
import unittest
import sys
import json
import tempfile
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.restriction import Restriction
class TestRoundTripCycles(unittest.TestCase):
"""Test round-trip conversion cycles to verify no data loss during import/export"""
def setUp(self):
"""Set up test fixtures with comprehensive schema"""
# Create a comprehensive test schema
# Properties
self.name_prop = TypeProperty(
id="name",
range_includes=[LiteralType.STRING],
required=True,
label="Full Name",
comment="The complete name of the entity",
ontological_annotations=["https://schema.org/name"]
)
self.age_prop = TypeProperty(
id="age",
range_includes=[LiteralType.INTEGER],
required=False,
label="Age",
comment="Age in years"
)
self.email_prop = TypeProperty(
id="email",
range_includes=[LiteralType.STRING],
required=False,
label="Email Address"
)
self.knows_prop = TypeProperty(
id="knows",
range_includes=["Person"],
required=False,
label="Knows",
comment="People this person knows"
)
# Restrictions
self.name_restriction = Restriction(
id="Person_name_restriction",
property_type="name",
min_cardinality=1,
max_cardinality=1
)
self.knows_restriction = Restriction(
id="Person_knows_restriction",
property_type="knows",
min_cardinality=0,
max_cardinality=None # Unbounded
)
# Types
self.person_type = Type(
id="Person",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["https://schema.org/Person"],
rdfs_property=[self.name_prop, self.age_prop, self.email_prop, self.knows_prop],
restrictions=[self.name_restriction, self.knows_restriction],
comment="A person entity with comprehensive metadata",
label="Person"
)
self.organization_type = Type(
id="Organization",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["https://schema.org/Organization"],
rdfs_property=[self.name_prop],
comment="An organization",
label="Organization"
)
# Metadata entries
self.person1 = MetadataEntry(
id="person1",
class_id="Person",
properties={
"name": "Alice Johnson",
"age": 30,
"email": "alice@example.com"
},
references={
"knows": ["person2"]
}
)
self.person2 = MetadataEntry(
id="person2",
class_id="Person",
properties={
"name": "Bob Smith",
"age": 25
},
references={
"knows": ["person1"] # Mutual relationship
}
)
self.org1 = MetadataEntry(
id="org1",
class_id="Organization",
properties={
"name": "Example Corp"
}
)
# Complete facade
self.original_facade = SchemaFacade(
types=[self.person_type, self.organization_type],
metadata_entries=[self.person1, self.person2, self.org1]
)
def test_export_import_roundtrip(self):
"""Test export to file and import back maintains schema integrity"""
with tempfile.TemporaryDirectory() as temp_dir:
# Export original facade
self.original_facade.write(
temp_dir,
name="Roundtrip Test",
description="Testing roundtrip conversion",
license="MIT"
)
# Import back from file
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
imported_facade = SchemaFacade.from_ro_crate(metadata_file)
# Compare facades
self._compare_facades(self.original_facade, imported_facade, "File roundtrip")
def test_json_dict_roundtrip(self):
"""Test conversion to JSON dict and back maintains schema integrity"""
# Convert to JSON dict
json_data = self.original_facade.to_json()
# Import from dict
imported_facade = SchemaFacade.from_dict(json_data)
# Compare facades
self._compare_facades(self.original_facade, imported_facade, "JSON dict roundtrip")
def test_multiple_roundtrips(self):
"""Test multiple export/import cycles to ensure stability"""
current_facade = self.original_facade
for cycle in range(3): # Test 3 cycles
with tempfile.TemporaryDirectory() as temp_dir:
# Export current facade
current_facade.write(
temp_dir,
name=f"Multi-roundtrip Cycle {cycle + 1}",
description="Testing multiple roundtrip cycles"
)
# Import back
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
current_facade = SchemaFacade.from_ro_crate(metadata_file)
# Compare with original (should remain consistent)
self._compare_facades(
self.original_facade,
current_facade,
f"Multiple roundtrip cycle {cycle + 1}"
)
def test_triples_preservation(self):
"""Test that RDF triples are preserved through roundtrip"""
# Get original triples
original_triples = set()
for triple in self.original_facade.to_triples():
# Normalize to string representation for comparison
triple_str = (str(triple[0]), str(triple[1]), str(triple[2]))
original_triples.add(triple_str)
# Roundtrip via JSON
json_data = self.original_facade.to_json()
imported_facade = SchemaFacade.from_dict(json_data)
# Get imported triples
imported_triples = set()
for triple in imported_facade.to_triples():
triple_str = (str(triple[0]), str(triple[1]), str(triple[2]))
imported_triples.add(triple_str)
# Compare triple sets
print(f"\nTriples preservation test:")
print(f"Original triples: {len(original_triples)}")
print(f"Imported triples: {len(imported_triples)}")
# Find differences
only_in_original = original_triples - imported_triples
only_in_imported = imported_triples - original_triples
if only_in_original:
print(f"Triples lost in import: {len(only_in_original)}")
for triple in list(only_in_original)[:5]: # Show first 5
print(f" Lost: {triple}")
if only_in_imported:
print(f"New triples in import: {len(only_in_imported)}")
for triple in list(only_in_imported)[:5]: # Show first 5
print(f" New: {triple}")
# Allow some differences due to RO-Crate structure additions
# But core schema triples should be preserved
self.assertGreater(len(imported_triples), 0, "Should have imported triples")
def test_obenbis_roundtrip(self):
"""Test roundtrip with the OpenBIS example if available"""
obenbis_file = (Path(__file__).parent.parent.parent.parent /
"example" / "obenbis-one-publication" / "ro-crate-metadata.json")
if not obenbis_file.exists():
self.skipTest(f"OpenBIS example not found at {obenbis_file}")
# Import OpenBIS RO-Crate
original_facade = SchemaFacade.from_ro_crate(obenbis_file)
with tempfile.TemporaryDirectory() as temp_dir:
# Export it
original_facade.write(
temp_dir,
name="OpenBIS Roundtrip Test",
description="Testing OpenBIS RO-Crate roundtrip"
)
# Import back
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
imported_facade = SchemaFacade.from_ro_crate(metadata_file)
# Basic consistency checks
print(f"\nOpenBIS roundtrip test:")
print(f"Original - Types: {len(original_facade.types)}, Entries: {len(original_facade.metadata_entries)}")
print(f"Imported - Types: {len(imported_facade.types)}, Entries: {len(imported_facade.metadata_entries)}")
# Should have similar structure (allowing for some differences due to RO-Crate additions)
self.assertGreaterEqual(
len(imported_facade.types) + len(imported_facade.metadata_entries),
0,
"Should have imported some entities"
)
def test_property_cardinality_preservation(self):
"""Test that property cardinality information is preserved"""
# Create a facade with specific cardinality requirements
required_prop = TypeProperty(id="required_field", range_includes=[LiteralType.STRING], required=True)
optional_prop = TypeProperty(id="optional_field", range_includes=[LiteralType.STRING], required=False)
test_type = Type(
id="TestType",
rdfs_property=[required_prop, optional_prop]
)
test_facade = SchemaFacade(types=[test_type])
# Roundtrip via JSON
json_data = test_facade.to_json()
imported_facade = SchemaFacade.from_dict(json_data)
# Check that cardinality info is preserved through restrictions
imported_type = imported_facade.get_type("TestType")
self.assertIsNotNone(imported_type)
restrictions = imported_type.get_restrictions()
# Find restrictions for our properties
required_restriction = None
optional_restriction = None
for restriction in restrictions:
if restriction.property_type == "required_field":
required_restriction = restriction
elif restriction.property_type == "optional_field":
optional_restriction = restriction
# Check cardinalities (if restrictions were generated)
if required_restriction:
self.assertEqual(required_restriction.min_cardinality, 1, "Required field should have min cardinality 1")
if optional_restriction:
self.assertEqual(optional_restriction.min_cardinality, 0, "Optional field should have min cardinality 0")
def test_ontological_annotations_preservation(self):
"""Test that ontological annotations are preserved"""
# Test facade with ontological annotations
json_data = self.original_facade.to_json()
imported_facade = SchemaFacade.from_dict(json_data)
# Check Person type annotations
original_person = self.original_facade.get_type("Person")
imported_person = imported_facade.get_type("Person")
if imported_person and original_person:
print(f"\nOntological annotations test:")
print(f"Original Person ontological annotations: {original_person.ontological_annotations}")
print(f"Imported Person ontological annotations: {imported_person.ontological_annotations}")
# Should preserve ontological mapping
if original_person.ontological_annotations:
self.assertIsNotNone(
imported_person.ontological_annotations,
"Should preserve ontological annotations"
)
def _compare_facades(self, original: SchemaFacade, imported: SchemaFacade, test_name: str):
"""Helper method to compare two facades for consistency"""
print(f"\n{test_name} comparison:")
print(f"Original - Types: {len(original.types)}, Entries: {len(original.metadata_entries)}")
print(f"Imported - Types: {len(imported.types)}, Entries: {len(imported.metadata_entries)}")
# Basic counts should be similar (allowing for RO-Crate structure additions)
self.assertGreaterEqual(
len(imported.types) + len(imported.metadata_entries),
len(original.types) + len(original.metadata_entries),
"Should preserve at least original entities"
)
# Check specific types are preserved
for original_type in original.types:
imported_type = imported.get_type(original_type.id)
if imported_type: # May not be preserved due to import/export limitations
self.assertEqual(
imported_type.id,
original_type.id,
f"Type ID should be preserved: {original_type.id}"
)
if original_type.label and imported_type.label:
self.assertEqual(
imported_type.label,
original_type.label,
f"Type label should be preserved: {original_type.id}"
)
# Check specific metadata entries are preserved
for original_entry in original.metadata_entries:
imported_entry = imported.get_entry(original_entry.id)
if imported_entry: # May not be preserved due to import/export limitations
self.assertEqual(
imported_entry.id,
original_entry.id,
f"Entry ID should be preserved: {original_entry.id}"
)
self.assertEqual(
imported_entry.class_id,
original_entry.class_id,
f"Entry class ID should be preserved: {original_entry.id}"
)
# Test that we can generate valid output from imported facade
try:
imported_json = imported.to_json()
self.assertIn("@context", imported_json)
self.assertIn("@graph", imported_json)
except Exception as e:
self.fail(f"Failed to generate JSON from imported facade: {e}")
try:
imported_triples = list(imported.to_triples())
self.assertGreater(len(imported_triples), 0, "Should generate triples from imported facade")
except Exception as e:
self.fail(f"Failed to generate triples from imported facade: {e}")
print(f"{test_name} completed successfully")
if __name__ == '__main__':
unittest.main()

View File

@@ -1,337 +0,0 @@
import unittest
import sys
import json
import tempfile
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.metadata_entry import MetadataEntry
from lib_ro_crate_schema.crate.restriction import Restriction
class TestSchemaFacade(unittest.TestCase):
"""Test cases for the SchemaFacade class"""
def setUp(self):
"""Set up test fixtures"""
# Create a basic schema with types and properties
self.name_property = TypeProperty(
id="name",
range_includes=[LiteralType.STRING],
required=True
)
self.age_property = TypeProperty(
id="age",
range_includes=[LiteralType.INTEGER],
required=False
)
self.person_type = Type(
id="Person",
rdfs_property=[self.name_property, self.age_property],
comment="A person entity",
label="Person"
)
self.person_entry = MetadataEntry(
id="person1",
class_id="Person",
properties={"name": "John Doe", "age": 30}
)
self.facade = SchemaFacade(
types=[self.person_type],
metadata_entries=[self.person_entry]
)
def test_facade_creation(self):
"""Test basic SchemaFacade creation"""
empty_facade = SchemaFacade()
self.assertEqual(len(empty_facade.types), 0)
self.assertEqual(len(empty_facade.metadata_entries), 0)
self.assertEqual(len(self.facade.types), 1)
self.assertEqual(len(self.facade.metadata_entries), 1)
def test_fluent_api(self):
"""Test fluent API methods"""
facade = SchemaFacade()
result = facade.addType(self.person_type).addEntry(self.person_entry)
# Check method chaining works
self.assertEqual(result, facade)
# Check items were added
self.assertIn(self.person_type, facade.types)
self.assertIn(self.person_entry, facade.metadata_entries)
def test_get_methods(self):
"""Test getter methods"""
# Test get_types
types = self.facade.get_types()
self.assertEqual(len(types), 1)
self.assertEqual(types[0].id, "Person")
# Test get_type
person_type = self.facade.get_type("Person")
self.assertIsNotNone(person_type)
self.assertEqual(person_type.id, "Person")
non_existent = self.facade.get_type("NonExistent")
self.assertIsNone(non_existent)
# Test get_entries
entries = self.facade.get_entries()
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].id, "person1")
# Test get_entry
person_entry = self.facade.get_entry("person1")
self.assertIsNotNone(person_entry)
self.assertEqual(person_entry.id, "person1")
# Test get_entries_by_class
person_entries = self.facade.get_entries_by_class("Person")
self.assertEqual(len(person_entries), 1)
self.assertEqual(person_entries[0].id, "person1")
def test_java_api_compatibility(self):
"""Test Java API compatibility methods"""
# Test property methods
properties = self.facade.get_property_types()
self.assertEqual(len(properties), 2)
property_ids = [prop.id for prop in properties]
self.assertIn("name", property_ids)
self.assertIn("age", property_ids)
# Test get_property_type
name_prop = self.facade.get_property_type("name")
self.assertIsNotNone(name_prop)
self.assertEqual(name_prop.id, "name")
# Test get_crate (basic functionality)
crate = self.facade.get_crate()
self.assertIsNotNone(crate)
def test_to_triples(self):
"""Test RDF triple generation"""
triples = list(self.facade.to_triples())
# Should generate triples for both types and metadata entries
self.assertGreater(len(triples), 0)
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should include type definition triples
class_triple_found = any("Class" in triple[2] for triple in triple_strs)
self.assertTrue(class_triple_found, "Should generate class definition triples")
# Should include metadata entry triples
person_triple_found = any("person1" in triple[0] for triple in triple_strs)
self.assertTrue(person_triple_found, "Should generate metadata entry triples")
def test_to_graph(self):
"""Test RDF Graph generation"""
graph = self.facade.to_graph()
# Should have triples
self.assertGreater(len(graph), 0)
# Should have proper namespace binding
namespaces = dict(graph.namespaces())
self.assertIn('base', namespaces)
def test_to_json(self):
"""Test JSON-LD generation"""
json_data = self.facade.to_json()
self.assertIsInstance(json_data, dict)
self.assertIn("@context", json_data)
self.assertIn("@graph", json_data)
def test_write_to_crate(self):
"""Test writing to RO-Crate directory"""
with tempfile.TemporaryDirectory() as temp_dir:
self.facade.write(
temp_dir,
name="Test Crate",
description="A test RO-Crate",
license="MIT"
)
# Check that metadata file was created
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
self.assertTrue(metadata_file.exists())
# Check that the file contains valid JSON
with open(metadata_file, 'r') as f:
crate_data = json.load(f)
self.assertIn("@context", crate_data)
self.assertIn("@graph", crate_data)
def test_from_ro_crate_roundtrip(self):
"""Test creating facade from RO-Crate and ensuring roundtrip consistency"""
with tempfile.TemporaryDirectory() as temp_dir:
# Write original facade
self.facade.write(temp_dir, name="Roundtrip Test")
# Read back from file
metadata_file = Path(temp_dir) / "ro-crate-metadata.json"
imported_facade = SchemaFacade.from_ro_crate(metadata_file)
# Check that types were imported
self.assertGreater(len(imported_facade.types), 0)
# Check that metadata entries were imported
self.assertGreater(len(imported_facade.metadata_entries), 0)
def test_from_dict(self):
"""Test creating facade from dictionary"""
# Create a simple RO-Crate structure
crate_dict = {
"@context": ["https://w3id.org/ro/crate/1.1/context"],
"@graph": [
{
"@id": "./",
"@type": "Dataset",
"name": "Test Dataset"
},
{
"@id": "ro-crate-metadata.json",
"@type": "CreativeWork",
"about": {"@id": "./"}
},
{
"@id": "Person",
"@type": "rdfs:Class",
"rdfs:label": "Person",
"rdfs:comment": "A person"
},
{
"@id": "name",
"@type": "rdf:Property",
"rdfs:label": "Name",
"schema:domainIncludes": {"@id": "Person"},
"schema:rangeIncludes": {"@id": "http://www.w3.org/2001/XMLSchema#string"}
},
{
"@id": "person1",
"@type": "Person",
"name": "Alice Johnson"
}
]
}
facade = SchemaFacade.from_dict(crate_dict)
# Should have imported the class
person_type = facade.get_type("Person")
self.assertIsNotNone(person_type)
self.assertEqual(person_type.label, "Person")
# Should have imported the metadata entry
person_entry = facade.get_entry("person1")
self.assertIsNotNone(person_entry)
self.assertEqual(person_entry.class_id, "Person")
def test_resolve_forward_refs(self):
"""Test forward reference resolution"""
# This is mostly an internal method, but we can test it doesn't crash
self.facade.resolve_forward_refs()
# Should still have the same number of types and entries
self.assertEqual(len(self.facade.types), 1)
self.assertEqual(len(self.facade.metadata_entries), 1)
def test_add_property_type(self):
"""Test adding standalone property to registry"""
new_prop = TypeProperty(id="email", range_includes=[LiteralType.STRING])
result = self.facade.add_property_type(new_prop)
# Should return self for chaining
self.assertEqual(result, self.facade)
# Should be able to retrieve the property
retrieved_prop = self.facade.get_property_type("email")
self.assertIsNotNone(retrieved_prop)
self.assertEqual(retrieved_prop.id, "email")
def test_complex_schema(self):
"""Test facade with complex schema including restrictions"""
# Create a type with custom restrictions
title_prop = TypeProperty(id="title", range_includes=[LiteralType.STRING])
authors_prop = TypeProperty(id="authors", range_includes=["Person"])
title_restriction = Restriction(
property_type="title",
min_cardinality=1,
max_cardinality=1
)
authors_restriction = Restriction(
property_type="authors",
min_cardinality=1,
max_cardinality=None # Unbounded
)
article_type = Type(
id="Article",
rdfs_property=[title_prop, authors_prop],
restrictions=[title_restriction, authors_restriction],
comment="A research article",
label="Article"
)
article_entry = MetadataEntry(
id="article1",
class_id="Article",
properties={"title": "Great Research"},
references={"authors": ["person1"]}
)
complex_facade = SchemaFacade(
types=[self.person_type, article_type],
metadata_entries=[self.person_entry, article_entry]
)
# Test that complex schema works
self.assertEqual(len(complex_facade.types), 2)
self.assertEqual(len(complex_facade.metadata_entries), 2)
# Test restrictions are included
article = complex_facade.get_type("Article")
restrictions = article.get_restrictions()
self.assertGreater(len(restrictions), 0)
# Test triple generation works
triples = list(complex_facade.to_triples())
self.assertGreater(len(triples), 0)
def test_empty_facade_operations(self):
"""Test operations on empty facade"""
empty_facade = SchemaFacade()
# Should handle empty operations gracefully
self.assertEqual(len(empty_facade.get_types()), 0)
self.assertEqual(len(empty_facade.get_entries()), 0)
self.assertIsNone(empty_facade.get_type("NonExistent"))
self.assertIsNone(empty_facade.get_entry("NonExistent"))
self.assertEqual(len(empty_facade.get_entries_by_class("NonExistent")), 0)
# Should still generate basic structure
json_data = empty_facade.to_json()
self.assertIn("@context", json_data)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""
Test standalone properties and restrictions in SchemaFacade
"""
import sys
sys.path.append('src')
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.restriction import Restriction
from lib_ro_crate_schema.crate.type import Type
def test_standalone_elements():
"""Test adding and retrieving standalone properties and restrictions"""
print("🧪 Testing standalone properties and restrictions...")
# Create a facade
facade = SchemaFacade()
# Test 1: Add standalone property
standalone_prop = TypeProperty(
id="globalProperty",
label="Global Property",
comment="A property that exists independently of any type",
range_includes=["xsd:string"]
)
facade.add_property_type(standalone_prop)
print(f"✅ Added standalone property: {standalone_prop.id}")
# Test 2: Add standalone restriction
standalone_restriction = Restriction(
id="globalRestriction",
property_type="globalProperty",
min_cardinality=1,
max_cardinality=5
)
facade.add_restriction(standalone_restriction)
print(f"✅ Added standalone restriction: {standalone_restriction.id}")
# Test 3: Add a type with its own properties
person_name_prop = TypeProperty(
id="personName",
label="Person Name",
comment="Name property specific to Person type",
range_includes=["xsd:string"]
)
person_type = Type(
id="Person",
label="Person",
comment="A person entity",
rdfs_property=[person_name_prop]
)
facade.addType(person_type)
print(f"✅ Added type with attached property: {person_type.id}")
# Test 4: Verify counts
all_properties = facade.get_property_types()
all_restrictions = facade.get_restrictions()
print(f"\n📊 Summary:")
print(f" Total properties: {len(all_properties)}")
print(f" Total restrictions: {len(all_restrictions)}")
print(f" Total types: {len(facade.types)}")
# Test 5: Check specific retrieval
retrieved_prop = facade.get_property_type("globalProperty")
retrieved_restriction = facade.get_restriction("globalRestriction")
print(f"\n🔍 Specific retrieval:")
print(f" Retrieved global property: {'' if retrieved_prop else ''}")
print(f" Retrieved global restriction: {'' if retrieved_restriction else ''}")
# Test 6: List all properties (standalone + type-attached)
print(f"\n📋 All properties found:")
for prop in all_properties:
is_standalone = any(p.id == prop.id for p in facade.property_types)
status = "standalone" if is_standalone else "type-attached"
print(f" - {prop.id} ({status})")
# Test 7: Export to RDF and verify triples include standalone elements
print(f"\n🔄 RDF export test:")
graph = facade.to_graph()
triple_count = len(graph)
print(f" Generated {triple_count} RDF triples")
# Test 8: Round-trip test - export and reimport
print(f"\n🔄 Round-trip test:")
import os
output_dir = "output_crates"
os.makedirs(output_dir, exist_ok=True)
test_output_path = os.path.join(output_dir, "test_standalone_output")
facade.write(test_output_path, name="Standalone Elements Test")
# Import back
imported_facade = SchemaFacade.from_ro_crate(test_output_path)
imported_properties = imported_facade.get_property_types()
imported_restrictions = imported_facade.get_restrictions()
print(f" Original properties: {len(all_properties)}")
print(f" Imported properties: {len(imported_properties)}")
print(f" Original restrictions: {len(all_restrictions)}")
print(f" Imported restrictions: {len(imported_restrictions)}")
# Check if our standalone elements survived the round-trip
survived_global_prop = imported_facade.get_property_type("globalProperty")
survived_global_restr = imported_facade.get_restriction("globalRestriction")
print(f" Standalone property survived: {'' if survived_global_prop else ''}")
print(f" Standalone restriction survived: {'' if survived_global_restr else ''}")
print(f"\n🎉 Test completed!")
# Verify test assertions instead of returning values
assert survived_global_prop is not None, "Standalone property should survive round-trip"
assert survived_global_restr is not None, "Standalone restriction should survive round-trip"
assert len(imported_properties) > 0, "Should have imported properties"
assert len(imported_restrictions) > 0, "Should have imported restrictions"
if __name__ == "__main__":
test_standalone_elements()
print(f"\n📈 Test completed successfully!")

View File

@@ -1,144 +0,0 @@
import unittest
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.type import Type
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from lib_ro_crate_schema.crate.restriction import Restriction
from rdflib import RDFS, RDF, OWL, Literal, URIRef
class TestType(unittest.TestCase):
"""Test cases for the Type class"""
def setUp(self):
"""Set up test fixtures"""
self.basic_type = Type(id="TestType")
# Create a property for testing
self.test_property = TypeProperty(
id="testProperty",
range_includes=[LiteralType.STRING],
required=True
)
# Create a complete type with all features
self.complete_type = Type(
id="Person",
subclass_of=["https://schema.org/Thing"],
ontological_annotations=["https://schema.org/Person"],
rdfs_property=[self.test_property],
comment="A person entity",
label="Person"
)
def test_type_creation(self):
"""Test basic Type object creation"""
self.assertEqual(self.basic_type.id, "TestType")
self.assertIsInstance(self.basic_type.subclass_of, list)
self.assertEqual(self.basic_type.subclass_of, ["https://schema.org/Thing"])
def test_fluent_api(self):
"""Test fluent API methods"""
type_obj = Type(id="FluentTest")
result = (type_obj
.setLabel("Test Label")
.setComment("Test Comment")
.addProperty(self.test_property)
.setOntologicalAnnotations(["http://example.org/TestClass"]))
# Check method chaining works
self.assertEqual(result, type_obj)
# Check values were set
self.assertEqual(type_obj.label, "Test Label")
self.assertEqual(type_obj.comment, "Test Comment")
self.assertEqual(type_obj.ontological_annotations, ["http://example.org/TestClass"])
self.assertIn(self.test_property, type_obj.rdfs_property)
def test_java_api_compatibility(self):
"""Test Java API compatibility methods"""
self.assertEqual(self.complete_type.getId(), "Person")
self.assertEqual(self.complete_type.getLabel(), "Person")
self.assertEqual(self.complete_type.getComment(), "A person entity")
self.assertEqual(self.complete_type.getSubClassOf(), ["https://schema.org/Thing"])
self.assertEqual(self.complete_type.getOntologicalAnnotations(), ["https://schema.org/Person"])
def test_get_restrictions(self):
"""Test restriction generation from properties"""
restrictions = self.complete_type.get_restrictions()
self.assertIsInstance(restrictions, list)
self.assertTrue(len(restrictions) >= 1)
# Find the restriction for our test property
test_prop_restriction = None
for restriction in restrictions:
if restriction.property_type == "testProperty":
test_prop_restriction = restriction
break
self.assertIsNotNone(test_prop_restriction)
self.assertEqual(test_prop_restriction.min_cardinality, 1) # required=True
def test_to_triples(self):
"""Test RDF triple generation"""
triples = list(self.complete_type.to_triples())
# Should generate multiple triples
self.assertGreater(len(triples), 0)
# Convert to list of tuples for easier testing
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check for essential triples - look for Class in the object
type_triple_found = any("Class" in triple[2] for triple in triple_strs)
self.assertTrue(type_triple_found, "Should generate rdfs:Class type triple")
label_triple_found = any("label" in triple[1] for triple in triple_strs)
self.assertTrue(label_triple_found, "Should generate rdfs:label triple")
def test_empty_type(self):
"""Test type with minimal configuration"""
empty_type = Type(id="MinimalType")
triples = list(empty_type.to_triples())
# Should at least generate the class type declaration
self.assertGreater(len(triples), 0)
def test_property_addition(self):
"""Test adding properties to a type"""
type_obj = Type(id="TestType")
prop1 = TypeProperty(id="prop1", range_includes=[LiteralType.STRING])
prop2 = TypeProperty(id="prop2", range_includes=[LiteralType.INTEGER])
type_obj.addProperty(prop1).addProperty(prop2)
self.assertEqual(len(type_obj.rdfs_property), 2)
self.assertIn(prop1, type_obj.rdfs_property)
self.assertIn(prop2, type_obj.rdfs_property)
def test_custom_restrictions(self):
"""Test type with custom restrictions"""
custom_restriction = Restriction(
property_type="customProp",
min_cardinality=2,
max_cardinality=5
)
type_obj = Type(
id="RestrictedType",
restrictions=[custom_restriction]
)
restrictions = type_obj.get_restrictions()
self.assertIn(custom_restriction, restrictions)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,187 +0,0 @@
import unittest
import sys
from pathlib import Path
# Add source to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from lib_ro_crate_schema.crate.type_property import TypeProperty
from lib_ro_crate_schema.crate.literal_type import LiteralType
from rdflib import RDF, RDFS, Literal, URIRef
class TestTypeProperty(unittest.TestCase):
"""Test cases for the TypeProperty class"""
def setUp(self):
"""Set up test fixtures"""
self.basic_property = TypeProperty(id="basicProp")
self.complete_property = TypeProperty(
id="completeProp",
domain_includes=["Person"],
range_includes=[LiteralType.STRING],
ontological_annotations=["https://schema.org/name"],
comment="A complete property for testing",
label="Complete Property",
required=True
)
def test_property_creation(self):
"""Test basic TypeProperty object creation"""
self.assertEqual(self.basic_property.id, "basicProp")
self.assertEqual(self.basic_property.domain_includes, [])
self.assertEqual(self.basic_property.range_includes, [])
self.assertIsNone(self.basic_property.required)
def test_fluent_api(self):
"""Test fluent API methods"""
prop = TypeProperty(id="fluentTest")
result = (prop
.setLabel("Test Label")
.setComment("Test Comment")
.setTypes([LiteralType.STRING, LiteralType.INTEGER])
.setRequired(True)
.setOntologicalAnnotations(["http://example.org/prop"]))
# Check method chaining works
self.assertEqual(result, prop)
# Check values were set
self.assertEqual(prop.label, "Test Label")
self.assertEqual(prop.comment, "Test Comment")
self.assertTrue(prop.required)
self.assertEqual(prop.range_includes, [LiteralType.STRING, LiteralType.INTEGER])
self.assertEqual(prop.ontological_annotations, ["http://example.org/prop"])
def test_add_type(self):
"""Test adding single type to range"""
prop = TypeProperty(id="testProp")
prop.addType(LiteralType.STRING)
prop.addType("CustomType")
self.assertIn(LiteralType.STRING, prop.range_includes)
self.assertIn("CustomType", prop.range_includes)
def test_java_api_compatibility(self):
"""Test Java API compatibility methods"""
self.assertEqual(self.complete_property.getId(), "completeProp")
self.assertEqual(self.complete_property.getLabel(), "Complete Property")
self.assertEqual(self.complete_property.getComment(), "A complete property for testing")
self.assertEqual(self.complete_property.getDomain(), ["Person"])
self.assertEqual(self.complete_property.getRange(), [LiteralType.STRING])
self.assertEqual(self.complete_property.getOntologicalAnnotations(), ["https://schema.org/name"])
def test_cardinality_methods(self):
"""Test cardinality getter methods"""
# Required property
required_prop = TypeProperty(id="required", required=True)
self.assertEqual(required_prop.get_min_cardinality(), 1)
self.assertEqual(required_prop.get_max_cardinality(), 1)
# Optional property
optional_prop = TypeProperty(id="optional", required=False)
self.assertEqual(optional_prop.get_min_cardinality(), 0)
self.assertEqual(optional_prop.get_max_cardinality(), 1)
# Unspecified property (defaults to optional)
unspecified_prop = TypeProperty(id="unspecified")
self.assertEqual(unspecified_prop.get_min_cardinality(), 0)
self.assertEqual(unspecified_prop.get_max_cardinality(), 1)
def test_to_triples(self):
"""Test RDF triple generation"""
triples = list(self.complete_property.to_triples())
# Should generate multiple triples
self.assertGreater(len(triples), 0)
# Convert to string representation for easier testing
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Check for essential triples
type_triple_found = any("Property" in triple[2] for triple in triple_strs)
self.assertTrue(type_triple_found, "Should generate rdf:Property type triple")
label_triple_found = any("label" in triple[1] for triple in triple_strs)
self.assertTrue(label_triple_found, "Should generate rdfs:label triple")
domain_triple_found = any("domainIncludes" in triple[1] for triple in triple_strs)
self.assertTrue(domain_triple_found, "Should generate domainIncludes triple")
def test_range_includes_xsd_types(self):
"""Test handling of XSD data types in range_includes"""
prop = TypeProperty(
id="xsdTest",
range_includes=["xsd:string", "xsd:integer", "xsd:boolean"]
)
triples = list(prop.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should convert xsd: prefixes to full URIs
xsd_string_found = any("XMLSchema#string" in triple[2] for triple in triple_strs)
self.assertTrue(xsd_string_found, "Should convert xsd:string to full URI")
def test_range_includes_base_types(self):
"""Test handling of base: prefixed types in range_includes"""
prop = TypeProperty(
id="baseTest",
range_includes=["base:CustomType"]
)
triples = list(prop.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should handle base: prefixed types
base_type_found = any("CustomType" in triple[2] for triple in triple_strs)
self.assertTrue(base_type_found, "Should handle base: prefixed types")
def test_ontological_annotations(self):
"""Test ontological annotation handling"""
prop = TypeProperty(
id="ontoTest",
ontological_annotations=["https://schema.org/name", "http://purl.org/dc/terms/title"]
)
triples = list(prop.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should generate owl:equivalentProperty triples
equiv_prop_found = any("equivalentProperty" in triple[1] for triple in triple_strs)
self.assertTrue(equiv_prop_found, "Should generate owl:equivalentProperty triples")
def test_empty_property(self):
"""Test property with minimal configuration"""
empty_prop = TypeProperty(id="minimal")
triples = list(empty_prop.to_triples())
# Should at least generate the property type declaration
self.assertGreater(len(triples), 0)
# Should be an rdf:Property
type_triple_found = any("Property" in str(triple) for triple in triples)
self.assertTrue(type_triple_found)
def test_multiple_domains(self):
"""Test property with multiple domain classes"""
prop = TypeProperty(
id="multiDomain",
domain_includes=["Person", "Organization", "Event"]
)
triples = list(prop.to_triples())
triple_strs = [(str(s), str(p), str(o)) for s, p, o in triples]
# Should generate domainIncludes for each domain
person_domain = any("Person" in triple[2] and "domainIncludes" in triple[1] for triple in triple_strs)
org_domain = any("Organization" in triple[2] and "domainIncludes" in triple[1] for triple in triple_strs)
event_domain = any("Event" in triple[2] and "domainIncludes" in triple[1] for triple in triple_strs)
self.assertTrue(person_domain, "Should include Person in domain")
self.assertTrue(org_domain, "Should include Organization in domain")
self.assertTrue(event_domain, "Should include Event in domain")
if __name__ == '__main__':
unittest.main()

View File

@@ -1,247 +0,0 @@
"""
Test for unknown namespace detection and resolution in JSON-LD contexts.
This test verifies that the system can automatically detect and create prefixes
for namespaces that are not predefined in the namespace_prefixes dictionary.
"""
import tempfile
import json
from pathlib import Path
import pytest
from rocrate.rocrate import ROCrate
from lib_ro_crate_schema.crate.schema_facade import SchemaFacade
class TestUnknownNamespaces:
"""Test suite for unknown namespace handling."""
def test_unknown_namespace_detection_in_context(self):
"""Test that unknown namespaces are automatically detected by get_context."""
from lib_ro_crate_schema.crate.jsonld_utils import get_context
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF, RDFS
# Create graph with unknown namespaces
g = Graph()
# Add triples with unknown pokemon.org namespace
pokemon_ns = "http://pokemon.org/"
pikachu = URIRef(pokemon_ns + "pikachu")
pokemon_name = URIRef(pokemon_ns + "pokemonName")
electric_type = URIRef(pokemon_ns + "ElectricPokemon")
g.add((pikachu, RDF.type, electric_type))
g.add((pikachu, pokemon_name, Literal("Pikachu")))
g.add((pokemon_name, RDF.type, RDF.Property))
g.add((pokemon_name, RDFS.label, Literal("Pokemon Name")))
# Add triples with another unknown namespace
villains_ns = "http://villains.org/"
team_rocket = URIRef(villains_ns + "team_rocket")
criminal_org = URIRef(villains_ns + "CriminalOrganization")
motto = URIRef(villains_ns + "motto")
g.add((team_rocket, RDF.type, criminal_org))
g.add((team_rocket, motto, Literal("Prepare for trouble!")))
# Also add known namespace
schema_name = URIRef("https://schema.org/name")
g.add((pikachu, schema_name, Literal("Pikachu the Electric Mouse")))
# Test context generation
context = get_context(g)
assert isinstance(context, list)
assert len(context) >= 2
# Check that both unknown namespaces were detected
detected_namespaces = {}
if len(context) > 1 and isinstance(context[1], dict):
detected_namespaces = context[1]
assert "pokemon" in detected_namespaces
assert detected_namespaces["pokemon"] == "http://pokemon.org/"
assert "villains" in detected_namespaces
assert detected_namespaces["villains"] == "http://villains.org/"
assert "schema" in detected_namespaces
assert detected_namespaces["schema"] == "https://schema.org/"
def test_known_namespaces_still_work(self):
"""Test that predefined namespaces still work correctly."""
from lib_ro_crate_schema.crate.jsonld_utils import get_context
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF, RDFS
g = Graph()
# Add triples with known namespaces used as predicates and types
person = URIRef("http://someone.example/john")
# Use example.com as a predicate (will trigger base: namespace)
example_property = URIRef("http://example.com/customProperty")
g.add((person, example_property, Literal("Some value")))
# Use schema.org properties and types
schema_name = URIRef("https://schema.org/name")
g.add((person, schema_name, Literal("John Doe")))
g.add((person, RDF.type, URIRef("https://schema.org/Person")))
# Use openbis.org as a predicate
openbis_property = URIRef("http://openbis.org/sampleId")
g.add((person, openbis_property, Literal("sample123")))
context = get_context(g)
assert isinstance(context, list)
if len(context) > 1 and isinstance(context[1], dict):
namespaces = context[1]
assert "base" in namespaces
assert namespaces["base"] == "http://example.com/"
assert "schema" in namespaces
assert namespaces["schema"] == "https://schema.org/"
assert "openbis" in namespaces
assert namespaces["openbis"] == "http://openbis.org/"
def test_prefix_collision_handling(self):
"""Test that prefix collisions are handled gracefully."""
from lib_ro_crate_schema.crate.jsonld_utils import get_context
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF
g = Graph()
# Create a scenario where we might have prefix collisions
# Use pokemon.org multiple times with DIFFERENT types (should get 'pokemon' prefix)
pokemon_uri1 = URIRef("http://pokemon.org/pikachu")
pokemon_uri2 = URIRef("http://pokemon.org/raichu")
g.add((pokemon_uri1, RDF.type, URIRef("http://pokemon.org/ElectricPokemon")))
g.add((pokemon_uri2, RDF.type, URIRef("http://pokemon.org/EvolutionPokemon")))
# Use pokemon.com multiple times (should get 'pokemon1' or similar)
pokemon_com_uri1 = URIRef("http://pokemon.com/charizard")
pokemon_com_uri2 = URIRef("http://pokemon.com/blastoise")
g.add((pokemon_com_uri1, RDF.type, URIRef("http://pokemon.com/FirePokemon")))
g.add((pokemon_com_uri2, RDF.type, URIRef("http://pokemon.com/WaterPokemon")))
context = get_context(g)
if isinstance(context, list) and len(context) > 1 and isinstance(context[1], dict):
namespaces = context[1]
# Both namespaces should be detected with different prefixes
pokemon_prefixes = [k for k, v in namespaces.items()
if 'pokemon.' in v]
assert len(pokemon_prefixes) == 2
# Verify the actual mappings exist
namespace_values = list(namespaces.values())
assert "http://pokemon.org/" in namespace_values
assert "http://pokemon.com/" in namespace_values
def test_minimum_usage_threshold(self):
"""Test that namespaces need minimum usage count to be detected."""
from lib_ro_crate_schema.crate.jsonld_utils import get_context
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF
g = Graph()
# Add only one URI from a namespace (below threshold)
single_use = URIRef("http://rarely-used.org/single")
g.add((single_use, RDF.type, URIRef("https://schema.org/Thing")))
# Add multiple URIs from another namespace (above threshold)
frequent_ns = "http://frequent.org/"
for i in range(3):
uri = URIRef(f"{frequent_ns}item{i}")
g.add((uri, RDF.type, URIRef(f"{frequent_ns}ItemType")))
# Add another usage to ensure it meets the threshold
g.add((uri, URIRef(f"{frequent_ns}hasProperty"), Literal(f"value{i}")))
context = get_context(g)
if isinstance(context, list) and len(context) > 1 and isinstance(context[1], dict):
namespaces = context[1]
# frequent.org should be detected
assert "frequent" in namespaces
assert namespaces["frequent"] == "http://frequent.org/"
# rarely-used.org should NOT be detected (only 1 usage)
rarely_used_prefixes = [k for k, v in namespaces.items()
if 'rarely-used.org' in v]
assert len(rarely_used_prefixes) == 0
@pytest.fixture
def temp_ro_crate():
"""Create a temporary RO-Crate with unknown namespaces for testing."""
crate = ROCrate()
# Add entities with unknown namespaces
pokemon_entity = {
'@id': 'http://pokemon.org/pikachu',
'@type': 'http://pokemon.org/ElectricPokemon',
'http://pokemon.org/pokemonName': 'Pikachu',
'http://pokemon.org/type': 'Electric',
'https://schema.org/name': 'Pikachu the Electric Mouse'
}
villain_entity = {
'@id': 'http://villains.org/team_rocket',
'@type': 'http://villains.org/CriminalOrganization',
'http://villains.org/motto': 'Prepare for trouble!',
'https://schema.org/name': 'Team Rocket'
}
crate.add_jsonld(pokemon_entity)
crate.add_jsonld(villain_entity)
return crate
class TestRoundTripNamespaces:
"""Test namespace handling through full import/export cycles."""
def test_rocrate_roundtrip_with_unknown_namespaces(self, temp_ro_crate):
"""Test that unknown namespaces survive import/export cycles."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Export original crate
temp_ro_crate.metadata.write(temp_path)
metadata_file = temp_path / 'ro-crate-metadata.json'
original_data = json.loads(metadata_file.read_text())
# Verify original contains full URIs
original_entities = original_data.get('@graph', [])
pokemon_entities = [e for e in original_entities
if 'pokemon.org' in e.get('@id', '')]
assert len(pokemon_entities) >= 1
# Import via SchemaFacade
imported_facade = SchemaFacade.from_ro_crate(temp_path)
assert len(imported_facade.metadata_entries) > 0
# Re-export and check context
final_crate = imported_facade.get_crate()
with tempfile.TemporaryDirectory() as final_dir:
final_crate.metadata.write(final_dir)
final_metadata_file = Path(final_dir) / 'ro-crate-metadata.json'
final_data = json.loads(final_metadata_file.read_text())
# Check that some form of context enhancement occurred
final_context = final_data.get('@context', [])
assert isinstance(final_context, list)
if len(final_context) > 1:
assert isinstance(final_context[1], dict)
# Should have some namespace mappings
assert len(final_context[1]) > 0
if __name__ == "__main__":
pytest.main([__file__])