# encoding: utf-8

"""
Low-level routines to implement the Newman-Ziff algorithm for HPC

See also
--------

percolate : The high-level module
"""

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
from future.builtins import dict, next, range

import numpy as np
import networkx as nx
import scipy.stats
import simoa


def _ndarray_dtype(fields):
    """
    Return the NumPy structured array data type

    Helper function
    """
    return [
        (np.str_(key), values)
        for key, values in fields
    ]
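

# Illustrative sketch, not part of the original module: the helper wraps each
# field name in ``np.str_`` so that the names are native strings even under
# ``unicode_literals`` on Python 2, and leaves the type strings untouched.
def _example_ndarray_dtype():
    dtype = _ndarray_dtype([('n', 'uint32'), ('moments', '(5,)uint64')])
    a = np.empty(3, dtype=dtype)
    print(a.dtype.names)        # ('n', 'moments')
    print(a['moments'].shape)   # (3, 5)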


def microcanonical_statistics_dtype(spanning_cluster=True):
    """
    Return the numpy structured array data type for sample states

    Helper function

    Parameters
    ----------
    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    Returns
    -------
    ret : list of pairs of str
        A list of tuples of field names and data types to be used as
        ``dtype`` argument in numpy ndarray constructors

    See Also
    --------
    http://docs.scipy.org/doc/numpy/user/basics.rec.html
    canonical_statistics_dtype
    """
    fields = list()
    fields.extend([
        ('n', 'uint32'),
        ('edge', 'uint32'),
    ])
    if spanning_cluster:
        fields.extend([
            ('has_spanning_cluster', 'bool'),
        ])
    fields.extend([
        ('max_cluster_size', 'uint32'),
        ('moments', '(5,)uint64'),
    ])
    return _ndarray_dtype(fields)
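

# Illustrative sketch, not part of the original module: the field list depends
# on the ``spanning_cluster`` flag, so arrays for runs with and without
# spanning-cluster detection have different dtypes.
def _example_microcanonical_dtype():
    with_span = np.empty(1, dtype=microcanonical_statistics_dtype(True))
    without_span = np.empty(1, dtype=microcanonical_statistics_dtype(False))
    print('has_spanning_cluster' in with_span.dtype.names)     # True
    print('has_spanning_cluster' in without_span.dtype.names)  # False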

def bond_sample_states(
        perc_graph, num_nodes, num_edges, seed,
        spanning_cluster=True,
        auxiliary_node_attributes=None, auxiliary_edge_attributes=None,
        spanning_sides=None,
        **kwargs
):
    r'''
    Generate successive sample states of the bond percolation model

    This is a :ref:`generator function <python:tut-generators>` to
    successively add one edge at a time from the graph to the percolation
    model.
    At each iteration, it calculates and returns the cluster statistics.

    CAUTION: it returns a reference to the internal array, not a copy.

    Parameters
    ----------
    perc_graph : networkx.Graph
        The substrate graph on which percolation is to take place

    num_nodes : int
        Number ``N`` of sites in the graph

    num_edges : int
        Number ``M`` of bonds in the graph

    seed : {None, int, array_like}
        Random seed initializing the pseudo-random number generator.
        Piped through to the `numpy.random.RandomState` constructor.

    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    auxiliary_node_attributes : optional
        Return value of ``networkx.get_node_attributes(graph, 'span')``

    auxiliary_edge_attributes : optional
        Return value of ``networkx.get_edge_attributes(graph, 'span')``

    spanning_sides : list, optional
        List of keys (attribute values) of the two sides of the auxiliary
        nodes.
        Return value of ``list(set(auxiliary_node_attributes.values()))``

    Yields
    ------
    ret : ndarray
        Structured array with dtype as returned by
        `microcanonical_statistics_dtype`

    ret['n'] : ndarray of int
        The number of bonds added at the particular iteration

    ret['edge'] : ndarray of int
        The index of the edge added at the particular iteration.
        Note that in the first step, when ``ret['n'] == 0``, this value is
        undefined!

    ret['has_spanning_cluster'] : ndarray of bool
        ``True`` if there is a spanning cluster, ``False`` otherwise.
        Only exists if the `spanning_cluster` argument is set to ``True``.

    ret['max_cluster_size'] : int
        Size of the largest cluster (absolute number of sites)

    ret['moments'] : 1-D :py:class:`numpy.ndarray` of int
        Array of size ``5``.
        The ``k``-th entry is the ``k``-th raw moment of the (absolute)
        cluster size distribution, with ``k`` ranging from ``0`` to ``4``.

    Raises
    ------
    ValueError
        If `spanning_cluster` is ``True``, but `graph` does not contain any
        auxiliary nodes to detect spanning clusters.

    See also
    --------
    numpy.random.RandomState
    microcanonical_statistics_dtype

    Notes
    -----
    Iterating through this generator is a single run of the Newman-Ziff
    algorithm. [12]_
    The first iteration yields the trivial state with :math:`n = 0` occupied
    bonds.

    Spanning cluster

        In order to detect a spanning cluster, `graph` needs to contain
        auxiliary nodes and edges, cf. Reference [12]_, Figure 6.
        The auxiliary nodes and edges have the ``'span'`` `attribute
        <http://networkx.github.io/documentation/latest/tutorial/tutorial.html#node-attributes>`_.
        The value is either ``0`` or ``1``, distinguishing the two sides of
        the graph to span.

    Raw moments of the cluster size distribution

        The :math:`k`-th raw moment of the (absolute) cluster size
        distribution is :math:`\sum_s' s^k N_s`, where :math:`s` is the
        cluster size and :math:`N_s` is the number of clusters of size
        :math:`s`. [13]_
        The primed sum :math:`\sum'` signifies that the largest cluster is
        excluded from the sum. [14]_

    References
    ----------
    .. [12] Newman, M. E. J. & Ziff, R. M. Fast Monte Carlo algorithm for
       site or bond percolation. Physical Review E 64, 016706+ (2001),
       `doi:10.1103/physreve.64.016706
       <http://dx.doi.org/10.1103/physreve.64.016706>`_.

    .. [13] Stauffer, D. & Aharony, A. Introduction to Percolation Theory
       (Taylor & Francis, London, 1994), second edn.

    .. [14] Binder, K. & Heermann, D. W. Monte Carlo Simulation in
       Statistical Physics (Springer, Berlin, Heidelberg, 2010),
       `doi:10.1007/978-3-642-03163-2
       <http://dx.doi.org/10.1007/978-3-642-03163-2>`_.
    '''
    # construct random number generator
    rng = np.random.RandomState(seed=seed)

    if spanning_cluster:
        if len(spanning_sides) != 2:
            raise ValueError(
                'Spanning cluster is to be detected, but auxiliary nodes '
                'of less or more than 2 types (sides) given.'
            )

    # get a list of edges for easy access in later iterations
    perc_edges = perc_graph.edges()
    perm_edges = rng.permutation(num_edges)

    # initial iteration: no edges added yet (n == 0)
    ret = np.empty(
        1, dtype=microcanonical_statistics_dtype(spanning_cluster)
    )

    ret['n'] = 0
    ret['max_cluster_size'] = 1
    ret['moments'] = np.ones(5, dtype='uint64') * (num_nodes - 1)

    if spanning_cluster:
        ret['has_spanning_cluster'] = False

    # yield cluster statistics for n == 0
    yield ret

    # set up disjoint set (union-find) data structure
    ds = nx.utils.union_find.UnionFind()
    if spanning_cluster:
        ds_spanning = nx.utils.union_find.UnionFind()

        # merge all auxiliary nodes for each side
        side_roots = dict()
        for side in spanning_sides:
            nodes = [
                node for (node, node_side)
                in auxiliary_node_attributes.items()
                if node_side is side
            ]
            ds_spanning.union(*nodes)
            side_roots[side] = ds_spanning[nodes[0]]

        for (edge, edge_side) in auxiliary_edge_attributes.items():
            ds_spanning.union(side_roots[edge_side], *edge)

        side_roots = [
            ds_spanning[side_root] for side_root in side_roots.values()
        ]

    # get first node
    max_cluster_root = next(perc_graph.nodes_iter())

    # loop over all edges (n == 1..M)
    for n in range(num_edges):
        ret['n'] += 1

        # draw new edge from permutation
        edge_index = perm_edges[n]
        edge = perc_edges[edge_index]
        ret['edge'] = edge_index

        # find roots and weights
        roots = [
            ds[node] for node in edge
        ]
        weights = [
            ds.weights[root] for root in roots
        ]

        if roots[0] is not roots[1]:
            # not same cluster: union!
            ds.union(*roots)
            if spanning_cluster:
                ds_spanning.union(*roots)
                ret['has_spanning_cluster'] = (
                    ds_spanning[side_roots[0]] == ds_spanning[side_roots[1]]
                )

            # find new root and weight
            root = ds[edge[0]]
            weight = ds.weights[root]

            # moments and maximum cluster size

            # deduct the previous sub-maximum clusters from moments
            for i in [0, 1]:
                if roots[i] is max_cluster_root:
                    continue
                ret['moments'] -= weights[i] ** np.arange(5, dtype='uint64')

            if max_cluster_root in roots:
                # merged with maximum cluster
                max_cluster_root = root
                ret['max_cluster_size'] = weight
            else:
                # merged previously sub-maximum clusters
                if ret['max_cluster_size'] >= weight:
                    # previously largest cluster remains largest cluster
                    # add merged cluster to moments
                    ret['moments'] += weight ** np.arange(5, dtype='uint64')
                else:
                    # merged cluster overtook previously largest cluster
                    # add previously largest cluster to moments
                    max_cluster_root = root
                    ret['moments'] += ret['max_cluster_size'] ** np.arange(
                        5, dtype='uint64'
                    )
                    ret['max_cluster_size'] = weight

        yield ret
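

# Usage sketch, not part of the original module.  It assumes the networkx 1.x
# API (list-returning ``edges()`` and ``nodes_iter()``) that the generator
# above relies on.  It evolves bond percolation on a small square lattice
# without spanning-cluster detection, copying each yielded state because the
# generator re-uses its internal array.
def _example_bond_sample_states(seed=42):
    graph = nx.grid_2d_graph(4, 4)
    states = [
        state.copy() for state in bond_sample_states(
            perc_graph=graph,
            num_nodes=graph.number_of_nodes(),
            num_edges=graph.number_of_edges(),
            seed=seed,
            spanning_cluster=False,
        )
    ]
    # the largest cluster grows monotonically from 1 site to all 16 sites
    print([int(state['max_cluster_size']) for state in states])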

def bond_microcanonical_statistics(
        perc_graph, num_nodes, num_edges, seed,
        spanning_cluster=True,
        auxiliary_node_attributes=None, auxiliary_edge_attributes=None,
        spanning_sides=None,
        **kwargs
):
    """
    Evolve a single run over all microstates (bond occupation numbers)

    Return the cluster statistics for each microstate

    Parameters
    ----------
    perc_graph : networkx.Graph
        The substrate graph on which percolation is to take place

    num_nodes : int
        Number ``N`` of sites in the graph

    num_edges : int
        Number ``M`` of bonds in the graph

    seed : {None, int, array_like}
        Random seed initializing the pseudo-random number generator.
        Piped through to the `numpy.random.RandomState` constructor.

    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    auxiliary_node_attributes : optional
        Value of ``networkx.get_node_attributes(graph, 'span')``

    auxiliary_edge_attributes : optional
        Value of ``networkx.get_edge_attributes(graph, 'span')``

    spanning_sides : list, optional
        List of keys (attribute values) of the two sides of the auxiliary
        nodes.
        Return value of ``list(set(auxiliary_node_attributes.values()))``

    Returns
    -------
    ret : ndarray of size ``num_edges + 1``
        Structured array with dtype as returned by
        `microcanonical_statistics_dtype`

    ret['n'] : ndarray of int
        The number of bonds added at the particular iteration

    ret['edge'] : ndarray of int
        The index of the edge added at the particular iteration.
        Note that ``ret['edge'][0]`` is undefined!

    ret['has_spanning_cluster'] : ndarray of bool
        ``True`` if there is a spanning cluster, ``False`` otherwise.
        Only exists if the `spanning_cluster` argument is set to ``True``.

    ret['max_cluster_size'] : int
        Size of the largest cluster (absolute number of sites)

    ret['moments'] : 2-D :py:class:`numpy.ndarray` of int
        Array of shape ``(num_edges + 1, 5)``.
        The ``k``-th column is the ``k``-th raw moment of the (absolute)
        cluster size distribution, with ``k`` ranging from ``0`` to ``4``.

    See also
    --------
    bond_sample_states
    microcanonical_statistics_dtype
    numpy.random.RandomState
    """
    # initialize generator
    sample_states = bond_sample_states(
        perc_graph=perc_graph,
        num_nodes=num_nodes,
        num_edges=num_edges,
        seed=seed,
        spanning_cluster=spanning_cluster,
        auxiliary_node_attributes=auxiliary_node_attributes,
        auxiliary_edge_attributes=auxiliary_edge_attributes,
        spanning_sides=spanning_sides,
    )

    # get cluster statistics over all microstates
    return np.fromiter(
        sample_states,
        dtype=microcanonical_statistics_dtype(spanning_cluster),
        count=num_edges + 1
    )
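

# Usage sketch, not part of the original module: a complete single run over
# all M + 1 occupation numbers, returned as one structured array (again
# without spanning-cluster detection, and assuming networkx 1.x).
def _example_bond_microcanonical_statistics(seed=7):
    graph = nx.grid_2d_graph(4, 4)
    num_edges = graph.number_of_edges()
    stats = bond_microcanonical_statistics(
        perc_graph=graph,
        num_nodes=graph.number_of_nodes(),
        num_edges=num_edges,
        seed=seed,
        spanning_cluster=False,
    )
    print(stats.shape)                         # (num_edges + 1,)
    print(int(stats['max_cluster_size'][-1]))  # 16: all sites at n == M
    return stats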

def canonical_statistics_dtype(spanning_cluster=True):
    """
    The NumPy Structured Array type for canonical statistics

    Helper function

    Parameters
    ----------
    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    Returns
    -------
    ret : list of pairs of str
        A list of tuples of field names and data types to be used as
        ``dtype`` argument in numpy ndarray constructors

    See Also
    --------
    http://docs.scipy.org/doc/numpy/user/basics.rec.html
    microcanonical_statistics_dtype
    canonical_averages_dtype
    """
    fields = list()
    if spanning_cluster:
        fields.extend([
            ('percolation_probability', 'float64'),
        ])
    fields.extend([
        ('max_cluster_size', 'float64'),
        ('moments', '(5,)float64'),
    ])
    return _ndarray_dtype(fields)

def bond_canonical_statistics(
        microcanonical_statistics,
        convolution_factors,
        **kwargs
):
    """
    Canonical cluster statistics for a single run and a single probability

    Parameters
    ----------
    microcanonical_statistics : ndarray
        Return value of `bond_microcanonical_statistics`

    convolution_factors : 1-D array_like
        The coefficients of the convolution for the given probability ``p``
        and for each occupation number ``n``.

    Returns
    -------
    ret : ndarray of size ``1``
        Structured array with dtype as returned by
        `canonical_statistics_dtype`

    ret['percolation_probability'] : ndarray of float
        The "percolation probability" of this run at the value of ``p``.
        Only exists if the `microcanonical_statistics` argument has the
        ``has_spanning_cluster`` field.

    ret['max_cluster_size'] : ndarray of float
        Weighted size of the largest cluster (absolute number of sites)

    ret['moments'] : 1-D :py:class:`numpy.ndarray` of float
        Array of size ``5``.
        The ``k``-th entry is the weighted ``k``-th raw moment of the
        (absolute) cluster size distribution, with ``k`` ranging from ``0``
        to ``4``.

    See Also
    --------
    bond_microcanonical_statistics
    canonical_statistics_dtype
    """
    # initialize return array
    spanning_cluster = (
        'has_spanning_cluster' in microcanonical_statistics.dtype.names
    )
    ret = np.empty(1, dtype=canonical_statistics_dtype(spanning_cluster))

    # compute percolation probability
    if spanning_cluster:
        ret['percolation_probability'] = np.sum(
            convolution_factors *
            microcanonical_statistics['has_spanning_cluster']
        )

    # convolve maximum cluster size
    ret['max_cluster_size'] = np.sum(
        convolution_factors *
        microcanonical_statistics['max_cluster_size']
    )

    # convolve moments
    ret['moments'] = np.sum(
        convolution_factors[:, np.newaxis] *
        microcanonical_statistics['moments'],
        axis=0,
    )

    # return convolved cluster statistics
    return ret
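

# Usage sketch, not part of the original module.  The convolution factors are
# left to the caller; following Newman & Ziff [12]_, a natural choice at
# occupation probability ``p`` is the binomial distribution over the
# occupation numbers ``n = 0 .. M``.
def _example_bond_canonical_statistics(microcanonical_statistics, p):
    num_edges = microcanonical_statistics.size - 1
    convolution_factors = scipy.stats.binom.pmf(
        np.arange(num_edges + 1), num_edges, p
    )
    return bond_canonical_statistics(
        microcanonical_statistics=microcanonical_statistics,
        convolution_factors=convolution_factors,
    )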

def canonical_averages_dtype(spanning_cluster=True):
    """
    The NumPy Structured Array type for canonical averages over several
    runs

    Helper function

    Parameters
    ----------
    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    Returns
    -------
    ret : list of pairs of str
        A list of tuples of field names and data types to be used as
        ``dtype`` argument in numpy ndarray constructors

    See Also
    --------
    http://docs.scipy.org/doc/numpy/user/basics.rec.html
    canonical_statistics_dtype
    finalized_canonical_averages_dtype
    """
    fields = list()
    fields.extend([
        ('number_of_runs', 'uint32'),
    ])
    if spanning_cluster:
        fields.extend([
            ('percolation_probability_mean', 'float64'),
            ('percolation_probability_m2', 'float64'),
        ])
    fields.extend([
        ('max_cluster_size_mean', 'float64'),
        ('max_cluster_size_m2', 'float64'),
        ('moments_mean', '(5,)float64'),
        ('moments_m2', '(5,)float64'),
    ])
    return _ndarray_dtype(fields)

def bond_initialize_canonical_averages(
        canonical_statistics, **kwargs
):
    """
    Initialize the canonical averages from a single-run cluster statistics

    Parameters
    ----------
    canonical_statistics : 1-D structured ndarray
        Typically contains the canonical statistics for a range of values
        of the occupation probability ``p``.
        The dtype is the result of `canonical_statistics_dtype`.

    Returns
    -------
    ret : structured ndarray
        The dtype is the result of `canonical_averages_dtype`.

    ret['number_of_runs'] : 1-D ndarray of int
        Equals ``1`` (initial run).

    ret['percolation_probability_mean'] : 1-D array of float
        Equals ``canonical_statistics['percolation_probability']``
        (if ``percolation_probability`` is present)

    ret['percolation_probability_m2'] : 1-D array of float
        Each entry is ``0.0``

    ret['max_cluster_size_mean'] : 1-D array of float
        Equals ``canonical_statistics['max_cluster_size']``

    ret['max_cluster_size_m2'] : 1-D array of float
        Each entry is ``0.0``

    ret['moments_mean'] : 2-D array of float
        Equals ``canonical_statistics['moments']``

    ret['moments_m2'] : 2-D array of float
        Each entry is ``0.0``

    See Also
    --------
    canonical_averages_dtype
    bond_canonical_statistics
    """
    # initialize return array
    spanning_cluster = (
        'percolation_probability' in canonical_statistics.dtype.names
    )
    # array should have the same size as the input array
    ret = np.empty_like(
        canonical_statistics,
        dtype=canonical_averages_dtype(spanning_cluster=spanning_cluster),
    )

    ret['number_of_runs'] = 1

    # initialize percolation probability mean and sum of squared differences
    if spanning_cluster:
        ret['percolation_probability_mean'] = (
            canonical_statistics['percolation_probability']
        )
        ret['percolation_probability_m2'] = 0.0

    # initialize maximum cluster size mean and sum of squared differences
    ret['max_cluster_size_mean'] = (
        canonical_statistics['max_cluster_size']
    )
    ret['max_cluster_size_m2'] = 0.0

    # initialize moments means and sums of squared differences
    ret['moments_mean'] = canonical_statistics['moments']
    ret['moments_m2'] = 0.0

    return ret

def bond_reduce(row_a, row_b):
    """
    Reduce the canonical averages over several runs

    This is a "true" reducer.
    It is associative and commutative.

    This is a wrapper around `simoa.stats.online_variance`.

    Parameters
    ----------
    row_a, row_b : structured ndarrays
        Output of this function, or initial input from
        `bond_initialize_canonical_averages`

    Returns
    -------
    ret : structured ndarray
        Array of dtype as returned by `canonical_averages_dtype`

    See Also
    --------
    bond_initialize_canonical_averages
    canonical_averages_dtype
    simoa.stats.online_variance
    """
    spanning_cluster = (
        'percolation_probability_mean' in row_a.dtype.names and
        'percolation_probability_mean' in row_b.dtype.names and
        'percolation_probability_m2' in row_a.dtype.names and
        'percolation_probability_m2' in row_b.dtype.names
    )

    # initialize return array
    ret = np.empty_like(row_a)

    def _reducer(key, transpose=False):
        mean_key = '{}_mean'.format(key)
        m2_key = '{}_m2'.format(key)

        res = simoa.stats.online_variance(*[
            (
                row['number_of_runs'],
                row[mean_key].T if transpose else row[mean_key],
                row[m2_key].T if transpose else row[m2_key],
            )
            for row in [row_a, row_b]
        ])

        (
            ret[mean_key],
            ret[m2_key],
        ) = (
            res[1].T,
            res[2].T,
        ) if transpose else res[1:]

    if spanning_cluster:
        _reducer('percolation_probability')

    _reducer('max_cluster_size')
    _reducer('moments', transpose=True)

    ret['number_of_runs'] = row_a['number_of_runs'] + row_b['number_of_runs']

    return ret
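

# Usage sketch, not part of the original module: since ``bond_reduce`` is an
# associative and commutative reducer, the canonical statistics of any number
# of independent runs can be merged with ``functools.reduce``.  Here ``runs``
# is a hypothetical iterable of `bond_canonical_statistics` results, one per
# run.
def _example_bond_reduce(runs):
    from functools import reduce
    return reduce(
        bond_reduce,
        (bond_initialize_canonical_averages(run) for run in runs),
    )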

def finalized_canonical_averages_dtype(spanning_cluster=True):
    """
    The NumPy Structured Array type for finalized canonical averages over
    several runs

    Helper function

    Parameters
    ----------
    spanning_cluster : bool, optional
        Whether to detect a spanning cluster or not.
        Defaults to ``True``.

    Returns
    -------
    ret : list of pairs of str
        A list of tuples of field names and data types to be used as
        ``dtype`` argument in numpy ndarray constructors

    See Also
    --------
    http://docs.scipy.org/doc/numpy/user/basics.rec.html
    canonical_averages_dtype
    """
    fields = list()
    fields.extend([
        ('number_of_runs', 'uint32'),
        ('p', 'float64'),
        ('alpha', 'float64'),
    ])
    if spanning_cluster:
        fields.extend([
            ('percolation_probability_mean', 'float64'),
            ('percolation_probability_std', 'float64'),
            ('percolation_probability_ci', '(2,)float64'),
        ])
    fields.extend([
        ('percolation_strength_mean', 'float64'),
        ('percolation_strength_std', 'float64'),
        ('percolation_strength_ci', '(2,)float64'),
        ('moments_mean', '(5,)float64'),
        ('moments_std', '(5,)float64'),
        ('moments_ci', '(5,2)float64'),
    ])
    return _ndarray_dtype(fields)

def finalize_canonical_averages(
        number_of_nodes, ps, canonical_averages, alpha,
):
    """
    Finalize canonical averages

    Normalize the accumulated means and variances, attach the occupation
    probabilities `ps` and the significance level `alpha`, and compute
    Student-t confidence intervals.

    Parameters
    ----------
    number_of_nodes : int
        Number ``N`` of sites in the graph

    ps : array_like of float
        Occupation probabilities, one per row of `canonical_averages`

    canonical_averages : structured ndarray
        Canonical averages accumulated over several runs, as returned by
        `bond_reduce`

    alpha : float
        Significance level of the confidence intervals

    Returns
    -------
    ret : structured ndarray
        Array of dtype as returned by `finalized_canonical_averages_dtype`

    See Also
    --------
    bond_reduce
    finalized_canonical_averages_dtype
    """
    spanning_cluster = (
        (
            'percolation_probability_mean' in
            canonical_averages.dtype.names
        ) and
        'percolation_probability_m2' in canonical_averages.dtype.names
    )

    # append values of p as an additional field
    ret = np.empty_like(
        canonical_averages,
        dtype=finalized_canonical_averages_dtype(
            spanning_cluster=spanning_cluster
        ),
    )

    n = canonical_averages['number_of_runs']
    sqrt_n = np.sqrt(canonical_averages['number_of_runs'])

    ret['number_of_runs'] = n
    ret['p'] = ps
    ret['alpha'] = alpha

    def _transform(
        original_key, final_key=None, normalize=False, transpose=False,
    ):
        if final_key is None:
            final_key = original_key

        keys_mean = [
            '{}_mean'.format(key)
            for key in [original_key, final_key]
        ]
        keys_std = [
            '{}_m2'.format(original_key),
            '{}_std'.format(final_key),
        ]
        key_ci = '{}_ci'.format(final_key)

        # calculate sample mean
        ret[keys_mean[1]] = canonical_averages[keys_mean[0]]
        if normalize:
            ret[keys_mean[1]] /= number_of_nodes

        # calculate sample standard deviation
        array = canonical_averages[keys_std[0]]
        result = np.sqrt(
            (array.T if transpose else array) / (n - 1)
        )
        ret[keys_std[1]] = (
            result.T if transpose else result
        )
        if normalize:
            ret[keys_std[1]] /= number_of_nodes

        # calculate Student-t confidence interval
        array = ret[keys_std[1]]
        scale = (array.T if transpose else array) / sqrt_n
        array = ret[keys_mean[1]]
        mean = (array.T if transpose else array)
        result = scipy.stats.t.interval(
            1 - alpha,
            df=n - 1,
            loc=mean,
            scale=scale,
        )
        (
            ret[key_ci][..., 0],
            ret[key_ci][..., 1],
        ) = ([my_array.T for my_array in result] if transpose else result)

    if spanning_cluster:
        _transform('percolation_probability')

    _transform('max_cluster_size', 'percolation_strength', normalize=True)
    _transform('moments', normalize=True, transpose=True)

    return ret
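

# End-to-end usage sketch, not part of the original module.  The helper name
# is hypothetical; it assumes networkx 1.x and binomial convolution factors
# as in the sketch after `bond_canonical_statistics`.  It evolves several
# independent runs on a small lattice, reduces them, and finalizes the
# averages at a range of occupation probabilities.
def _example_pipeline(number_of_runs=10, alpha=0.05):
    from functools import reduce
    graph = nx.grid_2d_graph(8, 8)
    num_nodes = graph.number_of_nodes()
    num_edges = graph.number_of_edges()
    ps = np.linspace(0.0, 1.0, num=11)
    runs = []
    for seed in range(number_of_runs):
        micro = bond_microcanonical_statistics(
            perc_graph=graph, num_nodes=num_nodes, num_edges=num_edges,
            seed=seed, spanning_cluster=False,
        )
        # canonical statistics at every p, stacked into one record per run
        runs.append(np.concatenate([
            bond_canonical_statistics(
                microcanonical_statistics=micro,
                convolution_factors=scipy.stats.binom.pmf(
                    np.arange(num_edges + 1), num_edges, p
                ),
            )
            for p in ps
        ]))
    canonical_averages = reduce(
        bond_reduce,
        (bond_initialize_canonical_averages(run) for run in runs),
    )
    return finalize_canonical_averages(
        number_of_nodes=num_nodes,
        ps=ps,
        canonical_averages=canonical_averages,
        alpha=alpha,
    )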