From 166016759639d616accdda1aba8790251b33ba02 Mon Sep 17 00:00:00 2001
From: Qiang Li
Date: Sun, 19 Jan 2025 10:25:30 +0100
Subject: [PATCH 1/2] adding examples of race condition

---
 content/example/race.py       | 23 +++++++++++++++++++++++
 content/exercise/race_dup.py  | 18 ++++++++++++++++++
 content/exercise/race_lock.py | 28 ++++++++++++++++++++++++++++
 3 files changed, 69 insertions(+)
 create mode 100644 content/example/race.py
 create mode 100644 content/exercise/race_dup.py
 create mode 100644 content/exercise/race_lock.py

diff --git a/content/example/race.py b/content/example/race.py
new file mode 100644
index 0000000..91394a1
--- /dev/null
+++ b/content/example/race.py
@@ -0,0 +1,23 @@
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+from multiprocessing import Value
+
+# increment the shared value by 1 (a non-atomic read-modify-write)
+def inc(i):
+    val.value += 1
+
+# use a large number of increments to make the race visible
+n = 100000
+
+# create a shared value and initialize it to 0
+val = Value('i', 0)
+with ThreadPoolExecutor(max_workers=4) as pool:
+    pool.map(inc, range(n))
+
+print(val.value)
+
+# create a shared value and initialize it to 0
+val = Value('i', 0)
+with ProcessPoolExecutor(max_workers=4) as pool:
+    pool.map(inc, range(n))
+
+print(val.value)

diff --git a/content/exercise/race_dup.py b/content/exercise/race_dup.py
new file mode 100644
index 0000000..2f5951b
--- /dev/null
+++ b/content/exercise/race_dup.py
@@ -0,0 +1,18 @@
+from concurrent.futures import ProcessPoolExecutor
+from multiprocessing import Array, current_process
+
+
+# each worker increments the array slot derived from its process id
+def inc(i):
+    ind = current_process().ident % 4
+    arr[ind] += 1
+
+# define a large number
+n = 100000
+
+# create a shared array and initialize it to 0
+arr = Array('i', [0]*4)
+with ProcessPoolExecutor(max_workers=4) as pool:
+    pool.map(inc, range(n))
+
+print(arr[:], sum(arr))

diff --git a/content/exercise/race_lock.py b/content/exercise/race_lock.py
new file mode 100644
index 0000000..53682a5
--- /dev/null
+++ b/content/exercise/race_lock.py
@@ -0,0 +1,28 @@
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+from multiprocessing import Value, Lock
+
+lock = Lock()
+
+# protect the shared value with a lock
+def inc(i):
+    lock.acquire()
+    val.value += 1
+    lock.release()
+
+
+# define a large number
+n = 100000
+
+# create a shared value and initialize it to 0
+val = Value('i', 0)
+with ThreadPoolExecutor(max_workers=4) as pool:
+    pool.map(inc, range(n))
+
+print(val.value)
+
+# create a shared value and initialize it to 0
+val = Value('i', 0)
+with ProcessPoolExecutor(max_workers=4) as pool:
+    pool.map(inc, range(n))
+
+print(val.value)

From 2444fcae077e3289ac22390ab2967d6974f8a8c8 Mon Sep 17 00:00:00 2001
From: Qiang Li
Date: Sun, 19 Jan 2025 10:32:57 +0100
Subject: [PATCH 2/2] update lesson material for parallel computing episode

---
 content/parallel-computing.rst | 227 +++++++++++++++++++++++++--------
 1 file changed, 171 insertions(+), 56 deletions(-)

diff --git a/content/parallel-computing.rst b/content/parallel-computing.rst
index a0cd0cb..1489698 100644
--- a/content/parallel-computing.rst
+++ b/content/parallel-computing.rst
@@ -24,33 +24,50 @@ and most of the speed-up in modern CPUs is coming from using multiple CPU
 cores, i.e. parallel processing. Parallel processing is normally based
 either on multiple threads or multiple processes.
 
-There are three main models of parallel computing:
-
-- **"Embarrassingly" parallel:** the code does not need to synchronize/communicate
-  with other instances, and you can run
-  multiple instances of the code separately, and combine the results
-  later. If you can do this, great!
+There are two main models of parallel computing:
 
 - **Shared memory parallelism (multithreading):**
 
   - Parallel threads do separate work and communicate via the same memory and write to shared variables.
-  - Multiple threads in a single Python program cannot execute at the same time (see GIL below)
+  - Multiple threads in a single Python program cannot execute at the same time (see **global interpreter lock** below)
   - Running multiple threads in Python is *only effective for certain I/O-bound tasks*
  - External libraries in other languages (e.g. C) which are called from Python can still use multithreading
 
 - **Distributed memory parallelism (multiprocessing):** Different processes manage their own memory segments and
-  share data by communicating (passing messages) as needed.
+  share data by communicating (e.g. passing messages using the Message Passing Interface) as needed.
 
   - A process can contain one or more threads
  - Two processes can run on different CPU cores and different computers
  - Processes have more overhead than threads (creating and destroying processes takes more time)
-  - Running multiple processes is *only effective for CPU-bound tasks*
+  - Running multiple processes is *only effective for compute-bound tasks*
+
+
+.. note::
+
+   **"Embarrassingly" parallel**: If you can run multiple instances of a program and do not need to synchronize/communicate with other instances,
+   i.e. the problem at hand can be easily decomposed into independent tasks or datasets and there is no need to control access to shared resources,
+   it is known as an embarrassingly parallel program. A few examples are:
+
+   - Monte Carlo analysis
+   - Ensemble calculations in numerical weather prediction
+   - Discrete Fourier transforms
+   - Convolutional neural networks
+   - Applying the same model to multiple datasets
+
+   **GPU computing**: This framework takes advantage of the massively parallel compute units available in modern GPUs.
+   It is ideal when you need a large number of simple arithmetic operations.
+
+   **Distributed computing (Spark, Dask)**: Master-worker parallelism. The master builds a graph of task dependencies and schedules tasks to execute in the appropriate order.
+   In the next episode we will look at `Dask <https://dask.org/>`__, an array model extension and task scheduler,
+   which combines multiprocessing with (embarrassingly) parallel workflows and "lazy" execution.
 
-In the next episode we will look at `Dask <https://dask.org/>`__, an array model extension and task scheduler,
-which combines multiprocessing with (embarrassingly) parallel workflows and "lazy" execution.
 
 In the Python world, it is common to see the word `concurrency` denoting any type of simultaneous
-processing, including *threads*, *tasks* and *processes*.
+processing, including *threads*, *tasks* and *processes*.
+
+- Concurrent tasks can be executed in any order but with the same final result
+- Concurrent tasks can be, but need not be, executed in parallel
+- The ``concurrent.futures`` module provides thread- and process-based executors for managing the resource pools on which concurrent tasks run (see the sketch below)
+- Concurrency is difficult to get right: race conditions and deadlocks may arise in concurrent programs
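+
+A minimal sketch of this shared executor interface (the ``work`` function, the pool
+size and the inputs here are illustrative assumptions, not part of the lesson's
+example files):
+
+.. code-block:: python
+
+   from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+
+   def work(x):
+       # stand-in for a real task
+       return x * x
+
+   if __name__ == '__main__':
+       # both executors expose the same API, so switching between
+       # threads and processes is a one-line change
+       with ThreadPoolExecutor(max_workers=4) as pool:
+           print(list(pool.map(work, range(8))))
+
+       with ProcessPoolExecutor(max_workers=4) as pool:
+           print(list(pool.map(work, range(8))))
+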
 .. warning::
@@ -92,7 +109,7 @@ However, multithreading is still relevant in two situations:
 
 Multithreaded libraries
 ^^^^^^^^^^^^^^^^^^^^^^^
 
-NumPy and SciPy are built on external libraries such as LAPACK, FFTW append BLAS,
+NumPy and SciPy are built on external libraries such as LAPACK, FFTW, BLAS,
 which provide optimized routines for linear algebra, Fourier transforms etc.
 These libraries are written in C, C++ or Fortran and are thus not limited by the GIL,
 so they typically support actual multithreading during execution.
@@ -101,7 +118,7 @@ like matrix operations or frequency analysis.
 
 Depending on configuration, NumPy will often use multiple threads by default,
 but we can use the environment variable ``OMP_NUM_THREADS`` to set the number
-of threads manually:
+of threads manually in a Unix-like environment:
 
 .. code-block:: console
 
@@ -110,23 +127,6 @@ of threads manually:
 After setting this environment variable we continue as usual
 and multithreading will be turned on.
 
-.. demo:: Demo: Multithreading NumPy
-
-   Here is an example which does a symmetrical matrix inversion of size 4000 by 4000.
-   To run it, we can save it in a file named `omp_test.py` or download it from :download:`here <example/omp_test.py>`.
-
-   .. literalinclude:: example/omp_test.py
-      :language: python
-
-   Let us test it with 1 and 4 threads:
-
-   .. code-block:: console
-
-      $ export OMP_NUM_THREADS=1
-      $ python omp_test.py
-
-      $ export OMP_NUM_THREADS=4
-      $ python omp_test.py
 
 Multithreaded I/O
 ^^^^^^^^^^^^^^^^^
@@ -141,7 +141,7 @@ This is how an I/O-bound application might look:
 
 The `threading library <https://docs.python.org/3/library/threading.html>`__ provides an API
 for creating and working with threads. The simplest approach to
-create and manage threads is to use the ``ThreadPoolExecutor`` class.
+create and manage threads is to use the ``ThreadPoolExecutor`` class from the ``concurrent.futures`` module.
 An example use case could be to download data from multiple websites using
 multiple threads:
@@ -164,39 +164,89 @@ The speedup gained from multithreading I/O bound problems can be understood from
 
 Further details on threading in Python can be found in the **See also** section below.
 
+
 Multiprocessing
 ---------------
 
 The ``multiprocessing`` module in Python supports spawning processes using an API
 similar to the ``threading`` module. It effectively side-steps the GIL by using
 *subprocesses* instead of threads, where each subprocess is an independent Python
-process.
-
-One of the simplest ways to use ``multiprocessing`` is via ``Pool`` objects and
+process. One of the simplest ways to use ``multiprocessing`` is via ``Pool`` objects and
 the parallel :meth:`Pool.map` function, similarly to what we saw for multithreading above.
-In the following code, we define a :meth:`square`
-function, call the :meth:`cpu_count` method to get the number of CPUs on the machine,
-and then initialize a Pool object in a context manager and inside of it call the
-:meth:`Pool.map` method to parallelize the computation.
-We can save the code in a file named `mp_map.py` or download from :download:`here <example/mp_map.py>`.
 
-.. literalinclude:: example/mp_map.py
-   :language: python
-   :emphasize-lines: 1, 11-12
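+
+As a minimal sketch of this pattern (modelled on the removed `mp_map.py` example
+described above; the :meth:`square` function and pool size are illustrative, and the
+actual file may differ):
+
+.. code-block:: python
+
+   from multiprocessing import Pool, cpu_count
+
+   def square(x):
+       return x * x
+
+   if __name__ == '__main__':
+       # create one worker process per CPU core and parallelize the map
+       with Pool(processes=cpu_count()) as pool:
+           res = pool.map(square, range(20))
+       print(res)
+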
+
+.. note::
+
+   ``concurrent.futures.ProcessPoolExecutor`` is actually a wrapper for
+   ``multiprocessing.Pool`` to unify the threading and process interfaces.
+
+
+Multiple arguments
+^^^^^^^^^^^^^^^^^^
 
 For functions that take multiple arguments one can instead use the :meth:`Pool.starmap`
-function (save as `mp_starmap.py` or download :download:`here <example/mp_starmap.py>`)
+function, and there are other options as well, see below:
+
+.. tabs::
+
+   .. tab:: ``pool.starmap``
+
+      .. code-block:: python
+         :emphasize-lines: 6,8
+
+         import multiprocessing as mp
+
+         def power_n(x, n):
+             return x ** n
+
+         if __name__ == '__main__':
+             with mp.Pool(processes=4) as pool:
+                 res = pool.starmap(power_n, [(x, 2) for x in range(20)])
+                 print(res)
+
+   .. tab:: function adapter
+
+      .. code-block:: python
+         :emphasize-lines: 7,8,15
+
+         from concurrent.futures import ProcessPoolExecutor
+         import numpy as np
+
+         def power_n(x, n):
+             return x ** n
+
+         def f_(args):
+             return power_n(*args)
+
+         if __name__ == '__main__':
+             xs = np.arange(10)
+             chunks = np.array_split(xs, xs.shape[0]//2)
+
+             with ProcessPoolExecutor(max_workers=4) as pool:
+                 res = pool.map(f_, chunks)
+                 print(list(res))
+
+   .. tab:: multiple argument iterables
+
+      .. code-block:: python
+         :emphasize-lines: 8
+
+         from concurrent.futures import ProcessPoolExecutor
+
+         def power_n(x, n):
+             return x ** n
+
+         if __name__ == '__main__':
+             with ProcessPoolExecutor(max_workers=4) as pool:
+                 res = pool.map(power_n, range(0,10,2), range(1,11,2))
+                 print(list(res))
 
-.. literalinclude:: example/mp_starmap.py
-   :language: python
-   :emphasize-lines: 1, 10-11
 
 .. callout:: Interactive environments
 
    Functionality within multiprocessing requires that the ``__main__`` module be
-   importable by children processes. This means that for example ``multiprocessing.Pool``
-   will not work in the interactive interpreter. A fork of multiprocessing, called
-   ``multiprocess``, can be used in interactive environments like Jupyter.
+   importable by child processes. This means that some functions may not work
+   in interactive interpreters such as Jupyter notebooks.
 
 ``multiprocessing`` has a number of other methods which can be useful for certain
 use cases, including ``Process`` and ``Queue`` which make it possible to have direct
@@ -222,8 +272,7 @@ The idea behind MPI is that:
 
 - Tasks communicate and share data by sending messages.
 - Many higher-level functions exist to distribute information to other tasks and
   gather information from other tasks.
-- All tasks typically *run the entire code* and we have to be careful to avoid
-  that all tasks do the same thing.
+
 
 ``mpi4py`` provides Python bindings for the Message Passing Interface (MPI) standard.
 This is how a hello world MPI program looks in Python:
@@ -422,11 +471,76 @@ Upper-case methods are faster and are strongly recommended for large numeric data
 
 Exercises
 ---------
 
+.. exercise:: Multithreading NumPy
+
+   Here is a piece of code which does a symmetrical matrix inversion of size 4000 by 4000.
+   To run it, we can save it in a file named `omp_test.py` or download it from :download:`here <example/omp_test.py>`.
+
+   .. literalinclude:: example/omp_test.py
+      :language: python
+
+   Let us test it with 1 and 4 threads:
+
+   .. code-block:: console
+
+      $ export OMP_NUM_THREADS=1
+      $ python omp_test.py
+
+      $ export OMP_NUM_THREADS=4
+      $ python omp_test.py
+
+
+.. exercise:: I/O-bound vs CPU-bound
+
+   In this exercise, we will simulate an I/O-bound process using the :meth:`sleep` function.
+   Typical I/O-bound operations are disk accesses, network requests etc.
+
+   .. literalinclude:: example/io_bound.py
+      :language: python
+
+   When the problem is compute-intensive:
+
+   .. literalinclude:: example/cpu_bound.py
+      :language: python
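+
+   The two included files are not shown in this patch. For illustration only, an
+   I/O-bound timing test along these lines might look like the following sketch
+   (the contents of the actual `io_bound.py` may differ):
+
+   .. code-block:: python
+
+      import time
+      from concurrent.futures import ThreadPoolExecutor
+
+      def io_task(i):
+          # simulate a one-second I/O wait (disk access, network request, ...)
+          time.sleep(1)
+          return i
+
+      if __name__ == '__main__':
+          start = time.perf_counter()
+          # threads overlap the waits: 8 tasks finish in about 2 s with 4 workers
+          with ThreadPoolExecutor(max_workers=4) as pool:
+              list(pool.map(io_task, range(8)))
+          print(f"elapsed: {time.perf_counter() - start:.2f} s")
+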
+
+.. exercise:: Race condition
+
+   A race condition is a common issue in multithreading/multiprocessing applications. It
+   occurs when two or more threads access shared data and try to modify it at the same time.
+   Try running the example below with different values of ``n`` to see the differences,
+   and think about how we can solve this problem.
+
+   .. literalinclude:: example/race.py
+      :language: python
+
+   .. solution::
+
+      - locking resources: explicitly use a lock to serialize access to the shared data
+      - duplicating resources: give each thread/process its own copy of the data so that
+        nothing needs to be shared
+
+      .. tabs::
+
+         .. tab:: locking
+
+            .. literalinclude:: exercise/race_lock.py
+               :language: python
+               :emphasize-lines: 2,4,8,10
+
+         .. tab:: duplicating
+
+            .. literalinclude:: exercise/race_dup.py
+               :language: python
+
+
 
 .. exercise:: Compute numerical integrals
 
    The primary objective of this exercise is to compute integrals :math:`\int_0^1 x^{3/2} \, dx` numerically.
-One approach to integration is by establishing a grid along the x-axis. Specifically, the integration range
-is divided into 'n' segments or bins. Below is a basic serial code.
+   One approach to integration is to establish a grid along the x-axis. Specifically, the integration range
+   is divided into 'n' segments or bins. Below is a basic serial code.
 
 
    .. literalinclude:: exercise/1d_Integration_serial.py
@@ -652,5 +766,6 @@ See also
 
 .. keypoints::
 
-   - 1 Beaware of GIL and its impact on performance
-   - 2 Use threads for I/O-bound tasks
+   - Be aware of the GIL and its impact on performance
+   - Use threads for I/O-bound tasks and multiprocessing for compute-bound tasks
+   - Make it right before trying to make it fast