Source code for rxnDB.app

#######################################################
## .0. Load Libraries                            !!! ##
#######################################################
import pandas as pd
import plotly.graph_objects as go
from faicons import icon_svg
from shiny import App, Inputs, Outputs, Session, reactive, render, ui
from shinywidgets import render_plotly

from rxnDB.data.loader import RxnDBLoader
from rxnDB.data.processor import RxnDBProcessor
from rxnDB.ui import configure_ui
from rxnDB.utils import app_dir
from rxnDB.visualize import RxnDBPlotter

#######################################################
## .1. Init Data                                 !!! ##
#######################################################
try:
    in_data = app_dir / "data" / "cache" / "rxnDB.parquet"
    rxnDB_df = RxnDBLoader.load_parquet(in_data)
    processor = RxnDBProcessor(rxnDB_df)
except FileNotFoundError:
    raise FileNotFoundError(f"Error: Data file not found at {in_data.name}!")
except Exception as e:
    raise RuntimeError(f"Error loading or processing data: {e}!")

#######################################################
## .2. Init UI                                   !!! ##
#######################################################
try:
    app_ui: ui.Tag = configure_ui()
except Exception as e:
    raise RuntimeError(f"Error loading shinyapp UI: {e}!")


#######################################################
## .4. Server Logic                              !!! ##
#######################################################
[docs]def server(input: Inputs, output: Outputs, session: Session) -> None: """Server logic.""" # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reactive state values # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ _rv_toggle_data_type = reactive.value("all") _rv_toggle_similar_reactions = reactive.value("off") _rv_selected_chemical_system = reactive.value(["Al", "O", "Si"]) _rv_selected_phase_abbrevs = reactive.value(set()) _rv_selected_table_columns = reactive.value( ["unique_id", "reaction", "type", "reference"] ) _rv_selected_table_rows = reactive.value([]) _rv_selected_temperature_units = reactive.value("celcius") _rv_selected_pressure_units = reactive.value("gigapascal") _rv_group_display_modes = reactive.value({}) _rv_ui_initialized = reactive.value(False) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Initialization # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _re_initialize_once() -> None: """Initialize app state once at startup.""" if not _rv_ui_initialized(): _re_initialize_defaults() _rv_ui_initialized.set(True) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _re_initialize_defaults() -> None: """Initialize default values for groups if not already set.""" checkbox_groups = processor.get_all_group_names() current_display_modes = _rv_group_display_modes().copy() changed = False for group in checkbox_groups: if group not in current_display_modes: current_display_modes[group] = "name" changed = True if changed: _rv_group_display_modes.set(current_display_modes) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Helper functions for phase selection management # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _convert_phase_abbrevs_to_selected_boxes( phases: set[str], display_mode: str, boxes: list[str] ) -> set[str]: """Convert phase abbreviations to the current display boxes.""" if not phases: return set() selections = set() if display_mode == "abbreviation": selections = phases.intersection(boxes) elif display_mode == "name": for abbrev in phases: name = set(processor.get_phase_name_from_abbrev(abbrev)) selections.update(name.intersection(boxes)) elif display_mode == "formula": for abbrev in phases: formula = set(processor.get_phase_formula_from_abbrev(abbrev)) selections.update(formula.intersection(boxes)) return selections # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _convert_selected_boxes_to_phase_abbrevs( selections: list[str], display_mode: str ) -> set[str]: """Convert selected boxes back to phase abbreviations.""" if not selections: return set() abbrevs = set() for box in selections: if display_mode == "abbreviation": abbrevs.add(box) elif display_mode == "name": abbrev = processor.get_phase_abbrev_from_name(box) if abbrev: abbrevs.update(abbrev) elif display_mode == "formula": abbrev = processor.get_phase_abbrev_from_formula(box) if abbrev: abbrevs.update(abbrev) return abbrevs # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reactive UI components # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @render.ui def sidebar_chemical_system_ui() -> ui.Tag: """Render sidebar chemical sytem UI.""" if not _rv_ui_initialized(): return ui.div("Loading ...") components = processor.get_all_chemical_components() with reactive.isolate(): selected_chemical_system = _rv_selected_chemical_system() return ui.input_selectize( "selected_chemical_system", "Filter by chemical system", choices={element: element for element in sorted(components)}, selected=list(selected_chemical_system), multiple=True, ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @render.ui def sidebar_checkbox_ui() -> ui.Tag: """Render sidebar checkbox UI.""" if not _rv_ui_initialized(): return ui.div("Loading ...") checkbox_groups = processor.get_all_group_names() if not checkbox_groups: return ui.div("No phase groups available ...") with reactive.isolate(): current_display_modes = _rv_group_display_modes() current_phases = _rv_selected_phase_abbrevs() current_chemical_system = _rv_selected_chemical_system() ui_elements = [] for group in checkbox_groups: # Stable UI IDs group_id = processor.get_group_id(group) id_radio = f"mode_{group_id}" id_boxes = f"boxes_{group_id}" display_mode = current_display_modes.get(group, "name") boxes = processor.get_grouped_phases( group, current_chemical_system, display_mode ) selections = _convert_phase_abbrevs_to_selected_boxes( current_phases, display_mode, boxes ) display_mode_ui = ui.input_radio_buttons( id_radio, "Display Mode", choices=["abbreviation", "name", "formula"], selected=display_mode, inline=False, ) popover_icon = ui.span( icon_svg("gear"), class_="sidebar-popover-icon", ) popover_ui = ui.popover( popover_icon, ui.div( display_mode_ui, class_="sidebar-popover-radio-btns", ), title=f"{group} Settings", placement="top", ) checkbox_group_ui = ui.input_checkbox_group( id_boxes, None, choices=sorted(boxes), selected=list(selections), ) popover_container = ui.div(popover_ui, class_="sidebar-popover-container") panel_container = ui.div( checkbox_group_ui, popover_container, class_="sidebar-panel-container" ) ui_elements.append( ui.accordion_panel( group, panel_container, value=group, ) ) return ui.accordion(*ui_elements, id="accordion") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @render.ui def table_column_selector_ui(): """Generate table column selector UI (only re-renders on initialization).""" if not _rv_ui_initialized(): return columns = processor.data.columns return ( ui.input_selectize( "select_table_columns", "Select table columns", {col: col for col in columns}, multiple=True, ), ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # UI event handlers # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _handle_sidebar_ui_changes(): """Handle sidebar UI changes.""" if not _rv_ui_initialized(): return checkbox_groups = processor.get_all_group_names() current_display_modes = _rv_group_display_modes().copy() current_chemical_system = _rv_selected_chemical_system().copy() with reactive.isolate(): current_selected_phase_abbrevs = _rv_selected_phase_abbrevs() new_chemical_system = list(input["selected_chemical_system"]()) chemical_system_state_change = False if new_chemical_system is not None: if current_chemical_system != new_chemical_system: current_chemical_system = new_chemical_system chemical_system_state_change = True display_mode_state_change = False for group in checkbox_groups: group_id = processor.get_group_id(group) id_radio = f"mode_{group_id}" new_display_mode = input[id_radio]() if new_display_mode is not None: if current_display_modes.get(group) != new_display_mode: current_display_modes[group] = new_display_mode display_mode_state_change = True new_boxes = processor.get_grouped_phases( group, new_chemical_system, new_display_mode ) new_selections = _convert_phase_abbrevs_to_selected_boxes( current_selected_phase_abbrevs, new_display_mode, new_boxes ) ui.update_checkbox_group( id=f"boxes_{group_id}", choices=sorted(new_boxes), selected=list(new_selections), ) if display_mode_state_change: _rv_group_display_modes.set(current_display_modes) if chemical_system_state_change: _rv_selected_chemical_system.set(current_chemical_system) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _handle_phase_selections(): """ Handle phase selection changes from checkbox groups and update the central _rv_selected_phase_abbrevs state. """ if not _rv_ui_initialized(): return # Get phase groups checkbox_groups = processor.get_all_group_names() # Get reactive state values with reactive.isolate(): current_display_modes = _rv_group_display_modes() # Check for state change and update central list of selected phases newly_selected_phase_abbrevs = set() for group in checkbox_groups: group_id = processor.get_group_id(group) id_boxes = f"boxes_{group_id}" selected_boxes = input[id_boxes]() if selected_boxes is not None: display_mode = current_display_modes.get(group, "name") phase_abbrevs = _convert_selected_boxes_to_phase_abbrevs( list(selected_boxes), display_mode ) newly_selected_phase_abbrevs.update(phase_abbrevs) # Update the central reactive value if the set of selected abbreviations has changed if newly_selected_phase_abbrevs != _rv_selected_phase_abbrevs(): _rv_selected_phase_abbrevs.set(newly_selected_phase_abbrevs) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Global toggle event listeners # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect @reactive.event(input.toggle_similar_reactions) def _re_toggle_similar_reactions() -> None: """Toggles similar reactions mode.""" if not _rv_ui_initialized(): return if _rv_toggle_similar_reactions() == "off": _rv_toggle_similar_reactions.set("and") elif _rv_toggle_similar_reactions() == "and": _rv_toggle_similar_reactions.set("or") else: _rv_toggle_similar_reactions.set("off") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect @reactive.event(input.toggle_data_type) def _re_toggle_data_type() -> None: """Toggles data type mode.""" if not _rv_ui_initialized(): return if _rv_toggle_data_type() == "all": _rv_toggle_data_type.set("phase_boundary") elif _rv_toggle_data_type() == "phase_boundary": _rv_toggle_data_type.set("calibration") elif _rv_toggle_data_type() == "calibration": _rv_toggle_data_type.set("melting_curve") else: _rv_toggle_data_type.set("all") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect @reactive.event(input.clear_table_row_selection) def _re_clear_table_row_selections() -> None: """Clears table selections.""" if not _rv_ui_initialized(): return _rv_selected_table_rows.set([]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _re_update_pressure_units(): """Updates selected pressure units and stores the previous value.""" if not _rv_ui_initialized(): return selected_pressure_units = input.select_pressure_units() if ( selected_pressure_units and selected_pressure_units != _rv_selected_pressure_units() ): _rv_selected_pressure_units.set(selected_pressure_units) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _re_update_temperature_units(): """Updates selected temperature units and stores the previous value.""" if not _rv_ui_initialized(): return selected_temperature_units = input.select_temperature_units() if ( selected_temperature_units and selected_temperature_units != _rv_selected_temperature_units() ): _rv_selected_temperature_units.set(selected_temperature_units) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _update_chemical_system(): """Handle chemical system changes.""" if not _rv_ui_initialized(): return selected_chemical_system = list(input.selected_chemical_system()) if selected_chemical_system: _rv_selected_chemical_system.set(selected_chemical_system) else: _rv_selected_chemical_system.set([]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Table selection event listeners # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _re_update_table_column_selections() -> None: """Updates selected table columns.""" if not _rv_ui_initialized(): return selected_columns = input.select_table_columns() if selected_columns: _rv_selected_table_columns.set(list(selected_columns)) else: _rv_selected_table_columns.set( ["unique_id", "reaction", "type", "reference"] ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect @reactive.event(input.table_selected_rows) def _re_update_table_row_selections() -> None: """Updates selected table rows.""" if not _rv_ui_initialized(): return indices = input.table_selected_rows() if indices: current_table_df = rc_get_table_data() if not current_table_df.empty: valid_indices = [i for i in indices if i < len(current_table_df)] if valid_indices: ids = current_table_df.iloc[valid_indices]["unique_id"].tolist() _rv_selected_table_rows.set(ids) else: _rv_selected_table_rows.set([]) else: _rv_selected_table_rows.set([]) else: _rv_selected_table_rows.set([]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reactive calculations for data filtering # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.calc def rc_get_table_data() -> pd.DataFrame: """Get data for table widget.""" df = rc_get_filtered_data() selected_columns = _rv_selected_table_columns() array_columns = [ "T", "P", "T_uncertainty", "P_uncertainty", "lnK", "lnk_uncertainty", ] if not df.empty: if any(col in selected_columns for col in array_columns): return df[selected_columns].round(2).reset_index(drop=True) else: return ( df[selected_columns] .drop_duplicates(subset="unique_id") .reset_index(drop=True) ) else: return pd.DataFrame(columns=selected_columns) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.calc def rc_get_plotly_data() -> pd.DataFrame: """Get data for Plotly widget.""" df = rc_get_filtered_data() selected_table_rows = _rv_selected_table_rows() find_similar_mode = _rv_toggle_similar_reactions() data_type = _rv_toggle_data_type() if selected_table_rows: if find_similar_mode != "off": reactants = processor.get_reactant_abbrevs_from_ids(selected_table_rows) products = processor.get_product_abbrevs_from_ids(selected_table_rows) if reactants or products: df = processor.filter_by_reactants_and_product_abbrevs( list(reactants), list(products), method=str(find_similar_mode), ) df = convert_units(df) else: df = pd.DataFrame(columns=df.columns) else: df = df[df["unique_id"].isin(selected_table_rows)] if data_type == "all": return df elif data_type == "phase_boundary": return ( df[df["type"] == "phase_boundary"] if "type" in df.columns else pd.DataFrame(columns=df.columns) ) elif data_type == "calibration": return ( df[df["type"] == "calibration"] if "type" in df.columns else pd.DataFrame(columns=df.columns) ) elif data_type == "melting_curve": return ( df[df["type"] == "melting_curve"] if "type" in df.columns else pd.DataFrame(columns=df.columns) ) else: return pd.DataFrame(columns=df.columns) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def convert_units(df: pd.DataFrame) -> pd.DataFrame: """""" current_temperature_units = _rv_selected_temperature_units() current_pressure_units = _rv_selected_pressure_units() if current_temperature_units == "celcius": df = pd.DataFrame(df.apply(processor.convert_T_to_celcius, axis=1)) elif current_temperature_units == "kelvin": df = pd.DataFrame(df.apply(processor.convert_T_to_kelvin, axis=1)) if current_pressure_units == "gigapascal": df = pd.DataFrame(df.apply(processor.convert_P_to_gigapascal, axis=1)) elif current_pressure_units == "kilobar": df = pd.DataFrame(df.apply(processor.convert_P_to_kbar, axis=1)) return df # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.calc def rc_get_filtered_data() -> pd.DataFrame: """Initial filtering based on selected phases and plot type.""" phases = _rv_selected_phase_abbrevs() if phases: df = processor.filter_by_reactants_and_product_abbrevs( list(phases), list(phases) ) else: df = pd.DataFrame(columns=processor.data.columns) _ = input.clear_table_row_selection() return convert_units(df) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Render and update widgets # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @render.data_frame def table() -> render.DataTable: """Render table.""" _ = input.clear_table_row_selection() df = rc_get_table_data() return render.DataTable(df, height="98%", selection_mode="rows") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @output @render_plotly def plotly() -> go.FigureWidget: """Render plotly.""" if not _rv_ui_initialized(): return go.FigureWidget() with reactive.isolate(): df = rc_get_plotly_data() current_temperature_units = _rv_selected_temperature_units() current_pressure_units = _rv_selected_pressure_units() current_dark_mode = input.dark_mode() == "dark" df = processor.add_color_keys(df) uids = [] if not df.empty and "unique_id" in df.columns: uids = df["unique_id"].unique().tolist() plotter = RxnDBPlotter(df, uids, current_dark_mode) fig = plotter.plot(current_temperature_units, current_pressure_units) # Create and return the widget widget = go.FigureWidget(fig) return widget # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @reactive.effect def _update_plotly_data() -> None: """Update plotly widget data without re-rendering.""" if not _rv_ui_initialized(): return widget = plotly.widget if widget is None: return df = rc_get_plotly_data() current_temperature_units = _rv_selected_temperature_units() current_pressure_units = _rv_selected_pressure_units() current_dark_mode = input.dark_mode() == "dark" df = processor.add_color_keys(df.copy()) uids = [] if not df.empty and "unique_id" in df.columns: uids = df["unique_id"].unique().tolist() plotter = RxnDBPlotter(df, uids, current_dark_mode) fig = plotter.plot(current_temperature_units, current_pressure_units) # Update widget in place using batch_update to prevent flickering with widget.batch_update(): widget.data = () widget.add_traces(fig.data) widget.layout.update(fig.layout) # type: ignore # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @render.text def plot_settings() -> str: """Show plot settings info""" data_type = f"Type: {_rv_toggle_data_type()}\n" similar_reactions = f"Similar rxns: {_rv_toggle_similar_reactions()}\n" selected_rows = _rv_selected_table_rows() if selected_rows: selections_str = "\n".join(selected_rows) else: selections_str = "None" table_selections = f"Table selections:\n{selections_str}" info = data_type + similar_reactions + table_selections return info
####################################################### ## .5. Shiny App !!! ## ####################################################### app: App = App(app_ui, server)