Source code for rxnDB.data.preprocessor

#######################################################
## .0. Load Libraries                            !!! ##
#######################################################
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, cast

from ruamel.yaml import YAML

from rxnDB.utils import app_dir


#######################################################
## .1. HP11Preprocessor                        !!! ##
#######################################################
[docs]@dataclass class HP11Preprocessor: in_data: Path out_dir: Path # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def __post_init__(self) -> None: """""" if not self.in_data.exists(): raise FileNotFoundError(f"Could not find {self.in_data}!") self.yaml = YAML() self.yaml.indent(mapping=2, sequence=4, offset=2) self.yaml.default_flow_style = False self.yaml.allow_unicode = True self.yaml.explicit_start = True self.out_dir.mkdir(parents=True, exist_ok=True)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def preprocess(self) -> None: """""" raw_text = self.in_data.read_text() data_entries = self._split_into_entries(raw_text) for i, entry in enumerate(data_entries): print(f"Processing HP11 entry {i + 1} ...", end="\r", flush=True) rxn_data = self._process_entry(entry) in_data = self.out_dir / f"hp11-{i + 1:03}.yml" with open(in_data, "w") as file: self.yaml.dump(rxn_data, file)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def _split_into_entries(text: str) -> list[str]: """""" entries = re.split(r"(?=\n\s*\d+\))", text) return [e.strip() for e in entries if e.strip()]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _process_entry(self, entry: str) -> dict[str, Any]: """""" lines = entry.splitlines() header = lines[0].strip() data_lines = lines[2:] index, reaction, citation = self._split_reaction_and_citation(header) reactants, products = self._split_reaction(reaction) rxn_data = self._parse_data_lines(data_lines) rounded_data = cast(dict[str, Any], self._round_data(rxn_data)) data_type = ( "phase_boundary" if all(x == 0.0 for x in rounded_data["ln_K"]["mid"]) else "calibration" ) def to_point_block( mid: list[float | None], half_range: list[float | None] ) -> dict: return { "value": [v if v is not None else None for v in mid], "uncertainty": [u if u is not None else None for u in half_range], } out_dict = { "reactants": {p: None for p in reactants}, "products": {p: None for p in products}, "data": { "type": data_type, "units": {"T": "C", "P": "kbar"}, "points": { "T": to_point_block( rounded_data["T"]["mid"], rounded_data["T"]["half_range"] ), "P": to_point_block( rounded_data["P"]["mid"], rounded_data["P"]["half_range"] ), "lnK": to_point_block( rounded_data["ln_K"]["mid"], rounded_data["ln_K"]["half_range"] ), }, }, "metadata": { "unique_id": f"hp11-{int(index):03}", "reference": citation, "method": None, "comments": None, }, } return out_dict
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def _round_data( data: dict[str, dict[str, list[float]]], decimals: int = 3 ) -> dict[str, Any]: """""" return { k: {subk: [round(x, decimals) for x in v] for subk, v in subv.items()} for k, subv in data.items() }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def _split_reaction_and_citation( self, header: str ) -> tuple[str, str, dict[str, Any]]: """""" match = re.match(r"(\d+)\)\s+(.*)", header) if not match: raise ValueError(f"Invalid header: {header}") index, rest = match.groups() depth: int = 0 for i in range(len(rest) - 1, -1, -1): if rest[i] == ")": depth += 1 elif rest[i] == "(": depth -= 1 if depth == 0: reaction: str = rest[:i].strip().replace("=", "=>") citation: str = rest[i + 1 : -1].strip() return ( index, reaction, self._split_citations(citation), ) return index, rest.strip().replace("=", "=>"), {}
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def _split_reaction(reaction: str) -> tuple[list[str], list[str]]: """""" if "=>" not in reaction: raise ValueError(f"Invalid reaction: {reaction}") reactants, products = reaction.split("=>") def strip_digits(s: str) -> str: return re.sub(r"^\d+", "", s.strip()) return [strip_digits(r) for r in reactants.split("+")], [ strip_digits(p) for p in products.split("+") ]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def _split_citations(citation_text: str) -> dict[str, Any]: parts: list[str] = re.split(r";\s*", citation_text) all_authors, years = [], [] for part in parts: match = re.match(r"(.+?)(?:,|\s)(\d{4})$", part.strip()) if match: raw_authors = ( match.group(1).replace("et al.,", "et al.").strip().rstrip(",") ) if "et al." in raw_authors: split_authors = [raw_authors] else: split_authors = re.split(r"\s*(?:&| and )\s*", raw_authors) all_authors.extend([a.strip() for a in split_authors]) years.append(int(match.group(2))) else: all_authors.append(part.strip()) years.append(None) return { "short_cite": citation_text, "authors": all_authors, "year": years if len(years) > 1 else years[0], }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] @staticmethod def _parse_data_lines(data_lines: list[str]) -> dict[str, Any]: """""" def to_float(s: str) -> float | None: """""" s = s.strip() return float(s) if s and s != "-" else None def mid_half( a: float | None, b: float | None ) -> tuple[float | None, float | None]: """""" if a is None and b is None: return None, None if a is None: return b, None if b is None: return a, None return (a + b) / 2, abs(b - a) / 2 parsed: list[list[float | None]] = [] for line in data_lines: tokens: list[str] = line.split() if not tokens or to_float(tokens[0]) is None: continue parsed.append([to_float(tok) for tok in tokens[:7]]) if not parsed: return {"ln_K": [], "x_CO2": [], "P": [], "T": []} lnK_mid, lnK_range = [], [] xCO2_mid, xCO2_range = [], [] P_mid, P_range = [], [] T_mid, T_range = [], [] for row in parsed: m, r = mid_half(row[0], row[1]) lnK_mid.append(m) lnK_range.append(r) m, r = mid_half(row[2], row[2]) xCO2_mid.append(m) xCO2_range.append(r) m, r = mid_half(row[3], row[4]) P_mid.append(m) P_range.append(r) m, r = mid_half(row[5], row[6]) T_mid.append(m) T_range.append(r) return { "ln_K": {"mid": lnK_mid, "half_range": lnK_range}, "x_CO2": {"mid": xCO2_mid, "half_range": xCO2_range}, "P": {"mid": P_mid, "half_range": P_range}, "T": {"mid": T_mid, "half_range": T_range}, }
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main():
    """Preprocess the bundled HP11 raw text file into per-reaction YAML files.

    Reads ``hp11-raw.txt`` from the ``data/sets`` folder under ``app_dir`` and
    writes the results to the sibling ``hp11`` directory.
    """
    sets_dir = app_dir / "data" / "sets"
    preprocessor = HP11Preprocessor(sets_dir / "hp11-raw.txt", sets_dir / "hp11")
    preprocessor.preprocess()
    print("\nDatasets preprocessed!")


if __name__ == "__main__":
    main()