Quickstart High-Performance Computing with pypercolate

The percolate.hpc module implements the Newman–Ziff algorithm for High-Performance Computing (HPC). Here we demonstrate how to compute the raw data for a finite-size scaling study of bond percolation on a square grid. For parallelization we use the task-based framework Jug. Our simulation design is as follows:

For each system size L:
    Prepare the graph of the current system size L
    Prepare the graph for pypercolate and precompute auxiliary information
    For each value p of the occupation probability:
        Precompute the convolution coefficients f
    Map-Reduce T tasks:
        Map-Reduce R runs:
            Seed the Random Number Generator
            For each occupation number n:
                Compute the microcanonical cluster statistics
            For each occupation probability p:
                Convolve the microcanonical statistics into the canonical statistics
    Finalize task results
    Write results to disk

A jugfile is a Python script to which Jug transparently adds parallelization magic.
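
For illustration only, a minimal jugfile could look like this (a hypothetical toy example, not part of this study): decorating a function with TaskGenerator turns every call into a Task, which jug execute then runs and whose result Jug caches in its data store.

from jug import TaskGenerator

@TaskGenerator
def double(x):
    return 2 * x

# each call creates a Task; `jug execute` computes and caches the results
results = [double(i) for i in range(10)]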

Imports

First, we import the packages needed.

# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems

from functools import reduce
import itertools

import numpy as np
import scipy.stats

from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce

import percolate
import percolate.hpc

Define input

Here we assign the input parameters of the simulation study.

# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0)  # 1 sigma

The SPANNING_CLUSTER variable defines whether the script should detect spanning clusters. UINT32_MAX (\(2^{32}\)) is the exclusive upper bound of the seeds. SEED seeds the central random number generator, which in turn provides the seeds for the individual runs. ALPHA is the two-sided tail probability of one standard deviation of the normal distribution (about 0.32), the significance level later passed to the finalization step.
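
As a sketch of the seeding scheme used in the control flow further below: the central generator draws one seed per run, and the seeds are then split into one chunk per task, so that every run is seeded independently and reproducibly.

rng = np.random.RandomState(seed=SEED)
# one independent seed per run, drawn from [0, 2**32)
seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)
# one chunk of RUNS_PER_TASK seeds per task
seed_chunks = np.split(seeds, NUMBER_OF_TASKS)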

Jug tasks to prepare the percolation graph and precompute the convolution factors

The following jug task creates the square grid graph of \(L \times L\) nodes, where \(L\) is the system size.

@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )

Next, the jugfile defines a task to precompute the convolution factors for a given occupation probability \(p\):

@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )
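
The convolution factors are the binomial weights that turn the microcanonical statistics (at fixed occupation number \(n\)) into the canonical statistics (at fixed occupation probability \(p\)). For a graph with \(M\) edges, an observable \(Q\) is convolved as

\[
    Q(p) = \sum_{n=0}^{M} \binom{M}{n} p^n (1 - p)^{M - n} Q_n,
\]

where \(Q_n\) is the microcanonical average at occupation number \(n\); this is the standard Newman–Ziff convolution.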

An elementary run (sub-task)

Every task samples the bond percolation setting with a number of runs (RUNS_PER_TASK of them). The following function implements a single such run as described in the user guide. Note that we deliberately circumvent Jug's automatic loading of task results: instead of passing the convolution factor tasks through directly, we add a layer of indirection and supply an iterator over these tasks. This way, Jug does not load all task results automatically, which would needlessly increase memory consumption and limit the attainable system size. The inner loop convolves the microcanonical statistics into the canonical statistics for one occupation probability after the other, carefully loading and unloading the respective convolution factor task results.

def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result

        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )

    return ret

A simple task

Here is the definition of the actual task, each instance of which conducts a number of runs and reduces their results:

@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from jug value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L100
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L455
    """

    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)

    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )
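
The reduce over the lazy map above is merely a compact way of writing the following explicit loop over the runs (an equivalent sketch of the body of bond_task):

result = None
for seed in seeds:
    current = bond_run(
        perc_graph_result=perc_graph_result,
        seed=seed,
        ps=ps,
        convolution_factors_tasks=convolution_factors_tasks,
    )
    # pairwise-reduce the canonical averages of the individual runs
    result = current if result is None else percolate.hpc.bond_reduce(result, current)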

Write to disk

Another task writes the finalized results for a system size to disk. We provide an example implementation here that writes to an HDF5 file:

@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py

    # Read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')

    key = '{}'.format(dimension)

    # dataset should not exist yet!
    if key in f:
        raise RuntimeError

    f.create_dataset(
        name=key,
        data=canonical_averages,
    )

    f.close()
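
For later analysis, the datasets can be read back with h5py, for example (a hypothetical snippet, assuming the file written above):

import h5py

with h5py.File('percolate_hpc.hdf5', mode='r') as f:
    for key in f:
        # one array of finalized canonical averages per system size
        canonical_averages = f[key][...]
        print(key, canonical_averages.dtype)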

Hashing the iterator

Since we pass an iterator over the convolution factor tasks to bond_task, and Jug can neither pickle nor hash an iterator, we need to supply a constant dummy hash function to Jug:

def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')

The control flow

Now we are in a position to implement the control flow:

convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)


# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)
for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ...     stmt='percolate.spanning_2d_grid(1000)',
    ...     setup='import percolate',
    ...     number=1
    ... )
    >>>
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ...     percolate.percolate._binomial_pmf(n=1e4, p=p)
    ...     for p in 100 * [0.5]
    ... ]
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMF (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension], p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to list of convolution factors tasks to circumvent
        # jug automatic fetching results of each task (iterator is not resolved
        # and hence we achieve lazy evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )

Note the use of Jug's barrier command. It ensures that Jug finishes the computation of the percolation graph and all convolution factors before proceeding with the actual bond tasks. Normally, Jug auto-detects such dependencies and transparently makes sure that all tasks a particular task depends on have been run. Here, however, we need to call barrier explicitly, as we circumvent this very mechanism. As explained earlier, this avoids loading the convolution factors for all occupation probabilities into memory at the same time.

Executing the tasks

Finally, running

$ jug execute jugfile.py

executes the tasks. This command can be run multiple times in parallel, for example on a cluster. Jug automatically coordinates processing the tasks and storing intermediate results across worker nodes via the shared file system (e.g. NFS).
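
For instance, to start four workers on a single machine in the background and to monitor the progress of the computation (a sketch; on a cluster one would typically start one jug execute process per node):

$ for i in 1 2 3 4; do jug execute jugfile.py & done
$ jug status jugfile.py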

The complete jugfile

# encoding: utf-8

"""
A sample jugfile to use with Jug

jug execute jugfile
"""

# Python 2/3 compatibility
from __future__ import unicode_literals
# http://python-future.org/compatible_idioms.html#zip-izip
from builtins import map, zip
from future.utils import iteritems

from functools import reduce
import itertools

import numpy as np
import scipy.stats

from jug import Task, TaskGenerator, barrier
from jug.utils import CustomHash
import jug.mapreduce

import percolate
import percolate.hpc


# script parameters
SYSTEM_DIMENSIONS = [8, 16, 32]
RUNS_PER_TASK = 100
NUMBER_OF_TASKS = 100
NUMBER_OF_RUNS = NUMBER_OF_TASKS * RUNS_PER_TASK
PS = np.linspace(0.4, 0.6, num=40)
SPANNING_CLUSTER = True
UINT32_MAX = 4294967296
SEED = 201508061904 % UINT32_MAX
ALPHA = 2 * scipy.stats.norm.cdf(-1.0)  # 1 sigma


@TaskGenerator
def prepare_percolation_graph(dimension):
    graph = percolate.spanning_2d_grid(length=dimension)
    return percolate.percolate.percolation_graph(
        graph=graph, spanning_cluster=SPANNING_CLUSTER
    )


@TaskGenerator
def compute_convolution_factors_for_single_p(perc_graph_result, p):
    return percolate.percolate._binomial_pmf(
        n=perc_graph_result['num_edges'],
        p=p,
    )


def bond_run(perc_graph_result, seed, ps, convolution_factors_tasks):
    """
    Perform a single run (realization) over all microstates and return the
    canonical cluster statistics
    """
    microcanonical_statistics = percolate.hpc.bond_microcanonical_statistics(
        seed=seed, **perc_graph_result
    )

    # initialize statistics array
    canonical_statistics = np.empty(
        ps.size,
        dtype=percolate.hpc.canonical_statistics_dtype(
            spanning_cluster=SPANNING_CLUSTER,
        )
    )

    # loop over all p's and convolve canonical statistics
    # http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html#modifying-array-values
    for row, convolution_factors_task in zip(
        np.nditer(canonical_statistics, op_flags=['writeonly']),
        convolution_factors_tasks,
    ):
        # load task result
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.load
        assert not convolution_factors_task.is_loaded()
        convolution_factors_task.load()
        # fetch task result
        my_convolution_factors = convolution_factors_task.result

        # convolve to canonical statistics
        row[...] = percolate.hpc.bond_canonical_statistics(
            microcanonical_statistics=microcanonical_statistics,
            convolution_factors=my_convolution_factors,
            spanning_cluster=SPANNING_CLUSTER,
        )
        # explicitly unload task to save memory
        # http://jug.readthedocs.org/en/latest/api.html#jug.Task.unload
        convolution_factors_task.unload()

    # initialize canonical averages for reduce
    ret = percolate.hpc.bond_initialize_canonical_averages(
        canonical_statistics=canonical_statistics,
        spanning_cluster=SPANNING_CLUSTER,
    )

    return ret


@TaskGenerator
def bond_task(
    perc_graph_result, seeds, ps, convolution_factors_tasks_iterator
):
    """
    Perform a number of runs

    The number of runs is the number of seeds

    convolution_factors_tasks_iterator needs to be an iterator

    We shield the convolution factors tasks from jug value/result mechanism
    by supplying an iterator to the list of tasks for lazy evaluation
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L100
    http://github.com/luispedro/jug/blob/43f0d80a78f418fd3aa2b8705eaf7c4a5175fff7/jug/task.py#L455
    """

    # restore the list of convolution factors tasks
    convolution_factors_tasks = list(convolution_factors_tasks_iterator)

    return reduce(
        percolate.hpc.bond_reduce,
        map(
            bond_run,
            itertools.repeat(perc_graph_result),
            seeds,
            itertools.repeat(ps),
            itertools.repeat(convolution_factors_tasks),
        )
    )


@TaskGenerator
def write_to_disk(dimension, canonical_averages):
    import h5py

    # Read/write if exists, create otherwise
    f = h5py.File('percolate_hpc.hdf5', mode='a')

    key = '{}'.format(dimension)

    # dataset should not exist yet!
    if key in f:
        raise RuntimeError

    f.create_dataset(
        name=key,
        data=canonical_averages,
    )

    f.close()


def dummy_hash(x):
    """
    Supplies a constant dummy hash
    """
    return 'dummy_hash'.encode('utf-8')


convolution_factors = {}
perc_graph_results = {}
reduced_canonical_averages = {}
final_canonical_averages = {}

# initialize random number generator
rng = np.random.RandomState(seed=SEED)


# in general:
# avoid premature optimization: design simulation for large system sizes
# (small system sizes take much less time anyway)
for dimension in SYSTEM_DIMENSIONS:
    # building the graph takes ca. 10s per 10^6 nodes
    '''
    >>> import timeit
    >>> timeit.timeit(
    ...     stmt='percolate.spanning_2d_grid(1000)',
    ...     setup='import percolate',
    ...     number=1
    ... )
    >>>
    '''
    # So that is why we store every graph on disk (as a task)
    perc_graph_results[dimension] = prepare_percolation_graph(dimension)

    # computing the binomial PMF takes ca. 1s per 10^6 number of states *
    # number of p's
    '''
    >>> import timeit
    >>> s = """\
    ... [
    ...     percolate.percolate._binomial_pmf(n=1e4, p=p)
    ...     for p in 100 * [0.5]
    ... ]
    >>> timeit.timeit(stmt=s, setup='import percolate', number=1)
    '''
    # so store all binomial PMF (for every dimension, p) on disk
    convolution_factors[dimension] = [
        compute_convolution_factors_for_single_p(
            perc_graph_result=perc_graph_results[dimension], p=p,
        )
        for p in PS
    ]

    # finish all tasks up to here first before proceeding
    # http://jug.readthedocs.org/en/latest/barrier.html#barriers
    barrier()

    seeds = rng.randint(UINT32_MAX, size=NUMBER_OF_RUNS)

    # now process the actual tasks
    bond_tasks = list()
    for my_seeds in np.split(seeds, NUMBER_OF_TASKS):
        # pass an iterator to list of convolution factors tasks to circumvent
        # jug automatic fetching results of each task (iterator is not resolved
        # and hence we achieve lazy evaluation of tasks here)
        convolution_iterator = iter(
            convolution_factors[dimension]
        )
        # iter cannot be pickled / hashed, so supply a dummy hash
        convolution_iterator_hashed = CustomHash(
            convolution_iterator, dummy_hash,
        )
        bond_tasks.append(
            bond_task(
                perc_graph_result=perc_graph_results[dimension],
                seeds=my_seeds,
                ps=PS,
                convolution_factors_tasks_iterator=convolution_iterator_hashed,
            )
        )

    # reduce
    reduced_canonical_averages[dimension] = jug.mapreduce.reduce(
        reducer=percolate.hpc.bond_reduce,
        inputs=bond_tasks,
        reduce_step=100,
    )

    # finalize canonical averages
    final_canonical_averages[dimension] = Task(
        percolate.hpc.finalize_canonical_averages,
        number_of_nodes=perc_graph_results[dimension]['num_nodes'],
        ps=PS,
        canonical_averages=reduced_canonical_averages[dimension],
        alpha=ALPHA,
    )

    # write to disk
    write_to_disk(
        dimension=dimension,
        canonical_averages=final_canonical_averages[dimension],
    )