Source code for solidipes.loaders.rocrate_metadata
import atexit
import shutil
from typing import Literal, Optional, TypeVar, Union
from unittest.mock import patch
from rocrate.model.dataset import Dataset as ROCrateDataset
from rocrate.model.file import File as ROCrateFile
from rocrate.rocrate import ROCrate
from .data_container import DataContainer
from .cached_metadata import CachedMetadata, ObservableDict, cached_loadable
from ..utils import logging
from ..utils.utils import compute_checksum, get_study_root_path, transform_data_containers_to_dict
print = logging.invalidPrint
logger = logging.getLogger()
original_shutil_copy = shutil.copy
[docs]
def shutil_copy_skip_wildcard(src, dst, *, follow_symlinks=True):
if "*" in str(src) or "*" in str(dst):
logger.debug(f"Preventing RO-Crate to copy {src} to {dst}")
return
return original_shutil_copy(src, dst, follow_symlinks=follow_symlinks)
[docs]
class ROCrateProxy:
def __init__(self) -> None:
"""RO-Crate crate object proxy."""
self._crate: Optional[ROCrate] = None
@property
def crate(self) -> ROCrate:
if self._crate is not None:
return self._crate
try:
self._crate = ROCrate(get_study_root_path())
except ValueError: # Not a valid RO-Crate: missing ro-crate-metadata.json
logger.info("RO-Crate metadata file missing or invalid, creating a new one")
self._crate = ROCrate()
return self._crate
[docs]
def write_json(self) -> None:
import os
try:
study_root_path = get_study_root_path()
except FileNotFoundError:
return
logger.debug(f"Writing RO-Crate metadata {study_root_path}")
previous_checksum = None
previous_access_time = None
previous_modification_time = None
metadata_filepath = os.path.join(study_root_path, "ro-crate-metadata.json")
if os.path.exists(metadata_filepath) and os.path.isfile(metadata_filepath):
previous_checksum = compute_checksum(metadata_filepath)
stats = os.lstat(metadata_filepath)
previous_access_time = stats.st_atime
previous_modification_time = stats.st_mtime
not_ok = True
while not_ok:
try:
with patch("shutil.copy", side_effect=shutil_copy_skip_wildcard):
self.crate.write(study_root_path)
not_ok = False
if previous_checksum is None:
logger.debug(f"Saved RO-Crate metadata {study_root_path}")
return
checksum = compute_checksum(metadata_filepath)
if checksum != previous_checksum:
logger.debug(f"Saved RO-Crate metadata {study_root_path}")
return
os.utime(metadata_filepath, (previous_access_time, previous_modification_time))
except FileNotFoundError as e:
logger.error(e)
import os
fname = os.path.relpath(e.filename, self.crate.source)
self.crate.delete(fname)
def __getattr__(self, key: str):
return getattr(self.crate, key)
rocrate = ROCrateProxy()
[docs]
class ROCrateMetadataProperty(property):
"""Marker class for RO-Crate metadata properties."""
T = TypeVar("T")
[docs]
class rocrate_metadata(cached_loadable):
"""Decorator for loadables that are saved as RO-Crate metadata."""
[docs]
def wrapped_fget(self, obj):
data = super().wrapped_fget(obj)
value = data
if isinstance(value, ObservableDict):
value = value._data
def callback():
value = getattr(obj, self.key, None)
if isinstance(value, ObservableDict):
value = value._data
value = transform_data_containers_to_dict(value)
obj.additional_metadata[self.key] = value
obj.save_field_to_cache(self.key)
data = ObservableDict(value, callback=callback)
value = transform_data_containers_to_dict(value)
obj.additional_metadata[self.key] = value
return data
[docs]
def wrapped_fset(self, obj, value) -> None:
super().wrapped_fset(obj, value)
obj.additional_metadata[self.key] = value
[docs]
class ROCrateMetadata(CachedMetadata):
"""RO-Crate metadata."""
rocrate_metadata = rocrate_metadata
def __init__(self, *args, **kwargs) -> None:
self._rocrate_type: Literal["dataset", "directory", "file"] = "file"
self._rocrate_entity: Optional[Union[ROCrateDataset, ROCrateFile]] = None
super().__init__(*args, **kwargs)
[docs]
def update_cached_metadata(self) -> None:
self._load_rocrate_metadata()
super().update_cached_metadata()
[docs]
def _load_rocrate_metadata(self) -> None:
"""Load field from RO-Crate metadata file if not already loaded from cache."""
cls = self.__class__
rocrate_metadata_from_file = self.get_rocrate_entity().properties()
for attribute_name in dir(cls):
attribute = getattr(cls, attribute_name)
if not isinstance(attribute, rocrate_metadata):
continue
value = self._data_collection.get(attribute_name, None)
if value is None:
# Try to get value from RO-Crate metadata file
value = rocrate_metadata_from_file.get(attribute_name, None)
if value is None:
continue
if isinstance(value, ObservableDict):
value = value._data
if isinstance(value, dict) or isinstance(value, DataContainer):
def callback():
value = getattr(self, attribute_name, None)
if isinstance(value, ObservableDict):
value = value._data
value = transform_data_containers_to_dict(value)
self.additional_metadata[attribute_name] = value
self.save_field_to_cache(attribute_name)
value = ObservableDict(value, callback=callback)
self.add(attribute_name, value)
[docs]
def get_rocrate_entity(self) -> Union[ROCrateDataset, ROCrateFile]:
if self._rocrate_entity is None:
self._rocrate_entity = rocrate.get(self.unique_identifier.replace("\\", "/"))
if self._rocrate_entity is None:
rocrate_add_method = getattr(rocrate, f"add_{self._rocrate_type}")
self._rocrate_entity = rocrate_add_method(self.path, dest_path=self.unique_identifier)
for attr in dir(self.__class__):
if isinstance(getattr(self.__class__, attr), ROCrateMetadataProperty):
_ = getattr(self, attr) # Trigger adding default value
return self._rocrate_entity
@property
def additional_metadata(self) -> ObservableDict:
return ObservableDict(self.get_rocrate_entity().properties(), lambda: self._schedule_commit())
[docs]
@classmethod
def _commit(cls) -> None:
"""Update RO-Crate metadata file."""
rocrate.write_json()
super()._commit()
[docs]
@classmethod
def close_cached_metadata(cls) -> None:
"""Close cached metadata and RO-Crate."""
super().close_cached_metadata()
rocrate.close_crate()
atexit.register(ROCrateMetadata.close_cached_metadata)