One of the more interesting requirements that came my way recently was to redesign Rhino DHT to support a dynamically distributed network of nodes. Currently, the DHT supports a static network layout, and while it can handle node failure (and recovery) via replication, it cannot handle nodes being added on the fly.
That is a problem. Another problem is that the configuration for the DHT is getting complex very fast, and we keep putting more and more requirements on top of it. This post articulates the requirements we have for the DHT and the current design for solving them.
- Storage for the DHT is a solved problem, using Rhino Persistent Hash Table (PHT).
- Concurrency options for the DHT are those that are available in Rhino PHT:
  - Merge concurrency
  - Optimistic concurrency
  - Last write wins, for values that never change
What we need to solve is how to generalize the PHT into a dynamic DHT, with failure tolerance and the ability to add nodes to and remove nodes from the network on the fly.
- A DHT cluster may contain one or more nodes
- A DHT cluster can be extended by adding new nodes on the fly
- Adding a node to a cluster is an expensive operation, requiring resharding of the data
- Removing a node is not explicitly supported: when a node is removed, we assume it has failed and reallocate its responsibilities
- A value on the DHT always has a master node, and is replicated to 2 other machines
- A DHT node configuration includes:
  - DHT cluster name
  - DHT options:
    - max number of machines to replicate a value to
- The cluster topology is maintained in the DHT itself, and can be queried from any node
- Clients may receive a topology changed error when querying the DHT, and need to refresh their cached topology when this happens
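The client side of the last requirement can be sketched as a cache that is reloaded whenever a node answers with a topology changed error. This is a minimal sketch, not Rhino DHT's API: `DhtClient`, `TopologyChangedError`, and the `fetch_topology` and `send` callables are hypothetical stand-ins for the real network calls, and the retry limit is an assumption.

```python
class TopologyChangedError(Exception):
    """Hypothetical error a node raises when the client's topology is stale."""


class DhtClient:
    """Caches the cluster topology and refreshes it on demand (sketch).

    fetch_topology() and send(topology, key, value) are hypothetical
    callables; any node in the cluster can answer fetch_topology().
    """

    def __init__(self, fetch_topology, send):
        self._fetch_topology = fetch_topology
        self._send = send
        self.topology = fetch_topology()
        self.refreshes = 0

    def put(self, key, value, max_attempts=3):
        for _ in range(max_attempts):
            try:
                return self._send(self.topology, key, value)
            except TopologyChangedError:
                # Our cached view is out of date: reload it and retry.
                self.topology = self._fetch_topology()
                self.refreshes += 1
        raise RuntimeError("topology kept changing; giving up")
```

Treating the topology as a cached value means clients pay for a reload only when the cluster actually changes.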
Working from the assumptions above, let us see how a DHT node handles startup. For now, we assume that this is the first time it has woken up, and that there are no other nodes alive.
The DHT node publishes a multicast UDP message on the network, announcing that it is up, and waits for replies. Since this is currently the only node, there are no replies, so the node assumes that it is the leader and marks itself as such. A second node coming up also announces itself on startup, and this time it gets a reply from the first node, telling it that the first node is the leader.
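The startup decision can be sketched independently of the networking. In this sketch, `announce` is a hypothetical callable standing in for "multicast an I-am-up UDP message and collect replies until the timeout expires"; `Reply`, `join_cluster`, and the 2 second timeout are assumptions, since the post does not say how long a node waits.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Reply:
    node_id: str    # the node that answered the announcement
    leader_id: str  # who that node believes the leader is


def join_cluster(node_id: str,
                 announce: Callable[[str, float], List[Reply]],
                 timeout: float = 2.0) -> str:
    """Return the id of the cluster leader as seen by node_id (sketch)."""
    replies = announce(node_id, timeout)
    if not replies:
        # Nobody answered: we are alone, so we become the leader.
        return node_id
    # Existing nodes all know who the leader is; trust the first reply.
    return replies[0].leader_id
```

For example, `join_cluster("node-1", lambda n, t: [])` returns `"node-1"`, while a second node that hears back from the first gets `"node-1"` as its leader.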
This is where things get... tricky. A new node coming up means that we need to reshard the system. Sharding means that we spread the values across more than a single machine, and resharding means that we need to reallocate previously assigned data ranges from the existing machines to the new one.
We handle this problem using the following approach:
The DHT is a key/value store. Setting a value requires a key (string). That key is going to be MD5 hashed, in order to get a hashing algorithm that behaves the same across machines, runtime versions and maybe even different platforms. Using the hash, we usually simply index into the list of nodes in order to find the node responsible for the key. In essence, this is the code most often used:
nodes[ MD5_Hash(key) % nodes.length ].Send(key, val);
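A runnable version of that placement rule shows why it breaks down when the cluster grows. This sketch uses Python's `hashlib.md5`; `md5_int` and `node_for` are illustrative names, not Rhino DHT code.

```python
import hashlib


def md5_int(key: str) -> int:
    # MD5 yields the same digest on every machine, runtime and platform.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


def node_for(key: str, node_count: int) -> int:
    # The naive placement: hash modulo the number of nodes.
    return md5_int(key) % node_count


keys = [f"key-{i}" for i in range(10_000)]
moved = sum(node_for(k, 3) != node_for(k, 4) for k in keys)
print(f"{moved / len(keys):.0%} of keys change nodes going from 3 to 4 nodes")
```

For uniformly distributed hashes, a key stays put only when `h % 3 == h % 4`, which holds for 3 of every 12 residues, so roughly three quarters of all keys have to move.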
The problem with this approach is that adding or removing a node changes nodes.length, which sends most keys to a different node and invalidates the entire hashing strategy. In order to handle this problem, we are going to assume a 32-bit address space, giving us 4,294,967,296 distinct values. We are going to divide this address space into 1024 ranges of 4,194,304 values each.
Each of those ranges is going to be assigned to a node. This gives us much more control over distributing the values across the network.
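Here is a minimal sketch of the range scheme, assuming the 32-bit hash is taken from the first 4 bytes of the MD5 digest (the post does not say which bits are used). `RANGE_COUNT`, `RANGE_SIZE`, `owners`, `range_for`, and `node_for` are illustrative names.

```python
import hashlib

RANGE_COUNT = 1024
RANGE_SIZE = 2**32 // RANGE_COUNT  # 4,194,304 hash values per range


def hash32(key: str) -> int:
    # Assumption: the first 4 bytes of the MD5 digest form the 32-bit hash.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:4], "big")


def range_for(key: str) -> int:
    # 0 <= result < 1024, since hash32 is below 2**32.
    return hash32(key) // RANGE_SIZE


# Each range maps to the node that owns it; a lone node owns them all.
owners = ["node-1"] * RANGE_COUNT


def node_for(key: str) -> str:
    return owners[range_for(key)]
```

Reassigning ranges 512 to 1023 to `"node-2"` only moves keys that hash into those ranges; every other key keeps its owner, unlike the modulo approach.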
Back to the resharding issue. When we had a single node, it was the owner of all those ranges, and every value was set on that node. Now we have to reallocate ranges, so we allocate half of them to the new node, giving each node 512 ranges. Reallocating the ranges isn't as simple as it may sound, however, since those ranges may already contain values.
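Choosing which ranges move can be sketched as repeatedly taking a range from the most loaded node until the newcomer holds its fair share. The donor policy here is an assumption (the post only says the leader allocates "a set of ranges"), and `pick_ranges_for_new_node` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, List


def pick_ranges_for_new_node(owners: List[str]) -> Dict[int, str]:
    """Return {range_index: previous_owner} for ranges a new node should take.

    owners[i] is the node that currently owns range i. The leader would
    then tell the new node which old node to pull each range from.
    """
    counts = Counter(owners)
    target = len(owners) // (len(counts) + 1)  # fair share for the new node
    moves: Dict[int, str] = {}
    for _ in range(target):
        # Assumption: always take from the currently most loaded node.
        donor, _ = counts.most_common(1)[0]
        index = max(i for i, o in enumerate(owners)
                    if o == donor and i not in moves)
        moves[index] = donor
        counts[donor] -= 1
    return moves
```

With a single node owning all 1024 ranges, this hands 512 of them to the new node, matching the split described above.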
The algorithm for moving ranges, together with their values, to the new node is:
- The leader allocates a set of ranges to the newly arrived node, and tells the new node about its new ranges and about the old nodes that were assigned those ranges.
- The new node now contacts each old node and asks it to send all the values in the ranges it was previously assigned. This is a somewhat complex procedure, since we need to keep the system alive while we do so. Internally, it works as follows:
  - Get all the values in the requested range and record their timestamps
  - Send all the values to the new node
  - For all the sent values, check their timestamps and mark any that weren't changed as "forward to the new node"; any request for such a value now generates an error saying that the client needs to go to the new node instead
  - Repeat for the values that changed, until there are no values that have been changed during the send process
  - Mark the range as "forward to this node"; any request now generates an error saying that the new node should be used
- Once the new node has all the values in all the ranges allocated to it, it lets the leader know.
- The leader tells all the nodes to accept the new topology. Now, any queries for the deallocated values on the old nodes will generate a topology changed error, which forces the client to reload the topology.
- Note that there is a difference between a "forward to this node" error and a "topology changed" error. The first is transient; the second is permanent.
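The old node's side of the copy loop can be sketched in memory. Assumptions: the whole store stands for a single range being handed over, timestamps come from a logical counter, deletes are ignored, and `OldNode`, `hand_over`, and `ForwardToNewNode` are hypothetical names.

```python
from typing import Callable, Dict, Set, Tuple


class ForwardToNewNode(Exception):
    """Transient error: this value is now served by the new node."""


class OldNode:
    """In-memory stand-in for an old node giving up one range (sketch)."""

    def __init__(self) -> None:
        self.store: Dict[str, Tuple[object, int]] = {}  # key -> (value, stamp)
        self.forwarded: Set[str] = set()
        self.range_forwarded = False
        self.clock = 0  # logical timestamp, bumped on every write

    def put(self, key: str, value: object) -> None:
        if self.range_forwarded or key in self.forwarded:
            raise ForwardToNewNode(key)
        self.clock += 1
        self.store[key] = (value, self.clock)

    def hand_over(self, send: Callable[[Dict[str, object]], None]) -> int:
        """Copy the range to the new node; returns the number of rounds."""
        rounds = 0
        while True:
            # Snapshot every value not yet forwarded, with its timestamp.
            snapshot = {k: v for k, v in self.store.items()
                        if k not in self.forwarded}
            if not snapshot:
                break
            rounds += 1
            # Send them; writes may still arrive while this runs.
            send({k: value for k, (value, _) in snapshot.items()})
            # Values untouched since the snapshot now forward to the new node.
            for key, (_, stamp) in snapshot.items():
                if self.store[key][1] == stamp:
                    self.forwarded.add(key)
            # Values changed during the send are picked up by the next round.
        # Finally forward the whole range, including keys not written yet.
        self.range_forwarded = True
        return rounds
```

Because only values that changed mid-send are re-sent, each round is usually much smaller than the previous one, although a key that is written continuously could keep the loop going.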
There are two major problems with this algorithm. The first is that it totally ignores the possibility of failure; the second is that it ignores the replication topology as well.
We will start with failure. If the new node fails during the initial load process, it is going to cause some problems, because any values it newly accepted will be unreachable, while the old node that handed them over will keep redirecting to the now dead new node. This isn't actually a problem: the standard failover mechanism will kick in, and we will query the range's secondary node (which hasn't changed yet) for the actual value. If we then set a value, it will be replicated to the old primary node, resulting in the value effectively being "resurrected" on the old node.
Since we don't expect this scenario to be very common, I think we can leave it at that; the situation will slowly correct itself over time, without us needing to do anything else.
A more interesting case is the failure of one of the old nodes during the copy process. Again, we will fail over to the range's secondary node, and copy the values from there. There is an interesting race condition here if the old node manages to recover in time, but again, that is not something that I am overly concerned about.
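Falling back to the secondary during the copy can be sketched as walking the range's replication chain until some node answers. `fetch_range` and the `fetch(node, range_index)` callable are hypothetical; the sketch assumes an unreachable node surfaces as `ConnectionError`.

```python
from typing import Callable, Dict, Sequence


def fetch_range(range_index: int,
                chain: Sequence[str],
                fetch: Callable[[str, int], Dict[str, object]]) -> Dict[str, object]:
    """Pull a range's values from the old primary, else from its replicas.

    chain is the range's [primary, secondary, tertiary] from the topology;
    fetch is a hypothetical RPC that raises ConnectionError when a node
    is unreachable.
    """
    last_error = None
    for node in chain:
        try:
            return fetch(node, range_index)
        except ConnectionError as err:
            last_error = err  # node is down: fall back to the next replica
    raise RuntimeError(
        f"no replica of range {range_index} is reachable") from last_error
```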
Replication topology is a more interesting scenario. Let us go over the topology of a four-node cluster:
- Node 1
  - Leader
  - Primary for ranges 0-255
  - Secondary for ranges 256-511
  - Tertiary for ranges 512-767
- Node 2
  - Primary for ranges 256-511
  - Secondary for ranges 512-767
  - Tertiary for ranges 768-1023
- Node 3
  - Primary for ranges 512-767
  - Secondary for ranges 768-1023
  - Tertiary for ranges 0-255
- Node 4
  - Primary for ranges 768-1023
  - Secondary for ranges 0-255
  - Tertiary for ranges 256-511
This topology is stored in all the nodes, and can be queried from any of them. This means that it is very easy for any client to know exactly which node is the master for any key and, in the case of a node failure, which nodes to fail over to.
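The four-node topology can be sketched as a small table that any client can consult, reading each block as 256 ranges with inclusive bounds (0-255, 256-511, and so on). `Assignment`, `TOPOLOGY`, and `chain_for` are illustrative names, not the actual Rhino DHT data structures.

```python
from typing import List, NamedTuple


class Assignment(NamedTuple):
    first: int         # first range index (inclusive)
    last: int          # last range index (inclusive)
    chain: List[str]   # [primary, secondary, tertiary]


# The four-node example, one entry per block of 256 ranges.
TOPOLOGY = [
    Assignment(0, 255, ["node-1", "node-4", "node-3"]),
    Assignment(256, 511, ["node-2", "node-1", "node-4"]),
    Assignment(512, 767, ["node-3", "node-2", "node-1"]),
    Assignment(768, 1023, ["node-4", "node-3", "node-2"]),
]


def chain_for(range_index: int) -> List[str]:
    """Nodes to try for a range: the master first, then failover order."""
    for a in TOPOLOGY:
        if a.first <= range_index <= a.last:
            return a.chain
    raise KeyError(range_index)
```

Because every node holds the same table, a client only needs the key's range index to know where to go and where to fail over.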
Every node in the cluster is also aware of the secondary and tertiary replication backups for its ranges, and it uses async messaging to let them know about new values. This keeps the node available even when its replication nodes are down.
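Async replication can be sketched as a per-replica outbox: a write only enqueues messages, and a periodic flush delivers whatever it can, keeping undelivered messages for replicas that are down. `Replicator` and the `deliver` callable are hypothetical; real async messaging would involve a durable queue rather than an in-memory `deque`.

```python
from collections import deque
from typing import Callable, Deque, Dict, Iterable, Tuple


class Replicator:
    """Queues replication messages so a write never waits for replicas."""

    def __init__(self, replicas: Iterable[str],
                 deliver: Callable[[str, str, object], None]):
        # deliver(replica, key, value) is a hypothetical transport call
        # that raises ConnectionError when the replica is down.
        self.deliver = deliver
        self.outbox: Dict[str, Deque[Tuple[str, object]]] = {
            r: deque() for r in replicas}

    def replicate(self, key: str, value: object) -> None:
        for queue in self.outbox.values():
            queue.append((key, value))  # returns immediately

    def flush(self) -> None:
        """Run periodically; delivers queued messages in order."""
        for replica, queue in self.outbox.items():
            while queue:
                key, value = queue[0]
                try:
                    self.deliver(replica, key, value)
                except ConnectionError:
                    break  # replica down: keep its messages for later
                queue.popleft()
```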
Changing the replication topology is done by the leader once the primary values have been successfully copied and we have had a topology changeover for the primary. The process is identical to the way we handle the primary topology changeover.
So far, so good, but we haven't handled two other scenarios: leader failure and long node failure. As it stands currently, restarting a node should not cause a change in the network topology. All the clients will automatically fail over to the secondary or tertiary nodes, and on startup, the node is going to query its secondary and tertiary nodes for any new values they accepted in its name before it starts listening to incoming requests.
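The restart catch-up step can be sketched as merging what the secondary and tertiary accepted into the local store before the node starts serving. The conflict rule here, newest timestamp wins, is an assumption; the DHT's actual behavior depends on the concurrency option in use. `catch_up` and `Versioned` are hypothetical names.

```python
from typing import Dict, Iterable, Tuple

Versioned = Dict[str, Tuple[object, int]]  # key -> (value, timestamp)


def catch_up(local: Versioned, from_replicas: Iterable[Versioned]) -> Versioned:
    """Merge values the replicas accepted on our behalf while we were down.

    Assumption: every value carries a timestamp and the newest one wins.
    The node would run this before it starts listening for requests.
    """
    merged = dict(local)
    for accepted in from_replicas:
        for key, (value, stamp) in accepted.items():
            if key not in merged or merged[key][1] < stamp:
                merged[key] = (value, stamp)
    return merged
```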
The problem is when we have a node that is going to be down for a long period of time, or forever. Since we are attempting to build a zero configuration system, we need to handle this ourselves. We give a failed node a timeout of 15 minutes to resume operation. The leader takes care of this by listening to heartbeats from all the nodes. If 15 minutes pass without a heartbeat from a node, we assume that the node is down, and that we need to reallocate its ranges.
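The leader's failure detection can be sketched as a table of last-seen times checked against the 15 minute timeout. `HeartbeatMonitor` is a hypothetical name; the clock is injectable so the logic can be exercised without waiting.

```python
import time
from typing import Callable, Dict, List

FAILURE_TIMEOUT = 15 * 60  # seconds without a heartbeat before a node is dead


class HeartbeatMonitor:
    """Leader-side tracking of node heartbeats (sketch)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self.clock = clock
        self.last_seen: Dict[str, float] = {}

    def heartbeat(self, node: str) -> None:
        self.last_seen[node] = self.clock()

    def dead_nodes(self) -> List[str]:
        # Nodes whose ranges the leader should now reallocate.
        now = self.clock()
        return [n for n, seen in self.last_seen.items()
                if now - seen > FAILURE_TIMEOUT]
```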
Reallocating a failed node's ranges is actually a fairly simple process: we promote the secondary node for each of its ranges to be the primary, and the tertiary to be the secondary. The next issue is replication rebalancing, to make sure that each range again has secondary and tertiary nodes, but that is already solved by the mechanism we use when adding a new node.
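The promotion step can be sketched as removing the dead node from every replication chain, so the nodes behind it move up one place. `remove_failed_node` is a hypothetical helper, and `chains` maps a range index to its `[primary, secondary, tertiary]` list.

```python
from typing import Dict, List


def remove_failed_node(chains: Dict[int, List[str]], failed: str) -> List[int]:
    """Drop failed from every chain, promoting the nodes behind it.

    Returns the ranges now holding fewer than three copies, which the
    leader rebalances the same way it does when a new node joins.
    """
    under_replicated = []
    for index, chain in chains.items():
        if failed in chain:
            chain.remove(failed)  # every node after it moves up one place
            under_replicated.append(index)
    return under_replicated
```

If node 1 from the four-node example fails, node 4 becomes the primary for ranges 0-255 and node 3 its secondary.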
We have a few other edge cases to handle. The most important one is the failure of the leader. This one is pretty easy to handle, I think. The first node in the topology is always the leader, and the topology is always replicated to all the nodes. The second node in the topology is a watchdog for the leader, always pinging it to check that it is alive and responding to requests.
If the leader is down for a period of time (15 seconds, let us say), the watchdog takes over and lets everyone else know that it is now the leader. The next node in the chain becomes the new watchdog. When the old leader comes up again, it joins the cluster as a normal node. The data that the old leader stores is subject to the same rules as any other node's: if it stays down for 15 minutes, the new leader will put its ranges through the same rebalancing process.
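The watchdog rule can be sketched as an operation on the ordered node list, where position 0 is the leader and position 1 the watchdog. `check_leader` and `rejoin` are hypothetical helpers, and the 15 second threshold is the figure suggested above.

```python
from typing import List

LEADER_TIMEOUT = 15.0  # seconds of silence before the watchdog takes over


def check_leader(order: List[str], me: str,
                 seconds_since_leader_reply: float) -> List[str]:
    """Run by the watchdog; returns the (possibly new) node order.

    Dropping the silent leader from the front makes the watchdog the
    leader and the next node in line the new watchdog.
    """
    if len(order) < 2 or order[1] != me:
        return order  # only the watchdog may take over
    if seconds_since_leader_reply <= LEADER_TIMEOUT:
        return order
    return order[1:]


def rejoin(order: List[str], node: str) -> List[str]:
    # A returning old leader joins at the back, as a normal node.
    return order if node in order else order + [node]
```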
Thoughts?