Source code for skretrieval.retrieval.rodgers

from __future__ import annotations

import logging
from copy import copy

import numpy as np
from scipy import sparse

from skretrieval.retrieval import ForwardModel, Minimizer, RetrievalTarget
from skretrieval.retrieval.erroranalysis import estimate_error


[docs] class Rodgers(Minimizer):
[docs] def __init__( self, max_iter: int = 10, lm_damping: float = 0, iterative_update_lm: bool = False, retreat_lm: bool = False, lm_change_factor: float = 1.5, convergence_factor: float = 1, convergence_check_method="linear", lm_damping_method="fletcher", apply_cholesky_scaling=False, ): """ Implements the standard inverse problem method described in "Inverse Methods for Atmospheric Sounding" by Rodgers (2000). Parameters ---------- max_iter: int, optional The maximum number of iterations to perform when calling retrieve. Default: 10 lm_damping: float, optional The Levenberg-Marquardt damping parameter. A value of 0 would indicate no damping. Default: 0 iterative_update_lm: bool, optional If True, the LM damping factor is modified each iteration based on how well the problem is converging. Default: False retreat_lm: bool, optional If True, if the chi sq is worse for one iteration the state vector is retreated back to the previous iteration's value. Default: False lm_change_factor: float, optional The multiplicative factor applied to the LM damping parameter each iteration if iterative_update_lm is set to True. Default: 1.5 convergence_factor: float, optional See convergence_check_method. Reasonable values are 1.01 if convergence_check_method is linear, and 1 if convergence_check_method is dcost. Default: 1 convergence_check_method: str, optional Sets the method of checking for convergence. If 'linear' then (expected chi_sq) / (chi_sq) is checked if it is less than convergence_factor. If 'dcost', then the derivative of the cost function is checked against convergence_factor. lm_damping_method: str, optional One of 'fletcher', 'prior', or 'identity'. If 'fletcher', the LM damping term will be lm_damping * diag(K^T inv_Sy K). If 'prior', the lm damping is lm_damping * inv_Sa. If 'identity' the damping is lm_damping * identity. Default is 'fletcher' apply_cholesky_scaling: bool, optional If True, then attempts will be made to rescale the state vector and measurement vectors to a more numerically suitable space """ self._max_iter = max_iter self._lm_damping = lm_damping self._iterative_update_lm = iterative_update_lm self._retreat_lm = retreat_lm self._lm_change_factor = lm_change_factor self._convergence_factor = convergence_factor self._convergence_check_method = convergence_check_method self._lm_damping_method = lm_damping_method self._apply_cholesky_scaling = apply_cholesky_scaling
@staticmethod def _measurement_parameters(retrieval_target: RetrievalTarget, measurement_l1): """ Calculates parameters necessary for the iteration that depend on the measurement data. Parameters ---------- retrieval_target: RetrievalTarget Target that is being retrieved measurement_l1: Measurement l1 data Returns ------- y_meas_dict: dict Dictionary containing various parameters of the measurement vector y_meas: np.array The measurement vector, length (m,) Sy: matrix like Matrix of size (m,m) that contains the error covariance of the measurements inv_Sy: matrix like Matrix of size (m,m), that is the inverse of inv_Sy good_meas: np.array Flag of which measurements are used in y_meas. y_meas = y_meas_dict['y'][good_meas] """ y_meas_dict = retrieval_target.measurement_vector(measurement_l1) y_meas = y_meas_dict["y"] good_meas = ~np.isnan(y_meas) y_meas = y_meas[good_meas] if "y_error" in y_meas_dict: # Have measurement error if len(np.shape(y_meas_dict["y_error"])) == 1: # Only supplied is the diagonal of the error elements Sy = sparse.csc_matrix( sparse.diags(y_meas_dict["y_error"][good_meas], 0) ) inv_Sy = sparse.csc_matrix( sparse.diags(1 / y_meas_dict["y_error"][good_meas], 0) ) elif sparse.issparse(y_meas_dict["y_error"]): Sy = y_meas_dict["y_error"][np.ix_(good_meas, good_meas)] rows = Sy.indices # Row indices of non-zero elements cols = np.repeat( np.arange(Sy.shape[1]), np.diff(Sy.indptr) ) # Column indices of non-zero elements # Check if all non-zero elements are on the diagonal is_diagonal = np.all(rows == cols) if is_diagonal: # Ensure there are no zeros on the diagonal if np.any(Sy.data == 0): msg = "Cannot invert a matrix with zeros on the diagonal." raise ZeroDivisionError(msg) # Compute the inverse of the diagonal elements inv_data = 1.0 / Sy.data # Create the inverse matrix using the same indices and indptr inv_Sy = sparse.csc_matrix( (inv_data, Sy.indices, Sy.indptr), shape=Sy.shape ) else: inv_Sy = sparse.csc_matrix(np.linalg.inv(Sy.toarray())) else: Sy = y_meas_dict["y_error"][np.ix_(good_meas, good_meas)] inv_Sy = np.linalg.inv(Sy) else: # No user supplied error, use identity matrix Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) inv_Sy = sparse.csc_matrix(sparse.eye(len(y_meas), len(y_meas))) return y_meas_dict, y_meas, Sy, inv_Sy, good_meas @staticmethod def _apriori_parameters(retrieval_target: RetrievalTarget): """ Calculates several parameters related to the apriori using the retrieval target. Parameters ---------- retrieval_target: RetrievalTarget Returns ------- x_a: np.array Array of length (n,) that is the apriori state inv_Sa: matrix_like Matrix of size (n,n) that is the apriori covariance initial_guess: np.array Array of length (n,) that is the current state """ x_a = retrieval_target.apriori_state() initial_guess = retrieval_target.state_vector() inv_Sa = retrieval_target.inverse_apriori_covariance() if inv_Sa is None: # No apriori covariance/regularization # Use initial guess to make the matrices as x_a might be None as well inv_Sa = np.zeros((len(initial_guess), len(initial_guess))) if x_a is None: x_a = np.zeros_like(initial_guess) return x_a, inv_Sa, initial_guess
[docs] def retrieve( self, measurement_l1, forward_model: ForwardModel, retrieval_target: RetrievalTarget, ): retrieval_target.initialize(forward_model, measurement_l1) output_dict = {} y_meas_dict, y_meas, Sy, inv_Sy, good_meas = self._measurement_parameters( retrieval_target, measurement_l1 ) x_a, inv_Sa, initial_guess = self._apriori_parameters(retrieval_target) xs = [] xs.append(initial_guess) ys = [] chi_sq_prev = None best_x = retrieval_target.state_vector() best_chi_sq = 1e99 if self._apply_cholesky_scaling: x_scaler_inv = sparse.diags(np.sqrt(inv_Sa.diagonal())) y_scaler = sparse.diags(1 / np.sqrt(inv_Sy.diagonal())) y_scaler_inv = sparse.diags(np.sqrt(inv_Sy.diagonal())) x_scaler = sparse.diags(1 / np.sqrt(inv_Sa.diagonal())) else: x_scaler_inv = sparse.eye(len(x_a)) x_scaler = x_scaler_inv y_scaler = sparse.eye(len(y_meas)) y_scaler_inv = sparse.eye(len(y_meas)) y_meas = y_scaler_inv @ y_meas inv_Sa = x_scaler @ inv_Sa @ x_scaler inv_Sy = y_scaler @ inv_Sy @ y_scaler x_a = x_scaler_inv @ x_a for iter_idx in range(self._max_iter): x = x_scaler_inv @ retrieval_target.state_vector() forward_l1 = forward_model.calculate_radiance() y_ret_dict = retrieval_target.measurement_vector(forward_l1) K = y_ret_dict["jacobian"] K = y_scaler_inv @ K[good_meas, :] @ x_scaler y_ret = y_scaler_inv @ y_ret_dict["y"][good_meas] ys.append(y_ret_dict["y"]) approx_hessian = K.T @ inv_Sy @ K # Left side of rodgers equation if sparse.issparse(K): if self._lm_damping_method.lower() == "fletcher": A = ( approx_hessian + self._lm_damping * sparse.diags((approx_hessian).diagonal()) + inv_Sa ) elif self._lm_damping_method.lower() == "prior": A = approx_hessian + (self._lm_damping + 1) * inv_Sa elif self._lm_damping_method == "identity": A = ( approx_hessian + inv_Sa + self._lm_damping * np.eye(inv_Sa.shape) ) else: msg = "lm_damping_method should be one of fletcher, prior, or identity" raise ValueError(msg) else: if self._lm_damping_method.lower() == "fletcher": A = ( approx_hessian + self._lm_damping * np.diag(np.diag(approx_hessian)) + inv_Sa ) elif self._lm_damping_method.lower() == "prior": A = approx_hessian + (self._lm_damping + 1) * inv_Sa elif self._lm_damping_method == "identity": A = ( approx_hessian + inv_Sa + self._lm_damping * np.eye(inv_Sa.shape[0]) ) else: msg = "lm_damping_method should be one of fletcher, prior, or identity" raise ValueError(msg) A_without_lm = approx_hessian + inv_Sa # Right side of rodgers equation B = K.T @ inv_Sy @ (y_meas - y_ret) - inv_Sa @ (x - x_a) if sparse.issparse(A): try: delta_x = np.linalg.solve(A.toarray(), B) except np.linalg.LinAlgError: delta_x = np.linalg.lstsq(A.toarray(), B)[0] else: try: delta_x = np.linalg.solve(A, B) except np.linalg.LinAlgError: delta_x = np.linalg.lstsq(A, B)[0] if sparse.issparse(A_without_lm): try: delta_x_without_lm = np.linalg.solve(A_without_lm.toarray(), B) except np.linalg.LinAlgError: delta_x_without_lm = np.linalg.lstsq(A_without_lm.toarray(), B)[0] else: try: delta_x_without_lm = np.linalg.solve(A_without_lm, B) except np.linalg.LinAlgError: delta_x_without_lm = np.linalg.lstsq(A_without_lm, B)[0] x_new = x_scaler @ (x + delta_x) chi_sq_only_meas = (y_meas - y_ret).T @ inv_Sy @ (y_meas - y_ret) chi_sq = chi_sq_only_meas + (x_a - x).T @ inv_Sa @ (x_a - x) chi_sq_only_meas_linear = ( (y_meas - y_ret - K @ delta_x_without_lm).T @ inv_Sy @ (y_meas - y_ret - K @ delta_x_without_lm) ) chi_sq_linear = (y_meas - y_ret - K @ delta_x_without_lm).T @ inv_Sy @ ( y_meas - y_ret - K @ delta_x_without_lm ) + (x_a - x - delta_x_without_lm).T @ inv_Sa @ ( x_a - x - delta_x_without_lm ) chi_sq_only_meas /= len(y_meas) chi_sq /= len(y_meas) if np.isnan(chi_sq) or np.isinf(chi_sq): msg = "chi_sq is infinite or nan" raise ValueError(msg) chi_sq_only_meas_linear /= len(y_meas) chi_sq_linear /= len(y_meas) logging.info("Iteration %i of %i", iter_idx + 1, self._max_iter) logging.info( " chi_sq: %f, chi_sq_only_meas: %f, expected_chi_sq: %f, chi_sq_only_meas_linear: %f", chi_sq, chi_sq_only_meas, chi_sq_linear, chi_sq_only_meas_linear, ) retrieval_target.update_state(x_new) dcost = ( delta_x_without_lm @ (K.T @ inv_Sy @ (y_meas - y_ret) + inv_Sa @ (x_a - x)) / len(x) ) logging.info(" dcost: %f", dcost) if chi_sq_prev is not None and chi_sq_prev < chi_sq: # Iteration was worse if self._iterative_update_lm: self._lm_damping *= self._lm_change_factor**2 if self._retreat_lm: retrieval_target.update_state(x_scaler @ best_x) else: chi_sq_prev = chi_sq logging.info( " Iteration was worse increasing LM factor to: %f", self._lm_damping, ) elif chi_sq_prev is None or chi_sq < chi_sq_prev: best_x = copy(x) best_chi_sq = chi_sq chi_sq_prev = best_chi_sq if self._iterative_update_lm and iter_idx > 0: self._lm_damping /= self._lm_change_factor logging.info( " Iteration was better, decreasing LM factor to: %f", self._lm_damping, ) xs.append(retrieval_target.state_vector()) if self._convergence_check_method.lower() == "linear": if ( chi_sq / chi_sq_linear < self._convergence_factor and chi_sq / chi_sq_linear > 1 ): logging.info( " Stopping due to early convergence, converage_ratio: %f", chi_sq / chi_sq_linear, ) break elif (self._convergence_check_method.lower() == "dcost") and ( dcost < self._convergence_factor ): logging.info( " Stopping due to early convergence. dcost is less than convergence_factor" ) break if iter_idx != self._max_iter - 1: predicted_delta_y = y_meas - y_ret - K @ delta_x_without_lm retrieval_target.adjust_parameters( forward_model, y_ret_dict, chi_sq, chi_sq_linear, iter_idx, predicted_delta_y, ) if retrieval_target.state_vector_allowed_to_change(): x_a, inv_Sa, initial_guess = self._apriori_parameters( retrieval_target ) if self._apply_cholesky_scaling: x_scaler_inv = sparse.diags(np.sqrt(inv_Sa.diagonal())) x_scaler = sparse.diags(1 / np.sqrt(inv_Sa.diagonal())) else: x_scaler_inv = sparse.eye(len(x_a)) x_scaler = x_scaler_inv x_a = x_scaler_inv @ x_a inv_Sa = x_scaler @ inv_Sa @ x_scaler if retrieval_target.measurement_vector_allowed_to_change(): ( y_meas_dict, y_meas, Sy, inv_Sy, good_meas, ) = self._measurement_parameters(retrieval_target, measurement_l1) if self._apply_cholesky_scaling: y_scaler = sparse.diags(1 / np.sqrt(inv_Sy.diagonal())) y_scaler_inv = sparse.diags(np.sqrt(inv_Sy.diagonal())) else: y_scaler = sparse.eye(len(y_meas)) y_scaler_inv = sparse.eye(len(y_meas)) y_meas = y_scaler_inv @ y_meas inv_Sy = y_scaler @ inv_Sy @ y_scaler if self._max_iter > 0: output_dict.update(estimate_error(K, Sy, inv_Sy, inv_Sa, A_without_lm)) output_dict["xs"] = xs output_dict["ys"] = ys output_dict["y_meas"] = y_meas_dict["y"] output_dict["forward_l1"] = forward_l1 if self._max_iter > 0: output_dict["chi_sq_meas"] = chi_sq_only_meas output_dict["chi_sq_meas_linear"] = chi_sq_only_meas_linear output_dict["chi_sq"] = chi_sq output_dict["chi_sq_linear"] = chi_sq_linear else: output_dict["chi_sq_meas"] = None output_dict["chi_sq_meas_linear"] = None output_dict["chi_sq"] = None output_dict["chi_sq_linear"] = None return output_dict