"""A module for performing Addaptive Rate Monte Carlo (ARMC) forcefield parameter optimizations.
Index
-----
.. currentmodule:: FOX.armc
.. autosummary::
ARMC
API
---
.. autoclass:: ARMC
:members:
"""
from __future__ import annotations
import os
from pathlib import Path
from itertools import repeat
from typing import (
Tuple, TYPE_CHECKING, Any, Optional, Iterable, Mapping, Callable,
overload, Dict, List, ClassVar, Collection
)
import h5py
import numpy as np
from nanoutils import Literal
from .monte_carlo import MonteCarloABC
from ..classes.multi_mol import MultiMolecule
from ..type_hints import ArrayOrScalar
from ..io.hdf5_utils import (
create_hdf5, to_hdf5, create_xyz_hdf5, _get_filename_xyz, hdf5_clear_status
)
if TYPE_CHECKING:
from .phi_updater import PhiUpdater
from ..io.read_psf import PSFContainer
else:
from ..type_alias import PhiUpdater, PSFContainer
__all__ = ['ARMC']
PesDict = Dict[str, ArrayOrScalar]
PesMapping = Mapping[str, ArrayOrScalar]
MolList = List[MultiMolecule]
MolCollection = Collection[MultiMolecule]
Key = Tuple[float, ...]
[docs]class ARMC(MonteCarloABC):
r"""The Addaptive Rate Monte Carlo class (:class:`.ARMC`).
A subclass of :class:`MonteCarloABC`.
Attributes
----------
iter_len : :class:`int`
The total number of ARMC iterations :math:`\kappa \omega`.
super_iter_len : :class:`int`
The length of each ARMC subiteration :math:`\kappa`.
sub_iter_len : :class:`int`
The length of each ARMC subiteration :math:`\omega`.
phi : :class:`PhiUpdaterABC`
A PhiUpdater instance.
\**kwargs : :data:`~typing.Any`
Keyword arguments for the :class:`MonteCarlo` superclass.
"""
iter_len: int
sub_iter_len: int
phi: PhiUpdater
HAS_LOOP: ClassVar[Literal[False]] = False
@property
def super_iter_len(self) -> int:
return self.iter_len // self.sub_iter_len
[docs] def acceptance(self) -> np.ndarray:
"""Create an empty 1D boolean array for holding the acceptance."""
return np.zeros(self.sub_iter_len, dtype=bool)
def __init__(self, phi: PhiUpdater,
iter_len: int = 50000,
sub_iter_len: int = 100,
**kwargs: Any) -> None:
r"""Initialize an :class:`ARMC` instance.
Parameters
----------
iter_len : :class:`int`
The total number of ARMC iterations :math:`\kappa \omega`.
sub_iter_len : :class:`int`
The length of each ARMC subiteration :math:`\omega`.
phi : :class:`PhiUpdaterABC`
A PhiUpdater instance.
\**kwargs : :data:`~typing.Any`
Keyword arguments for the :class:`MonteCarlo` superclass.
"""
super().__init__(**kwargs)
# Settings specific to addaptive rate Monte Carlo (ARMC)
self.phi = phi
if len(phi.phi) != len(self.param.move_range):
raise ValueError("'phi.phi' and 'param_mapping.move_range' "
"should be of the same length")
self.iter_len = iter_len
self.sub_iter_len = sub_iter_len
[docs] def to_yaml_dict( # type: ignore[override]
self, *,
path: str = '.',
folder: str = 'MM_MD_workdir',
logfile: str = 'armc.log',
psf: Optional[Iterable[PSFContainer]] = None,
) -> Dict[str, Any]:
"""Convert an :class:`ARMC` instance into a .yaml readable by :class:`ARMC.from_yaml`.
Returns
-------
:data:`dict[str, Any] <dict>`
A dictionary.
"""
cls = type(self)
ret: Dict[str, Any] = {} # type: ignore[typeddict-item]
ret['param'] = self.param.to_yaml_dict()
ret['phi'] = self.phi.to_yaml_dict()
ret['job'] = self.package_manager.to_yaml_dict()
ret['job']['molecule'] = [m.properties.filename for m in self.molecule]
if self.molecule[0].lattice is not None:
ret['job']['lattice'] = lat_lst = []
iterator1 = (
(m, MultiMolecule(m.lattice, atoms={"Xx": range(3)})) for m in self.molecule
)
for m, lat_mol in iterator1:
filename = m.properties.filename.replace(".xyz", ".lattice.xyz")
lat_mol.as_xyz(filename)
lat_lst.append(filename)
else:
ret['job']['lattice'] = None
ret['monte_carlo'] = {
'type': f'{cls.__module__}.{cls.__name__}',
'iter_len': self.iter_len,
'sub_iter_len': self.sub_iter_len,
'keep_files': self.keep_files,
'path': path,
'folder': folder,
'logfile': logfile,
'hdf5_file': self.hdf5_file,
}
# Parse the `pes` and `pes_validation` blocks
iterator2 = ((k, getattr(self, k)) for k in ('pes', 'pes_validation'))
for name, attr in iterator2:
pes: Dict[str, Any] = {}
ret[name] = pes
i_max = 1 + max((int(k.split('.')[-1]) for k in attr.keys()), default=0)
for _k, v in attr.items():
k, _i = _k.split('.')
i = int(_i)
try:
pes[k]['kwargs'][i] = v.keywords
pes[k]['weight'].append(v.weight)
except KeyError:
pes[k] = {
'func': f'{v.args[0].__module__}.{v.args[0].__qualname__}',
'kwargs': i_max * [None],
'ref': None if v.use_mol else [],
'err_func': f'{v.err_func.__module__}.{v.err_func.__qualname__}',
'weight': [v.weight],
}
pes[k]['kwargs'][i] = v.keywords
if not v.use_mol:
pes[k]['ref'].append(v.ref)
# Parse the `psf` block
if psf is None:
ret['psf'] = {'psf_file': None}
else:
workdir = Path(path, folder)
if not os.path.isdir(workdir):
os.mkdir(workdir)
ret['psf'] = {'psf_file': []}
psf_lst = ret['psf']['psf_file']
for i, psf_obj in enumerate(psf):
name = str(workdir / f'mol.{i}.psf')
psf_lst.append(name)
psf_obj.write(name)
return ret
@overload # type: ignore[override]
def __call__(self, *, start: None = ..., key_new: None = ...) -> None: ...
@overload
def __call__(self, *, start: int, key_new: Key) -> None: ...
def __call__(self, *, start=None, key_new=None): # noqa: E301
"""Initialize the Addaptive Rate Monte Carlo procedure."""
key_new = self._parse_call(start, key_new)
start_ = start if start is not None else 0
# Start the main loop
for kappa in range(start_, self.super_iter_len):
acceptance = self.acceptance()
create_xyz_hdf5(self.hdf5_file, self.molecule,
iter_len=self.sub_iter_len,
phi=self.phi.phi)
for omega in range(self.sub_iter_len):
key_new = self.do_inner(kappa, omega, acceptance, key_new)
self.phi.update(acceptance)
@overload
def _parse_call(self, start: None = ..., key_new: None = ...) -> Key: ...
@overload
def _parse_call(self, start: int, key_new: Key) -> Key: ...
def _parse_call(self, start=None, key_new=None): # noqa: E301
"""Parse the arguments of :meth:`__call__` and prepare the first key."""
if start is None:
create_hdf5(self.hdf5_file, self) # Construct the HDF5 file
key_new = self._get_first_key() # Initialize the first MD calculation
if np.inf in self[key_new]:
raise RuntimeError('One or more jobs crashed in the first ARMC iteration; '
'manual inspection of the cp2k output is recomended')
elif not self.keep_files:
self.clear_jobs()
elif key_new is None:
raise TypeError("'key_new' cannot be None if 'start' is None")
return key_new
[docs] def do_inner(self, kappa: int, omega: int, acceptance: np.ndarray, key_old: Key) -> Key:
r"""Run the inner loop of the :meth:`ARMC.__call__` method.
Parameters
----------
kappa : :class:`int`
The super-iteration, :math:`\kappa`, in :meth:`ARMC.__call__`.
omega : :class:`int`
The sub-iteration, :math:`\omega`, in :meth:`ARMC.__call__`.
acceptance : :class:`np.ndarray[np.bool_] <numpy.ndarray>`
An array with the acceptance over the course of the latest super-iteration
key_new : :class:`tuple[float, ...]`
A tuple with the latest set of forcefield parameters.
Returns
-------
:class:`tuple[float, ...]`
The latest set of parameters.
"""
# Step 1: Perform a random move
_key_new = self._do_inner1(key_old, idx=None)
# Step 2: Calculate PES descriptors
pes_new, pes_validation, mol_list = self._do_inner2()
# Step 3: Evaluate the auxiliary error; accept if the new parameter set lowers the error
error_change, aux_new, aux_validation = self._do_inner3(pes_new, pes_validation, key_old)
accept = error_change < 0
# Step 4: Update the auxiliary error history, apply phi & update job settings
acceptance[omega] = accept
key_new = self._do_inner4(accept, error_change, aux_new,
_key_new, key_old, kappa, omega)
# Step 5: Export the results to HDF5
self._do_inner5(mol_list, accept, aux_new, aux_validation,
pes_new, pes_validation, kappa, omega)
return key_new
def _do_inner1(self, key_old: Key, idx: None | int) -> Key:
"""Perform a random move."""
key_new = self.move(idx=idx)
if isinstance(key_new, Exception):
self.logger.warning(f"Recalculating move; {key_new}")
return self._do_inner1(key_old, idx=idx)
elif key_new in self:
self.logger.info("Recalculating move; move has already been visited")
return self._do_inner1(key_old, idx=idx)
return key_new
def _do_inner2(self) -> Tuple[PesDict, PesDict, Optional[MolList]]:
"""Calculate PES-descriptors."""
return self.get_pes_descriptors()
def _do_inner3(self, pes_new: PesMapping, pes_validation: PesMapping,
key_old: Key) -> Tuple[float, np.ndarray, np.ndarray]:
"""Evaluate the auxiliary error; accept if the new parameter set lowers the error."""
self.logger.info("Calculating auxiliary error")
aux_new = self.get_aux_error(pes_new)
aux_validation = self.get_aux_error(pes_validation, validation=True)
aux_old = self[key_old]
error_change = (aux_new - aux_old).sum()
return error_change, aux_new, aux_validation
def _do_inner4(self, accept: bool, error_change: float, aux_new: np.ndarray,
key_new: Key, key_old: Key, kappa: int, omega: int) -> Key:
"""Update the auxiliary error history, apply phi & update job settings."""
err_round = round(error_change, 4)
aux_round = round(aux_new.sum(), 4)
epilog = f'error_change = {err_round}; error = {aux_round}'
if accept:
self.logger.info(f"Accepting move {(kappa, omega)}: {epilog}")
self[key_new] = self.apply_phi(aux_new)
self.param.param_old[:] = self.param.param
return key_new
else:
self.logger.info(f"Rejecting move {(kappa, omega)}: {epilog}")
self[key_new] = aux_new
self[key_old] = self.apply_phi(self[key_old])
return key_old
def _do_inner5(self, mol_list: Optional[MolCollection], accept: bool,
aux_new: np.ndarray, aux_validation: np.ndarray,
pes_new: PesMapping, pes_validation: PesMapping,
kappa: int, omega: int) -> None:
"""Export the results to HDF5."""
self.to_hdf5(mol_list, accept, aux_new, aux_validation,
pes_new, pes_validation, kappa, omega)
not_accept = ~np.full_like(self.param.param_old.columns, accept, dtype=np.bool_)
self.param.param.loc[:, not_accept] = self.param.param_old.loc[:, not_accept]
@property
def apply_phi(self) -> Callable[..., np.ndarray]:
"""Apply :attr:`phi` to **value**."""
return self.phi
def _get_first_key(self, idx: int = 0) -> Key:
"""Create a the ``history_dict`` variable and its first key.
The to-be returned key servers as the starting argument for :meth:`.do_inner`,
the latter method relying on both producing and requiring a key as argument.
Parameters
----------
idx : :class:`int`
The column key for :attr:`param_mapping["param"]<ARMC.param_mapping.>`.
Returns
-------
:class:`tuple[float, ...]`
A tuple of Numpy arrays.
"""
key: Key = tuple(self.param.param[idx].values)
pes, _, _ = self.get_pes_descriptors(get_first_key=True)
self[key] = self.get_aux_error(pes)
self.param.param_old[idx] = self.param.param[idx]
return key
[docs] def to_hdf5(self, mol_list: Optional[MolCollection],
accept: np.bool_, aux_new: np.ndarray, aux_validation: np.ndarray,
pes_new: PesMapping, pes_validation: PesMapping,
kappa: int, omega: int) -> None:
r"""Construct a dictionary with the **hdf5_kwarg** and pass it to :func:`.to_hdf5`.
Parameters
----------
mol_list : :class:`list[FOX.MultiMolecule] <list>`, optional
An iterable consisting molecules instances (or ``None``).
accept : :class:`bool`
Whether or not the latest set of parameters was accepted.
aux_new : :class:`np.ndarray[np.float64] <numpy.ndarray>`
The latest auxiliary error.
aux_validation : :class:`np.ndarray[np.float64] <numpy.ndarray>`
The latest auxiliary error from the validation.
pes_new : :class:`dict[str, np.ndarray[np.float64]] <dict>`
A dictionary of PES descriptors.
pes_validation : :class:`dict[str, np.ndarray[np.float64]] <dict>`
A dictionary of PES descriptors from the validation.
kappa : :class:`int`
The super-iteration, :math:`\kappa`, in :meth:`ARMC.__call__`.
omega : :class:`int`
The sub-iteration, :math:`\omega`, in :meth:`ARMC.__call__`.
Returns
-------
:class:`dict[str, Any] <dict>`
A dictionary with the **hdf5_kwarg** argument for :func:`.to_hdf5`.
"""
self.logger.info(f"Exporting results to {os.path.basename(self.hdf5_file)!r}\n")
phi = self.phi.phi
if accept.size == 1:
iterator = repeat(('param' if accept else 'param_old'), len(self.param.param.columns))
else:
iterator = ('param' if acc else 'param_old' for acc in accept)
_aux_error_mod = [getattr(self.param, n)[i].values for i, n in enumerate(iterator)]
aux_error_mod = np.empty((len(self.param.param.columns), 1 + len(_aux_error_mod[0])))
aux_error_mod[..., :-1] = _aux_error_mod
aux_error_mod[..., -1] = phi
hdf5_kwarg = {
'param': self.param.param.values.T,
'xyz': mol_list,
'phi': phi,
'acceptance': accept,
'aux_error': aux_new,
'aux_error_mod': aux_error_mod,
'validation/aux_error': aux_validation,
}
hdf5_kwarg.update(pes_new)
for k, v in pes_validation.items():
hdf5_kwarg[f'validation/{k}'] = v
to_hdf5(self.hdf5_file, hdf5_kwarg, kappa, omega)
[docs] def get_aux_error(self, pes_dict: PesMapping, validation: bool = False) -> np.ndarray:
r"""Return the auxiliary error :math:`\Delta \varepsilon_{QM-MM}`.
The auxiliary error is constructed using the PES descriptors in **values**
with respect to **self.ref**.
The default function is equivalent to:
.. math::
\Delta \varepsilon_{QM-MM} =
\frac{ \sum_{i}^{N} |r_{i}^{QM} - r_{i}^{MM}|^2 }
{r_{i}^{QM}}
Parameters
----------
pes_dict : :class:`dict[str, np.ndarray[np.float64]] <dict>`
An dictionary with :math:`m*n` PES descriptors each.
Returns
-------
:class:`np.ndarray[np.float64] <numpy.ndarray>`, shape :math:`(m, n)`
An array with :math:`m*n` auxilary errors
"""
pes_dict_ref = self.pes if not validation else self.pes_validation
length = 1 + max((int(k.rsplit('.')[-1]) for k in pes_dict.keys()), default=0)
generator = (
pes_dict_ref[k].err_func(pes_dict_ref[k].ref, v) * pes_dict_ref[k].weight
for k, v in pes_dict.items()
)
ret = np.fromiter(generator, dtype=float, count=len(pes_dict))
ret.shape = length, -1
return ret
[docs] def restart(self) -> None:
r"""Restart a previously started Addaptive Rate Monte Carlo procedure."""
# Validate the xyz .hdf5 file; create a new one if required
xyz = _get_filename_xyz(self.hdf5_file)
if not os.path.isfile(xyz):
create_xyz_hdf5(self.hdf5_file, self.molecule, self.sub_iter_len, self.phi.phi)
# Check that both .hdf5 files can be opened; clear their status if not
closed1 = hdf5_clear_status(xyz)
if closed1 is not None:
self.logger.warning(f"Unable to open {xyz!r}, file status was forcibly reset")
closed2 = hdf5_clear_status(self.hdf5_file)
if closed2 is not None:
self.logger.warning(f"Unable to open {self.hdf5_file!r}, "
"file status was forcibly reset")
i, j, key, acceptance = self._restart_from_hdf5()
cls = type(self)
if not cls.HAS_LOOP:
key = key[0]
acceptance = acceptance[..., 0]
# Finish the current set of sub-iterations
j += 1
for omega in range(j, self.sub_iter_len):
key = self.do_inner(i, omega, acceptance, key)
self.phi.update(acceptance)
i += 1
# And continue
self(start=i, key_new=key)
def _restart_from_hdf5(self) -> Tuple[int, int, List[Key], np.ndarray]:
"""Read and process the .hdf5 file for :meth:`ARMC.restart`."""
with h5py.File(self.hdf5_file, 'r', libver='latest') as f:
i, j = f.attrs['super-iteration'], f.attrs['sub-iteration']
ij = j + 1 + i * len(f['param'])
if i < 0:
raise ValueError(f'i: {i.__class__.__name__} = {i}')
self.logger.info('Restarting ARMC procedure from super-iteration '
f'{i} & sub-iteration {j}')
# Set the (to-be incremented) auxiliary error for each parameter
param = f['param'][:i+1]
param.shape = (-1,) + param.shape[2:]
aux_error = f['aux_error'][:i+1]
aux_error.shape = (-1,) + aux_error.shape[2:]
aux_error = np.swapaxes(aux_error, -2, -1)
for keys, err in zip(param[:ij], aux_error[:ij]):
for _k, _err in zip(keys, err):
self[tuple(_k)] = _err
# Apply aforementioned increments
aux_error_mod = f['aux_error_mod'][:i+1]
aux_error_mod.shape = (-1,) + aux_error_mod.shape[2:]
for _arr in aux_error_mod[:ij]:
for arr in _arr:
*_k, err = arr
try:
self[tuple(_k)] += err
except KeyError:
pass
self.phi.phi = f['phi'][i]
self.param.param[:] = f['param'][i, j].T
self.param.param_old[:] = self.param.param
acceptance = f['acceptance'][:i+1]
# Find the last iteration that is not np.nan
final_key: List[Key] = self._find_restart_key(acceptance, param[:ij])
return i, j, final_key, acceptance[i]
@staticmethod
def _find_restart_key(acceptance: np.ndarray, param: np.ndarray) -> List[Key]:
"""Construct a key for the last parameter that was accepted."""
accept = acceptance.reshape(-1, *acceptance.shape[2:])[:len(param)]
if not accept.any():
raise RuntimeError('Not a single successful MD-calculation was found; '
'restarting is not possible')
j_array = -accept[::-1].argmax(axis=0)
j_array -= 1
return [tuple(param[i, j]) for j, i in enumerate(j_array)]