Dynamic load balancing among processors in a parallel computer
6292822
Abstract
A parallel programming system implements dynamic load balancing to distribute processing workload to available processors in a parallel computer. A preprocessor in the system converts a nested parallel program into sequential code executable on processors of the parallel computer and calls to a message passing interface for inter-processor communication among the processors. When processing a nested parallel program, the preprocessor inserts a test function to evaluate the computational cost of a function call. At runtime, processors evaluate the test function to determine whether to ship a function call to another processor. This approach enables processors to offload function calls to other available processors in cases where it is more efficient to incur the cost of shipping the function call and receiving the results than it is to process the function call on the original processor.
Claims
I claim:
Description
FIELD OF THE INVENTION
vec_double quicksort (vec_double s)
{
    double pivot;
    vec_double les, eql, grt, left, right, result;

    if (length (s) < 2) {
        return s;
    } else {
        pivot = get (s, length (s) / 2);
        les = {x : x in s | x < pivot};
        eql = {x : x in s | x == pivot};
        grt = {x : x in s | x > pivot};
        free (s);
        split (left = quicksort (les),
               right = quicksort (grt));
        result = append (left, eql, right);
        free (left); free (eql); free (right);
        return result;
    }
}
1. The Machiavelli function length (vector) returns the length of a vector. Here it is used both to test for the base case and to select the middle element of the vector for use as the pivot.
2. The Machiavelli function get (vector, index) extracts an element of a vector. Here it is used to extract the pivot element.
3. The apply-to-each operator {expr : elt in vec | cond} is used for data-parallel expressions. It is read as "in parallel, for each element elt in the vector vec such that the condition cond holds, return the expression expr". Here the apply-to-each operator is being used to select out the elements less than, equal to, and greater than the pivot, to form three new vectors.
4. The Machiavelli function, limited by C's type system, is specialized for a particular type (doubles in this case).
5. In Machiavelli, a vector is specified by prepending the name of a type with vec_.
6. In Machiavelli, vectors are explicitly freed, as reflected in the free function call.
7. Rather than allowing the application of parallel functions inside an apply-to-each, Machiavelli uses an explicit split syntax to express control parallelism. This is specialized for the recursion in a divide-and-conquer function.

FIG. 4 is a block diagram illustrating the Machiavelli system, an implementation of the team parallel model. To produce efficient code from this syntax, the Machiavelli system uses three main components. First, a preprocessor 402 translates the syntactic extensions into pure C and MPI, and produces a parallel and serial version of each user-defined function. Second, a collection of predefined operations is specialized by the preprocessor for the types supplied by the user at compile time (for example, doubles in the above example). Third, a run-time system handles team operations and load balancing.
A standard C compiler 404 compiles the code generated by the preprocessor 402, and a standard linker 406 links the compiled code against an MPI library 408 and the Machiavelli run-time library 410. In a distributed memory parallel machine, the binary version of the program is loaded into and executed from each node of the machine. Specifically, each processor in the machine executes a version of the program from its associated memory. In a shared memory machine, each processor executes the binary version from shared memory.

4.2.2 Vectors
Machiavelli is built around the vector as its basic data structure. A vector is a dynamically-created ordered collection of values, similar to an array in sequential C, but is distributed across a team of processors. After being declared as a C variable, a vector is created when it is first assigned to. It is then only valid within the creating team.

Vectors have reference semantics: thus, assigning one vector to another will result in the second vector sharing the same values as the first. To copy the values an explicit data-parallel operation must be used (see the section on data parallel operations below). A vector is also strongly typed: it cannot be "cast" to a vector of another type. Again, an explicit data-parallel operation must be used to copy and cast the values into a new vector.

Vectors can be created for any of the basic types, and for any types that the user defines. Accessing the fields of vectors of user-defined types is done using the standard C "." operator.
For example, given a user-defined type containing floating point fields x and y, and a vector points of such types, a vector of the product of the fields could be computed using:
vec_double products = {p.x * p.y : p in points}

A vector is represented on every processor within its team by a structure containing its length, the number of elements that are currently on this processor, a pointer to a block of memory containing those elements, and a flag indicating whether the vector is unbalanced or not.

Machiavelli normally uses a simple block distribution for vectors. This corresponds to the case of a vector balanced across all the processors within a team; the unbalanced case is discussed in Section 4.3.2, entitled Unbalanced Vectors, below. Thus, for a vector of size n on P processors, the first processor in the team has the first n/P elements, the second processor has the next n/P elements, and so on. When the vector size n is not an exact multiple of the number of processors P, the last processor will have fewer than n/P elements on it. In the extreme case of n=1, P-1 processors will have no elements.

Given this distribution, it is easy to construct efficient functions of the vector length and team size that compute the maximum number of elements per processor (used to allocate space), the number of elements on a given processor, and the processor and offset on that processor that a specific index maps to. This last function is critical for performing irregular data-transfer operations, such as sends or fetches of elements.

To allow vectors of user-defined types to be manipulated using MPI operations (for example, to extract an element from such a vector), Machiavelli uses MPI's derived datatype functionality. This encodes all the relevant information about a C datatype in a single type descriptor, which can then be used by MPI communication functions to manipulate variables and buffers of the matching type.
For every new type used in a vector, the preprocessor therefore generates initialization code to define a matching type descriptor. For example, the pseudocode listing below shows a point structure defined by the user, and the function generated by the preprocessor to create the appropriate type descriptor for MPI.
/* Structure defined by user */
typedef struct _point {
    double x;
    double y;
    int tag;
} point;

/* Initialization code generated by preprocessor */
MPI_Datatype _mpi_point;

void _mpi_point_init ( )
{
    point example;
    int i, count = 2;
    int lengths[2] = { 2, 1 };
    MPI_Aint size, displacements[2];
    MPI_Datatype types[2];

    /* Two doubles (x, y) starting at x, then one int (tag) */
    MPI_Address (&example.x, &displacements[0]);
    types[0] = MPI_DOUBLE;
    MPI_Address (&example.tag, &displacements[1]);
    types[1] = MPI_INT;
    /* Make the displacements relative to the start of the structure */
    for (i = 1; i >= 0; i--) {
        displacements[i] -= displacements[0];
    }
    MPI_Type_struct (count, lengths, displacements, types,
                     &_mpi_point);
    MPI_Type_commit (&_mpi_point);
}
/* _mpi_point can now be used as an MPI type */
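The block-distribution helper functions mentioned earlier in this section (maximum elements per processor, elements on a given processor, and the processor and offset for an index) can be sketched in plain C as follows. This is a minimal sketch rather than the Machiavelli run-time code: it assumes a block size of ceil(n/P), which matches the description that trailing processors may hold fewer elements or none. The names max_elts_per_proc and elts_on_proc are hypothetical; proc_and_offset follows the argument order used in the get and set listings of this description.

```c
/* Sketch of block-distribution helpers; assumes block size ceil(n / nproc). */

/* Largest number of elements any one processor holds (used to size buffers). */
int max_elts_per_proc(int n, int nproc)
{
    return (n + nproc - 1) / nproc;
}

/* Number of elements held by processor `rank` (hypothetical helper name). */
int elts_on_proc(int n, int nproc, int rank)
{
    int block = max_elts_per_proc(n, nproc);
    int remaining = n - rank * block;
    if (remaining <= 0) {
        return 0;                 /* processors past the end hold nothing */
    }
    return remaining < block ? remaining : block;
}

/* Map global index i (0 <= i < n, n > 0) to its owning processor and the
 * offset within that processor's local block. */
void proc_and_offset(int i, int n, int nproc, int *proc, int *offset)
{
    int block = max_elts_per_proc(n, nproc);
    *proc = i / block;
    *offset = i % block;
}
```

For example, with n = 10 and P = 4 the processors hold 3, 3, 3, and 1 elements, and global index 7 maps to processor 2 at offset 1.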
To operate on the vector types, Machiavelli supplies a range of basic data-parallel vector functions. In choosing which functions to support, a trade-off must be made between simplicity (providing a small set of functions that can be implemented easily and efficiently) and generality (providing a larger set of primitives that abstract out more high-level operations). The vector functions can be divided into four basic types: reductions, scans, vector reordering, and vector manipulation. Each of these function types is described further below.

4.2.3 Reductions
A reduction operation on a vector returns the (scalar) result of combining all elements in the vector using a binary associative operator, ⊕. Machiavelli supports the reduction operations shown in Table 1 below. The operations reduce_min_index and reduce_max_index extend the basic definition of reduction, in that they return the (integer) index of the minimum or maximum element, rather than the element itself.
TABLE 1
Function Name       Operation               Defined On
reduce_sum          Sum                     All numeric types
reduce_product      Product                 All numeric types
reduce_min          Minimum value           All numeric types
reduce_max          Maximum value           All numeric types
reduce_min_index    Index of minimum        All numeric types
reduce_max_index    Index of maximum        All numeric types
reduce_and          Logical and             Integer types
reduce_or           Logical or              Integer types
reduce_xor          Logical exclusive-or    Integer types
The implementation of reduction operations is straightforward. Every processor performs a loop over its own section of the vector, accumulating results into a variable. The processors then combine the accumulated local results in an MPI_Allreduce operation, which returns a global result to all of the processors. The preprocessor generates reduction functions specialized for a particular type and operation as necessary. As an example, the following pseudocode listing shows the specialization for reduce_min_double, which returns the minimum element of a vector of doubles. The use of the team structure passed in the first argument will be explained in more detail in Section 4.4.
double reduce_min_double (team *tm, vec_double src)
{
    double global, local = DBL_MAX;
    int i, nelt = src.nelt_here;

    /* Local minimum over this processor's section of the vector */
    for (i = 0; i < nelt; i++) {
        double x = src.data[i];
        if (x < local) local = x;
    }
    /* Combine the local minima; every processor receives the result */
    MPI_Allreduce (&local, &global, 1, MPI_DOUBLE, MPI_MIN,
                   tm->com);
    return global;
}
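The index-returning reductions in Table 1 need to carry a (value, index) pair through both the local loop and the combining step. The sketch below illustrates that idea for reduce_min_index using a serial stand-in for the collective step, so it runs without MPI. It is an illustration under stated assumptions, not the generated Machiavelli code: the names min_loc, local_min_index, combine_min_loc, and reduce_min_index_sim are hypothetical, the block size is assumed to be ceil(n/P), and ties are resolved toward the lower index. In an MPI program the combining step could plausibly be expressed with MPI_Allreduce using the MPI_MINLOC operator on MPI_DOUBLE_INT pairs.

```c
/* Serial sketch of reduce_min_index over a block-distributed vector. */

typedef struct { double val; int idx; } min_loc;   /* idx < 0 means "empty" */

/* Local phase: scan one processor's elements, recording the global index. */
min_loc local_min_index(const double *data, int nelt, int first_index)
{
    min_loc best = { 0.0, -1 };
    int i;
    for (i = 0; i < nelt; i++) {
        if (best.idx < 0 || data[i] < best.val) {
            best.val = data[i];
            best.idx = first_index + i;
        }
    }
    return best;
}

/* Combine phase: associative; ties go to the lower index. */
min_loc combine_min_loc(min_loc a, min_loc b)
{
    if (a.idx < 0) return b;
    if (b.idx < 0) return a;
    if (b.val < a.val || (b.val == a.val && b.idx < a.idx)) return b;
    return a;
}

/* Stand-in for the collective: fold the per-processor results in rank order.
 * Returns the index of the minimum, or -1 for an empty vector. */
int reduce_min_index_sim(const double *v, int n, int nproc)
{
    int block = (n + nproc - 1) / nproc;
    min_loc global = { 0.0, -1 };
    int rank;
    for (rank = 0; rank < nproc; rank++) {
        int first = rank * block;
        int nelt;
        if (first > n) first = n;          /* processor holds no elements */
        nelt = n - first;
        if (nelt > block) nelt = block;
        global = combine_min_loc(global,
                                 local_min_index(v + first, nelt, first));
    }
    return global.idx;
}
```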
4.2.4 Scans
A scan, or parallel prefix, operation can be thought of as a generalized reduction. Take a vector v of length n, containing elements v_0, v_1, v_2, ..., and an associative binary operator ⊗ with an identity value of id. A scan of v returns a vector of the same length n, where element i of the result has the value id ⊗ v_0 ⊗ v_1 ⊗ ... ⊗ v_{i-1}. Note that this is the "exclusive" scan operation; the inclusive scan operation does not use the identity value, and instead sets element i of the result to v_0 ⊗ v_1 ⊗ ... ⊗ v_i. Machiavelli supplies a slightly smaller range of scans than of reductions, as shown in Table 2, because there is no equivalent of the maximum and minimum index operations.
TABLE 2
Function Name    Operation               Defined On
scan_sum         Sum                     All numeric types
scan_product     Product                 All numeric types
scan_min         Minimum value           All numeric types
scan_max         Maximum value           All numeric types
scan_and         Logical and             Integer types
scan_or          Logical or              Integer types
scan_xor         Logical exclusive-or    Integer types
Scans are only slightly more difficult to implement than reductions. Again, every processor performs a loop over its own section of the vector, accumulating a local result. The processors then combine their local results using an MPI_Scan operation, which returns an intermediate scan value to each processor. A second local loop is then performed, combining this scan value with the original source values in the vector to create the result vector. There is an additional complication in that MPI provides an inclusive scan rather than an exclusive one. For operations with a computable inverse (for example, sum) the exclusive scan can be computed by applying the inverse operation to the inclusive result and the local intermediate result. For operations without a computable inverse (for example, min), the inclusive scan is computed and the results are then shifted one processor to the "right" using MPI_Sendrecv. As an example, the following pseudocode shows scan_sum_int, which returns the exclusive prefix sum of a vector of integers.
vec_int scan_sum_int (team *tm, vec_int src)
{
    int i, nelt = src.nelt_here;
    int incl, excl, swap, local = 0;
    vec_int result;

    result = alloc_vec_int (tm, src.length);
    /* Local serial exclusive scan */
    for (i = 0; i < nelt; i++) {
        swap = local;
        local += src.data[i];
        result.data[i] = swap;
    }
    /* Turn inclusive MPI scan into exclusive result */
    MPI_Scan (&local, &incl, 1, MPI_INT, MPI_SUM, tm->com);
    excl = incl - local;
    /* Combine exclusive result with previous scan */
    for (i = 0; i < nelt; i++) {
        result.data[i] += excl;
    }
    return result;
}
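For an operator without a computable inverse, the description above replaces the subtraction with a shift of the inclusive per-processor results one processor to the "right". The following serial sketch walks through those phases for an exclusive min-scan, with ordinary loops standing in for MPI_Scan and MPI_Sendrecv so that it can be run without MPI. The function name scan_min_int_sim, the use of INT_MAX as the identity, and the ceil(n/P) block size are assumptions made for illustration.

```c
#include <limits.h>
#include <stdlib.h>

static int min_int(int a, int b) { return a < b ? a : b; }

/* Exclusive min-scan of v[0..n) into out[0..n), simulating nproc processors
 * that each own one block of the vector. */
void scan_min_int_sim(const int *v, int *out, int n, int nproc)
{
    int block = (n + nproc - 1) / nproc;
    int *incl = malloc((size_t)nproc * sizeof *incl);
    int *excl = malloc((size_t)nproc * sizeof *excl);
    int rank, i;

    if (incl == NULL || excl == NULL) { free(incl); free(excl); return; }

    /* Phase 1: local exclusive scan on each processor; keep the local total. */
    for (rank = 0; rank < nproc; rank++) {
        int first = rank * block < n ? rank * block : n;
        int last = first + block < n ? first + block : n;
        int local = INT_MAX;                 /* identity for min */
        for (i = first; i < last; i++) {
            out[i] = local;
            local = min_int(local, v[i]);
        }
        incl[rank] = local;
    }
    /* Phase 2: inclusive scan across processors (the role of MPI_Scan). */
    for (rank = 1; rank < nproc; rank++) {
        incl[rank] = min_int(incl[rank - 1], incl[rank]);
    }
    /* Phase 3: shift one processor to the right (the role of MPI_Sendrecv);
     * processor 0 receives the identity. */
    excl[0] = INT_MAX;
    for (rank = 1; rank < nproc; rank++) {
        excl[rank] = incl[rank - 1];
    }
    /* Phase 4: combine the shifted value with the local scan results. */
    for (rank = 0; rank < nproc; rank++) {
        int first = rank * block < n ? rank * block : n;
        int last = first + block < n ? first + block : n;
        for (i = first; i < last; i++) {
            out[i] = min_int(excl[rank], out[i]);
        }
    }
    free(incl);
    free(excl);
}
```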
4.2.5 Vector Reordering
There are two basic vector reordering functions, send and fetch, which transfer source elements to a destination vector according to an index vector. In addition, the function pack, which is used to redistribute the data in an unbalanced vector, can be seen as a specialized form of the send function. These are the most complicated Machiavelli functions, but they can be implemented using only one call to MPI_Alltoall to set up the communication, and one call to MPI_Alltoallv to actually perform the data transfer.

send (vec_source, vec_indices, vec_dest)
send is an indexed vector write operation. It sends the values from the source vector vec_source to the positions specified by the index vector vec_indices in the destination vector vec_dest, so that vec_dest[vec_indices[i]]=vec_source[i]. This is implemented using the following operations on each processor. For simplicity, assume that there are P processors, that the vectors are of length n, and that there are exactly n/P elements on each processor.
1. Create two arrays, num_to_send[P] and num_to_recv[P]. These will be used to store the number of elements to be sent to and received from every other processor, respectively.
2. Iterate over this processor's n/P local elements of vec_indices. For every index element i, compute the processor q that it maps to, and increment num_to_send[q]. Each processor now knows how many elements it will send to every other processor.
3. Exchange num_to_send[P] with every other processor using MPI_Alltoall( ). The result of this is num_to_recv[P], the number of elements to receive from every other processor.
4. Allocate a data array data_to_send[n/P] and an index array indices_to_send[n/P]. These will be used to buffer data and indices before sending to other processors. Similarly, allocate a data array data_to_recv[n/P] and an index array indices_to_recv[n/P]. Conceptually, the data and index arrays are allocated and indexed separately, although in practice they can be allocated as an array of structures to improve locality.
5. Perform a plus-scan over num_to_send[ ] and num_to_recv[ ], resulting in arrays of offsets send_ptr[P] and recv_ptr[P]. These offsets will act as pointers into the data and index arrays.
6. Iterate over this processor's n/P local elements of vec_indices[ ]. For each element vec_indices[i], compute the processor q and offset o that it maps to. Fetch and increment the current pointer, ptr=send_ptr[q]++. Copy vec_source[i] to data_to_send[ptr], and copy o to indices_to_send[ptr].
7. Call MPI_Alltoallv( ). Send data from data_to_send[ ] according to the counts num_to_send[ ], and receive into data_to_recv[ ] according to the counts num_to_recv[ ]. Do the same for indices_to_send[ ].
8. Iterate over data_to_recv[ ] and indices_to_recv[ ], performing the operation vec_dest[indices_to_recv[i]]=data_to_recv[i].
Note that steps 1-3 and 5 are independent of the particular data type being sent. They are therefore abstracted out into library functions. The remaining steps are type-dependent, and are generated as a function by the preprocessor for every type that is the subject of a send.

fetch (vec_source, vec_indices, vec_dest)
fetch is an indexed vector read operation. It fetches data values from the source vector vec_source (from the positions specified by the index vector vec_indices) and stores them in the destination vector vec_dest, so that vec_dest[i]=vec_source[vec_indices[i]]. This could be implemented using two send operations: one to transfer the indices of the requested data items to the processors that hold the data, and a second to transfer the data back to the requesting processors. However, by combining them into a single function some redundant actions can be removed, since we know ahead of time how many items to send and receive in the second transfer.
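The bucketing work in steps 2, 5, and 6 of send, which fetch reuses for its request phase, can be sketched for a single processor as follows. This is a sketch under stated assumptions rather than the library code: the function name bucket_for_send is hypothetical, elements are plain ints, the global vector length n is positive, and the block size is assumed to be ceil(n/P). The communication steps (MPI_Alltoall and MPI_Alltoallv) are deliberately left out.

```c
/* Bucket one processor's local elements by destination processor.
 *   vec_source, vec_indices : this processor's nlocal local elements
 *   n, nproc                : global vector length (n > 0) and team size
 *   num_to_send, send_ptr   : output arrays with nproc entries each
 *   data_to_send,
 *   indices_to_send         : output buffers with nlocal entries each
 * On return, bucket q occupies positions
 *   [send_ptr[q] - num_to_send[q], send_ptr[q])
 * because step 6 advances each pointer as it fills the bucket. */
void bucket_for_send(const int *vec_source, const int *vec_indices,
                     int nlocal, int n, int nproc,
                     int *num_to_send, int *send_ptr,
                     int *data_to_send, int *indices_to_send)
{
    int block = (n + nproc - 1) / nproc;
    int i, q, running = 0;

    /* Step 2: count how many elements go to each destination processor. */
    for (q = 0; q < nproc; q++) {
        num_to_send[q] = 0;
    }
    for (i = 0; i < nlocal; i++) {
        num_to_send[vec_indices[i] / block]++;
    }
    /* Step 5: exclusive plus-scan of the counts gives each bucket's start. */
    for (q = 0; q < nproc; q++) {
        send_ptr[q] = running;
        running += num_to_send[q];
    }
    /* Step 6: copy each element and its destination offset into its bucket. */
    for (i = 0; i < nlocal; i++) {
        int o, ptr;
        q = vec_indices[i] / block;      /* destination processor */
        o = vec_indices[i] % block;      /* offset on that processor */
        ptr = send_ptr[q]++;
        data_to_send[ptr] = vec_source[i];
        indices_to_send[ptr] = o;
    }
}
```

Grouping elements by destination in this way is what lets a single MPI_Alltoallv call move every bucket at once, since each destination's data is contiguous in the send buffer.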
Again, for simplicity assume that there are P processors, that all the vectors are of length n, and that there are exactly n/P elements on each processor.
1. Create two arrays, num_to_request[P] and num_to_recv[P]. These will be used to store the number of requests to be sent to every other processor, and the number of requests to be received from every other processor, respectively.
2. Iterate over this processor's n/P local elements of vec_indices. For every index element i, compute the processor q that it maps to, and increment num_to_request[q]. Each processor now knows how many elements it will request from every other processor.
3. Exchange num_to_request[ ] with every other processor using MPI_Alltoall( ). The result of this is num_to_recv[ ], the number of requests to receive from every other processor (which is the same as the number of elements to send).
4. Allocate an index array indices_to_request[n/P]. This will be used to buffer indices to request before sending to other processors. Similarly, allocate an index array indices_to_recv[n/P].
5. Perform a plus-scan over num_to_request[ ] and num_to_recv[ ], resulting in arrays of offsets request_ptr[P] and recv_ptr[P]. These offsets will act as pointers into the indices_to_request[ ] and indices_to_recv[ ] arrays.
6. Allocate an index array requested_index[n/P]. This will store the index in a received data buffer that we will eventually fetch the data from.
7. Iterate over this processor's n/P local elements of vec_indices[ ]. For each element vec_indices[i], compute the processor q and offset o that it maps to. Fetch and increment the current pointer, ptr=request_ptr[q]++. Copy o to indices_to_request[ptr].
8. Call MPI_Alltoallv( ). Send data from indices_to_request[ ] according to the element counts in num_to_request[ ], and receive into indices_to_recv[ ] according to the element counts in num_to_recv[ ].
9. Allocate data arrays data_to_send[n/P] and data_to_recv[n/P].
10. Iterate over indices_to_recv[ ], extracting each offset in turn, fetching the requested element, and storing it in the data buffer, data_to_send[i]=vec_source[indices_to_recv[i]].
11. Call MPI_Alltoallv( ). Send data from data_to_send[ ] according to the element counts in num_to_recv[ ], and receive into data_to_recv[ ] according to the counts in num_to_request[ ].
12. Iterate over data_to_recv[ ] and requested_index[ ], performing the operation vec_dest[i]=data_to_recv[requested_index[i]].
Again, steps 1-8 are independent of the particular data type being requested, and abstracted out into library functions, while the remaining steps are generated as a function by the preprocessor for every type that is the subject of a fetch.

pack (vec_source)
pack redistributes the data in an unbalanced vector so that it has the block distribution property described in Section 4.2.2. An unbalanced vector (that is, one that does not obey this property, but instead has an arbitrary amount of data on each processor) can be formed either by an apply-to-each operator with a conditional (see Section 4.3) or by appending the results of recursive function calls (see Section 4.5). The pack function is normally called as part of another Machiavelli operation. pack is simpler than send since we will send contiguous blocks of elements between processors, rather than sequences of elements with the appropriate offsets to store them in.
1. Exchange vec_source.nelt_here with every other processor using MPI_Alltoall( ). The result of this is num_on_each[P].
2. Perform a plus-scan across num_on_each[P] into first_on_each[P]. The final result of the plus-scan is the total length n of the vector.
3. From n, compute the number of elements per processor in the final block distribution, final_on_each[P], and allocate a receiving array data_to_recv[n/P].
4. Allocate two arrays, num_to_recv[P] and num_to_send[P].
5. Iterate over final_on_each[P], computing which processor(s) will contribute data for each destination processor in turn. If this processor will be receiving, update num_to_recv[ ]. If this processor will be sending, update num_to_send[ ].
6. Call MPI_Alltoallv( ), sending data from vec_source.data[ ] according to the element counts in num_to_send[ ], and receiving into data_to_recv[ ] according to the element counts in num_to_recv[ ].
7. Free the old data storage in vec_source.data[ ] and replace it with data_to_recv[ ].

4.2.6 Vector Manipulation
Machiavelli also supplies seven functions that manipulate vectors in various ways. Most of these have simple implementations. All but length and free are specialized for the particular type of vector being manipulated.

free (vector)
Frees the memory associated with vector vector.

new_vec (n)
Returns a vector of length n. This is translated in the parallel and serial code to calls to the underlying Machiavelli functions alloc_vec_type and alloc_vec_type_serial, respectively.

length (vector)
Returns the length of vector vector. This simply returns the length field of the vector structure, that is, vector.length.

get (vector, index)
Returns the value of the element of vector vector at index index. Using the length of vector, every processor computes the processor and offset that index maps to. The processors then perform a collective MPI_Bcast operation, where the processor that holds the value contributes the result. As an example, the following code listing shows the parallel implementation of get for a user-defined point type.
point get_point (team *tm, vec_point src, int i)
{
    point result;
    int proc, offset;

    proc_and_offset (i, src.length, tm->nproc, &proc,
                     &offset);
    /* Only the owning processor has the element to contribute */
    if (proc == tm->rank) {
        result = src.data[offset];
    }
    MPI_Bcast (&result, 1, _mpi_point, proc, tm->com);
    return result;
}
set (vector, index, value) Sets the element at index index of vector vector to the value value. Again, every processor computes the processor and offset that index maps to. The processor that holds the element then sets its value. As an example, the following code listing shows the parallel implementation of set for a vector of characters.
void set_char (team *tm, vec_char dst, int i, char elt)
{
    int proc, offset;

    proc_and_offset (i, dst.length, tm->nproc, &proc, &offset);
    /* Only the owning processor writes the element */
    if (proc == tm->rank) {
        dst.data[offset] = elt;
    }
}
index (length, start, increment) Returns a vector of length length, containing the numbers start, start+increment, start+2*increment, . . . . This is implemented as a purely local loop on each processor, and is specialized for each numeric type. As an example, the following code listing shows the implementation of index for integer vectors.
vec_int index_int (team *tm, int len, int start, int incr)
{
    int i, nelt;
    vec_int result;

    /* Start counting from first element on this processor */
    start += first_elt_on_proc (tm->this_proc) * incr;
    result = alloc_vec_int (tm, len);
    nelt = result.nelt_here;
    for (i = 0; i < nelt; i++, start += incr) {
        result.data[i] = start;
    }
    return result;
}
distribute (length, value) Returns a vector of length length, containing the value value in each element. This is defined for any user-defined type, as well as for the basic C types. Again, it is implemented with a purely local loop on each processor. As an example, the following code listing shows the parallel implementation of distribute for double-precision floating-point vectors.
vec_double distribute_double (team *tm, int len, double elt)
{
    int i, nelt;
    vec_double result;

    result = alloc_vec_double (tm, len);
    nelt = result.nelt_here;
    for (i = 0; i < nelt; i++) {
        result.data[i] = elt;
    }
    return result;
}
vector (scalar)
Returns a single-element vector containing the variable scalar. This is equivalent to distribute (1, scalar), and is provided merely as a convenient shorthand.

replicate (vector, n)
Given a vector vector of length m, and an integer n, returns a vector of length m×n containing n copies of vector. This is converted into a doubly-nested loop in serial code, as shown in the following code listing, and into a sequence of n operations in parallel code.
vec_pair replicate_vec_pair_serial (vec_pair src, int n)
{
    int i, j, r, nelt;
    vec_pair result;

    nelt = src.nelt_here;
    result = alloc_vec_pair_serial (nelt * n);
    r = 0;
    /* Write n consecutive copies of the source elements */
    for (i = 0; i < n; i++) {
        for (j = 0; j < nelt; j++) {
            result.data[r++] = src.data[j];
        }
    }
    return result;
}
append (vector, vector [, vector]) Appends two or more vectors together, returning the result of their concatenation as a new vector. This is implemented as successive calls to a variant of the pack function. Here it is used to redistribute elements of a vector that is spread equally among the processors to a smaller subset of processors, each representing a particular section of a longer vector. The Machiavelli preprocessor converts an n-argument append to n successive calls to pack, each to a different portion of the result vector. As an example, the following code listing shows the implementation of append for three integer vectors.
vec_int append_3_vec_int (team *tm, vec_int vec_1,
                          vec_int vec_2, vec_int vec_3)
{
    int len_1 = vec_1.length;
    int len_2 = vec_2.length;
    int len_3 = vec_3.length;
    vec_int result = alloc_vec_int (tm, len_1 + len_2 + len_3);

    /* Pack each source vector into its own section of the result */
    pack_vec_int (tm, result, vec_1, 0);
    pack_vec_int (tm, result, vec_2, len_1);
    pack_vec_int (tm, result, vec_3, len_1 + len_2);
    return result;
}
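Both append and the plain pack function have to work out which contiguous ranges of elements move between which processors. For plain pack (steps 2 to 5 of the procedure in Section 4.2.5), that bookkeeping can be sketched as an overlap computation between each processor's current range of global indices and each processor's range in the final block distribution. The sketch below is an illustration only: the name pack_counts is hypothetical, the final block size is assumed to be ceil(n/P), and counts for a processor "sending" to itself are included because MPI_Alltoallv treats the local copy like any other transfer.

```c
/* Length of the overlap between half-open ranges [a_lo, a_hi) and
 * [b_lo, b_hi). */
static int overlap(int a_lo, int a_hi, int b_lo, int b_hi)
{
    int lo = a_lo > b_lo ? a_lo : b_lo;
    int hi = a_hi < b_hi ? a_hi : b_hi;
    return hi > lo ? hi - lo : 0;
}

static int min2(int a, int b) { return a < b ? a : b; }

/* Given the current element count on every processor (num_on_each), fill in
 * how many elements processor `rank` must send to and receive from each
 * processor so that the vector ends up block distributed.
 * Returns the total vector length n. */
int pack_counts(const int *num_on_each, int nproc, int rank,
                int *num_to_send, int *num_to_recv)
{
    int n = 0, my_first = 0, my_last, block, final_lo, final_hi;
    int p, src_first = 0;

    /* Step 2: plus-scan; only this processor's start and the total are kept. */
    for (p = 0; p < nproc; p++) {
        if (p == rank) my_first = n;
        n += num_on_each[p];
    }
    my_last = my_first + num_on_each[rank];

    /* Step 3: this processor's range in the final block distribution. */
    block = (n + nproc - 1) / nproc;
    final_lo = min2(rank * block, n);
    final_hi = min2(final_lo + block, n);

    /* Step 5: overlap current ranges with final ranges. */
    for (p = 0; p < nproc; p++) {
        int dst_lo = min2(p * block, n);
        int dst_hi = min2(dst_lo + block, n);
        num_to_send[p] = overlap(my_first, my_last, dst_lo, dst_hi);
        num_to_recv[p] = overlap(src_first, src_first + num_on_each[p],
                                 final_lo, final_hi);
        src_first += num_on_each[p];
    }
    return n;
}
```

For example, with current counts {5, 0, 1, 2} on four processors, processor 0 sends 2, 2, 1, and 0 elements to processors 0 through 3 and receives only its own 2 elements.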
The next four functions (odd, even, interleave, and transpose) can all be constructed from send combined with other primitives. However, providing direct functions allows for a more efficient implementation by removing the need for generalized indexing. That is, each of the four functions preserves some property in its result that allows us to precompute the addresses to send blocks of elements to, rather than being forced to compute the address for every element, as in send.

even (vector, n)
odd (vector, n)
Given a vector and an integer n, even returns the vector composed of the even-numbered blocks of elements of length n from vector. Thus, even (foo, 3) returns the elements 0, 1, 2, 6, 7, 8, 12, 13, 14, ... of vector foo. odd does the same, but for the odd-numbered blocks of elements. The length of vector is assumed to be an exact multiple of twice the block size n. The parallel implementations of odd and even simply discard the even and odd elements respectively, returning an unbalanced vector. Note that the use of generalized odd and even primitives (rather than simply basic single-element odd and even) allows them to be used for other purposes. For example, even (bar, length (bar)/2) returns the first half of vector bar. As an example, the next code listing shows the serial implementation of even for a user-defined pair type.
vec_pair even_vec_pair (vec_pair src, int blocksize)
{
    int i, j, r, nelt;
    vec_pair result;

    nelt = src.nelt_here;
    alloc_vec_pair (nelt / 2, &result);
    r = 0;
    /* Copy one block, then skip the odd-numbered block that follows it */
    for (i = 0; i < nelt; i += 2 * blocksize) {
        for (j = 0; j < blocksize; j++) {
            result.data[r++] = src.data[i + j];
        }
    }
    return result;
}
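The block-wise behaviour of even and odd, including the "first half" idiom mentioned above, can be checked with a small serial sketch on plain int arrays. The names even_int_serial and odd_int_serial are hypothetical, and the caller is assumed to supply a destination buffer of at least nelt/2 elements; as in the description, the source length is assumed to be an exact multiple of twice the block size.

```c
/* Copy the even-numbered blocks (0, 2, 4, ...) of length blocksize from
 * src[0..nelt) into dst. Returns the number of elements written. */
int even_int_serial(const int *src, int nelt, int blocksize, int *dst)
{
    int i, j, r = 0;
    for (i = 0; i + blocksize <= nelt; i += 2 * blocksize) {
        for (j = 0; j < blocksize; j++) {
            dst[r++] = src[i + j];
        }
    }
    return r;
}

/* Copy the odd-numbered blocks (1, 3, 5, ...) of length blocksize from
 * src[0..nelt) into dst. Returns the number of elements written. */
int odd_int_serial(const int *src, int nelt, int blocksize, int *dst)
{
    int i, j, r = 0;
    for (i = blocksize; i + blocksize <= nelt; i += 2 * blocksize) {
        for (j = 0; j < blocksize; j++) {
            dst[r++] = src[i + j];
        }
    }
    return r;
}
```

With an 18-element vector holding 0 through 17 and a block size of 3, the even blocks are 0, 1, 2, 6, 7, 8, 12, 13, 14 and the odd blocks are the remaining nine elements. With a block size of half the vector length, even yields the first half and odd the second half.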
interleave (vec1, vec2, n)
Given two vectors vec1 and vec2, and an integer n, it returns the vector composed of the first n elements from vec1, followed by the first n elements from vec2, followed by the second n elements from vec1, and so on. As such, it does the opposite of even and odd. Again, the use of a generalized interleave primitive allows it to be used for other purposes. For example, given two m×n matrices A and B, interleave (A, B, n) returns the 2m×n matrix whose rows consist of the appended rows of A and B. The lengths of vec1 and vec2 are assumed to be the same, and an exact multiple of the block size n.

transpose (vector, m, n)
Given a vector vector which represents an n×m matrix, returns the vector representing the transposed m×n matrix.

4.3 Data-Parallel Operations
For general data-parallel computation, Machiavelli uses the apply-to-each operator, which has the following syntax:
{expr : elt in vec [, elt in vec] [| cond]}
expr is any expression (without side-effects) that can be the right-hand side of an assignment in C. elt is an iteration variable over a vector vec. The iteration variable is local to the body of the apply-to-each operator. There may be more than one vector, but they must have the same length. cond is any expression without side-effects that can be a conditional in C. The effect of this construct is to iterate over the source vector(s), testing whether the condition is true for each element, and if so evaluating the expression and writing the result to the result vector.

4.3.1 Implementation
The Machiavelli preprocessor converts an apply-to-each operation without a conditional into a purely local loop on each processor, iterating over the source vectors and writing the resultant expression for each element into the destination vector.
The absence of synchronization between processors explains why the expressions within an apply-to-each cannot rely on side effects; any such effects would be per-processor, rather than global across the machine. In general, this means that C's pre- and post-operations to increment and decrement variables cannot be used inside an apply-to-each. As an example, the following code listing shows a simple data-parallel operation and the resulting code.
/* Machiavelli code generated from:
 *   vec_double diffs = {(elt - x_mean)^2 : elt in x}
 */
{
    int i, nelt = x.nelt_here;

    diffs = alloc_vec_double (tm, x.length);
    for (i = 0; i < nelt; i++) {
        double elt = x.data[i];
        /* Squaring is written as a multiplication in the C output */
        diffs.data[i] = (elt - x_mean) * (elt - x_mean);
    }
}
The independence of loop expressions also enables the Machiavelli preprocessor to perform loop fusion on adjacent apply-to-each operations that are iterating across the same vectors.

4.3.2 Unbalanced vectors
If a conditional is used in an apply-to-each, then the per-processor pieces of the destination vector may not have the same length as the pieces of the source vector(s). The result is that we are left with an unbalanced vector; that is, one in which the amount of data per processor is not fixed. This is marked as such using an "unbalanced" flag in its vector structure. As an example, the following code listing shows the parallel implementation of an apply-to-each with a simple conditional.
/* Machiavelli code generated from:
 *   vec_double result;
 *   result = {(val - mean)^2 : val in values, flag in flags
 *             | (flag != 0)};
 */
{
    int i, ntrue = 0, nelt = values.nelt_here;

    /* Overallocate the result vector */
    result = alloc_vec_double (tm, values.length);
    /* ntrue counts conditionals */
    for (i = 0; i < nelt; i++) {
        int flag = flags.data[i];
        if (flag != 0) {
            double val = values.data[i];
            /* Squaring is written as a multiplication in the C output */
            result.data[ntrue++] = (val - mean) * (val - mean);
        }
    }
    /* Mark the result as unbalanced */
    result.nelt_here = ntrue;
    result.unbalanced = true;
}
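To see why a conditional apply-to-each generally leaves the result unbalanced, the local filtering loop and a check against the block distribution can be sketched serially. This is a hedged illustration: filter_square_local mirrors the loop in the listing above on plain arrays, while is_block_distributed is a hypothetical helper that is not part of the listing (which simply sets the flag unconditionally). The check assumes a block size of ceil(n/P).

```c
/* Local loop of a conditional apply-to-each on one processor: keep the
 * square of each value whose flag is nonzero. Returns the new nelt_here. */
int filter_square_local(const double *values, const int *flags, int nelt,
                        double *result)
{
    int i, ntrue = 0;
    for (i = 0; i < nelt; i++) {
        if (flags[i] != 0) {
            result[ntrue++] = values[i] * values[i];
        }
    }
    return ntrue;
}

/* Hypothetical check: do the per-processor counts match the block
 * distribution for their total length? Returns 1 if so, 0 otherwise. */
int is_block_distributed(const int *nelt_here, int nproc)
{
    int n = 0, block, rank;
    for (rank = 0; rank < nproc; rank++) {
        n += nelt_here[rank];
    }
    block = (n + nproc - 1) / nproc;
    for (rank = 0; rank < nproc; rank++) {
        int expected = n - rank * block;
        if (expected < 0) expected = 0;
        if (expected > block) expected = block;
        if (nelt_here[rank] != expected) {
            return 0;
        }
    }
    return 1;
}
```

If two processors each start with four elements and the condition keeps one element on the first and three on the second, the counts {1, 3} no longer match the block distribution {2, 2} for a vector of length four, so index-to-processor calculations would give wrong answers until the vector is packed.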
An unbalanced vector can be balanced (that is, its data can be evenly redistributed) across the processors using a pack function, as described above. The advantage of not balancing a vector is that by not calling pack we avoid two MPI collective operations, one of which transfers a small and fixed amount of information between processors (MPI_Allgather) while the other may transfer a large and varying amount of data (MPI_Alltoallv). Given an unbalanced vector, we can still perform many operations on it in its unbalanced state. In particular, reductions, scans, and apply-to-each operations (including those with conditionals) that operate on a single vector are all oblivious to whether their input vector is balanced or unbalanced, since they merely loop over the number of elements present on each processor. Given the underlying assumption that local operations are much cheaper than transferring data between processors, it is likely that the time saved by avoiding data movement in this way outweighs the time lost in subsequent operations caused by not balancing the data across the processors, and hence resulting in all other processors waiting for the processor with the most data. Machiavelli only performs a pack when required, but allows the user to manually insert additional pack operations. The remaining Machiavelli operations that operate on vectors all require their input vectors to be packed before they can proceed. The implementations of get, set, send, fetch are therefore extended with a simple conditional that tests the "unbalanced" flag of their input vector structures, and performs a pack on any that are unbalanced. These operations share the common need to quickly compute the processor and offset that a specific vector index maps to; a balanced block distribution can satisfy this need. 
Apply-to-each operations on multiple vectors are also extended with test-and-pack, although in this case the requirement is to assure that vectors being iterated across in the same loop share the same distribution. Two further optimizations that are possible when using unbalanced vectors are discussed in the next section. 4.4 Teams Machiavelli uses teams to express control-parallel behavior between data-parallel sections of code, and in particular to represent the recursive branches of a divide-and-conquer algorithm. A team is a collection of processors, and acts as the context for all functions on vectors within it. A vector is distributed across the processors of its owning team, and can only be operated on by data-parallel functions within that team. Teams are divided when a divide-and-conquer algorithm makes recursive calls, and merged when the code returns from the calls. Otherwise, teams are mutually independent, and do not communicate or synchronize with each other. However, unless the programmer wants to bypass the preprocessor and gain direct access to the underlying team functions, the existence of teams is effectively hidden. 4.4.1 Implementation A team is represented by the MPI concept of a communicator. Specifically, a communicator describes a specific collection of processors, and when passed to an MPI communication function, restricts the "visible universe" of that communication function to the processors present in the communicator. The internal representation of a team consists of a structure containing the MPI communicator, the number of processors in the team, and the rank of this processor in the team. All processors begin in a "global" team, which is then subdivided by divide-and-conquer algorithms to form smaller teams. The preprocessor adds a pointer to the current team structure as an extra argument to every parallel function, as was seen in the implementations of Machiavelli functions above. 
In this way, the subdivision of teams on the way down a recursion tree, and their merging together on the way up the tree, is naturally encoded in the passing of smaller teams as arguments to recursive calls, and reverting to the parent teams when returning from a call. 4.5 Divide-and-conquer Recursion Machiavelli uses the following syntax: split (result_1 = func (arg_1), result_2 = func (arg_2) [, result_n = func (arg_n)]) to represent the act of performing divide-and-conquer function calls. Here result_n is the result returned by invoking the function func on the argument list arg_n. Team parallelism is implemented by dividing the current team into one subteam per function call, sending the arguments to the subteams, recursing within the subteams, and then fetching the results from the subteams. Each of these steps will be described. Note that in the Machiavelli implementation func must be the same for every call in a given split, although this is not a requirement of the team parallel model. 4.5.1 Computing Team Sizes Before subdividing a team into two or more subteams, we need to know how many processors to allocate to each team. For Machiavelli's approximate load balancing of teams, the subteam sizes are chosen according to the relative amount of work that the subtasks are expected to require. This is computed at runtime by calling an auxiliary cost function defined for each divide-and-conquer function. The cost function takes the same arguments as the divide-and-conquer function, but returns as a result an integer representing the relative amount of work that those arguments will require. By default, the preprocessor generates a cost function that returns the size of the first vector in the argument list (that is, it assumes that the cost will be a linear function of the first vector argument). This can be overridden for a particular divide-and-conquer function divconq by defining a cost function divconq_cost.
The following code listing shows a simple cost function for quicksort, which has an expected cost of O(n log n).
int quicksort_cost (vec_double s) {
int n = length (s);
return (n * (int) log ((double) n));
}
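Costs of this kind might be turned into subteam sizes as sketched below. This is an illustrative assumption rather than Machiavelli's own team-sizing routine: the function name two_team_sizes, the rounding rule, and the guarantee of at least one processor per subteam are all hypothetical choices. The only property taken from this document is that lower-ranked processors join the first subteam.

```c
#include <assert.h>

/* Hypothetical sketch: divide nproc processors between two subteams in
   proportion to the relative costs of the two recursive calls.
   Returns the subteam (0 or 1) that processor `rank` should join. */
int two_team_sizes(int nproc, int rank, long cost_0, long cost_1,
                   int *size_0, int *size_1)
{
    long total = cost_0 + cost_1;
    int s0;

    assert(nproc >= 2 && rank >= 0 && rank < nproc);
    assert(cost_0 >= 0 && cost_1 >= 0);

    if (total == 0) {
        s0 = nproc / 2;   /* no cost information: split evenly */
    } else {
        /* proportional share of the first call, rounded to nearest */
        s0 = (int) ((nproc * cost_0 + total / 2) / total);
    }

    /* Assumption: each subteam must keep at least one processor. */
    if (s0 < 1) s0 = 1;
    if (s0 > nproc - 1) s0 = nproc - 1;

    *size_0 = s0;
    *size_1 = nproc - s0;

    /* Lower ranks form the first subteam, higher ranks the second. */
    return rank < s0 ? 0 : 1;
}
```

Because only the ratio of the two costs is used, the unit of the cost values does not matter.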
The results of a cost function have no units, since they are merely compared to each other. The actual act of subdividing a team is performed with the MPI_Comm_split function which, when given a flag declaring which new subteam this processor should join, creates the appropriate MPI communicator. 4.5.2 Transferring Arguments and Results Having created the subteams, we must redistribute any vector arguments to the respective subteams (scalar arguments are already available on each processor, since we are programming in an SPMD style). The task is to transfer each vector to a smaller subset of processors. This can be accomplished with a specialized form of the pack function; all that is necessary is to change the computation of the number of elements per processor for the destination vector, and to supply a "processor offset" that serves as the starting point for the subset of processors. However, there are two optimizations that can be made to reduce the number of collective operations. First, the redistribution function can accept unbalanced vectors as arguments, just as the original pack function can. This is particularly important for divide-and-conquer functions, where the arguments to recursive calls may be computed using a conditional in an apply-to-each, which results in unbalanced vectors. Without the ability to redistribute these unbalanced vectors, the number of collective communication steps would be doubled (first to balance the vector across the original team, and then to redistribute the balanced vector to a smaller subteam). Second, the redistribution function can use a single call to MPI_Alltoallv to redistribute a vector to each of the subteams. Consider the recursion in quicksort: split (left=quicksort (les), right=quicksort (grt)); les and grt are being supplied as the argument to recursive function calls that will take place in different subteams. 
Since these subteams are disjoint, a given processor will send data from either les or grt to any other given processor, but never from both. We can therefore give MPI_Alltoallv the appropriate pointer for the data to send to each of the other processors, sending from les to processors in the first subteam and from grt to processors in the second subteam. Thus, only one call to MPI_Alltoallv is needed for each of the vector arguments to a function. After the recursive function call, we merge teams again, and must now extract a result vector from each subteam, and redistribute it across the original (and larger) team. This can be accomplished by simply setting the "unbalanced" flag of each result vector, and relying on later operations that need the vector to be in a balanced form to redistribute it across the new, larger team. This can be seen as a reverse form of the "don't pack until we recurse" optimization that was outlined above: now we don't expand a vector until we need to. To illustrate these features, we will use the simplified version of quicksort shown below, which partitions the input into two vectors (containing those elements less than or equal to the pivot, and greater than the pivot) instead of three:
vec_int quicksort (vec_int s)
{
int pivot;
vec_int leq, grt, left, right, result;
if (length(s) < 2) {
return s;
} else {
pivot = get(s, length(s) / 2);
leq = {x : x in s .vertline. x <= pivot};
grt = {x : x in s .vertline. x > pivot};
free(s);
split (left = quicksort (leq),
right = quicksort (grt));
result = append(left, right);
free(left); free(right);
return result;
}
}
As an example, consider the algorithm sorting the following set of numbers from 0 to 15: [3 8 0 4 12 10 5 9 13 7 11 15 2 14 1 6] on four processors. FIG. 5A shows an overview of the recursion tree for this example. FIGS. 5B and 5C show the effects with and without the "don't pack before split" optimization, and FIGS. 5D and 5E show the effects with and without the "postpone append till top level" optimization. FIG. 5A shows an example of how this version of quicksort travels down and then back up the recursion tree. As it travels down the recursion tree (levels 502 to 506), quicksort splits the set of numbers into smaller and smaller left and right parts. As it travels up the recursion tree (levels 508 to 512), quicksort combines the results. On each level traveling down the recursion tree, the middle element is chosen as the pivot, and two new subproblems are formed containing the elements less than or equal to, and greater than, the pivot. In this example, four processors are being used, so when four subproblems have been formed the individual processors run a serial version of the algorithm on their subproblem (step 506). They then return to the parallel code, which appends the subresults as it returns up the tree. FIGS. 5B and 5C show in more detail the layout of the vectors used on the way down the recursion tree, while FIGS. 5D and 5E show the layout of the vectors used on the way up the recursion tree. FIG. 5B shows the layout without the use of lazy vectors and the ability of the redistribution function to handle lazy input. The boxes show the data elements held on each processor, where a horizontal line represents a given vector. The contiguous segments of numbers represent the portions of a vector stored locally for one of the four processors. The enclosing ovals show the division of processors between teams. Initially, all processors are in one team. After the partitioning step, the resulting vectors are unbalanced (522).
They are then rebalanced so that for each vector, each processor (except the last one, in this case) has an equal number of elements (see oval 524). In particular, quicksort picks "9" as the pivot element at level 522, and the two conditional apply-to-each expressions create two new vectors: leq, containing the elements less than or equal to the pivot (a total of 10 elements), and grt, containing the elements greater than the pivot (the remaining 6 elements). As can be seen, these vectors are unbalanced. The runtime system now uses two pack operations to make these vectors "balanced" as shown in oval 524 (that is, with a constant number of elements per processor, except for the last one). Note that if the system did not support lazy vectors, this rebalancing step would be part of the partitioning operation. If the system did support lazy vectors, but the redistribution function required balanced input, the redistribution function would call the pack function. After the vectors are rebalanced, the split step then subdivides the team into two new teams, and calls the redistribution function to redistribute the vectors to the two teams (526). The data is redistributed so that each team gets one (balanced) vector as shown in level 526. The code then recurses in each new team, creating unbalanced vectors after the partitioning step (528). In this example, the two teams pick "5" and "13" as their new pivot elements, and create two new sets of vectors. Again, two pack operations are required to balance the vectors on their respective teams (530). The two teams recurse again, creating four individual teams and redistributing the data so that each new team gets one of the four vectors shown at level 532. At this stage the subteams each contain a single processor, so the parallel code now calls the serial code instead of recursing again, resulting in the data on each processor being sorted (534). FIG.
5C shows the effect of extending the redistribution function to handle lazy input. In particular, FIG. 5C shows how the interprocessor communication for rebalancing the vectors before a split as in FIG. 5B is eliminated since Machiavelli's implementation of the split step supports lazy input. The steps in FIG. 5C are initially the same as in FIG. 5B. Note that levels 540 and 542 in FIG. 5C are the same as levels 520 and 522 in FIG. 5B. However, in contrast to FIG. 5B, the vectors are left in their unbalanced state after the partitioning step (542). The split step again subdivides the team and calls the redistribution function, which combines the balancing and redistribution of data into a single step. When the algorithm recurses, subdividing into two teams of two processors (see level 544 to 546), the unbalanced vectors are redistributed directly to their new teams, saving two intermediate pack operations. Again, when each of these two teams splits into two new teams of one processor each (level 546 to 548), the two recursions now redistribute the unbalanced vectors directly to the four new teams, saving two intermediate pack operations each. This combination of rebalancing and redistributing the vectors into a single step saves an unnecessary data rearrangement step in each team on each level of recursion, and thus reduces expensive interprocessor communication. FIG. 5C ends similarly to FIG. 5B. When each team only has one processor, the parallel code on each processor invokes the serial code, which sorts the local elements of the vector (550). When the four individual processors finish the serial code, each contains a sorted portion of the result vector as shown. FIG. 5D shows the return from the recursion (when we are appending subresults together) in the case where lazy vectors are supported but not the optimization of postponing an append operation until the top level. Initially all the processors are in individual teams (560).
First, they return to the parent teams from which the subteams were formed. Thanks to lazy vectors, no redistribution of data is necessary. Instead, the left and right vectors are created as unbalanced. Depending on the processor, the local data for these vectors will either be empty, or be the entire result of the subteam (562). Thus, no data elements are moved, either between processors or on processors (the data pointer in the representation of the left and right vectors will be either null or will point to the data of the result vector). Next, these vectors are appended together within each team, creating a new balanced result vector (564). The general append operation requires two pack operations to redistribute the data between the processors to form the balanced vectors shown at level 564. The processors then return from another level of recursion, to the original top-level team (566). The results of the two teams in turn become the left and right vectors of the top-level team as shown in level 566. Again, the new left and right vectors are left unbalanced, and no data movement is required until the final append step to create the final sorted result (568). In the final append step, these two unbalanced vectors are appended with a further pack to form the final result. FIG. 5E shows the effect of postponing the append step until the top level of the recursion tree. In effect, we are completely eliminating the append step, and leaving the overall result in an unbalanced state. Again, the return up the recursion tree starts with the four processors returning from serial code (580). As before, the subteams return to the parent teams, and left and right are created in an unbalanced state, pointing to the data from result (582). The append step now does much the same thing, creating a new result vector whose per-processor data pointer shares the data of left and right (584). 
However, the append step is now replaced by a local renaming operation that just chooses whether to use the elements from the left or right vector as the result vector on this processor. No interprocessor communication is needed to rearrange data, since the vector is left in an unbalanced state. At the next level of recursion, the results again become the left and right vectors of the top-level team (586). Another renaming step creates an unbalanced result vector (588). As with all lazy vectors, this will only be balanced if subsequent operations on it require it to be in a balanced state. Note that there is no data movement whatsoever between processors in the steps shown; we are taking advantage of the fact that the split function puts the subproblem of smaller numbers on the "left" and the subproblem of larger numbers on the "right", and thus their results will be naturally ordered. If any subsequent operations need the result vector to be balanced, a single pack operation will be required to redistribute the elements on the processors from the state shown at level 588 in FIG. 5E to the state shown in level 568 in FIG. 5D. As illustrated in the quicksort example above, the preprocessor for the Machiavelli system treats the Append operation immediately following a Split call as a special case. This special case implementation of the Append function avoids the inter-processor communication associated with the normal Append function. In the general Append operation, the preprocessor generates one Pack operation per source vector, to reshuffle the data across the processors as shown in 4.2.6. For example, given two equal-size source vectors to be appended together, which are spread across four processors, the run-time code performs the following steps: create a new vector twice as big (i.e. 
twice as many elements per processor); pack the first vector from all four processors into the result vector in the first two processors; and pack the second vector from all four processors into the result vector in the last two processors. The Machiavelli implementation uses the Pack code to implement the general Append function because this code is sufficiently general to handle a variety of redistribution functions. To clarify, the Machiavelli implementation uses the Pack code to redistribute data elements among processors in the following cases: 1) to redistribute or balance elements of a vector among processors, and 2) as part of the Append function to redistribute elements from more processors to fewer processors. It is important to note that the Pack code used in the Machiavelli system represents only one possible way of implementing functions for redistributing data among processors. It is a design choice to use Pack code in the Append function. It is also possible to implement distinct functions for re-balancing an unbalanced vector (e.g., a Rebalance or Redistribute function) and for distributing input vectors from more processors to fewer processors (e.g., a Subappend function). The names chosen for the functions are not critical to the invention. The Machiavelli preprocessor reduces inter-processor communication associated with returns from split calls by treating an Append after a Split as a special case. If the preprocessor determines that the results of a Split call are immediately used as input to an Append function, then it can replace the normal Append function with the "renaming" code described in connection with FIG. 5E. This special case code swaps pointers around so that the vector resulting from the append step on each processor points to the data that was just returned from the Split call. 
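The pointer-swapping "renaming" append can be sketched as a purely local operation. The sketch below is an assumption-laden illustration, not the code the Machiavelli preprocessor emits: the function name append_by_renaming is hypothetical, the vec_double layout is a simplified version of the vector structure with an added unbalanced flag, and the length fields of both inputs are assumed to be valid on every processor, which this description does not spell out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified vector structure for the sketch (assumed layout). */
typedef struct {
    double *data;      /* pointer to elements on this processor */
    int nelt_here;     /* number of elements on this processor */
    int length;        /* global length of the vector */
    bool unbalanced;   /* true if not in balanced block layout */
} vec_double;

/* Local-only append of two vectors returned from a two-way split.
   On any one processor at most one input holds data, because each
   processor belonged to exactly one subteam. */
vec_double append_by_renaming(vec_double left, vec_double right)
{
    vec_double result;

    assert(left.nelt_here == 0 || right.nelt_here == 0);

    if (right.nelt_here > 0) {
        /* this processor was in the second subteam */
        result.data = right.data;
        result.nelt_here = right.nelt_here;
    } else {
        /* this processor was in the first subteam (or holds nothing) */
        result.data = left.data;
        result.nelt_here = left.nelt_here;
    }
    result.length = left.length + right.length;
    result.unbalanced = true;   /* a later pack balances it if needed */
    return result;
}
```

No element is copied and no message is sent; the result simply shares the storage of whichever input was populated on this processor.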
This approach eliminates inter-processor communication that would otherwise result using normal Append operations after Split functions while traveling back up the recursion tree. Note that the quicksort example makes the assumption that processors are assigned to subteams in numerically increasing order, e.g., processors 0-1 were used for the left part of the split in the example above, and processors 2-3 were used for the right part of the split. Obviously if the order were swapped so that processors 2-3 received the left and processors 0-1 received the right, then the "renaming" append code would give a back-to-front answer. The Machiavelli team-splitting code makes sure that this assumption is always true. The Machiavelli preprocessor implements the "renaming" optimization by looking one line ahead of a split call to determine if the next line includes an Append operation to join the results of the split call into a result vector. An alternative implementation of the preprocessor could use data-flow analysis to determine if this property holds in the more general case. In particular, standard data-flow analysis used in compilers could be used to find candidates for the renaming optimization in cases where there are additional statements between a Split call and an Append on the results of the Split call, or where vectors derived from the result of a Split call are being appended. As an example of the latter case, consider a program that includes code to add 1 to every element in the left vector, and 2 to every element in the right, creating two new unbalanced vectors, and then an Append function to append the two new vectors. Data-flow analysis could be used to determine that the renaming optimization could be applied to the two new vectors. An alternative implementation would use another flag in the structure representing a vector. If set, this flag indicates that the vector is not only unbalanced, but is the result of a split. 
It also records which subteam the vector came from. The flag is cleared after any operation that balances a vector. The flag is set for all vectors returned from a subteam, e.g., returned from a divide-and-conquer function. In contrast to the implementation where the preprocessor always generates code to call the append function when it sees "append( )" in the input code, this approach makes the append function adaptive at runtime based on the state of the input vector. At runtime, the Append function determines whether all of its input vectors are the results of a split, and whether they are in the correct order (e.g. the first vector came from subteam 0, the second from subteam 1, etc). If so, it performs the renaming optimization. Otherwise, it redistributes the data as before. This approach enables a preprocessor to get the same result as achieved through data flow analysis without overly complicating the preprocessor code. In this particular application, it avoids unnecessary inter-processor communication in cases where the Append function operates on the results of the Split call or data derived from these results without using data-flow analysis before run-time. 4.5.3 Serial Code The Machiavelli preprocessor generates two versions of each user-defined and inbuilt function. The first version, as described in previous sections, uses MPI in parallel constructs, and team-parallelism in recursive calls. The second version is specialized for single processors with purely local data (that is, a team size of one). In this version, apply-to-each constructs reduce to simple loops, as do the predefined vector operations, and the team-based recursion is replaced with simple recursive calls. As was previously discussed, this results in much more efficient code. For example, the following code listing shows the serial implementation of a fetch function, which can be compared to the 12-step description of the parallel equivalent described above.
void fetch_vec_int_serial (vec_int src, vec_int indices,
vec_int dst) {
int i, nelt = dst.nelt_here;
for (i = 0; i < nelt; i++) {
dst.data[i] = src.data[indices.data[i]];
}
}
Where more efficient serial algorithms are available, they can be used in place of the serial versions of parallel algorithms that Machiavelli compiles. Specifically, the user can force the default serial version of a parallel function to be overridden by defining a function whose name matches that of the parallel function but has the added suffix "_serial". For example, the following code listing shows a more efficient serial implementation of quicksort supplied by the user:
void user_quicksort (double *A, int p, int r)
{
if (p < r) {
double x = A[p];
int i = p - 1;
int j = r + 1;
while (1) {
do { j--; } while (A[j] > x);
do { i++; } while (A[i] < x);
if (i < j) {
double swap = A[i];
A[i] = A[j];
A[j] = swap;
} else {
break;
}
}
user_quicksort (A, p, j);
user_quicksort (A, j+1, r);
}
}
vec_double quicksort_serial (vec_double src)
{
user_quicksort (src.data, 0, src.length - 1);
return src;
}
MPI functions can also be used within Machiavelli code, in cases where the primitives provided are too restrictive. As explained above, all user-defined types have equivalent MPI types constructed by the Machiavelli preprocessor, and these MPI types can be used to transfer instances of those types in MPI calls. The fields of a vector and of the current team structure (always available as the pointer tm in parallel code) are also accessible, as shown in the following code listing.
typedef struct _vector {
void *data; /* pointer to elements on this processor */
int nelt_here; /* number of elements on this processor */
int length; /* length of the vector */
} vector;
typedef struct _team {
int nproc; /* number of processors in this team */
int rank; /* rank of this processor within team */
MPI_Comm com; /* MPI communicator containing team */
} team;
Putting all these steps together, the code generated by the preprocessor
for the recursion in the parallel version of quicksort is shown below.
/* Machiavelli code generated from:
split (left = quicksort (les),
* right = quicksort (grt)); */
/* Compute the costs of the two recursive calls */
cost_0 = quicksort_cost (tm, les);
cost_1 = quicksort_cost (tm, grt);
/* Calculate the team sizes, and which subteam to join */
which_team = calc_2_team_sizes (tm, cost_0, cost_1, &size_0,
&size_1);
/* Create a new team */
create_new_teams (tm, which_team, size_0, &new_team);
/* Allocate an argument vector to hold the result in the new
team */
arg = alloc_vec_double (&new_team, which_team ?
grt.length : les.length);
/* Compute communication patterns among subteams */
setup_pack_to_two_subteams (tm, les.nelt_here,
grt.nelt_here, grt - les, les.length, grt.length, size_0,
size_1);
/* Perform the actual data movement of pack, into the argument
vector */
MPI_Alltoallv (les.data, _send_count, _send_disp,
MPI_DOUBLE,arg.data, _recv_count, _recv_disp,
MPI_DOUBLE, tm->com);
/* Recurse in two different subteams */
if (new_team.nproc == 1) {
/* If new team size is 1, run serial quicksort */
tmp = quicksort_serial (arg);
} else {
/* Else recurse in the new team */
tmp = quicksort (&new_team, arg);
}
/* Returning from the recursion,
we rejoin original team, discard old */
free_team (&new_team);
/* Assign tmp to left and nullify right, or vice versa,
forming two unbalanced vectors, each on a subset
of the processors in the team */
if (which_team == 1) {
right = tmp;
left.nelt_here = 0;
} else {
left = tmp;
right.nelt_here = 0;
}
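The communication pattern set up before the single MPI_Alltoallv call in the listing above can be illustrated with a small, MPI-free sketch of how one processor might fill its send counts. Everything here is hypothetical: the helper name fill_send_counts and its parameters are not Machiavelli's, and the sketch assumes that the global index of the first locally held element of each source vector is already known (in practice this would come from gathering the per-processor element counts).

```c
#include <assert.h>

/* Hypothetical helper: for one source vector, compute how many of this
   processor's local elements go to each processor of one subteam.
     first       global index of the first element held locally
     nelt        number of elements held locally
     length      global length of the source vector
     proc_offset rank (in the parent team) of the subteam's first processor
     team_size   number of processors in the destination subteam
     send_count  array indexed by parent-team rank                        */
void fill_send_counts(int first, int nelt, int length,
                      int proc_offset, int team_size, int *send_count)
{
    int per_proc, p;

    assert(team_size > 0 && first >= 0 && nelt >= 0);
    assert(first + nelt <= length);

    /* balanced block size within the destination subteam */
    per_proc = (length + team_size - 1) / team_size;

    for (p = 0; p < team_size; p++) {
        int lo = p * per_proc;      /* first global index owned by p */
        int hi = lo + per_proc;     /* one past the last index owned by p */
        int begin, end;
        if (hi > length) hi = length;
        /* overlap of [first, first + nelt) with [lo, hi) */
        begin = first > lo ? first : lo;
        end = (first + nelt) < hi ? (first + nelt) : hi;
        send_count[proc_offset + p] = end > begin ? end - begin : 0;
    }
}
```

A processor would call this once for les with processor offset 0 and once for grt with the offset of the second subteam. Because the subteams are disjoint, the two calls write to disjoint entries of the same array, which is what allows both vectors to travel in one collective operation.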
4.6 Load Balancing As mentioned above, the team-parallel model restricts load balancing to individual processors because that is where most of the time is spent when n>>P. We are thus trying to cope with the situation where some processors finish first and are idle. Our goal is to ship function invocations from working processors to idle processors, and then return the result to the originating processor. The implementation of Machiavelli's load-balancing system is restricted by the capabilities of MPI, and by our desire to include as few parallel constructs as possible in the execution of sequential code on single processors. In the absence of threads or the ability to interrupt another processor, an idle processor is unable to steal work from another processor, because the "victim" would have to be involved in receiving the steal message and sending the data. Therefore, the working processors must request help from idle processors. The request cannot use a broadcast operation because MPI broadcasts are a collective operation that all processors must be involved in, and therefore other working processors would block the progress of a request for help. Similarly, the working processor cannot use a randomized algorithm to ask one of its neighbors for help (in the hope that it picks an idle processor), because the neighbor might also be working, which would cause the request to block. Thus, to determine whether another processor is idle (and hence can respond quickly to our request) the run-time code dedicates one processor to keeping track of each processor's status. This processor acts as the manager of active load balancing. The load-balancing process employs a three-way hand-shaking protocol among the processor requesting help, the manager, and the idle processor. FIGS. 6, 7, and 8 are flow diagrams illustrating the operation of a processor seeking help, the manager, and the idle processor, respectively.
The preprocessor inserts a load-balance test function into the sequential version of every divide-and-conquer function. This test determines whether to ask the manager for help with one or more of the recursive function calls. Decision block 602 in FIG. 6 represents the test executed to determine whether to make a request for help. If this test is satisfied, a short message is sent to the manager, containing the total size of the arguments to the function call (604). If the test is not satisfied, the requesting processor continues with sequential execution of the function (606). For binary divide-and-conquer algorithms, the requesting processor then blocks waiting for a reply (606). There is no point in postponing the test for a reply until after the requesting processor has finished the second recursive call, since at that point it would be faster to process the first recursive call locally than to send the arguments to a remote processor, wait for it to finish, and then receive the results. For divide-and-conquer algorithms with a branching factor greater than two, the requesting processor can proceed with another recursive call while waiting for the reply (shown as an optional step 608). FIG. 7 illustrates how the manager processor manages dynamic load balancing. The manager sits in a message-driven lo
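The manager's bookkeeping of processor status could be as simple as a table indexed by rank. The sketch below is only a guess at one possible shape, under stated assumptions: the type and function names (manager_state, manager_init, manager_mark_idle, manager_assign_helper), the fixed MAX_PROCS bound, and the policy of choosing the lowest-ranked idle processor are all hypothetical, and the message passing itself is omitted.

```c
#include <assert.h>

#define MAX_PROCS 64   /* assumption: fixed upper bound for the sketch */

typedef enum { PROC_WORKING, PROC_IDLE } proc_status;

/* Hypothetical manager-side table of processor status. */
typedef struct {
    int nproc;
    proc_status status[MAX_PROCS];
} manager_state;

/* Initially every processor is assumed to be working. */
void manager_init(manager_state *m, int nproc)
{
    int i;
    assert(nproc > 0 && nproc <= MAX_PROCS);
    m->nproc = nproc;
    for (i = 0; i < nproc; i++) {
        m->status[i] = PROC_WORKING;
    }
}

/* Record that processor `rank` has finished its work and is idle. */
void manager_mark_idle(manager_state *m, int rank)
{
    assert(rank >= 0 && rank < m->nproc);
    m->status[rank] = PROC_IDLE;
}

/* Handle a help request from `requester`: pick an idle processor, mark it
   as working again, and return its rank. Returns -1 if nobody is idle. */
int manager_assign_helper(manager_state *m, int requester)
{
    int i;
    assert(requester >= 0 && requester < m->nproc);
    for (i = 0; i < m->nproc; i++) {
        if (i != requester && m->status[i] == PROC_IDLE) {
            m->status[i] = PROC_WORKING;
            return i;
        }
    }
    return -1;
}
```

Keeping this table on a single processor means a working processor only ever sends its request to a rank that is known to be listening, which avoids the blocking problems of broadcasts and random probing described above.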
