"""
FOX.classes.monte_carlo
=======================
A module for performing Monte Carlo-based forcefield parameter optimizations.
Index
-----
.. currentmodule:: FOX.classes.monte_carlo
.. autosummary::
MonteCarlo
API
---
.. autoclass:: FOX.classes.monte_carlo.MonteCarlo
:members:
:private-members:
:special-members:
"""
import os
import copy as pycopy
import shutil
import logging
import functools
from sys import version_info
from types import MappingProxyType
from itertools import repeat, cycle
from collections import abc
from typing import (
Tuple, List, Dict, Optional, Union, Iterable, Hashable, Iterator, Any, Mapping, Type, Callable,
KeysView, ValuesView, ItemsView, Sequence
)
import numpy as np
import pandas as pd
from scm.plams import Molecule, Settings, Cp2kJob, Cp2kResults, add_to_class
from scm.plams.core.basejob import Job
from assertionlib.dataclass import AbstractDataClass
from .multi_mol import MultiMolecule
from ..logger import get_logger
from ..io.read_xyz import XYZError
from ..functions.utils import _get_move_range
from ..functions.charge_utils import update_charge
if version_info.minor < 7:
from collections import OrderedDict # noqa
else: # Dictionaries are ordered starting from python 3.7
OrderedDict = dict
__all__ = []
@add_to_class(Cp2kResults)
def get_xyz_path(self):
"""Return the path + filename to an .xyz file."""
for file in self.files:
if '-pos' in file and '.xyz' in file:
return self[file]
raise FileNotFoundError('No .xyz files found in ' + self.job.path)
PostProcess = Callable[[Optional[Iterable[MultiMolecule]], Optional['MonteCarlo']], None]
[docs]class MonteCarlo(AbstractDataClass, abc.Mapping):
r"""The base :class:`.MonteCarlo` class."""
@property
def molecule(self) -> Tuple[MultiMolecule, ...]:
"""Get **value** or set **value** as a tuple of MultiMolecule instances."""
return self._molecule
@molecule.setter
def molecule(self, value: Union[MultiMolecule, Iterable[MultiMolecule]]) -> None:
self._molecule = (value,) if isinstance(value, MultiMolecule) else tuple(value)
self._plams_molecule = tuple(mol.as_Molecule(0)[0] for mol in self._molecule)
@property
def md_settings(self) -> Tuple[Settings, ...]:
"""Get **value** or set **value** as a |plams.Settings| instance."""
return self._md_settings
@md_settings.setter
def md_settings(self, value: Union[Mapping, Iterable[Mapping]]) -> None:
if isinstance(value, abc.Mapping):
self._md_settings = (Settings(value),)
else:
self._md_settings = tuple(Settings(i) for i in value)
@property
def preopt_settings(self) -> Tuple[Settings, ...]:
"""Get **value** or set **value** as a |plams.Settings| instance."""
return self._preopt_settings
@preopt_settings.setter
def preopt_settings(self, value: Union[None, Mapping, Iterable[Mapping]]) -> None:
if value is None:
self._preopt_settings = None
elif isinstance(value, abc.Mapping):
self._preopt_settings = (Settings(value),)
else:
self._preopt_settings = tuple(Settings(i) for i in value)
@property
def move_range(self) -> np.ndarray:
"""Get **value** or set **value** as a |np.ndarray|_."""
return self._move_range
@move_range.setter
def move_range(self, value: Optional[Iterable[float]]) -> np.ndarray:
if value is None:
self._move_range = _get_move_range()
else:
try:
self._move_range = np.array(value, dtype=float, ndmin=1, copy=False)
except TypeError:
self._move_range = np.fromiter(value, dtype=float)
@property
def job_name(self) -> str:
"""Get the (lowered) name of :attr:`MonteCarlo.job_type`."""
try:
return self.job_type.__name__.lower()
except AttributeError: # A functools.partial object
return self.job_type.func.__name__.lower()
@property
def logger(self) -> logging.Logger:
"""Get or set the logger."""
return self._logger
@logger.setter
def logger(self, value: Optional[logging.Logger]) -> None:
if value is not None:
self._logger = value
else:
self._logger = get_logger(self.__class__.__name__, handler_type=logging.StreamHandler)
@property
def pes_post_process(self) -> Tuple[Callable, ...]:
return self._pes_post_process
@pes_post_process.setter
def pes_post_process(self, value: Union[PostProcess, Iterable[PostProcess]]) -> None:
if value is None:
self._pes_post_process = ()
elif isinstance(value, abc.Iterable):
self._pes_post_process = tuple(value)
else:
self._pes_post_process = (value,)
_PRIVATE_ATTR = frozenset({'_plams_molecule', 'job_cache'})
def __init__(self, molecule: Union[MultiMolecule, Iterable[MultiMolecule]],
param: pd.DataFrame,
md_settings: Union[Mapping, Iterable[Mapping]],
preopt_settings: Union[None, Mapping, Iterable[Mapping]] = None,
rmsd_threshold: float = 5.0,
job_type: Type[Job] = Cp2kJob,
hdf5_file: str = 'ARMC.hdf5',
apply_move: Callable[[float, float], float] = np.multiply,
move_range: Optional[np.ndarray] = None,
keep_files: bool = False,
logger: Optional[logging.Logger] = None,
pes_post_process: Union[PostProcess, Iterable[PostProcess]] = None) -> None:
"""Initialize a :class:`MonteCarlo` instance."""
super().__init__()
# Set the inital forcefield parameters
self.param: pd.DataFrame = param
# Settings for running the actual MD calculations
self._plams_molecule: Tuple[Molecule, ...] = None # set by self.molecule
self.molecule: Tuple[MultiMolecule, ...] = molecule
self.job_type: Type[Job] = job_type
self.md_settings: Settings = md_settings
self.preopt_settings: Optional[Settings] = preopt_settings
self.rmsd_threshold: float = rmsd_threshold
self.keep_files: bool = keep_files
self.pes_post_process: Tuple[PostProcess, ...] = pes_post_process
# HDF5 settings
self.hdf5_file: str = hdf5_file
# Settings for generating Monte Carlo moves
self.apply_move: Callable[[float, float], float] = apply_move
self.move_range = move_range
# Logging settings
self.logger = logger
# Internally set attributes
self.history_dict = {}
self.pes: Dict[str, Callable[[np.ndarray], np.ndarray]] = OrderedDict()
self.job_cache: List[Job] = []
@AbstractDataClass.inherit_annotations()
def _str_iterator(self):
iterator = ((k.strip('_'), v) for k, v in super()._str_iterator())
return sorted(iterator)
def __eq__(self, value: Any) -> bool: return object.__eq__(self, value)
# Ensure compatibility with collections.abc.Mapping
def __setitem__(self, key: Hashable, value: Any) -> None:
"""Set :attr:`MonteCarlo.history_dict` ``[key]`` to ``value``."""
self.history_dict[key] = value
def __getitem__(self, key: Hashable) -> Any:
"""Return :attr:`MonteCarlo.history_dict` ``[key]``."""
return self.history_dict[key]
def __iter__(self) -> Iterator[Hashable]:
"""Iterate over :attr:`MonteCarlo.history_dict`."""
return iter(self.history_dict)
def __len__(self) -> int:
"""Return the number of items in :attr:`MonteCarlo.history_dict`."""
return len(self.history_dict)
def __contains__(self, key: Hashable) -> bool:
"""Return whether or not :attr:`MonteCarlo.history_dict` contains the specified key."""
return key in self.history_dict
[docs] def keys(self) -> KeysView:
"""Return a view of :attr:`MonteCarlo.history_dict`'s keys."""
return self.history_dict.keys()
[docs] def items(self) -> ItemsView:
"""Return a view of :attr:`MonteCarlo.history_dict`'s items."""
return self.history_dict.items()
[docs] def values(self) -> ValuesView:
"""Return a view of :attr:`MonteCarlo.history_dict`'s values."""
return self.history_dict.values()
[docs] def add_pes_evaluator(self, name: str, func: Callable, args: Sequence[Any] = (),
kwargs: Mapping[str, Any] = MappingProxyType({})) -> None:
r"""Add a callable to this instance for constructing PES-descriptors.
Examples
--------
.. code:: python
>>> from FOX import MonteCarlo, MultiMolecule
>>> mc = MonteCarlo(...)
>>> mol = MultiMolecule.from_xyz(...)
# Prepare arguments
>>> name = 'rdf'
>>> func = FOX.MultiMolecule.init_rdf
>>> atom_subset = ['Cd', 'Se', 'O'] # Keyword argument for func
# Add the PES-descriptor constructor
>>> mc.add_pes_evaluator(name, func, kwargs={'atom_subset': atom_subset})
Parameters
----------
name : str
The name under which the PES-descriptor will be stored (*e.g.* ``"RDF"``).
func : Callable
The callable for constructing the PES-descriptor.
The callable should take an array-like object as input and
return a new array-like object as output.
args : :class:`Sequence<collections.abc.Sequence>`
A sequence of positional arguments.
kwargs : :class:`dict` or :class:`Iterable<collections.abc.Iterable>` [:class:`dict`]
A dictionary or an iterable of dictionaries with keyword arguments.
Providing an iterable allows one to use a unique set of keyword arguments for each
molecule in :attr:`MonteCarlo.molecule`.
"""
mol_list = [m.copy() for m in self.molecule]
for f in self.pes_post_process:
f(mol_list, self)
if not isinstance(kwargs, abc.Mapping):
iterator = zip(mol_list, kwargs)
else:
iterator = zip(mol_list, repeat(kwargs, len(self.molecule)))
for i, (mol, kwarg) in enumerate(iterator):
partial = functools.partial(func, *args, **kwarg)
partial.__doc__ = func.__doc__
partial.__name__ = func.__name__
partial.ref = partial(mol)
self.pes[f'{name}.{i}'] = partial
[docs] def move(self) -> Tuple[float]:
"""Update a random parameter in **self.param** by a random value from **self.move.range**.
Performs in inplace update of the ``'param'`` column in **self.param**.
By default the move is applied in a multiplicative manner.
**self.job.md_settings** and **self.job.preopt_settings** are updated to reflect the
change in parameters.
Examples
--------
.. code:: python
>>> print(armc.param['param'])
charge Br -0.731687
Cs 0.731687
epsilon Br Br 1.045000
Cs Br 0.437800
Cs Cs 0.300000
sigma Br Br 0.421190
Cs Br 0.369909
Cs Cs 0.592590
Name: param, dtype: float64
>>> for _ in range(1000): # Perform 1000 random moves
>>> armc.move()
>>> print(armc.param['param'])
charge Br -0.597709
Cs 0.444592
epsilon Br Br 0.653053
Cs Br 1.088848
Cs Cs 1.025769
sigma Br Br 0.339293
Cs Br 0.136361
Cs Cs 0.101097
Name: param, dtype: float64
Returns
-------
|tuple|_ [|float|_]:
A tuple with the (new) values in the ``'param'`` column of **self.param**.
"""
def _update_settings(k: Tuple[str, ...], v: float, fstring: str) -> None:
for s in self.md_settings:
s.set_nested(k, fstring.format(v))
if self.preopt_settings is not None:
for s in self.preopt_settings:
s.set_nested(k, fstring.format(v))
# Unpack arguments
param = self.param
# Prepare arguments a move
idx, x1 = next(param.loc[:, 'param'].sample().items())
x2 = np.random.choice(self.move_range, 1)[0]
_value = self.apply_move(x1, x2)
# Ensure that the moved value does not exceed the user-specified minimum and maximum
value = self.clip_move(idx, _value)
self.logger.info(f"Moving {idx[0]} ({idx[1]}): {x1:.4f} -> {value:.4f}")
# Enforce all user-specified constraints
param_type, atom = idx
charge = param_type == 'charge'
constraint_dict = param.at[idx, 'constraints']
with pd.option_context('mode.chained_assignment', None):
update_charge(atom, value, param.loc[param_type], constraint_dict, charge=charge)
# Update the CP2K Settings
for k, v, fstring in param.loc[param_type, ('keys', 'param', 'unit')].values:
_update_settings(k, v, fstring)
return tuple(self.param['param'].values)
[docs] def clip_move(self, idx: Hashable, value: float) -> float:
"""Ensure that **value** falls within a user-specified range."""
prm_min = self.param.at[idx, 'min']
prm_max = self.param.at[idx, 'max']
if value < prm_min:
return value + (prm_min - value)
elif value > prm_max:
return value + (prm_max - value)
else:
return value
[docs] def run_md(self) -> Optional[List[MultiMolecule]]:
"""Run a geometry optimization followed by a molecular dynamics (MD) job.
Returns a new :class:`.MultiMolecule` instance constructed from the MD trajectory and the
path to the MD results.
If no trajectory is available (*i.e.* the job crashed) return *None* instead.
* The MD job is constructed according to the provided settings in **self.job**.
Returns
-------
|FOX.MultiMolecule|_ and |tuple|_ [|str|_]:
A list of :class:`.MultiMolecule` instance(s) constructed from the MD trajectory &
a list of paths to the PLAMS results directories.
The :class:`.MultiMolecule` list is replaced with ``None`` if the job crashes.
"""
# Prepare preoptimization settings
if self.preopt_settings is not None:
preopt_mol_list = self._md_preopt()
preopt_accept = self._evaluate_rmsd(preopt_mol_list, self.rmsd_threshold)
# Run an MD calculation
if preopt_accept and preopt_mol_list is not None:
return self._md(mol.as_Molecule(-1)[0] for mol in preopt_mol_list)
return None
else: # preoptimization is disabled
return self._md(self._plams_molecule)
def _md_preopt(self) -> Optional[List[MultiMolecule]]:
"""Peform a geometry optimization.
Optimizations are performed on all molecules in :attr:`MonteCarlo.job`[``"molecule"``].
Returns
-------
|FOX.MultiMolecule|_ and |str|_
A list of :class:`.MultiMolecule` instance constructed from the geometry optimization.
The :class:`.MultiMolecule` list is replaced with ``None`` if the job crashes.
"""
name = f'{self.job_name}.preopt'
job_type = self.job_type
job_list = [job_type(name=name, molecule=mol, settings=s) for
mol, s in zip(self._plams_molecule, self.preopt_settings)]
# Preoptimize
mol_list = []
for job in job_list:
job.name += '.opt'
self.job_cache.append(job)
try:
results = job.run()
except FileNotFoundError:
return None # Precaution against PLAMS unpickling old Jobs that don't exist
if job.status in {'crashed', 'failed'}:
return None
try: # Construct and return a MultiMolecule object
path = results.get_xyz_path()
mol = MultiMolecule.from_xyz(path)
mol.round(3)
except TypeError: # The geometry optimization crashed
return None
except XYZError: # The .xyz file is unreadable for some reason
self.logger.warning(f"Failed to parse ...{os.sep}{os.path.basename(path)}")
return None
mol_list.append(mol)
return mol_list
def _md(self, mol_preopt: Iterable[Molecule]) -> Optional[List[MultiMolecule]]:
"""Peform a molecular dynamics simulation (MD).
Simulations are performed on all molecules in **mol_preopt**.
Parameters
----------
mol_preopt : |list|_ [|Molecule|_]
An iterable consisting of PLAMS Molecules.
Returns
-------
|list|_ [|FOX.MultiMolecule|_], optional
A list of :class:`.MultiMolecule` instance(s) constructed from the MD simulation.
Return ``None`` if the job crashes.
"""
name = self.job_name
job_type = self.job_type
jobs = [job_type(name=name, molecule=mol, settings=s) for
mol, s in zip(mol_preopt, self.md_settings)]
# Run MD
mol_list = []
for job in jobs:
job.name += '.MD'
self.job_cache.append(job)
try:
results = job.run()
except FileNotFoundError:
return None # Precaution against PLAMS unpickling old Jobs that don't exist
if job.status in {'crashed', 'failed'}:
return None
try: # Construct and return a MultiMolecule object
path = results.get_xyz_path()
mol = MultiMolecule.from_xyz(path)
mol.round(3)
except TypeError: # The MD simulation crashed
return None
except XYZError: # The .xyz file is unreadable for some reason
self.logger.warning(f"Failed to parse ...{os.sep}{os.path.basename(path)}")
return None
mol_list.append(mol)
return mol_list
@staticmethod
def _evaluate_rmsd(mol_preopt: Optional[Iterable[MultiMolecule]],
threshold: Optional[float] = None) -> bool:
"""Evaluate the RMSD of the geometry optimization (see :meth:`_md_preopt`).
If the root mean square displacement (RMSD) of the last frame is
larger than **threshold**, return ``False``.
Otherwise, return ``True`` .
Parameters
----------
mol_preopt : |FOX.MultiMolecule|_
A :class:`.MultiMolecule` instance constructed from a geometry pre-optimization
(see :meth:`_md_preopt`).
threshold : float
Optional: An RMSD threshold in Angstrom.
Determines whether or not a given RMSD will return ``True`` or ``False``.
Returns
-------
|bool|_:
``False`` if th RMSD is larger than **threshold**, ``True`` if it is not.
"""
threshold_ = threshold or np.inf
if mol_preopt is None:
return False
for mol in mol_preopt:
rmsd = mol.get_rmsd(mol_subset=-1)
if rmsd > threshold_:
return False
return True
[docs] def clear_job_cache(self) -> None:
"""Clear :attr:`MonteCarlo.job_cache` and, optionally, delete all cp2k output files."""
if not self.keep_files:
for job in self.job_cache:
shutil.rmtree(job.path)
self.job_cache = []
[docs] def get_pes_descriptors(self, key: Tuple[float], get_first_key: bool = False,
) -> Tuple[Dict[str, np.ndarray], Optional[List[MultiMolecule]]]:
"""Check if a **key** is already present in **history_dict**.
If ``True``, return the matching list of PES descriptors;
If ``False``, construct and return a new list of PES descriptors.
* The PES descriptors are constructed by the provided settings in **self.pes**.
Parameters
----------
key : tuple [float]
A key in **history_dict**.
get_first_key : :class:`bool`
Keep the both the files and the job_cache if this is the first ARMC iteration.
Usefull for manual inspection in case cp2k hard-crashes at this point.
Returns
-------
|dict|_ [|str|_, |np.ndarray|_ [|np.float64|_]) and |FOX.MultiMolecule|_
A previous value from **history_dict** or a new value from an MD calculation &
a :class:`.MultiMolecule` instance constructed from the MD simulation.
Values are set to ``np.inf`` if the MD job crashed.
"""
# Generate PES descriptors
mol_list = self.run_md()
if mol_list is None: # The MD simulation crashed
ret = {key: np.inf for key in self.pes.keys()}
else:
for func in self.pes_post_process:
func(mol_list, self) # Post-process the MultiMolecules
iterator = zip(self.pes.items(), cycle(mol_list))
ret = {k: func(mol) for (k, func), mol in iterator}
if not get_first_key:
self.clear_job_cache()
return ret, mol_list
# Fix for TypeErrors raised during deep copying prior to python 3.7
if version_info.minor < 7:
@add_to_class(MonteCarlo)
def copy(self, deep: bool = False) -> MonteCarlo:
ret = AbstractDataClass.copy(self, deep=False)
if not deep:
return ret
dct = vars(ret)
for k, v in dct.items():
try:
dct[k] = pycopy.deepcopy(v)
except TypeError:
dct[k] = pycopy.copy(v)
return ret