# Source code for copulas.multivariate.tree

"""Multivariate trees module."""

import logging
from enum import Enum

import numpy as np
import scipy

from copulas import EPSILON, get_qualified_name
from copulas.bivariate.base import Bivariate
from copulas.multivariate.base import Multivariate

LOGGER = logging.getLogger(__name__)

[docs]class TreeTypes(Enum):
"""The available types of trees."""

CENTER = 0
DIRECT = 1
REGULAR = 2

[docs]class Tree(Multivariate):
"""Helper class to instantiate a single tree in the vine model."""

tree_type = None
fitted = False

[docs]    def fit(self, index, n_nodes, tau_matrix, previous_tree, edges=None):
"""Fit this tree object.

Args:
index (int):
index of the tree.
n_nodes (int):
number of nodes in the tree.
tau_matrix (numpy.array):
kendall's tau matrix of the data, shape (n_nodes, n_nodes).
previous_tree (Tree):
tree object of previous level.
"""
self.level = index + 1
self.n_nodes = n_nodes
self.tau_matrix = tau_matrix
self.previous_tree = previous_tree
self.edges = edges or []

if not self.edges:
if self.level == 1:
self.u_matrix = previous_tree
self._build_first_tree()

else:
self._build_kth_tree()

self.prepare_next_tree()

self.fitted = True

def _check_constraint(self, edge1, edge2):
"""Check if two edges satisfy vine constraint.

Args:
edge1 (Edge):
edge object representing edge1
edge2 (Edge):
edge object representing edge2

Returns:
bool:
True if the two edges satisfy vine constraints
"""
full_node = {edge1.L, edge1.R, edge2.L, edge2.R}
full_node.update(edge1.D)
full_node.update(edge2.D)
return len(full_node) == (self.level + 1)

def _get_constraints(self):
"""Get neighboring edges for each edge in the edges."""
num_edges = len(self.edges)
for k in range(num_edges):
for i in range(num_edges):
# add to constraints if i shared an edge with k
if k != i and self.edges[k].is_adjacent(self.edges[i]):
self.edges[k].neighbors.append(i)

def _sort_tau_by_y(self, y):
"""Sort tau matrix by dependece with variable y.

Args:
y (int):
index of variable of intrest

Returns:
numpy.ndarray:
sorted tau matrix.
"""
# first column is the variable of interest
tau_y = self.tau_matrix[:, y]
tau_y[y] = np.NaN

temp = np.empty([self.n_nodes, 3])
temp[:, 0] = np.arange(self.n_nodes)
temp[:, 1] = tau_y
temp[:, 2] = abs(tau_y)
temp[np.isnan(temp)] = -10
sort_temp = temp[:, 2].argsort()[::-1]
tau_sorted = temp[sort_temp]

return tau_sorted

[docs]    def get_tau_matrix(self):
"""Get tau matrix for adjacent pairs.

Returns:
tau (numpy.ndarray):
tau matrix for the current tree
"""
num_edges = len(self.edges)
tau = np.empty([num_edges, num_edges])

for i in range(num_edges):
edge = self.edges[i]
for j in edge.neighbors:
if self.level == 1:
left_u = self.u_matrix[:, edge.L]
right_u = self.u_matrix[:, edge.R]

else:
left_parent, right_parent = edge.parents
left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent)

tau[i, j], pvalue = scipy.stats.kendalltau(left_u, right_u)

return tau

Returns:
numpy.ndarray:
"""
edges = self.edges
num_edges = len(edges) + 1

for k in range(num_edges - 1):

[docs]    def prepare_next_tree(self):
"""Prepare conditional U matrix for next tree."""
for edge in self.edges:
copula_theta = edge.theta

if self.level == 1:
left_u = self.u_matrix[:, edge.L]
right_u = self.u_matrix[:, edge.R]

else:
left_parent, right_parent = edge.parents
left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent)

# compute conditional cdfs C(i|j) = dC(i,j)/duj and dC(i,j)/du
left_u = [x for x in left_u if x is not None]
right_u = [x for x in right_u if x is not None]
X_left_right = np.array([[x, y] for x, y in zip(left_u, right_u)])
X_right_left = np.array([[x, y] for x, y in zip(right_u, left_u)])

copula = Bivariate(copula_type=edge.name)
copula.theta = copula_theta
left_given_right = copula.partial_derivative(X_left_right)
right_given_left = copula.partial_derivative(X_right_left)

# correction of 0 or 1
left_given_right[left_given_right == 0] = EPSILON
right_given_left[right_given_left == 0] = EPSILON
left_given_right[left_given_right == 1] = 1 - EPSILON
right_given_left[right_given_left == 1] = 1 - EPSILON
edge.U = np.array([left_given_right, right_given_left])

[docs]    def get_likelihood(self, uni_matrix):
"""Compute likelihood of the tree given an U matrix.

Args:
uni_matrix (numpy.array):
univariate matrix to evaluate likelihood on.

Returns:
tuple[float, numpy.array]:
likelihood of the current tree, next level conditional univariate matrix
"""
uni_dim = uni_matrix.shape[1]
num_edge = len(self.edges)
values = np.zeros([1, num_edge])
new_uni_matrix = np.empty([uni_dim, uni_dim])

for i in range(num_edge):
edge = self.edges[i]
value, left_u, right_u = edge.get_likelihood(uni_matrix)
new_uni_matrix[edge.L, edge.R] = left_u
new_uni_matrix[edge.R, edge.L] = right_u
values[0, i] = np.log(value)

return np.sum(values), new_uni_matrix

def __str__(self):
"""Produce printable representation of the class."""
template = 'L:{} R:{} D:{} Copula:{} Theta:{}'
return '\n'.join([
template.format(edge.L, edge.R, edge.D, edge.name, edge.theta)
for edge in self.edges
])

def _serialize_previous_tree(self):
if self.level == 1:
return self.previous_tree.tolist()

return None

@classmethod
def _deserialize_previous_tree(cls, tree_dict, previous):
if tree_dict['level'] == 1:
return np.array(tree_dict['previous_tree'])

return previous

[docs]    def to_dict(self):
"""Return a dict with the parameters to replicate this Tree.

Returns:
dict:
Parameters of this Tree.
"""
fitted = self.fitted
result = {
'tree_type': self.tree_type,
'type': get_qualified_name(self),
'fitted': fitted
}

if not fitted:
return result

result.update({
'level': self.level,
'n_nodes': self.n_nodes,
'tau_matrix': self.tau_matrix.tolist(),
'previous_tree': self._serialize_previous_tree(),
'edges': [edge.to_dict() for edge in self.edges],
})

return result

[docs]    @classmethod
def from_dict(cls, tree_dict, previous=None):
"""Create a new instance from a parameters dictionary.

Args:
params (dict):
Parameters of the Tree, in the same format as the one
returned by the to_dict method.

Returns:
Tree:
Instance of the tree defined on the parameters.
"""
instance = get_tree(tree_dict['tree_type'])

fitted = tree_dict['fitted']
instance.fitted = fitted
if fitted:
instance.level = tree_dict['level']
instance.n_nodes = tree_dict['n_nodes']
instance.tau_matrix = np.array(tree_dict['tau_matrix'])
instance.previous_tree = cls._deserialize_previous_tree(tree_dict, previous)
instance.edges = [Edge.from_dict(edge) for edge in tree_dict['edges']]

return instance

[docs]class CenterTree(Tree):
"""Tree for a C-vine copula."""

tree_type = TreeTypes.CENTER

def _build_first_tree(self):
"""Build first level tree."""
tau_sorted = self._sort_tau_by_y(0)
for itr in range(self.n_nodes - 1):
ind = int(tau_sorted[itr, 0])
copula = Bivariate.select_copula(self.u_matrix[:, (0, ind)])
name, theta = copula.copula_type, copula.theta

new_edge = Edge(itr, 0, ind, name, theta)
new_edge.tau = self.tau_matrix[0, ind]
self.edges.append(new_edge)

def _build_kth_tree(self):
"""Build k-th level tree."""
anchor = self.get_anchor()
aux_sorted = self._sort_tau_by_y(anchor)
edges = self.previous_tree.edges

for itr in range(self.n_nodes - 1):
right = int(aux_sorted[itr, 0])
left_parent, right_parent = Edge.sort_edge([edges[anchor], edges[right]])
new_edge = Edge.get_child_edge(itr, left_parent, right_parent)
new_edge.tau = aux_sorted[itr, 1]
self.edges.append(new_edge)

[docs]    def get_anchor(self):
"""Find anchor variable with highest sum of dependence with the rest.

Returns:
int:
Anchor variable.
"""
temp = np.empty([self.n_nodes, 2])
temp[:, 0] = np.arange(self.n_nodes, dtype=int)
temp[:, 1] = np.sum(abs(self.tau_matrix), 1)
anchor = int(temp[0, 0])
return anchor

[docs]class DirectTree(Tree):
"""DirectTree class."""

tree_type = TreeTypes.DIRECT

def _build_first_tree(self):
# find the pair of maximum tau
tau_matrix = self.tau_matrix
tau_sorted = self._sort_tau_by_y(0)
left_ind = tau_sorted[0, 0]
right_ind = tau_sorted[1, 0]
T1 = np.array([left_ind, 0, right_ind]).astype(int)
tau_T1 = tau_sorted[:2, 1]

# replace tau matrix of the selected variables as a negative number
tau_matrix[:, [T1]] = -10
for k in range(2, self.n_nodes - 1):
left = np.argmax(tau_matrix[T1[0], :])
right = np.argmax(tau_matrix[T1[-1], :])
valL = np.max(tau_matrix[T1[0], :])
valR = np.max(tau_matrix[T1[-1], :])

if valL > valR:
# add nodes to the left
T1 = np.append(int(left), T1)
tau_T1 = np.append(valL, tau_T1)
tau_matrix[:, left] = -10

else:
# add node to the right
T1 = np.append(T1, int(right))
tau_T1 = np.append(tau_T1, valR)
tau_matrix[:, right] = -10

for k in range(self.n_nodes - 1):
copula = Bivariate.select_copula(self.u_matrix[:, (T1[k], T1[k + 1])])
name, theta = copula.copula_type, copula.theta

left, right = sorted([T1[k], T1[k + 1]])
new_edge = Edge(k, left, right, name, theta)
new_edge.tau = tau_T1[k]
self.edges.append(new_edge)

def _build_kth_tree(self):
edges = self.previous_tree.edges
for k in range(self.n_nodes - 1):
left_parent, right_parent = Edge.sort_edge([edges[k], edges[k + 1]])
new_edge = Edge.get_child_edge(k, left_parent, right_parent)
new_edge.tau = self.tau_matrix[k, k + 1]
self.edges.append(new_edge)

[docs]class RegularTree(Tree):
"""RegularTree class."""

tree_type = TreeTypes.REGULAR

def _build_first_tree(self):
"""Build the first tree with n-1 variable."""
# Prim's algorithm
neg_tau = -1.0 * abs(self.tau_matrix)
X = {0}

while len(X) != self.n_nodes:
for x in X:
for k in range(self.n_nodes):
if k not in X and k != x:

# find edge with maximum
edge = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0]
copula = Bivariate.select_copula(self.u_matrix[:, (edge[0], edge[1])])
name, theta = copula.copula_type, copula.theta

left, right = sorted([edge[0], edge[1]])
new_edge = Edge(len(X) - 1, left, right, name, theta)
new_edge.tau = self.tau_matrix[edge[0], edge[1]]
self.edges.append(new_edge)

def _build_kth_tree(self):
"""Build tree for level k."""
neg_tau = -1.0 * abs(self.tau_matrix)
edges = self.previous_tree.edges
visited = {0}
unvisited = set(range(self.n_nodes))

while len(visited) != self.n_nodes:
for x in visited:
for k in range(self.n_nodes):
# check if (x,k) is a valid edge in the vine
if k not in visited and k != x and self._check_constraint(edges[x], edges[k]):

# find edge with maximum tau
continue

pairs = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0]
left_parent, right_parent = Edge.sort_edge([edges[pairs[0]], edges[pairs[1]]])

new_edge = Edge.get_child_edge(len(visited) - 1, left_parent, right_parent)
new_edge.tau = self.tau_matrix[pairs[0], pairs[1]]
self.edges.append(new_edge)

unvisited.remove(pairs[1])

[docs]def get_tree(tree_type):
"""Get a Tree instance of the specified type.

Args:
tree_type (str or TreeTypes):
Type of tree of which to get an instance.

Returns:
Tree:
Instance of a Tree of the specified type.
"""
if not isinstance(tree_type, TreeTypes):
if (isinstance(tree_type, str) and tree_type.upper() in TreeTypes.__members__):
tree_type = TreeTypes[tree_type.upper()]
else:
raise ValueError(f'Invalid tree type {tree_type}')

if tree_type == TreeTypes.CENTER:
return CenterTree()
if tree_type == TreeTypes.REGULAR:
return RegularTree()
if tree_type == TreeTypes.DIRECT:
return DirectTree()

[docs]class Edge(object):
"""Represents an edge in the copula."""

def __init__(self, index, left, right, copula_name, copula_theta):
"""Initialize an Edge object.

Args:
left (int):
left_node index (smaller)
right (int):
right_node index (larger)
copula_name (str):
name of the fitted copula class
copula_theta (float):
parameters of the fitted copula class
"""
self.index = index
self.L = left
self.R = right
self.D = set()  # dependence_set
self.parents = None
self.neighbors = []

self.name = copula_name
self.theta = copula_theta
self.tau = None
self.U = None
self.likelihood = None

@staticmethod
def _identify_eds_ing(first, second):

Args:
first (Edge):
Edge object representing the first edge.
second (Edge):
Edge object representing the second edge.

Returns:
tuple[int, int, set[int]]:
The first two values represent left and right node
indicies of the new edge. The third value is the new dependence set.
"""
A = {first.L, first.R}
A.update(first.D)

B = {second.L, second.R}
B.update(second.D)

depend_set = A & B
left, right = sorted(A ^ B)

return left, right, depend_set

"""Check if two edges are adjacent.

Args:
another_edge (Edge):
edge object of another edge

Returns:
bool:
True if the two edges are adjacent.
"""
return (
self.L == another_edge.L
or self.L == another_edge.R
or self.R == another_edge.L
or self.R == another_edge.R
)

[docs]    @staticmethod
def sort_edge(edges):
"""Sort iterable of edges first by left node indices then right.

Args:
edges (list[Edge]):
List of edges to be sorted.

Returns:
list[Edge]:
Sorted list by left and right node indices.
"""
return sorted(edges, key=lambda x: (x.L, x.R))

[docs]    @classmethod
def get_conditional_uni(cls, left_parent, right_parent):
"""Identify pair univariate value from parents.

Args:
left_parent (Edge):
left parent
right_parent (Edge):
right parent

Returns:
tuple[np.ndarray, np.ndarray]:
left and right parents univariate.
"""
left, right, _ = cls._identify_eds_ing(left_parent, right_parent)

left_u = left_parent.U[0] if left_parent.L == left else left_parent.U[1]
right_u = right_parent.U[0] if right_parent.L == right else right_parent.U[1]

return left_u, right_u

[docs]    @classmethod
def get_child_edge(cls, index, left_parent, right_parent):
"""Construct a child edge from two parent edges.

Args:
index (int):
Index of the new Edge.
left_parent (Edge):
Left parent
right_parent (Edge):
Right parent

Returns:
Edge:
The new child edge.
"""
[ed1, ed2, depend_set] = cls._identify_eds_ing(left_parent, right_parent)
left_u, right_u = cls.get_conditional_uni(left_parent, right_parent)
X = np.array([[x, y] for x, y in zip(left_u, right_u)])
copula = Bivariate.select_copula(X)
name, theta = copula.copula_type, copula.theta
new_edge = Edge(index, ed1, ed2, name, theta)
new_edge.D = depend_set
new_edge.parents = [left_parent, right_parent]
return new_edge

[docs]    def get_likelihood(self, uni_matrix):
"""Compute likelihood given a U matrix.

Args:
uni_matrix (numpy.array):
Matrix to compute the likelihood.

Return:
tuple (np.ndarray, np.ndarray, np.array):
likelihood and conditional values.
"""
if self.parents is None:
left_u = uni_matrix[:, self.L]
right_u = uni_matrix[:, self.R]

else:
left_ing = list(self.D - self.parents[0].D)[0]
right_ing = list(self.D - self.parents[1].D)[0]
left_u = uni_matrix[self.L, left_ing]
right_u = uni_matrix[self.R, right_ing]

copula = Bivariate(copula_type=self.name)
copula.theta = self.theta

X_left_right = np.array([[left_u, right_u]])
X_right_left = np.array([[right_u, left_u]])

value = np.sum(copula.probability_density(X_left_right))
left_given_right = copula.partial_derivative(X_left_right)
right_given_left = copula.partial_derivative(X_right_left)

return value, left_given_right, right_given_left

[docs]    def to_dict(self):
"""Return a dict with the parameters to replicate this Edge.

Returns:
dict:
Parameters of this Edge.
"""
parents = None
if self.parents:
parents = [parent.to_dict() for parent in self.parents]

U = None
if self.U is not None:
U = self.U.tolist()

return {
'index': self.index,
'L': self.L,
'R': self.R,
'D': self.D,
'parents': parents,
'neighbors': self.neighbors,
'name': self.name,
'theta': self.theta,
'tau': self.tau,
'U': U,
'likelihood': self.likelihood
}

[docs]    @classmethod
def from_dict(cls, edge_dict):
"""Create a new instance from a parameters dictionary.

Args:
params (dict):
Parameters of the Edge, in the same format as the one
returned by the to_dict method.

Returns:
Edge:
Instance of the edge defined on the parameters.
"""
instance = cls(
edge_dict['index'], edge_dict['L'], edge_dict['R'],
edge_dict['name'], edge_dict['theta']
)
instance.U = np.array(edge_dict['U'])
parents = edge_dict['parents']

if parents:
instance.parents = []
for parent in parents:
edge = Edge.from_dict(parent)
instance.parents.append(edge)

regular_attributes = ['D', 'tau', 'likelihood', 'neighbors']
for key in regular_attributes:
setattr(instance, key, edge_dict[key])

return instance