Patterns for using distributed hash tables: Groups


Yesterday's post called them distributed in memory cache / storage, but I was reminded that the proper term for what I am talking about is distributed hash tables (DHT).

I presented the problem of dealing with DHT in this post, namely, the fact that we have only key based access and no way to compose several actions into a single transaction. I'll let you go read that post for all the gory details, and continue on with some useful patterns for dealing with this issue.

As a reminder, here is the API that we have:

  • SET key, data, expiration
    Set the key value pair in the DHT
  • PUT key, data, expiration
    Will fail if item is already in DHT
  • GET key
    Will return null if item is not in DHT or if expired
  • DEL key
    Delete the key from the DHT
  • UPDATE key, data, version
    Update the item if the version matches
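
To make the semantics of those five operations concrete, here is a minimal single-process sketch in Python. It is not a real DHT client. The class name InMemoryDht, the get_with_version helper, treating expiration as seconds from now, and versions that start at 1 and grow by one per write are all assumptions made for illustration.

```python
import time


class InMemoryDht:
    """Single-process stand-in for the five operations above (illustration only)."""

    def __init__(self):
        self._items = {}  # key -> (data, version, expires_at or None)

    def _live(self, key):
        """Return the stored entry, dropping it first if it has expired."""
        entry = self._items.get(key)
        if entry is not None and entry[2] is not None and entry[2] <= time.monotonic():
            del self._items[key]
            return None
        return entry

    @staticmethod
    def _deadline(expiration):
        # Assumption: expiration is a number of seconds from now, None means "never".
        return None if expiration is None else time.monotonic() + expiration

    def set(self, key, data, expiration=None):
        """SET: store unconditionally, bumping the version."""
        entry = self._live(key)
        version = entry[1] + 1 if entry else 1
        self._items[key] = (data, version, self._deadline(expiration))

    def put(self, key, data, expiration=None):
        """PUT: store only if the key is absent; return False otherwise."""
        if self._live(key) is not None:
            return False
        self._items[key] = (data, 1, self._deadline(expiration))
        return True

    def get(self, key):
        """GET: return the data, or None if the key is missing or expired."""
        entry = self._live(key)
        return None if entry is None else entry[0]

    def get_with_version(self, key):
        """Hypothetical helper: return (data, version), or (None, None)."""
        entry = self._live(key)
        return (None, None) if entry is None else (entry[0], entry[1])

    def delete(self, key):
        """DEL: remove the key if it is present."""
        self._items.pop(key, None)

    def update(self, key, data, version):
        """UPDATE: replace the data only if the stored version still matches."""
        entry = self._live(key)
        if entry is None or entry[1] != version:
            return False
        self._items[key] = (data, version + 1, entry[2])
        return True
```

The part that matters for the rest of this post is update: it succeeds only when the caller passes the version it last read, which is what makes optimistic concurrency possible.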

Now, let us explore some of the more useful patterns we need to deal with. We will start with grouping of items.

One of the more annoying properties of key based access methods is... key based access. We have no way to perform a query (well, this is actually inaccurate, but we will touch on it later).

Let us assume that we want to get the recent posts from the cache. How are we going to do that? Using SQL, it is very easy:

SELECT TOP 10 * FROM News ORDER BY PublishedDate DESC

We don't have anything similar using the API above. What we have instead is key based access, and it turns out we can use that to create groups, which are very useful.

A group is simply a key value pair where the value contains a list of keys. In the case of the query above, we can represent it using:

newsItems = List()
foreach news_id in GET("most recent news"):
	item = GET(news_id)
	continue if item is null // deleted news item
	newsItems.Add(item)

The group itself would be:

{
  "most recent news" :
	[
		"NewsItem#15",
		"NewsItem#16",
		"NewsItem#17"
	]
}

But wait, if we use groups like this, don't we violate a basic law of distributed work, the SELECT N+1 problem? Remember, each GET is a remote call. In this case, we make 4 remote calls (one for the group and one per item), but what if we have 100 items in the recent news?

A better approach for this would be to put the item data in the group directly, like this:

{ 
	"most recent news" : 
		[ 
			"astounding occurrences in...",
			"amazing revelations...",
			"you are feeling sleepy..."
		]
}

Here, we have a single remote call, and we get all the data that we need. Much better, isn't it?

Well, sort of. The problem that we face now is that we need to track what we are doing with regards to the most recent news. Let us say that we need to update an article. We most certainly want this update to be reflected in the most recent news as well. But in order to do that, we need to track that we put the article in the most recent news, and extending this sample a bit toward other groups that we may have in our applications, we can see that this quickly becomes an unmanageable problem.

By storing just the keys to other items in the list, we can now handle updates by updating a single key value pair, instead of updating a lot of separate items. In fact, this problem has a name, normalization / de-normalization. The tradeoffs of those are very well understood.
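
Here is a small Python sketch of that tradeoff. A plain dict stands in for the DHT, and the "front page" group and the "(inline)" key are made-up names for illustration. The point is only that a key-only group picks up an article update for free, while an inlined copy goes stale unless we remember to rewrite it.

```python
# A plain dict stands in for the DHT; every lookup would be a remote GET.
store = {
    "NewsItem#15": "astounding occurrences in...",
    # Normalized groups: they hold keys only.
    "most recent news": ["NewsItem#15"],
    "front page": ["NewsItem#15"],
    # Denormalized group: it holds a copy of the article itself.
    "most recent news (inline)": ["astounding occurrences in..."],
}


def load_group(name):
    """Resolve a key-only group into its items, skipping deleted ones."""
    items = [store.get(key) for key in store.get(name, [])]
    return [item for item in items if item is not None]


# Updating the article touches exactly one key value pair...
store["NewsItem#15"] = "astounding occurrences in... (corrected)"

# ...and every key-only group sees the new text on the next read.
recent = load_group("most recent news")
front = load_group("front page")

# The inlined copy still has the old text until someone rewrites that group too.
stale = store["most recent news (inline)"]
```

With keys only, the writer of an article does not need to know which groups the article belongs to.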

Actually, I wasn't as honest with you as I could be. The worst case scenario for this approach is not N + 1 remote calls, it is min(N, M) + 1, where N is the number of items in the list, and M is the number of nodes in the DHT: one call for the group itself, plus at most one batched call for each node that holds any of the items. The reason for that is that all DHT support some form of batch GET, so the following piece of pseudo code is a generic way to get a group from a DHT in as performant a way as possible.

def GetGroup(groupName as string):
	items = GET(groupName)
	return EmptyList if items is null
	itemsByNode = GroupKeysByNode(items)
	futures = List()
	for kvp in itemsByNode:
		futureGet = GET_ASYNC(kvp.Node, kvp.Items.ToArray())
		futures.Add(futureGet)
	WaitForAll(futures)
	results = List()
	for futureGet in futures:
		for item in futureGet.Results:
			results.Add(item) if item is not null
	return results

There are other optimizations we can make, but this one hits about as many of them as we can hope to without becoming very complex.
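
For readers who want to run this, here is a rough Python equivalent. The Node and Cluster classes, the crc32 based key placement and the call counter are all assumptions made for the sketch, and a thread pool stands in for whatever async batch GET a real client library offers.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor


class Node:
    """One DHT node; every method call stands in for one remote call."""

    def __init__(self):
        self.data = {}
        self.calls = 0

    def get(self, key):
        self.calls += 1
        return self.data.get(key)

    def get_many(self, keys):
        # Batch GET: a single remote call, however many keys are requested.
        self.calls += 1
        return [self.data.get(key) for key in keys]


class Cluster:
    def __init__(self, node_count):
        self.nodes = [Node() for _ in range(node_count)]

    def node_for(self, key):
        # Assumption: naive hash-mod placement, chosen only to keep this short.
        return self.nodes[zlib.crc32(key.encode()) % len(self.nodes)]

    def set(self, key, value):
        self.node_for(key).data[key] = value

    def get_group(self, group_name):
        keys = self.node_for(group_name).get(group_name)  # 1 remote call
        if keys is None:
            return []
        # Bucket the keys by the node that owns them.
        keys_by_node = {}
        for key in keys:
            keys_by_node.setdefault(self.node_for(key), []).append(key)
        # One batched call per node that owns any key, issued in parallel.
        with ThreadPoolExecutor(max_workers=len(keys_by_node) or 1) as pool:
            futures = [pool.submit(node.get_many, node_keys)
                       for node, node_keys in keys_by_node.items()]
            results = [item for future in futures for item in future.result()]
        # Skip items that were deleted after being added to the group.
        return [item for item in results if item is not None]
```

Note that the results come back grouped by node rather than in the order the group lists them, so a caller that cares about ordering has to re-sort against the key list.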

Now, remember the problem statement in the first post in this series: we have multiple clients working against the DHT in parallel. Let us assume that we have two clients that want to add an item to the most recent items. We want to preserve data integrity, so this is something that we need to think about. The way this is done is very simple:

while true:
	mostRecentItems, mostRecentItemsVersion = GetGroupWithVersion("most recent items")
	mostRecentItems.Add( GenerateCacheId(newItem) )
	result = UPDATE("most recent items", mostRecentItems, mostRecentItemsVersion)
	break if result is SuccessfulUpdate

Basically, we make a compare and exchange call, so if someone else snuck in and updated the recent items group while we were doing it, we will simply fetch the list again and retry. A classic case of optimistic concurrency.
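
The retry loop can be tried out with a small Python sketch. VersionedStore and add_to_group are hypothetical names, and the lock only models a single node applying one request at a time. It is not a client side lock. The sketch also handles the case where the group does not exist yet by falling back to PUT, which is an addition of mine.

```python
import threading


class VersionedStore:
    """Stand-in for a single DHT node with an atomic, versioned UPDATE."""

    def __init__(self):
        self._lock = threading.Lock()  # models the node handling one request at a time
        self._items = {}               # key -> (data, version)

    def get_with_version(self, key):
        with self._lock:
            return self._items.get(key, (None, 0))

    def put(self, key, data):
        """Fails (returns False) if the key already exists."""
        with self._lock:
            if key in self._items:
                return False
            self._items[key] = (data, 1)
            return True

    def update(self, key, data, version):
        """Succeeds only if the stored version matches the one the caller read."""
        with self._lock:
            current = self._items.get(key)
            if current is None or current[1] != version:
                return False
            self._items[key] = (data, version + 1)
            return True


def add_to_group(store, group_name, item_key):
    """Append item_key to the group, retrying until our write wins."""
    attempts = 0
    while True:
        attempts += 1
        keys, version = store.get_with_version(group_name)
        if keys is None:
            # Group does not exist yet: try to create it, retry if we lost the race.
            if store.put(group_name, [item_key]):
                return attempts
            continue
        # Build a new list instead of mutating the one we read.
        if store.update(group_name, keys + [item_key], version):
            return attempts
```

If two clients read the same version, only the first UPDATE goes through; the second one fails, re-reads the group and tries again, so no item is lost.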

The problem occurs when we need to ensure mutual updates to two or more groups (or just two or more items in the DHT). Now, just using optimistic concurrency will not save us, because we don't have a way to control updates over two items (which may very well reside on different nodes). This will be the subject of my next post.

More posts in "Patterns for using distributed hash tables" series:

  1. (09 Aug 2008) Conclusion
  2. (20 Jul 2008) Locking
  3. (20 Jul 2008) Groups