Source code for solidipes.scripts.mount

import argparse

command = "mount"
command_help = "Mount cloud storage"


[docs] def main(args): from ..utils import bcolors, get_cloud_info, get_path_relative_to_root, set_cloud_info from ..utils.cloud import ( add_global_mount_info, convert_cloud_to_cloud, convert_local_to_cloud, list_mounts, mount, mount_all, remove_keys_from_info, ) config = get_cloud_info() # --all: mount all existing mount points if args.all: mount_all(allow_root=args.allow_root) return # --list-existing: show existing mount points if args.list_existing: mounts = list_mounts() print("Existing mount points:") for local_path, mount_info in mounts.items(): mounted_message = " mounted" if mount_info["mounted"] else "" print( f" {local_path} {bcolors.BRIGHT_GREEN}({mount_info['type']}," f" {mount_info['system']}){mounted_message}{bcolors.RESET}" ) return # Mount or convert, depending on provided and saved configuration local_path_relative_to_root = get_path_relative_to_root(args.local_path) mount_info_saved = config.get(local_path_relative_to_root, None) mount_info_provided = get_mount_info_from_args(args) if mount_info_saved is not None: if mount_info_provided is not None: if args.convert: if not args.force: print(f'Mount info for "{args.local_path}" already exists. Use --force to convert. Aborting...') return add_global_mount_info(mount_info_saved) convert_cloud_to_cloud(args.local_path, mount_info_saved, mount_info_provided) else: if not args.force: print(f'Mount info for "{args.local_path}" already exists. Use --force to replace it. Aborting...') return print("Mounting...") mount(args.local_path, mount_info_provided, allow_root=args.allow_root) else: # Mount info not provided if args.convert: print(f'No mount info provided for converting "{args.local_path}". Aborting...') return else: add_global_mount_info(mount_info_saved) print("Mounting...") mount(args.local_path, mount_info_saved, allow_root=args.allow_root) else: # Mount info not saved if mount_info_provided is not None: if args.convert: convert_local_to_cloud(args.local_path, mount_info_provided) else: print("Mounting...") mount(args.local_path, mount_info_provided, allow_root=args.allow_root) else: # Mount info not provided print("No mount info provided and no mount info saved for this directory.") return # Save config info if mount is successful if local_path_relative_to_root not in config or args.force: if not args.public_keys: remove_keys_from_info(mount_info_provided) config[local_path_relative_to_root] = mount_info_provided set_cloud_info(config) print("Mount: Done!")
[docs] def get_mount_info_from_args(args): if args.type == "s3": if args.endpoint_url.startswith("postgres://"): database_url = args.endpoint_url mount_info = { "type": "s3", "system": args.system, "database_url": database_url, "mount_id": args.mount_id, "username": args.username, "password": args.password, } elif args.endpoint_url.startswith("sqlite3://"): mount_info = { "type": "s3", "system": args.system, "endpoint_url": args.endpoint_url.rstrip("/"), "bucket_name": args.bucket_name, "access_key_id": args.access_key_id, "secret_access_key": args.secret_access_key, } else: # default behavior for backward compatibility mount_info = { "type": "s3", "system": args.system, "endpoint_url": args.endpoint_url.rstrip("/"), "bucket_name": args.bucket_name, "access_key_id": args.access_key_id, "secret_access_key": args.secret_access_key, } if args.remote_dir_name is not None: mount_info["remote_dir_name"] = args.remote_dir_name.rstrip("/") return mount_info elif args.type in ["ssh", "nfs", "smb"]: mount_info = { "type": args.type, "system": args.system, "endpoint": args.endpoint.rstrip("/"), } if getattr(args, "username", ""): mount_info["username"] = args.username if getattr(args, "password", ""): mount_info["password"] = args.password if getattr(args, "domain", ""): mount_info["domain"] = args.domain return mount_info elif args.type is None: return None else: raise ValueError(f'Unknown cloud storage type "{args.type}".')
[docs] def populate_arg_parser(parser): parser.description = command_help paths = parser.add_mutually_exclusive_group(required=True) paths.add_argument( "-p", "--local-path", help=( "Path to the directory to mount. If the directory has already been mounted before, there is no need to" " indicate other mounting parameters. If not specified, the command shows existing mounting points." ), ) paths.add_argument( "-a", "--all", action="store_true", help="Mount all existing mounting points (not already mounted).", ) paths.add_argument( "-l", "--list-existing", action="store_true", help="List existing mount points.", ) parser.add_argument( "-f", "--force", help="Replace the currently saved configuration for this directory", action="store_true", ) parser.add_argument( "-c", "--convert", action="store_true", help="Send the contents of the local directory to the cloud storage (convert it to cloud storage).", ) parser.add_argument( "--allow_root", action="store_true", help="Allow root to access the fuse mounting", ) parser.add_argument( "-k", "--public-keys", action="store_true", help=( "Save all access keys publicly in local .solidipes directory. WARNING: when published, everyone will be" " able to see your keys and will have full access (possibly write access) to your mounted directory." ), ) # Cloud types subparsers type_parsers = parser.add_subparsers(dest="type", help="Type of cloud storage to mount") s3_parser = type_parsers.add_parser("s3", help="S3 bucket") populate_s3_parser(s3_parser) ssh_parser = type_parsers.add_parser("ssh", help="remote file system through ssh") populate_ssh_parser(ssh_parser) nfs_parser = type_parsers.add_parser("nfs", help="nfs file system") populate_nfs_parser(nfs_parser) smb_parser = type_parsers.add_parser("smb", help="smb file system") populate_smb_parser(smb_parser)
[docs] def populate_s3_parser(parser): parser.description = "Mount an S3 bucket" parser.add_argument( "endpoint_url", metavar="endpoint-url", help="URL of the S3 endpoint", ) parser.add_argument( "--bucket_name", metavar="bucket-name", help="Name of the S3 bucket", ) parser.add_argument( "-u", "--username", default="", help="Username", ) parser.add_argument( "-p", "--password", default="", help="password", ) parser.add_argument( "--access_key_id", metavar="access-key-id", help="Access key ID", ) parser.add_argument( "--secret_access_key", metavar="secret-access-key", help="Secret access key", ) parser.add_argument( "--mount_id", metavar="mount-id", help="Remote juicefs mount_id", ) parser.add_argument( "-s", "--system", choices=["juicefs", "s3fs"], default="juicefs", help="System to use for mounting. Default: juicefs", ) parser.add_argument( "-n", "--remote-dir-name", help="Name of the mounted directory in the bucket. If not specified, a random unique name is attributed.", )
[docs] def populate_ssh_parser(parser): parser.description = "Mount a remote file system through ssh" parser.add_argument( "endpoint", help="[user@]host[:path]", ) parser.add_argument( "-s", "--system", choices=["sshfs"], default="sshfs", help="System to use for mounting. Default: sshfs", )
[docs] def populate_nfs_parser(parser): parser.description = "Mount an nfs file system" parser.add_argument( "endpoint", help="host:path", ) parser.add_argument( "-s", "--system", choices=["mount"], default="mount", help="System to use for mounting. Default: mount command", )
[docs] def populate_smb_parser(parser): parser.description = "Mount an smb file system" parser.add_argument( "endpoint", help="//host/path", ) parser.add_argument( "-u", "--username", default="", help="Username", ) parser.add_argument( "-s", "--system", choices=["mount"], default="mount", help="System to use for mounting. Default: mount command", )
if __name__ == "__main__": parser = argparse.ArgumentParser() populate_arg_parser(parser) args = parser.parse_args() main(args)