# Source code for solidipes.utils.cloud

import os
import subprocess
import tempfile
import uuid

from ..utils import solidipes_logging as logging
from .config import cloud_connection_timeout
from .utils import (
    get_cloud_dir_path,
    get_cloud_info,
    get_path_relative_to_root,
    get_path_relative_to_workdir,
    set_cloud_info,
)

# Shadow the builtin print with solidipes_logging.invalidPrint
# (presumably to flag stray print() calls in this module — confirm).
print = logging.invalidPrint
logger = logging.getLogger()

# Secret fields, per mount type, that remove_keys_from_info moves out of the
# project mount configuration and into the user's own cloud configuration.
key_names_per_mount_type = dict(
    s3=["access_key_id", "secret_access_key", "username", "password"],
    smb=["password"],
)


def check_process_return(process, fail_message):
    """Raise a RuntimeError if a finished subprocess exited with a non-zero code.

    Args:
        process: a ``subprocess.CompletedProcess``.
        fail_message: prefix of the error message.

    Raises:
        RuntimeError: ``"<fail_message>: <stderr>"`` when the process wrote to
            stderr, otherwise just ``fail_message``. The original
            ``CalledProcessError`` is kept as ``__cause__``.
    """
    try:
        process.check_returncode()
    except subprocess.CalledProcessError as e:
        stderr = e.stderr
        # stderr is bytes for the default (binary) pipes used in this module;
        # decode leniently so a bad byte does not hide the real failure.
        if isinstance(stderr, bytes):
            stderr = stderr.decode(errors="replace")
        if stderr:
            raise RuntimeError(f"{fail_message}: {stderr}") from e
        raise RuntimeError(fail_message) from e
def get_existing_mount_info(path):
    """Return the stored mount description for ``path``.

    ``path`` is converted to a root-relative path, which is the key used in the
    cloud configuration.

    Raises:
        ValueError: if no mount point is configured at ``path``.
    """
    root_relative_path = get_path_relative_to_root(path)
    cloud_config = get_cloud_info()
    if root_relative_path in cloud_config:
        return cloud_config[root_relative_path]
    raise ValueError(f'Path "{root_relative_path}" has not been set as mounting point.')
def get_mount_id(mount_info):
    """Create new unique mount_id if not already set.

    A new id is a random UUID4 string, stored back into ``mount_info`` under
    ``"mount_id"``.

    Returns:
        The existing or newly created mount id.
    """
    if "mount_id" in mount_info:
        return mount_info["mount_id"]

    new_id = str(uuid.uuid4())
    mount_info["mount_id"] = new_id
    return new_id
def mount(path, mount_info, **kwargs):
    """Mount the cloud storage described by ``mount_info`` at ``path`` and wait for it.

    Args:
        path: local directory to use as mount point.
        mount_info: mount description. ``"type"`` selects the storage type
            (``s3``, ``ssh``, ``nfs`` or ``smb``). The optional ``"system"``
            selects the mounting tool. Defaults are ``juicefs`` for s3,
            ``sshfs`` for ssh, and ``mount`` for nfs/smb.
        **kwargs: forwarded to the type-specific mount function
            (e.g. ``headless``, ``allow_root``).

    Raises:
        RuntimeError: if ``path`` is already a mount point, or if it does not
            become one in time (raised by ``wait_mount``).
        ValueError: for an unknown storage type, or an unsupported mounting
            system for that type.
    """
    if os.path.ismount(path):
        raise RuntimeError(f'"{path}" is already mounted.')

    mount_type = mount_info["type"]
    default_systems = {"s3": "juicefs", "ssh": "sshfs", "nfs": "mount", "smb": "mount"}
    if mount_type not in default_systems:
        raise ValueError(f'Unknown cloud storage type "{mount_type}".')
    mount_system = mount_info.get("system", default_systems[mount_type])

    if mount_type == "s3" and mount_system == "juicefs":
        mount_function = mount_s3_juicefs
    elif mount_type == "s3" and mount_system == "s3fs":
        mount_function = mount_s3fs
    elif mount_type == "ssh" and mount_system == "sshfs":
        mount_function = mount_sshfs
    elif mount_type == "nfs" and mount_system == "mount":
        mount_function = mount_nfs_with_mount_command
    elif mount_type == "smb" and mount_system == "mount":
        mount_function = mount_smb_with_mount_command
    else:
        # Fail fast: silently skipping the mount would only surface later as
        # a wait_mount timeout with a misleading message.
        raise ValueError(f'Unknown mounting system "{mount_system}" for cloud storage type "{mount_type}".')

    mount_function(path, mount_info, **kwargs)
    wait_mount(path)
def wait_mount(path):
    """Block until ``path`` becomes a mount point.

    Polls once per second. Raises RuntimeError once more than
    ``cloud_connection_timeout`` one-second waits have passed without the
    mount appearing.
    """
    import itertools
    import time

    for seconds_waited in itertools.count(1):
        if os.path.ismount(path):
            return
        time.sleep(1)
        if seconds_waited > cloud_connection_timeout:
            raise RuntimeError(f'"{path}" may not be mounted.')
def mount_s3fs(path, mount_info=None, **kwargs):
    """Mount an S3 bucket at ``path`` with s3fs.

    Credentials are handed to s3fs through a temporary passwd file. The file
    is always deleted once the s3fs command returns, even on failure.

    Args:
        path: local mount point (created if missing).
        mount_info: mount description with ``access_key_id``,
            ``secret_access_key``, ``bucket_name``, ``endpoint_url`` and an
            optional ``remote_dir_name``. The remote dir defaults to the mount
            id; ``"."`` means the bucket root. Loaded from the config if None.
        **kwargs: ignored. Accepted so ``mount``/``mount_all`` can forward
            shared options (``headless``, ``allow_root``).

    Raises:
        RuntimeError: if the keys are missing or s3fs fails.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)

    # Check that keys are available
    if "access_key_id" not in mount_info or "secret_access_key" not in mount_info:
        raise RuntimeError("Mounting failed: access_key_id and secret_access_key are not available.")

    # Create directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    bucket_path = mount_info["bucket_name"]
    mount_id = get_mount_id(mount_info)
    remote_dir_name = mount_info.get("remote_dir_name", mount_id)
    if remote_dir_name != ".":
        bucket_path += f":/{remote_dir_name.rstrip('/')}"

    # Create the secrets file only once everything else is validated, and
    # make sure it never outlives the s3fs call.
    passwd_path = write_temp_passwd_file(mount_info["access_key_id"], mount_info["secret_access_key"])
    try:
        mount_process = subprocess.run(
            [
                "s3fs",
                bucket_path,
                path,
                "-o",
                f"passwd_file={passwd_path}",
                "-o",
                f"url={mount_info['endpoint_url']}",
                "-o",
                "nonempty",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=cloud_connection_timeout,
        )
    finally:
        os.remove(passwd_path)
    check_process_return(mount_process, "Mounting failed")
def write_temp_passwd_file(access_key_id, secret_access_key):
    """Write ``<access_key_id>:<secret_access_key>`` to a new temporary file.

    The file has a ``.passwd`` suffix and is NOT deleted automatically; the
    caller is responsible for removing it.

    Returns:
        Path of the created file.
    """
    credentials_line = f"{access_key_id}:{secret_access_key}\n"
    with tempfile.NamedTemporaryFile(mode="w", suffix=".passwd", delete=False) as passwd_file:
        passwd_file.write(credentials_line)
    return passwd_file.name
def mount_s3_juicefs(path, mount_info=None, **kwargs):
    """Mount an S3 bucket with JuiceFS, choosing the metadata backend.

    Uses PostgreSQL when ``mount_info`` has a ``database_url``, and a local
    SQLite database otherwise.

    Args:
        path: local mount point (created if missing).
        mount_info: mount description; loaded from the config if None.
        **kwargs: forwarded to the backend-specific mount function.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)
    # Log field names only: mount_info may hold access keys and passwords.
    logger.debug(f"JuiceFS mount info fields: {list(mount_info)}")

    # Create directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    if "database_url" in mount_info:
        mount_s3_juicefs_psql(path, mount_info, **kwargs)
    else:
        mount_s3_juicefs_sqlite3(path, mount_info, **kwargs)
def mount_s3_juicefs_sqlite3(path, mount_info=None, **kwargs):
    """Mount an S3 bucket with JuiceFS using a local SQLite metadata database.

    The database file ``<mount_id>.db`` lives in ``get_cloud_dir_path()``. If
    it does not exist yet, ``juicefs format`` is run first. ``remote_dir_name``
    (default: the mount id) is passed as the JuiceFS name argument.

    Args:
        path: local mount point.
        mount_info: mount description with ``endpoint_url``, ``bucket_name``,
            ``access_key_id`` and ``secret_access_key``; loaded from the config
            if None.
        **kwargs: ``allow_root=True`` adds ``-o allow_root`` to
            ``juicefs mount``. Other keywords (e.g. ``headless``) are ignored.

    Raises:
        RuntimeError: if formatting or mounting fails.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)

    mount_id = get_mount_id(mount_info)
    database_path = os.path.join(get_cloud_dir_path(), f"{mount_id}.db")
    database_url = f"sqlite3://{database_path}"
    bucket_url = f"{mount_info['endpoint_url'].rstrip('/')}/{mount_info['bucket_name']}"

    # Give the S3 credentials to the juicefs child processes only, instead of
    # writing them into this process's global os.environ.
    env = {
        **os.environ,
        "AWS_ACCESS_KEY": mount_info["access_key_id"],
        "AWS_SECRET_ACCESS_KEY": mount_info["secret_access_key"],
    }

    # Create database file and remote directory if first time mount
    if not os.path.exists(database_path):
        remote_dir_name = mount_info.get("remote_dir_name", mount_id)
        format_process = subprocess.run(
            [
                "juicefs",
                "format",
                "--storage",
                "s3",
                "--bucket",
                bucket_url,
                database_url,
                remote_dir_name,
            ],
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=cloud_connection_timeout,
        )
        check_process_return(format_process, "Formatting failed")

    # Mount S3 bucket
    cmd = ["juicefs", "mount", "--background"]
    if kwargs.get("allow_root", False):
        cmd += ["-o", "allow_root"]
    cmd += [database_url, path]
    mount_process = subprocess.run(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=cloud_connection_timeout,
    )
    check_process_return(mount_process, "Mounting failed")
def connect_to_postgres(psql_config):
    """Open an autocommit connection to a PostgreSQL database.

    Args:
        psql_config: object with ``database``, ``host``, ``port``,
            ``username`` and ``password`` attributes.

    Returns:
        A psycopg2 connection with ``autocommit`` enabled. The caller closes it.

    Raises:
        RuntimeError: wrapping (and chained to) any error raised while
            connecting.
    """
    import psycopg2

    database_name = psql_config.database
    host = psql_config.host
    port = psql_config.port
    admin_username = psql_config.username
    admin_password = psql_config.password

    try:
        connection = psycopg2.connect(
            host=host,
            port=port,
            user=admin_username,
            password=admin_password,
            database=database_name,
        )
        connection.autocommit = True
    except Exception as e:
        raise RuntimeError(f"Error connecting to Postgres: {e}") from e
    return connection
def _get_dcsm_credentials(mount_info):
    """Return the ``(username, password)`` used to reach the DCSM database.

    Values in ``mount_info`` take precedence over the ``DCSM_USERNAME`` and
    ``DCSM_PASSWORD`` environment variables.

    Raises:
        RuntimeError: if the username or the password cannot be found.
    """
    if "username" in mount_info:
        username = mount_info["username"]
    elif "DCSM_USERNAME" in os.environ:
        username = os.environ["DCSM_USERNAME"]
    else:
        raise RuntimeError("Cannot find DCSM username")

    if "password" in mount_info:
        password = mount_info["password"]
    elif "DCSM_PASSWORD" in os.environ:
        password = os.environ["DCSM_PASSWORD"]
    else:
        raise RuntimeError("Cannot find DCSM password")
    return username, password


def _fetch_juicefs_keys(psql_config, username, mount_id):
    """Read the S3 ``(access_key, secret_key)`` of a JuiceFS mount from the DCSM database.

    The ``admin`` user reads the ``storage`` table. Other users read
    ``<username>.user_mounts``. The connection is always closed.

    Raises:
        RuntimeError: if no row matches ``juicefs-<mount_id>``.
    """
    from psycopg2 import sql

    db_mount_id = f"juicefs-{mount_id}"
    connection = connect_to_postgres(psql_config)
    try:
        cursor = connection.cursor()
        if username == "admin":
            mount_info_query = sql.SQL("SELECT * from storage where mount_id = {mount_id}").format(
                mount_id=sql.Literal(db_mount_id)
            )
        else:
            mount_info_query = sql.SQL("SELECT * from {username}.user_mounts where mount_id = {mount_id}").format(
                username=sql.Identifier(username), mount_id=sql.Literal(db_mount_id)
            )
        logger.debug(mount_info_query)
        cursor.execute(mount_info_query)
        rows = cursor.fetchall()
    finally:
        connection.close()

    if not rows:
        raise RuntimeError(f'No mount information found in database for "{db_mount_id}".')

    # Row layouts differ between the two tables; the keys are the last two columns.
    if username == "admin":
        _, _, _, _, access_key, secret_key = rows[0]
    else:
        _, _, access_key, secret_key = rows[0]
    return access_key, secret_key


def mount_s3_juicefs_psql(path, mount_info=None, allow_root=False, **kwargs):
    """Mount an S3 bucket with JuiceFS using the DCSM PostgreSQL metadata database.

    The S3 keys are read from the database and passed to ``juicefs mount``
    through the environment. They are then cleared from the JuiceFS
    configuration with ``juicefs config --access-key "" --secret-key ""``.

    Args:
        path: local mount point (created if missing).
        mount_info: mount description with a ``postgres://`` ``database_url``
            and optional ``username``/``password``; loaded from the config if
            None.
        allow_root: pass ``-o allow_root`` to ``juicefs mount``.
        **kwargs: ignored (e.g. ``headless``).

    Raises:
        RuntimeError: on an inconsistent database URL, missing credentials,
            a missing database entry, or a failing juicefs command.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)
    # Create mount_id (if necessary); it selects the JuiceFS schema and DB row.
    mount_id = get_mount_id(mount_info)
    logger.info(f"Mounting JuiceFS (PostgreSQL metadata) at {path}")

    # Create directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    database_url = mount_info["database_url"]
    if not database_url.startswith("postgres://"):
        raise RuntimeError(f"Inconsistent database url: {database_url}")
    logger.debug(database_url)

    protocol, url = database_url.split("://", 1)
    host = url.split("/")[0]
    # NOTE(review): database name and port are hard-coded, and the rebuilt URL
    # appends "/dcsm" to everything after "://" — assumes database_url is just
    # postgres://<host>; confirm.
    database_name = "dcsm"
    port = 5432

    username, psql_password = _get_dcsm_credentials(mount_info)
    database_url = (
        protocol
        + "://"
        + username
        + "@"
        + url
        + "/"
        + database_name
        + f"?sslmode=disable&search_path=juicefs-{mount_id}"
    )
    logger.debug(url)
    logger.debug(host)
    logger.debug(database_name)

    import argparse

    psql_config = argparse.Namespace(
        database=database_name, host=host, port=port, username=username, password=psql_password
    )
    access_key, secret_key = _fetch_juicefs_keys(psql_config, username, mount_id)

    # The secrets must override any stale values already present in os.environ.
    env = {
        **os.environ,
        "META_PASSWORD": psql_password,
        "AWS_ACCESS_KEY": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
    }

    cmd = ["juicefs", "mount", "--background"]
    if allow_root:
        cmd += ["-o", "allow_root"]
    cmd += [database_url, path]
    logger.debug(cmd)

    # Mount S3 bucket
    mount_process = subprocess.run(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=cloud_connection_timeout,
    )
    check_process_return(mount_process, "Mounting failed")

    # Remove keys from database
    remove_keys_process = subprocess.run(
        [
            "juicefs",
            "config",
            database_url,
            "--access-key",
            "",
            "--secret-key",
            "",
            "--force",  # Skip keys validation
        ],
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=cloud_connection_timeout,
    )
    check_process_return(remove_keys_process, "Failed to remove keys from database")
def mount_sshfs(path, mount_info=None, headless=False, allow_root=False, **kwargs):
    """Mount a remote file system over SSH with sshfs.

    Args:
        path: local mount point (created if missing).
        mount_info: mount description whose ``endpoint`` is passed verbatim
            as the sshfs remote argument; loaded from the config if None.
        headless: add the ``password_stdin`` option and feed an empty line on
            stdin.
        allow_root: add the FUSE ``allow_root`` option (forwarded by
            ``mount_all``).
        **kwargs: ignored. Accepted so ``mount`` can forward shared options.

    Raises:
        RuntimeError: if sshfs fails.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)

    # Create directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    # Mount SSH file system
    command = ["sshfs", mount_info["endpoint"], path]
    options = []
    if headless:
        options.append("password_stdin")
    if allow_root:
        options.append("allow_root")
    if options:
        command += ["-o", ",".join(options)]

    mount_process = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=cloud_connection_timeout,
        input=b"\n" if headless else None,
    )
    check_process_return(mount_process, "Mounting failed")
def mount_nfs_with_mount_command(path, mount_info=None, headless=False, **kwargs):
    """Mount an NFS share with ``sudo mount -t nfs``.

    Extra keyword arguments (such as the ``allow_root`` that ``mount_all``
    forwards through ``mount``) are accepted and ignored.
    """
    mount_with_mount_command("nfs", path, mount_info, headless=headless)
def mount_smb_with_mount_command(path, mount_info=None, headless=False, **kwargs):
    """Mount an SMB share with ``sudo mount -t cifs``.

    Extra keyword arguments (such as the ``allow_root`` that ``mount_all``
    forwards through ``mount``) are accepted and ignored.
    """
    mount_with_mount_command("cifs", path, mount_info, headless=headless)
def mount_with_mount_command(mount_command_type, path, mount_info, headless=False):
    """Mount ``mount_info["endpoint"]`` at ``path`` with ``sudo mount -t <type>``.

    Args:
        mount_command_type: file-system type given to ``mount -t``
            (``"nfs"`` or ``"cifs"`` in this module).
        path: local mount point (created if missing).
        mount_info: mount description with ``endpoint`` and optional
            ``username``, ``password`` and ``domain``; loaded from the config
            if None.
        headless: run ``sudo -S`` (password read from stdin) and feed an empty
            line. For cifs without a configured password, an empty password
            is sent so mount.cifs does not prompt.

    Raises:
        RuntimeError: if the mount command fails.
    """
    if mount_info is None:
        mount_info = get_existing_mount_info(path)

    # Create directory if it does not exist
    if not os.path.exists(path):
        os.makedirs(path)

    # Mount using "mount" command
    endpoint = mount_info["endpoint"]
    command = ["sudo", "mount", "-t", mount_command_type, endpoint, path]
    if headless:
        command.insert(1, "-S")  # read password from stdin

    # NOTE(review): option values are joined with "," unescaped, and the
    # password is visible in the process list.
    options = []
    if "username" in mount_info:
        options.append(f"username={mount_info['username']}")
    if "password" in mount_info:
        options.append(f"password={mount_info['password']}")
    elif headless and mount_command_type == "cifs":
        # No shell is involved, so quotes would be passed literally; an
        # empty value is the way to request an empty password.
        options.append("password=")
    if "domain" in mount_info:
        options.append(f"domain={mount_info['domain']}")
    if options:
        command.extend(["-o", ",".join(options)])

    mount_process = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=cloud_connection_timeout,
        input=b"\n" if headless else None,
    )
    check_process_return(mount_process, "Mounting failed")
def unmount(path, headless=False):
    """Unmount ``path`` with ``umount``, through sudo when it was mounted with ``mount``.

    The mounting system is taken from the configured mount info. When no
    ``system`` is stored, the same defaults as ``mount`` apply: nfs and smb
    use the system ``mount`` command and therefore need sudo.

    Args:
        path: mount point to release.
        headless: with sudo, run ``sudo -S`` and feed an empty line on stdin.

    Raises:
        RuntimeError: if ``umount`` fails.
    """
    config = get_cloud_info()
    mount_entry = config.get(get_path_relative_to_root(path), {})
    default_system = "mount" if mount_entry.get("type") in ("nfs", "smb") else ""
    mount_system = mount_entry.get("system", default_system)

    command = ["umount", path]
    if mount_system == "mount":
        command.insert(0, "sudo")
        if headless:
            command.insert(1, "-S")  # read password from stdin

    unmount_process = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        input=b"\n" if headless else None,
    )
    check_process_return(unmount_process, "Unmounting failed")
def convert_local_to_cloud(local_path, mount_info):
    """Copy local content to cloud, unmount temp cloud and mount at final location.

    Steps:
        1. Mount the cloud in a temporary directory.
        2. Rsync ``local_path`` into it.
        3. Delete ``local_path``.
        4. Unmount and remove the temporary directory.
        5. Mount the cloud at ``local_path``.
    """
    import shutil

    temp_path = tempfile.mkdtemp()
    logger.info("Mounting to temporary location...")
    mount(temp_path, mount_info)

    logger.info("Copying local content to cloud...")
    rsync(local_path, temp_path)
    # Remove without a shell, so paths with spaces or shell metacharacters are
    # safe and failures raise. A symlink is removed itself, as "rm -rf" would.
    if os.path.islink(local_path):
        os.remove(local_path)
    else:
        shutil.rmtree(local_path)

    logger.info("Unmounting temporary cloud...")
    unmount(temp_path)
    os.rmdir(temp_path)

    logger.info("Mounting cloud at final location...")
    mount(local_path, mount_info)
def convert_cloud_to_cloud(local_path, mount_info_prev, mount_info_new):
    """Placeholder for migrating a mount between two cloud backends.

    Always raises NotImplementedError; convert to local storage first.
    """
    message = "Not implemented. Please convert to local first."
    raise NotImplementedError(message)
def add_global_mount_info(mount_info):
    """Use mount_id to retrieve keys from user home's .solidipes directory.

    The user-level cloud configuration is looked up by
    ``mount_info["mount_id"]``, and its entries are merged into
    ``mount_info`` in place. Keys already present in mount_info are not
    replaced.

    Missing data is not an error: nothing happens without a mount_id, and
    only a warning is logged when the user entry is missing. Any error should
    happen later, when trying to mount.
    """
    if "mount_id" not in mount_info:
        return

    # Retrieve user info
    mount_id = mount_info["mount_id"]
    user_config = get_cloud_info(user=True)
    if mount_id not in user_config:
        logger.warning(f'Mount information for "{mount_id}" not found in user\'s .solidipes directory.')
        return

    for key, value in user_config[mount_id].items():
        mount_info.setdefault(key, value)
    # Log field names only: the merged info now holds keys/passwords.
    logger.debug(f"Mount info fields after merging user keys: {list(mount_info)}")
def remove_keys_from_info(mount_info):
    """Remove keys from info and generate mount_id if necessary.

    For mount types listed in ``key_names_per_mount_type``:
        - the secret fields are popped from ``mount_info``;
        - their names are appended to ``mount_info["removed_keys"]``;
        - they are saved under the mount id in the user's cloud configuration,
          but only if that id has no entry there yet.

    Other mount types are left untouched.
    """
    key_names = key_names_per_mount_type.get(mount_info["type"])
    if key_names is None:
        return

    # Retrieve user info
    mount_id = get_mount_id(mount_info)
    user_config = get_cloud_info(user=True)

    removed_keys = {name: mount_info.pop(name) for name in key_names if name in mount_info}
    if removed_keys:
        mount_info.setdefault("removed_keys", []).extend(removed_keys)

    # Save keys in user config, without overwriting an existing entry
    if removed_keys and mount_id not in user_config:
        user_config[mount_id] = removed_keys
        set_cloud_info(user_config, user=True)
def rsync(source_dir, target_dir, delete=False):
    """Copy the contents of ``source_dir`` into ``target_dir`` with rsync.

    The source always gets exactly one trailing slash, so its contents (not
    the directory itself) are copied. With ``delete=True``, files absent from
    the source are removed from the target.

    Raises:
        RuntimeError: if rsync fails.
    """
    source_contents = source_dir.rstrip("/") + "/"
    # -a cannot be used with juicefs, so request recursion and links explicitly
    extra_flags = ["--delete"] if delete else []
    command = ["rsync", "-rlv", source_contents, target_dir] + extra_flags

    completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    check_process_return(completed, "Rsync failed")
def list_mounts(only_mounted=False):
    """Get config expressed relative to working directory, with mount status.

    Each mount description gains a boolean ``"mounted"`` entry (set in place).
    With ``only_mounted=True``, entries that are not currently mounted are
    left out of the result.

    Returns:
        Dict mapping workdir-relative paths to mount descriptions.
    """
    mounts = {}
    for root_relative_path, mount_info in get_cloud_info().items():
        workdir_path = get_path_relative_to_workdir(root_relative_path)
        is_mounted = os.path.ismount(workdir_path)
        mount_info["mounted"] = is_mounted
        if is_mounted or not only_mounted:
            mounts[workdir_path] = mount_info
    return mounts
def mount_all(headless=False, allow_root=False):
    """Mount all mounts that are not already mounted.

    User-level keys are merged into each mount description
    (``add_global_mount_info``) before mounting.

    Args:
        headless: forwarded to ``mount``.
        allow_root: forwarded to ``mount``.

    Raises:
        Re-raises the first exception from a failing mount, unchanged, after
        logging it; the remaining mounts are not attempted.
    """
    for local_path, mount_info in list_mounts().items():
        if mount_info["mounted"]:
            continue

        logger.info(f"Mounting {local_path}...")
        try:
            add_global_mount_info(mount_info)
            mount(local_path, mount_info, headless=headless, allow_root=allow_root)
        except Exception as e:
            logger.error(f"Abort after raising {type(e)} {e}")
            raise  # bare raise keeps the original traceback untouched

    logger.info("Mount All: Done!")
def unmount_all(headless=False):
    """Unmount all mounted mounts.

    Best effort: a failure is logged and the remaining mounts are still
    processed.
    """
    for mounted_path in list_mounts(only_mounted=True):
        logger.info(f"Unmounting {mounted_path}...")
        try:
            unmount(mounted_path, headless=headless)
        except Exception as error:
            logger.error(f"{error}")