"""
partial_correlation_matrix.py
---------------------
Reconstruction of graphs using the partial correlation matrix.
author: Stefan McCabe
email: stefanmccabe at gmail dot com
Submitted as part of the 2019 NetSI Collabathon
"""
from .base import BaseReconstructor
import numpy as np
from scipy import stats, linalg
from ..utilities import create_graph, threshold
[docs]class PartialCorrelationMatrix(BaseReconstructor):
"""Uses a regularized form of the precision matrix."""
[docs] def fit(
self,
TS,
index=None,
drop_index=True,
of_residuals=False,
threshold_type="range",
**kwargs
):
"""Uses a regularized form of the precision matrix.
The results dictionary also stores the weight matrix as
`'weights_matrix'` and the thresholded version of the weight matrix
as `'thresholded_matrix'`. For details see [1]_.
Parameters
----------
index (int, array of ints, or None)
Take the partial correlations of each pair of elements holding
constant an index variable or set of index variables. If None,
take the partial correlations of the variables holding constant
all other variables.
drop_index (bool)
If True, drop the index variables after calculating the partial
correlations.
of_residuals (bool)
If True, after calculating the partial correlations (presumably
using a dropped index variable), recalculate the partial
correlations between each variable, holding constant all other
variables.
threshold_type (str)
Which thresholding function to use on the matrix of
weights. See `netrd.utilities.threshold.py` for
documentation. Pass additional arguments to the thresholder
using ``**kwargs``.
Returns
-------
G (nx.Graph)
a reconstructed graph.
References
----------
.. [1] https://bwlewis.github.io/correlation-regularization/
"""
p_cor = partial_corr(TS, index=index)
if drop_index and index is not None:
p_cor = np.delete(p_cor, index, axis=0)
p_cor = np.delete(p_cor, index, axis=1)
if of_residuals:
p_cor = partial_corr(p_cor, index=None)
self.results["weights_matrix"] = p_cor
# threshold the network
W_thresh = threshold(p_cor, threshold_type, **kwargs)
# construct the network
self.results["graph"] = create_graph(W_thresh)
self.results["thresholded_matrix"] = W_thresh
G = self.results["graph"]
return G
# This partial correlation function is adapted from Fabian Pedregosa-Izquierdo's
# implementation of partial correlation in Python, found at [this gist](
# https://gist.github.com/fabianp/9396204419c7b638d38f)
"""
Partial Correlation in Python (clone of Matlab's partialcorr)
This uses the linear regression approach to compute the partial
correlation (might be slow for a huge number of variables). The
algorithm is detailed here:
http://en.wikipedia.org/wiki/Partial_correlation#Using_linear_regression
Taking X and Y two variables of interest and Z the matrix with all the variable minus {X, Y},
the algorithm can be summarized as
1) perform a normal linear least-squares regression with X as the target and Z as the predictor
2) calculate the residuals in Step #1
3) perform a normal linear least-squares regression with Y as the target and Z as the predictor
4) calculate the residuals in Step #3
5) calculate the correlation coefficient between the residuals from Steps #2 and #4;
The result is the partial correlation between X and Y while controlling for the effect of Z
Date: Nov 2014
Author: Fabian Pedregosa-Izquierdo, f@bianp.net
Testing: Valentina Borghesani, valentinaborghesani@gmail.com
"""
def partial_corr(C, index=None):
"""Returns the sample linear partial correlation coefficients between pairs of
variables in C, controlling for the remaining variables in C.
Parameters
--------------
C : array-like, shape (p, n)
Array with the different variables. Each row of C is taken as a variable
Returns -------
P : array-like, shape (p, p)
P[i, j] contains the partial correlation of C[:, i] and C[:, j]
controlling for the remaining variables in C.
"""
C = np.asarray(C).T
p = C.shape[1]
P_corr = np.zeros((p, p), dtype=np.float64)
for i in range(p):
P_corr[i, i] = 1
for j in range(i + 1, p):
if index is None:
idx = np.ones(p, dtype=bool)
idx[i] = False
idx[j] = False
elif type(index) is int or (
isinstance(index, np.ndarray) and issubclass(index.dtype.type, np.int_)
):
idx = np.zeros(p, dtype=bool)
idx[index] = True
else:
raise ValueError(
"Index must be an integer, an array of " "integers, or None."
)
beta_i = linalg.lstsq(C[:, idx], C[:, j])[0]
beta_j = linalg.lstsq(C[:, idx], C[:, i])[0]
res_j = C[:, j] - C[:, idx].dot(beta_i)
res_i = C[:, i] - C[:, idx].dot(beta_j)
corr = stats.pearsonr(res_i, res_j)[0]
P_corr[i, j] = corr
P_corr[j, i] = corr
return P_corr