Source code for solidipes_core_plugin.uploaders.dspace7

import os
import zipfile

from datasize import DataSize
from solidipes.scanners.scanner_local import ExportScanner
from solidipes.uploaders.uploader import Uploader, text_progress_bar
from solidipes.utils import (
    DataRepositoryException,
    bcolors,
    generate_readme,
    get_study_metadata,
    get_study_metadata_path,
    get_study_root_path,
    include_metadata_description,
    solidipes_dirname,
)
from solidipes.utils import study_medatada_mandatory_fields as mandatory_fields
from solidipes.utils import study_medatada_removed_fields_upload as removed_fields

from ..utils.dspace7_utils import (
    clean_deposition,
    create_deposition,
    get_access_token,
    get_existing_deposition_identifier,
    get_existing_deposition_infos,
    list_collections,
    save_deposition_identifier,
    upload_archive,
    upload_deposition_metadata,
)

################################################################


class DFspace7Uploader(Uploader):
    """Uploader that publishes a study to a DSpace7 platform.

    Declares the command names ``dspace7``, ``infoscience`` and ``boris``.
    """

    command = ["dspace7", "infoscience", "boris"]
    command_help = "Publish study to a DSpace7 platform (e.g. Infoscience)"

    def upload(self, args):
        """Run the upload described by ``args``.

        A ``DataRepositoryException`` raised by ``main`` is not propagated:
        it is handed to ``handle_dspace7_exception``, which prints it along
        with hints for the user.
        """
        try:
            main(args)
        except DataRepositoryException as e:
            handle_dspace7_exception(e)

    def populate_arg_parser(self, parser):
        """Declare the command-line options of the DSpace7 upload command.

        Args:
            parser: argparse parser (or sub-parser) to populate in place.
        """
        parser.description = self.command_help

        parser.add_argument(
            "directory",
            nargs="?",
            default=None,
            help=(
                "Path to the directory containing the study to upload."
                " Defaults to the root of the current Solidipes study."
            ),
        )
        parser.add_argument(
            "--host",
            type=str,
            default="infoscience-sb.epfl.ch",
            help="hostname of the Dspace7 instance",
        )
        parser.add_argument(
            "--user",
            type=str,
            default="john.smith@epfl.ch",
            help="username",
        )
        parser.add_argument(
            "--pw",
            type=str,
            default="XXXXXX",
            help="password",
        )
        parser.add_argument(
            "--collection",
            type=str,
            default=None,
            help="UUID of the target collection on the instance",
        )
        parser.add_argument("--access_token", type=str, default=None, help="Provide the Dspace7 token")
        parser.add_argument("--no_cleanup", action="store_true", help="Do not clean the produced archive")

        # The temporary directory is unrelated to the choice of deposition, so
        # it is declared on the parser itself. Registering it in the mutually
        # exclusive group below would make argparse reject "--tmp_dir" whenever
        # "--new-deposition" or "--existing-deposition" is also given.
        parser.add_argument(
            "--tmp_dir",
            help=(
                "specify an existing directory where to store the temporary objects. Default to the system's temporary"
                " directory."
            ),
            default="/tmp" if os.name != "nt" else os.path.expanduser(r"~\AppData\Local\Temp"),
            type=str,
        )

        # A run either creates a new deposition or updates an existing one.
        deposition_group = parser.add_mutually_exclusive_group()
        deposition_group.add_argument(
            "--new-deposition",
            help="create a new deposition instead of updating a previously created one",
            action="store_true",
        )
        deposition_group.add_argument(
            "--existing-deposition",
            dest="existing_identifier",
            nargs="?",
            help="URL or DOI of the study to update. It must be in unplublished state.",
        )
################################################################
def handle_dspace7_exception(e):
    """Print a repository error, followed by hints for recognised causes.

    Args:
        e: the exception to report. Its string form is printed first and
            then searched for known error fragments.
    """
    print(e)
    message = str(e)

    # The targeted deposition cannot be reached any more.
    unreachable_markers = ("has been deleted", "does not exist")
    if any(marker in message for marker in unreachable_markers):
        print(
            'Run the command with the "--new-deposition" option to create'
            ' a new entry, or the "--existing-deposition" option to use'
            " another existing entry."
        )

    # Files of the deposition could not be removed.
    if "Error deleting file" in message:
        print("Please check that the deposition is in draft state.")
################################################################
def main(args, progressbar=text_progress_bar):
    """Upload content to Dspace7.

    Args:
        args: namespace of command-line options. It is enriched along the way
            by ``get_root_directory``, ``create_archive`` and
            ``get_deposition_uri``, and all of its attributes are forwarded
            as keyword arguments to the upload helpers.
        progressbar: progress callback forwarded to ``upload_archive``.
    """
    # Determine the root of the content to zip.
    get_root_directory(args)

    # Zip the directory into a temporary file.
    generate_readme()
    create_archive(args)

    print("Uploading archive")
    get_deposition_uri(args)
    upload_deposition_metadata(**vars(args))
    upload_archive(progressbar=progressbar, **vars(args))

    print("Upload complete.")
    print("Please review your deposition and publish it when ready.")

    # Dispose of the temporary archive unless the user asked to keep it.
    if not args.no_cleanup:
        os.remove(args.archive_path)
        print("Deleted temporary archive.")
        return

    print(f'The archive has been kept at "{args.archive_path}".')
################################################################
def get_root_directory(config):
    """Store on ``config.root_directory`` the directory to upload.

    An explicit ``config.directory`` is used as is; when it is ``None`` the
    value returned by ``get_study_root_path()`` is used instead.
    """
    explicit_directory = config.directory
    config.root_directory = get_study_root_path() if explicit_directory is None else explicit_directory
################################################################
def _select_collection_id(collections):
    """Ask the user to pick one of ``collections`` and return its ``id``.

    Raises:
        ValueError: if there is no collection to choose from, or if the
            answer is not the number of one of the listed collections.
    """
    if not collections:
        raise ValueError("Error: no collection available to select from")

    for idx, coll in enumerate(collections):
        print(f"{idx}.\t{coll['name']}")
    coll_selection = input("Enter the target collection number: ")

    try:
        index = int(coll_selection)
    except ValueError:
        raise ValueError(f'Error: "{coll_selection}" is not a valid collection number') from None

    # Plain indexing would accept negative numbers and silently pick a
    # collection counted from the end of the list.
    if not 0 <= index < len(collections):
        raise ValueError(
            f"Error: collection number {index} is out of range (expected 0 to {len(collections) - 1})"
        )
    return collections[index]["id"]


def get_deposition_uri(config):
    """Prepare ``config`` with everything needed to upload to a deposition.

    Validates the study directory and its metadata, fills in the access token
    and the target collection when they were not provided, then resolves the
    deposition through ``get_cleaned_deposition_infos``. The checked metadata
    is finally stored on ``config.metadata``.

    Raises:
        ValueError: if ``config.root_directory`` is not an existing directory,
            if the metadata is invalid, or if the interactive collection
            selection is invalid.
    """
    root_directory = config.root_directory

    # Check if the directory exists
    if not os.path.isdir(root_directory):
        raise ValueError(f"Error: directory {root_directory} does not exist")

    # Check if the metadata file exists and load it
    metadata = load_and_check_metadata(config)

    if config.access_token is None:
        config.access_token = get_access_token(config.host)

    # No collection given: let the user choose among those of the host.
    if config.collection is None:
        config.collection = _select_collection_id(list_collections(config.host))

    get_cleaned_deposition_infos(config)
    config.metadata = metadata
################################################################
def load_and_check_metadata(config):
    """Load/create metadata file and check if mandatory fields are present.

    Args:
        config: namespace whose ``root_directory`` locates the study.

    Returns:
        The metadata, validated and stripped of the fields that must not be
        uploaded.

    Raises:
        ValueError: if a mandatory field is missing or empty, if "creators"
            is not a list, or if a creator has no name.
    """
    study_dir = config.root_directory
    metadata = get_study_metadata(initial_path=study_dir, check_existence=True)
    metadata_path = get_study_metadata_path(initial_path=study_dir)

    # Replace description with content from DESCRIPTION.md converted in HTML
    metadata = include_metadata_description(metadata, md_to_html=True, use_readme=False, initial_path=study_dir)

    # Every mandatory field must be present and non-empty.
    for field in mandatory_fields:
        if field in metadata and metadata[field]:
            continue
        raise ValueError(
            f'Error: field "{field}" is missing from metadata file or is empty. Please edit {metadata_path} and try'
            " again."
        )

    # Creators must be a list in which every entry carries a name.
    creators = metadata["creators"]
    if not isinstance(creators, list):
        raise ValueError(f'Error: field "creators" must be a list. Please edit {metadata_path} and try again.')
    for creator in creators:
        if "name" in creator and creator["name"]:
            continue
        raise ValueError(
            f'Error: field "name" is missing from one of the creators. Please edit {metadata_path} and try again.'
        )

    # Drop the fields that are removed before upload.
    for field in removed_fields:
        if field in metadata:
            del metadata[field]

    # Rename the "isVersionOf" relation to "isNewVersionOf".
    if "related_identifiers" in metadata:
        for related in metadata["related_identifiers"]:
            if "relation" not in related:
                continue
            if related["relation"] == "isVersionOf":
                related["relation"] = "isNewVersionOf"

    return metadata
################################################################
def _write_study_tree(zip_file, dir_path, archive_path, scanner, _print):
    """Add the non-excluded content of ``dir_path`` to ``zip_file``, printing the tree."""
    archive_abspath = os.path.abspath(archive_path)

    for current_dir, sub_dirs, files in os.walk(dir_path):
        # Remove excluded dirs (except .solidipes, which can be matched to ".*")
        sub_dirs[:] = [
            d for d in sub_dirs if (not scanner.is_excluded(os.path.join(current_dir, d))) or d == solidipes_dirname
        ]

        if current_dir != dir_path:  # prevent addition of "."
            zip_path = os.path.relpath(current_dir, dir_path)
            zip_file.write(current_dir, zip_path)

            # Print tree
            depth = len(zip_path.split(os.sep))
            _print("│ " * depth + f"{bcolors.BRIGHT_BLUE}{current_dir.split(os.sep)[-1]}{bcolors.RESET}")

        for filename in files:
            path = os.path.join(current_dir, filename)

            # Exclude files
            if scanner.is_excluded(path):
                continue

            # Never add the archive to itself (happens when the temporary
            # directory is located inside the study directory).
            if os.path.abspath(path) == archive_abspath:
                continue

            zip_path = os.path.relpath(path, dir_path)
            try:
                zip_file.write(path, zip_path)
            except Exception:
                print(f"error during zip of file {path} into {zip_path}")
                raise

            # Print tree
            depth = len(zip_path.split(os.sep))
            _print("│ " * depth + filename)


def create_archive(config, _print=print):
    """Create a temporary zip archive of the directory.

    The archive is written to ``config.tmp_dir`` and its location is stored on
    ``config.archive_path``. An archive already present at that location is
    reused when it is newer than the modification time reported by the
    scanner.

    Args:
        config: namespace providing ``root_directory`` and ``tmp_dir``. An
            optional ``_print`` attribute overrides the ``_print`` argument.
        _print: callable used to report progress.

    Raises:
        Exception: any error raised while writing the archive is propagated,
            after the incomplete archive has been removed.
    """
    if "_print" in config:
        _print = config._print

    dir_path = config.root_directory
    archive_filename = _get_archive_filename(dir_path)
    if config.tmp_dir is None:
        config.tmp_dir = "/tmp" if os.name != "nt" else os.path.expanduser(r"~\AppData\Local\Temp")
    archive_path = os.path.join(config.tmp_dir, archive_filename)

    scanner = ExportScanner()

    if os.path.exists(archive_path) and scanner.get_modified_time() < os.path.getmtime(archive_path):
        _print(f"Using existing archive {archive_path}...")
        config.archive_path = archive_path
        return

    _print(f"Creating archive {archive_path}...")

    try:
        with zipfile.ZipFile(archive_path, "w", strict_timestamps=False) as zip_file:
            _write_study_tree(zip_file, dir_path, archive_path, scanner, _print)
    except BaseException:
        # An incomplete archive has a recent modification time: if left in
        # place, the next run would take the "Using existing archive" branch
        # above and upload it. Remove it before propagating the error.
        if os.path.exists(archive_path):
            os.remove(archive_path)
        raise

    print(f"\nArchive size: {DataSize(os.path.getsize(archive_path)):.2a}\n")
    config.archive_path = archive_path
################################################################
[docs] def _get_archive_filename(dir_path: str) -> str: dir_name = os.path.basename(os.path.normpath(dir_path)) archive_name = dir_name if dir_name != "." else "archive" archive_name = f"{archive_name}.zip" return archive_name
################################################################
def get_cleaned_deposition_infos(config):
    """Get deposition urls.

    If no deposition has been created yet, or if new_deposition is True,
    create a new deposition. Otherwise, the saved deposition or the one
    specified by existing_identifier is used, and its current files are
    deleted.

    The resulting urls are stored on ``config.deposition_url`` and
    ``config.bucket_url``, and the web url is saved for the study.
    """
    force_new = config.new_deposition
    requested_identifier = config.existing_identifier
    token = config.access_token
    study_dir = config.root_directory
    target_collection = config.collection
    host = config.host
    sandbox = False

    # An explicit identifier wins; otherwise fall back on the saved one,
    # unless a new deposition was requested.
    if requested_identifier:
        identifier = requested_identifier
    elif force_new:
        identifier = None
    else:
        identifier = get_existing_deposition_identifier(study_dir)

    if not identifier:
        # Create deposition
        deposition_url, bucket_url, web_url = create_deposition(token, host, target_collection)
        print(f"Deposition created: {web_url}")
    else:
        # Update existing record
        deposition_url, bucket_url, web_url = get_existing_deposition_infos(identifier, token, sandbox)
        print(f"Updating deposition at {web_url}")

        # Delete current files
        clean_deposition(deposition_url, token)

    # Save deposition identifier if successfully created or accessed
    save_deposition_identifier(web_url, study_dir)

    config.deposition_url = deposition_url
    config.bucket_url = bucket_url