Source code for FOX.io.read_prm

"""A class for reading and generating .prm parameter files.

Index
-----
.. currentmodule:: FOX
.. autosummary::
    PRMContainer
    PRMContainer.read
    PRMContainer.write
    PRMContainer.overlay_mapping
    PRMContainer.overlay_cp2k_settings
    PRMContainer.concatenate

API
---
.. autoclass:: PRMContainer
    :noindex:
    :members: atoms, bonds, angles, dihedrals, impropers, nonbonded, nbfix

.. automethod:: PRMContainer.read
.. automethod:: PRMContainer.write
.. automethod:: PRMContainer.overlay_mapping
.. automethod:: PRMContainer.overlay_cp2k_settings
.. automethod:: PRMContainer.concatenate

"""

from __future__ import annotations

import copy
import textwrap
from types import MappingProxyType
from typing import (Any, Iterator, Dict, Tuple, Mapping, List, Union, Iterable, Sequence,
                    Optional, ClassVar, MutableSequence, Type, NamedTuple, TYPE_CHECKING)
from itertools import chain, repeat
from contextlib import nullcontext

import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

from scm.plams import Settings
from nanoutils import AbstractFileContainer, set_docstring

from .cp2k_to_prm import CP2K_TO_PRM as _CP2K_TO_PRM, PRMMapping, PostProcess
from ..functions.cp2k_utils import parse_cp2k_value

if TYPE_CHECKING:
    from typing_extensions import Self, Literal

    _DFNames = Literal[
        "atoms",
        "bonds",
        "angles",
        "dihedrals",
        "nbfix",
        "nonbonded",
        "improper",
        "impropers",
    ]

__all__ = ['PRMContainer']

# e.g. a Pandas.Series with an Index
SeriesIdx = Mapping[str, float]

# e.g. a Pandas.Series with a MultiIndex
SeriesMultiIdx = Mapping[Tuple[str, ...], float]
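# A rough, hypothetical illustration of the two alias shapes (plain dicts shown in
# place of actual pandas.Series instances), as used by the type hints of
# e.g. :meth:`PRMContainer.overlay_mapping`:
#
#     >>> series_idx: SeriesIdx = {"Cd": -0.27, "Se": 0.45}        # keys: single atom types
#     >>> series_multi_idx: SeriesMultiIdx = {("Cd", "Se"): 0.1}   # keys: atom-type tuples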


class _PRMAttrTup(NamedTuple):
    """A :class:`~collections.namedtuple` representing :class:`PRMContainer` attributes."""

    atoms: Optional[pd.DataFrame]
    bonds: Optional[pd.DataFrame]
    angles: Optional[pd.DataFrame]
    dihedrals: Optional[pd.DataFrame]
    impropers: Optional[pd.DataFrame]
    nbfix: Optional[pd.DataFrame]
    hbond: Optional[str]
    nonbonded_header: Optional[str]
    nonbonded: Optional[pd.DataFrame]


class PRMContainer(AbstractFileContainer):
    """A class for managing .prm files.

    Examples
    --------
    .. code:: python

        >>> from FOX import PRMContainer

        >>> input_file = str(...)
        >>> output_file = str(...)

        >>> prm = PRMContainer.read(input_file)
        >>> prm.write(output_file)

    """

    # Attribute names should be in the same order as in __init__()
    __slots__ = _PRMAttrTup._fields + ('_pd_printoptions', '__weakref__')

    #: A dataframe holding atomic parameters.
    atoms: Optional[pd.DataFrame]

    #: A dataframe holding bond-related parameters.
    bonds: Optional[pd.DataFrame]

    #: A dataframe holding angle-related parameters.
    angles: Optional[pd.DataFrame]

    #: A dataframe holding proper dihedral-related parameters.
    dihedrals: Optional[pd.DataFrame]

    #: A dataframe holding improper dihedral-related parameters.
    impropers: Optional[pd.DataFrame]

    #: A dataframe holding non-bonded atomic parameters.
    nonbonded: Optional[pd.DataFrame]

    #: A string holding additional non-bonded related info.
    nonbonded_header: Optional[str]

    #: A dataframe holding non-bonded pair-wise atomic parameters.
    nbfix: Optional[pd.DataFrame]

    #: A string holding hydrogen bonding-related info.
    hbond: Optional[str]

    #: A mapping providing tools for converting CP2K settings to .prm-compatible values.
    #: See :data:`CP2K_TO_PRM<FOX.io.cp2k_to_prm.CP2K_TO_PRM>`.
    CP2K_TO_PRM: ClassVar[Mapping[str, PRMMapping]] = _CP2K_TO_PRM

    #: A tuple of supported .prm headers.
    _HEADERS: ClassVar[Tuple[str, ...]] = (
        'ATOMS', 'BONDS', 'ANGLES', 'DIHEDRALS', 'NBFIX',
        'HBOND', 'NONBONDED', 'IMPROPER', 'IMPROPERS', 'END'
    )

    #: Define the columns for each DataFrame which hold its index
    _INDEX: ClassVar[Mapping[_DFNames, List[int]]] = MappingProxyType({
        'atoms': [2],
        'bonds': [0, 1],
        'angles': [0, 1, 2],
        'dihedrals': [0, 1, 2, 3],
        'nbfix': [0, 1],
        'nonbonded': [0],
        'improper': [0, 1, 2, 3],
        'impropers': [0, 1, 2, 3]
    })

    #: Placeholder values for DataFrame columns
    _COLUMNS: ClassVar[Mapping[_DFNames, Tuple[Any, ...]]] = MappingProxyType({
        'atoms': (None, -1, None, np.nan),
        'bonds': (None, None, np.nan, np.nan),
        'angles': (None, None, None, np.nan, np.nan, np.nan, np.nan),
        'dihedrals': (None, None, None, None, np.nan, -1, np.nan),
        'nbfix': (None, None, np.nan, np.nan, np.nan, np.nan),
        'nonbonded': (None, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan),
        'improper': (None, None, None, None, np.nan, 0, np.nan),
        'impropers': (None, None, None, None, np.nan, 0, np.nan)
    })

    @property
    def improper(self) -> Optional[pd.DataFrame]:
        """Alias for :attr:`PRMContainer.impropers`."""
        return self.impropers

    @improper.setter
    def improper(self, value: Optional[pd.DataFrame]) -> None:
        self.impropers = value

    def __init__(self, atoms: Optional[pd.DataFrame] = None,
                 bonds: Optional[pd.DataFrame] = None,
                 angles: Optional[pd.DataFrame] = None,
                 dihedrals: Optional[pd.DataFrame] = None,
                 impropers: Optional[pd.DataFrame] = None,
                 nbfix: Optional[pd.DataFrame] = None,
                 hbond: Optional[str] = None,
                 nonbonded_header: Optional[str] = None,
                 nonbonded: Optional[pd.DataFrame] = None,
                 improper: Optional[pd.DataFrame] = None) -> None:
        """Initialize a :class:`PRMContainer` instance."""
        if impropers is not None and improper is not None:
            raise TypeError("'impropers' and 'improper' cannot be both specified")

        self.atoms = atoms
        self.bonds = bonds
        self.angles = angles
        self.dihedrals = dihedrals
        self.improper = improper if improper is not None else impropers
        self.nonbonded_header = nonbonded_header
        self.nonbonded = nonbonded
        self.nbfix = nbfix
        self.hbond = hbond

        # Print options for Pandas DataFrames
        self._pd_printoptions: Dict[str, Any] = {'display.max_rows': 20}

    @property
    def pd_printoptions(self) -> Iterator[Union[str, Any]]:
        """Return an iterator flattening :attr:`_pd_printoptions`."""
        return chain.from_iterable(self._pd_printoptions.items())

    def __repr__(self) -> str:
        """Implement :class:`str(self)<str>` and :func:`repr(self)<repr>`."""
        # Get all to-be printed attribute (names)
        cls = type(self)
        attr_names = cls.__slots__[:-2]

        # Determine the indentation width
        width = max(len(k) for k in attr_names)
        indent = width + 3

        # Gather string representations of all attributes
        ret = ''
        with pd.option_context(*self.pd_printoptions):
            items = ((k, getattr(self, k)) for k in attr_names)
            for k, _v in items:
                v = textwrap.indent(repr(_v), ' ' * indent)[indent:]
                ret += f'{k:{width}} = {v},\n'
            return f'{cls.__name__}(\n{textwrap.indent(ret[:-2], 4 * " ")}\n)'

    def __eq__(self, value: object) -> bool:
        """Implement :meth:`self == value<object.__eq__>`."""
        if type(self) is not type(value):
            return False

        # Get all to-be compared attribute (names)
        cls = type(self)
        attr_names = cls.__slots__[:-2]

        # Compare the attributes
        ret = True
        str_or_none = {'nonbonded_header', 'hbond'}
        iterator = ((k, getattr(self, k), getattr(value, k)) for k in attr_names)
        for k, attr1, attr2 in iterator:
            if attr1 is attr2:
                continue
            elif k in str_or_none:
                ret &= attr1 == attr2
                continue

            try:
                assert_frame_equal(attr1, attr2)
            except AssertionError:
                return False
        return ret

    def __reduce__(self) -> Tuple[Type[Self], _PRMAttrTup, Dict[str, Any]]:
        """Helper function for :mod:`pickle`."""
        cls = type(self)
        attr_names = cls.__slots__[:-2]
        attr_tup = _PRMAttrTup._make(getattr(self, k) for k in attr_names)
        return cls, attr_tup, self._pd_printoptions

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Helper function for :meth:`__reduce__`."""
        self._pd_printoptions = state

    def copy(self, deep: bool = True) -> Self:
        """Create and return a copy of this instance.

        Parameters
        ----------
        deep : :class:`bool`
            If :data:`True`, return a deep copy.

        Returns
        -------
        :class:`FOX.PRMContainer`
            A new PRMContainer.

        """
        if deep:
            return copy.deepcopy(self)
        else:
            return copy.copy(self)

    """###########################  methods for reading .prm files.  ##############################"""

    @classmethod
    @set_docstring(AbstractFileContainer._read.__doc__)
    def _read(cls, file_obj, decoder):
        ret = {}
        special_header = {'hbond', 'nonbonded'}

        iterator = (decoder(i).rstrip('\n') for i in file_obj)
        for i in iterator:
            if i.startswith('!') or i.startswith('*') or i.isspace() or not i:
                continue  # Ignore comment lines and empty lines

            _key = i.split(maxsplit=1)[0]
            key = _key.lower()
            if _key in cls._HEADERS:
                ret[key] = value = []
                if key in special_header:
                    value.append(i.split()[1:])
                continue

            v, *_ = i.partition('!')
            value.append(v.split())

        cls._read_post_iterate(ret)
        return ret

    @classmethod
    def _read_post_iterate(cls, kwargs: dict) -> None:
        """Post-process the dictionary produced by :meth:`PRMContainer._read`."""
        kwargs.pop('end', None)
        kwargs.pop('end_comment', None)

        nonbonded_header = None
        for k, v in kwargs.items():
            if k == 'hbond':
                kwargs[k] = ' '.join(chain.from_iterable(v)).split('!')[0].rstrip()
            elif k == 'nonbonded':
                nonbonded_header = ' '.join(chain.from_iterable(v[0:2])).rstrip()
                kwargs[k] = df = pd.DataFrame(v[2:])
                cls._process_df(df, k)
            else:
                kwargs[k] = df = pd.DataFrame(v)
                cls._process_df(df, k)
        kwargs['nonbonded_header'] = nonbonded_header

        for k, v in kwargs.items():
            if isinstance(v, pd.DataFrame):
                if not v.any().any():
                    kwargs[k] = None

    @classmethod
    def _process_df(cls, df: pd.DataFrame, key: _DFNames) -> None:
        """Fill in all columns, set their data type and assign an index to **df**."""
        for i, default in enumerate(cls._COLUMNS[key]):
            if i not in df:
                df[i] = default
            else:
                default_type = str if default is None else type(default)
                df[i] = df[i].astype(default_type, copy=False)
        df.set_index(cls._INDEX[key], inplace=True)

    """###########################  methods for writing .prm files.  ##############################"""

    @set_docstring(AbstractFileContainer._write.__doc__)
    def _write(self, file_obj, encoder) -> None:
        isnull = pd.isnull
        write = lambda n: file_obj.write(encoder(n))  # noqa: E731

        for key in self._HEADERS[:-2]:
            key_low = key.lower()
            df = getattr(self, key_low)

            if key_low == 'hbond' and df is not None:
                write(f'\n{key} {df}\n')
                continue
            elif not isinstance(df, pd.DataFrame):
                continue
            df = df.reset_index()  # Do NOT modify this inplace
            df_str = ('{:8} ' * df.shape[1])[:-1]

            if key_low != 'nonbonded':
                write(f'\n{key}\n')
            else:
                if self.nonbonded_header is not None:
                    header = '-\n'.join(i for i in self.nonbonded_header.split('-'))
                    write(f'\n{key} {header}\n')

            for _, row_value in df.iterrows():
                write_str = df_str.format(*(('' if isnull(i) else i) for i in row_value))
                write(f'{write_str.rstrip()}\n')
        write('\nEND\n')

    """#########################  Methods for updating the PRMContainer  ##########################"""
    def overlay_mapping(self, prm_name: str,
                        param: Mapping[str, Union[SeriesIdx, SeriesMultiIdx]],
                        units: Optional[Iterable[Optional[str]]] = None) -> None:
        """Update a set of parameters, **prm_name**, with those provided in **param**.

        Examples
        --------
        .. code:: python

            >>> from FOX import PRMContainer

            >>> prm = PRMContainer(...)

            >>> param_dict = {}
            >>> param_dict['epsilon'] = {'Cd Cd': ..., 'Cd Se': ..., 'Se Se': ...}  # epsilon
            >>> param_dict['sigma'] = {'Cd Cd': ..., 'Cd Se': ..., 'Se Se': ...}  # sigma

            >>> units = ('kcal/mol', 'angstrom')  # input units for epsilon and sigma

            >>> prm.overlay_mapping('nonbonded', param_dict, units=units)

        Parameters
        ----------
        prm_name : :class:`str`
            The name of the parameter of interest.
            See the keys of :attr:`PRMContainer.CP2K_TO_PRM` for accepted values.

        param : :class:`pandas.DataFrame` or nested :class:`~collections.abc.Mapping`
            A DataFrame or nested mapping with the to-be added parameters.
            The keys should be a subset of
            :attr:`PRMContainer.CP2K_TO_PRM[prm_name]["key"]<PRMContainer.CP2K_TO_PRM>`.
            If the index/nested sub-keys consist of strings then they'll be split
            and turned into a :class:`pandas.MultiIndex`.
            Note that the resulting values are *not* sorted.

        units : :class:`Iterable[str] <collections.abc.Iterable>`, optional
            An iterable with the input units of each column in **param**.
            If ``None``, default to the defaults specified in
            :attr:`PRMContainer.CP2K_TO_PRM[prm_name]["unit"]<PRMContainer.CP2K_TO_PRM>`.

        """
        # Parse arguments
        if units is None:
            units = repeat(None)

        try:
            prm_map = self.CP2K_TO_PRM[prm_name]
        except KeyError as ex:
            raise ValueError(f"prm_name is of invalid value ({prm_name!r}); "
                             f"accepted values: {tuple(self.CP2K_TO_PRM.keys())!r}") from ex

        # Extract parameter specific arguments
        name = prm_map['name']
        output_units = prm_map['unit']
        post_process = prm_map['post_process']
        key_set = set(prm_map['key'])
        str2int = {item: i for item, i in zip(prm_map['key'], prm_map['columns'])}

        # Ensure that the attribute in question is a DataFrame and not None
        df = getattr(self, name)
        if df is None:
            df = pd.DataFrame()
            setattr(self, name, df)
            self._process_df(df, name)

        # Parse and validate the columns
        param_df = pd.DataFrame(param, copy=True)
        if not key_set.issuperset(param_df.columns):
            raise ValueError("The keys in `param` should be a subset of "
                             f"`PRMContainer.CP2K_TO_PRM[{prm_name!r}]['key']`")
        param_df.columns = [str2int[i] for i in param_df.columns]

        # Parse and validate the index
        if not isinstance(param_df.index, pd.MultiIndex):
            iterator1 = (i.split() for i in param_df.index)
            param_df.index = pd.MultiIndex.from_tuples(iterator1)

        # Apply unit conversion and post-processing
        iterator2 = zip(param_df.items(), units, output_units, post_process)
        for (k, series), unit, output_unit, func in iterator2:
            series_new = parse_cp2k_value(series, unit=output_unit, default_unit=unit)
            if func is not None:
                series_new = func(series_new)
            param_df[k] = series_new

        # Update the DataFrame
        columns = param_df.columns
        for index, values in param_df.iterrows():
            df.loc[index, columns] = values
    def overlay_cp2k_settings(self, cp2k_settings: Settings) -> None:
        """Extract forcefield information from PLAMS-style CP2K settings.

        Performs an in-place update of this instance.

        Examples
        --------
        Example input value for **cp2k_settings**.
        In the provided example the **cp2k_settings** are directly extracted from a CP2K .inp file.

        .. code:: python

            >>> import cp2kparser  # https://github.com/nlesc-nano/CP2K-Parser

            >>> filename = str(...)

            >>> cp2k_settings: dict = cp2kparser.read_input(filename)
            >>> print(cp2k_settings)
            {'force_eval': {'mm': {'forcefield': {'nonbonded': {'lennard-jones': [...]}}}}}

        Parameters
        ----------
        cp2k_settings : :class:`~collections.abc.Mapping`
            A Mapping with PLAMS-style CP2K settings.

        """
        if 'input' not in cp2k_settings:
            cp2k_settings = Settings({'input': cp2k_settings})

        # If cp2k_settings is a Settings instance, enable the `suppress_missing` context manager.
        # In this manner normal KeyErrors will be raised, just like with dict.
        if isinstance(cp2k_settings, Settings):
            context_manager = cp2k_settings.suppress_missing
        else:
            context_manager = nullcontext

        with context_manager():
            for prm_map in self.CP2K_TO_PRM.values():
                name = prm_map['name']
                columns = list(prm_map['columns'])
                key_path = prm_map['key_path']
                key = prm_map['key']
                unit = prm_map['unit']
                default_unit = prm_map['default_unit']
                post_process = prm_map['post_process']

                self._overlay_cp2k_settings(cp2k_settings, name, columns, key_path,
                                            key, unit, default_unit, post_process)
    def _overlay_cp2k_settings(
        self,
        cp2k_settings: Settings,
        name: _DFNames,
        columns: MutableSequence[int],
        key_path: Sequence[str],
        key: Iterable[str],
        unit: Iterable[str],
        default_unit: Iterable[Optional[str]],
        post_process: Iterable[Optional[PostProcess]],
    ) -> None:
        """Helper function for :meth:`PRMContainer.overlay_cp2k_settings`."""
        # Extract the appropriate dict or sequence of dicts
        try:
            prm_iter = Settings.get_nested(cp2k_settings, key_path)
        except KeyError:
            return
        else:
            prm_iter = (prm_iter,) if isinstance(prm_iter, Mapping) else prm_iter

        # Ensure that the PRMContainer section is a DataFrame and not None
        df = getattr(self, name)
        if df is None:
            df = pd.DataFrame()
            setattr(self, name, df)
            self._process_df(df, name)

        # Extract, parse and write the values
        for i, prm_dict in enumerate(prm_iter):
            try:  # Extract the appropriate values eagerly so missing keys are caught here
                index = prm_dict['atoms']
                value_gen = [prm_dict[k] for k in key]
            except KeyError as ex:
                raise KeyError(f"Failed to extract the {ex!r} key from "
                               f"{key_path[-1]!r} block {i}") from ex

            # Sanitize the values and convert them into appropriate units
            iterator = zip(value_gen, unit, default_unit)
            value_list = [parse_cp2k_value(*args) for args in iterator]

            # Post-process the values
            for j, (prm, func) in enumerate(zip(value_list, post_process)):
                if func is not None:
                    value_list[j] = func(prm)

            # Assign the values
            df.loc[index, columns] = value_list

    def _concatenate(
        self,
        prm_iter: Iterable[PRMContainer],
        field: _DFNames,
    ) -> None | pd.DataFrame:
        iterator: Iterator[pd.DataFrame | None] = chain(
            [getattr(self, field)],
            (getattr(i, field) for i in prm_iter),
        )
        df_list = [i for i in iterator if i is not None]
        if len(df_list) == 0:
            return None

        df = pd.concat(df_list)
        return df.loc[df.index.drop_duplicates(), :]
    def concatenate(self, prm_iter: Iterable[PRMContainer]) -> Self:
        """Concatenate multiple PRMContainers into a single instance.

        Parameters
        ----------
        prm_iter : :class:`Iterable[FOX.PRMContainer] <collections.abc.Iterable>`
            An iterable with other PRMContainers to concatenate.

        Returns
        -------
        :class:`FOX.PRMContainer`
            The new concatenated PRMContainer.

        """
        prm_list: list[PRMContainer] = []
        for prm in prm_iter:
            if not isinstance(prm, PRMContainer):
                raise TypeError("Expected a PRMContainer")
            prm_list.append(prm)

        cls = type(self)
        return cls(
            hbond=self.hbond,
            nonbonded_header=self.nonbonded_header,
            atoms=self._concatenate(prm_list, "atoms"),
            bonds=self._concatenate(prm_list, "bonds"),
            angles=self._concatenate(prm_list, "angles"),
            dihedrals=self._concatenate(prm_list, "dihedrals"),
            nbfix=self._concatenate(prm_list, "nbfix"),
            nonbonded=self._concatenate(prm_list, "nonbonded"),
            impropers=self._concatenate(prm_list, "impropers"),
        )
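
# A minimal end-to-end usage sketch combining the public methods above.
# The file names and parameter values are hypothetical; the 'nonbonded' keys and
# units mirror the overlay_mapping docstring example.
#
#     >>> prm1 = PRMContainer.read("ligand.prm")
#     >>> prm2 = PRMContainer.read("core.prm")
#     >>> prm_total = prm1.concatenate([prm2])
#     >>> prm_total.overlay_mapping(
#     ...     "nonbonded",
#     ...     {"epsilon": {"Cd Se": 0.3}, "sigma": {"Cd Se": 2.5}},
#     ...     units=("kcal/mol", "angstrom"),
#     ... )
#     >>> prm_total.write("combined.prm")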