Examples#
Using MPIPool#
To see an example, download, read, and execute the provided demo file on just 2 processes (the master process and 1 worker) using:

mpiexec -n 2 python mpi-demo.py
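The demo file itself is not reproduced here, but a minimal sketch of the usual MPIPool pattern (the worker function below is illustrative, not the demo's contents) looks like this: every process constructs the pool, worker processes then block in pool.wait() until the master closes the pool, and only the master distributes tasks:

import sys
from schwimmbad import MPIPool

def worker(task):
    # An illustrative stand-in for real per-task work
    return task**2

pool = MPIPool()

# Worker processes block here, waiting for tasks from the master,
# and exit once the master closes the pool.
if not pool.is_master():
    pool.wait()
    sys.exit(0)

results = pool.map(worker, range(100))
pool.close()
print(results)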
Using JoblibPool#
schwimmbad.JoblibPool is a pool built on the Parallel capabilities of the joblib module. To use this pool, first install joblib, then use it like any other pool:

from schwimmbad import JoblibPool

# expensive_code is any function to run per task; iterator yields its inputs
with JoblibPool(4) as pool:
    results = pool.map(expensive_code, iterator)
One benefit of the JoblibPool is that it provides a threading back end that can parallelize tasks that release the GIL without increasing memory usage. If you have code that explicitly releases the GIL (using Cython's with nogil statement, for example), you can parallelize it with the JoblibPool as follows:

with JoblibPool(4, backend="threading") as pool:
    results = pool.map(nogil_code, iterator)
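As a concrete sketch (the function nogil_code and its NumPy workload are illustrative, not from the schwimmbad docs): many large NumPy array operations release the GIL internally, so a function like the following can benefit from the threading back end:

import numpy as np
from schwimmbad import JoblibPool

def nogil_code(n):
    # Big NumPy reductions release the GIL internally, so the
    # threading back end can run these calls truly in parallel.
    x = np.random.random(n)
    return float(np.sqrt(x).sum())

with JoblibPool(4, backend="threading") as pool:
    results = list(pool.map(nogil_code, [10**7] * 8))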
Using MultiPool (with emcee)#

The emcee.EnsembleSampler class in emcee supports multiprocessing in two ways: either with the threads keyword argument, or by passing in your own pool object. This can be any of the pool classes mentioned above; as an example, we'll use the MultiPool class implemented in schwimmbad.

To use emcee, we'll need to define a probability function to sample from. For this demo, we'll just use a Gaussian whose mean (mean) and variance (var) are specified as arguments to the (log-)probability function:
import numpy as np
import matplotlib.pyplot as plt
import emcee

from schwimmbad import MultiPool

def ln_probability(p, mean, var):
    x = p[0]
    return -0.5 * (x - mean)**2 / var

# initial positions for the walkers
n_walkers = 16
p0 = np.random.uniform(0, 10, (n_walkers, 1))

with MultiPool() as pool:
    sampler = emcee.EnsembleSampler(n_walkers, dim=1,
                                    lnpostfn=ln_probability,
                                    args=(5, 1.2),  # mean=5, var=1.2
                                    pool=pool)  # the important line

    # burn-in
    pos, _, _ = sampler.run_mcmc(p0, 500)
    sampler.reset()

    # production run, starting from the end of the burn-in
    sampler.run_mcmc(pos, 1000)

plt.figure()
plt.hist(sampler.flatchain)
Selecting a pool with command-line arguments#

schwimmbad is particularly useful for writing code that can be executed in multiple parallel-processing environments without having to edit the code directly. Once a pool object is created, the subsequent processing code can be agnostic to the particular mode of processing. For example (a use case from astronomy), imagine that we need to perform some type of parameter fitting or optimization for a large number of objects and store the results. We have many objects, so we ultimately want to deploy this script onto a compute cluster where the work can be spread over multiple nodes. But when developing and debugging the script, we want to be able to execute it locally, both in serial mode and perhaps also with multiprocessing, to test any parallel functionality. Here's a demonstration of such a use case. The code below is meant to be added to a Python script/module; the full example can be downloaded here.
We start by defining a "worker" function: this is the function that takes a single "task" (e.g., one datum or one object's data) and returns a result based on that task. For this example, we'll simply evaluate some trigonometric functions on the two values passed in with the "task." Note that the worker function has to take a single argument (the task), but that argument can be an iterable:
import math

def worker(task):
    a, b = task
    return math.cos(a) + math.sin(b)
We next define a main() function that accepts a pool object and performs the actual processing:

def main(pool):
    # Here we generate some fake data
    import random
    a = [random.uniform(0, 2*math.pi) for _ in range(10000)]
    b = [random.uniform(0, 2*math.pi) for _ in range(10000)]

    tasks = list(zip(a, b))
    results = pool.map(worker, tasks)
    pool.close()

    # Now we could save or do something with the results object
With a few extra lines of code using Python's argparse module, we can add command-line flags to the script that allow us to choose the processing method when we run the script. With the arguments specified below, we can either (1) pass no flags, in which case the script runs in serial (with the SerialPool), (2) pass --ncores with an integer to specify the number of cores to run on using Python's multiprocessing utilities (with the MultiPool), or (3) pass --mpi by itself to specify that we'd like to run with MPI (with the MPIPool):
if __name__ == "__main__":
    import schwimmbad
    from argparse import ArgumentParser

    parser = ArgumentParser(description="Schwimmbad example.")

    group = parser.add_mutually_exclusive_group()
    group.add_argument("--ncores", dest="n_cores", default=1,
                       type=int, help="Number of processes (uses multiprocessing).")
    group.add_argument("--mpi", dest="mpi", default=False,
                       action="store_true", help="Run with MPI.")
    args = parser.parse_args()

    pool = schwimmbad.choose_pool(mpi=args.mpi, processes=args.n_cores)
    main(pool)
Note that for the first two options, we can run the script as usual using, e.g.:

python script-demo.py

or

python script-demo.py --ncores=4

To run with MPI, we have to launch the script with an MPI executable, whose name depends on the environment and MPI installation you are using. For OpenMPI, by default this is typically mpiexec:

mpiexec -n 4 python script-demo.py --mpi

This full example can be downloaded here.
Advanced usage: a class-based worker and callback functions#
This example demonstrates two more advanced but common use cases for parallel processing: (1) the need to write output to a file safely, i.e., so that multiple processes aren't trying to write to the file at the same time, and (2) the need to pass some configuration settings or parameters to the worker function each time it is run.
To satisfy both of these needs, we're going to create a class to act as our worker (instead of a function), and allow the objects instantiated from this class to be called like a function by defining the __call__ method. The arguments of the class initializer will allow us to set global parameters for all workers. We'll then also define a callback function as a method of the class to handle writing output to a file (only ever from the master process). Let's consider a simple example: we need to pass a file path in to each worker, and we need to write to that file each time a result is computed by the worker. Let's define a class that accepts a path to the output file, has a method that actually does some work (in this case, just computing a simple quantity based on the task passed in), and defines a callback method that appends each result to the specified output file:
import schwimmbad

class Worker(object):

    def __init__(self, output_path):
        self.output_path = output_path

    def work(self, a, b):
        # For example, all we do is compute a third value
        c = 2*a*b - b**2
        return c

    def callback(self, result):
        with open(self.output_path, 'a') as f:
            f.write("{0}\n".format(result))

    def __call__(self, task):
        a, b = task
        return self.work(a, b)
We can now follow a similar paradigm to that used in Selecting a pool with command-line arguments:
def main(pool):
    worker = Worker('output_file.txt')
    tasks = list(zip(range(16384), range(16384)[::-1]))

    # Iterating over the map drives the execution; each result is
    # handed to worker.callback, which appends it to the output file.
    for r in pool.map(worker, tasks, callback=worker.callback):
        pass

    pool.close()

if __name__ == "__main__":
    from argparse import ArgumentParser

    parser = ArgumentParser(description="")

    group = parser.add_mutually_exclusive_group()
    group.add_argument("--ncores", dest="n_cores", default=1,
                       type=int, help="Number of processes (uses multiprocessing).")
    group.add_argument("--mpi", dest="mpi", default=False,
                       action="store_true", help="Run with MPI.")
    args = parser.parse_args()

    pool = schwimmbad.choose_pool(mpi=args.mpi, processes=args.n_cores)
    main(pool)
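As with the earlier script, this one can be run in serial, with multiprocessing, or with MPI, e.g. (assuming the script is saved as worker-class-demo.py, a hypothetical file name):

python worker-class-demo.py --ncores=2

Each result is then appended to output_file.txt by the callback, which only ever runs on the master process.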