Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions content/example/race.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Value

# define a function to increment the value by 1
def inc(i):
val.value += 1

# using a large number to see the problem
n = 100000

# create a shared data and initialize it to 0
val = Value('i', 0)
with ThreadPoolExecutor(max_workers=4) as pool:
pool.map(inc, range(n))

print(val.value)

# create a shared data and initialize it to 0
val = Value('i', 0)
with ProcessPoolExecutor(max_workers=4) as pool:
pool.map(inc, range(n))

print(val.value)
18 changes: 18 additions & 0 deletions content/exercise/race_dup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Array


# define a function to increment the value by 1
def inc(i):
ind = mp.current_process().ident % 4
arr[ind] += 1

# define a large number
n = 100000

# create a shared data and initialize it to 0
arr = Array('i', [0]*4)
with ProcessPoolExecutor(max_workers=4) as pool:
pool.map(inc, range(n))

print(arr[:],sum(arr))
28 changes: 28 additions & 0 deletions content/exercise/race_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Value, Lock

lock = Lock()

# adding lock
def inc(i):
lock.acquire()
val.value += 1
lock.release()


# define a large number
n = 100000

# create a shared data and initialize it to 0
val = Value('i', 0)
with ThreadPoolExecutor(max_workers=4) as pool:
pool.map(inc, range(n))

print(val.value)

# create a shared data and initialize it to 0
val = Value('i', 0)
with ProcessPoolExecutor(max_workers=4) as pool:
pool.map(inc, range(n))

print(val.value)
227 changes: 171 additions & 56 deletions content/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,50 @@ and most of the speed-up in modern CPUs is coming from using multiple
CPU cores, i.e. parallel processing. Parallel processing is normally based
either on multiple threads or multiple processes.

There are three main models of parallel computing:

- **"Embarrassingly" parallel:** the code does not need to synchronize/communicate
with other instances, and you can run
multiple instances of the code separately, and combine the results
later. If you can do this, great!
There are two main models of parallel computing:

- **Shared memory parallelism (multithreading):**

- Parallel threads do separate work and communicate via the same memory and write to shared variables.
- Multiple threads in a single Python program cannot execute at the same time (see GIL below)
- Multiple threads in a single Python program cannot execute at the same time (see **global interpreter lock** below)
- Running multiple threads in Python is *only effective for certain I/O-bound tasks*
- External libraries in other languages (e.g. C) which are called from Python can still use multithreading

- **Distributed memory parallelism (multiprocessing):** Different processes manage their own memory segments and
share data by communicating (passing messages) as needed.
share data by communicating (e.g. passing messages using Message Passing Interface) as needed.

- A process can contain one or more threads
- Two processes can run on different CPU cores and different computers
- Processes have more overhead than threads (creating and destroying processes takes more time)
- Running multiple processes is *only effective for CPU-bound tasks*
- Running multiple processes is *only effective for compute-bound tasks*


.. note::

**"Embarrassingly" parallel**: If you can run multiple instances of a program and do not need to synchronize/communicate with other instances,
i.e. the problem at hand can be easily decomposed into independent tasks or datasets and there is no need to control access to shared resources,
it is known as an embarrassingly parallel program. A few examples are listed here:
- Monte Carlo analysis
- Ensemble calculations of numerical weather prediction
- Discrete Fourier transform
- Convolutional neural networks
- Applying same model on multiple datasets

**GPU computing**: This framwork takes advantages of the massively parallel compute units available in modern GPUs.
It is ideal when you need a large number of simple arithmetic operations

**Distributed computing (Spark, Dask)**: Master-worker parallelism. Master builds a graph of task dependencies and schedules to execute tasks in the appropriate order.
In the next episode we will look at `Dask <https://dask.org/>`__, an array model extension and task scheduler,
which combines multiprocessing with (embarrassingly) parallel workflows and "lazy" execution.

In the next episode we will look at `Dask <https://dask.org/>`__, an array model extension and task scheduler,
which combines multiprocessing with (embarrassingly) parallel workflows and "lazy" execution.

In the Python world, it is common to see the word `concurrency` denoting any type of simultaneous
processing, including *threads*, *tasks* and *processes*.
processing, including *threads*, *tasks* and *processes*.
- Concurrent tasks can be executed in any order but with the same final results
- Concurrent tasks can be but need not to be executed in parallel
- ``concurrent.futures`` module provides implementation of thread and process-based executors for managing resources pools for running concurrent tasks
- Concurrency is difficult: Race condition and Deadlock may arise in concurrent programs


.. warning::

Expand Down Expand Up @@ -92,7 +109,7 @@ However, multithreading is still relevant in two situations:
Multithreaded libraries
^^^^^^^^^^^^^^^^^^^^^^^

NumPy and SciPy are built on external libraries such as LAPACK, FFTW append BLAS,
NumPy and SciPy are built on external libraries such as LAPACK, FFTW, BLAS,
which provide optimized routines for linear algebra, Fourier transforms etc.
These libraries are written in C, C++ or Fortran and are thus not limited
by the GIL, so they typically support actual multihreading during the execution.
Expand All @@ -101,7 +118,7 @@ like matrix operations or frequency analysis.

Depending on configuration, NumPy will often use multiple threads by default,
but we can use the environment variable ``OMP_NUM_THREADS`` to set the number
of threads manually:
of threads manually in a Unix-like enviroment:

.. code-block:: console

Expand All @@ -110,23 +127,6 @@ of threads manually:
After setting this environment variable we continue as usual
and multithreading will be turned on.

.. demo:: Demo: Multithreading NumPy

Here is an example which does a symmetrical matrix inversion of size 4000 by 4000.
To run it, we can save it in a file named `omp_test.py` or download from :download:`here <example/omp_test.py>`.

.. literalinclude:: example/omp_test.py
:language: python

Let us test it with 1 and 4 threads:

.. code-block:: console

$ export OMP_NUM_THREADS=1
$ python omp_test.py

$ export OMP_NUM_THREADS=4
$ python omp_test.py

Multithreaded I/O
^^^^^^^^^^^^^^^^^
Expand All @@ -141,7 +141,7 @@ This is how an I/O-bound application might look:

The `threading library <https://docs.python.org/dev/library/threading.html#>`__
provides an API for creating and working with threads. The simplest approach to
create and manage threads is to use the ``ThreadPoolExecutor`` class.
create and manage threads is to use the ``ThreadPoolExecutor`` class from ``concurrent.futures`` module.
An example use case could be to download data from multiple websites using
multiple threads:

Expand All @@ -164,39 +164,89 @@ The speedup gained from multithreading I/O bound problems can be understood from
Further details on threading in Python can be found in the **See also** section below.



Multiprocessing
---------------

The ``multiprocessing`` module in Python supports spawning processes using an API
similar to the ``threading`` module. It effectively side-steps the GIL by using
*subprocesses* instead of threads, where each subprocess is an independent Python
process.

One of the simplest ways to use ``multiprocessing`` is via ``Pool`` objects and
process. One of the simplest ways to use ``multiprocessing`` is via ``Pool`` objects and
the parallel :meth:`Pool.map` function, similarly to what we saw for multithreading above.
In the following code, we define a :meth:`square`
function, call the :meth:`cpu_count` method to get the number of CPUs on the machine,
and then initialize a Pool object in a context manager and inside of it call the
:meth:`Pool.map` method to parallelize the computation.
We can save the code in a file named `mp_map.py` or download from :download:`here <example/mp_map.py>`.

.. literalinclude:: example/mp_map.py
:language: python
:emphasize-lines: 1, 11-12

.. note::

``concurrent.futures.ProcessPoolExecutor`` is actually a wrapper for
``multiprocessing.Pool`` to unify the threading and process interfaces.



Multiple arguments
^^^^^^^^^^^^^^^^^^

For functions that take multiple arguments one can instead use the :meth:`Pool.starmap`
function (save as `mp_starmap.py` or download :download:`here <example/mp_starmap.py>`)
function, and there are other options as well, see below:

.. tabs::

.. tab:: ``pool.starmap``

.. code-block:: python
:emphasize-lines: 6,8

import multiprocessing as mp

def power_n(x, n):
return x ** n

if __name__ == '__main__':
with mp.Pool(processes=4) as pool:
res = pool.starmap(power_n, [(x, 2) for x in range(20)])
print(res)

.. tab:: function adapter

.. code-block:: python
:emphasize-lines: 6,7,13

from concurrent.futures import ProcessPoolExecutor

def power_n(x, n):
return x ** n

def f_(args):
return power_n(*args)

xs = np.arange(10)
chunks = np.array_split(xs, xs.shape[0]//2)

with ProcessPoolExecutor(max_workers=4) as pool:
res = pool.map(f_, chunks)
print(list(res))


.. tab:: multiple argument iterables

.. code-block:: python
:emphasize-lines: 7

from concurrent.futures import ProcessPoolExecutor

def power_n(x, n):
return x ** n

with ProcessPoolExecutor(max_workers=4) as pool:
res = pool.map(power_n, range(0,10,2), range(1,11,2))
print(list(res))


.. literalinclude:: example/mp_starmap.py
:language: python
:emphasize-lines: 1, 10-11

.. callout:: Interactive environments

Functionality within multiprocessing requires that the ``__main__`` module be
importable by children processes. This means that for example ``multiprocessing.Pool``
will not work in the interactive interpreter. A fork of multiprocessing, called
``multiprocess``, can be used in interactive environments like Jupyter.
importable by children processes. This means that some functions may not work
in the interactive interpreter like Jupyter-notebook.

``multiprocessing`` has a number of other methods which can be useful for certain
use cases, including ``Process`` and ``Queue`` which make it possible to have direct
Expand All @@ -222,8 +272,7 @@ The idea behind MPI is that:
- Tasks communicate and share data by sending messages.
- Many higher-level functions exist to distribute information to other tasks
and gather information from other tasks.
- All tasks typically *run the entire code* and we have to be careful to avoid
that all tasks do the same thing.


``mpi4py`` provides Python bindings for the Message Passing Interface (MPI) standard.
This is how a hello world MPI program looks like in Python:
Expand Down Expand Up @@ -422,11 +471,76 @@ Upper-case methods are faster and are strongly recommended for large numeric dat
Exercises
---------

.. exercise:: Multithreading NumPy

Here is a piece of code which does a symmetrical matrix inversion of size 4000 by 4000.
To run it, we can save it in a file named `omp_test.py` or download from :download:`here <example/omp_test.py>`.

.. literalinclude:: example/omp_test.py
:language: python

Let us test it with 1 and 4 threads:

.. code-block:: console

$ export OMP_NUM_THREADS=1
$ python omp_test.py

$ export OMP_NUM_THREADS=4
$ python omp_test.py


.. exercise:: I/O-bound vs CPU-bound

In this exercise, we will simulate an I/O-bound process uing the :meth:`sleep` function.
Typical I/O-bounded processes are disk accesses, network requests etc.

.. literalinclude:: example/io_bound.py
:language: python

When the problem is compute intensive:

.. literalinclude:: example/cpu_bound.py
:language: python


.. exercise:: Race condition

Race condition is considered a common issue for multi-threading/processing applications,
which occurs when two or more threads attempt to access the shared data and
try to modify it at the same time. Try to run the example using different number ``n`` to see the differences.
Think about how we can solve this problem.


.. literalinclude:: example/race.py
:language: python

.. solution::

- locking resources: explicitly using locks
- duplicating resources: making copys of data to each threads/processes so that they do not need to share

.. tabs::

.. tab:: locking

.. literalinclude:: exercise/race_lock.py
:language: python
:emphasize-lines: 2,4,8,10

.. tab:: duplicating

.. literalinclude:: exercise/race_dup.py
:language: python




.. exercise:: Compute numerical integrals

The primary objective of this exercise is to compute integrals :math:`\int_0^1 x^{3/2} \, dx` numerically.
One approach to integration is by establishing a grid along the x-axis. Specifically, the integration range
is divided into 'n' segments or bins. Below is a basic serial code.
One approach to integration is by establishing a grid along the x-axis. Specifically, the integration range
is divided into 'n' segments or bins. Below is a basic serial code.

.. literalinclude:: exercise/1d_Integration_serial.py

Expand Down Expand Up @@ -652,5 +766,6 @@ See also

.. keypoints::

- 1 Beaware of GIL and its impact on performance
- 2 Use threads for I/O-bound tasks
- Beaware of GIL and its impact on performance
- Use threads for I/O-bound tasks and multiprocessing for compute-bound tasks
- Make it right before trying to make it fast