Source code for copulas

# -*- coding: utf-8 -*-

"""Top-level package for Copulas."""

__author__ = 'DataCebo, Inc.'
__email__ = 'info@sdv.dev'
__version__ = '0.11.0'

import contextlib
import functools
import importlib
import sys
import warnings
from copy import deepcopy
from importlib.metadata import entry_points
from operator import attrgetter

import numpy as np
import pandas as pd

# Machine epsilon of ``float32`` as reported by ``np.finfo``. Its consumers are
# not visible in this module section.
EPSILON = np.finfo(np.float32).eps


class NotFittedError(Exception):
    """Exception type used to report a not-fitted condition.

    It adds no behaviour of its own on top of ``Exception``.
    """
@contextlib.contextmanager
def set_random_state(random_state, set_model_random_state):
    """Temporarily drive NumPy's global random generator from ``random_state``.

    On entry the global NumPy state is saved and replaced by the state of
    ``random_state``. On exit the state reached inside the ``with`` body is
    handed to ``set_model_random_state`` wrapped in a new ``RandomState``, and
    the saved global state is restored.

    Args:
        random_state (np.random.RandomState):
            Object whose ``get_state()`` result seeds the global generator.
        set_model_random_state (function):
            Callback receiving the ``RandomState`` that holds the advanced state.
    """
    saved_global_state = np.random.get_state()
    np.random.set_state(random_state.get_state())

    try:
        yield
    finally:
        # Snapshot where the global generator ended up inside the block ...
        state_after_block = np.random.get_state()
        advanced = np.random.RandomState()
        advanced.set_state(state_after_block)

        # ... hand it back to the model, then undo the global override.
        set_model_random_state(advanced)
        np.random.set_state(saved_global_state)
def random_state(function):
    """Set the random state before calling the function.

    The returned wrapper reads ``self.random_state``. When it is ``None`` the
    wrapped method is called directly. Otherwise the call runs inside
    ``set_random_state(self.random_state, self.set_random_state)``.

    Args:
        function (Callable):
            The method to wrap around. Its first argument must be ``self``.

    Returns:
        Callable:
            Wrapper that keeps the name and docstring of ``function``.
    """

    # ``functools.wraps`` keeps the wrapped method's metadata, so decorated
    # methods are not all reported as ``wrapper`` with no docstring.
    @functools.wraps(function)
    def wrapper(self, *args, **kwargs):
        if self.random_state is None:
            return function(self, *args, **kwargs)

        with set_random_state(self.random_state, self.set_random_state):
            return function(self, *args, **kwargs)

    return wrapper
def validate_random_state(random_state):
    """Validate random state argument.

    Args:
        random_state (int, numpy.integer, numpy.random.RandomState, or None):
            Seed or RandomState for the random generator.

    Returns:
        numpy.random.RandomState or None:
            ``None`` when ``random_state`` is ``None``, the same object when it
            is already a ``RandomState``, otherwise a new ``RandomState``
            seeded with the given integer.

    Raises:
        TypeError:
            If ``random_state`` is none of the supported types.
    """
    if random_state is None:
        return None

    if isinstance(random_state, np.random.RandomState):
        return random_state

    # NumPy integer scalars (e.g. ``np.int64``) are not ``int`` instances, but
    # they are valid seeds, so accept them as well.
    if isinstance(random_state, (int, np.integer)):
        return np.random.RandomState(seed=random_state)

    raise TypeError(
        f'`random_state` {random_state} expected to be an int '
        'or `np.random.RandomState` object.'
    )
def get_instance(obj, **kwargs):
    """Build a new instance from a dotted path, a class, or an existing object.

    Args:
        obj (str, type, instance):
            * ``str``: dotted ``package.Name`` path; the attribute ``Name`` is
              imported from ``package`` and called with ``kwargs``.
            * ``type``: the class is called with ``kwargs``.
            * anything else: its class is called with ``kwargs`` when any are
              given, otherwise with the object's ``__args__`` / ``__kwargs__``
              attributes (empty when absent).
        **kwargs:
            Keyword arguments for the constructor.

    Returns:
        object:
            The newly created instance.
    """
    if isinstance(obj, str):
        module_name, class_name = obj.rsplit('.', 1)
        factory = getattr(importlib.import_module(module_name), class_name)
        return factory(**kwargs)

    if isinstance(obj, type):
        return obj(**kwargs)

    factory = obj.__class__
    if kwargs:
        return factory(**kwargs)

    # No explicit arguments: reuse whatever the object recorded about itself.
    stored_args = getattr(obj, '__args__', ())
    stored_kwargs = getattr(obj, '__kwargs__', {})
    return factory(*stored_args, **stored_kwargs)
def store_args(__init__):
    """Save ``*args`` and ``**kwargs`` used in the ``__init__`` of a copula.

    Deep copies of the arguments are taken before the wrapped ``__init__`` runs
    and are stored on the instance as ``__args__`` and ``__kwargs__`` after it
    returns.

    Args:
        __init__ (callable):
            ``__init__`` function whose arguments should be stored.

    Returns:
        callable:
            Decorated ``__init__`` function, keeping the original's name and
            docstring.
    """

    # ``functools.wraps`` keeps the metadata of the original ``__init__``.
    @functools.wraps(__init__)
    def new__init__(self, *args, **kwargs):
        # Copy first so later mutation of the arguments, by the caller or by
        # ``__init__`` itself, does not leak into the stored values.
        args_copy = deepcopy(args)
        kwargs_copy = deepcopy(kwargs)
        __init__(self, *args, **kwargs)
        self.__args__ = args_copy
        self.__kwargs__ = kwargs_copy

    return new__init__
[docs]def get_qualified_name(_object): """Return the Fully Qualified Name from an instance or class.""" module = _object.__module__ if hasattr(_object, '__name__'): _class = _object.__name__ else: _class = _object.__class__.__name__ return module + '.' + _class
def vectorize(function):
    """Allow a method that only accepts scalars to accept vectors too.

    The decorated method behaves differently depending on its ``X`` argument:

    **Not a numpy array**
        ``function`` is called once with ``X`` unchanged and its result is
        returned as is.

    **1-d array**
        ``function`` is assumed to have the signature::

            function(self, X, *args, **kwargs)

        where ``X`` is a scalar. The elements of the array are passed one at a
        time; input and output both have shape ``(n,)``.

    **2-d array**
        ``function`` is assumed to have the signature::

            function(self, X0, ..., Xj, *args, **kwargs)

        where each ``Xi`` is a scalar. Every row is unpacked into one call. The
        input is expected to have shape ``(n, j)`` and the output shape ``(n,)``.

    For array inputs the result is a ``float64`` ``numpy.array``.

    Args:
        function (callable):
            Function that only accepts and returns scalars.

    Returns:
        callable:
            Decorated function that can accept and return :attr:`numpy.array`.

    Raises:
        ValueError:
            Raised by the decorated function for arrays that are neither 1-d
            nor 2-d.
    """

    def decorated(self, X, *args, **kwargs):
        if not isinstance(X, np.ndarray):
            return function(self, X, *args, **kwargs)

        # Treat a flat vector as a single-column matrix so both cases share
        # the row-wise loop below.
        if X.ndim == 1:
            X = X.reshape([-1, 1])

        if X.ndim != 2:
            raise ValueError('Arrays of dimensionality higher than 2 are not supported.')

        per_row = (function(self, *row, *args, **kwargs) for row in X)
        return np.fromiter(per_row, np.dtype('float64'))

    decorated.__doc__ = function.__doc__
    return decorated
def scalarize(function):
    """Allow methods that only accept 1-d vectors to work with scalars.

    A non-array ``X`` is wrapped into a one-element array before the call and
    the first element of the result is returned. Array inputs are passed
    through untouched.

    Args:
        function (callable):
            Function that accepts and returns vectors.

    Returns:
        callable:
            Decorated function that accepts and returns scalars.
    """

    def decorated(self, X, *args, **kwargs):
        if isinstance(X, np.ndarray):
            return function(self, X, *args, **kwargs)

        # Scalar path: lift into a length-1 vector, then unwrap the answer.
        wrapped_result = function(self, np.array([X]), *args, **kwargs)
        return wrapped_result[0]

    decorated.__doc__ = function.__doc__
    return decorated
def check_valid_values(function):
    """Raise an exception if the given values are not supported.

    The decorated method validates its ``X`` argument and then calls
    ``function`` with the original, unconverted ``X``.

    Args:
        function (callable):
            Method whose first argument after ``self`` is a ``numpy.ndarray``
            or ``pandas.DataFrame``. Other inputs must support ``len()`` and
            expose a ``dtype`` attribute.

    Returns:
        callable:
            Decorated function, keeping the name and docstring of ``function``.

    Raises:
        ValueError:
            If there are missing or invalid values or if the dataset is empty.
    """

    # ``functools.wraps`` keeps the wrapped method's name and docstring.
    @functools.wraps(function)
    def decorated(self, X, *args, **kwargs):
        # Validation runs on a NumPy view of a DataFrame. Everything else is
        # inspected as given.
        if isinstance(X, pd.DataFrame):
            W = X.to_numpy()
        else:
            W = X

        if not len(W):
            raise ValueError('Your dataset is empty.')

        is_numeric = np.issubdtype(W.dtype, np.floating) or np.issubdtype(W.dtype, np.integer)
        if not is_numeric:
            raise ValueError('There are non-numerical values in your data.')

        # The double ``any`` reduces a 2-d ``isnan`` result to a single flag.
        if np.isnan(W).any().any():
            raise ValueError('There are nan values in your data.')

        return function(self, X, *args, **kwargs)

    return decorated
def _get_addon_target(addon_path_name):
    """Find the target object for the add-on.

    Args:
        addon_path_name (str):
            The add-on's name. The add-on's name should be the full path of valid Python
            identifiers (i.e. importable.module:object.attr).

    Returns:
        tuple:
            * object:
                The base module or object the add-on should be added to.
            * str:
                The name the add-on should be added to under the module or object.

    Raises:
        AttributeError:
            If the path does not start with this module's name, or if any
            intermediate module or attribute on the path does not exist.
    """
    module_path, _, object_path = addon_path_name.partition(':')
    module_path = module_path.split('.')

    if module_path[0] != __name__:
        msg = f"expected base module to be '{__name__}', found '{module_path[0]}'"
        raise AttributeError(msg)

    # Walk every module component except the last one, which is either the
    # add-on's own name or, when an object path follows, the module holding it.
    target_base = sys.modules[__name__]
    for submodule in module_path[1:-1]:
        target_base = getattr(target_base, submodule)

    addon_name = module_path[-1]
    if object_path:
        if len(module_path) > 1 and not hasattr(target_base, module_path[-1]):
            msg = f"cannot add '{object_path}' to unknown submodule '{'.'.join(module_path)}'"
            raise AttributeError(msg)

        if len(module_path) > 1:
            target_base = getattr(target_base, module_path[-1])

        split_object = object_path.split('.')
        addon_name = split_object[-1]

        if len(split_object) > 1:
            target_base = attrgetter('.'.join(split_object[:-1]))(target_base)

    return target_base, addon_name


def _find_addons():
    """Find and load all copulas add-ons.

    Every entry point in the ``copulas_modules`` group is loaded and attached to
    the object resolved by ``_get_addon_target`` from the entry point's name.
    An add-on that fails to load or to resolve produces a warning and is skipped.
    """
    group = 'copulas_modules'
    try:
        eps = entry_points(group=group)
    except TypeError:
        # Load-time selection requires Python >= 3.10 or importlib_metadata >= 3.6
        eps = entry_points().get(group, [])

    for entry_point in eps:
        try:
            addon = entry_point.load()
        except Exception as error:  # pylint: disable=broad-exception-caught
            # ``importlib.metadata.EntryPoint`` has no ``module_name`` attribute.
            # Reading it here raised AttributeError inside the handler and
            # aborted the import instead of warning. ``value`` is always set.
            msg = (
                f'Failed to load "{entry_point.name}" from "{entry_point.value}" '
                f'with error:\n{error}'
            )
            warnings.warn(msg)
            continue

        try:
            addon_target, addon_name = _get_addon_target(entry_point.name)
        except AttributeError as error:
            msg = f"Failed to set '{entry_point.name}': {error}."
            warnings.warn(msg)
            continue

        setattr(addon_target, addon_name, addon)


_find_addons()