A message-passing subroutine that involves a group (collection) of processes is a collective communication subroutine. These routines provide capabilities such as synchronization (barrier), broadcast, gather, and reduction, each of which is described below.
Note: some collective communication subroutines perform a mathematical operation on the data.
Collective communications are guaranteed not to interfere with point-to-point communications or with other collective communications. A collective operation must be executed by every member of a group, and the calls must have matching arguments. Several of the collective subroutines specify a root process, which generally serves a special function for the operation in question. For example, in a sum reduction the result resides on the root process and, in a broadcast, the root process holds the data to be distributed. In collective communications that require a root process, all members of the group must specify the same root process in their calling arguments. The amount of data specified by a sender must match the amount of data specified by the receiver. However, the shape (i.e., the layout in memory) need not be the same.
Except for the barrier subroutine, collective communications do not guarantee synchronization among the processes. A collective call returns as soon as it is safe to write in its data area, that is, it has either buffered the data or finished its participation in the collective effort.
The barrier subroutine, mpi_barrier, waits for all group members of a communicator to enter the barrier call. The syntax is:
call mpi_barrier (icomm, ierr)
where:
| Parameter | Description | Status |
|---|---|---|
| icomm | communicator (context) | [IN] |
| ierr | MPI error number (0 if no error) | [OUT] |
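As a minimal sketch of how mpi_barrier might be used, the following program separates two phases of work so that no process starts the second phase before every process has finished the first. The program name barr and the printed messages are illustrative assumptions, not part of the original text. Note that the barrier synchronizes the processes but does not order their output.

```fortran
      program barr
C     Sketch: every process waits at the barrier before phase 2.
      implicit real*8 (a-h,o-z)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
C     Phase 1: each process does its own work.
      write(6,'(a,i3,a)') ' pe',mype,' finished phase 1'
C     No process leaves this call until all have entered it.
      call mpi_barrier(MPI_COMM_WORLD,ierr)
C     Phase 2 starts only after every process completed phase 1.
      write(6,'(a,i3,a)') ' pe',mype,' starting phase 2'
      call mpi_finalize(ierr)
      end
```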
The broadcast subroutine, mpi_bcast, sends data from the root process to all group members (of a communicator), itself included. All MPI datatypes (e.g., MPI_REAL, MPI_INTEGER, etc.) and derived data types are allowed for the broadcast data. The syntax is:
call mpi_bcast (data, icount, itype, iroot, icomm, ierr)
where:
| Parameter | Description | Status |
|---|---|---|
| data | If on the root process, this is the data to be sent. If not on the root process, location for the received data. | [IN/OUT] |
| icount | number of data elements (count) | [IN] |
| itype | type of data elements (MPI_REAL, MPI_INTEGER, ...) | [IN] |
| iroot | rank of process with data source | [IN] |
| icomm | communicator of group members | [IN] |
| ierr | MPI error number (0 if no error) | [OUT] |
The following section of code distributes the variable N after it has been read by
the root process (mype = 0 implies root process):
if (mype.eq.0) read*,N
call mpi_bcast (N, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
By using the mpi_gather subroutine, the root process collects data
from all members of a group (including itself) and stores the block
received from process isrc starting at location (isrc*ircount+1)
of the root's receiving array (rdata). The syntax is:
call mpi_gather (sdata, iscount, istype, rdata, ircount, irtype, iroot, icomm, ierr )
where:
| Parameter | Description | Status |
|---|---|---|
| sdata | data sent by each member process to root | [IN] |
| iscount | number of elements in sdata array | [IN] |
| istype | data type of sdata | [IN] |
| rdata | accumulation array on root process | [OUT] |
| ircount | number of elements to receive from each process | [IN] |
| irtype | data type of rdata (may have different shape than sdata) | [IN] |
| iroot | rank of process that collects sdata | [IN] |
| icomm | communicator of group members | [IN] |
| ierr | MPI error number (zero if no error) | [OUT] |
The root process accepts sdata from all group members (itself
included), placing that data in its rdata array in rank order,
that is, sdata from process i is stored in locations
rdata(i*ircount+1) to rdata((i+1)*ircount), as
illustrated in the following diagram:

An example of collective communication
The following program gathers vectors from N processes and combines them into a matrix (each vector a column) on process zero (the root process).
Explanation: Array V on process i is filled with the
number i. The gather subroutine on process 0 receives
Vi from each process, placing V0 in
locations A(1,1) to A(N,1), V1 in
locations A(1,2) to A(N,2), etc. Since the
matrix A is of order N and the vectors are also of length N, the N vectors
are stored contiguously in A and, because FORTRAN stores arrays by column,
each vector becomes a column of A. Note that the count value for the accumulation data area,
A, is N, the size of the vector to be received from each process; its value
is not NxN.
```fortran
      program gather
C     Build matrix A from column vectors v; 4 processors, A=4x4.
C     MAP: A = [v0,v1,v2,v3]  vi = column vector from process i.
C     *** Use real( for dble( on 64bit machines
      implicit real*8 (a-h,o-z)
      parameter (N=4)
      dimension a(N,N),v(N)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
      call mpi_comm_size(MPI_COMM_WORLD,npes,ierr)
      if(npes.ne.N) stop 'npes must equal N'
C
C     Fill v with the rank, then gather N elements from each process.
      v=dble(mype)
      call mpi_gather(v,N,MPI_REAL8,
     &                a,N,MPI_REAL8, 0,MPI_COMM_WORLD,ierr)
C     Print A row by row; column j+1 holds the vector from process j.
      if(mype.eq.0) write(6,'(4f5.0)') ((a(i,j),j=1,N),i=1,N)
      call mpi_finalize(ierr)
      end
```
Results:
0. 1. 2. 3.
0. 1. 2. 3.
0. 1. 2. 3.
0. 1. 2. 3.
A more general gather subroutine, mpi_gatherv, uses two arrays,
ivrcnt and ivrloc, to define the number of elements received from each
process and the storage location of each block within the collection array. The data types must match.
The syntax is:
call mpi_gatherv (sdata, iscount, istype, rdata, ivrcnt, ivrloc, irtype, iroot, icomm, ierr)
where:
| Parameter | Description | Status |
|---|---|---|
| sdata | data sent by each member process to root | [IN] |
| iscount | number of elements in sdata array | [IN] |
| istype | data type of sdata | [IN] |
| rdata | accumulation array on root process | [OUT] |
| ivrcnt | array of data lengths (ivrcnt(i+1) is the number of elements received from process i) | [IN] |
| ivrloc | array of storage locations (ivrloc(i+1) is the zero-based offset in rdata at which the data from process i is stored) | [IN] |
| irtype | data type of rdata | [IN] |
| iroot | rank of process that collects sdata | [IN] |
| icomm | communicator of group members | [IN] |
| ierr | MPI error number (0 if no error) | [OUT] |
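For example, consider the following collection of arrays: array sections A(1-4), A(1-2) and A(1-5) on processors 0, 1, and 2, respectively, are to be stored in B on processor 0 in the order A(1-2) A(1-5) A(1-4). Entry i+1 of each 3-element array describes the block sent by processor i, and the locations are zero-based offsets into B, so the data lengths and locations will have the values
c ivcnt(1)=4, ivlocs(1)=7
c ivcnt(2)=2, ivlocs(2)=0
c ivcnt(3)=5, ivlocs(3)=2
and the calling statement, in which each processor passes its own data length as the send count (mype is the rank and ivcnt is assumed to be set on every processor), is:
call mpi_gatherv (a, ivcnt(mype+1), MPI_REAL8, b, ivcnt, ivlocs, MPI_REAL8, 0, MPI_COMM_WORLD, ierr)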
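The three-processor layout described above can be sketched as a complete program. This is only an illustration under stated assumptions: the program name gthrv3, the array sizes, and the choice to fill each a with the sender's rank are not from the original text. The send count on each process is taken to be that process's own entry in ivcnt.

```fortran
      program gthrv3
C     Sketch (run on 3 processes): pe 0, 1, 2 send 4, 2, 5 elements;
C     the root stores them in b in the order [pe1, pe2, pe0].
      implicit real*8 (a-h,o-z)
      dimension a(5),b(11),ivcnt(3),ivlocs(3)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
      call mpi_comm_size(MPI_COMM_WORLD,npes,ierr)
      if(npes.ne.3) call mpi_abort(MPI_COMM_WORLD,1,ierr)
C     Entry i+1 describes the block from process i; the locations
C     are zero-based offsets into b.
      ivcnt(1)=4
      ivlocs(1)=7
      ivcnt(2)=2
      ivlocs(2)=0
      ivcnt(3)=5
      ivlocs(3)=2
C     Fill a with the rank so each block is recognizable in b.
      a=dble(mype)
C     Each process sends its own count, ivcnt(mype+1).
      call mpi_gatherv(a,ivcnt(mype+1),MPI_REAL8,
     &     b,ivcnt,ivlocs,MPI_REAL8,0,MPI_COMM_WORLD,ierr)
      if(mype.eq.0) write(6,'(11f4.0)') b
      call mpi_finalize(ierr)
      end
```

With this fill pattern, the root should see two values from process 1, then five from process 2, then four from process 0.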
The following code is a more elaborate example which combines partial vectors on different processes into a single large vector.
Explanation: The program concatenates the partial vectors (vp) into a single vector (v) in reverse order, that is, partial vectors vp0, vp1, vp2 and vp3, on processes 0, 1, 2 and 3, are inserted into v in the order vp3, vp2, vp1, vp0. Note that the storage locations are zero-based offsets: the location corresponding to the first element of the collection array, v(1), is not 1, as a FORTRAN programmer would expect, but 0, as a C programmer would use.
```fortran
      program gatherv
C     Build v from partial vectors, vp, in reverse order.
C     MAP: v=[vp3,vp2,vp1,vp0]  4 processors, vp size=2
C     vpi = partial vector from processor i.
C     Replace dble( with real( on 64 bit machine.
      implicit real*8 (a-h,o-z)
      parameter (N=8,NP=2,NPES=N/NP)
      dimension v(N),vp(NP),ivcnt(NPES),ivloc(NPES)
      include 'mpif.h'
c
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype, ierr)
      call mpi_comm_size(MPI_COMM_WORLD,nprocs,ierr)
      if(nprocs.ne.NPES) stop 'NPES incorrect'
c
c     ivloc holds zero-based offsets into v: 6, 4, 2, 0.
      do i = 1,NPES
         ivcnt(i)= NP
         ivloc(i)= N-NP*i
      enddo
      vp=dble(mype)
      call mpi_gatherv(vp, NP, MPI_REAL8,
     &     v, ivcnt,ivloc,MPI_REAL8,0,MPI_COMM_WORLD,ierr)
      if(mype.eq.0) then
         print*,' ',NPES,' processors; partial vector len = ',NP
         print*,' Reverse storage, locations =',ivloc
         write(6,'(f5.0)') v
      endif
      call mpi_finalize(ierr)
      end
```
Results for 4 processors; partial vector len = 2
Reverse storage, locations = 6 4 2 0
3.
3.
2.
2.
1.
1.
0.
0.
Operations which accumulate or derive a single result from a list of arguments are reduction operations. For example, the sum of a list of numbers or the determination of the maximum value in a list of numbers are two commonly used reduction operations. The mpi_reduce subroutine performs a reduction operation on data elements residing in different processes. A single mpi_reduce subroutine uses an argument to set the type of reduction operation, either via MPI parameters to designate one of several intrinsic operations or via a reference variable (created by the mpi_op_create subroutine) which points to a function that you define. User-defined reduction operations must be binary and associative.
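As a hedged sketch of a user-defined reduction, the program below registers a function with mpi_op_create and passes the resulting handle to mpi_reduce, whose arguments are described in the text that follows. The names userop, absmax, iop and res, and the choice of "largest absolute value" as the operation, are illustrative assumptions. The operation is associative (and also commutative, which is what the .true. argument declares).

```fortran
      program userop
C     Sketch: user-defined reduction (largest absolute value).
      implicit real*8 (a-h,o-z)
      include 'mpif.h'
      external absmax
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
C     Register absmax; .true. declares the operation commutative.
      call mpi_op_create(absmax,.true.,iop,ierr)
C     Alternating signs: 0, -1, 2, -3, ...
      value = dble(mype)*(-1)**mype
      call mpi_reduce(value,res,1,MPI_REAL8,iop,
     &                0,MPI_COMM_WORLD,ierr)
      if(mype.eq.0) write(6,'(a,f6.0)') ' abs max =',res
C     Release the operation handle once it is no longer needed.
      call mpi_op_free(iop,ierr)
      call mpi_finalize(ierr)
      end
C
      subroutine absmax(vin,vinout,n,itype)
C     Combine element-wise: vinout(i) = max(|vin(i)|,|vinout(i)|).
C     itype (the MPI datatype handle) is not needed in this sketch.
      implicit real*8 (a-h,o-z)
      dimension vin(n),vinout(n)
      do i = 1,n
         vinout(i) = max(abs(vin(i)),abs(vinout(i)))
      enddo
      return
      end
```

The user function receives two vectors of the same length and must leave the combined result in its second argument.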
The mpi_reduce subroutine reduces data elements to a single value and
places the result on the root process. If the input data is a scalar variable,
the
reduction is performed on the set of data variables of all processes within the
communicator group. If the input data is a vector of length N, N reductions
are performed: the data element data(i) on each process is reduced
and stored in result(i) of the root process. The syntax is:
call mpi_reduce (data, result, icount, itype, iop, iroot, icomm, ierr )
where:
| Parameter | Description | Status |
|---|---|---|
| data | data to be reduced | [IN] |
| result | reduction result if on root, otherwise no value returned | [OUT] |
| icount | number of data elements | [IN] |
| itype | type of data elements (MPI_REAL, MPI_INTEGER, ...) | [IN] |
| iop | data reduction operation (as defined below) | [IN] |
| iroot | rank of process with reduction result | [IN] |
| icomm | communicator of group members | [IN] |
| ierr | MPI error number (0 if no error) | [OUT] |
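The vector case described above can be illustrated with a small sketch that uses MPI_SUM, one of the predefined operations. The program name redsum, the array names vals and total, and the fill pattern are assumptions made for this example only.

```fortran
      program redsum
C     Sketch: element-wise sum of a 3-element vector onto process 0.
      implicit real*8 (a-h,o-z)
      parameter (N=3)
      dimension vals(N),total(N)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
C     vals(i) = rank + i, so the values differ on every process.
      do i = 1,N
         vals(i) = dble(mype+i)
      enddo
C     N reductions: total(i) = sum over all processes of vals(i).
      call mpi_reduce(vals,total,N,MPI_REAL8,MPI_SUM,
     &                0,MPI_COMM_WORLD,ierr)
C     total is defined only on the root process (rank 0).
      if(mype.eq.0) write(6,'(3f6.0)') total
      call mpi_finalize(ierr)
      end
```

On 4 processes, element i of the result is 6 + 4*i, so the root should print 10., 14. and 18.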
Some important pre-defined reduction operations are shown below; the logical and bit-wise operations are not discussed here.
Predefined Data Reduction Operations
| Parameter Name | Meaning |
|---|---|
| MPI_SUM | sum |
| MPI_PROD | product |
| MPI_MAX | find maximum value |
| MPI_MIN | find minimum value |
| MPI_MAXLOC | find maximum value and location |
| MPI_MINLOC | find minimum value and location |
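MPI_MAXLOC and MPI_MINLOC operate on (value, location) pairs rather than on single values. The following is a minimal sketch, assuming the pair is stored as two double precision numbers and described by the MPI datatype MPI_2DOUBLE_PRECISION; the names fndmax, pin and pout and the test data are illustrative only.

```fortran
      program fndmax
C     Sketch: find the largest value and the rank that owns it.
      implicit real*8 (a-h,o-z)
      double precision pin(2),pout(2)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
      call mpi_comm_size(MPI_COMM_WORLD,npes,ierr)
C     Illustrative data: the value peaks on the middle rank.
      pin(1) = dble(mype*(npes-mype))
C     The location travels with the value, stored as the same type.
      pin(2) = dble(mype)
C     One (value,location) pair is reduced, so the count is 1.
      call mpi_reduce(pin,pout,1,MPI_2DOUBLE_PRECISION,MPI_MAXLOC,
     &                0,MPI_COMM_WORLD,ierr)
      if(mype.eq.0) write(6,'(a,f6.0,a,i3)')
     &   ' max =',pout(1),' on pe',int(pout(2))
      call mpi_finalize(ierr)
      end
```

On 4 processes the values are 0, 3, 4 and 3, so the root should report a maximum of 4 on process 2.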
There are three variations of the mpi_reduce subroutine:
- mpi_allreduce: distributes the reduction result(s) to all processes in the group. It has the same argument list and definitions as mpi_reduce except for the deletion of the root argument.
- mpi_reduce_scatter: functionally equivalent to performing an mpi_reduce operation on a vector and then scattering N segments of the result vector to N processes (N is the number of processes, segment lengths are described in an index array, and segment i is placed on process i).
- mpi_scan: performs a prefix reduction across processes 0 to N-1, so that process i receives the reduction of the values from processes 0 through i. As with the mpi_allreduce subroutine, no root designation is used.
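A minimal sketch of mpi_reduce_scatter follows, assuming each segment is one element long so that process i receives element i+1 of the summed vector. The names redsct, vals, seg and ircnts, and the limit MAXPE, are hypothetical choices for this example.

```fortran
      program redsct
C     Sketch: element-wise sum over all processes, then one element
C     of the summed vector is delivered to each process.
      implicit real*8 (a-h,o-z)
      parameter (MAXPE=64)
      dimension vals(MAXPE),ircnts(MAXPE)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
      call mpi_comm_size(MPI_COMM_WORLD,npes,ierr)
C     MAXPE is an arbitrary upper limit chosen for this sketch.
      if(npes.gt.MAXPE) call mpi_abort(MPI_COMM_WORLD,1,ierr)
      do i = 1,npes
C        Element i differs by process: i, 2i, 3i, ...
         vals(i) = dble(i*(mype+1))
C        Segment i (one element long) goes to process i-1.
         ircnts(i) = 1
      enddo
      call mpi_reduce_scatter(vals,seg,ircnts,MPI_REAL8,MPI_SUM,
     &                        MPI_COMM_WORLD,ierr)
C     Process p holds (p+1)*(1+2+...+npes).
      ncalc = (mype+1)*npes*(npes+1)/2
      write(6,'(a,i3,a,f7.0,a,i6)')
     &   ' pe',mype,' segment sum =',seg,' expected',ncalc
      call mpi_finalize(ierr)
      end
```

Each process prints the segment it received next to the value computed from the closed form, so the two numbers on each line should agree.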
Explanation: The variable value is set to the process
rank. mpi_allreduce sums the value of all 8 processes
and returns the sum to all processes in the variable sum.
```fortran
C
C     -- Sum to all --
C     Program sums a value from all processes.
C     Each process ends up with the summed value.
C
      program sum2all
      implicit real*8 (a-h,o-z)
      include 'mpif.h'
C
      icomm=MPI_COMM_WORLD
      call mpi_init(ierr)
      call mpi_comm_rank(icomm,mype,ierr)
      call mpi_comm_size(icomm,npes,ierr)
      value = dble(mype)
      call mpi_allreduce(value,sum,1,MPI_REAL8,MPI_SUM,icomm,ierr)
C     ncalc = 0+1+...+(npes-1), computed directly for comparison.
      ncalc=(npes-1 +mod(npes,2))*(npes/2)
      print*,' pe# sum calc. sum = ',mype,sum,ncalc
      call mpi_finalize(ierr)
      end
```
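For comparison with the sum-to-all program, the following sketch applies mpi_scan to the same kind of data. It assumes each process contributes its rank plus one, so that process i should end up with 1+2+...+(i+1). The names prefix and psum are illustrative and not from the original text.

```fortran
      program prefix
C     Sketch: inclusive prefix sum of (rank+1) over the processes.
      implicit real*8 (a-h,o-z)
      include 'mpif.h'
C
      call mpi_init(ierr)
      call mpi_comm_rank(MPI_COMM_WORLD,mype,ierr)
      value = dble(mype+1)
C     Process i receives value(0)+value(1)+...+value(i).
      call mpi_scan(value,psum,1,MPI_REAL8,MPI_SUM,
     &              MPI_COMM_WORLD,ierr)
C     Closed form of 1+2+...+(mype+1), for comparison.
      ncalc = (mype+1)*(mype+2)/2
      write(6,'(a,i3,a,f6.0,a,i4)')
     &   ' pe',mype,' scan sum =',psum,' expected',ncalc
      call mpi_finalize(ierr)
      end
```

Unlike mpi_allreduce, each process ends up with a different value: only the last process holds the sum over all processes.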



