from functools import cache
from typing import Callable
from .config import cloud_connection_timeout, solidipes_dirname
from .parsable import ( # noqa: F401
Parsable,
get_key_to_parsables,
last_parameter,
last_parameter_with_default,
optional_parameter,
parameter,
populate_parser,
)
[docs]
class bcolors:
RESET = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BLINK = "\033[5m"
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m" # light gray
BRIGHT_BLACK = "\033[90m" # dark gray
BRIGHT_RED = "\033[91m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_BLUE = "\033[94m"
BRIGHT_MAGENTA = "\033[95m"
BRIGHT_CYAN = "\033[96m"
BRIGHT_WHITE = "\033[97m"
[docs]
class DataRepositoryException(ValueError):
pass
[docs]
class classproperty:
def __init__(self, func: Callable) -> None:
self.func = func
def __get__(self, _, owner):
return self.func(owner)
[docs]
def find_config_directory(initial_path="", dir_name=solidipes_dirname):
"""Find a directory in the current path or any of its parents."""
import os
current_path = os.path.abspath(initial_path)
while True:
# Check if current path contains the directory
test_path = os.path.join(current_path, dir_name)
if os.path.isdir(test_path):
return test_path
# Check if current path is the root
if current_path == os.path.dirname(current_path):
break
# Move up to the parent directory
current_path = os.path.dirname(current_path)
raise FileNotFoundError(f'The directory "{dir_name}" was not found in {initial_path} or any of its parents')
[docs]
def get_solidipes_directory(initial_path=""):
"""Get the path to the .solidipes directory."""
import os
try:
solidipes_directory = find_config_directory(initial_path, solidipes_dirname)
# If parent directory is user's home, it is invalid (user_solidipes_directory)
if os.path.dirname(solidipes_directory) == os.path.expanduser("~"):
raise FileNotFoundError
return solidipes_directory
except FileNotFoundError as e:
raise FileNotFoundError(f'{e}. Please run "solidipes init" at the root directory of your study.')
[docs]
def get_user_solidipes_directory():
import os
path = os.path.join(os.path.expanduser("~"), solidipes_dirname)
if not os.path.isdir(path):
if os.path.exists(path):
raise FileExistsError(f'"{path}" exists but is not a directory. Please remove it.')
os.mkdir(path)
return path
[docs]
def get_study_root_path(initial_path="", **kwargs):
import os
return os.path.dirname(get_solidipes_directory(initial_path))
[docs]
def get_path_relative_to_root(path):
"""Express path relative to study root."""
import os
path = os.path.abspath(path) # Also strips trailing slash
path = os.path.relpath(path, get_study_root_path())
return path
[docs]
def get_path_relative_to_workdir(path):
"""Convert path expressed relative to study root to path expressed relative to current working directory."""
import os
path = os.path.join(get_study_root_path(), path)
path = os.path.relpath(path, os.getcwd())
return path
[docs]
def init_git_repository(initial_path=""):
from git import Repo
git_repository = Repo.init(get_study_root_path(initial_path))
return git_repository
[docs]
def get_git_repository(initial_path=""):
import os
from git import Repo
current_path = os.path.abspath(initial_path)
git_repository = Repo(current_path, search_parent_directories=True)
return git_repository
[docs]
def get_git_root(initial_path=""):
repo = get_git_repository(initial_path)
git_root = repo.git.rev_parse("--show-toplevel")
return git_root
[docs]
def get_config_path(filename_var, initial_path="", check_existence=False, user=False):
import os
current_working_dir = os.getcwd()
return _get_config_path(current_working_dir, filename_var, initial_path, check_existence, user)
[docs]
@cache
def _get_config_path(
current_working_dir,
filename_var,
initial_path="",
check_existence=False,
user=False,
):
import os
from . import config
filename = getattr(config, filename_var)
if user:
config_directory = get_user_solidipes_directory()
else:
config_directory = get_solidipes_directory(initial_path)
path = os.path.join(config_directory, filename)
if check_existence and not os.path.isfile(path):
raise FileNotFoundError(
f'The file "{path}" does not exist. Please run "solidipes init" at the root directory of your study.'
)
return path
[docs]
def load_yaml(filename):
import yaml
with open(filename, "r", encoding="utf-8") as f:
config = yaml.safe_load(f.read())
if config is None:
config = {}
return config
[docs]
def save_yaml(filename, config):
import yaml
with open(filename, "w", encoding="utf-8") as f:
f.write(yaml.safe_dump(config))
return config
[docs]
def get_study_log_path():
import os
config_directory = get_solidipes_directory()
path = os.path.join(config_directory, "solidipes.logs")
return path
[docs]
def get_readme_path(*args, **kwargs):
import os
from .config import readme_filename
return os.path.join(get_study_root_path(*args, **kwargs), readme_filename)
[docs]
def get_study_description_path(*args, **kwargs):
import os
from .config import study_description_filename
return os.path.join(get_study_root_path(*args, **kwargs), study_description_filename)
[docs]
def get_mimes_path(*args, **kwargs):
return get_config_path("mimes_filename", *args, **kwargs)
[docs]
def get_config(filename_var, *args, **kwargs):
import os
path = get_config_path(filename_var, *args, **kwargs)
if not os.path.exists(path):
return {}
return load_yaml(path)
[docs]
def set_config(filename_var, config, *args, **kwargs) -> None:
path = get_config_path(filename_var, *args, **kwargs)
save_yaml(path, config)
[docs]
def add_completed_stage(stage) -> None:
completed = set(get_completed_stages())
completed.add(stage)
set_config("completed_stages_filename", {"completed_stages": list(completed)})
[docs]
def remove_completed_stage(stage) -> None:
completed = set(get_completed_stages())
if stage in completed:
completed.remove(stage)
set_config("completed_stages_filename", {"completed_stages": list(completed)})
[docs]
def get_completed_stages():
metadata = get_config("completed_stages_filename")
if metadata == {}:
return []
if "completed_stages" not in metadata:
return []
return metadata["completed_stages"]
[docs]
def is_stage_completed(stage):
return stage in get_completed_stages()
[docs]
def generate_readme(*args, with_title=True, **kwargs) -> None:
from .metadata import lang, licenses
from .utils import get_readme_path
readme_path = get_readme_path(**kwargs)
metadata = get_study_metadata(*args, **kwargs)
content = ""
try:
old_readme_content = open(readme_path, "r", encoding="utf-8").read()
except Exception:
old_readme_content = ""
if with_title:
content += f"# {metadata['title']}\n\n<br>\n\n"
content += "## Links\n\n"
if "DOI" in metadata:
doi = metadata["DOI"]
content += f"- Data DOI: [{doi}](https://doi.org/{doi})"
if "related_identifiers" in metadata:
rels = metadata["related_identifiers"]
for r in rels:
if "resource_type" not in r:
continue
content += f"- {r['relation']} *{r['resource_type']}* [{r['identifier']}]({r['identifier']})\n"
content += "\n"
content += "## Authors\n\n"
if "creators" in metadata:
authors = metadata["creators"]
for a in authors:
content += f"- **{a['name']}**"
if "affiliation" in a:
content += f", {a['affiliation']}"
if "orcid" in a:
content += f", ORCID: [{a['orcid']}](https://orcid.org/{a['orcid']})"
content += "\n"
content += "\n"
content += "## Language\n\n"
if "language" in metadata:
_lang = dict(lang)[metadata["language"]]
content += f"- {_lang}\n\n"
content += "## License\n\n"
if "license" in metadata:
lic = metadata["license"]
if isinstance(lic, dict):
lic = lic["id"]
_lic = dict(licenses)[lic.lower()]
content += f"- {_lic}\n\n"
content += metadata["description"]
if content != old_readme_content:
with open(readme_path, "w", encoding="UTF-8") as f:
f.write(content)
[docs]
def get_zenodo_infos(*args, **kwargs):
return get_config("zenodo_infos_filename", *args, **kwargs)
[docs]
def set_zenodo_infos(config, *args, **kwargs) -> None:
set_config("zenodo_infos_filename", config, *args, **kwargs)
[docs]
def get_mimes(*args, **kwargs):
try:
return get_config("mimes_filename", *args, **kwargs)
except FileNotFoundError:
return {}
[docs]
def set_mimes(config, *args, **kwargs) -> None:
set_config("mimes_filename", config, *args, **kwargs)
[docs]
def get_ignore(*args, **kwargs) -> set[str]:
from .config import default_ignore_patterns
ignore: set[str] = set(get_config("ignore_filename", *args, **kwargs))
ignore = ignore.union(default_ignore_patterns)
ignore = [e.strip() for e in ignore if e.strip() != ""]
return ignore
[docs]
def set_ignore(config: set[str], *args, **kwargs) -> None:
config = [e.strip() for e in config if e.strip() != ""]
set_config("ignore_filename", list(config), *args, **kwargs)
[docs]
def dict_to_pretty_str(d: dict, indent=0, indent_size=4):
string = ""
indent_string = " " * indent_size
for key, value in d.items():
string += indent_string * indent + str(key) + ": "
if isinstance(value, dict):
string += "{\n" + dict_to_pretty_str(value, indent + 1) + indent_string * indent + "}\n"
else:
string += str(value) + "\n"
return string
[docs]
def rename_file(old_name, new_name) -> None:
import subprocess
from solidipes.utils.git_infos import GitInfos
git_infos = GitInfos()
if git_infos.repository and get_path_relative_to_root(old_name) not in git_infos.repository.untracked_files:
cmd = f"git mv {old_name} {new_name}"
else:
cmd = f"mv {old_name} {new_name}"
subprocess.call(cmd, shell=True)
[docs]
def compute_checksum(filepath: str) -> str:
import hashlib
with open(filepath, "rb") as f:
file_hash = hashlib.md5()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()
[docs]
def run_and_check_return(
command,
headless=False,
fail_message="Error",
timeout=cloud_connection_timeout,
cwd=None,
) -> None:
import subprocess
process = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
cwd=cwd,
input=b"\n" if headless else None,
)
check_process_return(process, fail_message)
stdout = process.stdout.decode().strip()
stderr = process.stderr.decode().strip()
return stdout, stderr
[docs]
class ExecError(RuntimeError):
pass
[docs]
def check_process_return(process, fail_message) -> None:
import subprocess
try:
process.check_returncode()
except subprocess.CalledProcessError as e:
if e.stderr:
raise ExecError(f"{fail_message}: {e.stderr.decode()}")
else:
raise ExecError(fail_message)
[docs]
def string_to_regex_pattern(pattern: str) -> str:
"""Convert a string into a pattern that matches exactly."""
import re
pattern = "^" + re.escape(pattern) + "$"
return pattern