DEVICE DRIVER COMMUNICATION

Creation and migration of distributed streams in clusters of networked computers

6047323

Abstract

A distributed STREAMS process operates on a multicomputer system composed of a cluster of nodes of one or more processors running an operating system having a file system and a STREAMS message-passing mechanism implementing network protocols, client-server applications, and STREAMS-based pipes. A local node has a software application operative under the operating system. The distributed STREAMS process determines that it is operating within a cluster and transparently intercepts application open requests which are sent to a controlling thread (CT) created during node initialization. The CT determines whether the open is to occur on the local or a remote node and whether any cluster facility should be activated by examining major and minor numbers encoded within the file structure being opened. If the CT determines a local open is to occur, it performs a local open, as normal, and activates the indicated cluster facilities. If the CT determines a remote open is to occur, it uses the STREAMS mechanism to establish a connection to the remote node via a STREAMS software interconnect driver (S-ICS) operating on both nodes. The local node's CT then communicates the open request to the remote node's CT which performs an internal STREAMS open to create data structures and infrastructure to ensure proper communication and error recovery. If a failure occurs, the CT and S-ICS detect this failure and transparently initiate error recovery by migrating failed components, if possible, to new node(s) within the cluster. This migration capability can also be used to provide load balancing within the cluster of distributed STREAMS.


Claims

What is claimed is:

1. A multicomputer system capable of distributed STREAMS operation, the system comprising:

a cluster of two or more nodes connected via a data communications interconnect subsystem, each node including a computer having one or more system processor units, local memory, and input/output subsystem;

an operating system running on each of the system processor units in the cluster, the operating system including a STREAMS message-passing mechanism for use in implementing one or more of networking protocols, client/server applications and services;

a software application operative on the system processor unit of at least one node under control of the operating system to perform a task or solve a problem;

means for creating a distributed STREAMS instance on a first of said nodes (initiating node) which is independent of the application and the message-passing mechanism; and

means for migrating at least a portion of the distributed STREAMS instance to a second of said nodes (target node) for execution of a selected task of the software application on the second node transparently with respect to the networking protocols, client/server applications and services on the first node.

2. A multicomputer system according to claim 1 in which:

the STREAMS message-passing mechanism includes a data structure implemented in each computer which provides a bidirectional data path between a user process and a device driver via a module which is an intermediate element that can be dynamically added to or removed from the data path;

the device driver residing in a system kernel of the operating system and operative to control a peripheral device to transfer data between said kernel and said device so that data received by the device driver can travel upstream to the user process; and

the STREAM data structure including a stack stored in local memory and containing a set of function pointers for controlling execution of module and driver functions.

3. A multicomputer system according to claim 2 including:

controlling thread means in the operating system for communications with and control of the STREAMS message passing mechanism in accordance with a cluster parameter; and

means defining a set of preview functions including a dynamic function replacement function for multiplexing messages between components of the system;

the device driver including a physical cluster interconnect driver (P-ICS) that provides basic communications facilities between nodes in the system under control of the controlling thread and in accordance with the preview functions.

4. A multicomputer system according to claim 3 further including means defining a STREAMS software interconnect driver (S-ICS) for moving messages between two or more of the following system components: the application's STREAMS stack, the controlling thread, the P-ICS, and the preview functions.

5. A multicomputer system according to claim 4 including means defining a middleware thread operative to process and communicate messages between the operating system and the S-ICS software interconnect driver.

6. A multicomputer system according to claim 4 in which the means for creating a distributed STREAMS instance includes:

a file system in the operating system of each node operative upon performing an open function responsive to an open request from the application to initiate opening of the STREAMS mechanism;

means for mapping and saving a device number of the interconnect driver in a first of said nodes (initiating node) to a target device number;

means in the STREAMS mechanism for opening the selected driver on the first node;

means for remapping the saved device number into a node address for a second target node;

means in the interconnect driver for transmitting a message to the target node, the message including the application's open request and the saved device number; and

the file system in the target node being responsive to the message to open the target driver.

7. A multicomputer system according to claim 6 in which the target node operating system includes means for returning a message to the initiating node's interconnect driver including a stream head address for the target driver.

8. A multicomputer system according to claim 2 in which the means for migrating at least a portion of the distributed STREAMS instance to a second, target node includes means for communicating at least a portion of the stack on the first node to the second node.

9. A multicomputer system according to claim 8 including:

means for issuing an ioctl indicating to the STREAMS mechanism that migration is to occur;

the STREAMS mechanism including means responsive to the ioctl to freeze the stack on the initiating node and to forward a message to the target node which includes information from which the stack can be re-created on the target node.

10. A multicomputer system according to claim 9 in which the STREAMS mechanism includes means for performing a marshalling function upon a portion of the data structure to assemble information needed to replicate a private data structure for a module represented in the stack on the target node.

11. In a multicomputer system capable of STREAMS operation, the system including a cluster of two or more nodes including a local node and a remote node connected via a data communications interconnect subsystem, each node including a computer having one or more system processor units, local memory, and an input/output subsystem, an operating system running on each of the system processor units in the cluster, the operating system including a file system and a STREAMS message-passing mechanism having an open function for creating a stream head and a driver with bidirectional communications, client/server applications and services, a software application operative on the system processor unit of at least one node under control of the operating system to perform a task or solve a problem, a method of creating a distributed STREAMS data structure, the method comprising:

initiating a controlling thread on each node in the cluster, including determining where STREAMS drivers are located within each node and determining which facilities are associated with each driver;

setting a flag in the STREAMS mechanism on each node in the cluster indicating that clustering is enabled;

assigning a file name uniquely representing a specific driver on a node in the cluster, the file name encoding a major number table identification for each driver and a minor number parameter for selectably identifying at least one of a driver on a remote node and local facilities on the local node;

communicating the file names of the drivers on each node to the other nodes in the cluster through the file system;

running the STREAMS open function to open the driver for the file, looking at the file name to ascertain that it is a clustering facility and, if so, passing the initial major number and minor number to the controlling thread;

the controlling thread using the major and minor numbers to look up the represented devices and facilities and deriving a new major and minor number and passing those numbers along with a set of facilities identifiers back to the STREAMS mechanism; and

if the initial major number and minor number pertain to facilities on the remote node, the STREAMS mechanism in the local node opening a STREAMS software interconnect driver (S-ICS) on the local node and the S-ICS driver communicating an open request to the controlling thread on the remote node;

the controlling thread being operative upon initialization to establish a STREAMS software interconnect driver (S-ICS) on the local node;

the controlling thread on the remote node performing an internal STREAMS open to create on the remote node a distributed STREAMS instance of the STREAMS data structure on the local node.

12. A method according to claim 11 including, if the initial major number and minor number pertain to facilities on the local node, the STREAMS mechanism in the local node opening a designated local driver to enable the local facilities associated with that driver.

13. A method according to claim 11 in which:

the STREAMS message-passing mechanism includes a data structure implemented in each computer which provides a bidirectional data path between a user process and a device driver via a module which is an intermediate element that can be dynamically added to or removed from the data path;

the device driver residing in a system kernel of the operating system and operative to control a peripheral device to transfer data between said kernel and said device so that data received by the device driver can travel upstream to the user process; and

the STREAM data structure includes a stack stored in local memory and containing a set of function pointers for controlling execution of module and driver functions;

the method including migrating at least a portion of the distributed STREAMS instance to a second of said nodes by communicating at least a portion of the stack on the first node to the second node.

14. A method according to claim 13 in which the STREAMS stack executes on a different node from the node where the application accessing it is executing.

15. A method according to claim 13 in which the STREAM is broken into constituent components at the module/driver/stream head levels with each component being executed on different, individual nodes within the cluster.

16. A method according to claim 13 in which the distributed STREAMS data structure includes a STREAMS-based pipe having two pipe ends, each end executing on a different node within the cluster.

17. A method according to claim 13 in which the STREAMS stack is migrated, either in whole or part, from one node to another.

18. A method according to claim 13 including detecting a failure condition on the second node and initiating error recovery.

19. A method according to claim 13 including performing a marshalling function upon a portion of the data structure to assemble information needed to replicate a private data structure for a module represented in the stack on the second node.

20. A method for migrating a portion of a stack on a multicomputer system capable of distributed operation from a first node to a second node, wherein the stack contains a set of function pointers for controlling execution of module and driver functions, the method comprising the steps of:

issuing a system call requesting migration;

freezing the stack to prevent message flow into and out of the stack;

marshaling all information necessary to replicate the portion;

delivering the marshaled information to a control thread on the second node;

recreating the portion on the second node;

releasing the remaining portion of the stack; and

activating a communications route between the portion on the second node and the remaining portion on the first node.

21. The method of claim 20, further comprising the step of:

informing the control thread on the second node that migration will occur.

22. The method of claim 20, wherein the step of freezing comprises the steps of:

preventing entities from queuing messages for the stack; and

preventing the stack from sending out messages.

23. The method of claim 20, wherein the step of marshaling comprises the step of:

forming a marshaling structure for each component of the stack which will be migrated.

24. The method of claim 23, wherein:

the marshaling structure includes state specific information.

25. The method of claim 24, wherein:

the state specific information includes information detailing queued messages awaiting processing, queued messages awaiting transmission, and timer values.

26. The method of claim 20, wherein:

the system call contains necessary information for the migration of the portion of the stack.

27. The method of claim 20, wherein:

the migration is occurring to facilitate load balancing of the nodes of the multicomputer system.

28. The method of claim 20, wherein:

the migration requires the stack to be split between a module and a driver.

29. The method of claim 20, wherein:

the migration requires the stack to be split between a multiplexor and a driver.

30. The method of claim 20, wherein the portion is a driver, the method further comprising the steps of:

installing a software interconnect driver in the first node; and

linking the software interconnect driver with the remaining portion of the stack in the first node to allow communications with the migrated driver in the second node.

31. The method of claim 30, further comprising the step of:

updating tables to allow inbound packets to be routed to the migrated driver on the second node through the software interconnect driver.

32. The method of claim 20, wherein the portion is a module, the method further comprising the steps of:

installing an upper level software interconnect driver in the first node; and

linking the upper level software interconnect driver with the remaining portion of the stack in the first node to allow communications with the migrated module in the second node.

33. The method of claim 32, further comprising the step of:

updating tables to allow inbound packets to be routed to the migrated module on the second node through the software interconnect driver.

34. The method of claim 20, further comprising the step of:

reallocating resources in the first node which were occupied by the migrated portion.

35. The method of claim 20, wherein:

the step of issuing a system call is performed by a third node.

36. The method of claim 20, wherein:

the distributed operation is STREAMS distributed operation, and the stack is a STREAMS stack.

37. A multicomputer system that is capable of distributed operations and includes a plurality of nodes connected via communications interconnects, the system running an operating system which uses a messaging mechanism in a portion of its operations, the system comprising:

a plurality of controlling threads, with one controlling thread associated with each node of the plurality of nodes, wherein each controlling thread is resident on a kernel of its associated node, and each controlling thread manages the messaging mechanism resident within its node;

a plurality of stacks, wherein each stack is capable of containing a set of function pointers for controlling execution of module and driver functions useable by the plurality of nodes;

a plurality of physical interconnection drivers (PID), with one PID associated with each node of the plurality of nodes, wherein each PID provides communication facilities between its node and the remaining nodes of the plurality of nodes, and each PID is under the control of the control thread of the same node;

a plurality of software interconnection drivers (SID), with one SID associated with each node of the plurality of nodes, wherein each SID allows communications between the PID of its associated node and at least one other component of its node, and each SID is under the control of the control thread of the same node; and

a plurality of preview functions, with at least one preview function associated with each node of the plurality of nodes, wherein each preview function is a policy regarding a system-wide facility, and the controlling thread of the associated node uses the at least one preview function to modify operations of the associated stack.

38. The system of claim 37, wherein:

each controlling thread acts as a third-party communication and control point for its associated node.

39. The system of claim 37, wherein:

the stack is created with assistance from the controlling thread of its associated node.

40. The system of claim 37, wherein:

a portion of the stack of one node of the plurality of nodes may be migrated to another node of the plurality of nodes.

41. The system of claim 37, wherein:

each stack may utilize components resident on other nodes of the plurality of nodes.

42. The system of claim 41, wherein:

a controlling thread may create components to allow the stack associated with its node to utilize components resident on the other nodes.

43. The system of claim 37, wherein:

a controlling thread creates the SID for its associated node.

44. The system of claim 37, wherein:

each node includes at least one SMP processor.

45. The system of claim 37, wherein:

the distributed operation is a STREAMS distributed operation, the messaging mechanism is a STREAMS messaging mechanism, and the stack is a STREAMS stack.

46. The system of claim 37, wherein:

a stack of the plurality of stacks includes a stream head and a driver.

47. The system of claim 37, wherein:

a stack of the plurality of stacks includes a stream head, at least one module, and at least one driver.


Description

Copyright Hewlett-Packard 1996: The disclosure of this patent document contains material which is subject to copyright protection. The copyright owner has no objection to facsimile reproduction by anyone of the patent document or the patent disclosure, as it appears in the Patent and Trademark Office patent file or records, but otherwise reserves all copyrights whatsoever.

BACKGROUND OF THE INVENTION

This invention relates to the interoperation of networked computers and more particularly to a distributed operating system for a cluster of computer nodes.

A node is a computer composed of one or more computational processors (potentially an SMP, i.e. a Symmetric Multi-Processor), local memory, an Input/Output subsystem, and optional peripheral devices. A cluster is a set of two or more nodes that cooperate at some level to solve a problem. The nodes within a cluster may be interconnected using a variety of software and hardware solutions, depending upon price-performance requirements. Depending upon how a cluster is implemented (shared memory or message passing), a cluster can provide the following capabilities: high availability, since no single point of failure mandates halting a distributed application; application load-leveling to increase or more efficiently utilize cluster-wide resources; hardware sharing; increased aggregate computational and I/O bandwidth; and increased access to resources such as mass storage, networking, etc.

STREAMS is a relatively lightweight message-passing framework that has become a de facto standard for implementing networking protocols and client/server applications and services. The conventional STREAMS mechanism is described in UNIX System V Network Programming, S. A. Rago (1993), Ch. 3 and 9. For example, NFS, UDP/IP, TCP/IP, SPX, NetBIOS, SNA, and DLPI all define networking protocols which have been implemented by a number of vendors using the STREAMS framework. If clustering is to be successful, applications based on these protocols and services should be able to take advantage of what a cluster has to offer.

Accordingly, a need remains to modify the STREAMS framework to comprehend cluster infrastructure needs and to facilitate applications and cluster facilities executing within this environment.

SUMMARY OF THE INVENTION

An object of the invention is to solve a large class of customer problems such as high availability, increased computational and I/O performance, a single point of administration, etc. in the multicomputer environment (commonly known as a cluster) by enabling STREAMS to work in such an environment.

In accordance with the invention, the STREAMS framework is modified such that it can create distributed streams and be able to migrate these streams among nodes in the cluster.

Another object of the invention is to make all of this work transparent to customer applications, both in user space and within the kernel. The process of the invention meets this need by modifying the STREAMS framework so that software is unaware that it is running within a clustered environment, a problem that, to my knowledge, no other vendor has solved.

A cluster is an arbitrarily large set of computers interconnected via a high-speed communication link. The cluster itself can be connected to non-clustered nodes via standard networking links such as FDDI, ATM, or 100VG-AnyLAN, with the hardware links and drivers installed on a subset of the cluster nodes that provide gateway services. These standard networking links can use standard networking protocols such as TCP/IP, while the cluster interconnect preferably uses custom lightweight protocols.

A node is a computer which is composed of one or more SPU (System Processor Units), local memory and cache, an Input/Output subsystem, and optional peripheral devices such as mass storage. A node is used to solve a problem.

A distributed stream is a stream that has been split between two nodes at some predesignated point, most commonly between a module and a driver. For example, the TCP/IP protocol can be split such that TCP or UDP resides on one node and IP resides on another. The decision whether to split is usually based on what state information is maintained and where, and on performance considerations. For the remainder of this description, I will use TCP/IP as an example since it is well understood and commonly implemented.

A distributed STREAM is a STREAM which has any of the following characteristics/properties:

The application and the entire STREAM stack may exist on the same node, just as in the non-clustered environment, but the STREAMS-related framework infrastructure can be utilized to take advantage of the cluster facilities such as load leveling, high availability, etc.

The STREAM stack may execute on a different node from where the application accessing it is executing.

The STREAM may be broken into constituent components at the module/driver/stream head levels with each component being executed on different, individual nodes within the cluster.

STREAMS-based pipes may have each pipe end executing on a different node within the cluster.

A STREAM stack may be migrated, either in whole or part, from one node to another.
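To make these properties concrete, the following minimal C sketch records, for each component of a stack, the node on which it currently executes, and derives whether the stream is distributed and where it is split. The structure, enum, and function names are hypothetical illustrations, not part of the patent or of any real STREAMS implementation; node numbers are assumed to be small integers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical component kinds in a STREAM stack, listed top to bottom. */
enum comp_kind { COMP_STREAM_HEAD, COMP_MODULE, COMP_MUX, COMP_DRIVER };

/* Each component records the cluster node it currently executes on. */
struct stream_comp {
    enum comp_kind kind;
    const char    *name;   /* e.g. "tcp", "ip" */
    int            node;   /* hypothetical cluster node id */
};

/* A stream is distributed when its components span more than one node. */
bool stream_is_distributed(const struct stream_comp *stack, size_t n)
{
    assert(stack != NULL && n > 0);
    for (size_t i = 1; i < n; i++)
        if (stack[i].node != stack[0].node)
            return true;
    return false;
}

/* Index of the first component that runs on a different node from the
 * component directly above it (the split point), or n if not split. */
size_t stream_split_point(const struct stream_comp *stack, size_t n)
{
    for (size_t i = 1; i < n; i++)
        if (stack[i].node != stack[i - 1].node)
            return i;
    return n;
}
```

For the TCP/IP example, a stack of stream head and TCP on node 0 with IP on node 1 is distributed, and its split point is the IP driver.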

The invention implements distributed STREAMS in a cluster using the following system components: a controlling thread, a set of preview functions, a P-ICS driver, and preferably an S-ICS driver.

A thread is a software processing element that provides a set of functionality via executing application instructions on a computer.

A controlling thread is a specialized thread which acts as a third-party communication and control point for a distributed STREAM. The basic functions are described in the detailed explanation of the technology.

An S-ICS driver is the STREAMS software interconnect driver, which is responsible for moving messages between any of the following components: the application's STREAMS stack, the controlling thread, the P-ICS, and the preview functions. The P-ICS is the physical cluster interconnect driver, which provides the basic communication fabric.

A preview function is a STREAMS-based function enabled via STREAMS Dynamic Function Replacement which examines messages to multiplex them between the various components within this system.
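The multiplexing role of a preview function can be illustrated with a minimal C sketch. It assumes a hypothetical message header carrying a message type and a destination node; neither the header layout nor the MSG_* codes come from the patent. Messages addressed to another node go to the P-ICS, control traffic goes to the controlling thread, and everything else continues through the application's stack.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical destinations the S-ICS can deliver a message to. */
enum dest { TO_STACK, TO_CT, TO_PICS };

/* Hypothetical message types; the real encoding is not specified. */
enum { MSG_DATA = 1, MSG_OPEN_REQ = 2, MSG_MIGRATE = 3, MSG_FAILURE = 4 };

/* Hypothetical message header. */
struct cluster_msg {
    uint8_t  type;       /* one of the MSG_* values above */
    uint32_t dst_node;   /* node the message is addressed to */
};

/* Examine a message before normal STREAMS processing and decide which
 * component should receive it. */
enum dest preview(const struct cluster_msg *m, uint32_t local_node)
{
    assert(m != NULL);
    if (m->dst_node != local_node)
        return TO_PICS;          /* not for this node: send over the interconnect */
    switch (m->type) {
    case MSG_OPEN_REQ:
    case MSG_MIGRATE:
    case MSG_FAILURE:
        return TO_CT;            /* control traffic goes to the controlling thread */
    default:
        return TO_STACK;         /* ordinary data continues through the stack */
    }
}
```

Because the decision is made by a replaceable function rather than by the modules themselves, the routing policy can change without touching TCP or IP.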

FIG. 5 illustrates a distributed stream, an interconnect driver, and an independent middleware software thread. Although the middleware is key to making distributed STREAMS possible, the processes described here are completely independent of the middleware implementation. This is a significant advantage, since it makes it possible to interoperate with multiple middleware suppliers as long as the communication interface is standardized.

As can be seen, the stream has been split across two or more nodes and makes use of middleware residing on one of these nodes or on a third node. The interconnect driver is further split into an intelligent STREAMS-based driver (S-ICS) and a dumb, physical driver (P-ICS; see FIGS. 2A, 2B, 3A, 3B), which may be reused for non-STREAMS internodal communications. The intelligent STREAMS-based driver contains the protocol for communicating with the middleware and with the kernel daemons that handle STREAMS requests and replies.

The basic algorithm for creating the illustrated configuration is given below. It uses a new in-kernel STREAMS interface.

Open the device file via the normal open() path. Since the file system presents a single system view to the application, no application changes are necessary; however, the correct node on which to perform the open must be determined, as described below.

When the file system performs the open, VFS (virtual file system) eventually calls the STREAMS framework open routine. Using the framework's autopush and configuration capabilities, we transparently open the interconnect driver instead of the target driver (IP in this case). This is accomplished by remapping the device number of the interconnect driver to the target device number, which is stored for later use. The kernel remapping actually occurs when the drivers are installed during system boot. The STREAMS framework may need to recognize when a driver has been configured for clustering.
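A minimal C sketch of this remapping, assuming a hypothetical boot-time table keyed by the target driver's major number. At open time a clustered device is redirected to the interconnect driver while the original device number is saved for the later remote open. The devnum structure, table size, and function names are illustrative assumptions; preserving the minor number is also an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical device number split into major and minor parts. */
struct devnum { unsigned major, minor; };

/* One entry per driver configured for clustering, filled in at boot. */
struct remap_entry {
    unsigned target_major;   /* e.g. the IP driver's major number */
    unsigned sics_major;     /* major number of the interconnect driver */
};

#define MAX_REMAP 16
static struct remap_entry remap_tab[MAX_REMAP];
static size_t remap_count;

/* Called while drivers are installed during system boot. */
bool remap_install(unsigned target_major, unsigned sics_major)
{
    if (remap_count == MAX_REMAP)
        return false;
    remap_tab[remap_count++] = (struct remap_entry){ target_major, sics_major };
    return true;
}

/* At open time: if the requested device is clustered, return the
 * interconnect driver's device number; the original is saved in *saved. */
struct devnum remap_open(struct devnum req, struct devnum *saved)
{
    assert(saved != NULL);
    *saved = req;
    for (size_t i = 0; i < remap_count; i++)
        if (remap_tab[i].target_major == req.major)
            return (struct devnum){ remap_tab[i].sics_major, req.minor };
    return req;   /* not clustered: open the requested driver as usual */
}
```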

STREAMS then performs the interconnect driver open just as it normally would for any driver. At this point, we have only a stream head and a driver instance on the node where the application currently resides.

The next step is to perform an open on the node where the actual driver resides. To do this, the saved device number is remapped into a node address. The STREAMS-based interconnect driver communicates with the middleware via a suitable protocol to determine the target remote node and all pertinent communication information. This communication may either be configured as part of the interconnect driver's open routine processing or be initiated via a transparent ioctl that tells the driver to probe and take the following action; which approach to use is an implementation choice.
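The lookup from saved device number to node address can be sketched as below. The middleware protocol is not specified here, so this minimal C sketch assumes the middleware's answer can be modeled as a location table mapping a saved major number to a node id and a hypothetical well-known port for that node's controlling thread. All names are illustrative.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical answer the middleware returns for a saved device number. */
struct node_addr {
    uint32_t node_id;   /* cluster-wide node identifier */
    uint16_t port;      /* hypothetical well-known port of the remote CT */
};

struct dev_location {
    unsigned         major;   /* saved target major number, e.g. IP */
    struct node_addr where;
};

/* Stand-in for the middleware query: search a location table the
 * middleware is assumed to maintain. Returns 0 on success, -1 if the
 * device is not known to be hosted anywhere in the cluster. */
int resolve_target(const struct dev_location *tab, size_t n,
                   unsigned saved_major, struct node_addr *out)
{
    assert(out != NULL);
    for (size_t i = 0; i < n; i++) {
        if (tab[i].major == saved_major) {
            *out = tab[i].where;
            return 0;
        }
    }
    return -1;
}
```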

The interconnect driver then creates a message which is sent to a well-known address on the target node. On this node, an in-kernel daemon or controlling thread listens for such requests and is awakened when the message arrives. This daemon decodes the request and then either (1) processes it itself, (2) performs a thread-create to handle the request, or (3) hands it off to an existing thread which is already multiplexing multiple target driver instances.
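The three handling options can be expressed as a dispatch decision. The description leaves the policy open, so the rule in this minimal C sketch is an assumption: reuse an existing worker that already serves the same driver and has spare capacity, otherwise handle the request inline when nothing else is pending, otherwise create a new thread. The worker structure and capacity limit are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

enum ct_action { CT_HANDLE_INLINE, CT_NEW_THREAD, CT_HAND_OFF };

/* Hypothetical bookkeeping for a thread that already multiplexes
 * several instances of one target driver. */
struct worker {
    unsigned driver_major;
    unsigned instances;
    unsigned max_instances;
};

/* Decide how the controlling thread handles an incoming open request.
 * On CT_HAND_OFF, *chosen is set to the index of the selected worker. */
enum ct_action ct_dispatch(const struct worker *w, size_t nworkers,
                           unsigned major, unsigned pending_requests,
                           size_t *chosen)
{
    assert(chosen != NULL);
    for (size_t i = 0; i < nworkers; i++) {
        if (w[i].driver_major == major && w[i].instances < w[i].max_instances) {
            *chosen = i;
            return CT_HAND_OFF;        /* option (3): existing multiplexing thread */
        }
    }
    return pending_requests == 0 ? CT_HANDLE_INLINE   /* option (1) */
                                 : CT_NEW_THREAD;     /* option (2) */
}
```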

At this point, the thread handling the request performs an in-kernel streams_open() on the target node using the original device number derived from the application's open request. The streams_open() opens the IP driver just as though the IP driver existed on the same node as the application. Once IP is opened, the stream head address is returned to the initiating node's interconnect driver, and this address combined with the remote node's address forms a unique addressing tuple within the cluster without the need for another independent naming scheme. This tuple is later used for stream migration and verification.
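The addressing tuple can be pictured as a pair of node address and stream head address. This minimal C sketch assumes a 32-bit node address and stores the stream head address as a uintptr_t; the verification helper is a hypothetical example of how the tuple recorded at open time might later be checked against the one carried in a message.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Cluster-wide name for a remote stream. While the stream is open, its
 * stream head address is unique on its node and the node address is
 * unique in the cluster, so the pair is unique cluster-wide. */
struct stream_tuple {
    uint32_t  node_addr;
    uintptr_t sth_addr;
};

bool tuple_equal(struct stream_tuple a, struct stream_tuple b)
{
    return a.node_addr == b.node_addr && a.sth_addr == b.sth_addr;
}

/* Check that a message claiming to belong to a stream carries the tuple
 * the initiating interconnect driver recorded at open time. */
bool tuple_verify(const struct stream_tuple *recorded, struct stream_tuple claimed)
{
    assert(recorded != NULL);
    return tuple_equal(*recorded, claimed);
}
```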

At this point, the application should be able to return from the open() and continue processing without any modification.

As can be seen, this process does not require any modifications to the user application, the TCP module, or the IP driver; this is a key capability for a cluster configuration to be successful.

Stream migration involves migrating either one or both halves of the distributed stream within the cluster to new nodes. The migration must occur transparently to the application and should not require modifications to the stream stack's components. Using the previous example, there are two points of potential failure. The first is the node where the IP driver resides. If this node fails for some reason, the connections currently executing through it should neither fail nor notice the failure. The interconnect driver would see that it has lost its connection with the node and initiate a recovery protocol using the previously described stream creation process.

For the upper half of the stream, it is sometimes desirable to migrate applications for reasons such as load balancing or system maintenance. This type of migration is more difficult, since the upper half and the application must be placed in stasis until the migration is completed. During that time the lower half must not attempt to ship inbound data to the upper half, because the upper half cannot process it and doing so could lead to timing and data corruption problems.

Conventional implementations that split TCP/IP across different nodes have relied upon modifying the actual modules involved, which directly hurts performance and supportability while increasing maintenance costs, development time, and overall cost. This is an unnecessary burden, since it limits which modules and drivers can easily be executed within a cluster. To remove this burden, the following process is given:

The proposed process for migrating TCP (used only as an example, since it applies to any module) within a distributed STREAMS environment uses key features of the STREAMS framework, combined with a new ioctl, to migrate the TCP module transparently, without the module being aware that migration is occurring and without requiring modification to the module itself. The process requires the module/driver developer to write one function, which marshals the data structures and messages currently associated with the structure pointed to by q->q_ptr. The details of this function are not relevant here; what matters is that it is a critical step in transparently migrating a stream.
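As an illustration of the developer-supplied marshalling function, the following minimal C sketch marshals a hypothetical TCP-like private structure of the kind a module might hang off q->q_ptr, and shows the matching routine run on the target node. The tcp_priv fields are invented for the example. The sketch copies fields one at a time so compiler padding is never shipped, and it assumes both nodes share the same byte order; a heterogeneous cluster would need explicit conversion.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical private state a TCP-like module keeps at q->q_ptr. */
struct tcp_priv {
    uint32_t snd_nxt, rcv_nxt;   /* sequence numbers */
    uint16_t lport, fport;       /* local and foreign ports */
    uint32_t rexmt_ms;           /* retransmit timer value, in ms */
};

/* Copy the fields needed to rebuild the structure on another node into
 * buf. Returns the number of bytes written, or 0 if buf is too small. */
size_t tcp_marshal(const struct tcp_priv *p, unsigned char *buf, size_t len)
{
    const size_t need = 3 * sizeof(uint32_t) + 2 * sizeof(uint16_t);
    if (len < need)
        return 0;
    unsigned char *o = buf;
    memcpy(o, &p->snd_nxt, 4);  o += 4;
    memcpy(o, &p->rcv_nxt, 4);  o += 4;
    memcpy(o, &p->lport, 2);    o += 2;
    memcpy(o, &p->fport, 2);    o += 2;
    memcpy(o, &p->rexmt_ms, 4); o += 4;
    assert((size_t)(o - buf) == need);
    return need;
}

/* Matching routine run on the target node to re-create the structure. */
void tcp_unmarshal(struct tcp_priv *p, const unsigned char *buf)
{
    const unsigned char *i = buf;
    memcpy(&p->snd_nxt, i, 4);  i += 4;
    memcpy(&p->rcv_nxt, i, 4);  i += 4;
    memcpy(&p->lport, i, 2);    i += 2;
    memcpy(&p->fport, i, 2);    i += 2;
    memcpy(&p->rexmt_ms, i, 4);
}
```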

The basic stream migration is as follows:

An independent thread on node A, or the application itself if it is designed to migrate itself or its connections from one node to another, issues a new ioctl which the STREAMS framework interprets as a request to migrate. This ioctl contains all of the information necessary to move the stream to a new node.

The framework issues an ioctl to the interconnect driver to inform its remote driver handler thread that the stream is about to migrate and should be treated as flow-controlled for all intents and purposes.

The framework then freezes the local stack and executes a marshalling function upon each module's q_ptr structure, a module-private data structure. This marshalling function will return all of the information necessary to replicate this structure and associated memory on the target node.

When this information has been gathered, the framework sends it, together with all framework-specific information such as stream head state and messages, to a daemon on the target node. Based on which node opened the stream, the daemon either re-creates the stream stack immediately or waits for the process to be re-established and then re-creates it. For example, if this was an in-kernel application such as NFS, the stream could be completely re-created immediately and the NFS process notified of its creation and re-initialization. If this was a user-space socket application, the daemon waits until the application is fully migrated before re-creation occurs.

Once the stream stack has been recreated, the new interconnect driver instance informs the remote node that it is now capable of receiving messages and the communication begins anew. All of this is transparent to the application, the modules, and the drivers involved.
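The ordering of these steps can be modelled in a few lines of C. This is a single-process sketch under stated assumptions: `struct stack`, `struct image`, and `migrate_stream()` are hypothetical. Each module's state is assumed to have already been flattened by its marshalling function. The interconnect transfer and the deferred re-creation case for user-space applications are elided.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_MODS 4
#define PRIV_MAX 32

/* Hypothetical per-module record: a name plus its private (q_ptr) state,
 * assumed already flattened by the module's marshalling function. */
struct mod_state {
    char name[16];
    unsigned char priv[PRIV_MAX];
    size_t priv_len;
};

/* Hypothetical view of one node's instance of the stream stack. */
struct stack {
    struct mod_state mods[MAX_MODS];
    size_t nmods;
    bool frozen;     /* put/service processing is held off */
    bool flow_ctl;   /* interconnect treats the peer as flow controlled */
};

/* Image handed to the daemon on the target node. */
struct image {
    struct mod_state mods[MAX_MODS];
    size_t nmods;
};

/* Runs the steps in the order the text describes. Returns 0 on success. */
int migrate_stream(struct stack *src, struct stack *dst)
{
    struct image img;

    if (src->nmods > MAX_MODS)
        return -1;

    src->flow_ctl = true;   /* 1: remote side stops shipping inbound data */
    src->frozen = true;     /* 2: freeze the local stack */

    /* 3: gather every module's marshalled state plus framework state */
    img.nmods = src->nmods;
    memcpy(img.mods, src->mods, src->nmods * sizeof src->mods[0]);

    /* 4: in a real cluster, img would cross the interconnect here */

    /* 5: the target daemon re-creates the stack from the image */
    memcpy(dst->mods, img.mods, img.nmods * sizeof img.mods[0]);
    dst->nmods = img.nmods;

    /* 6: the new instance announces it can receive messages again */
    dst->frozen = false;
    dst->flow_ctl = false;
    return 0;               /* src stays frozen: it is now retired */
}
```

The property the sketch is meant to show is the order. Inbound traffic is held off and the source stack frozen before any state is captured. In a real framework those two flags would gate the put and service routines, so no routine could change module state while it is being copied.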

Similarly, should the node where the IP driver runs fail, the user application must continue operating without change. For this to occur, the interconnect driver must recognize that a failure has occurred and re-establish its connection on a new node via the previously described open process.

The processes described provide the following advantages within the cluster environment:

Streams may be split and distributed across multiple nodes without modifying the drivers or modules involved. In the past, these components could be distributed only after heavy modification, which added complexity and cost and degraded performance.

No modifications means shorter development times, reduced support and troubleshooting time, and reduced testing times. In addition, the developer does not need to learn how the cluster environment works since, for all intents and purposes, well-designed STREAMS drivers and modules work together identically to the way they worked on a single node. This means a user of the invention will be able to open the cluster environment to third-party ISVs faster and at a lower cost than prior systems.

Streams may be migrated between nodes without requiring modifications to either the drivers or the modules, again removing the complexity and performance loss from the current mechanisms used.

Migration requirements are limited to a single function, which executes independently of the driver's or module's normal interaction with the STREAMS framework.

The system may utilize any P-ICS driver without requiring modifications to the basic technology implemented within the controlling thread or the S-ICS; the technology provides a set of common data structures and interfaces that provide P-ICS independence. This allows the technology to take advantage of future P-ICS improvements transparently.

Any STREAMS-based implementation of a protocol or application stack may be implemented and executed within a cluster transparently, i.e. ported without modification.

The basic STREAMS framework implementation operates without modification to any DDI/DKI (Device Driver Interface/Driver Kernel Interface) routines. In addition, all STREAMS framework messages, commands, logging and administration drivers, system calls, Dynamic Function Replacement, Dynamic Function Registration, and stream head behavior execute just as in the non-distributed environment. This allows the stack to be designed using the normal single-system paradigm. Aside from marshalling functions where migration is supported, it does not require the designer to understand or develop with the cluster configuration in mind.

The distributed STREAMS implementation may be performed without modifying the VFS (virtual file system) implementation which ensures portability to other operating systems which implement STREAMS under VFS.

It is assumed that the reader is skilled in the art and familiar with clustering, operating system design, networking, distributed systems, and STREAMS concepts and framework design/implementation. Additional terminology is defined in the Detailed Description.

The design is protocol independent. This means that the design may be used for a variety of network or other application protocols and does not require any changes to be implemented. Some of the preview functions may require minor modifications but these will center more around how the solution is applied. For example, the IP bind preview functions are specific to the global port management solution but are not required to implement the base TCP/IP implementation using this technology.

The design may make use of any number of different P-ICS driver implementations without requiring S-ICS driver or controlling thread redesign. This allows the design to take advantage of new technologies or fabric paradigms such as shifting from receiver-based communication to sender-based communication which offers a number of simplifications and performance improvements.
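P-ICS independence of this kind is commonly achieved with an operations table that every physical interconnect driver fills in. The sketch below is an assumption about how such an interface might look, not the interface the design defines. `struct pics_ops`, `sics_ship()`, and the loopback driver are hypothetical.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical common interface the S-ICS and controlling thread code
 * against. Any P-ICS driver plugs in by supplying this table. */
struct pics_ops {
    const char *name;
    int (*send)(void *ctx, int node, const void *buf, size_t len);
};

/* A toy loopback P-ICS that records the last frame "sent". */
struct loop_ctx {
    int last_node;
    size_t last_len;
    unsigned char last[32];
};

static int loop_send(void *ctx, int node, const void *buf, size_t len)
{
    struct loop_ctx *c = ctx;
    if (len > sizeof c->last)
        return -1;            /* frame larger than this toy driver allows */
    memcpy(c->last, buf, len);
    c->last_node = node;
    c->last_len = len;
    return 0;
}

static const struct pics_ops loop_pics = { "loopback", loop_send };

/* S-ICS side: it only ever calls through the table, so replacing the
 * P-ICS driver (e.g. receiver-based with sender-based) needs no change here. */
int sics_ship(const struct pics_ops *ops, void *ctx, int node,
              const void *msg, size_t len)
{
    return ops->send(ctx, node, msg, len);
}
```

Swapping interconnect technologies then means supplying a different `struct pics_ops` instance; `sics_ship()` and anything above it stay unchanged.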

The design illustrates how all aspects of the STREAMS framework are accomplished within a distributed environment. This includes commands, logging (strlog), administration (SAD), pipes, system calls, DDI routines, etc. For the most part, the STREAMS framework does not require many modifications, since the controlling threads, S-ICS drivers, and some of the preview functions perform a majority of the work. This allows the technology to be quickly integrated into STREAMS implementations other than HP's. In addition, because the S-ICS drivers and controlling threads are independent of the STREAMS implementation, other than requiring the Function Replacement capabilities, their design should be portable to other vendors' platforms.

The design does not require STREAMS-based modules and drivers to be modified to operate within a distributed environment. This is a key issue, since both Sun and Locus appear to require protocol modules such as TCP, UDP, and IP to be extensively modified to operate. This is where this design differentiates itself from anything else that has been proposed. The design concentrates the changes within components outside of the modules and drivers supplied by the application, which makes porting and supporting those modules and drivers much faster and more cost-effective.

The design also eliminates the need to modify the VFS (virtual file system) layer to accommodate distributed driver opens. The design proposes a unique and protocol independent open and migration algorithm.

The foregoing and other objects, features and advantages of the invention will become more readily apparent from the following detailed description which proceeds with reference to the drawings.

BRIEF DESCRIPTION OF THE DRAWINGS

FIG. 1 is a block diagram of an example cluster of computer nodes interconnected by high speed digital data interconnects and operable in accordance with the present invention.

FIGS. 2A and 2B are more detailed block diagrams of a portion of the example cluster of FIG. 1 showing alternative modes of interoperation of nodes C and D.

FIGS. 3A and 3B are more detailed block diagrams of a portion of the example cluster of FIG. 1 showing alternative configurations for splitting the stream stacks for operation of TCP/IP on nodes A and B.

FIG. 4 is a diagram of the data structure packet routing process in the configurations of FIG. 3B.

FIG. 5 is a diagram similar to FIG. 3B showing the packet routing process in multiple instances for global mapping of multiple ports under control of a middleware thread.

FIG. 6 is a diagram of a data structure used to implement the packet routing process of FIGS. 4 and 5.

FIG. 7 is a diagram illustrating a portion of the flow of messages between the controlling thread and the middleware in the process of FIG. 5 showing how the controlling thread informs the middleware that a connection has been made.

FIG. 8 is a diagram illustrating a portion of the flow of messages between the controlling thread and the middleware in the process of FIG. 5 showing how the middleware informs the controlling thread on a source node of a component failure and sends recovery target node/thread information to the source node.

FIG. 9 is a diagram of an example of a data structure used to implement the S-ICS of FIGS. 4 and 5 and showing how routing can be accomplished in the present invention, including a route recovery mechanism that can be used in the system of FIG. 2B.

FIG. 10 is a state/flow diagram of a process for locating a remote index using the data structure of FIG. 9.

FIG. 11 is a state/flow diagram of a process used by the S-ICS of FIGS. 4 and 5 to locate the target queue in the data structure of FIG. 9.

FIG. 12 is a state/flow diagram of the process for locating the target queue index using the data structure of FIG. 9 to implement the preview function of FIG. 2B on Node D.

FIGS. 13A and 13B are a flow diagram of message routing through the P-ICS and S-ICS to the DLPI or IP in the system of FIGS. 2A and 2B.

FIG. 14 shows one alternative process for opening a driver in a STREAMS framework, the process being distributed over two nodes in the cluster of FIG. 1 with a recovery mechanism.

FIG. 15 shows a second alternative process for opening a driver in a STREAMS framework only on a remote node in the cluster of FIG. 1 without a recovery mechanism.

FIG. 16 is a diagram of the system of FIG. 1 showing operation of a log multiplexor in accordance with the invention to present a single system view of STREAMS logging.

APPENDIX A is A Transport Independent and STREAMS Kernel Interface description of the STREAMS interface implemented in HP-UX 10.10 and later releases.

DETAILED DESCRIPTION

1.0 General Description

1.1 Example Cluster Setup

Referring to FIG. 1, the example cluster is based on combining a number of new and existing technologies to create a unique cluster solution which cannot be easily duplicated using conventional distributed application techniques. The cluster can be used by remote client applications to access a distributed database which is used to implement a highly available data warehouse. The client applications can utilize a world-wide web browser such as Netscape to create query requests and to examine the results. A browser offers advantages over conventional client software in that it has a consistent interface and operating criteria, which make it easier for remote users to learn and use and to examine data.

The number of nodes used to implement the various portions of this solution within the cluster depends upon the size of the data warehouse and the number of clients simultaneously accessing the data. For example, a small data warehouse with several hundred gigabytes of data can be implemented using four to eight nodes for the data warehouse and two nodes acting as the Internet gateways. A large data warehouse consisting of several terabytes of data would be implemented with sixteen to thirty-two nodes, with four to eight nodes acting as Internet gateways if the number of remote clients is high and the remaining nodes acting as database servers.

Each Internet gateway node can communicate to the remote clients using one to four 622 Mb ATM links. All nodes will be linked together using a cluster interconnect solution which may consist of a combination of hardware, software, and firmware technologies.

The example shown in FIG. 1 utilizes a small, four node cluster containing nodes A, B, C, and D. Nodes A and B implement the distributed data base which provides the data warehouse; nodes C and D act as the Internet gateways and query engines providing access to the remote clients. All nodes are interconnected via some high speed technology. The interconnect may be through some arbitrated loop or a switch technology. In any case, all nodes may directly access every node within the cluster. The actual technology used to implement the interconnects is independent of distributed STREAMS, but there are, of course, preferred technologies which make the STREAMS design and implementation easier. This independence is a design advantage since new technologies may be incorporated and taken advantage of relatively transparently--this leads to a longer-life, lower cost solution for applications, protocols, and cluster services built on top of distributed STREAMS.

The assumptions made about the cluster are:

The cluster is interconnected via software/hardware upon which STREAMS and streams-based drivers may execute.

Each node's operating system instance is multi-threaded and should be MP-scalable.

The cluster may provide an optional middleware framework which STREAMS may probe or to which it may supply information. Such communication takes place via a well-defined interface. A middleware framework, or at least some entity such as a controlling thread, is needed if STREAMS is to make intelligent decisions for the various operations it performs. Note: while this STREAMS design is itself middleware independent, the extent to which an implementation of the design utilizes this middleware is strictly implementation dependent. This should be kept in mind when considering what the controlling thread is and how it might be used to perform some, if not all, of the middleware tasks, depending upon how it is implemented.

A distributed stream is a stream which can operate over a clustered environment in any of the following ways, among others:

The application and the entire stream stack may exist on the same node, just as in the non-cluster environment, but the STREAMS-related cluster framework infrastructure is being utilized to take advantage of the cluster facilities such as load leveling, high availability, etc.;

The stream stack may execute on a different node from where the application accessing it is executing;

The stream may be broken into constituent components at the module/driver/stream head levels with each component potentially being executed on individual nodes within the cluster;

In the case of streams-based pipes, each end of the pipe may exist on a separate node within the cluster.

The main point is that the stream has been somehow separated from its accessing application in some manner which would not normally exist if the stream were operating on a single node. This separation is made for a number of reasons which will be explained below.

A stream is distributed for any of the following (but not restricted to) reasons: to take advantage of cluster-wide facilities; to give applications a single system view of a cluster 20, which should facilitate their execution and management within a cluster; and to provide high availability, load balancing, and hardware sharing. A side effect of distribution is higher overall cluster performance with reduced CPU overhead. To illustrate these concepts, the following examples are given using the example cluster described above.

1.2 Hardware Sharing

A cluster 20 implemented in accordance with the invention enables substantial hardware sharing. Hardware sharing offers a number of benefits over the conventional system configuration:

Increased hardware selection and flexibility, since each node may be scaled in terms of processing capabilities, memory, I/O backplane, etc. to the task it is performing. In other words, the cluster is not required to be a homogeneous set of nodes. In the FIG. 1 example cluster, the gateway node (C, D) configuration can be: 4-way SMP, 32-bit processors, two fast-wide SCSI 4 GB discs, 256 MB of memory, two 622 Mb ATM networking cards, and one interconnect card, though two could be installed if HA is an issue. The database server configuration can be: 4-way SMP, 64-bit processors to address a large address space, Fibre Channel attached disc arrays with multiple TB of disc capacity, and one (or two) interconnect cards.

Lower hardware purchase and maintenance costs over the cluster lifetime. In the example cluster 20, only the gateway nodes C, D need to have 622 Mb ATM cards installed. This not only reduces the number of cards (which are expensive in the short term but inexpensive in the long term, since the fixed card cost is amortized over the cluster lifetime), but it also reduces the number of communication lines that must be installed. A communication line that can support a 622 Mb ATM card is quite expensive, and each line has recurring usage and maintenance costs that may be difficult to amortize over the cluster lifetime.

Improved security due to the solution being partitioned. The three main parts in this example solution (the remote client, the gateway, and the database server) all execute on different nodes. This allows both physical and software security to be easily implemented and enforced at these well-defined execution points.

Solution partitioning also reduces the number and types of failure points. For nodes A and B, the failure points are restricted to mass storage, the database software, the distributed application, and the cluster interconnect. For nodes C and D, the failure points are restricted to mass storage (but on a much smaller scale since these nodes should not require much mass storage), the gateway hardware, the networking protocol stacks, the query engine, the Internet software, and the cluster interconnect. In any case, with a reduced number of failure points, the solution has fewer places where Murphy's law applies, increased fault isolation, easier debugging, reduced cluster complexity, and lower administration/support costs since a single point of database or gateway administration may be created.

Within the example cluster environment, hardware sharing also improves overall aggregate cluster performance and efficiency by keeping the TCP/IP stack executing only on the gateway nodes. This is accomplished using the standard RPC (Remote Procedure Call) function shipment paradigm to ship only the results of remote execution to each node.

In a conventional distributed client/server application, each node would normally execute the TCP/IP stack and then use packet forwarding to the gateway nodes to communicate with the remote clients. The problem with the conventional approach is that the work associated with creating, processing, and routing packets through the gateway may take several thousand cycles per packet. Expending this many cycles per packet, considering the number of possible remote clients and the amount of data being routed, is not optimal and leads to reduced cluster performance:

Cycles spent on networking are not being spent on the database server, so the customer is not realizing the maximum system performance they paid for. In addition, the instruction cache will suffer greater flux from switching between the database application and the networking code, which will result in more cache misses and reduced processor performance. Implementing network protocols over the cluster interconnect increases packet latency by increasing the contention for system resources (memory, processor and I/O buses, timers, etc.) and the interconnect I/O card. This translates into reduced cluster-wide performance for the distributed database and increased response time for the remote applications. Note: latency is viewed as the key to creating high-performance clusters, so lower latency generally translates into faster response time and hence higher throughput and performance.

The interconnect solution can become overly complex, resulting in poor performance and higher design costs, if it must support all of the capabilities that transport protocols such as TCP or UDP require.
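To give a sense of scale for "several thousand cycles per packet", the helpers below compute the packet rate of a saturated link and the resulting cycle budget. The inputs used in the note that follows (5,000 cycles per packet, 1,500-byte packets, a 500 MHz processor) are illustrative assumptions, not figures from the text.

```c
/* Packets per second on a link saturated at link_bps bits per second,
 * assuming every packet is pkt_bytes long. */
double pkts_per_sec(double link_bps, double pkt_bytes)
{
    return link_bps / (pkt_bytes * 8.0);
}

/* CPU cycles per second consumed by packet processing at that rate,
 * given an assumed per-packet cost in cycles. */
double net_cycles_per_sec(double link_bps, double pkt_bytes,
                          double cycles_per_pkt)
{
    return pkts_per_sec(link_bps, pkt_bytes) * cycles_per_pkt;
}
```

For one saturated 622 Mb link, `pkts_per_sec(622e6, 1500)` is about 51,833 packets per second, and `net_cycles_per_sec(622e6, 1500, 5000)` is about 2.59e8 cycles per second. Under these assumptions, that is roughly half of a 500 MHz processor spent on networking rather than on the database.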

Using STREAMS-based TCP/IP as an example, instead of creating a client/server application between the database application on node A and the remote Internet application, the present invention distributes the application such that the TCP/IP connection need only exist on node C and is transparently accessed from the application on node A via the standard RPC (remote procedure call) function shipment paradigm. Function shipment basically ships functional execution from one location to another, which in this example, allows the cluster to off-load from nodes A and B all of the expensive TCP/IP processing cycles to nodes C and D; this should translate into higher system performance for nodes A and B. How this is accomplished within the STREAMS framework is discussed below.

1.3 Load Balancing

Load balancing is used to even out the workload among the nodes within the cluster. In the example cluster 20, load balancing can be utilized in two areas: the database server and the Internet gateway. For the database server nodes, if the database has been properly distributed, the probability of a request targeting each server node should be the same. Unfortunately, probability does not always reflect reality. If the requests create "hot spots" within the database, the cluster nodes will not be used equally and the aggregate cluster performance may be reduced. Similarly, if a disproportionate number of remote clients enter the cluster via node C rather than node D, the cluster response time to client queries can increase. In such situations, the solution is to impose load balancing policies and migrate applications, application instances, data, connections, or any combination of these to more lightly loaded nodes. The following example illustrates how this works using STREAMS-based pipes.
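A load-balancing policy of the kind mentioned above can be as simple as comparing the busiest and idlest nodes. The function below is a hypothetical sketch. The load metric, the threshold, and `pick_migration()` are assumptions, and the function chooses only the source and target node, not what to migrate.

```c
#include <stddef.h>

/* Hypothetical policy: if the spread between the busiest and idlest node
 * exceeds threshold, report them as migration source and target.
 * Returns 1 and fills *from / *to when migration is advised, else 0. */
int pick_migration(const double *load, size_t n, double threshold,
                   size_t *from, size_t *to)
{
    size_t hi = 0, lo = 0;

    if (n < 2)
        return 0;
    for (size_t i = 1; i < n; i++) {
        if (load[i] > load[hi])
            hi = i;          /* busiest node so far */
        if (load[i] < load[lo])
            lo = i;          /* idlest node so far */
    }
    if (load[hi] - load[lo] <= threshold)
        return 0;            /* imbalance too small to justify migration */
    *from = hi;
    *to = lo;
    return 1;
}
```

For example, with loads {0.9, 0.3, 0.5, 0.4} for nodes A through D and a threshold of 0.25, it selects node A as the source and node B as the target.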

STREAMS-based pipes are bi-directional queues which can be used to send and receive data between two or more cooperating threads of execution. Examples range from the simple such as command-line processing to the complex where multiple clients are communicating with a server application via a well-known pipe end. Applications make use of STREAMS-based pipes instead of memory-based pipes when additional filtering or pre-processing of data is needed--memory-based pipes cannot have filtering modules pushed into the middle of the pipe to accomplish this work.

Within example cluster 20, the database server application can be composed of a set of cooperating threads that communicate via pipes. When the cluster becomes unbalanced, one solution is to migrate some component of the database or some of the database server instances to other nodes within the cluster. In prior distributed environments, this approach meant redesigning the application either to utilize a different communication paradigm such as Sockets or to understand how to switch to the new paradigm when a migration occurs. Such redesigns are expensive and reduce the number of applications that can be ported or are capable of taking advantage of the benefits of clustering; this is unacceptable to many customers. To solve this problem, the application can continue to utilize pipes (STREAMS-based pipes may be transparently substituted for memory-based pipes even if the application does not take advantage of the added functionality). When the imbalance is detected, either pipe end, or the entire pipe and the associated thread of execution, is migrated to a new node within the cluster. All of this occurs without modifying a single line of code within the application.

1.4 High Availability

High availability is a commonly used expression describing a software or hardware solution which provides fault-tolerant behavior without necessarily incurring all of the complexity or cost of most fault-tolerant systems. Extending the ability to distribute or migrate STREAMS described above provides the ability to recover from most single-point failures of software and hardware within the cluster. For example, in FIG. 1, if a remote Internet application were communicating with an application on node B via the node D gateway and the node D gateway failed, we could initiate a recovery mechanism on node C which would transparently handle all the application activity as though it had been doing so from the beginning. Similarly, if the application contained synchronization points on node A and node B, then if either node failed, the remote client application would continue unaware of the recovery occurring, i.e. it would continue to operate normally, never sensing that a problem had occurred.

Note: distributed STREAMS can be used to recover from some single points of failure, primarily those where the component is a stateless entity such as DLPI. Distributed STREAMS cannot be used to recover from single points of failure for state-maintaining components such as TCP in the event of node failure. In such a case, the remote client application would sense that its connection has gone away and would need to be restarted; this is tolerated by customers today, but might not be tolerated in the future. Perhaps distributed STREAMS could make use of the research and prototyped solution of Brevix, an HP Labs project. Brevix defines a mechanism to allow specific traps to invoke error handling routines instead of just crashing the system. These error handling routines could be used to migrate client applications, and hence STREAMS instances, to other nodes within the cluster. If this research could be extended to include more traps, and the usage of the "panic" function could be replaced with an evaluation routine which would determine whether it is safe to migrate applications before taking the system down, distributed STREAMS could provide a valuable high-availability solution that would allow a vendor to differentiate itself in the clustering market.

Brevix is limited in the number of system traps it can handle. These are usually limited to trap 15 (data segmentation fault) and, I believe, traps 18, 26, 27, and 28, which deal with memory protection and unaligned memory references. The advantage of Brevix is that it can be applied on a per-subsystem basis, so a recovery mechanism would be able to determine with some confidence whether it is safe to migrate the components, depending upon the subsystem being affected.

Many kernel subsystems call panic() when they cannot make forward progress or have detected corruption. If panic() were changed from taking only a message string to also taking a policy parameter, then recovery and migration could be done more intelligently. The policy parameter would indicate whether to consider the entire subsystem suspect, whether the failure impacts other subsystems, and what the steps to recovery are.
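One way to picture the proposed policy parameter is a small structure passed alongside the message string. Everything here is hypothetical (`struct panic_policy`, `safe_to_migrate()`, `panic_with_policy()`). The three fields simply mirror the three items listed above, and unlike a real panic() the stand-in returns so that its decision can be observed.

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical recovery steps the policy can request. */
enum recovery_step { RECOVER_NONE, RECOVER_MIGRATE, RECOVER_RESTART_SUBSYS };

/* Hypothetical policy argument; one field per item in the text. */
struct panic_policy {
    bool subsystem_suspect;    /* treat the whole subsystem as corrupt */
    bool affects_others;       /* failure may have damaged other subsystems */
    enum recovery_step step;   /* recovery the caller asks for */
};

/* Migration is only considered safe if the damage is contained and the
 * caller asked for migration. subsystem_suspect is carried for the
 * recovery path but is not needed for this yes/no decision. */
bool safe_to_migrate(const struct panic_policy *p)
{
    return !p->affects_others && p->step == RECOVER_MIGRATE;
}

/* Stand-in for an extended panic(): reports the decision and returns
 * 1 if work would be migrated first, 0 if the node would halt at once. */
int panic_with_policy(const char *msg, const struct panic_policy *p)
{
    if (safe_to_migrate(p)) {
        printf("panic: %s (migrating before halt)\n", msg);
        return 1;
    }
    printf("panic: %s (halting)\n", msg);
    return 0;
}
```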

1.5 Single System View

Since a cluster 20 is composed of multiple nodes, a cluster is inherently more complex and potentially difficult to manage on a node by node basis. If, however, we view the cluster, at least from the application's viewpoint, as a single system, then many problems become much easier to solve. For example, if the application is using TCP/IP and binds itself to a particular port and then that application wishes to migrate for any reason to another node, we must ensure that no other application has already bound that port on the target migration node or the migration cannot take place. The way to solve this problem is to prevent the possibility of the problem ever occurring by using a cluster middleware to manage a cluster-wide port-space. The distributed STREAMS design of this invention provides techniques, algorithms, and STREAMS framework modifications which eliminate the need to modify the existing modules or drivers and allows the applications to execute transparently on any combination of nodes without modification.
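The cluster-wide port space can be sketched as a single table, owned by the middleware, that maps each port to the node holding it. The names below are hypothetical, and a real implementation would also need locking and separate tables per protocol. The point is that every bind consults one table, so migration only changes the recorded owner and can never collide with a bind on the target node.

```c
#include <stdbool.h>
#include <stdint.h>

#define NPORTS 65536

/* Hypothetical middleware-owned table: owner node per port, -1 if free. */
struct port_space {
    int16_t owner[NPORTS];
};

void ps_init(struct port_space *ps)
{
    for (long i = 0; i < NPORTS; i++)
        ps->owner[i] = -1;
}

/* Bind succeeds only if no node anywhere in the cluster holds the port. */
bool ps_bind(struct port_space *ps, uint16_t port, int16_t node)
{
    if (ps->owner[port] != -1)
        return false;
    ps->owner[port] = node;
    return true;
}

/* Migration keeps the port number and only moves ownership. */
bool ps_migrate(struct port_space *ps, uint16_t port, int16_t from, int16_t to)
{
    if (ps->owner[port] != from)
        return false;
    ps->owner[port] = to;
    return true;
}

void ps_unbind(struct port_space *ps, uint16_t port)
{
    ps->owner[port] = -1;
}
```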

1.6 DLKM and Single System View Issues

If the operating system supports dynamically loadable kernel modules (DLKM), each node within a cluster can be self-configured to only load and execute those modules for the applications being currently executed. In a distributed STREAMS environment, this is accomplished by having a flat file accessible for each node which describes how to load the STREAMS subsystem. At this point, STREAMS can then bring over the necessary modules and drivers to allow a streams-based subsystem such as TCP/IP to be loaded and configured for the application being executed. From the single-system view, there must be a mechanism that allows each node to find the cluster-wide configuration data as well as node-specific configuration data.

For DLKM to function, each node will be required to contain a flat file which contains the following information:

A list of nodes from which the STREAMS subsystem can be loaded, so that the streams-init function can proceed down this list and communicate with each node via a well-defined connection management protocol. This protocol will allow the initiating node to determine which nodes, if any, are capable of downloading the subsystem and to perform the actual download.

The file will also contain a set of default STREAMS drivers and driver load parameters which should be loaded whenever STREAMS is loaded. At a minimum, the clone, strlog, and sad drivers should always be loaded. Though it could be argued that these are not needed until an application attempts to open the driver, nearly every STREAMS-based driver makes use of these drivers either directly or indirectly so it is faster and simpler to load them at initialization.

The file will also contain a list of STREAMS-based subsystems which may need to be loaded and their corresponding server nodes. The idea is to pre-stage sufficient information such that these subsystems do not require a flat file of their own. This would work by altering str_install() so that it can read this information by communicating with the server node and having it download the current data structures. The difference would be that str_install() would not actually load the subsystem; it would only create the needed STREAMS infrastructure such that a subsystem load could be accomplished faster and with less information, i.e. a new flat file. This might eliminate the need for STREAMS subsystems to create new load facilities, since everything would occur via STREAMS. In addition, since this is a cluster, subsystem loading might have an HA quality to it.
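The text does not specify a format for this flat file. Assuming a hypothetical one-directive-per-line format (`server <node>`, `driver <name>`, `subsystem <name> <node>`), the sketch below parses it into a structure holding the three kinds of information listed above. It also checks for the three drivers the text says should always be loaded. `struct streams_conf`, `conf_line()` and `conf_has_required()` are invented names.

```c
#include <stdio.h>
#include <string.h>

#define MAXE    8
#define NAMELEN 32

/* Hypothetical in-memory form of the per-node STREAMS flat file. */
struct streams_conf {
    char servers[MAXE][NAMELEN];    int nservers;  /* nodes to load STREAMS from */
    char drivers[MAXE][NAMELEN];    int ndrivers;  /* default drivers to load */
    char subsys[MAXE][NAMELEN];                    /* subsystem names ... */
    char subsys_srv[MAXE][NAMELEN]; int nsubsys;   /* ... and their server nodes */
};

/* Parse one line. Returns 0 on success, -1 on an unknown or overflowing line. */
int conf_line(struct streams_conf *c, const char *line)
{
    char kw[16], a[NAMELEN], b[NAMELEN];
    int n = sscanf(line, "%15s %31s %31s", kw, a, b);

    if (n >= 2 && strcmp(kw, "server") == 0 && c->nservers < MAXE) {
        strcpy(c->servers[c->nservers++], a);
    } else if (n >= 2 && strcmp(kw, "driver") == 0 && c->ndrivers < MAXE) {
        strcpy(c->drivers[c->ndrivers++], a);
    } else if (n == 3 && strcmp(kw, "subsystem") == 0 && c->nsubsys < MAXE) {
        strcpy(c->subsys[c->nsubsys], a);
        strcpy(c->subsys_srv[c->nsubsys++], b);
    } else {
        return -1;
    }
    return 0;
}

/* Returns 1 if clone, strlog and sad are all listed as default drivers. */
int conf_has_required(const struct streams_conf *c)
{
    static const char *req[] = { "clone", "strlog", "sad" };
    for (int r = 0; r < 3; r++) {
        int found = 0;
        for (int i = 0; i < c->ndrivers; i++)
            if (strcmp(c->drivers[i], req[r]) == 0)
                found = 1;
        if (!found)
            return 0;
    }
    return 1;
}
```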

1.7 Design Problems

This invention provides solutions to a number of design problems connected to distributed STREAMS and streams-based applications. These problems are listed to familiarize the reader with some of the issues that are particular to the cluster environment and will hopefully lead to a better understanding of how the proposed design works and the reasons why it is designed this way.

With a single system view of the cluster, there is only one set of device files that an application may open. The problem to be solved is how to determine the right node on which to open a device, since the corresponding target hardware or software may not exist on every node. In addition, how do we determine this node from a device file name which does not communicate this information? Such a situation was illustrated in the previous hardware sharing example.

If different parts of a streams-based stack are to operate on different nodes, how are these components created and interconnected without requiring any component design modifications? This is a key goal to creating an open cluster environment that will encourage developers to port their modules and drivers to clusters.

If a stream is to migrate from one node to another for load balancing, high availability, or other reasons, how do we maintain state and correctness, and do so without modifying the components' design? (Note: for some cluster facilities, each component might require additional functionality in order to take advantage of these facilities; other facilities will have default behavior and require no modifications or additional code.) We must be concerned with STREAMS put and service routines which may be executing asynchronously to the migration effort.

For modules or drivers which use the DDI/DKI standard utility strlog(), how do we route messages to a single cluster log driver upon which an administrator may execute strace? Also, how do we distinguish one node's logging from another's, and do so without losing information or modifying the module or driver?

A number of STREAMS implementations (HP, Sun, OSF, Mentat, Unixware, etc.) utilize a number of different queue synchronization levels. These synchronization levels are key to ensuring correct queue access and operation. Within a cluster, how do we provide these capabilities and to what extent, if at all?

When creating streams-based multiplexors, how do we intelligently link the components together without creating unnecessary overhead and performance degradation? What do we do if one half of the multiplexor migrates to a new node and the other does not or cannot?

There are many more problems than these, but this should provide a basis to understand what must be solved before distributed STREAMS can be an effective solution within a cluster.

1.8 Design Rules

In evaluating a distributed STREAMS design, three design rules come to mind. The first rule is: any modification to the STREAMS framework should not result in any performance degradation for the non-distributed STREAMS application. Too often, developers add a feature here and there to satisfy a subset of the customer base. Usually no single feature costs much in terms of performance, but over time, and with an ever-expanding feature set, these individual costs can add up to serious overall performance degradation which directly impacts the entire customer base. So, in adding distributed STREAMS functionality, the design must not add performance degradation that the entire customer base will experience. The temptation to spend a cycle here and there for convenience's sake must be fought off whenever possible.

The second rule is: no STREAMS module or driver should require modification to work within a distributed STREAMS environment. This is key to getting third-party developers to develop their software on the target platform and to reducing the overall time and cost of developing, deploying, and supporting applications for the cluster environment. In the past, many cluster software developers have gone to such extremes as modifying the main code paths to add checks for whether a cluster-specific operation must now be performed. This type of modification is not only unacceptable from a performance and cost perspective, but it also restricts how clusters may be used and may reduce their appeal to customers. The only potential exception to this rule is that a module or driver may require additional functionality in order to migrate an instance from one node to another. This functionality does not require modification of the existing code; it is added functionality which the STREAMS framework will use to migrate the module/driver's private data, of which STREAMS normally remains ignorant. Also, module/driver functionality may be augmented using STREAMS Dynamic Function Replacement and Registration, which are described in co-pending, commonly-owned U.S. Ser. No. 08/545,561, filed Oct. 19, 1995, and Ser. No. 08/593,313, filed Jan. 31, 1996, incorporated by reference herein.

The third rule is: make the distributed STREAMS solution as middleware-independent as possible. Wherever feasible, the design should confine the points and circumstances at which communication with the middleware takes place. This independence will yield more design flexibility and a faster transition to new middleware technologies at a lower cost in terms of time to market, porting time, and overall product support.

2.0 Design Overview

Referring to FIGS. 2A and 2B, the basic architecture of a STREAMS stack is a stream head 30 and a driver 44 with zero or more modules 32 pushed on top of the driver. More complex stacks may be created using software multiplexors which connect multiple stream stacks into a tree-like structure which may now be accessed and treated as a single stack instance. Keeping this in mind, the distributed STREAMS design will be explained in step-wise refinement throughout the rest of this description.

At a minimum, the design of the present invention will include a controlling thread 34 and a physical cluster interconnect driver (P-ICS) 36. In addition, there may exist one or more instances of a STREAMS-based driver called the software cluster interconnect driver (S-ICS) 38. Each of these components will be explained in detail in later sections, but first let's examine two potential design configurations and how they can be used to solve different cluster problems. These configurations are illustrated in FIG. 2A and FIG. 2B.

The P-ICS provides a high-speed interconnect link using a lightweight, low-latency protocol which may contain both software and hardware components. These interconnects are, for all intents and purposes, virtual circuits. Virtual circuits offer a number of conceptual as well as implementation advantages, not the least of which is the ability to move an instance from one node to another transparently with respect to the application. In addition, virtual circuits eliminate the need to maintain protocol-specific information within the streams-based or physical interconnect drivers or within the STREAMS framework, which simplifies the overall design and implementation, improves performance, and increases code reuse and flexibility. Also keep in mind that the simple architectures shown in FIGS. 2A and 2B do not imply any limit on the number of streams which can be managed or on which cluster facilities may be made available to each stack.

In the first configuration, FIG. 2A shows a TCP/IP stack with multiple DLPI ATM drivers 40, 42 linked under the IP multiplexor 44, i.e., the standard configuration one might find in a STREAMS-based TCP/IP implementation. In addition, three other components are illustrated. The first two are the controlling thread 34 and the P-ICS 36; the third is a set of preview functions 31, 33, 35 which can augment the TCP and IP behavior based upon which cluster facilities this stack will take advantage of. For TCP this could be a cluster-wide port management scheme, while for IP this could be a high-availability set of functions to deal with error and card failure conditions; in either case, this is optional functionality which will require access to the controlling thread and the P-ICS. Note: all streams-based stacks will be known to the controlling thread when they are created, so, at a minimum, all stacks should be able to migrate between nodes without modifying the non-clustered implementation, though the stacks will be required to provide a pair of functions for each component which has private data associated with it.

In the second configuration, FIG. 2B shows a TCP/IP stack with multiple DLPI ATM drivers linked under the IP multiplexor, but in this case one DLPI instance 42 is actually executing on a different node. For this to work, the S-ICS driver 38 is linked under the IP multiplexor 44, with its primary responsibility being to route data from IP to the remote DLPI instance 42. This stack could have been created this way or may be the result of a high-availability recovery scheme, for example, if a DLPI driver noted a card failure and sent an M_ERROR message. This message will be intercepted by the IP augmented functions, the controlling thread will be informed, and the recovery policy will be invoked. In this situation, the application never notices the card failure, but continues to operate as before. How this is accomplished is explained in a later section.

In both configurations, you will notice that there is no mention of a middleware entity since this is an optional component. In these configurations, the controlling threads have sufficient information to coordinate the different facilities and recovery mechanisms among all the nodes running this TCP/IP stack. Again, this coordination is explained in detail in later sections as the components are discussed and their interactions are noted.

2.1 Controlling Kernel Thread Overview

In order to transparently establish and migrate streams within a cluster, there must exist a third-party entity which can communicate with the STREAMS framework, with other controlling thread instances, and with any cluster-wide management middleware, if present. This is accomplished by creating one or more kernel management threads. These threads have, at a minimum, the following responsibilities/capabilities:

If a stream is being created normally, as illustrated in the first configuration, the controlling thread will be responsible for noting the creation and informing the STREAMS framework to perform the function augmentation, if the stack has been configured to do so. The stack informs the STREAMS framework using str_install(). The str_install() function is used to install a STREAMS driver or module into a kernel; the parameters passed to it define items such as synchronization levels, streamtab entries, etc. For clustering, we simply add a new parameter which will contain a set of policies that may be invoked. Since even a non-cluster port of a driver or module requires a recompile, the driver or module will automatically pick up a default set of policies without changing the implementation. The default policies should be to enable stack migration for load balancing and high availability.

If a stream is being created with its components existing on different nodes within the cluster, the controlling thread will be responsible for creating the components on the remote node. In the second configuration (FIG. 2B), the controlling thread would be responsible for creating the remote DLPI instance and a local stream head. If modules needed to be pushed onto this driver instance, it would also be responsible for pushing them.

The controlling thread uses the in-kernel STREAMS interface to communicate with stream instances on the local node. The preferred form of STREAMS interface is implemented in Hewlett-Packard Company's HP-UX 10.10 and 10.20 releases and described in accompanying Appendix A, incorporated herein by this reference. The reasons for keeping the controlling thread in the kernel are performance and security. An in-kernel interface does not need to go through the costly system call interface to communicate with its partners. As for security, this keeps all routing tables and management structures protected from other threads and from snooping. Does this mean that the controlling threads could not be implemented in user space? No, and doing so would not require design changes for this solution, since all that we are concerned with is the ability to send messages and not what the messages must go through to get there.

The controlling thread is responsible for establishing the S-ICS instance for the node. There may be only one instance for all controlling threads, in which case they share the stream head pointer address, or there may be multiple instances, with each thread tailoring the information held within the S-ICS to the functionality it supports.

It is responsible for passing on stream head specific messages.

It is responsible for passing messages between the components and performing the necessary flow-control.

The controlling thread participates in stream migration.

If a node fails and part of a set of stream instance components were operating on that node, the controlling thread participates in error recovery which is associated with high availability solutions.

The controlling thread can communicate with the cluster-wide middleware to probe for information and to update the middleware on changes in what it is managing. This is one of two locations within the cluster which will have potentially detailed knowledge of the middleware and how to communicate with it. By limiting this information, the STREAMS framework remains independent of the middleware which allows more flexibility in working with and designing solutions for new middleware technologies.
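The first responsibility above notes that clustering adds a new policy parameter to str_install() and that a recompiled, otherwise unmodified driver or module picks up a default policy set. The sketch below models that parameter as a bitmask in a hypothetical install descriptor. The flag names, the `str_install_desc` structure, `effective_policy()`, and the explicit opt-out bit are all assumptions for illustration; they are not the real str_install() argument layout, which is platform specific.

```c
/* Hypothetical cluster policy flags carried by a new str_install() parameter. */
#define CLP_LOAD_BALANCE (1u << 0)  /* stack may migrate for load balancing   */
#define CLP_HIGH_AVAIL   (1u << 1)  /* stack may migrate/recover on failure   */
#define CLP_PORT_MAP     (1u << 2)  /* e.g. cluster-wide port management     */
#define CLP_OPT_OUT      (1u << 31) /* explicitly request no cluster policies */

/* Default picked up automatically by a recompiled, unmodified component. */
#define CLP_DEFAULT (CLP_LOAD_BALANCE | CLP_HIGH_AVAIL)

/* Hypothetical install descriptor: the existing fields (streamtab,
 * synchronization level, ...) are reduced to a name for brevity. */
struct str_install_desc {
    const char *name;
    unsigned    cluster_policy;  /* 0 (left unset) selects CLP_DEFAULT */
};

/* Resolve the policy set the controlling thread should act upon. */
unsigned effective_policy(const struct str_install_desc *d)
{
    if (d->cluster_policy & CLP_OPT_OUT)
        return 0;                      /* component declined all facilities */
    if (d->cluster_policy == 0)
        return CLP_DEFAULT;            /* field left unset: use the default */
    return d->cluster_policy;          /* explicit policy set */
}
```

Because a zero-initialized field is what an unmodified component carries after recompilation, treating zero as "use the default" is what lets the default policies apply without any source change.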

How all of these things are accomplished is discussed in detail in the following sections.

2.2 S-ICS Overview

The S-ICS 38 is a key component in making the STREAMS framework design solution middleware independent. The S-ICS is a streams-based software driver which exists above the P-ICS (physical interconnect driver) 36. The driver was chosen to be streams-based in order to integrate easily into the STREAMS framework and because it offers some unique and standard mechanisms which simplify stream migration and recovery; it also allows the P-ICS implementation-specific dependencies to be isolated to a single point from the STREAMS framework perspective. At a minimum, the S-ICS 38 will have the following capabilities/responsibilities:

It will participate in a distributed stream component's creation.

It will be responsible for probing middleware and updating middleware about low-level STREAMS framework and application specific information, unless this information is stored within the controlling thread.

It will process messages to and from the P-ICS from the STREAMS framework, the controlling management thread, and from the middleware where applicable.

It will maintain potentially application-specific caches of information to improve performance and simplify module and driver integration within a distributed STREAMS environment. These caches may contain information such as routing tables for packets which arrive at nodes on which the target application may not be running.

These subjects will be explained in greater detail in the subsequent sections.

2.3 Component Operation Example

In order to illustrate how all of these components work together, we will walk through an example using what will most likely be the most common application of distributed STREAMS: TCP/IP. This example is shown in FIGS. 3A and 3B in two alternative configurations. This section will not discuss how the stream is initially created (that comes later), but will concentrate on how it functions and what design solutions may be used to uphold the three design rules previously discussed.

2.3.1 Non-Split Stack TCP/IP

In the first configuration, shown in FIG. 3A, the stream stack 30, 32, 44 is created normally from the stack's perspective, but there is additional processing. During open, the controlling thread 34 is informed that a stack (FIG. 4) is being created. When the STREAMS framework open() code executes, the controlling thread takes note of the stream head address, the drivers being opened, and the modules being pushed. It then examines each driver/module to determine whether a set of cluster policies is associated with it. The controlling thread 34 then acts upon these policies by augmenting the stack with the appropriate functionality so that the stack can transparently take advantage of cluster-wide facilities.
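The open-time pass described above might look roughly like the following sketch. The `stack_component` record and `ct_note_open()` are hypothetical names, and where a real controlling thread would install preview functions, this sketch only sets a flag.

```c
#include <stddef.h>

/* Hypothetical view of one opened stack component (driver or pushed module). */
struct stack_component {
    const char *name;
    unsigned    policy;     /* cluster policies registered at install time */
    int         augmented;  /* set once preview functions are installed    */
};

/* Record the stream head, then augment every component that registered at
 * least one cluster policy. Returns the number of components augmented. */
size_t ct_note_open(void *stream_head, struct stack_component *comp, size_t n,
                    void **recorded_head)
{
    size_t count = 0;

    *recorded_head = stream_head;  /* remember which stream this stack is */
    for (size_t i = 0; i < n; i++) {
        if (comp[i].policy != 0) {
            /* A real implementation would install preview functions here
             * (e.g. via Dynamic Function Replacement); we only mark it. */
            comp[i].augmented = 1;
            count++;
        }
    }
    return count;
}
```

Components with no policy are left untouched, so their put and service routines run exactly as in the non-cluster case.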

Once the functions have been augmented, the controlling thread will only intervene or react based on what is forwarded to it and the policies set up. So, for the most part the TCP/IP stack operates exactly as it did in the non-cluster environment with the application seeing nearly the identical performance. It should be apparent that this configuration meets all design rules.

2.3.2 Split-Stack TCP/IP

When we distribute the stack, we may do so in two different ways with each having its trade-offs. The first alternative splits the stack between the TCP and IP modules. The benefits of this split are:

TCP maintains state and IP does not, so if we wish to migrate the stack to a new node, we only need to migrate TCP and not update IP which reduces the amount of migration work.

In addition, if the node executing IP were to fail, we could easily establish a new IP instance on another node within the cluster without affecting the TCP module or the application.

For a streams-based TCP, there will exist a TCP default queue for connection indications and such. The default queue may require additional code be added to route connect indications and such to the correct target node. Such a situation could occur if the application has been migrated to a new node. If the stack is split at this level, the code needed to perform this default queue processing could be re-used for the TCP/IP processing as well, at least in terms of determining where to route messages within the cluster.

The disadvantages are:

This alternative is not protocol-implementation independent at all. In most cases, the complete modularity that appears to exist on the surface does not, in reality, exist. If this split is used, the design must take into account any special forms of communication between these two modules and any preconceptions about private data and access to it. In other words, this alternative requires in-depth knowledge of the TCP/IP stack implementation, far beyond what should be necessary.

This design requires the S-ICS to be almost entirely protocol dependent instead of what should have been 95+% protocol independent.

The bulk of S-ICS cannot be re-used for other transport providers which means that the cost to develop and maintain a cluster based TCP/IP stack and other protocol stacks will be more than what I believe is acceptable.

In order to route messages, there must exist some mechanism which can contact the cluster middleware for route updates. Since TCP and IP are in the middle of the stream stack, they are not allowed to sleep and a request to a potentially different node would cause serious problems for the stack's execution, especially if the packet processing occurs while still on the interrupt control stack.

Splitting at this level results in many distributed stack instances within the cluster, i.e. one per connection.

The second alternative is to split the stack at the IP/DLPI level, keeping the TCP and IP modules together on each node at all times. This is illustrated in FIG. 3B. The advantages of this design are:

The design is 95+% protocol independent. We do not need to know much beyond understanding that a streams-based TCP will speak TPI (Transport Provider Interface), which is spoken and understood by a variety of other transport provider stacks (SNA, OSI, NetWare, AppleTalk, etc.), all of which also use DLPI drivers linked under their transport provider modules/drivers.

The bulk of the S-ICS can be re-used for other protocol stacks and potentially made sufficiently generic to be completely protocol independent. This leads to a lower cost solution, more flexibility in the transport stacks that may be ported to the cluster environment, and faster time-to-market of new technologies.

Depending upon how things are implemented, there will be fewer distributed stacks with the split at the IP/DLPI level. If IP is implemented as a multiplexor, it will really have only one DLPI instance per I/O card. If the split is done at this level, then each remote DLPI will be linked under IP, and any number of connections will flow through this single link. (In reality, the S-ICS will be the linked driver, but it will record sufficient information to know which DLPI instance it is communicating with on the remote node, so that messages are routed correctly and predictably.) If the split is done at the TCP/IP or UDP/IP level, then there will exist one interconnect per TCP connection; this setup, which will occur more often within the cluster than with the split at the IP/DLPI level, will degrade cluster performance and increase migration costs.

DLPI is also stateless so if we wish to migrate a stack, the DLPI portion does not need to migrate.

If a DLPI instance were to fail, the TCP/IP portion could be migrated to a new DLPI instance. The new DLPI would simply use ARP so that the low-level addresses of the node and the remote are understood. If the card is local to the stack, the migration simply informs a new or existing DLPI that it now has additional IP addresses assigned to it. If the DLPI is a remote instance, the fail-over uses the same process but now has the S-ICS acting as its DLPI instance. This could be accomplished by having all DLPI instances linked within the cluster for each node, or the configuration code could select which DLPIs will be used as fail-over components; this is strictly implementation dependent.

The disadvantages are:

The S-ICS requires more thought and time to develop due to its protocol independence, but this effort is more than recouped by being able to reuse the same design for different stacks.

If the default TCP queue requires a route processing module, this module will need to be created, which adds cost. Of course, this code would just be a subset of the S-ICS code, so the design could heavily leverage the S-ICS code and keep the costs within acceptable limits. This, of course, is the majority of the 5% protocol dependency that we may not be able to eliminate.

My recommendation is to use the second alternative shown in FIG. 3B based on examining the advantages and disadvantages. The next section discusses how one problem (global port mapping) associated with the single-system view of a cluster may be solved using the second alternative and how this solution may be applied to other protocol stacks.

Note: There are three numbers (1, 2, 3) marked on FIGS. 3A and 3B. These numbers refer to the following comments concerning potential performance optimizations that could be employed if additional precautions are taken.

1. If one envisions a cluster with multiple instances of alternative 2 (FIG. 3B) running, and then adds the possibility that applications which access the TCP/IP stack portion can migrate from node to node, packets could be routed to a node before all cluster-wide routing has been updated to reflect the new locations of stacks. This would be discovered when IP consults its fanout table and determines that the correct TCP or UDP instance is not on this node. In such a case, the packet would need to be routed back through the S-ICS and sent to the new node. Since IP would have already processed the packet, there is no reason to repeat this processing, nor to worry about any effects on, or recovery of, the stack that reprocessing might cause if we simply rerouted the packet to the correct node and back into IP. So, due to the controlled nature of the cluster, the packet may be sent directly to the specific TCP or UDP instance on the new node. As stated, this eliminates any IP reprocessing problems while also improving overall stack performance. How this rerouting is accomplished is discussed shortly, but briefly, the fanout table will contain a set of TCP/UDP target queues. Rerouting is accomplished by changing the TCP or UDP target queue (read queue) to be the S-ICS write queue, so that when putnext() is executed, it invokes the S-ICS put routine instead of the TCP or UDP put routine. This removes any need to scan the message or modify IP code; it does, however, require understanding this fundamental IP implementation design and writing code which can correctly modify the fanout table. (This is the major place where we actually have to understand how a streams-based IP is implemented, which is also why the design can achieve at most 95% protocol independence.)

2. As with the previous item, the first alternative lends itself to placing messages directly from the S-ICS to and from IP as a performance improvement. This, again, requires implementation-dependent knowledge. In addition, it requires the implementation to deal with the possibility that the S-ICS determines that the remote node has gone down, and it might require additional preventative code.

3. Again, as with item 2, it might be possible to improve performance by passing messages directly between the S-ICS and DLPI. The same concerns apply as before. In either case, the fact that messages would flow neither through the stream head nor through a potential context switch to the controlling kernel thread yields a good performance boost. The trade-off between this performance gain and the added complexity and preventative code can only be evaluated on a protocol-by-protocol implementation basis. I recommend considering such an optimization if possible.
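Comment 1 above relies on the fact that only the fanout table contents change, never the IP code. The following sketch models this with simplified stand-ins. `struct queue`, `struct msg`, `sim_put()`, `fanout_entry`, `fanout_redirect()`, and `ip_deliver()` are hypothetical; they are not the real `queue_t`/`mblk_t` types or the STREAMS putnext() implementation.

```c
/* Minimal stand-ins for STREAMS queue/message types (not the real layouts). */
struct msg { unsigned short dport; };
struct queue {
    const char *name;
    int (*put)(struct queue *q, struct msg *mp);  /* queue's put routine */
    int delivered;                                 /* messages accepted   */
};

/* Stand-in for passing a message to a queue's put routine. */
static int sim_put(struct queue *q, struct msg *mp) { return q->put(q, mp); }

/* Trivial put routine that just counts what it receives. */
static int count_put(struct queue *q, struct msg *mp)
{
    (void)mp;
    q->delivered++;
    return 0;
}

/* Hypothetical IP fanout entry: <port, target read queue>. */
struct fanout_entry { unsigned short port; struct queue *rq; };

/* Retarget the entry at the S-ICS write queue: IP's unchanged delivery
 * path now invokes the S-ICS put routine instead of TCP/UDP's. */
void fanout_redirect(struct fanout_entry *e, struct queue *sics_wq)
{
    e->rq = sics_wq;
}

/* IP's (unchanged) delivery step for an already-processed packet. */
int ip_deliver(struct fanout_entry *e, struct msg *mp)
{
    return sim_put(e->rq, mp);
}
```

Before the redirect, a packet for the entry's port reaches the TCP read queue; afterwards the identical `ip_deliver()` call lands in the S-ICS, which forwards the packet to the new node without IP reprocessing it.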

FIG. 4 illustrates how packets are routed if the second alternative of FIG. 3B is implemented (a more detailed explanation is found within the S-ICS Routing section). The controlling thread 34 accesses the local S-ICS 38 and the local stack instance 30, 32, 44 by using a hash function to determine the stream head address, which it stores in its local data structures 46. The local stack instance also has a pointer 48 to the managing S-ICS instance 38, to which it can then redirect messages directly without going through the controlling thread. Similarly, the S-ICS does not need to access the controlling thread to send data to the DLPI driver 42.
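The controlling thread's lookup structure could be as simple as a chained hash table. In this sketch the key is an assumed cluster-wide stream identifier, and each entry caches the stream head address and the managing S-ICS (mirroring pointer 48). The names `ct_entry`, `ct_hash()`, `ct_insert()`, and `ct_lookup()` are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

#define CT_BUCKETS 64  /* 2^6 buckets; the hash keeps the top 6 bits */

/* Hypothetical per-stream record kept by the controlling thread. */
struct ct_entry {
    uint32_t         stream_id;    /* cluster-wide stream identifier (assumed) */
    void            *stream_head;  /* local stream head address                */
    void            *sics;         /* managing S-ICS instance                  */
    struct ct_entry *next;         /* bucket chain                             */
};

static struct ct_entry *ct_table[CT_BUCKETS];

/* Multiplicative (Fibonacci) hashing: the top 6 bits select the bucket. */
static unsigned ct_hash(uint32_t id)
{
    return (unsigned)((uint32_t)(id * 2654435761u) >> 26);
}

void ct_insert(struct ct_entry *e)
{
    unsigned b = ct_hash(e->stream_id);
    e->next = ct_table[b];
    ct_table[b] = e;
}

struct ct_entry *ct_lookup(uint32_t id)
{
    for (struct ct_entry *e = ct_table[ct_hash(id)]; e != NULL; e = e->next)
        if (e->stream_id == id)
            return e;
    return NULL;
}
```

Collisions are resolved by chaining, so two streams whose identifiers hash to the same bucket are still found correctly.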

2.3.3 Global Port Mapping

In order to create the illustrated configurations, one may implement a global port mapping facility. This facility would control the assignment of TCP ports throughout the entire cluster instead of just on a per-node basis. One way this could be implemented is to simply divide up the port space, assigning port-space-size/number-of-nodes-in-cluster ports to each node. While simple to implement, this does not offer sufficient flexibility, nor does it adapt well to what each node within the cluster may be executing.
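To make the static partition concrete, consider a 16-bit port space of 65,536 ports. With four nodes, each node owns 16,384 ports. With three nodes, each owns 21,845 ports, and port 65,535 is left unassigned. The hypothetical helper below computes a node's range. It also shows why the scheme is rigid: a node with many servers cannot borrow a peer's idle ports.

```c
#include <stdint.h>

/* Inclusive port range owned by one node under a static partition. */
struct port_range { uint32_t first, last; };

/* Node `node` (0-based) of `nodes` owns [node*per, node*per + per - 1],
 * where per = space / nodes. Remainder ports stay unassigned.
 * Returns 0 on success, -1 on invalid arguments. */
int static_port_range(uint32_t space, uint32_t nodes, uint32_t node,
                      struct port_range *out)
{
    if (nodes == 0 || node >= nodes || space / nodes == 0)
        return -1;
    uint32_t per = space / nodes;
    out->first = node * per;
    out->last  = out->first + per - 1;
    return 0;
}
```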

A better solution is to create a cluster middleware control thread 50, as shown in FIG. 5, which maintains a list of which ports are active, which are available, and potentially where within the cluster the ports are actually in use. This approach allows the number of ports per node to fluctuate based on need while still allowing an application to migrate to any node within the cluster without having to worry about a duplicate port being in use. Note: FIG. 5 illustrates both the non-split-stack and the split-stack operations. A TCP or UDP instance could be executing over a local DLPI link 40 or over a remote DLPI link 42, and the algorithm is essentially the same. Side note: for a TCP/IP cluster implementation, it might be best to simply link all DLPI instances under each IP at cluster initialization. This would allow faster recovery should a card fail, and the S-ICS on each node would automatically know how to route the bulk of the packets between the nodes when migration occurs. This eliminates having to update the S-ICS and might eliminate the need for route-failure error recovery.
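The middleware control thread's core state could be an owner table indexed by port. In the sketch below, `ports_init()`, `port_claim()`, `port_release()`, `port_migrate()`, and `port_where()` are hypothetical names. A real middleware thread would also notify every controlling thread after each change, and that step is omitted here.

```c
#include <stdint.h>

#define NPORTS   65536
#define NO_OWNER (-1)

/* Hypothetical middleware table: owning node of each port, or NO_OWNER. */
static int16_t port_owner[NPORTS];

void ports_init(void)
{
    for (uint32_t p = 0; p < NPORTS; p++)
        port_owner[p] = NO_OWNER;
}

/* Claim `port` for `node`. Fails if any node in the cluster holds it. */
int port_claim(uint16_t port, int16_t node)
{
    if (port_owner[port] != NO_OWNER)
        return -1;  /* duplicate port somewhere in the cluster */
    port_owner[port] = node;
    return 0;
}

/* Release on unbind/close. Only the owning node may release. */
int port_release(uint16_t port, int16_t node)
{
    if (port_owner[port] != node)
        return -1;
    port_owner[port] = NO_OWNER;
    return 0;
}

/* Record that the endpoint owning `port` migrated from `from` to `to`. */
int port_migrate(uint16_t port, int16_t from, int16_t to)
{
    if (port_owner[port] != from)
        return -1;
    port_owner[port] = to;
    return 0;
}

/* Where is this port in use? Usable for routing packets after migration. */
int16_t port_where(uint16_t port) { return port_owner[port]; }
```

Since ownership is tracked per port rather than per node range, a node can hold as many ports as it needs, and a migrating endpoint keeps its port number.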

For each node within the cluster, the following setup will occur. The middleware may exist on a separate node but is shown in FIG. 5 to illustrate that the S-ICS may probe it for packet routing if its data cache does not have sufficient information. The S-ICS, of course, always has the option of simply dropping packets. If the S-ICS routes a packet to another node, the packet will be placed directly on the TCP or UDP read queue and will not go through IP again.
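The S-ICS data-cache behavior described above can be sketched as a small direct-mapped cache: use the cached route on a hit, probe the middleware on a miss, and otherwise drop. The cache geometry, `sics_route()`, `sics_cache_update()`, `sics_invalidate()`, and `middleware_probe_stub()` are all assumptions for illustration. The probe callback stands in for what may be a query to another node.

```c
#include <stddef.h>
#include <stdint.h>

#define SICS_CACHE_SLOTS 256
#define ROUTE_DROP       (-1)

/* One direct-mapped slot: which port it describes and the target node. */
struct cache_slot { uint16_t port; int node; int valid; };
static struct cache_slot sics_cache[SICS_CACHE_SLOTS];

/* Middleware query: returns a node id, or ROUTE_DROP if the route is unknown. */
typedef int (*probe_fn)(uint16_t port);

/* Install or replace a route, e.g. when the controlling thread reports a bind. */
void sics_cache_update(uint16_t port, int node)
{
    struct cache_slot *s = &sics_cache[port % SICS_CACHE_SLOTS];
    s->port = port;
    s->node = node;
    s->valid = 1;
}

/* Remove a route, e.g. on unbind or close. */
void sics_invalidate(uint16_t port)
{
    struct cache_slot *s = &sics_cache[port % SICS_CACHE_SLOTS];
    if (s->valid && s->port == port)
        s->valid = 0;
}

/* Cache hit: no middleware traffic. Miss: probe (if available) and cache
 * the answer. Unknown route: ROUTE_DROP, i.e. drop the packet. */
int sics_route(uint16_t port, probe_fn probe)
{
    struct cache_slot *s = &sics_cache[port % SICS_CACHE_SLOTS];
    if (s->valid && s->port == port)
        return s->node;
    int node = probe ? probe(port) : ROUTE_DROP;
    if (node != ROUTE_DROP)
        sics_cache_update(port, node);
    return node;
}

/* Illustrative stand-in for the middleware: knows only that port 80 is on node 2. */
static int probe_calls;
int middleware_probe_stub(uint16_t port)
{
    probe_calls++;
    return port == 80 ? 2 : ROUTE_DROP;
}
```

After the first probe for a port, later packets for that port are routed from the cache, which keeps the middleware off the per-packet path.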

A packet coming into IP would flow as follows: The packet would arrive at the IP lower mux. IP would examine the packet and utilize the IP fanout table 52 to determine which queue to send it to. The fanout table is nothing more than a <port, queue> mapping function. If the packet is bound to a local TCP or UDP instance, then IP will putnext(tcp-queue, mp) or putnext(udp-queue, mp) directly. If the packet is not for a local endpoint but has a fanout table entry, the packet will be putnext(S-ICS-0 queue, mp). If there is no fanout table entry, then IP will either drop the packet or it may forward the packet to the S-ICS lower mux if IP forwarding has been enabled. The S-ICS would examine the packet for embedded routing information and then reference its data cache to determine the route. If S-ICS is a mux, then it may also use the knowledge of which S-ICS instance sent it the data to find a route.
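The inbound decision can be condensed into a classifier that mirrors the cases listed above. The `fanout` layout with an explicit `local` flag and the `ip_action` names are hypothetical. A real IP fanout table would typically be hashed rather than scanned and stores queue pointers, so locality is implied by which queue is stored.

```c
#include <stddef.h>

enum ip_action { DELIVER_LOCAL, TO_SICS, FORWARD_SICS_MUX, DROP };

/* Hypothetical fanout entry: the target is the TCP/UDP read queue for a
 * local endpoint, or the S-ICS write queue installed at bind time. */
struct fanout { unsigned short port; int local; void *target_q; };

/* Classify an inbound packet by destination port. `forwarding` reflects
 * whether IP forwarding is enabled. Linear scan for brevity only. */
enum ip_action ip_classify(const struct fanout *table, size_t n,
                           unsigned short dport, int forwarding,
                           void **q_out)
{
    for (size_t i = 0; i < n; i++) {
        if (table[i].port == dport) {
            *q_out = table[i].target_q;  /* putnext() target */
            return table[i].local ? DELIVER_LOCAL : TO_SICS;
        }
    }
    *q_out = NULL;                       /* no fanout entry at all */
    return forwarding ? FORWARD_SICS_MUX : DROP;
}
```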

In essence, there are three components to this solution: a set of augmented scanning functions, a controlling thread to manage this effort, and a middleware thread. The middleware thread could, in fact, be the controlling thread, since the controlling threads are involved in nearly all aspects of management. The only disadvantages of doing so would be that the controlling threads might not be as protocol independent as one would like, and that it might be simpler and faster to design middleware threads specific to the protocol stacks involved while keeping the controlling threads completely protocol independent. Regardless of how this is actually implemented, the associated functionality and high-level implementation are described below.
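The augmented scanning functions, whose behavior is detailed in step 1 below, can be pictured as a wrapper that saves the original put routine and installs a preview routine in its place. The sketch uses simplified stand-in types. The `M_PROTO` and TPI primitive values are illustrative; they follow common SVR4 headers, but real code must take them from the platform's `<sys/stream.h>` and `<sys/tihdr.h>`. `augment_wput()`, `ct_enqueue()`, and `tcp_wput()` are hypothetical. `augment_wput()` mimics the effect of Dynamic Function Replacement, not its actual interface.

```c
#include <stddef.h>

#define M_DATA       0  /* illustrative message types      */
#define M_PROTO      1
#define T_BIND_REQ   6  /* illustrative TPI primitive values */
#define T_UNBIND_REQ 7

struct msg { int db_type; int prim; };  /* message type + first TPI word */
struct queue;
typedef int (*putp_t)(struct queue *, struct msg *);
struct queue {
    putp_t qi_putp;      /* routine STREAMS calls for this queue */
    putp_t orig_putp;    /* saved original routine              */
    int    sidetracked;  /* messages handed to the CT           */
    int    passed;       /* messages processed by the original  */
};

/* Stand-in for handing a message to the controlling thread. */
static void ct_enqueue(struct queue *q, struct msg *mp)
{
    (void)mp;            /* a real version would queue mp for the CT */
    q->sidetracked++;
}

/* Stand-in for the transport's original write put routine. */
int tcp_wput(struct queue *q, struct msg *mp)
{
    (void)mp;
    q->passed++;
    return 0;
}

/* Preview routine: a single test on the hot path. Bind/unbind requests are
 * sidetracked to the controlling thread, which later re-injects them; all
 * other messages go straight to the original put routine. */
int bind_preview_wput(struct queue *q, struct msg *mp)
{
    if (mp->db_type == M_PROTO &&
        (mp->prim == T_BIND_REQ || mp->prim == T_UNBIND_REQ)) {
        ct_enqueue(q, mp);
        return 0;
    }
    return q->orig_putp(q, mp);
}

/* Save the original routine and install the preview in its place. */
void augment_wput(struct queue *q)
{
    q->orig_putp = q->qi_putp;
    q->qi_putp   = bind_preview_wput;
}
```

With this design, ordinary data costs only one extra function call and one comparison, which matches the performance argument in step 1.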

1. When a transport endpoint is created, in this example either a TCP or UDP endpoint, we use STREAMS Dynamic Function Replacement as described in U.S. Ser. No. 08/545,561 to augment the put, service, and close routines associated with both the read and write queues (see a later section as to how this is initially set up). These functions are augmented to preview messages which are being sent to and from the stack. For the put and service routines, we are interested in M_PROTO messages which may contain TPI T_BIND_REQ and T_UNBIND_REQ messages for the write queue function, and T_BIND_ACK, T_UNBIND_ACK, and T_ERROR_ACK messages for the read queue function. For the close routine, we are interested in whether it is invoked while the endpoint is still bound, in which case we have some work to do. These functions scan for a particular message type or activity and then take action if the conditions are met. By using STREAMS Dynamic Function Replacement instead of a module pushed onto the stack (which is an alternative to this technology), we avoid invoking an extra set (read and write) of put routines on every message, which improves performance. Note: all other messages are immediately processed using the original put or service routines, so the performance cost is the function call overhead plus the single if statement within the given preview function.

2. The controlling thread, which is the same thread discussed previously, optionally works with the middleware and works with the augmented functions. Continuing with the example, the endpoint has been opened and the functions have been augmented. When the application sends a T_BIND_REQ TPI message to the TCP/UDP module, the augmented write put routine senses this and temporarily redirects the message to the controlling thread. After redirecting the message, it returns to allow processing to continue. This is possible because the application request will not actually complete until either a T_BIND_ACK or a T_ERROR_ACK TPI message is returned to the application; this is an asynchronous event, and all TPI implementations and transport providers support this behavior.

3. The controlling thread extracts the bind information, in this case the associated port information, and probes the middleware thread which may be executing on a different node within the cluster.

4. The middleware thread examines the bind information and determines whether the specified port is available. If it is not, the middleware thread returns an error to the controlling thread, which will take appropriate action. If the port is available, the middleware thread marks it as "in use" and then informs the rest of the cluster, i.e., the rest of the controlling threads within the cluster, that this port is now in use and by which node.

5. Rather than just having each controlling thread maintain this knowledge, which would make it implementation dependent, we do something different. Each node's controlling thread issues a bind request to IP, which should succeed. For this bind, we augment the fanout table entry <port, read queue address> so that the read queue address is actually the S-ICS write queue address. The S-ICS will also be informed of the bind and will update its data cache to reflect this new routing information. So, in essence, each node now has an IP instance which has the bound port address, and this address will be used to route any packets which arrive at a node on which the application is not currently executing (potentially due to application migration, or to recovery after the DLPI component's node fails and a new instance is started on a different node).

6. Once the cluster has completed the bind operations on each node, the middleware thread informs the controlling thread of success or failure. If successful, the controlling thread uses the in-kernel STREAMS interface to put the original bind message on the write queue of the transport module, TCP or UDP. This module and its IP instance perform the necessary bind operation and generate either a T_BIND_ACK or a T_ERROR_ACK. If a T_ERROR_ACK occurs, then, since we already know that a bind operation was being processed, we inform the controlling thread that the bind failed, and it informs the middleware thread, which unbinds all of the other nodes' instances for this port address. If the bind succeeded, the stream stack operates normally whether or not its components have been distributed; that is, the technique can also be applied to a stream that was created normally and has all of its components executing on the same node.

7. At some later time, the application will either close the stream instance or issue an unbind operation. When either occurs, the augmented routines sense it and take action. If the application closes the stream stack, the close routine sends a message to the controlling thread, which notes the activity and tells the local S-ICS instance to clean up its cache. In addition, the controlling thread informs the middleware thread, which informs all other nodes within the cluster and issues an unbind operation to the appropriate IP instance. This also results in each node's S-ICS instance cleaning up its data cache. If the application issues an unbind request instead of a close, the algorithm is the same, except that the augmented function sidetracks the message just as it did for the bind and waits until the controlling threads on all nodes have performed the unbind before continuing. Sample bind preview code:

    ______________________________________
    int
    tpi_bind_w_preview(queue_t *q, mblk_t *mp)
    {
      union T_primitives *tpi;

      if (mp->b_datap->db_type != M_PROTO) {
        /* Not a TPI request: pass it to the original put routine. */
        (*q->q_qinfo->qi_putp)(q, mp);
        return (0);
      }
      tpi = (union T_primitives *)mp->b_rptr;
      if (tpi->type == T_BIND_REQ || tpi->type == T_UNBIND_REQ) {
        /* Route the message to the controlling thread. */
        route_msg(tpi_ct_endpoint, mp, write_queue);
      } else {
        (*q->q_qinfo->qi_putp)(q, mp);
      }
      return (0);
    }
    ______________________________________


The controlling thread listens on multiple communication points, one of which is associated with this activity. It performs the tasks described above and then invokes streams_putmsg() to place the message on the correct queue. If the cluster operation fails, the controlling thread creates a T_ERROR_ACK and performs a putnext(tp_read_queue, mp). If the operation on the cluster succeeds, the controlling thread performs a putnext(tp_write_queue, mp) of the original message. On the read side, the preview function would be:

    ______________________________________
    int
    tpi_bind_r_preview(queue_t *q, mblk_t *mp)
    {
      union T_primitives *tpi;

      if (mp->b_datap->db_type != M_PCPROTO) {
        (*q->q_qinfo->qi_putp)(q, mp);
        return (0);
      }
      tpi = (union T_primitives *)mp->b_rptr;
      if (tpi->type == T_ERROR_ACK &&
          tpi->error_ack.ERROR_prim == T_BIND_REQ) {
        /* The bind operation failed, so inform the controlling thread
         * so it can perform any clean-up operations.
         */
        route_msg(tpi_ct_endpoint, mp, read_queue);
      } else {
        (*q->q_qinfo->qi_putp)(q, mp);
      }
      return (0);
    }
    ______________________________________
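A minimal user-space sketch of the middleware side of steps 4 and 6 follows, assuming the middleware thread keeps one owner entry per 16-bit port and that node ids start at 1. The names `port_registry`, `registry_reserve`, `registry_release` and `ct_handle_bind` are hypothetical, and the notification of the other controlling threads is omitted; the sketch only shows which of the two `putnext()` paths described above the controlling thread would take.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NPORTS   65536
#define NO_OWNER 0

/* Hypothetical cluster-wide registry kept by the middleware thread:
 * owner[p] == NO_OWNER means port p is free, else it holds the node id. */
struct port_registry {
    uint16_t owner[NPORTS];
};

void registry_init(struct port_registry *r)
{
    memset(r->owner, 0, sizeof r->owner);
}

/* Step 4: reserve `port` for `node`.  Returns 0 on success, -1 if in use. */
int registry_reserve(struct port_registry *r, uint16_t port, uint16_t node)
{
    assert(node != NO_OWNER);
    if (r->owner[port] != NO_OWNER)
        return -1;
    r->owner[port] = node;
    /* A real middleware thread would now inform every controlling thread. */
    return 0;
}

/* Step 6 failure path: undo the reservation so the other nodes unbind. */
void registry_release(struct port_registry *r, uint16_t port, uint16_t node)
{
    if (r->owner[port] == node)
        r->owner[port] = NO_OWNER;
}

/* What the controlling thread does with a redirected T_BIND_REQ. */
enum bind_action { FORWARD_BIND_TO_TRANSPORT, SEND_T_ERROR_ACK };

enum bind_action ct_handle_bind(struct port_registry *r, uint16_t port,
                                uint16_t node)
{
    if (registry_reserve(r, port, node) != 0)
        return SEND_T_ERROR_ACK;        /* putnext() toward the application */
    return FORWARD_BIND_TO_TRANSPORT;   /* putnext() of the original message */
}
```

A second node asking for a port that node 1 already holds gets `SEND_T_ERROR_ACK`, so its transport provider never sees the bind.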


For close processing, the following augmented close function would be executed:

    ______________________________________
    int
    tpi_close(queue_t *q, int flag, cred_t *crp)
    {
      mblk_t *mp;

      /* Create a close message to inform the controlling thread. */
      mp = create_close_msg();
      route_msg(tpi_ct_endpoint, mp, read_queue);
      /* Await a message from the controlling thread indicating that the
       * cluster has been cleaned up.
       */
      recv_msg(q, mp);
      /* Invoke the original close routine (not the put routine). */
      return ((*q->q_qinfo->qi_qclose)(q, flag, crp));
    }
    ______________________________________


This function performs a handshake with the controlling thread. The controlling thread informs the other controlling threads that the port is freed and that every S-ICS should update its route table to reflect that a close is occurring and the given queue is no longer valid. If a race occurs, i.e. a message for the port reaches an S-ICS while the close is in progress, the S-ICS contacts the controlling thread, which should see that the port is now available and will tell the S-ICS to drop all messages for this port for the moment.
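The handshake and the race handling can be sketched as follows. This is an illustration under stated assumptions: the S-ICS route cache and the controlling thread's port table are flat arrays indexed by port, queue pointers are opaque, and every name is hypothetical. The `WAIT_FOR_ROUTE` verdict, for a port that has already been re-bound elsewhere, is an addition not described in the text.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NPORTS 65536

/* Hypothetical per-node S-ICS route cache: port -> queue (NULL = no route). */
struct sics_routes { void *dest[NPORTS]; };

/* Hypothetical controlling-thread view of which ports are bound anywhere. */
struct ct_ports { uint8_t in_use[NPORTS]; };

enum route_verdict { DELIVER, DROP, WAIT_FOR_ROUTE };

/* The augmented close routine blocks until every remote controlling
 * thread confirms that its S-ICS cache has been cleaned. */
struct close_handshake { uint16_t port; int acks_pending; };

void ct_begin_close(struct close_handshake *h, struct ct_ports *ct,
                    struct sics_routes *local, uint16_t port, int nremote)
{
    h->port = port;
    h->acks_pending = nremote;
    ct->in_use[port] = 0;        /* the port is now free cluster-wide */
    local->dest[port] = NULL;    /* the local S-ICS forgets the queue */
}

/* Called once per remote node after it has invalidated its own route. */
void ct_cleanup_ack(struct close_handshake *h)
{
    assert(h->acks_pending > 0);
    h->acks_pending--;
}

/* The original close routine may run only once this returns nonzero. */
int ct_close_may_proceed(const struct close_handshake *h)
{
    return h->acks_pending == 0;
}

/* A message arrives for `port`; without a cached route, ask the CT. */
enum route_verdict sics_route(const struct sics_routes *rt,
                              const struct ct_ports *ct, uint16_t port,
                              void **out)
{
    if (rt->dest[port] != NULL) {
        *out = rt->dest[port];
        return DELIVER;
    }
    /* Port free: drop for the moment.  Re-bound: wait for a route update. */
    return ct->in_use[port] ? WAIT_FOR_ROUTE : DROP;
}
```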

2.4 Port Map Optimizations

In the previous discussion, the controlling thread was used to pass the bind request to the middleware thread and return the result. While this message passing allows the controlling thread to remain protocol independent, it slows down the port mapping facility somewhat, and when many connections are being made, as by a web browser, the speed of establishing these connections becomes important. To solve this problem, the controlling thread could become somewhat protocol dependent: it would recognize that it is performing a bind operation and invoke a protocol-specific handler function.
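One way to keep this protocol dependence contained is a small dispatch table in the controlling thread, sketched below under the assumption that bind requests carry a protocol name. `bind_req`, `ct_select_bind_handler` and the stub handlers are hypothetical. Protocols without a registered handler keep using the middleware path, so the controlling thread stays protocol independent for them.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical bind request as the controlling thread sees it. */
struct bind_req {
    const char *protocol;   /* e.g. "tcp" or "udp" */
    unsigned    port;       /* 0 means "any port" */
};

/* Which path handled the request (for illustration only). */
enum bind_path { PATH_MIDDLEWARE, PATH_TCPIP_HANDLER };

typedef enum bind_path (*bind_handler_fn)(const struct bind_req *req);

/* Protocol-independent path: pass the request to the middleware thread. */
static enum bind_path generic_bind(const struct bind_req *req)
{
    (void)req;              /* would be sent to the middleware thread */
    return PATH_MIDDLEWARE;
}

/* Protocol-specific fast path, e.g. the TCP/IP handler. */
static enum bind_path tcpip_bind(const struct bind_req *req)
{
    (void)req;              /* would use the local port pool or a group notify */
    return PATH_TCPIP_HANDLER;
}

static const struct {
    const char     *protocol;
    bind_handler_fn fn;
} bind_handlers[] = {
    { "tcp", tcpip_bind },
    { "udp", tcpip_bind },
};

/* Look up a handler; unknown protocols fall back to the generic path. */
bind_handler_fn ct_select_bind_handler(const char *protocol)
{
    assert(protocol != NULL);
    for (size_t i = 0; i < sizeof bind_handlers / sizeof bind_handlers[0]; i++)
        if (strcmp(bind_handlers[i].protocol, protocol) == 0)
            return bind_handlers[i].fn;
    return generic_bind;
}
```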

In our example, a TCP/IP handler function is defined as follows. There are 64 K ports available for an application to use. Of these, approximately 5 K are reserved for applications that let the protocol choose the port. The handler function takes this into account to make a local optimization: if the application requests any port, the handler allocates one from a local pool of "any port" ports, this range having been divided among the cluster gateway nodes. In this case there is no need to coordinate the port assignment with the remote nodes; the node need only note which port is now in use and establish a late bind, i.e. lazy binds are performed throughout the cluster to handle the routing issues if necessary. They may not be necessary, since the application may provide a "life" duration via an option that it sets during initialization. For example, a web browser may make many connections per multi-part document, each of them short-lived. A good browser would inform the transport that these connections will be short-lived, and clustering can take advantage of this to reduce the port management overhead. If a node exhausts its pool of local "any address" ports, it posts a request to borrow ports from other nodes or invokes a protocol to rebalance the available ports within the cluster.
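The local pool and the borrowing step could look like the following sketch. It assumes the "any port" range is the top 5000 ports (60536 to 65535), split into contiguous, near-equal slices, one per gateway node; the range, the struct and the function names are illustrative assumptions. Borrowing hands the requester the upper half of a donor's unallocated ports, which yields at least one port whenever the donor has any left.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed "any port" range: 5000 ports at the top of the 16-bit space. */
#define ANY_PORT_BASE  60536u
#define ANY_PORT_COUNT 5000u

struct port_pool {
    uint32_t next;   /* next unallocated port in this node's slice */
    uint32_t hi;     /* one past the last port in the slice */
};

/* Split the range into n near-equal contiguous slices, one per node. */
void pools_init(struct port_pool *pools, unsigned n)
{
    assert(n > 0);
    uint32_t base = ANY_PORT_BASE;
    for (unsigned i = 0; i < n; i++) {
        uint32_t size = ANY_PORT_COUNT / n + (i < ANY_PORT_COUNT % n ? 1u : 0u);
        pools[i].next = base;
        pools[i].hi = base + size;
        base += size;
    }
    assert(base == ANY_PORT_BASE + ANY_PORT_COUNT);
}

/* Allocate locally with no cluster coordination; 0 means exhausted. */
uint32_t pool_alloc(struct port_pool *p)
{
    return p->next < p->hi ? p->next++ : 0;
}

/* Exhausted requester borrows the upper half of a donor's free ports. */
int pool_borrow(struct port_pool *req, struct port_pool *donor)
{
    uint32_t left = donor->hi - donor->next;
    if (left == 0)
        return -1;                       /* donor has nothing to give */
    uint32_t mid = donor->next + left / 2;
    req->next = mid;
    req->hi = donor->hi;
    donor->hi = mid;
    return 0;
}
```

With three gateway nodes the slices hold 1667, 1667 and 1666 ports.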

For the rest of the port space, the controlling threads must coordinate port assignment and routing information as well as clean up on connection unbind or close. The following high-level algorithm illustrates how all of this will work:

if the bind address is for "any address" {

if the current node does not have an available port {

Initiate a group notify operation to rebalance the port space; the initiating node is guaranteed to obtain at least one port if any ports are available within the cluster.

If no port is available, inform the application of the bind failure by issuing a T_ERROR_ACK with a port in-use error condition, performing a putnext(OTHERQ(transport provider write queue), mp). This prevents the local transport provider from ever seeing the bind request.

}

If the connection is not known to be short-lived, or there is a policy to always inform all nodes that care about ports of all bind-related activity, then inform the remote nodes of the port and node address so they may take appropriate action. This should be accomplished using a reliable multicast protocol.

} else { <for a specific port>

We need to coordinate the port assignments among all nodes that care about ports. To do this we can use a group notify operation. A group notify allows a low-latency packet to be reliably sent to a set of nodes, and each node responds to the initiator. In this example, the request is the requested port address and the associated node; the response is the acceptance or rejection of this request. When a node returns an accept, it will locally: (1) mark the port in use, (2) if the connection may be long-lived, create a bind instance to update the fanout table, and (3) update the route information. If a node returns a reject, the requesting node assumes that the port must be in use. No further assumptions about the rejecting node's execution may be made.

}
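The remote side of the group notify for a specific port, and the initiator's reading of the responses, might be sketched as follows. The per-port arrays, the 8-bit node ids and all names are assumptions, and step (3), the route update, is only noted in a comment. A partial result corresponds to the race condition discussed next.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NPORTS 65536

/* Hypothetical per-node port state kept by each controlling thread. */
struct node_ports {
    uint8_t owner[NPORTS];    /* 0 = free, otherwise the owning node id */
    bool    fanout[NPORTS];   /* bind instance created in the local IP */
};

/* Remote side of the group notify: accept or reject <port, node>. */
bool gn_respond(struct node_ports *np, uint16_t port, uint8_t node,
                bool long_lived)
{
    assert(node != 0);
    if (np->owner[port] != 0 && np->owner[port] != node)
        return false;                  /* reject: port already in use */
    np->owner[port] = node;            /* (1) mark the port in use */
    if (long_lived)
        np->fanout[port] = true;       /* (2) fanout entry for routing */
    /* (3) the route information update is omitted in this sketch */
    return true;
}

enum gn_outcome { GN_ALL_ACCEPTED, GN_ALL_REJECTED, GN_PARTIAL };

/* Initiator side: classify the collected responses. */
enum gn_outcome gn_collect(const bool *accepted, int n)
{
    int yes = 0;
    for (int i = 0; i < n; i++)
        if (accepted[i])
            yes++;
    if (yes == n)
        return GN_ALL_ACCEPTED;
    if (yes == 0)
        return GN_ALL_REJECTED;        /* the port must be in use */
    return GN_PARTIAL;                 /* possible race with another binder */
}
```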

In this scenario there is a potential race condition, since two nodes may attempt to bind the same port address. This is indicated by the group notify showing only partial success for each initiating node. If this occurs, the implementation may either fail the bind operations or perform a random back-off algorithm and re-attempt the bind operation. A random back-off can be implemented by using the same random number generator on every node but with a node-specific key, which keeps each node's delays predictable. This should allow one application to bind successfully and the other to fail.

Upon failure, the initiating node informs the nodes that acknowledged the port acceptance of the failure, and they perform the necessary clean-up. If the interconnect cannot report which nodes accepted the request, the initiating node must inform all nodes that it does not have the port bound, and the remote nodes examine the initiator address and the port to determine whether clean-up is necessary.

If there was partial acceptance, then a race condition exists between one application and another for binding to the same port. If this occurs, there are two possible solutions:

Each node could examine the ratio of acceptances to rejections; a node that has a majority of nodes accepting its request wins the port. The losing node issues a T_ERROR_ACK with a port in-use error condition to the application and sends an error message to the nodes that accepted it as the port owner, so that they clean up their tables and any allocated resources. The winning node waits long enough for this clean-up to be communicated, i.e. (N*time to process error)+(N*time to send error clean up completed messages), and then initiates the bind request to the nodes that originally rejected it.
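The majority rule and the winner's wait can be written down directly, as in the sketch below, where N is taken to be the number of nodes involved (an assumption; the text does not define N) and times are in microseconds. The function names are hypothetical. Note that under a strict majority a tie makes both nodes lose, so an implementation would still need a tie-break, for example the back-off alternative described next.

```c
#include <assert.h>

enum race_result { WIN_PORT, LOSE_PORT };

/* Strict majority of accepting nodes wins the port. */
enum race_result resolve_by_majority(int accepts, int total)
{
    assert(accepts >= 0 && accepts <= total);
    return 2 * accepts > total ? WIN_PORT : LOSE_PORT;
}

/* Winner's wait before re-binding on the nodes that rejected it:
 * (N * time to process error) + (N * time to send clean-up-completed msgs). */
unsigned long winner_wait_us(unsigned n, unsigned long t_err_us,
                             unsigned long t_done_us)
{
    return (unsigned long)n * t_err_us + (unsigned long)n * t_done_us;
}
```

For example, with N = 4, 50 us to process an error and 20 us per completion message, the winner waits 4*50 + 4*20 = 280 us.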

The other alternative is for each node to inform all accepting nodes that the bind failed and that they should clean up. Each node then waits a node-specific random amount of time (using a random number generator with a node-specific key, so that each node follows a predictable pattern that can be re-created for debugging or testing). When the timer expires, the bind operation is initiated again and the results are checked. If there is another collision, a maximum collision counter determines whether another attempt should be made. While this could result in both applications failing to bind, the probability should be very low.
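A sketch of the node-keyed back-off follows. It swaps in a xorshift32 generator seeded from the node id, so every node runs the same generator but draws a reproducible sequence of its own. The slot length, the collision limit of 4, the doubling of the back-off window on each collision, and all names are assumptions, not part of the text.

```c
#include <assert.h>
#include <stdint.h>

#define MAX_COLLISIONS  4      /* assumed maximum collision counter */
#define BACKOFF_SLOT_US 100    /* assumed slot length in microseconds */

/* Same generator on every node; only the seed (node key) differs. */
struct backoff_rng { uint32_t state; };

void backoff_seed(struct backoff_rng *r, uint32_t node_id)
{
    /* 2654435761 is Knuth's multiplicative hashing constant. */
    r->state = (node_id * 2654435761u) ^ 0x9E3779B9u;
    if (r->state == 0)
        r->state = 1;          /* xorshift must not start at 0 */
}

static uint32_t xorshift32(struct backoff_rng *r)
{
    uint32_t x = r->state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    return r->state = x;
}

/* Delay before retry number `attempt` (1-based); -1 means give up. */
long backoff_delay_us(struct backoff_rng *r, int attempt)
{
    assert(attempt >= 1);
    if (attempt > MAX_COLLISIONS)
        return -1;
    uint32_t slots = 1u << attempt;          /* window doubles per collision */
    return (long)(xorshift32(r) % slots) * BACKOFF_SLOT_US;
}
```

Because the seed depends only on the node id, re-seeding a node reproduces its exact delay sequence, which is the debugging property the text asks for.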

Note that TCP/IP needs a larger port space if a cluster is to handle thousands of connections simultaneously. This would require a protocol change approved by the IETF. Hopefully something can be done in the future to remedy this problem, but until then there may be an artificial limit on the number of connections a cluster can have at a time.

2.4.1. Further Port Map Optimizations

It may be possible to optimize the port mapping solution further by performing binds only when an application has migrated. In the example cluster there is at least one IP address per DLPI driver, which means that packets cannot arrive at any node or interface other than the one the application is currently using. This holds for the lifetime of the connection unless the application migrates to a new node. In that case, the S-ICS must create a new forwarding route, and the local IP fanout table must be updated with a <port address, S-ICS write queue> entry so that any packets that arrive at this node are forwarded to the new node. If all of this holds, then, aside from informing other nodes of which ports are in use, there should be no need to create a fanout table entry for each bound application, nor to perform additional clean-up upon an unbind or close operation beyond informing the cluster that the port is available for use again.
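Under this optimization, a node's fanout table needs an entry only while it is forwarding for an application that has migrated away. A minimal sketch, assuming flat per-port arrays, opaque queue pointers, and a node id of 0 meaning "no forwarding route" (all names hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NPORTS 65536

/* Hypothetical per-node state: IP fanout plus the S-ICS forwarding route. */
struct fanout_table {
    void    *queue[NPORTS];     /* NULL: no fanout entry for this port */
    uint16_t fwd_node[NPORTS];  /* 0: no S-ICS forwarding route */
};

/* The application left this node: packets arriving here for `port` go to
 * the S-ICS write queue, which forwards them to `new_node`. */
void on_migrate_out(struct fanout_table *t, uint16_t port,
                    void *sics_write_q, uint16_t new_node)
{
    assert(new_node != 0);
    t->queue[port] = sics_write_q;
    t->fwd_node[port] = new_node;
}

/* Close or unbind: drop any local entries.  The only cluster-wide action
 * is announcing that the port is free again (not shown). */
void on_close(struct fanout_table *t, uint16_t port)
{
    t->queue[port] = NULL;
    t->fwd_node[port] = 0;
}

void *fanout_lookup(const struct fanout_table *t, uint16_t port)
{
    return t->queue[port];
}
```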

2.5 STREAMS Dynamic Function Replacement Usage

Within a cluster, a stream stack instance may operate in the following modes:

The stream stack might not be aware that it is part of a cluster at all. It may operate just as it did on any stand-alone node, neither aware of nor necessarily capable of directly taking advantage of the cluster facilities, though something such as migration may still be possible.

The entire stream stack may execute on a single node but may be aware that it is part of a cluster and is capable of taking advantage of cluster facilities such as the global port mapping facility previously described. This does not imply that the stream stack's implementation has been directly modified, only potentially augmented.

The stream stack's components may be executing on two or more nodes within the cluster such as described in the TCP/IP/DLPI splitting example.

For the latter two modes, it will be necessary to augment the different stream functions associated with each module or driver. STREAMS Dynamic Function Replacement allows either all instances of a module or driver to be augmented, or the augmentation to be performed on a per-stream-queue basis. For the example global port mapping facility shown in FIG. 5, it is probably easiest to augment the functionality at the (TCP or UDP) module level by using an application (potentially the controlling thread) to perform the function replacement steps via the SAD driver; the details of how this is accomplished are given in U.S. Ser. No. 08/545,561. This change could be accomplished as part of system/cluster initialization.

On the other hand, we might have a cluster facility which does not want all instances of the stream stack to be modified at all times. In such cases, we would need to apply the augmentation at a later time based on some cluster-specific information. To accomplish this, we could just augment the open routine for all instances of a module or driver. The new open would be able to probe (such probes are allowed since a STREAMS open routine is allowed to sleep) the cluster for specific information and determine whether the entire stack or a particular module/driver within the stream stack being created required further augmentation.
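The open-time decision can be sketched as a wrapper that is installed once for all instances of a module and chains to the saved original open. The queue and module structures here are simplified stand-ins, not the real STREAMS `qinit`/`qi_qopen` definitions, and `cluster_info` stands for whatever information the probe returns; all names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins; real STREAMS open routines take more arguments. */
struct queue_stub { int opened; int augmented; };
typedef int (*open_fn)(struct queue_stub *q);
struct module_ops { open_fn open; };

/* Stand-in for the module's existing open routine. */
int module_open(struct queue_stub *q)
{
    q->opened = 1;
    return 0;
}

/* Hypothetical cluster information the new open probes for. */
struct cluster_info { int augment_this_stack; };

static open_fn saved_open;                    /* original routine */
static const struct cluster_info *cluster;    /* probe result source */

static int augmented_open(struct queue_stub *q)
{
    int err = saved_open(q);      /* behave exactly as before first */
    if (err != 0)
        return err;
    /* A real open may sleep here while it probes the cluster. */
    if (cluster != NULL && cluster->augment_this_stack)
        q->augmented = 1;         /* e.g. install per-queue preview routines */
    return 0;
}

/* Replace the open routine for all instances of the module. */
void augment_module_open(struct module_ops *ops, const struct cluster_info *ci)
{
    assert(ops->open != augmented_open);      /* avoid wrapping twice */
    saved_open = ops->open;
    cluster = ci;
    ops->open = augmented_open;
}
```

Only streams opened while the cluster asks for augmentation get the extra routines; all others behave exactly as the unmodified module.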

An example of such per-stack augmentation would be the TCP/IP/DLPI stack being split at the IP/DLPI level, as shown in FIG. 3B. This example shows a potential performance improvement in which messages are sent from DLPI to the S-ICS directly, without needing to go through the local stream head and then be redirected by the controlling thread to the remote node. This may be accomplished in two ways: (1) the DLPI read queue q_next field might be redirected to point to the S-ICS write queue; such redirection would complicate the S-ICS implementation, but it is possible; (2) the DLPI stream head read queue put routine would be augmented to perform the redirection after previewing the data. The second way is preferred for the following reasons:

No queues are redirected or altered which keeps the basic functionality and flow of messages the same, i.e. the messages flow from the DLPI driver to the stream head just like before, and reduces potential support and debugging issues.

If the first way is implemented, the S-ICS would need to determine whether a message should be reflected or redirected to the controlling thread for processing. Some messages, such as an M_SETOPTS, might necessitate modifying the local stream head parameters as well as the remote stream head's parameters. This not only complicates the S-ICS design and implementation, but also increases the amount of protocol-specific information that the S-ICS must know.

Using the second way allows the controlling thread to select the type of previewing to be performed on a per-module/driver basis. This could be implemented in two ways: the cluster designer could write classes of previewing functions based on where stacks are split, such as a general DLPI class, or each module/driver developer could define a protocol-specific stream head preview function. In either case, the previewing code would be focused on the immediate needs of the stack, which allows faster design and development and greater flexibility while keeping the controlling thread fairly protocol independent. For example, suppose the DLPI driver detects a hardware problem and generates an M_ERROR message. If the DLPI stream head preview function does not require anything special to handle this condition, the message is forwarded to the S-ICS just like any normal message. But if the cluster designer has decided that, for the DLPI class of drivers or just for a particular driver type such as DLPI over ATM, the controlling thread must be involved to perform some high-availability recovery scheme, such as migrating the DLPI instance to a new card or even a new node, then the preview function invokes the stream head put routine and gives the message to the controlling thread. The controlling thread then decides whether it even needs to forward the message to the application, based on what it has been configured to do for this message type and this class of stream module/driver.
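Selecting between a class-wide preview function and a driver-specific one could be a simple registry lookup in which a driver entry overrides its class default, as sketched below. The message-type enum is a simplified stand-in for the STREAMS types, and the names `dlpi_atm`, `dlpi_enet` and `select_preview` are hypothetical. The ATM preview sends M_ERROR and M_HANGUP to the controlling thread, matching the example above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

enum preview_action { FORWARD_TO_SICS, GIVE_TO_CT };

/* Simplified stand-ins for a few STREAMS message types. */
enum msg_type { MSG_DATA, MSG_PROTO, MSG_ERROR, MSG_HANGUP };

typedef enum preview_action (*preview_fn)(enum msg_type t);

/* Generic DLPI-class preview: errors are forwarded like any message. */
static enum preview_action dlpi_class_preview(enum msg_type t)
{
    (void)t;
    return FORWARD_TO_SICS;
}

/* Driver-specific preview (e.g. DLPI over ATM): errors go to the CT so it
 * can start a recovery such as migrating the DLPI instance. */
static enum preview_action dlpi_atm_preview(enum msg_type t)
{
    return (t == MSG_ERROR || t == MSG_HANGUP) ? GIVE_TO_CT : FORWARD_TO_SICS;
}

struct preview_reg { const char *cls; const char *driver; preview_fn fn; };

static const struct preview_reg registry[] = {
    { "dlpi", NULL,       dlpi_class_preview },   /* class default */
    { "dlpi", "dlpi_atm", dlpi_atm_preview },     /* driver override */
};

/* Driver-specific entry wins; otherwise fall back to the class entry. */
preview_fn select_preview(const char *cls, const char *driver)
{
    preview_fn class_fn = NULL;
    for (size_t i = 0; i < sizeof registry / sizeof registry[0]; i++) {
        const struct preview_reg *r = &registry[i];
        if (strcmp(r->cls, cls) != 0)
            continue;
        if (r->driver == NULL)
            class_fn = r->fn;
        else if (driver != NULL && strcmp(r->driver, driver) == 0)
            return r->fn;
    }
    return class_fn;      /* NULL: no previewing for this class */
}
```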

For messages that never require STREAMS framework action, such as M_DATA or M_PROTO, the preview function automatically executes putnext() on the S-ICS write queue. This avoids enqueuing the messages on the stream head only to have the controlling thread context-switched in to examine each message and find that it did not need to be involved.

A preview function could be coded as follows:

    ______________________________________
    int
    dlpi_sth_read_preview(queue_t *q, mblk_t *mp)
    {
      switch (mp->b_datap->db_type) {
      case M_HANGUP:
      case M_ERROR:
      /* The controlling thread may initiate local ioctl operations, so
       * these acks must be routed through it even though it may just
       * forward the request.
       */
      case M_IOCACK:
      case M_IOCNAK:
        /* Invoke the standard stream head read put routine and allow the
         * controlling thread to decide what to forward or to invoke a
         * recovery scheme for the M_HANGUP/M_ERROR case.
         */
        (*q->q_qinfo->qi_putp)(q, mp);
        break;
      case M_DATA:
      case M_PROTO:
      case M_PCPROTO:
      default: /* Any message type not listed is automatically forwarded. */
        if (canput(q->q_ptr->s_ics->write_queue))
          putnext(q->q_ptr->s_ics->write_queue, mp);
        else
          /* The upper S-ICS mux instance is flow-controlled; deal with it. */
          initiate_flow_control_recovery(q, mp);
        break;
      }
      return (0);
    }
    ______________________________________


The s_ics pointer would be stored within the private data structure (q->q_ptr) that defines the stream head associated with this DLPI instance. The s_ics pointer is the stream head address of the S-ICS associated with this instance, and the message is placed upon the S-ICS write queue, which sends it along to the remote node. Not shown in this sample function is how the routing information is embedded within the message.

3.0 Controlling Thread Design

In a previous section, an overview of the controlling thread 34 was given. This section fleshes out the thread in greater detail, with the caveat that the final design details may be implementation specific, depending upon the target operating system, the STREAMS implementation, and the platform on which distributed STREAMS is implemented. This section further clarifies the high-level design, how the thread is created, the data the controlling thread should track, and how the controlling thread interacts with the other components discussed in this solution. The controlling thread design and interactions for distributed stream creation, stream migration, and policy management are detailed in later sections, since the controlling thread is only one part of the solution.

Note: In previous sections, the controlling thread 34 was shown operating directly over the P-ICS and the S-ICS. These two drivers have completely different interfaces and potentially different operating characteristics; one could use sender-managed communication while the other uses receiver-managed communication. Such differences could lead to increased design complexity and potential performance trade-offs. If the differences are large, then I recommend that the controlling thread, while viewed as a single thread of execution with a shared set of data, be implemented using two threads. One thread would manage the STREAMS-related activity while the other managed the cluster management activity, such as the port space and device driver management. This implementation increases the protocol independence of the STREAMS-related drivers and code.

3.1 Controlling Thread=Middleware

One question that comes to mind is: "Can the controlling thread 34 replace the middleware component 50 within a cluster?". The answer to this question is "yes". The controlling threads basically become a distributed middleware implementation in addition to the stated functionality. This has already been illustrated in the global port mapping optimization discussions (FIG. 5) and, within the subsequent sections, the necessary enhancements to handle this functionality will be noted wherever possible.

3.2 Creation

The controlling thread 34 is created as follows:

If the controlling thread must be executing before the system completes its initialization, then the thread must be created as part of the standard system initialization effort. This may be the case if the node is being self-configured based on what the system needs. For example, if the system must load a particular networking subsystem and this subsystem is part of or relies upon the cluster facilities, then the controlling thread must be operational before this subsystem is initialized.

If the controlling thread is not needed to bring up the system, then it can be started via a cluster start-up command at any time, with the following caveat: if a cluster facility involves a subsystem that may be brought up at system initialization, then either these subsystem instances are not affected by clustering or, if they are, they must be converted so that the controlling thread can communicate with them. An example of this would be a node running NFS services that have bound a set of ports. Assuming that NFS is operating over a STREAMS-based transport provider, the port knowledge must be communicated to the middleware thread, and the appropriate ports must be bound on the other nodes within the cluster to ensure that packets are properly routed to this node or, if the application migrates, correctly forwarded. In addition, the function replacement activity previously described for the put, service, and close routines would need to take place.

In general, the controlling thread should operate as a real-time thread of execution in order to ensure that the cluster facilities that utilize this thread receive timely responses to requests. Typically this is done as part of creation, i.e. how the thread is started or via a set priority command.

The death of this thread will cause STREAMS-related clustering activity on this node to cease, unless steps are taken at creation time to allow the thread to be restarted. These steps are: (1) A well-known data structure is allocated within the kernel and its address is stored in a kernel global variable. (2) Within this data structure, the controlling thread caches the following information: the addresses of all stream heads that have been opened by or assigned to this controlling thread, the address of the middleware interface device (the structure through which we communicate with the middleware, such as a file pointer or interconnect descriptor), and any policy information that has been communicated to the thread. (3) If sender-managed communication is used to communicate with the middleware threads, there must also be some mechanism for preserving the user-space structures. This may entail informing the cluster connection management subsystem how to preserve this data and how to restore it for a new controlling thread.
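Steps (1) and (2) might look like the following user-space sketch, where `calloc` stands in for kernel allocation, and the fixed capacity, the `generation` counter and all names are assumptions. A restarted controlling thread finds the structure through the global and resumes with the cached stream heads, middleware device and policy.

```c
#include <assert.h>
#include <stdlib.h>

#define MAX_STREAM_HEADS 64   /* assumed capacity */

/* Hypothetical well-known structure that outlives a controlling thread. */
struct ct_state {
    void *stream_heads[MAX_STREAM_HEADS];  /* (2) stream heads owned by the CT */
    int   nheads;
    void *middleware_dev;                  /* (2) middleware interface device */
    void *policy;                          /* (2) cached policy information */
    int   generation;                      /* bumped on every (re)start */
};

/* (1) Global variable holding the structure's address. */
struct ct_state *ct_state_global;

/* Called by each controlling-thread instance as it starts. */
struct ct_state *ct_attach_state(void)
{
    if (ct_state_global == NULL) {
        /* First start: allocate zeroed state. */
        ct_state_global = calloc(1, sizeof *ct_state_global);
        if (ct_state_global == NULL)
            return NULL;
    }
    ct_state_global->generation++;   /* restart reuses the cached state */
    return ct_state_global;
}

/* Record a stream head opened by or assigned to the controlling thread. */
int ct_track_stream_head(struct ct_state *s, void *sth)
{
    assert(s != NULL);
    if (s->nheads == MAX_STREAM_HEADS)
        return -1;
    s->stream_heads[s->nheads++] = sth;
    return 0;
}
```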

The node structure will depend upon how nodes are addressed and how routing is accomplished within the cluster. The proposed structure contains what are viewed as the essential fields; others may be added as the implementation proceeds. The given flags are the minimum needed.

    ______________________________________
    #define NODE_INACTIVE   0x0000
    #define NODE_ACTIVE     0x0001
    #define NODE_IN_ERROR   0x0002

    struct node_t {
      int32 node_flags;               /* Remote node's status flags */
      node_address_t node_address;    /* Remote node's address */
      node_route_t node_route;        /* Route from local node to remote node */
      struct node_t *next, *prev;     /* Next/prev nodes within the list */
    };

    #define NODE_HASH_SIZE 1024
    ______________________________________
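The `next`/`prev` links and `NODE_HASH_SIZE` suggest hashed, doubly linked buckets. A sketch of insertion, lookup and removal follows, assuming a 32-bit node address hashed by taking it modulo `NODE_HASH_SIZE`. `node_route` is omitted, and the helper names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NODE_INACTIVE  0x0000
#define NODE_ACTIVE    0x0001
#define NODE_IN_ERROR  0x0002
#define NODE_HASH_SIZE 1024

/* Assumption: a node address fits in 32 bits; the route is omitted. */
typedef uint32_t node_address_t;

struct node_t {
    int32_t        node_flags;
    node_address_t node_address;
    struct node_t *next, *prev;
};

static struct node_t *node_hash[NODE_HASH_SIZE];

static size_t node_bucket(node_address_t a)
{
    return (size_t)(a % NODE_HASH_SIZE);
}

/* Push the node onto the head of its bucket's doubly linked list. */
void node_insert(struct node_t *n)
{
    struct node_t **head = &node_hash[node_bucket(n->node_address)];
    n->prev = NULL;
    n->next = *head;
    if (*head != NULL)
        (*head)->prev = n;
    *head = n;
}

struct node_t *node_lookup(node_address_t a)
{
    for (struct node_t *n = node_hash[node_bucket(a)]; n != NULL; n = n->next)
        if (n->node_address == a)
            return n;
    return NULL;
}

/* Unlink in O(1) using the prev pointer. */
void node_remove(struct node_t *n)
{
    assert(n != NULL);
    if (n->prev != NULL)
        n->prev->next = n->next;
    else
        node_hash[node_bucket(n->node_address)] = n->next;
    if (n->next != NULL)
        n->next->prev = n->prev;
    n->next = n->prev = NULL;
}
```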


The following structure contains structure types that are P-ICS specific, i.e. their contents cannot be illustrated without knowing what the P-ICS is capable of and how it expects to be communicated with. At a minimum, the structure should contain: an endpoint definition sufficient for message processing, including status flags and generic function vectors that the S-ICS may invoke blindly to perform an operation, so that the S-ICS stays as ignorant of the P-ICS implementation as possible; a migration policy, in case the S-ICS needs to migrate from one P-ICS instance to another; a recovery policy defining how to recover from P-ICS errors or interconnect failures; and so on.

    ______________________________________
    struct p_ics_endpoint_t {
      uint32 p_ics_state;
      uint32 p_ics_flags;
      int32 (*p_ics_memory_alloc)();
      int32 (*p_ics_memory_free)();
      int32 (*p_ics_open)();
      int32 (*p_ics_close)();
      int32 (*p_ics_attach)();
      int32 (*p_ics_detach)();
      int32 (*p_ics_ctl)();
      int32 (*p_ics_send)();
      int32 (*p_ics_recv)();
      int32 (*p_ics_misc)();
    };
    typedef struct p_ics_endpoint_t P_ICS_ENDPOINT;

    struct p_ics_data {
      P_ICS_ENDPOINT p_ics_endpoint;
      P_ICS_MIGRATION_POLICY p_ics_migration_policy;
      P_ICS_RECOVERY_POLICY p_ics_recovery_policy;
    };
    ______________________________________


p_ics_endpoint defines the P-ICS endpoint and the vector of functions representing the set of possible operations.

p_ics_migration_policy is a policy structure that defines what the policies are, when to invoke them, and a set of function vectors to execute the policies.

p_ics_recovery_policy is a policy st