# Source code for solidipes.loaders.rocrate_metadata
import atexit
import json
import os
import shutil
from typing import Literal, Optional, TypeVar, Union
from unittest.mock import patch
from rocrate.model.dataset import Dataset as ROCrateDataset
from rocrate.model.file import File as ROCrateFile
from rocrate.rocrate import ROCrate
from ..utils import logging
from ..utils.utils import compute_checksum, get_study_root_path, transform_data_containers_to_dict
from ..validators.validator import validator
from .cached_metadata import CachedMetadata, ObservableDict, cached_loadable
from .data_container import DataContainer
# Shadow the builtin ``print`` inside this module with the project's
# ``logging.invalidPrint`` (presumably to flag stray print calls — confirm
# against solidipes.utils.logging).
print = logging.invalidPrint
# Module logger obtained from the solidipes logging helper.
logger = logging.getLogger()
# Keep a handle on the real ``shutil.copy``: write_json() patches
# "shutil.copy" with a wrapper that delegates to this original.
original_shutil_copy = shutil.copy
[docs]
def shutil_copy_skip_wildcard(src, dst, *, follow_symlinks=True):
    """Copy ``src`` to ``dst`` unless either path contains a ``*`` wildcard.

    Used as a stand-in for ``shutil.copy`` while the RO-Crate is written.
    Paths with a wildcard are skipped (a debug message is logged and ``None``
    is returned); all other copies are forwarded to the original
    ``shutil.copy`` and its result is returned.
    """
    has_wildcard = any("*" in str(path) for path in (src, dst))
    if not has_wildcard:
        return original_shutil_copy(src, dst, follow_symlinks=follow_symlinks)

    logger.debug(f"Preventing RO-Crate to copy {src} to {dst}")
    return None
[docs]
class ROCrateProxy:
    """Lazy proxy around the study's ``ROCrate`` object.

    The crate is only instantiated the first time :attr:`crate` is accessed.
    Any attribute that is not defined on the proxy itself is forwarded to the
    wrapped crate (see :meth:`__getattr__`).
    """

    def __init__(self) -> None:
        """RO-Crate crate object proxy."""
        self._crate: Optional[ROCrate] = None
        # Checksum of the metadata file as computed after the last successful write_json().
        self.checksum: str | None = None

    @property
    def crate(self) -> "ROCrate":
        """Return the wrapped crate, creating it on first access.

        The crate is loaded from the study root directory. If that raises
        ``ValueError`` (not a valid RO-Crate), a new empty crate is created
        instead.
        """
        if self._crate is not None:
            return self._crate

        try:
            root_path = get_study_root_path()
            self._readonly = not os.access(root_path, os.W_OK)
            self._crate = ROCrate(root_path)
        except ValueError:  # Not a valid RO-Crate: missing ro-crate-metadata.json
            logger.info("RO-Crate metadata file missing or invalid, creating a new one")
            self._crate = ROCrate()

        return self._crate

    def write_json(self) -> bool:
        """Write the RO-Crate metadata file to disk.

        Returns:
            True if the file was written, False otherwise (no study root, the
            target directory or file is not writable, or the write could not
            be completed).
        """
        try:
            study_root_path = get_study_root_path()
        except FileNotFoundError:
            return False

        metadata_filepath = os.path.join(study_root_path, "ro-crate-metadata.json")

        # Nothing can be written if the directory or an existing file is read-only.
        if not os.access(os.path.dirname(metadata_filepath), os.W_OK):
            return False
        if os.path.exists(metadata_filepath) and not os.access(metadata_filepath, os.W_OK):
            return False

        logger.info(f"Writing RO-Crate metadata {study_root_path}")

        # Load the existing metadata file as a solidipes file so that its
        # cached modification time can be refreshed after the write.
        f = None
        if os.path.exists(metadata_filepath):
            # Imported locally (presumably to avoid a circular import — confirm).
            from .file import load_file

            try:
                f = load_file(metadata_filepath)
            except RuntimeError as e:
                # Best effort: the write proceeds without the cache refresh.
                logger.debug(f"Could not load {metadata_filepath}: {e}")

        written = False
        # Entries already removed after a FileNotFoundError; seeing one twice
        # means removing it did not help, so stop instead of looping forever.
        already_removed = set()
        while not written:
            try:
                with patch("shutil.copy", side_effect=shutil_copy_skip_wildcard):
                    self.crate.write(study_root_path)
                written = True
                self.checksum = compute_checksum(metadata_filepath)
                logger.info(f"Saved RO-Crate metadata {study_root_path}")
                # force update of the cached modified time
                if f is not None:
                    cache = f.get_cached_metadata()
                    cache["modified_time"] = f.modified_time
                    CachedMetadata._commit()
                return True
            except FileNotFoundError as e:
                logger.error(e)
                if e.filename is None:
                    # No path to drop from the crate: give up rather than crash.
                    return False
                with patch("shutil.copy", side_effect=shutil_copy_skip_wildcard):
                    fname = os.path.relpath(e.filename, self.crate.source)
                    if fname in already_removed:
                        logger.error(f"Cannot write RO-Crate metadata: {fname} is still missing")
                        return False
                    already_removed.add(fname)
                    # Drop the entry whose file is missing, then retry the write.
                    self.crate.delete(fname)
        return False

    def as_dict(self) -> dict:
        """Get the RO-Crate crate as a dictionary. Reads from the ro-crate-metadata.json file without updating it!

        Keys of ``@graph`` entries that start with ``json_`` hold JSON-encoded
        strings; each is decoded and stored under the key with that prefix
        removed. Returns an empty dict if the metadata file does not exist.
        """
        rocrate_metadata_path = os.path.join(get_study_root_path(), "ro-crate-metadata.json")
        try:
            with open(rocrate_metadata_path, encoding="utf-8") as f:
                rocrate_dict = json.load(f)
        except FileNotFoundError:
            return {}

        prefix = "json_"
        for entry in rocrate_dict["@graph"]:
            # Snapshot the matching keys first: the entry is mutated in the loop.
            for k in [k for k in entry if k.startswith(prefix)]:
                # Strip only the leading prefix so that a later "json_" inside
                # the field name is preserved.
                entry[k[len(prefix):]] = json.loads(entry[k])
                del entry[k]
        return rocrate_dict

    def __getattr__(self, key: str):
        """Forward unknown attributes to the wrapped crate."""
        # Only reached when normal lookup failed. For these two names that
        # means the proxy is not initialised; delegating would recurse forever.
        if key in ("_crate", "crate"):
            raise AttributeError(key)
        return getattr(self.crate, key)
# Module-level proxy instance used by ROCrateMetadata below; the underlying
# crate is only created when the proxy's ``crate`` property is first accessed.
rocrate = ROCrateProxy()
[docs]
class ROCrateMetadataProperty(property):
    """Marker subclass of ``property`` for RO-Crate metadata fields.

    It adds no behavior of its own; it exists so that such properties can be
    recognised with ``isinstance`` checks.
    """
# Generic type variable; not referenced by the other definitions shown in this module.
T = TypeVar("T")
[docs]
class rocrate_metadata(cached_loadable):
    """Decorator for loadables that are saved as RO-Crate metadata."""

    @staticmethod
    def _to_crate_value(value):
        """Return ``value`` in the plain form that is stored in the crate."""
        if isinstance(value, ObservableDict):
            value = value._data
        return transform_data_containers_to_dict(value)

    def wrapped_fget(self, obj):
        """Get the field through the parent getter and mirror it into the crate.

        An ``ObservableDict`` result is re-wrapped with a callback that, when
        invoked, pushes the field to the crate again and saves it to the cache.
        """
        # NOTE(review): nesting reconstructed from a listing that had lost its
        # indentation — confirm the re-wrapping only applies to ObservableDict.
        data = super().wrapped_fget(obj)
        value = data

        if isinstance(value, ObservableDict):
            value = value._data

            def callback():
                current = getattr(obj, self.key, None)
                obj.set_crate_metadata(self.key, self._to_crate_value(current))
                obj.save_field_to_cache(self.key)

            data = ObservableDict(value, callback=callback)

        obj.set_crate_metadata(self.key, transform_data_containers_to_dict(value))
        return data

    def wrapped_fset(self, obj, value) -> None:
        """Set the field through the parent setter and mirror it into the crate.

        ``ontology_class`` is read only: attempts to set it are logged and ignored.
        """
        if self.key == "ontology_class":
            logger.warning("ontology_class is read only")
            return

        super().wrapped_fset(obj, value)
        # Store the same plain representation that wrapped_fget() stores, so
        # the crate never receives an ObservableDict or a raw data container.
        obj.set_crate_metadata(self.key, self._to_crate_value(value))
[docs]
class ROCrateMetadata(CachedMetadata):
    """RO-Crate metadata.

    Couples cached metadata with an entity of the module-level ``rocrate``
    proxy: fields declared with ``@rocrate_metadata`` are mirrored into the
    properties of that entity.
    """

    # Expose the decorator on the class (``ROCrateMetadata.rocrate_metadata``).
    rocrate_metadata = rocrate_metadata

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the container and bind it to its RO-Crate entity."""
        # Selects which ``add_<type>`` method of the crate creates the entity.
        self._rocrate_type: Literal["dataset", "directory", "file"] = "file"
        self._rocrate_entity: Optional[Union[ROCrateDataset, ROCrateFile]] = None
        super().__init__(*args, **kwargs)
        logger.debug(f'Creating rocrate metadata container "{self.unique_identifier}" ({self.class_path})')
        self.get_rocrate_entity()

    def update_cached_metadata(self) -> None:
        """Load fields from the RO-Crate entity, then run the parent update."""
        self._load_rocrate_metadata()
        super().update_cached_metadata()

    def _make_crate_sync_callback(self, attribute_name):
        """Build a callback that re-syncs ``attribute_name`` to crate and cache."""

        def callback():
            value = getattr(self, attribute_name, None)
            if isinstance(value, ObservableDict):
                value = value._data
            value = transform_data_containers_to_dict(value)
            self.set_crate_metadata(attribute_name, value)
            self.save_field_to_cache(attribute_name)

        return callback

    def _load_rocrate_metadata(self) -> None:
        """Load field from RO-Crate metadata file if not already loaded from cache."""
        # NOTE(review): nesting reconstructed from a listing that had lost its
        # indentation — confirm that every loaded value is passed to add().
        cls = self.__class__
        rocrate_metadata_from_file = self.get_rocrate_entity().properties()

        for attribute_name in dir(cls):
            attribute = getattr(cls, attribute_name)
            if not isinstance(attribute, rocrate_metadata):
                continue

            value = self._data_collection.get(attribute_name, None)
            if value is None:
                # Try to get value from RO-Crate metadata file
                value = rocrate_metadata_from_file.get(attribute_name, None)
                json_key = "json_" + attribute_name
                if value is None and json_key in rocrate_metadata_from_file:
                    # Dict values are stored JSON-encoded under "json_<name>"
                    # by set_crate_metadata().
                    value = json.loads(rocrate_metadata_from_file[json_key])
            if value is None:
                continue

            if isinstance(value, ObservableDict):
                value = value._data

            if isinstance(value, (dict, DataContainer)):
                # Built through a factory so each callback is bound to its own
                # attribute name rather than the loop variable's final value.
                value = ObservableDict(value, callback=self._make_crate_sync_callback(attribute_name))

            self.add(attribute_name, value)

    def get_rocrate_entity(self) -> Union[ROCrateDataset, ROCrateFile]:
        """Return the crate entity for this container, creating it if needed.

        The entity is looked up by ``unique_identifier`` (backslashes replaced
        by forward slashes). If none exists, one is added with the crate's
        ``add_<_rocrate_type>`` method.
        """
        if self._rocrate_entity is None:
            self._rocrate_entity = rocrate.get(self.unique_identifier.replace("\\", "/"))

        if self._rocrate_entity is None:
            rocrate_add_method = getattr(rocrate, f"add_{self._rocrate_type}")
            self._rocrate_entity = rocrate_add_method(self.path, dest_path=self.unique_identifier)

            # Only for a freshly created entity; the entity is already set
            # here, so the getters can call back into this method safely.
            for attr in dir(self.__class__):
                if isinstance(getattr(self.__class__, attr), ROCrateMetadataProperty):
                    _ = getattr(self, attr)  # Trigger adding default value

        return self._rocrate_entity

    @property
    def additional_metadata(self):
        """Properties of the RO-Crate entity bound to this container."""
        return self.get_rocrate_entity().properties()

    def get_crate_metadata(self, key):
        """Return the value stored for ``key`` in the crate entity.

        A plain ``key`` entry is returned as is; otherwise a ``json_<key>``
        entry is JSON-decoded and returned.

        Raises:
            ValueError: if neither form of the key is present.
        """
        crate_metadata = self.get_rocrate_entity().properties()
        if key in crate_metadata:
            return crate_metadata[key]

        json_key = "json_" + key
        if json_key in crate_metadata:
            return json.loads(crate_metadata[json_key])

        raise ValueError(key)

    def set_crate_metadata(self, key, value):
        """Store ``value`` for ``key`` in the crate entity.

        Dicts are JSON-encoded under ``json_<key>``; any other value is stored
        under ``key`` directly. The other form of the key is removed so that
        get_crate_metadata() cannot return a stale value.
        """
        crate_metadata = self.get_rocrate_entity().properties()
        json_key = "json_" + key
        if isinstance(value, dict):
            crate_metadata[json_key] = json.dumps(value)
            crate_metadata.pop(key, None)
        else:
            crate_metadata[key] = value
            crate_metadata.pop(json_key, None)

    @classmethod
    def close_cached_metadata(cls) -> None:
        """Close cached metadata and RO-Crate."""
        super().close_cached_metadata()
        # NOTE(review): ROCrateProxy defines no close_crate in this module, so
        # this resolves through its __getattr__ to the wrapped crate — confirm
        # that the crate object provides it.
        rocrate.close_crate()

    @rocrate_metadata
    def ontology_class(self) -> Optional[str]:
        """Ontology class of this data container."""
        return self.class_path

    @validator(description="Ontology is matched")
    def ontology_is_matched(self) -> bool:
        """Check if the ontology is matched"""
        # Imported locally (presumably to avoid import cycles — confirm).
        from ..validators.global_validation import get_global_validator
        from ..validators.ontology import OntologyValidator

        ontology_validator = get_global_validator(OntologyValidator)
        errors = ontology_validator.ontology.validate_file(self)
        self.add_validation_error(errors)
        return len(errors) == 0
[docs]
def update_rocrate_json() -> None:
    """Update the RO-Crate metadata file on disk if necessary.

    Delegates to ``ROCrateMetadata.commit_if_scheduled()``, which is not
    defined in this module (presumably inherited from ``CachedMetadata`` —
    confirm).
    """
    ROCrateMetadata.commit_if_scheduled()
# Close the cached metadata and the RO-Crate when the interpreter exits.
atexit.register(ROCrateMetadata.close_cached_metadata)
################################################################