Quickstart High-Performance Computing with pypercolate¶
The percolate.hpc module is an implementation of the Newman–Ziff algorithm for High-Performance Computing (HPC).
Here we demonstrate how to compute the raw data for a finite-size scaling study of bond percolation on a square grid.
For high-performance computing, we utilize the task-based parallelization framework Jug.
Our simulation design is as follows:
For each system size L:
    Prepare the graph of the current system size L
    Prepare the graph for pypercolate and precompute auxiliary information
    For each value p of the occupation probability:
        Precompute the convolution coefficients f
    Map-Reduce T tasks:
        Map-Reduce R runs:
            Seed the random number generator
            For each occupation number n:
                Compute the microcanonical cluster statistics
            For each occupation probability p:
                Convolve the microcanonical statistics into the canonical statistics
    Finalize task results
    Write results to disk
A jugfile is a Python script to which Jug transparently adds parallelization magic.
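To illustrate this mechanism in isolation, here is a minimal toy jugfile (hypothetical, not part of the simulation): decorating a plain function with TaskGenerator turns every call into a deferred Task, which jug execute then runs, possibly across many worker processes.
# toy_jugfile.py -- a hypothetical minimal jugfile
from jug import TaskGenerator

@TaskGenerator
def double(x):
    # each call below creates a Task instead of running immediately;
    # `jug execute toy_jugfile.py` executes the accumulated tasks
    return 2 * x

results = [double(i) for i in range(10)]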
Imports¶
First, we import the packages needed.
# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems
from functools import reduce
import itertools
import numpy as np
import scipy.stats
from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce
import percolate
import percolate.hpc
Define input¶
Here we assign the input parameters of the simulation study.
# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296  # 2 ** 32: exclusive upper bound of the seeds
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0) # 1 sigma
The SPANNING_CLUSTER variable defines whether the script should detect spanning clusters.
The UINT32_MAX integer is the exclusive upper bound of the seeds (\(2^{32}\)).
The SEED seeds the central random number generator, which in turn seeds the random number generators of the individual tasks.
The ALPHA significance level corresponds to one standard deviation (\(2 \Phi(-1) \approx 0.3173\)) and determines the confidence intervals of the finalized averages.
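To make the two-level seeding scheme concrete, here is a stand-alone sketch (names redefined for illustration):
import numpy as np

UINT32_MAX = 4294967296  # 2 ** 32
SEED = 201508061904 % UINT32_MAX

# the central generator draws one 32-bit seed per run ...
central_rng = np.random.RandomState(seed=SEED)
run_seeds = central_rng.randint(UINT32_MAX, size=4)
# ... and each run then seeds its own generator
run_rngs = [np.random.RandomState(seed=s) for s in run_seeds]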
Jug tasks to prepare the percolation graph and precompute the convolution factors¶
The following jug task creates the square grid graph of \(L \times L\) nodes, where \(L\) is the system size.
@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )
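For a quick sanity check of the graph construction, the graph can be inspected interactively; this sketch assumes the NetworkX graph that percolate.spanning_2d_grid returns:
import percolate

graph = percolate.spanning_2d_grid(length=4)
# nodes and edges include the auxiliary spanning-cluster detection nodes
print(graph.number_of_nodes(), graph.number_of_edges())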
Next, the jugfile defines a task to precompute the convolution factors for a given occupation probability \(p\):
@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )
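These factors are the binomial weights \(\binom{N}{n} p^n (1 - p)^{N - n}\) of the Newman–Ziff convolution, where \(N\) is the number of edges. The following stand-alone sketch computes the same weights with scipy.stats directly (the internal _binomial_pmf may be implemented differently):
import numpy as np
import scipy.stats

N = 24   # number of edges (illustrative)
p = 0.5  # occupation probability
factors = scipy.stats.binom.pmf(np.arange(N + 1), N, p)
# a canonical average is then the weighted sum over all occupation numbers n:
# canonical = (factors * microcanonical).sum()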
An elementary run (sub-task)¶
Every task samples the bond percolation setting over a number of runs \(N_{\text{runs}}\). The following function implements a single run as described in the user guide. Note that we carefully circumvent Jug’s automatic loading of task results: instead of passing the convolution factor tasks directly, we add a layer of indirection and supply an iterator over these tasks. This way, Jug does not automatically load the task results, which would unnecessarily increase memory consumption and limit the system size. The inner loop convolves the microcanonical statistics for one occupation probability after the other, carefully loading and unloading the respective convolution factor task results.
def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result
        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )
    return ret
A simple task¶
Here is the definition of the actual task, each instance of which conducts a number of runs and reduces their results:
@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from Jug's value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation

    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L100
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L455
    """
    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)
    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )
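The closing reduce call combines the run results pairwise, left to right; with no initializer, the first mapped element seeds the accumulator. In miniature, with a stand-in combiner instead of percolate.hpc.bond_reduce:
from functools import reduce

# "map" the inputs, then "reduce" the mapped values pairwise
total = reduce(lambda acc, x: acc + x, map(lambda s: s * s, [1, 2, 3, 4]))
print(total)  # 30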
Write to disk¶
Another task writes the finalized results for a system size to disk. We provide an example implementation here that writes to an HDF5 file:
@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py
    # read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')
    key = '{}'.format(dimension)
    # dataset should not exist yet!
    if key in f:
        raise RuntimeError
    f.create_dataset(
        name=key,
        data=canonical_averages,
    )
    f.close()
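The finalized canonical averages can later be read back for analysis, for example like this sketch (the dataset keys are the system sizes written above):
import h5py

with h5py.File('percolate_hpc.hdf5', mode='r') as f:
    for key in f:
        # one structured array of canonical averages per system size
        canonical_averages = f[key][...]
        print(key, canonical_averages.dtype.names)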
Hashing the iterator¶
As we supply an iterator instead of the task list itself, Jug cannot pickle and hash this argument; for this technical reason we provide a dummy hash function. A constant hash is safe here because the remaining task arguments, in particular the seeds, already make each bond task unique:
def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')
The control flow¶
Now we are in a position to implement the control flow:
convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)

# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)
for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ... stmt='percolate.spanning_2d_grid(1000)',
    ... setup='import percolate',
    ... number=1
    ... )
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ... percolate.percolate._binomial_pmf(n=1e4, p=p)
    ... for p in 100 * [0.5]
    ... ]
    ... """
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMFs (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension], p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to the list of convolution factors tasks to
        # circumvent Jug automatically fetching the results of each task
        # (the iterator is not resolved, and hence we achieve lazy
        # evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )
Note the use of Jug’s barrier command.
It ensures that Jug finishes computing the percolation graph and all convolution factors before proceeding with the actual bond tasks.
Normally, Jug auto-detects such dependencies and transparently makes sure that all previous tasks a particular task depends on have been run.
Here, however, we need to call barrier explicitly, as we circumvent this very mechanism: as explained earlier, this avoids loading the convolution factors of all occupation probabilities into memory at the same time.
Executing the tasks¶
Finally, running
$ jug execute jugfile.py
executes the tasks. This command can be run multiple times in parallel, for example on a cluster. Jug automagically handles processing the tasks and storing intermediary results across worker nodes via a shared file system such as NFS.
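For example, one might start several workers on a single machine and monitor their progress (a sketch; adapt to your cluster scheduler):
$ for i in 1 2 3 4; do jug execute jugfile.py & done
$ jug status jugfile.py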
The complete jugfile¶
# encoding: utf-8

"""
A sample jugfile to use with Jug

jug execute jugfile
"""

# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems
from functools import reduce

import itertools

import numpy as np
import scipy.stats

from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce

import percolate
import percolate.hpc


# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296  # 2 ** 32: exclusive upper bound of the seeds
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0)  # 1 sigma


@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )


@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )


def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result
        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )
    return ret


@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from Jug's value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation

    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L100
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L455
    """
    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)
    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )


@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py
    # read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')
    key = '{}'.format(dimension)
    # dataset should not exist yet!
    if key in f:
        raise RuntimeError
    f.create_dataset(
        name=key,
        data=canonical_averages,
    )
    f.close()


def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')


convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)

# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)
for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ... stmt='percolate.spanning_2d_grid(1000)',
    ... setup='import percolate',
    ... number=1
    ... )
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ... percolate.percolate._binomial_pmf(n=1e4, p=p)
    ... for p in 100 * [0.5]
    ... ]
    ... """
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMFs (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension], p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to the list of convolution factors tasks to
        # circumvent Jug automatically fetching the results of each task
        # (the iterator is not resolved, and hence we achieve lazy
        # evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )