System and method for rapid completion of data processing tasks distributed on a network

Patent No. 6775831

Abstract

A computer program and task request interface which enables large volumes of data to be processed efficiently and rapidly by multiple computers on a network. The data that requires processing can be in any format. The task request interface allows any arbitrary task to be performed on the input data. The number of computers which may participate in any given data processing task is essentially unlimited. The number of programs that can participate in the system is also essentially unlimited if the program uses run time evaluable code. Any computer operating system that runs a specified computer language interpreter can participate in the distributed processing of the input data.

Description

BACKGROUND
Creation Date    Creation Time    File Size    File Name
Mar. 23, 2004    4:27 p.m.        41.5 KB      Source Code 9623-166
{
    'AdvertiserQueue' => {
        'event' => 'userClick',
        'columns' => {
            'AdListingX' => {
                'source_args' => ['AccountID','rank',],
                'source_routine' => 'sub {
                    my($ald)=shift @_;
                    my($rank)=shift @_;
                    my($x)="$ald###$rank";
                    return $x;
                }',
            },
            'IP_Address' => {
                'source_args' => ['clientIP',],
                'source_routine' => 'sub {
                    return shift @_;
                }',
            },
        },
        'delimiter' => '\t',
    },
}

In this example, `AdvertiserQueue` is the arbitrary name assigned to the task data queue. The data group of interest comes from the group `userClick`. Two output columns are desired: one named `AdListingX`, the other `IP_Address`. The output column `AdListingX` consists of the data elements `AccountID` and `rank`, which are part of the `userClick` data group. Supposing the advertiser ID is `ABC` and the rank is `3`, the `AdListingX` column would be presented in the form `ABC###3`. The output column `IP_Address` simply contains the value of the data element `clientIP` from an instance of the `userClick` data group. Those skilled in the art will recognize that any valid Perl syntax can be used in the `source_routine` value to derive or modify the input data fields of interest. The key named `delimiter`, having the value `\t`, indicates that the output fields should be separated by a tab character.

In the present embodiment of this invention, the control file may include three additional, optional keys: `deltaT`, `restriction_args`, and `restriction`. The `deltaT` key tells the data preprocessing routine the earliest minute within an hour for which the output of the data preprocessing routine should contain data. Legal values for this key are numbers between 0 and 59 inclusive; single-digit numbers are preceded by a zero (i.e., if the value is 1, write `01`). The `restriction_args` key works just like the `source_args` key described above: its value provides input arguments to a user-defined function, and the elements of the list must be names of data elements within the data group of interest. The `restriction` key value is a valid Perl subroutine.
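To make the roles of `source_args`, `source_routine`, `delimiter`, `restriction_args` and `restriction` concrete, the following Python sketch applies a control-file entry modeled on the `AdvertiserQueue` example to a few records. It is an illustration under stated assumptions, not the patent's implementation: the Perl code strings are replaced by Python callables, the records (with a hypothetical `event` field naming their data group) are invented, the restriction keys are folded into the same entry, and `deltaT` handling is omitted.

```python
# Hypothetical Python stand-in for the control file: the patent stores
# `source_routine` and `restriction` as Perl code strings evaluated at run
# time; here they are ordinary Python callables.
advertiser_queue = {
    "event": "userClick",
    "columns": {
        "AdListingX": {"source_args": ["AccountID", "rank"],
                       "source_routine": lambda aid, rank: f"{aid}###{rank}"},
        "IP_Address": {"source_args": ["clientIP"],
                       "source_routine": lambda ip: ip},
    },
    "delimiter": "\t",
    "restriction_args": ["bid"],          # optional restriction keys
    "restriction": lambda bid: bid > 0,
}


def apply_queue(spec, records):
    """Yield one delimited output row per record that the spec accepts."""
    delimiter = spec.get("delimiter", "\t")
    restriction = spec.get("restriction")
    restriction_args = spec.get("restriction_args", [])
    for rec in records:
        # Only instances of the data group of interest are considered.
        if rec.get("event") != spec["event"]:
            continue
        # The user-defined restriction receives the named data elements.
        if restriction and not restriction(*(rec[a] for a in restriction_args)):
            continue
        # Each output column is derived from its source_args by its source_routine.
        values = [str(col["source_routine"](*(rec[a] for a in col["source_args"])))
                  for col in spec["columns"].values()]
        yield delimiter.join(values)


clicks = [  # hypothetical parsed log records
    {"event": "userClick", "AccountID": "ABC", "rank": 3, "clientIP": "10.0.0.1", "bid": 5},
    {"event": "userClick", "AccountID": "XYZ", "rank": 1, "clientIP": "10.0.0.2", "bid": 0},
    {"event": "search", "AccountID": "ABC", "rank": 2, "clientIP": "10.0.0.3", "bid": 7},
]
rows = list(apply_queue(advertiser_queue, clicks))
print(rows)  # ['ABC###3\t10.0.0.1']
```

Only the first record survives: the second fails the `bid > 0` restriction and the third belongs to a different data group.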
As an example of these optional keys, the control file fragment

{
    . . .
    'deltaT' => '09',
    'restriction_args' => ['bid',],
    'restriction' => 'sub {
        my($x)=shift @_;
        return ($x > 0);
    }',
    . . .
}

specifies that the first data groups of interest to be included in the output file should occur no sooner than nine minutes after the start of the first hour of data seen by the file parsing routine. The only data group instances to be returned are those whose `bid` element has a value greater than zero; the `bid` element is passed to the user-defined function specified via the `restriction` key.

Once the data preprocessing routine has read the control file and evaluated its contents, it creates an output file that begins with a file header segment. The file header segment is written in the form of a Perl evaluable hash table. This file header segment has four required keys and three optional keys that are discussed in the next section (B. Grabber Routine). After outputting the file header, the data preprocessing routine writes a token named "EndOfHeader" on the next output line. At this point, any instances of data groupings which meet the restriction criteria are assembled according to the rules specified in the control file and written to the data portion of the output file, with the columns delimited by the delimiting character specified in the control file.

B. Grabber Routine.

According to a preferred embodiment of the grabber routine, referred to as monkeyGrab, the preprocessor routine selects the data on which the user wishes computations to be performed and places these data in a file. Thereafter, any other routine that can read the file header, which contains executable code, and execute that code can read the data to be acted upon. Thus, the grabber routine must be able to read the file header created by the preprocessing routine so that it can extract the appropriate data elements. Since the grabber routine also writes the minimum amount of data needed for a given task to a task file, the grabber routine can write out a file header bound by rules similar to those of the preprocessing routine's file headers.

The grabber routine grabs data columns based on column names provided as input arguments. For example, a data file may contain a column containing a price value, such as a search result bid column, and a column containing a class descriptor, i.e., a key field that may consist of one or many fields. In such a case, the grabber routine could be invoked as follows: "grab -g price -g myKeyField". The file header of the data file from which the grabber routine works has entries for the columns named "price" and "myKeyField"; that is, the header produced by the preprocessing routine should contain the appropriate key names in the hash table which describes the data columns. The grabber routine reads the header information from the data file to obtain the positions of the columns within the data file, the character(s) which delimit the data columns, and any special handling rules, such as how to treat or value an empty column. Once the grabber has ascertained the column positions and the processing rules required for each column, it loads the columns of interest and places them in an output file.

The output file written by the grabber routine has header information pertinent to the extracted columns. For explanatory purposes, the header is in the form of a Perl hash table with four keys, corresponding to four of the seven keys included in the output of the preprocessor routine. These four keys describe: the data columns; which data column, or group of columns, makes a given data row unique; the data column labels; and the output data field delimiter. The key for the data columns points to a second hash table, whose keys are column names and whose values are hash tables with two keys each. The first key of this innermost hash table (the most deeply embedded hash table in the series) describes the data column's position in the data portion of the file; the second key describes how to represent null data values. The value of the key for the column which uniquely identifies any given data row must be the name of that column, which is also a key in the hash table of columns. The key for the data column labels has a Perl list as its value. Lastly, the key describing the column delimiter has the column delimiter as its value; if the delimiter includes any characters which are escape sequences in Perl, these characters are preceded by a backslash (\) character. The preferred embodiment places a token at the end of the file header, so that the software knows when it has finished reading header information and can begin reading data.

C. Worker Routine.

A preferred embodiment of the worker routine(s), referred to as monkeycount, much like the preferred embodiments of the preprocessor and grabber routines, reads data file headers and also outputs such file headers. Like the grabber routine, the worker routine reads the file header to determine which columns it is reading, in what order those columns appear in the data segment of the input file, and which of the input data columns constitute the class, or key, definition. Upon reading this information, the worker routine performs the desired operation on the input data. If the worker routine is a counting routine, it outputs, for each class, a count of the number of rows belonging to that class, along with a descriptor of the class. For example, suppose the input data consists of seven rows and two columns, a key and an item to be counted for that key, as follows:
Key      Search Term
Adv01    dog
Adv03    cat
Adv05    house
Adv03    mouse
Adv01    travel
Adv05    music
Adv01    sound

then the output data file contains the following data rows:

Key      Count
Adv01    3
Adv03    2
Adv05    2
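A rough sketch of the grabber and counting worker operating on the seven-row example follows. It assumes a simplified file layout: a header written as a Python literal (standing in for the Perl-evaluable hash table), the `EndOfHeader` token, then tab-delimited rows. The header key names `columns`, `position`, `null`, `key`, `labels` and `delimiter` are hypothetical labels for the four header entries the text describes, and the function names are illustrative.

```python
import ast
from collections import Counter


def read_file(text):
    """Split file text at the EndOfHeader token into (header dict, list of rows)."""
    lines = text.splitlines()
    end = lines.index("EndOfHeader")
    # Assumption: the header is a Python literal; the patent uses Perl-evaluable code.
    header = ast.literal_eval("\n".join(lines[:end]))
    rows = [line.split(header["delimiter"]) for line in lines[end + 1:] if line]
    return header, rows


def grab(header, rows, wanted):
    """Keep only the named columns, substituting each column's null value for blanks."""
    specs = [header["columns"][name] for name in wanted]
    return [[row[s["position"]] or s["null"] for s in specs] for row in rows]


def count_by_key(rows, key_index=0):
    """Counting worker: number of rows belonging to each key (class)."""
    return Counter(row[key_index] for row in rows)


sample = """{'columns': {'Key': {'position': 0, 'null': ''},
             'Search Term': {'position': 1, 'null': 'NULL'}},
 'key': 'Key',
 'labels': ['Key', 'Search Term'],
 'delimiter': '\\t'}
EndOfHeader
Adv01\tdog
Adv03\tcat
Adv05\thouse
Adv03\tmouse
Adv01\ttravel
Adv05\tmusic
Adv01\tsound
"""

header, rows = read_file(sample)
grabbed = grab(header, rows, [header["key"], "Search Term"])
counts = count_by_key(grabbed)
print(sorted(counts.items()))  # [('Adv01', 3), ('Adv03', 2), ('Adv05', 2)]
```

The header's delimiter is stored with an escaped backslash (`'\\t'` in the Python source, `'\t'` in the file text), mirroring the rule that escape characters in the delimiter are preceded by a backslash.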
Likewise, if the worker routine were an adding program, it would sum the data values in the column of interest for each key of interest. The worker routines keep any given task minimal in terms of file input/output and complexity; for example, if a count is requested, the worker routine receives only the key column and the column of data elements to be counted. This allows many worker assignments to occur on any number of machines. One worker routine might count the total searches for each advertiser; another might count the number of unique IP addresses that clicked on a given advertiser's listing within a specific time period. When the worker routine finishes its assignment, it writes out a file with the same header format as the input file; however, the columns described in the header will be the key column from the input file and a column named for the worked-upon data. For example, if the input data file had a key of "Advertiser ID" and a column of "Search Term", and the worker routine was set up to count the number of searches which returned a given advertiser, the output file would have a key of "Advertiser ID" and a column of "Count of Searches".

D. Data Reconstruction Routine.

According to a preferred embodiment of the data reconstruction routine, referred to as monkeyjoin, all fields are reconstructed into one file, organized by key. The reconstruction occurs after the data to be worked on has been preprocessed, i.e., broken up into smaller work units, and the small work units have been sent to machines on the network for processing. The data reconstruction facilitates convenient database loading. To accomplish data reconstruction, the data reconstruction routine is told which data files need to be merged into one data file. Each of the data files that will become part of the database load file is supplied as an argument to the data reconstruction routine, in list format.
For example, the data reconstruction routine, e.g., monkeyjoin, is called as follows:

    reconstruct file1 file2 file3 . . . fileN

For each of the files supplied as an input argument, the reconstruction routine reads the file header information and stores the header information and data in a hash table. Once all of the headers and data have been read, the routine cycles through each of the key values. For every input file that had a matching key, the corresponding output columns are written. If one of the input files did not have a key entry or a value, the handling of missing or undefined values is invoked, and the reconstruction routine supplies an appropriate value per the notation in that input file's header hash table. This file is written out, as the other files are, with header information in the format of a Perl hash table. The hash table contains the same four keys as the header hash tables supplied by the grabber and worker routines: the columns hash table, the key specification, the column delimiter specification and the column labels.

E. Dispatching Routine.

According to a preferred embodiment of the workload distributing routine, referred to as monkeyDispatcher, the CPU- and memory-intensive work occurs in the worker routines which perform the operations of interest, for example, counting unique instances within a class. This CPU- and memory-intensive work ideally is distributed to a number of computers. Preferably, the present system and method dispatches work to available computers on a network based on the distributing software's known usage load of the computers on the network. The dispatch routine allows one worker or grabber routine to run for each CPU of a computer attached to the network.
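The key-wise merge performed by the reconstruction routine can be sketched as below. As simplifying assumptions, each worker output is reduced to a hypothetical `(header, data)` pair whose header carries only a column label and a null placeholder, and the values for a second worker output ("Unique IPs") are made up for illustration.

```python
def reconstruct(tables):
    """Join per-key worker outputs into one table, filling gaps with each file's null value.

    Each input is (header, data): data maps key -> value, and header holds
    a column label and the placeholder to write when a key is missing.
    """
    all_keys = sorted(set().union(*(data for _, data in tables)))
    labels = ["key"] + [header["label"] for header, _ in tables]
    rows = [[key] + [data.get(key, header["null"]) for header, data in tables]
            for key in all_keys]
    return labels, rows


searches = ({"label": "Count of Searches", "null": "0"},
            {"Adv01": "3", "Adv03": "2", "Adv05": "2"})
unique_ips = ({"label": "Unique IPs", "null": "0"},      # hypothetical values
              {"Adv01": "2", "Adv05": "1"})
labels, rows = reconstruct([searches, unique_ips])
print(labels)  # ['key', 'Count of Searches', 'Unique IPs']
print(rows)    # [['Adv01', '3', '2'], ['Adv03', '2', '0'], ['Adv05', '2', '1']]
```

`Adv03` has no entry in the second input, so that input's null placeholder `0` is written instead.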
For example, if there are twenty-four counting operations to be performed and there are twelve computers each equipped with two CPUs, two worker operations can be farmed out to each of the twelve computers for simultaneous processing, so that the counting tasks complete as rapidly as possible. In addition, the dispatcher routine needs information regarding which tasks or task components can be done simultaneously and which ones first require the completion of some other task component. The dispatching routine also needs data about the machines which are capable of receiving work orders, and how many work orders each may receive at once. For example, a four-CPU machine could receive four orders at a time, a one-CPU machine only one. The routine also stores data about 1) how many tasks each machine is performing at any given point in time and 2) which task(s) each of these machines is performing at any given point in time. Lastly, the dispatch routine can initiate the launching of code for processing the data on a remote machine.

The preferred embodiment begins with a data file, written as a Perl hash table, which specifies the names of the available machines on the network and the total number of CPUs on each machine. Also specified are the last known "busy/idle" state of each CPU on a given machine and the last known start time: in integer format, an integer indicates that a task was started on a CPU, and a blank value indicates that no job is currently running on that CPU. Each machine on the network has one key in the hash table. Each key points to a second hash table having key entries for the number of CPUs known for the given machine and the number of that machine's CPUs currently occupied doing work. On the first construction of the hash table, the value for CPUs currently occupied is zero. The task queues are also specified in the data file.
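The capacity arithmetic in the twenty-four-task example can be checked with a small Python sketch. The machine table is a hypothetical stand-in for the Perl hash table described above (machine name mapped to CPU count and busy CPUs), and the round count assumes every task takes roughly the same time, which the text does not claim.

```python
import math

# Hypothetical machine table: one entry per machine, with its CPU count
# and the number of CPUs currently busy (zero on first construction).
machines = {f"machine{i:02d}": {"cpus": 2, "busy": 0} for i in range(1, 13)}


def free_slots(machines):
    """Number of tasks that can start right now (one per idle CPU)."""
    return sum(m["cpus"] - m["busy"] for m in machines.values())


def rounds_needed(num_tasks, machines):
    """Rounds of dispatch if every task takes about the same time (an assumption)."""
    return math.ceil(num_tasks / sum(m["cpus"] for m in machines.values()))


print(free_slots(machines))         # 24
print(rounds_needed(24, machines))  # 1
print(rounds_needed(30, machines))  # 2
```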
The outermost key in this tasks hash table points to one or more hash tables which specify the components of a task and whether or not these sub-tasks can be performed simultaneously. The keys of this outermost task hash table are simply integers, beginning with the number one and incrementing by one for each task set. Each of these numbered tasks points to yet another hash table, which contains keys representing aspects of the task (such as data preprocessing, data grabbing/counting, data joining, etc.). A preferred embodiment wraps these individual task keys inside a hash table whose single key is named, for example, `parms`. The `parms` key points to a hash table with four key entries: `key`, `name`, `tasks` and `masterTaskFile`. These keys have, respectively, the following values: a descriptor of the column which constitutes the class-level key, e.g., Advertiser ID; a tokenized (that is, predefined) representation of the data preprocessing task (for example, dejournal.lineads to represent the task list pertaining to the reduction of advertiser listings in an internet search engine); the list of paired "grabbing" and "counting" tasks which can be performed simultaneously; and the name of the output file of the data preprocessing routine.

According to a preferred embodiment, the dispatch routine reads in the control file to identify the available machines on the network and each machine's availability. As a task is dispatched to a machine, the dispatching software updates its in-memory copy of the machine availability hash table. Thus, if the dispatching routine sent a task to an idle machine with two CPUs, the dispatching software would increment that machine's number of busy CPUs from 0 to 1, to indicate that one job has been sent to the machine. When the machine performing the worker routine task finishes the task, the dispatcher decrements the busy CPU value for that machine by one.
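A task queue shaped like the one described above, and the order in which its parts could be released, might be sketched as follows. The `parms` keys follow the text; the specific grab/count commands and the `stages` helper are hypothetical. The sketch encodes one reading of the description: preprocessing runs first, then all grab/count pairs of a task set run in parallel, with each pair's grab preceding its count.

```python
# Hypothetical task queue in the shape described above; the grab/count
# job strings are illustrative, not taken from the patent.
task_queue = {
    1: {"parms": {
        "key": "AdvertiserID",
        "name": "dejournal.lineads",
        "tasks": [("grab -g AdvertiserID -g SearchTerm", "count"),
                  ("grab -g AdvertiserID -g clientIP", "countUnique")],
        "masterTaskFile": "AdvertiserReport",
    }},
}


def stages(task_queue):
    """Yield batches of jobs: jobs within a batch may run at once; batches run in order."""
    for number in sorted(task_queue):          # task sets are numbered 1, 2, 3, ...
        parms = task_queue[number]["parms"]
        # Preprocessing must finish before any grab/count pair can read its output file.
        yield [("preprocess", parms["name"], parms["masterTaskFile"])]
        # The (grab, count) pairs are independent of one another, so they can run together;
        # within a pair, the grab runs before the count.
        yield list(parms["tasks"])


for batch in stages(task_queue):
    print(batch)
```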
With this mechanism in place, prior to assigning tasks to machines on the network, the dispatching software sorts the available machines by the number of tasks currently assigned to each. If machine X on the network has 0 busy CPUs and machine Y has 1 busy CPU, and both machines X and Y have a total of two CPUs, then the dispatching software will preferably first assign a task to machine X, because machine X has no busy CPUs as far as the dispatching software can determine. Machine X could, however, be running some CPU-intensive software without the dispatching routine's knowledge. It is therefore preferred that the computers having the CPUs run only the necessary operating system software, to avoid sending work to a computer whose processor is tied up with a non-germane task, such as a word processing task. In the preferred embodiment, the computers include only an operating system, a program interpreter, such as a Perl interpreter, and, if necessary, a secure copying program. If all machines on the network are equally busy, the dispatching software sorts the stack of available machines by machine name and assigns tasks in that order. If a machine is fully occupied, the dispatching software removes it from the available machine stack until the busy machine reports that it has finished at least one of its assigned tasks. If all machines are busy, the dispatching software waits for a first time period, for example, a few minutes, before retrying task dispatching. If the dispatcher software has tasks queued but cannot find an available machine after a second time period, for example, fifteen minutes, it creates a warning message. This condition might indicate a larger system failure that would require resetting the software system and the tasks. The preferred embodiment of this invention supplies enough hardware on the network that all of the machines are unlikely to be completely busy during any given fifteen-minute period.
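The selection rule (fewest busy CPUs first, ties broken by machine name, fully occupied machines skipped) and the busy-count bookkeeping from the preceding paragraphs can be sketched as follows. The function names are hypothetical, and the retry and fifteen-minute warning timers are left to the caller.

```python
def pick_machine(machines):
    """Least-busy machine with an idle CPU, ties broken by name; None if all are full."""
    available = [name for name, m in machines.items() if m["busy"] < m["cpus"]]
    if not available:
        return None  # caller should wait (e.g. a few minutes) and retry
    return min(available, key=lambda name: (machines[name]["busy"], name))


def task_started(machines, name):
    machines[name]["busy"] += 1   # one more CPU occupied on this machine


def task_finished(machines, name):
    machines[name]["busy"] -= 1   # machine reported completion of one task


machines = {"machineY": {"cpus": 2, "busy": 1},
            "machineX": {"cpus": 2, "busy": 0}}
assigned = []
while (name := pick_machine(machines)) is not None:
    task_started(machines, name)
    assigned.append(name)
print(assigned)                 # ['machineX', 'machineX', 'machineY']
task_finished(machines, "machineY")
print(pick_machine(machines))   # machineY
```

Using a `(busy, name)` tuple as the sort key gives both rules at once: the busy count decides first, and the machine name only matters when busy counts are equal.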
Once the dispatching software identifies the machine assigned a given task, it begins assembling a set of commands to be executed on a remote computer. The command set is specific to a task, guided by the information provided in the dispatching software's control file. These commands are constructed as follows. The dispatching software creates a name that uniquely identifies a task and the machine on which the task is to be run. This name is then used as the directory entry mark which the software uses as an indication that a task is either running or completed. After constructing the unique name, the dispatching software uses the syntax of the freely available secure shell utility (also known as ssh) to create the command which will launch a program on a remote computer. Those skilled in the art will recognize that other existing utilities, such as remote shell execution (also known as rsh), could as readily be used. In their present form, the preferred embodiments have the computers on the network access shared disk space, so that the remote computer references the shared disk space for program code and data. Again, those skilled in the art will recognize that, using existing network and remote execution tools, both program code and data could be copied to a remote computer's private disk; a remote execution utility could then point the worker computer to the new location of the program code and data. Lastly, the dispatching software adds the commands that remove the file mark it created, to be run upon completion of the remotely executed task. Once this syntax is constructed, the dispatcher routine creates a copy of itself (known as forking) and overwrites this copy with a call to the constructed commands (known as a fork-exec combination).
Pseudo-code is used to illustrate this process:

    my $task = "NumUniqueUsers";
    my $machineToUse = "machine07";
    my $programToLaunch = "monkeyGrab";
    my $dataFileToUse = "AdvertiserReport";
    my $programArguments = "-g AdvertiserID -g $task";
    my $programLocation = "/shared/disk/space/code";
    my $dataLocation = "/shared/disk/space/data";
    # Unique task/machine name, used as the directory entry mark.
    my $dirEntryMark = "$task.$machineToUse." . time();
    my $remoteExecutionTool = "ssh";
    my $remoteExToolArgs = "-i ~secret/secrets/myKeyFile";   # ssh -i selects the key file
    # Create the mark, run the task remotely, and remove the mark only if the task succeeds.
    my $commandSet = "touch $dirEntryMark; "
        . "$remoteExecutionTool $remoteExToolArgs $machineToUse "
        . "'$programLocation/$programToLaunch $programArguments $dataLocation/$dataFileToUse' "
        . "&& rm $dirEntryMark";
    # Fork-exec: the child replaces itself with the command set; the parent keeps dispatching.
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        exec($commandSet) or die "exec failed: $!";
    }

If the dispatcher routine's control file indicates that a particular process or process pair can be executed simultaneously, the dispatcher loops over the steps just described to launch as many processes as are required by the control file and as can be handled by the existing network. If the control file indicates that a process must finish before one or more other processes can begin, the dispatcher routine waits for such a serial task to finish before launching more serial or parallel tasks within a task queue.

F. Special Cases and General Extensibility

A preferred embodiment includes a special case of a worker routine, referred to as monkeyLoad. Like all the other worker routines that can be created in the framework of the present system and method, monkeyLoad can parse file headers which are in the form of Perl evaluable code. The monkeyLoad routine takes the file header information and creates a set of SQL (structured query language) statements to insert the data which follows the file header into a database. Through a set of standardized and freely available database interfaces for the Perl language, this routine can read the data lines in the output file and insert them as rows into a database.
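A minimal sketch of what monkeyLoad does, generating an INSERT statement from the header's column labels and loading the data rows, is shown below. It swaps Perl's database interfaces for Python's standard `sqlite3` module and an in-memory database purely for illustration; the table name, column names and simplified header are hypothetical.

```python
import sqlite3


def build_insert(table, header):
    """Parameterized INSERT whose column names are the header's column labels."""
    labels = header["labels"]
    return (f"INSERT INTO {table} ({', '.join(labels)}) "
            f"VALUES ({', '.join('?' for _ in labels)})")


def load(conn, table, header, data_lines):
    """Insert each delimited data line as one row; returns the number of rows."""
    rows = [line.split(header["delimiter"]) for line in data_lines if line]
    conn.executemany(build_insert(table, header), rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
# The database columns must match the labels in the file header.
conn.execute("CREATE TABLE report (AdvertiserID TEXT, CountOfSearches INTEGER)")
header = {"labels": ["AdvertiserID", "CountOfSearches"], "delimiter": "\t"}
n = load(conn, "report", header, ["Adv01\t3", "Adv03\t2", "Adv05\t2"])
print(n)  # 3
print(conn.execute("SELECT * FROM report ORDER BY AdvertiserID").fetchall())
# [('Adv01', 3), ('Adv03', 2), ('Adv05', 2)]
```

The `?` placeholders keep data values out of the SQL text; the table name and labels are still interpolated, so in practice they should come from a trusted header.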
Those skilled in the art will recognize that the worker routine could also read and evaluate the file header to, for example, produce a control file for another routine that interacts more efficiently with a database, such as Oracle's SQL loader routine (sqlldr). The special requirement of this routine is that the database columns match the column labels provided in the file header. This detail is attended to at the beginning of the process, in the initial control file which a user creates and in which the user can specify arbitrary data column labels. Any given worker routine functions by reading in the file header information, evaluating it, and, upon output, creating another file header which has the minimum number of keys (e.g., four) that any of the worker routines needs to function. This special instance of a worker routine demonstrates that the present system and method can be generalized. Since worker routines can parse the file header information, they can accomplish many useful things: instead of counting unique instances, a worker routine could be written to add, subtract, divide, multiply, compute a standard deviation, and so forth. The only capability any such routine requires is the ability to evaluate the header information in a data file as executable code that translates into a hash table with a minimum number of keys.

Although the invention has been described and illustrated with reference to specific illustrative embodiments thereof, it is not intended that the invention be limited to those illustrative embodiments. Those skilled in the art will recognize that variations and modifications can be made without departing from the true scope and spirit of the invention as defined by the claims that follow.
It is therefore intended to include within the invention all such variations and modifications as fall within the scope of the appended claims and equivalents thereof.
