Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

Get in touch with me:

oren@ravendb.net +972 52-548-6969

Posts: 7,511
Comments: 51,108
Privacy Policy · Terms
filter by tags archive
time to read 4 min | 665 words

Well, it looks like I finally had completed all I wanted to say about DHTs. I can now go back to talking about multi tenancy :-)

The previous ones are:

    We have gone over a lot of options for how to use a DHT / Distributed Cache. Some of them are for intellectual curiosity, some of them are of practical use. I urge you to remember that a DHT is not an RDBMS, and that both the access patterns and usage are different. Trying to force one into the other can be painful.

    In the first post in the series, I presented a list of common operations that all DHT supports:

    • PUT key, data, expiration
      Will fail if item is already in cache
    • GET key
      Will return null if item is not in cache or if expired
    • DEL key
      Delete the key from the cache
    • UPDATE key, data, version
      Update the item if the version matches

    There are a few things that I would consider important as well:

    • Batching support - the ability to send several items to storage in a node, as well as getting several items from a node. The later is usually supported, but the former is generally not.
    • Operations such as add_to_list, remove_from_list can make some operations much simpler if they are implemented by the cache rather than built on top of it.
    • Automatic recognition of common conventions:
      • "{Customer#1}_Orders", which can be translated by the client automatically to the item group name. If we enforce locality on the groups, we can even have the server resolve that for us, without having to pay the cost for the extra call. But this can be implemented on both server and client.
      • Automatic recognition of locality, so for the purpose of node matching, we will consider only the parts before the colon in a key. This way "foo:1" and "foo:2" will end up on the same node.

    Integrating indexed properties and indexed ranges are things that should happen at the client level, so your API will look something like:

    Put(item, x => x.Name, x => x.Age)
    Get<Foo>( x => x.Name, "bar")
    GetRange<Foo>( x => x.Name, "foo", "tada" )

    Those are just rough ideas, but they should tell you how to deal with this.

    Building a DHT is a really simple task, assuming that you go with the memcached model of double hashing, so adding support for such things isn't particularly hard. But do consider if this is the right solution or not in those scenario.

    Done, at last. Hope you liked the series.

    time to read 15 min | 2854 words

    Here is another post in my series on using Distributed Hash Tables (DHT).

    The previous ones are:

    In this post, I would like to handle locking. As I mentioned in the previous post in the series, updating a single item safely can be done using optimistic concurrency techniques. Updating more than a single item is... harder. Let us go over some of the issues that we have while implementing locking on a DHT. Some of them are common to locks everywhere, others are unique to using a distributed solution.

    • Locks are voluntary
    • Deadlocks
    • Orphaned locks
    • Dead nodes
    • More (expensive) Remote calls

    The first issue that we have to face is the problem that locks are voluntary. There is nothing stopping you from making a modification to an item without taking a lock. This is a matter of being careful how you write the code.

    The second is the age old issue of deadlocks. If I take lock A and than wait for B, while you took B and wait for A, we are going to have a problem. Even worse, since the DHT doesn't even have the concept of locks, we don't get deadlock detection such as we would get with DB transactions, and our facilities to diagnose that are far poorer.

    Orphaned locks happen when a client have acquired a lock, but crashed before it could release it. In this case, the DHT has no idea that something happened, but the applicative protocol is broken, and we will soon have dead locked clients all over the place. Even if the client restarts, it will not know that it need to release the lock.

    Dead nodes are a particularly interesting problem, this happen when the node that contained the lock in the DHT has crashed. Effectively erasing the lock.

    The last problem is that dealing with locks means that we now have to make to make more remote calls. We need to add two more remote calls (acquire the lock and release it) for each item we are dealing with.

    Let us start planning our locking strategy and see how we can handle this.

    Simple locking

    The basic of locking in a DHT is a simple convention. We assume that the existence of "lock of [item key]" means that the item is locked.There for, we can now write the following code to acquire and release the lock.

    def LockItem(itemKey as string):
    	while true:
    		result = PUT( "lock of "+itemKey, clientName )
    		break if result is SuccessfulUpdate
    		// we don't sleep, the remote call take care of spacing the calls in time

    And releasing the lock is simply:

    def ReleaseLock(itemKey as string):
    	DEL "lock of "+itemKey

    Note that a version with a timeout is obviously something you would like to have, but I'll leave it for the reader to implement. Here we rely on that PUT will fail if the item is already in the cache. We simply retry until we are able to acquire the lock.

    Now the code for handling updates of related items is as followed:

    def AddNewItem(newItem):
    	LockItem("most recent news")
    	LockItem("all news")
    	mostRecentItems = GET "most recent news"
    	allNews = GET "all news"
    	SET "most recent news", mostRecentItems
    	SET "all news", allNews
    	ReleaseLock("most recent news")
    	ReleaseLock("all news")

    Obviously this ignores the issue of handling failures, but this is good pseudo code. Speaking of failures, what will happen if we are now waiting for "all news" to be locked? We are currently locking "most recent news", which means that if another client want to update just that, it is likely in a bit of a problem. We need a slightly smarter strategy to handle lock acquisition issues.

    Locking acquisition strategy

    I'll start by saying that this is a topic of deep research, and I am just spouting off, so feel free to head to the nearest academic paper which will tell you how it ought to be done, with all the implications.

    Basically, the idea of acquiring  each lock one at a time is not a good idea at all. What we want is to be able to batch lock all the items we are interested at. However, we can't do that, since they may very well reside on different machines. We can simulate that and get a slightly better approach for locking as well. Note that we are still not handling timeouts, that is still your job to implement.

    def AcquireLocks( itemsToLock as (string) ) as IDisposable:
    	while true:
    		for i in range(len(itemsToLock)):
    			result = PUT("lock of " + itemsToLock[i], clientName)
    			if result is not SuccessfulUpdate:
    				for j in range(i):
    					DEL "lock of " + itemsToLock[j]
    		return DisposableAction:
    			for item in itemsToLock:
    				DEL "lock of " + item

    Note that we have a very simple back off strategy. If we failed to acquire any of the locks that we need, we will release all the locks that we have acquired so far, and try again. This means that we have reduced the chance of a deadlock. (It is actually still possible, if two clients try to acquire two list of items that are inverse to one another and long enough, to still have a deadlock in this situation).

    The code for using this is simple:

    def AddNewItem(newItem):
    	using AcquireLock("most recent news", "all items"):
    		mostRecentItems = GET "most recent news"
    		allNews = GET "all news"
    		SET "most recent news", mostRecentItems
    		SET "all news", allNews

    Note the using statement, when Dispose is call, the code that we have in DispoableAction will be invoked, releasing all our locks.

    Now we are safe with regard to failure. Or are we? What happen if we crash after we took the lock but before we could release it?

    Timed locks

    We can't assume that once a client acquired a lock, it will release it. And not amount of code review can guarantee that. If the client crashed, whatever error recovery code that we have there isn't going to run. (And to be clear, crashed, in this instance, may simply mean that the client lost connectivity because the admin stepped on the LAN cable. Code review that!)

    We need some way to recover from such orphaned locks. But the DHT doesn't have the concept of locking. There is no way we can get it to clear our locks for us. Or is there?

    One of the properties of most DHT is the ability to expire an item (this is usually the case because DHT are very close to distributed caches). We can modify out lock to include an expiration date. This will mean that we get out of orphaned locks and do not have to worry about them. Our lock code now looks like this:

    def LockItem(itemKey as string):
    	while true:
    		result = PUT( "lock of "+itemKey, clientName, TimeSpan.FromMinutes(1) )
    		break if result is SuccessfulUpdate

    We have defined an expiry, and the worst that can happen is that an orphaned lock will hang around for a short time (actually, 1 minute is a long time, but we have a reason for that) and automatically release itself if not released. The reason for choosing a relatively long time for that is that you really don't want to have the lock expire on you (and have someone snick "behind your back" and update items that you thought you owned). There is a balancing act here, we want to keep the lock time short, so an orphaned lock will release itself, but at the same time, if it is too short, if our code takes longer than average, the lock may expire on us. I'll let you decide on the appropriate value on your own.

    But there is actually another problem with the AcquireLocks method. A pretty big one.

    Violating the First Fallacy of Distributed Computing

    Violating the Fallacies of Distributed Computing is a Bad Idea. How did we do it in the AcquireLocks method?

    return DisposableAction:
    	for item in itemsToLock:
    		DEL "lock of " + item

    We based our code on the first fallacy, the network is reliable. Here, an error in releasing one of the locks means that we aren't releasing any of the others.

    Why am I pointing this out? There are a lot of problems with the code above. It is pseudo code, not meant to be real production code, after all. I am pointing this out because it is important to spot and understand these kind of issues. There isn't a problem with the pseudo code, but when I looked at it, the issue jumped at me, and it is important enough to point out, in hope it will help you get into the habit of spotting this kind of issues.

    Lock ordering

    I mentioned in passing that even the lock back off strategy that we have above is vulnerable to dead locks under certain conditions. This is the issue when you have unordered lock acquisition. A very simple solution to the problem would be to order the locks before acquiring them. This ensures that all the items are always locked in the same order, preventing us from getting into a deadlock.

    The code is trivial.

    def AcquireLocks( itemsToLock as (string) ) as IDisposable:
    	while true:
    		for i in range(len(itemsToLock)):
    			result = PUT("lock of " + itemsToLock[i], clientName)
    			if result is not SuccessfulUpdate:
    				for j in range(i):
    					DEL "lock of " + itemsToLock[j]
    		return DisposableAction:
    			for item in itemsToLock:
    				DEL "lock of " + item

    We just added a sort. But the affect is profound.

    Once we have mastered simple locks, the next thought is usually of consistency. This is a problematic topic, so let us discuss this in depth.

    Data Consistency

    Using the code above, we don't have any assured consistency for the data. What?! I can hear you ask, we are using locks to get the data, obviously we are maintaining consistency. Well, not quite. The issue that we have here is that we lock for writes, but not for reads. Locking for reads would serialize all access to the data, and we really don't want to do that.

    Let us see what I mean by inconsistent data. Using the AddNewItem code above, it is entirely possible that I will execute the following piece of code and get a failure:

    recentItems = GET ("most recent news")
    allItems = GET("all items")
    for item in recentItems:
    	assert item in allItems

    The reason is that we grab "all items" before it was updated. As I mentioned, we really do not want to lock on each read, the reason for that is simple, we would serialize all access and bring the application to a halt. We need a better approach. We need the reader writer lock.

    Reader Writer Locks

    The reader writer lock is a simple concept. We can have many readers, single writer for any particular item. We acquire a read lock when we read the item, and a write lock when we want to write to it. This way, we can be safe that we can get the data in a consistent manner. The implementation of that requires two new API calls that I haven't talked about so far:

    • INC key
      Will increment the value of a numeric item in the DHT by one.
    • DEC key
      Will decrement the value of a numeric item in the DHT by one.

    These calls aren't strict necessary, we can simulate them using UPDATE, but they save a lot of potential remote calls, so they are very valuable. Given them, we now define the following semantics:

    • "read lock of [item key]" is the read lock, whose value is the number of current readers
    • "write lock of [item key]" is the write look, value is ignored, only the existence matter

    Given that, we can build the API like this:

    def AcquireReadLock(itemKey as string):
    	while true:
    		INC "read lock of " + itemKey // state intention to start reading
    		result = GET("write lock of "+itemKey)
    		break if result is null
    		DEC "read lock of " + itemKey // release read lock
    		while result is not null: // wait until write lock is not held
    			result = GET("write lock of "+itemKey)
    def AcquireWriteLock(itemKey as string):
    	while true://acquire write lock
    		result = PUT("write lock of " +itemKey)
    		break if result is SuccessfulUpdate
    	while true:// wait until there are no more readers
    		result = GET("read lock of " + itemKey)
    		break if result == 0
    def ReleaseWriteLock(itemKey as string):
    	DEL "write lock of " + itemKey
    def ReleaseReadLock(itemKey as string):
    	DEC "read lock of " + itemKey

    Using this approach, we can read and write fairly easily without worrying about consistency.

    Yet another violation of the first fallacy!

    But wait, we have a big problem here. This code assumes that all parties play well, and we still have this IT admin that likes to trip on the LAN cables. How do we recover from such a thing?

    Frankly, using the API most DHT will give us, we can't. At least I can't think of any way of implementing reader writer lock safely in the face of crashing clients. What we need is this API:

    • INC key, guid, expriation
    • DEC key, guid

    This API will ensure that DHT will, on incrementing a value, will decrement it when the expiration has passed. However, since we might want to decrement that manually, in which case the expiration no longer apply, we need the guid as well. We pass the same guid to the DHT to tell it that it can cancel the decrement request that we have created when incrementing with an expiry.

    As I said, this API doesn't exists, but it is trivial to build.

    I spoke quite a bit about relying the first fallacy of distributed computing, but I ignored the fact that I am relying on the second one: Latency is zero.

    Using locks has significantly increased the amount of remote calls that we need to make. This is a problem. But not an insurmountable one. We can deal with that by giving locks a special treatment.

    Lock locality

    Lock locality refers to the location of the lock of an item in the DHT. If we treat the lock as just another item, the lock may reside on any node in the DHT. This lead us to the requirement of making several remote calls, to several different nodes, and to the possibility of dead nodes leaving us with abandoned locks.

    Both issues can be handled by always putting the lock on the node the item reside on. This was, even if the node is dead, the locks it held are gone together with its items, so we deal with it in the same way we would deal with any dead node, no special treatment required for the case of using a lock.

    There is also another advantage here. We can use batched calls to ensure lock & get or put & release in the same remote call. Again, this is not something that your typical DHT will support, but it is something that I consider essential for any out of process calls, including DHT.

    Rethink your options

    Now, after spending so much time talking about locking strategies for DHT, I'll point out that if you are heading this way you need to stop and think about what you are doing. In general, it is tolerable to have momentary inconsistency in the data. I would strongly suggest against applying locking as a general approach to using a DHT.

    In particular, I suggest taking a look at Amazon Dynamo to see what the constraints that they are facing.  One of the more interesting constraints is that there are no operations that are composed of more than a single item. Their approach for consistency, for that matter, is extremely interesting.

    time to read 6 min | 1064 words

    Yesterday's post called them distributed in memory cache / storage, but I was reminded that the proper term for what I am talking about is distributed hash tables (DHT).

    I presented the problem of dealing with DHT in this post, mainly, the fact that we have only key based access and no way to compose several actions into a single transaction. I'll let you go read that post for all the gory details, and continue on with some useful patterns for dealing with this issue.

    As a reminder, here is the API that we have:

    • SET key, data, expiration
      Set the key value pair in the DHT
    • PUT key, data, expiration
      Will fail if item is already in DHT
    • GET key
      Will return null if item is not in DHT or if expired
    • DEL key
      Delete the key from the DHT
    • UPDATE key, data, version
      Update the item if the version matches

    Now, let us explore the some of the more useful patterns we need to deal with. We will start with grouping of items.

    One of the more annoying properties of key based access methods is... key based access. We have no way to perform a query (well, this is actually inaccurate, but we will touch it later).

    Let us assume that we want to get the recent posts from the cache. How are we going to do that? Using SQL, it is very easy:

    SELECT TOP 10 * FROM News ORDER BY PublishedDate DESC

    We don't have anything similar using the API above. What we have instead is key based access, and it turn out we can use that to create groups, which are very useful.

    A group is simple a key value pair where the value contains a list of keys. In the case of the query above, we can represent it using:

    newsItems = List()
    foreach news_id in GET("most recent news"):
    	item = GET(news_id)
    	continue if item is null // deleted news item

    The group itself would be:

      "most recent news" :

    But wait, why do we have to have groups, don't we violate a basic law of distributed work? SELECT N+1 ? Remember, each GET is a remote call. In this case, we make 4 remote calls, but what if we have 100 items in the recent news?

    A better approach for this would be to put the item data in the group directly, like this:

    	"most recent news" : 
    			"astounding occurrences in...",
    			"amazing revelations...",
    			"you are feeling sleepy..."

    Here, we have a single remote call, and we get all the data that we need. Much better, isn't it?

    Well, sort of. The problem that we face now is that now we need to track what we are doing with regards to the most recent news. Let us say that we need to update an article. We most certainly want this update to be reflected in the most recent news as well. But in order to do that, we need to track that we put the article in the most recent news, and extending this sample a bit toward other groups that we may have in our applications, we can see that this quickly becomes an unmanageable problem.

    By storing just the keys to other items in the list, we can now handle updates by updating a single key value pair, instead of updating a lot of separate items. In fact, this problem has a name, normalization / de-normalization. The tradeoffs of those are very well understood.

    Actually, I wasn't as honest with you as I could be. The worst case scenario for this approach is not N + 1, it is (N modulus M) +1, where N is the number of items in the list, and M is the number of nodes in the DHT. The reason for that is that all DHT support some form of batch GET, so the following piece of pseudo code is a generic way to get a group from a DHT in as performant way as possible.

    def GetGroup(groupName as string):
    	items = GET(groupName)
    	return EmptyList if items is null
    	itemsByNode = GroupKeysByNode(items)
    	futures = List()
    	for kvp in ItemsByNode:
    		futureGet = GET_ASYNC(kvp.Node, kvp.Items.ToArray())
    	results = List()
    	for futureGet in futures:
    		for item in futureGet.Results:
    			results.Add(item) if item is not null
    	return results

    There are other optimizations we can make, but this one hits about as many of them as we can hope to without becoming very complex.

    Now, remember the problem statement in the first post in this series, we have multiple clients working against the DHT in parallel. Let us assume that we have two clients that want to add an item to the most recent items. We want to preserve data integrity, so this is something that we need to think about. The way this is done is very simple:

    while true:
    	mostRecentItems, mostRecentItemsVersion = GetGroupWithVersion("most recent items")
    	mostRecentItems.Add( GenerateCacheId(newItem) )
    	result = UPDATE("most recent items", mostRecentItems, mostRecentItemsVersion)
    	break if result is SuccessfulUpdate

    Basically, we make a compare and exchange call, so if someone else snuck in and update the recent items group while we were doing it, we will simply fetch the list again and retry. A classic case of optimistic concurrency.

    The problem occurs when we need to ensure mutual updates to two or more groups (or just two or more items in the DHT). Now, just using optimistic concurrency will not save us, because we don't have a way to control updates over two items (which may very well reside on different nodes). This will be the subject of my next post.


    No future posts left, oh my!


    1. Challenge (75):
      01 Jul 2024 - Efficient snapshotable state
    2. Recording (14):
      19 Jun 2024 - Building a Database Engine in C# & .NET
    3. re (33):
      28 May 2024 - Secure Drop protocol
    4. Meta Blog (2):
      23 Jan 2024 - I'm a JS Developer now
    5. Production postmortem (51):
      12 Dec 2023 - The Spawn of Denial of Service
    View all series


    Main feed Feed Stats
    Comments feed   Comments Feed Stats