# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2018, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Parallelized Limited-memory BFGS optimizer"""
from __future__ import annotations
import logging
import multiprocessing
import platform
from collections.abc import Callable
from typing import SupportsFloat
import numpy as np
from ..utils import algorithm_globals
from ..utils.validation import validate_min
from .optimizer import OptimizerResult, POINT
from .scipy_optimizer import SciPyOptimizer
logger = logging.getLogger(__name__)
class P_BFGS(SciPyOptimizer): # pylint: disable=invalid-name
"""
Parallelized Limited-memory BFGS optimizer.
P-BFGS is a parallelized version of :class:`L_BFGS_B` with which it shares the same parameters.
P-BFGS can be useful when the target hardware is a quantum simulator running on a classical
machine. This allows the multiple processes to use simulation to potentially reach a minimum
faster. The parallelization may also help the optimizer avoid getting stuck at local optima.
Uses scipy.optimize.fmin_l_bfgs_b.
For further detail, please refer to
https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.fmin_l_bfgs_b.html
.. note::
This component has some function that is normally random. If you want to reproduce behavior
then you should set the random number generator seed in the algorithm_globals
(``qiskit_machine_learning.utils.algorithm_globals.random_seed = seed``).
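
    Examples:
        A minimal usage sketch (the import path assumes this module is exposed as
        ``qiskit_machine_learning.optimizers``; the objective function is illustrative):

        .. code-block:: python

            import numpy as np

            from qiskit_machine_learning.optimizers import P_BFGS
            from qiskit_machine_learning.utils import algorithm_globals

            algorithm_globals.random_seed = 42  # make the extra starting points reproducible

            def objective(x):
                # Simple quadratic bowl with its minimum at x = (1, 1)
                return float(np.sum((np.asarray(x) - 1.0) ** 2))

            optimizer = P_BFGS(maxfun=500, max_processes=2)
            result = optimizer.minimize(fun=objective, x0=np.zeros(2))
            print(result.x, result.fun, result.nfev)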
"""
_OPTIONS = ["maxfun", "ftol", "iprint"]
# pylint: disable=too-many-positional-arguments
# pylint: disable=unused-argument
def __init__(
self,
maxfun: int = 1000,
ftol: SupportsFloat = 10 * np.finfo(float).eps,
iprint: int = -1,
max_processes: int | None = None,
options: dict | None = None,
max_evals_grouped: int = 1,
**kwargs,
) -> None:
r"""
Args:
maxfun: Maximum number of function evaluations.
ftol: The iteration stops when (f\^k - f\^{k+1})/max{\|f\^k\|,\|f\^{k+1}\|,1} <= ftol.
iprint: Controls the frequency of output. iprint < 0 means no output;
iprint = 0 print only one line at the last iteration; 0 < iprint < 99
print also f and \|proj g\| every iprint iterations; iprint = 99 print
details of every iteration except n-vectors; iprint = 100 print also the
changes of active set and final x; iprint > 100 print details of
every iteration including x and g.
            max_processes: Maximum number of processes allowed; must be at least 1 if not None.
            options: A dictionary of solver options.
            max_evals_grouped: Maximum number of default gradient evaluations performed
                simultaneously.
            kwargs: Additional kwargs for ``scipy.optimize.minimize``.
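
        For example, a hypothetical configuration that tightens the tolerance and caps
        the process count (any of the parameters above can be combined the same way):

        .. code-block:: python

            optimizer = P_BFGS(maxfun=2000, ftol=1e-10, max_processes=4)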
"""
if max_processes:
validate_min("max_processes", max_processes, 1)
if options is None:
options = {}
for k, v in list(locals().items()):
if k in self._OPTIONS:
options[k] = v
super().__init__(
method="L-BFGS-B",
options=options,
max_evals_grouped=max_evals_grouped,
**kwargs,
)
self._max_processes = max_processes
def minimize(
self,
fun: Callable[[POINT], float],
x0: POINT,
jac: Callable[[POINT], POINT] | None = None,
bounds: list[tuple[float, float]] | None = None,
) -> OptimizerResult:
x0 = np.asarray(x0)
        # Use all but one CPU for the extra processes; the current process runs one
        # optimization itself further below
        num_procs = multiprocessing.cpu_count() - 1
num_procs = (
num_procs if self._max_processes is None else min(num_procs, self._max_processes)
)
num_procs = num_procs if num_procs >= 0 else 0
if platform.system() == "Darwin":
# Changed in version 3.8: On macOS, the spawn start method is now the
# default. The fork start method should be considered unsafe as it can
# lead to crashes.
# However P_BFGS doesn't support spawn, so we revert to single process.
num_procs = 0
logger.warning(
"For MacOS, python >= 3.8, using only current process. "
"Multiple core use not supported."
)
elif platform.system() == "Windows":
num_procs = 0
logger.warning(
"For Windows, using only current process. Multiple core use not supported."
)
queue: multiprocessing.queues.Queue[tuple[POINT, float, int]] = multiprocessing.Queue()
# TODO: are automatic bounds a good idea? What if the circuit parameters are not
# just from plain Pauli rotations but have a coefficient?
# bounds for additional initial points in case bounds has any None values
threshold = 2 * np.pi
if bounds is None:
bounds = [(-threshold, threshold)] * x0.size
low = [(l if l is not None else -threshold) for (l, u) in bounds]
high = [(u if u is not None else threshold) for (l, u) in bounds]
def optimize_runner(_queue, _i_pt): # Multi-process sampling
_sol, _opt, _nfev = self._optimize(fun, _i_pt, jac, bounds)
_queue.put((_sol, _opt, _nfev))
        # Start as many additional processes running the optimization as computed
        # above (num_procs can be 0)
processes = []
for _ in range(num_procs):
i_pt = algorithm_globals.random.uniform(low, high) # Another random point in bounds
proc = multiprocessing.Process(target=optimize_runner, args=(queue, i_pt))
processes.append(proc)
proc.start()
        # The optimization below runs in this process with the supplied initial point,
        # while the spawned processes run concurrently, each from its own random point
sol, opt, nfev = self._optimize(fun, x0, jac, bounds)
for proc in processes:
            # Wait for each spawned process to finish, then keep its result if it
            # improves on the best one found so far
proc.join()
p_sol, p_opt, p_nfev = queue.get()
if p_opt < opt:
sol, opt = p_sol, p_opt
nfev += p_nfev
result = OptimizerResult()
result.x = sol
result.fun = opt
result.nfev = nfev
return result
def _optimize(
self,
objective_function,
initial_point,
gradient_function=None,
variable_bounds=None,
    ) -> tuple[POINT, float, int]:
        """Run a single L-BFGS-B optimization and return the solution, its objective
        value and the number of function evaluations."""
result = super().minimize(
objective_function, initial_point, gradient_function, variable_bounds
)
return result.x, result.fun, result.nfev