be used if the executing process will change the value of window entries using MPI_Put() and if these entries could also be accessed by other processes.
A shared lock is indicated by lock_type = MPI_LOCK_SHARED. This lock type guarantees that the following RMA operations of the calling process are protected from exclusive RMA operations of other processes, i.e., other processes are not allowed to change entries of the window via RMA operations that are protected by an exclusive lock. But other processes are allowed to perform RMA operations on the same window that are also protected by a shared lock.
Shared locks should be used if the executing process accesses window entries
only by MPI_Get() or MPI_Accumulate(). When a process wants to read or
manipulate entries of its local window using local operations, it must protect these
local operations with a lock mechanism, if these entries can also be accessed by
other processes.
An access epoch started by MPI_Win_lock() for a window win can be terminated by calling the MPI function

int MPI_Win_unlock (int rank, MPI_Win win)

where rank is the rank of the target process. The call of this function blocks until
all RMA operations issued by the calling process on the specified window have been
completed both at the calling process and at the target process. This guarantees that
all manipulations of window entries issued by the calling process have taken effect
at the target process.
Example The use of lock synchronization for the iterative computation of a dis-
tributed data structure is illustrated in the following example which is a variation
of the previous examples. Here, an exclusive lock is used to protect the RMA
operations:
while (!converged (A)) {
  update(A);
  update_buffer(A, from_buf);
  for (i=0; i<num_neighbors; i++) {
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, neighbor[i], 0, win);
    MPI_Put(&from_buf[i], size[i], MPI_INT, neighbor[i], to_disp[i],
            size[i], MPI_INT, win);
    MPI_Win_unlock(neighbor[i], win);
  }
}
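A complementary sketch (not from the text): if a process only reads remote window entries with MPI_Get(), a shared lock suffices. Here to_buf[] is a hypothetical local receive buffer; neighbor[], to_disp[], size[], and win are assumed to be set up as in the example above.

/* read the remote entries of all neighbors under shared locks */
for (i=0; i<num_neighbors; i++) {
  MPI_Win_lock(MPI_LOCK_SHARED, neighbor[i], 0, win);
  MPI_Get(&to_buf[i], size[i], MPI_INT, neighbor[i], to_disp[i],
          size[i], MPI_INT, win);
  MPI_Win_unlock(neighbor[i], win);
}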
5.5 Exercises for Chap. 5
Exercise 5.1 Consider the following incomplete piece of an MPI program:
int rank, p, size=8;
int left, right;
char send_buffer1[8], recv_buffer1[8];
char send_buffer2[8], recv_buffer2[8];
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &p);
left = (rank-1 + p) % p;
right = (rank+1) % p;
MPI_Send(send_buffer1, size, MPI_CHAR, left, ...);
MPI_Recv(recv_buffer1, size, MPI_CHAR, right, ...);
MPI_Send(send_buffer2, size, MPI_CHAR, right, ...);
MPI_Recv(recv_buffer2, size, MPI_CHAR, left, ...);
(a) In the program, the processors are arranged in a logical ring and each
processor should exchange its name with its neighbor to the left and its neighbor
to the right. Assign a unique name to each MPI process and fill out the missing
pieces of the program such that each process prints its own name as well as its
neighbors’ names.
(b) In the given program piece, the MPI_Send() and MPI_Recv() operations are arranged such that depending on the implementation a deadlock can occur. Describe how a deadlock may occur.
(c) Change the program such that no deadlock is possible by arranging the order of the MPI_Send() and MPI_Recv() operations appropriately.
(d) Change the program such that MPI_Sendrecv() is used to avoid deadlocks.
(e) Change the program such that MPI_Isend() and MPI_Irecv() are used.
Exercise 5.2 Consider the MPI program in Fig. 5.3 for the collection of distributed
data blocks with point-to-point messages. The program assumes that all data blocks
have the same size blocksize. Generalize the program such that each process can
contribute a data block of a size different from the data blocks of the other processes.
To do so, assume that each process has a local variable which specifies the size of
its data block.
(Hint: First make the size of each data block available to each process in a
pre-collection phase with a similar communication pattern as in Fig. 5.3 and then
perform the actual collection of the data blocks.)
Exercise 5.3 Modify the program from the previous exercise for the collection of
data blocks of different sizes such that no pre-collection phase is used. Instead,
use MPI_Get_count() to determine the size of the data block received in each
step. Compare the resulting execution time with the execution time of the program
from the previous exercise for different data block sizes and different numbers of
processors. Which of the programs is faster?
Exercise 5.4 Consider the program Gather_ring() from Fig. 5.3. As described in the text, this program does not avoid deadlocks if the runtime system does not use internal system buffers. Change the program such that deadlocks are avoided in any case by arranging the order of the MPI_Send() and MPI_Recv() operations
appropriately.
Exercise 5.5 The program in Fig. 5.3 arranges the processors logically in a ring to perform the collection. Modify the program such that the processors are arranged in a logical two-dimensional torus network. For simplicity, assume that all
data blocks have the same size. Develop a mechanism with which each processor
can determine its predecessor and successor in x and y directions. Perform the col-
lection of the data blocks in two phases, the first phase with communication in x
direction, the second phase with communication in y direction.
In both directions, communication in different rows or columns of the processor
torus can be performed concurrently. For the communication in y direction, each
process distributes all blocks that it has collected in the x direction phase. Use the
normal blocking send and receive operations for the communication. Compare the
resulting execution time with the execution time of the ring implementation from
Fig. 5.3 for different data block sizes and different numbers of processors. Which of
the programs is faster?
Exercise 5.6 Modify the program from the previous exercise such that non-blocking
communication operations are used.
Exercise 5.7 Consider the parallel computation of a matrix–vector multiplication
A · b using a distribution of the scalar products based on a rowwise distribution of
A, see Fig. 3.10, p. 127 for a sketch of a parallel pseudo program. Transform this
program into a running MPI program. Select the MPI communication operations for
the multi-broadcast operations appropriately.
Exercise 5.8 Similar to the preceding exercise, consider a matrix–vector multipli-
cation using a distribution of the linear combinations based on a columnwise distri-
bution of the matrix. Transform the pseudo program from Fig. 3.12, p. 129 to a run-
ning MPI program. Use appropriate MPI operations for the single-accumulation and
single-broadcast operations. Compare the execution time with the execution time of
the MPI program from the preceding exercise for different sizes of the matrix.
Exercise 5.9 For a broadcast operation a root process sends the same data block to
all other processes. Implement a broadcast operation by using point-to-point send
and receive operations (MPI_Send() and MPI_Recv()) such that the same effect as MPI_Bcast() is obtained. For the processes, use a logical ring arrangement
similar to Fig. 5.3.
Exercise 5.10 Modify the program from the previous exercise such that two other
logical arrangements are used for the processes: a two-dimensional mesh and a
three-dimensional hypercube. Measure the execution time of the three different ver-
sions (ring, mesh, hypercube) for eight processors for different sizes of the data
block and make a comparison by drawing a diagram. Use MPI_Wtime() for the timing.
Exercise 5.11 Consider the construction of conflict-free spanning trees in a d-
dimensional hypercube network for the implementation of a multi-broadcast opera-
tion, see Sect. 4.3.2, p. 177, and Fig. 4.6. For d = 3, d = 4, and d = 5 write an MPI
program with 8, 16, and 32 processes, respectively, that uses these spanning trees
for a multi-broadcast operation.
(a) Implement the multi-broadcast by concurrent single-to-single transfers along
the spanning trees and measure the resulting execution time for different mes-
sage sizes.
(b) Implement the multi-broadcast by using multiple broadcast operations where
each broadcast operation is implemented by single-to-single transfers along the
usual spanning trees for hypercube networks as defined in p. 174, see Fig. 4.4.
These spanning trees do not avoid conflicts in the network. Measure the result-
ing execution time for different message sizes and compare them with the exe-
cution times from (a).
(c) Compare the execution times from (a) and (b) with the execution time of an
MPI_Allgather() operation to perform the same communication.
Exercise 5.12 For a global exchange operation, each process provides a potentially
different block of data for each other process, see pp. 122 and 225 for a detailed
explanation. Implement a global exchange operation by using point-to-point send
and receive operations (MPI_Send() and MPI_Recv()) such that the same effect as MPI_Alltoall() is obtained. For the processes, use a logical ring arrangement
similar to Fig. 5.3.
Exercise 5.13 Modify the program Gather_ring() from Fig. 5.3 such that synchronous send operations (MPI_Ssend() instead of MPI_Send()) are used. Compare the resulting execution time with the execution time obtained for the standard send and receive operations from Fig. 5.3.
Exercise 5.14 Repeat the previous exercise with buffered send operations.
Exercise 5.15 Modify the program Gather_ring() from Fig. 5.3 such that the MPI operation MPI_Test() is used instead of MPI_Wait(). When a non-blocking receive operation is found by MPI_Test() to be completed, the process sends the
received data block to the next process.
Exercise 5.16 Write an MPI program which implements a broadcast operation with MPI_Send() and MPI_Recv() operations. The program should use n = 2^k processes which should logically be arranged as a hypercube network. Based on this arrangement the program should define a spanning tree in the network with root 0, see Fig. 3.8 and p. 123, and should use this spanning tree to transfer a message
stepwise from the root along the tree edges up to the leaves. Each node in the
tree receives the message from its parent node and forwards it to its child nodes.
Measure the resulting runtime for different message sizes up to 1 MB for different numbers of processors using MPI_Wtime() and compare the execution times with the execution times of MPI_Bcast() performing the same operation.
Exercise 5.17 The execution time of point-to-point communication operations between two processors can normally be described by a linear function of the form

t_s2s(m) = τ + t_c · m,

where m is the size of the message; τ is a startup time, which is independent of the message size; and t_c is the inverse of the network bandwidth. Verify this function by measuring the time for a ping-pong message transmission where process A sends a message to process B, and B sends the same message back to A. Use different message sizes and draw a diagram which shows the dependence of the communication time on the message size. Determine the size of τ and t_c on your parallel computer.
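A possible measurement skeleton is sketched below; the function name pingpong_time and the repetition count NREP are invented for illustration and not part of the exercise text. Halving the measured round-trip time gives one sample of t_s2s(m).

#include <mpi.h>
#define NREP 100  /* repetitions per message size (arbitrary choice) */

/* estimate of t_s2s(m) in seconds, measured between ranks 0 and 1 */
double pingpong_time(char *buf, int m, int rank) {
  MPI_Status status;
  int r;
  double t = MPI_Wtime();
  for (r = 0; r < NREP; r++) {
    if (rank == 0) {
      MPI_Send(buf, m, MPI_CHAR, 1, 0, MPI_COMM_WORLD);
      MPI_Recv(buf, m, MPI_CHAR, 1, 0, MPI_COMM_WORLD, &status);
    }
    else if (rank == 1) {
      MPI_Recv(buf, m, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &status);
      MPI_Send(buf, m, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
    }
  }
  return (MPI_Wtime() - t) / (2.0 * NREP);  /* half of the average round-trip time */
}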
Exercise 5.18 Write an MPI program which arranges 24 processes in a (periodic) Cartesian grid structure of dimension 2 × 3 × 4 using MPI_Cart_create(). Each
process should determine and print the process rank of its two neighbors in x, y, and
z directions.
For each of the three sub-grids in y-direction, a communicator should be defined.
This communicator should then be used to determine the maximum rank of the
processes in the sub-grid by using an appropriate MPI_Reduce() operation. This
maximum rank should be printed out.
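A possible starting point for the topology setup is sketched below; the variable names are chosen for illustration, and the printing and reduction steps are omitted.

#include <mpi.h>
int main(int argc, char *argv[]) {
  int dims[3] = {2, 3, 4}, periods[3] = {1, 1, 1};  /* periodic in all dimensions */
  int left, right, down, up, front, back;
  int remain[3] = {0, 1, 0};
  MPI_Comm grid_comm, sub_comm;
  MPI_Init(&argc, &argv);
  MPI_Cart_create(MPI_COMM_WORLD, 3, dims, periods, 0, &grid_comm);
  /* ranks of the two neighbors in x-, y-, and z-direction */
  MPI_Cart_shift(grid_comm, 0, 1, &left, &right);
  MPI_Cart_shift(grid_comm, 1, 1, &down, &up);
  MPI_Cart_shift(grid_comm, 2, 1, &front, &back);
  /* one communicator per sub-grid in y-direction (only dimension 1 is kept) */
  MPI_Cart_sub(grid_comm, remain, &sub_comm);
  /* ... print the neighbor ranks and reduce the maximum rank over sub_comm ... */
  MPI_Finalize();
  return 0;
}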
Exercise 5.19 Write an MPI program which arranges the MPI processes in a two-dimensional torus of size √p × √p where p is the number of processes. Each process exchanges its rank with its two neighbors in x and y dimensions. For the exchange, one-sided communication operations should be used. Implement three different schemes for the exchange with the following one-sided communication operations:
(a) global synchronization with MPI_Win_fence();
(b) loose synchronization by using MPI_Win_start(), MPI_Win_post(), MPI_Win_complete(), and MPI_Win_wait();
(c) lock synchronization with MPI_Win_lock() and MPI_Win_unlock().
Test your program for p = 16 processors, i.e., for a 4 × 4 torus network.
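For scheme (a), a fragment like the following could serve as a starting point (a sketch only; the window exposes a hypothetical array nbr_rank[4], and the computation of the neighbor ranks nbr[0..3] from the torus coordinates is omitted):

MPI_Win win;
int my_rank, i;
int nbr[4];       /* ranks of the left, right, upper, and lower neighbor */
int nbr_rank[4];  /* filled with the ranks received from the neighbors   */
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
/* ... compute nbr[0..3] from the torus coordinates ... */
MPI_Win_create(nbr_rank, 4*sizeof(int), sizeof(int),
               MPI_INFO_NULL, MPI_COMM_WORLD, &win);
MPI_Win_fence(0, win);
for (i=0; i<4; i++)
  /* write my_rank into entry i of the window of neighbor i */
  MPI_Put(&my_rank, 1, MPI_INT, nbr[i], i, 1, MPI_INT, win);
MPI_Win_fence(0, win);
MPI_Win_free(&win);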
Chapter 6
Thread Programming
Several parallel computing platforms, in particular multicore platforms, offer a
shared address space. A natural programming model for these architectures is a
thread model in which all threads have access to shared variables. These shared
variables are then used for information and data exchange. To coordinate the access
to shared variables, synchronization mechanisms have to be used to avoid race con-
ditions in case of concurrent accesses. Basic synchronization mechanisms are lock
synchronization and condition synchronization, see Sect. 3.7 for an overview.
In this chapter, we consider thread programming in more detail. In particu-
lar, we have a closer look at synchronization problems like deadlocks or prior-
ity inversion that might occur and present programming techniques to avoid such
problems. Moreover, we show how basic synchronization mechanisms like lock
synchronization or condition synchronization can be used to build more complex
synchronization mechanisms like read/write locks. We also present a set of paral-
lel patterns like task-based or pipelined processing that can be used to structure a
parallel application. These issues are considered in the context of popular program-
ming environments for thread-based programming to directly show the usage of the
mechanisms in practice. The programming environments Pthreads, Java threads, and
OpenMP are introduced in detail. For Java, we also give an overview of the pack-
age java.util.concurrent which provides many advanced synchronization
mechanisms as well as a task-based execution environment. The goal of the chapter
is to enable the reader to develop correct and efficient thread programs that can be
used, for example, on multicore architectures.
6.1 Programming with Pthreads
POSIX threads (also called Pthreads) define a standard for the programming with
threads, based on the programming language C. The threads of a process share a
common address space. Thus, the global variables and dynamically generated data
objects can be accessed by all threads of a process. In addition, each thread has a
separate runtime stack which is used to control the functions activated and to store
their local variables. The variables declared locally within these functions are local
data of the executing thread and cannot be accessed directly by other threads. Since
the runtime stack of a thread is deleted after a thread is terminated, it is dangerous
to pass a reference to a local variable in the runtime stack of a thread A to another
thread B.
The data types, interface definitions, and macros of Pthreads are usually available
via the header file <pthread.h>. This header file must therefore be included into
a Pthreads program. The functions and data types of Pthreads are defined according
to a naming convention. According to this convention, Pthreads functions are named
in the form
pthread_[<object>_]<operation>(),

where <operation> describes the operation to be performed and the optional <object> describes the object to which this operation is applied. For example, pthread_mutex_init() is a function for the initialization of a mutex variable; thus, the <object> is mutex and the <operation> is init; we give a more detailed description later.
For functions which are involved in the manipulation of threads, the specification of <object> is omitted. For example, the function for the generation of a thread is pthread_create(). All Pthreads functions yield the return value 0 if they are executed without failure. In case of a failure, an error code from <errno.h> will be returned. Thus, this header file should also be included in the program. Pthreads data types describe, similarly to MPI, opaque objects whose exact implementation is hidden from the programmer. Data types are named according to the syntax form

pthread_<object>_t,

where <object> specifies the specific data object. For example, a mutex variable is described by the data type pthread_mutex_t. If <object> is omitted, the data type pthread_t for threads results. The following table contains important Pthreads data types which will be described in more detail later.
Pthreads data type       Meaning
pthread_t                Thread ID
pthread_mutex_t          Mutex variable
pthread_cond_t           Condition variable
pthread_key_t            Access key
pthread_attr_t           Thread attributes object
pthread_mutexattr_t      Mutex attributes object
pthread_condattr_t       Condition variable attributes object
pthread_once_t           One-time initialization control context
For the execution of threads, we assume a two-step scheduling method accord-
ing to Fig. 3.16 in Chap. 3, as this is the most general case. In this model, the
programmer has to partition the program into a suitable number of user threads
which can be executed concurrently with each other. The user threads are mapped
6.1 Programming with Pthreads 259
by the library scheduler to system threads which are then brought to execution on the
processors of the computing system by the scheduler of the operating system. The
programmer cannot control the scheduler of the operating system and has only little
influence on the library scheduler. Thus, the programmer cannot directly perform the
mapping of the user-level threads to the processors of the computing system, e.g.,
by a scheduling at program level. This facilitates program development, but also
prevents an efficient mapping directly by the programmer according to his specific
needs. It should be noted that there are operating system–specific extensions that
allow thread execution to be bound to specific processors. But in most cases, the
scheduling provided by the library and the operating system leads to good results
and relieves the programmer from additional programming effort, thus providing
more benefits than drawbacks.
In this section, we give an overview of the programming with Pthreads. Sec-
tion 6.1.1 describes thread generation and management in Pthreads. Section 6.1.2
describes the lock mechanism for the synchronization of threads accessing shared
variables. Sections 6.1.3 and 6.1.4 introduce Pthreads condition variables and an
extended lock mechanism using condition variables, respectively. Sections 6.1.6,
6.1.7, and 6.1.8 describe the use of the basic synchronization techniques in the
context of more advanced synchronization patterns, like task pools, pipelining, and
client–server coordination. Section 6.1.9 discusses additional mechanisms for the
control of threads, including scheduling strategies. We describe in Sect. 6.1.10 how
the programmer can influence the scheduling controlled by the library. The phe-
nomenon of priority inversion is then explained in Sect. 6.1.11 and finally thread-
specific data is considered in Sect. 6.1.12. Only the most important mechanisms
of the Pthreads standard are described; for a more detailed description, we refer to
[25, 105, 117, 126, 143].
6.1.1 Creating and Merging Threads
When a Pthreads program is started, a single main thread is active, executing the
main() function of the program. The main thread can generate more threads by
calling the function
int pthread_create (pthread_t *thread,
                    const pthread_attr_t *attr,
                    void *(*start_routine)(void *),
                    void *arg).
The first argument is a pointer to an object of type pthread_t which is also referred to as thread identifier (TID); this TID is generated by pthread_create() and can later be used by other Pthreads functions to identify the generated thread. The second argument is a pointer to a previously allocated and initialized attribute object of type pthread_attr_t, defining the desired attributes of the generated
thread. The argument value NULL causes the generation of a thread with default
attributes. If different attribute values are desired, an attribute data structure has to
be created and initialized before calling pthread_create(); this mechanism is described in more detail in Sect. 6.1.9. The third argument specifies the function start_routine() which will be executed by the generated thread. The specified function should expect a single argument of type void * and should have a return value of the same type. The fourth argument is a pointer to the argument value with which the thread function start_routine() will be executed.
To execute a thread function with more than one argument, all arguments must
be put into a single data structure; the address of this data structure can then be
specified as argument of the thread function. If several threads are started by a parent
thread using the same thread function but different argument values, separate data
structures should be used for each of the threads to specify the arguments. This
avoids situations where argument values are overwritten too early by the parent
thread before they are read by the child threads or where different child threads
manipulate the argument values in a common data structure concurrently.
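The following sketch (names invented for illustration) shows how two arguments can be passed via a separate structure per thread; pthread_join(), described below, is used only to wait for the threads to finish.

#include <pthread.h>
#include <stdio.h>

typedef struct {            /* one argument structure per thread */
  int id;
  double value;
} thread_arg_t;

void *work(void *arg) {
  thread_arg_t *a = (thread_arg_t *) arg;    /* cast back from void* */
  printf("thread %d got value %f\n", a->id, a->value);
  return NULL;
}

int main(void) {
  pthread_t tid[2];
  thread_arg_t targ[2];                      /* separate structure for each thread */
  for (int i = 0; i < 2; i++) {
    targ[i].id = i;
    targ[i].value = 0.5 * i;
    pthread_create(&tid[i], NULL, work, &targ[i]);
  }
  for (int i = 0; i < 2; i++)
    pthread_join(tid[i], NULL);              /* wait for both threads, see below */
  return 0;
}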
A thread can determine its own thread identifier by calling the function

pthread_t pthread_self().

This function returns the thread ID of the calling thread. To compare the thread IDs of two threads, the function

int pthread_equal (pthread_t t1, pthread_t t2)

can be used. This function returns the value 0 if t1 and t2 do not refer to the same thread. Otherwise, a non-zero value is returned. Since pthread_t is an opaque data structure, only pthread_equal() should be used to compare thread IDs. The
number of threads that can be generated by a process is typically limited by the sys-
tem. The Pthreads standard determines that at least 64 threads can be generated by
any process. But depending on the specific system used, this limit may be larger. For
most systems, the maximum number of threads that can be started can be determined
by calling
maxThreads = sysconf (_SC_THREAD_THREADS_MAX)

in the program. Knowing this limit, the program can avoid starting more than maxThreads threads. If the limit is reached, a call of the pthread_create() function returns the error value EAGAIN. A thread is terminated if its thread func-
tion terminates, e.g., by calling return. A thread can terminate itself explicitly by
calling the function
void pthread_exit (void *valuep)
The argument valuep specifies the value that will be returned to another thread
which waits for the termination of this thread using pthread_join(). When
a thread terminates its thread function, the function pthread_exit() is called
implicitly, and the return value of the thread function is used as argument of this
implicit call of pthread_exit(). After the call to pthread_exit(), the call-
ing thread is terminated, and its runtime stack is freed and can be used by other
threads. Therefore, the return value of the thread should not be a pointer to a local
variable of the thread function or another function called by the thread function.
These local variables are stored on the runtime stack and may not exist any longer
after the termination of the thread. Moreover, the memory space of local variables
can be reused by other threads, and it can usually not be determined when the mem-
ory space is overwritten, thereby destroying the original value of the local variable.
Instead of a local variable, a global variable or a variable that has been dynamically
allocated should be used.
A thread can wait for the termination of another thread by calling the function
int pthread_join (pthread_t thread, void **valuep).
The argument thread specifies the thread ID of the thread for which the call-
ing thread waits to be terminated. The argument valuep specifies a memory
address where the return value of this thread should be stored. The thread call-
ing pthread_join() is blocked until the specified thread has terminated. Thus, pthread_join() provides a possibility for the synchronization of threads. After
the thread with TID thread has terminated, its return value is stored at the
specified memory address. If several threads wait for the termination of the same
thread, using pthread_join(), all waiting threads are blocked until the specified
thread has terminated. But only one of the waiting threads successfully stores the
return value. For all other waiting threads, the return value of pthread_join()
is the error value ESRCH. The runtime system of the Pthreads library allocates
for each thread an internal data structure to store information and data needed to
control the execution of the thread. This internal data structure is preserved by
the runtime system also after the termination of the thread to ensure that another
thread can later successfully access the return value of the terminated thread using
pthread_join().
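A minimal sketch (names invented for illustration) of returning a result and retrieving it with pthread_join(); in line with the remarks above, the result is allocated dynamically rather than placed in a local variable of the thread function.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

void *compute(void *arg) {
  (void) arg;                            /* argument not used here */
  int *result = malloc(sizeof(int));     /* heap memory survives thread termination */
  *result = 42;
  pthread_exit(result);                  /* equivalent to: return result; */
}

int main(void) {
  pthread_t tid;
  void *res;
  pthread_create(&tid, NULL, compute, NULL);
  pthread_join(tid, &res);               /* blocks until compute() has terminated */
  printf("thread returned %d\n", *(int *) res);
  free(res);
  return 0;
}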
After the call to pthread_join(), the internal data structure of the terminated thread is released and can no longer be accessed. If there is no pthread_join()
for a specific thread, its internal data structure is not released after its termination
and occupies memory space until the complete process is terminated. This can be
a problem for large programs with many thread creations and terminations without
corresponding calls to pthread_join(). The preservation of the internal data
structure of a thread after its termination can be avoided by calling the function
int pthread_detach (pthread_t thread).
This function notifies the runtime system that the internal data structure of the thread
with TID thread can be detached as soon as the thread has terminated. A thread
may detach itself, and any thread may detach any other thread. After a thread has