Parallel merge sort method and apparatus
5852826
Abstract
A parallel sorting technique for external and internal sorting which maximizes the use of multiple processes to sort records from an input data set. Performance of the sort scales linearly with the number of processors because multiple processors can perform every step of the technique. To begin, the records of a data set to be sorted are read from an input file and written into multiple buffers in memory so long as memory is available. The records within each buffer are then simultaneously sorted to create runs therein. A merge tree is constructed with the runs as stream elements into leaf nodes of the tree, where the stream elements are merged. The stream elements at each node are then merged by multiple processes working simultaneously at the node, thereby generating an output stream of elements for merging at a higher node. For an internal sort, the run that results from all of the merging is immediately written to an output device. For an external sort, the run is an intermediate run, written to secondary storage along with other intermediate runs. A forecast structure provides a forecast of the order of the run blocks from the multiple intermediate runs. These blocks are read in the forecasted order from secondary storage, written into memory and merged through a merge tree to form an ordered record stream that is a complete run for the data set. The ordered record stream is then written to the output device.
Claims
We claim:
Description
FIELD OF THE INVENTION
TABLE 1
______________________________________
While there is a block of input data to be read from the input device
While there is sufficient memory available and data to be read do
// tree construction part
Read an input block of data from the input device;
Form a record address vector (RAV) for the input block
Sort the RAV;
Form a stream element (SE) for the RAV;
Insert the SE into the merge tree and do appropriate
merging. // Table 2
Endwhile
// run generation part
Consolidate merge tree(s) into one tree;
Set up the root (top) node of the tree;
Merge and generate a single intermediate run. // Table 3, Table 4
Endwhile
______________________________________
In the tree construction part of the run generation phase, each of the processes reads an input block of data from the input device 12 and writes it to a buffer 30. Each process, simultaneously with the other processes, then forms in the memory 14 an array of record addresses (i.e., pointers to the records in memory 14) called the RecordAddressVector (RAV) for the data block the process has written to the buffer. Each process then sorts the RAV it created on a designated key for the associated records using a conventional sorting method such as the Quicksort algorithm. The use of RAVs and their pointers reduces the sorting time; however, the invention could be implemented with sorting of the actual records if desired. (For simplicity, sorting will be described in terms of records, although it should be understood from the context whether it is the record pointers that are sorted.) Each process then forms a stream element (SE) with its sorted RAV and some data structures for synchronization purposes. These stream elements are added to streams that connect nodes of a merge tree that is being constructed. In the present embodiment the streams between nodes are implemented as linked lists of stream elements, with the stream elements first in the stream being the first provided to a node. The stream elements created from the RAVs are marked in FIG. 3 as elements 32a-n. These stream elements are preferably large enough to contain all the pointers of the associated RAVs (and therefore contain all the records of the associated input buffer). Stream elements of smaller size, of course, may be used. Stream elements 32 are then inserted into the merge tree and merged appropriately. The inserting and merging of stream elements to construct the merge tree are shown in the pseudocode of Table 2 (where level L is taken to be level 0) and explained below.
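The record address vector described above can be illustrated with a minimal Python sketch. It is not the patented implementation: the `Record` type, the `build_sorted_rav` name, and the sample data are hypothetical, list positions stand in for memory addresses, and Python's built-in sort is used in place of Quicksort.

```python
from typing import NamedTuple

class Record(NamedTuple):
    key: str
    payload: str

def build_sorted_rav(buffer):
    """Return positions in `buffer` ordered by record key.

    The records themselves are never moved; only the small
    vector of positions (standing in for addresses) is sorted.
    """
    return sorted(range(len(buffer)), key=lambda i: buffer[i].key)

# Hypothetical input buffer holding three unsorted records.
buffer = [Record("pear", "p1"), Record("apple", "a1"), Record("fig", "f1")]
rav = build_sorted_rav(buffer)
print(rav)                           # [1, 2, 0]
print([buffer[i].key for i in rav])  # ['apple', 'fig', 'pear']
```

Sorting the vector rather than the records means each swap moves one small reference instead of a whole record, which is the time saving the text attributes to RAVs.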
TABLE 2
______________________________________
For I from level L to maxlevel do
If no unpaired merge node exists at level I then
Create a merge node at level I;
Insert SE at I as child1;
return;
Else
Insert SE as child2;
Construct a merge at level I producing a SE for higher level merge
node. // Table 3
Endif
Endfor
______________________________________
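The insertion loop of Table 2 behaves much like a carry in a binary counter: an element waits at a level until a partner arrives, and the merged pair then moves up one level. The following Python sketch shows that behavior under strong simplifying assumptions: each stream element is a complete sorted list, a merge consumes both children entirely (no RECS_PER_MERGE limit), and only one process runs. The names `insert_se`, `consolidate`, and `pending` are hypothetical.

```python
from heapq import merge

def insert_se(pending, se, level=0):
    """Insert a sorted stream element, merging upward while a partner exists.

    `pending` maps a tree level to the unpaired stream element waiting there.
    """
    while level in pending:
        # An unpaired node exists at this level: pair up and merge,
        # producing a stream element for the next level.
        se = list(merge(pending.pop(level), se))
        level += 1
    pending[level] = se  # no partner here: wait as child1

def consolidate(pending):
    """Merge the leftover unpaired elements, lowest level first, into one run."""
    run = []
    for level in sorted(pending):
        run = list(merge(run, pending[level]))
    return run

pending = {}
for block in ([5, 9], [1, 7], [3, 8], [2, 6], [0, 4], [10, 11]):
    insert_se(pending, block)
print(sorted(pending))       # levels holding unpaired elements: [1, 2]
print(consolidate(pending))  # the keys 0 through 11 in ascending order
```

With six input blocks, a count that is not a power of two, unpaired elements remain at two levels, and `consolidate` pairs them from the bottom up to reach a single run.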
After a process forms a stream element 32 at the bottom level of the tree, it checks whether there already exists a leaf merge node 34 with an unpaired stream element. If no such node exists, then the process creates the leaf node and inserts its unpaired stream element into it. However, if such a leaf node exists, then the process inserts a second unpaired stream element into the node to pair up the elements. Once two stream elements are paired at a leaf node, a number of records from each element are merged at the node to produce as output a higher level stream element. This merging of stream elements and creating nodes continues until a level is reached in the merge tree where there are no more stream elements to be paired. The just-described merge step must ensure two things, as shown in the pseudocode of Table 3 below. First, it must ensure that there are not too many records merged by one process. If the number of records is too large, then a process handling the merging can take too long to complete it, causing other processes to unduly wait. To ensure that the number of records to be merged by a process is not too large, a limit (labeled RECS_PER_MERGE (rpm) in the present embodiment) is set. Second, a process must ensure that the records reserved from each stream form a mergeable set in the sense that the merge results can be concatenated with the results of the next merge to form a run. Table 3 states this condition in the case where there are no duplicate keys in the records. The case with duplicate keys is similar but more complex to state.
TABLE 3
______________________________________
Limit the number of records from the two streams together to
RECS_PER_MERGE.
Reserve r1 and r2 records (counting from 0) from the two streams using
binary search such that (r1-1)th rec in stream 1 < r2th rec in stream 2
and
(r2-1)th rec in stream 2 < r1th rec in stream 1.
______________________________________
In the merge step a process starts by identifying a certain number of records (rpm/2) from each stream into the node, beginning with the next record of the stream to be merged at the node. These next records are the starting points for the merge. The process then performs a binary search in the stream whose last identified record has the higher key, moving downward in the records toward lower keys until the conditions in Table 3 are satisfied. The records from the starting points to the r1th and r2th records (the ending points) in the streams are then reserved by the process for merging. The reserved records are merged at the node to form a higher level stream element. Once the process has reserved its records to merge, it leaves new starting points for another process to merge records in parallel at the same merge node. A second process identifies rpm/2 records in each stream beyond the previous process's ending points. The second process then performs the binary search on the stream whose last identified record has the higher key, setting ending points when it completes its search. Additional processes can use these ending points in the same manner to perform additional merges at the node. Once a process has determined what records it will merge, it begins merging them (the second part of the merge step). Ultimately several processes can perform the second part in parallel, dramatically increasing the rate at which records are merged at a node. This parallel merging is particularly useful when one portion of the merge tree is busy because of a skew in the data. Since multiple processes perform the merge, they need to reserve the records to be merged in a critical section that should be small for performance reasons. Thus, for the merge to be efficient the amount of time spent reserving records should be much smaller than the copying time.
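The reservation step described above can be sketched in Python as follows. This is one reading of Table 3, not the patent's own code, and it rests on several assumptions: keys are distinct, each stream is a complete sorted list held in memory, and a single thread performs successive reservations that separate processes would otherwise make. The function names `reserve` and `merge_streams` are hypothetical.

```python
from bisect import bisect_left
from heapq import merge

def reserve(s1, p1, s2, p2, rpm):
    """Return (r1, r2): how many records to take from each stream,
    starting at positions p1 and p2, so that every reserved record
    sorts before every record left behind in either stream."""
    half = max(1, rpm // 2)
    n1 = min(half, len(s1) - p1)   # records identified in stream 1
    n2 = min(half, len(s2) - p2)   # records identified in stream 2
    if n1 == 0 or n2 == 0:
        return n1, n2              # one stream exhausted: take the other as is
    if s1[p1 + n1 - 1] < s2[p2 + n2 - 1]:
        # Stream 2 ends higher: trim it to keys below stream 1's next record.
        if p1 + n1 < len(s1):
            n2 = bisect_left(s2, s1[p1 + n1], p2, p2 + n2) - p2
    else:
        # Stream 1 ends higher: trim it to keys below stream 2's next record.
        if p2 + n2 < len(s2):
            n1 = bisect_left(s1, s2[p2 + n2], p1, p1 + n1) - p1
    return n1, n2

def merge_streams(s1, s2, rpm):
    """Repeatedly reserve and merge; the merged chunks simply concatenate."""
    out, p1, p2 = [], 0, 0
    while p1 < len(s1) or p2 < len(s2):
        r1, r2 = reserve(s1, p1, s2, p2, rpm)
        out.extend(merge(s1[p1:p1 + r1], s2[p2:p2 + r2]))
        p1, p2 = p1 + r1, p2 + r2
    return out

a = [1, 4, 5, 9, 12, 20]
b = [2, 3, 6, 7, 8, 30, 31]
print(reserve(a, 2, b, 2, 4))   # (1, 2): 9 is held back because 8 is still unreserved
print(merge_streams(a, b, 4))   # same order as sorted(a + b)
```

Because each reservation needs only a comparison and one binary search, the part that would sit inside a critical section stays short, while the merge of each reserved chunk can proceed independently of the others.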
Processes reserve records from the two input streams using a binary search, and the time for the reservation of records is O(log N). The time for merging and copying is O(N). In general, reserving thousands of records per merge works well, since the logarithm of a thousand (about 10 in base 2) is much less than a thousand. But the number of records chosen to merge should not be too large or it causes a load imbalance. For example, with too many records to merge, the last process to finish holds up the other processes at the barrier before the final merge phase. Once a process has reserved its records, it may begin merging the records it has reserved. Simultaneous merging of the records reserved by each process at a node follows. The records merged by each process are then concatenated to produce an output stream element at a higher level. The number of records in the output stream is RECS_PER_MERGE or fewer, depending on the ending points in the binary search. Referring to FIG. 3 for an example, an equal number of records from stream elements 32a and 32b are identified (rpm/2). A process then performs a binary search on the stream element whose last identified record has the higher key, moving downward through the identified records. The reserved records are merged at node 34a to form a stream element 36a. The number of records in this stream element is at most RECS_PER_MERGE (rpm/2+rpm/2) and may be fewer depending on the outcome of the binary search. Stream element 36a becomes part of a stream of such elements that are merged with a stream containing stream elements 36b produced by leaf node 34b. To continue the tree construction, records from these elements are then merged at higher level node 38a, starting initially with rpm/2 records from each input stream. Merging in the tree construction part of the run generation phase thus is normally from the bottom of the tree upward.
As one process is merging stream elements 32 at leaf nodes 34 and progressing up the tree, other processes can be filling input buffers 30 and merging stream elements as well. However, sometimes the input streams into a node may run dry during the tree construction part because the initial number of records merged has been spread too thin throughout the growing tree. To prevent this from occurring, the process moves back down the tree, performing a lower level merge to "backfill" the shorter input stream and thereby provide more records to a higher level node. In the preferred embodiment, in moving down the tree the process picks the lower level node whose data will next be needed. This need can be determined in a number of ways. One way is to look at the input stream whose last record has the lower key. Another way is to look at the first record of the four input streams of the two lower level nodes and determine which record has the lowest key. The steps of the tree construction part repeat until memory 14 is no longer available or the input data set has been completely read. At that point no more buffers 30 are used in memory 14 and no more leaf nodes are created. Because the act of reading data into the input blocks is performed simultaneously with the merging of stream elements, tree construction is almost complete by the time the last input buffer is filled. The run generation part of the run generation phase begins with consolidating all sub-trees of the merge tree into a tree with a single root (i.e., top) node. Examples exist where the number of buffers 30 is not a power of two, such as six, ten, thirteen, etc. In such a case, there may be unpaired merge nodes at any level of the tree. 
Starting at the bottom of the tree, the unpaired nodes are paired to produce an output stream element and merge node that in turn is paired with the next highest unpaired node and so on until the sub-trees are consolidated into a tree with a single root node (such as node 42 in FIG. 3) through which all records must pass in being flushed from the tree. With the merge tree consolidated, the run is generated by merging the records through the tree and writing them to secondary storage 16 or output device 18 (depending on the size of the data set). The flushing begins with merging of stream elements at root node 42 of the tree and continues downward to leaf nodes 34. Merging here is the same two-part step performed in the tree construction. Several processes (five or eight, for example) can participate in simultaneous merging at each node, depending upon the number of processors in system 10, as described earlier for the tree construction part. Table 4 contains pseudocode that describes in detail how merging operates throughout the merge tree during this part of the run generation phase. Node M is initially the root node. Merging of the runs begins at node M and proceeds toward the bottom of the tree.
TABLE 4
______________________________________
While true do
Allocate records for a merge at node M
// Table 3
If records available for merge, then
Merge allocated records.
Else if end of stream marker from both input streams reached, then
Generate last end of stream marker at the output of node M.
return
If no child nodes exist then
return
Else
Pick M to be the child that generated the smaller last record.
Endwhile
______________________________________
As a stream into the root node runs dry and more records are needed, a process seeking rpm records will follow the stream with the lowest key to the next node down the tree in pursuit of additional records. For example, with reference to FIG. 3, assume that the key of the last record in the stream containing stream element 40a is abc and the key of the last record in the stream containing stream element 40b is def. If a process attempting to merge records from these two streams cannot find rpm/2 records in each stream, it proceeds to node 38a and merges stream elements 36a and 36b to obtain more records for the stream feeding node 42. Similarly, when the streams into node 38a begin to run dry, a process will proceed to node 34a or 34b to merge records from the streams feeding into these nodes. This procedure of moving from the top to the bottom of the merge tree 31 is followed by all of the processes that perform merging. When a process reaches a leaf node, it then returns to root node 42, which now has additional records to merge. In this manner the merge step is repeated from top to bottom until all of the records within memory have been flushed from the merge tree. If the run of records flushed through the merge tree is all the records of the input data set, then the run is complete and is written to output device 18. If the records do not represent the complete data set, then the run is an intermediate run that is written to the secondary storage 16. Pseudocode for merging and generating a run is shown in Table 5. As the pointers representing the records are flushed from the tree in stream elements 44, the records themselves are copied from their input buffers into an output buffer in memory (not shown). As they are copied to the buffer they are grouped in run blocks within a larger output block 46. The output block (rather than the records or run blocks individually) is then written to secondary storage 16 to reduce the number of disk accesses.
As will be seen, the run blocks are read from the secondary storage 16 during the final merge phase. The use of an output block is helpful in writing ordered records to the secondary storage or the output device but is not necessary to the invention. If desired, the records could be written directly from the various input buffers to storage.
TABLE 5
______________________________________
While root SE is not the last stream element do
If root stream empty then
Generate SEs at root node by performing merges top to bottom.
// Table 4
Else
Select the top SE from the root stream.
If internal sort (data set completely within memory) then
Write to output device
Else
Copy records to output buffer, saving forecasting structure
entries.
Flush output block when full to secondary storage.
Endwhile
______________________________________
To further improve the sorting speed, the computer system 10 can be adapted to simultaneously write run blocks of the intermediate run to multiple locations in secondary storage 16, such as to a series of separate disks across which the sorted output data stream may be striped. During the run generation phase for an external sort, a forecast structure 48 (FIG. 3) such as a table or any other suitable data structure is generated with an entry for each run block written to the secondary storage. The forecast structure initially contains an unsorted list of the intermediate runs as they are stored to the secondary storage, indicating for each run the disk address for each run block and the first key in each run block. The forecast structure may reside in memory or secondary storage. For an input data set requiring external sorting, all the processes synchronize at a barrier after every intermediate run generation. Intermediate runs are generated until the last input data is read from the input device 12, sorted into buffers 30, flushed through merge tree 31 and written to secondary storage 16.
The Final Merge Phase
The final merge phase is used when external sorting is required to sort the input data set. In this situation the volume of input data to be sorted exceeds the space in memory 14, requiring that several intermediate runs be formed to sort the data. As described, these runs are stored temporarily in secondary storage 16. During the final merge phase, the intermediate runs are read from the secondary storage into memory 14 and merged into a complete run that is written to output device 18. The final merge phase is described with reference to FIGS. 4 and 5 and the following tables of pseudocode. FIG. 5 shows a merge tree 50 for the final merging, a number of forecast streams 52 (e.g., lists) for forecasting which run blocks will next be merged, and a staging area 54 for storing these forecasted blocks in memory as they are read from secondary storage 16.
All of these data structures are present in memory 14. Merge tree 50 is constructed so that each intermediate run forms an input stream to the tree. For example, if there are N intermediate runs from the input data set, then there are N input streams to be merged through the leaf nodes of merge tree 50. The overall steps of the final merge phase are shown in the pseudocode of Table 6.
TABLE 6
______________________________________
Sort the forecast structure.
Distribute initial forecast to forecast streams. // Table 7
Read run blocks from secondary storage using forecast streams. // Table 7
Construct merge tree. // Table 7
Perform merges to generate complete run. // Tables 7, 8, 9
______________________________________
The final merge phase may also be thought of as having a tree construction part and a run generation part, although the two parts in this phase overlap. To begin the tree construction part, one of the several processes initially sorts forecast structure 48 by the first key of each run block while the other processes begin constructing the merge tree. FIG. 4 is an example of a sorted forecast structure 48. As illustrated, each entry has a field 48a for the run number, a field 48b for the run block disk address and a field 48c for the first key in the run block. In the example, the forecast structure is sorted in ascending order (A<B). The processes construct merge tree 50 by reading run blocks from secondary storage 16 into memory and merging them. Table 7 contains pseudocode that describes in more detail how this merging is done in the preferred embodiment.
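The sorted forecast structure described above can be pictured with a short Python sketch. The three fields follow the description of fields 48a, 48b and 48c, but the `ForecastEntry` type, the `sort_forecast` name, and every value in the example are invented for illustration and are not taken from FIG. 4.

```python
from typing import NamedTuple

class ForecastEntry(NamedTuple):
    run: int            # run number (field 48a)
    disk_address: int   # run block disk address (field 48b)
    first_key: str      # first key in the run block (field 48c)

def sort_forecast(entries):
    """Order run blocks by their first key, ascending."""
    return sorted(entries, key=lambda e: e.first_key)

# Entries in the order the run blocks were written (hypothetical values).
entries = [
    ForecastEntry(0, 0, "A"), ForecastEntry(0, 1, "D"),
    ForecastEntry(1, 100, "B"), ForecastEntry(1, 101, "F"),
    ForecastEntry(2, 200, "C"),
]
for e in sort_forecast(entries):
    print(e.first_key, "run", e.run, "block at", e.disk_address)
```

Since a block cannot contribute to the output before its first key is reached, reading blocks in ascending first-key order approximates the order in which the final merge will need them.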
TABLE 7
______________________________________
While not end of forecasting structure and memory available > a run block do // Memory check
Get the next entry from the forecasting structure // Fill forecast streams
Add entry info to the forecasting stream list of the corresponding run.
Endwhile
While runs visited < total number of runs or enough blocks not read do // Construct tree and merge
Choose a run for reading using a shared index;
While the read list for the run is not exhausted do
Combine contiguous run blocks from the read list up to KMAX
Perform a single read for the contiguous blocks.
Form a stream element and add to the merge tree.
Get the first unread key info from the next entry in the forecast
structure into the forecast stream for the run.
Endwhile
Endwhile
______________________________________
Initially the forecast structure distributes a forecast of run blocks to be merged to the forecast streams 52. This initial forecast contains the first M run blocks in the forecast structure, where M is the number of run blocks that can be stored at one time in staging area 54. As they are read from the forecast structure, the run block addresses are listed in their associated run forecast stream. Using the forecast structure entries in FIG. 4 as an example with M being four, structure 48 would distribute addresses for the first four entries in the structure to forecast streams 52a and 52b. The other forecast streams would not receive run block addresses until space is available in staging area 54 for storing the associated run blocks. A pointer 56 is used to keep track of the next structure entry to be distributed to the forecast streams. Once addresses for the run blocks in the initial forecast have been distributed to the forecast streams, the several processes read the run blocks listed in the streams into staging area 54. This reading is done run by run through the use of another pointer 58 that points to the next forecast stream to be read. Pointer 58 repeatedly moves in circular fashion from stream 52a to 52n. Pointer 58 initially points to stream 52a (run 0), and a process reads the run blocks identified in forecast stream 52a into staging area 54. A process reads a number of run blocks (up to a maximum number) in one access to secondary storage where the run blocks are contiguous in secondary storage and appear consecutively in a run's forecast stream, such as in stream 52a. The pointer then moves to the next forecast stream and a process reads the blocks listed therein into the staging area. These steps repeat until the processes eventually read all of the run blocks from secondary storage 16 into staging area 54.
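The distribution of the initial forecast and the combining of contiguous run blocks into single reads can be sketched as below. This is only an illustration: it assumes block addresses are integers counted in block units, so that contiguous blocks have consecutive addresses, and the names `distribute` and `coalesce`, the `kmax` parameter (after KMAX in Table 7), and the sample data are hypothetical.

```python
from collections import defaultdict

def distribute(sorted_forecast, m):
    """Hand the first m (run, block_address) entries to per-run forecast streams."""
    streams = defaultdict(list)
    for run, address in sorted_forecast[:m]:
        streams[run].append(address)
    return dict(streams)

def coalesce(addresses, kmax):
    """Group consecutive block addresses into (start, count) reads, count <= kmax."""
    reads = []
    for addr in addresses:
        if reads and addr == reads[-1][0] + reads[-1][1] and reads[-1][1] < kmax:
            # Extends the current read and stays within the per-read maximum.
            reads[-1] = (reads[-1][0], reads[-1][1] + 1)
        else:
            reads.append((addr, 1))
    return reads

# Forecast already sorted by first key; only (run, address) is kept here.
forecast = [(0, 10), (0, 11), (1, 50), (0, 12), (0, 13), (1, 52)]
streams = distribute(forecast, m=5)
print(streams)                        # {0: [10, 11, 12, 13], 1: [50]}
print(coalesce(streams[0], kmax=3))   # [(10, 3), (13, 1)]
```

Each `(start, count)` pair stands for one access to secondary storage, so four forecast blocks of run 0 cost two reads here instead of four.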
After a process has read a series (one or more) of run blocks into the staging area, it generates an RAV and then a stream element for a run block using the same algorithm described in Table 2. In FIG. 5, for example, a process generates a stream element 62a from run block 0 of run 0. The process then inserts the SE into merge tree 50 at a leaf node and performs merges from the leaf node to a level in the tree at which there is no stream to pair with, again as described in Table 2. Finally, to avoid deadlock, the process reads the key from the first record of the next run block in the forecast structure, for reasons to be described. The acts of reading the forecast streams, reading the blocks identified in the run forecast stream into the staging area, generating RAVs, SEs and merging them, etc. are carried on in parallel by the several processes. This leads to a quick construction of merge tree 50. Unlike in the run generation phase, however, merge tree 50 can begin flushing records before the tree reaches its full size because run blocks read into staging area 54 are already sorted. It is not necessary to have a run block from every run in the merge tree to begin merging these ordered records. The run generation part of the final merge phase begins with the flushing of records through a root node. In FIG. 5 the root node is marked 60, but it should be understood that the initial root node may be in a lower level if, for example, records are flushed from merge tree 50 before a run block from run N is read into staging area 54. Merging in the final merge phase is performed in the same manner as in the run generation part of the run generation phase, except that the merging is done from the bottom of the tree towards the root (top) node as shown in Table 8.
TABLE 8
______________________________________
Pick a leaf node M that has records to merge
While true do
Allocate records for merging at node M
// Table 9
If records available for merge then
Merge allocated records
Else if last stream elements from both input streams then
Generate last stream element for the output of node M
If parent of node M available then
let M point to the parent node.
Else
return
Endwhile
______________________________________
As a process completes merging at one node, it proceeds upward toward the next level node. When more than one process reaches a node, each merges a stream element in parallel with the other processes at the node and then continues up the tree. When a process can no longer move up the tree to merge records, it follows pointer 58 to find the next run blocks to be read into staging area 54. The process then advances the pointer, reads the listed blocks into the staging area, and repeats the steps of forming an RAV, SE, merging, etc., as described in Table 7. Additional run blocks are written to staging area 54 as the records of the complete run are flushed from root node 60 and memory is made available in the staging area. In the present embodiment, a check is made for available memory each time pointer 58 completes a loop through the forecast streams. In FIG. 5, for example, the memory check is made after pointer 58 advances from stream 52n to stream 52a. Other memory checks, of course, can be used. The present memory check is described in Table 7. When multiple devices are used for the secondary storage, the large blocks of data can be allocated in a random fashion to allow simultaneous reading of data from the storage. Alternatively, the run blocks can be striped across several storage devices to increase read bandwidth. In the final merge phase, deadlocks may be a problem since all of the data are not available in the merge tree while performing merges. For example, in the run generation phase a minimum number of records are reserved, or allocated, for each merge to limit the overhead of allocation. But in the final merge phase a minimum allocation requirement can result in a deadlock when memory conditions are tight. To avoid this deadlock, whatever records are available at a node are merged. A deadlock may also arise if data is not available at both input streams to a node.
Since data are read from the secondary storage per the forecast structure, skewed data could cause one of the streams to become dry. This can result in a deadlock if data is not available in the required streams and the memory is full. This deadlock is avoided by the use of the forecast structure 48. Since it has the first key information for each run block, each run can be marked with the first key that is unread so far. This information can be propagated up the tree so that records can pass through a node so long as they are lower than the key of the first unread record. The merging of records throughout merge tree 50 produces a complete run of the records in the input data set. This complete run is continually written to the output device 18 as the run is being generated. As in the run generation phase, the records in the complete run are preferably copied from staging area 54 into an output buffer and then written as output blocks to the output device 18. And as in storing the intermediate runs, computer system 10 can be adapted to simultaneously write records to multiple locations such as by "striping" the data across several individual output devices. Writing the output stream to the output device then comprises writing portions of the output run in parallel to the multiple locations. Table 9 contains pseudocode that describes a preferred manner for writing the output stream to the output device 18.
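The first-unread-key rule described above for avoiding deadlock can be sketched in Python. This is an interpretation under stated assumptions rather than the patent's code: keys are plain comparable values, each stream's in-memory records are a sorted list, `None` marks a run that has been read completely, and the name `safe_merge` is hypothetical.

```python
from heapq import merge

def safe_merge(avail1, unread1, avail2, unread2):
    """Merge the in-memory records of two input streams, emitting only
    records strictly lower than the first key still unread for either
    stream. `unreadN` is that key, or None once the run is fully read."""
    bounds = [k for k in (unread1, unread2) if k is not None]
    limit = min(bounds) if bounds else None
    out = []
    for key in merge(avail1, avail2):
        if limit is not None and key >= limit:
            break  # a lower-or-equal key may still arrive from storage
        out.append(key)
    return out

# Stream 2 is dry, but its next block is known to start at key 4,
# so keys 1 and 3 from stream 1 can pass through the node safely.
print(safe_merge([1, 3, 5], 9, [], 4))   # [1, 3]
```

Without the bound, a node with one dry input would have to wait for a block that cannot be read until memory is freed, which is the deadlock the text describes.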
TABLE 9
______________________________________
While root SE not the end of stream marker do
If root stream from the merge tree is empty then
Generate stream element at root by performing merges bottom
to top // Table 8
Else
Pick the top stream element from the root stream
Write to output device.
For each record address in the stream element do
If record address points to last record in the page then
mark the page as free for recycling
Endif
Endfor
If reading from secondary storage not done yet then
Read run blocks from secondary storage.
Endif
Generate SEs at root by performing merges bottom to top
Endwhile
______________________________________
Having illustrated and described the principles of the invention in a preferred embodiment, it should be apparent to those skilled in the art that the preferred embodiment can be modified in arrangement and detail without departing from such principles. For example, many of the software aspects of the embodiment may be implemented in hardware and vice versa. The embodiment described is a two-pass sort; but if the data set is large enough that two passes are not sufficient, it may be easily extended to a multi-pass sort of more than two passes. The records to be sorted may be of fixed or variable length. Features that may be added to improve cache effect include the use of processor affinity, the scheduling of merges in the merge trees and the placement of key prefixes in the RAVs. Conditional locking may also be added to remove lock waiting. Steps described above in sorting records can often be executed in different order as known to those of skill in the art. In view of the many possible embodiments to which the principles of the invention may be applied, it should be recognized that the illustrated embodiment is only a preferred example of the invention and should not be taken as a limitation on its scope. The invention is limited only by the scope of the following claims. We therefore claim as the invention all that comes within the scope of these claims.