Source code for solidipes_core_plugin.utils.rclone_utils

import inspect
import json
import os
import subprocess
from typing import List

from solidipes.mounters.cloud import optional_parameter

################################################################
from solidipes.utils import solidipes_logging as logging

################################################################
print = logging.invalidPrint
logger = logging.getLogger()
################################################################


[docs] class RcloneUtils: """Wrapper to all Rclone features.""" credential_names = ["pass", "bearer_token"] def __init__(self, **kwargs) -> None: super().__init__() import shutil path = shutil.which("rclone") if not path: raise FileNotFoundError("cannot find rclone") logger.info(f"reading config: {self.rclone_config_fname()}") @optional_parameter def remote() -> str: "Remote name to use/save in the rclone config" pass
[docs] def save_rclone_config(self) -> None: if not hasattr(self, "_remote"): raise RuntimeError("remote keyword must be used") import configparser config = configparser.ConfigParser() try: config.read(self.rclone_config_fname()) except Exception: pass config[self.remote] = {} for k, v in self.make_rclone_config().items(): config[self.remote][k] = v with open(self.rclone_config_fname(), "w") as f: config.write(f)
[docs] def make_rclone_config(self): config = {} import configparser _config = configparser.ConfigParser() _config.read(self.rclone_config_fname()) if hasattr(self, "_remote") and self._remote in _config: return _config[self.remote] config["type"] = self._protocol # check dynamically built methods for parameter values if applicable for name, method in inspect.getmembers(self): # Build a rclone configuration file option_shift = {"password": "pass", "twofa": "2fa"} option_name = name if option_name in option_shift: option_name = option_shift[name] if option_name not in self.option_properties: continue logger.debug(f"option_name {option_name}") logger.debug(f"method name: {name}") value = getattr(self, name) logger.debug(f"value: {value}") properties = self.option_properties[option_name] logger.debug(f"properties: {properties}") if value is None or value == "": if properties["obscured"]: raise RuntimeError(f"option: '{name}' is mandatory here") continue # Only use rclone-related options with an actual value if properties["obscured"]: value = self.rclone_obscure(value) config[option_name] = value return config
[docs] def make_rclone_options(self): config = self.make_rclone_config() if hasattr(self, "option_properties"): option_properties = self.option_properties elif "type" in config: self._protocol = config["type"] _class = rclone_classes_per_parser_key["rclone-" + self._protocol] option_properties = _class.option_properties rclone_options = [] # check dynamically built methods for parameter values if applicable for option_name, value in config.items(): # Only use rclone-related options with an actual value if option_name not in option_properties: continue option_prefix = "-" + option_properties[option_name]["prefix"] rclone_options += [f"-{option_prefix}-{option_name}={value}"] return rclone_options
[docs] def create_command(self, rclone_cmd, headless=False) -> List[str]: rclone_options = self.make_rclone_options() logger.debug(self.make_rclone_config()) rclone_args = [ f":{self._protocol}:", "-vv", ] command = ( [ "rclone", rclone_cmd, ] + rclone_args + rclone_options ) return command
[docs] @classmethod def rclone_config_fname(cls) -> str: out, err = cls.run_and_check_return("rclone config file".split(), fail_message="config failed") path = out.split("\n")[1] logger.debug(path) return path
[docs] def check_connection(self): return self.lsd("--max-depth 0")
[docs] def ls(self, options="", **kwargs) -> None: command = self.create_command("ls", **kwargs) command += options.split() logger.debug(" ".join(command)) return self.run_and_check_return(command, fail_message="ls failed", **kwargs)
[docs] def lsd(self, options="", **kwargs) -> None: command = self.create_command("lsd", **kwargs) command += options.split() logger.debug(" ".join(command)) return self.run_and_check_return(command, fail_message="ls failed", **kwargs)
[docs] def about(self, options="", **kwargs) -> None: command = self.create_command("about", **kwargs) command += options.split() logger.debug(" ".join(command)) return self.run_and_check_return(command, fail_message="about failed", **kwargs)
[docs] def sync(self, src, dst, options="", **kwargs) -> None: self.save_rclone_config() command = ["rclone", "sync", src, dst] command += options.split() logger.info(" ".join(command)) return self.run_and_check_return(command, fail_message="about failed", **kwargs)
[docs] def mount(self, **kwargs) -> None: """Mount Rclone remote volume""" # check if can connect self.check_connection() # Create directory if it does not exist if not os.path.exists(self.path): os.makedirs(self.path) command = self.create_command("mount", headless=False) command += [self.path, "--allow-non-empty", "--daemon", "-vv"] logger.debug(" ".join(command)) self.run_and_check_return(command, fail_message="Mounting failed", **kwargs)
[docs] def rclone_obscure(self, string): """ Call rclone to encrypt a string (use for options where IsPassword is true) """ stdout, stderr = self.run_and_check_return(["rclone", "obscure", string], fail_message="Obscure failed") return stdout
################################################################ # Dynamically creates the classes for mounting with rclone # ################################################################
[docs] def create_dynamic_function(name, docstring, is_required=True): """ Factory function to: - create a function with the given name. - set its docstring - decorate it as a solidipes parameter or optional_parameter """ def dynamic_func() -> str: """Template function, will receive its actual docstring later""" return "" dynamic_func.__name__ = name dynamic_func.__doc__ = docstring from solidipes.utils.utils import optional_parameter, parameter # Apply the appropriate decorator if is_required: return parameter(dynamic_func) else: return optional_parameter(dynamic_func)
################################################################
[docs] def rclone_config_schema(): """ Call rclone to retrieve the JSON description of all available protocols, and their associated options """ try: proc = subprocess.Popen( ["rclone", "config", "providers"], stdout=subprocess.PIPE, ) except FileNotFoundError: message = "rclone not found - is it installed in your path ?" message += "\nrclone mount commands are not available" logger.error(message) return [] except subprocess.CalledProcessError: message = "Error executing rclone" logger.error(message) return [] std_output = proc.stdout.read() schema = json.loads(std_output) return schema
################################################################ rclone_classes = {} rclone_classes_per_parser_key = {} ################################################################
[docs] def build_rclone_classes(): import textwrap for protocol_object in rclone_config_schema(): protocol_name = protocol_object["Name"].replace(" ", "_") protocol_prefix = protocol_object["Prefix"] if protocol_name == "alias": continue class_name = protocol_name.capitalize() + "RcloneUtils" docstring = f"{protocol_name} filesystem (rclone)" _class = type( class_name, (RcloneUtils,), { "parser_key": f"rclone-{protocol_name}", "_protocol": protocol_prefix, "__doc__": docstring, }, ) rclone_classes[class_name] = _class option_properties = {} registered_options = set() credential_names = ["access_key_id", "secret_access_key"] # Loop over potential parameters, starting with all mandatory ones for required in [True, False]: for option in protocol_object["Options"]: prefix = "" if option["NoPrefix"] else protocol_object["Prefix"] if option["Required"] == required: option_shift = {"pass": "password", "2fa": "twofa"} option_name = option["Name"] if option_name in registered_options: continue registered_options.add(option_name) if option_name in option_shift: option_name = option_shift[option_name] option_help = option["Help"].replace("\\", "\\\\") option_help = textwrap.fill(option_help, width=79) func = create_dynamic_function(option_name, option_help, required) setattr(_class, option_name, func) option_properties[option["Name"]] = {"prefix": prefix, "obscured": option["IsPassword"]} setattr(_class, "option_properties", option_properties) if option["IsPassword"]: credential_names.append(option_name) _class.credential_names = credential_names rclone_classes_per_parser_key[_class.parser_key] = _class
################################################################ build_rclone_classes() ################################################################
[docs] def declare_subclasses(parent_class, prefix): subclasses = {} for class_name, _class in rclone_classes.items(): new_class_name = class_name.replace("Utils", prefix) # logger.info(f"declare {new_class_name}") daughter_class = type( new_class_name, (parent_class, _class), { "parser_key": _class.parser_key, "_protocol": _class._protocol, "__doc__": prefix + ": " + _class.__doc__, }, ) subclasses[new_class_name] = daughter_class return subclasses
################################################################