Source code for solidipes.loaders.rocrate_metadata

import atexit
import json
import os
import shutil
from typing import Literal, Optional, TypeVar, Union
from unittest.mock import patch

from rocrate.model.dataset import Dataset as ROCrateDataset
from rocrate.model.file import File as ROCrateFile
from rocrate.rocrate import ROCrate

from ..utils import logging
from ..utils.utils import compute_checksum, get_study_root_path, transform_data_containers_to_dict
from ..validators.validator import validator
from .cached_metadata import CachedMetadata, ObservableDict, cached_loadable
from .data_container import DataContainer

# Rebind ``print`` for this module to the project's ``logging.invalidPrint``
# (presumably to discourage direct printing in favour of the logger -- confirm
# in ``..utils.logging``).
print = logging.invalidPrint
logger = logging.getLogger()


# Reference to the real ``shutil.copy`` taken at import time;
# ``shutil_copy_skip_wildcard`` delegates to it for regular paths.
original_shutil_copy = shutil.copy


def shutil_copy_skip_wildcard(src, dst, *, follow_symlinks=True):
    """Copy ``src`` to ``dst`` unless either path contains a ``*`` wildcard.

    Intended as a stand-in for ``shutil.copy`` (it is used as the
    ``side_effect`` of a patched ``shutil.copy`` while the crate is written).

    Args:
        src: Source path; converted with ``str`` for the wildcard check.
        dst: Destination path; converted with ``str`` for the wildcard check.
        follow_symlinks: Forwarded unchanged to the real ``shutil.copy``.

    Returns:
        Whatever the original ``shutil.copy`` returns, or ``None`` when the
        copy was skipped because of a wildcard.
    """
    contains_wildcard = any("*" in str(path) for path in (src, dst))
    if not contains_wildcard:
        return original_shutil_copy(src, dst, follow_symlinks=follow_symlinks)

    # Wildcard paths are not real files: log and skip instead of copying.
    logger.debug(f"Preventing RO-Crate to copy {src} to {dst}")
    return None
class ROCrateProxy:
    """Lazy proxy around the study's ``ROCrate`` object.

    The crate is only created on first access to :attr:`crate`. Attributes that
    are not defined on the proxy itself are forwarded to the underlying crate.
    """

    def __init__(self) -> None:
        """RO-Crate crate object proxy."""
        self._crate: Optional[ROCrate] = None
        self.checksum: str | None = None

    @property
    def crate(self) -> ROCrate:
        """Return the underlying crate, creating it on first access.

        The crate is built from the study root path. If ``ROCrate`` raises
        ``ValueError`` for that directory, an empty crate is created instead.
        """
        if self._crate is not None:
            return self._crate

        try:
            root_path = get_study_root_path()
            self._readonly = not os.access(root_path, os.W_OK)
            self._crate = ROCrate(root_path)

        except ValueError:  # Not a valid RO-Crate: missing ro-crate-metadata.json
            logger.info("RO-Crate metadata file missing or invalid, creating a new one")
            self._crate = ROCrate()

        return self._crate

    def write_json(self) -> bool:
        """Write the RO-Crate metadata file into the study root directory.

        Returns:
            True if the crate was written. False if the study root cannot be
            found, if the target directory or an existing metadata file is not
            writable, or if the write did not complete.
        """
        try:
            study_root_path = get_study_root_path()
        except FileNotFoundError:
            return False

        metadata_filepath = os.path.join(study_root_path, "ro-crate-metadata.json")
        # Read-only locations are skipped silently; report this as "not written"
        # with an explicit False so the declared bool return type holds.
        if not os.access(os.path.dirname(metadata_filepath), os.W_OK):
            return False
        if os.path.exists(metadata_filepath) and not os.access(metadata_filepath, os.W_OK):
            return False

        logger.info(f"Writing RO-Crate metadata {study_root_path}")

        not_ok = True
        f = None
        if os.path.exists(metadata_filepath):
            # Imported here rather than at module level (presumably to avoid a
            # circular import -- confirm).
            from .file import load_file

            try:
                f = load_file(metadata_filepath)
            except RuntimeError:
                # Best effort: without the loaded file we only skip the
                # cached modified-time refresh below.
                pass

        while not_ok:
            try:
                # Prevent copies of wildcard paths while the crate is written.
                with patch("shutil.copy", side_effect=shutil_copy_skip_wildcard):
                    self.crate.write(study_root_path)
                not_ok = False
                self.checksum = compute_checksum(metadata_filepath)
                logger.info(f"Saved RO-Crate metadata {study_root_path}")
                # force update of the cached modified time
                if f is not None:
                    cache = f.get_cached_metadata()
                    cache["modified_time"] = f.modified_time
                    CachedMetadata._commit()
                return True
            except FileNotFoundError as e:
                # A file referenced by the crate is missing: remove that entry
                # from the crate and retry the write.
                logger.error(e)

                with patch("shutil.copy", side_effect=shutil_copy_skip_wildcard):
                    fname = os.path.relpath(e.filename, self.crate.source)
                    self.crate.delete(fname)

        return False

    def as_dict(self) -> dict:
        """Get the RO-Crate crate as a dictionary.

        Reads from the ro-crate-metadata.json file without updating it! Entries
        of ``@graph`` whose key starts with ``json_`` are JSON-decoded and
        stored under the key without that prefix.

        Returns:
            The decoded metadata, or an empty dict if the file does not exist.
        """
        prefix = "json_"
        rocrate_metadata_path = os.path.join(get_study_root_path(), "ro-crate-metadata.json")
        try:
            with open(rocrate_metadata_path, encoding="utf-8") as f:
                rocrate_dict = json.load(f)

            for entry in rocrate_dict["@graph"]:
                # Snapshot the keys: the entry is modified while iterating.
                for k in list(entry):
                    if not k.startswith(prefix):
                        continue
                    # Strip only the leading prefix, so names that contain
                    # "json_" again (e.g. "json_my_json_field") stay intact.
                    new_key = k[len(prefix):]
                    entry[new_key] = json.loads(entry[k])
                    del entry[k]

        except FileNotFoundError:
            return {}
        return rocrate_dict

    def close_crate(self) -> None:
        """Close the RO-Crate crate."""
        self._crate = None

    def __getattr__(self, key: str):
        """Forward unknown attributes to the underlying crate.

        Raises:
            AttributeError: If ``_crate`` itself is missing (instance created
                without ``__init__``); forwarding would otherwise recurse
                through :attr:`crate` without bound.
        """
        if key == "_crate":
            raise AttributeError(key)
        return getattr(self.crate, key)
# Module-level crate proxy shared by ROCrateMetadata (entity lookup/creation
# and closing the crate).
rocrate = ROCrateProxy()
class ROCrateMetadataProperty(property):
    """Marker class for RO-Crate metadata properties.

    Adds no behaviour to ``property``; it only allows such properties to be
    recognised with ``isinstance``.
    """
# Generic type variable; it is not referenced in the visible part of this module.
T = TypeVar("T")
class rocrate_metadata(cached_loadable):
    """Decorator for loadables that are saved as RO-Crate metadata."""

    def wrapped_fget(self, obj):
        """Return the loadable's value and mirror it into the RO-Crate entity.

        When the parent getter yields an ``ObservableDict``, its underlying
        data is re-wrapped in a new ``ObservableDict`` whose callback writes
        the current value to the crate and to the cache.
        """
        loaded = super().wrapped_fget(obj)
        raw = loaded

        # NOTE(review): nesting reconstructed from a whitespace-collapsed
        # source -- confirm the re-wrap belongs inside this branch.
        if isinstance(loaded, ObservableDict):
            raw = loaded._data

            def sync_to_crate():
                current = getattr(obj, self.key, None)
                if isinstance(current, ObservableDict):
                    current = current._data
                current = transform_data_containers_to_dict(current)
                obj.set_crate_metadata(self.key, current)
                obj.save_field_to_cache(self.key)

            loaded = ObservableDict(raw, callback=sync_to_crate)

        serializable = transform_data_containers_to_dict(raw)
        obj.set_crate_metadata(self.key, serializable)
        return loaded

    def wrapped_fset(self, obj, value) -> None:
        """Set the value and mirror it into the RO-Crate entity.

        ``ontology_class`` is read only: an attempt to set it logs a warning
        and changes nothing.
        """
        if self.key != "ontology_class":
            super().wrapped_fset(obj, value)
            obj.set_crate_metadata(self.key, value)
            return

        logger.warning("ontology_class is read only")
class ROCrateMetadata(CachedMetadata):
    """RO-Crate metadata.

    Cached metadata container whose ``rocrate_metadata`` fields are mirrored
    into the properties of an entity of the shared RO-Crate.
    """

    rocrate_metadata = rocrate_metadata

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the container and make sure its RO-Crate entity exists.

        All arguments are forwarded to the parent constructor.
        """
        self._rocrate_type: Literal["dataset", "directory", "file"] = "file"
        self._rocrate_entity: Optional[Union[ROCrateDataset, ROCrateFile]] = None
        super().__init__(*args, **kwargs)
        logger.debug(f'Creating rocrate metadata container "{self.unique_identifier}" ({self.class_path})')
        self.get_rocrate_entity()

    def update_cached_metadata(self) -> None:
        """Load the RO-Crate fields, then run the parent's cache update."""
        self._load_rocrate_metadata()
        super().update_cached_metadata()

    def _load_rocrate_metadata(self) -> None:
        """Load field from RO-Crate metadata file if not already loaded from cache."""

        def make_sync_callback(name):
            # One callback per attribute, with ``name`` bound at creation time.
            # Closing over the loop variable directly would make every callback
            # act on the last name iterated (late-binding closure).
            def callback():
                current = getattr(self, name, None)
                if isinstance(current, ObservableDict):
                    current = current._data
                current = transform_data_containers_to_dict(current)
                self.set_crate_metadata(name, current)
                self.save_field_to_cache(name)

            return callback

        cls = self.__class__
        rocrate_metadata_from_file = self.get_rocrate_entity().properties()

        for attribute_name in dir(cls):
            attribute = getattr(cls, attribute_name)
            if not isinstance(attribute, rocrate_metadata):
                continue

            value = self._data_collection.get(attribute_name, None)

            if value is None:
                # Try to get value from RO-Crate metadata file
                value = rocrate_metadata_from_file.get(attribute_name, None)
                if value is None:
                    continue

            if isinstance(value, ObservableDict):
                value = value._data

            if isinstance(value, (dict, DataContainer)):
                value = ObservableDict(value, callback=make_sync_callback(attribute_name))

            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source -- confirm that non-dict values are meant to be added too.
            self.add(attribute_name, value)

    def get_rocrate_entity(self) -> Union[ROCrateDataset, ROCrateFile]:
        """Return the RO-Crate entity of this container, creating it if needed.

        The entity is looked up by ``unique_identifier`` (backslashes replaced
        by forward slashes). If none exists, one is added with the
        ``add_<type>`` method matching ``_rocrate_type``.
        """
        if self._rocrate_entity is None:
            self._rocrate_entity = rocrate.get(self.unique_identifier.replace("\\", "/"))

        if self._rocrate_entity is None:
            rocrate_add_method = getattr(rocrate, f"add_{self._rocrate_type}")
            self._rocrate_entity = rocrate_add_method(self.path, dest_path=self.unique_identifier)

            # Only for a newly added entity: the entity is already set here, so
            # getters that call back into this method return immediately.
            for attr in dir(self.__class__):
                if isinstance(getattr(self.__class__, attr), ROCrateMetadataProperty):
                    _ = getattr(self, attr)  # Trigger adding default value

        return self._rocrate_entity

    @property
    def additional_metadata(self):
        """Properties of this container's RO-Crate entity."""
        return self.get_rocrate_entity().properties()

    def get_crate_metadata(self, key):
        """Return the value stored for ``key`` in the entity properties.

        A value stored under ``"json_" + key`` is JSON-decoded before being
        returned; a value stored under the plain key takes precedence.

        Raises:
            ValueError: If neither form of the key is present.
        """
        crate_metadata = self.get_rocrate_entity().properties()
        if key in crate_metadata:
            return crate_metadata.get(key)
        json_key = "json_" + key
        if json_key in crate_metadata:
            return json.loads(crate_metadata[json_key])
        raise ValueError(key)

    def set_crate_metadata(self, key, value):
        """Store ``value`` for ``key`` in the entity properties.

        Dict values are JSON-encoded and stored under ``"json_" + key``; any
        other value is stored under ``key`` unchanged.
        """
        crate_metadata = self.get_rocrate_entity().properties()
        if isinstance(value, dict):
            crate_metadata["json_" + key] = json.dumps(value)
        else:
            crate_metadata[key] = value

    @classmethod
    def close_cached_metadata(cls) -> None:
        """Close cached metadata and RO-Crate."""
        super().close_cached_metadata()
        rocrate.close_crate()

    @rocrate_metadata
    def ontology_class(self) -> Optional[str]:
        """Ontology class of this data container."""
        return self.class_path

    @validator(description="Ontology is matched")
    def ontology_is_matched(self) -> bool:
        """Check if the ontology is matched"""
        from ..validators.global_validation import get_global_validator
        from ..validators.ontology import OntologyValidator

        ontology_validator = get_global_validator(OntologyValidator)
        errors = ontology_validator.ontology.validate_file(self)
        self.add_validation_error(errors)
        return len(errors) == 0
def update_rocrate_json() -> None:
    """Update the RO-Crate metadata file on disk if necessary.

    Delegates to ``ROCrateMetadata.commit_if_scheduled``.
    """
    ROCrateMetadata.commit_if_scheduled()
# Close the cached metadata and the RO-Crate when the interpreter exits.
atexit.register(ROCrateMetadata.close_cached_metadata)