import os
import zipfile
from datasize import DataSize
from solidipes.scanners.scanner_local import ExportScanner
from solidipes.uploaders.uploader import Uploader, text_progress_bar
from solidipes.utils import (
DataRepositoryException,
bcolors,
generate_readme,
get_study_metadata,
get_study_metadata_path,
get_study_root_path,
include_metadata_description,
solidipes_dirname,
)
from solidipes.utils import study_medatada_mandatory_fields as mandatory_fields
from solidipes.utils import study_medatada_removed_fields_upload as removed_fields
from ..utils.dspace7_utils import (
clean_deposition,
create_deposition,
get_access_token,
get_existing_deposition_identifier,
get_existing_deposition_infos,
list_collections,
save_deposition_identifier,
upload_archive,
upload_deposition_metadata,
)
################################################################
[docs]
class DFspace7Uploader(Uploader):
    """Uploader publishing a study to a DSpace7 platform (e.g. Infoscience).

    ``command`` lists the command names attached to this uploader and
    ``command_help`` is the description shown for them (how both are consumed
    is defined by the ``Uploader`` base class).
    """

    command = ["dspace7", "infoscience", "boris"]
    command_help = "Publish study to a DSpace7 platform (e.g. Infoscience)"

    def upload(self, args):
        """Run the upload described by the parsed arguments ``args``.

        A ``DataRepositoryException`` raised during the upload is not
        propagated: it is reported through ``handle_dspace7_exception``.
        """
        try:
            main(args)
        except DataRepositoryException as e:
            handle_dspace7_exception(e)

    def populate_arg_parser(self, parser):
        """Declare the command-line options of the DSpace7 upload on ``parser``."""
        parser.description = self.command_help

        parser.add_argument(
            "directory",
            nargs="?",
            default=None,
            help=(
                "Path to the directory containing the study to upload."
                " Defaults to the root of the current Solidipes study."
            ),
        )

        parser.add_argument(
            "--host",
            type=str,
            default="infoscience-sb.epfl.ch",
            help="hostname of the Dspace7 instance",
        )

        # NOTE(review): the user/password defaults below are placeholders, not
        # working credentials.
        parser.add_argument(
            "--user",
            type=str,
            default="john.smith@epfl.ch",
            help="username",
        )

        parser.add_argument(
            "--pw",
            type=str,
            default="XXXXXX",
            help="password",
        )

        parser.add_argument(
            "--collection",
            type=str,
            default=None,
            help="UUID of the target collection on the instance",
        )

        parser.add_argument("--access_token", type=str, default=None, help="Provide the Dspace7 token")
        parser.add_argument("--no_cleanup", action="store_true", help="Do not clean the produced archive")

        # The temporary directory is independent of which deposition is
        # targeted, so it is a plain option of the parser: inside the mutually
        # exclusive group below it could not be combined with
        # --new-deposition / --existing-deposition.
        parser.add_argument(
            "--tmp_dir",
            help=(
                "specify an existing directory where to store the temporary objects. Default to the system's temporary"
                " directory."
            ),
            default="/tmp" if os.name != "nt" else os.path.expanduser(r"~\AppData\Local\Temp"),
            type=str,
        )

        # Either create a fresh deposition or update an explicitly given one,
        # never both.
        deposition_group = parser.add_mutually_exclusive_group()

        deposition_group.add_argument(
            "--new-deposition",
            help="create a new deposition instead of updating a previously created one",
            action="store_true",
        )

        deposition_group.add_argument(
            "--existing-deposition",
            dest="existing_identifier",
            nargs="?",
            help="URL or DOI of the study to update. It must be in unpublished state.",
        )
################################################################
[docs]
def handle_dspace7_exception(e):
    """Print the error ``e`` followed by a recovery hint when one is known.

    Hints are selected by looking for known fragments in the text of the
    exception; an error matching none of them is printed without any hint.
    """
    print(e)
    message = str(e)

    # The targeted deposition is gone or never existed
    entry_is_missing = any(fragment in message for fragment in ("has been deleted", "does not exist"))
    if entry_is_missing:
        print(
            'Run the command with the "--new-deposition" option to create'
            ' a new entry, or the "--existing-deposition" option to use'
            " another existing entry."
        )

    # Files of the deposition could not be removed
    if "Error deleting file" in message:
        print("Please check that the deposition is in draft state.")
################################################################
[docs]
def main(args, progressbar=text_progress_bar):
    """Upload content to Dspace7

    ``args`` is the namespace of parsed options. It is enriched by the
    successive steps (root directory, archive path, deposition details) and
    then expanded as keyword arguments for the upload helpers.
    ``progressbar`` is forwarded to ``upload_archive``.
    """
    # Resolve the directory that is going to be zipped
    get_root_directory(args)

    # Generate the README, then zip the directory into a temporary file
    generate_readme()
    create_archive(args)

    print("Uploading archive")
    get_deposition_uri(args)

    # Metadata first, then the archive itself
    upload_deposition_metadata(**vars(args))
    upload_archive(progressbar=progressbar, **vars(args))

    # Final message
    print("Upload complete.")
    print("Please review your deposition and publish it when ready.")

    # Drop the temporary archive unless the user asked to keep it
    if not args.no_cleanup:
        os.remove(args.archive_path)
        print("Deleted temporary archive.")
        return

    print(f'The archive has been kept at "{args.archive_path}".')
################################################################
[docs]
def get_root_directory(config):
    """Store in ``config.root_directory`` the directory to upload.

    An explicitly given ``config.directory`` is used as is; when it is
    ``None`` the value returned by ``get_study_root_path()`` is used instead.
    """
    explicit_directory = config.directory
    config.root_directory = get_study_root_path() if explicit_directory is None else explicit_directory
################################################################
[docs]
def _prompt_for_collection(collections):
    """Ask the user to pick one entry of ``collections`` and return it.

    The entries are listed with their index and the question is repeated
    until a valid index is typed.

    Raises:
        ValueError: if ``collections`` is empty (there is nothing to pick).
    """
    if not collections:
        raise ValueError("Error: no collection is available on the Dspace7 instance")

    for idx, coll in enumerate(collections):
        print(f"{idx}.\t{coll['name']}")

    last_index = len(collections) - 1
    while True:
        answer = input("Enter the target collection number: ")
        try:
            choice = int(answer)
        except ValueError:
            choice = -1  # not a number: handled like any other invalid index
        # Negative numbers are rejected too: as list indices they would
        # silently select a collection counted from the end of the list.
        if 0 <= choice <= last_index:
            return collections[choice]
        print(f"Invalid selection {answer!r}: enter a number between 0 and {last_index}.")


def get_deposition_uri(config):
    """Prepare ``config`` for the upload: credentials, collection, deposition.

    Steps, in order:

    1. check that ``config.root_directory`` is an existing directory;
    2. load the study metadata;
    3. fetch an access token for ``config.host`` if none was provided;
    4. ask interactively for the target collection if none was provided;
    5. resolve the deposition (``get_cleaned_deposition_infos``);
    6. store the metadata in ``config.metadata``.

    Raises:
        ValueError: if the root directory does not exist, or if a collection
            must be chosen and the instance lists none.
    """
    root_directory = config.root_directory

    # Check if the directory exists
    if not os.path.isdir(root_directory):
        raise ValueError(f"Error: directory {root_directory} does not exist")

    # Check if the metadata file exists and load it
    # NOTE(review): load_and_check_metadata is neither defined nor imported in
    # the visible part of this module -- confirm where it comes from.
    metadata = load_and_check_metadata(config)

    if config.access_token is None:
        config.access_token = get_access_token(config.host)

    if config.collection is None:
        collections = list_collections(config.host)
        config.collection = _prompt_for_collection(collections)["id"]

    get_cleaned_deposition_infos(config)
    config.metadata = metadata
################################################################
################################################################
[docs]
def create_archive(config, _print=print):
    """Create a temporary zip archive of the directory

    The archive is written in ``config.tmp_dir`` and named after
    ``config.root_directory``; its path is stored in ``config.archive_path``.
    An archive already present at that path is reused when it is more recent
    than ``ExportScanner().get_modified_time()``.

    Directories and files reported as excluded by the scanner are skipped,
    except the Solidipes directory which is always kept.

    All messages go through ``_print``; a ``_print`` attribute found on
    ``config`` takes precedence over the argument.

    If the archive cannot be written completely, the partial file is removed
    before the error is re-raised. Otherwise the next run would find a file
    newer than the study and reuse a truncated archive.
    """
    if "_print" in config:
        _print = config._print

    dir_path = config.root_directory
    archive_filename = _get_archive_filename(dir_path)
    if config.tmp_dir is None:
        config.tmp_dir = "/tmp" if os.name != "nt" else os.path.expanduser(r"~\AppData\Local\Temp")
    archive_path = os.path.join(config.tmp_dir, archive_filename)

    scanner = ExportScanner()

    if os.path.exists(archive_path) and scanner.get_modified_time() < os.path.getmtime(archive_path):
        _print(f"Using existing archive {archive_path}...")
        config.archive_path = archive_path
        return

    _print(f"Creating archive {archive_path}...")

    try:
        with zipfile.ZipFile(archive_path, "w", strict_timestamps=False) as zip_file:
            for current_dir, sub_dirs, files in os.walk(dir_path):
                # Remove excluded dirs (except .solidipes, which can be matched to ".*")
                # In-place update so that os.walk does not descend into them.
                sub_dirs[:] = [
                    d
                    for d in sub_dirs
                    if (not scanner.is_excluded(os.path.join(current_dir, d))) or d == solidipes_dirname
                ]

                if current_dir != dir_path:  # prevent addition of "."
                    zip_path = os.path.relpath(current_dir, dir_path)
                    zip_file.write(current_dir, zip_path)
                    # Print tree
                    depth = len(zip_path.split(os.sep))
                    _print("│ " * depth + f"{bcolors.BRIGHT_BLUE}{current_dir.split(os.sep)[-1]}{bcolors.RESET}")

                for filename in files:
                    path = os.path.join(current_dir, filename)
                    # Exclude files
                    if scanner.is_excluded(path):
                        continue

                    zip_path = os.path.relpath(path, dir_path)
                    try:
                        zip_file.write(path, zip_path)
                    except Exception:
                        _print(f"error during zip of file {path} into {zip_path}")
                        raise
                    # Print tree
                    depth = len(zip_path.split(os.sep))
                    _print("│ " * depth + filename)
    except BaseException:
        # The zip file is closed at this point: drop the incomplete archive
        # (best effort) and let the original error propagate.
        try:
            os.remove(archive_path)
        except OSError:
            pass
        raise

    _print(f"\nArchive size: {DataSize(os.path.getsize(archive_path)):.2a}\n")
    config.archive_path = archive_path
################################################################
[docs]
def _get_archive_filename(dir_path: str) -> str:
    """Return the zip file name used for the archive of ``dir_path``.

    The name is the last component of the normalised path followed by
    ``.zip``. Paths whose last component does not name a directory -- ``.``,
    ``..``, or the empty component obtained for a filesystem root -- fall
    back to ``archive.zip``.
    """
    dir_name = os.path.basename(os.path.normpath(dir_path))
    if dir_name in ("", ".", ".."):
        dir_name = "archive"
    return f"{dir_name}.zip"
################################################################
[docs]
def get_cleaned_deposition_infos(config):
    """Resolve the deposition to upload to and store its URLs on ``config``.

    The deposition is chosen as follows:

    - ``config.existing_identifier``, when given, designates it;
    - otherwise, unless ``config.new_deposition`` is set, the identifier
      saved for the study (if any) is used;
    - when no identifier results from the above, a new deposition is created
      in ``config.collection`` on ``config.host``.

    An existing deposition is passed to ``clean_deposition`` before being
    reused. In every case the web URL of the deposition is saved for the
    study, and ``config.deposition_url`` / ``config.bucket_url`` are set.
    """
    force_new = config.new_deposition
    requested_identifier = config.existing_identifier
    token = config.access_token
    study_root = config.root_directory
    target_collection = config.collection
    host = config.host
    sandbox = False

    # Pick the identifier of the deposition to reuse, if any
    if requested_identifier:
        identifier = requested_identifier
    elif force_new:
        identifier = None
    else:
        # Fall back on the identifier saved by a previous upload
        identifier = get_existing_deposition_identifier(study_root)

    if not identifier:
        # Nothing to reuse: create the deposition
        deposition_url, bucket_url, web_url = create_deposition(token, host, target_collection)
        print(f"Deposition created: {web_url}")
    else:
        # Update the existing record
        deposition_url, bucket_url, web_url = get_existing_deposition_infos(identifier, token, sandbox)
        print(f"Updating deposition at {web_url}")
        # Delete current files
        clean_deposition(deposition_url, token)

    # Remember the deposition now that it was successfully created or accessed
    save_deposition_identifier(web_url, study_root)

    config.deposition_url = deposition_url
    config.bucket_url = bucket_url