Ayende @ Rahien

Oren Eini aka Ayende Rahien CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL Open Source Document Database.


Next to last post in this series on using Distributed Hash Tables (DHT)!


    In the previous post, we covered how we can create an index that will allow us to get all items that exactly match a specific value. However, we have seen that trying to query by range is not possible using this approach.

    This white paper (PDF) covers how we can use a Trie on top of a DHT. This is an intensely interesting subject. However, since I do not intend to regurgitate either the article or the discussion on tries, I would like to show an implementation of a trie on a DHT, instead. I strongly suggest reading the article, at least.

    A note about the implementation: it is not as nice as it could be, it is probably inefficient, and it doesn't handle a lot of the things that the white paper calls for (like splitting nodes). I am aware of that, but I chose to go with a simple implementation to demonstrate the concept. If you would like to extend the implementation to show how it should be done, I would be very interested in seeing that.

    The code for the Trie implementation can be found in the scratch pad.

    First, we have our (mocked) distributed cache:

    public class DistributedCache
    {
    	private static readonly Hashtable[] distributedCache = new[]
    	{
    		new Hashtable(), new Hashtable(), new Hashtable(),
    		new Hashtable(), new Hashtable(), new Hashtable(),
    		new Hashtable(), new Hashtable(), new Hashtable(),
    		new Hashtable(), new Hashtable(), new Hashtable(),
    		new Hashtable(), new Hashtable(), new Hashtable(),
    		new Hashtable(), new Hashtable(), new Hashtable(),
    	};
    
    	public static void PutInCache(string key, object value)
    	{
    		var table = distributedCache[Math.Abs(key.GetHashCode()) % distributedCache.Length];
    		table[key] = value;
    	}
    
    	public static object GetFromCache(string key)
    	{
    		return distributedCache[Math.Abs(key.GetHashCode()) % distributedCache.Length][key];
    	}
    }

    Next we define a Trie Node:

    public class TrieNode
    {
    	public TrieNode()
    	{
    		Children = new SortedList<string,string>();
    	}
    
    	public string PartialValue { get; set; }
    	public SortedList<string, string> Children { get; set; }
    	public string KeyInCache { get; set; }
    }

    And now we need to take a look at the TrieManager itself. It is a bit complex, so we will look at each piece in isolation. We will start with the simple methods:

    public static void PutInCacheAndUpdateTrie(KeyValuePair<string, string> word)
    {
    	DistributedCache.PutInCache(word.Key, word.Value);
    	UpdateTrie(word);
    }
    
    public static string CreateNodeKey(string val)
    {
    	return "Node: " + val;
    }

    Putting an item in the cache will physically put it in the cache, and also update the trie. Each trie node is stored under a key prefixed with "Node: " followed by the partial value it represents. The UpdateTrie method is interesting:

    private static void UpdateTrie(KeyValuePair<string, string> word)
    {
    	string currentKey = word.Value;
    	var node = DistributedCache.GetFromCache(CreateNodeKey(currentKey)) as TrieNode;
    	while (node == null)
    	{
    		// this code assumes that the empty node is already in the cache
    		currentKey = currentKey.Substring(0, currentKey.Length - 1);
    		node = DistributedCache.GetFromCache(CreateNodeKey(currentKey)) as TrieNode;
    	}
    	if (currentKey == word.Value)// value already in trie, overwrite it
    	{
    		node.KeyInCache = word.Key;
    		return;
    	}
    	// need to create child node(s)
    	while (currentKey != word.Value)
    	{
    		currentKey = currentKey + word.Value[currentKey.Length];
    
    		var childNode = new TrieNode
    		{
    			PartialValue = currentKey,
    			KeyInCache = currentKey == word.Value ? word.Key : null
    		};
    	string childNodeKey = CreateNodeKey(currentKey);
    	// note: mutating the parent's Children works here only because the
    	// mocked cache hands out shared references; a real DHT would require
    	// putting the parent node back in the cache after this change
    	node.Children.Add(childNodeKey, childNodeKey);
    	DistributedCache.PutInCache(childNodeKey, childNode);
    		node = childNode;
    	}
    }

    The overall structure is:

    • Find the deepest existing node that matches a prefix of the current value
    • If a node for the full value exists, overwrite its key (we allow only a single key per value in this implementation)
    • If it doesn't exist, create all the missing nodes from the deepest existing node (the root always exists) down to the current value

    This is basically it, all we have left is actually querying the trie:

    public static IEnumerable<KeyValuePair<string, string>> GetFromCacheInRange(string start, string end)
    {
    	var lcd = GetLowestCommonDenominator(start, end);
    	var cache = (TrieNode)DistributedCache.GetFromCache(CreateNodeKey(lcd));
    	if (cache == null)
    		return new KeyValuePair<string, string>[0];
    	// the children are stored under "Node: "-prefixed keys, so look the
    	// prefix up in the same namespace; when no exact match is found we
    	// fall back to scanning from the first child
    	int indexOfFirstKey = cache.Children.IndexOfKey(CreateNodeKey(lcd));
    	if (indexOfFirstKey < 0)
    		indexOfFirstKey = 0;
    	var keys = new List<string>();
    	if (IsMatchingNode(cache, start, end))
    		keys.Add(cache.KeyInCache);
    	for (int i = indexOfFirstKey; i < cache.Children.Count; i++)
    	{
    		if (ProcessNode(keys, cache.Children.Values[i], start, end) == false)
    			break;
    	}
    	var result = new Dictionary<string, string>();
    	foreach (var s in keys)
    	{
    		result.Add(s, DistributedCache.GetFromCache(s).ToString());
    	}
    	return result;
    }

    Here we find the longest shared prefix of both start and end, then try to get the relevant trie node for it. Then we iterate over all the direct children of that node and add the matching items to the list of keys. Finally, we resolve the list of keys against the cache. Finding the longest shared prefix is as simple as this:

    private static string GetLowestCommonDenominator(string x, string y)
    {
    	var sb = new StringBuilder();
    	for (int i = 0; i < x.Length && i < y.Length; i++)
    	{
    		if (x[i] != y[i])
    			break;
    		sb.Append(x[i]);
    	}
    	return sb.ToString();
    }

    One important implementation detail is determining whether a partial value string falls between the start and end delimiters that we are given. We do this using the IsMatchingNode method:

    private static bool IsMatchingNode(TrieNode cache, string start, string end)
    {
    	if (cache.KeyInCache == null)
    		return false;
    
    	bool greaterThanOrEqStart;
    	string partialValue = cache.PartialValue;
    	if (partialValue.Length > start.Length)
    		greaterThanOrEqStart = partialValue.Substring(0, start.Length).CompareTo(start) >= 0;
    	else
    		greaterThanOrEqStart = partialValue.CompareTo(start) >= 0;
    	if (greaterThanOrEqStart == false)
    		return false;
    	return StillInRange(partialValue, end);
    }
    
    private static bool StillInRange(string partialValue, string end)
    {
    	bool smallerOrEqEnd;
    	if (partialValue.Length > end.Length)
    		smallerOrEqEnd = partialValue.Substring(0, end.Length).CompareTo(end) <= 0;
    	else
    		smallerOrEqEnd = partialValue.CompareTo(end) <= 0;
    	return smallerOrEqEnd;
    }

    Those are ugly, I'll admit.

    Now the only thing that we have left is how we process a node recursively:

    private static bool ProcessNode(ICollection<string> keys, 
    	string nameOfItemInCache, string start, string end)
    {
    	var node = (TrieNode)DistributedCache.GetFromCache(nameOfItemInCache);
    	if (IsMatchingNode(node, start, end))
    		keys.Add(node.KeyInCache);
    	if (StillInRange(node.PartialValue, end) == false)
    		return false;
    	foreach (var value in node.Children.Values)
    	{
    		if (ProcessNode(keys, value, start, end) == false)
    			return false;
    	}
    	return true;
    }

    Given all of that, we can now write the following client code:

    public static void Main()
    {
    	var words = new Dictionary<string, string>
    	{
    		{"foo", "hello"},
    		{"bar", "beer"},
    		{"fubar", "snow"},
    		{"welcome", "black"},
    		{"forward", "rose"},
    		{"next", "lion"},
    		{"exit", "hungry"},
    		{"seek", "monk"},
    	};
    
    	DistributedCache.PutInCache(TrieManager.CreateNodeKey(""), new TrieNode { KeyInCache = null, PartialValue = "" });
    
    	foreach (var word in words)
    	{
    		TrieManager.PutInCacheAndUpdateTrie(word);
    	}
    	var items = TrieManager.GetFromCacheInRange("ba", "bz");
    	if (items == null)
    		return;
    	foreach (var item in items)
    	{
    		Console.WriteLine(item.Key + " = " + item.Value);
    	}
    }

    With the output being:

    bar = beer
    welcome = black

    This may not seem like an impressive feat, but imagine if we stored things like Users in the DHT, not just strings. This would allow us to perform a range query over the user names and get the actual users (changing the sample code to do this is left as an exercise for the reader).



    Will this series on using Distributed Hash Tables (DHT) ever end?


      One of the problems of using a DHT is that the only access strategy you have is the key. And while key based access is fast, it is also fairly limited. We can't perform queries on anything but the key. And that sucks. As it turns out, we can provide our own indexes by defining groups keyed by values.

      Let us say that we need to find all the users which have not verified their email address. In SQL, we will write:

      select * from Users where EmailVerified = 0

      Using a DHT, it is slightly more complex than that. What we need to do is to create (and maintain, sadly) the index ourselves. In other words, here is the process of writing a User to the DHT:

      PUT "User #15", user
      ADD_TO_LIST "User_EmailVerified_" + user.EmailVerified, "User #15"

      Note that I added a new operation, ADD_TO_LIST, which usually does not exist in a DHT, but it will significantly simplify the code; check out the post about groups to see how we can implement it ourselves.

      Note how we structure the name of the list. We will have two such lists in the system, "User_EmailVerified_True" and "User_EmailVerified_False". We can get the list of all the users that match that property by simply getting the list of values from the list and resolving each of the keys.
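The write path and the index lookup described above can be sketched in a few lines. This is a minimal Python sketch with an in-memory dictionary standing in for the DHT; the `put`, `add_to_list` and `get` helpers are hypothetical stand-ins for whatever primitives your DHT client actually exposes:

```python
# In-memory stand-in for a DHT; a real client would make remote calls.
store = {}

def put(key, value):
    store[key] = value

def get(key):
    return store.get(key)

def add_to_list(key, item):
    # Assumed ADD_TO_LIST primitive; see the post about groups for how
    # to build something like it from plain GET/PUT.
    store.setdefault(key, []).append(item)

def put_user(user_id, user):
    # Write the user itself, then maintain the secondary index.
    put("User #%d" % user_id, user)
    add_to_list("User_EmailVerified_%s" % user["email_verified"],
                "User #%d" % user_id)

def users_with_email_verified(flag):
    # Get the list of matching keys from the index, then resolve each key.
    keys = get("User_EmailVerified_%s" % flag) or []
    return [get(k) for k in keys]

put_user(15, {"name": "oren", "email_verified": False})
put_user(16, {"name": "rahien", "email_verified": True})
unverified = users_with_email_verified(False)
```

Note that every write must update both the item and the index; forgetting one of them is exactly the "remembering that we need to manage that" cost mentioned below.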

      This is an extremely simple approach, but it does come at the cost of managing the index, and remembering that we need to manage it. Another problem is that this approach is highly specific: we can't look up by range, only by a specific value.

      In my next post, I'll cover range queries.


      And yet another post in my series on using Distributed Hash Tables (DHT).


        And in this post, I want to discuss how we can implement cross item transactions. This is a bit more complex than it sounds, because most of the DHTs out there will not allow you any atomic operation on more than a single item. This makes sense: a DHT is, well, distributed, and coordinating a transaction across nodes would require a distributed transaction, something that carries a very high performance penalty.

        Again, I'll say upfront that if you need transactions in a DHT, you are probably doing something wrong. A DHT is based on key / value lookup; trying to turn that into something that resembles an RDBMS will cause problems. Use this approach with care.

        Let us consider the following case: we have Customer, Order and the association between the customer and its orders. We have decided to implement this in the following fashion:

        • Customer #1 - customer data
        • Customer #1 Orders - associated orders ids
        • Order #14 - order data

        I will repeat that a probably better way would be to aggregate all the customer information (including its orders) in the customer data, instead of spreading it around over multiple keys, but I have to use some example here, and I chose this one.

        We want to be able to add an order to the customer in a single transaction, that is, with all the usual ACID properties. Let us say that adding an order modifies both the customer and the associated orders, so we can't just update the associated orders key and be done with it; we are updating two keys, which must stay in sync, so we need a smarter approach.

        One aspect of the proposed solution is that we have to think ahead and design what our transaction boundary will be. In effect, this is very similar to the way we draw transaction boundaries in DDD, although in this case it is actually mandatory.

        The proposed solution is quite simple, instead of putting the data in the cache as described above, we can put it using the following approach:

        • {Customer #1 Group} Customer #1
        • {Customer #1 Group} Customer #1 Orders
        • {Customer #1 Group} Order #14

        The important bit is the use of an item group, because we have this, we can now perform an update using the following approach:

        LOCK "Customer #1 Group"
        customerGroup = GET "Customer #1 Group"
        customer, associated_orders = GET customerGroup + " Customer #1", customerGroup + " Customer #1 Orders"
        
        // Do business logic to update customer & orders
        
        customerGroup = Guid.NewGuid()
        
        PUT customerGroup + " Customer #1", customer
        PUT customerGroup + " Customer #1 Orders", associated
        
        PUT "Customer #1 Group", customerGroup
        UNLOCK "Customer #1 Group"
        // optional: delete previous version, or just let it expire

        The logic here is simple. We lock the group key, thus preventing anyone else from updating it. Then we get the group key and the current state of the items in the group. We update them, generate a new group key, and put the new items there. Finally, we update the customer group key and unlock it.
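The pseudo code above can be fleshed out as follows. This is a minimal Python sketch, with a plain dictionary standing in for the DHT, a per-key `threading.Lock` standing in for a distributed lock, and a GUID as the version token; all of these stand-ins are my own assumptions:

```python
import threading
import uuid

store = {}
locks = {}  # per-key locks; a real DHT would offer a distributed LOCK/UNLOCK

def lock(key):
    locks.setdefault(key, threading.Lock()).acquire()

def unlock(key):
    locks[key].release()

def add_order(customer_id, order):
    group_key = "Customer #%d Group" % customer_id
    lock(group_key)
    try:
        version = store[group_key]
        customer = store["%s Customer #%d" % (version, customer_id)]
        orders = store["%s Customer #%d Orders" % (version, customer_id)]

        # Business logic: update the customer and the associated orders.
        orders = orders + [order["id"]]
        store["Order #%d" % order["id"]] = order
        customer = dict(customer, order_count=len(orders))

        # Write the new state under a fresh version, then publish it by
        # swapping the group key. Readers see either the old version or
        # the new one, never a mix of the two.
        new_version = str(uuid.uuid4())
        store["%s Customer #%d" % (new_version, customer_id)] = customer
        store["%s Customer #%d Orders" % (new_version, customer_id)] = orders
        store[group_key] = new_version
        # optional: delete the previous version, or just let it expire
    finally:
        unlock(group_key)

# Seed the initial version of the group.
v0 = str(uuid.uuid4())
store["Customer #1 Group"] = v0
store["%s Customer #1" % v0] = {"name": "acme", "order_count": 0}
store["%s Customer #1 Orders" % v0] = []

add_order(1, {"id": 14, "total": 100})
```

The key design point is that publishing the new group key is the only shared mutation, which is what gives us the atomicity discussed below.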

        Let us look at it from the ACID properties that we manage to get:

        • Atomicity - until we update the group key, which is the final step, all of our changes are private and not visible to the outside world. In fact, a "transaction" that starts reading while this one is running will get the previous version of the values and run concurrently with this one, instead of blocking.
        • Consistency - see atomicity; since updating the group key is the last step, we always publish a consistent state.
        • Isolation - see atomicity; no one can access the new values until we publish the group key.
        • Durability - this depends on the guarantees that the DHT makes.

        This is, I think, a very elegant solution. I would caution you again against adopting it across the board; use a DHT as it is meant to be used, don't try to force it to be an RDBMS.


        And yet another post in my series on using Distributed Hash Tables (DHT).


          Right now I want to talk about item groups, or namespaces. In a DHT, a namespace isn't quite the same as the one we are familiar with from programming languages. It is a way to refer to a group of items in one sweep. Usually, this is used either to retire a whole bunch of items at the same time, or to redirect them (which might be the same thing). This is useful in caching scenarios much more than in DHT scenarios, but it is an important building block.

          Let us consider the idea of a shop, with all its products in the DHT. We have some batch process that updates a lot of the product information, so we want to be able to retire all the products in the cache in one go. Having to go through the entire product list would be expensive (N remote calls) and would produce an invalid view of the data, since some of the items would be retired while some are still in the cache, etc.

          One way of dealing with that is by defining an item group. An item group is defined like so:

          • {group name}_product_1
          • {group name}_product_2
          • {group name}_product_3

          The {group name} is not the actual value there, of course. Instead, {group name} refers to a key in the DHT as well. That key contains a value (usually a numeric one) that we use when we query the data. So, getting product #1 from the DHT will involve:

          • GET "{group name}" - return 432
          • GET "432_product_1"

          Using this approach, we can change, in one sweep, all the items that belong to this group. Again, this is nothing but convention, but it is a very useful one. In fact, we will see in the next post that it can be used to do some pretty interesting things.
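A minimal Python sketch of this two-step lookup, assuming an in-memory dictionary for the DHT; the `put_in_group` / `retire_group` helper names are mine:

```python
store = {}

def put_in_group(group_name, item_key, value):
    # Resolve the group's current version, then store under a versioned key.
    # 432 is just the example version number from the post above.
    version = store.setdefault(group_name, 432)
    store["%d_%s" % (version, item_key)] = value

def get_from_group(group_name, item_key):
    # Two GETs: one for the group's version, one for the item itself.
    version = store[group_name]
    return store.get("%d_%s" % (version, item_key))

def retire_group(group_name):
    # Bump the version; every key written under the old version becomes
    # unreachable in one sweep, and can be left to expire.
    store[group_name] += 1

put_in_group("products", "product_1", "wrench")
print(get_from_group("products", "product_1"))  # wrench
retire_group("products")
print(get_from_group("products", "product_1"))  # None
```

The extra GET for the group key is the price of being able to retire N items with a single write.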


          Here is another post in my series on using Distributed Hash Tables (DHT).


          Now I want to talk about locality, and why it is important. First, the idea of locality is very simple: put related items together, so that getting them from the DHT can be done in a single call.

          Let us say that we have the following objects in our application: User, Shopping Cart and Session. We can just put them in the DHT and let them land wherever they may, but that is not the most optimal way to treat them. A DHT call is cheap, but it is still a remote call, and we really want to minimize those. Assuming that we understand the data access patterns of the application, we can do a bit better than making three remote calls to the DHT.

          Just about any DHT will support the idea of a multi-key get, so we can ask the following:

          GET 'User #1', 'User #1: Session', 'User #1: Shopping Cart'

          We need to ensure that when we are writing to the DHT, the meaningful key name for node selection is the part that comes before the colon. And the same goes for reading.
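Here is a minimal Python sketch of that convention: node selection hashes only the part of the key before the colon, so all three items land on the same node, and a multi-key get touches just that one node. The helper names and node count are my own assumptions:

```python
NODE_COUNT = 16
nodes = [dict() for _ in range(NODE_COUNT)]  # each dict stands in for a node

def routing_key(key):
    # Only the part before the colon participates in node selection, so
    # "User #1" and "User #1: Session" land on the same node.
    return key.split(":")[0]

def node_for(key):
    return nodes[hash(routing_key(key)) % NODE_COUNT]

def put(key, value):
    node_for(key)[key] = value

def multi_get(*keys):
    # All the keys here share a routing prefix, so in a real DHT this
    # would be a single remote call to a single node.
    return [node_for(k).get(k) for k in keys]

put("User #1", "user data")
put("User #1: Session", "session data")
put("User #1: Shopping Cart", "cart data")

assert node_for("User #1") is node_for("User #1: Session")
```

The trade-off mentioned below is visible here: everything sharing a prefix shares a node, so an overused prefix concentrates load on one node.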

          Now, reading all three items is a single remote call. And that has significant performance implications. Note that you shouldn't rely on this too much, otherwise all your data will end up on a single node, but it is a good model to use in many cases; just don't go overboard with it.
