# Quickstart High-Performance Computing with pypercolate

The percolate.hpc module is an implementation of the Newman–Ziff algorithm for High-Performance Computing (HPC). Here we demonstrate how to compute the raw data for a finite-size scaling study of bond percolation on a square grid. For high-performance computing, we utilize the task-based parallelization framework Jug. Our simulation design is as follows:

- For each system size L:
  - Prepare the graph of the current system size L
  - Prepare the graph for pypercolate and precompute auxiliary information
  - For each value p of the occupation probability:
    - Precompute the convolution coefficients f
  - Map-Reduce R runs:
    - Seed the random number generator
    - For each occupation number n:
      - Compute the microcanonical cluster statistics
    - For each occupation probability p:
      - Convolve the microcanonical statistics into the canonical statistics
  - Write results to disk


A jugfile is a Python script to which Jug transparently adds parallelization magic.

## Imports

First, we import the packages needed.

```python
# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems

from functools import reduce
import itertools

import numpy as np
import scipy.stats

from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce

import percolate
import percolate.hpc
```


## Define input

Here we assign the input parameters of the simulation study.

```python
# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296  # 2 ** 32, exclusive upper bound for 32-bit seeds
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0)  # 1 sigma
```


The SPANNING_CLUSTER variable defines whether the script should detect spanning clusters. The UINT32_MAX integer, $$2^{32}$$, is the exclusive upper bound for the seeds. SEED seeds the central random number generator, which in turn seeds the per-task random number generators.
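The seeding hierarchy can be sketched as follows; the constants mirror the jugfile, while the four derived streams stand in for the per-task generators:

```python
import numpy as np

UINT32_MAX = 4294967296  # 2 ** 32
SEED = 201508061904 % UINT32_MAX

# the central generator is seeded once ...
rng = np.random.RandomState(seed=SEED)

# ... and hands out one 32-bit seed per run, so every run draws
# from its own reproducible stream
seeds = rng.randint(UINT32_MAX, size=4, dtype='uint32')
streams = [np.random.RandomState(seed=int(s)) for s in seeds]
```

Because the central generator is deterministic, rerunning the jugfile with the same SEED reproduces the same per-run seeds.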

## Jug task to prepare the percolation graph and precompute the convolution factors

The following jug task creates the square grid graph of $$L \times L$$ nodes, where $$L$$ is the system size.

```python
@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )
```
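For orientation, a plain $$L \times L$$ square lattice has $$L^2$$ sites and $$2L(L-1)$$ bonds (spanning_2d_grid additionally attaches auxiliary spanning nodes, so pypercolate's own counts differ slightly). A minimal sketch of these counts, independent of pypercolate:

```python
def square_grid_counts(length):
    """Sites and bonds of a plain length x length square lattice."""
    num_nodes = length * length
    # each of the `length` rows and each of the `length` columns
    # contributes `length - 1` bonds
    num_edges = 2 * length * (length - 1)
    return num_nodes, num_edges
```

For the smallest system size in this study, L = 8, this gives 64 sites and 112 bonds.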


Next, the jugfile defines a task to precompute the convolution factors for a given occupation probability $$p$$:

```python
@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )
```


Every task samples the bond percolation setting with a number of runs $$N_{\text{runs}}$$. The following function implements a single run as described in the user guide. Note that we carefully circumvent Jug’s automatic loading of task results: instead of piping the convolution factor tasks through directly, we add a layer of indirection and supply an iterator over these tasks. This way, Jug does not automatically load the task results, which would unnecessarily increase memory consumption and limit the system size. The inner loop convolves the microcanonical statistics for one occupation probability after the other, carefully loading and unloading the respective convolution factor task results.

```python
def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result

        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )

    return ret
```
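The convolution step itself is a binomial average over occupation numbers: the canonical statistic at occupation probability $$p$$ is $$Q(p) = \sum_{n=0}^{N} \binom{N}{n} p^n (1-p)^{N-n} Q(n)$$, where $$N$$ is the number of bonds. A minimal sketch of this average, using scipy's binomial PMF as a stand-in for the precomputed convolution factors:

```python
import numpy as np
import scipy.stats


def convolve_to_canonical(microcanonical, p):
    """Binomial average of a microcanonical statistic Q(n), n = 0..N."""
    n_max = microcanonical.size - 1  # N = number of bonds
    n = np.arange(microcanonical.size)
    # convolution factors: probability of exactly n occupied bonds at p
    factors = scipy.stats.binom.pmf(n, n_max, p)
    return np.sum(factors * microcanonical)
```

As a sanity check, for $$Q(n) = n$$ the canonical average is the binomial mean $$Np$$.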

Here is the definition of the actual task, each instance of which conducts a number of runs and reduces their results:

```python
@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from jug value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation
    """

    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)

    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )
```
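Stripped of the percolation specifics, the reduce-over-map pattern in bond_task looks like this; `run` and `combine` are hypothetical stand-ins for bond_run and percolate.hpc.bond_reduce:

```python
import itertools
from functools import reduce


def run(config, seed):
    # hypothetical single run: produces a seed-dependent partial result
    return {'sum': config['scale'] * seed, 'count': 1}


def combine(a, b):
    # hypothetical pairwise reduction, mirroring bond_reduce
    return {'sum': a['sum'] + b['sum'], 'count': a['count'] + b['count']}


def task(config, seeds):
    # one task maps `run` over its seeds and reduces the results pairwise;
    # itertools.repeat broadcasts the shared arguments across all runs
    return reduce(
        combine,
        map(run, itertools.repeat(config), seeds),
    )
```

For example, `task({'scale': 2}, [1, 2, 3])` accumulates three runs into `{'sum': 12, 'count': 3}`.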


## Write to disk

Another task is to write the finalized results for a system size to disk. We provide an example implementation here that writes to an HDF5 file:

```python
@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py

    # Read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')

    key = '{}'.format(dimension)

    # dataset should not exist yet!
    if key in f:
        raise RuntimeError

    f.create_dataset(
        name=key,
        data=canonical_averages,
    )

    f.close()
```
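If h5py is not available, any storage format that preserves numpy's structured dtypes will do. A sketch of the same write-once pattern with numpy's .npy format; the dtype here is a hypothetical stand-in for the canonical-averages dtype:

```python
import os
import tempfile

import numpy as np


def write_once(path, data):
    """Write `data` to `path`, refusing to overwrite existing results."""
    if os.path.exists(path):
        raise RuntimeError('refusing to overwrite {}'.format(path))
    np.save(path, data)


# structured arrays (like the canonical averages) round-trip unchanged
averages = np.zeros(3, dtype=[('number_of_runs', 'u4'), ('mean', 'f8')])
path = os.path.join(tempfile.mkdtemp(), 'averages.npy')
write_once(path, averages)
restored = np.load(path)
```

Refusing to overwrite, as in write_to_disk above, guards against accidentally clobbering results when the jugfile is re-executed.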


## Hashing the iterator

Jug identifies task results by hashing the task arguments. An iterator cannot be pickled, and hence cannot be hashed, so for the iterator argument of bond_task we need to supply a dummy hash function to Jug:

```python
def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')
```


## The control flow

Now we are in a position to implement the control flow:

```python
convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)

# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)

for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ...     stmt='percolate.spanning_2d_grid(1000)',
    ...     setup='import percolate',
    ...     number=1
    ... )
    >>>
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ...     percolate.percolate._binomial_pmf(n=1e4, p=p)
    ...     for p in 100 * [0.5]
    ... ]
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMF (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension],
            p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to list of convolution factors tasks to circumvent
        # jug automatic fetching results of each task (iterator is not resolved
        # and hence we achieve lazy evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )
```
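The np.split call in the control flow hands each bond task an equal share of the seeds. A minimal illustration with small numbers (3 tasks of 4 runs each) in place of the jugfile's 100 tasks of 100 runs:

```python
import numpy as np

NUMBER_OF_TASKS = 3
RUNS_PER_TASK = 4
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK

rng = np.random.RandomState(seed=42)
seeds = rng.randint(4294967296, size=NUMBER_OF_RUNS, dtype='uint32')

# np.split requires the array length to divide evenly into the chunks
chunks = np.split(seeds, NUMBER_OF_TASKS)
```

Every seed ends up in exactly one chunk, so the total number of runs is preserved across tasks.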

Note the use of Jug’s barrier command. It ensures that Jug finishes computing the percolation graph and all convolution factors before proceeding with the actual bond tasks. Normally, Jug auto-detects such dependencies and transparently makes sure that all tasks a particular task depends on have run. Here, however, we need to call barrier explicitly, since we circumvent this very mechanism. As explained earlier, this keeps the convolution factors of all occupation probabilities from being loaded into memory at the same time.

Finally, run the jugfile with Jug (start several workers for parallel execution):

```shell
$ jug execute jugfile.py
```

For reference, here is the complete jugfile:

```python
# encoding: utf-8

"""
A sample jugfile to use with Jug

jug execute jugfile
"""

# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems

from functools import reduce
import itertools

import numpy as np
import scipy.stats

from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce

import percolate
import percolate.hpc


# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0)  # 1 sigma


@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )


@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )


def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result

        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )

    return ret


@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from jug value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation

    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L100
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L455
    """

    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)

    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )


@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py

    # Read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')

    key = '{}'.format(dimension)

    # dataset should not exist yet!
    if key in f:
        raise RuntimeError

    f.create_dataset(
        name=key,
        data=canonical_averages,
    )

    f.close()


def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')


convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)

# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)

for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ...     stmt='percolate.spanning_2d_grid(1000)',
    ...     setup='import percolate',
    ...     number=1
    ... )
    >>>
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ...     percolate.percolate._binomial_pmf(n=1e4, p=p)
    ...     for p in 100 * [0.5]
    ... ]
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMF (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension],
            p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to list of convolution factors tasks to circumvent
        # jug automatic fetching results of each task (iterator is not resolved
        # and hence we achieve lazy evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )
```