Source code for rxnDB.data.processor

#######################################################
## .0. Load Libraries                            !!! ##
#######################################################
import re
from dataclasses import dataclass, field

import pandas as pd
import plotly.express as px

from rxnDB.data.mapping import MINERAL_ABBREV_MAP as MAP


#######################################################
## .1. RxnDBProcessor                            !!! ##
#######################################################
[docs]@dataclass class RxnDBProcessor: df: pd.DataFrame allow_empty: bool = False color_palette: str = "Alphabet" _original_df: pd.DataFrame = field(init=False, repr=False) _uid_to_reactant_abbrevs_lookup: dict[str, set[str]] = field(init=False, repr=False) _uid_to_product_abbrevs_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_name_to_abbrev_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_group_to_abbrev_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_formula_to_abbrev_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_abbrev_to_name_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_abbrev_to_group_lookup: dict[str, set[str]] = field(init=False, repr=False) _phase_abbrev_to_formula_lookup: dict[str, set[str]] = field(init=False, repr=False) _grouped_phases_by_mode: dict[str, dict[str, set[str]]] = field( init=False, repr=False ) _reaction_groups: dict[str, int] = field( init=False, repr=False, default_factory=dict ) _color_map: dict[str, str] = field(init=False, repr=False, default_factory=dict) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def __post_init__(self) -> None: """Initialize the processor and validate the DataFrame.""" if not isinstance(self.df, pd.DataFrame): raise TypeError("Input 'df' must be a pandas DataFrame.") if not self.allow_empty and self.df.empty: raise ValueError("RxnDB dataframe cannot be empty unless allow_empty=True") required_cols = [ "unique_id", "reaction", "reaction_names", "reactants", "reactant_names", "reactant_groups", "reactant_formulas", "products", "product_names", "product_groups", "product_formulas", "type", "units_P", "units_T", "T", "T_uncertainty", "P", "P_uncertainty", "plot_type", "reference", ] missing = [col for col in required_cols if col not in self.df.columns] if missing: raise ValueError(f"Missing required columns: {missing}") self._original_df = self.df.copy() self._precompute_phase_info() self._build_color_map()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _precompute_phase_info(self) -> None: """Pre-compute phase information for faster filtering.""" self._phase_name_to_abbrev_lookup = {} self._phase_group_to_abbrev_lookup = {} self._phase_formula_to_abbrev_lookup = {} self._phase_abbrev_to_name_lookup = {} self._phase_abbrev_to_group_lookup = {} self._phase_abbrev_to_formula_lookup = {} self._grouped_phases = {"abbreviation": {}, "name": {}, "formula": {}} group_order = [ "Aluminosilicates", "Silica minerals", "High-P phases", "Garnets & olivines", "Other orthosilicates", "Pyroxenes & pyroxenoids", "Amphibole", "Other chain silicates", "Feldspars & feldspathoid", "Other framework silicates", "Chlorites", "Micas", "Other sheet silicates", "Cyclosilicates", "Sorosilicates", "Carbonates", "Oxides", "Hydroxides", "Halides & sulphides", "Elements", "Gas species", "Melt species", ] group_rank = {name: i for i, name in enumerate(group_order)} self._group_rank = group_rank for abbrev, info in MAP.items(): name = f"{abbrev} ({info['name']})" group = info["group"] formula = f"{abbrev} ({info['formula']})" if name not in self._phase_name_to_abbrev_lookup: self._phase_name_to_abbrev_lookup[name] = set() self._phase_name_to_abbrev_lookup[name].add(abbrev) if group not in self._phase_group_to_abbrev_lookup: self._phase_group_to_abbrev_lookup[group] = set() self._phase_group_to_abbrev_lookup[group].add(abbrev) if formula not in self._phase_formula_to_abbrev_lookup: self._phase_formula_to_abbrev_lookup[formula] = set() self._phase_formula_to_abbrev_lookup[formula].add(abbrev) if abbrev not in self._phase_abbrev_to_name_lookup: self._phase_abbrev_to_name_lookup[abbrev] = set() self._phase_abbrev_to_name_lookup[abbrev].add(name) if abbrev not in self._phase_abbrev_to_group_lookup: self._phase_abbrev_to_group_lookup[abbrev] = set() self._phase_abbrev_to_group_lookup[abbrev].add(group) if abbrev not in self._phase_abbrev_to_formula_lookup: self._phase_abbrev_to_formula_lookup[abbrev] = set() 
self._phase_abbrev_to_formula_lookup[abbrev].add(formula) for mode, label in { "abbreviation": abbrev, "name": name, "formula": formula, }.items(): if group not in self._grouped_phases[mode]: self._grouped_phases[mode][group] = set() self._grouped_phases[mode][group].add(label) self._uid_to_reactant_abbrevs_lookup = {} self._uid_to_product_abbrevs_lookup = {} for _, row in self._original_df.iterrows(): uid = row["unique_id"] for reactant in row["reactants"]: if pd.notna(reactant) and isinstance(reactant, str): if reactant not in self._uid_to_reactant_abbrevs_lookup: self._uid_to_reactant_abbrevs_lookup[reactant] = set() self._uid_to_reactant_abbrevs_lookup[reactant].add(uid) for product in row["products"]: if pd.notna(product) and isinstance(product, str): if product not in self._uid_to_product_abbrevs_lookup: self._uid_to_product_abbrevs_lookup[product] = set() self._uid_to_product_abbrevs_lookup[product].add(uid)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_reactants_and_product_abbrevs( self, reactant_abbrevs: list[str], product_abbrevs: list[str], method: str = "and", ) -> pd.DataFrame: """ Filter by reactant_abbrevs and/or product_abbrevs. - If both reactants and products are provided, returns reactions matching criteria (intersection or union). - If only reactants are provided, returns reactions matching ANY of the reactants (union). - If only products are provided, returns reactions matching ANY of the products (union). - If neither is provided, returns the original dataframe. """ if not reactant_abbrevs and not product_abbrevs: return pd.DataFrame(columns=self._original_df.columns) if reactant_abbrevs and not product_abbrevs: return self.filter_by_reactant_abbrevs(reactant_abbrevs) if not reactant_abbrevs and product_abbrevs: return self.filter_by_product_abbrevs(product_abbrevs) if reactant_abbrevs and product_abbrevs: f_reactant_ids = self.get_unique_ids_from_phase_abbrevs( reactant_abbrevs, self._uid_to_reactant_abbrevs_lookup ) f_product_ids = self.get_unique_ids_from_phase_abbrevs( product_abbrevs, self._uid_to_product_abbrevs_lookup ) r_reactant_ids = self.get_unique_ids_from_phase_abbrevs( reactant_abbrevs, self._uid_to_product_abbrevs_lookup ) r_product_ids = self.get_unique_ids_from_phase_abbrevs( product_abbrevs, self._uid_to_reactant_abbrevs_lookup ) if not f_reactant_ids: return pd.DataFrame(columns=self._original_df.columns) if not f_product_ids: return pd.DataFrame(columns=self._original_df.columns) if method == "and": matching_ids = f_reactant_ids.intersection(f_product_ids).union( r_reactant_ids.intersection(r_product_ids) ) else: matching_ids = ( f_reactant_ids.union(f_product_ids) .union(r_reactant_ids) .union(r_product_ids) ) if not matching_ids: return pd.DataFrame(columns=self._original_df.columns) return self._original_df[self._original_df["unique_id"].isin(matching_ids)] return pd.DataFrame(columns=self._original_df.columns)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_reactant_abbrevs(self, phase_abbrevs: list[str]) -> pd.DataFrame: """Filter dataframe by reactant phase_abbrevs (union logic).""" if not phase_abbrevs: return self._original_df matching_ids = self.get_unique_ids_from_phase_abbrevs( phase_abbrevs, self._uid_to_reactant_abbrevs_lookup ) if not matching_ids: return pd.DataFrame(columns=self._original_df.columns) return self._original_df[self._original_df["unique_id"].isin(matching_ids)]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_product_abbrevs(self, phase_abbrevs: list[str]) -> pd.DataFrame: """Filter dataframe by product phase_abbrevs (union logic).""" if not phase_abbrevs: return self._original_df matching_ids = self.get_unique_ids_from_phase_abbrevs( phase_abbrevs, self._uid_to_product_abbrevs_lookup ) if not matching_ids: return pd.DataFrame(columns=self._original_df.columns) return self._original_df[self._original_df["unique_id"].isin(matching_ids)]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_ids(self, unique_ids: list[str]) -> pd.DataFrame: """Filter dataframe by unique IDs.""" if not unique_ids: return self._original_df return ( self._original_df[self._original_df["unique_id"].isin(unique_ids)] if "unique_id" in self._original_df.columns else pd.DataFrame(columns=self._original_df.columns) )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_type(self, types: list[str]) -> pd.DataFrame: """Filter by specific types of data.""" if not types: return self._original_df return ( self._original_df[self._original_df["type"].isin(types)] if "type" in self._original_df.columns else pd.DataFrame(columns=self._original_df.columns) )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def filter_by_plot_type(self, plot_type: str) -> pd.DataFrame: """Filter by specific plot type.""" if not plot_type: return self._original_df return ( self._original_df[self._original_df["plot_type"] == plot_type] if "plot_type" in self._original_df.columns else pd.DataFrame(columns=self._original_df.columns) )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_unique_ids_from_phase_abbrevs( self, phase_abbrevs: list[str], lookup: dict[str, set[str]] ) -> set[str]: """Get all unique IDs matching any phase in the list.""" if not phase_abbrevs: return set() matching_ids = set() for phase in phase_abbrevs: matching_ids.update(lookup.get(phase, set())) return matching_ids
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_reactant_abbrevs_from_ids(self, unique_ids: list[str]) -> set[str]: """Get unique reactants associated with a list of reaction IDs.""" if not unique_ids: return set() reactant_abbrevs = { reactant for reactant, uids in self._uid_to_reactant_abbrevs_lookup.items() if uids.intersection(unique_ids) } return reactant_abbrevs
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_product_abbrevs_from_ids(self, unique_ids: list[str]) -> set[str]: """Get unique products associated with a list of reaction IDs.""" if not unique_ids: return set() product_abbrevs = { product for product, uids in self._uid_to_product_abbrevs_lookup.items() if uids.intersection(unique_ids) } return product_abbrevs
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_abbrev_from_name(self, name: str) -> set[str]: """""" return self._phase_name_to_abbrev_lookup.get(name, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_abbrev_from_group(self, group: str) -> set[str]: """""" return self._phase_group_to_abbrev_lookup.get(group, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_abbrev_from_formula(self, formula: str) -> set[str]: """""" return self._phase_formula_to_abbrev_lookup.get(formula, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_name_from_abbrev(self, abbrev: str) -> set[str]: """Get the common name of a phase from its abbreviation.""" return self._phase_abbrev_to_name_lookup.get(abbrev, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_formula_from_abbrev(self, abbrev: str) -> set[str]: """Get the chemical formula of a phase from its abbreviation.""" return self._phase_abbrev_to_formula_lookup.get(abbrev, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_group_from_abbrev(self, abbrev: str) -> set[str]: """Get the mineral group of a phase from its abbreviation.""" return self._phase_abbrev_to_group_lookup.get(abbrev, set())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_phase_info_from_abbrev(self, abbrev: str) -> dict[str, str | set]: """Get all information about a phase from its abbreviation.""" if abbrev not in self._phase_abbrev_to_name_lookup: return {} return { "abbreviation": abbrev, "name": self._phase_abbrev_to_name_lookup.get(abbrev, set()), "formula": self._phase_abbrev_to_formula_lookup.get(abbrev, set()), "group": self._phase_abbrev_to_group_lookup.get(abbrev, set()), }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_all_phase_info(self) -> dict[str, str | set]: """Get a dictionary of all phases with their complete information.""" all_phases = {} for abbrev in self._phase_abbrev_to_name_lookup: all_phases[abbrev] = self.get_phase_info_from_abbrev(abbrev) return all_phases
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_reactant_info_from_unique_id( self, unique_id: str ) -> list[dict[str, str | set]]: """Get information about all reactants for a specific reaction ID.""" if unique_id not in self._original_df["unique_id"].values: return [] row = self._original_df[self._original_df["unique_id"] == unique_id].iloc[0] reactants = row.get("reactants", []) if not isinstance(reactants, list): return [] return [ self.get_phase_info_from_abbrev(abbrev) for abbrev in reactants if abbrev in self._phase_abbrev_to_name_lookup ]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_product_info_from_unique_id( self, unique_id: str ) -> list[dict[str, str | set]]: """Get information about all products for a specific reaction ID.""" if unique_id not in self._original_df["unique_id"].values: return [] row = self._original_df[self._original_df["unique_id"] == unique_id].iloc[0] products = row.get("products", []) if not isinstance(products, list): return [] return [ self.get_phase_info_from_abbrev(abbrev) for abbrev in products if abbrev in self._phase_abbrev_to_name_lookup.keys() ]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_reaction_info_from_unique_id( self, unique_id: str ) -> dict[str, list[dict[str, str | set]]]: """Get comprehensive information about all phases in a reaction.""" return { "reactants": self.get_reactant_info_from_unique_id(unique_id), "products": self.get_product_info_from_unique_id(unique_id), }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_all_group_names(self) -> list[str]: """Get all groups names.""" return list(self._group_rank.keys())
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_all_grouped_phases(self, display_mode: str) -> dict[str, list[str]]: """Get all checkbox group phases based on the display mode.""" grouped = self._grouped_phases.get(display_mode) if not grouped: raise ValueError(f"Invalid display_mode: {display_mode!r}") return { k: list(grouped[k]) for k in sorted( grouped, key=lambda g: self._group_rank.get(g, float("inf")) ) }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_grouped_phases( self, group: str, components: list[str], display_mode: str ) -> list[str]: """Get individual checkbox group phases based on the display mode.""" if display_mode not in ["abbreviation", "name", "formula"]: raise ValueError(f"Invalid display_mode: {display_mode!r}") grouped_formulas = self._grouped_phases.get("formula", {}) group_formulas = grouped_formulas.get(group, set()) grouped_display_mode = self._grouped_phases.get(display_mode, {}) group_display_phases = grouped_display_mode.get(group, set()) matching_phase_keys = set() for phase in group_formulas: match = re.search(r"\(([^)]+)\)", phase) if match: formula = match.group(1) elements = self._extract_elements(formula) if elements.issubset(set(components)): # if all(component in elements for component in components): phase_key = phase.split("(", 1)[0].strip() matching_phase_keys.add(phase_key) result = [ box for box in group_display_phases if box.split("(", 1)[0].strip() in matching_phase_keys ] return result
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _extract_elements(self, formula: str) -> set[str]: return set(re.findall(r"[A-Z][a-z]?", formula))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_all_chemical_components(self) -> set[str]: grouped_formulas = self._grouped_phases.get("formula", {}) elements = set() for formula_strings in grouped_formulas.values(): for string in formula_strings: match = re.search(r"\(([^)]+)\)", string) if match: formula = match.group(1) elements.update(self._extract_elements(formula)) return elements
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _build_reaction_groups(self, method: str = "or"): """ Group reactions based on shared reactant_abbrevs AND product_abbrevs. Assigns each unique ID to a group number. """ self._reaction_groups = {} group_counter = 0 processed_ids = set() for uid in self._original_df["unique_id"].unique(): if uid in processed_ids: continue row = self._original_df[self._original_df["unique_id"] == uid].iloc[0] reactant_abbrevs = row.get("reactants", []) product_abbrevs = row.get("products", []) if not isinstance(reactant_abbrevs, list) or not isinstance( product_abbrevs, list ): continue if not reactant_abbrevs or not product_abbrevs: continue f_reactant_ids = self.get_unique_ids_from_phase_abbrevs( reactant_abbrevs, self._uid_to_reactant_abbrevs_lookup ) f_product_ids = self.get_unique_ids_from_phase_abbrevs( product_abbrevs, self._uid_to_product_abbrevs_lookup ) r_reactant_ids = self.get_unique_ids_from_phase_abbrevs( reactant_abbrevs, self._uid_to_product_abbrevs_lookup ) r_product_ids = self.get_unique_ids_from_phase_abbrevs( product_abbrevs, self._uid_to_reactant_abbrevs_lookup ) if method == "and": matching_ids = f_reactant_ids.intersection(f_product_ids).union( r_reactant_ids.intersection(r_product_ids) ) else: matching_ids = f_reactant_ids.union(f_product_ids).union( r_reactant_ids.union(r_product_ids) ) if matching_ids: for match_id in matching_ids: self._reaction_groups[match_id] = group_counter processed_ids.add(match_id) self._reaction_groups[uid] = group_counter processed_ids.add(uid) group_counter += 1 for uid in self._original_df["unique_id"].unique(): if uid not in processed_ids: self._reaction_groups[uid] = group_counter group_counter += 1
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _build_color_map(self): """ Build a color map for reaction groups. Assigns a color to each unique group number. """ if not self._reaction_groups: self._build_reaction_groups() unique_groups = set(self._reaction_groups.values()) palette = self._get_color_palette() self._color_map = { str(group): palette[i % len(palette)] for i, group in enumerate(unique_groups) }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _get_color_palette(self) -> list[str]: """Get a color palette based on the specified name.""" if self.color_palette in dir(px.colors.qualitative): return getattr(px.colors.qualitative, self.color_palette) elif self.color_palette.lower() in px.colors.named_colorscales(): return [color[1] for color in px.colors.get_colorscale(self.color_palette)] else: print( f"'{self.color_palette}' is not a valid palette, using default 'Set1'." ) return px.colors.qualitative.Set1
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _get_color_from_reaction(self, unique_id: str) -> str: """Get the color for a specific unique ID.""" if unique_id not in self._reaction_groups: return "#000000" group = self._reaction_groups[unique_id] return self._color_map.get(str(group), "#000000")
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def add_color_keys(self, filtered_df: pd.DataFrame) -> pd.DataFrame: """Add color information to a filtered dataframe.""" df_copy = filtered_df.copy() df_copy["rxn_group"] = df_copy["unique_id"].map( lambda x: self._reaction_groups.get(x, -1) ) df_copy["rxn_color_key"] = df_copy["unique_id"].map( lambda x: self._get_color_from_reaction(x) ) return df_copy
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def get_group_id(group_name: str) -> str: """Reformats group id for in compatible format for shiny UI IDs.""" return ( group_name.lower().replace(" ", "_").replace("&", "and").replace("-", "_") )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def convert_P_to_kbar(row): if row["units_P"] == "GPa": row["P"] *= 10 row["P_uncertainty"] *= 10 return row
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def convert_P_to_gigapascal(row): if row["units_P"] == "kbar": row["P"] *= 0.1 row["P_uncertainty"] *= 0.1 return row
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def convert_T_to_kelvin(row): if row["units_T"] == "C": row["T"] += 273 return row
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def convert_T_to_celcius(row): if row["units_T"] == "K": row["T"] -= 273 return row
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @property def phases(self) -> list[str]: """Get a list of unique phase names from reactants and products.""" all_phases = set(self._uid_to_reactant_abbrevs_lookup.keys()) | set( self._uid_to_product_abbrevs_lookup.keys() ) return list(all_phases) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @property def data(self) -> pd.DataFrame: """""" return self._original_df
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main():
    """Load the rxnDB parquet file from the app's cache directory and print its phases."""
    from rxnDB.data.loader import RxnDBLoader
    from rxnDB.utils import app_dir

    parquet_path = app_dir / "data" / "cache" / "rxnDB.parquet"
    frame: pd.DataFrame = RxnDBLoader.load_parquet(parquet_path)

    # Building the processor validates the frame and indexes its phases.
    rxn_processor: RxnDBProcessor = RxnDBProcessor(frame)
    print(rxn_processor.phases)
# Script entry point: call main() only when this module is executed directly.
if __name__ == "__main__": main()