"""Multivariate trees module."""
import logging
from enum import Enum
import numpy as np
import scipy
from copulas import EPSILON, get_qualified_name
from copulas.bivariate.base import Bivariate
from copulas.multivariate.base import Multivariate
LOGGER = logging.getLogger(__name__)
[docs]class TreeTypes(Enum):
"""The available types of trees."""
CENTER = 0
DIRECT = 1
REGULAR = 2
[docs]class Tree(Multivariate):
"""Helper class to instantiate a single tree in the vine model."""
tree_type = None
fitted = False
[docs] def fit(self, index, n_nodes, tau_matrix, previous_tree, edges=None):
"""Fit this tree object.
Args:
index (int):
index of the tree.
n_nodes (int):
number of nodes in the tree.
tau_matrix (numpy.array):
kendall's tau matrix of the data, shape (n_nodes, n_nodes).
previous_tree (Tree):
tree object of previous level.
"""
self.level = index + 1
self.n_nodes = n_nodes
self.tau_matrix = tau_matrix
self.previous_tree = previous_tree
self.edges = edges or []
if not self.edges:
if self.level == 1:
self.u_matrix = previous_tree
self._build_first_tree()
else:
self._build_kth_tree()
self.prepare_next_tree()
self.fitted = True
def _check_constraint(self, edge1, edge2):
"""Check if two edges satisfy vine constraint.
Args:
edge1 (Edge):
edge object representing edge1
edge2 (Edge):
edge object representing edge2
Returns:
bool:
True if the two edges satisfy vine constraints
"""
full_node = {edge1.L, edge1.R, edge2.L, edge2.R}
full_node.update(edge1.D)
full_node.update(edge2.D)
return len(full_node) == (self.level + 1)
def _get_constraints(self):
"""Get neighboring edges for each edge in the edges."""
num_edges = len(self.edges)
for k in range(num_edges):
for i in range(num_edges):
# add to constraints if i shared an edge with k
if k != i and self.edges[k].is_adjacent(self.edges[i]):
self.edges[k].neighbors.append(i)
def _sort_tau_by_y(self, y):
"""Sort tau matrix by dependece with variable y.
Args:
y (int):
index of variable of intrest
Returns:
numpy.ndarray:
sorted tau matrix.
"""
# first column is the variable of interest
tau_y = self.tau_matrix[:, y]
tau_y[y] = np.NaN
temp = np.empty([self.n_nodes, 3])
temp[:, 0] = np.arange(self.n_nodes)
temp[:, 1] = tau_y
temp[:, 2] = abs(tau_y)
temp[np.isnan(temp)] = -10
sort_temp = temp[:, 2].argsort()[::-1]
tau_sorted = temp[sort_temp]
return tau_sorted
[docs] def get_tau_matrix(self):
"""Get tau matrix for adjacent pairs.
Returns:
tau (numpy.ndarray):
tau matrix for the current tree
"""
num_edges = len(self.edges)
tau = np.empty([num_edges, num_edges])
for i in range(num_edges):
edge = self.edges[i]
for j in edge.neighbors:
if self.level == 1:
left_u = self.u_matrix[:, edge.L]
right_u = self.u_matrix[:, edge.R]
else:
left_parent, right_parent = edge.parents
left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent)
tau[i, j], pvalue = scipy.stats.kendalltau(left_u, right_u)
return tau
[docs] def get_adjacent_matrix(self):
"""Get adjacency matrix.
Returns:
numpy.ndarray:
adjacency matrix
"""
edges = self.edges
num_edges = len(edges) + 1
adj = np.zeros([num_edges, num_edges])
for k in range(num_edges - 1):
adj[edges[k].L, edges[k].R] = 1
adj[edges[k].R, edges[k].L] = 1
return adj
[docs] def prepare_next_tree(self):
"""Prepare conditional U matrix for next tree."""
for edge in self.edges:
copula_theta = edge.theta
if self.level == 1:
left_u = self.u_matrix[:, edge.L]
right_u = self.u_matrix[:, edge.R]
else:
left_parent, right_parent = edge.parents
left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent)
# compute conditional cdfs C(i|j) = dC(i,j)/duj and dC(i,j)/du
left_u = [x for x in left_u if x is not None]
right_u = [x for x in right_u if x is not None]
X_left_right = np.array([[x, y] for x, y in zip(left_u, right_u)])
X_right_left = np.array([[x, y] for x, y in zip(right_u, left_u)])
copula = Bivariate(copula_type=edge.name)
copula.theta = copula_theta
left_given_right = copula.partial_derivative(X_left_right)
right_given_left = copula.partial_derivative(X_right_left)
# correction of 0 or 1
left_given_right[left_given_right == 0] = EPSILON
right_given_left[right_given_left == 0] = EPSILON
left_given_right[left_given_right == 1] = 1 - EPSILON
right_given_left[right_given_left == 1] = 1 - EPSILON
edge.U = np.array([left_given_right, right_given_left])
[docs] def get_likelihood(self, uni_matrix):
"""Compute likelihood of the tree given an U matrix.
Args:
uni_matrix (numpy.array):
univariate matrix to evaluate likelihood on.
Returns:
tuple[float, numpy.array]:
likelihood of the current tree, next level conditional univariate matrix
"""
uni_dim = uni_matrix.shape[1]
num_edge = len(self.edges)
values = np.zeros([1, num_edge])
new_uni_matrix = np.empty([uni_dim, uni_dim])
for i in range(num_edge):
edge = self.edges[i]
value, left_u, right_u = edge.get_likelihood(uni_matrix)
new_uni_matrix[edge.L, edge.R] = left_u
new_uni_matrix[edge.R, edge.L] = right_u
values[0, i] = np.log(value)
return np.sum(values), new_uni_matrix
def __str__(self):
"""Produce printable representation of the class."""
template = 'L:{} R:{} D:{} Copula:{} Theta:{}'
return '\n'.join([
template.format(edge.L, edge.R, edge.D, edge.name, edge.theta)
for edge in self.edges
])
def _serialize_previous_tree(self):
if self.level == 1:
return self.previous_tree.tolist()
return None
@classmethod
def _deserialize_previous_tree(cls, tree_dict, previous):
if tree_dict['level'] == 1:
return np.array(tree_dict['previous_tree'])
return previous
[docs] def to_dict(self):
"""Return a `dict` with the parameters to replicate this Tree.
Returns:
dict:
Parameters of this Tree.
"""
fitted = self.fitted
result = {
'tree_type': self.tree_type,
'type': get_qualified_name(self),
'fitted': fitted
}
if not fitted:
return result
result.update({
'level': self.level,
'n_nodes': self.n_nodes,
'tau_matrix': self.tau_matrix.tolist(),
'previous_tree': self._serialize_previous_tree(),
'edges': [edge.to_dict() for edge in self.edges],
})
return result
[docs] @classmethod
def from_dict(cls, tree_dict, previous=None):
"""Create a new instance from a parameters dictionary.
Args:
params (dict):
Parameters of the Tree, in the same format as the one
returned by the ``to_dict`` method.
Returns:
Tree:
Instance of the tree defined on the parameters.
"""
instance = get_tree(tree_dict['tree_type'])
fitted = tree_dict['fitted']
instance.fitted = fitted
if fitted:
instance.level = tree_dict['level']
instance.n_nodes = tree_dict['n_nodes']
instance.tau_matrix = np.array(tree_dict['tau_matrix'])
instance.previous_tree = cls._deserialize_previous_tree(tree_dict, previous)
instance.edges = [Edge.from_dict(edge) for edge in tree_dict['edges']]
return instance
[docs]class CenterTree(Tree):
"""Tree for a C-vine copula."""
tree_type = TreeTypes.CENTER
def _build_first_tree(self):
"""Build first level tree."""
tau_sorted = self._sort_tau_by_y(0)
for itr in range(self.n_nodes - 1):
ind = int(tau_sorted[itr, 0])
copula = Bivariate.select_copula(self.u_matrix[:, (0, ind)])
name, theta = copula.copula_type, copula.theta
new_edge = Edge(itr, 0, ind, name, theta)
new_edge.tau = self.tau_matrix[0, ind]
self.edges.append(new_edge)
def _build_kth_tree(self):
"""Build k-th level tree."""
anchor = self.get_anchor()
aux_sorted = self._sort_tau_by_y(anchor)
edges = self.previous_tree.edges
for itr in range(self.n_nodes - 1):
right = int(aux_sorted[itr, 0])
left_parent, right_parent = Edge.sort_edge([edges[anchor], edges[right]])
new_edge = Edge.get_child_edge(itr, left_parent, right_parent)
new_edge.tau = aux_sorted[itr, 1]
self.edges.append(new_edge)
[docs] def get_anchor(self):
"""Find anchor variable with highest sum of dependence with the rest.
Returns:
int:
Anchor variable.
"""
temp = np.empty([self.n_nodes, 2])
temp[:, 0] = np.arange(self.n_nodes, dtype=int)
temp[:, 1] = np.sum(abs(self.tau_matrix), 1)
anchor = int(temp[0, 0])
return anchor
[docs]class DirectTree(Tree):
"""DirectTree class."""
tree_type = TreeTypes.DIRECT
def _build_first_tree(self):
# find the pair of maximum tau
tau_matrix = self.tau_matrix
tau_sorted = self._sort_tau_by_y(0)
left_ind = tau_sorted[0, 0]
right_ind = tau_sorted[1, 0]
T1 = np.array([left_ind, 0, right_ind]).astype(int)
tau_T1 = tau_sorted[:2, 1]
# replace tau matrix of the selected variables as a negative number
tau_matrix[:, [T1]] = -10
for k in range(2, self.n_nodes - 1):
left = np.argmax(tau_matrix[T1[0], :])
right = np.argmax(tau_matrix[T1[-1], :])
valL = np.max(tau_matrix[T1[0], :])
valR = np.max(tau_matrix[T1[-1], :])
if valL > valR:
# add nodes to the left
T1 = np.append(int(left), T1)
tau_T1 = np.append(valL, tau_T1)
tau_matrix[:, left] = -10
else:
# add node to the right
T1 = np.append(T1, int(right))
tau_T1 = np.append(tau_T1, valR)
tau_matrix[:, right] = -10
for k in range(self.n_nodes - 1):
copula = Bivariate.select_copula(self.u_matrix[:, (T1[k], T1[k + 1])])
name, theta = copula.copula_type, copula.theta
left, right = sorted([T1[k], T1[k + 1]])
new_edge = Edge(k, left, right, name, theta)
new_edge.tau = tau_T1[k]
self.edges.append(new_edge)
def _build_kth_tree(self):
edges = self.previous_tree.edges
for k in range(self.n_nodes - 1):
left_parent, right_parent = Edge.sort_edge([edges[k], edges[k + 1]])
new_edge = Edge.get_child_edge(k, left_parent, right_parent)
new_edge.tau = self.tau_matrix[k, k + 1]
self.edges.append(new_edge)
[docs]class RegularTree(Tree):
"""RegularTree class."""
tree_type = TreeTypes.REGULAR
def _build_first_tree(self):
"""Build the first tree with n-1 variable."""
# Prim's algorithm
neg_tau = -1.0 * abs(self.tau_matrix)
X = {0}
while len(X) != self.n_nodes:
adj_set = set()
for x in X:
for k in range(self.n_nodes):
if k not in X and k != x:
adj_set.add((x, k)) # noqa: PD005
# find edge with maximum
edge = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0]
copula = Bivariate.select_copula(self.u_matrix[:, (edge[0], edge[1])])
name, theta = copula.copula_type, copula.theta
left, right = sorted([edge[0], edge[1]])
new_edge = Edge(len(X) - 1, left, right, name, theta)
new_edge.tau = self.tau_matrix[edge[0], edge[1]]
self.edges.append(new_edge)
X.add(edge[1]) # noqa: PD005
def _build_kth_tree(self):
"""Build tree for level k."""
neg_tau = -1.0 * abs(self.tau_matrix)
edges = self.previous_tree.edges
visited = {0}
unvisited = set(range(self.n_nodes))
while len(visited) != self.n_nodes:
adj_set = set()
for x in visited:
for k in range(self.n_nodes):
# check if (x,k) is a valid edge in the vine
if k not in visited and k != x and self._check_constraint(edges[x], edges[k]):
adj_set.add((x, k)) # noqa: PD005
# find edge with maximum tau
if len(adj_set) == 0:
visited.add(list(unvisited)[0]) # noqa: PD005
continue
pairs = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0]
left_parent, right_parent = Edge.sort_edge([edges[pairs[0]], edges[pairs[1]]])
new_edge = Edge.get_child_edge(len(visited) - 1, left_parent, right_parent)
new_edge.tau = self.tau_matrix[pairs[0], pairs[1]]
self.edges.append(new_edge)
visited.add(pairs[1]) # noqa: PD005
unvisited.remove(pairs[1])
[docs]def get_tree(tree_type):
"""Get a Tree instance of the specified type.
Args:
tree_type (str or TreeTypes):
Type of tree of which to get an instance.
Returns:
Tree:
Instance of a Tree of the specified type.
"""
if not isinstance(tree_type, TreeTypes):
if (isinstance(tree_type, str) and tree_type.upper() in TreeTypes.__members__):
tree_type = TreeTypes[tree_type.upper()]
else:
raise ValueError(f'Invalid tree type {tree_type}')
if tree_type == TreeTypes.CENTER:
return CenterTree()
if tree_type == TreeTypes.REGULAR:
return RegularTree()
if tree_type == TreeTypes.DIRECT:
return DirectTree()
[docs]class Edge(object):
"""Represents an edge in the copula."""
def __init__(self, index, left, right, copula_name, copula_theta):
"""Initialize an Edge object.
Args:
left (int):
left_node index (smaller)
right (int):
right_node index (larger)
copula_name (str):
name of the fitted copula class
copula_theta (float):
parameters of the fitted copula class
"""
self.index = index
self.L = left
self.R = right
self.D = set() # dependence_set
self.parents = None
self.neighbors = []
self.name = copula_name
self.theta = copula_theta
self.tau = None
self.U = None
self.likelihood = None
@staticmethod
def _identify_eds_ing(first, second):
"""Find nodes connecting adjacent edges.
Args:
first (Edge):
Edge object representing the first edge.
second (Edge):
Edge object representing the second edge.
Returns:
tuple[int, int, set[int]]:
The first two values represent left and right node
indicies of the new edge. The third value is the new dependence set.
"""
A = {first.L, first.R}
A.update(first.D)
B = {second.L, second.R}
B.update(second.D)
depend_set = A & B
left, right = sorted(A ^ B)
return left, right, depend_set
[docs] def is_adjacent(self, another_edge):
"""Check if two edges are adjacent.
Args:
another_edge (Edge):
edge object of another edge
Returns:
bool:
True if the two edges are adjacent.
"""
return (
self.L == another_edge.L
or self.L == another_edge.R
or self.R == another_edge.L
or self.R == another_edge.R
)
[docs] @staticmethod
def sort_edge(edges):
"""Sort iterable of edges first by left node indices then right.
Args:
edges (list[Edge]):
List of edges to be sorted.
Returns:
list[Edge]:
Sorted list by left and right node indices.
"""
return sorted(edges, key=lambda x: (x.L, x.R))
[docs] @classmethod
def get_conditional_uni(cls, left_parent, right_parent):
"""Identify pair univariate value from parents.
Args:
left_parent (Edge):
left parent
right_parent (Edge):
right parent
Returns:
tuple[np.ndarray, np.ndarray]:
left and right parents univariate.
"""
left, right, _ = cls._identify_eds_ing(left_parent, right_parent)
left_u = left_parent.U[0] if left_parent.L == left else left_parent.U[1]
right_u = right_parent.U[0] if right_parent.L == right else right_parent.U[1]
return left_u, right_u
[docs] @classmethod
def get_child_edge(cls, index, left_parent, right_parent):
"""Construct a child edge from two parent edges.
Args:
index (int):
Index of the new Edge.
left_parent (Edge):
Left parent
right_parent (Edge):
Right parent
Returns:
Edge:
The new child edge.
"""
[ed1, ed2, depend_set] = cls._identify_eds_ing(left_parent, right_parent)
left_u, right_u = cls.get_conditional_uni(left_parent, right_parent)
X = np.array([[x, y] for x, y in zip(left_u, right_u)])
copula = Bivariate.select_copula(X)
name, theta = copula.copula_type, copula.theta
new_edge = Edge(index, ed1, ed2, name, theta)
new_edge.D = depend_set
new_edge.parents = [left_parent, right_parent]
return new_edge
[docs] def get_likelihood(self, uni_matrix):
"""Compute likelihood given a U matrix.
Args:
uni_matrix (numpy.array):
Matrix to compute the likelihood.
Return:
tuple (np.ndarray, np.ndarray, np.array):
likelihood and conditional values.
"""
if self.parents is None:
left_u = uni_matrix[:, self.L]
right_u = uni_matrix[:, self.R]
else:
left_ing = list(self.D - self.parents[0].D)[0]
right_ing = list(self.D - self.parents[1].D)[0]
left_u = uni_matrix[self.L, left_ing]
right_u = uni_matrix[self.R, right_ing]
copula = Bivariate(copula_type=self.name)
copula.theta = self.theta
X_left_right = np.array([[left_u, right_u]])
X_right_left = np.array([[right_u, left_u]])
value = np.sum(copula.probability_density(X_left_right))
left_given_right = copula.partial_derivative(X_left_right)
right_given_left = copula.partial_derivative(X_right_left)
return value, left_given_right, right_given_left
[docs] def to_dict(self):
"""Return a `dict` with the parameters to replicate this Edge.
Returns:
dict:
Parameters of this Edge.
"""
parents = None
if self.parents:
parents = [parent.to_dict() for parent in self.parents]
U = None
if self.U is not None:
U = self.U.tolist()
return {
'index': self.index,
'L': self.L,
'R': self.R,
'D': self.D,
'parents': parents,
'neighbors': self.neighbors,
'name': self.name,
'theta': self.theta,
'tau': self.tau,
'U': U,
'likelihood': self.likelihood
}
[docs] @classmethod
def from_dict(cls, edge_dict):
"""Create a new instance from a parameters dictionary.
Args:
params (dict):
Parameters of the Edge, in the same format as the one
returned by the ``to_dict`` method.
Returns:
Edge:
Instance of the edge defined on the parameters.
"""
instance = cls(
edge_dict['index'], edge_dict['L'], edge_dict['R'],
edge_dict['name'], edge_dict['theta']
)
instance.U = np.array(edge_dict['U'])
parents = edge_dict['parents']
if parents:
instance.parents = []
for parent in parents:
edge = Edge.from_dict(parent)
instance.parents.append(edge)
regular_attributes = ['D', 'tau', 'likelihood', 'neighbors']
for key in regular_attributes:
setattr(instance, key, edge_dict[key])
return instance