The high level interview question
The following is likely to end up in the list of questions we’ll ask candidates to answer when they apply to Hibernating Rhinos.
Imagine a sharded database. A sharded database is one where the data is split among multiple nodes. To make things simple, we will assume that each datum in the database has a 64-bit key associated with it, and we are interested in distributing the information evenly among the nodes. This can be done using Jump Consistent Hashing (see paper for details), and can be implemented using the following simple C# function:
This function takes a key and, given how many nodes there are in the cluster, returns the node that the key resides on.
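For reference, here is a minimal sketch of such a function. It follows the algorithm as published in the Jump Consistent Hash paper, not necessarily the exact code used in production; the class name `JumpHash` and the argument check are assumptions made for illustration.

```csharp
using System;

public static class JumpHash
{
    // Maps a 64-bit key to a node index in [0, numBuckets),
    // following the algorithm from the Jump Consistent Hash paper.
    public static int JumpConsistentHash(ulong key, int numBuckets)
    {
        if (numBuckets <= 0)
            throw new ArgumentOutOfRangeException(nameof(numBuckets));

        long bucket = -1; // last bucket the key landed on
        long next = 0;    // next bucket the key would jump to

        while (next < numBuckets)
        {
            bucket = next;
            // 64-bit linear congruential step; the overflow wraps on purpose.
            key = unchecked(key * 2862933555777941757UL + 1);
            // Divide first, then multiply, in the same order as the paper.
            next = (long)((bucket + 1) * ((double)(1L << 31) / (double)((key >> 33) + 1)));
        }

        return (int)bucket;
    }
}
```

Calling `JumpHash.JumpConsistentHash(key, 5)` yields a node index between 0 and 4. Growing the cluster by one node either leaves a key where it was or sends it to the newly added node.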
So far, so good, and this makes quite a lot of things much simpler. This function ensures that when a node is added, bringing the cluster to N nodes, only roughly 1/N of the data items in the database will need to move to the new node. Which is pretty much exactly what we want in a sharded environment. However, this function doesn’t help us figure out what to move.
Assume that we already have a database that has 5 nodes, and ten billion data items spread across all 5 nodes according to the jump consistent hash function. Because of load, we need to add 2 more nodes to the cluster, which means moving 2/7 of the cluster data (roughly 2.86 billion data items) to the new nodes. However, since moving the data items alone is going to be costly, we want to avoid scanning through all 10 billion items in all nodes to figure out which ones we need to ship to the new nodes, and which ones should remain with the current node.
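The 2/7 figure can be checked empirically on a sample of keys. The sketch below is only an illustration (it scans its sample, which is exactly what the real solution must avoid); the class `RebalanceEstimate`, the method `FractionMoved` and the random key generation are assumptions, and the hash follows the paper's algorithm.

```csharp
using System;

public static class RebalanceEstimate
{
    // Jump consistent hash as published in the paper.
    static int JumpConsistentHash(ulong key, int numBuckets)
    {
        long bucket = -1, next = 0;
        while (next < numBuckets)
        {
            bucket = next;
            key = unchecked(key * 2862933555777941757UL + 1);
            next = (long)((bucket + 1) * ((double)(1L << 31) / (double)((key >> 33) + 1)));
        }
        return (int)bucket;
    }

    // Fraction of randomly sampled keys whose node changes
    // when the cluster grows from oldNodes to newNodes.
    public static double FractionMoved(int oldNodes, int newNodes, int samples, int seed = 42)
    {
        var rng = new Random(seed);
        var buffer = new byte[8];
        int moved = 0;

        for (int i = 0; i < samples; i++)
        {
            rng.NextBytes(buffer);
            ulong key = BitConverter.ToUInt64(buffer, 0);

            int before = JumpConsistentHash(key, oldNodes);
            int after = JumpConsistentHash(key, newNodes);
            if (before == after)
                continue;

            // A key that moves must land on one of the newly added nodes.
            if (after < oldNodes)
                throw new InvalidOperationException("Key moved between existing nodes.");
            moved++;
        }

        return (double)moved / samples;
    }
}
```

With `FractionMoved(5, 7, 100000)` the result should come out close to 2/7 (about 0.286), and no sampled key should ever move between two of the original five nodes.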
Suggest a way that will allow the database to find out which data items need to be moved to the new nodes, without having to scan through all of them. In other words, anything that requires O(number of items in each node) is out.
You are rated on the impact of your suggestion on the overall cluster behavior. The cheaper your option, the better. You are free to store additional information (at cluster creation / modification, as data items are entered into the cluster / deleted, etc) if this can help you, but be aware that any impact on standard operations (reads & writes) should be minimal and well justified.
You only need to consider adding nodes to the cluster; removing nodes from the cluster is not required.
Execute JumpConsistentHash for all numbers up to the highest key. For keys ending up on node 5 or 6 (0-based), move these items if the key is valid.
Bjarte, How is that different from an O(N) cost?
Maybe when planning the cluster, you should have a lot of buckets. If you have 70 buckets and 7 servers, each server takes responsibility for 10 buckets. When you change the number of servers, you just transfer some "virtual" buckets. It's like having a big database with many small files: when you add storage, you just copy some of the small files. Maybe use a number with a lot of divisors, like 60: 60 = 60×1 = 30×2 = 20×3 = 15×4 = 12×5 = 10×6 (the load is not balanced with 7, 8, or 9 servers).
Article: j = (b + 1) * (double(1LL << 31) / double((key >> 33) + 1));
Your solution: index = (ulong)((choosenBucket + 1) * (double)(1L << 31) / (key >> 33) + 1);
The double cast is different. I guess it makes no difference. The mantissa is 52 bits and you are only working on the last 32 bits. http://stackoverflow.com/questions/389993/extracting-mantissa-and-exponent-from-double-in-c-sharp
Carsten, The whole idea is that I don't want to manually manage those buckets. So I define the shard function there, and the actual implementation doesn't really change that. I don't follow the numbers you have there and their purpose. I assume that you can obviously define virtual nodes, but how do you decide where to put the data? So how do you decide that? And when / what to move them?
Hi, if you have a key per datum, I think you could notify the cluster members, during the registration process, about the new key distribution nodes, and just synchronize the datum files between the old owner of the key + datum and the new one. This must be a cluster management responsibility.
Adriano, The idea is that you have 100 million data items in the database across all nodes. There are no "key distribution nodes"; which node a particular data item belongs to is determined by the hash function in the post.
There is no cluster management beyond that.
Well. O(N) is true, but it's not loading all the documents. Just iterating from 0 to the highest key existing in the db and calculating the suggested node. (Takes 17 seconds on one core on my machine for 10000000000 keys)
Trying a load based on a key should be cheap and you're only trying to load the documents with a suggested node of 5 or 6. And you would anyway have to load the document to move it, I presume.
No storing of additional information and no impact on read/write apart from the move itself.
Since it's allowed to store additional data as items are inserted, I'd suggest maintaining an index of keys by their next bucket IDs. There's no computational overhead when inserting an item, because the next bucket ID is already computed when the current bucket is selected (it's stored in the `index` variable at the end of the loop), so all we have to do is to insert the key into the collection corresponding to the next bucket ID. This way when a new node is added to the cluster, we already know the exact list of keys that need to be transferred.
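A rough sketch of that idea, under some assumptions: the index lives in memory on each node, the loop follows the paper's version of the hash, and the names `NextBucketIndex`, `Locate` and `KeysToMove` are made up for illustration. The value left in the loop variable when the loop exits is the ID of the bucket the key will jump to next, so the key has to move as soon as the cluster contains that bucket.

```csharp
using System;
using System.Collections.Generic;

public sealed class NextBucketIndex
{
    // next bucket ID -> local keys that will move once that bucket exists
    private readonly SortedDictionary<long, HashSet<ulong>> _keysByNextBucket =
        new SortedDictionary<long, HashSet<ulong>>();

    // Returns the current bucket and the bucket the key will jump to next.
    public static (int Bucket, long NextBucket) Locate(ulong key, int numBuckets)
    {
        long bucket = -1, next = 0;
        while (next < numBuckets)
        {
            bucket = next;
            key = unchecked(key * 2862933555777941757UL + 1);
            next = (long)((bucket + 1) * ((double)(1L << 31) / (double)((key >> 33) + 1)));
        }
        return ((int)bucket, next);
    }

    // Called when a key is written on this node.
    public void Add(ulong key, int numBuckets)
    {
        long next = Locate(key, numBuckets).NextBucket;
        if (!_keysByNextBucket.TryGetValue(next, out var keys))
            _keysByNextBucket[next] = keys = new HashSet<ulong>();
        keys.Add(key);
    }

    // Called when a key is deleted from this node.
    public void Remove(ulong key, int numBuckets)
    {
        long next = Locate(key, numBuckets).NextBucket;
        if (_keysByNextBucket.TryGetValue(next, out var keys) && keys.Remove(key) && keys.Count == 0)
            _keysByNextBucket.Remove(next);
    }

    // Keys that must leave this node when the cluster grows to newCount nodes.
    // Only entries whose next bucket ID is below newCount are visited.
    public IEnumerable<ulong> KeysToMove(int newCount)
    {
        foreach (var entry in _keysByNextBucket)
        {
            if (entry.Key >= newCount)
                break; // sorted by next bucket ID, so nothing further qualifies
            foreach (ulong key in entry.Value)
                yield return key;
        }
    }
}
```

The cost of `KeysToMove` is proportional to the number of keys that actually move, not to the number of keys on the node. Each returned key still needs its hash recomputed with the new node count to find its final destination, and keys that stay put keep the same next bucket ID, so their index entries remain valid after the cluster grows. The caller should collect the keys before removing them from the index, since the index cannot be modified while it is being enumerated.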
Bjarte, Those are 100 million data items that are currently on the system. Doesn't mean that all of them are sequential and that there have been no deletes. In practice, the 64-bit key is a hash of the document key, so they are actually spread across the entire spectrum. Even with sequentially generated keys (which are tricky, distributed), you are going to run into problems with gaps. But that is a nice solution, certainly.
HellBrick, Very nice!
I misunderstood, let me try again :). When a new node is added to the system, new data is automatically routed to the right node. Maybe it would be possible to wait for read and write operations and update the node while processing them. This way you reduce the rebalancing load on the system.
Adriano, The problem isn't about new data, the issue is how you distribute the data you already have.
It's for the old data that read and write operation requests could be used to start the migration process.
If you have a lot of "small files" you can just copy some of the files to the new servers.
You start with 60 files and one server.
When you add a new server you need to copy 30 of the files to the new server.
When you have three servers you copy 10 files from each server to the new server.
When you have four servers you copy 5 files from each server.
You can select the files randomly. If you disconnect a server you copy the other way round.
Of course you might need a directory of the min/max values in the files.
60 is just a small number (kind of hash-key). It is larger than the number of servers.
NB: I look forward to seeing your solution.
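The file counts in the comment above can be reproduced with a few lines of arithmetic. This is only a sketch of the fixed "virtual bucket" scheme being described, assuming the file count divides evenly by the server count before and after the change; `VirtualBuckets` and `FilesMovedPerServer` are hypothetical names.

```csharp
using System;

public static class VirtualBuckets
{
    // Number of files each existing server hands over when one server joins.
    // Assumes fileCount divides evenly by both the old and the new server count.
    public static int FilesMovedPerServer(int fileCount, int currentServers)
    {
        int newServers = currentServers + 1;
        if (fileCount % currentServers != 0 || fileCount % newServers != 0)
            throw new ArgumentException("fileCount must divide evenly by both server counts.");

        int before = fileCount / currentServers; // files per server now
        int after = fileCount / newServers;      // files per server afterwards
        return before - after;
    }
}
```

With 60 files, `FilesMovedPerServer(60, 1)` is 30, `FilesMovedPerServer(60, 2)` is 10 and `FilesMovedPerServer(60, 3)` is 5, matching the numbers in the comment.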
I've been considering an alternative where I would not attempt to move all the data into the new nodes immediately. Instead, whenever a key was not found on the node that I expected it to be on, I would compute which node(s) would have had it in the past, based on the past size(s) of the cluster. Then I store the value on the correct node, and delete the value from the old node.
This allows new nodes to be added very quickly, at the cost of slowing down initial access to data on the new nodes. That seems like a pretty big downside, especially depending on how often individual keys are accessed. But on the other hand copying billions of records might involve more downtime than is acceptable. Maybe the thing to do is to take a hybrid approach where you add the node and immediately start the cluster with the behavior I described, but you also run a job that moves the data that needs to be moved, based on an index like what HellBrick did.
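The lookup order for this lazy approach could be sketched as follows, assuming the cluster keeps a history of its past sizes (oldest first) and that the hash follows the paper's algorithm. `LazyLookup`, `CandidateNodes` and `clusterSizes` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class LazyLookup
{
    // Jump consistent hash as published in the paper.
    static int JumpConsistentHash(ulong key, int numBuckets)
    {
        long bucket = -1, next = 0;
        while (next < numBuckets)
        {
            bucket = next;
            key = unchecked(key * 2862933555777941757UL + 1);
            next = (long)((bucket + 1) * ((double)(1L << 31) / (double)((key >> 33) + 1)));
        }
        return (int)bucket;
    }

    // Nodes to probe for a key: the node for the current cluster size first,
    // then the nodes for earlier sizes, newest to oldest, without repeats.
    // clusterSizes is ordered oldest to newest, for example { 5, 7 }.
    public static IEnumerable<int> CandidateNodes(ulong key, IReadOnlyList<int> clusterSizes)
    {
        var seen = new HashSet<int>();
        for (int i = clusterSizes.Count - 1; i >= 0; i--)
        {
            int node = JumpConsistentHash(key, clusterSizes[i]);
            if (seen.Add(node))
                yield return node;
        }
    }
}
```

A reader would probe the candidates in order; if the value turns up on a later candidate, it is written to the first one and deleted from the node where it was found. For most keys the list has a single entry, because most keys do not move when the cluster grows.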
@Hellbrick: If you don't know how many nodes will be added, you cannot determine the right "next bucket". You might have a look at the "Jump Consistent Hashing" paper. There k1 is stored in bucket/0 when using up to two buckets, then moved to bucket/2 when using up to four buckets, and then moved to bucket/4 when using more buckets. In this case the "next bucket" can be 2 or 4 depending on how many buckets you add.
TomCollins, All I need to know is the minimal number of nodes at which this item will move. After which I just scan all those that should move, and run the hash again, finding where they are supposed to be now.
Came to the same conclusion on reading the paper as @Hellbrick. There's a hash collision DOS attack on the shards caused by the algorithm (if one has the ability to craft arbitrary known keys, and a harder one if all you know is that this algorithm is in use).
@Hellbrick's solution is only for adding one shard -- which is technically correct as adding any number of shards = number x adding one shard. However, is there a method to add an arbitrary number of shards? The problem with that is that probably the records to move with +1 bucket may not be a record to move with +2 bucket... Can somebody confirm the math?
Also, this solution does not handle the case of removing shards. Saving another field of the one-lower bucket will do it, but then it will need to be indexed. With one field, the records can be physically ordered according to the next-bucket field, doing away with storing the field altogether and the need for an index. With +1 and -1 fields, then at least the -1 field must be stored (as a T/F bit) and indexed.
Joshka, Can you explain the DOS attack here? I assume you mean that you can route all the data to a single node, effectively blowing it up?
Stephen, In the case of multiple shards being added, you'll now know of all the keys that are going to be moved where the number of shards is smaller than X, and then for each of them, you check where they need to be in the new setup.
With regards to removing shards, you can do it by remembering the min / max size where this data item will stay in this node.
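The min / max idea can be read directly off the hash loop. The sketch below assumes the paper's version of the algorithm; `StayRange` and `Compute` are hypothetical names. A key sits on bucket `b` for every cluster size from `b + 1` up to the value the loop variable holds when the loop exits.

```csharp
using System;

public static class StayRange
{
    // Returns the node a key lives on, together with the smallest and largest
    // cluster sizes for which the key stays on that node.
    public static (int Node, long MinSize, long MaxSize) Compute(ulong key, int numBuckets)
    {
        if (numBuckets <= 0)
            throw new ArgumentOutOfRangeException(nameof(numBuckets));

        long bucket = -1, next = 0;
        while (next < numBuckets)
        {
            bucket = next;
            key = unchecked(key * 2862933555777941757UL + 1);
            next = (long)((bucket + 1) * ((double)(1L << 31) / (double)((key >> 33) + 1)));
        }

        // Bucket 'bucket' exists once the cluster has bucket + 1 nodes, and the
        // key jumps to bucket 'next' once the cluster has next + 1 nodes.
        return ((int)bucket, bucket + 1, next);
    }
}
```

Growing the cluster beyond `MaxSize` moves the key forward, and shrinking it below `MinSize` moves the key back to an earlier node, so indexing items by these two values covers both directions.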
First, I am an occasional reader, and consider your site as one of the most profound tech sources on the web.
The first step I would do in an interview is asking for the unit test of this very important method. So, I wrote a simple data-driven unit test based on page 4 of the linked doc, and the test fails, with the implementation giving vastly different results. Also, it never returns 1.
Then looking at the code, there are a couple of problems surrounding the braces and the double-conversions of unsigned int values.
For getting the feature to work, I presume, any shard has an index of the local keys and a working infrastructure to replicate and delete data items. Which is basically all we need.
The new shard, once instated, should ask all existing other shards to "search the local data items moving to the new shard (= iterate all local keys, select those having the new shard as a valid jump target), replicate them to the new shard, finally delete (if source is no replication target)". The shard-delete action is basically the same operation, only the selector is different (= crying "lambda, lambda").
The idea to persist additional information doesn't land. This smells like breaking the SoC principle, as it couples the persistence structure on one possible implementation of a higher-layered feature (one could always use the simpler but heavier Modulo-solution, or a future better algorithm). Calculating the next jump target may be a couple of seconds slower at execution time than having a persisted value - which is also debatable considering it can be parallelized - but this is a valid trade-off in what is a very time-consuming operation. The minor performance advantage of persistence is not worth the hassle, IMO.
While building the new shard is a time-consuming operation, the big question is what is going on on the client side while all the resharding takes place. I have solved such a problem by reverse-iterating from the largest shard id to the smallest in the client API. In pretty much 99.9% of the time, the first iteration step delivers the required data, while the 0.1% allows for nearly 100% availability in replication edge cases. This is the reason why I implement such jump-hashes as an iterator, which is much nicer and more intuitive to use as an API, and vastly easier to read.
Btw, what happens, if shard #3 out of a total of 5 gets killed, so that you get gaps in your sharding scheme?
McZ, You are doing something that is explicitly called out as forbidden, forcing iteration of the whole dataset. I don't know where you got the idea that persisting this information is a perf _benefit_; this is an extremely cheap function, and the cost of persistence is extremely high.
And you are asking questions that are nice (client, failover, etc) that are not actually relevant to the problem at hand.
I like this question, but extending it along the lines that McZ has taken it might lead to a more revealing question: How do you implement adding more servers to a system based on consistent hash, and how do you keep serving client requests while rebalancing is going on?
A downside of the broader question is that it takes a lot more time. But the upside is that you get to see how the candidate thinks through a larger problem. You will probably have to give the candidate some guidance along the way, and you can ask the specific version of the question if the candidate's solution is O(n). How the candidate deals with the underspecification of the broad question, and how they run with guidance can be revealing. And those sorts of things are a reality of software development.