Source code for skretrieval.retrieval.scipy

from __future__ import annotations

import numpy as np
from scipy import sparse
from scipy.optimize import least_squares, minimize
from scipy.optimize._optimize import MemoizeJac

from skretrieval.core.radianceformat import RadianceBase
from skretrieval.retrieval import ForwardModel, Minimizer, RetrievalTarget
from skretrieval.retrieval.erroranalysis import estimate_error


[docs] class SciPyMinimizer(Minimizer):
[docs] def __init__( self, method="trf", max_nfev=20, ftol=1e-3, xtol=1e-36, x_scale="jac", tr_solver="exact", apply_state_scaling=False, include_bounds=False, num_passes=1, **kwargs, ) -> None: """ A minimization wrapper around Scipy's least_squares function Parameters ---------- method : str, optional Minimization method, see scipy.least_squares, by default "trf". Recommended to only use "lm" or "trf". max_nfev : int, optional Maximum function evalations, see scipy.least_squares, by default 20 ftol : _type_, optional Tolerance on the cost function, see sci, by default 1e-3 xtol : _type_, optional Tolerance on the change in state, by default 1e-36 x_scale : str, optional Internal scaling applyed by the minimizer, by default "jac" tr_solver : str, optional For the "trf" method, how to solve the trust region problem, by default "exact" apply_state_scaling: bool, optional If true, then the state vector is scaled relative to the apriori in the solver, useful when the state vector elements are of largely varying magnitudes and you have a well specified prior, by default False include_bounds : bool, optional If true, then bounds are included inside the solver. Only has an effect weth method is "trf", by default False num_passes : int, optional Number of passes to do through the minimizer. After each pass the noise covariance is adjusted where measurements with large residuals are given less weight on the next pass, by default 1 """ self._method = method self._ftol = ftol self._xtol = xtol self._max_nfev = max_nfev self._x_scale = x_scale self._tr_solver = tr_solver self._include_bounds = include_bounds self._num_passes = num_passes self._apply_state_scaling = apply_state_scaling self._kwargs = kwargs
[docs] def retrieve( self, measurement_l1: RadianceBase, forward_model: ForwardModel, retrieval_target: RetrievalTarget, ): ### Get the prior values x_a = retrieval_target.apriori_state() initial_guess = retrieval_target.state_vector() lb = retrieval_target.lower_bound() ub = retrieval_target.upper_bound() inv_Sa = retrieval_target.inverse_apriori_covariance() if inv_Sa is None: # No apriori covariance/regularization # Use initial guess to make the matrices as x_a might be None as well inv_Sa = np.zeros((len(initial_guess), len(initial_guess))) if x_a is None: x_a = np.zeros_like(initial_guess) ### Get the measurement values y_meas_dict = retrieval_target.measurement_vector(measurement_l1) y_meas = y_meas_dict["y"] good_meas = ~np.isnan(y_meas) y_meas = y_meas[good_meas] if "y_error" in y_meas_dict: # Have measurement error if len(np.shape(y_meas_dict["y_error"])) == 1: # Only supplied is the diagonal of the error elements Sy = sparse.csc_matrix( sparse.diags(y_meas_dict["y_error"][good_meas], 0) ) inv_Sy = sparse.csc_matrix( sparse.diags(1 / y_meas_dict["y_error"][good_meas], 0) ) y_scaler_inv = np.linalg.cholesky(inv_Sy) elif sparse.issparse(y_meas_dict["y_error"]): Sy = y_meas_dict["y_error"][np.ix_(good_meas, good_meas)] rows = Sy.indices # Row indices of non-zero elements cols = np.repeat( np.arange(Sy.shape[1]), np.diff(Sy.indptr) ) # Column indices of non-zero elements # Check if all non-zero elements are on the diagonal is_diagonal = np.all(rows == cols) if is_diagonal: # Ensure there are no zeros on the diagonal if np.any(Sy.data == 0): msg = "Cannot invert a matrix with zeros on the diagonal." raise ZeroDivisionError(msg) # Compute the inverse of the diagonal elements inv_data = 1.0 / Sy.data # Create the inverse matrix using the same indices and indptr inv_Sy = sparse.csc_matrix( (inv_data, Sy.indices, Sy.indptr), shape=Sy.shape ) y_scaler_inv = sparse.csc_matrix( (np.sqrt(inv_data), Sy.indices, Sy.indptr), shape=Sy.shape ) else: Sy = y_meas_dict["y_error"][np.ix_(good_meas, good_meas)] inv_Sy = np.linalg.inv(Sy) y_scaler_inv = np.linalg.cholesky(inv_Sy) else: # No user supplied error, use identity matrix Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) inv_Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) y_scaler_inv = np.linalg.cholesky(inv_Sy) try: chol_inv_Sa = np.linalg.cholesky(inv_Sa) except np.linalg.LinAlgError: # If the inverse covariance is not positive definite, then we can't use the cholesky # decomposition, but we can use an eigenvalue decomposition eigvals, eigvecs = np.linalg.eigh(inv_Sa) eigvals[eigvals < 0] = 0 chol_inv_Sa = np.diag(np.sqrt(eigvals)) @ eigvecs.T if self._apply_state_scaling: x_scaler_inv = np.diag(1 / x_a) x_scaler = np.diag(x_a) else: x_scaler_inv = np.eye(len(x_a)) x_scaler = np.eye(len(x_a)) x_a = x_scaler_inv @ x_a def residual_fun(x): retrieval_target.update_state(x_scaler @ x) y_ret_dict = retrieval_target.measurement_vector( forward_model.calculate_radiance() ) K = y_ret_dict["jacobian"] K = y_scaler_inv @ K[good_meas, :] @ x_scaler y_ret = y_scaler_inv @ y_ret_dict["y"][good_meas] # First part of residuals is from y, y_meas - y_ret, and jacobian K res = y_ret - y_scaler_inv @ y_meas # Second part of residuals is x-x_a, with identity jacobian in scaled space res_x = chol_inv_Sa @ x_scaler @ (x - x_a) K_x = chol_inv_Sa @ x_scaler # To match the cost of the standard "Rodgers" minimizer we have to scale by the number of measurements, # and also multiply by 2 since the scipy least squares does 0.5 * res.T @ res n = len(res) / 2 return np.concatenate((res, res_x)) / np.sqrt(n), np.vstack( [K, K_x] ) / np.sqrt(n) fun = MemoizeJac(residual_fun) jac = fun.derivative bounds = ( (x_scaler_inv @ lb, x_scaler_inv @ ub) if self._include_bounds else (np.ones_like(lb) * (-np.inf), np.ones_like(ub) * np.inf) ) results = {} for _ in range(self._num_passes): results["minimizer"] = least_squares( fun, x0=x_scaler_inv @ initial_guess, jac=jac, x_scale=self._x_scale, verbose=2, tr_solver=self._tr_solver, max_nfev=self._max_nfev, tr_options={"regularize": False}, method=self._method, xtol=self._xtol, ftol=self._ftol, bounds=bounds, **self._kwargs, ) y_ret_dict = retrieval_target.measurement_vector( forward_model.calculate_radiance() ) meas_resid = y_meas - y_ret_dict["y"][good_meas] median_resid = np.median(np.abs(meas_resid)) # Adjust the scaler based on the residual fractions scaler = np.abs(meas_resid) / median_resid scaler[scaler < 1] = 1 # Rescale the measurement errrors y_scaler_inv = sparse.diags(np.sqrt(inv_Sy.diagonal() / scaler**2)) initial_guess = retrieval_target.state_vector() K = y_ret_dict["jacobian"][good_meas, :] results.update(estimate_error(K, Sy, inv_Sy, inv_Sa)) return results
class SciPyMinimizerGrad(Minimizer): def __init__(self) -> None: super().__init__() def retrieve( self, measurement_l1: RadianceBase, forward_model: ForwardModel, retrieval_target: RetrievalTarget, ): ### Get the prior values x_a = retrieval_target.apriori_state() initial_guess = retrieval_target.state_vector() inv_Sa = retrieval_target.inverse_apriori_covariance() if inv_Sa is None: # No apriori covariance/regularization # Use initial guess to make the matrices as x_a might be None as well inv_Sa = np.zeros((len(initial_guess), len(initial_guess))) if x_a is None: x_a = np.zeros_like(initial_guess) ### Get the measurement values y_meas_dict = retrieval_target.measurement_vector(measurement_l1) y_meas = y_meas_dict["y"] good_meas = ~np.isnan(y_meas) y_meas = y_meas[good_meas] if "y_error" in y_meas_dict: # Have measurement error if len(np.shape(y_meas_dict["y_error"])) == 1: # Only supplied is the diagonal of the error elements Sy = sparse.csc_matrix( sparse.diags(y_meas_dict["y_error"][good_meas], 0) ) inv_Sy = sparse.csc_matrix( sparse.diags(1 / y_meas_dict["y_error"][good_meas], 0) ) else: Sy = y_meas_dict["y_error"][np.ix_(good_meas, good_meas)] inv_Sy = np.linalg.inv(Sy) else: # No user supplied error, use identity matrix Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) inv_Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) y_scaler_inv = sparse.diags(np.sqrt(inv_Sy.diagonal())) x_scaler_inv = np.linalg.cholesky(inv_Sa) x_scaler = np.linalg.inv(x_scaler_inv) y_meas = y_scaler_inv @ y_meas x_a = x_scaler_inv @ x_a def residual_fun(x): retrieval_target.update_state(x_scaler @ x) y_ret_dict = retrieval_target.measurement_vector( forward_model.calculate_radiance() ) K = y_scaler_inv @ y_ret_dict["jacobian"][good_meas, :] @ x_scaler y_ret = y_scaler_inv @ y_ret_dict["y"][good_meas] cost = (y_ret - y_meas).T @ (y_ret - y_meas).T + (x - x_a).T @ (x - x_a) grad = K.T @ (y_ret - y_meas) + (x - x_a) K_x = np.eye(len(x)) full_K = np.vstack([K, K_x]) return (cost, 2 * grad), 2 * full_K.T @ full_K fun = MemoizeJac(residual_fun) hess = fun.derivative return minimize( fun, x0=x_scaler_inv @ initial_guess, jac=True, hess=hess, options={"disp": True, "maxiter": 30}, method="trust-exact", )