Transcript Slides

Distributed Systems
CS 15-440
Fault Tolerance- Part II
Lecture 21, Nov 30, 2015
Mohammad Hammoud
Today…
 Last Session:
 Fault Tolerance- Part I
 Process Resilience
 Today’s Session:
 Fault Tolerance – Part II
 Reliable communication
 Announcements:
 Project 4 is due on Thursday, Dec 3rd by midnight
 Final exam is on Dec 10th from 1:00PM to 4:00PM at Room 1064
(all topics are included; it will be open book, open notes)
Objectives
Discussion on Fault Tolerance:
 General background on fault tolerance
 Process resilience, failure detection, and reliable communication
 Atomicity and distributed commit protocols
 Recovery from failures
Reliable Communication
 Fault tolerance in distributed systems typically concentrates
on faulty processes
 However, we also need to consider communication failures
 Two types of reliable communication:
 Reliable request-reply communication (e.g., RPC)
 Reliable group communication (e.g., multicasting schemes)
Reliable Communication
 Reliable Request-Reply Communication
 Reliable Group Communication
Request-Reply Communication
 The request-reply (RR) communication is designed to support the
roles and message exchanges in typical client-server interactions
[Diagram: the client's doOperation sends a Request Message and waits; the
server's getRequest receives it, the server selects and executes the
operation, and sendReply returns a Reply Message to the continuing client]
 This sort of communication is mainly based on a trio of communication
primitives: doOperation, getRequest, and sendReply
Timeout Mechanisms
 Request-reply communication may suffer from crash, omission,
timing, and Byzantine failures
 To allow for occasions where a request or a reply message is not
delivered (e.g., lost), doOperation uses a timeout mechanism
 There are various options as to what doOperation can do
after a timeout:
 Return immediately with an indication to the client that the request
has failed
 Send the request message repeatedly until either a reply is received or
the server is assumed to have failed
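The retry-until-reply option can be sketched as follows. This is a minimal, illustrative model, not any real RPC library's API; the `send_request` callable and `max_retries` parameter are assumptions for the sketch.

```python
def do_operation(send_request, max_retries=3):
    """Client-side doOperation with a retry-on-timeout policy.

    send_request() performs one request-reply exchange and returns the
    reply, or raises TimeoutError if no reply arrives in time.
    """
    for _ in range(max_retries):
        try:
            return send_request()       # reply received: done
        except TimeoutError:
            continue                    # retransmit the request
    # no reply after repeated attempts: assume the server has failed
    raise ConnectionError("server assumed to have failed")
```

With `max_retries=1` this degenerates into the first option: return immediately with an indication that the request has failed.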
Idempotent Operations
 In cases when the request message is retransmitted, the
server may receive it more than once
 This can cause the server to execute an operation more than
once for the same request
 Not every operation can be executed more than once and
obtain the same results each time
 Operations that can be executed repeatedly with the
same effect are called idempotent operations
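A minimal illustration with two hypothetical operations on an account: overwriting the balance is idempotent, while depositing into it is not.

```python
account = {"balance": 100}

def set_balance(value):
    # idempotent: executing the same request twice has the same effect as once
    account["balance"] = value

def deposit(amount):
    # not idempotent: a retransmitted request changes the state again
    account["balance"] += amount

set_balance(150)
set_balance(150)        # duplicate execution: balance is still 150

deposit(10)
deposit(10)             # duplicate execution: balance is now 170, not 160
```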
Duplicate Filtering
 To avoid problems with re-executing operations, the server should:
 Recognize successive messages from the “same” client
 Filter out duplicates
 Upon receiving a “duplicate” request, the server can either:
 Re-execute the operation and reply
 Possible only for idempotent operations
 Or avoid re-executing the operation if it retained the outcome of
the first request
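The second option (retain the outcome, filter duplicates) can be sketched as follows; the class and its `(client_id, request_id)` keying are illustrative assumptions.

```python
class Server:
    """Filters duplicate requests by (client_id, request_id)."""

    def __init__(self):
        self.history = {}   # (client_id, request_id) -> retained reply

    def handle(self, client_id, request_id, operation):
        key = (client_id, request_id)
        if key in self.history:
            # duplicate request: resend the retained outcome
            # without re-executing the operation
            return self.history[key]
        result = operation()            # first arrival: execute once
        self.history[key] = result
        return result
```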
In Summary…
 RR protocol can be implemented in different ways to provide
different delivery guarantees. The main choices are:
1. Retry request message (client side): Controls whether to retransmit
the request message until either a reply is received or the server is
assumed to have failed
2. Duplicate filtering (server side): Controls when retransmissions are
used and whether to filter out duplicate requests at the server
3. Retransmission of results (server side): Controls whether to keep a
history of result messages so as to enable lost replies to be
retransmitted without re-executing the operations at the server
Request-Reply Call Semantics
 Combinations of RR protocols lead to a variety of possible
semantics for the reliability of remote invocations
Fault Tolerance Measure                                       Call Semantics
Retransmit        Duplicate    Re-execute Procedure or        (Pertaining to
Request Message   Filtering    Retransmit Reply               Remote Procedures)

No                N/A          N/A                            Maybe
Yes               No           Re-execute Procedure           At-least-once
Yes               Yes          Retransmit Reply               At-most-once
Reliable Communication
 Reliable Request-Reply Communication
 Reliable Group Communication
Reliable Group Communication
 As we considered reliable request-reply communication,
we need also to consider reliable multicasting services
[Diagram: a message multicast from one process to a group of seven processes]
 E.g., Election algorithms use multicasting schemes
Reliable Group Communication
 A Basic Reliable-Multicasting Scheme
 Atomic Multicasting
Reliable Multicasting
 Reliable multicasting indicates that a message that is sent to
a group of processes should be delivered to each member
of that group
 A distinction should be made between:
 Reliable communication in the presence of faulty processes
 Reliable communication in the presence of non-faulty processes
 How can we achieve reliable multicasting?
More Precisely…
 What happens if “during multicasting” a process P joins
or leaves a group?
 Should the sent message be delivered to other processes?
 Should P (if joining) also receive the message?
 What happens if the (sending) process crashes during
multicasting?
 What about message ordering?
Reliable Multicasting with Feedback Messages
 Consider the case when a single sender, S, wants to
multicast a message to multiple receivers
 S’s message may be lost part way and delivered to some, but
not to all, of the intended receivers
 As of now, let us assume that messages are received in the
same order as they were sent
Reliable Multicasting with Feedback Messages

[Diagram: a sender with a history buffer multicasts M25 over the network;
receivers with Last = 24 deliver M25 and return ACK25, while a receiver
with Last = 23 reports "Missed 24" so the sender can retransmit M24 from
its history buffer]
An extensive and detailed survey of total-order broadcasts can be found
in Defago et al. (2004)
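The scheme in the figure can be sketched as follows. This is a simplified, single-sender model with synchronous retransmission; the class and method names are illustrative assumptions.

```python
class Sender:
    def __init__(self):
        self.history = {}     # history buffer: seq -> message
        self.next_seq = 1

    def multicast(self, msg, receivers):
        seq = self.next_seq
        self.next_seq += 1
        self.history[seq] = msg          # keep until retransmission is moot
        for r in receivers:
            r.receive(seq, msg, self)

    def retransmit(self, seq, receiver):
        receiver.receive(seq, self.history[seq], self)


class Receiver:
    def __init__(self):
        self.last = 0         # highest sequence number delivered in order
        self.delivered = []

    def receive(self, seq, msg, sender):
        if seq == self.last + 1:
            self.last = seq
            self.delivered.append(msg)   # in order: deliver (and ACK)
        elif seq > self.last + 1:
            # gap detected: report the missed message(s) to the sender
            for missed in range(self.last + 1, seq):
                sender.retransmit(missed, self)
            self.receive(seq, msg, sender)
        # seq <= self.last: duplicate, discard
```

Loss of a message to one receiver (as in the figure, where one receiver missed message 24) is repaired out of the sender's history buffer when the next message exposes the gap.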
Atomic Multicast
 Atomic Multicast guarantees that:
 A message is delivered to either all or none of the
processes
 All messages are delivered in the same order to
all processes
 As a result:
 Non-faulty processes can maintain a “consistent view”
 Reconciliation is enforced when a process recovers and
rejoins a group
Virtual Synchrony
 A multicast message, m, is uniquely associated with a list of
processes to which it should be delivered
 This delivery list corresponds to what is called a group view
 In principle, the delivery of m is allowed to fail:
1. When a change in group-membership is the result of the
sender of m crashing
 Accordingly, m may either be delivered to all remaining processes,
or ignored by each of them
2. When a change in group-membership is the result of a receiver
of m crashing
 Accordingly, m may be ignored by every other receiver
A reliable multicast with these properties is said to be “virtually synchronous”
The Principle of Virtual Synchrony
[Diagram: reliable multicast by multiple point-to-point messages over
time among G = {P1, P2, P3, P4}; P3 crashes mid-multicast, so its partial
multicast is discarded and the view changes to G = {P1, P2, P4}; when P3
rejoins, the view becomes G = {P1, P2, P3, P4} again]
Message Ordering
 Four different virtually synchronous multicast orderings
are distinguished:
1. Unordered multicasts
2. FIFO-ordered multicasts
3. Causally-ordered multicasts
4. Totally-ordered multicasts
1. Unordered multicasts
 A reliable, unordered multicast is a virtually synchronous multicast in
which no guarantees are given concerning the order in which
received messages are delivered by different processes
Process P1    Process P2     Process P3
Sends m1      Receives m1    Receives m2
Sends m2      Receives m2    Receives m1

Three communicating processes in the same group
2. FIFO-Ordered Multicasts
 With FIFO-Ordered multicasts, the communication layer is forced to
deliver incoming messages from the same process in the same
order as they have been sent
Process P1    Process P2     Process P3     Process P4
Sends m1      Receives m1    Receives m3    Sends m3
Sends m2      Receives m3    Receives m1    Sends m4
              Receives m2    Receives m2
              Receives m4    Receives m4

Four processes in the same group with two different senders
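A FIFO-ordered delivery layer can be sketched with per-sender sequence numbers and a hold-back queue, a standard technique; the class and field names here are illustrative.

```python
class FifoReceiver:
    """Delivers messages from each sender in the order they were sent."""

    def __init__(self):
        self.expected = {}    # sender -> next per-sender sequence number
        self.holdback = {}    # sender -> {seq: msg} buffered out of order
        self.delivered = []

    def receive(self, sender, seq, msg):
        self.expected.setdefault(sender, 1)
        self.holdback.setdefault(sender, {})[seq] = msg
        # deliver every consecutive message now available from this sender
        while self.expected[sender] in self.holdback[sender]:
            nxt = self.expected[sender]
            self.delivered.append(self.holdback[sender].pop(nxt))
            self.expected[sender] = nxt + 1
```

Note that only messages from the *same* sender are constrained; messages from different senders may still interleave differently at different receivers, exactly as in the table above.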
3-4. Causally-Ordered and
Total-Ordered Multicasts
 Causally-ordered multicasts preserve potential causality
between different messages
 If message m1 causally precedes another message m2, the
communication layer at each receiver will always deliver m1
before m2
 Total-ordered multicasts require that when messages are
delivered, they are delivered in the same order to all group
members
 This is regardless of whether message delivery is unordered,
FIFO-ordered, or causally-ordered
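One common way to implement total ordering (not the only one; Lamport-timestamp schemes also work) is a sequencer process that assigns a global sequence number to every multicast; every receiver then delivers in sequence-number order, so all group members see the same order. The names below are illustrative.

```python
class Sequencer:
    def __init__(self):
        self.next_seq = 1

    def assign(self, msg):
        # every multicast message gets one global sequence number
        seq = self.next_seq
        self.next_seq += 1
        return seq


class TotalOrderReceiver:
    def __init__(self):
        self.expected = 1
        self.holdback = {}     # out-of-order messages held back
        self.delivered = []

    def receive(self, seq, msg):
        self.holdback[seq] = msg
        while self.expected in self.holdback:
            self.delivered.append(self.holdback.pop(self.expected))
            self.expected += 1
```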
Virtually Synchronous Reliable
Multicasting
 A virtually synchronous reliable multicasting that offers total-ordered
delivery of messages is what we refer to as atomic multicasting
Multicast                  Basic Message Ordering     Total-Ordered Delivery?
Reliable multicast         None                       No
FIFO multicast             FIFO-ordered delivery      No
Causal multicast           Causal-ordered delivery    No
Atomic multicast           None                       Yes
FIFO atomic multicast      FIFO-ordered delivery      Yes
Causal atomic multicast    Causal-ordered delivery    Yes

Six different versions of virtually synchronous reliable multicasting
Distributed Commit
 The atomic multicasting problem is an example of a more general
problem, known as distributed commit
 The distributed commit problem involves having an operation being
performed by each member of a process group, or none at all
 With reliable multicasting, the operation is the delivery of a message
 With distributed transactions, the operation may be the commit of a
transaction at a single site that takes part in the transaction
 Distributed commit is often established by means of a
coordinator and participants
One-Phase Commit Protocol
 In a simple scheme, a coordinator can tell all participants
whether or not to (locally) perform the operation in question
 This scheme is referred to as a one-phase commit protocol
 The one-phase commit protocol has a main drawback that if
one of the participants cannot actually perform the operation,
there is no way to tell the coordinator
 In practice, more sophisticated schemes are needed
 The most commonly utilized one is the two-phase commit protocol
Two-Phase Commit Protocol
 Assuming that no failures occur, the two-phase commit protocol
(2PC) consists of the following two phases, each consisting of
two steps:
Phase I: Voting Phase
Step 1: The coordinator sends a VOTE_REQUEST message to all
participants.
Step 2: When a participant receives a VOTE_REQUEST message, it returns
either a VOTE_COMMIT message to the coordinator, indicating that it is
prepared to locally commit its part of the transaction, or otherwise a
VOTE_ABORT message.
Two-Phase Commit Protocol
Phase II: Decision Phase
Step 1: The coordinator collects all votes from the participants. If all
participants have voted to commit the transaction, then so will the
coordinator, and it sends a GLOBAL_COMMIT message to all participants.
However, if one participant had voted to abort the transaction, the
coordinator will also decide to abort the transaction and multicast a
GLOBAL_ABORT message.
Step 2: Each participant that voted for a commit waits for the final
reaction by the coordinator. If a participant receives a GLOBAL_COMMIT
message, it locally commits the transaction. Otherwise, when receiving a
GLOBAL_ABORT message, the transaction is locally aborted as well.
2PC Finite State Machines
[Diagram: the finite state machine for the coordinator in 2PC
(INIT -> WAIT on Commit / Vote-request; WAIT -> ABORT on
Vote-abort / Global-abort; WAIT -> COMMIT on Vote-commit / Global-commit)
and the finite state machine for a participant
(INIT -> ABORT on Vote-request / Vote-abort; INIT -> WAIT on
Vote-request / Vote-commit; WAIT -> ABORT on Global-abort / ACK;
WAIT -> COMMIT on Global-commit / ACK)]
2PC Algorithm
Actions by coordinator:

write START_2PC to local log;
multicast VOTE_REQUEST to all participants;
while not all votes have been collected {
    wait for any incoming vote;
    if timeout {
        write GLOBAL_ABORT to local log;
        multicast GLOBAL_ABORT to all participants;
        exit;
    }
    record vote;
}
if all participants sent VOTE_COMMIT and coordinator votes COMMIT {
    write GLOBAL_COMMIT to local log;
    multicast GLOBAL_COMMIT to all participants;
} else {
    write GLOBAL_ABORT to local log;
    multicast GLOBAL_ABORT to all participants;
}
Two-Phase Commit Protocol
Actions by participants:

write INIT to local log;
wait for VOTE_REQUEST from coordinator;
if timeout {
    write VOTE_ABORT to local log;
    exit;
}
if participant votes COMMIT {
    write VOTE_COMMIT to local log;
    send VOTE_COMMIT to coordinator;
    wait for DECISION from coordinator;
    if timeout {
        multicast DECISION_REQUEST to other participants;
        wait until DECISION is received;   /* remain blocked */
        write DECISION to local log;
    }
    if DECISION == GLOBAL_COMMIT { write GLOBAL_COMMIT to local log; }
    else if DECISION == GLOBAL_ABORT { write GLOBAL_ABORT to local log; }
} else {
    write VOTE_ABORT to local log;
    send VOTE_ABORT to coordinator;
}
Two-Phase Commit Protocol
Actions for handling decision requests:

/* executed by a separate thread */
while true {
    wait until any incoming DECISION_REQUEST is received;   /* remain blocked */
    read most recently recorded STATE from the local log;
    if STATE == GLOBAL_COMMIT
        send GLOBAL_COMMIT to requesting participant;
    else if STATE == INIT or STATE == GLOBAL_ABORT
        send GLOBAL_ABORT to requesting participant;
    else
        skip;   /* participant remains blocked */
}
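The failure-free path of the 2PC pseudocode can be condensed into a runnable sketch. This deliberately omits logging, timeouts, and decision requests; participants are modeled as plain callables that return a vote, which is an assumption of the sketch, not part of the protocol specification.

```python
VOTE_COMMIT, VOTE_ABORT = "VOTE_COMMIT", "VOTE_ABORT"
GLOBAL_COMMIT, GLOBAL_ABORT = "GLOBAL_COMMIT", "GLOBAL_ABORT"

def two_phase_commit(participants):
    """Failure-free 2PC: one coordinator, callable participants."""
    # Phase I (voting): collect a vote from every participant
    votes = [p() for p in participants]
    # Phase II (decision): commit only if every vote was VOTE_COMMIT;
    # a single VOTE_ABORT forces a global abort
    if all(v == VOTE_COMMIT for v in votes):
        return GLOBAL_COMMIT
    return GLOBAL_ABORT
```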
Objectives
Discussion on Fault Tolerance:
 General background on fault tolerance
 Process resilience, failure detection, and reliable communication
 Atomicity and distributed commit protocols
 Recovery from failures
Recovery
 Thus far, we have focused on algorithms that allow us to
tolerate faults
 However, once a process fails, it is essential that it can
recover to a correct state
 Two common recovery mechanisms:
 Checkpointing
 Message Logging
Why Checkpointing?
 In fault-tolerant distributed systems, processes
“regularly” save their states onto a stable storage
 This mechanism is referred to as checkpointing
 Checkpointing consists of storing a "distributed snapshot" of
the current application state
 After a failure, the distributed snapshot is used to restart
the system (or part of it) from a correct state
Recovery Line
 In capturing a distributed snapshot, if a process P has recorded the
receipt of a message, m, then there should be also a process Q that
has recorded the sending of m
[Diagram: processes P and Q over time; Q sends a message m to P; local
snapshots that jointly record both the sending and the receipt of m form
a distributed snapshot, and the most recent consistent set of snapshots
before a failure is the recovery line (an inconsistent set is not a
recovery line)]
Checkpointing
 Checkpointing can be classified into two types:
1. Independent Checkpointing, wherein each process simply
records its local state from time to time in an uncoordinated
fashion
2. Coordinated Checkpointing, wherein all processes
synchronize to jointly write their states to stable storage
 Which algorithm among the ones we’ve studied can be
used to implement coordinated checkpointing?
 2PC
Domino Effect
 Independent checkpointing may make it difficult to find a recovery line,
leading potentially to a domino effect resulting from cascaded rollbacks

[Diagram: processes P and Q with independent checkpoints; after a failure,
each candidate set of checkpoints turns out not to be a recovery line,
forcing cascaded rollbacks]

 With coordinated checkpointing, the saved state is automatically globally
consistent; hence, the domino effect is inherently avoided
Why Message Logging?
 Considering that checkpointing is an expensive operation,
techniques have been sought to reduce the number of checkpoints,
but still enable recovery
 An important technique in distributed systems is message logging
 The basic idea is that if transmission of messages can be replayed,
we can still reach a globally consistent state, yet without having to
restore all the state from a distributed checkpoint
 In practice, the combination of having fewer checkpoints alongside
message logging is more efficient than having to take
many checkpoints
Message Logging
 Message logging can be classified into two types:
1. Sender-based logging: A process can log its messages before
sending them off
2. Receiver-based logging: A receiving process can first log an
incoming message before delivering it to the application
 When a sending or a receiving process crashes, it can restore
the most recently checkpointed state, and from there on
“replay” the logged messages
 Will this work for non-deterministic applications?
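For a deterministic application, replaying works: restoring the checkpoint and re-applying the logged messages reproduces the pre-crash state. A minimal sketch (receiver-based logging; the "state" here is just a running sum, and all names are illustrative):

```python
class Process:
    """Deterministic process recovered from a checkpoint plus a message log."""

    def __init__(self):
        self.state = 0
        self.checkpoint = 0
        self.log = []                 # messages received since the checkpoint

    def take_checkpoint(self):
        self.checkpoint = self.state  # save state to (simulated) stable storage
        self.log.clear()              # earlier messages need not be replayed

    def deliver(self, msg):
        self.log.append(msg)          # receiver-based logging: log, then apply
        self.state += msg             # deterministic state update

    def recover(self):
        self.state = self.checkpoint  # restore the checkpointed state...
        for msg in self.log:          # ...then replay the logged messages
            self.state += msg
```

If the state update were non-deterministic (e.g., depended on timing or random choices), replaying the same messages would not necessarily reproduce the same state.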
Replaying of Messages and Orphan Processes
 Caveat: Incorrect logging/replaying of messages after recovery can
lead to orphan processes

[Diagram: P sends logged message M1 to Q; Q sends unlogged message M2 to
R and logged message M3 to P; Q crashes and recovers, M1 is replayed, but
M2 can never be replayed, so M3 becomes an orphan]
Objectives
Discussion on Fault Tolerance:
 General background on fault tolerance
 Process resilience, failure detection, and reliable communication
 Atomicity and distributed commit protocols
 Recovery from failures
All Covered!
Next Class
Distributed File Systems
Thank You!