Source code for solidipes.validators.ontology

import importlib
import os
import re
import sys
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional

from ..loaders.rocrate_metadata import rocrate
from ..utils import logging
from ..utils.utils import get_study_metadata, load_yaml, set_study_metadata
from .global_validation import get_global_validator
from .validator import Validator

# Module-level logger, obtained from the project's logging helper (`..utils.logging`).
logger = logging.getLogger()


if TYPE_CHECKING:
    from linkml.validator.report import ValidationReport as LinkMLValidationReport
    from rdflib.term import URIRef

    from ..ontologies.solidipes import ROCrateMetadata as PydanticROCrateMetadata


class Ontology(ABC):
    """Abstract interface shared by every ontology backend in this module.

    Concrete subclasses must provide the four abstract methods below;
    :meth:`get_file_compatible_class_names` has a default implementation.
    """

    @abstractmethod
    def get_class_names(self) -> list[str]:
        """Return the names of all classes declared by the ontology."""

    @abstractmethod
    def validate(self, obj) -> bool:
        """Check a Dataset against the ontology and report whether it is valid."""

    @abstractmethod
    def validate_file(self, obj) -> bool:
        """Check a single File against the ontology and report whether it is valid."""

    @abstractmethod
    def get_file_class_name(self, obj) -> str:
        """Return the name of the ontology class that a File belongs to."""

    def get_file_compatible_class_names(self, obj) -> list[str]:
        """Return the class names a File could be assigned to.

        The default implementation does not inspect ``obj``: every class
        name of the ontology is considered compatible.
        """
        all_class_names = self.get_class_names()
        return all_class_names
class LinkMLOntology(Ontology):
    """Ontology backed by a LinkML YAML schema.

    The schema file is loaded with ``load_yaml`` and a LinkML validator is
    built from it. Validation is run against ``rocrate.as_dict()``.

    Validation errors are stored on the instance and can be read back with
    :meth:`_get_validation_errors`.
    """

    def __init__(self, identifier: str | None = None):
        """Load the schema and build the LinkML validator.

        Args:
            identifier: Path to the LinkML YAML schema. When empty, the
                ``solidipes.yaml`` schema located in the sibling
                ``ontologies`` directory is used.
        """
        from linkml.validator import Validator as LinkMLValidator
        from linkml.validator.plugins import (
            JsonschemaValidationPlugin,
            RecommendedSlotsPlugin,
        )

        # Error stores: global messages, and messages attached to one entity.
        self._validation_errors: list[str] = []
        self._validation_errors_per_unique_identifier: dict[str, list[str]] = {}

        if not identifier:
            identifier = os.path.join(os.path.dirname(__file__), "..", "ontologies", "solidipes.yaml")

        try:
            # Fall back to an empty schema if the loader yields nothing.
            self.schema = load_yaml(identifier) or {}
            self.linkml_validator = LinkMLValidator(
                schema=identifier,
                validation_plugins=[
                    JsonschemaValidationPlugin(
                        closed=True,  # Disallow additional properties on root
                        include_range_class_descendants=True,
                    ),
                    RecommendedSlotsPlugin(),
                ],
            )
        except Exception as e:
            # Best effort: keep the object usable, but do not hide the cause.
            logger.warning(f'Could not load LinkML ontology "{identifier}": {e}')
            self.schema = {}
            self.linkml_validator = None

    def add_validation_error(self, error: str, unique_identifier: str | None = None) -> None:
        """Record a validation error, optionally attached to one entity.

        Args:
            error: Message to record.
            unique_identifier: When given, the error is stored for that
                identifier only; otherwise it is stored as a global error.
        """
        if unique_identifier:
            self._validation_errors_per_unique_identifier.setdefault(unique_identifier, []).append(error)
        else:
            self._validation_errors.append(error)

    def _get_validation_errors(self, unique_identifier: str | None = None) -> list[str]:
        """Return recorded errors, global ones or those of one identifier."""
        if unique_identifier:
            return self._validation_errors_per_unique_identifier.get(unique_identifier, [])
        return self._validation_errors

    def get_class_names(self) -> list[str]:
        """Return the keys of the schema's ``classes`` section (empty if absent)."""
        return list(self.schema.get("classes", {}).keys())

    def _get_linkml_validation_report(self) -> "LinkMLValidationReport":
        """Run the LinkML validator on the current RO-Crate metadata.

        Raises:
            RuntimeError: If the validator could not be built in ``__init__``.
        """
        if self.linkml_validator is None:
            raise RuntimeError("LinkML validator is not available (schema could not be loaded)")

        rocrate_metadata = rocrate.as_dict()
        return self.linkml_validator.validate(rocrate_metadata)

    def validate(self, obj) -> bool:
        """Validate the whole RO-Crate metadata; ``obj`` is not inspected.

        Returns:
            True when the report contains no results, False otherwise. Each
            reported message (or the failure reason) is recorded as a global
            validation error.
        """
        # Start from a clean slate so repeated runs do not pile up duplicates.
        self._validation_errors = []

        try:
            validation_report = self._get_linkml_validation_report()
        except Exception as e:
            self.add_validation_error(f"Error in ontology validation: {e}")
            return False

        if not validation_report.results:
            return True

        for validation_result in validation_report.results:
            self.add_validation_error(validation_result.message)

        return False

    def validate_file(self, obj) -> bool:
        """Validate one file, identified by ``obj.unique_identifier``.

        Only report messages mentioning ``'@id': '<unique_identifier>'`` are
        attributed to the file.

        Returns:
            True when no message concerns the file, False otherwise. Errors
            are recorded under the file's unique identifier.
        """
        unique_identifier = obj.unique_identifier
        # Drop errors left over from a previous run on the same file.
        self._validation_errors_per_unique_identifier.pop(unique_identifier, None)

        try:
            validation_report = self._get_linkml_validation_report()
        except Exception as e:
            self.add_validation_error(f"Error in ontology validation: {e}", unique_identifier=unique_identifier)
            return False

        # Plain substring match: the identifier is data, not a regex pattern,
        # so characters such as "." or "(" must be taken literally.
        marker = f"'@id': '{unique_identifier}'"
        is_valid = True

        for validation_result in validation_report.results:
            message = validation_result.message
            if marker not in message:
                continue

            self.add_validation_error(message, unique_identifier=unique_identifier)
            # Can add more details if needed
            # for message in validation_result.context:
            #     self.add_validation_error(message)
            is_valid = False

        return is_valid

    def get_file_class_name(self, obj) -> str:
        """Not supported for LinkML ontologies.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("get_file_class_name is not implemented for LinkMLOntology")
class PydanticOntology(Ontology):
    """Ontology backed by a Python module exposing a ``ROCrateMetadata`` model.

    The module is loaded from a file path. When it cannot be loaded, or does
    not define ``ROCrateMetadata``, the base ontology module
    ``solidipes.ontologies.solidipes`` is used instead and the reason is kept
    in ``module_load_status``.

    Validation errors are stored at class level, so they are shared by all
    instances.
    """

    class ModuleLoadResult:
        """Base type describing the outcome of loading the ontology module."""

    class ModuleLoadOk(ModuleLoadResult):
        """The module was loaded (or no custom module was requested)."""

    class ModuleLoadError(ModuleLoadResult):
        """The module could not be used; ``error_message`` says why."""

        def __init__(self, error_message: str):
            self.error_message = error_message

    def __init__(self, identifier: str | None = None):
        """Load the ontology module designated by ``identifier``.

        Args:
            identifier: Path to a Python file defining the ontology. When
                empty, the base Solidipes ontology is used.
        """
        # `import importlib` alone does not guarantee that the `util`
        # submodule is loaded. Without this import the AttributeError would be
        # swallowed below and misreported as a missing ontology module.
        import importlib.util

        self.module = None
        self.module_load_status = self.ModuleLoadOk()

        if identifier:
            module_name = os.path.splitext(os.path.basename(identifier))[0]
            module_dir = os.path.dirname(os.path.abspath(identifier))

            # Make the module's directory importable before executing it.
            if module_dir not in sys.path:
                sys.path.insert(0, module_dir)

            try:
                spec = importlib.util.spec_from_file_location(module_name, identifier)
                # `spec` is None when no loader matches the path; the
                # resulting AttributeError is handled below.
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                self.module = module

            except (AttributeError, FileNotFoundError):
                error_message = f'Could not find ontology module: "{identifier}". Defaulting to base Solidipes ontology ("solidipes.ontologies.solidipes").'
                logger.warning(error_message)
                self.module_load_status = self.ModuleLoadError(error_message)

        if self.module is not None and not hasattr(self.module, "ROCrateMetadata"):
            error_message = f'Could not find Solidipes ontology classes in module from identifier: "{identifier}". Defaulting to base Solidipes ontology ("solidipes.ontologies.solidipes").'
            logger.warning(error_message)
            self.module = None
            self.module_load_status = self.ModuleLoadError(error_message)

        if self.module is None:
            from ..ontologies import solidipes as solidipes_ontology

            self.module = solidipes_ontology

        self.model = getattr(self.module, "ROCrateMetadata")
        self.__model_instance: "PydanticROCrateMetadata | None" = None
        self._rocrate_metadata_checksum: str | None = None

    # Class-level error stores, shared by every instance (see the classmethods below).
    _validation_errors: list[str] = []
    _validation_errors_per_unique_identifier: dict[str, list[str]] = {}

    @classmethod
    def _reset_validation_errors(cls) -> None:
        """Forget every recorded validation error."""
        cls._validation_errors = []
        cls._validation_errors_per_unique_identifier = {}

    @classmethod
    def add_validation_error(cls, error: str, unique_identifier: str | None = None) -> None:
        """Record a validation error.

        Args:
            error: Message to record.
            unique_identifier: When given, the message is stored for that
                identifier. The first error for an identifier also adds a
                summary line to the global list. Without it, the message
                goes directly to the global list.
        """
        if unique_identifier:
            if unique_identifier not in cls._validation_errors_per_unique_identifier:
                cls._validation_errors_per_unique_identifier[unique_identifier] = []
                cls._validation_errors.append(f"Ontology validation errors in '{unique_identifier}'")

            cls._validation_errors_per_unique_identifier[unique_identifier].append(error)

        else:
            cls._validation_errors.append(error)

    @classmethod
    def _get_validation_errors(cls, unique_identifier: str | None = None) -> list[str]:
        """Return the global errors, or those of ``unique_identifier`` if given."""
        if unique_identifier:
            return cls._validation_errors_per_unique_identifier.get(unique_identifier, [])

        return cls._validation_errors

    @classmethod
    def _has_validation_errors(cls, unique_identifier: str | None = None) -> bool:
        """Tell whether any error is recorded, globally or for one identifier."""
        if unique_identifier:
            return len(cls._get_validation_errors(unique_identifier)) > 0

        return len(cls._get_validation_errors()) > 0 or len(cls._validation_errors_per_unique_identifier) > 0

    @property
    def _model_instance(self) -> "PydanticROCrateMetadata | None":
        """Model built from the current RO-Crate metadata, rebuilt on change.

        The instance is recreated when ``rocrate.checksum`` differs from the
        one seen at the last build (or when nothing was built yet). Rebuilding
        resets the recorded errors and records the new ones. When the model
        cannot be built, an empty dict is returned.
        """
        from pydantic import ValidationError

        if self._rocrate_metadata_checksum != rocrate.checksum or self.__model_instance is None:
            logger.debug("Creating new model instance for ontology validation.")
            self._reset_validation_errors()
            rocrate_metadata = rocrate.as_dict()
            self._rocrate_metadata_checksum = rocrate.checksum
            # Falsy placeholder kept when validation fails; being non-None, it
            # prevents a rebuild on every access while the checksum is unchanged.
            self.__model_instance = {}

            try:
                self.__model_instance = self.model(**rocrate_metadata)

            except ValidationError as e:
                for error in e.errors():
                    # Attach the error to an entity when the offending input
                    # is a dict carrying an "@id".
                    unique_identifier = None
                    error_input = error.get("input", None)
                    if isinstance(error_input, dict):
                        unique_identifier = error_input.get("@id", None)

                    loc = [str(loc_part) for loc_part in error.get("loc", [])]
                    self.add_validation_error(
                        f"Error in ontology validation ({error['type']}) at {'.'.join(loc)}: {error['msg']}",
                        unique_identifier=unique_identifier,
                    )

            except Exception as e:
                self.add_validation_error(f"Error in ontology validation ({type(e).__name__}): {e}")

        return self.__model_instance

    def get_class_names(self) -> list[str]:
        """Return the names of the module's entity classes.

        A class qualifies when it is a subclass of the module's
        ``ROCrateEntity`` and has an ``ontology_class`` entry in its
        ``model_fields``. Returns an empty list when the module has no
        ``ROCrateEntity``.
        """
        if not self.model:
            return []

        try:
            ROCrateEntity = getattr(self.module, "ROCrateEntity")
        except AttributeError:
            return []

        classes = [
            obj
            for obj in vars(self.module).values()
            if isinstance(obj, type)
            and issubclass(obj, ROCrateEntity)
            and "ontology_class" in getattr(obj, "model_fields", {})
        ]

        # Net effect: ascending MRO length (inheritance depth). The
        # sort-descending-then-reverse sequence is kept as is because it also
        # determines how classes of equal depth are ordered.
        classes.sort(key=lambda cls: len(cls.__mro__), reverse=True)
        classes.reverse()

        return [cls.__name__ for cls in classes]

    def validate(self, obj) -> bool:
        """Validate the RO-Crate metadata; ``obj`` is not inspected.

        Returns:
            True when no validation error is recorded after (re)building the
            model instance, False otherwise.
        """
        if not self.model:
            self.add_validation_error("No model defined for ontology validation.")
            return False

        _ = self._model_instance  # Triggers (re)validation if needed
        return not self._has_validation_errors()

    def validate_file(self, obj) -> bool:
        """Validate one file, identified by ``obj.unique_identifier``.

        Returns:
            True when no error is recorded for the file, False otherwise.
        """
        if not self.model:
            # The error is global, not specific to this file, so the file
            # itself is still reported as valid (original behavior).
            self.add_validation_error("No model defined for ontology validation.")
            return True

        _ = self._model_instance  # Triggers (re)validation if needed
        return not self._has_validation_errors(unique_identifier=obj.unique_identifier)

    def get_file_class_name(self, obj) -> str:
        """Return the class name of the graph entity matching the file.

        Returns:
            The entity's class name; ``"Undefined"`` when no model instance
            is available; ``"Unknown"`` when no entity of the graph has the
            file's unique identifier as ``at_id``.
        """
        if not self._model_instance:
            logger.debug("Model instance not available for getting file class name.")
            return "Undefined"

        for entity in self._model_instance.graph:
            if getattr(entity, "at_id", None) == obj.unique_identifier:
                return entity.__class__.__name__

        logger.debug(f"Could not find class name for file with unique_identifier: {obj.unique_identifier}")
        return "Unknown"

    def get_file_rocrate_dict(self, obj) -> dict:
        """Return the ``@graph`` entry whose ``@id`` matches the file, or ``{}``."""
        _ = self._model_instance  # Triggers (re)validation if needed

        for entity in rocrate.as_dict().get("@graph", []):
            if entity.get("@id", None) == obj.unique_identifier:
                return entity

        return {}

    def get_file_compatible_class_names(self, obj) -> list[str]:
        """Return the class names that accept the file's RO-Crate entry.

        Each candidate class is instantiated with the entry; classes whose
        construction raises are discarded. Returns an empty list when the
        file has no entry in the graph.
        """
        rocrate_dict = self.get_file_rocrate_dict(obj)
        if not rocrate_dict:
            return []

        compatible_class_names = []

        for class_name in self.get_class_names():
            cls = getattr(self.module, class_name)
            try:
                _ = cls(**rocrate_dict)
            except Exception:
                # Deliberate: a failing construction simply means "not compatible".
                continue
            compatible_class_names.append(class_name)

        return compatible_class_names
class RDFOntology(Ontology):
    """Ontology read from an RDF source parsed with ``rdflib``.

    Only class listing is available; the validation methods are not
    implemented.
    """

    def __init__(self, identifier: str):
        """Parse ``identifier`` into an rdflib graph kept in ``self.graph``."""
        from rdflib import Graph

        self.identifier = identifier
        self.graph = Graph()
        self.graph.parse(self.identifier)

    def get_classes(self) -> list["URIRef"]:
        """Return every subject of the graph typed as ``owl:Class``."""
        from rdflib import RDF, Namespace

        owl_namespace = Namespace("http://www.w3.org/2002/07/owl#")
        return [subject for subject in self.graph.subjects(RDF.type, owl_namespace.Class)]

    def get_class_name(self, class_: "URIRef") -> str:
        """Return the part of the class reference following its last ``#``."""
        return str(class_).rsplit("#", 1)[-1]

    def get_class_from_name(self, class_name: str) -> Optional["URIRef"]:
        """Return the first class whose name equals ``class_name``, else None."""
        matching = (candidate for candidate in self.get_classes() if self.get_class_name(candidate) == class_name)
        return next(matching, None)

    def get_class_names(self) -> list[str]:
        """Return the name of every class found by :meth:`get_classes`."""
        return list(map(self.get_class_name, self.get_classes()))

    def validate(self, obj) -> bool:
        """Not supported for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("validate is not implemented for RDFOntology")

    def validate_file(self, obj) -> bool:
        """Not supported for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("validate_file is not implemented for RDFOntology")

    def get_file_class_name(self, obj) -> str:
        """Not supported for RDF ontologies; always raises NotImplementedError."""
        raise NotImplementedError("get_file_class_name is not implemented for RDFOntology")
class OntologyValidator(Validator):
    """Validator delegating to the ontology configured in the study metadata."""

    def __init__(self, description: str = "Ontology is matched", **kwargs):
        """Build the validator and load the currently configured ontology.

        Args:
            description: Description forwarded to the base ``Validator``.
            **kwargs: Extra keyword arguments forwarded to the base ``Validator``.
        """
        super().__init__(description=description, **kwargs)
        ontology_identifier = get_ontology_identifier()
        self.ontology: Ontology = get_ontology_from_identifier(ontology_identifier)

    def _validate(self, obj=None) -> bool:
        """Run the ontology validation and collect its errors.

        Returns:
            The result of ``self.ontology.validate(obj)``, also stored in
            ``self._result``. ``self._errors`` receives the ontology's
            recorded errors.
        """
        self._result = self.ontology.validate(obj)

        # `_get_validation_errors` is not part of the `Ontology` interface, so
        # fall back to an empty list for ontologies that do not provide it.
        get_errors = getattr(self.ontology, "_get_validation_errors", None)
        self._errors = get_errors() if callable(get_errors) else []

        return self._result
def set_ontology_identifier(identifier: str):
    """Store ``identifier`` in the study metadata and reload the ontology.

    The identifier is written under ``ontology.identifier`` in the study
    metadata, then the global ``OntologyValidator`` gets a new ontology built
    from it.
    """
    metadata = get_study_metadata()
    # Create the "ontology" section on first use, then record the identifier.
    metadata.setdefault("ontology", {})["identifier"] = identifier
    set_study_metadata(metadata)

    validator = get_global_validator(OntologyValidator)
    validator.ontology = get_ontology_from_identifier(identifier)
def get_ontology_identifier() -> str:
    """Return the ontology identifier stored in the study metadata.

    An empty string is returned when the metadata has no ``ontology``
    section or the section has no ``identifier``.
    """
    metadata = get_study_metadata()
    return metadata.get("ontology", {}).get("identifier", "")
def get_ontology_from_identifier(identifier: str) -> Ontology:
    """Build the ontology object matching ``identifier``.

    Resolution order:

    1. By file suffix: ``.yaml``/``.yml`` gives a LinkML ontology, ``.py`` a
       Pydantic ontology, and ``.ttl``/``.rdf``/``.owl`` an RDF ontology.
    2. As an importable module name, loaded as a Pydantic ontology from the
       module's file.
    3. Otherwise a warning is logged, the stored identifier is reset to
       ``"solidipes.ontologies.solidipes"`` and a Pydantic ontology is built
       from the original identifier.
    """
    suffix_dispatch = (
        ((".yaml", ".yml"), LinkMLOntology),
        ((".py",), PydanticOntology),
        ((".ttl", ".rdf", ".owl"), RDFOntology),
    )
    for suffixes, ontology_class in suffix_dispatch:
        if identifier.endswith(suffixes):
            return ontology_class(identifier)

    try:
        imported_module = importlib.import_module(identifier)
        return PydanticOntology(imported_module.__file__)
    except (ModuleNotFoundError, ImportError, ValueError):
        # Not an importable module name: fall through to the default below.
        pass

    logger.warning(
        f'Could not determine ontology type from identifier: "{identifier}". Defaulting to PydanticOntology.'
    )
    set_ontology_identifier("solidipes.ontologies.solidipes")
    return PydanticOntology(identifier)
def get_available_ontology_identifiers() -> list[str]:
    """Get a list of available ontology identifiers from plugins and dataset.

    Dataset identifiers are the paths of the scanned loaders that are
    ``PydanticOntology`` loaders; if they cannot be collected, none are
    reported. Plugin identifiers are the module names of the models listed in
    ``ontology_list``. Dataset identifiers come first in the result.
    """
    # NOTE: this name is the loader class from the core plugin; within this
    # function it shadows the module-level class of the same name.
    from solidipes_core_plugin.loaders.ontology import PydanticOntology

    from ..plugins.discovery import ontology_list
    from .curation import CurationValidator

    try:
        curation_validator = get_global_validator(CurationValidator)
        loaders = curation_validator.scanner.get_loader_dict().values()
        dataset_identifiers = [loader.path for loader in loaders if isinstance(loader, PydanticOntology)]
    except Exception:
        # Best effort: without a usable scanner, no dataset ontology is listed.
        dataset_identifiers = []

    plugin_identifiers = [model.__module__ for model in ontology_list]

    return dataset_identifiers + plugin_identifiers