Sunday, December 31, 2017

Effectiveness of Replication Strategies

In [Noe 86] a simulation study for the comparison of available copies, quorum consensus, and regeneration was carried out to determine which replication protocol was the most efficient given a specific configuration of distributed system, and a certain set of failure characteristics.

The model was programmed in SIMULA [Birtwistle 73], and assumed a local area network consisting of a number of separate computers interconnected by a communications medium such as an Ethernet, with no communications failures. The parameters used in the simulation, such as crash rates and node load, were obtained from studies of existing distributed systems and from mathematical models, and all parameters were the same for each replication protocol simulated. The time between crashes varied between 100 and 300 days, with repair times having a mean of 7 days. The number of copies of each replicated resource ranged from one to three, and the ratio of read requests to write requests varied from a probability of 0.3 up to 0.7, with request frequencies varying between 50 and 400 requests per day. The number of nodes in the system also varied from 10 to 30. All measured results were taken over a simulated time of 2 years of operation.

Simulation Results 

The quantities calculated from the results were the read and write availability of the replicated service. The read availability was defined and calculated as the total number of successful read requests divided by the total number of read requests. Write availability was similarly defined in terms of write requests.

The results showed that replication provides a significant increase in availability. However, there is little point in going beyond a maximum of two copies. Both the Available Copies and Regeneration techniques provide a substantial increase in availability, raising the value of read and write availability very close to 1.0, i.e., almost every request performed upon a replicated resource is carried out successfully. There is very little additional gain with either of these protocols in having a maximum of 3 copies of each resource.

The Voting protocol provided less protection than either of the other protocols and would not even be considered until a maximum of 3 copies were used. In such a case the optimal size for a read and write quorum is 2; with a write quorum of 3 the replicated resource performed worse than in the non-replicated case because there are three ways to lose a single copy and destroy the write quorum.
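The effect of quorum size on write availability can be illustrated with a small calculation. This is only a sketch under an assumption the study did not make: each copy is treated as independently available with a fixed probability `p` (the value 0.95 is illustrative, not a figure from [Noe 86], which used simulation rather than a closed formula).

```python
from math import comb


def at_least_k_of_n(k: int, n: int, p: float) -> float:
    """Probability that at least k of n copies are up, each independently with probability p."""
    return sum(comb(n, i) * p**i * (1 - p) ** (n - i) for i in range(k, n + 1))


p = 0.95  # assumed per-copy availability, chosen only for illustration

single_copy = at_least_k_of_n(1, 1, p)          # non-replicated resource
write_quorum_2_of_3 = at_least_k_of_n(2, 3, p)  # any two of three copies suffice
write_quorum_3_of_3 = at_least_k_of_n(3, 3, p)  # every one of the three copies must be up

print(single_copy, write_quorum_2_of_3, write_quorum_3_of_3)
```

Under this assumption a write quorum of 3 out of 3 is less available than a single copy, because the loss of any one of the three copies destroys the quorum, whereas a quorum of 2 out of 3 is more available than a single copy.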

Both Available Copies and Regeneration are preferable to Voting if network partitions are rare, or if measures are added to prevent or reconcile independent updates during partition rejoining. The read and write availability of the Available Copies technique are the same, and remain relatively constant despite changes in the request rate and the number of nodes. Regeneration can be preferable to Available Copies in an unstable environment that suffers from high crash frequencies, with a high number of updates and frequent reconfiguration of the network. Further, Regeneration can equal or surpass the performance of the Available Copies technique only if enough additional storage is supplied to allow regeneration. 

Optimistic and Pessimistic Consistency Control

The replication protocols described previously can all be considered pessimistic with regard to consistency of data in the presence of network partitions. Until a partitioned network is reconnected, it is impossible for nodes on one side of the partition to differentiate between being partitioned and a failure of the nodes on the other side of the partition. As has been described in previous sections, this can have an adverse effect on replicated groups which have also been partitioned, unless some method is provided to ensure that update operations can only be performed consistently on the entire group. Typically, these replication protocols are used in conjunction with atomic actions, and in the event of a network partition either only one partition (in the case of Voting) or no partition is allowed to continue to progress, meaning that any atomic actions that were executing must be aborted to maintain consistency of state between the partitioned replicas. They are pessimistic, using the principle that if it is not possible to tell definitely that replicas have failed then it is better to do nothing at all. Those protocols which can operate correctly in the presence of a network partition (still maintain consistency of replicas), such as Voting, typically impose an overhead on the cost of performing operations on replicas (in the Voting protocol, the cost of performing a read operation is increased because a quorum of replicas must be obtained).

Optimistic consistency control schemes like those described in [Davidson 84][Abbadi 89] take a different approach and allow actions to continue operating even in the event of a partition. When the partition is eventually resolved it must be possible to detect any conflicts that have arisen as a result of the original failure and to be able to resolve them. These protocols assume that it is possible for committed actions to be rolled back (i.e., un-committed). How the detection and resolution of conflicts is performed is system specific, e.g., in some systems it must be done manually, whereas in [Davidson 84] a mechanism is described that will allow the system to automate much of the work.

JBossTS/Narayana blog - a reminder

I haven't been cross-posting links to the JBossTS/Narayana blog recently and it's well worth taking a look: the team have written a number of great articles over the year, particularly around microservices and transactions.

Primary Copy

The Primary Copy [Alsberg 76] mechanism is an implementation of the passive replication strategy, where one copy of an object is designated as the primary, and the other copies become its backups. All write operations are performed on the primary copy first, which then propagates the update to the secondary copies before replying to the request. Reads can be serviced by any replica since they all contain consistent copies of the state. Any inaccessible secondary copies are typically marked as such (perhaps to a name-server) so that they cannot be used as either a future primary or to be read from until they have been brought up-to-date (another method would be physically to remove them from the replica group until they have been updated). If the Primary Copy fails then a reassignment takes place between the remaining copies to elect a new Primary.

A problem arises when the Primary copy fails. If the primary site is down a reassignment is in order. However, if the network has become partitioned a reassignment would compromise consistency. If network partitioning has a low probability of occurrence then the process for electing a new primary can be allowed: the secondaries should be notified of the primary's failure and they must agree amongst themselves which one is to become the new primary copy. If the election of a new primary takes place and the existing primary has not actually failed (perhaps it was not able to reply to the 'are you alive' probe-messages in time because of an overloaded node) then the protocol should ensure that the client will only accept a reply from the newly elected primary, as the old primary could be in an inconsistent state. The protocol described in [Alsberg 76] tolerates network partitions by allowing operations to continue in all partitioned segments and relying upon some "integration" protocol to merge the states of replica groups when the partitions are re-joined. However, such integration is not guaranteed to be resolvable.

If we take the case of only a fixed primary site i.e., no secondary takes over because partitioning is possible, then a resource replicated using this strategy only increases the read availability. Its write availability is the same as the availability of the primary site. The other replication strategies all provide ways of increasing write availability.
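The fixed-primary case can be sketched as follows. This is a minimal in-memory model, not the protocol of [Alsberg 76]: the class names, the `up` and `stale` flags, and the absence of any election or name-server are all simplifying assumptions made for illustration.

```python
class Replica:
    def __init__(self, name):
        self.name = name
        self.state = None
        self.up = True       # False simulates a crashed or unreachable node
        self.stale = False   # True once the replica has missed an update


class PrimaryCopyGroup:
    """Fixed primary: the first replica is the primary and is never reassigned."""

    def __init__(self, names):
        self.replicas = [Replica(n) for n in names]
        self.primary = self.replicas[0]

    def write(self, value):
        if not self.primary.up:
            raise RuntimeError("primary unavailable: write fails")
        self.primary.state = value
        for r in self.replicas:
            if r is self.primary:
                continue
            if r.up and not r.stale:
                r.state = value   # propagate the update before replying
            else:
                r.stale = True    # must be brought up to date before reuse

    def read(self):
        for r in self.replicas:
            if r.up and not r.stale:
                return r.state
        raise RuntimeError("no up-to-date replica available")


group = PrimaryCopyGroup(["primary", "backup1", "backup2"])
group.write(1)
group.primary.up = False   # reads still succeed from a backup; writes now fail
print(group.read())
```

In this sketch reads survive the loss of the primary as long as one up-to-date backup remains, while writes fail as soon as the primary is down, which is the asymmetry described above.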


Regeneration

Regeneration [Pu 86] is a similar replication scheme to Available Copies in that a client only requires one replica to service a read request but a write must be performed by all copies. When an update occurs, if fewer than the required number of replicas are available then additional copies are regenerated on other operating nodes. In doing so the system must check that there is sufficient space available on the target node for the new replica (in terms of the volatile and stable storage that it may use) and also that a copy does not already reside there. A write failure occurs if it is not possible to update the correct number of replicas, and a read failure occurs if no replica is available.

A recovering node and replica cannot simply rejoin the system. For each replicated resource the system must check to see whether the maximum number of replicas already exist. If so the recovering replica is deleted. If the maximum number does not exist the system must check one of the available replicas to determine whether the state of the recovering replica is consistent. If the recovering replica is inconsistent (i.e. an update has occurred since this replica failed) then it must be deleted because a new copy has already been created to take its place but is currently unavailable. 
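The checks applied to a recovering replica can be written as a small decision function. This is a sketch only: the use of integer version numbers to test consistency is an assumption made here for illustration (the description above does not say how consistency is determined), and the function and parameter names are hypothetical.

```python
def recovery_decision(existing_replicas: int, max_replicas: int,
                      recovering_version: int, current_version: int) -> str:
    """Decide what happens to a replica found on a recovering node.

    current_version is the version read from one of the available replicas.
    """
    if existing_replicas >= max_replicas:
        return "delete"   # the group is already at full strength
    if recovering_version != current_version:
        return "delete"   # an update occurred while this replica was down
    return "rejoin"       # consistent, and the group has room for it


print(recovery_decision(existing_replicas=2, max_replicas=3,
                        recovering_version=7, current_version=7))
```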

Weighted Voting

The Voting (or Quorum Consensus) replication protocol [Gifford 79] is a replication scheme which can operate correctly in the presence of network partitions. In this method a non-negative weight is assigned to each replica and this weight information is available to every client in the network. When a client wishes to read (write) the group it must first gain access to what is known as a read (write) quorum. A quorum is any set of replicas with (typically) more than half the total weight of all copies. A read quorum (Nr) and a write quorum (Nw) are defined such that Nr+Nw > N (the total weight). A read operation requires the access of any Nr copies (only data from up-to-date replicas should be used), and a write operation requires Nw up-to-date copies (so updates are not applied to obsolete replicas). The number of inaccessible copies tolerated by a read is N-Nr, and for a write operation it is N-Nw. The purpose of having quora is to ensure that read and write operations have at least one copy of an object in common. If the network partitions then voting allows access only from the majority partition if one exists.
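The quorum condition can be checked mechanically. The sketch below is illustrative only: the replica names, weights, and quorum sizes are made-up values, and the helper names are not taken from [Gifford 79].

```python
def gathered_weight(weights, reachable):
    """Total weight of the replicas a client can currently reach."""
    return sum(weights[r] for r in reachable)


def quorums_intersect(weights, nr, nw):
    """True if every read quorum shares at least one copy with every write quorum."""
    return nr + nw > sum(weights.values())


def can_read(weights, reachable, nr):
    return gathered_weight(weights, reachable) >= nr


def can_write(weights, reachable, nw):
    return gathered_weight(weights, reachable) >= nw


weights = {"a": 2, "b": 1, "c": 1}   # hypothetical weights, N = 4
Nr, Nw = 2, 3

print(quorums_intersect(weights, Nr, Nw))
print(can_read(weights, {"a", "b"}, Nr), can_write(weights, {"a", "b"}, Nw))
print(can_read(weights, {"c"}, Nr), can_write(weights, {"c"}, Nw))
```

With these example values, a partition containing a and b holds weight 3 and can both read and write, while the partition containing only c holds weight 1 and can do neither.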

Associated with each replica is a timestamp or update number which clients can use to determine which replicas are up-to-date. If Nw ≠ N then a read quorum is required to obtain the most up-to-date version of this number. If Nw = N then every functioning copy must contain the same value because every write operation has been performed on every replica in the group. This update number is used by clients to ensure that they only read data from up-to-date replicas, even though they may acquire access to out-of-date replicas in their read quorum.
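Selecting the current value from a read quorum then amounts to taking the reply with the highest update number. A minimal sketch, assuming each reply is a hypothetical `(update_number, value)` pair:

```python
def read_current(quorum_replies):
    """Return the (update_number, value) pair with the highest update number.

    quorum_replies: replies from the members of a read quorum, some of which
    may be out of date.
    """
    return max(quorum_replies, key=lambda reply: reply[0])


replies = [(3, "old"), (5, "new"), (4, "older")]
print(read_current(replies))
```

Because any read quorum overlaps every write quorum, at least one reply carries the latest update number, so the maximum identifies an up-to-date replica.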

The write operation is a two-phase, atomic operation, because either the states of Nw copies are modified or none of them are changed (to ensure that subsequent read and write quorums overlap and that a majority of the replicas are consistent). If a write quorum cannot be obtained the transaction must be aborted. However, a separate transaction can be run to copy the state of a current replica to an out-of-date replica. It is always legal to copy the contents of replicas in this way.

The weights assigned to replicas should be based on their relative importance to the system, e.g., a printer spooler which resides on a very fast node would be considered better for throughput than one which resides on a slower node and would therefore be assigned a higher weight. A replica with higher weight is more likely to be in the quorum component.

A major problem with this protocol is that read operations require a quorum, even if there is a local copy of the object. This can prove inconvenient (and slow) if a client wishes to carry out many read operations on that object in a short space of time. There is also the problem of fault tolerance: many copies of an object must be created to be able to tolerate only a few failures, e.g., we require five copies just to be able to tolerate two crashes.
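The "five copies for two crashes" figure follows from majority quorums. The sketch below assumes every copy has weight 1 and that both read and write quorums are simple majorities; other weightings would give different numbers.

```python
def copies_needed(crashes: int) -> int:
    """Smallest number of unit-weight copies for which a majority survives the given crashes."""
    return 2 * crashes + 1


def crashes_tolerated(copies: int) -> int:
    """Largest number of crashes that still leaves a majority of unit-weight copies."""
    return (copies - 1) // 2


print(copies_needed(2), crashes_tolerated(5), crashes_tolerated(3))
```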

Note that to cut down on the storage requirements Witnesses [Paris 86] can be used in place of actual replicas. A witness only maintains the current version number of the data. They can take part in quora, but there must be at least one "real" replica in a quorum. Other replication protocols based on the Voting protocol also exist [Abbadi 90][Jajodia89][Davcev 89]. They address issues such as the need to acquire a quorum for a read the responses of "true" copies. A write can only succeed if a write quorum contains at least one non-ghost copy.

Because of the way in which ghosts are created and used, a ghost is used to ensure that a particular non-ghost copy has failed due to a node crash and not to a segment partition. If it is possible to create a ghost then the segment has not been partitioned and only the node has failed. In Available Copies when a replica does not respond it is simply assumed that it is because of node failure, which can result in inconsistencies if the copy was partitioned. When the node on which the non-ghost copy originally resided is re-booted it is possible to convert the ghost copy back into its live version. 

Friday, December 22, 2017

Available Copies

In the Available Copies [Bernstein 87] replication protocol, a user of a replicated service reads from one replica and writes to all available replicas. Prior to the execution of an action, each client determines how many replicas of the service there are available, and where they are (this information may be stored in a naming service and is accessed before each atomic action is performed). Whenever a client detects a failure of a replica it must update the naming service (name-server) view of the replicated object by performing a delete operation for the failed copy. All copies of the name-server, if it too is replicated, must be updated atomically.

When a write operation is performed all copies are written to and they must all reply to this request within a specified time (it is assumed that it is always possible to communicate with non-faulty replicas). Locks must be acquired on all of the functioning replicas before the operations can be performed, and if conflicts between clients occur then some replicas will not be locked on behalf of a client, and the client will be informed, at which point the calling action is aborted. Using this locking policy and the serialisability property of the actions within which operations occur, it is possible to ensure that all replicas execute the operations in identical order. 

If all replicas reply to a write operation then the action may continue. However, if only a subset reply the action must ensure that the silent members have in fact failed. If the silent replicas subsequently reply then the action must abort and try again (this is because the states of the replicas may have diverged). However, if the silent copies have actually failed then the action can still commit since all available copies are in a consistent state.

Whenever a new copy is created (or recovers from a failure) it must be brought up-to-date before the name-server is informed of the recovery (before a client can make use of the replica). When this is done the copy can take requests from clients along with the other members of the group. The updating of recovered replicas can be done automatically if an out-of-date replica intercepts/receives a write request from a current transaction, as has been mentioned previously.

Consider the history of events shown in the diagram below, where T1 and T2 are different transactions operating on two replica groups whose members are x1, x2 and y1, y2. Assume that T1 and T2 are using a "read-one copy, write-all-available copies" scheme and that there are initially two copies of objects x and y which they both wish to access. The execution of events is as shown, with time increasing down the y-axis.

If we examine the above history, it is clearly not 1SR, i.e., neither the serial execution T1;T2 nor T2;T1 is consistent with the above history. Thus, the idea of "read-one copy, write-all-available copies" by itself cannot guarantee 1SR. It is necessary to execute a validation protocol before the transaction can commit to ensure correctness. In Available Copies this takes the form of ensuring that every copy that was accessed is still available at commit time, and every replica that was unavailable is still unavailable, otherwise the action must abort.
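The commit-time validation can be sketched as a pair of set checks. The function and parameter names are hypothetical, and the sketch assumes the action has recorded which replicas it accessed and which it found unavailable.

```python
def validate_at_commit(accessed: set, seen_unavailable: set, available_now: set) -> bool:
    """Return True if the action may commit, False if it must abort."""
    accessed_still_up = accessed <= available_now
    unavailable_still_down = seen_unavailable.isdisjoint(available_now)
    return accessed_still_up and unavailable_still_down


# The action used x1 and y2, and found x2 unavailable.
print(validate_at_commit({"x1", "y2"}, {"x2"}, {"x1", "y1", "y2"}))        # may commit
print(validate_at_commit({"x1", "y2"}, {"x2"}, {"x1", "x2", "y1", "y2"}))  # x2 came back: abort
```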

Because Available Copies assumes that all functional replicas can always be contacted, this protocol cannot be used in the presence of network partitions. A node which is partitioned cannot be distinguished from a failed node until it has been reconnected. If the replication protocol assumes that all nodes which are unavailable have failed when in fact some have only been partitioned, inconsistencies can result in the replicas. As such, if partitions can occur then the replication protocol must be sufficiently sophisticated that it can ensure consistent behaviour despite such failures.

Data Replication in Atomic Action Systems

Data replication techniques for atomic action systems to maintain one-copy serialisability (1SR) have been extensively studied (most notably with regard to replicating databases). When designing a replication protocol it is natural to examine those protocols (and systems which use them) that already exist, to determine whether they have any relevance.

  • Definition: If the effect of a group of atomic actions executing on a replicated object is equivalent to running those same atomic actions on a single copy of the object then the overall execution is said to be 1SR.

  • Definition: A replica consistency protocol is one which ensures 1SR.

Because most replication protocols have been developed for use in database environments it is important to understand the differences between the way in which operations function in a database system and the way in which similar operations would function in an object-oriented environment. These differences are important as they affect the way in which the replication protocols function.

In a database system, which performs operations on data structures, a read operation is typically implemented as a "read entire data structure", and a write operation is in fact a "read entire data structure, update state locally to the invoker, then write entire new data state back". In this way, a single write operation can also update (or re-initialise) the state of an out-of-date data structure.

In an object-oriented system, the read operation is typically implemented as "read a specific data value". Similarly, the write is "perform some operation which will modify the state of the object". The object simply exports an interface with certain operations through which it is possible to manipulate the object state. Some of these operations may update the state of the object, whilst others will simply leave it unchanged. A write operation in this case may only modify a subset of an object's state, and so cannot be guaranteed to perform an update as in a database system.

In a database system, the fact that a single write operation can update the entire state of a replica is used in replication protocols such as Available Copies. If these protocols are to be used in an object-oriented system then they will require explicit update protocols.
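The difference can be seen in a small sketch. The account-like state, the replica dictionaries, and both helper functions are invented for illustration; they do not come from any particular database or object system.

```python
import copy


def db_style_write(replicas, new_state):
    """Database-style write: every replica is overwritten with the complete new state."""
    for replica in replicas:
        replica.clear()
        replica.update(copy.deepcopy(new_state))


def object_style_deposit(replicas, amount):
    """Object-style write: each replica applies the operation to its own state."""
    for replica in replicas:
        replica["balance"] += amount


def make_replicas():
    current = {"balance": 100, "owner": "ann"}
    stale = {"balance": 70, "owner": "ann"}   # missed an earlier deposit of 30
    return current, stale


current, stale = make_replicas()
db_style_write([current, stale], {**current, "balance": current["balance"] + 10})
db_replicas_agree = current == stale          # True: the stale copy was re-initialised

current, stale = make_replicas()
object_style_deposit([current, stale], 10)
object_replicas_agree = current == stale      # False: the stale copy is still behind
```

In the object-style case the stale replica would need an explicit state transfer before it could safely take part in further operations.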

Finally, in a database system the invoker of a given operation knows whether that operation is state modifying or not i.e., it knows which type of lock will be required. However, in an object-oriented system users of a given object only see the exported interface and see nothing of the implementation, and therefore do not know whether a given operation will modify the state of the object. This difference is important as many of the replication protocols to be described implicitly assume that clients have this type of knowledge (it is used to ensure that read operations can be executed faster than write operations).

In the next few entries we shall examine some of the replication protocols which have been proposed for managing replicated data.

Wednesday, December 20, 2017

Replication and Atomic Actions

Replication can be integrated into atomic actions in two ways: replicas within actions, and replicated actions. These two methods are substantially different from each other and yet attempt to solve the same problem: to ensure (with a high degree of probability) that an action can commit despite a finite number of component failures.

Replicas within Actions

Of the two methods of combining replication with atomic actions, this method is the most intuitive: each non-replicated object is replaced by a replica group. Whenever an action issues a request on the object this is translated into a request on the entire replica group (how the requests are interpreted depends upon the replica consistency protocol). Failures of replicas within a given replica group are handled by the replication protocol in an effort to mask them until a sufficient number of failures have occurred which makes masking impossible, and in this case the replica group fails. When an action comes to commit, the replication protocol dictates the minimum number of replicas which must be functioning in any given replica group for that group to be considered operational and allow the action to commit.

Problems which arise from this method have already been outlined in earlier entries. The replication protocol must be able to deal with multiple messages from both replicated client groups as well as replicated server groups; the majority of replication protocols assume that replicated objects within the same replica group are deterministic; problems can occur in maintaining consistency between replicas when local asynchronous events can occur such as RPC timeouts.

Replicated Actions 

The idea behind Replicated Actions [Ahamad 87][Ng 89] is to take an atomic action written for a non-replicated system and replicate the entire action to achieve availability. In this scheme the unit of replication is the action along with non-replicated objects used within it. In the Clouds distributed system [Ahamad 87] the replicated actions are called PETS (Parallel Execution Threads). By replicating the action N times (creating N PETS) and hence by also replicating the objects which the action uses, the probability of at least one action being able to commit successfully is increased. Although the actions and objects are replicated, each action only ever communicates with one replica from a given object-replica group and does not know about either the other replicas or the other actions until it comes to commit. Consequently, if this replica fails, so too does the action (as is the case in a non-replicated system).

Because each action only uses one replica in a given group and each replica only receives messages from this one action, the replicas need not be deterministic, and there is no need to devise some protocol for dealing with multiple requests (or replies) from a client (server) group. When the replicated actions commit, it is necessary to ensure that only one action does so (to maintain consistency between the replicated objects). To do this, typically the first action to commit (the fastest) as part of its commit protocol copies the states of the objects it has used to those replicas it did not use (i.e. the replicas used by the other actions) and causes the other actions to abort. This is done atomically, so that if this atomic action should fail then the next replicated action which commits will proceed as though it had finished first.

The figure above shows the state of the objects A, B, C, and D, which are replicated three times, just as the three replicated actions α, β, and ε begin commit processing. The execution paths of these actions are indicated by the lines. When actions α and β come to commit they will be unable to do so because the objects D (which α used) and C (which β used) have failed, making commit impossible. However, action ε will be able to commit, and will then update the states of all remaining functioning replicas. Failed replicas must be updated before they can be reused.
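The commit step for replicated actions can be sketched as follows. This is an illustrative model only: the data layout, the function name, and the replica numbering are assumptions, and the sketch does not model the atomicity of the commit itself.

```python
def commit_replicated(actions, states, failed):
    """Commit the first action whose replicas are all functioning.

    actions: {action name: {object name: replica id}}, in the order the actions finish
    states:  {(object name, replica id): state}
    failed:  set of (object name, replica id) pairs that have failed
    Returns the name of the committing action, or None if none can commit.
    """
    for name, used in actions.items():
        if any((obj, rep) in failed for obj, rep in used.items()):
            continue   # this action used a failed replica and cannot commit
        for (obj, rep) in states:
            if obj in used and (obj, rep) not in failed:
                # impose the committing action's view on every functioning replica
                states[(obj, rep)] = states[(obj, used[obj])]
        return name    # all other actions are aborted
    return None


actions = {
    "alpha":   {"C": 1, "D": 1},
    "beta":    {"C": 2, "D": 2},
    "epsilon": {"C": 3, "D": 3},
}
states = {(obj, rep): f"{obj}{rep}-state" for obj in "CD" for rep in (1, 2, 3)}
failed = {("D", 1), ("C", 2)}

winner = commit_replicated(actions, states, failed)
print(winner)
```

As in the example above, α and β cannot commit because each used a failed replica, so ε commits and its states are copied to the remaining functioning replicas; the failed replicas keep their old state until they are explicitly updated.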

Obviously the choice of which replicas the actions should use is important, e.g., in the figure, if action ε had used the same copy of object D as action α then the failure of this object would have caused two actions to fail instead of one. In some systems [Ahamad 87] the choice of which replica a particular action uses is made randomly because they make the assumption that with a sufficient number of object replicas the chances of two actions using the same copy are small. However, in [Ng 89] they propose a different approach by using information about the nature of the distributed system (reliability of nodes, probability of network partitions occurring) to come up with an optimal route for each action to take when using replicated objects. This route attempts to minimize the object overlap between replicated actions and minimize the number of different nodes that an action visits during the course of its execution.

The advantages of using replicated actions as opposed to using replicas within an action are the same as those obtained from using a passive replication protocol as opposed to using an active replication protocol: there is no need to ensure that all copies of the same object are deterministic since the action which commits will impose its view of the state of the object on all other copies, and each replica will receive a reduced number of repeated requests from replicated clients (reduced to only one message if each action makes use of a different replica).

However, the scheme also suffers from the problems inherent with a passive replication scheme: checkpointing of state across a network can be a time consuming, expensive operation. If we assume failures to be rare (as would be the case) then this scheme will cause large numbers of actions to abort. The action which commits first will overwrite the states of the other replicas, effectively removing the knowledge that the other actions ever ran. In some applications it may prove more of an overhead to checkpoint the state of one action in this way rather than allowing all functioning actions simply to commit.

Monday, December 18, 2017

An overview of some existing distributed systems (of the time)

Existing distributed systems which make use of multicast communication can be categorised as using either one-to-many communication (one client interacting with a replica group) or many-to-many communication (replica groups interacting with replica groups). We shall now look at a number of the popular protocols and show how they have approached the problems of maintaining replica consistency to both external and internal events.

One-to-Many Communication 

The V System

The V System [Cheriton 84][Cheriton 85] makes use of what its designers term one-to-many communication transactions, where the start of a transaction marks the end of the previous transaction (any outstanding messages associated with the old transaction are destroyed without being processed). Each group member has a finite message buffer into which replies are queued provided they arrive before the start of the next send transaction. Although a reliable communication protocol is not used, the assumption is made that by making use of retransmissions it is possible to ensure (with a high probability) that messages are delivered if the sender and receiver remain operational. A further assumption is that at least one operational group member receives and replies to a message.

The designers of the V System recognized the possibility of message loss due to message buffer overflow, and their solution was to make use of larger buffers and to modify the kernel to introduce a random delay when replying to a group send, thereby reducing the number of messages in a queue at any time. This imposes a consistent overhead on all group replies but still does not ensure that buffer overflow will never occur (if the number of different groups interacting with each other increases then even with this delay it is still probable that the finite sized message queues will overflow).

When servers reply to a client they do so on a many-to-one basis, i.e., all servers which received a request from a particular client will attempt to reply to that client (and not to any replica group the client may be a member of). This means that there is the possibility of members of the same group taking different actions as a result of local events such as timeouts (resulting in state divergence), because the servers replied to some, but not all, clients on time.

The Andrew System 

The Andrew System [Satyanarayanan 90] assumes that the communication system is reliable in much the same manner as the V System, i.e., given a sufficient number of retries then any operational server can always be contacted. Communication is one-to-many and the termination conditions for a particular client-server interaction are set on a per call basis (depending on the application). The designers recognized that calls between remote processes can take an indeterminate amount of time and so if a client times out it sends an "are you alive" message to any server replica which has not responded and if a reply is returned then it continues to wait. However, because servers reply on a many-to-one basis, state divergence between replicated clients can still occur: if only one client receives a reply sent before all servers fail, then the replicated clients may take different actions.

It was recognized that messages might be lost by processes because of message buffer overflow, but in Andrew it is assumed that such losses will be detected by retries. This implicitly relies on the servers keeping the last transmitted replies to every client, but could still result in state divergence if, for example, a server group is seen to have failed by one replicated client when it retries a request, when in fact all other members of the client group received the reply before the failure.

Many-to-Many Communication 

The Circus System

The Circus System [Cooper 84b] (and Multi-Functions [Banatre 86b] in Gothic [Banatre 86a]) makes use of many-to-many communication, with both client and server groups. As with the V System they assume that with a sufficient number of transmission retries it is possible to deliver messages to all operational members of a group. Thus, if a call eventually times out (after retrying a sufficient number of times) a sender can assume that the receiver(s) has failed. Each member of a group has a message queue which is assumed to be infinite in size (if this assumption cannot be made then the designers of Circus assume that retransmissions of requests and replies can account for losses due to buffer overflow).

Throughout the description of Circus the assumption is made that the members of a group are deterministic (given the same set of messages in the same order and the same starting state then they will all arrive at the same next state). Client groups' members operate as though they were not replicated and do not know that they are members of a replica group. No mention is made of how this determinism of group members extends to asynchronous local events such as timeouts.

Despite the assumptions made by the designers, both the problem of message buffer overflow, and local timeouts, can occur in the Circus system, leading to replica state divergence.  


Following on from the last entry I did the initial implementation of red/REL as part of my PhD and that document includes an overview of the protocol as well as some performance figures and proposed optimisations to the protocol. For now I won't cover that here as there's a good paper in the previous reference to read, or the PhD of course. However, maybe I'll come back to this later.

Sunday, December 17, 2017

Multicasts and Replication

In the previous entries we have discussed what delivery properties can be provided by the communication subsystem, and how they can be implemented. We shall now discuss how such communications primitives can be used in a replication scheme to aid in maintaining replica consistency.

Group invocations can be implemented as replicated RPCs by replacing the one-to-one communication of send_request and send_result in the figure below (which we saw in an earlier entry too) with one-to-many communication. 

Every non-faulty member of a client group sends the request message to every non-faulty member of the server group, which in turn send a reply back. If multiple client groups invoke methods on the same replicated server group then it must be ensured that concurrent invocations are executed in an identical order at all of the correctly functioning replicas, otherwise the states of the replicas may diverge. In order to ensure this property, the objects must not only be deterministic in nature, but all correctly functioning replicas must receive the same sets of messages in the same order i.e., a totally ordered multicast must be employed.
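
To make the divergence concrete, here is a minimal sketch in Python. The `Replica` class and its two operations are hypothetical and are not taken from any of the systems discussed; they only illustrate that deterministic replicas still diverge when two non-commutative requests are applied in different orders.

```python
# Hypothetical deterministic replica holding an integer balance.
class Replica:
    def __init__(self, balance=100):
        self.balance = balance

    def apply(self, op):
        kind, amount = op
        if kind == "deposit":
            self.balance += amount
        elif kind == "add_interest":
            # amount is a percentage; integer arithmetic keeps this deterministic
            self.balance += self.balance * amount // 100
        else:
            raise ValueError(kind)


def run(ops):
    """Apply a sequence of operations to a fresh replica and return its state."""
    replica = Replica()
    for op in ops:
        replica.apply(op)
    return replica.balance


m1 = ("deposit", 50)        # request from one client group
m2 = ("add_interest", 10)   # concurrent request from another client group

# Two replicas that see the same order agree on the final state.
same_order = (run([m1, m2]), run([m1, m2]))

# Two replicas that see different orders end up in different states.
different_order = (run([m1, m2]), run([m2, m1]))
```

Both replicas are deterministic and both receive the same set of messages, yet only an identical delivery order keeps their states equal.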

The State Machine conditions C1 and C2 can both be met by making use of totally ordered multicasts to deliver all messages transmitted by clients and servers. However, such total ordering of messages may be unnecessary for all interactions: if two, non-conflicting, non-related messages are received at members of the same replica group (e.g., two unrelated electronic mail messages from different users) then they need not be ordered consistently at these destinations. If they were related in some manner (e.g., from the same user) then they could be ordered consistently. Application level ordering can be achieved more efficiently as cheaper, reliable broadcast protocols can be used to deliver messages for which ordering is unimportant, resorting to the more complex order preserving protocols only where necessary. Since such protocols typically need more rounds of messages to be transmitted, the reduction in their use can be beneficial to the system as a whole, whilst maintaining overall replica consistency.

One method of achieving such application level ordering would be to transmit messages using unordered atomic multicasts (since it is still important that the messages are received by all functioning replicas) which only guarantee delivery to all functioning replicas but make no guarantee of the order (described in a previous entry), and then to impose ordering on top of this i.e., at a level above the communication layer. If atomic actions are used in the system then we can make use of their properties to impose the ordering on message execution that we require i.e., the order shall be equivalent to the serialization order imposed by atomic actions. This has the advantage that operations from different clients which do not conflict (e.g., multiple read operations) can be executed in a different order at each replica. Atomic actions will ensure that multiple accesses from different clients to the same resource will be allowed only if such interaction is serialisable.

Failed on historical blogging

Where has the time gone?! Around a year ago I said I would blog a bit from my PhD and though I did do a few entries I haven't been able to do anything since January! Looking back it's a combination of work, family commitments and writing a book with some of my friends, but I didn't realise they'd soaked up so much of my time. Oh well, there are a few days left in 2017 so let me see what I can do to try to catch up a bit.

Friday, January 06, 2017


Following on from the last entry, we shall now consider one implementation of a communication subsystem which provides some of the delivery properties described previously. It is important to understand how these ordering requirements can be met, and the overhead which is involved in guaranteeing them, before we discuss how such communication primitives can be used to provide replicated object groups. Other reliable communication subsystems exist, of which [Chang 84][Cristian 85][Cristian 90][Verissimo 89] are a sample, but we shall consider Psync because it illustrates many points clearly.


Psync [Peterson 87][Mishra 89] ("pseudosynchronous") is a communication subsystem designed to provide reliable multicast communication between objects, and is based on the message history approach described above. The system assumes that operations on objects which change the state occur atomically and are idempotent. Associated with each object is a manager process. A client process locates a particular manager (perhaps by consulting a naming service) and then invokes operations on the object by sending requests to that manager. When a manager receives a request to invoke a particular operation on an object, it encapsulates the operation in a message and uses the Psync many-to-many communications protocol to forward the message to all of the managers involved (including itself) if the object is a member of a group. Based on the set of received messages, each manager can then decide on an order in which to apply the operations to its copy of the object. This protocol can be extended to be used for the interactions of replicated object groups, and the exact details of the replication protocol used in Psync will be described in a later posting.

Conversations and Context Graphs

Psync explicitly preserves the partial ordering of messages exchanged among a collection of processes in the presence of communication and processor failures (Psync cannot function in the presence of network partitions). A collection of processes exchange messages through a conversation abstraction. This conversation is defined by a directed acyclic graph (a context graph) that preserves the partial order of the exchanged messages. This ordering is made available to all managers involved in a conversation and by using this they can determine when to execute operations on their local objects.

When processes communicate they do so by sending messages in the context of those messages they have already received. Participants are able to receive all messages sent by the other participants in the conversation but they do not receive the messages that they themselves send. Each participant in a conversation has a view of the context graph that corresponds to those messages it has sent or received. The semantics of the communications primitives provided by Psync are defined in terms of the context graph and a participant's view.

The figure below shows an example of a context graph. This conversation is started with the initial message m1. Messages m2 and m3 were sent by processes that had received m1, but are independent of each other (hence no link between them), and m4 was sent by a process that had received m1 and m3, but not m2. Messages that are not in the context of some other message are said to be concurrent (occur at the same logical time). The relation < is defined to be "in the context of".
The context graph contains information about which processes have received what messages. The receipt of a message implies that the sender has seen all of the messages which came before it in the context graph. A message mp sent by process p is said to be stable if for each participant q ≠ p in the conversation, there exists a vertex mq in the context graph sent by q, such that mp < mq. For a message to be stable means that all participants except the sender have received it; it follows that all future messages sent to the conversation must be in the context of the stable message.
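
The stability definition can be sketched in a few lines of Python. This is only an illustration under assumptions: the dictionary representation, the function names and the example graph are all hypothetical, and none of it is Psync's actual data structure or API.

```python
def ancestors(graph, msg):
    """Return every message that msg is (transitively) in the context of."""
    seen = set()
    stack = list(graph[msg][1])
    while stack:
        m = stack.pop()
        if m not in seen:
            seen.add(m)
            stack.extend(graph[m][1])
    return seen


def is_stable(graph, participants, msg):
    """msg is stable if every participant other than its sender has sent
    some message that is in the context of msg."""
    sender = graph[msg][0]
    followers = {graph[m][0] for m in graph if msg in ancestors(graph, m)}
    return all(q in followers for q in participants if q != sender)


# Hypothetical context graph: message id -> (sender, direct predecessors).
graph = {
    "a1": ("a", set()),
    "b1": ("b", {"a1"}),
    "c1": ("c", {"b1"}),
    "a2": ("a", {"c1"}),
    "b2": ("b", {"c1"}),
    "a3": ("a", {"a2"}),
}
participants = {"a", "b", "c"}

stable = sorted(m for m in graph if is_stable(graph, participants, m))
```

In this made-up graph only a1, b1 and c1 are stable: nothing sent by b or c is yet in the context of a2 or a3, and nothing at all follows b2.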

In the figure below, we have a context graph which depicts a conversation between three participants, a, b, and c. Messages a1, a2, ... denote the sequence of messages sent by process a, and so on. Messages a1, b1, and c1 are the only stable messages; messages a2 and a3 are two unstable messages sent by process a.
Psync maintains a copy of a conversation's context graph at each host on which a participant in the conversation resides. Each process in the conversation receives messages from this local copy of the context graph, which is termed the image. Whenever a process at one host sends a message, Psync propagates a copy of the message to each of the hosts in the conversation. This message contains information about all messages upon which the new message depends, so that the receiving hosts can append it to their context graphs in the correct place.

Dealing with Network and Host Failures

Suppose message m is not delivered to some host because of a network failure. If at some future time a message m' arrives that depends on m then the host will detect that it is missing m and will send a retransmission request for m to the host that sent m' (this host is guaranteed to have m, as a local participant has just sent a message which is in the context of it).
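
A rough sketch of that detection step, in Python. The `Host` class, its fields and the host names are hypothetical; the sketch only records which retransmission requests would be sent and holds a message back until its context has arrived. It makes no claim about how Psync implements this.

```python
class Host:
    def __init__(self, name):
        self.name = name
        self.image = {}      # message id -> ids it directly depends on
        self.pending = {}    # messages held back until their context arrives
        self.requests = []   # (host asked, missing message id)

    def receive(self, msg_id, deps, from_host):
        missing = [d for d in deps if d not in self.image]
        if missing:
            # The sender of msg_id must hold everything msg_id depends on,
            # so it is the host asked for the retransmission.
            self.pending[msg_id] = set(deps)
            for d in missing:
                self.requests.append((from_host, d))
            return
        self.image[msg_id] = set(deps)
        self._release_pending()

    def _release_pending(self):
        # Keep attaching held-back messages until no more can be attached.
        progress = True
        while progress:
            progress = False
            for msg_id, deps in list(self.pending.items()):
                if all(d in self.image for d in deps):
                    self.image[msg_id] = deps
                    del self.pending[msg_id]
                    progress = True


h = Host("h3")
h.receive("m1", [], "h1")
h.receive("m3", ["m2"], "h2")          # m2 was lost on the way to h3
requests_after_m3 = list(h.requests)   # h3 asks h2 for m2
h.receive("m2", ["m1"], "h2")          # retransmission arrives, m3 is released
```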

The operations provided to aid applications in recovering from host failures include the ability for a participant to remove a failed participant from its definition of the participant set for a conversation. This is necessary so that messages will eventually stabilize relative to the functioning participants. Once a given participant has been masked out, Psync ignores all further messages from that process.

There is also an inverse operation that allows a participant to rejoin a participant set. It would be invoked when a participant becomes aware that another participant which had formerly failed has now recovered.

When a host recovers, a participant can initiate a recovery action which will inform other participants that the invoking participant has restarted, and initiate reconstruction of the local image of the context graph. Each member of the conversation will transmit its local copy of the context graph to the recovering participant, which can then use this to reconstruct its own local copy.

Total Ordering

As described, the Psync protocol only gives a partial ordering of messages i.e., only the causal ordering of messages is preserved. To convert a partial order into a total order, whereby messages which are not causally related are ordered identically at all overlapping destinations, requires additional information to be shared amongst the destinations which indicates the order in which to place such messages. In Psync, the context graph which accompanies each message provides this information. The partial order that Psync provides can be used to give a total order if all participants perform the same topological sort of the context graph. This sort must be incremental i.e., each process waits for a portion of its view to be stabilized before allowing the sort to proceed. This is done to ensure that no future messages sent to the conversation will invalidate the total ordering. The replication protocol used in Psync uses just such a scheme and will be described in a later article.
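
As an illustration of "the same topological sort", here is a minimal Python sketch. It assumes that concurrent messages are ordered by their message identifier, which is a choice made for this example rather than something Psync is stated to do, and it sorts a complete graph in one go instead of incrementally waiting for stability.

```python
import heapq


def total_order(graph):
    """Deterministic topological sort of a context graph.

    graph maps a message id to the set of ids it is directly in the
    context of. Concurrent messages are ordered by id (an assumption).
    """
    indegree = {m: len(preds) for m, preds in graph.items()}
    successors = {m: [] for m in graph}
    for m, preds in graph.items():
        for p in preds:
            successors[p].append(m)

    ready = [m for m, d in indegree.items() if d == 0]
    heapq.heapify(ready)
    order = []
    while ready:
        m = heapq.heappop(ready)   # smallest id among the ready messages
        order.append(m)
        for s in successors[m]:
            indegree[s] -= 1
            if indegree[s] == 0:
                heapq.heappush(ready, s)
    return order


# The m1..m4 example from earlier, as built up at two different participants
# that happened to receive the concurrent messages in different orders.
view_a = {"m1": set(), "m2": {"m1"}, "m3": {"m1"}, "m4": {"m1", "m3"}}
view_b = {"m1": set(), "m3": {"m1"}, "m4": {"m1", "m3"}, "m2": {"m1"}}

order_a = total_order(view_a)
order_b = total_order(view_b)
```

Because the tie-break depends only on the graph and not on arrival order, both participants compute the same sequence.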

Tuesday, January 03, 2017

Multicasts and Latency

The next few entries in the series will consider some aspects of multicast protocols.

As described in [Shrivastava 90a], the latency of a multicast service is defined to be the time taken for a message, once sent, to reach the destination processes. This latency is particularly important for protocols providing reliability and ordering guarantees. As we shall see, whereas the latency for an unreliable multicast service is bounded (typically of the order of a few milliseconds), the latency for a multicast service which operates in the presence of failures (message losses and node crashes) can be bounded or unbounded depending upon the implementation.

Existing order preserving protocols can be broadly classified in the following way:
  • message history based: the main idea behind such protocols is that when a process sends a message it appends some historical information about the messages it has received in its recent past. This historical information enables the receivers to retrieve any missing messages and to order them properly. This type of protocol ensures that an incomplete multicast is eventually completed, and hence possesses an unbounded latency property.
  • centralised distributors: here the sender delivers the message to a specific member of the group (the primary) who is then responsible for distributing the message to the fellow members of the group. The primary can assign a total order to the messages it receives. As we have already seen, failure detection mechanisms are necessary to detect failed primaries and to elect new primaries which can take over and complete the multicasts. Such protocols can possess bounded latency, but the necessity to detect asynchronously occurring failures can impose an overhead on performance.
  • multi-phase commit: these protocols, providing total order, use multiphase algorithms (typically 2 or 3 message rounds) which are similar to the two-phase algorithm described earlier for atomic action commits. The sender delivers the message to the destinations which return sufficient information to the sender about the messages that they have received so that in the subsequent rounds of the protocol the sender can impose an identical order of the message onto the destinations. The message is only considered to have been delivered if all of the phases of the protocol complete. Such protocols provide bounded latency.
  • clock-based: these protocols are an important class of the multi-phase algorithms, and assume the existence of a global time base. Timestamps derived from such a time base can then be used for imposing a total order on messages. Such protocols can provide constant latency communication, having the attractive property that if a sender multicasts a message at clock time T, then it can be sure that all functioning receivers will have received the message by clock time T + ∆, where ∆ is the constant indicating the protocol latency (∆ must be determined by applying worst case timing and failure assumptions).
We shall next examine a system which provides a reliable multicast protocol using a message history based approach.
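
Of the four classes, the clock-based one is perhaps the easiest to sketch. The Python below is a toy model under stated assumptions: `DELTA`, the `Receiver` class and the tie-break on sender name are all hypothetical, clocks are taken to be perfectly synchronised integers, and every message stamped T is assumed to arrive before T + ∆.

```python
import heapq

DELTA = 5  # hypothetical protocol latency ∆, in clock ticks


class Receiver:
    def __init__(self):
        self.buffer = []     # min-heap of (timestamp, sender, payload)
        self.delivered = []

    def on_receive(self, timestamp, sender, payload):
        # Arrival order is irrelevant; messages wait in timestamp order.
        heapq.heappush(self.buffer, (timestamp, sender, payload))

    def on_tick(self, now):
        # A message stamped T is delivered once the clock reaches T + DELTA,
        # by which time no message with an earlier stamp can still arrive.
        while self.buffer and self.buffer[0][0] + DELTA <= now:
            self.delivered.append(heapq.heappop(self.buffer))


r1, r2 = Receiver(), Receiver()

# The same three messages arrive in different orders at the two receivers.
r1.on_receive(10, "S1", "a")
r1.on_receive(10, "S2", "b")
r1.on_receive(12, "S1", "c")

r2.on_receive(12, "S1", "c")
r2.on_receive(10, "S2", "b")
r2.on_receive(10, "S1", "a")

r1.on_tick(15)
r2.on_tick(15)
```

At clock time 15 both receivers have delivered the two messages stamped 10 in the same order, while the message stamped 12 is still waiting for time 17.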

Monday, January 02, 2017

A Java EE Interlude

I want to take a quick break from the other series for a moment to do something I've been remiss about for a while: addressing a recent report by Gartner on the state of Java EE. Before doing so I decided the other day to read the article my friend and colleague John Clingan made a few weeks ago and I realised that John had done a great job already. In fact so good I couldn't see myself adding much to it, but I'll try below. Note, there are other responses to this report and I haven't had a chance to read them but they might be just as good.

To start with, it's worth noting that I've known Anne for a number of years and we've had one or two disagreements during that time but also some agreements. Her announcements about the death of SOA seven years or so ago fall somewhere in between, for example. It's also important to realise, if you don't already, that I do believe everything has a natural cycle ("the circle of life") and nothing lasts forever; whether it's dinosaurs giving way to mammals (with some help from a meteor and/or the Deccan Traps), or CORBA shifting aside for J2EE, evolution is a fact of life. Therefore, whilst I disagree with Anne about the short-to-medium term future of Java EE, longer term it will pass into history. However, before doing so it will evolve and continue to influence the next generation of technologies, just as the dinosaurs became the birds and aspects of CORBA evolved into J2EE. For more details on my thinking on this topic over the years I leave it as an exercise to the reader to check this blog or my JBoss blog.

John covers my next point very well but it's worth stressing: I've mentioned on many occasions that my background is heavily scientific and I continue to try to employ objective scientific method to everything I do. So when I see blanket statements like "fade in relevance" or "lighter-weight approaches", whether it's in a Gartner report, a paper I'm reviewing as a PC member, or even something one of my team is writing, I immediately push back if there's no supporting evidence. And I read this report several times searching for it but kept coming up empty handed. It was one subjective statement after another, with no real attempt to justify them. That's disappointing because with my scientific hat on I love to see facts backing up theories, especially if those theories contradict my own. That's how we learn.

One other thing I did get from the report, and I'm prepared to admit this may be my own subjective reading, was that it seemed like a vague attempt to look into the future for middleware frameworks and stacks without really understanding what has brought us to where we are today. I've written about this many times before but weak assertions by the report that "modern applications" are somehow so different from those we've developed for the last 50 years that existing approaches such as Java EE are not useful really doesn't make any sense if you bother to think about it objectively. Distributed applications, services and components need to communicate with each other, which means some form of messaging (hopefully reliable, scalable and something which performs); there's really no such thing as a stateless application, so you'll need to save data somewhere (again, reliable, scalable and performant); hey, maybe you also want some consistency between applications or copies of data, so transactions of one form or another might be of help. And the list goes on.

Of course application requirements change over time (e.g., I recall doing research back in the 1980's where scale was measured in the 10's of machines), but new applications and architectures don't suddenly spring into being and throw away all previous requirements; it's evolution too though. I've presented my thoughts on this over the past couple of years at various conferences. In some ways you can consider Java EE a convenient packaging mechanism for these core services, which are typically deployed in a co-located (single process) manner. Yet if you look beyond the Java EE "veneer" you can still see the influence of enterprise distributed systems that predate it and also the similarities with where some next generation efforts are heading.

I suppose another key point which John also made well was that the report fails miserably to differentiate between Java EE as a standard and the various implementations. I'm not going to cover the entire spectrum of problems I have with this particular failure, but of course lightweight is one of them. Well over 5 years ago I wrote about the things we'd done years previously to make AS7 run on a plug computer or Android phone and we've kept up that level of innovation throughout the intervening time. There's nothing in the Java EE standard which says you have to build a bloated, heavyweight application server and we, as well as other implementations, have proven that time and time again. It's disappointing that none of this is reflected in the report. I suppose it's one of those terms the authors hope that if they say it enough, people will attribute their own subjective opinions and assumptions to it.

Another one of those throw-away statements the report is good at is that "[...] at this point, Java developers eschew Java EE". I admit not every Java developer wants to use Java EE; not every Java developer wanted to use J2EE either. But you know what? Many Java developers do like Java EE and do want to use it. You only have to go to conferences like JavaOne, Devoxx or other popular Java events to find them in abundance. Or come and talk to some of our customers or those of IBM, Payara, Tomitribe or other members of the MicroProfile effort.

I could go on and on with this but the entry has already grown larger than I expected. John did a great job with his response and you should go read that for an objective analysis. Probably the only positive thing I could attribute to the original Gartner report is that its very existence probably proves that time travel is possible! It's a theory which fits the facts: a report which criticises something based on data which is largely a decade old means it was probably written a decade ago.

Sunday, January 01, 2017

Remote Object Invocation

The next in our series ...

Invocations on objects which are not replicated are traditionally based on the RPC as this retains the correct semantics of a procedure call i.e., a single flow (thread) of control from caller to callee and back again (as with a traditional procedure call). The previous entry described the concept of the Remote Procedure Call, and the simplified model of client-server interaction shown in the figure below will be assumed for the discussion to follow: a client uses the primitives send_request() for sending a call request and receive_result() for receiving the corresponding results. Clients and servers maintain enough state information to recognize and discard duplicate messages (filter requests). The server maintains a queue of messages from possibly multiple clients, and uses the primitive receive_request() to pick a message from the queue in FIFO order. After invoking the right method, the result is sent to the client with the send_result() primitive.
When making replicated invocations (such as when calling a replica group) the semantics of such communication differ considerably from that of the traditional RPC: there is no longer a single thread of control, but rather multiple threads which may all eventually return to the caller. Such invocations are typically referred to as Replicated Procedure Calls [Cooper 84a][Cooper 85], and can be implemented using one-to-many (or multicast) communication facilities. We discuss various aspects of multicast communication below.
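
Before moving to multicast, the unreplicated server side of the model can be sketched as follows. The primitive names receive_request and send_result come from the description above; everything else (the `Server` class, `enqueue`, `serve_one`, the reply cache keyed by client and request id) is a hypothetical in-process stand-in, with no real network involved.

```python
from collections import deque


class Server:
    def __init__(self):
        self.queue = deque()   # FIFO queue of requests from all clients
        self.replies = {}      # (client, request id) -> result already sent
        self.executed = 0      # how many times a method was really invoked

    def enqueue(self, client, request_id, method, arg):
        """Stand-in for a request message arriving from the network."""
        self.queue.append((client, request_id, method, arg))

    def receive_request(self):
        return self.queue.popleft()

    def send_result(self, client, request_id, result):
        self.replies[(client, request_id)] = result
        return result

    def serve_one(self):
        client, request_id, method, arg = self.receive_request()
        key = (client, request_id)
        if key in self.replies:
            # Duplicate request (e.g. a client retry): do not execute again,
            # just return the result that was sent before.
            return self.replies[key]
        self.executed += 1
        return self.send_result(client, request_id, method(arg))


def increment(x):
    return x + 1


server = Server()
server.enqueue("c1", 1, increment, 41)
server.enqueue("c1", 1, increment, 41)   # retransmission of the same request

first = server.serve_one()
second = server.serve_one()
```

The retransmitted request gets the same answer but the method runs only once, which is the "filter requests" behaviour described above.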

One-to-Many Communication

The main services a multicast protocol provides can be categorised into three classes: ordering, reliability and latency. By imposing (increasing) ordering and reliability constraints on the delivery of multicast messages it is possible to define increasingly sophisticated protocols (typically at the expense of the latency). To understand these protocols first assume that a sender S is attempting to multicast to a group G = {P1,...,Pn}. Following the definitions outlined in [Shrivastava 90b][ANSA 90]:

Unordered and Unreliable

A multicast from S will be received by a subset of functioning nodes Pi ∈ G. Successive multicasts from S will be received in an arbitrary order at the destinations. The next figure shows sender S multicasting messages m1 and m2 to the group G. The first message is received by P2 and Pn in different orders, and message m2 is not received by P1.

FIFO Multicast

Provided the sender does not crash while transmitting the message, all correctly functioning receivers are guaranteed to get the message. Furthermore, the multicasts will be received in the order they were made.

The next figure shows two senders (S1 and S2) multicasting to the group G. All members of G received m1 before m2, but some members may receive m3 before m2. This last ordering is correct given the definition of the protocol: no information about the relative ordering of multicasts between senders is available to the receivers.
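
One common way to obtain the FIFO property is to number each sender's multicasts and hold back anything that arrives early. The Python sketch below assumes that scheme; the `FifoReceiver` class and the sequence numbers are illustrative and are not taken from [Shrivastava 90b] or [ANSA 90], and the reliability half of the guarantee is not modelled.

```python
class FifoReceiver:
    def __init__(self):
        self.next_seq = {}    # sender -> next sequence number to deliver
        self.held = {}        # sender -> {seq: payload} that arrived early
        self.delivered = []

    def on_receive(self, sender, seq, payload):
        expected = self.next_seq.get(sender, 0)
        if seq < expected:
            return            # duplicate of something already delivered
        self.held.setdefault(sender, {})[seq] = payload
        # Deliver as long as the next expected message from this sender is held.
        while expected in self.held[sender]:
            self.delivered.append((sender, self.held[sender].pop(expected)))
            expected += 1
        self.next_seq[sender] = expected


r = FifoReceiver()
r.on_receive("S1", 1, "m2")   # arrives before m1, so it is held back
r.on_receive("S2", 0, "m3")   # different sender: delivered immediately
r.on_receive("S1", 0, "m1")   # releases m1 and then m2
```

This receiver delivers m3, m1, m2: m1 still precedes m2, and m3 overtaking m2 is permitted because the two come from different senders.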

Atomic multicast

If the sender does not crash before completing a multicast, the message is guaranteed to be received by all functioning members. If, however, the sender crashes during a multicast, then it is guaranteed that the message is received by either all or none of the functioning processes (atomic delivery). All multicasts from the same sender are received in the order they were made.

Causal multicast

This multicast extends the ordering property of the Atomic multicast to causally related sends from different senders while still meeting the reliability guarantee. [Lamport 78] was the first to introduce the concept of potential causal relationships into computer interactions and showed what effects these relationships can have on the operations of interacting processes. Two events are potentially causally related if information from the first event could have reached the second event before it occurred. The notation used to denote such relationships is typically X → Y, where → means precedes (happened before). Note that if X and Y are events from the same process and Y follows X then Y is necessarily causally related to X. A causal communication system will only preserve an ordering of events if the order is causally related. If two events are not related in this way then there is no guarantee on the delivery order.
In the figure above S1 is multicasting to the groups G1 and G2, and P1 is multicasting to group G1. G1 = {P2, P3} and G2 = {P1, P4}. There is a potential flow of information from send(m1, G1) to send(m2, G2), and from send(m2, G2) to send(m3, G1). This means that the sending of m3 by P1 is potentially causally related to the sending of m1 by S1. Hence the causal multicast protocol must ensure that all functioning members of G1 receive m1 before m3. Events such as m3 and m4 which are not causally related can be received in any order (they are termed concurrent).
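
Causal delivery is often implemented with vector clocks, a technique not described in this entry. The Python sketch below uses it purely as an illustration, and simplifies the scenario to a single group in which every process sees every message; the `CausalReceiver` class and the timestamps are assumptions.

```python
class CausalReceiver:
    def __init__(self, processes):
        self.clock = {p: 0 for p in processes}   # messages delivered per sender
        self.held = []                           # messages waiting for their causes
        self.delivered = []

    def _deliverable(self, sender, stamp):
        # Next message from its sender, and nothing it depends on is missing.
        return all(
            stamp[p] == self.clock[p] + 1 if p == sender else stamp[p] <= self.clock[p]
            for p in self.clock
        )

    def on_receive(self, sender, stamp, payload):
        self.held.append((sender, stamp, payload))
        progress = True
        while progress:
            progress = False
            for item in list(self.held):
                s, v, body = item
                if self._deliverable(s, v):
                    self.clock[s] = v[s]
                    self.delivered.append(body)
                    self.held.remove(item)
                    progress = True


receiver = CausalReceiver(["S1", "P1", "P2"])

# m1 is sent by S1; P1 sends m3 after receiving m1, so m1 precedes m3.
m1_stamp = {"S1": 1, "P1": 0, "P2": 0}
m3_stamp = {"S1": 1, "P1": 1, "P2": 0}

receiver.on_receive("P1", m3_stamp, "m3")   # arrives first, but must wait
held_after_m3 = len(receiver.held)
receiver.on_receive("S1", m1_stamp, "m1")   # m1 is delivered, then m3
```

Even though m3 reaches the receiver first, it is only delivered once m1, on which it potentially depends, has been delivered.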

Totally ordered multicast

The (partial) causal order can be extended to a total order of messages such that all messages (whether causally related or not) are received by all destinations in the same order (which must also preserve causality).