"""
FOX.classes.armc
================
A module for performing Addaptive Rate Monte Carlo (ARMC) forcefield parameter optimizations.
Index
-----
.. currentmodule:: FOX.classes.monte_carlo
.. autosummary::
ARMC
API
---
.. autoclass:: FOX.classes.monte_carlo.ARMC
:members:
:private-members:
:special-members:
"""
import os
import io
import logging
from typing import Tuple, Dict, Any, Optional, Iterable, Callable, Mapping, Union, AnyStr
from contextlib import AbstractContextManager, redirect_stdout
import yaml
import numpy as np
from scm.plams import init, finish, config, Settings
from .monte_carlo import MonteCarlo
from ..logger import Plams2Logger, get_logger
from ..io.hdf5_utils import (
create_hdf5, to_hdf5, create_xyz_hdf5, _get_filename_xyz, hdf5_clear_status
)
from ..functions.utils import get_template
from ..io.file_container import NullContext
from ..armc_functions.guess import guess_param
from ..armc_functions.df_to_dict import df_to_dict
from ..armc_functions.sanitization import init_armc_sanitization
__all__ = ['ARMC', 'run_armc']
# Prefer the C-accelerated YAML dumper (available when PyYAML was built
# against libyaml); otherwise fall back to the pure-Python implementation.
Dumper = getattr(yaml, 'CDumper', yaml.Dumper)
class Init(AbstractContextManager):
    """A context manager for calling :func:`init` and :func:`finish`.

    Wraps a block of PLAMS job-running code so that the PLAMS environment is
    initialized on entry and torn down on exit, even if the block raises.
    """

    def __init__(self, path=None, folder=None) -> None:
        # path: the parent directory of the PLAMS working directory (or None).
        self.path = path
        # folder: the name of the PLAMS working directory (or None).
        self.folder = folder

    def __enter__(self) -> None:
        # Start the PLAMS environment; returns None, so ``with Init(...) as x``
        # binds x to None by design.
        init(self.path, self.folder)

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always finalize PLAMS; exceptions are not suppressed (returns None).
        finish()
def run_armc(armc: 'ARMC',
             path: Optional[str] = None,
             folder: Optional[str] = None,
             logfile: Optional[str] = None,
             psf: Optional[Iterable['PSFContainer']] = None,
             restart: bool = False,
             guess: Optional[Mapping[str, Mapping]] = None) -> None:
    """A wrapper around :class:`ARMC` for handling the JobManager.

    Sets up the PLAMS environment, redirects the PLAMS stdout chatter into the
    ARMC logger and then runs (or restarts) the ARMC procedure.
    """
    # Rerun prevention is pointless when every job gets deleted afterwards
    if not armc.keep_files:
        config.default_jobmanager.settings.hashing = None

    # Export the .psf files, if any were provided
    if psf is not None:
        for psf_file in psf:
            psf_file.write(None)

    # Guess still-unspecified parameters based on either UFF or the RDF
    if guess is not None:
        for columns, options in guess.items():
            frozen = columns if options['frozen'] else None
            guess_param(armc, mode=options['mode'], columns=columns, frozen=frozen)

    # Initialize the ARMC procedure inside a managed PLAMS session
    with Init(path=path, folder=folder):
        armc.logger = _get_armc_logger(logfile, armc.__class__.__name__)
        writer = Plams2Logger(armc.logger,
                              lambda n: 'STARTED' in n,
                              lambda n: 'Renaming' in n,
                              lambda n: 'Trying to obtain results of crashed or failed job' in n)

        with redirect_stdout(writer):
            if restart:
                armc.restart()
            else:
                armc()
def _get_armc_logger(logfile: Optional[str], name: str, **kwargs) -> logging.Logger:
    """Substitute the PLAMS .log file for one created by a :class:`Logger<logging.Logger>`.

    Parameters
    ----------
    logfile : str, optional
        The path of the new logfile; when ``None`` the PLAMS logfile is reused.
    name : str
        The name of the to-be returned logger.
    \\**kwargs
        Further keyword arguments for :func:`get_logger`.

    Returns
    -------
    :class:`logging.Logger`
        A newly constructed logger writing to **logfile**.
    """
    # Define filenames
    plams_logfile = config.default_jobmanager.logfile
    logfile = os.path.abspath(logfile) if logfile is not None else plams_logfile

    # Mute the default PLAMS logging machinery
    config.log.time = False
    config.log.file = 0

    # Replace the PLAMS logfile with a proper logging.Logger instance.
    # The PLAMS logfile may not exist yet (or was already removed), so do
    # not let a missing file abort the whole run.
    try:
        os.remove(plams_logfile)
    except FileNotFoundError:
        pass
    logger = get_logger(name, filename=logfile, **kwargs)

    # Keep the old PLAMS logfile path pointing at the new log; symlinking
    # may fail (e.g. on Windows without privileges), which is non-fatal.
    if plams_logfile != logfile:
        try:
            os.symlink(logfile, plams_logfile)
        except OSError:
            pass

    return logger
class ARMC(MonteCarlo):
r"""The Addaptive Rate Monte Carlo class (:class:`.ARMC`).
A subclass of :class:`.MonteCarlo`.
Parameters
----------
iter_len : int
The total number of ARMC iterations :math:`\kappa \omega`.
sub_iter_len : int
The length of each ARMC subiteration :math:`\omega`.
gamma : float
The constant :math:`\gamma`.
a_target : float
The target acceptance rate :math:`\alpha_{t}`.
phi : float
The variable :math:`\phi`.
apply_phi : |Callable|_
The callable used for applying :math:`\phi` to the auxiliary error.
The callable should be able to take 2 floats as argument and return a new float.
\**kwargs : |Any|_
Keyword arguments for the :class:`MonteCarlo` superclass.
"""
@property
def super_iter_len(self) -> int:
"""Get :attr:`ARMC.iter_len` ``//`` :attr:`ARMC.sub_iter_len`."""
return self.iter_len // self.sub_iter_len
def __init__(self, iter_len: int = 50000, sub_iter_len: int = 100, gamma: int = 200,
a_target: float = 0.25, phi: float = 1.0,
apply_phi: Callable[[float, float], float] = np.add, **kwargs) -> None:
"""Initialize a :class:`ARMC` instance."""
super().__init__(**kwargs)
# Settings specific to addaptive rate Monte Carlo (ARMC)
self.iter_len: int = iter_len
self.sub_iter_len: int = sub_iter_len
self.gamma: int = gamma
self.a_target: float = a_target
# Settings specific to handling the phi parameter in ARMC
self.phi: float = phi
self.apply_phi: Callable[[float, float], float] = apply_phi
[docs] @classmethod
def from_yaml(cls, filename: str) -> Tuple['ARMC', dict]:
"""Create a :class:`.ARMC` instance from a .yaml file.
Parameters
----------
filename : str
The path+filename of a .yaml file containing all :class:`ARMC` settings.
Returns
-------
|FOX.ARMC|_ and |dict|_
A new :class:`ARMC` instance and
a dictionary with keyword arguments for :func:`.run_armc`.
"""
# Load the .yaml file
if os.path.isfile(filename):
path, filename = os.path.split(filename)
else:
path = None
yaml_dict = get_template(filename, path=path)
# Parse and sanitize the .yaml file
s, pes_kwarg, job_kwarg = init_armc_sanitization(yaml_dict)
self = cls.from_dict(s)
for name, options in pes_kwarg.items():
self.add_pes_evaluator(name, options.func, options.args, options.kwargs)
return self, job_kwarg
[docs] def to_yaml(self, filename: Union[AnyStr, os.PathLike, io.IOBase],
logfile: Optional[str] = None, path: Optional[str] = None,
folder: Optional[str] = None) -> None:
"""Convert an :class:`ARMC` instance into a .yaml readable by :class:`ARMC.from_yaml`.
Parameters
----------
filename : :class:`str`, :class:`bytes`, :class:`os.pathlike` or :class:`io.IOBase`
A filename or a file-like object.
"""
try: # is filename an actual filename or a file-like object?
assert callable(filename.write)
except (AttributeError, AssertionError):
manager = open
else:
manager = NullContext
# The armc block
s = Settings()
s.armc.iter_len = self.iter_len
s.armc.sub_iter_len = self.sub_iter_len
s.armc.gamma = self.gamma
s.armc.a_target = self.a_target
s.armc.phi = self.phi
# The hdf5 block
s.hdf5_file = self.hdf5_file
# The pram block
s.param = df_to_dict(self.param)
# The job block
s.job.path = os.getcwd() if path is None else str(path)
if logfile is not None:
s.job.logfile = logfile
if folder is not None:
s.job.folder = folder
s.job.keep_files = self.keep_files
s.job.rmsd_threshold = self.rmsd_threshold
s.job.job_type = f'{self.job_type.func.__module__}.{self.job_type.func.__qualname__}'
s.job.name = self.job_type.keywords['name']
s.job.preopt_settings = self.preopt_settings[0] if self.preopt_settings else None
s.job.md_settings = self.md_settings[0]
s.psf = {}
with Settings.supress_missing():
try:
s.psf.psf_file = [s.input.force_eval.subsys.topology.conn_file_name for
s in self.md_settings]
del s.job.md_settings.input.force_eval.subsys.topology.conn_file_name
except KeyError:
pass
try:
del s.job.preopt_settings.input.force_eval.subsys.topology.conn_file_name
except (AttributeError, KeyError):
pass
# The molecule block
s.molecule = [mol.properties.filename for mol in self.molecule]
# The pes block
for name, partial in self.pes.items():
pes_dict = s.pes[name.rsplit('.', maxsplit=1)[0]]
pes_dict.func = f'{partial.func.__module__}.{partial.func.__qualname__}'
pes_dict.args = list(partial.args)
if 'kwargs' not in pes_dict:
pes_dict.kwargs = []
pes_dict.kwargs.append(partial.keywords)
# The move block
s.move.range.stop = round(float(self.move_range.max() - 1), 8)
s.move.range.step = round(float(abs(self.move_range[-1] - self.move_range[-2])), 8)
s.move.range.start = round(float(self.move_range[len(self.move_range) // 2] - 1), 8)
# Write the file
with manager(filename, 'w') as f:
f.write(yaml.dump(s.as_dict(), Dumper=Dumper))
def __call__(self, start: int = 0, key_new: Optional[Tuple[np.ndarray, ...]] = None) -> None:
"""Initialize the Addaptive Rate Monte Carlo procedure."""
if start == 0:
create_hdf5(self.hdf5_file, self) # Construct the HDF5 file
key_new = self._get_first_key() # Initialize the first MD calculation
if np.inf in self[key_new]:
ex = RuntimeError('One or more jobs crashed in the first ARMC iteration; '
'manual inspection of the cp2k output is recomended')
self.logger.critical(f'{ex.__class__.__name__}: {ex}', exc_info=True)
raise ex
self.clear_job_cache()
elif key_new is None:
ex = TypeError("'key_new' cannot be 'None' if 'start' is larger than 0")
self.logger.critical(f'{ex.__class__.__name__}: {ex}', exc_info=True)
raise ex
# Start the main loop
for kappa in range(start, self.super_iter_len):
acceptance = np.zeros(self.sub_iter_len, dtype=bool)
create_xyz_hdf5(self.hdf5_file, self.molecule, iter_len=self.sub_iter_len)
for omega in range(self.sub_iter_len):
key_new = self.do_inner(kappa, omega, acceptance, key_new)
self.update_phi(acceptance)
[docs] def do_inner(self, kappa: int, omega: int, acceptance: np.ndarray,
key_old: Tuple[np.ndarray, ...]) -> Tuple[np.ndarray, ...]:
r"""Run the inner loop of the :meth:`ARMC.__call__` method.
Parameters
----------
kappa : int
The super-iteration, :math:`\kappa`, in :meth:`ARMC.__call__`.
omega : int
The sub-iteration, :math:`\omega`, in :meth:`ARMC.__call__`.
history_dict : |dict|_ [|tuple|_ [|float|_], |np.ndarray|_ [|np.float64|_]]
A dictionary with parameters as keys and a list of PES descriptors as values.
key_new : tuple [float]
A tuple with the latest set of forcefield parameters.
Returns
-------
|tuple|_ [|float|_]:
The latest set of parameters.
"""
# Step 1: Perform a random move
key_new = self.move()
# Step 2: Check if the move has been performed already; calculate PES descriptors if not
pes_new, mol_list = self.get_pes_descriptors(key_new)
# Step 3: Evaluate the auxiliary error; accept if the new parameter set lowers the error
aux_new = self.get_aux_error(pes_new)
aux_old = self[key_old]
error_change = (aux_new - aux_old).sum()
accept = error_change < 0
# Step 4: Update the auxiliary error history, apply phi & update job settings
acceptance[omega] = accept
if accept:
self.logger.info(f"Accepting move {(kappa, omega)}; total error change / error: "
f"{round(error_change, 4)} / {round(aux_new.sum(), 4)}\n")
self[key_new] = self.apply_phi(aux_new, self.phi)
self.param['param_old'] = self.param['param']
else:
self.logger.info(f"Rejecting move {(kappa, omega)}; total error change / error: "
f"{round(error_change, 4)} / {round(aux_new.sum(), 4)}\n")
self[key_new] = aux_new
self[key_old] = self.apply_phi(aux_old, self.phi)
key_new = key_old
# Step 5: Export the results to HDF5
hdf5_kwarg = self._hdf5_kwarg(mol_list, accept, aux_new, pes_new)
to_hdf5(self.hdf5_file, hdf5_kwarg, kappa, omega)
if not accept:
self.param['param'] = self.param['param_old']
return key_new
def _get_first_key(self) -> Tuple[np.ndarray, ...]:
"""Create a the ``history_dict`` variable and its first key.
The to-be returned key servers as the starting argument for :meth:`.do_inner`,
the latter method relying on both producing and requiring a key as argument.
Returns
-------
|tuple|_ [|np.ndarray|_ [|float|_]]
A tuple of Numpy arrays.
"""
key = tuple(self.param['param'].values)
pes, _ = self.get_pes_descriptors(key, get_first_key=True)
self[key] = self.get_aux_error(pes)
self.param['param_old'] = self.param['param']
return key
def _hdf5_kwarg(self, mol_list: Optional[Iterable['FOX.MultiMolecule']],
accept: bool, aux_new: np.ndarray,
pes_new: Mapping[str, np.ndarray]) -> Dict[str, Any]:
"""Construct a dictionary with the **hdf5_kwarg** argument for :func:`.to_hdf5`.
Parameters
----------
mol_list : |List|_ [|FOX.MultiMolecule|_]
An iterable consisting of :class:`.MultiMolecule` instances (or ``None``).
accept : bool
Whether or not the latest set of parameters was accepted.
aux_new : |np.ndarray|_
The latest auxiliary error.
pes_new : |dict|_ [|str|_, |np.ndarray|_]
A dictionary of PES descriptors.
Returns
-------
|dict|_
A dictionary with the **hdf5_kwarg** argument for :func:`.to_hdf5`.
"""
param_key = 'param' if accept else 'param_old'
hdf5_kwarg = {
'param': self.param['param'],
'xyz': mol_list if not None else np.nan,
'phi': self.phi,
'acceptance': accept,
'aux_error': aux_new,
'aux_error_mod': np.append(self.param[param_key].values, self.phi)
}
hdf5_kwarg.update(pes_new)
return hdf5_kwarg
[docs] def get_aux_error(self, pes_dict: Dict[str, np.ndarray]) -> np.ndarray:
r"""Return the auxiliary error :math:`\Delta \varepsilon_{QM-MM}`.
The auxiliary error is constructed using the PES descriptors in **values**
with respect to **self.ref**.
The default function is equivalent to:
.. math::
\Delta \varepsilon_{QM-MM} =
\frac{ \sum_{i}^{N} |r_{i}^{QM} - r_{i}^{MM}|^2 }
{r_{i}^{QM}}
Parameters
----------
pes_dict : [|dict|_ [str, |np.ndarray|_ [|np.float64|_]]
An dictionary with :math:`m*n` PES descriptors each.
Returns
-------
:math:`m*n` |np.ndarray|_ [|np.float64|_]:
An array with :math:`m*n` auxilary errors
"""
def norm_mean(key: str, mm: np.ndarray) -> float:
qm = self.pes[key].ref # qm and mm should be array-like
delta = np.abs(qm - mm)**2
ret = delta.sum() / np.abs(qm).sum()
return np.asarray(ret, dtype=float).sum()
length = 1 + max(int(k.rsplit('.')[-1]) for k in pes_dict.keys())
generator = (norm_mean(k, v) for k, v in pes_dict.items())
ret = np.fromiter(generator, dtype=float, count=len(pes_dict))
ret.shape = length, -1
return ret
[docs] def update_phi(self, acceptance: np.ndarray) -> None:
r"""Update the variable :math:`\phi`.
:math:`\phi` is updated based on the target accepatance rate, :math:`\alpha_{t}`, and the
acceptance rate, **acceptance**, of the current super-iteration.
* The values are updated according to the provided settings in **self.armc**.
The default function is equivalent to:
.. math::
\phi_{\kappa \omega} =
\phi_{ ( \kappa - 1 ) \omega} * \gamma^{
\text{sgn} ( \alpha_{t} - \overline{\alpha}_{ ( \kappa - 1 ) })
}
Parameters
----------
acceptance : |np.ndarray|_ [|bool|_]
A 1D boolean array denoting the accepted moves within a sub-iteration.
"""
sign = np.sign(self.a_target - np.mean(acceptance))
phi_old = self.phi
self.phi *= self.gamma**sign
self.logger.info(f"Updating phi: {phi_old} -> {self.phi}")
[docs] def restart(self) -> None:
r"""Restart a previously started Addaptive Rate Monte Carlo procedure."""
i, j, key, acceptance = self._restart_from_hdf5()
# Validate the xyz .hdf5 file; create a new one if required
xyz = _get_filename_xyz(self.hdf5_file)
if not os.path.isfile(xyz):
create_xyz_hdf5(self.hdf5_file, self.molecule, iter_len=self.sub_iter_len)
# Check that both .hdf5 files can be opened; clear their status if not
closed = hdf5_clear_status(xyz)
if not closed:
self.logger.warning(f"Unable to open ...{os.sep}{os.path.basename(xyz)}, "
"file status was forcibly reset")
closed = hdf5_clear_status(self.hdf5_file)
if not closed:
self.logger.warning(f"Unable to open ...{os.sep}{os.path.basename(self.hdf5_file)}, "
"file status was forcibly reset")
# Finish the current set of sub-iterations
j += 1
for omega in range(j, self.sub_iter_len):
key = self.do_inner(i, omega, acceptance, key)
self.update_phi(acceptance)
i += 1
# And continue
self(start=i, key_new=key)
    def _restart_from_hdf5(self) -> Tuple[int, int, Tuple[np.ndarray, ...], np.ndarray]:
        r"""Read and process the .hdf5 file for :meth:`ARMC.restart`.

        Restores :attr:`phi`, the parameter history and the acceptance record
        from the HDF5 checkpoint, then searches backwards for the most recent
        parameter set whose auxiliary error is fully defined (NaN-free).

        Returns
        -------
        The last super-iteration ``i``, the last sub-iteration ``j``, the key of
        the last NaN-free parameter set and the acceptance array of super-iteration ``i``.
        """
        import h5py  # local import: only needed when restarting
        with h5py.File(self.hdf5_file, 'r', libver='latest') as f:
            i, j = f.attrs['super-iteration'], f.attrs['sub-iteration']
            if i < 0:
                # A negative super-iteration means no checkpoint was ever written
                raise ValueError(f'i: {i.__class__.__name__} = {i}')
            self.logger.info('Restarting ARMC procedure from super-iteration '
                             f'{i} & sub-iteration {j}')

            # Restore phi, the current parameters and the full error history
            self.phi = f['phi'][i]
            self.param['param'] = self.param['param_old'] = f['param'][i, j]
            for key, err in zip(f['param'][i], f['aux_error'][i]):
                key = tuple(key)
                self[key] = err
            acceptance = f['acceptance'][i]

            # Find the last error which is not np.nan; walk back one
            # super-iteration at a time whenever every entry is NaN
            i_ = i
            while np.isnan(err).any():
                if i_ < 0:
                    raise RuntimeError('Not a single successful MD-calculation was found; '
                                       'restarting is not possible')
                aux_error = f['aux_error'][i_]
                param_old = f['param'][i_]
                # Rows (parameter sets) whose error contains any NaN
                aux_nan = np.isnan(aux_error).any(axis=(1, 2))
                try:
                    key = tuple(param_old[~aux_nan][-1])
                except IndexError:
                    # All entries of this super-iteration are NaN; go back one more
                    i_ -= 1
                else:
                    err = aux_error[~aux_nan][-1]  # Its no longer np.nan
        return i, j, key, acceptance