"""A class for reading and generating .prm parameter files.
Index
-----
.. currentmodule:: FOX
.. autosummary::
PRMContainer
PRMContainer.read
PRMContainer.write
PRMContainer.overlay_mapping
PRMContainer.overlay_cp2k_settings
PRMContainer.concatenate
API
---
.. autoclass:: PRMContainer
:noindex:
:members: atoms, bonds, angles, dihedrals, impropers, nonbonded, nbfix
.. automethod:: PRMContainer.read
.. automethod:: PRMContainer.write
.. automethod:: PRMContainer.overlay_mapping
.. automethod:: PRMContainer.overlay_cp2k_settings
.. automethod:: PRMContainer.concatenate
"""
from __future__ import annotations
import copy
import textwrap
from types import MappingProxyType
from typing import (Any, Iterator, Dict, Tuple, Mapping, List, Union, Iterable, Sequence,
Optional, ClassVar, MutableSequence, Type, NamedTuple, TYPE_CHECKING)
from itertools import chain, repeat
from contextlib import nullcontext
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal
from scm.plams import Settings
from nanoutils import AbstractFileContainer, set_docstring
from .cp2k_to_prm import CP2K_TO_PRM as _CP2K_TO_PRM, PRMMapping, PostProcess
from ..functions.cp2k_utils import parse_cp2k_value
if TYPE_CHECKING:
from typing_extensions import Self, Literal
_DFNames = Literal[
"atoms",
"bonds",
"angles",
"dihedrals",
"nbfix",
"nonbonded",
"improper",
"impropers",
]
__all__ = ['PRMContainer']
# e.g. a Pandas.Series with an Index
SeriesIdx = Mapping[str, float]
# e.g. a Pandas.Series with a MultiIndex
SeriesMultiIdx = Mapping[Tuple[str, ...], float]
class _PRMAttrTup(NamedTuple):
"""A :class:`~collections.namedtuple` representing :class:`PRMContainer` attributes."""
atoms: Optional[pd.DataFrame]
bonds: Optional[pd.DataFrame]
angles: Optional[pd.DataFrame]
dihedrals: Optional[pd.DataFrame]
impropers: Optional[pd.DataFrame]
nbfix: Optional[pd.DataFrame]
hbond: Optional[str]
nonbonded_header: Optional[str]
nonbonded: Optional[pd.DataFrame]
[docs]class PRMContainer(AbstractFileContainer):
"""A class for managing prm files.
Examples
--------
.. code:: python
>>> from FOX import PRMContainer
>>> input_file = str(...)
>>> output_file = str(...)
>>> prm = PRMContainer.read(input_file)
>>> prm.write(output_file)
"""
# Attribute names should be in the same order as in __init__()
__slots__ = _PRMAttrTup._fields + ('_pd_printoptions', '__weakref__')
#: A dataframe holding atomic parameters.
atoms: Optional[pd.DataFrame]
#: A dataframe holding bond-related parameters.
bonds: Optional[pd.DataFrame]
#: A dataframe holding angle-related parameters.
angles: Optional[pd.DataFrame]
#: A dataframe holding proper dihedral-related parameters.
dihedrals: Optional[pd.DataFrame]
#: A dataframe holding improper diehdral-related parameters.
impropers: Optional[pd.DataFrame]
#: A dataframe holding non-bonded atomic parameters.
nonbonded: Optional[pd.DataFrame]
#: A string holding additional non-bonded related info.
nonbonded_header: Optional[str]
#: A dataframe holding non-bonded pair-wise atomic parameters.
nbfix: Optional[pd.DataFrame]
#: A string holding hydrogen bonding-related info.
hbond: Optional[str]
#: A mapping providing tools for converting CP2K settings to .prm-compatible values.
#: See :data:`CP2K_TO_PRM<FOX.io.cp2k_to_prm.CP2K_TO_PRM>`.
CP2K_TO_PRM: ClassVar[Mapping[str, PRMMapping]] = _CP2K_TO_PRM
#: A tuple of supported .psf headers.
_HEADERS: ClassVar[Tuple[str, ...]] = (
'ATOMS', 'BONDS', 'ANGLES', 'DIHEDRALS', 'NBFIX', 'HBOND', 'NONBONDED', 'IMPROPER',
'IMPROPERS', 'END'
)
#: Define the columns for each DataFrame which hold its index
_INDEX: ClassVar[Mapping[_DFNames, List[int]]] = MappingProxyType({
'atoms': [2],
'bonds': [0, 1],
'angles': [0, 1, 2],
'dihedrals': [0, 1, 2, 3],
'nbfix': [0, 1],
'nonbonded': [0],
'improper': [0, 1, 2, 3],
'impropers': [0, 1, 2, 3]
})
#: Placeholder values for DataFrame columns
_COLUMNS: ClassVar[Mapping[_DFNames, Tuple[Any, ...]]] = MappingProxyType({
'atoms': (None, -1, None, np.nan),
'bonds': (None, None, np.nan, np.nan),
'angles': (None, None, None, np.nan, np.nan, np.nan, np.nan),
'dihedrals': (None, None, None, None, np.nan, -1, np.nan),
'nbfix': (None, None, np.nan, np.nan, np.nan, np.nan),
'nonbonded': (None, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan),
'improper': (None, None, None, None, np.nan, 0, np.nan),
'impropers': (None, None, None, None, np.nan, 0, np.nan)
})
@property
def improper(self) -> Optional[pd.DataFrame]:
"""Alias for :attr:`PRMContainer.impropers`."""
return self.impropers
@improper.setter
def improper(self, value: Optional[pd.DataFrame]) -> None:
self.impropers = value
def __init__(self, atoms: Optional[pd.DataFrame] = None,
bonds: Optional[pd.DataFrame] = None,
angles: Optional[pd.DataFrame] = None,
dihedrals: Optional[pd.DataFrame] = None,
impropers: Optional[pd.DataFrame] = None,
nbfix: Optional[pd.DataFrame] = None,
hbond: Optional[str] = None,
nonbonded_header: Optional[str] = None,
nonbonded: Optional[pd.DataFrame] = None,
improper: Optional[pd.DataFrame] = None) -> None:
"""Initialize a :class:`PRMContainer` instance."""
if impropers is not None and improper is not None:
raise TypeError("'impropers' and 'improper' cannot be both specified")
self.atoms = atoms
self.bonds = bonds
self.angles = angles
self.dihedrals = dihedrals
self.improper = improper if improper is not None else impropers
self.nonbonded_header = nonbonded_header
self.nonbonded = nonbonded
self.nbfix = nbfix
self.hbond = hbond
# Print options for Pandas DataFrames
self._pd_printoptions: Dict[str, Any] = {'display.max_rows': 20}
@property
def pd_printoptions(self) -> Iterator[Union[str, Any]]:
"""Return an iterator flattening :attr:`_pd_printoptions`."""
return chain.from_iterable(self._pd_printoptions.items())
def __repr__(self) -> str:
"""Implement :class:`str(self)<str>` and :func:`repr(self)<repr>`."""
# Get all to-be printed attribute (names)
cls = type(self)
attr_names = cls.__slots__[:-2]
# Determine the indentation width
width = max(len(k) for k in attr_names)
indent = width + 3
# Gather string representations of all attributes
ret = ''
with pd.option_context(*self.pd_printoptions):
items = ((k, getattr(self, k)) for k in attr_names)
for k, _v in items:
v = textwrap.indent(repr(_v), ' ' * indent)[indent:]
ret += f'{k:{width}} = {v},\n'
return f'{cls.__name__}(\n{textwrap.indent(ret[:-2], 4 * " ")}\n)'
def __eq__(self, value: object) -> bool:
"""Implement :meth:`self == value<object.__eq__>`."""
if type(self) is not type(value):
return False
# Get all to-be printed attribute (names)
cls = type(self)
attr_names = cls.__slots__[:-2]
# Compare the attributes
ret = True
str_or_none = {'nonbonded_header', 'hbond'}
iterator = ((k, getattr(self, k), getattr(value, k)) for k in attr_names)
for k, attr1, attr2 in iterator:
if attr1 is attr2:
continue
elif k in str_or_none:
ret &= attr1 == attr2
continue
try:
assert_frame_equal(attr1, attr2)
except AssertionError:
return False
return ret
def __reduce__(self) -> Tuple[Type[Self], _PRMAttrTup, Dict[str, Any]]:
"""Helper function for :mod:`pickle`."""
cls = type(self)
attr_names = cls.__slots__[:-2]
attr_tup = _PRMAttrTup._make(getattr(self, k) for k in attr_names)
return cls, attr_tup, self._pd_printoptions
def __setstate__(self, state: Dict[str, Any]) -> None:
"""Helper function for :meth:`__reduce__`."""
self._pd_printoptions = state
def copy(self, deep: bool = True) -> Self:
"""Create and return a copy of this instance.
Parameters
----------
deep : :class:`bool`
If :data:`True`, return a deep copy.
Returns
-------
:class:`FOX.PRMContainer`
A new prmcontainer.
"""
if deep:
return copy.deepcopy(self)
else:
return copy.copy(self)
"""########################### methods for reading .prm files. ##############################"""
@classmethod
@set_docstring(AbstractFileContainer._read.__doc__)
def _read(cls, file_obj, decoder):
ret = {}
special_header = {'hbond', 'nonbonded'}
iterator = (decoder(i).rstrip('\n') for i in file_obj)
for i in iterator:
if i.startswith('!') or i.startswith('*') or i.isspace() or not i:
continue # Ignore comment lines and empty lines
_key = i.split(maxsplit=1)[0]
key = _key.lower()
if _key in cls._HEADERS:
ret[key] = value = []
if key in special_header:
value.append(i.split()[1:])
continue
v, *_ = i.partition('!')
value.append(v.split())
cls._read_post_iterate(ret)
return ret
@classmethod
def _read_post_iterate(cls, kwargs: dict) -> None:
"""Post process the dictionary produced by :meth:`PRMContainer._read_iterate`."""
kwargs.pop('end', None)
kwargs.pop('end_comment', None)
nonbonded_header = None
for k, v in kwargs.items():
if k == 'hbond':
kwargs[k] = ' '.join(chain.from_iterable(v)).split('!')[0].rstrip()
elif k == 'nonbonded':
nonbonded_header = ' '.join(chain.from_iterable(v[0:2])).rstrip()
kwargs[k] = df = pd.DataFrame(v[2:])
cls._process_df(df, k)
else:
kwargs[k] = df = pd.DataFrame(v)
cls._process_df(df, k)
kwargs['nonbonded_header'] = nonbonded_header
for k, v in kwargs.items():
if isinstance(v, pd.DataFrame):
if not v.any().any():
kwargs[k] = None
@classmethod
def _process_df(cls, df: pd.DataFrame, key: _DFNames) -> None:
"""Fill in all columns, set their data type and assign an index to **df**."""
for i, default in enumerate(cls._COLUMNS[key]):
if i not in df:
df[i] = default
else:
default_type = str if default is None else type(default)
df[i] = df[i].astype(default_type, copy=False)
df.set_index(cls._INDEX[key], inplace=True)
"""########################### methods for writing .prm files. ##############################"""
@set_docstring(AbstractFileContainer._write.__doc__)
def _write(self, file_obj, encoder) -> None:
isnull = pd.isnull
write = lambda n: file_obj.write(encoder(n)) # noqa: E731
for key in self._HEADERS[:-2]:
key_low = key.lower()
df = getattr(self, key_low)
if key_low == 'hbond' and df is not None:
write(f'\n{key} {df}\n')
continue
elif not isinstance(df, pd.DataFrame):
continue
df = df.reset_index() # Do NOT modify this inplace
df_str = ('{:8} ' * df.shape[1])[:-1]
if key_low != 'nonbonded':
write(f'\n{key}\n')
else:
if self.nonbonded_header is not None:
header = '-\n'.join(i for i in self.nonbonded_header.split('-'))
write(f'\n{key} {header}\n')
for _, row_value in df.iterrows():
write_str = df_str.format(*(('' if isnull(i) else i) for i in row_value))
write(f'{write_str.rstrip()}\n')
write('\nEND\n')
"""######################### Methods for updating the PRMContainer ##########################"""
[docs] def overlay_mapping(self, prm_name: str,
param: Mapping[str, Union[SeriesIdx, SeriesMultiIdx]],
units: Optional[Iterable[Optional[str]]] = None) -> None:
"""Update a set of parameters, **prm_name**, with those provided in **param_df**.
Examples
--------
.. code:: python
>>> from FOX import PRMContainer
>>> prm = PRMContainer(...)
>>> param_dict = {}
>>> param_dict['epsilon'] = {'Cd Cd': ..., 'Cd Se': ..., 'Se Se': ...} # epsilon
>>> param_dict['sigma'] = {'Cd Cd': ..., 'Cd Se': ..., 'Se Se': ...} # sigma
>>> units = ('kcal/mol', 'angstrom') # input units for epsilon and sigma
>>> prm.overlay_mapping('nonbonded', param_dict, units=units)
Parameters
----------
prm_name : :class:`str`
The name of the parameter of interest.
See the keys of :attr:`PRMContainer.CP2K_TO_PRM` for accepted values.
param : :class:`pandas.DataFrame` or nested :class:`~collections.abc.Mapping`
A DataFrame or nested mapping with the to-be added parameters.
The keys should be a subset of
:attr:`PRMContainer.CP2K_TO_PRM[prm_name]["columns"]<PRMContainer.CP2K_TO_PRM>`.
If the index/nested sub-keys consist of strings then they'll be split and turned into
a :class:`pandas.MultiIndex`.
Note that the resulting values are *not* sorted.
units : :class:`Iterable[str] <collections.abc.Iterable>`, optional
An iterable with the input units of each column in **param_df**.
If ``None``, default to the defaults specified in
:attr:`PRMContainer.CP2K_TO_PRM[prm_name]["unit"]<PRMContainer.CP2K_TO_PRM>`.
"""
# Parse arguments
if units is None:
units = repeat(None)
try:
prm_map = self.CP2K_TO_PRM[prm_name]
except KeyError as ex:
raise ValueError(f"'prm_name is of invalid value ({prm_name!r}); "
f"accepted values: {tuple(self.CP2K_TO_PRM.keys())!r}") from ex
# Extract parameter specific arguments
name = prm_map['name']
output_units = prm_map['unit']
post_process = prm_map['post_process']
key_set = set(prm_map['key'])
str2int = {item: i for item, i in zip(prm_map['key'], prm_map['columns'])}
# Ensure that the attribute in question is a DataFrame and not None
df = getattr(self, name)
if df is None:
df = pd.DataFrame()
setattr(self, name, df)
self._process_df(df, name)
# Parse and validate the columns
param_df = pd.DataFrame(param, copy=True)
if not key_set.issuperset(param_df.columns):
raise ValueError("The keys in `param` should be a subset of "
f"`PRMContainer.CP2K_TO_PRM[{prm_name!r}]['key']`")
param_df.columns = [str2int[i] for i in param_df.columns]
# Parse and validate the index
if not isinstance(param_df.index, pd.MultiIndex):
iterator1 = (i.split() for i in param_df.index)
param_df.index = pd.MultiIndex.from_tuples(iterator1)
# Apply unit conversion and post-processing
iterator2 = zip(param_df.items(), units, output_units, post_process)
for (k, series), unit, output_unit, func in iterator2:
series_new = parse_cp2k_value(series, unit=output_unit, default_unit=unit)
if func is not None:
series_new = func(series_new)
param_df[k] = series_new
# Updated the DataFrame
columns = param_df.columns
for index, values in param_df.iterrows():
df.loc[index, columns] = values
[docs] def overlay_cp2k_settings(self, cp2k_settings: Settings) -> None:
"""Extract forcefield information from PLAMS-style CP2K settings.
Performs an inplace update of this instance.
Examples
--------
Example input value for **cp2k_settings**.
In the provided example the **cp2k_settings** are directly extracted from a CP2K .inp file.
.. code:: python
>>> import cp2kparser # https://github.com/nlesc-nano/CP2K-Parser
>>> filename = str(...)
>>> cp2k_settings: dict = cp2kparser.read_input(filename)
>>> print(cp2k_settings)
{'force_eval': {'mm': {'forcefield': {'nonbonded': {'lennard-jones': [...]}}}}}
Parameters
----------
cp2k_settings : :class:`~collections.abc.Mapping`
A Mapping with PLAMS-style CP2K settings.
"""
if 'input' not in cp2k_settings:
cp2k_settings = Settings({'input': cp2k_settings})
# If cp2k_settings is a Settings instance enable the `suppress_missing` context manager
# In this manner normal KeyErrors will be raised, just like with dict
if isinstance(cp2k_settings, Settings):
context_manager = cp2k_settings.suppress_missing
else:
context_manager = nullcontext
with context_manager():
for prm_map in self.CP2K_TO_PRM.values():
name = prm_map['name']
columns = list(prm_map['columns'])
key_path = prm_map['key_path']
key = prm_map['key']
unit = prm_map['unit']
default_unit = prm_map['default_unit']
post_process = prm_map['post_process']
self._overlay_cp2k_settings(cp2k_settings,
name, columns, key_path, key, unit,
default_unit, post_process)
def _overlay_cp2k_settings(
self,
cp2k_settings: Settings,
name: _DFNames,
columns: MutableSequence[int],
key_path: Sequence[str],
key: Iterable[str],
unit: Iterable[str],
default_unit: Iterable[Optional[str]],
post_process: Iterable[Optional[PostProcess]],
) -> None:
"""Helper function for :meth:`PRMContainer.overlay_cp2k_settings`."""
# Extract the appropiate dict or sequence of dicts
try:
prm_iter = Settings.get_nested(cp2k_settings, key_path)
except KeyError:
return
else:
prm_iter = (prm_iter,) if isinstance(prm_iter, Mapping) else prm_iter
# Ensure that PRMContainter section is a DataFrame and not None
df = getattr(self, name)
if df is None:
df = pd.DataFrame()
setattr(self, name, df)
self._process_df(df, name)
# Extract, parse and write the values
for i, prm_dict in enumerate(prm_iter):
try: # Extract the appropiate values
index = prm_dict['atoms']
value_gen = (prm_dict[k] for k in key)
except KeyError as ex:
raise KeyError(f"Failed to extract the {ex!r} key from "
f"{key_path[-1]!r} block {i}") from ex
# Sanitize the values and convert them into appropiate units
iterator = zip(value_gen, unit, default_unit)
value_list = [parse_cp2k_value(*args) for args in iterator]
# Post-process the values
for i, (prm, func) in enumerate(zip(value_list, post_process)):
if func is not None:
value_list[i] = func(prm)
# Assign the values
df.loc[index, columns] = value_list
def _concatenate(
self,
prm_iter: Iterable[PRMContainer],
field: _DFNames,
) -> None | pd.DataFrame:
iterator: Iterator[pd.DataFrame | None] = chain(
[getattr(self, field)],
(getattr(i, field) for i in prm_iter),
)
df_list = [i for i in iterator if i is not None]
if len(df_list) == 0:
return None
df = pd.concat(df_list)
return df.loc[df.index.drop_duplicates(), :]
[docs] def concatenate(self, prm_iter: Iterable[PRMContainer]) -> Self:
"""Concatenate multiple PRMContainers into a single instance.
Parameters
----------
prm_iter : list[FOX.PRMContainer]
A list with other PRMContainers to concatenate
Returns
-------
FOX.PRMContainer
The new concatenated PRMContainer
"""
prm_list: list[PRMContainer] = []
for prm in prm_iter:
if not isinstance(prm, PRMContainer):
raise TypeError("Expected a PRMContainer")
prm_list.append(prm)
cls = type(self)
return cls(
hbond=self.hbond,
nonbonded_header=self.nonbonded_header,
atoms=self._concatenate(prm_list, "atoms"),
bonds=self._concatenate(prm_list, "bonds"),
angles=self._concatenate(prm_list, "angles"),
dihedrals=self._concatenate(prm_list, "dihedrals"),
nbfix=self._concatenate(prm_list, "nbfix"),
nonbonded=self._concatenate(prm_list, "nonbonded"),
impropers=self._concatenate(prm_list, "impropers"),
)