# Source code for copulas.multivariate.vine

"""VineCopula module."""

import logging
import sys
import warnings

import numpy as np
import pandas as pd

from copulas import (
EPSILON, check_valid_values, get_qualified_name, random_state, store_args,
validate_random_state)
from copulas.bivariate.base import Bivariate, CopulaTypes
from copulas.multivariate.base import Multivariate
from copulas.multivariate.tree import Tree, get_tree
from copulas.univariate.gaussian_kde import GaussianKDE

LOGGER = logging.getLogger(__name__)

[docs]class VineCopula(Multivariate):
"""Vine copula model.

A :math:vine is a graphical representation of one factorization of the n-variate probability
distribution in terms of :math:n(n − 1)/2 bivariate copulas by means of the chain rule.

It consists of a sequence of levels and as many levels as variables. Each level consists of
a tree (no isolated nodes and no loops) satisfying that if it has :math:n nodes there must
be :math:n − 1 edges.

Each node in tree :math:T_1 is a variable and edges are couplings of variables constructed
with bivariate copulas.

Each node in tree :math:T_{k+1} is a coupling in :math:T_{k}, expressed by the copula
of the variables; while edges are couplings between two vertices that must have one variable
in common, becoming a conditioning variable in the bivariate copula. Thus, every level has
one node less than the former. Once all the trees are drawn, the factorization is the product
of all the nodes.

Args:
vine_type (str):
type of the vine copula, could be 'center','direct','regular'
random_state (int or np.random.RandomState):
Random seed or RandomState to use.

Attributes:
model (copulas.univariate.Univariate):
Distribution to compute univariates.
u_matrix (numpy.array):
Univariates.
n_sample (int):
Number of samples.
n_var (int):
Number of variables.
columns (pandas.Series):
Names of the variables.
tau_mat (numpy.array):
Kendall correlation parameters for data.
truncated (int):
Max level used to build the vine.
depth (int):
Vine depth.
trees (list[Tree]):
List of trees used by this vine.
ppfs (list[callable]):
percent point functions from the univariates used by this vine.
"""

@store_args
def __init__(self, vine_type, random_state=None):
if sys.version_info > (3, 8):
warnings.warn(
'Vines have not been fully tested on Python >= 3.8 and might '
'produce wrong results.'
)

self.random_state = validate_random_state(random_state)
self.vine_type = vine_type
self.u_matrix = None

self.model = GaussianKDE

@classmethod
def _deserialize_trees(cls, tree_list):
previous = Tree.from_dict(tree_list)
trees = [previous]

for tree_dict in tree_list[1:]:
tree = Tree.from_dict(tree_dict, previous)
trees.append(tree)
previous = tree

return trees

[docs]    def to_dict(self):
"""Return a dict with the parameters to replicate this Vine.

Returns:
dict:
Parameters of this Vine.
"""
result = {
'type': get_qualified_name(self),
'vine_type': self.vine_type,
'fitted': self.fitted
}

if not self.fitted:
return result

result.update({
'n_sample': self.n_sample,
'n_var': self.n_var,
'depth': self.depth,
'truncated': self.truncated,
'trees': [tree.to_dict() for tree in self.trees],
'tau_mat': self.tau_mat.tolist(),
'u_matrix': self.u_matrix.tolist(),
'unis': [distribution.to_dict() for distribution in self.unis],
'columns': self.columns
})
return result

[docs]    @classmethod
def from_dict(cls, vine_dict):
"""Create a new instance from a parameters dictionary.

Args:
params (dict):
Parameters of the Vine, in the same format as the one
returned by the to_dict method.

Returns:
Vine:
Instance of the Vine defined on the parameters.
"""
instance = cls(vine_dict['vine_type'])
fitted = vine_dict['fitted']
if fitted:
instance.fitted = fitted
instance.n_sample = vine_dict['n_sample']
instance.n_var = vine_dict['n_var']
instance.truncated = vine_dict['truncated']
instance.depth = vine_dict['depth']
instance.trees = cls._deserialize_trees(vine_dict['trees'])
instance.unis = [GaussianKDE.from_dict(uni) for uni in vine_dict['unis']]
instance.ppfs = [uni.percent_point for uni in instance.unis]
instance.columns = vine_dict['columns']
instance.tau_mat = np.array(vine_dict['tau_mat'])
instance.u_matrix = np.array(vine_dict['u_matrix'])

return instance

@check_valid_values
def fit(self, X, truncated=3):
"""Fit a vine model to the data.

1. Transform all the variables by means of their marginals.
In other words, compute

.. math:: u_i = F_i(x_i), i = 1, ..., n

and compose the matrix :math:u = u_1, ..., u_n, where :math:u_i are their columns.

Args:
X (numpy.ndarray):
Data to be fitted to.
truncated (int):
Max level to build the vine.
"""
LOGGER.info('Fitting VineCopula("%s")', self.vine_type)
self.n_sample, self.n_var = X.shape
self.columns = X.columns
self.tau_mat = X.corr(method='kendall').to_numpy()
self.u_matrix = np.empty([self.n_sample, self.n_var])

self.truncated = truncated
self.depth = self.n_var - 1
self.trees = []

self.unis, self.ppfs = [], []
for i, col in enumerate(X):
uni = self.model()
uni.fit(X[col])
self.u_matrix[:, i] = uni.cumulative_distribution(X[col])
self.unis.append(uni)
self.ppfs.append(uni.percent_point)

self.train_vine(self.vine_type)
self.fitted = True

[docs]    def train_vine(self, tree_type):
r"""Build the vine.

1. For the construction of the first tree :math:T_1, assign one node to each variable
and then couple them by maximizing the measure of association considered.
Different vines impose different constraints on this construction. When those are
applied different trees are achieved at this level.

2. Select the copula that best fits to the pair of variables coupled by each edge in
:math:T_1.

3. Let :math:C_{ij}(u_i , u_j ) be the copula for a given edge :math:(u_i, u_j)
in :math:T_1. Then for every edge in :math:T_1, compute either

.. math:: {v^1}_{j|i} = \\frac{\\partial C_{ij}(u_i, u_j)}{\\partial u_j}

or similarly :math:{v^1}_{i|j}, which are conditional cdfs. When finished with
all the edges, construct the new matrix with :math:v^1 that has one less column u.

4. Set k = 2.

5. Assign one node of :math:T_k to each edge of :math:T_ {k−1}. The structure of
:math:T_{k−1} imposes a set of constraints on which edges of :math:T_k are
realizable. Hence the next step is to get a linked list of the accesible nodes for
every node in :math:T_k.

6. As in step 1, nodes of :math:T_k are coupled maximizing the measure of association
considered and satisfying the constraints impose by the kind of vine employed plus the
set of constraints imposed by tree :math:T_{k−1}.

7. Select the copula that best fit to each edge created in :math:T_k.

8. Recompute matrix :math:v_k as in step 4, but taking :math:T_k and :math:vk−1
instead of :math:T_1 and u.

9. Set :math:k = k + 1 and repeat from (5) until all the trees are constructed.

Args:
tree_type (str or TreeTypes):
Type of trees to use.
"""
LOGGER.debug('start building tree : 0')
# 1
tree_1 = get_tree(tree_type)
tree_1.fit(0, self.n_var, self.tau_mat, self.u_matrix)
self.trees.append(tree_1)
LOGGER.debug('finish building tree : 0')

for k in range(1, min(self.n_var - 1, self.truncated)):
# get constraints from previous tree
self.trees[k - 1]._get_constraints()
tau = self.trees[k - 1].get_tau_matrix()
LOGGER.debug(f'start building tree: {k}')
tree_k = get_tree(tree_type)
tree_k.fit(k, self.n_var - k, tau, self.trees[k - 1])
self.trees.append(tree_k)
LOGGER.debug(f'finish building tree: {k}')

[docs]    def get_likelihood(self, uni_matrix):
"""Compute likelihood of the vine."""
num_tree = len(self.trees)
values = np.empty([1, num_tree])

for i in range(num_tree):
value, new_uni_matrix = self.trees[i].get_likelihood(uni_matrix)
uni_matrix = new_uni_matrix
values[0, i] = value

return np.sum(values)

def _sample_row(self):
"""Generate a single sampled row from vine model.

Returns:
numpy.ndarray
"""
unis = np.random.uniform(0, 1, self.n_var)
first_ind = np.random.randint(0, self.n_var)
visited = []
explore = [first_ind]

sampled = np.zeros(self.n_var)
itr = 0
while explore:
current = explore.pop(0)
if itr == 0:
new_x = self.ppfs[current](unis[current])

else:
for i in range(itr - 1, -1, -1):
current_ind = -1

if i >= self.truncated:
continue

current_tree = self.trees[i].edges
# get index of edge to retrieve
for edge in current_tree:
if i == 0:
if (edge.L == current and edge.R == visited) or\
(edge.R == current and edge.L == visited):
current_ind = edge.index
break
else:
if edge.L == current or edge.R == current:
condition = set(edge.D)

visit_set = set(visited)

if condition.issubset(visit_set):
current_ind = edge.index
break

if current_ind != -1:
# the node is not indepedent contional on visited node
copula_type = current_tree[current_ind].name
copula = Bivariate(copula_type=CopulaTypes(copula_type))
copula.theta = current_tree[current_ind].theta

U = np.array([unis[visited]])
if i == itr - 1:
tmp = copula.percent_point(np.array([unis[current]]), U)
else:
tmp = copula.percent_point(np.array([tmp]), U)

tmp = min(max(tmp, EPSILON), 0.99)

new_x = self.ppfs[current](np.array([tmp]))

sampled[current] = new_x

for s in neighbors:
if s not in visited:
explore.insert(0, s)

itr += 1
visited.insert(0, current)

return sampled

@random_state
def sample(self, num_rows):
"""Sample new rows.

Args:
num_rows (int):
Number of rows to sample

Returns:
pandas.DataFrame:
sampled rows.
"""
sampled_values = []
for i in range(num_rows):
sampled_values.append(self._sample_row())

return pd.DataFrame(sampled_values, columns=self.columns)