import os
from ..utils import solidipes_logging as logging
from .cloud import Mounter, get_cloud_dir_path
################################################################
# Rebind the module-level name ``print`` to the project's ``invalidPrint``
# (presumably so stray print() calls are rejected in favour of the logger — TODO confirm).
print = logging.invalidPrint
# Module-wide logger obtained from the project's logging wrapper (``solidipes_logging``).
logger = logging.getLogger()
################################################################
# [docs]
class JuiceFSSQLiteMounter(Mounter):
    "JuiceFS file system, local database"

    # Identifier under which this mounter is registered.
    parser_key = "juicefs_local"

    def mount(self, path, mount_info, **kwargs):
        """Format (on first use) and mount a JuiceFS volume backed by a local SQLite file.

        The metadata database is ``<mount_id>.db`` inside the directory returned
        by ``get_cloud_dir_path()``; the object storage is the S3 bucket described
        by ``mount_info``.

        Args:
            path: Mount point handed to ``juicefs mount``.
            mount_info: Mapping providing ``endpoint_url``, ``bucket_name``,
                ``access_key_id`` and ``secret_access_key``. ``remote_dir_name``
                is optional and defaults to ``self.mount_id``.
            **kwargs: Accepted and ignored.
        """
        meta_filename = "{}.db".format(self.mount_id)
        meta_path = os.path.join(get_cloud_dir_path(), meta_filename)
        meta_url = f"sqlite3://{meta_path}"

        endpoint = mount_info["endpoint_url"].rstrip("/")
        bucket = f"{endpoint}/{mount_info['bucket_name']}"

        # Export the S3 credentials into this process' environment; they stay
        # set after the method returns.
        credential_vars = (
            ("AWS_ACCESS_KEY", "access_key_id"),
            ("AWS_SECRET_ACCESS_KEY", "secret_access_key"),
        )
        for env_name, info_key in credential_vars:
            os.environ[env_name] = mount_info[info_key]

        # A missing database file means the volume has never been formatted.
        first_mount = not os.path.exists(meta_path)
        if first_mount:
            volume_name = mount_info.get("remote_dir_name", self.mount_id)
            format_cmd = ["juicefs", "format", "--storage", "s3", "--bucket", bucket, meta_url, volume_name]
            self.run_and_check_return(format_cmd, fail_message="Formatting failed")

        # NOTE(review): formatting goes through run_and_check_return while mounting
        # uses run_and_check_process — confirm both helpers exist on Mounter.
        mount_cmd = ["juicefs", "mount", "--background", meta_url, path]
        self.run_and_check_process(mount_cmd, fail_message="Mounting failed")
################################################################
# [docs]
class JuiceFSPSQLMounter(Mounter):
    "JuiceFS file system, remote psql database"

    # Identifier under which this mounter is registered.
    parser_key = "juicefs"

    # Substrings identifying mapping keys whose values must never reach the logs.
    _SENSITIVE_MARKERS = ("password", "secret", "access_key")

    def mount(self, path, mount_info=None, allow_root=False, **kwargs):
        """Mount a JuiceFS volume whose metadata lives in a remote PostgreSQL database.

        The storage access keys are read from the PostgreSQL database, the
        volume is mounted in the background, and the keys are then blanked in
        the JuiceFS configuration.

        Args:
            path: Mount point; created if it does not exist.
            mount_info: Mapping with at least ``database_url`` (must start with
                ``postgres://``) and optionally ``username`` / ``password``.
                When ``None`` it is obtained from ``get_existing_mount_info(path)``.
            allow_root: When true, ``-o allow_root`` is passed to ``juicefs mount``.
            **kwargs: Accepted and ignored.

        Raises:
            RuntimeError: If the database URL is inconsistent, the username or
                password cannot be found, the PostgreSQL connection fails, or
                no credentials row exists for this mount.
        """
        import argparse

        if mount_info is None:
            mount_info = self.get_existing_mount_info(path)
        # Secrets (e.g. the password) are masked before logging.
        logger.info(self._redact(mount_info))

        # Create directory if it does not exist
        if not os.path.exists(path):
            os.makedirs(path)

        database_url = mount_info["database_url"]
        if not database_url.startswith("postgres://"):
            raise RuntimeError(f"Inconsistent database url: {database_url}")
        logger.debug(database_url)

        # partition() tolerates a further "://" in the remainder, unlike split().
        protocol, _, url = database_url.partition("://")
        host = url.split("/")[0]
        database_name = "dcsm"
        port = 5432

        username = self._lookup_credential(mount_info, "username", "DCSM_USERNAME")

        # NOTE(review): ``url`` is reused verbatim, so a database_url that already
        # carries a path or query yields a malformed DSN — confirm inputs are
        # bare ``host[:port]``.
        database_url = (
            f"{protocol}://{username}@{url}/{database_name}"
            f"?sslmode=disable&search_path=juicefs-{self.mount_id}"
        )
        logger.debug(url)
        logger.debug(host)
        logger.debug(database_name)

        psql_password = self._lookup_credential(mount_info, "password", "DCSM_PASSWORD")

        psql_config = argparse.Namespace(
            database=database_name, host=host, port=port, username=username, password=psql_password
        )
        access_key, secret_key = self._fetch_storage_keys(psql_config)

        env = {"META_PASSWORD": psql_password, "AWS_ACCESS_KEY": access_key, "AWS_SECRET_ACCESS_KEY": secret_key}
        # Only the variable names are informative; the values are secrets.
        logger.debug(self._redact(env))

        cmd = ["juicefs", "mount", "--background"]
        if allow_root:
            cmd += ["-o", "allow_root"]
        cmd += [database_url, path]
        logger.debug(cmd)

        # Mount S3 bucket
        self._run_with_env(cmd, env, fail_message="Mounting failed")

        # Remove keys from database
        cmd = [
            "juicefs",
            "config",
            database_url,
            "--access-key",
            "",
            "--secret-key",
            "",
            "--force",  # Skip keys validation
        ]
        self._run_with_env(cmd, env, fail_message="Failed to remove keys from database")

    @classmethod
    def _redact(cls, info):
        """Return a copy of mapping *info* with secret-looking values masked for logging."""
        if not hasattr(info, "items"):
            return info
        return {
            key: "***" if any(marker in str(key).lower() for marker in cls._SENSITIVE_MARKERS) else value
            for key, value in info.items()
        }

    @staticmethod
    def _lookup_credential(mount_info, key, env_var):
        """Return ``mount_info[key]``, else ``os.environ[env_var]``, else raise RuntimeError."""
        if key in mount_info:
            return mount_info[key]
        if env_var in os.environ:
            return os.environ[env_var]
        raise RuntimeError(f"Cannot find DCSM {key}")

    def _fetch_storage_keys(self, psql_config):
        """Read the ``(access_key, secret_key)`` pair for this mount from PostgreSQL.

        The ``admin`` user reads from ``storage``; any other user reads from
        ``<username>.user_mounts``. The connection is always closed afterwards.
        """
        from psycopg2 import sql

        mount_literal = sql.Literal(f"juicefs-{self.mount_id}")
        is_admin = psql_config.username == "admin"
        if is_admin:
            query = sql.SQL("SELECT * from storage where mount_id = {mount_id}").format(mount_id=mount_literal)
        else:
            query = sql.SQL("SELECT * from {username}.user_mounts where mount_id = {mount_id}").format(
                username=sql.Identifier(psql_config.username), mount_id=mount_literal
            )
        logger.debug(query)

        conn = self.connect_to_postgres(psql_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute(query)
                row = cursor.fetchone()
        finally:
            conn.close()

        if row is None:
            raise RuntimeError(f"No storage credentials found for mount 'juicefs-{self.mount_id}'")

        # The row itself is never logged: its last two columns are the keys.
        # ``SELECT *`` makes the unpacking depend on the table's column order.
        if is_admin:
            _, _, _, _, access_key, secret_key = row
        else:
            _, _, access_key, secret_key = row
        return access_key, secret_key

    def _run_with_env(self, cmd, extra_env, fail_message):
        """Run *cmd* via ``run_and_check_return`` with *extra_env* temporarily exported.

        Variables already present in ``os.environ`` keep their existing value,
        and ``None`` values are skipped. Everything exported here is removed
        again afterwards.

        NOTE(review): this assumes ``run_and_check_return`` spawns the command
        with the inherited process environment — confirm against ``Mounter``.
        """
        exported = []
        try:
            for name, value in extra_env.items():
                if value is None or name in os.environ:
                    continue
                os.environ[name] = str(value)
                exported.append(name)
            self.run_and_check_return(cmd, fail_message=fail_message)
        finally:
            for name in exported:
                os.environ.pop(name, None)

    def connect_to_postgres(self, psql_config):
        """Open an autocommit psycopg2 connection described by *psql_config*.

        Args:
            psql_config: Object exposing ``database``, ``host``, ``port``,
                ``username`` and ``password`` attributes.

        Returns:
            The open connection; the caller is responsible for closing it.

        Raises:
            RuntimeError: If connecting fails; the original error is chained
                as the cause.
        """
        import psycopg2

        database_name = psql_config.database
        host = psql_config.host
        port = psql_config.port
        username = psql_config.username
        password = psql_config.password

        try:
            connection = psycopg2.connect(
                host=host,
                port=port,
                user=username,
                password=password,
                database=database_name,
            )
            connection.autocommit = True
        except Exception as e:
            raise RuntimeError(f"Error connecting to Postgres: {e}") from e
        return connection