"""A module containing the :class:`ParamMapping` class.
Index
-----
.. currentmodule:: FOX.armc
.. autosummary::
ParamMappingABC
ParamMapping
API
---
.. autoclass:: ParamMappingABC
:members:
.. autoclass:: ParamMapping
:members:
"""
from __future__ import annotations
from itertools import chain
from copy import deepcopy
from abc import ABC, abstractmethod
from types import MappingProxyType
from logging import Logger
from functools import wraps, partial
from collections import defaultdict
from typing import (
Any, TypeVar, Optional, Tuple, Mapping, Iterable, ClassVar, Union,
Callable, FrozenSet, cast, MutableMapping, TYPE_CHECKING, Dict,
)
import h5py
import numpy as np
import pandas as pd
from assertionlib.dataclass import AbstractDataClass
from nanoutils import Literal, TypedDict, set_docstring
from ..type_hints import ArrayLike
from ..functions.charge_utils import update_charge, get_net_charge, ChargeError
from ..functions.cp2k_utils import UNIT_MAP_REVERSED
if TYPE_CHECKING:
from pandas.core.generic import NDFrame
else:
from ..type_alias import NDFrame
__all__ = ['ParamMappingABC', 'ParamMapping']
# A generic typevar
T = TypeVar('T')
# MultiIndex keys
Tup3 = Tuple[Any, Any, Any]
Tup2 = Tuple[Any, Any]
# All dict keys in ParamMappingABC
MetadataKeys = Literal['min', 'max', 'count', 'frozen', 'guess', 'unit']
# A function for moving parameters
MoveFunc = Callable[[float, float], float]
class _InputMapping(TypedDict):
param: Union[NDFrame, Mapping[str, pd.Series]]
class InputMapping(_InputMapping, total=False):
"""A :class:`~typing.TypedDict` representing the :class:`ParamMappingABC` input."""
min: pd.Series
max: pd.Series
count: pd.Series
frozen: pd.Series
guess: pd.Series
unit: pd.Series
def _parse_param(dct: MutableMapping[str, Any]) -> pd.DataFrame:
# Check that the 'param' key is present
try:
_param = dct['param']
except KeyError as ex:
raise KeyError(f"The {'param'!r} key is absent from the passed mapping") from ex
# Cast the data into the correct shape
if isinstance(_param, pd.Series):
dct['param'] = param = _param.to_frame(name=0)
elif isinstance(_param, pd.DataFrame):
param = _param
else:
try:
dct['param'] = param = pd.DataFrame(_param)
except TypeError as ex:
raise TypeError(f"the 'param' value expected a Series, DataFrame or dict of Series; "
f"observed type: {_param.__class__.__name__!r}") from ex
# Check that it has a 3-level MultiIndex
n_level = 3
if not isinstance(param.index, pd.MultiIndex):
raise TypeError(f"Series.index expected a {n_level}-level MultiIndex; "
f"observed type: {param.index.__class__.__name!r}")
elif len(param.index.levels) != n_level:
raise ValueError(f"Series.index expected a {n_level}-level MultiIndex; "
f"observed number levels: {len(param.index.levels)}")
return param
[docs]class ParamMappingABC(AbstractDataClass, ABC):
r"""A :class:`~collections.abc.Mapping` for storing and updating forcefield parameters.
Besides the implementation of the :class:`~collections.abc.Mapping` protocol,
this class has access to four main methods:
* :meth:`__call__` or :meth:`move` move a random parameter by a random step size.
* :meth:`identify_move` identify the parameter and move step size.
* :meth:`clip_move` clip the move.
* :meth:`apply_constraints` apply further constraints to the move.
Note that :meth:`__call__` will internally call all other three methods.
Examples
--------
.. code:: python
>>> import pandas as pd
>>> df = pd.DataFrame(..., index=pd.MultiIndex(...))
>>> param = ParamMapping(df, ...)
>>> idx = param.move()
Attributes
----------
move_range : :class:`np.ndarray[np.float64] <numpy.ndarray>`, shape :math:`(n,)`
An 1D array with all allowed move steps.
func : :class:`~collections.abc.Callable`
The callable used for applying :math:`\phi` to the auxiliary error.
The callable should take an two floats as arguments and return a new float.
_net_charge : :class:`float`, optional
The net charge of the molecular system.
Only applicable if the ``"charge"`` is among the passed parameters.
"""
_net_charge: Optional[np.ndarray]
_move_range: np.ndarray
_is_independent: bool
#: Fill values for when optional keys are absent.
FILL_VALUE: ClassVar[MappingProxyType[MetadataKeys, np.generic]] = MappingProxyType({
'min': np.float64(-np.inf),
'max': np.float64(np.inf),
'count': np.int64(-1),
'frozen': np.False_,
'guess': np.False_,
'unit': np.str_(''),
})
_PRIVATE_ATTR = frozenset({'_net_charge'}) # type: ignore
def __init__(
self,
data: Union[InputMapping, pd.DataFrame],
move_range: Iterable[float],
func: MoveFunc,
constraints: Optional[Mapping[Tup2, Optional[Iterable[Mapping[str, float]]]]] = None,
is_independent: bool = False,
**kwargs: Any,
) -> None:
r"""Initialize an :class:`ParamMappingABC` instance.
Parameters
----------
data : :class:`pandas.DataFrame` or :class:`Mapping[str, pd.Series] <collections.abc.Mapping>`
A DataFrame (or dict of Series) containing the ``"param"`` key
with the forcefield parameters.
The index should be a 2-level :class:`~pandas.MultiIndex`,
the first level containg the parameter name and the second the atom (pair).
Optionally it can also contain one or more of the following keys:
* ``"param_old"`` (:class:`float`): Old paramters from a previous iteration.
* ``"min"`` (:class:`float`): The minimum value for each parameter.
* ``"max"`` (:class:`float`): The maximum value for each parameter.
* ``"constraints"`` (:class:`object`): A dictionary of constraints for each parameter.
Setting a value to ``None`` means that there are no constraints for that parameter.
* ``"count"`` (:class:`int`): The number of unique atom (pairs) assigned
to a given parameter.
Currently only important when updating the charge,
as the latter requires a charge renormalization after every move to ensure
the total molecular charge remains constant.
See :attr:`ParamMappingABC._data`.
move_range : :class:`.Iterable[float] <collections.abc.Iterable>`
An iterable with all allowed move steps.
See :attr:`ParamMappingABC.move_range`.
func : :class:`~Collections.abc.Callable[[float, float], float]`
The callable used for applying :math:`\phi` to the auxiliary error.
The callable should take an two floats as arguments and return a new float.
See :attr:`ParamMappingABC.func`.
is_independent : :class:`bool`
Whether the parameters from different parameter-sets are shared or independent
w.r.t. each other.
\**kwargs : :data:`~typing.Any`
Further keyword arguments for **func**.
""" # noqa
super().__init__()
self._set_data(data)
self.move_range = cast(np.ndarray, move_range)
self.func: Callable[[float, float], float] = wraps(func)(partial(func, **kwargs))
self.constraints = cast(Dict[Tup2, Optional[Tuple[pd.Series, ...]]], constraints)
self._is_independent = is_independent
# Properties
@property
def is_independent(self) -> bool:
return self._is_independent
@property
def constraints(self) -> Dict[Tup2, Optional[Tuple[pd.Series, ...]]]:
return self._constraints
@constraints.setter
def constraints(
self, value: Optional[Mapping[Tup2, Optional[Iterable[Mapping[str, float]]]]]
) -> None:
if value is None:
dct: Dict[Tup2, Optional[Tuple[pd.Series, ...]]] = {}
else:
func = lambda v: tuple(pd.Series(i) for i in v) if v is not None else None # noqa: E731,E501
dct = {k: func(v) for k, v in value.items()}
for key in self.param.index:
dct.setdefault(key[:2], None)
self._constraints = dct
@property
def move_range(self) -> np.ndarray:
return self._move_range
@move_range.setter
def move_range(self, value: ArrayLike) -> None:
_ar = np.array(value, dtype=float, ndmin=1, copy=False)
prm_len = len(self.param.columns)
if _ar.ndim == 2:
if len(_ar) != prm_len:
if prm_len == 1:
for i in range(1, len(_ar)):
self.param[i] = self.param[0].copy()
self.param_old[i] = self.param_old[0].copy()
for (_, k), v in self.metadata.items():
self.metadata[i, k] = v.copy()
else:
raise ValueError(f"Expected 'move_range' length: {prm_len}; "
f"observed length: {len(_ar)}")
ar = _ar
elif _ar.ndim == 1:
ar = np.tile(_ar, prm_len)
ar.shape = prm_len, -1
else:
raise ValueError("'move_range' expected a 1D or 2D array; "
f"observed dimensionality: {_ar.ndim}")
self._move_range: np.ndarray = ar
def _set_data(self, value: Union[InputMapping, pd.DataFrame]) -> None:
dct = dict(value)
# Check that the 'param' key is present
param = _parse_param(dct)
# Fill in the defaults
metadata = pd.DataFrame(
index=param.index,
columns=pd.MultiIndex(
levels=[pd.Index([], dtype=np.int64), pd.Index([], dtype=np.object_)],
codes=[[], []],
),
)
for i in param.columns:
for name, fill_value in self.FILL_VALUE.items():
if name not in dct:
metadata[i, name] = fill_value
else:
metadata[i, name] = np.asarray(dct[name], dtype=fill_value.dtype)
# Construct a dictionary to contain the old parameter
self.param = param
self.param_old = param.copy()
self.metadata = metadata
# Cache the total charge of the system
self._set_net_charge()
# Magic methods and Mapping implementation
def __eq__(self, value: Any) -> bool:
"""Implement :meth:`self == value <object.__eq__>`."""
if type(self) is not type(value):
return False
ret = np.all(self.move_range == value.move_range)
if self._net_charge is not None and value._net_charge is not None:
ret &= (
(self._net_charge.shape == value._net_charge.shape) and
np.all(self._net_charge == value._net_charge)
)
else:
ret &= type(self._net_charge) is type(value._net_charge)
if not ret:
return False
names = ["func", "args", "keywords"]
v1, v2 = self.func, value.func
if not all(getattr(v1, n, None) == getattr(v2, n, None) for n in names):
return False
names = ["param", "param_old", "metadata", "is_independent"]
return all(np.array_equal(getattr(self, n, None), getattr(value, n, None)) for n in names)
@AbstractDataClass.inherit_annotations()
def _str_iterator(self):
return ((k.strip('_'), v) for k, v in super()._str_iterator())
def _set_net_charge(self) -> None:
"""Set the total charge of the system."""
if 'charge' in self.param.index:
iterator = (
get_net_charge(
self.param.loc['charge', i],
self.metadata.loc['charge', (i, 'count')]
) for i in self.param.columns
)
array = np.fromiter(iterator, dtype=np.float64)
array.setflags(write=False)
self._net_charge = array
else:
self._net_charge = None
def _net_charge_to_integer(self) -> None:
if self._net_charge is not None:
self._net_charge = np.round(self._net_charge).astype(np.int64)
self._net_charge.setflags(write=False)
# The actual meat of the class
[docs] def add_param(self, idx: Tup3, value: float, **kwargs: Any) -> None:
r"""Add a new parameter to this instance.
Parameters
----------
idx : :class:`tuple[str, str, str] <tuple>`
The index of the new parameter.
Must be compatible with ``pd.DataFrame.loc``.
value : :class:`float`
The value of the new parameter.
\**kwargs : :data:`~typing.Any`
Values for :class:`ParamMappingABC.metadata`.
"""
self.param.loc[idx] = value
self.param_old.loc[idx] = value
idx_range = self.metadata.columns.levels[0]
metadata = {(i, k): v for i in idx_range for k, v in self.FILL_VALUE.items()}
metadata.update(((i, k), v) for i in idx_range for k, v in kwargs.items())
self.metadata.loc[idx] = metadata
def __call__(self, logger: Optional[Logger] = None,
param_idx: int = 0) -> Union[Exception, Tup3]:
"""Update a random parameter in **self.param** by a random value from **self.move.range**.
Performs in inplace update of the ``'param'`` column in **self.param**.
By default the move is applied in a multiplicative manner.
**self.job.md_settings** and **self.job.preopt_settings** are updated to reflect the
change in parameters.
Parameters
----------
logger : :class:`logging.Logger`, optional
A logger for reporting the updated value.
param_idx : :class:`int`
The index of the parameter.
Only relevant when multiple parameter sets have to be stored
(see :class:`MultiParamMaping`).
Returns
-------
:class:`tuple[str, str] <tuple>`
The index of the updated parameter.
"""
# Prepare arguments a move
idx, x1, x2 = self.identify_move(param_idx)
_value = self.func(x1, x2)
value = self.clip_move(idx, _value, param_idx)
# Create a call to the logger
if logger is not None:
_, prm_type, atoms = idx
logger.info(f"Moving {prm_type} ({atoms}): {x1:.4f} -> {value:.4f}")
ex = self.apply_constraints(idx, value, param_idx)
if ex is not None:
return ex
self.param.loc[idx, param_idx] = value
return idx
def set_n_columns(self, n: int) -> None:
if len(self.param.columns) != 1:
raise ValueError
for i in range(1, n):
self.param[i] = self.param[0]
self.param_old[i] = self.param_old[0]
for j in self.metadata.columns.levels[1]:
self.metadata[i, j] = self.metadata[0, j]
[docs] @abstractmethod
def identify_move(self, param_idx: int) -> Tuple[Tup3, float, float]:
"""Identify the to-be moved parameter and the size of the move.
Parameters
----------
param_idx : :class:`str`
The name of the parameter-containg column.
Returns
-------
:class:`tuple[tuple[str, str, str], float, float] <tuple>`
The index of the to-be moved parameter, it's value and the size of the move.
""" # noqa
raise NotImplementedError('Trying to call an abstract method')
[docs] def clip_move(self, idx: Tup3, value: float, param_idx: int) -> float:
"""An optional function for clipping the value of **value**.
Parameters
----------
idx : :class:`tuple[str, str, str] <tuple>`
The index of the moved parameter.
value : :class:`float`
The value of the moved parameter.
param_idx : :class:`str`
The name of the parameter-containg column.
Returns
-------
:class:`float`
The newly clipped value of the moved parameter.
""" # noqa
return value
[docs] def apply_constraints(self, idx: Tup3, value: float, param: int) -> None | BaseException:
"""An optional function for applying further constraints based on **idx** and **value**.
Should perform an inplace update of this instance.
Parameters
----------
idx : :class:`tuple[str, str, str] <tuple>`
The index of the moved parameter.
value : :class:`float`
The value of the moved parameter.
param : :class:`str`
The name of the parameter-containg column.
Returns
-------
:class:`Exception`, optional
Any exceptions raised during this functions' call.
""" # noqa
pass
[docs] def to_struct_array(self) -> np.ndarray:
"""Stack all :class:`~pandas.Series` in this instance into a single structured array."""
cls = type(self)
dtype_dict = {k: type(v) for k, v in cls.FILL_VALUE.items()}
dtype_dict['unit'] = h5py.string_dtype('utf-8')
dtype: np.dtype[np.void] = np.dtype(list(dtype_dict.items()))
ret = []
for i in self.metadata.columns.levels[0]: # type: int
iterator = (v for _, v in self.metadata[i].items())
ret.append(np.rec.fromarrays(iterator, dtype=dtype))
return np.array(ret)
[docs] def constraints_to_str(self) -> pd.Series:
"""Convert the constraints into a human-readably :class:`pandas.Series`."""
dct = {k: '' for k in self.constraints}
for key, tup in self.constraints.items():
if tup is None:
continue
dct[key] += ' == '.join(
' + '.join(f'{v}*{k}' for k, v in series.items()) for series in tup
)
ret = pd.Series(dct)
ret.name = 'constraints'
ret.index.names = self.param.index.names[:2]
return ret
def to_yaml_dict(self) -> Dict[str, Any]:
cls = type(self)
func = cast('partial[float]', self.func)
try:
if isinstance(func.func, np.ufunc):
module = 'numpy'
else:
module = func.func.__module__
name = getattr(func.func, '__qualname__', func.func.__name__)
except AttributeError as ex:
raise TypeError(f"Failed to parse {cls.__name__}.func.func: {func.func!r}") from ex
ret = {
'type': f'{cls.__module__}.{cls.__name__}',
'move_range': self.move_range.tolist(),
'func': f'{module}.{name}',
'kwargs': func.keywords,
'validation': {
'allow_non_existent': True,
'charge_tolerance': 'inf'
}
}
_template = {
'param': '',
'constraints': [],
'frozen': {},
}
idx_dict = {}
index = self.param.index
constraints = self.constraints_to_str()
for key, param, _ in index:
if (key, param) in idx_dict:
continue
template = deepcopy(_template)
template['param'] = param
if constraints[key, param]:
template['constraints'].append(constraints[key, param])
lst = ret.setdefault(key, [])
lst.append(template)
idx_dict[key, param] = len(lst) - 1
# Set the extremites
metadata = self.metadata[[(0, 'min'), (0, 'max')]]
for (key, param, atom), (min_, max_) in metadata.iterrows():
i = idx_dict[key, param]
ret[key][i]['constraints'].append(f'{min_} < {atom} < {max_}')
# Set the parameters
iterator = ((k, self.param.at[k, 0], self.metadata.loc[k, [(0, 'frozen'), (0, 'unit')]]) for k in index) # noqa: E501
for (key, param, atom), value, (frozen, unit) in iterator:
i = idx_dict[key, param]
if frozen:
ret[key][i]['frozen'][atom] = value.item()
else:
ret[key][i][atom] = value.item()
ret[key][i]['unit'] = unit or None
return ret
[docs] def get_cp2k_dicts(self) -> list[defaultdict[str, pd.DataFrame]]:
"""Get dictionaries with CP2K parameters that are parsable by QMFlows."""
ret = []
df_template = pd.DataFrame(columns=["param", "unit"], dtype=object)
for i, series in self.param.items():
dct: defaultdict[str, pd.DataFrame] = defaultdict(df_template.copy)
visited = set()
for (k, param, atom), v in series.items():
visited_key = (k, param)
if visited_key not in visited:
unit = self.metadata.at[(k, param, atom), (i, "unit")] or None
visited.add(visited_key)
dct[k].at[param, "param"] = param
dct[k].at[param, "unit"] = UNIT_MAP_REVERSED.get(unit, unit)
dct[k].at[param, atom] = v
ret.append(dct)
return ret
MOVE_RANGE = np.array([[
0.900, 0.905, 0.910, 0.915, 0.920, 0.925, 0.930, 0.935, 0.940,
0.945, 0.950, 0.955, 0.960, 0.965, 0.970, 0.975, 0.980, 0.985,
0.990, 0.995, 1.005, 1.010, 1.015, 1.020, 1.025, 1.030, 1.035,
1.040, 1.045, 1.050, 1.055, 1.060, 1.065, 1.070, 1.075, 1.080,
1.085, 1.090, 1.095, 1.100
]], dtype=float)
MOVE_RANGE.setflags(write=False)
[docs]@set_docstring(ParamMappingABC.__doc__)
class ParamMapping(ParamMappingABC):
#: A set of charge-like parameters which require a parameter re-normalization after every move.
CHARGE_LIKE: ClassVar[FrozenSet[str]] = frozenset({
'charge'
})
@ParamMappingABC.inherit_annotations()
def __init__(self, data, move_range=MOVE_RANGE, func=np.multiply, **kwargs):
super().__init__(data, move_range, func=func, **kwargs)
[docs] def identify_move(self, param_idx: int) -> Tuple[Tup3, float, float]:
"""Identify and return a random parameter and move size.
Parameters
----------
param_idx : :class:`int`
The name of the parameter-containg column.
Returns
-------
:class:`tuple[tuple[str, str, str], float, float] <tuple>`
The index of the to-be moved parameter, it's value and the size of the move.
""" # noqa
# Define a random parameter
variable = ~self.metadata[param_idx, 'frozen']
random_prm: pd.Series = self.param.loc[variable, param_idx].sample()
idx, x1 = next(random_prm.items()) # Type: Tup3, float
# Define a random move size
x2: float = np.random.choice(self.move_range[param_idx], 1)[0]
return idx, x1, x2
[docs] def clip_move(self, idx: Tup3, value: float, param_idx: int) -> np.float64:
"""Ensure that **value** falls within a user-specified range.
Parameters
----------
idx : :class:`tuple[str, str, str] <tuple>`
The index of the moved parameter.
value : :class:`float`
The value of the moved parameter.
param_idx : :class:`int`
The name of the parameter-containg column.
Returns
-------
:class:`float`
The newly clipped value of the moved parameter.
""" # noqa
value_min = self.metadata.loc[idx, (param_idx, 'min')]
value_max = self.metadata.loc[idx, (param_idx, 'max')]
return np.clip(value, value_min, value_max)
[docs] def apply_constraints(self, idx: Tup3, value: float, param_idx: int) -> None | ChargeError:
"""Apply further constraints based on **idx** and **value**.
Performs an inplace update of this instance.
Parameters
----------
idx : :class:`tuple[str, str, str] <tuple>`
The index of the moved parameter.
value : :class:`float`
The value of the moved parameter.
param_idx : :class:`int`
The name of the parameter-containg column.
""" # noqa
key = idx[:2]
atom = idx[2]
is_charge_like = key[1] in self.CHARGE_LIKE
if self.is_independent:
iterator: Iterable[int] = [param_idx]
else:
iterator = chain([param_idx], (i for i in self.param.columns if i != param_idx))
param_backup = self.param.copy()
exclude_old: set[str] = set()
for i in iterator:
charge = self._net_charge[i] if is_charge_like else None
frozen = self.metadata.loc[key, (i, 'frozen')]
count = self.metadata.loc[key, (i, 'count')]
count_zero: set[str] = set(count.index[count == 0])
if frozen.any():
exclude: None | set[str] = set(frozen.index[frozen]) | exclude_old | count_zero
else:
exclude = None if not exclude_old else exclude_old | count_zero
exclude_old.update(count.index[count != 0])
ret = update_charge(
atom, value,
param=self.param.loc[key, i],
count=count,
atom_coefs=self.constraints[key],
prm_min=self.metadata.loc[key, (i, 'min')],
prm_max=self.metadata.loc[key, (i, 'max')],
net_charge=charge,
exclude=exclude,
)
# NOTE: pandas bugs out if the assignment is carried out with the help of
# a dataframe, instead setting all values `nan`; use an `ndarray` instead.
if not self.is_independent:
if exclude is not None:
include = self.param.loc[key, :].index.difference(exclude)
key_list = [(*key, i) for i in include]
self.param.loc[key_list] = self.param.loc[key_list, i].values[..., None]
else:
self.param.loc[key] = self.param.loc[key, i].values[..., None]
if ret is not None:
self.param[:] = param_backup
if exclude is not None:
ret.atoms = self.param.loc[key, :].index.difference(exclude)
else:
ret.atoms = self.param.loc[key, :].index
return ret
return None