Distributed task assignment with failover
When building a distributed system, one of the more interesting aspects is how you are going to distribute task assignment. In other words, given that you have multiple nodes, how do you decide which node will do what? In some cases, that is relatively easy: you can say “all nodes will process read requests”. In others, it is more complex. Let us take the case where you have several nodes, and you need a regular backup of a database that is replicated between all those nodes. You probably don’t want to run the backup on all the nodes. After all, they are pretty much the same, and you don’t want to back up the exact same thing multiple times. On the other hand, you probably don’t want to assign this work statically. If you do, and the node that is responsible for the backup is down, you get no backup.
Another example of the problem can be seen when you have other processes that you would like to be sticky if possible, and only jump around if there is a failure. Bringing a new node online is a common thing to do in a cluster, and the ideal scenario in that case is that a single node will feed it all the data that it needs. If we have multiple nodes doing that, they are likely to overlap and they might very well overload the poor new server. So we want just one node to update its state, but if that node goes down midway, we need someone else to pick up the slack. For RavenDB, those sorts of tasks include things like ETL processes, subscriptions, backup, bootstrapping new servers and more. We keep discovering new things that can use this sort of behavior.
But how do we actually make this work?
One way of doing this is to take advantage of the fact that RavenDB is using Raft and have the leader do task assignment. That works quite well, but it doesn’t scale. What do I mean by that? I mean that as the number of tasks that we need to manage grows, the complexity in the task assignment portion of the code grows as well. Ideally, I don’t want to have to consider twenty different variables and considerations before deciding what operation should go on which server, and trying to balance that sort of work in one place has proven to be challenging.
Instead, we have decided to allocate work in the cluster based on simple rules. Each task in the cluster has a key (which is typically generated by hashing some of its parameters), and that task can be assigned to a group of servers. Given those two details, we can use Jump Consistent Hashing to spread the load around. However, that doesn’t handle failover. We have a heartbeat process that can detect and notify nodes that a sibling has gone down, so combining those two, we get the following code:
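What follows is a minimal sketch of that combination in Python, not RavenDB’s actual implementation. The names task_key, mix and whose_task_is_it are hypothetical, and so is the is_alive callback, which stands in for the live/down list that the heartbeat process maintains. Two details are assumptions on my part: the topology is sorted so that every node walks the candidates in the same order, and the re-hash on failover is done by mixing the key (a splitmix64-style finalizer) before hashing again over the smaller topology.

```python
import hashlib
from typing import Callable, Optional, Sequence

MASK64 = 0xFFFFFFFFFFFFFFFF


def jump_consistent_hash(key: int, num_buckets: int) -> int:
    """Map a 64-bit key to a bucket in [0, num_buckets) (Lamping and Veach)."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    key &= MASK64
    bucket, jump = -1, 0
    while jump < num_buckets:
        bucket = jump
        key = (key * 2862933555777941757 + 1) & MASK64
        jump = int((bucket + 1) * (float(1 << 31) / float((key >> 33) + 1)))
    return bucket


def task_key(*params: str) -> int:
    """Hypothetical helper: derive a stable 64-bit key from task parameters."""
    data = "\x00".join(params).encode("utf-8")
    return int.from_bytes(hashlib.blake2b(data, digest_size=8).digest(), "big")


def mix(key: int) -> int:
    """Assumed re-hash step (splitmix64-style finalizer) used after a miss."""
    key = (key + 0x9E3779B97F4A7C15) & MASK64
    key = ((key ^ (key >> 30)) * 0xBF58476D1CE4E5B9) & MASK64
    key = ((key ^ (key >> 27)) * 0x94D049BB133111EB) & MASK64
    return key ^ (key >> 31)


def whose_task_is_it(
    key: int,
    topology: Sequence[str],
    is_alive: Callable[[str], bool],
) -> Optional[str]:
    """Return the live node that should run the task, or None if none is live."""
    # Assumption: sort so that every node sees the candidates in the same order.
    candidates = sorted(topology)
    while candidates:
        index = jump_consistent_hash(key, len(candidates))
        node = candidates[index]
        if is_alive(node):
            return node
        # Preferred owner is down: drop it and re-hash over the smaller topology.
        candidates.pop(index)
        key = mix(key)
    return None


# Usage: find the preferred owner, then see who takes over if it goes down.
nodes = ["A", "B", "C", "D"]
key = task_key("backup", "Northwind")
owner = whose_task_is_it(key, nodes, lambda n: True)
fallback = whose_task_is_it(key, nodes, lambda n: n != owner)
print(owner, fallback)
```

Because the function depends only on the key, the topology and the live/down list, any node can call it locally and get the same answer as its siblings.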
What we are doing here is relying on two different properties: Jump Consistent Hashing, which lets us know which node is responsible for what, and the Raft cluster leader, which keeps track of all the live nodes and lets us know when a node goes down. When we need to assign a task, we use its hashed key to find its preferred owner, and if that node is alive, that is that. But if it is currently down, we do two things: we remove the downed node from the topology and re-hash the key with the new number of nodes in the cluster. That gives us a new preferred node, and so on until we find a live one.
The reason we rehash on failover is that Jump Consistent Hashing is usually going to point to the same position in the topology (that is why we chose it in the first place, after all), so we rehash to get a different position and the work won’t all fall onto the next node in the list. That way, the tasks of a downed node are distributed fairly among the remaining live cluster members.
The nice thing about this is that aside from keeping the live/down list up to date, the Raft cluster doesn’t really need to do anything. This is a consistent algorithm, so different nodes operating on the same data arrive at the same result. When a node goes down, one node will pick up the job of bringing the new server up to speed and another will start the backup process. And all of that logic is right where we want it, right next to where the task logic itself is written.
This allows us to reason much more effectively about the behavior of each independent task, and it also allows each node to tell you where each task is executing.