import fnmatch
import os
import urllib.parse
from datasize import DataSize
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode
from solidipes.loaders.file import File
from solidipes.loaders.file_sequence import FileSequence
from solidipes.loaders.mime_types import get_possible_extensions, get_possible_mimes
from solidipes.reports.widgets.utils import FileWrapper
from solidipes.utils import logging, rename_file
from .solidipes_widget import SolidipesWidget as SPW
################################################################
print = logging.invalidPrint
logger = logging.getLogger()
################################################################
error_cell_renderer = JsCode("""
class ErrorCellRenderer {
init(params) {
if (!params.value) {
this.eGui = document.createElement("span");
this.eGui.innerText = "";
return;
}
this.eGui = document.createElement("div");
this.eGui.innerText = params.value;
}
getGui() {
return this.eGui;
}
}
""")
url_cell_renderer = JsCode("""
class UrlCellRenderer {
init(params) {
if (!params.value) {
this.eGui = document.createElement("span");
this.eGui.innerText = "";
return;
}
this.eGui = document.createElement("a");
this.eGui.innerText = "View File";
this.eGui.setAttribute("href", "");
let parentLocation = window.parent.location;
let parentUrl = parentLocation.origin + parentLocation.pathname;
let url = parentUrl + params.value;
this.eGui.addEventListener("click", _ => {
parent.window.open(url, "_self");
});
// Using href does not work because inside an iframe
// this.eGui.setAttribute("href", url);
// this.eGui.setAttribute("target", "_parent");
}
getGui() {
return this.eGui;
}
}
""")
file_size_aggregator = JsCode("""
function(params) {
let totalSize = params.values.reduce((total, value) => total + value.value, 0);
const units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"];
let i = 0;
let displaySize = totalSize;
for (; i < units.length; i++) {
if (displaySize < 1024) {
break;
}
displaySize /= 1024;
}
return {
value: totalSize,
display: `${Math.round(displaySize * 100) / 100}${units[i]}`,
};
}
""")
file_size_comparator = JsCode("""
function(value1, value2, node1, node2, isDescending) {
return (value1?.value - value2?.value) || 1;
}
""")
file_size_value_formatter = JsCode("""
function(params) {
return params.value?.display;
}
""")
status_aggregator = JsCode("""
function(params) {
let valid = true;
let message = false;
for (let value of params.values) {
if (value.includes("🚫")) {
valid = false;
params?.rowNode?.setExpanded(true);
}
if (value.includes("✉️")) {
message = true;
}
}
let status = valid ? "✅" : "🚫";
if (message) {
status += " ✉️";
}
return status;
}
""")
extension_value_formatter = JsCode("""
function(params) {
return params.value?.current;
}
""")
extension_cell_editor_values = JsCode("""
function(params) {
let initial = params.value?.initial || "";
let possible = params.value?.possible || [];
return possible.map(value => ({current: value, initial, possible: possible}));
}
""")
extension_cell_editor_format_value = JsCode("""
function(params) {
return params?.current;
}
""")
extension_comparator = JsCode("""
function(value1, value2, node1, node2, isDescending) {
return (value1?.current.localeCompare(value2?.current)) || 1;
}
""")
[docs]
class FileList(SPW):
def __init__(self, all_found_files=[], show_curation_cols=True, **kwargs):
super().__init__(**kwargs)
self.file_wildcard = self.layout.text_input("Filtering file pattern", value="*")
self.show_only_error = self.layout.checkbox("Show only files with errors")
self.display_files(all_found_files, show_curation_cols)
self.current_dir_layout = None
[docs]
def file_as_dict(self, e):
path = os.path.basename(e.file_info.path)
dir_path = os.path.dirname(e.file_info.path)
if dir_path.startswith("." + os.sep):
dir_path = dir_path[2:]
dir_list = dir_path.split(os.sep)
if dir_list == ["."]:
dir_list = []
dir_dict = {f"Directory_{i}": "📁 " + dir_list[i] for i in range(len(dir_list))}
if isinstance(e.f, FileSequence):
path = e.f.path
if isinstance(e.f, FileSequence):
file_size = e.total_size
else:
file_size = e.file_info.size
file_type = e.file_info.type.strip()
human_readable_file_size = f"{DataSize(file_size):.2a}"
if e.state.valid and (not e.discussions or e.archived_discussions):
valid = "✅"
else:
valid = "🚫"
if e.discussions:
valid += " ✉️"
current_extension = e.file_info.extension
possible_extensions = get_possible_extensions(e.file_info.type)
possible_mimes = get_possible_mimes(current_extension)
url = f"?page=display_page&file={e.path}"
if isinstance(e.f, FileSequence):
dir_path = os.path.dirname(e.f.path)
encoded_paths = [urllib.parse.quote(os.path.relpath(p, dir_path)) for p in e.f._paths]
url += f"&loader={e.f.__class__.__name__}"
url += "&paths=" + ",".join(encoded_paths)
file_dict = {
"Status": valid,
"Filename": path,
"Path": e.file_info.path,
"Extension": {"current": current_extension, "initial": current_extension, "possible": possible_extensions},
"Type": {"current": file_type, "initial": file_type, "possible": possible_mimes},
"Size": {"value": file_size, "display": human_readable_file_size},
"Open": url,
"Errors": "\n".join(e.errors),
}
file_dict.update(dir_dict)
return file_dict
[docs]
def display_files(self, files, show_curation_cols):
import pandas as pd
bar = self.progress_layout.progress(0, text="Loading files")
n_files = len(files)
dict_files = {"root": []}
current_dir = "root"
for i, (full_path, f) in enumerate(files):
percent_complete = i * 100 // n_files
bar.progress(percent_complete + 1, text=f"Listing {full_path}")
if isinstance(f, File) or isinstance(f, FileSequence):
f = FileWrapper(f)
f.state.valid = f.valid_loading
if not fnmatch.fnmatch(f.file_info.path.lower(), self.file_wildcard):
logger.info(f"Exclude {f.file_info.path.lower()}")
continue
if self.show_only_error and f.state.valid:
continue
dict_files[current_dir].append(self.file_as_dict(f))
else:
pass
for _dir, _files in dict_files.items():
if len(_files) == 0:
continue
_files = pd.DataFrame(_files)
if _files["Status"].str.contains("🚫").any():
from solidipes.utils import remove_completed_stage
remove_completed_stage(1)
else:
from solidipes.utils import add_completed_stage
add_completed_stage(1)
dir_columns = [col for col in _files.columns if col.startswith("Directory")]
grid_builder = GridOptionsBuilder.from_dataframe(_files)
grid_builder.configure_column(
"Status",
# Putting aggFunc=status_aggregator directly fails when editing the grid
aggFunc="status_aggregator",
)
grid_builder.configure_column(
"Path",
hide=True,
)
grid_builder.configure_column(
"Extension",
cellEditor="agRichSelectCellEditor",
cellEditorParams={
"formatValue": extension_cell_editor_format_value,
"values": extension_cell_editor_values,
"allowTyping": True,
"filterList": True,
},
comparator=extension_comparator,
editable=True,
valueFormatter=extension_value_formatter,
)
grid_builder.configure_column(
"Type",
cellEditor="agRichSelectCellEditor",
cellEditorParams={
"formatValue": extension_cell_editor_format_value,
"values": extension_cell_editor_values,
"allowTyping": True,
"filterList": True,
},
comparator=extension_comparator,
editable=True,
valueFormatter=extension_value_formatter,
)
grid_builder.configure_column(
"Size",
# Putting aggFunc=file_size_aggregator directly fails when editing the grid
aggFunc="file_size_aggregator",
comparator=file_size_comparator,
valueFormatter=file_size_value_formatter,
)
grid_builder.configure_column(
"Open",
cellRenderer=url_cell_renderer,
)
grid_builder.configure_column(
"Errors",
cellRenderer=error_cell_renderer,
)
for col in dir_columns:
grid_builder.configure_column(
col,
hide=True,
rowGroup=True,
)
if not show_curation_cols:
for col in ["Status", "Extension", "Type", "Open", "Errors"]:
grid_builder.configure_column(
col,
hide=True,
)
grid_builder.configure_columns("Filename", wrapText=True)
grid_builder.configure_columns("Errors", wrapText=True)
grid_builder.configure_columns("Errors", autoHeight=True)
grid_options = grid_builder.build()
grid_options["aggFuncs"] = {
"status_aggregator": status_aggregator,
"file_size_aggregator": file_size_aggregator,
}
grid_options["autoGroupColumnDef"]["headerName"] = "Directory"
if show_curation_cols:
grid_options["autoSizeStrategy"] = {"type": "fitCellContents"}
# grid_options["autoSizeStrategy"] = {"type": "fitGridWidth"}
else:
grid_options["autoSizeStrategy"] = {"type": "fitGridWidth"}
# grid_options["domLayout"] = "autoHeight" # Bugged: initial height is sometimes too small
grid_options["groupAllowUnbalanced"] = True
grid_options["groupDefaultExpanded"] = -1
# if self.show_only_error:
# grid_options["groupDefaultExpanded"] = -1
# elif not show_curation_cols:
# grid_options["groupDefaultExpanded"] = 1
# else:
# grid_options["groupDefaultExpanded"] = 2
grid_options["suppressAggFuncInHeader"] = True
grid_return = AgGrid(_files, gridOptions=grid_options, allow_unsafe_jscode=True)
new_grid_data = grid_return["data"]
self.rename_files(new_grid_data)
self.change_mime_files(new_grid_data)
self.progress_layout.empty()
[docs]
def rename_files(self, new_grid_data):
from streamlit.components.v1 import html
from solidipes.reports.web_report import clear_session_state
extensions = new_grid_data["Extension"]
changed_extensions = extensions.apply(lambda x: x["current"] != x["initial"])
files_to_rename = new_grid_data[changed_extensions]
if files_to_rename.empty:
return
self.layout.write("Renaming files...")
for _, file in files_to_rename.iterrows():
current_path = file["Path"]
new_extension = file["Extension"]["current"]
new_path = os.path.splitext(current_path)[0] + "." + new_extension
rename_file(current_path, new_path)
# Reload file list
clear_session_state()
html("""
<script type="text/javascript">
window.parent.location.reload();
</script>
""")
[docs]
def change_mime_files(self, new_grid_data):
from streamlit.components.v1 import html
from solidipes.reports.web_report import clear_session_state
new_type = new_grid_data["Type"]
changed_type = new_type.apply(lambda x: x["current"] != x["initial"])
files_to_retype = new_grid_data[changed_type]
if files_to_retype.empty:
return
self.layout.write("Renaming files...")
for _, file in files_to_retype.iterrows():
from solidipes.utils import get_mimes, set_mimes
current_path = file["Path"]
new_mime = file["Type"]["current"]
mimes = get_mimes()
mimes[current_path] = new_mime
set_mimes(mimes)
# Reload file list
clear_session_state()
html("""
<script type="text/javascript">
window.parent.location.reload();
</script>
""")