Since at least the early 1990s our industry and academia have dabbled with eventual consistency. Over the years I've written and spoken a bit about eventual consistency around transactions and replication as it applies to scaling systems and SOA. As Eric wrote many years ago, some of it even made its way into standards. Recently the industry has been looking at how eventual consistency may be required within microservices. For instance, what about Eclipse MicroProfile and transactions (yes, a favourite of mine)? It's possible some of this may make its way into Eclipse EE4J. And now Matt Klein from Lyft has written a nice piece about eventual consistency and networks, which is also worth a read.
Just remember: right tool for the right job. Just as ACID transactions aren't right for every use case, strong consistency isn't right for every use case, and neither is eventual consistency. Think, understand, consider and apply.
Sunday, January 28, 2018
Sunday, December 31, 2017
Effectiveness of Replication Strategies
In [Noe 86] a simulation study was carried out comparing Available Copies, Quorum Consensus, and Regeneration, to determine which replication protocol was the most efficient given a specific distributed system configuration and a certain set of failure characteristics.
The model was programmed in SIMULA [Birwhistle 73], and assumed a local area
network consisting of a number of separate computers interconnected by a
communications medium such as an Ethernet, with no communications failures. The
parameters used in the simulation, such as crash rates and node load, were obtained from
studies of existing distributed systems and from mathematical models, and all parameters
were the same for each replication protocol simulated. The mean time between crashes varied from 100 to 300 days, with repair times having a mean of 7 days. The number of copies of each replicated resource ranged from one to three, the probability that a request was a read (rather than a write) varied from 0.3 to 0.7, and request frequencies varied from 50 to 400 requests per day. The number of nodes in the system also varied from 10 to 30. All measured results were taken over a simulated time of 2 years of operation.
Simulation Results
The quantities calculated from the results were the read and write availability of the
replicated service. The read availability was defined and calculated as the total number of successful read requests divided by the total number of read requests. Write availability
was similarly defined in terms of write requests.
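As a concrete, purely illustrative sketch of how these availability figures are computed from the definitions above (the class and method names are mine, not taken from [Noe 86]):

```java
// Minimal sketch of the availability metrics used in the simulation study.
public class AvailabilityStats {
    private long readAttempts, readSuccesses;
    private long writeAttempts, writeSuccesses;

    public void recordRead(boolean succeeded) {
        readAttempts++;
        if (succeeded) readSuccesses++;
    }

    public void recordWrite(boolean succeeded) {
        writeAttempts++;
        if (succeeded) writeSuccesses++;
    }

    // Read availability: successful reads divided by all read requests.
    public double readAvailability() {
        return readAttempts == 0 ? 1.0 : (double) readSuccesses / readAttempts;
    }

    // Write availability is defined analogously for write requests.
    public double writeAvailability() {
        return writeAttempts == 0 ? 1.0 : (double) writeSuccesses / writeAttempts;
    }
}
```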
The Voting protocol provided less protection than either of the other protocols and could not even be considered until a maximum of three copies was used. In that case the optimal size for both the read and write quorum is 2; with a write quorum of 3 the replicated resource performed worse than in the non-replicated case, because there are three ways to lose a single copy and thereby destroy the write quorum.
What was found from the results was that replication provides a significant increase in
availability. However, there is little point in going beyond a maximum of two copies. Both
the Available Copies and Regeneration techniques provide a substantial increase in
availability, raising the value of read and write availability very close to 1.0 i.e., whenever a
request is performed upon a replicated resource it will be carried out successfully. There is
very little additional gain with either of these protocols in having a maximum of 3 copies of
each resource.
Both Available Copies and Regeneration are preferable to Voting if network
partitions are rare, or if measures are added to prevent or reconcile independent updates
during partition rejoining. The read and write availability of the Available Copies
technique are the same, and remain relatively constant despite changes in the request rate and the number of nodes. Regeneration can be preferable to Available Copies in an
unstable environment that suffers from high crash frequencies, with a high number of
updates and frequent reconfiguration of the network. Further, Regeneration can equal or
surpass the performance of the Available Copies technique only if enough additional
storage is supplied to allow regeneration.
Optimistic and Pessimistic Consistency Control
The replication protocols described previously can all be considered pessimistic with
regard to consistency of data in the presence of network partitions. Until a partitioned
network is reconnected, it is impossible for nodes on one side of the partition to
differentiate between being partitioned and a failure of the nodes on the other side of the
partition. As has been described in previous sections, this can have an adverse effect on
replicated groups which have also been partitioned, unless some method is provided to ensure that update operations can only be performed consistently on the entire group. Typically, these replication protocols are used in conjunction with atomic actions, and in
the event of a network partition either only one partition (in the case of Voting) or no
partition is allowed to continue to progress, meaning that any atomic actions that were
executing must be aborted to maintain consistency of state between the partitioned
replicas. They are pessimistic, using the principle that, if it is not possible to tell definitely
that replicas have failed then it is better to do nothing at all. Those protocols which can
operate correctly in the presence of a network partition (still maintain consistency of
replicas), such as Voting, typically impose an overhead on the cost of performing operations on replicas (in the Voting protocol, the cost of performing a read operation is
increased because a quorum of replicas must be obtained).
Optimistic consistency control schemes like those described in [Davidson 84][Abbadi 89] take a different approach and allow actions to continue operating even in
the event of a partition. When the partition is eventually resolved it must be possible to
detect any conflicts that have arisen as a result of the original failure and to be able to
resolve them. These protocols assume that it is possible for committed actions to be rolled
back (i.e., un-committed). How the detection and resolution of conflicts is performed is system-specific e.g., in some systems it must be done manually, whereas in [Davidson 84] a
mechanism is described that will allow the system to automate much of the work.
JBossTS/Narayana blog - a reminder
I haven't been cross-posting links to the JBossTS/Narayana blog recently and it's well worth taking a look: the team have written a number of great articles over the year, particularly around microservices and transactions.
Labels:
compensation transactions,
microservices,
Narayana
Primary Copy
The Primary Copy [Alsberg 76] mechanism is an implementation of the passive
replication strategy, where one copy of an object is designated as the primary, and the
other copies become its backups. All write operations are performed on the primary copy
first, which then propagates the update to the secondary copies before replying to the
request. Reads can be serviced by any replica since they all contain consistent copies of the
state. Any inaccessible secondary copies are typically marked as such (perhaps to a
name-server) so that they cannot be used as either a future primary or to be read from
until they have been brought up-to-date (another method would be physically to remove
them from the replica group until they have been updated). If the Primary Copy fails then a
reassignment takes place between the remaining copies to elect a new Primary.
A problem arises when the Primary copy fails. If the primary site is down a reassignment is in order. However, if the network has become partitioned a reassignment would compromise consistency. If network partitioning has a low probability of occurrence then the process for electing a new primary can be allowed: the secondaries should be notified of the primary's failure and they must agree amongst themselves which one is to become the new primary copy. If the election of a new primary takes place and the existing primary has not actually failed (perhaps it was not able to reply to the 'are you alive' probe-messages in time because of an overloaded node) then the protocol should ensure that the client will only accept a reply from the newly elected primary, as the old primary could be in an inconsistent state. The protocol described in [Alsberg 76] tolerates network partitions by allowing operations to continue in all partitioned segments and relying upon some "integration" protocol to merge the states of replica groups when the partitions are re-joined. However, such integration is not guaranteed to be resolvable.
If we take the case of only a fixed primary site i.e., no secondary takes over because partitioning is possible, then a resource replicated using this strategy only increases the read availability. Its write availability is the same as the availability of the primary site. The other replication strategies all provide ways of increasing write availability.
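To make the passive scheme concrete, here is a minimal sketch of the primary copy write path. The interfaces and names are my own illustrations, not taken from [Alsberg 76], and failure handling, name-server updates and primary election are omitted:

```java
// Hypothetical sketch of the primary copy update path: writes go to the
// primary, which propagates the update to its backups before acknowledging;
// reads can be served by any replica.
import java.util.List;

interface Replica {
    void applyUpdate(byte[] newState);   // install state sent by the primary
    byte[] read();
}

class PrimaryCopy implements Replica {
    private byte[] state = new byte[0];
    private final List<Replica> backups;

    PrimaryCopy(List<Replica> backups) { this.backups = backups; }

    // All writes are performed on the primary first...
    public synchronized void write(byte[] newState) {
        this.state = newState;
        // ...which propagates the update to every reachable backup before
        // replying to the client. In the real protocol an unreachable backup
        // would be marked out-of-date (e.g. in a name server), not skipped.
        for (Replica backup : backups) {
            backup.applyUpdate(newState);
        }
    }

    public void applyUpdate(byte[] newState) { this.state = newState; }
    public byte[] read() { return state; }
}
```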
Regeneration
Regeneration [Pu 86] is a similar replication scheme to Available Copies in that a
client only requires one replica to service a read request but a write must be performed by
all copies. When an update occurs, if fewer than the required number of replicas are
available then additional copies are regenerated on other operating nodes. In doing so the
system must check that there is sufficient space available on the target node for the new
replica (in terms of the volatile and stable storage that it may use) and also that a copy does
not already reside there. A write failure occurs if it is not possible to update the correct
number of replicas, and a read failure occurs if no replica is available.
A recovering node and replica cannot simply rejoin the system. For each replicated resource the system must check to see whether the maximum number of replicas already exist. If so the recovering replica is deleted. If the maximum number does not exist the system must check one of the available replicas to determine whether the state of the recovering replica is consistent. If the recovering replica is inconsistent (i.e. an update has occurred since this replica failed) then it must be deleted because a new copy has already been created to take its place but is currently unavailable.
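A rough sketch of the regeneration step on a write follows, assuming a hypothetical Node abstraction; the names are illustrative and this is not the actual algorithm from [Pu 86]:

```java
// Illustrative regeneration step: if fewer than the required number of
// replicas are available, create new copies on other operating nodes.
import java.util.ArrayList;
import java.util.List;

interface Node {
    boolean hasSpaceFor(byte[] state);
    void storeCopy(byte[] state);
}

class Regenerator {
    static List<Node> ensureCopies(List<Node> available, List<Node> candidates,
                                   int required, byte[] state) {
        List<Node> replicas = new ArrayList<>(available);
        for (Node node : candidates) {
            if (replicas.size() >= required) break;
            // Only regenerate where there is sufficient storage and no copy
            // of this resource already resides on the node.
            if (!replicas.contains(node) && node.hasSpaceFor(state)) {
                node.storeCopy(state);
                replicas.add(node);
            }
        }
        if (replicas.size() < required) {
            // A write failure occurs if the correct number of replicas
            // cannot be updated or regenerated.
            throw new IllegalStateException("write failure: too few replicas");
        }
        return replicas;
    }
}
```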
Weighted Voting
The Voting (or Quorum Consensus) replication protocol [Gifford 79] is a replication
scheme which can operate correctly in the presence of network partitions. In this method a
non-negative weight is assigned to each replica and this weight information is available to
every client in the network. When a client wishes to read (write) the group it must first gain
access to what is known as a read (write) quorum. A quorum is any set of replicas with
(typically) more than half the total weight of all copies. A read quorum (Nr) and a write
quorum (Nw) are defined such that Nr+Nw > N (the total weight).
A read operation requires access to any Nr copies (only data from up-to-date replicas should be used), and a write operation requires Nw up-to-date copies (so that updates are not applied to obsolete replicas). The number of inaccessible copies tolerated by a read is N-Nr, and for a write operation it is N-Nw. The purpose of having quora is to
ensure that read and write operations have at least one copy of an object in common. If the
network partitions then voting allows access only from the majority partition if one exists.
Associated with each replica is a timestamp or update number which clients can use to determine which replicas are up-to-date. If Nw ≠ N then a read quorum is required to obtain the most up-to-date version of this number. If Nw = N then every functioning copy must contain the same value because every write operation has been performed on every replica in the group. This update number is used by clients to ensure that they only read data from up-to-date replicas, even though they may acquire access to out-of-date replicas in their read quorum.
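The following sketch shows how a client might assemble a read quorum and use the update numbers to select an up-to-date copy. Weights, thresholds and names here are illustrative rather than taken from [Gifford 79]:

```java
// Sketch of a quorum read: check the intersection condition Nr + Nw > N,
// gather enough weight, then read from the copy with the highest update number.
import java.util.Comparator;
import java.util.List;

record Copy(int weight, long versionNumber, byte[] state) {}

class QuorumRead {
    static byte[] read(List<Copy> reachable, int readQuorumWeight,
                       int writeQuorumWeight, int totalWeight) {
        // Nr and Nw must be chosen so that read and write quora always intersect.
        if (readQuorumWeight + writeQuorumWeight <= totalWeight) {
            throw new IllegalArgumentException("quora would not intersect");
        }
        int gathered = reachable.stream().mapToInt(Copy::weight).sum();
        if (gathered < readQuorumWeight) {
            throw new IllegalStateException("read quorum unavailable");
        }
        // Only data from an up-to-date replica should be used: take the copy
        // with the highest update number among those gathered.
        return reachable.stream()
                .max(Comparator.comparingLong(Copy::versionNumber))
                .orElseThrow()
                .state();
    }
}
```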
The write operation is a two-phase atomic operation: either the states of Nw copies are modified or none of them are changed (to ensure that subsequent read and write quora overlap and that a majority of the replicas are consistent). If a write quorum
cannot be obtained the transaction must be aborted. However, a separate transaction can
be run to copy the state of a current replica to an out-of-date replica. It is always legal to
copy the contents of replicas in this way.
The weights assigned to replicas should be based on their relative importance to the system e.g., a printer spooler which resides on a very fast node would be considered better for throughput, and would therefore be assigned a higher weight, than a replica residing on a slower node. A replica with a higher weight is more likely to be in the quorum component.
A major problem with this protocol is that read operations require a quorum, even if there is a local copy of the object. This can prove inconvenient (and slow) if a client wishes to carry out many read operations on that object in a short space of time. There is also the problem of fault tolerance: many copies of an object must be created to be able to tolerate only a few failures e.g., we require five copies just to be able to tolerate two crashes.
Note that to cut down on the storage requirements Witnesses [Paris 86] can be used in
place of actual replicas. A witness only maintains the current version number of the data.
They can take part in quora, but there must be at least one "real" replica in a quorum.
Other replication protocols based on the Voting protocol also exist [Abbadi 90][Jajodia 89][Davcev 89]. They address issues such as the need to acquire a quorum for every read. One such variant creates a ghost copy when the node holding a "true" copy fails; ghosts can take part in write quora along with the responses of "true" copies, but a write can only succeed if a write quorum contains at least one non-ghost copy.
Because of the way in which ghosts are created and used, a ghost is used to ensure that a particular non-ghost copy has failed due to a node crash and not to a segment partition. If it is possible to create a ghost then the segment has not been partitioned and only the node has failed. In Available Copies when a replica does not respond it is simply assumed that it is because of node failure, which can result in inconsistencies if the copy was partitioned. When the node on which the non-ghost copy originally resided is re-booted it is possible to convert the ghost copy back into its live version.
Friday, December 22, 2017
Available Copies
In the Available Copies [Bernstein 87] replication protocol, a user of a replicated
service reads from one replica and writes to all available replicas. Prior to the execution of an action, each client determines how many replicas of the service there are available, and
where they are (this information may be stored in a naming service and is accessed before
each atomic action is performed). Whenever a client detects a failure of a replica it must
update the naming service (name-server) view of the replicated object by performing a
delete operation for the failed copy. All copies of the name-server, if it too is replicated,
must be updated atomically.
When a write operation is performed all copies are written to and they must all reply to
this request within a specified time (it is assumed that it is always possible to communicate
with non-faulty replicas). Locks must be acquired on all of the functioning replicas before
the operations can be performed, and if conflicts between clients occur then some replicas
will not be locked on behalf of a client, and the client will be informed, at which point the
calling action is aborted. Using this locking policy and the serialisability property of the
actions within which operations occur, it is possible to ensure that all replicas execute the operations in identical order.
If all replicas reply to a write operation then the action may continue. However, if only
a subset reply then the action must ensure that the silent members have in fact failed. If the
silent replicas subsequently reply then the action must abort and try again (this is because
the states of the replicas may have diverged). However, if the silent copies have actually
failed then the action can still commit since all available copies are in a consistent state.
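A hedged sketch of the "write to all available copies" step, including the presumption of failure for silent members, is shown below; the stub interface and timeout handling are assumptions of mine, not part of [Bernstein 87]:

```java
// Sketch: write to every available copy and wait a bounded time for replies.
// Members that stay silent are presumed failed; if such a member later turns
// out to be alive, the enclosing atomic action must abort and retry.
import java.util.List;
import java.util.concurrent.*;

interface ReplicaStub { boolean write(byte[] update); }

class AvailableCopiesWriter {
    private final ExecutorService pool = Executors.newCachedThreadPool();

    boolean writeAll(List<ReplicaStub> available, byte[] update, long timeoutMs) {
        List<Future<Boolean>> replies = available.stream()
                .map(r -> pool.submit(() -> r.write(update)))
                .toList();
        boolean allReplied = true;
        for (Future<Boolean> reply : replies) {
            try {
                reply.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                allReplied = false;     // silent member: presumed failed
            } catch (Exception e) {
                allReplied = false;     // failed member
            }
        }
        return allReplied;              // false means some copies did not reply
    }
}
```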
Whenever a new copy is created (or recovers from a failure) it must be brought up-to-date before the name-server is informed of the recovery (before a client can make use of the replica). When this is done the copy can take requests from clients along with the other members of the group. The updating of recovered replicas can be done automatically if an out-of-date replica intercepts/receives a write request from a current transaction, as has been mentioned previously.
Consider the history of events shown in the diagram below, where T1 and T2 are different transactions operating on two replica groups whose members are x1, x2 and y1, y2. Assume that T1 and T2 are using a "read-one copy, write-all-available copies" scheme and that there are initially two copies of objects x and y which they both wish to access. The execution of events is as shown, with time increasing down the y-axis.
If we examine the above history, it is clearly not 1SR i.e., neither the serial execution T1;T2 nor T2;T1 is consistent with it. Thus, the idea of "read-one copy, write-all-available copies" by itself cannot guarantee 1SR. It is necessary to execute a
validation protocol before the transaction can commit to ensure correctness. In Available
Copies this takes the form of ensuring that every copy that was accessed is still available at
commit time, and every replica that was unavailable is still unavailable, otherwise the
action must abort.
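The commit-time validation described above can be expressed very simply; this is only a sketch, assuming replicas are identified by name:

```java
// Sketch of Available Copies commit-time validation: every copy accessed by
// the action must still be available, and every copy that was unavailable when
// the action started must still be unavailable; otherwise the action aborts.
import java.util.Set;

class AvailableCopiesValidation {
    static boolean canCommit(Set<String> accessedAtStart,
                             Set<String> missingAtStart,
                             Set<String> availableNow) {
        boolean accessedStillUp = availableNow.containsAll(accessedAtStart);
        boolean missingStillDown =
                missingAtStart.stream().noneMatch(availableNow::contains);
        return accessedStillUp && missingStillDown;
    }
}
```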
Because Available Copies assumes that all functional replicas can always be contacted, this protocol cannot be used in the presence of network partitions. A node which is partitioned cannot be distinguished from a failed node until it has been reconnected. If the replication protocol assumes that all nodes which are unavailable have failed when in fact some have only been partitioned, inconsistencies can result in the replicas. As such, if partitions can occur then the replication protocol must be sufficiently sophisticated that it can ensure consistent behaviour despite such failures.
Data Replication in Atomic Action Systems
Data replication techniques for atomic action systems to maintain one-copy
serialisability (1SR) have been extensively studied (most notably with regard to replicating
databases). When designing a replication protocol it is natural to examine those protocols
(and systems which use them) that already exist, to determine whether they have any
relevance.
- Definition: If the effect of a group of atomic actions executing on a replicated object is equivalent to running those same atomic actions on a single copy of the object then the overall execution is said to be 1SR.
- Definition: A replica consistency protocol is one which ensures 1SR.
Because most replication protocols have been developed for use in database environments it is important to understand the differences between the way in which operations function in a database system and the way in which similar operations would function in an object-oriented environment. These differences are important as they affect the way in which the replication protocols function.
In a database system, which performs operations on data structures, a read operation is typically implemented as a "read entire data structure", and a write operation is in fact a "read entire data structure, update state locally to the invoker, then write entire new data state back". In this way, a single write operation can also update (or re-initialise) the state of an out-of-date data structure.
In an object-oriented system, the read operation is typically implemented as "read a
specific data value". Similarly, the write is "perform some operation which will modify the
state of the object". The object simply exports an interface with certain operations through
which it is possible to manipulate the object state. Some of these operations may update
the state of the object, whilst others will simply leave it unchanged. A write operation in
this case may only modify a subset of an object's state, and so cannot be guaranteed to
perform an update as in a database system.
In a database system, the fact that a single write operation can update the entire state of a replica is used in replication protocols such as Available Copies. If these protocols are to be used in an object-oriented system then they will require explicit update protocols.
Finally, in a database system the invoker of a given operation knows whether that operation is state modifying or not i.e., it knows which type of lock will be required. However, in an object-oriented system users of a given object only see the exported interface and see nothing of the implementation, and therefore do not know whether a given operation will modify the state of the object. This difference is important as many of the replication protocols to be described implicitly assume that clients have this type of knowledge (it is used to ensure that read operations can be executed faster than write operations).
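A small illustration of this last point: from an exported interface alone a client cannot tell which operations modify state, so it cannot choose the lock type itself. The annotation used here is a hypothetical convention, not a feature of any particular system:

```java
// Without some declaration such as this (hypothetical) annotation, a client of
// Account cannot know whether an operation needs a read lock or a write lock.
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

@Retention(RetentionPolicy.RUNTIME)
@interface StateModifying {}

interface Account {
    long balance();            // leaves the object's state unchanged

    @StateModifying
    void credit(long amount);  // modifies only part of the object's state
}
```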
In the next few entries we shall examine some of the replication protocols which have been proposed for managing replicated data.
Wednesday, December 20, 2017
Replication and Atomic Actions
Replication can be integrated into atomic actions in two ways: replicas within actions,
and replicated actions. These two methods are substantially different from each other and
yet attempt to solve the same problem: to ensure (with a high degree of probability) that an
action can commit despite a finite number of component failures.
Replicas within Actions
Of the two methods of combining replication with atomic actions, this method is the
most intuitive: each non-replicated object is replaced by a replica group. Whenever an
action issues a request on the object this is translated into a request on the entire replica
group (how the requests are interpreted depends upon the replica consistency protocol).
Failures of replicas within a given replica group are handled by the replication protocol in an effort to mask them until a sufficient number of failures have occurred which makes
masking impossible, and in this case the replica group fails. When an action comes to
commit, the replication protocol dictates the minimum number of replicas which must be
functioning in any given replica group for that group to be considered operational and
allow the action to commit.
Problems which arise from this method have already been outlined in earlier entries. The replication protocol must be able to deal with multiple messages from
both replicated client groups as well as replicated server groups; the majority of
replication protocols assume that replicated objects within the same replica group are
deterministic; problems can occur in maintaining consistency between replicas when local
asynchronous events can occur such as RPC timeouts.
Replicated Actions
The idea behind Replicated Actions [Ahamad 87][Ng 89] is to take an atomic action
written for a non-replicated system and replicate the entire action to achieve availability.
In this scheme the unit of replication is the action along with non-replicated objects used
within it. In the Clouds distributed system [Ahamad 87] the replicated actions are called
PETS (Parallel Execution Threads). By replicating the action N times (creating N PETS)
and hence by also replicating the objects which the action uses, the probability of at least
one action being able to commit successfully is increased. Although the actions and
objects are replicated, each action only ever communicates with one replica from a given
object-replica group and does not know about either the other replicas or the other
actions until it comes to commit. Subsequently, if this replica fails, so too does the action
(as is the case in a non-replicated system).
Because each action only uses one replica in a given group and each replica only receives messages from this one action, the replicas need not be deterministic, and there is no need to devise some protocol for dealing with multiple requests (or replies) from a client (server) group. When the replicated actions commit, it is necessary to ensure that only one action does so (to maintain consistency between the replicated objects). To do this, typically the first action to commit (the fastest) as part of its commit protocol copies the states of the objects it has used to those replicas it did not use (i.e. the replicas used by the other actions) and causes the other actions to abort. This is done atomically, so that if this atomic action should fail then the next replicated action which commits will proceed as though it had finished first.
However, the scheme also suffers from the problems inherent with a passive
replication scheme: checkpointing of state across a network can be a time consuming,
expensive operation. If we assume failures to be rare (as would be the case) then this
scheme will cause large numbers of actions to abort. The action which commits first will
overwrite the states of the other replicas, effectively removing the knowledge that the
other actions ever ran. In some applications it may prove more of an overhead to checkpoint the state of one action in this way rather than allowing all functioning actions
simply to commit.
The figure above shows the state of the objects A, B, C, and D, each of which is replicated three times, just as the replicated actions α, β, and ε (also replicated three times) begin commit processing. The execution paths of these actions are indicated by the lines.
When actions α and β come to commit they will be unable to do so because the object D (which α used) and C (which β used) have failed, making commit impossible. However, action ε will be able to commit, and will then update the states of all remaining functioning replicas. Failed replicas must be updated before they can be reused.
Obviously the choice of which replicas the actions should use is important. In the figure, if action ε had used the same copy of object D as action α then the failure of this object would have caused two actions to fail instead of one. In some systems [Ahamad 87] the choice of which replica a particular action uses is made randomly, the assumption being that with a sufficient number of object replicas the chances of two actions using the same copy are small. However, [Ng 89] proposes a different approach, using information about the nature of the distributed system (reliability of nodes, probability of network partitions occurring) to come up with an optimal route for each action to take when using replicated objects. This route attempts to minimize the object overlap between replicated actions and minimize the number of different nodes that an action visits during the course of its execution.
The advantages of using replicated actions as opposed to using replicas within an action are the same as those obtained from using a passive replication protocol rather than an active one: there is no need to ensure that all copies of the same object are deterministic, since the action which commits will impose its view of the state of the object on all other copies, and each replica will receive a reduced number of repeated requests from replicated clients (reduced to only one message if each action makes use of a different replica).
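A sketch of the "first action to commit wins" rule might look as follows; the types and the atomicity of the state copy are simplified assumptions of mine, not the Clouds implementation:

```java
// Sketch: the first replicated action (PET) to reach commit wins. It copies the
// state of the replica it used onto the replicas used by the other actions and
// causes those sibling actions to abort. In the real scheme this is atomic.
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

interface ObjectReplica { void overwriteStateFrom(ObjectReplica source); }
interface Action { void abort(); }

class ReplicatedActionCommit {
    private final AtomicBoolean committed = new AtomicBoolean(false);

    // Called by each replicated action for one replicated object when it
    // reaches commit; 'used' is the replica this action worked on.
    boolean tryCommit(ObjectReplica used, List<ObjectReplica> unused,
                      List<Action> siblings) {
        if (!committed.compareAndSet(false, true)) {
            return false;               // another action committed first
        }
        // Impose this action's view of the object state on the other replicas...
        for (ObjectReplica replica : unused) {
            replica.overwriteStateFrom(used);
        }
        // ...and cause the sibling actions to abort.
        siblings.forEach(Action::abort);
        return true;
    }
}
```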
Monday, December 18, 2017
An overview of some existing distributed systems (of the time)
Existing distributed systems which make use of multicast communication can be
categorised as using either one-to-many communication (one client interacting with a replica group) or many-to-many communication (replica groups interacting with replica groups). We shall now look at a number of the popular protocols and show how they have approached the problem of maintaining replica consistency in the face of both external and internal events.
One-to-Many Communication
The V System
The V System [Cheriton 84][Cheriton 85] makes use of what they term one-to-many
communication transactions, where the start of a transaction marks the end of the previous
transaction (any outstanding messages associated with the old transaction are destroyed
without being processed). Each group member has a finite message buffer into which
replies are queued provided they arrive before the start of the next send transaction. Although a reliable communication protocol is not used, the assumption is made that by
making use of retransmissions it is possible to ensure (with a high probability) that
messages are delivered if the sender and receiver remain operational. A further
assumption is that at least one operational group member receives and replies to a
message.
The designers of the V System recognized the possibility of message loss due to message buffer overflow, and their solution was to make use of larger buffers and to modify the kernel to introduce a random delay when replying to a group send, thereby reducing the number of messages in a queue at any time. This imposes a consistent overhead on all group replies but still does not ensure that buffer overflow will never occur (if the number of different groups interacting with each other increases then even with this delay it is still probable that the finite sized message queues will overflow).
When servers reply to a client they do so on a many-to-one basis i.e., all servers which received a request from a particular client will attempt to reply to that client (and not to any replica group the client may be a member of). This means that there is the possibility of members of the same group taking different actions as a result of local events such as timeouts (resulting in state divergence), because the servers replied to some, but not all, clients on time.
The Andrew System
The Andrew System [Satyanarayanan 90] assumes that the communication system is
reliable in much the same manner as the V System i.e., given a sufficient number of retries
then any operational server can always be contacted. Communication is one-to-many and
the termination conditions for a particular client-server interaction are set on a per-call
basis (depending on the application). The designers recognized that calls between remote
processes can take an indeterminate amount of time and so if a client times out it sends an
"are you alive" message to any server replica which has not responded and if a reply is
returned then it continues to wait. However, because servers reply on a many-to-one
basis, state divergence between replicated clients can still occur: if only one client receives a reply sent before all servers fail, then the replicated clients may take different actions.
It was also recognized that messages might be lost by processes because of message buffer overflow, but in Andrew it is assumed that such losses will be detected by retries. This implicitly relies on the servers keeping the last transmitted replies to every client, but could still result in state divergence if, for example, a server group is seen to have failed by one replicated client when it retries a request, when in fact all other members of the client group received the reply before the failure.
Many-to-Many Communication
The Circus System
The Circus System [Cooper 84b] (and Multi-Functions [Banatre 86b] in Gothic [Banatre 86a]) makes use of many-to-many communication, with both client and server groups. As with the V System they assume that with a sufficient number of transmission retries it is possible to deliver messages to all operational members of a group. Thus, if a call eventually times out (after retrying a sufficient number of times) a sender can assume that the receiver(s) has failed. Each member of a group has a message queue which is assumed to be infinite in size (if this assumption cannot be made then the designers of Circus assume that retransmissions of requests and replies can account for losses due to buffer overflow).
Throughout the description of Circus the assumption is made that the members of a
group are deterministic (given the same set of messages in the same order and the same
starting state then they will all arrive at the same next state). Client groups' members
operate as though they were not replicated and do not know that they are members of a
replica group. No mention is made of how this determinism of group members extends to
asynchronous local events such as timeouts.
Despite the assumptions made by the designers, both the problem of message buffer overflow and local timeouts can occur in the Circus system, leading to replica state divergence.
rel/REL
Following on from the last entry, I did the initial implementation of rel/REL as part of my PhD, and that document includes an overview of the protocol as well as some performance figures and proposed optimisations. For now I won't cover that here as there's a good paper in the previous reference to read, or the PhD of course. However, maybe I'll come back to this later.
Labels:
atomic multicast,
multicast,
rel/REL,
reliable delivery
Sunday, December 17, 2017
Multicasts and Replication
In the previous entries we have discussed what delivery properties can be provided by
the communication subsystem, and how they can be implemented. We shall now discuss
how such communications primitives can be used in a replication scheme to aid in
maintaining replica consistency.
Group invocations can be implemented as replicated RPCs by replacing the one-to-one communication of send_request and send_result in the figure below (which we saw in an earlier entry too) with one-to-many communication.

Every non-faulty member of a client group sends the request message to every non-faulty member of the server group, each of which in turn sends a reply back. If multiple client groups invoke methods on the same replicated server group then it must be ensured that concurrent invocations are executed in an identical order at all of the correctly functioning replicas, otherwise the states of the replicas may diverge. In order to ensure this property, the objects must not only be deterministic in nature, but all correctly functioning replicas must receive the same sets of messages in the same order i.e., a totally ordered multicast must be employed.
The State Machine conditions C1 and C2 can both be met by making use of totally ordered multicasts to deliver all messages transmitted by clients and servers. However, such total ordering of messages may be unnecessary for all interactions: if two non-conflicting, non-related messages are received at members of the same replica group (e.g., two unrelated electronic mail messages from different users) then they need not be ordered consistently at these destinations. If they were related in some manner (e.g., from the same user) then they could be ordered consistently. Application level ordering can be achieved more efficiently as cheaper, reliable broadcast protocols can be used to deliver messages for which ordering is unimportant, resorting to the more complex order preserving protocols only where necessary. Since such protocols typically need more rounds of messages to be transmitted, the reduction in their use can be beneficial to the system as a whole, whilst maintaining overall replica consistency.
One method of achieving such application level ordering would be to transmit messages using unordered atomic multicasts (since it is still important that the messages are received by all functioning replicas) which only guarantee delivery to all functioning replicas but make no guarantee of the order (described in a previous entry), and then to impose ordering on top of this i.e., at a level above the communication layer. If atomic actions are used in the system then we can make use of their properties to impose the ordering on message execution that we require i.e., the order shall be equivalent to the serialization order imposed by atomic actions. This has the advantage that operations from different clients which do not conflict (e.g., multiple read operations) can be executed in a different order at each replica. Atomic actions will ensure that multiple accesses from different clients to the same resource will be allowed only if such interaction is serialisable.
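For illustration, a group invocation might look like the sketch below: the request is sent to every functioning server replica (here simply as concurrent point-to-point calls standing in for a multicast) and, with deterministic replicas and ordered delivery, a single reply suffices. The names are mine, not from any particular system:

```java
// Sketch of a replicated RPC: send the request to every member of the server
// group and return the first valid reply (all correct replicas are assumed to
// produce the same result).
import java.util.List;
import java.util.concurrent.*;

interface ServerReplica { byte[] handle(byte[] request); }

class GroupInvoker {
    private final ExecutorService pool = Executors.newCachedThreadPool();

    byte[] invoke(List<ServerReplica> group, byte[] request) throws Exception {
        CompletionService<byte[]> replies = new ExecutorCompletionService<>(pool);
        for (ServerReplica replica : group) {
            replies.submit(() -> replica.handle(request));   // one-to-many send
        }
        // With deterministic replicas, one reply is sufficient; the remaining
        // replies are simply discarded in this sketch.
        return replies.take().get();
    }
}
```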
Failed on historical blogging
Where has the time gone?! Around a year ago I said I would blog a bit from my PhD and though I did do a few entries I haven't been able to do anything since January! Looking back it's a combination of work, family commitments and writing a book with some of my friends, but I didn't realise they'd soaked up so much of my time. Oh well, there are a few days left in 2017 so let me see what I can do to try to catch up a bit.
Friday, January 06, 2017
Psync
Following on from the last entry, we shall now consider one implementation of a communication subsystem which provides some of the delivery properties described previously. It is important to understand how these ordering requirements can be met, and the overhead which is involved in guaranteeing them, before we discuss how such communication primitives can be used to provide replicated object groups. Other reliable communication subsystems exist, of which [Chang 84][Cristian 85][Cristian 90][Verissimo 89] are a sample, but we shall consider Psync because it illustrates many points clearly.
Psync
Psync [Peterson 87][Mishra 89] ("pseudosynchronous") is a communication subsystem designed to provide reliable multicast communication between objects, and is based on the message history approach described above. The system assumes that operations on objects which change the state occur atomically and are idempotent. Associated with each object is a manager process. A client process locates a particular manager (perhaps by consulting a naming service) and then invokes operations on the object by sending requests to that manager. When a manager receives a request to invoke a particular operation on an object, it encapsulates the operation in a message and uses the Psync many-to-many communications protocol to forward the message to all of the managers involved (including itself) if the object is a member of a group. Based on the set of received messages, each manager can then decide on an order in which to apply the operations to its
copy of the object. This protocol can be extended to be used for the interactions of replicated object groups, and the exact details of the replication protocol used in Psync will be described in a later posting.
Conversations and Context Graphs
Psync explicitly preserves the partial ordering of messages exchanged among a collection of processes in the presence of communication and processor failures (Psync cannot function in the presence of network partitions). A collection of processes exchange messages through a conversation abstraction. This conversation is defined by a directed acyclic graph (a context graph) that preserves the partial order of the exchanged messages. This ordering is made available to all managers involved in a conversation and by using this they can determine when to execute operations on their local objects.
When processes communicate they do so by sending messages in the context of those messages they have already received. Participants are able to receive all messages sent by the other participants in the conversation but they do not receive the messages that they themselves send. Each participant in a conversation has a view of the context graph that corresponds to those messages it has sent or received. The semantics of the communications primitives provided by Psync are defined in terms of the context graph and a participant's view.
The figure below shows an example of a context graph. This conversation is started with the initial message m1. Messages m2 and m3 were sent by processes that had received m1, but are independent of each other (hence no link between them), and m4 was sent by a process that had received m1 and m3, but not m2. Messages that are not in the context of some other message are said to be concurrent (occur at the same logical time). The relation < is defined to be "in the context of".
The context graph contains information about which processes have received what messages. The receipt of a message implies that the sender has seen all of the messages which came before it in the context graph. A message mp sent by process p is said to be stable if, for each participant q ≠ p in the conversation, there exists a vertex mq in the context graph sent by q such that mp < mq. For a message to be stable means that all participants except the sender have received it; it follows that all future messages sent to the conversation must be in the context of the stable message.
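A rough sketch of the context-graph bookkeeping and the stability test just defined; the field names and representation here are mine, not Psync's:

```java
// Sketch: each message records the messages it was sent in the context of, and
// a message is stable once every other participant has sent something whose
// (transitive) context includes it.
import java.util.*;

record Msg(String id, String sender, Set<String> inContextOf) {}

class ContextGraphView {
    private final Map<String, Msg> view = new HashMap<>();

    void deliver(Msg m) { view.put(m.id(), m); }

    // m is stable if, for every participant q other than the sender, q has sent
    // a message that depends on m.
    boolean isStable(Msg m, Set<String> participants) {
        for (String q : participants) {
            if (q.equals(m.sender())) continue;
            boolean seen = view.values().stream()
                    .anyMatch(other -> other.sender().equals(q) && dependsOn(other, m));
            if (!seen) return false;
        }
        return true;
    }

    // Walk the "in the context of" links transitively (the graph is acyclic).
    private boolean dependsOn(Msg later, Msg earlier) {
        if (later.inContextOf().contains(earlier.id())) return true;
        return later.inContextOf().stream()
                .map(view::get)
                .filter(Objects::nonNull)
                .anyMatch(parent -> dependsOn(parent, earlier));
    }
}
```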
In the figure below, we have a context graph which depicts a conversation between three participants, a, b, and c. Messages a1, a2, ... denote the sequence of messages sent by process a, and so on. Messages a1, b1, and c1 are the only stable messages; messages a2 and a3 are two unstable messages sent by process a.
Psync maintains a copy of a conversation's context graph at each host on which a participant in the conversation resides. Each process in the conversation receives messages from this local copy of the context graph, which is termed the image. Whenever a process at one host sends a message, Psync propagates a copy of the message to each of the hosts in the conversation. This message contains information about all messages upon which the new message depends, so that the receiving hosts can append it to their context graphs in the correct place.
Dealing with Network and Host Failures
Suppose message m is not delivered to some host because of a network failure. If at some future time a message m' arrives that depends on m then the host will detect that it is missing m and will send a retransmission request for m to the host that sent m' (this host is guaranteed to have m, as a local participant has just sent a message which is in the context of it).
The operations provided to aid applications in recovering from host failures include the ability for a participant to remove a failed participant from its definition of the participant set for a conversation. This is necessary so that messages will eventually stabilize relative to the functioning participants. Once a given participant has been masked out, Psync ignores all further messages from that process.
There is also an inverse operation that allows a participant to rejoin a participant set. It would be invoked when a participant becomes aware that another participant which was formerly failed has now recovered.
When a host recovers, a participant can initiate a recovery action which informs the other participants that the invoking participant has restarted and initiates reconstruction of the local image of the context graph. Each member of the conversation transmits its local copy of the context graph to the recovering participant, which can then use these copies to reconstruct its own.
Total Ordering
As described, the Psync protocol only gives a partial ordering of messages i.e., only the causal ordering of messages is preserved. To convert a partial order into a total order, whereby messages which are not causally related are ordered identically at all overlapping destinations, requires additional information to be shared amongst the destinations which indicates the order in which to place such messages. In Psync, the context graph which accompanies each message provides this information. The partial order that Psync provides can be used to give a total order if all participants perform the same topological sort of the context graph. This sort must be incremental i.e., each process waits for a portion of its view to be stabilized before allowing the sort to proceed. This is done to ensure that no future messages sent to the conversation will invalidate the total ordering. The replication protocol used in Psync uses just such a scheme and will be described in a later article.
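As a sketch of the idea (again my own illustration in the same Java style as before, not the actual Psync replication code), each participant repeatedly takes the stable messages whose predecessors have already been delivered and delivers them in a deterministic order, for example by sender identifier. Because every participant sees the same context graph below a stable message, they all perform the same sort and therefore agree on the total order.

import java.util.*;

final class TotalOrder {
    // Sketch: incremental, deterministic delivery of stable messages (builds on the Image sketch above).
    static List<Message> deliverNext(Image image, Collection<Message> undelivered, Set<String> deliveredIds) {
        List<Message> ready = new ArrayList<>();
        for (Message m : undelivered) {
            // A message may be sorted only when it is stable and everything it depends on is delivered.
            if (deliveredIds.containsAll(m.predecessors) && image.isStable(m)) {
                ready.add(m);
            }
        }
        // Concurrent messages carry no causal order, so break ties deterministically
        // (here by sender id, then message id) so that every participant orders them identically.
        ready.sort(Comparator.comparing((Message m) -> m.sender).thenComparing(m -> m.id));
        for (Message m : ready) {
            deliveredIds.add(m.id);
        }
        return ready;
    }
}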
Tuesday, January 03, 2017
Multicasts and Latency
The next few entries in the series will consider some aspects of multicast protocols.
As described in [Shrivastava 90a], the latency of a multicast service is defined to be the time taken for a message, once sent, to reach the destination processes. This latency is particularly important for protocols providing reliability and ordering guarantees. As we shall see, whereas the latency for an unreliable multicast service is bounded (typically of the order of a few milliseconds), the latency for a multicast service which operates in the presence of failures (message losses and node crashes) can be bounded or unbounded depending upon the implementation.
Existing order preserving protocols can be broadly classified in the following way:
- message history based: the main idea behind such protocols is that when a process sends a message it appends some historical information about the messages it has received in its recent past. This historical information enables the receivers to retrieve any missing messages and to order them properly. This type of protocol ensures that an incomplete multicast is eventually completed, and hence possesses an unbounded latency property.
- centralised distributors: here the sender delivers the message to a specific member of the group (the primary) who is then responsible for distributing the message to the fellow members of the group. The primary can assign a total order to the messages it receives. As we have already seen, failure detection mechanisms are necessary to detect failed primaries and to elect new primaries which can take over and complete the multicasts. Such protocols can possess bounded latency, but the necessity to detect asynchronously occurring failures can impose an overhead on performance.
- multi-phase commit: these protocols, providing total order, use multiphase algorithms (typically 2 or 3 message rounds) which are similar to the two-phase algorithm described earlier for atomic action commits. The sender delivers the message to the destinations, which return sufficient information about the messages they have received so that in subsequent rounds of the protocol the sender can impose an identical ordering of messages at all destinations. The message is only considered to have been delivered if all of the phases of the protocol complete. Such protocols provide bounded latency.
- clock-based: these protocols are an important class of the multi-phase algorithms, and assume the existence of a global time base. Timestamps derived from such a time base can then be used for imposing a total order on messages. Such protocols can provide constant latency communication, having the attractive property that if a sender multicasts a message at clock time T, then it can be sure that all functioning receivers will have received the message by clock time T + ∆, where ∆ is the constant indicating the protocol latency (∆ must be determined by applying worst case timing and failure assumptions).
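As an illustration of the clock-based class (a deliberately simplified sketch; the published ∆-protocols are considerably more involved, and the value of ∆ below is made up), the receive side can buffer each timestamped message until its timestamp plus ∆ has passed on the local clock and then deliver buffered messages in timestamp order, which yields the same order at every functioning receiver:

import java.util.*;

// Sketch of clock-based delivery: assumes clocks are synchronised to within the bound
// used when choosing DELTA_MS (the ∆ of the text). Ties on equal timestamps would also
// need a deterministic tie-break (e.g. by sender id), omitted here for brevity.
final class ClockBasedReceiver {
    private static final long DELTA_MS = 200; // illustrative value for ∆

    private record Pending(long timestamp, String messageId) {}
    private final PriorityQueue<Pending> buffer =
            new PriorityQueue<>(Comparator.comparingLong(Pending::timestamp));

    void onReceive(long timestamp, String messageId) {
        buffer.add(new Pending(timestamp, messageId));
    }

    // Called periodically: deliver every message whose timestamp + ∆ has passed, in timestamp order.
    List<String> deliverDue(long now) {
        List<String> delivered = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp() + DELTA_MS <= now) {
            delivered.add(buffer.poll().messageId());
        }
        return delivered;
    }
}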
Monday, January 02, 2017
A Java EE Interlude
I want to take a quick break from the other series for a moment to do something I've been remiss about for a while: addressing a recent report by Gartner on the state of Java EE. Before doing so I decided the other day to read the article my friend and colleague John Clingan wrote a few weeks ago, and I realised that John had done a great job already. In fact so good I couldn't see myself adding much to it, but I'll try below. Note, there are other responses to this report which I haven't had a chance to read, but they might be just as good.
To start with, it's worth noting that I've known Anne for a number of years and we've had one or two disagreements during that time but also some agreements. Her announcements about the death of SOA seven years or so ago fall somewhere in between, for example. It's also important to realise, if you don't already, that I do believe everything has a natural cycle ("the circle of life") and nothing lasts forever; whether it's dinosaurs giving way to mammals (with some help from a meteor and/or the Deccan Traps), or CORBA shifting aside for J2EE, evolution is a fact of life. Therefore, whilst I disagree with Anne about the short-to-medium term future of Java EE, longer term it will pass into history. However, before doing so it will evolve and continue to influence the next generation of technologies, just as the dinosaurs became the birds and aspects of CORBA evolved into J2EE. For more details on my thinking on this topic over the years I leave it as an exercise to the reader to check this blog or my JBoss blog.
John covers my next point very well but it's worth stressing: I've mentioned on many occasions that my background is heavily scientific and I continue to try to employ objective scientific method to everything I do. So when I see blanket statements like "fade in relevance" or "lighter-weight approaches", whether it's in a Gartner report, a paper I'm reviewing as a PC member, or even something one of my team is writing, I immediately push back if there's no supporting evidence. And I read this report several times searching for it but kept coming up empty handed. It was one subjective statement after another, with no real attempt to justify them. That's disappointing because with my scientific hat on I love to see facts backing up theories, especially if those theories contradict my own. That's how we learn.
One other thing I did get from the report, and I'm prepared to admit this may be my own subjective reading, was that it seemed like a vague attempt to look into the future for middleware frameworks and stacks without really understanding what has brought us to where we are today. I've written about this many times before, but the report's weak assertion that "modern applications" are somehow so different from those we've developed for the last 50 years that existing approaches such as Java EE are not useful really doesn't make any sense if you bother to think about it objectively. Distributed applications, services and components need to communicate with each other, which means some form of messaging (hopefully reliable, scalable and performant); there's really no such thing as a stateless application, so you'll need to save data somewhere (again, reliably, scalably and with good performance); hey, maybe you also want some consistency between applications or copies of data, so transactions of one form or another might be of help. And the list goes on.
Of course application requirements change over time (e.g., I recall doing research back in the 1980's where scale was measured in the 10's of machines), but new applications and architectures don't suddenly spring into being and throw away all previous requirements; that's evolution too. I've presented my thoughts on this over the past couple of years at various conferences. In some ways you can consider Java EE a convenient packaging mechanism for these core services, which are typically deployed in a co-located (single process) manner. Yet if you look beyond the Java EE "veneer" you can still see the influence of enterprise distributed systems that predate it and also the similarities with where some next generation efforts are heading.
I suppose another key point which John also made well was that the report fails miserably to differentiate between Java EE as a standard and the various implementations. I'm not going to cover the entire spectrum of problems I have with this particular failure, but of course lightweight is one of them. Well over 5 years ago I wrote about the things we'd done years previously to make AS7 run on a plug computer or Android phone, and we've kept up that level of innovation throughout the intervening time. There's nothing in the Java EE standard which says you have to build a bloated, heavyweight application server and we, as well as other implementations, have proven that time and time again. It's disappointing that none of this is reflected in the report. I suppose "lightweight" is one of those terms the authors hope that, if they say it often enough, people will attribute their own subjective opinions and assumptions to it.
Another one of those throw-away statements the report is good at is that "[...] at this point, Java developers eschew Java EE". I admit not every Java developer wants to use Java EE; not every Java developer wanted to use J2EE either. But you know what? Many Java developers do like Java EE and do want to use it. You only have to go to conferences like JavaOne, Devoxx or other popular Java events to find them in abundance. Or come and talk to some of our customers or those of IBM, Payara, Tomitribe or other members of the MicroProfile effort.
I could go on and on with this but the entry has already grown larger than I expected. John did a great job with his response and you should go read that for an objective analysis. Probably the only positive thing I could attribute to the original Gartner report is that its very existence probably proves that time travel is possible! It's a theory which fits the facts: a report which criticises something based on data which is largely a decade old means it was probably written a decade ago.
Labels: cloud native, frameworks, Java EE, MicroProfile, microservices, WildFly, WildFly Swarm
Sunday, January 01, 2017
Remote Object Invocation
The next in our series ...
Invocations on objects which are not replicated are traditionally based on the RPC as this retains the correct semantics of a procedure call i.e., a single flow (thread) of control from caller to callee and back again (as with a traditional procedure call). The previous entry described the concept of the Remote Procedure Call, and the simplified model of client-server interaction shown in the figure below will be assumed for the discussion to follow: a client uses the primitives send_request() for sending a call request and receive_result() for receiving the corresponding results. Clients and servers maintain enough state information to recognize and discard duplicate messages (filter requests). The server maintains a queue of messages from possibly multiple clients, and uses the primitive receive_request() to pick a message from the queue in FIFO order. After invoking the right method, the result is sent to the client with the send_result() primitive.
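A minimal sketch of the server side of this interaction might look something like the following (the names deliberately mirror the primitives above; the duplicate filter is just a table of the highest request id seen per client, which is one common way to realise request filtering and is an assumption on my part rather than a prescribed design):

import java.util.*;

// Sketch of the server loop built from the primitives described above; the transport-level
// pieces are stubbed out because they are not the point of the example.
final class Server {
    record Request(String clientId, long requestId, String method) {}
    record Result(byte[] payload) {}

    private final Map<String, Long> lastSeen = new HashMap<>(); // clientId -> highest request id seen

    void serve() {
        while (true) {
            Request req = receive_request();              // pick the next request from the FIFO queue
            Long prev = lastSeen.get(req.clientId());
            if (prev != null && req.requestId() <= prev) {
                continue;                                 // duplicate: discard (or resend a cached result)
            }
            lastSeen.put(req.clientId(), req.requestId());
            Result result = invoke(req);                  // invoke the requested method
            send_result(req.clientId(), result);          // return the result to the caller
        }
    }

    // Stand-ins for the primitives named in the text; not real APIs.
    private Request receive_request() { throw new UnsupportedOperationException("sketch only"); }
    private Result invoke(Request r) { throw new UnsupportedOperationException("sketch only"); }
    private void send_result(String clientId, Result r) { /* transmit over the network */ }
}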
When making replicated invocations (such as when calling a replica group) the semantics of such communication differ considerably from that of the traditional RPC: there is no longer a single thread of control, but rather multiple threads which may all eventually return to the caller. Such invocations are typically referred to as Replicated Procedure Calls [Cooper 84a][Cooper 85], and can be implemented using one-to-many (or multicast) communication facilities. We discuss various aspects of multicast communication below.
One-to-Many Communication
The main services a multicast protocol provides can be categorised into three classes: ordering, reliability and latency. By imposing (increasing) ordering and reliability constraints on the delivery of multicast messages it is possible to define increasingly sophisticated protocols (typically at the expense of the latency). To understand these protocols first assume that a sender S is attempting to multicast to a group G = {P1,...,Pn}. Following the definitions outlined in [Shrivastava 90b][ANSA 90]:
Unordered and Unreliable
A multicast from S will be received by a subset of the functioning nodes Pi ∈ G. Successive multicasts from S will be received in an arbitrary order at the destinations. The next figure shows sender S multicasting messages m1 and m2 to the group G. The messages are received by P2 and Pn in different orders, and message m2 is not received by P1 at all.
FIFO Multicast
Provided the sender does not crash while transmitting the message, all correctly functioning receivers are guaranteed to get the message. Furthermore, the multicasts will be received in the order they were made.
The next figure shows two senders (S1 and S2) multicasting to the group G. All members of G received m1 before m2, but some members may receive m3 before m2. This last ordering is correct given the definition of the protocol: no information about the relative ordering of multicasts between senders is available to the receivers.
Atomic multicast
If the sender does not crash before completing a multicast, the message is guaranteed to be received by all functioning members. If, however, the sender crashes during a multicast, then it is guaranteed that the message is received by either all or none of the functioning processes (atomic delivery). All multicasts from the same sender are received in the order they were made.
Causal multicast
This multicast extends the ordering property of the Atomic multicast to causally related sends from different senders while still meeting the reliability guarantee. [Lamport 78] was the first to introduce the concept of potential causal relationships into computer interactions and showed what effects these relationships can have on the operations of interacting processes. Two events are potentially causally related if information from the first event could have reached the second event before it occurred. The notation used to denote such relationships is typically X → Y, where → means precedes (happened before). Note that if X and Y are events from the same process and Y follows X then Y is necessarily causally related to X. A causal communication system will only preserve the ordering of events that are causally related; if two events are not related in this way then there is no guarantee on their delivery order.
In the figure above S1 is multicasting to the groups G1 and G2, and P1 is multicasting to group G1, where G1 = {P2, P3} and G2 = {P1, P4}. There is a potential flow of information from send(m1, G1) to send(m2, G2), and from send(m2, G2) to send(m3, G1). This means that the sending of m3 by P1 is potentially causally related to the sending of m1 by S1. Hence the causal multicast protocol must ensure that all functioning members of G1 receive m1 before m3. Events such as m3 and m4 which are not causally related can be received in any order (they are termed concurrent).
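Causal delivery is commonly implemented with vector clocks, though the passage above does not say how Lamport's relation is tracked in practice, so treat this as one possible realisation rather than the mechanism it assumes. Each message carries the sender's vector timestamp, and a receiver holds a message back until it is the next one expected from its sender and everything that message causally depends on has already been delivered:

// Sketch: a causal delivery check based on vector clocks (one possible implementation).
final class CausalReceiver {
    private final int[] delivered; // delivered[i] = number of messages from process i delivered so far

    CausalReceiver(int processes) {
        this.delivered = new int[processes];
    }

    // A message from 'sender' carrying vector timestamp 'ts' is deliverable when it is the next
    // message expected from that sender and we have already delivered everything it depends on.
    boolean deliverable(int sender, int[] ts) {
        if (ts[sender] != delivered[sender] + 1) return false;
        for (int k = 0; k < ts.length; k++) {
            if (k != sender && ts[k] > delivered[k]) return false;
        }
        return true;
    }

    // Record the delivery; messages that were buffered waiting on this one can now be re-checked.
    void deliver(int sender, int[] ts) {
        delivered[sender] = ts[sender];
    }
}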
Totally ordered multicast
The (partial) causal order can be extended to a total order of messages such that all messages (whether causally related or not) are received by all destinations in the same order (which must also preserve causality).
Saturday, December 31, 2016
RPCs, groups and multicast
This is the first entry in the series I mentioned earlier. I've tried to replace the references with links to the actual papers or PhD theses where possible, but some are not available online.
Remote Procedure Call
The idea behind the Remote Procedure Call (RPC) [Birrell 84] builds on the fact that conventional procedure calls are a well known and well understood mechanism for the transfer of data and control within a program running on a single processor. When a remote procedure is invoked, the calling process is suspended, any parameters are passed across the network to the node where the server resides, and then the desired procedure is executed. When the procedure finishes, any results are passed back to the calling process, where execution resumes as if returning from a local procedure call. Thus the RPC provides the system or application programmer with a level of abstraction above the underlying message stream. Instead of sending and receiving messages, the programmer invokes remote procedures and receives return values.
The figure shows a client and server interacting via a Remote Procedure Call interface. When the client makes the call it is suspended until the server has sent a reply. To prevent the sender being suspended indefinitely the call can have a timeout value associated with it: after this time limit has elapsed the call could be retried or the sender could decide that the receiver has failed. Another method, which does not make use of timeouts in the manner described, instead relies on the sender and receiver transmitting additional probe messages which indicate that they are alive. As long as these messages are acknowledged then the original call can continue to be processed and the sender will continue to wait.
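As an illustration of the timeout approach (a sketch under assumed names and values, not a description of any particular RPC system), the caller can block on the reply for a bounded time and then either retry or conclude that the receiver has failed:

import java.util.concurrent.*;

// Sketch: a client-side call with a timeout and a bounded number of retries.
final class RpcClient {
    private static final long TIMEOUT_MS = 500; // illustrative values only
    private static final int MAX_ATTEMPTS = 3;

    byte[] call(byte[] request) throws Exception {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            Future<byte[]> reply = sendRequest(request);  // transmit and await the reply asynchronously
            try {
                return reply.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                reply.cancel(true);                       // give up on this attempt and retry
            }
        }
        throw new Exception("receiver presumed failed after " + MAX_ATTEMPTS + " attempts");
    }

    // Stand-in for the transport layer; not part of any real API.
    private Future<byte[]> sendRequest(byte[] request) {
        return CompletableFuture.completedFuture(new byte[0]);
    }
}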
Groups
[ANSA 90][ANSA 91a][Liang 90][Olsen 91] describe the general role of groups in a distributed system. Groups provide a convenient and natural way to structure applications into a set of members cooperating to provide a service. They can be used as a transparent way of providing fault tolerance using replication, and also as a way of dividing up a task to exploit parallelism.
A group is a composite of objects sharing common application semantics as well as the same group identifier (address). Each group is viewed as a single logical entity, without exposing its internal structure and interactions to users. If a user cannot distinguish the interaction with a group from the interaction with a single member of that group, then the group is said to be fully transparent.
Objects are generally grouped together for several reasons: abstracting the common characteristics of group members and the services they provide; encapsulating the internal state and hiding interactions among group members from the clients so as to provide a uniform interface (group interface) to the external world; using groups as building blocks to construct larger system objects. A group may be composed of many objects (which may themselves be groups), but users of the group see only the single group interface. [ANSA 90] refers to such a group as an Interface Group.
An object group is defined to be a collection of objects which are grouped together to provide a service (the notion of an abstract component) and accessible only through the group interface. An object group is composed of one or more group members whose individual object interfaces must conform to that of the group.
Interfaces are types, so that if an interface x has type X and an interface y has type Y, and X conforms to Y, then x can be used wherever y is used. This type conformance criterion is similar to that in Emerald [Black 86]. In the rest of this series, we shall assume for simplicity that a given object group is composed of objects which possess identical interfaces (although their internal implementations could be different).
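A small Java analogue of this conformance rule (Java's nominal subtyping is stricter than Emerald-style structural conformance, so this is only an approximation): if Y is the interface a group exports and X conforms to it, then any x can stand in wherever a y is expected.

// Sketch: interface conformance expressed in Java terms.
interface Y { int read(); }
interface X extends Y { void write(int value); } // X conforms to Y: it offers at least Y's operations

final class Member implements X {
    private int state;
    public int read() { return state; }
    public void write(int value) { state = value; }
}

final class Demo {
    static int useService(Y service) { return service.read(); } // written against the interface Y
    public static void main(String[] args) {
        X member = new Member();
        member.write(42);
        System.out.println(useService(member)); // an X can be used wherever a Y is expected
    }
}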
The object group concept allows a service to be distributed transparently among a set of objects. Such a group could then be used to support replication to improve reliability of service (a replica group), or the objects could exploit parallelism by dividing tasks into parallel activities. Without the notion of the object group and the group interface through which all interactions take place, users of the group would have to implement their own protocols to ensure that interactions with the group members occur consistently e.g., to guarantee that each group member sees the same set of update requests.
By examining the different ways in which groups are required by different applications, it is possible to define certain requirements which are imposed on groups and the users of groups (e.g., whether collation of results is necessary from a group used for reliability purposes). [ANSA 91a] discusses the logical components which constitute a generic group, some of which may not be required by every group for every application. These components are:
- an arbiter, which controls the order in which messages are seen by group members.
- a distributor/collator, which collates messages going out of the group, and distributes messages coming into the group.
- member servers, which are the actual group members to which invocations are directed.
For some applications collation may not be necessary e.g., if it can be guaranteed that all members of a group will always respond with the same result. As we shall see later, if the communication primitives can guarantee certain delivery properties for messages, then arbitration may also not be necessary. In general, all of these components constitute a group. In the rest of this series the logical components will not be mentioned explicitly, and the term group member will be used to mean a combination of these components.
Multicast Communication
Conventional RPC communication is a unicast call since it involves one-to-one interaction between a single client and a single server. However, when considering replication it is more natural to consider interactions with replica groups. Group communication is an access transparent way to communicate with the members of such a group. Such group communication is termed multicasting [Cheriton 85][Hughes 86].
Multicast communication schemes allow a client to send a message to multiple receivers simultaneously. The receivers are members of a group which the sender specifies as the destination of the message. A broadcast is the special case of a multicast whereby, instead of specifying a subset of the receivers in the system, every receiver is sent a copy.
Most multicast communication mechanisms are unreliable as they do not guarantee that delivery of a given message will occur even if the receiver is functioning correctly (e.g., the underlying communication medium could lose a message). When considering the interaction of client and replica group (or even replica group to replica group communication) such unreliable delivery can cause problems in maintaining consistency of state between the individual replicas, complicating the replication control protocol (if one replica fails to receive a given state-modifying request but continues to receive and respond to other requests, this resulting state divergence could result in inconsistencies at the clients). Thus, it is natural to consider such group-to-group communication to be carried out using reliable multicasts, which give certain guarantees about delivery in the presence of failures. These can include the guarantee that if a receiver is operational then the message will be delivered even if the sender fails during transmission, and that the only reason a destination will not receive a message is because that destination has failed. By using a reliable multicast communication protocol many of the problems posed by replicating services can be handled at this low level, simplifying the higher level replica consistency protocol.