Source code for netrd.reconstruction.free_energy_minimization

"""
free_energy_minimization.py
---------------------------
Reconstruction of graphs by minimizing a free energy of your data
author: Brennan Klein
email: brennanjamesklein at gmail dot com
submitted as part of the 2019 NetSI Collabathon
"""
from .base import BaseReconstructor
import numpy as np
from scipy import linalg
from ..utilities import create_graph, threshold


[docs]class FreeEnergyMinimization(BaseReconstructor): """Applies free energy principle."""
[docs] def fit(self, TS, threshold_type='degree', **kwargs): """Infer inter-node coupling weights by minimizing a free energy over the data structure. The results dictionary also stores the weight matrix as `'weights_matrix'` and the thresholded version of the weight matrix as `'thresholded_matrix'`. For details see [1]_. Parameters ---------- TS (np.ndarray) Array consisting of :math:`L` observations from :math.`N` sensors. threshold_type (str) Which thresholding function to use on the matrix of weights. See `netrd.utilities.threshold.py` for documentation. Pass additional arguments to the thresholder using ``**kwargs``. Returns ------- G (nx.Graph or nx.DiGraph) a reconstructed graph. References ---------- .. [1] https://github.com/nihcompmed/network-inference/blob/master/sphinx/codesource/inference.py """ N, L = np.shape(TS) # N nodes, length L m = np.mean(TS[:, :-1], axis=1) # model average ds = TS[:, :-1].T - m # discrepancy t1 = L - 1 # time limit # covariance of the discrepeancy c = np.cov(ds, rowvar=False, bias=True) c_inv = linalg.inv(c) # inverse dst = ds.T # discrepancy at time t # empty matrix to populate w/ inferred couplings W = np.empty((N, N)) nloop = 10000 # failsafe for i0 in range(N): # for each node TS1 = TS[i0, 1:] # take its entire time series h = TS1 # calculate the the local field cost = np.full(nloop, 100.0) for iloop in range(nloop): h_av = np.mean(h) # average local field hs_av = np.dot(dst, h - h_av) / t1 # deltaE_i delta\sigma_k w = np.dot(hs_av, c_inv) # expectation under model h = np.dot(TS[:, :-1].T, w[:]) # estimate of local field TS_model = np.tanh(h) # under kinetic Ising model # discrepancy cost cost[iloop] = np.mean((TS1[:] - TS_model[:]) ** 2) if cost[iloop] >= cost[iloop - 1]: break # if it increases, break # complicated, but this seems to be the estimate of W_i h *= np.divide( TS1, TS_model, out=np.ones_like(TS1), where=TS_model != 0 ) W[i0, :] = w[:] # threshold the network W_thresh = threshold(W, threshold_type, **kwargs) # construct the network self.results['graph'] = create_graph(W_thresh) self.results['weights_matrix'] = W self.results['thresholded_matrix'] = W_thresh G = self.results['graph'] return G