Source code for qiskit_machine_learning.optimizers.adam_amsgrad

# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2019, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""The Adam and AMSGRAD optimizers."""
from __future__ import annotations

from collections.abc import Callable
from typing import Any
import os

import csv
import numpy as np
from .optimizer import Optimizer, OptimizerSupportLevel, OptimizerResult, POINT

CALLBACK = Callable[[int, POINT, float], None]

# pylint: disable=invalid-name


[docs] class ADAM(Optimizer): """Adam and AMSGRAD optimizers. Adam [1] is a gradient-based optimization algorithm that is relies on adaptive estimates of lower-order moments. The algorithm requires little memory and is invariant to diagonal rescaling of the gradients. Furthermore, it is able to cope with non-stationary objective functions and noisy and/or sparse gradients. AMSGRAD [2] (a variant of Adam) uses a 'long-term memory' of past gradients and, thereby, improves convergence properties. References: [1]: Kingma, Diederik & Ba, Jimmy (2014), Adam: A Method for Stochastic Optimization. `arXiv:1412.6980 <https://arxiv.org/abs/1412.6980>`_ [2]: Sashank J. Reddi and Satyen Kale and Sanjiv Kumar (2018), On the Convergence of Adam and Beyond. `arXiv:1904.09237 <https://arxiv.org/abs/1904.09237>`_ """ _OPTIONS = [ "maxiter", "tol", "lr", "beta_1", "beta_2", "noise_factor", "eps", "amsgrad", "snapshot_dir", ] # pylint: disable=too-many-positional-arguments def __init__( self, maxiter: int = 10000, tol: float = 1e-6, lr: float = 1e-3, beta_1: float = 0.9, beta_2: float = 0.99, noise_factor: float = 1e-8, eps: float = 1e-10, amsgrad: bool = False, snapshot_dir: str | None = None, callback: CALLBACK | None = None, ) -> None: """ Args: maxiter: Maximum number of iterations tol: Tolerance for termination lr: Value >= 0, Learning rate. beta_1: Value in range 0 to 1, Generally close to 1. beta_2: Value in range 0 to 1, Generally close to 1. noise_factor: Value >= 0, Noise factor eps : Value >=0, Epsilon to be used for finite differences if no analytic gradient method is given. amsgrad: True to use AMSGRAD, False if not snapshot_dir: If not None save the optimizer's parameter after every step to the given directory callback: A callback function passed information in each iteration step. The information is, in this order: current time step, the parameters, the function value. """ super().__init__() self.callback = callback for k, v in list(locals().items()): if k in self._OPTIONS: self._options[k] = v self._maxiter = maxiter self._snapshot_dir = snapshot_dir self._tol = tol self._lr = lr self._beta_1 = beta_1 self._beta_2 = beta_2 self._noise_factor = noise_factor self._eps = eps self._amsgrad = amsgrad # runtime variables self._t = 0 # time steps self._m = np.zeros(1) self._v = np.zeros(1) if self._amsgrad: self._v_eff = np.zeros(1) if self._snapshot_dir is not None: file_path = os.path.join(self._snapshot_dir, "adam_params.csv") if not os.path.isfile(file_path): # pylint: disable=unspecified-encoding with open(file_path, mode="w") as csv_file: fieldnames = ["v", "v_eff", "m", "t"] if self._amsgrad else ["v", "m", "t"] writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() @property def settings(self) -> dict[str, Any]: return { "maxiter": self._maxiter, "tol": self._tol, "lr": self._lr, "beta_1": self._beta_1, "beta_2": self._beta_2, "noise_factor": self._noise_factor, "eps": self._eps, "amsgrad": self._amsgrad, "snapshot_dir": self._snapshot_dir, }
[docs] def get_support_level(self): """Return support level dictionary""" return { "gradient": OptimizerSupportLevel.supported, "bounds": OptimizerSupportLevel.ignored, "initial_point": OptimizerSupportLevel.supported, }
[docs] def save_params(self, snapshot_dir: str) -> None: """Save the current iteration parameters to a file called ``adam_params.csv``. Note: The current parameters are appended to the file, if it exists already. The file is not overwritten. Args: snapshot_dir: The directory to store the file in. """ # pylint: disable=unspecified-encoding file_path = os.path.join(snapshot_dir, "adam_params.csv") if not os.path.isfile(file_path): raise FileNotFoundError(f"The file {file_path} does not exist.") fieldnames = ["v", "v_eff", "m", "t"] if self._amsgrad else ["v", "m", "t"] with open(file_path, mode="a", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) row = {"v": self._v, "m": self._m, "t": self._t} if self._amsgrad: row["v_eff"] = self._v_eff writer.writerow(row)
[docs] def load_params(self, load_dir: str) -> None: """Load iteration parameters for a file called ``adam_params.csv``. Args: load_dir: The directory containing ``adam_params.csv``. """ # pylint: disable=unspecified-encoding file_path = os.path.join(load_dir, "adam_params.csv") if not os.path.isfile(file_path): raise FileNotFoundError(f"The file {file_path} does not exist.") with open(file_path, mode="r", newline="") as csv_file: reader = csv.DictReader(csv_file) for line in reader: self._v = np.fromstring(line["v"].strip("[]"), dtype=float, sep=" ") if self._amsgrad: self._v_eff = np.fromstring(line["v_eff"].strip("[]"), dtype=float, sep=" ") self._m = np.fromstring(line["m"].strip("[]"), dtype=float, sep=" ") self._t = int(line["t"].strip("[]"))
[docs] def minimize( self, fun: Callable[[POINT], float], x0: POINT, jac: Callable[[POINT], POINT] | None = None, bounds: list[tuple[float, float]] | None = None, ) -> OptimizerResult: """Minimize the scalar function. Args: fun: The scalar function to minimize. x0: The initial point for the minimization. jac: The gradient of the scalar function ``fun``. bounds: Bounds for the variables of ``fun``. This argument might be ignored if the optimizer does not support bounds. Returns: The result of the optimization, containing e.g. the result as attribute ``x``. """ if jac is None: jac = Optimizer.wrap_function(Optimizer.gradient_num_diff, (fun, self._eps)) derivative = jac(x0) self._t = 0 self._m = np.zeros(np.shape(derivative)) self._v = np.zeros(np.shape(derivative)) if self._amsgrad: self._v_eff = np.zeros(np.shape(derivative)) params = params_new = x0 while self._t < self._maxiter: if self._t > 0: derivative = jac(params) self._t += 1 self._m = self._beta_1 * self._m + (1 - self._beta_1) * derivative self._v = self._beta_2 * self._v + (1 - self._beta_2) * derivative * derivative lr_eff = self._lr * np.sqrt(1 - self._beta_2**self._t) / (1 - self._beta_1**self._t) if not self._amsgrad: params_new = params - lr_eff * self._m.flatten() / ( np.sqrt(self._v.flatten()) + self._noise_factor ) else: self._v_eff = np.maximum(self._v_eff, self._v) params_new = params - lr_eff * self._m.flatten() / ( np.sqrt(self._v_eff.flatten()) + self._noise_factor ) if self._snapshot_dir is not None: self.save_params(self._snapshot_dir) if self.callback is not None: self.callback(self._t, params_new, fun(params_new)) # check termination if np.linalg.norm(params - params_new) < self._tol: break params = params_new result = OptimizerResult() result.x = params_new result.fun = fun(params_new) result.nfev = self._t return result