Source code for copulas.bivariate.base

"""This module contains a base class for bivariate copulas."""

import json
import warnings
from enum import Enum

import numpy as np
from scipy import stats
from scipy.optimize import brentq

from copulas.bivariate.utils import split_matrix
from copulas.errors import NotFittedError
from copulas.utils import EPSILON, random_state, validate_random_state


[docs] class CopulaTypes(Enum): """Available copula families.""" CLAYTON = 0 FRANK = 1 GUMBEL = 2 INDEPENDENCE = 3
[docs] class Bivariate(object): """Base class for bivariate copulas. This class allows to instantiate all its subclasses and serves as a unique entry point for the bivariate copulas classes. >>> Bivariate(copula_type=CopulaTypes.FRANK).__class__ copulas.bivariate.frank.Frank >>> Bivariate(copula_type='frank').__class__ copulas.bivariate.frank.Frank Args: copula_type (Union[CopulaType, str]): Subtype of the copula. random_state (Union[int, np.random.RandomState, None]): Seed or RandomState for the random generator. Attributes: copula_type(CopulaTypes): Family of the copula a subclass belongs to. _subclasses(list[type]): List of declared subclasses. theta_interval(list[float]): Interval of valid thetas for the given copula family. invalid_thetas(list[float]): Values that, even though they belong to :attr:`theta_interval`, shouldn't be considered valid. tau (float): Kendall's tau for the data given at :meth:`fit`. theta(float): Parameter for the copula. """ copula_type = None _subclasses = [] theta_interval = [] invalid_thetas = [] theta = None tau = None @classmethod def _get_subclasses(cls): """Find recursively subclasses for the current class object. Returns: list[Bivariate]: List of subclass objects. """ subclasses = [] for subclass in cls.__subclasses__(): subclasses.append(subclass) subclasses.extend(subclass._get_subclasses()) return subclasses
[docs] @classmethod def subclasses(cls): """Return a list of subclasses for the current class object. Returns: list[Bivariate]: Subclasses for given class. """ if not cls._subclasses: cls._subclasses = cls._get_subclasses() return cls._subclasses
def __new__(cls, *args, **kwargs): """Create and return a new object. Returns: Bivariate: New object. """ copula_type = kwargs.get('copula_type', None) if copula_type is None: return super(Bivariate, cls).__new__(cls) if not isinstance(copula_type, CopulaTypes): if isinstance(copula_type, str) and copula_type.upper() in CopulaTypes.__members__: copula_type = CopulaTypes[copula_type.upper()] else: raise ValueError(f'Invalid copula type {copula_type}') for subclass in cls.subclasses(): if subclass.copula_type is copula_type: return super(Bivariate, cls).__new__(subclass) def __init__(self, copula_type=None, random_state=None): """Initialize Bivariate object. Args: copula_type (CopulaType or str): Subtype of the copula. random_state (int, np.random.RandomState, or None): Seed or RandomState for the random generator. """ self.random_state = validate_random_state(random_state)
[docs] def check_theta(self): """Validate the computed theta against the copula specification. This method is used to assert the computed theta is in the valid range for the copula. Raises: ValueError: If theta is not in :attr:`theta_interval` or is in :attr:`invalid_thetas`, """ lower, upper = self.theta_interval if (not lower <= self.theta <= upper) or (self.theta in self.invalid_thetas): message = 'The computed theta value {} is out of limits for the given {} copula.' raise ValueError(message.format(self.theta, self.copula_type.name))
[docs] def check_fit(self): """Assert that the model is fit and the computed `theta` is valid. Raises: NotFittedError: if the model is not fitted. ValueError: if the computed theta is invalid. """ if not self.theta: raise NotFittedError('This model is not fitted.') self.check_theta()
[docs] def check_marginal(self, u): """Check that the marginals are uniformly distributed. Args: u(np.ndarray): Array of datapoints with shape (n,). Raises: ValueError: If the data does not appear uniformly distributed. """ if min(u) < 0.0 or max(u) > 1.0: raise ValueError('Marginal value out of bounds.') emperical_cdf = np.sort(u) uniform_cdf = np.linspace(0.0, 1.0, num=len(u)) ks_statistic = max(np.abs(emperical_cdf - uniform_cdf)) if ks_statistic > 1.627 / np.sqrt(len(u)): # KS test with significance level 0.01 warnings.warn('Data does not appear to be uniform.', category=RuntimeWarning)
def _compute_theta(self): """Compute theta, validate it and assign it to self.""" self.theta = self.compute_theta() self.check_theta()
[docs] def fit(self, X): """Fit a model to the data updating the parameters. Args: X(np.ndarray): Array of datapoints with shape (n,2). Return: None """ U, V = split_matrix(X) self.check_marginal(U) self.check_marginal(V) self.tau = stats.kendalltau(U, V)[0] if np.isnan(self.tau): if len(np.unique(U)) == 1 or len(np.unique(V)) == 1: raise ValueError('Constant column.') raise ValueError('Unable to compute tau.') self._compute_theta()
[docs] def to_dict(self): """Return a `dict` with the parameters to replicate this object. Returns: dict: Parameters of the copula. """ return {'copula_type': self.copula_type.name, 'theta': self.theta, 'tau': self.tau}
[docs] @classmethod def from_dict(cls, copula_dict): """Create a new instance from the given parameters. Args: copula_dict: `dict` with the parameters to replicate the copula. Like the output of `Bivariate.to_dict` Returns: Bivariate: Instance of the copula defined on the parameters. """ instance = cls(copula_type=copula_dict['copula_type']) instance.theta = copula_dict['theta'] instance.tau = copula_dict['tau'] return instance
[docs] def infer(self, X): """Take in subset of values and predicts the rest.""" raise NotImplementedError
[docs] def generator(self, t): r"""Compute the generator function for Archimedian copulas. The generator is a function :math:`\psi: [0,1]\times\Theta \rightarrow [0, \infty)` # noqa: JS101 that given an Archimedian copula fulfills: .. math:: C(u,v) = \psi^{-1}(\psi(u) + \psi(v)) In a more generic way: .. math:: C(u_1, u_2, ..., u_n;\theta) = \psi^-1(\sum_0^n{\psi(u_i;\theta)}; \theta) """ raise NotImplementedError
[docs] def probability_density(self, X): r"""Compute probability density function for given copula family. The probability density(pdf) for a given copula is defined as: .. math:: c(U,V) = \frac{\partial^2 C(u,v)}{\partial v \partial u} Args: X(np.ndarray): Shape (n, 2).Datapoints to compute pdf. Returns: np.array: Probability density for the input values. """ raise NotImplementedError
[docs] def log_probability_density(self, X): """Return log probability density of model. The log probability should be overridden with numerically stable variants whenever possible. Arguments: X: `np.ndarray` of shape (n, 1). Returns: np.ndarray """ return np.log(self.probability_density(X))
[docs] def pdf(self, X): """Shortcut to :meth:`probability_density`.""" return self.probability_density(X)
[docs] def cumulative_distribution(self, X): """Compute the cumulative distribution function for the copula, :math:`C(u, v)`. Args: X(np.ndarray): Returns: numpy.array: cumulative probability """ raise NotImplementedError
[docs] def cdf(self, X): """Shortcut to :meth:`cumulative_distribution`.""" return self.cumulative_distribution(X)
[docs] def percent_point(self, y, V): """Compute the inverse of conditional cumulative distribution :math:`C(u|v)^{-1}`. Args: y: `np.ndarray` value of :math:`C(u|v)`. v: `np.ndarray` given value of v. """ self.check_fit() result = [] for _y, _v in zip(y, V): def f(u): return self.partial_derivative_scalar(u, _v) - _y minimum = brentq(f, EPSILON, 1.0) if isinstance(minimum, np.ndarray): minimum = minimum[0] result.append(minimum) return np.array(result)
[docs] def ppf(self, y, V): """Shortcut to :meth:`percent_point`.""" return self.percent_point(y, V)
[docs] def partial_derivative(self, X): r"""Compute partial derivative of cumulative distribution. The partial derivative of the copula(CDF) is the conditional CDF. .. math:: F(v|u) = \frac{\partial C(u,v)}{\partial u} The base class provides a finite difference approximation of the partial derivative of the CDF with respect to u. Args: X(np.ndarray) y(float) Returns: np.ndarray """ delta = -2 * (X[:, 1] > 0.5) + 1 delta = 0.0001 * delta X_prime = X.copy() X_prime[:, 1] += delta f = self.cumulative_distribution(X) f_prime = self.cumulative_distribution(X_prime) return (f_prime - f) / delta
[docs] def partial_derivative_scalar(self, U, V): """Compute partial derivative :math:`C(u|v)` of cumulative density of single values.""" self.check_fit() X = np.column_stack((U, V)) return self.partial_derivative(X).item()
[docs] def set_random_state(self, random_state): """Set the random state. Args: random_state (int, np.random.RandomState, or None): Seed or RandomState for the random generator. """ self.random_state = validate_random_state(random_state)
[docs] @random_state def sample(self, n_samples): """Generate specified `n_samples` of new data from model. The sampled are generated using the inverse transform method `v~U[0,1],v~C^-1(u|v)` Args: n_samples (int): amount of samples to create. Returns: np.ndarray: Array of length `n_samples` with generated data from the model. """ if self.tau > 1 or self.tau < -1: raise ValueError('The range for correlation measure is [-1,1].') v = np.random.uniform(0, 1, n_samples) c = np.random.uniform(0, 1, n_samples) u = self.percent_point(c, v) return np.column_stack((u, v))
[docs] def compute_theta(self): """Compute theta parameter using Kendall's tau.""" raise NotImplementedError
[docs] @classmethod def select_copula(cls, X): r"""Select best copula function based on likelihood. Given out candidate copulas the procedure proposed for selecting the one that best fit to a dataset of pairs :math:`\{(u_j, v_j )\}, j=1,2,...n` , is as follows: 1. Estimate the most likely parameter :math:`\theta` of each copula candidate for the given dataset. 2. Construct :math:`R(z|\theta)`. Calculate the area under the tail for each of the copula candidates. 3. Compare the areas: :math:`a_u` achieved using empirical copula against the ones achieved for the copula candidates. Score the outcome of the comparison from 3 (best) down to 1 (worst). 4. Proceed as in steps 2- 3 with the lower tail and function :math:`L`. 5. Finally the sum of empirical upper and lower tail functions is compared against :math:`R + L`. Scores of the three comparisons are summed and the candidate with the highest value is selected. Args: X(np.ndarray): Matrix of shape (n,2). Returns: copula: Best copula that fits for it. """ from copulas.bivariate import select_copula # noqa warnings.warn( '`Bivariate.select_copula` has been deprecated and will be removed in a later ' 'release. Please use `copulas.bivariate.select_copula` instead', DeprecationWarning, ) return select_copula(X)
[docs] def save(self, filename): """Save the internal state of a copula in the specified filename. Args: filename(str): Path to save. Returns: None """ content = self.to_dict() with open(filename, 'w') as f: json.dump(content, f)
[docs] @classmethod def load(cls, copula_path): """Create a new instance from a file. Args: copula_path(str): Path to file with the serialized copula. Returns: Bivariate: Instance with the parameters stored in the file. """ with open(copula_path) as f: copula_dict = json.load(f) return cls.from_dict(copula_dict)