"""Base Univariate class."""
import pickle
from abc import ABC
from enum import Enum
import numpy as np
from copulas import (
NotFittedError, get_instance, get_qualified_name, random_state, store_args,
validate_random_state)
from copulas.univariate.selection import select_univariate
class ParametricType(Enum):
    """Parametric Enum.

    Tags a distribution class as parametric or non-parametric. It is the
    type of the ``PARAMETRIC`` class attribute and of the ``parametric``
    filter used during candidate selection.
    """

    # The distribution is not described by a fixed set of parameters.
    NON_PARAMETRIC = 0
    # The distribution is described by a fixed set of parameters.
    PARAMETRIC = 1
class BoundedType(Enum):
    """Bounded Enum.

    Tags a distribution class by the boundedness of its support. It is the
    type of the ``BOUNDED`` class attribute and of the ``bounded`` filter
    used during candidate selection.
    """

    # No bound on either side.
    UNBOUNDED = 0
    # Bounded on one side only.
    SEMI_BOUNDED = 1
    # Bounded on both sides.
    BOUNDED = 2
class Univariate(object):
    """Univariate Distribution.

    When instantiated directly, this class selects the best candidate
    distribution for the data passed to ``fit`` and delegates every
    distribution method to the selected instance.

    Args:
        candidates (list[str or type or Univariate]):
            List of candidates to select the best univariate from.
            It can be a list of strings representing Univariate FQNs,
            or a list of Univariate subclasses or a list of instances.
        parametric (ParametricType):
            If not ``None``, only select subclasses of this type.
            Ignored if ``candidates`` is passed.
        bounded (BoundedType):
            If not ``None``, only select subclasses of this type.
            Ignored if ``candidates`` is passed.
        random_state (int or np.random.RandomState):
            Random seed or RandomState to use.
        selection_sample_size (int):
            Size of the subsample to use for candidate selection.
            If ``None``, all the data is used.
    """

    PARAMETRIC = ParametricType.NON_PARAMETRIC
    BOUNDED = BoundedType.UNBOUNDED

    fitted = False
    _constant_value = None
    _instance = None

    @classmethod
    def _select_candidates(cls, parametric=None, bounded=None):
        """Select which subclasses fulfill the specified constraints.

        The subclass tree is walked recursively. Subclasses that list ``ABC``
        directly among their bases are skipped themselves, but their own
        subclasses are still explored.

        Args:
            parametric (ParametricType):
                If not ``None``, only select subclasses of this type.
            bounded (BoundedType):
                If not ``None``, only select subclasses of this type.

        Returns:
            list:
                Selected subclasses.
        """
        candidates = []
        for subclass in cls.__subclasses__():
            # Descendants are collected before the subclass itself is filtered.
            candidates.extend(subclass._select_candidates(parametric, bounded))
            if ABC in subclass.__bases__:
                continue
            if parametric is not None and subclass.PARAMETRIC != parametric:
                continue
            if bounded is not None and subclass.BOUNDED != bounded:
                continue

            candidates.append(subclass)

        return candidates

    @store_args
    def __init__(self, candidates=None, parametric=None, bounded=None, random_state=None,
                 selection_sample_size=None):
        # A missing or empty ``candidates`` falls back to the subclass search.
        self.candidates = candidates or self._select_candidates(parametric, bounded)
        self.random_state = validate_random_state(random_state)
        self.selection_sample_size = selection_sample_size

    @classmethod
    def __repr__(cls):
        """Return class name."""
        return cls.__name__

    def check_fit(self):
        """Check whether this model has already been fit to a random variable.

        Raise a ``NotFittedError`` if it has not.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        if not self.fitted:
            raise NotFittedError('This model is not fitted.')

    def _constant_sample(self, num_samples):
        """Sample values for a constant distribution.

        Args:
            num_samples (int):
                Number of rows to sample

        Returns:
            numpy.ndarray:
                Sampled values. Array of shape (num_samples,).
        """
        return np.full(num_samples, self._constant_value)

    def _constant_cumulative_distribution(self, X):
        """Cumulative distribution for the degenerate case of constant distribution.

        Note that the output of this method will be an array whose unique values are 0 and 1.
        More information can be found here: https://en.wikipedia.org/wiki/Degenerate_distribution

        Arguments:
            X (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                Any array-like (or scalar) is accepted.

        Returns:
            numpy.ndarray:
                Float array with the shape of ``X``: 0 where ``X`` is below the
                constant value, 1 everywhere else.
        """
        # ``np.asarray`` lets lists and scalars through; ``np.where`` replaces the
        # allocate-then-index pattern without changing the result for arrays.
        return np.where(np.asarray(X) < self._constant_value, 0.0, 1.0)

    def _constant_probability_density(self, X):
        """Probability density for the degenerate case of constant distribution.

        Note that the output of this method will be an array whose unique values are 0 and 1.
        More information can be found here: https://en.wikipedia.org/wiki/Degenerate_distribution

        Arguments:
            X (numpy.ndarray):
                Values for which the probability density will be computed.
                Any array-like (or scalar) is accepted.

        Returns:
            numpy.ndarray:
                Float array with the shape of ``X``: 1 where ``X`` equals the
                constant value, 0 everywhere else.
        """
        return np.where(np.asarray(X) == self._constant_value, 1.0, 0.0)

    def _constant_percent_point(self, X):
        """Percent point for the degenerate case of constant distribution.

        Every position of the output holds ``self._constant_value``; the
        input values themselves are not inspected, only their shape.
        More information can be found here: https://en.wikipedia.org/wiki/Degenerate_distribution

        Arguments:
            X (numpy.ndarray):
                Values for which the inverse cumulative distribution will be
                computed. Any array-like (or scalar) is accepted.

        Returns:
            numpy.ndarray:
                Array with the shape of ``X`` filled with the constant value.
        """
        # ``np.shape`` also works for inputs without a ``.shape`` attribute.
        return np.full(np.shape(X), self._constant_value)

    def _replace_constant_methods(self):
        """Replace conventional distribution methods by its constant counterparts.

        The replacements are bound on the instance, so they shadow the class
        level methods for this object only.
        """
        self.cumulative_distribution = self._constant_cumulative_distribution
        self.percent_point = self._constant_percent_point
        self.probability_density = self._constant_probability_density
        self.sample = self._constant_sample

    def _set_constant_value(self, constant_value):
        """Set the distribution up to behave as a degenerate distribution.

        The constant value is stored as ``self._constant_value`` and all
        the methods are replaced by their degenerate counterparts.

        Args:
            constant_value (float):
                Value to set as the constant one.
        """
        self._constant_value = constant_value
        self._replace_constant_methods()

    def _check_constant_value(self, X):
        """Check if a Series or array contains only one unique value.

        If it contains only one value, set the instance up to behave accordingly.

        Args:
            X (numpy.ndarray):
                Data to analyze.

        Returns:
            bool:
                Whether the input data had only one value or not.
        """
        uniques = np.unique(X)
        if len(uniques) == 1:
            self._set_constant_value(uniques[0])
            return True

        return False

    def fit(self, X):
        """Fit the model to a random variable.

        The best candidate is selected on a subsample of ``X`` when
        ``selection_sample_size`` is set and smaller than ``len(X)``; the
        selected instance is then fitted on the whole of ``X``.

        Arguments:
            X (numpy.ndarray):
                Values of the random variable. It must have shape (n, 1).
        """
        if self.selection_sample_size and self.selection_sample_size < len(X):
            # NOTE(review): the subsample is drawn with the global NumPy RNG and with
            # replacement (the ``np.random.choice`` default), not with
            # ``self.random_state`` -- confirm whether that is intended.
            selection_sample = np.random.choice(X, size=self.selection_sample_size)
        else:
            selection_sample = X

        self._instance = select_univariate(selection_sample, self.candidates)
        self._instance.fit(X)

        self.fitted = True

    def probability_density(self, X):
        """Compute the probability density for each point in X.

        Arguments:
            X (numpy.ndarray):
                Values for which the probability density will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Probability density values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self._instance.probability_density(X)

    def log_probability_density(self, X):
        """Compute the log of the probability density for each point in X.

        It should be overridden with numerically stable variants whenever possible.

        Arguments:
            X (numpy.ndarray):
                Values for which the log probability density will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Log probability density values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        # Explicit ``None`` test: a selected instance must always be used,
        # whatever its truth value happens to be.
        if self._instance is not None:
            return self._instance.log_probability_density(X)

        return np.log(self.probability_density(X))

    def pdf(self, X):
        """Compute the probability density for each point in X.

        Alias of ``probability_density``.

        Arguments:
            X (numpy.ndarray):
                Values for which the probability density will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Probability density values for points in X.
        """
        return self.probability_density(X)

    def cumulative_distribution(self, X):
        """Compute the cumulative distribution value for each point in X.

        Arguments:
            X (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Cumulative distribution values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self._instance.cumulative_distribution(X)

    def cdf(self, X):
        """Compute the cumulative distribution value for each point in X.

        Alias of ``cumulative_distribution``.

        Arguments:
            X (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Cumulative distribution values for points in X.
        """
        return self.cumulative_distribution(X)

    def percent_point(self, U):
        """Compute the inverse cumulative distribution value for each point in U.

        Arguments:
            U (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1) and values must be in [0,1].

        Returns:
            numpy.ndarray:
                Inverse cumulative distribution values for points in U.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self._instance.percent_point(U)

    def ppf(self, U):
        """Compute the inverse cumulative distribution value for each point in U.

        Alias of ``percent_point``.

        Arguments:
            U (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1) and values must be in [0,1].

        Returns:
            numpy.ndarray:
                Inverse cumulative distribution values for points in U.
        """
        return self.percent_point(U)

    def set_random_state(self, random_state):
        """Set the random state.

        Args:
            random_state (int, np.random.RandomState, or None):
                Seed or RandomState for the random generator.
        """
        self.random_state = validate_random_state(random_state)

    def sample(self, n_samples=1):
        """Sample values from this model.

        Argument:
            n_samples (int):
                Number of values to sample

        Returns:
            numpy.ndarray:
                Array of shape (n_samples, 1) with values randomly
                sampled from this model distribution.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self._instance.sample(n_samples)

    def _get_params(self):
        """Return the parameters of the selected instance to serialize.

        Returns:
            dict:
                Parameters of the underlying distribution.
        """
        return self._instance._get_params()

    def _set_params(self, params):
        """Set the parameters of this univariate.

        Must be implemented in all the subclasses.

        Args:
            dict:
                Parameters to recreate this instance.
        """
        raise NotImplementedError()

    def to_dict(self):
        """Return the parameters of this model in a dict.

        Returns:
            dict:
                Dictionary containing the distribution type and all
                the parameters that define the distribution.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()

        params = self._get_params()
        if self.__class__ is Univariate:
            # The base class is only a selector: record the selected distribution.
            params['type'] = get_qualified_name(self._instance)
        else:
            params['type'] = get_qualified_name(self)

        return params

    @classmethod
    def from_dict(cls, params):
        """Build a distribution from its params dict.

        Args:
            params (dict):
                Dictionary containing the FQN of the distribution and the
                necessary parameters to rebuild it.
                The input format is exactly the same that is outputted by
                the distribution class ``to_dict`` method.

        Returns:
            Univariate:
                Distribution instance.
        """
        # Work on a copy so that popping ``type`` does not mutate the caller's dict.
        params = params.copy()
        distribution = get_instance(params.pop('type'))
        distribution._set_params(params)
        distribution.fitted = True

        return distribution

    def save(self, path):
        """Serialize this univariate instance using pickle.

        Args:
            path (str):
                Path to where this distribution will be serialized.
        """
        with open(path, 'wb') as pickle_file:
            pickle.dump(self, pickle_file)

    @classmethod
    def load(cls, path):
        """Load a Univariate instance from a pickle file.

        Warning:
            Unpickling can execute arbitrary code. Only load files that come
            from a trusted source.

        Args:
            path (str):
                Path to the pickle file where the distribution has been serialized.

        Returns:
            Univariate:
                Loaded instance.
        """
        with open(path, 'rb') as pickle_file:
            return pickle.load(pickle_file)
class ScipyModel(Univariate, ABC):
    """Wrapper for scipy models.

    This class makes the probability_density, cumulative_distribution,
    percent_point and sample point at the underlying pdf, cdf, ppf and rvs
    methods respectively.

    ``_fit`` must be implemented by the subclasses. ``fit`` and
    ``_set_params`` additionally call ``_fit_constant``, ``_is_constant``
    and ``_extract_constant``, which are not defined in this class and are
    expected to be provided by the subclasses as well.
    """

    # Object exposing ``pdf``, ``cdf``, ``ppf``, ``rvs`` (and optionally ``logpdf``).
    MODEL_CLASS = None

    # Keyword arguments passed to every ``MODEL_CLASS`` call once fitted.
    _params = None

    def __init__(self, random_state=None):
        """Initialize Scipy model.

        Overwrite Univariate __init__ to skip candidate initialization.

        Args:
            random_state (int, np.random.RandomState, or None): seed
                or RandomState for random generator.
        """
        self.random_state = validate_random_state(random_state)

    def probability_density(self, X):
        """Compute the probability density for each point in X.

        Arguments:
            X (numpy.ndarray):
                Values for which the probability density will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Probability density values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self.MODEL_CLASS.pdf(X, **self._params)

    def log_probability_density(self, X):
        """Compute the log of the probability density for each point in X.

        ``MODEL_CLASS.logpdf`` is used when it exists. When the instance has
        been set up as a constant (degenerate) distribution, the log of
        ``probability_density`` is returned instead, so that the result stays
        consistent with the degenerate density installed on the instance.

        Arguments:
            X (numpy.ndarray):
                Values for which the log probability density will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Log probability density values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        # For a constant distribution ``probability_density`` has been replaced on
        # the instance; calling ``MODEL_CLASS.logpdf`` would bypass that override.
        if self._constant_value is None and hasattr(self.MODEL_CLASS, 'logpdf'):
            return self.MODEL_CLASS.logpdf(X, **self._params)

        return np.log(self.probability_density(X))

    def cumulative_distribution(self, X):
        """Compute the cumulative distribution value for each point in X.

        Arguments:
            X (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1).

        Returns:
            numpy.ndarray:
                Cumulative distribution values for points in X.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self.MODEL_CLASS.cdf(X, **self._params)

    def percent_point(self, U):
        """Compute the inverse cumulative distribution value for each point in U.

        Arguments:
            U (numpy.ndarray):
                Values for which the cumulative distribution will be computed.
                It must have shape (n, 1) and values must be in [0,1].

        Returns:
            numpy.ndarray:
                Inverse cumulative distribution values for points in U.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self.MODEL_CLASS.ppf(U, **self._params)

    @random_state
    def sample(self, n_samples=1):
        """Sample values from this model.

        Argument:
            n_samples (int):
                Number of values to sample

        Returns:
            numpy.ndarray:
                Array of shape (n_samples, 1) with values randomly
                sampled from this model distribution.

        Raises:
            NotFittedError:
                if the model is not fitted.
        """
        self.check_fit()
        return self.MODEL_CLASS.rvs(size=n_samples, **self._params)

    def _fit(self, X):
        """Fit the model to a non-constant random variable.

        Must be implemented in all the subclasses.

        Arguments:
            X (numpy.ndarray):
                Values of the random variable. It must have shape (n, 1).
        """
        raise NotImplementedError()

    def fit(self, X):
        """Fit the model to a random variable.

        Data holding a single unique value is handled by ``_fit_constant``;
        any other data is handled by ``_fit``.

        Arguments:
            X (numpy.ndarray):
                Values of the random variable. It must have shape (n, 1).
        """
        # ``_check_constant_value`` also switches the instance to its
        # degenerate methods when it returns ``True``.
        if self._check_constant_value(X):
            self._fit_constant(X)
        else:
            self._fit(X)

        self.fitted = True

    def _get_params(self):
        """Return the parameters of the fitted model to serialize.

        Returns:
            dict:
                Copy of the keyword arguments that describe the model in
                its current fit status.
        """
        return self._params.copy()

    def _set_params(self, params):
        """Set the parameters of this univariate.

        Args:
            params (dict):
                Parameters to recreate this instance.
        """
        # Store a copy so later changes to the caller's dict do not leak in.
        self._params = params.copy()
        if self._is_constant():
            constant = self._extract_constant()
            self._set_constant_value(constant)