import os
from datasize import DataSize
from solidipes.uploaders.uploader import Uploader
from solidipes.utils import generate_readme, include_metadata_description
from solidipes.utils import study_medatada_mandatory_fields as mandatory_fields
from solidipes.utils import study_medatada_removed_fields_upload as removed_fields
from ..utils.zenodo_utils import (
ZenodoException,
clean_deposition,
create_deposition,
get_access_token,
get_existing_deposition_identifier,
get_existing_deposition_infos,
save_deposition_identifier,
upload_archive,
upload_deposition_metadata,
)
################################################################
[docs]
class ZenodoUploader(Uploader):
    """Uploader that publishes a study to Zenodo.

    Declares the ``zenodo`` command name and its help text as class
    attributes (presumably consumed by the ``Uploader`` base class to register
    the command -- confirm against ``Uploader``).
    """

    command = "zenodo"
    command_help = "Publish study to Zenodo"

    def upload(self, args):
        """Run the upload and report Zenodo errors instead of propagating them.

        Args:
            args: Parsed command-line namespace, as configured by
                ``populate_arg_parser``.
        """
        try:
            main(args)
        except ZenodoException as e:
            handle_zenodo_exception(e)

    def populate_arg_parser(self, parser):
        """Declare the command-line options of the ``zenodo`` command.

        Args:
            parser: The ``argparse`` parser (or sub-parser) to populate.
        """
        import tempfile

        parser.description = self.command_help

        parser.add_argument(
            "directory",
            nargs="?",
            default=None,
            help=(
                "Path to the directory containing the study to upload. Defaults to the root of the current Solidipes"
                " study."
            ),
        )
        parser.add_argument(
            "--sandbox",
            help="use Zenodo sandbox test platform",
            action="store_true",
        )
        parser.add_argument("--access_token", type=str, default=None, help="Provide the Zenodo token")
        parser.add_argument("--no_cleanup", action="store_true", help="Do not clean the produced archive")

        # The temporary directory is unrelated to which deposition is
        # targeted, so it is registered on the parser itself. Inside the
        # mutually exclusive group below it could not be combined with
        # "--new-deposition" or "--existing-deposition".
        parser.add_argument(
            "--tmp_dir",
            help=(
                "specify an existing directory where to store the temporary objects. Default to the system's temporary"
                " directory."
            ),
            default=tempfile.gettempdir(),
            type=str,
        )

        # Either create a fresh deposition or update a given one, never both.
        deposition_group = parser.add_mutually_exclusive_group()
        deposition_group.add_argument(
            "--new-deposition",
            help="create a new deposition instead of updating a previously created one",
            action="store_true",
        )
        deposition_group.add_argument(
            "--existing-deposition",
            dest="existing_identifier",
            nargs="?",
            help="URL or DOI of the study to update. It must be in unpublished state.",
        )
################################################################
[docs]
def handle_zenodo_exception(e):
    """Print a Zenodo error, followed by recovery hints matching its message.

    Args:
        e: The exception to report; only its string form is inspected.
    """
    print(e)
    message = str(e)

    # The deposition can no longer be reached: suggest targeting another one.
    missing_markers = ("has been deleted", "does not exist")
    if any(marker in message for marker in missing_markers):
        print(
            'Run the command with the "--new-deposition" option to create'
            ' a new entry, or the "--existing-deposition" option to use'
            " another existing entry."
        )

    if "Error deleting file" in message:
        print("Please check that the deposition is in draft state.")
################################################################
[docs]
def main(args):
    """Archive the study directory and upload it to Zenodo.

    The namespace ``args`` is filled in step by step by the helpers called
    here, and its full content is forwarded as keyword arguments to the
    upload functions.

    Args:
        args: Parsed command-line namespace.
    """
    # Resolve the directory to archive, then build the archive from it
    get_root_directory(args)
    generate_readme()
    create_archive(args)

    print("Uploading archive")
    get_deposition_uri(args)

    # Metadata first, then the archive itself
    upload_options = vars(args)
    upload_deposition_metadata(**upload_options)
    upload_archive(**upload_options)

    print("Upload complete.")
    print("Please review your deposition and publish it when ready.")

    # Dispose of the archive unless the user asked to keep it
    if not args.no_cleanup:
        os.remove(args.archive_path)
        print("Deleted temporary archive.")
    else:
        print(f'The archive has been kept at "{args.archive_path}".')
################################################################
[docs]
def get_root_directory(config):
    """Store the directory to upload in ``config.root_directory``.

    Uses ``config.directory`` when it is set, and otherwise the path returned
    by ``get_study_root_path()``.

    Args:
        config: Namespace providing ``directory``; receives ``root_directory``.
    """
    from solidipes.utils.utils import get_study_root_path

    requested_directory = config.directory
    config.root_directory = get_study_root_path() if requested_directory is None else requested_directory
################################################################
[docs]
def get_deposition_uri(config):
    """Prepare ``config`` for an upload: metadata, access token and deposition.

    Args:
        config: Namespace providing ``root_directory`` and ``access_token``,
            plus the attributes read by ``get_cleaned_deposition_infos``.
            Receives ``metadata`` and, when missing, ``access_token``.

    Raises:
        ValueError: If ``config.root_directory`` is not an existing directory.
    """
    study_directory = config.root_directory
    directory_exists = os.path.isdir(study_directory)
    if not directory_exists:
        raise ValueError(f"Error: directory {study_directory} does not exist")

    # NOTE(review): load_and_check_metadata is defined outside the visible
    # part of this file; presumably it loads and validates the study metadata.
    study_metadata = load_and_check_metadata(config)

    # Only ask for a token when none was supplied
    if config.access_token is None:
        config.access_token = get_access_token()

    get_cleaned_deposition_infos(config)

    config.metadata = study_metadata
################################################################
################################################################
[docs]
def create_archive(config, _print=print):
    """Create a zip archive of the study directory and record its path.

    The archive is named after the directory and stored in ``config.tmp_dir``
    (the system temporary directory when that is ``None``). An archive already
    present at that path is reused when it is more recent than the
    modification time reported by the export scanner.

    Args:
        config: Namespace providing ``root_directory`` and ``tmp_dir``.
            Receives ``archive_path`` (and ``tmp_dir`` when it was ``None``).
            A ``_print`` attribute, if present, overrides the ``_print``
            argument.
        _print: Callable used to report progress and the archived tree.

    Raises:
        Exception: Any error raised while writing the archive is re-raised
            after the partially written archive has been removed.
    """
    import tempfile
    import zipfile

    from solidipes.scanners.scanner_local import ExportScanner
    from solidipes.utils.utils import bcolors, solidipes_dirname

    # A printer attached to the config takes precedence over the argument.
    _print = getattr(config, "_print", _print)

    dir_path = config.root_directory
    archive_filename = _get_archive_filename(dir_path)
    if config.tmp_dir is None:
        config.tmp_dir = tempfile.gettempdir()
    archive_path = os.path.join(config.tmp_dir, archive_filename)
    archive_abspath = os.path.abspath(archive_path)

    scanner = ExportScanner()

    # Reuse an archive that is newer than the scanned content.
    if os.path.exists(archive_path) and scanner.get_modified_time() < os.path.getmtime(archive_path):
        _print(f"Using existing archive {archive_path}...")
        config.archive_path = archive_path
        return

    _print(f"Creating archive {archive_path}...")

    try:
        # strict_timestamps=False lets zipfile accept files dated before 1980.
        with zipfile.ZipFile(archive_path, "w", strict_timestamps=False) as zip_file:
            for current_dir, sub_dirs, files in os.walk(dir_path):
                # Remove excluded dirs (except .solidipes, which can be matched to ".*")
                # In-place assignment so that os.walk does not descend into them.
                sub_dirs[:] = [
                    d
                    for d in sub_dirs
                    if (not scanner.is_excluded(os.path.join(current_dir, d))) or d == solidipes_dirname
                ]

                if current_dir != dir_path:  # prevent addition of "."
                    zip_path = os.path.relpath(current_dir, dir_path)
                    zip_file.write(current_dir, zip_path)

                    # Print tree
                    depth = len(zip_path.split(os.sep))
                    _print("│ " * depth + f"{bcolors.BRIGHT_BLUE}{current_dir.split(os.sep)[-1]}{bcolors.RESET}")

                for filename in files:
                    path = os.path.join(current_dir, filename)

                    # Never add the archive being written to itself (happens
                    # when tmp_dir lies inside the archived directory).
                    if os.path.abspath(path) == archive_abspath:
                        continue

                    # Exclude files
                    if scanner.is_excluded(path):
                        continue

                    zip_path = os.path.relpath(path, dir_path)
                    try:
                        zip_file.write(path, zip_path)
                    except Exception:
                        print(f"error during zip of file {path} into {zip_path}")
                        raise

                    # Print tree
                    depth = len(zip_path.split(os.sep))
                    _print("│ " * depth + filename)
    except BaseException:
        # A half-written archive is newer than the study files, so the next
        # run would mistake it for a valid archive and upload it. Remove it.
        try:
            os.remove(archive_path)
        except OSError:
            pass
        raise

    print(f"\nArchive size: {DataSize(os.path.getsize(archive_path)):.2a}\n")
    config.archive_path = archive_path
################################################################
[docs]
def _get_archive_filename(dir_path: str) -> str:
dir_name = os.path.basename(os.path.normpath(dir_path))
archive_name = dir_name if dir_name != "." else "archive"
archive_name = f"{archive_name}.zip"
return archive_name
################################################################
[docs]
def get_cleaned_deposition_infos(config):
    """Select or create the target deposition and store its URLs in ``config``.

    The deposition given by ``config.existing_identifier`` is used when set.
    Otherwise, unless ``config.new_deposition`` is true, the identifier saved
    for the study directory is used. When no identifier results, a new
    deposition is created. An existing deposition has its current files
    deleted before use.

    Args:
        config: Namespace providing ``new_deposition``, ``existing_identifier``,
            ``access_token``, ``sandbox`` and ``root_directory``. Receives
            ``deposition_url`` and ``bucket_url``.
    """
    create_new = config.new_deposition
    requested_identifier = config.existing_identifier
    token = config.access_token
    use_sandbox = config.sandbox
    study_directory = config.root_directory

    # Pick the identifier: explicit one first, then the saved one
    if requested_identifier:
        identifier = requested_identifier
    elif create_new:
        identifier = None
    else:
        identifier = get_existing_deposition_identifier(study_directory)

    if not identifier:
        # Nothing to update: start a fresh deposition
        deposition_url, bucket_url, web_url = create_deposition(token, use_sandbox)
        print(f"Deposition created: {web_url}")
    else:
        # Update the existing record, after emptying it of its current files
        deposition_url, bucket_url, web_url = get_existing_deposition_infos(identifier, token, use_sandbox)
        print(f"Updating deposition at {web_url}")
        clean_deposition(deposition_url, token)

    # Save deposition identifier if successfully created or accessed
    save_deposition_identifier(web_url, study_directory)

    config.deposition_url = deposition_url
    config.bucket_url = bucket_url