import importlib
import os
import re
import sys
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
from ..loaders.rocrate_metadata import rocrate
from ..utils import logging
from ..utils.utils import get_study_metadata, load_yaml, set_study_metadata
from .global_validation import get_global_validator
from .validator import Validator
# Module-level logger, obtained from the project's logging helper (``..utils.logging``).
logger = logging.getLogger()
if TYPE_CHECKING:
from linkml.validator.report import ValidationReport as LinkMLValidationReport
from rdflib.term import URIRef
from ..ontologies.solidipes import ROCrateMetadata as PydanticROCrateMetadata
[docs]
class Ontology(ABC):
    """Abstract interface shared by every ontology backend.

    Concrete subclasses supply the list of classes declared by the ontology
    together with the validation hooks for a Dataset and for a single File.
    """

    @abstractmethod
    def get_class_names(self) -> list[str]:
        """Return the names of the classes declared by the ontology."""

    @abstractmethod
    def validate(self, obj) -> bool:
        """Check a Dataset against the ontology."""

    @abstractmethod
    def validate_file(self, obj) -> bool:
        """Check a single File against the ontology."""

    @abstractmethod
    def get_file_class_name(self, obj) -> str:
        """Return the name of the ontology class that a File belongs to."""

    def get_file_compatible_class_names(self, obj) -> list[str]:
        """Return the class names a File is compatible with.

        This default implementation applies no filtering: it hands back
        every class name of the ontology, whatever ``obj`` is.
        """
        every_class_name = self.get_class_names()
        return every_class_name
[docs]
class LinkMLOntology(Ontology):
    """Ontology described by a LinkML YAML schema.

    The RO-Crate metadata (``rocrate.as_dict()``) is validated with a LinkML
    validator built from the schema. Validation messages are collected on the
    instance and can be read back with ``_get_validation_errors``.
    """

    def __init__(self, identifier: str | None = None):
        """Load the schema and build the LinkML validator.

        Args:
            identifier: Path of the LinkML YAML schema. When empty, the
                ``solidipes.yaml`` file of the sibling ``ontologies``
                directory is used.
        """
        from linkml.validator import Validator as LinkMLValidator
        from linkml.validator.plugins import (
            JsonschemaValidationPlugin,
            RecommendedSlotsPlugin,
        )

        # Errors not tied to a file, and errors keyed by a file's unique identifier.
        self._validation_errors: list[str] = []
        self._validation_errors_per_unique_identifier: dict[str, list[str]] = {}

        if not identifier:
            identifier = os.path.join(os.path.dirname(__file__), "..", "ontologies", "solidipes.yaml")

        try:
            self.schema = load_yaml(identifier)
            self.linkml_validator = LinkMLValidator(
                schema=identifier,
                validation_plugins=[
                    JsonschemaValidationPlugin(
                        closed=True,  # Disallow additional properties on root
                        include_range_class_descendants=True,
                    ),
                    RecommendedSlotsPlugin(),
                ],
            )
        except Exception as e:
            # Best effort: keep the object usable (no classes, no validator),
            # but leave a trace of why the schema could not be loaded.
            logger.warning(f'Could not load LinkML ontology "{identifier}": {e}')
            self.schema = {}
            self.linkml_validator = None

    def add_validation_error(self, error: str, unique_identifier: str | None = None) -> None:
        """Record a validation message.

        Args:
            error: The message to store.
            unique_identifier: When given, the message is stored under this
                file identifier; otherwise it goes to the general list.
        """
        if unique_identifier:
            self._validation_errors_per_unique_identifier.setdefault(unique_identifier, []).append(error)
        else:
            self._validation_errors.append(error)

    def _get_validation_errors(self, unique_identifier: str | None = None) -> list[str]:
        """Return the messages stored for ``unique_identifier``, or the general list when it is empty."""
        if unique_identifier:
            return self._validation_errors_per_unique_identifier.get(unique_identifier, [])
        return self._validation_errors

    def get_class_names(self) -> list[str]:
        """Return the keys of the schema's ``classes`` section (empty if the schema failed to load)."""
        return list(self.schema.get("classes", {}).keys())

    def _get_linkml_validation_report(self) -> "LinkMLValidationReport":
        """Run the LinkML validator on the current RO-Crate metadata.

        Raises:
            RuntimeError: If no validator is available because the schema
                could not be loaded in ``__init__``.
        """
        if self.linkml_validator is None:
            raise RuntimeError("LinkML schema could not be loaded; no validator is available")
        rocrate_metadata = rocrate.as_dict()
        return self.linkml_validator.validate(rocrate_metadata)

    def validate(self, obj) -> bool:
        """Validate the whole RO-Crate metadata.

        Every message of the validation report is recorded. ``obj`` is not
        used: the metadata is read from ``rocrate``.

        Returns:
            True when the report has no results, False otherwise (including
            when the validation itself failed).
        """
        # Validation is re-run on every call, so start from a clean list.
        self._validation_errors = []
        try:
            validation_report = self._get_linkml_validation_report()
        except Exception as e:
            self.add_validation_error(f"Error in ontology validation: {e}")
            return False

        if not validation_report.results:
            return True
        for validation_result in validation_report.results:
            self.add_validation_error(validation_result.message)
        return False

    def validate_file(self, obj) -> bool:
        """Validate the metadata and report the first message that mentions ``obj``.

        A report message is attributed to the file when it contains the text
        ``'@id': '<obj.unique_identifier>'``.

        Returns:
            False when the validation failed or a message mentions the file,
            True otherwise.
        """
        unique_identifier = obj.unique_identifier
        # Drop messages left by a previous run for this file.
        self._validation_errors_per_unique_identifier.pop(unique_identifier, None)
        try:
            validation_report = self._get_linkml_validation_report()
        except Exception as e:
            self.add_validation_error(
                f"Error in ontology validation: {e}",
                unique_identifier=unique_identifier,
            )
            return False

        # Literal substring test: identifiers are file paths and routinely
        # contain regex metacharacters such as "." or "+".
        id_marker = f"'@id': '{unique_identifier}'"
        for validation_result in validation_report.results:
            message = validation_result.message
            if id_marker not in message:
                continue
            # NOTE(review): validation_result.context presumably carries
            # further detail messages, should more verbosity be wanted.
            self.add_validation_error(message, unique_identifier=unique_identifier)
            return False
        return True

    def get_file_class_name(self, obj) -> str:
        """Not available for LinkML ontologies; always raises NotImplementedError."""
        raise NotImplementedError("get_file_class_name is not implemented for LinkMLOntology")
[docs]
class PydanticOntology(Ontology):
    """Ontology defined by a Python module exposing a ``ROCrateMetadata`` model.

    The module is loaded from a file path. When it cannot be loaded, or does
    not define ``ROCrateMetadata``, the core ontology
    (``solidipes_core_plugin.ontologies.core_ontology``) is used instead and
    the reason is kept in ``module_load_status``.

    Validation errors are stored in class attributes, hence shared by all
    instances of the class.
    """

    class ModuleLoadResult:
        """Base type for the outcome of loading the ontology module."""

    class ModuleLoadOk(ModuleLoadResult):
        """No problem was met while loading the ontology module."""

    class ModuleLoadError(ModuleLoadResult):
        """The requested module could not be used; ``error_message`` says why."""

        def __init__(self, error_message: str):
            self.error_message = error_message

    # General errors, and errors keyed by the "@id" of the offending entity.
    _validation_errors: list[str] = []
    _validation_errors_per_unique_identifier: dict[str, list[str]] = {}

    def __init__(self, identifier: str | None = None):
        """Load the ontology module and select its ``ROCrateMetadata`` model.

        Args:
            identifier: Path of the Python file defining the ontology. When
                empty, the core ontology is used.
        """
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded; without this import the AttributeError would be
        # caught below and silently reported as a missing ontology module.
        import importlib.util

        self.module = None
        self.module_load_status = self.ModuleLoadOk()

        if identifier:
            module_name = os.path.splitext(os.path.basename(identifier))[0]
            module_dir = os.path.dirname(os.path.abspath(identifier))
            # Make the module's directory importable for the module's own imports.
            if module_dir not in sys.path:
                sys.path.insert(0, module_dir)

            try:
                spec = importlib.util.spec_from_file_location(module_name, identifier)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                self.module = module
            except (AttributeError, FileNotFoundError):
                error_message = f'Could not find ontology module: "{identifier}". Defaulting to base Solidipes ontology ("solidipes.ontologies.solidipes").'
                logger.warning(error_message)
                self.module_load_status = self.ModuleLoadError(error_message)

        if self.module is not None and not hasattr(self.module, "ROCrateMetadata"):
            error_message = f'Could not find Solidipes ontology classes in module from identifier: "{identifier}". Defaulting to base Solidipes ontology ("solidipes.ontologies.solidipes").'
            logger.warning(error_message)
            self.module = None
            self.module_load_status = self.ModuleLoadError(error_message)

        if self.module is None:
            from solidipes_core_plugin.ontologies import core_ontology

            self.module = core_ontology

        self.model = self.module.ROCrateMetadata
        self.__model_instance: "PydanticROCrateMetadata" = None
        self._rocrate_metadata_checksum: str | None = None

    @classmethod
    def _reset_validation_errors(cls) -> None:
        """Forget every recorded validation error."""
        cls._validation_errors = []
        cls._validation_errors_per_unique_identifier = {}

    @classmethod
    def add_validation_error(cls, error: str, unique_identifier: str | None = None) -> None:
        """Record a validation error.

        Args:
            error: The message to store.
            unique_identifier: When given, the message is stored under this
                identifier, and a one-line notice naming the identifier is
                added to the general list the first time it is seen.
                Otherwise the message itself goes to the general list.
        """
        if not unique_identifier:
            cls._validation_errors.append(error)
            return

        if unique_identifier not in cls._validation_errors_per_unique_identifier:
            cls._validation_errors_per_unique_identifier[unique_identifier] = []
            cls._validation_errors.append(f"Ontology validation errors in '{unique_identifier}'")
        cls._validation_errors_per_unique_identifier[unique_identifier].append(error)

    @classmethod
    def _get_validation_errors(cls, unique_identifier: str | None = None) -> list[str]:
        """Return the errors stored for ``unique_identifier``, or the general list when it is empty."""
        if unique_identifier:
            return cls._validation_errors_per_unique_identifier.get(unique_identifier, [])
        return cls._validation_errors

    @classmethod
    def _has_validation_errors(cls, unique_identifier: str | None = None) -> bool:
        """Tell whether errors exist for ``unique_identifier``, or any error at all when it is empty."""
        if unique_identifier:
            return len(cls._get_validation_errors(unique_identifier)) > 0
        return len(cls._get_validation_errors()) > 0 or len(cls._validation_errors_per_unique_identifier) > 0

    @property
    def _model_instance(self) -> "PydanticROCrateMetadata | None":
        """Model built from the current RO-Crate metadata.

        The model is rebuilt (and the recorded errors reset) whenever
        ``rocrate.checksum`` differs from the one seen at the last build.
        When the build fails, the value is an empty dict and the errors are
        available through ``_get_validation_errors``.

        Raises:
            Exception: Any non-``ValidationError`` exception raised while
                building the model is recorded and re-raised.
        """
        from pydantic import ValidationError

        if self._rocrate_metadata_checksum != rocrate.checksum or self.__model_instance is None:
            logger.debug("Creating new model instance for ontology validation.")
            self._reset_validation_errors()
            rocrate_metadata = rocrate.as_dict()
            self._rocrate_metadata_checksum = rocrate.checksum
            # Falsy placeholder that is not None: a failed build is not
            # retried until the checksum changes.
            self.__model_instance = {}

            try:
                self.__model_instance = self.model(**rocrate_metadata)
            except ValidationError as e:
                for error in e.errors():
                    unique_identifier = None
                    error_input = error.get("input", None)
                    if isinstance(error_input, dict):
                        unique_identifier = error_input.get("@id", None)
                    loc = [str(loc_part) for loc_part in error.get("loc", [])]
                    self.add_validation_error(
                        f"Error in ontology validation ({error['type']}) at {'.'.join(loc)}: {error['msg']}",
                        unique_identifier=unique_identifier,
                    )
            except Exception as e:
                # Record before re-raising: the build is not retried for this
                # checksum, so later validate() calls must still see the failure.
                self.add_validation_error(f"Error in ontology validation ({type(e).__name__}): {e}")
                raise

        return self.__model_instance

    def get_class_names(self) -> list[str]:
        """Return the names of the classes given by ``ROCrateMetadata.getClasses()``.

        Classes are ordered by increasing length of their ``__mro__``.
        """
        if not self.model:
            return []
        root_model = getattr(self.module, "ROCrateMetadata", None)
        if root_model is None:
            return []

        # Sort a copy so the list returned by getClasses() is left untouched.
        classes = sorted(root_model.getClasses(), key=lambda cls: len(cls.__mro__), reverse=True)
        classes.reverse()  # Double reverse seems to keep original declaration order
        return [cls.__name__ for cls in classes]

    def validate(self, obj) -> bool:
        """Validate the whole RO-Crate metadata; True when no error is recorded.

        ``obj`` is not used: the metadata is read from ``rocrate``.
        """
        if not self.model:
            self.add_validation_error("No model defined for ontology validation.")
            return False
        _ = self._model_instance  # Triggers the (re)build that records the errors
        return not self._has_validation_errors()

    def validate_file(self, obj) -> bool:
        """Tell whether no validation error is recorded for ``obj.unique_identifier``."""
        if not self.model:
            self.add_validation_error("No model defined for ontology validation.")
            # NOTE(review): returns True although an error was just recorded,
            # presumably so that not every file gets flagged; confirm.
            return True
        _ = self._model_instance  # Triggers the (re)build that records the errors
        return not self._has_validation_errors(unique_identifier=obj.unique_identifier)

    def get_file_class_name(self, obj) -> str:
        """Return the name of the class of the File's entity, or ``"Unknown"``."""
        cls = self.get_file_class(obj)
        if cls is None:
            return "Unknown"
        return cls.__name__

    def get_file_class(self, obj) -> type | None:
        """Return the class of the graph entity whose ``at_id`` is ``obj.unique_identifier``.

        Returns None when the model could not be built or no entity matches.
        """
        model_instance = self._model_instance
        if not model_instance:
            logger.debug("Model instance not available for getting file class.")
            return None

        for entity in model_instance.graph:
            if getattr(entity, "at_id", None) == obj.unique_identifier:
                return entity.__class__

        logger.debug(f"Could not find class for file with unique_identifier: {obj.unique_identifier}")
        return None

    def get_file_rocrate_dict(self, obj) -> dict:
        """Return the ``@graph`` entry whose ``@id`` is ``obj.unique_identifier``, or ``{}``."""
        _ = self._model_instance
        for entity in rocrate.as_dict().get("@graph", []):
            if entity.get("@id", None) == obj.unique_identifier:
                return entity
        return {}

    def _iter_compatible_classes(self, obj):
        """Yield ``(name, class)`` for every ontology class that accepts the File's entity."""
        rocrate_dict = self.get_file_rocrate_dict(obj)
        if not rocrate_dict:
            return
        for class_name in self.get_class_names():
            cls = getattr(self.module, class_name)
            try:
                cls(**rocrate_dict)
            except Exception:
                # Any failure to instantiate just means "not compatible".
                continue
            yield class_name, cls

    def get_file_compatible_class_names(self, obj) -> list[str]:
        """Return the names of the classes that can be instantiated from the File's entity."""
        return [class_name for class_name, _ in self._iter_compatible_classes(obj)]

    def get_file_compatible_classes(self, obj) -> list[type]:
        """Return the classes that can be instantiated from the File's entity."""
        return [cls for _, cls in self._iter_compatible_classes(obj)]
[docs]
class RDFOntology(Ontology):
    """Ontology read from an RDF document with rdflib.

    Only the listing of OWL classes is supported; the validation hooks raise
    ``NotImplementedError``.
    """

    def __init__(self, identifier: str):
        """Parse the document designated by ``identifier`` into ``self.graph``."""
        from rdflib import Graph

        self.identifier = identifier
        self.graph = Graph()
        self.graph.parse(self.identifier)

    def get_classes(self) -> list["URIRef"]:
        """Return every subject of the graph typed as ``owl:Class``."""
        from rdflib import RDF, Namespace

        owl_namespace = Namespace("http://www.w3.org/2002/07/owl#")
        return [subject for subject in self.graph.subjects(RDF.type, owl_namespace.Class)]

    def get_class_name(self, class_: "URIRef") -> str:
        """Return what follows the last ``#`` of the URI (the whole URI if there is none)."""
        _, _, fragment = str(class_).rpartition("#")
        return fragment

    def get_class_from_name(self, class_name: str) -> Optional["URIRef"]:
        """Return the first class whose name equals ``class_name``, or None."""
        matching = (candidate for candidate in self.get_classes() if self.get_class_name(candidate) == class_name)
        return next(matching, None)

    def get_class_names(self) -> list[str]:
        """Return the name of every class of the ontology."""
        return list(map(self.get_class_name, self.get_classes()))

    def validate(self, obj) -> bool:
        """Not available for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("validate is not implemented for RDFOntology")

    def validate_file(self, obj) -> bool:
        """Not available for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("validate_file is not implemented for RDFOntology")

    def get_file_class_name(self, obj) -> str:
        """Not available for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("get_file_class_name is not implemented for RDFOntology")
[docs]
class OntologyValidator(Validator):
    """Validator that delegates to the ontology configured for the study."""

    def __init__(self, description: str = "Ontology is matched", **kwargs):
        """Create the validator and load the ontology named in the study metadata."""
        super().__init__(description=description, **kwargs)

        configured_identifier = get_ontology_identifier()
        # translate to core plugin
        if configured_identifier == "solidipes.ontologies.solidipes":
            resolved_identifier = "solidipes_core_plugin.ontologies.core_ontology"
        else:
            resolved_identifier = configured_identifier

        self.ontology: Ontology = get_ontology_from_identifier(resolved_identifier)

    def _validate(self, obj=None) -> bool:
        """Run the ontology validation, storing its outcome and its error list on the validator."""
        outcome = self.ontology.validate(obj)
        self._result = outcome
        self._errors = self.ontology._get_validation_errors()
        return self._result
[docs]
def set_ontology_identifier(identifier: str):
    """Store ``identifier`` in the study metadata and switch the global validator to it.

    The identifier is written under ``ontology.identifier`` of the study
    metadata, then the ontology of the global ``OntologyValidator`` is
    replaced by the one built from ``identifier``.
    """
    metadata = get_study_metadata()
    # Create the "ontology" section on first use, then fill it in.
    metadata.setdefault("ontology", {})["identifier"] = identifier
    set_study_metadata(metadata)

    get_global_validator(OntologyValidator).ontology = get_ontology_from_identifier(identifier)
[docs]
def get_ontology_identifier() -> str:
    """Return the ontology identifier stored in the study metadata.

    When none is stored, the core ontology identifier is written to the
    study metadata and returned.
    """
    default_identifier = "solidipes_core_plugin.ontologies.core_ontology"

    stored_info = get_study_metadata().get("ontology", {})
    if "identifier" not in stored_info:
        # Nothing configured yet: persist the default before returning it.
        metadata = get_study_metadata()
        metadata.setdefault("ontology", {})["identifier"] = default_identifier
        set_study_metadata(metadata)
        return default_identifier

    return stored_info["identifier"]
[docs]
def get_ontology_from_identifier(identifier: str) -> "Ontology":
    """Build the ontology object matching ``identifier``.

    The kind of ontology is chosen from the file extension: ``.yaml``/``.yml``
    gives a ``LinkMLOntology``, ``.py`` a ``PydanticOntology``, and
    ``.ttl``/``.rdf``/``.owl`` an ``RDFOntology``. Any other identifier is
    treated as a dotted module name: the module is imported and a
    ``PydanticOntology`` is built from its file.

    When that import fails, the study is switched to the core ontology
    identifier and a ``PydanticOntology`` is still built from the failing
    identifier.

    Args:
        identifier: File path or dotted module name of the ontology.

    Returns:
        The ontology object.

    Raises:
        RuntimeError: If the core ontology module itself cannot be imported;
            the import error is attached as the cause.
    """
    core_identifier = "solidipes_core_plugin.ontologies.core_ontology"

    if identifier.endswith((".yaml", ".yml")):
        return LinkMLOntology(identifier)
    if identifier.endswith(".py"):
        return PydanticOntology(identifier)
    if identifier.endswith((".ttl", ".rdf", ".owl")):
        return RDFOntology(identifier)

    try:
        module = importlib.import_module(identifier)
        return PydanticOntology(module.__file__)
    except (ImportError, ValueError) as e:  # ImportError covers ModuleNotFoundError
        if identifier == core_identifier:
            # No fallback exists for the fallback itself.
            raise RuntimeError(f"Internal ontology of module loading problem: {e}") from e
        logger.warning(
            f'Could not determine ontology type from identifier: "{identifier}". Defaulting to PydanticOntology.'
        )
        set_ontology_identifier(core_identifier)
        return PydanticOntology(identifier)
[docs]
def get_available_ontology_identifiers() -> list[str]:
    """List the ontology identifiers offered by the dataset and by the plugins.

    Dataset identifiers are the paths of the scanned loaders that are
    ontology loaders; they come first. Plugin identifiers are the module
    names of the discovered ontology models. Any failure while inspecting
    the dataset yields no dataset identifier.
    """
    from solidipes_core_plugin.loaders.ontology import PydanticOntology

    from ..plugins.discovery import ontology_list
    from .curation import CurationValidator

    dataset_identifiers = []
    try:
        scanner = get_global_validator(CurationValidator).scanner
        for loader in scanner.get_loader_dict().values():
            if isinstance(loader, PydanticOntology):
                dataset_identifiers.append(loader.path)
    except Exception:
        # Discard anything gathered before the failure.
        dataset_identifiers = []

    plugin_identifiers = [model.__module__ for model in ontology_list]
    return dataset_identifiers + plugin_identifiers