Distributed Snapshots: Chandy-Lamport protocol

Some forms of distributed snapshots had already been around for a while when Chandy and Lamport's distributed snapshots paper was published in 1985. Lamport considers this protocol a straightforward application of the basic ideas from Lamport clocks. Besides reviewing the paper, in this post I'll also present some examples of real-world implementations and a TLA+ specification of the protocol.

What problem is it trying to solve?

You need to record the global state of a program. Why? Because, for example, you have some complex computation ongoing, and you want to know which step it has reached. Or you have a long-running computation, and you want to take a snapshot as a backup, so that if any machine fails you can restart the computation from the checkpoint rather than from the beginning.

By the state of the program, we mean its local variables and, more generally, the history of states that the program went through.

Why is taking snapshots hard? Well, first of all, the snapshotting algorithm should not interfere with the running computation.

Secondly, if your program is a single process on a single machine, this is straightforward! You could create an API to say "record the snapshot in 5 seconds" or "every 2 hours". For a multi-threaded or multi-process program running on a single machine, you can create a similar API.

In a distributed system, this API won't work because there is no global shared clock. You could end up with out-of-sync snapshots providing an inconsistent view of the system. Besides the state of the processes themselves, there could also be in-flight messages that should be included in the snapshot. As an example of an inconsistent snapshot, a process B could record that it received a message from A while A's snapshot does not record that the message was ever sent to B.
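To make the inconsistency concrete, here is a minimal sketch of two processes recording their state at uncoordinated moments. The two-account setup, the amounts, and all names are assumptions made up for illustration; they are not taken from the paper.

```python
def run_with_uncoordinated_snapshots():
    # Hypothetical toy system: A transfers 20 units to B over a channel.
    balances = {"A": 100, "B": 50}
    channel_a_to_b = []                      # in-flight messages from A to B
    snapshot = {}

    snapshot["A"] = balances["A"]            # A records its state before sending
    balances["A"] -= 20
    channel_a_to_b.append(20)                # A sends 20 to B
    balances["B"] += channel_a_to_b.pop(0)   # B receives the transfer
    snapshot["B"] = balances["B"]            # B records its state after receiving
    return balances, snapshot


balances, snapshot = run_with_uncoordinated_snapshots()
real_total = sum(balances.values())
snapshot_total = sum(snapshot.values())
print(real_total, snapshot_total)
```

B's recorded state includes the received transfer, but A's recorded state predates the send, so the snapshot total no longer matches the real total.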

The paper has a good visual representation: imagine you wanted to take a picture of a sky filled with migrating birds. One photo is not enough; you will need to take multiple pictures and stitch them together in a way that provides a consistent view of the landscape. This is the challenge that this paper is trying to solve.

What can we use it for?

More generally, we can use such a global-state detection algorithm to verify stability properties. Stability properties are monotonic in the sense that once they hold, they keep holding forever. Examples include "has the computation reached the n-th step" or "is there a deadlock".

A more formal description of stability properties is the following. Given a property $y$, a state $S$ and any state $S'$ reachable from $S$, $y$ is a stability property iff $y(S) \implies y(S')$.
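As a rough illustration of the definition, the sketch below checks stability on a small, explicitly enumerated transition graph. The `is_stable` helper and the counter example are assumptions for illustration only. Checking single steps is enough here because reachability is just repeated single steps.

```python
def is_stable(transitions, y):
    """Return True if y(S) implies y(S') for every step S -> S'."""
    return all(
        y(successor)
        for state, successors in transitions.items()
        if y(state)
        for successor in successors
    )


# Hypothetical computation: a counter that goes 0 -> 1 -> 2 -> 3 and then stays at 3.
transitions = {0: [1], 1: [2], 2: [3], 3: [3]}

reached_step_two = is_stable(transitions, lambda s: s >= 2)   # "has reached step 2"
exactly_step_two = is_stable(transitions, lambda s: s == 2)   # "is at step 2 right now"
print(reached_step_two, exactly_step_two)
```

"Has reached step 2" stays true once it becomes true, while "is at step 2 right now" stops holding after the next step, so only the first is a stability property.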

System model

The system is composed of a set of processes which communicate with each other using channels: every process has a "logical" channel to every other process.

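Channels are assumed to have infinite buffers, to be error-free, and to deliver messages in the order sent. The delay experienced by a message in a channel is arbitrary but finite.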

While the state of a channel is the sequence of messages sent along it (excluding those already received), a process is defined by a set of states, an initial state, and a set of events. Each event atomically changes the state of a process p and the state of at most one channel c incident on p.

This way of describing a process using a state machine and a set of actions that change its state is common in Lamport's writing. The paper references Lamport's clocks paper, which takes a similar approach to describing processes and actions. As a funny aside, the paper also references "LAMPORT, L., AND CHANDY, K. M. On partially-ordered event models of distributed computations. Submitted for publication.", about which Lamport himself says:

In 2012, a reader noticed that the paper's reference list includes a paper by Chandy and me titled On Partially-Ordered Event Models of Distributed Computations, claiming it had been submitted for publication.  Several times I have made the mistake of referencing a paper of mine "to appear" that never appeared.  But I can't imagine that I would have claimed that a nonexistent paper had been submitted for publication.  However, neither Chandy nor I have any memory of that paper or the reference.  My guess is that we inserted the reference in a preliminary version when we expected to write and submit the other paper, and then we forgot to remove it.

I think it's easier to think of channels as being unidirectional: process $P_i$ can send a message to process $P_j$ by using channel $C_{ij}$. Every process is connected by a unidirectional channel to every other process.

Global-State-Detection Algorithm

The execution is decentralized, i.e., there is no "master" process coordinating the snapshot, and any process can initiate it. It's also important to notice that the algorithm is correct even if two processes start the snapshot protocol around the same time, unbeknownst to each other.

The provided solution is designed to solve this problem only once (take one single snapshot)—extending it to multiple runs should be trivial.

The algorithm works by sending special messages called "marker messages" through the channels. Processes behave differently depending on whether they have received a marker message before. These marker messages are part of the algorithm and not part of the underlying computation, so they won't appear in the snapshot itself. Essentially, when a process $P_i$ receives its first marker message through a channel, it records its own state and starts recording incoming messages on all its other incoming channels. It stops recording incoming messages on a channel as soon as it receives a marker message through that channel. Let's see the rules in more detail.

Marker sending rule for Process $P_i$:

In this case, $P_i$ decides to start the snapshot algorithm.

P_i records its state
for each outgoing channel C_ij:
	P_i sends a marker message over C_ij
for each incoming channel C_ji:
	P_i starts recording messages arriving over C_ji

Marker receiver rule for process $P_i$:

Given processes $P_i$, $P_j$ and a channel $C_{ji}$, when $P_i$ receives a marker message from $P_j$ through $C_{ji}$:

if it is the first marker ever received:
	records its state
	records the state of C_ji as the empty set
	sends a marker message over all its outgoing channels
	starts recording messages coming from all other incoming channels
else:
	records the state of C_ji as the set of messages received over C_ji
		since it started recording
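To see both rules working together, here is a small single-threaded simulation. It is only a sketch under assumptions of my own: the `Process` class, the `MARKER` sentinel, the toy "balance transfer" computation and the fixed delivery order are all made up for illustration, and FIFO channels are modelled with `deque`.

```python
from collections import deque

MARKER = "MARKER"  # sentinel value for marker messages (an assumption of this sketch)


class Process:
    def __init__(self, pid, peers, channels, balance):
        self.pid = pid
        self.peers = peers            # ids of all other processes
        self.channels = channels      # shared dict: (sender, receiver) -> FIFO deque
        self.balance = balance        # application state of the toy computation
        self.recorded_state = None    # local snapshot; None until recorded
        self.recording = {}           # sender -> messages recorded so far
        self.channel_state = {}       # sender -> final recorded channel state

    def send(self, dest, amount):
        # Application message: move `amount` from this process to `dest`.
        self.balance -= amount
        self.channels[(self.pid, dest)].append(amount)

    def start_snapshot(self):
        # Marker sending rule: record own state, send a marker on every outgoing
        # channel, then start recording every incoming channel.
        self.recorded_state = self.balance
        for peer in self.peers:
            self.channels[(self.pid, peer)].append(MARKER)
        self.recording = {peer: [] for peer in self.peers}

    def receive(self, sender):
        msg = self.channels[(sender, self.pid)].popleft()
        if msg == MARKER:
            if self.recorded_state is None:
                self.start_snapshot()  # first marker: behave like an initiator
            # The marker closes this channel: its state is whatever was recorded
            # since recording started (empty if this was the first marker).
            self.channel_state[sender] = self.recording.pop(sender)
        else:
            self.balance += msg
            if sender in self.recording:   # channel is still being recorded
                self.recording[sender].append(msg)


def run_demo():
    ids = [0, 1, 2]
    channels = {(i, j): deque() for i in ids for j in ids if i != j}
    procs = {i: Process(i, [j for j in ids if j != i], channels, 100) for i in ids}
    procs[0].send(1, 10)       # in flight when the snapshot starts
    procs[2].send(0, 5)        # in flight too; P2 has not seen a marker yet
    procs[0].start_snapshot()  # P0 initiates the protocol
    while any(channels.values()):  # deliver until every channel is empty
        for (sender, receiver), queue in channels.items():
            if queue:
                procs[receiver].receive(sender)
    return procs


procs = run_demo()
recorded = sum(p.recorded_state for p in procs.values())
in_flight = sum(sum(msgs) for p in procs.values() for msgs in p.channel_state.values())
print(recorded, in_flight)  # recorded balances and recorded in-flight amounts
```

In this run the recorded balances plus the recorded in-flight amounts add up to the initial total of 300, even though no process ever saw that total at a single instant. The transfer of 5 from P2 to P0 ends up in the recorded state of the channel rather than in either local state.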

What makes a snapshot consistent (or valid)?

On one hand, a global state is consistent if every "received message x" event in the snapshot implies that the snapshot also includes the corresponding "sent message x" event. It would be weird if one process claimed to have received a message and the sender didn't seem to know about it.

On the other hand, our snapshot should not include messages that are sent by a process after it has recorded its own state.

These two consistency properties define what should be included and what should be excluded from the snapshot to have a consistent global state.

More formally, a global state GS is defined as the union of all local states LS and channel states SC:

$$GS = \left\{ \bigcup_{i} LS_i,\ \bigcup_{i,j} SC_{ij} \right\}$$

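and $GS$ is a consistent global state if the following two properties are satisfied:

C1: $send(m_{ij}) \in LS_i \implies m_{ij} \in SC_{ij} \oplus rec(m_{ij}) \in LS_j$. Any message sent by a process before recording its state must be recorded in the global state, either in the channel state or in the receiver's local state, but not both.

C2: $send(m_{ij}) \notin LS_i \implies m_{ij} \notin SC_{ij} \land rec(m_{ij}) \notin LS_j$. Any message sent by a process after recording its state must not be recorded in the global state.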
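As a rough illustration, the check below reads the first condition (C1) as "every recorded send shows up exactly once, either in a recorded channel state or in the receiver's local state" and the second (C2) as "nothing is recorded as in transit or received unless its send was recorded". The data layout (message identifiers as tuples, a `sent` and `received` set per local state) is an assumption made for this sketch.

```python
def is_consistent(local_states, channel_states):
    """local_states[p] = {"sent": set, "received": set} of message ids;
    channel_states[(p, q)] = set of message ids recorded in transit from p to q."""
    sent = set().union(*(ls["sent"] for ls in local_states.values()))
    received = set().union(*(ls["received"] for ls in local_states.values()))
    in_transit = set().union(*channel_states.values())
    # C1: each recorded send is either in transit or received, but not both.
    c1 = all((m in in_transit) != (m in received) for m in sent)
    # C2: nothing is in transit or received unless its send was recorded.
    c2 = (in_transit | received) <= sent
    return c1 and c2


m1, m2 = ("A", "B", 1), ("A", "B", 2)   # hypothetical message ids: (sender, receiver, seq)

good = is_consistent(
    {"A": {"sent": {m1, m2}, "received": set()}, "B": {"sent": set(), "received": {m1}}},
    {("A", "B"): {m2}, ("B", "A"): set()},
)
bad = is_consistent(
    {"A": {"sent": set(), "received": set()}, "B": {"sent": set(), "received": {m1}}},
    {("A", "B"): set(), ("B", "A"): set()},
)
print(good, bad)
```

The second call models the inconsistent case where B claims to have received a message that A's recorded state knows nothing about.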

How to collect the snapshots?

The algorithm only solves the problem of taking the consistent snapshot, but doesn't provide a way to collect them. In the paper, they offer a simple solution to this problem:

A simple algorithm for collecting information in a system whose topology is strongly connected is for each process to send the information it records along all outgoing channels, and for each process receiving information for the first time to copy it and propagate it along all of its outgoing channels.

All the recorded information will then get to all the processes in finite time, allowing all processes to determine the recorded global state.
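The flooding scheme described in the quote can be sketched as follows. The `flood` function, the ring topology and the record values are assumptions for illustration; message delivery is simulated with a single queue rather than real channels.

```python
from collections import deque


def flood(outgoing, records):
    """outgoing[p] = processes p has a channel to; records[p] = what p recorded."""
    known = {p: {p: records[p]} for p in outgoing}
    # Every process starts by sending its own record along all outgoing channels.
    queue = deque((q, p, records[p]) for p in outgoing for q in outgoing[p])
    while queue:
        dest, origin, info = queue.popleft()
        if origin not in known[dest]:      # first time dest sees this record
            known[dest][origin] = info     # copy it...
            queue.extend((nxt, origin, info) for nxt in outgoing[dest])  # ...and propagate
    return known


# Hypothetical strongly connected topology: a ring 0 -> 1 -> 2 -> 0.
outgoing = {0: [1], 1: [2], 2: [0]}
records = {0: "state-0", 1: "state-1", 2: "state-2"}
known = flood(outgoing, records)
print(known[0])
```

Because each process forwards a given record only the first time it sees it, the flood terminates, and strong connectivity ensures every record reaches every process.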

Systems implementing this algorithm

These are some systems that have implemented distributed snapshots and use them in their operations.

Hazelcast Jet, a distributed stream processor, uses Distributed Snapshots to achieve fault tolerance for distributed computations.

At regular intervals, Jet raises a global flag that says "it's time for another snapshot". All processors belonging to source vertices observe the flag, save their state, emit a barrier item to the downstream processors and resume processing.

Source: Official docs, Paper.

Apache Flink, a stream processing framework, uses distributed snapshots in two ways:

First, it is used as a checkpointing for the regular backup of the global state. When applications fail, it is used for recovery. Second, it is used for deadlock detection. The current program continues to run after it is snapshot. Then, the snapshot is analyzed to see if a deadlock state exists in the application. If so, the corresponding processing will be carried out.

In Flink, at the end of the snapshot protocol, the snapshot collector (Central Server) starts collecting snapshots to form global consistency snapshots.

Source: Flink Course Series (4): Fault Tolerance in Flink—On AlibabaCloud blog.

Let me know if you have other examples, and I will add them here!

TLA+ specification

I've written the specification of this protocol using TLA+ and PlusCal. I will quickly highlight some interesting parts, and you can check the rest on GitHub.

Let's talk about channels. Initially I decided to model them as an array of arrays, where channel[i][j] would contain messages to process i from process j. This was very verbose to work with, so eventually I refactored them into a single array of messages per receiving process, with each message using the format "[sender, content]". This also allows using Head to get the next message and working with one message at a time.
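For readers less familiar with TLA+, the refactored channel model looks roughly like this in Python. This is only an analogy with made-up helper names, not a translation of the spec.

```python
from collections import deque

processes = [1, 2, 3]
# One inbox per receiving process; each message carries its sender.
channels = {p: deque() for p in processes}


def send(sender, receiver, content):
    channels[receiver].append({"sender": sender, "content": content})


def receive(receiver):
    """Pop the head of the receiver's inbox, or return None if it is empty."""
    return channels[receiver].popleft() if channels[receiver] else None


send(1, 2, 7)
send(3, 2, "Marker")
first = receive(2)
second = receive(2)
print(first, second)
```

One thing to keep in mind with this layout is that merging all incoming channels into one queue delivers messages to a process in a single total order, which is stricter than per-channel FIFO.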

I've decided to model a fictional computation. Other than snapshot related actions, a process can:

This local counter will act as a logical clock which I've used to help verify the consistency properties C1 and C2.

For condition C1:

\* Any message that is sent by a process before recording its snapshot, 
\* must be recorded in the global snapshot
ConsistentGlobalStateCondition1 == \A proc \in DOMAIN snapshot:
                                       \A receiver \in DOMAIN snapshot[proc]["chans"]:
                                            \A recordedMessage \in DOMAIN snapshot[proc]["chans"][receiver]:
                                                snapshot[proc]["chans"][receiver][recordedMessage] < snapshot[proc]["computation"]

Essentially, this checks that every message sent by this process, as recorded by the other processes, has a timestamp lower than the process's recorded timestamp.

It's also interesting to note that TLA+ doesn't allow quantifying directly over a sequence, as in $\forall\ i \in [1,2,3]$. However, because sequences are just functions, we can use DOMAIN to get the domain of that function as a set and quantify over it. The domain of a sequence of length n is the set of indexes 1..n.

The condition C2:

\* Any message that is sent by a process after recording its snapshot,
\* must not be recorded in the global snapshot
ConsistentGlobalStateCondition2 == \A procSnapshot \in DOMAIN snapshot:
                                        snapshot[procSnapshot]["computation"] # Null => \A proc \in DOMAIN channels:
                                            \A msg \in DOMAIN channels[proc]:                                        
                                                channels[proc][msg]["sender"] = procSnapshot => 
                                                    \/ channels[proc][msg]["content"] = Marker
                                                    \/ snapshot[procSnapshot]["computation"] < channels[proc][msg]["content"]

This ensures that after the snapshot is taken, any message flowing through the channels has a greater timestamp than the recorded computation.

And finally, the property to satisfy is:

ConsistentGlobalState == /\ ConsistentGlobalStateCondition1 
                         /\ ConsistentGlobalStateCondition2

I've also reused the trick I've learnt from the Lamport Clock spec to bound the model size.

\* State constraint to keep the model bounded
MaxCompMessagesConstraint == \A proc \in Processes: compMsgSent[proc] < MaxCompMessages
MaxCompStateConstraint == \A proc \in Processes: computation[proc] < MaxCompState

This will put an end to the otherwise infinite computation.

I've also decided to model a Termination property to ensure the protocol eventually ends:

\* If snapshot is taken anywhere, eventually every state is recorded and every channel is recorded up to the first Marker msg.
Snapshot == <>[] \E proc \in DOMAIN snapshot: 
                snapshot[proc]["computation"] # Null => /\ EveryStateIsRecorded 
                                                        /\ EveryChannelIsRecorded

I think the most challenging part was trying to come up with a way to verify that the global snapshot is consistent.

Regrettably, I did not write this spec through progressive refinement either, but maybe in the future I'll try to add that as a constraint. The protocol is presented in progressive refinement in Prof. Chandy's book referenced in the Resources section.


In this post, I've reviewed the Chandy-Lamport distributed snapshot protocol paper and presented a TLA+ specification I've written for it. This paper generalizes more specific algorithms written to solve specific instances like termination detection. Stability properties generalize properties like "has this computation terminated" or "is this computation in deadlock state".

Taking a global snapshot in a distributed system is hard because of the lack of a shared clock. Metaphorically, it's like trying to stitch multiple pictures of a sky together in a consistent way.

After briefly presenting the protocol, I've mentioned real-world systems that use it in their operations. If you know more systems that use it, please reach out and I'll include them.

Finally, I presented a TLA+ specification of the protocol. As always, it's when I sit down and actually try to model the spec that I realize I might not fully understand it yet. TLA+ is a helpful tool for grokking algorithms.