Distributing computations among computers with MPI

Overview

Teaching: 45 min
Exercises: 10 min
Questions
  • What is the message passing interface (MPI)?

  • How do I exploit parallelism using the message passing interface (MPI)?

Objectives
  • Explain how message passing allows computations to be performed on more than one computer at the same time.

  • Observe the effects of parallel execution of commands with a simple hostname call.

  • Measure the run time of the parallel and MPI versions of the implementation.

Lola Lazy is now confident enough to work with the batch system of the cluster. She now turns her attention to the problem at hand, i.e. estimating the value of Pi to very high precision.

One of her more experienced colleagues has suggested that she use the Message Passing Interface (MPI for short) for this task. As she has no prior knowledge in the field, accepting this advice is as good as trying some other technique on her own. She first explores the documentation of MPI a bit to get a feeling for the philosophy behind this approach.

Message Passing Interface

A long time before we had smart phones, tablets or laptops, compute clusters were already around. They consisted of interconnected computers that had barely enough memory to show the first two frames of a movie (2*1920*1080*4 Bytes = 16 MB). However, scientific problems back then were just as demanding of more and more memory as they are today. To overcome the lack of available hardware memory, specialists from academia and industry came up with the idea of treating the memory of several interconnected compute nodes as one. A standardized piece of software synchronizes the various states of memory between the participating nodes through the network interfaces while a driver application is executed. With this, performing large calculations that require more memory than each individual cluster node can offer became possible. Moreover, this technique of passing messages (hence Message Passing Interface, or MPI) about memory updates in a controlled fashion made it possible to write parallel programs capable of running on a diverse set of cluster architectures.

Schematic View of a Compute Cluster with 4 nodes (12 cores each)

Lola becomes curious. She wants to experiment with this parallelization technique a bit. For this, she would like to print the name of the node where a specific driver application is run.

$ cat mpi_hostname.sh
#!/bin/bash

###
#SBATCH --job-name=mpi_hostname
#SBATCH --output=mpi_hostname.out.%J.%N
#SBATCH --error=mpi_hostname.err.%J.%N
# specify our current project
# replace XXXX with the code provided by your instructor
#SBATCH --account=scwXXXX
# specify the reservation we have for the training workshop
# remove this for your own work
# replace XXXX and YY with the code provided by your instructor
#SBATCH --reservation=scwXXXX_YY
# specify the partition, change to dev on Hawk
#SBATCH --partition=development
###
module load mpi
mpirun hostname
$ sbatch -n 4 mpi_hostname.sh

The output file produced by the job above contains the following lines after the job finishes:

scs0001
scs0001
scs0001
scs0001

The output makes her wonder. Apparently, the command was cloned and executed 4 times on the same host. If she increases the number of processors to a number larger than the number of CPU cores each of her nodes has, this might change and the distributed nature of mpirun will reveal itself.

$ sbatch -n 16 mpi_hostname.sh
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0001
scs0002
scs0001
scs0002
scs0002
scs0002

Execution of `mpirun hostname` on a Compute Cluster with 4 nodes (12 cores each)

As the figure above shows, 12 instances of hostname were called on scs0001 and 4 more on scs0002. It is strange, though, that the last 5 lines are not ordered correctly. Upon showing this result to her colleague, the colleague explains: even though the hostname command is run in parallel across the 2 nodes used here, the output of her 16 hostname calls needs to be merged into one output file (the file specified with #SBATCH --output) at the end. This synchronization, performed by the mpirun application, is not guaranteed to happen in an ordered fashion (how could it be, as the commands were issued in parallel). Her colleague also explains that the hostname application itself is not aware of MPI, i.e. it is not parallelized with it. Thus, the mpirun driver simply accesses the nodes that the batch system allows it to run on and launches the hostname app there. After that, mpirun collects the output of the executed commands at completion and writes it to the defined output file.

Like a reflex, Lola asks how to write these MPI programs. Her colleague points out that she needs to program in one of the languages that MPI supports, such as Fortran, C, C++, Python and many more. As Lola is most confident with Python, her colleague wants to give her a head start using mpi4py and provides a minimal example. This example is analogous to what Lola has just played with. The Python script, called print_hostname.py, prints the number of the current MPI rank (i.e. the unique id of the execution thread within one mpirun invocation), the total number of MPI ranks available, and the hostname this rank is currently running on.
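The example looks roughly like the sketch below (the actual print_hostname.py you will download may differ slightly in wording):

from mpi4py import MPI

comm = MPI.COMM_WORLD               # communicator containing all ranks of this run
rank = comm.Get_rank()              # unique id of this process (0 .. size-1)
size = comm.Get_size()              # total number of ranks started by mpirun
name = MPI.Get_processor_name()     # name of the host this rank runs on

print("this is rank = %2d (total: %2d) running on %s" % (rank, size, name))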

Before we can run mpi4py programs we need to install the package through pip3. Installing it also requires the mpi environment module to be loaded. The following commands download the example script and install mpi4py:

$ module load mpi
$ module load python/3.7.0
$ wget https://supercomputingwales.github.io/SCW-tutorial/code/print_hostname.py
$ pip3 install --user mpi4py

Now we can run the program by putting the following into py_mpi_hostname.sh:

#!/bin/bash


###
#SBATCH --job-name=py_mpi_hostname
#SBATCH --output=py_mpi_hostname.out.%J.%N
#SBATCH --error=py_mpi_hostname.err.%J.%N
# specify our current project
# replace XXXX with the code provided by your instructor
#SBATCH --account=scwXXXX
# specify the reservation we have for the training workshop
# remove this for your own work
# replace XXXX and YY with the code provided by your instructor
#SBATCH --reservation=scwXXXX_YY
# specify the partition, change to dev on Hawk
#SBATCH --partition=development
###

module load mpi
module load python/3.7.0
mpirun python3 print_hostname.py

and running it with:

$ sbatch -n 16 py_mpi_hostname.sh
this is rank =  7 (total: 16) running on scs0028
this is rank =  0 (total: 16) running on scs0027
this is rank = 14 (total: 16) running on scs0030
this is rank =  6 (total: 16) running on scs0028
this is rank =  1 (total: 16) running on scs0027
this is rank = 15 (total: 16) running on scs0030
this is rank =  4 (total: 16) running on scs0028
this is rank =  2 (total: 16) running on scs0027
this is rank = 12 (total: 16) running on scs0030
this is rank =  5 (total: 16) running on scs0028
this is rank =  8 (total: 16) running on scs0029
this is rank = 13 (total: 16) running on scs0030
this is rank =  9 (total: 16) running on scs0029
this is rank = 10 (total: 16) running on scs0029
this is rank = 11 (total: 16) running on scs0029
this is rank =  3 (total: 16) running on scs0027

Again, the unordered output is visible. Now the relation between the rank and the -n parameter of the submit command becomes clearer: -n defines how many processors the current invocation of mpirun requires. With -n 16 the rank can range from 0 to 15.

Does mpirun really execute commands in parallel?

Launch the command date 16 times across your cluster. What do you observe? Play around with the precision of date through its flags (+%N for example) and study the distribution of the results.

Solution

#!/bin/sh
###
#SBATCH --job-name=mpi_date
#SBATCH --output=mpi_date.out.%J.%N
#SBATCH --error=mpi_date.err.%J.%N
#SBATCH --account=scwXXXX
#SBATCH --reservation=scwXXXX_YY
#SBATCH --partition=development
###
module load mpi
mpirun date +%M:%S.%N

Upgrade print_hostname.py and print the time-of-day as well

Download the print_hostname.py script if you haven’t already (wget https://supercomputingwales.github.io/SCW-tutorial/code/print_hostname.py). Then open print_hostname.py with your editor and use the Python 3 datetime module to print the time of day next to the host name and rank number.

Solution

There are lots of possible variations on this. Here’s a simple one:

from datetime import datetime
print(str(datetime.now()))

or

import time
print(time.strftime("%d/%m/%Y %H:%M:%S",time.gmtime()))

or

import time
print(time.time())

This gives the number of seconds (with sub-second precision) since January 1st 1970.
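Combining one of these with the earlier script, the upgraded print_hostname.py might look like the following sketch (your own version may differ):

from datetime import datetime
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()

# print rank, total rank count, hostname and the current time of day
print("this is rank = %2d (total: %2d) running on %s at %s"
      % (rank, size, name, datetime.now()))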

To finalize this day’s work, Lola wants to tackle distributed-memory parallelization using the Message Passing Interface (MPI). For this, she uses the mpi4py library that she installed earlier. She again starts from the serial implementation. First, she expands the import statements a bit.

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

These 4 lines will be instrumental throughout the entire MPI program. The MPI software stack builds upon the notion of a communicator. Here, we see the MPI.COMM_WORLD communicator, through which all processes that are created talk to each other. We will use it as a hub to initiate communications among all participating processes. Subsequently, we ask comm how many participants are connected by calling comm.Get_size(). Then we ask the communicator what rank the current process has with comm.Get_rank(). And with this, Lola has entered the dungeon of MPI.

Every Line Is Running in Parallel!

As discussed in the previous section, a call to mpirun <your program>, submitted through your scheduler, will do the following:

  • mpirun obtains a list of available nodes from the scheduler

  • mpirun then connects to these nodes (e.g. via ssh) for you and instantiates a local mpirun there

  • this local mpirun executes <your program> in parallel with all the others and calls every line of it from top to bottom

  • only when your program reaches a statement of the form comm.do_something(...) does it start communicating through the MPI library with the other MPI processes; this communication can entail point-to-point or collective data transfers (that’s why it’s called ‘message passing’: MPI does nothing more than provide mechanisms to send messages around the cluster); depending on the type of communication, the MPI library might make your program wait until all the message passing has been completed

In case you want to do something on one specific rank only, you can do so with:

if rank == 0:
    print("Hello World")

Pushing the implementation further, the list of partitions needs to be established, similar to what was done in the earlier parallel implementation. Also, a list for the results is created with all items initialized to 0.

if rank == 0:
    partitions = [ int(n_samples/size) for item in range(size)]
    counts = [ int(0) ] *size
else:
    partitions = None
    counts = None

In this example, you can see how the lists are created on one rank only (rank 0, to be precise). At this point, the contents of partitions and counts reside on rank 0 only. They now have to be sent to all other participating ranks.

partition_item = comm.scatter(partitions, root=0)
count_item = comm.scatter(counts, root=0)

Note how the input variable is partitions (i.e. a list of values) while the output variable is named partition_item. This is because mpi4py returns only one item (namely the item in partitions matching the rank of the current process, i.e. partitions[rank]) rather than the full list.
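To see how scatter and gather behave, here is a small toy example, independent of Lola’s script (the squaring step is purely illustrative): every rank receives one item from rank 0, squares it, and rank 0 gathers the results back into a list.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# only the root rank provides the full list, all other ranks pass None
data = list(range(size)) if rank == 0 else None

item = comm.scatter(data, root=0)      # rank 0 receives 0, rank 1 receives 1, ...
result = item * item                   # each rank does its own share of the work
results = comm.gather(result, root=0)  # rank 0 receives the full list, others get None

if rank == 0:
    print("gathered results:", results)

Run with mpirun -n 4, rank 0 would print gathered results: [0, 1, 4, 9]. Back in Lola’s script, the actual work can now be done.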

count_item = inside_circle(partition_item)
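This is the known function call from the serial implementation. For reference, a numpy-based inside_circle along those lines might look like the sketch below (the function in the actual mpi_numpi.py may differ in detail):

import numpy as np

def inside_circle(total_count):
    # draw total_count random points in the unit square as float32 arrays
    x = np.float32(np.random.uniform(size=total_count))
    y = np.float32(np.random.uniform(size=total_count))
    # count how many of them fall inside the circle of radius 1
    radii = np.sqrt(x * x + y * y)
    return int(np.count_nonzero(radii <= 1.0))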

With the local work done, the results have to be communicated back again.

counts = comm.gather(count_item, root=0)

The logic from above is now reversed: a single item, count_item, is used as input, and the result counts is a list again. In order to compute pi from this, the following operations should be restricted to rank 0 to avoid redundant work:

if rank == 0:
    my_pi = 4.0 * sum(counts) /n_samples

And that’s it. Now, Lola can submit her first MPI job. Download the full code to try it yourself.

You can download it on the HPC with the command:

wget https://supercomputingwales.github.io/SCW-tutorial/code/mpi_numpi.py

Then put the following into a batch script called mpi_numpi.sh:

#!/bin/bash

###
# job name
#SBATCH --job-name=mpi_numpi
# job stdout file
#SBATCH --output=mpi_numpi.out.%J.%N
# job stderr file
#SBATCH --error=mpi_numpi.err.%J.%N
# specify our current project
# replace XXXX with the code provided by your instructor
#SBATCH --account=scwXXXX
# specify the reservation we have for the training workshop
# remove this for your own work
# replace XXXX and YY with the code provided by your instructor
#SBATCH --reservation=scwXXXX_YY
# specify the partition, change to dev on Hawk
#SBATCH --partition=development
###

module load python/3.7.0
module load mpi
mpirun python3 mpi_numpi.py 1000000000

$ sbatch -n 48 mpi_numpi.sh

The output file mpi_numpi.out yields the following lines:

[     mpi version] required memory 11444.092 MB
[using  48 cores ] pi is 3.141679 from 1000000000 samples

real    0m3.924s
user    0m0.083s
sys     0m0.165s

Note here that we are now free to scale this application to hundreds of cores if we wanted to. We are only restricted by Amdahl’s law, the size of our compute cluster and any limits the administrators apply (on Supercomputing Wales we can only use 26 nodes at once). Before finishing the day, Lola looks at the run time that her MPI job consumed: 3.92 seconds for a job that ran on six times as many cores as her parallel implementation before (which took 56 s for the same configuration). To test the performance and work out how many cores she should use, she decides to write a small script which varies the number of cores being used.

for i in $(seq 1 48) ; do
    sbatch -n $i mpi_numpi.sh
    sleep 2m
done

She collects the results into a spreadsheet and graphs them. For comparison she also graphs the performance of the PyMP job running on a single node. These show how the performance changes as the number of cores increases.

That is quite an achievement of the day!

Performance vs Core Count MPI

Performance vs Core Count PyMP

Why isn’t the graph smooth

The MPI graph shown above isn’t a smooth curve like the PyMP one as the number of cores is increased. Why might this be the case?

Solution

Some of the jobs will have fit within a single node, while others will have run across multiple nodes, where data access over the interconnect is much slower. Exclusive use of the nodes wasn’t requested either (Slurm’s --exclusive option would do that), so other users’ jobs may have impacted our performance.

Use the batch system!

Launch the serial and MPI versions of the pi estimation using the batch system. Compare the run times of each. Note that with fewer than 100,000,000 samples there won’t be significant differences between run times.

Don’t Stress the Network

The MPI implementation given above transmits only the number of points in the circle to the main program. Rewrite the program so that each rank generates the random numbers and sends them back to rank 0.

Submit the job and look at the time it took. What do you observe? Why did the run time change?

Solution

The performance gets significantly worse because a lot more data needs to be sent. When running on different nodes this will be particularly bad, as transferring data over the interconnect is much slower than within a node.
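One possible way to implement this is sketched below; the variable names and the sample count are illustrative, and the exact change depends on how your script is structured. Each rank now ships its full coordinate arrays back to rank 0, which then does the counting itself:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n_samples = 10000000                 # illustrative total number of samples
partition_item = n_samples // size   # samples generated by this rank

# each rank generates its own points instead of only counting them
x = np.float32(np.random.uniform(size=partition_item))
y = np.float32(np.random.uniform(size=partition_item))

# send the full coordinate arrays back to rank 0 - far more data than a single count
all_x = comm.gather(x, root=0)
all_y = comm.gather(y, root=0)

if rank == 0:
    x_all = np.concatenate(all_x)
    y_all = np.concatenate(all_y)
    radii = np.sqrt(x_all * x_all + y_all * y_all)
    count = int(np.count_nonzero(radii <= 1.0))
    print("pi is %f from %d samples" % (4.0 * count / (partition_item * size),
                                        partition_item * size))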

Key Points

  • The MPI driver mpirun sends compute jobs to a set of allocated computers.

  • The MPI software then executes these jobs on the remote hosts and synchronizes their state/memory.

  • MPI assigns a rank to each process; usually the process with rank zero does the coordination.

  • MPI can be used to split a task into components and have several nodes run them.