How RavenDB uses gossip protocol to replicate documents efficiently in a cluster

A RavenDB database can reside on multiple nodes in the cluster. RavenDB uses a multi-master protocol to handle writes. Any node holding the database will be able to accept writes. This is in contrast to other databases that use the leader/follower model. In such systems, only a single instance is able to accept writes at any given point in time.

The node that accepted the write is responsible for disseminating it to the rest of the cluster. This should work even if there are breaks in communication, mind you, which makes things more interesting.

Consider the case of a write to node A. Node A will accept the write and then replicate that as part of its normal operations to the other nodes in the cluster.

In other words, we’ll have:

  • A –> B
  • A –> C
  • A –> D
  • A –> E

In a distributed system, we need to be prepared for all sorts of strange failures. Consider the case where node A cannot talk to node C, but the other nodes can. In this scenario, we still expect node C to end up with the full data, even if node A cannot send it directly.

The simple solution would be to have each node replicate the data it gets from any source to all its siblings. However, consider the cost involved:

  • A write to node A (1KB document) will result in 4 replications (4KB).
  • Node B will replicate to 4 nodes (including A, mind), so that is another 4KB.
  • Node C will replicate to 4 nodes, so that is another 4KB.
  • Node D will replicate to 4 nodes, so that is another 4KB.
  • Node E will replicate to 4 nodes, so that is another 4KB.

In other words, in a five-node cluster, a single 1KB write generates 20KB of network traffic, the vast majority of it totally unnecessary.
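
To see how fast the waste grows, here is a quick back-of-the-envelope calculation (a throwaway sketch, not RavenDB code):

```python
# Back-of-the-envelope cost of naive flooding: every node forwards each
# document it receives to all of its siblings, so a single write is sent
# nodes * (nodes - 1) times in total.
def flood_traffic_kb(nodes: int, doc_kb: float) -> float:
    return nodes * (nodes - 1) * doc_kb

print(flood_traffic_kb(5, 1))    # 20 KB of traffic for a single 1KB write
print(flood_traffic_kb(10, 1))   # 90 KB - the waste grows quadratically
```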

There are many gossip algorithms, and they are quite interesting, but they are usually not meant for a continuous stream of updates. They focus on robustness over efficiency.

RavenDB takes the following approach: when a node accepts a write directly from a client, it sends the new write to all its siblings immediately. However, if a node accepts a write from replication, the situation is different. We assume that the node that replicated the document to us will also replicate it to the other nodes in the cluster. As such, we will not initiate replication immediately. Instead, we let all the nodes that replicate to us know that we got the new document.

If we don’t have any writes on the node, we’ll check every 15 seconds whether we have documents that aren’t present on our siblings. Remember that the siblings proactively report to us which documents they currently have, so there is no need to chat over the network about that.

In other words, during normal operations, what we’ll have is node A replicating the document to all the other nodes. They’ll each inform the other nodes that they have this document and nothing further needs to be done. However, in the case of a break between node A and node C, the other nodes will realize that they have a document that isn’t on node C, in which case they’ll complete the cycle and send it to node C, healing the gap in the network.

I’m using the term “tell the other nodes what documents we have”, but that isn’t what is actually going on. We use change vectors to track the replication state across the cluster. We don’t need to send each individual document write to the other side; instead, we can send a single change vector (a short string) that tells the other side all the documents that we have in one shot. You can read more about change vectors here.
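
As a rough mental model (my own simplification, not RavenDB’s actual change vector format), you can think of a change vector as a map from node ID to the highest change number (etag) seen from that node. Comparing two such maps tells us immediately whether a sibling already has everything we have:

```python
# Simplified model: a change vector maps node id -> highest etag seen from it.
# This is an illustration, not RavenDB's actual on-the-wire format.
ChangeVector = dict[str, int]

def contains(theirs: ChangeVector, ours: ChangeVector) -> bool:
    """True if `theirs` already covers every change described by `ours`."""
    return all(theirs.get(node, 0) >= etag for node, etag in ours.items())

mine    = {"A": 10, "B": 7, "C": 3}
sibling = {"A": 10, "B": 7, "C": 1}   # the sibling is missing C's changes 2 and 3

print(contains(sibling, mine))     # False -> we hold documents the sibling lacks
print(contains(mine, sibling))     # True  -> we already have everything it has
```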

In short, the behavior on the part of the node is simple:

  • On document write to the node, replicate the document to all siblings immediately.
  • On document replication, notify all siblings about the new change vector.
  • Every 15 seconds, replicate to siblings the documents that they missed.
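
Putting the three rules together, here is a sketch of the node behavior (hypothetical names, using the simplified change vector model from above; the real RavenDB implementation is far more involved):

```python
def covers(theirs: dict, ours: dict) -> bool:
    """True if `theirs` already includes every change described by `ours`."""
    return all(theirs.get(node, 0) >= etag for node, etag in ours.items())


class ReplicationNode:
    """Hypothetical sketch of the three rules above, not RavenDB's actual code."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.siblings = []            # other ReplicationNode instances
        self.documents = {}           # doc id -> document body
        self.change_vector = {}       # node id -> highest etag we hold
        self.sibling_vectors = {}     # what each sibling last told us it has

    # Rule 1: a client write goes out to every sibling immediately.
    def client_write(self, doc_id, doc):
        self.change_vector[self.node_id] = self.change_vector.get(self.node_id, 0) + 1
        self.documents[doc_id] = doc
        for sibling in self.siblings:
            sibling.on_replicated_write(self.node_id, {doc_id: doc}, dict(self.change_vector))

    # Rule 2: a replicated write is NOT forwarded; we only announce our new state.
    def on_replicated_write(self, sender_id, docs, remote_vector):
        self.documents.update(docs)
        for node, etag in remote_vector.items():
            self.change_vector[node] = max(self.change_vector.get(node, 0), etag)
        self.sibling_vectors[sender_id] = remote_vector
        for sibling in self.siblings:
            sibling.on_change_vector(self.node_id, dict(self.change_vector))

    def on_change_vector(self, sender_id, remote_vector):
        self.sibling_vectors[sender_id] = remote_vector

    # Rule 3: run this on a 15-second timer; send anything a sibling is missing.
    def heal_once(self):
        for sibling in self.siblings:
            reported = self.sibling_vectors.get(sibling.node_id, {})
            if not covers(reported, self.change_vector):
                # For brevity we resend everything; the real protocol sends
                # only the documents the sibling actually lacks.
                sibling.on_replicated_write(self.node_id, dict(self.documents),
                                            dict(self.change_vector))
```

Wire a few of these together and drop one link to simulate a break between A and C: the periodic check on the other nodes will notice that C’s reported change vector is behind and send it what it is missing.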

Just these three rules give us a sophisticated system in practice: we avoid excessive writes over the network, yet we route around any errors in the network layer without issue.