# Source code for netrd.dynamics.branching_process

"""
branching_process.py
--------------------

Adapted from:
Levina, Anna, and Viola Priesemann. "Subsampling scaling."
Nature communications 8 (2017): 15140.
at [this link](https://www.nature.com/articles/ncomms15140)

author: Brennan Klein
email: brennanjamesklein at gmail dot com
submitted as part of the 2019 NetSI Collabathon
"""

from .base import BaseDynamics
import networkx as nx
import numpy as np


class BranchingModel(BaseDynamics):
    """A sand-pile-like branching process."""

    def simulate(
        self,
        G,
        L,
        initial_fraction=0.1,
        m=0.9975,
        target_Ahat=0.2,
        distribution_type='unif',
        scale=0.95,
        noise=True,
    ):
        r"""Simulate a (sand-pile-like) branching process dynamics.

        The results dictionary also stores the ground truth network as
        `'ground_truth'`.

        Parameters
        ----------
        G (nx.Graph)
            directed or undirected ground truth graph

        L (int)
            desired length of time series

        initial_fraction (float)
            fraction of nodes that start as active

        m (float)
            branching ratio of the dynamical process. :math:`m=1.0` means
            the system will be at criticality

        target_Ahat (float)
            desired average activity. This will ensure the process does not
            reach a stationary state and will always have some external
            drive.

        distribution_type (str)
            string describing which type of random numbers

        scale (float)
            scale for how likely nodes are to topple

        noise (bool)
            add nonzero values to the time series

        Returns
        -------
        TS (np.ndarray)
            an :math:`N \times L` time series

        References
        ----------
        .. [1] Levina, Anna, and Viola Priesemann. "Subsampling scaling."
               Nature communications 8 (2017) 15140.
               https://www.nature.com/articles/ncomms15140

        """
        N = G.number_of_nodes()  # number of nodes
        M = G.number_of_edges()  # number of edges
        A = nx.to_numpy_array(G)  # adjacency matrix

        # Row-normalize the adjacency matrix into a transition-probability
        # (weight) matrix; isolated nodes keep an all-zero row.
        W = np.zeros(A.shape)
        for i in range(A.shape[0]):
            if A[i].sum() > 0:
                W[i] = A[i] / A[i].sum()

        Gw = nx.from_numpy_array(W)
        G = nx.to_directed(Gw)  # convert back into a (directed) graph object

        TS = initialize_history(N, L, initial_fraction, m, target_Ahat, noise)

        # Because there's noise added, we don't want to get false positives;
        # round before testing for nonzero activity. Use a set for O(1)
        # membership tests inside the main loop.
        new_activity_times = set(
            np.nonzero(np.round(TS[:, 1:].sum(axis=0), 3))[0]
        )

        # Cache of random thresholds, refilled lazily when it runs low.
        cache = initialize_threshold_cache(M * L, distribution_type, scale)

        # now run dynamics
        for t in range(L - 1):
            if t not in new_activity_times:
                current_state = TS[:, t]

                # Round to suppress the tiny noise floor before thresholding.
                active_nodes = list(np.nonzero(np.round(current_state, 3))[0])
                active_edges = G.out_edges(nbunch=active_nodes, data=True)

                if len(active_edges) != 0:
                    current_targets = list(list(zip(*active_edges))[1])
                    weights_array = np.array(
                        [j[2]['weight'] for j in active_edges]
                    )

                    # Refill the threshold cache if it cannot cover this step.
                    if len(cache) <= len(weights_array):
                        cache = initialize_threshold_cache(
                            M * L, distribution_type, scale
                        )

                    # Edges whose weight exceeds its random threshold
                    # successfully propagate activity to their target.
                    over_the_threshold = weights_array > cache[: len(weights_array)]

                    # BUG FIX: consume exactly len(weights_array) draws from
                    # the cache (the original sliced one draw too many).
                    cache = cache[len(weights_array):]

                    next_active_units = np.unique(
                        np.array(current_targets)[over_the_threshold]
                    )
                    TS[next_active_units, t + 1] = 1

        # save the ground-truth network to results
        self.results['ground_truth'] = G

        # save the timeseries data to results
        self.results['TS'] = TS

        return TS
def initialize_history(N, L, initial_fraction, m, target_Ahat, noise):
    """Initialize the time series of the branching-process simulation.

    The initialization is based on a configuration of parameters
    corresponding to the ``initial_fraction`` of active nodes, the
    branching ratio ``m``, and the target average activity.

    Parameters
    ----------
    N (int): number of nodes
    L (int): desired length of time series
    initial_fraction (float): fraction of nodes that start as active
    m (float): branching ratio of the branching process
    target_Ahat (float): desired average activity. This will ensure the
        process does not reach a stationary state and will always have
        some external drive.
    noise (bool): add nonzero values to the time series

    Returns
    -------
    TS_init (np.ndarray): an N x L time series with nonzero entries in
        the first column
    """
    TS_init = np.zeros((N, L))

    # Activate a random subset of nodes at t = 0.
    num_init = np.round(initial_fraction * N).astype('int')
    TS_init[np.random.permutation(N)[0:num_init], 0] = 1

    # External drive: Poisson-distributed activation counts per timestep.
    # The rate scales with |1 - m| away from criticality; at m == 1.0 a
    # small constant rate is used instead. N_nodes is floored so small
    # networks still receive a comparable drive.
    if m != 1.0:
        N_nodes = N if N > 1000 else 1000
        h_vals = np.random.poisson(target_Ahat * N_nodes * np.abs(1 - m), L)
    else:
        N_nodes = N if N > 100 else 100
        h_vals = np.random.poisson(0.01, L)
        # h_vals = np.random.poisson(target_Ahat*N_nodes * 0.01, L)

    sum_h_vals = sum(h_vals)
    external_drive_timestamps = sorted(list(np.nonzero(h_vals)[0]))
    external_drive_activenodes = list(np.random.choice(N, sum_h_vals))

    # Distribute the externally driven activations across the timestamps.
    for timestamp in external_drive_timestamps:
        num_pops = h_vals[timestamp]
        active_nodes = [external_drive_activenodes.pop() for i in range(num_pops)]
        TS_init[active_nodes, timestamp] = 1

    if noise:
        # Tiny symmetric noise so the series has nonzero values everywhere;
        # downstream code rounds before thresholding to ignore it.
        TS_init += np.random.uniform(-np.exp(-12), np.exp(-12), TS_init.shape)

    return TS_init


def initialize_threshold_cache(num_edges, distribution_type='unif', scale=1.0):
    """Return a cache of random probability thresholds.

    This is useful for speed, as calling the numpy random number
    generator can get costly with large networks and time series.

    Parameters
    ----------
    num_edges (int): the length of the cache, which should correspond to
        the combination of all possible activity over the simulation.
    distribution_type (str): string that describes which type of random
        numbers ('unif' or 'normal').
    scale (float): scale for how likely nodes are to topple

    Returns
    -------
    edges (np.ndarray): a vector of probability thresholds, above which
        the node will topple and send information to the following node.

    Raises
    ------
    ValueError: if ``distribution_type`` is not 'unif' or 'normal'.
    """
    if distribution_type == 'unif':
        return scale * np.random.rand(num_edges)
    elif distribution_type == 'normal':
        return scale * np.random.randn(num_edges)

    # BUG FIX: the original silently returned None for an unrecognized
    # distribution_type; fail loudly instead.
    raise ValueError(
        "distribution_type must be 'unif' or 'normal', got %r" % (distribution_type,)
    )