Source code for netrd.reconstruction.convergent_cross_mapping


Graph reconstruction algorithm from time series data based on
Sugihara et al., Detecting Causality in Complex Ecosystems, Science (2012)
DOI: 10.1126/science.1227079

author: Chia-Hung Yang and Dina Mistry
Submitted as part of the 2019 NetSI Collabathon.

from .base import BaseReconstructor
import numpy as np
from itertools import permutations
from scipy.stats import pearsonr
import networkx as nx
from sklearn.neighbors import NearestNeighbors
from ..utilities import create_graph, threshold

[docs]class ConvergentCrossMapping(BaseReconstructor): """Infers dynamical causal relations."""
[docs] def fit( self, TS, tau=1, threshold_type='range', cutoffs=[(0.95, np.inf)], **kwargs ): r"""Infer causal relation applying Takens' Theorem of dynamical systems. Convergent cross-mapping infers dynamical causal relation between vairiables from time series data. Time series data portray an attractor manifold of the dynamical system of interest. Existing approaches of attractor reconstruction involved building the shadow manifold for a single variable :math:`X`, which is defined by the time-lagged vectors :math:`(X(t), X(t-\tau), X(t-2\tau), ..., X(t-(N-1)\tau))` where :math:`N` denotes number of variables in the system and :math:`\tau` is an arbitrary time- lagged interval. Takens' theorem and its generalization indicated that if a variable :math:`X` is causally influencing another variable :math:`Y` in a dynamical system, then there exists a one-to-one and local-structure- preserving mapping from :math:`X`'s shadow manifold to :math:`Y`'s. The convergent cross-mapping algorithm first constructs the shadow data cloud (a portray of the shadow manifold) for every variable, and it then examines the mutual predictability for all pairs of them. Specifically, for a point :math:`u(t)` in :math:`X`'s shadow data cloud, the time indices :math:`\{t_1, t_2, ..., t_{N+1}\}` of its :math:`N+1` nearest neighbors are obtained, which are mapped to a neighborhood in :math:`Y`'s shadow data cloud :math:`\{v(t_1), v(t_2), ..., v(t_{N+1})\}`. The estimate :math:`\hat{Y}(t)` is computed as an average over this neighborhood, with weights decaying exponentially with corresponding distance in :math:`X`'s shadow data cloud. The algorithm concludes a causal link from :math:`X` to :math:`Y` if correlation between :math:`Y` and :math:`\hat{Y}` is significant. Furthermore, Sugihara et al.'s simulation suggested that the correlation converging to 1 as the length of time series data grows is a necessary condition for :math:`X` causally affects :math:`Y` in a deterministic dynamical system. The convergent cross-mapping approach is thus numerically validated to infer causal relation from time series data. The results dictionary also includes the raw Pearson correlations between elements (`'correlation_matrix'`), their associated p-values (`'pvalues_matrix'`), and a matrix of the negative logarithm of the p-values (`'weights_matrix'`). Parameters ---------- TS (np.ndarray) :math:`N \times L` array consisting of :math:`L` observations from :math:`N` sensors. tau (int) Number of time steps for a single time-lag. Returns ------- G (nx.Graph) A reconstructed graph with :math:`N` nodes. Notes ----- 1. The length of time series data must be long enough such that :math:`L \geq 3 + (N-1)(1+\tau)`. 2. The :math:`(i,j)`-th entry of the correlation matrix entails the correlation between the :math:`j`-th variable and its estimate from the :math:`i`-th variable. A similar rule applies to the p-value matrix. 3. The computation complexity of this implementation is :math:`O(N^3 L)`. References ---------- .. [1] Sugihara et al., Detecting Causality in Complex Ecosystems, Science (2012) DOI: 10.1126/science.1227079 """ data = TS.T # Transpose the time series to make observations the rows L, N = data.shape # Raise error if there is not enough data to run the implementation if L < 3 + (N - 1) * (1 + tau): message = 'Need more data.' message += ' L must be not less than 3+(N-1)*(1+tau).' raise ValueError(message) # Create shadow data cloud for each variable shadows = [shadow_data_cloud(data[:, i], N, tau) for i in range(N)] # Obtain nearest neighbors of points in the shadow data clould and # their weights for time series estimates neighbors, distances = zip(*[nearest_neighbors(shad, L) for shad in shadows]) nei_weights = [neighbor_weights(dist) for dist in distances] # For every variable X and every other variable Y, # construct the estimates of Y from X's shadow data cloud and # compute the Pearson correlation between Y and its estimates # along with the p-value correlation = np.ones((N, N), dtype=float) pvalue = np.zeros((N, N), dtype=float) for i, j in permutations(range(N), 2): estimates = time_series_estimates(data[:, j], neighbors[i], nei_weights[i]) (M,) = estimates.shape correlation[i, j], pvalue[i, j] = pearsonr(estimates, data[-M:, j]) # Build the reconstructed graph by finding significantly correlated # variables, # where weights of reconstructed edges are selected such that we can # sort zero p-values in decreasing order and tell edges with zero p-value weights = np.full(pvalue.shape, np.inf) weights[pvalue > 0] = -np.log10(pvalue[pvalue > 0]) A = threshold(weights, threshold_type, cutoffs=cutoffs, **kwargs) G = create_graph(A, create_using=nx.DiGraph()) # Save the graph object, matrices of correlation and p-values into the # "results" field (dictionary) self.results['correlation_matrix'] = correlation self.results['pvalues_matrix'] = pvalue self.results['weights_matrix'] = weights self.results['thresholded_matrix'] = A self.results['graph'] = G return G
def shadow_data_cloud(data, N, tau): """Return the lagged-vector data cloud of a given variable's time series. Parameters ---------- data (np.ndarray): Length-:math:`L` 1D array of a single variable's times series. N (int): Number of variables. tau (int): Number of time steps for a single time-lag. Returns ------- shadow (np.ndarray): :math:`M \times N` array of the lagged-vector data cloud, where :math:`M = L - (N-1) * \tau` is the number of points in the data cloud. Notes ----- Given the data :math:`(x_1(t), x_2(t), ..., x_N(t)), t = 1, 2, ..., L`, the :math:`M` lagged vectors for the :math:`i`th variable are defined as :math:`(x_i(t), x_i(t-tau), x_i(t-2*tau), ..., x_i(t-(N-1)*tau))` for :math:`t = (N-1)*tau + 1, (N-1)*tau + 2, ..., L`. """ (L,) = data.shape M = L - (N - 1) * tau # Number of points in the shadow data cloud shadow = np.zeros((M, N), dtype=data.dtype) for j in reversed(range(N)): # Fill in column values from the right delta = (N - 1 - j) * tau # Amount of time-lag for this column shadow[:, j] = data[delta : delta + M] return shadow def nearest_neighbors(shadow, L): """ Return time indices of the N+1 nearest neighbors for every point in the shadow data cloud and their corresponding Euclidean distances. Parameters ---------- shadow (np.ndarray): Array of the shadow data cloud. L (int): Number of observations in the time series. Returns ------- nei (np.ndarray): :math:`M \times (N+1)` array of time indices of nearest neighbors where :math:`M` is the number of points in the shadow data cloud. dist (np.ndarray): :math:`M \times (N+1)` array of corresponding Euclidean distance between the data point to the neighbors. Notes ----- 1. The nearest neighbors are ordered in increasing order of distances. 2. If the length of time series is at a larger scale than the number of variables, specifically N + 2 < M/2, the implementation finds nearest neighbors using the Ball Tree data structure; otherwise the brute force method is applied. """ M, N = shadow.shape k = N + 2 # Number of nearest neighbors to be found, including itself method = 'ball_tree' if k < M / 2 else 'brute' nbrs = NearestNeighbors(n_neighbors=k, algorithm=method).fit(shadow) dist, nei = nbrs.kneighbors(shadow) # Remove the first column for both arrays, which stands for the data points # themselves nei = np.delete(nei, 0, axis=1) dist = np.delete(dist, 0, axis=1) # Modify the neighbors' indices to their time indices nei += L - M return nei, dist def neighbor_weights(dist): """Return the weights of neighbors in time seires estimates. Parameters ---------- dist (np.ndarray): :math:`M \times (N+1)` array of Euclidean distances between a point to its nearest neighbors in the shadow data cloud (sorted by increasing order of distances), where :math:`M` is the number of points in the shadow data cloud. Returns ------- wei (np.ndarray): :math:`M \times (N+1)` array of exponentially decaying weights of the nearest neighbors. Notes ----- For every point :math:`u(t)` in the shadow data cloud, based on its Euclidean distance to the nearest neighbors :math:`d(u(t), u(t_k))`, :math:`l = 1, 2, ..., N+1` (sorted by increasing order of distances), the weight of neighbor :math:`k` is :math:`w_k = f_k / \sum_{l=1}^{N+1} f_l` where :math:`f_k = e^{-\[ d(u(t), u(t_k)) / d(u(t), u(t_1)) \]}`. """ # For shadow data points where the nearest distance among the neighbors is # zero, let the weight be one for the nearest neighbor and zero otherwise wei = np.zeros(dist.shape) wei[:, 0] = 1 # For those where the nearest distance is positive, assign weights that are # exponentially decaying with the ratio to the nearest distance pos_nearest_dist = dist[:, 0] > 0 dist_pnd = dist[pos_nearest_dist, :] wei[pos_nearest_dist, :] = np.exp(-dist_pnd / dist_pnd[:, 0, np.newaxis]) wei = wei / wei.sum(axis=1, keepdims=True) return wei def time_series_estimates(data_y, nei_x, wei_x): """Return estimates of variable :math:`Y` from variable :math:`X`'s shadow data cloud. Parameters ---------- data_y (np.ndarray): 1D array of variable :math:`Y`'s time series data. nei_x (np.ndarray): :math:`M \times (N+1)` array of time indices of nearest neighbors in :math:`X`'s shadow data cloud, where :math:`M` is the number of points in the shadow data cloud. wei_x (np.ndarray): Array of corresponding weights of the nearest neighbors in :math:`X`'s shadow data cloud. Returns ------- ests (np.ndarray): Length-:math:`M` 1D array of estimates of :math:`Y`'s time series. Notes ----- Let :math:`t_1, t_2, ..., t_{N+1}` be the time indices of nearest neighbor of point :math:`t` in :math:`X`'s shadow data cloud. Its corresponding estimates of :math:`Y` is :math:`\hat{Y}(t) = \sum_{k=1}^{N+1} w(t_k) Y(t_k)`, where :math:`w`s are weights of nearest neighbors. """ ests = (data_y[nei_x] * wei_x).sum(axis=1) return ests