Apparatus and method for decomposing database queries for database management system including multiprocessor digital data processing system
6289334

Abstract
An improved system for database query processing by means of "query decomposition" intercepts database queries prior to processing by a database management system ("DBMS"). The system decomposes at least selected queries to generate multiple subqueries for application, in parallel, to the DBMS, in lieu of the intercepted query. Responses by the DBMS to the subqueries are assembled by the system to generate a final response. The system also provides improved methods and apparatus for storage and retrieval of records from a database utilizing the DBMS's cluster storage and index retrieval facilities, in combination with a smaller-than-usual hash bucket size.

Claims
In view of the foregoing, what we claim is:

Description
REFERENCE TO APPENDICES
. . .     . . .   . . .            . . .
Smith     1056    5 Oak Avenue     10
James     1058    3 State Street   41
Wright    1059    15 Main Street   25
. . .     . . .   . . .            . . .
One or more indexes on large tables are generally provided to facilitate the most common data accesses, e.g., look-ups based on employee name. In relational systems, corresponding rows in two or more tables are identified by matching data values in one or more columns. For example, the department name corresponding to a given employee may be identified by matching his or her department number to a row in a department table that gives department numbers and department names. This is in contrast to hierarchical, network, and other DBMS's, which use pointers instead of data values to indicate corresponding rows when tables are combined, or "joined." Relational DBMS's typically permit the operator to access information in the database via a query, i.e., a command that specifies which data fields (columns) are to be retrieved from a database table and from which records (rows) those fields are to be selected. For example, a query for the names of all employees in department 10 might be fashioned as follows:
SELECT name, department_number
FROM employee
WHERE department_number = 10
There is no particular ordering of the resulting rows retrieved by the DBMS, unless the query specifies an ordering (e.g., ORDER BY name). A query may also involve multiple tables. For example, to retrieve department names instead of numbers, the above query might be refashioned as follows:
SELECT name, department_name
FROM employee, department
WHERE employee.department_number = 10
AND employee.department_number=department.department_number
A particular relational data table need not be stored in a single computer file but, rather, can be partitioned among many files. This makes such tables particularly suited for use on multiprocessor computer systems, i.e., computer systems having multiple processors and multiple disk drives (or other storage devices), of the type disclosed in U.S. Pat. No. 5,055,999. Unfortunately, prior art DBMS's have not proven capable of taking full advantage of the power of such multiprocessing systems and, particularly, of their power to process data simultaneously (in parallel) from multiple partitions on multiple storage devices with multiple central processing units.

In view of the foregoing, an object of the invention is to provide improved methods and apparatus for database management and, particularly, improved methods and apparatus for database management capable of operating on multiprocessor systems. A further object of the invention is to provide improved systems for database management capable of effectively accessing a relational database contained in multiple tables and multiple partitions. A still further object is to provide improved methods and apparatus for storing and retrieving data for access by a DBMS. These and other objects are evident in the attached drawings and the description which follows.

SUMMARY OF THE INVENTION
The foregoing and other objects are attained by the invention which provides, in one aspect, improvements to digital data processors of the type having a database management system (DBMS) that accesses data records stored in a database table contained among plural independently accessible partitions (e.g., data partitions contained on separate disk drives), where that DBMS has a standard interface for processing queries to access those data records. The improvement is characterized by a parallel interface that intercepts selected queries prior to substantive processing by the standard interface.
The standard interface is often called the "server" interface; it is accessed by clients that are the source of queries. A decomposition element within the parallel interface generates multiple subqueries from the intercepted query. Those subqueries, each representing a request for access to data stored in a respective partition of the table, are applied in parallel to the standard interface in lieu of the intercepted query. Responses by the DBMS to the subqueries are reassembled to generate a final response representing the response the DBMS would have generated to the intercepted query itself. Such reassembly can include interleaving the data contained in the responses (e.g., to create a single sorted list) or applying an aggregate function (e.g., sum or average) to that data. According to a further aspect of the invention, the decomposition element generates the subqueries to be substantially identical to the intercepted query but including an "intersecting predicate" (i.e., additional query conditions) that evaluates true for all data records in the respective partition of the database table and false for all others. This can be, for example, a logically AND'ed condition that evaluates true for records in the respective partition. Continuing the first example above, and assuming that the employee database is partitioned randomly across multiple partitions, a subquery for the first partition could be generated as follows (where rowid has three parts, the last of which indicates the partition number):
SELECT name, department_number
FROM employee
WHERE department_number = 10 AND
employee.rowid>=0.0.1 AND
employee.rowid<0.0.2
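As a minimal sketch of why the intersecting predicate preserves the meaning of the intercepted query, the following Python uses an in-memory SQLite table. SQLite has no Oracle-style rowid with a partition component, so a hypothetical `part` column stands in for it; the sample rows and the `subquery` and `run_decomposed` helpers are illustrative assumptions, not the system's actual rewrite logic. Running every subquery and concatenating the rows yields the same multiset of rows as the intercepted query.

```python
import sqlite3

# Hypothetical sample data: (name, department_number, part). SQLite has no
# Oracle-style rowid with a partition component, so `part` stands in for it.
ROWS = [
    ("Smith", 10, 1), ("James", 41, 1),
    ("Wright", 25, 2), ("Lee", 10, 2),
    ("Park", 10, 3), ("Cho", 25, 3),
]

BASE_QUERY = ("SELECT name, department_number FROM employee "
              "WHERE department_number = 10")


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE employee "
                 "(name TEXT, department_number INTEGER, part INTEGER)")
    conn.executemany("INSERT INTO employee VALUES (?, ?, ?)", ROWS)
    return conn


def subquery(base, part):
    # AND on an intersecting predicate that is true only for rows stored in
    # partition `part` (a half-open range, like rowid >= p AND rowid < p+1).
    return f"{base} AND employee.part >= {part} AND employee.part < {part + 1}"


def run_decomposed(conn, partitions):
    rows = []
    for p in partitions:  # sequential here; the patent applies them in parallel
        rows.extend(conn.execute(subquery(BASE_QUERY, p)).fetchall())
    return rows


conn = make_db()
direct = conn.execute(BASE_QUERY).fetchall()
decomposed = run_decomposed(conn, [1, 2, 3])
print(sorted(direct) == sorted(decomposed))  # True
```

Because each row satisfies exactly one partition predicate, the subquery results are disjoint and their union is the original answer.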
In another aspect, the invention contemplates a further improvement to a digital data processing system of the type described above, wherein the DBMS responds to selected queries for accessing data records joined from one or more database tables, and wherein the DBMS includes an optimizer for determining an optimal strategy for applying such queries to the tables. The improvement of this aspect is characterized by an element for identifying, from output of the optimizer, a driving table whose partitions will be targeted by subqueries generated in responding to an intercepted query. The improvement is further characterized by generating the subqueries to include, in addition to the predicate list of the intercepted query, an intersecting predicate for all data records in respective partitions of the driving database table. Those skilled in the art will appreciate that tables referenced in the query other than the driving table need not be identically partitioned to the driving table, nor co-located with its partitions on storage devices. Tables may be accessed through either full-table scans or indexed scans, i.e., the DBMS may search either all blocks of the relevant partition or only those indicated by a relevant index. According to another aspect, the invention provides an improvement to a digital data processing system of the type described, wherein the DBMS's standard interface is invoked by a procedure or function call. The improvement is characterized by functionality for invoking the parallel interface in lieu of the client-side portion of the standard interface in response to such a procedure/function call, and for responding to a query by generating plural subqueries in the form of further procedure/function calls to the standard server interface. The parallel interface can form part of an object code library for linking with a computer program including procedure/function calls for invoking the DBMS.
In still another aspect, the invention contemplates an improvement to a digital data processing system as described above, wherein the standard interface normally responds to insert/select queries by placing requested data from the database table in a further database table (i.e., as opposed to merely printing the requested data, otherwise outputting it in text form, or merely returning the data to the requesting program). The improvement of this aspect is characterized by generating the plural subqueries so as to cause the DBMS to place the data requested from each respective partition in the designated database table. In yet another aspect of the invention, a digital data processing system as described above can include functionality for executing multiple threads, or "lightweight processes," each for applying a respective subquery signal to the DBMS's interface element. Those threads can be executed in parallel on multiple central processing units, and can be serviced by multiple server processes within the DBMS that also execute in parallel. Further aspects of the invention provide improvements to a digital data processing system of the type having a storage element (e.g., a disk drive or other random-access media) for storing and retrieving data records, as well as a DBMS having (i) a hashing element to effect storage of data records in "hash bucket" regions in the storage element, where each record is stored in a root hash bucket region corresponding to a hash function of a selected value of the data record or, alternatively, in an overflow hash bucket region associated with that root hash bucket region; and (ii) an indexing element to index each stored data record for direct access in accord with a respective value of that data record.
The improvement is characterized by a scatter cluster retrieval element that responds to a request for accessing a data record previously stored via the hashing element by invoking the indexing element to retrieve that record in accord with the index value thereof, where stored records have previously been indexed by the indexing element with respect to the same fields (columns) used by the hashing element. In a related aspect of the invention, the hashing element stores the data records in hash bucket regions that are sized so as to create at least one overflow hash bucket region per root bucket region, and such that overflow bucket regions for a given root bucket region are distributed roughly evenly across different storage partitions. Another aspect of the invention provides a digital data processing system of the type described above, in which plural subcursor buffers are associated with each subquery signal for storing results generated by the DBMS's standard interface in response to that subquery signal. To assemble all results of those subqueries, a root buffer stores a then-current result, while a fetching element simultaneously assembles a final result signal based upon those results currently stored in selected subcursor buffers. As results are taken from each of those buffers, they are emptied. For each such emptied buffer, a subquery is applied to the standard interface asynchronously with respect to demand for that buffer's contents in assembling the final result. In the case of queries involving aggregates, the root buffer stores then-current results in a temporary table to be queried later by an aggregate query generated by the decomposition element. In still other aspects, the invention provides methods of digital data processing paralleling the operation of the digital data processing system described above, which are "transparent" to the DBMS client except for improved performance.
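The subcursor-buffer arrangement can be approximated with standard Python threads and bounded queues. In this sketch, each hypothetical `subcursor_worker` thread stands in for one subquery's cursor and keeps refilling its own buffer independently of how fast the consumer drains it, while `parallel_fetch` assembles an unordered final result by visiting the buffers in turn. The `fake_subquery` batch generator replaces real DBMS fetches, and the root buffer and temporary-table handling for aggregates are not modeled; it is an illustration under those assumptions, not the patent's implementation.

```python
import queue
import threading

_DONE = object()  # sentinel: this subquery has no more rows


def subcursor_worker(fetch_batches, buf):
    """Stand-in for one subquery thread: keeps refilling its own bounded
    buffer, independently of how fast the consumer drains it."""
    for batch in fetch_batches():
        for row in batch:
            buf.put(row)  # blocks only while this buffer is full
    buf.put(_DONE)


def parallel_fetch(subqueries, buffer_size=4):
    """Yield rows from all subqueries, visiting their buffers round-robin."""
    buffers = [queue.Queue(maxsize=buffer_size) for _ in subqueries]
    threads = [
        threading.Thread(target=subcursor_worker, args=(sq, buf), daemon=True)
        for sq, buf in zip(subqueries, buffers)
    ]
    for t in threads:
        t.start()
    live = list(buffers)
    while live:
        for buf in list(live):
            row = buf.get()
            if row is _DONE:
                live.remove(buf)
            else:
                yield row
    for t in threads:
        t.join()


def fake_subquery(partition_rows, batch_size=2):
    """Hypothetical stand-in for a DBMS cursor that returns rows in batches."""
    def fetch_batches():
        for i in range(0, len(partition_rows), batch_size):
            yield partition_rows[i:i + batch_size]
    return fetch_batches


partitions = [[1, 2, 3], [10, 20], [100, 200, 300, 400, 500]]
rows = list(parallel_fetch([fake_subquery(p) for p in partitions], buffer_size=2))
print(sorted(rows))  # [1, 2, 3, 10, 20, 100, 200, 300, 400, 500]
```

The bounded `maxsize` is what makes refilling asynchronous but not unbounded: a producer runs ahead of demand only until its buffer is full.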
BRIEF DESCRIPTION OF THE DRAWINGS
A better appreciation of the invention may be attained by reference to the drawings, in which:
FIG. 1 depicts a preferred multiprocessing system used to practice the invention.
FIG. 2 illustrates in greater detail processing cells and their interconnection within the processing system of FIG. 1.
FIG. 3A depicts a standard arrangement of processes and software modules utilized in digital data processor 10 without query decomposition and data access according to the invention.
FIG. 3B depicts a preferred arrangement of threads, processes and software modules utilized in digital data processor 10 for query decomposition and data access according to the invention.
FIG. 4 shows the operation of assembler 74B on results generated by the DBMS 76 and threads 78A, 78B, 78C in response to the subquery signals.
FIG. 5 depicts a preferred mechanism, referred to as "scatter clustering," for storing and retrieving data from database 72.
FIGS. 6-7 are used in connection with the discussion of the operation and use of a preferred query decomposition system according to the invention.
FIGS. 8-10 are used in connection with the discussion of design provided in Database Note #26.
FIGS. 11-13 are used in connection with the discussion of query decomposition for applications running on client workstations in Database Note #61.
FIGS. 14-16 are used in connection with the discussion of the framework of rules for automating query decomposition in Database Note #32.
FIGS. 17-23 are used in connection with the discussion of parallel cursor building blocks in Database Note #36.
FIGS. 24-25 are used in connection with the discussion of parse tree requirements for query decomposition in Database Note #37.
FIGS. 26-27 are used in connection with the discussion of query decomposition control structures in Database Note #41.
FIGS. 28-30 are used in connection with the discussion of upper tree parallelism in parallel cursors in Database Note #42.
DETAILED DESCRIPTION OF THE ILLUSTRATED EMBODIMENT
FIG. 1 depicts a preferred multiprocessing system used to practice the invention. The illustrated system 10 includes three information transfer levels: level:0, level:1, and level:2. Each information transfer level includes one or more level segments, characterized by a bus element and a plurality of interface elements. Particularly, level:0 of the illustrated system 10 includes six segments, designated 12A, 12B, 12C, 12D, 12E and 12F, respectively. Similarly, level:1 includes segments 14A and 14B, while level:2 includes segment 16. Each segment of level:0, i.e., segments 12A, 12B, . . . 12F, comprises a plurality of processing cells. For example, segment 12A includes cells 18A, 18B and 18C; segment 12B includes cells 18D, 18E and 18F; and so forth. Each of those cells includes a central processing unit and a memory element, interconnected along an intracellular processor bus (not shown). In accord with the preferred practice of the invention, the memory element contained in each cell stores all control and data signals used by its associated central processing unit. Certain cells of the processing system 10 are connected to secondary storage devices. In the illustrated system, for example, cell 18C is coupled with disk drive 19A, cell 18D is coupled with disk drive 19B, and cell 18O is coupled with disk drive 19C. The disk drives 19A-19C are of conventional design and can be selected from any of several commercially available devices. It will be appreciated that secondary storage devices other than disk drives, e.g., tape drives, can also be used to store information.

FIG. 2 illustrates in greater detail processing cells and their interconnection within the processing system of FIG. 1. In the drawing, plural central processing units 40A, 40B and 40C are coupled, respectively, to associated memory elements 42A, 42B and 42C.
Communications between the processing and memory units of each pair are carried along buses 44A, 44B and 44C, as shown. Network 46, representing the aforementioned level segments and routing cells, transfers information packets (passed to the network 46 over buses 48A, 48B and 48C) between the illustrated processing cells 42A-42C. In the illustrated embodiment, the central processing units 40A, 40B and 40C each include an access request element, labeled 50A, 50B and 50C, respectively. These access request elements generate requests for access to data stored in the memory elements 42A, 42B and 42C. Among the access request signals generated by elements 50A, 50B and 50C is the ownership-request, representing a request for exclusive modification access to a datum stored in the memory elements. In a preferred embodiment, access request elements 50A, 50B and 50C comprise a subset of an instruction set implemented on CPU's 40A, 40B and 40C. This instruction subset is described below. The central processing units 40A, 40B, 40C operate under control of an operating system 51, portions 51A, 51B and 51C of which are resident on respective ones of the central processing units. The operating system 51 provides an interface between applications programs executing on the central processing units and the system 10 facilities, and includes a virtual memory management system for managing data accesses and allocations. A preferred operating system for controlling central processing units 40A, 40B and 40C is a UNIX-like operating system and, more preferably, OSF/1, modified in accord with the teachings herein. The memory elements 42A, 42B and 42C include cache control units 52A, 52B and 52C, respectively. Each of these cache control units interfaces a data storage area 54A, 54B and 54C via a corresponding directory element 56A, 56B and 56C, as shown.
Stores 54A, 54B and 54C are utilized by the illustrated system to provide physical storage space for data and instruction signals needed by their respective central processing units. A further appreciation of the structure and operation of the illustrated digital data processing system 10 may be attained by reference to the following co-pending, commonly assigned applications, the teachings of which are incorporated herein by reference:
Application No. | Title | Filing Date
07/136,930 (now U.S. Pat. No. 5,055,999) | MULTIPROCESSOR DIGITAL DATA PROCESSING SYSTEM | 12/22/87
07/696,291 (now U.S. Pat. No. 5,119,481) | MULTIPROCESSOR SYSTEM WITH SHIFT REGISTER BUS | 04/26/91
07/370,341 (now U.S. Pat. No. 5,297,265) | SHARED MEMORY MULTIPROCESSOR SYSTEM AND METHOD OF OPERATION THEREOF | 06/22/89
08/100,100 (now abandoned) | IMPROVED MEMORY SYSTEM FOR A MULTIPROCESSOR | 07/30/93
07/370,287 (now U.S. Pat. No. 5,251,308) | IMPROVED MULTIPROCESSOR SYSTEM | 06/22/89
07/521,798 (now U.S. Pat. No. 5,182,201) | DYNAMIC PACKET ROUTING NETWORK | 05/10/90
07/763,507 (now abandoned) | PARALLEL PROCESSING APPARATUS AND METHOD FOR UTILIZING TILING | 09/20/91
07/499,182 (now U.S. Pat. No. 5,335,325) | HIGH-SPEED PACKET SWITCHING APPARATUS AND METHOD | 03/26/90
07/526,396 (now U.S. Pat. No. 5,226,039) | PACKET ROUTING SWITCH | 05/18/90
07/531,506 (now U.S. Pat. No. 5,341,483) | DYNAMIC HIERARCHICAL ASSOCIATIVE MEMORY | 05/31/90
07/763,368 (now abandoned) | DIGITAL DATA PROCESSOR WITH IMPROVED PAGING | 09/20/91
07/763,505 (now U.S. Pat. No. 5,313,647) | DIGITAL DATA PROCESSOR WITH IMPROVED CHECKPOINTING AND FORKING | 09/20/91
07/763,132 (now abandoned) | IMPROVED DIGITAL DATA PROCESSOR WITH DISTRIBUTED MEMORY SYSTEM | 09/20/91
07/763,677 (now abandoned) | FAULT CONTAINMENT SYSTEM FOR MULTIPROCESSOR WITH SHARED MEMORY | 09/23/91
Query Decomposition
FIG. 3A depicts a standard arrangement of processes and software modules utilized in digital data processor 10 without query decomposition and data access according to the invention. FIG. 3B depicts a preferred arrangement of processes and software modules utilized in digital data processor 10 for query decomposition and data access according to the invention. An initiating process 70 generates a query for accessing data stored in relational database 72 having data partitions 72A, 72B, 72C. The query is generated in a conventional format otherwise intended for a conventional DBMS 76. In a preferred embodiment, that conventional format is SQL and that conventional DBMS is the ORACLE7™ Database Management System (hereinafter, "ORACLE" or "ORACLE Version 7") of Oracle Corporation. Those skilled in the art will appreciate that other DBMS's and query formats may be substituted for the preferred ones without deviating from the spirit of the invention. However, those skilled in the art will also appreciate that a DBMS (such as ORACLE Version 7) used in connection with the preferred embodiments of the invention disclosed below must be capable of efficiently running queries that specify "intersecting predicates" against relevant database partitions, i.e., it must avoid searching partitions other than those specified in those predicates. Rather than being routed directly to DBMS 76, the query is intercepted by the parallel user program interface ("PUPI" or "parallel interface"). Element 74A (responsible for decomposing the query) routes queries not susceptible to decomposition to DBMS 76, but for a decomposable query it generates a set of subqueries, each of which is based on the initial query but is directed to data in one or more respective partitions 72A, 72B, 72C of database 72. Element 74A then initiates and invokes threads 78A, 78B, 78C, which initiate execution of the subqueries.
The subqueries corresponding to threads 78A, 78B, 78C are routed to the user program interface ("UPI" or "standard interface") of DBMS 76 (in lieu of the intercepted query), as shown in the drawing. Multiple subqueries are preferably applied to the UPI of DBMS 76 in parallel with one another, thus capitalizing on the database partitions and on the multiprocessing nature of the preferred digital data processing system 10. Each thread routes its subquery to a separate server process in DBMS 76. The DBMS 76 responds in the conventional manner to each subquery by generating appropriate requests (e.g., a disk read) for access to the database 72 and, particularly, for access to respective partitions of that database (unless the data requested is already in memory). Data retrieved from the database 72 in response to each subquery is processed in the normal manner by DBMS 76 and is routed to processes 76A, 76D and 76G. Those responses, in turn, are routed to parallel interface assembly section 74B, which assembles a response like that which would have been generated by the DBMS 76 had the intercepted query been applied directly to it. The assembled response produced by assembly section 74B is generally returned to the initiating process 70 more quickly than that which would have been generated by the DBMS 76 had the intercepted query been applied directly to it. This is a consequence of decomposition of the intercepted query and its parallel application to the UPI of DBMS 76. It is also a consequence of the architecture of the underlying multiprocessor, which permits multiple server processes to run simultaneously. It will be appreciated, though, that even on a uniprocessor the concurrent execution of multiple subqueries could speed access where I/O and CPU processing overlap. As noted above, the decomposer 74A generates subqueries based on the conventional-format query intercepted from the initiating process.
For simple, single-table queries, the decomposer 74A generates corresponding subqueries by duplicating the query and appending a predicate for matching records in the corresponding table partition. Thus, for example, a query in the form
SELECT name, department_number
FROM employee
WHERE department_number = 10
would result in the first subquery of the form:
SELECT name, department_number
FROM employee
WHERE department_number = 10 AND
employee.rowid>=0.0.1 AND
employee.rowid<0.0.2
where rowid has three parts, the last of which indicates the partition number. Other subqueries would be of similar form, with changes to the partition numbers referenced in the rowid predicates. For queries joining two or more tables, the decomposer 74A generates corresponding subqueries by duplicating the query and appending a predicate for matching records in the corresponding table partition of the driving table, which is selected by the decomposer 74A based on the access strategy chosen by the query optimizer portion 76B of the DBMS 76. Those skilled in the art will appreciate that information from the optimizer 76B, including possible tables to be chosen as the driving table, can be obtained from data files generated by the DBMS 76 in connection with the query, and accessed by use of the "EXPLAIN" command.

FIG. 4 shows the operation of assembler 74B on results generated by the UPI of DBMS 76 and threads 78A, 78B, 78C in response to the subquery signals. More particularly, the drawing shows that for intercepted queries that call for aggregate data functions, element 74C performs a like or related data function on the results of the subqueries. Thus, for example, if the intercepted query seeks a minimum data value from the database table (and the subqueries likewise seek the minimum value from their respective partitions), then element 74C generates a final result signal representing the minimum among those reported to the assembler 74B by the DBMS 76 and threads 78A, 78B, 78C. Likewise, if the intercepted query seeks an average value from the database table (and the subqueries seek a sum and a count from their respective partitions), then element 74C generates the table-wide average by dividing the total of the reported sums by the total of the reported counts, i.e., a count-weighted average of the subquery results.
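Combining per-partition partial results into the final aggregate can be sketched in a few lines of Python. The function names are hypothetical; the point is which partials each subquery must return: MIN for a minimum, SUM and COUNT for an average, and SUM, COUNT and sum of squares for the variance and standard deviation case. The `sample` flag (dividing by n - 1) is an assumption about which variance definition the intercepted query uses.

```python
from math import sqrt


def combine_min(partials):
    """Each partial is one partition's MIN (None if the partition had no rows)."""
    values = [p for p in partials if p is not None]
    return min(values) if values else None


def combine_avg(partials):
    """Each partial is (SUM, COUNT) from one partition's subquery."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count if count else None


def combine_variance(partials, sample=True):
    """Each partial is (SUM, COUNT, SUM of squares) from one partition."""
    s = sum(p[0] for p in partials)
    n = sum(p[1] for p in partials)
    ss = sum(p[2] for p in partials)
    denom = n - 1 if sample else n
    if denom <= 0:
        return None
    return (ss - s * s / n) / denom


def combine_stddev(partials, sample=True):
    var = combine_variance(partials, sample)
    return None if var is None else sqrt(var)


# Partition A holds the values 2 and 4; partition B holds 6.
print(combine_min([2, 6]))                         # 2
print(combine_avg([(6, 2), (6, 1)]))               # 4.0
print((6 / 2 + 6 / 1) / 2)                         # 4.5: averaging the averages is wrong
print(combine_variance([(6, 2, 20), (6, 1, 36)]))  # 4.0
print(combine_stddev([(6, 2, 20), (6, 1, 36)]))    # 2.0
```

The third line shows why the subqueries must return a sum and a count rather than their own averages: partitions of unequal size would otherwise be weighted equally.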
Moreover, if the intercepted query seeks a standard deviation or variance from the database tables, the decomposer 74A generates subqueries requesting related functions of the data, e.g., the sum, count and sum of the squares of the data. Such aggregate processing is preferably applied to, for example, intercepted queries requesting (i) a minimum or maximum of an item in the records, (ii) an average of selected items, (iii) a standard deviation or variance of selected items, and (iv) a sum or a count of selected items. As further shown in FIG. 4, for intercepted queries that call for non-aggregate data functions, element 74D generates a final result signal by interleaving the results of the subqueries. For example, if the intercepted query seeks a sorted list of data values from the database table (and the subqueries likewise seek sorted lists from their respective partitions), then element 74D generates a final result signal by interleaving, in the specified sort order, the items presented in the results reported to the assembler 74B by the DBMS 76 and threads 78A, 78B, 78C. Other non-aggregate queries involving, for example, (i) a distinct value of an entire result row, (ii) a nested selection of items, and/or (iii) a correlated selection of items are processed accordingly. For queries that combine aggregate and non-aggregate functions, elements 74C and 74D are invoked in combination. For queries involving grouping operations, the decomposer 74A generates corresponding subqueries by duplicating the query, including its grouping clause. For each group, data retrieved by the DBMS in response to those subqueries is placed in a temporary table. For that group, the assembly section 74B generates and passes to the DBMS a "group by" combining query to be applied to the temporary table.
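Two of these assembly steps can be sketched with the Python standard library. `heapq.merge` performs the k-way interleave of subquery results that are each already sorted; the hypothetical `combine_groups` loads per-group partial SUM and COUNT rows into an SQLite temporary table and runs a GROUP BY combining query over it. The table name `qd_partial`, the partial-row layout, and the optional `having` argument are illustrative assumptions. Consistent with the handling of a "having" clause for grouped queries, the condition is applied only to the combining query.

```python
import heapq
import sqlite3


def interleave_sorted(partition_results, key=None):
    """k-way merge of subquery results that are each already sorted."""
    return list(heapq.merge(*partition_results, key=key))


def combine_groups(partition_results, having=None):
    """partition_results: one list per subquery of (group, partial_sum, partial_count)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TEMP TABLE qd_partial (grp TEXT, s REAL, c INTEGER)")
    for rows in partition_results:
        conn.executemany("INSERT INTO qd_partial VALUES (?, ?, ?)", rows)
    sql = "SELECT grp, SUM(s), SUM(c) FROM qd_partial GROUP BY grp"
    if having:  # applied to the combined groups, never inside the subqueries
        sql += " HAVING " + having
    sql += " ORDER BY grp"
    result = conn.execute(sql).fetchall()
    conn.close()
    return result


print(interleave_sorted([[1, 4, 9], [2, 3, 10], [5]]))  # [1, 2, 3, 4, 5, 9, 10]

parts = [
    [("A", 10.0, 1), ("B", 5.0, 1)],  # partial groups from subquery 1
    [("A", 20.0, 1)],                 # partial groups from subquery 2
]
print(combine_groups(parts))                       # [('A', 30.0, 2), ('B', 5.0, 1)]
print(combine_groups(parts, having="SUM(c) > 1"))  # [('A', 30.0, 2)]
```

Had a condition like "count greater than 1" been pushed into each subquery, group A would have vanished from both partitions, since each holds only one A row; only the combined group satisfies it.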
The results of those queries are returned to the initiating process 70 in lieu of the response that would have been generated by the DBMS 76 had the intercepted query been applied directly to it. For queries involving grouping operations and including a "having" clause, the decomposer 74A and assembly section 74B operate in the manner described above, except that the "having" clause is not included in the subqueries. That clause is, however, incorporated into the combining queries that are executed on the temporary table.

FIG. 5 depicts a preferred mechanism, referred to as "scatter clustering" or "small bucket hashing," for storing and retrieving data from database 72. The mechanism combines cluster-storage and index-access techniques to disperse and retrieve data records from storage media 80A, 80B, 80C (e.g., disk drives) upon which database 72 is contained. Data records are stored using the cluster-storing capabilities of DBMS 76, based on a conventional hash function of each record's key value (as generated by element 76B), and using a smaller-than-normal bucket size chosen to ensure that at least one overflow hash bucket will be created for each root bucket. More preferably, the bucket size is chosen to ensure that hash buckets are spread over storage devices to maximize the potential for parallel access. Each stored record is simultaneously indexed for direct access in accord with the same key value(s) used by the hash function. In operation, the DBMS 76 responds to requests to store data records by invoking the hashing element 76B to store those data records in accord with a hash on their key values. The DBMS 76 also populates index 76C by invoking its corresponding indexing functionality. When accessing data records, the decomposer 74A generates subqueries specifying that requested data records are to be accessed via the index element 76C, not the hashing element 76B. It will be appreciated that, to maximize the performance of the system depicted in FIG.
3B, the database 72 is organized to achieve the best mix of I/O parallelism and hit ratio. Generally, the greater the former (I/O parallelism), the more threads 78A, 78B, 78C can be used, in parallel, to initiate data retrievals. The greater the latter (hit ratio), the greater the number of relevant records each thread 78A, 78B, 78C gets with each retrieval. Traditional indexed access schemes lend themselves to a high degree of I/O parallelism but a low hit ratio. Parallelism is good because new records are allocated randomly in the physical disk structure. The hit ratio is low, however, because each disk access is likely to get little more of interest than the specific record sought (i.e., the data in neighbors of any given record are unlikely to have any relationship to the data in the given record). Traditional hashing schemes generally offer low I/O parallelism but a high hit ratio. Parallelism is low because most of the data with a given key value is stuffed into just a few buckets: the root and a few necessary overflows. The hit ratio is high, however, because each disk access will get several records of related data (i.e., the neighbors of any given record are likely to be related to the data in the given record). By combining the indexing and hashing mechanisms of DBMS 76 in the manner described above, the aforementioned scatter clustering technique achieves a good mix of I/O parallelism and hit ratio. It does this by storing the data records using the hash-based storage techniques of DBMS 76 with an abnormally small bucket size, thereby distributing small clusters of related information around the disk, and by retrieving the data using the DBMS's indexing mechanism. Those skilled in the art will, of course, appreciate that the invention contemplates operating on database tables with any plurality of partitions.
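To make this trade-off concrete, the following toy model (not the DBMS's actual storage manager) places the records sharing one hash key into a chain of buckets and reports the two quantities discussed above: the number of distinct devices touched, as a proxy for I/O parallelism, and relevant records per bucket read, as the hit ratio. It assumes, hypothetically, that successive buckets of a chain land on successive devices; the record count, bucket sizes and device count are arbitrary.

```python
def bucket_chain(n_records, bucket_size, root_device, num_devices):
    """Return [(device, records_in_bucket), ...] for one hash key's bucket chain.

    Simplifying assumption: the root bucket sits on root_device and each
    overflow bucket is placed on the next device, wrapping around.
    """
    chain = []
    remaining = n_records
    i = 0
    while remaining > 0:
        take = min(bucket_size, remaining)
        chain.append(((root_device + i) % num_devices, take))
        remaining -= take
        i += 1
    return chain


def summarize(chain):
    """Return (distinct devices, bucket reads, relevant records per read)."""
    devices = {device for device, _ in chain}
    reads = len(chain)
    hit_ratio = sum(n for _, n in chain) / reads
    return len(devices), reads, hit_ratio


# 24 records share one key; 4 storage devices.
for size in (16, 4, 1):  # large buckets, scatter clustering, one record per read
    print(size, summarize(bucket_chain(24, size, root_device=0, num_devices=4)))
# 16 (2, 2, 12.0)
# 4 (4, 6, 4.0)
# 1 (4, 24, 1.0)
```

Under these assumptions, the small bucket size reaches all four devices while each read still returns several related records; a bucket size of 1 mimics the one-relevant-record-per-read behavior attributed above to traditional indexed access.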
It will likewise be appreciated that the invention contemplates using any plurality of subqueries (and corresponding threads) to execute retrievals against those partitions. Moreover, the invention does not require that the number of partitions and subqueries be identical. Preferably, the number of subqueries (and threads) is an integral divisor, greater than one, of the number of partitions. Thus, for example, three subqueries can be beneficially run against six partitions.

The sections which follow discuss the design considerations of the illustrated, preferred embodiment of the invention, to wit, a system hereinafter referred to as the "Query Decomposer" or "QD" for parallelizing decision support queries for use on a multiprocessor system of the type shown in FIG. 1 (and commercially available from the assignee hereof, Kendall Square Research Corporation) in connection with version 7 of the ORACLE™ database management system (which is commercially available from Oracle Corporation and can be adapted for operation with a number of computer systems, including the Kendall Square Research Corporation multiprocessors). Each of the sections which follow is identified by a "Database Note Number" (or DBN #). Those identifications are used to cross-reference the sections, typically in lieu of their titles. The inventors are alternatively referred to as "we," "I," "KSR," and other like terms. Notwithstanding the grammatical tense of the sections which follow, those skilled in the art will attain the requisite understanding of the invention and the disclosed system upon reading those sections in connection with the other portions of this patent application. In this regard, it will also be appreciated that when the text of a section refers to material "below" or "above," such reference is typically with respect to material contained within that section itself.
Those skilled in the art will attain from study of the sections that follow, not only an appreciation of the workings of an exemplary, preferred illustrated embodiment, but also of its application to other computer systems and DBMS's. The sections which immediately follow overview the operation and use of a preferred query decomposition system according to the invention. Parallelizing Decision Support Queries in Version 1 of ORACLE for KSR (Database Note #21) 1. Introduction Described below is a "front-end" to the ORACLE database management system that can parallelize a reasonable class of decision support queries without requiring major changes to the DBMS itself. To achieve this goal, we propose herein a new query decomposition approach, in which parallel subqueries are submitted to the DBMS, matching the physical data declustering already permitted through table "striping" in ORACLE. We believe that query decomposition is applicable to a very significant class of decision support queries, has excellent potential for performance gain for this class, and will be achievable with reasonable engineering effort at KSR. Furthermore, this is an approach that can eventually benefit all users of ORACLE on parallel and shared-memory multiprocessor machines. Section 2 (of this database note) describes our query decomposition approach in more detail, including a simple example. Section 3 discusses the critical problems that need to be solved to implement this approach. Section 4 analyzes the applicability of query decomposition with respect to a number of sample queries. 2. Query Decomposition Approach ORACLE permits the DBA to specify table "striping" in the CREATE TABLESPACE command. A large table may be broken up into a number of files, spread across multiple disks. This is mainly viewed as an OLTP-oriented technique, aimed at optimizing random access to tables.
Depending on how the file extents are populated, there may be some degree of data skew in terms of tuple distributions. However, striping is effectively a physical partitioning that we believe is adequate to support query decomposition. Query decomposition is done by making a number of copies of the original query, and then appending additional predicates to each subquery to make it match one of the existing partitions of one of the tables in the query. These subqueries are then executed in parallel. Finally, a combining query (or function) over the subquery results produces the result of the original query. Most commonly, this is the union over the subquery results. We use the notation "Q/t/i" to represent the ith subquery resulting from decomposing query Q to match an m-file physical partition of table t, where i=1, ..., n. Table t is called the partitioning table. We impose the reasonable constraint that n ≤ m, so that we don't produce more subqueries than there are underlying data partitions. To give a simple example, assume that table emp is distributed over files with FILEIDs in the sorted list [2, 5, 91, 112, 113, 115], and that we want three subqueries to be formed from query Q, with emp as the partitioning table. In this case, m=6 and n=3. Assume further that an index exists on emp.location, and recall that in general, the FILEID component of a ROWID in table t can be calculated as SUBSTR(t.ROWID,15,4). Let Q be SELECT * FROM emp WHERE emp.location='Boston'. Then we will produce three subqueries:
Q/emp/1: SELECT * FROM emp WHERE emp.location='Boston' AND SUBSTR(emp.ROWID,15,4)>=2 AND SUBSTR(emp.ROWID,15,4)<91
Q/emp/2: SELECT * FROM emp WHERE emp.location='Boston' AND SUBSTR(emp.ROWID,15,4)>=91 AND SUBSTR(emp.ROWID,15,4)<113
Q/emp/3: SELECT * FROM emp WHERE emp.location='Boston' AND SUBSTR(emp.ROWID,15,4)>=113
The predicates on SUBSTR(emp.ROWID,15,4) can be evaluated using ROWID values from the index on emp.location.
Each subquery therefore retrieves its results from a separate partition of the emp table. The union over the three subquery results yields the result of the original query Q. (Note that the predicates on, e.g., Q/emp/1, are equivalent to "AND emp.ROWID>='0.0.2' AND emp.ROWID<'0.0.91'," the form used elsewhere.) In this query decomposition approach, the degree of parallelism is limited by the number of physical partitions of the partitioning table, rather than by the inherent parallelism in the query, as is the case for inter-operator parallelism. In the future it should be possible to leverage our initial work by basing query decomposition on hash-partitioned data, or by decomposing queries according to criteria other than matching data partitions. 3. Critical Problems To Be Solved Critical problems to solve in implementing this approach are: (1) Decomposing queries into effectively parallelizable subqueries that match one or more partitions, (2) Submitting subqueries to the DBMS and executing them in parallel, (3) Avoiding excessive query optimization overhead for the multiple subqueries, (4) Producing correctly-optimized access plans for the multiple subqueries, (5) Restricting subqueries to reading only the relevant physical partitions of the partitioning table, and (6) Assembling the results of subqueries. Our initial cuts at solutions to these problems are presented below. Included are the modest requirements on the ORACLE DBMS that we believe are needed to support external query decomposition and subquery execution. 3.1 Decomposing queries into subqueries We plan to build a query decomposer module that will read user-specified "comments" on SQL queries and produce the appropriate subqueries. These directives disguised as comments will specify the partitioning table and (possibly) the maximum number of subqueries to be produced. The rules and hints in section 4.4 should help the application programmer to make these choices.
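The core of such a decomposer module, as illustrated by the emp example in section 2, can be sketched as a string transformation. This is a minimal sketch, assuming the original query already ends in a WHERE clause made of ANDed predicates, integer FILEIDs, and files grouped into n nearly equal contiguous runs; `decompose_by_fileid` is a hypothetical name, and a real decomposer would parse the query rather than append text to it.

```python
def decompose_by_fileid(query, table, fileids, n):
    """Return n subqueries, each restricted to one contiguous run of FILEIDs.

    Assumes `query` ends in a WHERE clause whose predicates are ANDed, so
    appending "AND ..." preserves its meaning.
    """
    fileids = sorted(fileids)
    m = len(fileids)
    if not 1 <= n <= m:  # never more subqueries than files (n <= m)
        raise ValueError("need 1 <= n <= number of files")
    # Lower bound of group k is the first FILEID in that group.
    bounds = [fileids[(k * m) // n] for k in range(n)]
    col = f"SUBSTR({table}.ROWID,15,4)"
    subqueries = []
    for i, low in enumerate(bounds):
        predicate = f"{col}>={low}"
        if i + 1 < n:  # the last group has no upper bound
            predicate += f" AND {col}<{bounds[i + 1]}"
        subqueries.append(f"{query} AND {predicate}")
    return subqueries


q = "SELECT * FROM emp WHERE emp.location='Boston'"
for sub in decompose_by_fileid(q, "emp", [2, 5, 91, 112, 113, 115], 3):
    print(sub)
```

The three strings printed correspond to Q/emp/1, Q/emp/2, and Q/emp/3 in the section 2 example.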
The directive language should be consistent with ORACLE's version 7.0 language for passing directives to the query optimizer. It may also be possible for us to automate the choice of partitioning table. This avoids having to depend on the application programmer to correctly determine which queries can be effectively parallelized and how to do it. However, it requires the decomposer to analyze the entire query and predict optimization strategies. A few classes of queries will require more than just appending partition-matching predicates to produce effectively-parallelizable subqueries. For example, queries involving the aggregate function AVG will require additional expressions in the target list of each subquery in order to later assemble subquery results correctly. As discussed in section 4, several classes of queries are not effectively parallelizable. 4. Characterization of Decomposable Queries It is important to understand which queries are decomposable, since this defines the limits of applicability of the proposed decomposition approach. We begin with some useful notation. Then we treat abstract queries Q1-Q12, and more concrete queries Q13-Q16. Finally, we summarize the rules for choosing the partitioning table and join order, and characterize the class of decomposable queries. This is an initial cut, where we have considered a representative but not exhaustive set of queries. We assume the use of the ORACLE 7.0 query optimizer, but may not have captured its exact behavior. Many of the same results could be achieved with the 6.0 optimizer. A reader wishing to skip the details on first reading should jump ahead to section 4.4. 4.1 Notation As before, Q/t/i represents the ith subquery resulting from decomposing query Q to match an m-file physical partition of table t, where i=1, . . . , n. 
To make it simpler to describe the decomposed subqueries in sections 4.2 and 4.3, we introduce the in_interval predicate: in_interval(t.FILEID,i) is true for tuples in the ith group of files for table t. The predicate translates into the appropriate conditions on FILEIDs (i.e., on SUBSTR(t.ROWID,15,4)), as was shown in the example in section 2. In the discussion, index(t.x) means there exists an index on the x attribute of table t. A nested loops join, with a as the outer table and b as the inner, will be written NLJ(a,b). A merge join of a and b will be written MJ(a,b). 4.2 Abstract queries Queries Q1 through Q12 are against tables a, b, and c. By starting with simple, abstract queries and adding increasingly complex conditions, we hope to better characterize the applicability of the query decomposition approach. Given our decision-support orientation, we have considered just read-only queries, and not data manipulation statements that do updates, deletions, or modifications. We assume that all tables are partitioned across multiple disks, so that any table can be the partitioning table for a given query. Some of the case-by-case analyses below depend on the existence of indexes to support join predicates; in a reasonably-designed database, such indexes are usually present. Parallelizing subqueries effectively is taken to mean achieving a significant speedup through parallel execution. We assume that a combining query or function is used on the results of subquery execution. Simple selection Q1: SELECT * FROM a Q1/a/i: SELECT * FROM a WHERE in_interval(a.FILEID,i) Under ORACLE 6.0 or 7.0, this will result in a full table scan for each subquery, with no performance speedup at all. However, once ORACLE is able to use the extent directory as a FILEID "filter" for this class of query, then the subqueries can be effectively parallelized. Selection with a predicate Q2: SELECT * FROM a WHERE a.x=v1 Q2/a/i: SELECT * FROM a WHERE a.x=v1 AND in_interval(a.FILEID,i) Assume index(a.x).
According to ORACLE, the index will be used to apply the predicate on a.x and the predicates on FILEID. This effectively parallelizes the subqueries. If there is no index, then the query can be treated as was Q1, with the a.x predicate being checked against all rows scanned by each subquery. Simple join Q3: SELECT * FROM a,b WHERE a.z=b.z Q3/a/i: SELECT * FROM a,b WHERE a.z=b.z AND in_interval(a.FILEID,i) Assume only index(b.z). Then the optimizer will generate NLJ(a,b). The tuples in each partition of a are joined with b, using the index on b, effectively parallelizing the subqueries. If instead only index(a.z) exists, use b as the partitioning table and reverse the roles of the two tables. In other words, generate: Q3/b/i: SELECT * FROM a,b WHERE a.z=b.z AND in_interval(b.FILEID,i) If index(a.z) and index(b.z), then one of a and b will be chosen by the optimizer as the outer table, and should also be used as the partitioning table. By default, the optimizer will pick the smaller table as the outer one. However, if the smaller table has very few partitions, it is preferable to direct the optimizer to choose the larger table as the outer one, and to use it as the partitioning table as well. In either case, the subqueries can be effectively parallelized. Finally, in the rare case where no index exists to support the join, ORACLE will generate MJ(a,b), and will sort both a and b before performing the join. While the query can still be decomposed into subqueries, say Q3/a/i, the problem is that each subquery will sort the entire b table. The likely result is relatively little performance speedup. Note that a parallel hash join operator would help in this case, if it were available. Strictly speaking, one can do a nested loops join even if there is no index on the inner table. This is appropriate if the inner table is small and can be quickly searched in main memory. The ORACLE 6.0 optimizer can be forced to choose this strategy if desired.
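To see why Q3/a/i is correct as well as parallel, the following minimal sketch simulates NLJ(a,b) over in-memory rows: each subquery scans only its group of a's files and probes a simulated index on b.z, and the union of the subquery results equals the undecomposed join. The row layout, the dict-based index, and the use of Python threads as stand-ins for separate DBMS sessions are all illustrative assumptions.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def build_index(rows, column):
    """Simulate index(b.z): map each key value to the rows that hold it."""
    index = defaultdict(list)
    for row in rows:
        index[row[column]].append(row)
    return index


def nlj(outer_rows, inner_index, column):
    """NLJ(outer, inner): scan the outer rows, probe the inner index for matches."""
    return [(o["id"], i["id"])
            for o in outer_rows
            for i in inner_index.get(o[column], [])]


def q3_subquery(a_rows, b_index, file_group):
    """Q3/a/i: restrict the outer table a to one group of files, then join."""
    partition = [r for r in a_rows if r["fileid"] in file_group]
    return nlj(partition, b_index, "z")


# Hypothetical data: table a spread over six files, table b indexed on z.
a = [{"id": k, "fileid": f, "z": k % 3}
     for k, f in enumerate([2, 5, 91, 112, 113, 115])]
b = [{"id": 100 + k, "z": k % 2} for k in range(4)]
b_index = build_index(b, "z")
file_groups = [{2, 5}, {91, 112}, {113, 115}]

with ThreadPoolExecutor(max_workers=len(file_groups)) as pool:
    results = list(pool.map(lambda g: q3_subquery(a, b_index, g), file_groups))

combined = sorted(pair for part in results for pair in part)  # union of subquery results
full = sorted(nlj(a, b_index, "z"))                           # undecomposed Q3
print(combined == full, len(full))  # True 8
```

Because each row of a lives in exactly one file group, no joined pair is produced twice, so simple concatenation of the subquery results serves as the combining function here.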
Join with a single-table predicate Q4: SELECT * FROM a,b WHERE a.x=v1 AND a.z=b.z Q4/a/i: SELECT * FROM a,b WHERE a.x=v1 AND a.z=b.z AND in_interval(a.FILEID,i) If index(a.x) and index(b.z), then NLJ(a,b) will be generated. The index on a.x will be used to apply the predicate and to get FILEIDs; this is straightforward and effective. NLJ(a,b) will also be generated if index(a.x) and index(a.z) and index(b.z), with the two indexes on a being intersected before a tuples are retrieved. If index(a.x) and index(a.z), then b should be used as the partitioning table, since NLJ(b,a) will probably be generated, with the two indexes on a being intersected before inner tuples are fetched. In other words, generate: Q4/b/i: SELECT * FROM a,b WHERE a.x=v1 AND a.z=b.z AND in_interval(b.FILEID,i) If not index(a.x), Q4 reduces to the Q3 case. In other words, there is no problem unless not index(a.x) and not index(a.z) and not index(b.z). In that case, MJ(a,b) will be generated, and the subqueries cannot be effectively parallelized. Join with predicates on both tables Q5: SELECT * FROM a,b WHERE a.x=v1 AND b.y=v2 AND a.z=b.z Q5/a/i: SELECT * FROM a,b WHERE a.x=v1 AND b.y=v2 AND a.z=b.z AND in_interval(a.FILEID,i) Q5/b/i: SELECT * FROM a,b WHERE a.x=v1 AND b.y=v2 AND a.z=b.z AND in_interval(b.FILEID,i) If index(a.x) and index(b.y) and index(a.z) and index(b.z), then nested loop joins are possible with either a or b as the outer table. The choice will be made based on the selectivity of the two single-table predicates: the more selective predicate will be applied to the outer table. If NLJ(a,b) is generated, then Q5/a/i is appropriate; if it is NLJ(b,a), then Q5/b/i is the preferred decomposition into subqueries. Either way, the subqueries can be effectively parallelized. If only one of the indexes supporting single-table predicates is present, say index(a.x), then Q5 reduces to the Q4 case. If neither is present, then Q5 reduces to the Q3 case.
Three-table join with predicates on two tables Q6: SELECT * FROM a,b,c WHERE a.x=v1 AND b.y=v2 AND a.z=b.z AND b.w=c.w We will not do an exhaustive, case-by-case analysis here. The heuristics to use for this query, and for more complicated p-way joins, are the following (generalized from Q3-Q5): (1) If all tables are indexed (on either a join or a non-join attribute), the application programmer should choose as partitioning table the one with the most selective index on a non-join attribute. This will be the outer table in an initial nested loop join, with FILEIDs taken from its non-join attribute index. (2) If all tables but one are indexed, choose that one as the partitioning table. This will be the outermost table in an initial nested loop join, with FILEIDs taken from its extent directory. (3) If two or more tables do not have indexes, the largest of the non-indexed tables should be chosen as the partitioning table. The others should be the last tables to be joined, to minimize sorting costs for the merge join(s) required. In summary, the preferred join order of tables is: first, the largest unindexed table, if one exists; followed by all indexed tables, in order of decreasing predicate selectivity (including both join predicates and single-table predicates); followed by all remaining unindexed tables, if any. This supports access plans that consist of one or more nested loops joins, followed by zero or more merge joins. Join with an ORDER BY clause Q7: SELECT * FROM a,b WHERE a.z=b.z ORDER BY a.x Q7/a/i: SELECT * FROM a,b WHERE a.z=b.z AND in_interval(a.FILEID,i) ORDER BY a.x Assume the existence of at least one useful index, so that an effective decomposition exists without the ORDER BY clause. It is up to the combining query or function to handle the final step of merging sorted subquery results. This can be generalized: any multi-way join that can be effectively parallelized can still be effectively parallelized when a simple ORDER BY clause is added.
Expressions in the ORDER BY clause may cause a problem, however. Simple aggregate retrieval Q8: SELECT MAX(a.x) FROM a Q8/a/i: SELECT MAX(a.x) FROM a WHERE in_interval(a.FILEID,i) The subqueries themselves can be effectively parallelized, but the union of the subquery results clearly does not produce the correct result for the query. What is needed is a combining query or function over the union of the subquery results that selects (in this case) the maximum value. Distinct value selection Q9: SELECT DISTINCT a.x FROM a WHERE a.y=v1 Q9/a/i: SELECT DISTINCT a.x FROM a WHERE a.y=v1 AND in_interval(a.FILEID,i) The subqueries can be effectively parallelized. Since ORACLE currently does a sort on a.x for each subquery in order to weed out duplicates, the subquery results are assumed to be sorted on this field. Combining the subquery results then requires just one more level of duplicate elimination. The keyword DISTINCT can also appear inside of an aggregate function (e.g., AVG(DISTINCT a.y)). This construct cannot be effectively parallelized; it is impossible to combine subquery results in a meaningful way. Aggregate retrieval with a GROUP BY clause Q10: SELECT MIN(a.x) FROM a GROUP BY a.y Q10/a/i: SELECT MIN(a.x) FROM a WHERE in_interval(a.FILEID,i) GROUP BY a.y This is similar to query Q8. It is possible to generate parallel subqueries, and execute them effectively. Combining the results requires merging the result groupings produced by the subqueries. HAVING clause with an aggregate Q11: SELECT a.x, MIN(a.y), AVG(a.z) FROM a GROUP BY a.x HAVING MIN(a.y)<v3 Q11/a/i: SELECT a.x, MIN(a.y), AVG(a.z) FROM a WHERE in_interval(a.FILEID,i) GROUP BY a.x HAVING MIN(a.y)<v3 This subquery formulation will not lead to the correct result for the original query. The problem is that the HAVING MIN(a.y)<v3 is only applied to a tuples for which in_interval(a.FILEID,i) is true (i.e., tuples in the subquery's partition).
In fact, the HAVING clause should be applied to all a tuples instead. If the form above is too abstract, think of: SELECT emp.deptno, MIN(emp.sal), AVG(emp.sal) FROM emp GROUP BY emp.deptno HAVING MIN(emp.sal)<40000 Correlated subquery Q12: SELECT a.x, a.y, a.z FROM a aa WHERE a.x=v1 AND a.y>(SELECT AVG(a.y) FROM a WHERE a.z=aa.z) Q12/a/i: SELECT a.x, a.y, a.z FROM a aa WHERE a.x=v1 AND in_interval(a.FILEID,i) AND a.y>(SELECT AVG(a.y) FROM a WHERE a.z=aa.z) This seems to be effectively parallelizable. The correlated subquery will be evaluated once for each tuple in table a satisfying the single-table predicate, but that happens in parallel, matching the partitioning of the table. If the form above is too abstract, think of: SELECT emp.location, emp.sal, emp.dept FROM emp empxx WHERE emp.location='Boston' AND emp.sal>(SELECT AVG(emp.sal) FROM emp WHERE emp.dept=empxx.dept) 4.3 Concrete queries These are divided by type of database design. Datacube-design query
Q13: SELECT SUM(sales.volume), product.name FROM sales,
product WHERE
product.product_code>=6 AND product.product_code<12 AND sales.region=
'Boston' AND sales.quarter='Q2'
AND sales.year=1990 AND product.product_code=
sales.product_code GROUP BY
sales.product_code
This query is effectively parallelizable, given a sophisticated combining function. Hierarchical-design query
Q14: SELECT emp.last_name, emp.first_name FROM
emp, dept WHERE (dept.dept_name='MFG'
OR dept.dept_name='QC') AND
emp.deptno=dept.deptno AND EXISTS (SELECT
training.type FROM training WHERE training.type='Quality
Control' AND training.date>'010188' AND
training.emp_name=emp.emp_name)
This matches the form of Q12, and is effectively parallelizable. Event-design queries
Q15: SELECT claim.amt, claim.classification,
vehicle.vno FROM claim, vehicle WHERE
claim.amt>10000 AND vehicle.state='MA' AND
(claim.classification='Suspicious' OR
claim.classification IS NULL) AND claim.vno=vehicle.vno
Assuming reasonable indexes (say, at least index(vehicle.vno)), this is effectively parallelizable. It matches the form of Q5 with a few extra predicates.
Q16: SELECT * FROM policy, vehicle,
more_vehicle_info, claim, estimate WHERE
vehicle.coverage_date>'010190' AND
estimate.claim#=claim.claim# AND
claim.veh#=vehicle.veh# AND
more_vehicle_info.veh#=vehicle.veh# AND
policy.pol#=vehicle.pol#
This is effectively parallelizable, with vehicle as the partitioning table (since indexes are assumed to exist on all relevant join fields). If claim and estimate tables are clustered, then one fewer join needs to be done. 4.4 Heuristic rules The following heuristic rules characterize the choice of partitioning table (also referred to as "driving table" elsewhere) and join order, and the set of decomposable queries (assuming that the underlying tables are all partitioned). We expect these rules to be refined over time. A first implementation may use the first table in the optimizer's EXPLAIN plan as the partitioning table. Choice of partitioning table (1) If all tables are indexed (on either a join or a non-join attribute), choose as partitioning table the one with the most selective index on a non-join attribute. This will be the outer table in an initial nested loop join, with FILEIDs taken from its non-join attribute index. (2) If all tables but one are indexed, choose that one as the partitioning table. This will be the outermost table in an initial nested loop join, with FILEIDs taken from its extent directory. (3) If two or more tables do not have indexes, the largest of the non-indexed tables should be chosen as the partitioning table. The others should be the last tables to be joined, to minimize sorting costs for the merge join(s) required. Choice of join order (4) The preferred join order of tables is: first, the largest unindexed table, if one exists; followed by all indexed tables, in order of decreasing predicate selectivity (including both join predicates and single-table predicates); followed by all remaining unindexed tables, if any. This supports access plans that consist of one or more nested loops joins, followed by zero or more merge joins.
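Rules (1) through (4) can be expressed as a small planner. This is a minimal sketch, assuming each table is summarized by an estimated row count, whether it has any usable index, and (for indexed tables) a single selectivity figure: the fraction of rows its indexed predicate lets through, so smaller means more selective. The `Table` type, its fields, and the treatment of missing selectivities are hypothetical. The rules do not say what to do when every table is indexed but none has an index on a non-join attribute, so the sketch simply rejects that case.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Table:
    name: str
    rows: int                            # estimated cardinality
    indexed: bool                        # any index on a join or non-join attribute
    selectivity: Optional[float] = None  # fraction of rows passing its indexed predicate


def choose_partitioning_table(tables: List[Table]) -> Table:
    unindexed = [t for t in tables if not t.indexed]
    if unindexed:
        # Rules (2) and (3): the only unindexed table, or the largest of several.
        return max(unindexed, key=lambda t: t.rows)
    # Rule (1): the most selective index on a non-join attribute.
    candidates = [t for t in tables if t.selectivity is not None]
    if not candidates:
        raise ValueError("rule (1) needs an index on a non-join attribute")
    return min(candidates, key=lambda t: t.selectivity)


def join_order(tables: List[Table]) -> List[Table]:
    # Rule (4): largest unindexed table first, then indexed tables from most to
    # least selective (missing selectivity treated as 1.0, i.e. not selective),
    # then the remaining unindexed tables, which feed the final merge joins.
    unindexed = sorted((t for t in tables if not t.indexed),
                       key=lambda t: t.rows, reverse=True)
    indexed = sorted((t for t in tables if t.indexed),
                     key=lambda t: 1.0 if t.selectivity is None else t.selectivity)
    return unindexed[:1] + indexed + unindexed[1:]


tables = [
    Table("a", 1_000, True, 0.01),
    Table("b", 50_000, False),
    Table("c", 200, True, 0.2),
    Table("d", 8_000, False),
]
print(choose_partitioning_table(tables).name)   # b
print([t.name for t in join_order(tables)])     # ['b', 'a', 'c', 'd']
```

Whenever an unindexed table exists, the chosen partitioning table is also the first table in the join order, matching the requirement that it be the outermost table of the initial nested loop join.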
Decomposable queries (5) Queries containing any of the aggregate functions AVG, SUM, COUNT, STDDEV, and VARIANCE, modified by the keyword DISTINCT, cannot be effectively parallelized, because subquery results cannot be correctly combined to produce the result of the original query. (6) If an otherwise effectively parallelizable query contains AVG in a target list expression, the query is still effectively parallelizable, assuming a sophisticated combining function or query. However, additional expressions (i.e., COUNT and SUM) in the target list of each subquery need to be generated so that subquery results can be assembled correctly. (7) Similarly, otherwise effectively parallelizable queries containing the aggregate functions STDDEV or VARIANCE can be effectively parallelized through target list modification and a sophisticated combining query. (8) If an otherwise effectively parallelizable query contains a GROUP BY clause (i.e., a single field reference to a field in the target list), the query is still effectively parallelizable. (9) If an otherwise effectively parallelizable query contains a HAVING clause, then the query is still effectively parallelizable by moving the HAVING clause to the combining query. (10) If an otherwise effectively parallelizable query contains a simple ORDER BY clause (i.e., a position reference to the target list, or a single field reference to a field in the target list), the query is still effectively parallelizable. (11) If an otherwise effectively parallelizable query contains a SELECT DISTINCT, it can be effectively parallelized. In contrast to rule (5), DISTINCT is applied here to an expression in the target list. (12) Non-flattenable nested subqueries can be effectively parallelized, if they do not contain any other problematic constructs. (13) Clustered tables (such as emp kept clustered with dept) do not block effective parallelizability.
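Rules (6) and (9) can be illustrated with the emp example given for Q11. In this minimal sketch, each subquery's select list is rewritten so that AVG(x) becomes SUM(x) plus an appended COUNT(x), the HAVING clause is dropped from the subqueries, and a combining function merges the per-partition groups, rebuilds each average as total SUM over total COUNT, and only then applies HAVING. The function names, the in-memory partitions, and the salary figures are hypothetical; the regular expression handles only AVG arguments without nested parentheses, and NULL salaries are not modeled.

```python
import re


def rewrite_avg(select_list):
    """Replace each AVG(expr) with SUM(expr) and append COUNT(expr) (rule 6)."""
    counts = []

    def to_sum(match):
        counts.append(f"COUNT({match.group(1)})")
        return f"SUM({match.group(1)})"

    rewritten = re.sub(r"AVG\(([^()]*)\)", to_sum, select_list)
    return ", ".join([rewritten] + counts)


def run_subquery(rows):
    """One partition's result of: SELECT deptno, MIN(sal), SUM(sal), COUNT(sal)
    FROM emp WHERE in_interval(...) GROUP BY deptno  -- with no HAVING clause."""
    groups = {}
    for deptno, sal in rows:
        groups.setdefault(deptno, []).append(sal)
    return {d: (min(s), sum(s), len(s)) for d, s in groups.items()}


def combine(partials, having):
    """Merge groups across partitions, rebuild AVG, then apply HAVING (rule 9)."""
    acc = {}
    for part in partials:
        for d, (mn, total, cnt) in part.items():
            if d in acc:
                old_mn, old_total, old_cnt = acc[d]
                acc[d] = (min(old_mn, mn), old_total + total, old_cnt + cnt)
            else:
                acc[d] = (mn, total, cnt)
    rows = {d: (mn, total / cnt) for d, (mn, total, cnt) in sorted(acc.items())}
    return {d: row for d, row in rows.items() if having(*row)}


print(rewrite_avg("emp.deptno, MIN(emp.sal), AVG(emp.sal)"))

# Hypothetical (deptno, sal) rows in three partitions of emp.
partitions = [
    [(10, 30000), (20, 50000)],
    [(10, 60000), (20, 45000)],
    [(10, 45000), (30, 39000)],
]
partials = [run_subquery(p) for p in partitions]

# HAVING MIN(emp.sal) < 40000, evaluated once in the combining step.
final = combine(partials, lambda min_sal, avg_sal: min_sal < 40000)
print(final)  # {10: (30000, 45000.0), 30: (39000, 39000.0)}

# Applying HAVING inside each subquery instead (the Q11 mistake) keeps dept 10
# with the wrong average, computed from one partition only.
naive = [(d, mn, total / cnt) for part in partials
         for d, (mn, total, cnt) in part.items() if mn < 40000]
print(naive)  # [(10, 30000, 30000.0), (30, 39000, 39000.0)]
```

Carrying SUM and COUNT rather than per-partition averages is what makes the merge exact: averages of averages would weight each partition equally regardless of how many rows it contributed.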
Query Decomposition in ORACLE for KSR Preliminary Design (Database Note #26) 1 Introduction The process of decomposition requires the following questions to be answered: a) Is decomposition enabled? b) Can this query be correctly decomposed? c) Will decomposition be effective for this query? d) Which table should be used for partitioning? e) What is the degree of partitioning (i.e., number of subqueries)? Decomposition will be done when the answers to (a), (b), and (c) are yes. The user will always retain the ability to disable decomposition if desired. We intend to automate the answers to all of these questions. An application programmer can override any of the automatic decomposition decisions by using directives in the SELECT statement, in the form of embedded comments. The exact form of these directives is not described in this database note, but will adhere to the style used in ORACLE. For purposes of this database note, we will make some rational guesses about what they might look like. Query decomposition can be used with Pro*COBOL, Pro*C, SQL*Plus, OCI, SQL*Report, and possibly SQL*ReportWriter when it gets rewritten to use UPI in ORACLE version 7.0. (It might also work with the precompilers for other languages, but we will make no special effort to ensure that.) We would like to support QD for PL/SQL, but have not yet determined how much additional work would be needed, if any. The parallel execution of queries via QD can be selectively enabled and disabled without changing any application code. A parallel application can be written and initially tested in serial mode. After it is working correctly, parallelization can be turned on with some kind of switch. We have a strong desire to preserve the existing application programming model and avoid embedding the notion of parallel programming in the application. An ORACLE application processes queries by iteratively performing fetches on a cursor, which steps through a virtual table of result rows.
This result table does not necessarily exist as a complete entity at any point in time. It is frequently constructed on the fly, so that the result rows effectively "pass through it" on their way to the application. The application has the illusion of fetching directly from this virtual table. In general, we will use combining functions to assemble subquery results into the final result. The possibility of storing all subquery results in intermediate tables, and then using a separate combining query to read these tables, was also considered. It was rejected as an overall approach, but might be used in some situations where aggregation has reduced the cardinalities of the intermediate tables. Under our chosen approach, the results of parallel subqueries need not be stored in actual tables. Instead, we will try to maintain the concept of virtual result tables at the subquery level. When the application fetches from a cursor, we would like some or all of the subqueries to fetch from their corresponding cursors, as needed, with the results combined to return the appropriate row to the application. In this way, the results from all the subqueries would exist only in virtual tables, and not require any significant memory or I/O. 2 Design Overview One of our design goals is to modularize query decomposition to allow that code to be maintained separately from the rest of the ORACLE code. This follows Oracle's policies on port-specific modifications and will simplify the appropriate sharing of maintenance between KSR and Oracle. The UPI (User Program Interface) is the common point of access to the ORACLE kernel for all applications. A parallel UPI library (PUPI, pronounced "puppy") will be developed that intercepts each call to UPI (for performing operations like connect, parse, fetch, etc.) and generates multiple calls to UPI, which generally will be executed in parallel (see FIG. 8). 
This is only a conceptual view; in some cases, it will actually work a little differently. For example, during a CONNECT, we don't know how many additional connections to make because we don't yet know how many subqueries there will be. Therefore, the additional connections must be deferred until later. Most of our work will be implementing the PUPI, although a few enabling hooks might need to be added to other areas of the code. In principle, KSR ORACLE should be runnable without the PUPI. PUPI will pass the original query on to UPI to have it parsed and verify that the syntax is correct. After that, the query will be scanned to parse the parallel directives, if any. By default, we will decompose any queries where it is correct and effective to do so, as long as decomposition has been enabled. The user can override the decision to decompose or the choice of partitioning table. Once the partitioning table has been determined, the PUPI will look up the table name in ORACLE's catalog to find out the number of files comprising it and the list of file_id's. The number of files determines the number of subqueries and, therefore, the number of additional connections to ORACLE that are needed. Multiple subqueries will be generated as copies of the original query with an additional predicate appended to them, specifying which data partition to use. Each partition corresponds to exactly one physical file. In order to correctly combine some subquery results, we may need to augment or otherwise transform the subquery select lists. For example, when the query contains an AVG function, we will also need to have each subquery return the number of rows used in calculating its average. Each AVG function in a query might use a different row count, since ORACLE does not include NULL values when calculating averages. Therefore, for each "AVG(XXX)" in the original query, we need to replace "AVG(XXX)" with "SUM(XXX)" and append "COUNT(XXX)" to the select list in each subquery.
SUM is quicker to compute than AVG and will reduce the accumulation of roundoff errors when computing the overall average. Before the subqueries are parsed or executed, additional connections must be made to the same database, which is not necessarily the default database. (Initially, we might require that the default database be used, and later extend query decomposition to any database.) The additional connections will only exist during the execution of the subqueries. Each subsequent query must establish its own subquery connections, based on the partitioning of that query. After parsing the subqueries, allocate and open a cursor for each of them. The concept of a parallel cursor is introduced here (see FIG. 9). It will maintain the relationship between the cursor for the original query (the root cursor) and the cursors for the corresponding subqueries (subcursors). This will allow ORACLE to do parallel fetches from multiple cursors on behalf of an application. Rows will be fetched asynchronously from the subcursors and returned to the application as needed. The rows returned from the subcursors may need to be combined or ordered in some way before the root cursor's fetch can be satisfied. See the Parallel Cursors section below for more details. When the root cursor is closed, close all the subcursors associated with it and disconnect the corresponding sessions. This could also be done for each subcursor when it reaches end of file, to free up some resources sooner. If a COMMIT or ROLLBACK is done by the application, we must do one for each of the connections we have. 4 Design Details 4.1 Determining the Number of Subqueries It is reasonable but, perhaps, not optimal to have more than one file per subquery. Maximum parallelism (and performance) is achieved when all files are being processed at the same time. However, it makes no sense to have more subqueries than files. 
Since we cannot partition the work into units smaller than a file, the extra subqueries would have nothing to do. In the first implementation, the number of subqueries will be exactly the number of files. Since we need to query the database to find out the file_id's, that will also tell us how many files there are and, therefore, how many subqueries to generate. There is no need for the application to tell us this, since we already know the correct answer. It requires no extra work to automate this, and it avoids checking for and dealing with incompatibilities between what the application tells us and what really exists. This could be changed later when there is explicit support for parallel reads. Until then, assigning one subquery to each file is one way to get the same benefits indirectly. Reducing the number of subqueries will reduce some of the overhead of query decomposition. This will improve performance, as long as we can still read the same number of files in parallel. 4.2 Parallel UPI Library The PUPI will consist of a set of functions that have the same external interface as their UPI counterparts, but will call the appropriate UPI functions multiple times. Not all the UPI functions will be duplicated in the PUPI, since not all of them can be or need to be parallelized. We need a way to easily switch between serial and parallel query processing. At different times, the same application may call either UPI or PUPI functions without (by our own requirements) changing any code. (See FIG. 10. The three functions shown in each library parse a query, execute it, and fetch the results. There are many more functions that need to be implemented.) The "Application" in this figure can be assumed to include SQLLIB and OCI, i.e., everything above the UPI level.
All references in the existing code to UPI functions will effectively be changed (probably via conditionally compiled macros, so the actual code doesn't have to be touched) to function variables, which can be assigned a specific function at runtime (e.g., either pupiosq or upiosq). The initialization routine pupiini (parallel UPI initialize) will be called at appropriate times to set the function variables to the proper values. This needs to be done shortly after each application starts up, and again each time parallel processing is enabled or disabled.

Note: A slight modification to this scheme will be needed to handle the case of a parallel cursor and a non-parallel cursor being active at the same time. The macros could conditionally invoke the PUPI routines whenever a parallel cursor is referenced, or the PUPI routines could be called unconditionally and, optionally, pass the calls directly to the UPI without modification.

4.3 Multiple Connections

The UPI maintains a hstdef (host definition) structure for every connection that exists. We will allocate a hstdef for each additional connection we need (one for each subquery). The proper hstdef for each connection must be referenced when performing any actions related to the subqueries.

The extra connections can't be made until after the original query has been parsed and the number of subqueries has been determined. At that time, we will also have access to the hstdef that was set up on the first connection, which may contain information we need in order to make additional connections to the same database. (We need access to the connect string (user, password, host, etc.), or its equivalent; without it, we have no way of knowing where the original connection was made.) We may also need access to the transaction timestamp in order to ensure read consistency, depending on how ORACLE chooses to implement that feature.
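The function-variable switch described in 4.2 can be illustrated with a small sketch. In the UPI these would be C function pointers set by pupiini; here a Python dictionary stands in for them. The routine names upiosq, pupiosq, and pupiini come from the text, but their bodies are hypothetical stand-ins that do not model the real routines' work, and the table key "osq" and the helper application_parse are invented for illustration.

```python
from typing import Callable, Dict


def upiosq(sql: str) -> str:
    """Hypothetical stand-in for the serial UPI routine (its real work is not modeled)."""
    return "serial: " + sql


def pupiosq(sql: str) -> str:
    """Hypothetical stand-in for the PUPI counterpart; same interface as upiosq."""
    return "parallel: " + sql


# The "function variables": application code calls through this table,
# never upiosq or pupiosq by name.
upi_functions: Dict[str, Callable[[str], str]] = {}


def pupiini(parallel: bool) -> None:
    """Point each function variable at the serial or the parallel implementation."""
    upi_functions["osq"] = pupiosq if parallel else upiosq


def application_parse(sql: str) -> str:
    # Unchanged application code: it only knows the function variable.
    return upi_functions["osq"](sql)


pupiini(parallel=False)
print(application_parse("SELECT ENAME FROM EMP"))
pupiini(parallel=True)
print(application_parse("SELECT ENAME FROM EMP"))
```

The point of the indirection is that application_parse never changes; only pupiini decides which implementation the function variable refers to, which is what lets the same application run serially or in parallel without code changes.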
4.4 Parsing/Generating Subqueries

If the parser detects errors in the query, no decomposition will be done, since the subqueries would have the same errors, if not more. Any error messages issued by ORACLE at that time will refer to the original query. Subsequent errors in parsing the subqueries will likely be due to bugs in our code that generated invalid SQL. In that case, we should display a message that is meaningful to the user, to the effect that query decomposition has failed. To support debugging and offer a clue to possible workarounds, we should also display the error reported by ORACLE, along with the offending subquery.

After the query has been successfully parsed, we need to scan it for "PARTITION=" embedded within a comment. The next token will be the partitioning table name. Look up this table in the view ALL_TABLES to get its tablespace_name. Then look up the tablespace_name in the view ALL_DATA_FILES to get a list of file_id's. The number of file_id's is the number of subqueries needed. (ALL_DATA_FILES doesn't yet exist, but could be created as a duplicate of DBA_DATA_FILES, with the additional condition that the tablespace_name must exist in ALL_TABLES. Alternatively, a public synonym could be created for DBA_DATA_FILES, with public select access. The choice depends on how concerned users are about letting everyone see what database files exist on the system.)

All of the subqueries will initially be copies of the original query. Then, a predicate of the form FILEID=n needs to be added to each one. The proper place for this depends on the form of the query (refer to the examples below). The rest of the WHERE clause, if any, needs to be enclosed in parentheses and preceded by "AND" to ensure the desired precedence. Views containing joins may present additional problems and need to be studied further.
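A minimal sketch of the rewrite described above, assuming simple single-table SELECT statements. It uses regular expressions rather than ORACLE's parser, so it does not understand string literals, comments, nested subqueries, or views, and the list of file ids is assumed to have already been obtained from ALL_TABLES and ALL_DATA_FILES. The function names are hypothetical.

```python
import re
from typing import List

# Clauses that can follow WHERE in a simple single-table SELECT.
_TAIL = re.compile(r"\s+(?:GROUP\s+BY|HAVING|ORDER\s+BY)\b", re.IGNORECASE)
_WHERE = re.compile(r"\s+WHERE\s+", re.IGNORECASE)


def make_subquery(query: str, file_id: int) -> str:
    """Add FILEID=n to one copy of the query, parenthesizing any existing WHERE condition."""
    q = query.strip().rstrip(";").rstrip()
    predicate = f"FILEID={file_id}"
    # Split off GROUP BY / HAVING / ORDER BY so the predicate lands before them.
    tail_match = _TAIL.search(q)
    if tail_match:
        head, tail = q[:tail_match.start()], q[tail_match.start():]
    else:
        head, tail = q, ""
    where_match = _WHERE.search(head)
    if where_match:
        rest = head[where_match.end():]
        head = f"{head[:where_match.start()]} WHERE {predicate} AND ({rest})"
    else:
        head = f"{head} WHERE {predicate}"
    return f"{head}{tail};"


def make_subqueries(query: str, file_ids: List[int]) -> List[str]:
    """One subquery per data file of the partitioning table."""
    return [make_subquery(query, fid) for fid in file_ids]
```

Applied to the queries in the examples that follow, make_subquery(query, 1) produces exactly the "After" forms shown there. Wrapping the original condition in parentheses is what keeps FILEID=1 AND SAL<10000 OR JOB='CLERK' from being read as (FILEID=1 AND SAL<10000) OR JOB='CLERK'.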
Query examples:

Before: SELECT ENAME FROM EMP;
After: SELECT ENAME FROM EMP WHERE FILEID=1;

Before: SELECT ENAME, SAL FROM EMP WHERE SAL<10000 OR JOB='CLERK' ORDER BY SAL;
After: SELECT ENAME, SAL FROM EMP WHERE FILEID=1 AND (SAL<10000 OR JOB='CLERK') ORDER BY SAL;

4.5 Combining Functions

Returning the proper results to the application is not simply a matter of putting the rows from the various subqueries in the right order. Sometimes, several subquery rows are needed to produce a single result row (a result row being what the application sees). A set of combining functions will be developed to produce a single result row for the application from all of the subquery rows available for consideration. Only the most recent row from each subquery needs to be considered.

The specific method used for merging or ordering the subquery results is completely dependent on the nature of the query. The presence of aggregate functions, ORDER BY, or GROUP BY clauses is the main factor to consider. Sometimes multiple combining functions need to be applied to the same query. For example, the query SELECT MIN(SAL), MAX(SAL) FROM EMP GROUP BY STATE would require three combining functions to be applied (one to match GROUP BY values, one for MIN, and one for MAX).

As mentioned above, in order to effectively determine what combining functions are needed for each query, we will need to determine or request certain information about the form of the query. Several questions need to be answered when deciding how to combine subquery results. The two main ones are:

a) Which subquery rows do we want to use?
b) How do we combine those rows?

Which rows to use depends on the form of the query and the specific data values in the subquery results. How to combine the rows depends only on the form of the query. We are considering using combining queries to handle complex situations (e.g., HAVING clauses or expressions in the select list).
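The row-selection and combining rules detailed in 4.5.1 and 4.5.2 below can be sketched as three small functions. This is an illustration under stated assumptions, not the PUPI code: subquery results are modeled as already-fetched Python sequences of tuples rather than asynchronously filled fetch buffers, group columns are assumed to come first in each row, and comparisons use Python ordering rather than the collating sequence in effect (NULLs are not handled). All function names are hypothetical. COUNT and SUM columns combine with sum, MIN with min, and MAX with max; AVG is handled by having each subquery return SUM and COUNT instead.

```python
import heapq
from itertools import zip_longest
from typing import Any, Callable, Iterable, Iterator, List, Optional, Sequence, Tuple

Row = Tuple[Any, ...]
_MISSING = object()  # padding value for subcursors that have run out of rows


def round_robin(subresults: Sequence[Iterable[Row]]) -> Iterator[Row]:
    """No aggregate, no ORDER BY: a simple union, one row per subcursor in turn."""
    for batch in zip_longest(*subresults, fillvalue=_MISSING):
        for row in batch:
            if row is not _MISSING:
                yield row


def ordered_merge(subresults: Sequence[Iterable[Row]],
                  key: Callable[[Row], Any],
                  descending: bool = False) -> Iterator[Row]:
    """No aggregate, with ORDER BY: each subresult is already sorted, so merge them.
    ASC repeatedly takes the lowest head row; DESC takes the highest."""
    return heapq.merge(*subresults, key=key, reverse=descending)


def combine_grouped(subresults: Sequence[Iterable[Row]],
                    n_group_cols: int,
                    combiners: Sequence[Callable[[List[Any]], Any]]) -> Iterator[Row]:
    """Aggregate with GROUP BY: each subresult arrives in ascending group order.
    Take the lowest group value among the current head rows, gather every head
    with that value, copy the group columns, and combine the aggregate columns."""
    iters = [iter(r) for r in subresults]
    heads: List[Optional[Row]] = [next(it, None) for it in iters]
    while any(h is not None for h in heads):
        group = min(h[:n_group_cols] for h in heads if h is not None)
        chosen = [i for i, h in enumerate(heads)
                  if h is not None and h[:n_group_cols] == group]
        aggregates = tuple(
            combine([heads[i][n_group_cols + j] for i in chosen])
            for j, combine in enumerate(combiners))
        yield group + aggregates
        for i in chosen:  # these rows are used up; fetch the next from each
            heads[i] = next(iters[i], None)
```

For the DEPTNO example below, suppose each subquery returned DEPTNO, SUM(SAL), COUNT(SAL) instead of AVG(SAL), with hypothetical counts chosen to match the averages shown (for instance (20, 4500, 2), (20, 6400, 2), and (20, 4000, 1) for department 20). combine_grouped with combiners [sum, sum] then yields (20, 14900, 5), so the overall AVG(SAL) for department 20 is 14900 / 5 = 2980, not 3150, the unweighted average of the three partial averages.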
4.5.1 Selecting Subquery Rows

In selecting or constructing a row to be returned to the application, we need to examine the most recent row fetched from one or more of the subqueries. If there are no aggregates in the query, then only one row from one subquery will be selected to satisfy each root cursor fetch. If there is an aggregate, then rows from several subqueries might be selected and combined into a single row.

No aggregate: If there is no ORDER BY clause, then this is a simple union. Take one row at a time from each subcursor, in round-robin fashion. If there is an ORDER BY clause, then the sorted results of each subquery need to be merged. For each root cursor fetch, take the row with the lowest sort column values if ASC was specified, or the highest if DESC was specified. We must take into account the collating sequence currently in effect when determining high and low values.

With an aggregate: If there is no GROUP BY clause, then each subquery will have returned a single row containing the aggregate result for its partition. Combine all of these rows into a single row, using the appropriate aggregate function(s). If there is a GROUP BY clause, then not every group value may be present in every subquery result. For example, SELECT DEPTNO, AVG(SAL) FROM EMP GROUP BY DEPTNO; might produce the following partitioned results:
Subquery 1          Subquery 2          Subquery 3
DEPTNO  AVG(SAL)    DEPTNO  AVG(SAL)    DEPTNO  AVG(SAL)
10      1500        10      2000
20      2250        20      3200        20      4000
30      1700        30      1100
In this case, the combining function cannot simply take one row from each subquery and combine them. It needs to select and combine rows whose group values match. For the first root cursor fetch, all the DEPTNO 10 rows will be combined; the next fetch will combine the 20's, and so on. Since GROUP BY implies that rows are sorted in ascending order of the group values before the aggregate function is applied (so each subquery returns its groups in ascending order), we can select the lowest available group value and all of its duplicates.

4.5.2 How to Combine Subquery Rows

Once the rows to be returned to the application have been selected, we need to combine them into a single row. If only one row was selected, obviously no combining is necessary. The particular combining technique to be used depends only on the form of the query, not on any specific data values. The need to combine multiple rows implies that the query has at least one aggregate.

Combining can be viewed as collapsing several rows into one. All the eligible subquery rows are identical in the non-aggregate columns. These columns can simply be copied into the result row. The aggregate columns can be combined by calling the appropriate combining function, passing the column number and pointers to the relevant rows. Note that averages need some special handling: the corresponding COUNT column also needs to be identified and taken into account by the combining function.

Example: Assume columns 1 and 2 are not aggregates and columns 3 and 4 are:

    for column = 1, 2
        copy column_value(column, row_ptr) to result
    for column = 3, 4
        copy combining_function(column, set_of_row_ptrs) to result

After processing and disposing of each subquery row, set the buffer state to empty and notify the appropriate fetch thread so it will initiate another asynchronous fetch. Array fetches will need some special consideration: the combining functions may have to be called iteratively until the array is full.

4.6 Error Handling

A detailed description of all possible errors has not yet been created.
When we do, we should try to classify errors into the following severity categories and decide how each of them will be handled in each of our several versions:

(1) The user requested decomposition, and the query cannot be decomposed correctly.
(2) The user requested decomposition, and the query can be decomposed correctly, but not effectively; it may even run slower.
(3) Infinite loop, ORACLE or application crash, or database damage.

Error handling might get a little tricky with multiple fetches going on at once. If any of the subcursor fetches encounters an error, bubble it up to the root cursor so the application knows about it. Maybe we need to terminate all the other subqueries, too. The P1 version might not be very robust in this area, and more issues will probably be uncovered during implementation. I haven't tried to predict them all at this time.

5. Limits of Parallelization

The potential degree of parallelization, using query decomposition, is limited by several factors:

The number of physical files comprising the partitioning table.

Data skew or partition skew in the partitioning table, with respect to the query. I am defining data skew here to mean any distribution of data that causes result rows to be fetched from the subcursors in something other than round-robin fashion. For example, sorted output may appear in clumps, so that several rows in succession from the same subcursor are returned to the root cursor. During such periods, little, if any, parallel fetching will occur. This phenomenon may appear and disappear many times during the course of a single query. Increasing the number of fetch buffers per subquery will help to minimize the effects of this type of data skew. Partition skew is defined as a distribution of data that results in unequal-sized partitions. During the latter part of query execution, and possibly even during the entire query, some partitions will have no more rows to fetch.
This will reduce the degree of parallelism for the remainder of the query. The database partitions may actually be equal in size, but the effective partition size for any given query might be reduced by predicates in the query.

The cost of the combining functions, relative to the cost of executing the subqueries.

The amount of processing done (single-threaded) by the application for each row.

ORACLE or OS limits on the number of processes, threads, connections, etc.

The overhead of opening, closing, and maintaining extra connections and cursors.

The number of partitions is limited by the maximum number of database files ORACLE supports, which is currently 256. To achieve a higher degree of parallelism (through query decomposition), we will need to increase the file limit, while reducing the maximum number of blocks per file by a corresponding factor. Bear in mind that query decomposition is designed to work in conjunction with other parallel processing techniques, such as parallel relational operators and pipelining. Thus, we are not depending solely on QD for parallelism in query processing.

Query Decomposition and ORACLE Clustering Techniques (Database Note #76)

This is an informal discussion which is a first attempt to pull together in one place the issues involved in using Query Decomposition in conjunction with ORACLE's clustering techniques and ORACLE's approaches to laying out extents and data blocks within files. A primary immediate goal is to identify assumptions about ORACLE's behavior which need to be verified, and questions which need to be answered by either of these means. A medium-term goal is to develop application design guidelines for use in modeling and pilot projects. An ultimate goal is to develop end-user documentation providing DBAs with detailed guidelines for planning and configuring their databases and applications to make the best use of QD in conjunction with ORACLE's native techniques for optimizing data access.
Overview of Basic Query Decomposition Mechanism

Our Query Decomposition parallelizes a query by dividing it into subqueries, each of which uses a rowid range predicate to specify one or more files to which that subquery's reads will be restricted. The approach depends on partitioning tables across files on multiple disk drives, so that the files can be read in parallel. For a trivial example, if the table EMP is partitioned across 3 files with ORACLE fileid's 1, 2, and 3, then the query SELECT * FROM EMP can be decomposed into three subqueries:

SELECT * FROM EMP WHERE ROWID>='0.0.1' AND ROWID<'0.0.2'
SELECT * FROM EMP WHERE ROWID>='0.0.2' AND ROWID<'0.0.3'
SELECT * FROM EMP WHERE ROWID>='0.0.3' AND ROWID<'0.0.4'

The first subquery will only read blocks of the EMP table which are in file 1, the second will only read blocks from file 2, and the third from file 3. This is an example of decomposing a full table scan: the overall query needs to read all blocks of the table, and we gain near-linear speedup by reading in parallel the separate files across which the table is partitioned. The total number of reads has not changed, but they happen in parallel. As a necessary prerequisite to implementing this approach, ORACLE has been modified to restrict reads during full table scans based on rowid range predicates.

Query Decomposition can also work with queries that use an index. Suppose our query were SELECT * FROM EMP WHERE DEPTNO=5, and there is an index on DEPTNO. This can be decomposed similarly to the first example:

SELECT * FROM EMP WHERE DEPTNO=5 AND ROWID>='0.0.1' AND ROWID<'0.0.2'
SELECT * FROM EMP WHERE DEPTNO=5 AND ROWID>='0.0.2' AND ROWID<'0.0.3'
SELECT * FROM EMP WHERE DEPTNO=5 AND ROWID>='0.0.3' AND ROWID<'0.0.4'

Each of these subqueries must redundantly read the same index blocks to find index entries for DEPTNO 5, but hopefully the index blocks will be cached by the first subquery which gets to each one, so they are only read once.
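A sketch of generating the rowid-range subqueries above from a list of file ids. It assumes ORACLE 7's restricted ROWID text format (block.row.file, with hexadecimal components), so that the range from '0.0.f' up to but not including the next file id brackets file f; the function name and parameters are hypothetical, and the base query is assumed to have no WHERE clause of its own.

```python
from typing import List, Optional


def rowid_range_subqueries(base: str, file_ids: List[int],
                           predicate: Optional[str] = None) -> List[str]:
    """Build one subquery per file, restricted by a rowid range covering that file.

    base:      the original query without a WHERE clause, e.g. "SELECT * FROM EMP"
    predicate: the original WHERE condition, if any, e.g. "DEPTNO=5"
    """
    subqueries = []
    for fid in file_ids:
        # Block 0, row 0 of this file up to (not including) block 0, row 0 of the next.
        rowid_range = f"ROWID>='0.0.{fid:X}' AND ROWID<'0.0.{fid + 1:X}'"
        where = f"{predicate} AND {rowid_range}" if predicate else rowid_range
        subqueries.append(f"{base} WHERE {where}")
    return subqueries


for sq in rowid_range_subqueries("SELECT * FROM EMP", [1, 2, 3], "DEPTNO=5"):
    print(sq)
```

For file ids 1 through 8 the hexadecimal and decimal forms coincide, which is why the examples above read naturally; the formatting first makes a difference at the upper bound for file 9 ('0.0.A').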
When a subquery finds an index entry for DEPTNO 5, however, it will examine the rowid stored in that index entry to see whether it falls within the range for that subquery. Only if it does will that subquery read the data page containing the row with that DEPTNO value and rowid. Speedup is not as close to linear as with full table scans, because only the table reads are partitioned. Logically, the total reads are increased due to redundant reading of the index, but the redundant reading happens in parallel, and hopefully caching will eliminate most actual redundant I/O.

Using QD with indexed queries depends on ORACLE implementing the feature of restricting table reads during indexed scans to blocks which fall within the specified rowid range predicate. ORACLE has not yet implemented this feature, but KSR has devised an interim implementation in our port of ORACLE 7.0.9. (KSR still relies on ORACLE to implement a "real" solution, because our interim solution is unduly CPU-intensive: it re-evaluates the rowid range predicate for every fetch, rather than once when a cursor is opened.)

Both full table scan QD and indexed scan QD rely for their effectiveness on good distribution of target data across the files of a partitioned table. For full table scans, this means that ideally each file should contain an equal proportion of the total blocks of the table, even when the table has only been loaded to a fraction of its capacity. For indexed scans, it also means that rows with duplicate key values, or rows with adjacent values of a unique key, should be well scattered among the partitioning files, rather than contained within one or a few files.

Query Decomposition and Clustering

Query Decomposition as described above speeds up query execution by parallelizing the reads involved in a query, but not by reducing their total number.
While this improves individual query response time, it does not improve system throughput (and may even reduce throughput, due to the added overhead of additional threads and processes, and of redundant index reads). ORACLE's clusters and hashed clusters are approaches to speeding up query execution by greatly reducing the number of reads needed to accomplish certain queries. "Regular" (i.e., non-hashed) clusters reduce the reads needed for commonly executed joins by clustering together the rows of several related tables based on common join column values; they further reduce the number of blocks needed to read a related set of rows by storing each cluster key value only once for all rows of all tables sharing that key value. This kind of cluster still has an associated index on the cluster key, but the index entries simply point to the root block for the cluster key value, rather than having separate rowid entries for individual rows. Hashed clusters reduce reads for queries which seek rows of an individual table that exactly match a given key value. Rows with key values that hash to the same hash key value are clustered together, and no index is needed to navigate directly to the root block for a given hash key value.

Both of these clustering approaches require that a DBA decide in advance which access paths are likely to be used frequently enough to justify organizing the data in a way that optimizes them. A given table can only be clustered on one column or set of columns, and doing so reduces the performance of updates which change the values of cluster key columns. Query Decomposition has more general applicability: as long as a DBA decides in advance to partition a given table across multiple disks, Query Decomposition can be used on that table for any query that uses either a full table scan or any regular index, rather than being restricted to queries with predicates on certain predetermined columns.
In general, Query Decomposition and clustering cannot be used in conjunction to optimize access to the same table in the same query. This is because accessing a table through a cluster key, whether hashed or otherwise, does not use either a full table scan or a regular indexed scan. Instead, it uses the cluster index (for regular clusters) or hashing to find the root block for the cluster key value. Then, if all rows for the specified cluster key value are in that one block, that's all that has to be read, so there's no opportunity for parallel partitioning. Otherwise, all of the chained blocks for that cluster key value must be read in sequence, whether they are in the same or different files. Even in the case of a regular cluster where an index is used, the index entry for a particular key value just points to the first block of the overflow chain, so there's no opportunity to examine rowid's and decide whether they fall in a specified range before reading a data block. Thus it would appear that there is no opportunity for the QD and clustering techniques to leverage each other to retrieve a particular table. (They can leverage each other to retrieve a join, in cases where the driving table of the join is partitioned and can be retrieved using QD, and where that table contains a foreign key that can be used to join to other tables that are clustered on that key.) However, KSR has devised a way of leveraging QD with hashed clustering, by using hashed clusters in a way rather different from that envisioned by ORACLE, in an approach we may designate "small bucket hashing".
Small Bucket Hashing (elsewhere called "Scatter Clustering")

If an index has a fairly small number of distinct values relative to the number of rows in a table, and if rows with a given index value can be scattered anywhere in the table without regard to their key value on that index, then even after using the index, a much larger volume of data may have to be read from the table than the volume represented by rows with the desired key values, because only a small fraction of each block read consists of the desired rows. In the worst cases, all blocks of the table must be read, so that performance is worse than if the index weren't used at all (because of the extra reads of the index, and because of the higher proportion of random to sequential I/O's). QD can ameliorate the problem by splitting up the load in parallel, but it remains the case that if the index doesn't provide speedup relative to a full table scan without QD, then it won't provide speedup relative to a full table scan with QD. If rows with matching key values could be clustered together, then using an index would reduce the total I/O in a much wider variety of cases, again, with or without QD. This is essentially what ORACLE clusters accomplish.

Now, if instead of clustering rows with a given key value into one clump, they could be clustered in N clumps, where N is the degree of partitioning of the table, and if these N clumps could be read in parallel (i.e., if QD could be applied), we'd be better off by a factor approaching N. This can be accomplished by the following trick: create a hash cluster keyed on the desired columns, in a partitioned tablespace (i.e., the hash cluster is partitioned over multiple files, on multiple disks). Estimate the expected volume of data for each distinct key value, as you would for an ordinary hashed cluster.
But instead of using that volume as the size to specify for a hash bucket when creating the hashed cluster, specify a much smaller bucket size (at the largest, V/N, where V is the volume of data for each distinct key value and N is the number of table partitions). Assuming that your ORACLE block size is also no larger than V/N (i.e., that V is large enough to be at least N*blocksize), when you load the table you get an overflow chain of at least N blocks for each key value (just the opposite of the usual goal in configuring a hashed cluster). If you load the table cleverly (we'll need some further experimentation to define "cleverly" in this context, but loading in random hash key sequence will probably work, if your order of extents round-robins through the files), you end up with the blocks for each overflow chain well distributed among the files of the partitioned table.

Now, create an (ordinary) index on the SAME columns as the hash columns. Because it is an ordinary index, each index entry consists of a key value/rowid pair, which points directly to the block containing the row in question. Also because it is a regular index, it can be used for range predicates as well as exact-match predicates. When presented with a query that has an exact-match predicate on the hash key columns, the ORACLE optimizer will choose hashed access rather than using the index on those same columns, because under normal circumstances hashed access would unquestionably be faster. However, when the Query Decomposer notices (in the EXPLAIN plan) that ORACLE has chosen hashed access, and that there is a regular index which has all of the columns of the hash key as its leading columns, it can generate an INDEX optimizer hint in the parallel subqueries, coercing the ORACLE optimizer to use the regular index rather than hashing. Since the parallel subqueries have rowid range predicates, this regular indexed query can be decomposed like any other.
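The sizing rule for small bucket hashing is simple arithmetic, and a short sketch makes it concrete. The function names and the numbers in the example are hypothetical, and blocks_per_file models only the ideal outcome of "clever" loading (successive chain blocks landing in the files round-robin), which the text notes still needs experimental confirmation.

```python
import math
from typing import List, Tuple


def small_bucket_plan(volume_per_key: int, n_partitions: int,
                      block_size: int) -> Tuple[float, int]:
    """Return (largest allowed bucket size V/N, minimum overflow-chain length in blocks)."""
    max_bucket = volume_per_key / n_partitions
    if block_size > max_bucket:
        raise ValueError("small bucket hashing assumes V >= N * blocksize")
    # Each key's V bytes need at least V/blocksize blocks, which is >= N here.
    chain_blocks = math.ceil(volume_per_key / block_size)
    return max_bucket, chain_blocks


def blocks_per_file(chain_blocks: int, n_partitions: int) -> List[int]:
    """Idealized placement: successive chain blocks land in the files round-robin."""
    counts = [0] * n_partitions
    for block in range(chain_blocks):
        counts[block % n_partitions] += 1
    return counts


bucket, chain = small_bucket_plan(volume_per_key=65536, n_partitions=8, block_size=2048)
print(bucket, chain, blocks_per_file(chain, 8))
```

With V = 64 KB of data per key value, N = 8 partitions, and a 2 KB block size, the largest allowed bucket is 8 KB and each key's overflow chain spans at least 32 blocks; under ideal placement each of the 8 parallel subqueries reads about 4 of them instead of one process reading all 32.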
But because the data is clustered on the same column values, with blocks for each cluster key value well distributed among the files of the partitioned table, many fewer blocks need to be read than if this were not a hashed table. As an example, consider the query:

SELECT * FROM HASHED_TABLE WHERE HASHKEY_COLUMN=5

This would be decomposed into parallel subqueries of the form:
SELECT /*+ INDEX(HASHED_TABLE REGULAR_INDEX)
*/ * FROM HASHED_TABLE
WHERE HASHKEY_COLUMN = 5
AND ROWID >= <low end of range>
AND ROWID < <high end of range>
where a partitioned table called HASHED_TABLE is hashed on the column HASHKEY_COLUMN, and there is also an index called REGULAR_INDEX on that same column.

The regular index may optionally contain additional trailing columns, beyond those which match columns of the hash key. This means it can be used to further restrict the rows read, according to additional predicates in the query. This could be particularly useful for added flexibility, because a hash key must be decided upon by a DBA before a table is created, and once the hashed table is populated, adding hash key columns would require a complete reorg. It is much easier, however, to add columns to an index (or replace it with a different index) without affecting the data itself. So if additional frequently used selection criteria are identified after a hashed table already exists, these columns could be added to the regular index.

If more than one regular index has leading columns matching the hash key (but with different trailing columns), the Query Decomposer must arbitrarily choose one of these indexes to tell ORACLE to use, because it is not equipped to perform the function of a full-fledged query optimizer, analyzing the predicates in the query and deciding which index would be best. In this event, however, the user may optionally choose the index by placing the INDEX optimizer hint in the original query. The Query Decomposer always leaves any hints from the original query in the parallel subqueries, to give the user this extra degree of customized control over optimization when needed, in this or other situations.

Supporting Query Decomposition for Applications Running on Client Workstations (Database Note #61)

1 Introduction

Our Query Decomposition (QD) approach exploits the shared-memory parallel architecture of the KSR1 to speed up the execution of large ORACLE queries.
It is our aim to support this approach for as wide a range of queries, and within as wide a range of ORACLE applications and contexts, as is feasible. ORACLE applications use a client-server architecture in which all database access is performed on behalf of an application program by a separate server or "shadow" process. While this architecture is used even when the client application and the server are running on the same machine, ORACLE's SQL*Net network software supports the seamless connection of remote clients and servers running on heterogeneous platforms. This permits the KSR1 to play the role of database server for a network of workstations, a configuration which is becoming increasingly prevalent, and may be preferred or even required by some potential KSR customers.

Clearly, it would be desirable for Query Decomposition to work for queries issued from applications running on client workstations against a KSR1 database server. While this does not pose a problem for the internal design of the QD code, it will require significant changes to the architecture by which QD is integrated with ORACLE. Section 2 below explains why remote workstations cannot be supported by the current QD architecture; Sections 3 and 4 present alternate architectures to solve the problem; and Section 5 draws conclusions about which architecture is likely to be preferable, and how much effort will be required to implement it.

2 The Problem

If Query Decomposition were implemented as an integral part of ORACLE, the most natural approach would be to decompose a query inside the ORACLE kernel (which is in the server), and parallelize that portion of the kernel required to execute the parallel subqueries into which the original query is decomposed.
Since KSR is implementing QD as a separate body of code which must be integrated with ORACLE as seamlessly as possible, but with the minimum necessary changes to ORACLE code, a rather different approach was chosen: QD is integrated with ORACLE within the ORACLE UPI (User Program Interface) layer, which is the common set of function calls underlying all of the ORACLE front-end tools and APIs. (See DBN #26, Query Decomposition in ORACLE for KSR--Preliminary Design, for a detailed explanation of this design.) UPI calls accomplish their functions by sending messages to the ORACLE server, which are serviced by corresponding OPI (ORACLE Program Interface) routines. Because the UPI is part of client programs rather than part of the ORACLE server, no architectural changes to the ORACLE kernel were required to implement this approach, though some changes were required in the mechanics of indexed and full table scans to facilitate parallel partitioning.

Our version of the UPI is called the PUPI (Parallel User Program Interface). This set of routines emulates the calling sequence and behavior of the UPI routines, but is also capable of decomposing a query into parallel subqueries, creating and managing the threads in which those parallel subqueries are executed, and combining the results to emulate the result of the original query. For each parallel subquery, a separate thread is created, and a connection is made from within that thread to a separate ORACLE server. When a PUPI routine is called for a task which does not require parallelism, behavior is the same as for an ordinary UPI routine, and the call is serviced by the server from the original user connection (which we may designate the primary server, to distinguish it from the servers used for parallel subqueries). This architecture is shown in FIG. 11.
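The thread-per-subquery structure of the PUPI can be sketched with Python threads and bounded queues. This is an illustration under assumptions, not the PUPI implementation: each subcursor is modeled by a hypothetical factory that returns an iterable of rows (standing in for connecting to a separate server and opening a cursor), each queue plays the role of that subquery's fetch buffers, and the root cursor takes rows round-robin, as for a simple union. Following the error-handling notes in 4.6, an error in any subcursor is bubbled up to the root cursor; terminating the remaining subqueries is not attempted here.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, List

_EOF = object()  # end-of-fetch marker placed in a subcursor's queue


class ParallelCursor:
    """Root cursor whose subcursors are fetched concurrently, one thread each."""

    def __init__(self, subcursors: List[Callable[[], Iterable[tuple]]],
                 buffer_rows: int = 4) -> None:
        self._queues: List[queue.Queue] = [
            queue.Queue(maxsize=buffer_rows) for _ in subcursors]
        for open_subcursor, buf in zip(subcursors, self._queues):
            threading.Thread(target=self._fetch_loop,
                             args=(open_subcursor, buf), daemon=True).start()

    @staticmethod
    def _fetch_loop(open_subcursor: Callable[[], Iterable[tuple]],
                    buf: queue.Queue) -> None:
        try:
            for row in open_subcursor():
                buf.put(row)  # blocks while this subquery's buffer is full
            buf.put(_EOF)
        except Exception as exc:  # hand the error to the root cursor
            buf.put(exc)

    def __iter__(self) -> Iterator[tuple]:
        live = list(self._queues)
        while live:
            for buf in list(live):
                item = buf.get()  # waits for this subcursor's next row
                if item is _EOF:
                    live.remove(buf)
                elif isinstance(item, Exception):
                    raise item
                else:
                    yield item


rows = list(ParallelCursor([lambda: [(1,), (2,)], lambda: [(3,)]]))
print(rows)
```

The bounded queues let each fetch thread run ahead of the root cursor by a few rows, which is the same reason the text suggests more fetch buffers per subquery to soften data skew; a sorted or grouped query would replace the round-robin loop with the merge or group-matching logic of 4.5.1.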
This architecture takes advantage of ORACLE's separation of client and server processes, even for local connections, to manage parallelism inside the client process, thereby requiring minimal change to the server. Unfortunately, this only works when the client is executing on the KSR1. To support a remote client, the architecture must be changed so that parallelism can be managed on the server side of the remote client/server boundary.

3 Moving QD Inside the ORACLE Kernel

The approach which first suggests itself is to move the QD code from the client-side UPI into the server-side OPI library. Since there is a more or less one-to-one correspondence between UPI and OPI routines, it would appear conceptually straightforward for KSR to develop a POPI (Parallel ORACLE Program Interface) library, along lines similar to the PUPI library. Like PUPI routines, POPI routines would determine whether a particular call required parallel processing; if not, they would behave like ordinary OPI routines. If parallel processing were called for, the POPI routines would behave as a client with respect to additional servers to which they would connect from parallel threads, to process parallel subqueries. To accomplish this, the POPI routines would have to call UPI routines to request particular services from the servers for the parallel subqueries. This architecture is shown in FIG. 12.

This is not the same architecture cited at the beginning of Section 2. Rather than parallelizing the existing query execution code within the kernel, this approach introduces into the kernel new code which parallelizes client access to additional servers, each containing a complete, non-parallelized kernel. The QD logic itself would be identical to the current design. An advantage of this solution is that it introduces no new processes or connections, other than those specifically needed for executing parallel subqueries.
Under this approach, when a client program sends the server a message that does not require parallel processing, the call is simply passed on into the kernel without requiring an additional message. Essentially, the ORACLE server plays a dual role, both as a standard ORACLE server and as a QD server. The chief disadvantage of this approach is the very fact that it places QD inside the ORACLE kernel. From the standpoint of detailed design and implementation, changes of this nature to the ORACLE kernel leave much room for unpredictable difficulties and side effects. Prior experience indicates that it can be very difficult to emulate client behavior inside a server, since the two sides of a client/server interface, if not specifically implemented to allow for this, may contain variables with corresponding names and purposes that are used in subtly different ways. Furthermore, the current implementation of QD assumes that it resides in the client: it calls ORACLE functions which have similar but different counterparts on the server side. Moving QD inside the kernel would also raise a potential security issue. Because QD code would have access to ORACLE's SGA (Shared Global Area), it could potentially bypass ORACLE's security enforcement. This can also be viewed as an advantage: moving at least portions of QD inside the kernel has previously been proposed as a possible solution to security-related problems involved in decomposing queries over views. See DBN #55, Decomposing Queries Over Views--Issues and Options, for a full discussion of this complex issue. A separate QD server, as proposed in Section 4 of the current document, might also provide an avenue for solving view security problems.
4 Separate QD Server
A less obvious, but perhaps preferable, approach is to implement a separate QD server.
From the perspective of the remote client application, this server would behave exactly like an ORACLE server, servicing requests emanating from UPI calls in the client program. From the perspective of ORACLE, it would appear exactly like a local client application program containing the PUPI library (as in FIG. 11): it would contain PUPI routines which pass messages, via UPI calls across a local connection, to a primary ORACLE server to perform non-parallel operations, and it would manage threads which connect locally to additional ORACLE servers to execute parallel subqueries. The QD server would incorporate routines from the outermost, message-handling layers of the ORACLE kernel (in particular, modules of the SQL*Net and Two Task Common, or TTC, layers), but its dispatcher would call PUPI routines, rather than OPI or POPI routines, to service requests. This architecture is shown in FIG. 13. A key advantage of this approach is that, while it incorporates some peripheral kernel routines, it does not constitute modification of the ORACLE kernel itself. As in the current architecture, QD code is completely segregated from the kernel. There are likely to be fewer dangers of side effects, and much less danger of unintentional security violations. (The latter danger is not entirely eliminated, because emulating an ORACLE server from the client's perspective may still require access to the ORACLE SGA, but that access would occur in a better-isolated and more easily controlled context.) Another apparent advantage is that the PUPI as currently implemented could be grafted unchanged into the QD server, rather than having to re-integrate QD with the OPI layer inside the ORACLE kernel. From a design standpoint, this is clearly a good thing, because it means that the actual interface between QD and ORACLE is the same for remote clients as for local clients; the extra mechanics of message relaying for the remote case are a clean add-on.
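The idea that a QD server reuses the message-handling front end and changes only what its dispatcher calls can be sketched in Python as follows. The framing used here (a 1-byte opcode and a 2-byte big-endian length before a UTF-8 payload) is invented for the example and is not the SQL*Net or TTC wire format; `encode`, `decode_stream`, `serve`, and `PUPI_DISPATCH` are hypothetical names, and the handlers are stubs that only report which PUPI-level routine would be called.

```python
import struct

# Hypothetical framing: 1-byte opcode, 2-byte big-endian payload length,
# then a UTF-8 payload. This is NOT the SQL*Net/TTC wire format.
HEADER = struct.Struct(">BH")
OP_NAMES = {1: "parse", 2: "fetch", 3: "commit"}


def encode(op_code, payload):
    data = payload.encode("utf-8")
    return HEADER.pack(op_code, len(data)) + data


def decode_stream(buffer):
    """Message-handling layer: split a byte buffer into (op, payload) pairs."""
    offset = 0
    while offset < len(buffer):
        op_code, length = HEADER.unpack_from(buffer, offset)
        offset += HEADER.size
        payload = buffer[offset:offset + length].decode("utf-8")
        offset += length
        yield OP_NAMES[op_code], payload


def serve(buffer, dispatch_table):
    """Same front end for an ordinary server or a QD server; only the
    dispatch table differs (OPI-level vs. PUPI-level handlers)."""
    return [dispatch_table[op](payload) for op, payload in decode_stream(buffer)]


# Hypothetical PUPI-level handlers used by the QD server's dispatcher.
PUPI_DISPATCH = {
    "parse": lambda q: ("pupi_parse", q),
    "fetch": lambda q: ("pupi_fetch", q),
    "commit": lambda q: ("pupi_commit", q),
}

buf = encode(2, "SELECT name FROM emp") + encode(3, "COMMIT")
print(serve(buf, PUPI_DISPATCH))
# prints: [('pupi_fetch', 'SELECT name FROM emp'), ('pupi_commit', 'COMMIT')]
```

Passing an OPI-level table to `serve` in place of `PUPI_DISPATCH` would model an ordinary server; in this sketch the two differ only in their dispatch tables, which is the separation the QD server design relies on.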
From a development cost standpoint, however, reusing the PUPI unchanged is likely to be more of a tradeoff than a straight savings: although there is a general one-to-one correspondence in name and function between UPI and OPI routines, they do not take identical parameters or operate in an identical context. Some degree of message translation may be necessary to relay incoming messages, intended to be processed by OPI calls, to the UPI or PUPI calls which will pass them along to an ORACLE server. Furthermore, although the majority of UPI calls do not require PUPI counterparts in the current implementation, because they are not directly related to retrieving query results (e.g., calls for managing transactions, for connecting to ORACLE, or for modifying data), a QD server would need to be able to relay all of these calls to an ORACLE server. More detailed study of the ORACLE code will be required to determine the amount of effort involved, and whether it outweighs the advantages of leaving QD in the PUPI layer. This approach could turn out to be less different from relocating QD inside the OPI layer than it superficially appears. One disadvantage of this approach is that, by introducing a new server process into the overall ORACLE architecture, it adds complexity and introduces new unknowns. It may turn out to be fairly difficult to extract the appropriate SQL*Net, TTC, and other needed routines from their normal kernel contexts to emulate the front end of an ORACLE server. This approach also raises potential issues of packaging and code integration, since it introduces a new, KSR-specific executable to be shipped as part of ORACLE for KSR, and since it combines, in a single executable, KSR-written code with code intended only as part of the ORACLE kernel.
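One form the message translation mentioned above could take is cursor renumbering: the remote client numbers its cursors one way, the QD server opens its own cursors on the primary ORACLE server, and every relayed message that carries a cursor must be rewritten. The Python sketch below assumes that this is the kind of translation needed (the actual differences between UPI and OPI parameters would only emerge from the code study the text calls for). `PrimaryServer`, `RelayingQDServer`, and the set of calls treated as having PUPI counterparts are hypothetical, and the PUPI path is a stub that only records the translated call.

```python
import itertools


class PrimaryServer:
    """Hypothetical local ORACLE server reached through UPI-style calls.
    It numbers cursors in its own sequence, starting at 100."""

    def __init__(self):
        self._next_cursor = itertools.count(100)
        self.log = []

    def open_cursor(self):
        return next(self._next_cursor)

    def call(self, op, cursor, arg):
        self.log.append((op, cursor, arg))
        return "ok"


class RelayingQDServer:
    """Sketch of OPI-to-UPI message translation inside a QD server."""

    # Calls assumed to have PUPI counterparts; everything else is relayed.
    PUPI_OPS = {"parse", "execute", "fetch"}

    def __init__(self, primary):
        self.primary = primary
        self.cursor_map = {}  # client cursor number -> primary cursor number
        self.pupi_calls = []  # stub record of calls routed through PUPI

    def handle(self, op, client_cursor, arg):
        # Translate the client's cursor number into the QD server's own.
        if client_cursor is not None and client_cursor not in self.cursor_map:
            self.cursor_map[client_cursor] = self.primary.open_cursor()
        server_cursor = self.cursor_map.get(client_cursor)
        if op in self.PUPI_OPS:
            # Would go through the PUPI layer (possibly decomposed).
            self.pupi_calls.append((op, server_cursor, arg))
            return "pupi"
        # No PUPI counterpart (e.g. commit, connect, insert): plain UPI relay.
        return self.primary.call(op, server_cursor, arg)


primary = PrimaryServer()
qd = RelayingQDServer(primary)
qd.handle("parse", 1, "SELECT * FROM emp")
qd.handle("fetch", 1, None)
qd.handle("commit", None, None)
print(qd.cursor_map, qd.pupi_calls[-1], primary.log)
# prints: {1: 100} ('fetch', 100, None) [('commit', None, None)]
```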
Another disadvantage of the QD server approach is that requests for database operations which do not require parallelization must make an extra message hop to get from the client application to the ORACLE server which will service them. Since the QD code decides whether a given UPI call requires parallelization, if that code resides in the QD server rather than in the application program, the application program cannot "know" whether to send a given request to the QD server or to the ORACLE server, so it must always send its requests to one or the other. We can provide mechanisms to let the DBA or application user decide, globally or per application, whether to enable QD for remote queries, so that applications with little or no need for QD can avoid the extra overhead of the intermediate QD server. Alternatively, a hybrid approach could place inside the application program those portions of QD logic which determine whether to decompose a query, while managing the parallelism in a QD server. This approach, however, would require substantially more effort to implement, since it would involve re-partitioning QD functionality among processes. A possible compromise would be to develop a means whereby UPI calls that do not have PUPI counterparts are routed directly from the client application to the ORACLE server, while those which may require parallelism are routed to the QD server, which decides whether to parallelize or to "fall through" to ordinary UPI behavior. This would limit the extra-hop overhead to calls which potentially require QD attention.
5 Conclusion
At the current preliminary stage of analysis, the QD server approach appears preferable to the approach of locating QD in the ORACLE server, but not dramatically so. The QD server approach avoids modifying the ORACLE kernel, but this is somewhat offset by the added architectural complexity and possible complications in packaging and code integration.
Maintaining the same QD/ORACLE interface for remote and local clients is certainly preferable conceptually, but this may be offset by difficulties in relocating some kernel routines into a separate server, and in relaying to UPI routines messages which were intended for OPI routines. The QD server approach introduces extra performance overhead for non-parallelized ORACLE calls; this can be limited at the cost of slight extra administrative complexity, and might be reduced further by optional hybrid approaches, at the cost of greater development effort. A reasonably conservative initial estimate of development cost would be one person-month to implement the basic QD server functionality, with an additional two to three weeks to resolve peripheral issues of administration, configuration, and packaging. The initial phase of development would involve a detailed examination of the relevant ORACLE code, which would facilitate making a final decision between the alternative approaches and producing a more reliable development cost estimate and task breakdown. While support for physically remote QD clients depends on porting ORACLE's SQL*Net software to the KSR1, SQL*Net is not a prerequisite for developing and debugging a QD server, because the distinction between a local and a remote connection is transparent at the levels of ORACLE which are relevant to this project. Detailed analysis of the relevant code could begin at any time, and implementation could begin as soon as the initial port of the basic components of ORACLE 7.0.9 has been completed.
Automating Query Decomposition--Framework for Rules (Database Note #32)
Introduction
This paper provides a conceptual framework for automating the process of query decomposition proposed in Database Notes #21 and #26.
This framework can be viewed as a general structure within which to answer the question "What do we know, and when do we know it?" during the stages of transformation from an original input query to a decomposed query ready for parallel execution. In more down-to-earth terms, this paper provides a breakdown of the categories of rules involved in query decomposition, their input information and goals, and the categories of generated queries associated with them.
Top Level: The OAT Model
A good top-level framework for query decomposition is provided by the OAT model, whose name is an acronym for the three forms through which a collection of information passes during a transformation: the original form (O-form), the analyzed form (A-form), and the transformed form (T-form). The process of query decomposition consists of producing, for a given input query, the collection of parallel subqueries, combining queries, combining function control structures, and other control structures needed to retrieve data in parallel and combine it to emulate the result table of the original query. This can be viewed conceptually as a transformation of the original query (which we will designate the O-form of the query) into the collection of objects which comprise the decomposed query (which we will designate the T-form of the query). To automate this process, we must specify a collection of rules whose starting point is the O-form of a query and whose ultimate goal is the T-form. This highest-level goal path is shown in FIG. 14. An SQL query submitted to the system does not contain within itself all of the information needed to decompose it. Strategic information such as index usage, table cardinalities, predicate selectivity, and join order and method must be obtained from the query optimizer to make decisions about decomposition strategy, such as the choice of a partitioning table.
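The O-form to A-form to T-form path can be sketched as three record types and two rule stages. In this Python sketch, the field names, the `analyze` and `transform` functions, the rule that partitions the first table in the optimizer's join order, and the crude textual ORDER BY test are all assumptions for illustration, not the rule set this note goes on to define; the partition comment appended to each subquery is a placeholder for a real partitioning predicate.

```python
from dataclasses import dataclass


@dataclass
class OForm:
    sql: str  # the query exactly as submitted


@dataclass
class AForm:
    original: OForm
    join_order: list    # strategic information from the optimizer
    has_order_by: bool  # crude stand-in for real semantic analysis


@dataclass
class TForm:
    partitioning_table: str
    subqueries: list
    combining_function: str


def analyze(o, optimizer_plan):
    """O-form -> A-form: gather what the query text alone does not say."""
    return AForm(
        original=o,
        join_order=list(optimizer_plan["join_order"]),
        has_order_by="ORDER BY" in o.sql.upper(),  # textual check only
    )


def transform(a, degree):
    """A-form -> T-form: choose a partitioning table and emit subqueries."""
    table = a.join_order[0]  # hypothetical rule: partition the driving table
    subqueries = [
        f"{a.original.sql} /* {table} partition {p} of {degree} */"
        for p in range(degree)
    ]
    combining = "merge" if a.has_order_by else "concatenate"
    return TForm(table, subqueries, combining)


o = OForm("SELECT e.name, d.dname FROM emp e, dept d "
          "WHERE e.deptno = d.deptno ORDER BY e.name")
t = transform(analyze(o, {"join_order": ["emp", "dept"]}), degree=3)
print(t.partitioning_table, t.combining_function, len(t.subqueries))
# prints: emp merge 3
```

Keeping the A-form as a separate record makes explicit which facts are known only after consulting the optimizer, which is the "what do we know, and when" question the framework poses.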
Semantic information about tables, columns, clauses and expressions in the query must be gathered from the data dictionary to determine the details of combining functions and queries (for example, what kind of comparisons to perform for a merge sort, depending on the datatypes of the ORDER BY columns). This col
