Source code for copulas.multivariate.gaussian

"""GaussianMultivariate module."""

import logging
import sys

import numpy as np
import pandas as pd
from scipy import stats

from copulas import (
    EPSILON, check_valid_values, get_instance, get_qualified_name, random_state, store_args,
    validate_random_state)
from copulas.multivariate.base import Multivariate
from copulas.univariate import GaussianUnivariate, Univariate

LOGGER = logging.getLogger(__name__)
DEFAULT_DISTRIBUTION = Univariate


[docs]class GaussianMultivariate(Multivariate): """Class for a multivariate distribution that uses the Gaussian copula. Args: distribution (str or dict): Fully qualified name of the class to be used for modeling the marginal distributions or a dictionary mapping column names to the fully qualified distribution names. """ correlation = None columns = None univariates = None @store_args def __init__(self, distribution=DEFAULT_DISTRIBUTION, random_state=None): self.random_state = validate_random_state(random_state) self.distribution = distribution def __repr__(self): """Produce printable representation of the object.""" if self.distribution == DEFAULT_DISTRIBUTION: distribution = '' elif isinstance(self.distribution, type): distribution = f'distribution="{self.distribution.__name__}"' else: distribution = f'distribution="{self.distribution}"' return f'GaussianMultivariate({distribution})' def _transform_to_normal(self, X): if isinstance(X, pd.Series): X = X.to_frame().T elif not isinstance(X, pd.DataFrame): if len(X.shape) == 1: X = [X] X = pd.DataFrame(X, columns=self.columns) U = [] for column_name, univariate in zip(self.columns, self.univariates): if column_name in X: column = X[column_name] U.append(univariate.cdf(column.to_numpy()).clip(EPSILON, 1 - EPSILON)) return stats.norm.ppf(np.column_stack(U)) def _get_correlation(self, X): """Compute correlation matrix with transformed data. Args: X (numpy.ndarray): Data for which the correlation needs to be computed. Returns: numpy.ndarray: computed correlation matrix. """ result = self._transform_to_normal(X) correlation = pd.DataFrame(data=result).corr().to_numpy() correlation = np.nan_to_num(correlation, nan=0.0) # If singular, add some noise to the diagonal if np.linalg.cond(correlation) > 1.0 / sys.float_info.epsilon: correlation = correlation + np.identity(correlation.shape[0]) * EPSILON return pd.DataFrame(correlation, index=self.columns, columns=self.columns) @check_valid_values def fit(self, X): """Compute the distribution for each variable and then its correlation matrix. Arguments: X (pandas.DataFrame): Values of the random variables. """ LOGGER.info('Fitting %s', self) if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X) columns = [] univariates = [] for column_name, column in X.items(): if isinstance(self.distribution, dict): distribution = self.distribution.get(column_name, DEFAULT_DISTRIBUTION) else: distribution = self.distribution LOGGER.debug('Fitting column %s to %s', column_name, distribution) univariate = get_instance(distribution) try: univariate.fit(column) except BaseException: log_message = ( f'Unable to fit to a {distribution} distribution for column {column_name}. ' 'Using a Gaussian distribution instead.' ) LOGGER.info(log_message) univariate = GaussianUnivariate() univariate.fit(column) columns.append(column_name) univariates.append(univariate) self.columns = columns self.univariates = univariates LOGGER.debug('Computing correlation') self.correlation = self._get_correlation(X) self.fitted = True LOGGER.debug('GaussianMultivariate fitted successfully')
[docs] def probability_density(self, X): """Compute the probability density for each point in X. Arguments: X (pandas.DataFrame): Values for which the probability density will be computed. Returns: numpy.ndarray: Probability density values for points in X. Raises: NotFittedError: if the model is not fitted. """ self.check_fit() transformed = self._transform_to_normal(X) return stats.multivariate_normal.pdf( transformed, cov=self.correlation, allow_singular=True)
[docs] def cumulative_distribution(self, X): """Compute the cumulative distribution value for each point in X. Arguments: X (pandas.DataFrame): Values for which the cumulative distribution will be computed. Returns: numpy.ndarray: Cumulative distribution values for points in X. Raises: NotFittedError: if the model is not fitted. """ self.check_fit() transformed = self._transform_to_normal(X) return stats.multivariate_normal.cdf(transformed, cov=self.correlation)
def _get_conditional_distribution(self, conditions): """Compute the parameters of a conditional multivariate normal distribution. The parameters of the conditioned distribution are computed as specified here: https://en.wikipedia.org/wiki/Multivariate_normal_distribution#Conditional_distributions Args: conditions (pandas.Series): Mapping of the column names and column values to condition on. The input values have already been transformed to their normal distribution. Returns: tuple: * means (numpy.array): mean values to use for the conditioned multivariate normal. * covariance (numpy.array): covariance matrix to use for the conditioned multivariate normal. * columns (list): names of the columns that will be sampled conditionally. """ columns2 = conditions.index columns1 = self.correlation.columns.difference(columns2) sigma11 = self.correlation.loc[columns1, columns1].to_numpy() sigma12 = self.correlation.loc[columns1, columns2].to_numpy() sigma21 = self.correlation.loc[columns2, columns1].to_numpy() sigma22 = self.correlation.loc[columns2, columns2].to_numpy() mu1 = np.zeros(len(columns1)) mu2 = np.zeros(len(columns2)) sigma12sigma22inv = sigma12 @ np.linalg.inv(sigma22) mu_bar = mu1 + sigma12sigma22inv @ (conditions - mu2) sigma_bar = sigma11 - sigma12sigma22inv @ sigma21 return mu_bar, sigma_bar, columns1 def _get_normal_samples(self, num_rows, conditions): """Get random rows in the standard normal space. If no conditions are given, the values are sampled from a standard normal multivariate. If conditions are given, they are transformed to their equivalent standard normal values using their marginals and then the values are sampled from a standard normal multivariate conditioned on the given condition values. """ if conditions is None: covariance = self.correlation columns = self.columns means = np.zeros(len(columns)) else: conditions = pd.Series(conditions) normal_conditions = self._transform_to_normal(conditions)[0] normal_conditions = pd.Series(normal_conditions, index=conditions.index) means, covariance, columns = self._get_conditional_distribution(normal_conditions) samples = np.random.multivariate_normal(means, covariance, size=num_rows) return pd.DataFrame(samples, columns=columns) @random_state def sample(self, num_rows=1, conditions=None): """Sample values from this model. Argument: num_rows (int): Number of rows to sample. conditions (dict or pd.Series): Mapping of the column names and column values to condition on. Returns: numpy.ndarray: Array of shape (n_samples, *) with values randomly sampled from this model distribution. If conditions have been given, the output array also contains the corresponding columns populated with the given values. Raises: NotFittedError: if the model is not fitted. """ self.check_fit() samples = self._get_normal_samples(num_rows, conditions) output = {} for column_name, univariate in zip(self.columns, self.univariates): if conditions and column_name in conditions: # Use the values that were given as conditions in the original space. output[column_name] = np.full(num_rows, conditions[column_name]) else: cdf = stats.norm.cdf(samples[column_name]) output[column_name] = univariate.percent_point(cdf) return pd.DataFrame(data=output)
[docs] def to_dict(self): """Return a `dict` with the parameters to replicate this object. Returns: dict: Parameters of this distribution. """ self.check_fit() univariates = [univariate.to_dict() for univariate in self.univariates] return { 'correlation': self.correlation.to_numpy().tolist(), 'univariates': univariates, 'columns': self.columns, 'type': get_qualified_name(self), }
[docs] @classmethod def from_dict(cls, copula_dict): """Create a new instance from a parameters dictionary. Args: params (dict): Parameters of the distribution, in the same format as the one returned by the ``to_dict`` method. Returns: Multivariate: Instance of the distribution defined on the parameters. """ instance = cls() instance.univariates = [] columns = copula_dict['columns'] instance.columns = columns for parameters in copula_dict['univariates']: instance.univariates.append(Univariate.from_dict(parameters)) correlation = copula_dict['correlation'] instance.correlation = pd.DataFrame(correlation, index=columns, columns=columns) instance.fitted = True return instance