Source code for solidipes.reports.widgets.file_list

import fnmatch
import os

from datasize import DataSize
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode

from solidipes.loaders.file import File, load_file
from solidipes.loaders.file_sequence import FileSequence
from solidipes.loaders.mime_types import get_possible_extensions, get_possible_mimes
from solidipes.reports.widgets.utils import FileWrapper
from solidipes.utils import logging, rename_file

from .solidipes_widget import SolidipesWidget as SPW

################################################################
print = logging.invalidPrint
logger = logging.getLogger()
################################################################

error_cell_renderer = JsCode("""
    class ErrorCellRenderer {
        init(params) {
            if (!params.value) {
                this.eGui = document.createElement("span");
                this.eGui.innerText = "";
                return;
            }

            this.eGui = document.createElement("div");
            this.eGui.innerText = params.value;
        }

        getGui() {
            return this.eGui;
        }
    }
""")


url_cell_renderer = JsCode("""
    class UrlCellRenderer {
        init(params) {
            if (!params.value) {
                this.eGui = document.createElement("span");
                this.eGui.innerText = "";
                return;
            }

            this.eGui = document.createElement("a");
            this.eGui.innerText = "View File";
            this.eGui.setAttribute("href", "");

            let parentLocation = window.parent.location;
            let parentUrl = parentLocation.origin + parentLocation.pathname;
            let url = parentUrl + params.value;

            this.eGui.addEventListener("click", _ => {
                parent.window.open(url, "_self");
            });
            // Using href does not work because inside an iframe
            // this.eGui.setAttribute("href", url);
            // this.eGui.setAttribute("target", "_parent");
        }

        getGui() {
            return this.eGui;
        }
    }
""")


file_size_aggregator = JsCode("""
    function(params) {
        let totalSize = params.values.reduce((total, value) => total + value.value, 0);

        const units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"];
        let i = 0;
        let displaySize = totalSize;

        for (; i < units.length; i++) {
            if (displaySize < 1024) {
                break;
            }
            displaySize /= 1024;
        }

        return {
            value: totalSize,
            display: `${Math.round(displaySize * 100) / 100}${units[i]}`,
        };
    }
""")


file_size_comparator = JsCode("""
    function(value1, value2, node1, node2, isDescending) {
        return (value1?.value - value2?.value) || 1;
    }
""")


file_size_value_formatter = JsCode("""
    function(params) {
        return params.value?.display;
    }
""")


status_aggregator = JsCode("""
    function(params) {
        let valid = true;
        let message = false;

        for (let value of params.values) {
            if (value.includes("🚫")) {
                valid = false;
                // params?.rowNode?.setExpanded(true);
            }
            if (value.includes("✉️")) {
                message = true;
            }
        }

        let status = valid ? "✅" : "🚫";
        if (message) {
            status += " ✉️";
        }

        return status;
    }
""")

unique_aggregator = JsCode("""
    function(params) {

        let vals = [];
        for (let val of params.values) {
          if (!val) continue;
          if ((typeof val === "Object") && !('current' in val)) val = val.current;
          if (!(vals.includes(val))) vals.push(val);
        }
        if (vals.length != 1) return;
        return params.values[0];
    }
""")


url_aggregator = JsCode("""
    function(params) {
        let paths = [];
        let path = null;
        let loader = null;
        for (let value of params.values) {
          if (!value) continue;
          let _options = value.split('?page=display_page').pop();
          _options = _options.split('&');
          options = {};
          for (let opt of _options) {
            if (!opt){
              continue;
            }
            let key = opt.split('=')[0];
            let val = opt.split('=')[1];
            options[key] = val;
          }
          if (!('group' in options)){
            continue;
          }
          paths.push(options.path);
          path = options.group;
          loader = options.group_loader;
        }
        if (!path) return;
        let url = '?page=display_page&file=' + path + "&paths=" + paths.join(',') + '&loader=' + loader;

        return url;
    }
""")


extension_value_formatter = JsCode("""
    function(params) {
        return params.value?.current;
    }
""")


extension_cell_editor_values = JsCode("""
    function(params) {
        let initial = params.value?.initial || "";
        let possible = params.value?.possible || [];
        return possible.map(value => ({current: value, initial, possible: possible}));
    }
""")


extension_cell_editor_format_value = JsCode("""
    function(params) {
        return params?.current;
    }
""")


extension_comparator = JsCode("""
    function(value1, value2, node1, node2, isDescending) {
        return (value1?.current.localeCompare(value2?.current)) || 1;
    }
""")


[docs] class FileList(SPW): def __init__(self, all_found_files=[], show_curation_cols=True, **kwargs): super().__init__(**kwargs) self.file_wildcard = self.layout.text_input("Filtering file pattern", value="*") cols = self.layout.columns(2) self.show_only_error = cols[0].checkbox("Show only files with errors") self.full_scan = cols[1].checkbox("Perform a full scan (slower)") self.display_files(all_found_files, show_curation_cols) self.current_dir_layout = None
[docs] def file_as_dict(self, e, group=None, loader=None): path = os.path.basename(e.file_info.path) dir_path = os.path.dirname(e.file_info.path) if dir_path.startswith("." + os.sep): dir_path = dir_path[2:] dir_list = dir_path.split(os.sep) if dir_list == ["."]: dir_list = [] dir_dict = {f"Directory_{i}": "📁 " + dir_list[i] for i in range(len(dir_list))} if group is not None: dir_dict[f"Directory_{len(dir_list)}"] = "📦 " + group if isinstance(e.f, FileSequence): path = e.f.path if isinstance(e.f, FileSequence): file_size = e.total_size else: file_size = e.file_info.size file_type = e.file_info.type.strip() human_readable_file_size = f"{DataSize(file_size):.2a}" if e.state.valid and (not e.discussions or e.archived_discussions): valid = "✅" else: valid = "🚫" if e.discussions: valid += " ✉️" current_extension = e.file_info.extension possible_extensions = get_possible_extensions(e.file_info.type) possible_mimes = get_possible_mimes(current_extension) if group is not None: url = f"?page=display_page&file={e.path}&path={path}&group={os.path.join('./', dir_path, group)}" url += f"&group_loader={loader}" elif isinstance(e.f, FileSequence): p = os.path.basename(e.path) path = os.path.basename(path) url = f"?page=display_page&file={p}&paths={','.join(e.f.paths)}&loader=FileSequence" path = "📦 " + path else: url = f"?page=display_page&file={e.path}" file_dict = { "Status": valid, "Filename": path, "Path": e.file_info.path, "Extension": {"current": current_extension, "initial": current_extension, "possible": possible_extensions}, "Type": {"current": file_type, "initial": file_type, "possible": possible_mimes}, "Size": {"value": file_size, "display": human_readable_file_size}, "Open": url, "Errors": "\n".join(e.errors), } if group is not None: file_dict["Group"] = group file_dict.update(dir_dict) return file_dict
[docs] def display_files(self, files, show_curation_cols): import pandas as pd bar = self.progress_layout.progress(0, text="Loading files") n_files = len(files) _files = [] for i, (full_path, f) in enumerate(files): percent_complete = i * 100 // n_files bar.progress(percent_complete + 1, text=f"Listing {full_path}") if isinstance(f, File) or isinstance(f, FileSequence): f = FileWrapper(f) f.state.valid = f.valid_loading if not fnmatch.fnmatch(f.file_info.path.lower(), self.file_wildcard): logger.info(f"Exclude {f.file_info.path.lower()}") continue if self.show_only_error and f.state.valid: continue if isinstance(f.f, FileSequence): package_path = os.path.basename(f.f.path) if self.full_scan: for path in f.f._paths: _sub_f = load_file(path) _sub_f = FileWrapper(_sub_f) _sub_f.state.valid = _sub_f.valid_loading _files.append(self.file_as_dict(_sub_f, group=package_path, loader=f.f.__class__.__name__)) else: _files.append(self.file_as_dict(f)) else: _files.append(self.file_as_dict(f)) _files = pd.DataFrame(_files) # import streamlit as st # st.write(_files) if len(_files) and _files["Status"].str.contains("🚫").any(): from solidipes.utils import remove_completed_stage remove_completed_stage(1) else: from solidipes.utils import add_completed_stage add_completed_stage(1) dir_columns = [col for col in _files.columns if col.startswith("Directory")] grid_builder = GridOptionsBuilder.from_dataframe(_files) grid_builder.configure_column( "Status", aggFunc="status_aggregator", ) grid_builder.configure_column( "Open", aggFunc="url_aggregator", ) grid_builder.configure_column( "Extension", aggFunc="unique_aggregator", ) grid_builder.configure_column( "Type", aggFunc="unique_aggregator", ) grid_builder.configure_column( "Errors", aggFunc="unique_aggregator", ) grid_builder.configure_column( "Path", hide=True, ) grid_builder.configure_column( "Extension", cellEditor="agRichSelectCellEditor", cellEditorParams={ "formatValue": extension_cell_editor_format_value, "values": extension_cell_editor_values, "allowTyping": True, "filterList": True, }, comparator=extension_comparator, editable=True, valueFormatter=extension_value_formatter, ) grid_builder.configure_column( "Type", cellEditor="agRichSelectCellEditor", cellEditorParams={ "formatValue": extension_cell_editor_format_value, "values": extension_cell_editor_values, "allowTyping": True, "filterList": True, }, comparator=extension_comparator, editable=True, valueFormatter=extension_value_formatter, ) grid_builder.configure_column( "Size", # Putting aggFunc=file_size_aggregator directly fails when editing the grid aggFunc="file_size_aggregator", comparator=file_size_comparator, valueFormatter=file_size_value_formatter, ) grid_builder.configure_column( "Open", cellRenderer=url_cell_renderer, ) grid_builder.configure_column( "Errors", cellRenderer=error_cell_renderer, ) for col in dir_columns: grid_builder.configure_column( col, hide=True, rowGroup=True, ) grid_builder.configure_column( "Group", hide=True, ) if not show_curation_cols: for col in ["Status", "Extension", "Type", "Open", "Errors"]: grid_builder.configure_column( col, hide=True, ) grid_builder.configure_columns("Filename", wrapText=True) grid_builder.configure_columns("Errors", wrapText=True) grid_builder.configure_columns("Errors", autoHeight=True) grid_options = grid_builder.build() grid_options["aggFuncs"] = { "unique_aggregator": unique_aggregator, "status_aggregator": status_aggregator, "file_size_aggregator": file_size_aggregator, "url_aggregator": url_aggregator, } grid_options["autoGroupColumnDef"]["headerName"] = "Directory" if show_curation_cols: grid_options["autoSizeStrategy"] = {"type": "fitCellContents"} # grid_options["autoSizeStrategy"] = {"type": "fitGridWidth"} else: grid_options["autoSizeStrategy"] = {"type": "fitGridWidth"} # grid_options["domLayout"] = "autoHeight" # Bugged: initial height is sometimes too small grid_options["groupAllowUnbalanced"] = True # grid_options["groupDefaultExpanded"] = 0 grid_options["isGroupOpenByDefault"] = JsCode(""" function (params){ if (params.key.includes("📦")){ return false; } return true; } """) # if self.show_only_error: # grid_options["groupDefaultExpanded"] = -1 # elif not show_curation_cols: # grid_options["groupDefaultExpanded"] = 1 # else: # grid_options["groupDefaultExpanded"] = 2 grid_options["suppressAggFuncInHeader"] = True grid_return = AgGrid(_files, gridOptions=grid_options, allow_unsafe_jscode=True) new_grid_data = grid_return["data"] self.rename_files(new_grid_data) self.change_mime_files(new_grid_data) self.progress_layout.empty()
[docs] def rename_files(self, new_grid_data): if len(new_grid_data) == 0: return from streamlit.components.v1 import html from solidipes.reports.web_report import clear_session_state extensions = new_grid_data["Extension"] changed_extensions = extensions.apply(lambda x: x["current"] != x["initial"]) files_to_rename = new_grid_data[changed_extensions] if files_to_rename.empty: return self.layout.write("Renaming files...") for _, file in files_to_rename.iterrows(): current_path = file["Path"] new_extension = file["Extension"]["current"] new_path = os.path.splitext(current_path)[0] + "." + new_extension rename_file(current_path, new_path) # Reload file list clear_session_state() html(""" <script type = "text/javascript" > window.parent.location.reload(); </script > """)
[docs] def change_mime_files(self, new_grid_data): if len(new_grid_data) == 0: return from streamlit.components.v1 import html from solidipes.reports.web_report import clear_session_state new_type = new_grid_data["Type"] changed_type = new_type.apply(lambda x: x["current"] != x["initial"]) files_to_retype = new_grid_data[changed_type] if files_to_retype.empty: return self.layout.write("Renaming files...") for _, file in files_to_retype.iterrows(): from solidipes.utils import get_mimes, set_mimes current_path = file["Path"] new_mime = file["Type"]["current"] mimes = get_mimes() mimes[current_path] = new_mime set_mimes(mimes) # Reload file list clear_session_state() html(""" <script type = "text/javascript" > window.parent.location.reload(); </script > """)