"""
free_energy_minimization.py
---------------------------
Reconstruction of graphs by minimizing a free energy of your data
author: Brennan Klein
email: brennanjamesklein at gmail dot com
submitted as part of the 2019 NetSI Collabathon
"""
from .base import BaseReconstructor
import numpy as np
from scipy import linalg
from ..utilities import create_graph, threshold
[docs]class FreeEnergyMinimization(BaseReconstructor):
"""Applies free energy principle."""
[docs] def fit(self, TS, threshold_type='degree', **kwargs):
"""Infer inter-node coupling weights by minimizing a free energy over the
data structure.
The results dictionary also stores the weight matrix as
`'weights_matrix'` and the thresholded version of the weight matrix
as `'thresholded_matrix'`. For details see [1]_.
Parameters
----------
TS (np.ndarray)
Array consisting of :math:`L` observations from :math.`N`
sensors.
threshold_type (str)
Which thresholding function to use on the matrix of
weights. See `netrd.utilities.threshold.py` for
documentation. Pass additional arguments to the thresholder
using ``**kwargs``.
Returns
-------
G (nx.Graph or nx.DiGraph)
a reconstructed graph.
References
----------
.. [1] https://github.com/nihcompmed/network-inference/blob/master/sphinx/codesource/inference.py
"""
N, L = np.shape(TS) # N nodes, length L
m = np.mean(TS[:, :-1], axis=1) # model average
ds = TS[:, :-1].T - m # discrepancy
t1 = L - 1 # time limit
# covariance of the discrepeancy
c = np.cov(ds, rowvar=False, bias=True)
c_inv = linalg.inv(c) # inverse
dst = ds.T # discrepancy at time t
# empty matrix to populate w/ inferred couplings
W = np.empty((N, N))
nloop = 10000 # failsafe
for i0 in range(N): # for each node
TS1 = TS[i0, 1:] # take its entire time series
h = TS1 # calculate the the local field
cost = np.full(nloop, 100.0)
for iloop in range(nloop):
h_av = np.mean(h) # average local field
hs_av = np.dot(dst, h - h_av) / t1 # deltaE_i delta\sigma_k
w = np.dot(hs_av, c_inv) # expectation under model
h = np.dot(TS[:, :-1].T, w[:]) # estimate of local field
TS_model = np.tanh(h) # under kinetic Ising model
# discrepancy cost
cost[iloop] = np.mean((TS1[:] - TS_model[:]) ** 2)
if cost[iloop] >= cost[iloop - 1]:
break # if it increases, break
# complicated, but this seems to be the estimate of W_i
h *= np.divide(
TS1, TS_model, out=np.ones_like(TS1), where=TS_model != 0
)
W[i0, :] = w[:]
# threshold the network
W_thresh = threshold(W, threshold_type, **kwargs)
# construct the network
self.results['graph'] = create_graph(W_thresh)
self.results['weights_matrix'] = W
self.results['thresholded_matrix'] = W_thresh
G = self.results['graph']
return G