Source code for FOX.armc.armc_pt

"""A module for performing Addaptive Rate Monte Carlo Parallel Tempering (ARMC-PT) forcefield parameter optimizations.

Index
-----
.. currentmodule:: FOX.armc
.. autosummary::
    ARMCPT

API
---
.. autoclass:: ARMCPT
    :members:

"""  # noqa: E501

from typing import (
    Tuple, Dict, Mapping, Iterable, List, Sequence, overload, Any, TYPE_CHECKING, Callable,
    ClassVar, Optional
)

import numpy as np

from nanoutils import Literal
from .armc import ARMC
from ..type_hints import ArrayOrScalar
from ..io.hdf5_utils import create_xyz_hdf5

if TYPE_CHECKING:
    from ..classes import MultiMolecule
    from ..io import PSFContainer
else:
    from ..type_alias import MultiMolecule, PSFContainer

__all__ = ['ARMCPT']

PesDict = Dict[str, ArrayOrScalar]
PesMapping = Mapping[str, ArrayOrScalar]

MolList = List[MultiMolecule]
MolIter = Iterable[MultiMolecule]

Key = Tuple[float, ...]
SwapFunc = Callable[[np.ndarray, "ARMCPT"], Iterable[Tuple[int, int]]]


def swap_random(acceptance: np.ndarray, armcpt: "ARMCPT",
                weighted: bool = False) -> List[Tuple[int, int]]:
    r"""Swap the :math:`\phi` and move range of two between forcefield parameter sets.

    The two main parameters are the acceptance rate
    :math:`\boldsymbol{\alpha} \in \mathbb{R}^{(n,)}` and target acceptance rate
    :math:`\boldsymbol{\alpha^{t}} \in \mathbb{R}^{(n,)}`,
    both vector's elements being the :math:`[0,1]` interval.

    The (weighted) probability distribution :math:`\boldsymbol{p} \in \mathbb{R}^{(n,)}` is
    consequently used for identifying which two forcefield parameter sets
    are swapped.

    .. math::

        \hat{\boldsymbol{p}} = |\boldsymbol{\alpha} - \boldsymbol{\alpha^{t}}|^{-1}

        \boldsymbol{p} = \frac{\hat{\boldsymbol{p}}}{\sum^n_{i} {\hat{\boldsymbol{p}}}_{i}}

    Parameters
    ----------
    acceptance : :class:`np.ndarray[np.bool_] <numpy.ndarray>`, shape :math:`(n, m)`
        A 2D boolean array with acceptance rates over
        the course of the last super-iteration.
    armc : :class:`FOX.armc.ARMCPT`
        An ARMCPT instance.
    weighted : :class:`bool`
        If ``True``, identify the to-be swapped parameters via a
        weighted probability distribution.

    Returns
    -------
    :data:`List[Tuple[int, int]]<typing.List>`
        A list of 2-tuples with to-be swapped indices.

    """
    self = armcpt
    if weighted:
        _p = acceptance.mean(axis=0) - self.phi.a_target
        if 0 in _p:
            p = np.zeros_like(_p)
            p[_p == 0] = True
        else:
            p = _p**-1
    else:
        p = np.ones(acceptance.shape[1])
    p /= p.sum()  # normalize

    idx_range = np.arange(len(p))
    idx1 = np.random.choice(idx_range, p=p)
    idx2 = np.random.choice(idx_range, p=p)

    if idx1 != idx2:
        self.logger.info(f"Swapping parameter sets {idx1} and {idx2}")
        return [(idx1, idx2)]
    else:
        self.logger.info("Preserving all parameter sets")
        return []


[docs]class ARMCPT(ARMC): """An :class:`ARMC` subclass implementing a parallel tempering procedure.""" HAS_LOOP: ClassVar[Literal[True]] = True def __init__(self, swapper: SwapFunc = swap_random, **kwargs: Any) -> None: r"""Initialize an :class:`ARMCPT` instance. Parameters ---------- swapper : :data:`Callable[[ndarray, ARMCPT], Iterable[Tuple[int, int]]<collections.abc.Callable>` A callable for identifying the to-be swapped parameter. Should return an iterable with 2-tuples of to-be swapped indices. \**kwargs : :data:`~typing.Any` Further keyword arguments for the :class:`ARMC` and :class:`MonteCarloABC` superclasses. """ # noqa: E501 super().__init__(**kwargs) self.param._is_independent = True self.swap_phi: SwapFunc = swapper if len(self.phi.phi) <= 1: raise ValueError("{self.__class__.__name__!r} requires 'phi.phi' to " "contain more than 1 element") elif len(self._molecule) > 1: raise NotImplementedError
[docs] def acceptance(self) -> np.ndarray: """Create an empty 2D boolean array for holding the acceptance.""" shape = (self.sub_iter_len, len(self.phi.phi)) return np.zeros(shape, dtype=bool)
@overload def _parse_call(self, start: None = ..., key_new: None = ...) -> List[Key]: ... @overload def _parse_call(self, start: int, key_new: Iterable[Key]) -> List[Key]: ... def _parse_call(self, start=None, key_new=None): # noqa: E301 """Parse the arguments of :meth:`__call__` and prepare the first key.""" ret = super()._parse_call(start, key_new) if start is key_new is None: self[ret] = self[ret][0] i = len(self.param.param.columns) return i * [ret] else: return list(ret) @overload def __call__(self, start: None = ..., key_new: None = ...) -> None: ... @overload def __call__(self, start: int, key_new: Key) -> None: ... def __call__(self, start=None, key_new=None): # noqa: E301 """Initialize the Addaptive Rate Monte Carlo procedure.""" key_new = self._parse_call(start, key_new) start_ = start if start is not None else 0 # Start the main loop for kappa in range(start_, self.super_iter_len): acceptance = self.acceptance() create_xyz_hdf5(self.hdf5_file, self.molecule, iter_len=self.sub_iter_len, phi=self.phi.phi) for omega in range(self.sub_iter_len): key_new = self.do_inner(kappa, omega, acceptance, key_new) self.phi.update(acceptance) idx_iter = self.swap_phi(acceptance, self) self._swap_phi(idx_iter)
[docs] def do_inner(self, kappa: int, omega: int, acceptance: np.ndarray, key_old: Sequence[Key]) -> List[Key]: r"""Run the inner loop of the :meth:`ARMC.__call__` method. Parameters ---------- kappa : :class:`int` The super-iteration, :math:`\kappa`, in :meth:`ARMC.__call__`. omega : :class:`int` The sub-iteration, :math:`\omega`, in :meth:`ARMC.__call__`. acceptance : :class:`np.ndarray[np.bool_] <numpy.ndarray>` An array with the acceptance over the course of the latest super-iteration key_new : :class:`tuple[float, ...]` A tuple with the latest set of forcefield parameters. Returns ------- :class:`tuple[float, ...]` The latest set of parameters. """ # Step 1: Perform a random move _key_new = [self._do_inner1(key, idx=i) for i, key in enumerate(key_old)] # Step 2: Calculate PES descriptors pes_new, pes_validation, mol_list = self._do_inner2() # Step 3: Evaluate the auxiliary error; accept if the new parameter set lowers the error error_change, aux_new, aux_validation = self._do_inner3(pes_new, pes_validation, key_old) accept: np.ndarray = error_change < 0 # Step 4: Update the auxiliary error history, apply phi & update job settings acceptance[omega] = accept key_new = self._do_inner4(accept, error_change, aux_new, _key_new, key_old, kappa, omega) # Step 5: Export the results to HDF5 self._do_inner5(mol_list, accept, aux_new.T, aux_validation.T, pes_new, pes_validation, kappa, omega) return key_new
def _do_inner3(self, pes_new: PesMapping, pes_validation: PesMapping, key_old: Iterable[Key]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Evaluate the auxiliary error; accept if the new parameter set lowers the error.""" aux_new = self.get_aux_error(pes_new) aux_validation = self.get_aux_error(pes_validation, validation=True) aux_old = np.array([self[k] for k in key_old]) error_change = (aux_new - aux_old).sum(axis=-1) return error_change, aux_new, aux_validation def _do_inner4(self, accept: Iterable[bool], error_change: Iterable[bool], aux_new: Iterable[np.ndarray], key_new: Iterable[Key], key_old: Iterable[Key], kappa: int, omega: int) -> List[Key]: """Update the auxiliary error history, apply phi & update job settings.""" ret = [] enumerator = enumerate(zip(key_new, key_old, accept, error_change, aux_new)) for i, (k_new, k_old, acc, err_change, _aux_new) in enumerator: err_round = round(err_change, 4) aux_round = round(_aux_new.sum(), 4) epilog = f'error_change = {err_round}; error = {aux_round}' if acc: self.logger.info(f"Accepting move {(kappa, omega)}: {epilog}") self[k_new] = self.apply_phi(_aux_new, idx=i) self.param.param_old[i] = self.param.param[i] ret.append(k_new) else: self.logger.info(f"Rejecting move {(kappa, omega)}: {epilog}") self[k_new] = _aux_new self[k_old] = self.apply_phi(self[k_old], idx=i) ret.append(k_old) return ret def _swap_phi(self, idx_iter: Iterable[Tuple[int, int]]) -> None: """Swap the array-elements **idx1** and **idx2** of four :class:`ARMCPT` attributes. Affects the following attributes: * :attr:`ARMCPT.phi.phi<FOX.armc.PhiUpdater.phi>` * :attr:`ARMCPT.phi.a_target<FOX.armc.PhiUpdater.a_target>` * :attr:`ARMCPT.phi.gamma<FOX.armc.PhiUpdater.gamma>` * :attr:`ARMCPT.param.move_range<FOX.armc.ParamMapping.move_range>` """ for idx1, idx2 in idx_iter: i = [idx1, idx2] j = [idx2, idx1] self.phi.phi[i] = self.phi.phi[j] self.phi.a_target[i] = self.phi.a_target[j] self.phi.gamma[i] = self.phi.gamma[j] self.param.move_range[i] = self.param.move_range[j]
[docs] def to_yaml_dict( # type: ignore[override] self, *, path: str = '.', folder: str = 'MM_MD_workdir', logfile: str = 'armc.log', psf: Optional[Iterable[PSFContainer]] = None, ) -> Dict[str, Any]: """Convert an :class:`ARMC` instance into a .yaml readable by :class:`ARMC.from_yaml`. Returns ------- :data:`dict[str, Any] <dict>` A dictionary. """ ret = super().to_yaml_dict(path=path, folder=folder, logfile=logfile, psf=psf) # Convert lists of settings into just settings for k, v in ret['job'].items(): if k in {'molecule', 'type', 'lattice'}: continue v['settings'] = v['settings'][0] for k in ('pes', 'pes_validation'): for _k, v in ret[k].items(): ret[k][_k]["weight"] = ret[k][_k]["weight"][:1] return ret