Source code for copulas.multivariate.tree

"""Multivariate trees module."""

import logging
from enum import Enum

import numpy as np
import scipy

from copulas.bivariate.base import Bivariate
from copulas.multivariate.base import Multivariate
from copulas.utils import EPSILON, get_qualified_name

LOGGER = logging.getLogger(__name__)


[docs] class TreeTypes(Enum): """The available types of trees.""" CENTER = 0 DIRECT = 1 REGULAR = 2
[docs] class Tree(Multivariate): """Helper class to instantiate a single tree in the vine model.""" tree_type = None fitted = False
[docs] def fit(self, index, n_nodes, tau_matrix, previous_tree, edges=None): """Fit this tree object. Args: index (int): index of the tree. n_nodes (int): number of nodes in the tree. tau_matrix (numpy.array): kendall's tau matrix of the data, shape (n_nodes, n_nodes). previous_tree (Tree): tree object of previous level. """ self.level = index + 1 self.n_nodes = n_nodes self.tau_matrix = tau_matrix self.previous_tree = previous_tree self.edges = edges or [] if not self.edges: if self.level == 1: self.u_matrix = previous_tree self._build_first_tree() else: self._build_kth_tree() self.prepare_next_tree() self.fitted = True
def _check_constraint(self, edge1, edge2): """Check if two edges satisfy vine constraint. Args: edge1 (Edge): edge object representing edge1 edge2 (Edge): edge object representing edge2 Returns: bool: True if the two edges satisfy vine constraints """ full_node = {edge1.L, edge1.R, edge2.L, edge2.R} full_node.update(edge1.D) full_node.update(edge2.D) return len(full_node) == (self.level + 1) def _get_constraints(self): """Get neighboring edges for each edge in the edges.""" num_edges = len(self.edges) for k in range(num_edges): for i in range(num_edges): # add to constraints if i shared an edge with k if k != i and self.edges[k].is_adjacent(self.edges[i]): self.edges[k].neighbors.append(i) def _sort_tau_by_y(self, y): """Sort tau matrix by dependece with variable y. Args: y (int): index of variable of intrest Returns: numpy.ndarray: sorted tau matrix. """ # first column is the variable of interest tau_y = self.tau_matrix[:, y] tau_y[y] = np.nan temp = np.empty([self.n_nodes, 3]) temp[:, 0] = np.arange(self.n_nodes) temp[:, 1] = tau_y temp[:, 2] = abs(tau_y) temp[np.isnan(temp)] = -10 sort_temp = temp[:, 2].argsort()[::-1] tau_sorted = temp[sort_temp] return tau_sorted
[docs] def get_tau_matrix(self): """Get tau matrix for adjacent pairs. Returns: tau (numpy.ndarray): tau matrix for the current tree """ num_edges = len(self.edges) tau = np.empty([num_edges, num_edges]) for i in range(num_edges): edge = self.edges[i] for j in edge.neighbors: if self.level == 1: left_u = self.u_matrix[:, edge.L] right_u = self.u_matrix[:, edge.R] else: left_parent, right_parent = edge.parents left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent) tau[i, j], _pvalue = scipy.stats.kendalltau(left_u, right_u) return tau
[docs] def get_adjacent_matrix(self): """Get adjacency matrix. Returns: numpy.ndarray: adjacency matrix """ edges = self.edges num_edges = len(edges) + 1 adj = np.zeros([num_edges, num_edges]) for k in range(num_edges - 1): adj[edges[k].L, edges[k].R] = 1 adj[edges[k].R, edges[k].L] = 1 return adj
[docs] def prepare_next_tree(self): """Prepare conditional U matrix for next tree.""" for edge in self.edges: copula_theta = edge.theta if self.level == 1: left_u = self.u_matrix[:, edge.L] right_u = self.u_matrix[:, edge.R] else: left_parent, right_parent = edge.parents left_u, right_u = Edge.get_conditional_uni(left_parent, right_parent) # compute conditional cdfs C(i|j) = dC(i,j)/duj and dC(i,j)/du left_u = [x for x in left_u if x is not None] right_u = [x for x in right_u if x is not None] X_left_right = np.array([[x, y] for x, y in zip(left_u, right_u)]) X_right_left = np.array([[x, y] for x, y in zip(right_u, left_u)]) copula = Bivariate(copula_type=edge.name) copula.theta = copula_theta left_given_right = copula.partial_derivative(X_left_right) right_given_left = copula.partial_derivative(X_right_left) # correction of 0 or 1 left_given_right[left_given_right == 0] = EPSILON right_given_left[right_given_left == 0] = EPSILON left_given_right[left_given_right == 1] = 1 - EPSILON right_given_left[right_given_left == 1] = 1 - EPSILON edge.U = np.array([left_given_right, right_given_left])
[docs] def get_likelihood(self, uni_matrix): """Compute likelihood of the tree given an U matrix. Args: uni_matrix (numpy.array): univariate matrix to evaluate likelihood on. Returns: tuple[float, numpy.array]: likelihood of the current tree, next level conditional univariate matrix """ uni_dim = uni_matrix.shape[1] num_edge = len(self.edges) values = np.zeros([1, num_edge]) new_uni_matrix = np.empty([uni_dim, uni_dim]) for i in range(num_edge): edge = self.edges[i] value, left_u, right_u = edge.get_likelihood(uni_matrix) new_uni_matrix[edge.L, edge.R] = left_u.item() new_uni_matrix[edge.R, edge.L] = right_u.item() values[0, i] = np.log(value) return np.sum(values), new_uni_matrix
def __str__(self): """Produce printable representation of the class.""" template = 'L:{} R:{} D:{} Copula:{} Theta:{}' return '\n'.join([ template.format(edge.L, edge.R, edge.D, edge.name, edge.theta) for edge in self.edges ]) def _serialize_previous_tree(self): if self.level == 1: return self.previous_tree.tolist() return None @classmethod def _deserialize_previous_tree(cls, tree_dict, previous): if tree_dict['level'] == 1: return np.array(tree_dict['previous_tree']) return previous
[docs] def to_dict(self): """Return a `dict` with the parameters to replicate this Tree. Returns: dict: Parameters of this Tree. """ fitted = self.fitted result = {'tree_type': self.tree_type, 'type': get_qualified_name(self), 'fitted': fitted} if not fitted: return result result.update({ 'level': self.level, 'n_nodes': self.n_nodes, 'tau_matrix': self.tau_matrix.tolist(), 'previous_tree': self._serialize_previous_tree(), 'edges': [edge.to_dict() for edge in self.edges], }) return result
[docs] @classmethod def from_dict(cls, tree_dict, previous=None): """Create a new instance from a parameters dictionary. Args: params (dict): Parameters of the Tree, in the same format as the one returned by the ``to_dict`` method. Returns: Tree: Instance of the tree defined on the parameters. """ instance = get_tree(tree_dict['tree_type']) fitted = tree_dict['fitted'] instance.fitted = fitted if fitted: instance.level = tree_dict['level'] instance.n_nodes = tree_dict['n_nodes'] instance.tau_matrix = np.array(tree_dict['tau_matrix']) instance.previous_tree = cls._deserialize_previous_tree(tree_dict, previous) instance.edges = [Edge.from_dict(edge) for edge in tree_dict['edges']] return instance
[docs] class CenterTree(Tree): """Tree for a C-vine copula.""" tree_type = TreeTypes.CENTER def _build_first_tree(self): """Build first level tree.""" tau_sorted = self._sort_tau_by_y(0) for itr in range(self.n_nodes - 1): ind = int(tau_sorted[itr, 0]) copula = Bivariate.select_copula(self.u_matrix[:, (0, ind)]) name, theta = copula.copula_type, copula.theta new_edge = Edge(itr, 0, ind, name, theta) new_edge.tau = self.tau_matrix[0, ind] self.edges.append(new_edge) def _build_kth_tree(self): """Build k-th level tree.""" anchor = self.get_anchor() aux_sorted = self._sort_tau_by_y(anchor) edges = self.previous_tree.edges for itr in range(self.n_nodes - 1): right = int(aux_sorted[itr, 0]) left_parent, right_parent = Edge.sort_edge([edges[anchor], edges[right]]) new_edge = Edge.get_child_edge(itr, left_parent, right_parent) new_edge.tau = aux_sorted[itr, 1] self.edges.append(new_edge)
[docs] def get_anchor(self): """Find anchor variable with highest sum of dependence with the rest. Returns: int: Anchor variable. """ temp = np.empty([self.n_nodes, 2]) temp[:, 0] = np.arange(self.n_nodes, dtype=int) temp[:, 1] = np.sum(abs(self.tau_matrix), 1) anchor = int(temp[0, 0]) return anchor
[docs] class DirectTree(Tree): """DirectTree class.""" tree_type = TreeTypes.DIRECT def _build_first_tree(self): # find the pair of maximum tau tau_matrix = self.tau_matrix tau_sorted = self._sort_tau_by_y(0) left_ind = tau_sorted[0, 0] right_ind = tau_sorted[1, 0] T1 = np.array([left_ind, 0, right_ind]).astype(int) tau_T1 = tau_sorted[:2, 1] # replace tau matrix of the selected variables as a negative number tau_matrix[:, [T1]] = -10 for k in range(2, self.n_nodes - 1): left = np.argmax(tau_matrix[T1[0], :]) right = np.argmax(tau_matrix[T1[-1], :]) valL = np.max(tau_matrix[T1[0], :]) valR = np.max(tau_matrix[T1[-1], :]) if valL > valR: # add nodes to the left T1 = np.append(int(left), T1) tau_T1 = np.append(valL, tau_T1) tau_matrix[:, left] = -10 else: # add node to the right T1 = np.append(T1, int(right)) tau_T1 = np.append(tau_T1, valR) tau_matrix[:, right] = -10 for k in range(self.n_nodes - 1): copula = Bivariate.select_copula(self.u_matrix[:, (T1[k], T1[k + 1])]) name, theta = copula.copula_type, copula.theta left, right = sorted([T1[k], T1[k + 1]]) new_edge = Edge(k, left, right, name, theta) new_edge.tau = tau_T1[k] self.edges.append(new_edge) def _build_kth_tree(self): edges = self.previous_tree.edges for k in range(self.n_nodes - 1): left_parent, right_parent = Edge.sort_edge([edges[k], edges[k + 1]]) new_edge = Edge.get_child_edge(k, left_parent, right_parent) new_edge.tau = self.tau_matrix[k, k + 1] self.edges.append(new_edge)
[docs] class RegularTree(Tree): """RegularTree class.""" tree_type = TreeTypes.REGULAR def _build_first_tree(self): """Build the first tree with n-1 variable.""" # Prim's algorithm neg_tau = -1.0 * abs(self.tau_matrix) X = {0} while len(X) != self.n_nodes: adj_set = set() for x in X: for k in range(self.n_nodes): if k not in X and k != x: adj_set.add((x, k)) # noqa: PD005 # find edge with maximum edge = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0] copula = Bivariate.select_copula(self.u_matrix[:, (edge[0], edge[1])]) name, theta = copula.copula_type, copula.theta left, right = sorted([edge[0], edge[1]]) new_edge = Edge(len(X) - 1, left, right, name, theta) new_edge.tau = self.tau_matrix[edge[0], edge[1]] self.edges.append(new_edge) X.add(edge[1]) # noqa: PD005 def _build_kth_tree(self): """Build tree for level k.""" neg_tau = -1.0 * abs(self.tau_matrix) edges = self.previous_tree.edges visited = {0} unvisited = set(range(self.n_nodes)) while len(visited) != self.n_nodes: adj_set = set() for x in visited: for k in range(self.n_nodes): # check if (x,k) is a valid edge in the vine if k not in visited and k != x and self._check_constraint(edges[x], edges[k]): adj_set.add((x, k)) # noqa: PD005 # find edge with maximum tau if len(adj_set) == 0: visited.add(list(unvisited)[0]) # noqa: PD005 continue pairs = sorted(adj_set, key=lambda e: neg_tau[e[0]][e[1]])[0] left_parent, right_parent = Edge.sort_edge([edges[pairs[0]], edges[pairs[1]]]) new_edge = Edge.get_child_edge(len(visited) - 1, left_parent, right_parent) new_edge.tau = self.tau_matrix[pairs[0], pairs[1]] self.edges.append(new_edge) visited.add(pairs[1]) # noqa: PD005 unvisited.remove(pairs[1])
[docs] def get_tree(tree_type): """Get a Tree instance of the specified type. Args: tree_type (str or TreeTypes): Type of tree of which to get an instance. Returns: Tree: Instance of a Tree of the specified type. """ if not isinstance(tree_type, TreeTypes): if isinstance(tree_type, str) and tree_type.upper() in TreeTypes.__members__: tree_type = TreeTypes[tree_type.upper()] else: raise ValueError(f'Invalid tree type {tree_type}') if tree_type == TreeTypes.CENTER: return CenterTree() if tree_type == TreeTypes.REGULAR: return RegularTree() if tree_type == TreeTypes.DIRECT: return DirectTree()
[docs] class Edge(object): """Represents an edge in the copula.""" def __init__(self, index, left, right, copula_name, copula_theta): """Initialize an Edge object. Args: left (int): left_node index (smaller) right (int): right_node index (larger) copula_name (str): name of the fitted copula class copula_theta (float): parameters of the fitted copula class """ self.index = index self.L = left self.R = right self.D = set() # dependence_set self.parents = None self.neighbors = [] self.name = copula_name self.theta = copula_theta self.tau = None self.U = None self.likelihood = None @staticmethod def _identify_eds_ing(first, second): """Find nodes connecting adjacent edges. Args: first (Edge): Edge object representing the first edge. second (Edge): Edge object representing the second edge. Returns: tuple[int, int, set[int]]: The first two values represent left and right node indicies of the new edge. The third value is the new dependence set. """ A = {first.L, first.R} A.update(first.D) B = {second.L, second.R} B.update(second.D) depend_set = A & B left, right = sorted(A ^ B) return left, right, depend_set
[docs] def is_adjacent(self, another_edge): """Check if two edges are adjacent. Args: another_edge (Edge): edge object of another edge Returns: bool: True if the two edges are adjacent. """ return ( self.L == another_edge.L or self.L == another_edge.R or self.R == another_edge.L or self.R == another_edge.R )
[docs] @staticmethod def sort_edge(edges): """Sort iterable of edges first by left node indices then right. Args: edges (list[Edge]): List of edges to be sorted. Returns: list[Edge]: Sorted list by left and right node indices. """ return sorted(edges, key=lambda x: (x.L, x.R))
[docs] @classmethod def get_conditional_uni(cls, left_parent, right_parent): """Identify pair univariate value from parents. Args: left_parent (Edge): left parent right_parent (Edge): right parent Returns: tuple[np.ndarray, np.ndarray]: left and right parents univariate. """ left, right, _ = cls._identify_eds_ing(left_parent, right_parent) left_u = left_parent.U[0] if left_parent.L == left else left_parent.U[1] right_u = right_parent.U[0] if right_parent.L == right else right_parent.U[1] return left_u, right_u
[docs] @classmethod def get_child_edge(cls, index, left_parent, right_parent): """Construct a child edge from two parent edges. Args: index (int): Index of the new Edge. left_parent (Edge): Left parent right_parent (Edge): Right parent Returns: Edge: The new child edge. """ [ed1, ed2, depend_set] = cls._identify_eds_ing(left_parent, right_parent) left_u, right_u = cls.get_conditional_uni(left_parent, right_parent) X = np.array([[x, y] for x, y in zip(left_u, right_u)]) copula = Bivariate.select_copula(X) name, theta = copula.copula_type, copula.theta new_edge = Edge(index, ed1, ed2, name, theta) new_edge.D = depend_set new_edge.parents = [left_parent, right_parent] return new_edge
[docs] def get_likelihood(self, uni_matrix): """Compute likelihood given a U matrix. Args: uni_matrix (numpy.array): Matrix to compute the likelihood. Return: tuple (np.ndarray, np.ndarray, np.array): likelihood and conditional values. """ if self.parents is None: left_u = uni_matrix[:, self.L] right_u = uni_matrix[:, self.R] else: left_ing = list(self.D - self.parents[0].D)[0] right_ing = list(self.D - self.parents[1].D)[0] left_u = uni_matrix[self.L, left_ing] right_u = uni_matrix[self.R, right_ing] copula = Bivariate(copula_type=self.name) copula.theta = self.theta X_left_right = np.array([[left_u, right_u]]) X_right_left = np.array([[right_u, left_u]]) value = np.sum(copula.probability_density(X_left_right)) left_given_right = copula.partial_derivative(X_left_right) right_given_left = copula.partial_derivative(X_right_left) return value, left_given_right, right_given_left
[docs] def to_dict(self): """Return a `dict` with the parameters to replicate this Edge. Returns: dict: Parameters of this Edge. """ parents = None if self.parents: parents = [parent.to_dict() for parent in self.parents] U = None if self.U is not None: U = self.U.tolist() return { 'index': self.index, 'L': self.L, 'R': self.R, 'D': self.D, 'parents': parents, 'neighbors': self.neighbors, 'name': self.name, 'theta': self.theta, 'tau': self.tau, 'U': U, 'likelihood': self.likelihood, }
[docs] @classmethod def from_dict(cls, edge_dict): """Create a new instance from a parameters dictionary. Args: params (dict): Parameters of the Edge, in the same format as the one returned by the ``to_dict`` method. Returns: Edge: Instance of the edge defined on the parameters. """ instance = cls( edge_dict['index'], edge_dict['L'], edge_dict['R'], edge_dict['name'], edge_dict['theta'], ) instance.U = np.array(edge_dict['U']) parents = edge_dict['parents'] if parents: instance.parents = [] for parent in parents: edge = Edge.from_dict(parent) instance.parents.append(edge) regular_attributes = ['D', 'tau', 'likelihood', 'neighbors'] for key in regular_attributes: setattr(instance, key, edge_dict[key]) return instance