Source code for PARyOpt.paryopt

"""
---
    Copyright (c) 2018 Baskar Ganapathysubramanian, Balaji Sesha Sarath Pokuri
    
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
---
"""

## --- end license text --- ##
"""
Main class for bayesian optimization
"""
import logging
import math
import os
import pickle
from scipy import io
import traceback
import warnings
import csv  # for data export
from typing import Callable, Union, List, Tuple

import numpy as np
from scipy import optimize

import PARyOpt.acquisition_functions as acqf
import PARyOpt.kernel as kf
import PARyOpt.utils as ut


# logging
logger = logging.getLogger(__name__)


def _get_kernel_function(name: str) -> kf.KernelFunction:
    """
    Returns a kernel function corresponding to a name.
    Raises a ValueError if name is unknown.

    :param name: name of kernel function
    :return: function corresponding to name
    """
    if name == 'sqr_exp':
        return kf.SquaredExponential()
    elif name == 'matern_52':
        return kf.Matern52()
    elif name == 'matern_32':
        return kf.Matern32()
    else:
        raise ValueError('Unknown / unimplemented kernel function')


def _get_acquisition_function(name: str) -> Callable[[float, float, float, float], float]:
    """
    Returns an acquisition function corresponding to a name.
    Raises a ValueError if name is unknown.

    :param name:
    :return:
    """
    if name.upper() == 'LCB':
        # Lower confidence bound : for minimization problems
        return acqf.lower_confidence_bound
    elif name.upper() == 'UCB':
        # Upper confidence bound : for maximization problems
        return acqf.upper_confidence_bound
    elif name.upper() == 'PI':
        # Probability of improvement
        return acqf.probability_improvement
    elif name.upper() == 'EI':
        # expected improvement
        return acqf.expected_improvement
    else:
        raise ValueError('Unknown / unimplemented acquisition function')


def _resolve_acquisition_list(acq_list: List[Union[str, Callable]]= None) -> List[Callable]:
    """
    Resolves the list of acquisition functions and returns a list of acquisition function

    :param acq_list: list of strings of functions
    :return: list of functions
    """
    acq_function_list = []
    for acq_func in acq_list:
        if type(acq_func) is str:
            acq_function_list += [_get_acquisition_function(acq_func)]
        elif callable(acq_func):
            acq_function_list += [acq_func]
        else:
            raise ValueError('Unknown / unimplemented acquisition function')
    return acq_function_list


def _constant_kappa(curr_iter: int) -> float:
    """
    Return a constant kappa
    This should also be a sample of kappa strategy

    :param curr_iter: current iteration
    :return:
    """
    assert curr_iter >= 0
    return 2.0


def _init_strategy_lhs(n_dim: int, n_init: int, l_bound: np.array, u_bound: np.array) -> List[np.array]:
    """
    Generate initial locations for the population.
    The locations are sampled between 0 and 1 using Latin Hypercube Sampling, then scaled to the bounds.

    :param n_init: number of initial points
    :param n_dim: dimensionality
    :return: an array of shape (n_init, n_dim)
    """
    normalized_population = ut.lhs(n_dim, samples=n_init, criterion='cm', iterations=20)  # type: np.ndarray
    real_population = l_bound + normalized_population * (u_bound - l_bound)
    real_population = [p for p in real_population]
    return real_population


def _default_acq_optimizer(func: Callable[[np.array], float], x0: np.array, l_bound: np.array, u_bound: np.array) -> \
        np.array:
    """
    Default acquisition function optimizer : Uses the Powell method in scipy.optimize

    :param func: acquisition function
    :param x0: initial guess
    :param l_bound : lower bound
    :param u_bound : upper bound
    :return: Optimum location (array)
    """
    # Another example using the brute search
    # res = optimize.brute(func, bounds, Ns=50, full_output=True)
    # return res[0]

    bounds = list(zip(l_bound, u_bound))
    res = optimize.minimize(fun=func, x0=x0, bounds=bounds, method='Powell')
    return res.x


def _invert_matrix_mult(A: np.array, b: np.array) -> Tuple[np.array, np.array]:
    """
    Invert the input matrix and multiply with b to get coeff, i.e., returns inv(A) and A_inv_b
    :param A: Matrix
    :param b: vector
    :return: A^(-1) and A^(-1) b
    """
    n = A.shape[0]
    assert A.shape[0] == b.shape[0]
    try:
        # cholesky decomposition
        logger.debug('Eigen values of K are: {}'.format(np.linalg.eigvals(A)))
        l_cholesky = np.linalg.cholesky(A)
        l_inv = np.linalg.solve(l_cholesky, np.identity(n))
        A_inv = l_inv.transpose().dot(l_inv)
    except np.linalg.linalg.LinAlgError:
        logger.debug('Using direct inverse and not Cholesky decomposition. near singular matrix')
        A_inv = np.linalg.inv(A)

    A_inv_b = A_inv.dot(b)

    return A_inv, A_inv_b


[docs]class BayesOpt: """ Bayesian optimization class. :var curr_iter: iteration number of optimizer :var n_surrogate_opt: number of optima to get during surrogate optimization :var n_init: initial population size (>2) :var n_dim: dimensions of optimization :var l_bound, u_bound: lower and upper bounds on variables :var total_population: total set of population: list of arrays :var func_vector: functional values for the population : scalar for each population :var K: covariance matrix :var K_inv: inverse of covariance matrix :var K_inv_y: K_inv * func_vector : pre-computation to save costly / unstable inversion :var acquisition_function: acquisition function to optimize the surrogate :var cost_function: cost function to MINIMIZE: should be able to take a vector of locations :var constraints: list of constraint functions (only points that satisfy these functions will be visited) \ NOT enforced in the default init strategy!! :var if_restart: whether it should restart or not :var restart_filename: file to restart from. it will be `opt_state.dat` if nothing is specified :var acq_func_optimizer: optimizer for acquisition function. Should take in function, initial guess, bounds and \ function derivative. Returns the optimal location """ def __init__(self, cost_function: Union[ Callable[[List[np.array]], Tuple[List[Tuple[np.array, float]], List[np.array], List[np.array]]], Callable[[List[np.array], List[np.array]], Tuple[List[Tuple[np.array, float]], List[np.array], List[np.array]]]], l_bound: np.array, u_bound: np.array, n_dim: int, n_opt: int = 1, n_init: int = 0, init_strategy: Callable[[int, int, np.array, np.array], List[np.array]] = None, do_init: bool = True, kern_function: Union[str, kf.KernelFunction] = None, acq_func: Union[str, Callable[[float, float, float, float], float]] = None, acq_func_optimizer: Callable = None, kappa_strategy: Union[List[Callable[[int], float]], Callable[[int], float], None] = None, constraints: List[Callable[[np.array], bool]] = list(), if_restart: bool = None, restart_filename: str = 'opt_state.dat') -> None: assert l_bound.shape == u_bound.shape, "The supplied bounds have unequal shapes" assert len(l_bound) == n_dim, "The bounds have different dimensionality than n_dim" assert n_opt >= 1, "Asking for less than 1 optima per iteration, NOT possible. Terminating!" if do_init: assert n_init > 1, "Initial points are less than 2, not possible to build a surrogate" self.n_surrogate_opt = n_opt self.n_dim = n_dim self.n_init = n_init self.u_bound = u_bound self.l_bound = l_bound assert kern_function is not None, "No kernel function specified" if type(kern_function) is str: self.kernel = _get_kernel_function(kern_function) else: assert hasattr(kern_function, "eval"), "Kernel function does not implement the KernelFunction interface" self.kernel = kern_function # resolve acquisition functions assert callable(acq_func) or type(acq_func) is str or type(acq_func) is list, "Supplied acquisition function " \ "is neither callable nor a " \ "string" \ "nor a list of callables/string" if type(acq_func) is str or callable(acq_func): # only one string/function is provided, make it into a list of strings/functions acq_func_list = [acq_func for _ in range(self.n_surrogate_opt)] self.acquisition_function = _resolve_acquisition_list(acq_func_list) elif type(acq_func) is list: assert len(acq_func) == self.n_surrogate_opt, "Number of acquisition functions supplied do not " \ "match the requested optima per iteration" self.acquisition_function = _resolve_acquisition_list(acq_func) else: raise ValueError("Problem with acquisition function") # acquisition function optimizer assert acq_func_optimizer is None or callable(acq_func_optimizer), \ "Supplied acquition function optimizer is not callable" self.acq_func_optimizer = acq_func_optimizer if acq_func_optimizer is not None else _default_acq_optimizer assert cost_function is not None, "No cost function supplied, terminating!" self.cost_function = cost_function # managed variables self.curr_iter = 0 self.curr_points = 0 self.total_population = list() # type: List[np.array] # allocate function values for total population self.func_vector = list() # type: List[float] # covariance matrix & inverse self.K = np.identity(self.curr_points) self.K_inv = np.identity(self.curr_points) self.K_success = np.identity(self.curr_points) self.K_inv_success = np.identity(self.curr_points) # inverse times the function vector : self.K_inv_y = np.empty((self.curr_points, 1)) self.K_inv_y_success = np.empty((self.curr_points, 1)) # restart self.restart_file_path = restart_filename if if_restart is None: if_restart = os.path.exists(self.restart_file_path) self.if_restart = if_restart self.inside_iter = False # perform initialization based on strategy provided: if do_init and init_strategy is None: self.init_strategy = _init_strategy_lhs if len(constraints) > 0: warnings.warn("User-specified constraints are not respected by the default init_strategy. " "Initialization will probably fail.", category=RuntimeWarning) else: self.init_strategy = init_strategy # kappa strategy: # if not given, default to _constant_kappa if kappa_strategy is None: kappa_strategy = _constant_kappa if type(kappa_strategy) is list: # if the user passed a list, they have specified a kappa strategy for each surrogate optima # verify the number of functions matches and use it directly if not len(kappa_strategy) == self.n_surrogate_opt: raise ValueError("Number of kappa strategies does not match the number of surrogate optima") self.get_kappa = kappa_strategy elif callable(kappa_strategy): # kappa_strategy is a function: repeat it n_surrogate_opt times self.get_kappa = [kappa_strategy for _ in range(self.n_surrogate_opt)] else: raise ValueError("Invalid kappa_strategy (expected callable or list of callables)") # constraints self.constraints = constraints # initialize next query points and pending query points self.next_query_points = [np.zeros(self.n_dim) for _ in range(self.n_surrogate_opt)] self.pending_query_points = list() self.failed_query_points = list() self.failed_evaluations = list() ########### logging ############ logger.info('dimensionality: {}'.format(n_dim)) logger.info('Bounds: l_bound: {}, u_bound: {}'.format(l_bound, u_bound)) logger.info('kernel function: {}'.format(kern_function if type(kern_function) is str else type(kern_function).__name__)) logger.info('acquisition function: {}'. format(acq_func if type(acq_func) is str else acq_func.__name__)) logger.info('kernel function : theta: {}, theta0: {}'.format(self.kernel.theta, self.kernel.theta0)) logger.info('Init finished') if do_init: self._initialize() assert all(u_bound >= l_bound), "all the upper bound values are not greater than lower bound values" def _initialize(self) -> None: """ Initialize the population based on a random number, between the upper and lower bound :return: None """ if self.if_restart: self._restart_from_file() else: # randomly initialize population population = self.init_strategy(self.n_dim, self.n_init, self.l_bound, self.u_bound) # save pending and failed directly (since this is initialization) completed = self._require_all_cost_function(population) for x, y in completed: assert len(x.shape) == 1 and x.shape[0] == self.n_dim self.add_point(x, y, if_check_nearness=True) self.update_surrogate() # note that np.dot(self.K, self.K_inv) = np.identity if self.curr_points < 2: raise RuntimeError('Initialization strategy failed to provide at least two points. Cannot continue.')
[docs] def add_point(self, x: np.array, y: float = None, if_check_nearness: bool = True, is_failed: bool = False) -> None: """ Adds ONLY ONE point to the current set of data we have. if_check_nearness specifies if it is needed to check nearness constraints before adding the point into total_population. This should be true if the user is manually adding points into the population. Updates following variables: K, K_inv, total_population, func_vector If the point is a failed evaluation, then it does not add into total_population and func_vector :param x: array of shape (1, self.n_dim) :param y: cost function value at that location :param if_check_nearness: boolean, whether to check nearness or not. :param is_failed: boolean, whether the point to add is a failed evaluation or not :return: None """ assert len(x.shape) == 1, "Adding more than one point using add_point. NOT possible, Terminating!" assert x.shape[0] == self.n_dim if (not if_check_nearness) or (if_check_nearness and not self._is_near_to_population(x)): self.curr_points += 1 self._add_point_to_population(x, is_failed) k_vec = self._calc_k_vec_at(x, include_failed=True) if not is_failed: if y is None: logger.info('add_point(..) evaluating new point, no cost function value supplied') completed = self._require_all_cost_function([x]) y = completed[0] # add new location to total population self._add_point_to_func_vector(y) self._add_row_to_covariance(k_vec, is_failed) logger.debug("New point added {}!. Current size of population: {}".format( 'to population' if not is_failed else 'from failed list', self.curr_points)) else: logger.info('The added point ({}) is close to other existing points. Not adding into total population!'. format(x),
category=RuntimeWarning) def _calc_log_likelihood(self) -> float: """ Calculates the log-likelihood for the current kernel shape parameters (Maximum Likelihood Estimate, or MLE). This operation is done ONLY on self.K_success, i.e., for points with successful function evaluation. :return: log-likelihood value for current kernel shape parameters """ n = 1.0 * self.K_success.shape[0] ''' sum_log_eig = 0.0 eig_val, _ = np.linalg.eig(self.K) for i in range(0, n): sum_log_eig += log(eig_val[i]) sum_log_eig /= n ''' det_K = np.linalg.det(self.K_success) if det_K <= 1e-14: logger.warning('Negative or zero det(K) for theta = {}, returning penalty'.format(self.kernel.theta)) return 1.e16 # from 'Mongillo, Michael. "Choosing basis functions and shape parameters for radial basis function methods." # SIAM Undergraduate Research Online 4 (2011): 190-209.' sum_log_eig = math.log(det_K) / n yT_times_K_inv = np.dot(self.func_vector, self.K_inv_y_success) log_yT_c = math.log(yT_times_K_inv) mle = log_yT_c + sum_log_eig logger.debug("MLE = {}, for theta = {}".format(mle, self.kernel.theta)) return mle
[docs] def estimate_best_kernel_parameters(self, theta_bounds=None) -> None: """ Calculates and *sets* the best shape-parameter/characteristic length-scale for RBF kernel function and applies it to the current model. :param theta_bounds: array of bounds for theta :return: None """ old_parameters = self.kernel.theta0, self.kernel.theta if theta_bounds is None: theta_bounds = [[0.1, 12.0]] def calc_mle(x1: float, x2: float = None) -> float: """ Calculate the Maximum likelihood estimate :param x1: value of theta for kernel function :param x2: value of theta0, generally 1.0 :return: """ if x2 is None: x2 = 1.0 self.set_new_kernel_parameters(theta=float(x1), theta0=x2) mle = self._calc_log_likelihood() return mle try: theta_opt = optimize.brute(calc_mle, theta_bounds, Ns=30, full_output=False, finish=None) self.set_new_kernel_parameters(theta=theta_opt, theta0=1.0) logger.info('Kernel function parameter optimized: theta: {}, theta0: {}'.format(self.kernel.theta, self.kernel.theta0)) except Exception as _: logging.error(traceback.format_exc()) logger.warning('bayes opt hyper parameter optimization failed! Resetting to old parameters')
self.set_new_kernel_parameters(theta=old_parameters[1], theta0=old_parameters[0])
[docs] def update_iter(self, iter_max: int = 1) -> None: """ Finds the next query point and updates the data for the next iteration :return: None """ for _ in range(iter_max): if not self.inside_iter: self.next_query_points = self._find_next_query_point() self.inside_iter = True self._save_to_file() # unpack directly into pending completed, self.pending_query_points, failed =\ self.cost_function(self.next_query_points, self.pending_query_points) logger.info('Cost function evaluated! Number completed: {}, failed: {} and pending: {}'.format( len(completed), len(self.pending_query_points), len(failed))) for x, y in completed: self.add_point(x, y, if_check_nearness=True, is_failed=False) # add the failed points as interpolated points to covariance matrix for x in failed: self.add_point(x, if_check_nearness=True, is_failed=True) # update covariance matrix inverse after adding all points self.update_surrogate() self.curr_iter += 1 self.inside_iter = False self._save_to_file() if len(completed) == 0: msg = 'Failed to evaluate any new points during iteration ' + str(self.curr_iter) + '.' logger.warning(msg) self._save_to_file() logger.info('{} iterations completed, total population: {}, number of iterations: {}!'.
format(iter_max, self.curr_points, self.curr_iter)) def _find_next_query_point(self) -> List[np.array]: """ :return: list x values (length up to self.n_surrogate_opt - may be less) """ next_points = [] avoid_points = [] max_tries = 10 for i in range(self.n_surrogate_opt): tries = 0 while tries < max_tries: tries += 1 kappa = self.get_kappa[i](self.curr_iter) logger.info('opt = {0:d}, kappa = {1:.2f}'.format(i, kappa)) def opt_func(x1: np.array) -> float: """ function to optimize (acquisition with other arguments filled in) :param x1: :return: """ test_pt = np.asarray([x1]) if len(x1.shape) == 0 else x1 return self.acquisition(test_pt, kappa, avoid=next_points+avoid_points, opt_indx=i) x = self.acq_func_optimizer(func=opt_func, x0=np.mean([self.l_bound, self.u_bound], axis=0), l_bound=self.l_bound, u_bound=self.u_bound) x = np.reshape(x, (self.n_dim,)) # for cases of 1 dimension # never look for this location, even if it is not accepted within the tries. If it is accepted, it will # be included in next_points as well, which will increase the penalty at these locations avoid_points.append(x) if not self._is_near_to_population(x, population=self.total_population + next_points + self.pending_query_points + self.failed_query_points)\ and (all(x <= self.u_bound) and all(x >= self.l_bound))\ and all([c(x) for c in self.constraints]): next_points.append(x) logger.info('Next location: ' + str(x) + ', (acq value: ' + str(self.acquisition(x, kappa, avoid=next_points, opt_indx=i)) + ')') break if tries >= max_tries: logger.warning('Could not find surrogate optima ' + str(i) + ' (all points near population)') # find unique set in the list of next_points, this will make its size less than n_surrogate_opt a = [tuple(x.tolist()) for x in next_points] next_points = [np.array(x) for x in set(a)] return next_points
[docs] def acquisition(self, x: np.array, kappa: float, avoid: List[np.array] = None, opt_indx: int = 0) -> float: """ Calculates the acquisition function + penalty function at the given query location. If the query point is outside the bounds, it will return an infinity This is the function that needs to be optimized. :param x: location to evaluate acquisition function : shape: (1, self.n_dim) :param kappa: kappa value to pass to acquisition function :param avoid: list of x values to avoid (forwarded to penalty function) :param opt_indx: index of acquisition function that is being called :return: scalar acquisition function at x """ assert len(x.shape) == 1, "acquisition received more than one({}) query point!".format(len(x.shape)) if avoid is None: avoid = list() if (x > self.u_bound).any() or (x < self.l_bound).any(): return 1.e16 if not all([c(x) for c in self.constraints]): return 1.e16 mean, variance = self.evaluate_surrogate_at(x, include_failed=True) curr_best = np.min(self.func_vector) return self.acquisition_function[opt_indx](mean, np.sqrt(abs(variance)), curr_best, kappa)\
+ 10.0 * self.penalty_function(x, kappa, avoid)
[docs] def penalty_function(self, x: np.array, kappa: float, avoid: List[np.array] = list()) -> float: """ Calculates the penalty function, to prevent local optima to repeat :param x: location :param kappa: kappa value used in acquisition function :param avoid: extra points to avoid (other than self.total_pop/self.pending/self.failed) :return: penalty function value at x """ penalty_val = 0.0 penalty_wid = max(kappa, 1e-2) # penalize existing points and avoid list for pop in (self.total_population + self.pending_query_points + self.failed_query_points): penalty_val += ut.pdf_normal(x, pop, np.sqrt(penalty_wid)) for pop in avoid: penalty_val += 5.0 * ut.pdf_normal(x, pop, np.sqrt(penalty_wid)) # penalize points close to boundaries # iterate through all the boundaries for bndr_dim in range(self.n_dim): # penalty at the upper bound penalty_val += 5.0 * ut.pdf_normal(x[bndr_dim], self.u_bound[bndr_dim], 1. * np.sqrt(penalty_wid)) # penalty at the lower bound penalty_val += 5.0 * ut.pdf_normal(x[bndr_dim], self.l_bound[bndr_dim], 1. * np.sqrt(penalty_wid))
return penalty_val def _save_to_file(self) -> None: """ saves current data to file :return: None """ data = { "n_dim": self.n_dim, "curr_iter": self.curr_iter, "curr_points": self.curr_points, "total_population": self.total_population, "func_vector": self.func_vector, "inside_iter": self.inside_iter, "next_query_points": self.next_query_points, "pending_query_points": self.pending_query_points, "failed_query_points": self.failed_query_points, "u_bound": self.u_bound, "l_bound": self.l_bound, "theta0": self.kernel.theta0, "theta": self.kernel.theta, "K": self.K, "K_inv": self.K_inv, "K_inv_y": self.K_inv_y, } with open(self.restart_file_path, 'wb') as f: pickle.dump(data, f) io.savemat(self.restart_file_path[:-4] + '.mat', data) if self.inside_iter: logger.info("Saved optimizer state from iter " + str(self.curr_iter) + " to " + str(self.curr_iter + 1)) else: logger.info("Saved optimizer state for iteration " + str(self.curr_iter) + ".") def _restart_from_file(self) -> None: """ Restarts from a file :return: None, updates all the variables """ with open(self.restart_file_path, 'rb') as f: data = pickle.load(f) assert self.n_dim == data["n_dim"], "Dimensionality of restart and initialization are not equal" self.curr_iter = data["curr_iter"] curr_points = data.get("curr_points") total_population = data["total_population"] func_vector = data["func_vector"] self.inside_iter = data["inside_iter"] self.next_query_points = data["next_query_points"] self.pending_query_points = data["pending_query_points"] failed_query_points = data["failed_query_points"] self.u_bound = data["u_bound"] self.l_bound = data["l_bound"] self.kernel.theta = data["theta"] self.kernel.theta0 = data["theta0"] for pop, func in zip(total_population, func_vector): # add point to the population, no need to check nearness self.add_point(pop, func, if_check_nearness=False, is_failed=False) self._evaluate_surrogate_for_failed_list() for pop in failed_query_points: self.add_point(pop, if_check_nearness=True, is_failed=True) assert self.curr_points == curr_points, "Added points({}) and num of points({}) from restart file " \ "do not match".format(self.curr_points, curr_points) self.update_surrogate() logger.info("Resumed on iteration " + str(self.curr_iter) + ".")
[docs] def evaluate_surrogate_at(self, x: np.array, include_failed: bool = False) -> tuple: """ Evaluates the surrogate at given point Taken from : https://arxiv.org/pdf/1012.2599.pdf : "A Tutorial on Bayesian Optimization of Expensive Cost Functions, with Application to Active User Modeling and Hierarchical Reinforcement Learning" Note that it returns sigma^2 and not sigma The mean of the surrogate will not be affected by the failed points, they will only affect the variance of the surrogate :param x: location to evaluate the surrogate rbf approximation :param include_failed: whether to include failed points for surrogate :return: mean(mu), variance (sigma^2) """ k_vec = self._calc_k_vec_at(x, include_failed=include_failed) # row vector k_vec_transpose = np.asarray([k_vec]) # column vector k_cov_diag = self.kernel.eval(x, x) if include_failed: K_inv_y = self.K_inv_y K_inv = self.K_inv else: K_inv = self.K_inv_success K_inv_y = self.K_inv_y_success mean = k_vec_transpose.dot(K_inv_y) variance = k_cov_diag - k_vec_transpose.dot(K_inv.dot(k_vec))
return mean, variance def _evaluate_surrogate_for_failed_list(self) -> None: """ Evaluates the surrogate values for the failed list. This is used to include failed evaluations into the surrogate. The surrogate is updated by calling self.update_surrogate before evaluating the interpolated value. Updates the class variable self.failed_evaluations :return: None """ self.failed_evaluations = [] for failed_point in self.failed_query_points: mean, _ = self.evaluate_surrogate_at(failed_point, include_failed=False) self.failed_evaluations.append(mean) logger.debug('Surrogate evaluated for failed points.') def _calc_k_vec_at(self, x: np.array, include_failed: bool) -> np.array: """ Calculates the k_vector for any arbitrary point. This function can be useful when updating the covariance matrix (per iteration) or when evaluating the surrogate function at x :param x: location to calculate k_vector :return: vector of dimension curr_points """ # if not include_failed: # k_vec = np.asarray([self.kernel.eval(x, y) for y in self.total_population]) # else: # k_vec = np.asarray([self.kernel.eval(x, y) for y in self.total_population] + # [self.kernel.eval(x, y) for y in self.failed_query_points]) # lengths of successful and failed points pop_num = len(self.total_population) fail_num = len(self.failed_query_points) if include_failed: k_vec = np.empty(self.curr_points) else: k_vec = np.empty(pop_num) # fill k_vec for successful population for pop in range(pop_num): k_vec[pop] = self.kernel.eval(x, self.total_population[pop]) if include_failed: # for calculating kernel with failed points, the length scale has to be reduced self.kernel.theta /= 10 for pop in range(fail_num): k_vec[pop_num + pop] = self.kernel.eval(x, self.failed_query_points[pop]) # revert the length scale of the kernel self.kernel.theta *= 10 return k_vec def _add_point_to_population(self, x: np.array, is_failed: bool = False) -> None: """ adds an element to the total population In the case of a failed location, it appends to self.failed_query_points In the case of successful evaluation, it prepends to the self.total_population The above methodology is used to make calculations of k_vec easier, especially if failed points have smaller correlation length. The order of rows in covariance matrix K can be consistent with the "list: self.total_population + self.failed_query_points" :rtype: None :param is_failed: whether we are adding a failed location or not :param x: new location to be added to the total population :return: None """ if is_failed: self.failed_query_points.append(x) else: self.total_population.insert(0, x) # assert len(self.total_population)+len(self.failed_query_points) == self.curr_points, "total population is " \ # "not the same as current" \ # " number of points" def _add_point_to_func_vector(self, y: float) -> None: """ prepends an element to the func_vector. This is to consider the order of elements in self.total_population (see self._add_point_to_population() for exact details) :param y: value of cost function to be added to func_vector :return: None """ self.func_vector.insert(0, float(y)) def _add_row_to_covariance(self, k_vec_new: np.array, is_failed: bool = False) -> None: """ Adds a row to covariance matrix ENSURE that self.curr_points is updated before calling this function :param k_vec_new: new row to be added to covariance matrix :return: None """ # preallocate updated covariance matrix k_new = np.identity(self.curr_points) if is_failed: # failed point, so add to the bottom of the covariance matrix # gather from previous covariance matrix and put in the upper part of new covariance matrix k_new[0:self.curr_points - 1, 0:self.curr_points - 1] = self.K.copy() # fill in the new row/column k_new[-1, :] = k_vec_new k_new[:, -1] = k_vec_new else: # normal point, add to the top of the covariance matrix # gather from previous covariance matrix and put in the lower part of new covariance matrix k_new[1:self.curr_points, 1:self.curr_points] = self.K.copy() # fill in the new row/column k_new[0, :] = k_vec_new k_new[:, 0] = k_vec_new # update the class variable self.K = k_new.copy()
[docs] def update_surrogate(self) -> None: """ updates the inverse of covariance function. In order to incorporate failed locations as interpolated values, this update is 2 staged: 1. Create the surrogate (K,K_inv) with only the successfully evaluated points 2. a) Evaluate the mean of the surrogate at the failed points b) Incorporate the interpolated failed locations into the surrogate and update the surrogate (K,K_inv) Updates the values of self.K_success, self.K_inv_success, self.K_inv_y_success, self.K_inv and self.K_inv_y :return: None """ pop_num = len(self.total_population) fail_num = len(self.failed_query_points) assert self.K.shape[0] == self.curr_points assert self.curr_points == pop_num + fail_num # create the surrogate without failed points self.K_success = self.K[0:pop_num, 0:pop_num] self.K_inv_success, self.K_inv_y_success = _invert_matrix_mult(self.K_success, np.asarray(self.func_vector)) logger.info('Surrogate without failed points created! ({} points) '.format(pop_num)) # evaluate the interpolated values for failed points self._evaluate_surrogate_for_failed_list() # create the surrogate with failed points self.K_inv, self.K_inv_y = _invert_matrix_mult(self.K, np.asarray(self.func_vector + self.failed_evaluations))
logger.info('Surrogate with failed points created! ({} points) '.format(self.curr_points)) def _is_near_to_population(self, x: np.array, population: List[np.array] = None) -> bool: """ Checks whether the next point is "near" to the already existing points Returns True if it is near to already existing points :param x: location to check nearness :param population : population to check nearness from, by default it will be set to current total population :return: bool: True/False """ if population is None: population = self.total_population for member in population: dist = ut.distance(member, x) # estimating that if the distance is less that 1% of correlation length(theta), # then the rows will be very close if dist < 1e-2 * np.max(self.kernel.theta): return True return False
[docs] def set_new_kernel_parameters(self, theta: Union[float, np.array] = 1.0, theta0: float = 1.0) -> None: """ sets the kernel parameters: called from the user script. The covariance matrix needs to be re-calculated when this function is called :param theta: scaling of dimensions used with the distance function :param theta0: scaling of kernel function :return: None """ self.kernel.theta0 = theta0 self.kernel.theta = theta # recalculate covariance matrix using add_point curr_pop = self.total_population.copy() curr_failed = self.failed_query_points.copy() curr_y = self.func_vector.copy() self.total_population = [] self.failed_query_points = [] self.func_vector = [] self.curr_points = 0 self.K = np.identity(self.curr_points) for pop, func in zip(curr_pop, curr_y): self.add_point(pop, func, if_check_nearness=True, is_failed=False) for pop in curr_failed: self.add_point(pop, if_check_nearness=True, is_failed=True) self.update_surrogate() logger.debug('Kernel function parameters changed: theta: {}, theta0: {}'.format(self.kernel.theta, self.kernel.theta0))
logger.debug('Covariance matrix recalculated!') # getter functions for user access
[docs] def get_total_population(self) -> Tuple[List[np.array], List[float]]: """ returns the total population : for visualization purposes :return: total_population and function values at that location """
return self.total_population, self.func_vector
[docs] def get_current_iteration(self) -> int: """ Returns the current iteration number :return: integer """
return self.curr_iter
[docs] def get_current_best(self) -> tuple: """ returns the location of current best and value of current best :return: location, func_value """ best_index = np.argmin(self.func_vector)
return self.total_population[best_index], self.func_vector[best_index] def _require_all_cost_function(self, xs: List[np.array]) -> List[Tuple[np.array, float]]: """ Function that evaluates all cost function, instead of a required fraction :param xs: list of locations to evaluate :return: list of tuples of locations and cost functions """ pending = xs completed = list() failed = list() while len(pending) > 0: new_completed, pending, new_failed = self.cost_function(pending) completed += new_completed failed += new_failed if len(failed) > 0: warnings.warn('Could not evaluate all required points (' + str(len(failed)) + ' evaluations failed).', category=RuntimeWarning) return completed
[docs] def export_csv(self, path: str, **kwargs) -> None: """ Writes the data from all completed function evaluations (i.e. x, y pairs) to a CSV file. The file starts with the following header row: x0, x1, ..., x[n_dim-1], y. Additional arguments will be forwarded to the csv.writer function, which can be used to control the formatting of the CSV file. To write a TSV instead of CSV, you can pass dialect='excel-tab'. :param path: Path to write the file to. If the file already exists, it will be overwritten. :param kwargs: Forwarded to csv.writer(), can be used to override the CSV dialect and set formatting options. """ with open(path, 'w', newline='') as f: writer = csv.writer(f, **kwargs) # write header (x0, x1, ..., xN-1, y) writer.writerow(['x' + str(n) for n in range(self.n_dim)] + ['y']) # write data for x, y in zip(self.total_population, self.func_vector):
writer.writerow(list(x) + [y])