Source code for solidipes.reports.widgets.zenodo

#!/bin/env python
################################################################
import os

import streamlit as st
import yaml
from streamlit_editable_list import editable_list

from solidipes.reports.widgets.file_list import FileList
from solidipes.scanners.scanner import ExportScanner, StreamlitProgressBar, list_files
from solidipes.utils import get_study_metadata, get_study_root_path, logging, set_study_metadata
from solidipes.utils.git_infos import GitInfos
from solidipes.utils.metadata import lang
from solidipes.utils.metadata import licences_data_or_software as licenses
from solidipes.utils.utils import get_zenodo_infos
from solidipes.utils.zenodo_utils import get_existing_deposition_identifier

from .custom_widgets import EditProgBox, EditTextBox

################################################################
print = logging.invalidPrint
logger = logging.getLogger()
################################################################


[docs] class ZenodoInfos: def __init__(self, layout): self.git_infos = GitInfos() self.layout = layout.container() self.zenodo_metadata = get_study_metadata() self.key = "zenodo_infos"
[docs] def _create_stateful_property(self, property_key): streamlit_key_template = property_key + "_{self.key}" def getter(self): streamlit_key = streamlit_key_template.format(self=self) return getattr(st.session_state, streamlit_key, False) def setter(self, value): streamlit_key = streamlit_key_template.format(self=self) st.session_state[streamlit_key] = value return property(getter, setter)
edit_mode = _create_stateful_property(None, "edit_mode") must_save = _create_stateful_property(None, "must_save")
[docs] def saveZenodoEntry(self, key, value): self.zenodo_metadata[key] = value set_study_metadata(self.zenodo_metadata)
[docs] def save_description(self, value): self.zenodo_metadata["description"] = value set_study_metadata(self.zenodo_metadata)
[docs] def show_edit_button(self): st.button("Edit metadata :pencil:", on_click=lambda: setattr(self, "edit_mode", True))
[docs] def show_title(self): st.markdown(f"## <center> {self.zenodo_metadata['title']} </center>", unsafe_allow_html=True)
[docs] def edit_title(self): st.subheader("Title") title = st.text_input("", self.zenodo_metadata["title"], key=f"title_{self.key}", label_visibility="collapsed") if self.must_save: self.saveZenodoEntry("title", title)
[docs] def format_keywords(self, keywords): return "<b>Keywords:</b> " + ", ".join(keywords)
[docs] def show_keywords(self): st.markdown(self.format_keywords(self.zenodo_metadata["keywords"]), unsafe_allow_html=True)
[docs] def edit_keywords(self): keywords_data = [[k] for k in self.zenodo_metadata["keywords"]] input_params = [ { "placeholder": "Keyword", "type": "text", "value": "", }, ] st.subheader("Keywords") keywords_data = editable_list(keywords_data, input_params, auto_save=True, key=f"keywords_{self.key}") keywords = [k[0] for k in keywords_data] if self.must_save: self.saveZenodoEntry("keywords", keywords)
[docs] def format_authors(self, authors_data): orcid_img = '<img height="15" src="https://zenodo.org/static/images/orcid.svg">' authors = [] affiliations = [] for auth in authors_data: if "affiliation" in auth: aff = auth["affiliation"].split(";") for e in aff: if e.strip() not in affiliations: affiliations.append(e.strip()) for auth in authors_data: text = "" if "orcid" in auth: text += f'<a href="https://orcid.org/{auth["orcid"]}">{orcid_img}</a> ' if "name" in auth: text += f'**{auth["name"]}**' if "affiliation" in auth: text += "$^{" aff = auth["affiliation"].split(";") aff = [affiliations.index(e.strip()) + 1 for e in aff] aff = [str(e) for e in aff] text += f'{",".join(aff)}' text += "}$" authors.append(text) formatted = "**<center> " + ", ".join(authors) + " </center>**\n" for idx, aff in enumerate(affiliations): formatted += f"<center><sup>{idx+1}</sup> <i>{aff}</i></center>\n" return formatted
[docs] def show_creators(self): st.markdown(self.format_authors(self.zenodo_metadata["creators"]), unsafe_allow_html=True)
[docs] def edit_creators(self): creators_data = [ [ a.get("name", ""), a.get("affiliation", ""), a.get("orcid", ""), ] for a in self.zenodo_metadata["creators"] ] input_params = [ { "placeholder": "Name", "type": "text", "value": "", }, { "placeholder": "Affiliations, separated by ;", "type": "text", "value": "", }, { "placeholder": "ORCID", "type": "text", "value": "", }, ] st.subheader("Authors") creators_data = editable_list(creators_data, input_params, auto_save=True, key=f"creators_{self.key}") if not self.must_save: return creators = [] for creator in creators_data: creator_dict = {} creator_dict["name"] = creator[0] if creator[1] != "": creator_dict["affiliation"] = creator[1] if creator[2] != "": creator_dict["orcid"] = creator[2] creators.append(creator_dict) for e in creators: if e["name"] == "": raise RuntimeError("An author needs mandatorily a name") self.saveZenodoEntry("creators", creators)
[docs] def show_general_metadata(self): entries = [ f"**Upload type**: {self.zenodo_metadata['upload_type']}", f"**License**: {self.zenodo_metadata['license']}", f"**Language**: {self.zenodo_metadata['language']}", ] if "doi" in self.zenodo_metadata: entries.append(f"**DOI**: {self.zenodo_metadata['doi']}") st.markdown(" \n".join(entries))
[docs] def edit_general_metadata(self): st.subheader("General Metadata") upload_type = self.edit_upload_type() license = self.edit_license() language = self.edit_language() doi = self.edit_doi() if not self.must_save: return if doi != "": self.saveZenodoEntry("doi", doi) elif "doi" in self.zenodo_metadata: del self.zenodo_metadata["doi"] self.saveZenodoEntry("upload_type", upload_type) self.saveZenodoEntry("license", license) self.saveZenodoEntry("language", language)
[docs] def edit_upload_type(self): options = [ "publication", "poster", "presentation", "dataset", "image", "video", "software", "lesson", "physicalobject", "other", ] value = self.zenodo_metadata["upload_type"] return st.selectbox("Upload type", options=options, index=options.index(value))
[docs] def edit_license(self): options = [_l[0] for _l in licenses] fmt_map = dict(licenses) value = self.zenodo_metadata["license"] return st.selectbox( "License", options=options, index=options.index(value), format_func=lambda x: fmt_map[x] + f" ({x})" )
[docs] def edit_language(self): options = [_l[0] for _l in lang] fmt_map = dict(lang) value = self.zenodo_metadata["language"] return st.selectbox("Language", options=options, index=options.index(value), format_func=lambda x: fmt_map[x])
[docs] def edit_doi(self): value = "" if "doi" in self.zenodo_metadata: value = self.zenodo_metadata["doi"] return st.text_input("DOI", value=value, placeholder="put a reserved doi if you have one")
[docs] def textbox(self, key, **kwargs): EditTextBox(self.zenodo_metadata[key], caption=key.capitalize(), key=key, **kwargs)
[docs] def description_box(self, **kwargs): desc = self.zenodo_metadata["description"] with st.expander("**Description**", expanded=True): EditProgBox(desc, language="markdown", key="description", on_apply=self.save_description, **kwargs)
[docs] def show(self): with self.layout: # Must show editable form temporarily to save new metadata erasable = st.empty() with erasable: self.show_editable() if not self.edit_mode: erasable.empty() self.show_formatted() self.description_box() self.raw_editor()
[docs] def show_formatted(self): self.show_edit_button() self.show_title() self.show_creators() self.show_keywords() self.show_general_metadata() self.show_related_identifiers()
[docs] def show_editable(self): with st.form(f"form_{self.key}"): self.edit_title() self.edit_creators() self.edit_keywords() self.edit_general_metadata() self.edit_related_identifiers() self.must_save = False st.form_submit_button("Save", on_click=self.close_editable)
[docs] def close_editable(self): self.edit_mode = False self.must_save = True
[docs] def raw_editor(self): with self.layout.expander("**Additional Raw Metadata** (Zenodo YAML format)", expanded=False): st.markdown("You can edit the metadata below") st.markdown( "*Description of the Zenodo metadata can be found" " [here](https://github.com/zenodo/developers.zenodo.org" "/blob/master/source/includes/resources/deposit/" "_representation.md#deposit-metadata)*" ) st.markdown("---") zenodo_metadata = get_study_metadata() metadata = zenodo_metadata.copy() for k in [ "title", "creators", "keywords", "language", "upload_type", "license", "description", "related_identifiers", ]: if k in metadata: del metadata[k] if metadata: zenodo_content = yaml.safe_dump(metadata) else: zenodo_content = "" def save(x): metadata = yaml.safe_load(x) zenodo_metadata.update(metadata) set_study_metadata(zenodo_metadata) EditProgBox( zenodo_content, language="yaml", disable_view=True, on_apply=lambda x: save(x), key="zenodo_raw" )
################################################################
[docs] class WebProgressBar: def __init__(self, layout, filename, size): self.layout = layout self.bar = self.layout.progress(0, text="Upload Archive to **Zenodo**") self.filename = filename self.size = size self.uploaded = 0
[docs] def close(self): self.layout.empty()
[docs] def update(self, x): self.uploaded += x percent_complete = self.uploaded * 100 // self.size self.bar.progress( percent_complete, text=f"Upload Archive to **Zenodo {percent_complete}%**", )
################################################################
[docs] class ZenodoPublish: def __init__(self, layout, global_message, progress_layout): self.layout = layout self.layout = layout.container() self.global_message = global_message self.progress_layout = progress_layout
[docs] def show(self): self.layout.markdown("") self.show_submission_panel() self.show_file_list() self.show_readme()
[docs] def _print(self, val): logger.info(val) st.session_state.zenodo_publish.append(val)
[docs] def show_submission_panel(self): with self.layout.expander("Publish in Zenodo", expanded=True): token = st.text_input("Zenodo token", type="password") zenodo_metadata = get_study_metadata() existing_identifier = False data = get_zenodo_infos() if "deposition_identifier" in data: existing_identifier = data["deposition_identifier"] if "doi" in zenodo_metadata: existing_identifier = zenodo_metadata["doi"] button_title = "Reuse existing deposition" if existing_identifier: button_title += f" ({existing_identifier})" reuse_identifier = st.checkbox(button_title, value=existing_identifier is not False) new_deposition = not reuse_identifier else: new_deposition = False if new_deposition: dry_run = st.checkbox("sandbox", value=True) else: dry_run = existing_identifier and "sandbox" in existing_identifier col1, col2 = st.columns(2) title = "Submit as draft to Zenodo sandbox" if not dry_run: title = "Submit as draft to Zenodo" col2.markdown( "**Not using sandbox will submit to the main " "Zenodo website. Please push content with caution " "as it may result in a permanent entry**" ) if not reuse_identifier: existing_identifier = False def submit(): st.session_state.zenodo_publish = [] try: self.zenodo_upload(token, existing_identifier, sandbox=dry_run, new_deposition=new_deposition) except Exception as e: self.global_message.error("upload error: " + str(e)) col1.button(title, type="primary", on_click=submit) if "zenodo_publish" in st.session_state and st.session_state.zenodo_publish: url = get_existing_deposition_identifier(".") st.markdown(f"**Deposition url**: {url}") logger.info(st.session_state.zenodo_publish) st.code("\n".join(st.session_state.zenodo_publish).replace("[94m", "").replace("[0m", ""))
[docs] def zenodo_upload(self, access_token, existing_identifier, sandbox=True, new_deposition=False): import argparse import solidipes.uploaders.zenodo as upload args = argparse.Namespace() args.access_token = access_token args.sandbox = sandbox args.directory = None args._print = self._print args.existing_identifier = existing_identifier args.new_deposition = new_deposition args.tmp_dir = "/tmp" if os.name != "nt" else "C:\\temp" args.no_cleanup = True upload.main(args, progressbar=lambda filename, size: WebProgressBar(self.progress_layout, filename, size))
[docs] def show_file_list(self): self.layout.markdown("# Archive content") scanner = ExportScanner() progress_layout = self.layout.empty() files_layout = self.layout.container() scanner.progress_bar = StreamlitProgressBar("Loading files", progress_layout) found = scanner.get_loader_tree() files = list_files(found) with files_layout: FileList( all_found_files=files, progress_layout=progress_layout, show_curation_cols=False, )
[docs] def show_readme(self): with self.layout.expander("Preview of `README.md`", expanded=True): readme_path = os.path.join(get_study_root_path(), "README.md") if not os.path.isfile(readme_path): st.markdown("No README.md file found!") return readme = open(readme_path, "r").read() st.markdown(readme)