Ayende @ Rahien

My name is Ayende Rahien
Founder of Hibernating Rhinos LTD and RavenDB.

That ain’t going to take you anywhere

As part of our usual work routine, we field customer questions and inquiries. A pretty common one is to take a look at their system to make sure that they are making good use of RavenDB.

Usually, this involves going over the code with the team, and making some specific recommendations. Merge those indexes, re-model this bit to allow for this widget to operate more cleanly, etc.

Recently we had such a review in which what I ended up saying was: “Buy a bigger server, hope this works, and rewrite this from scratch as fast as possible”.

The really annoying thing is that someone who was quite talented has obviously spent a lot of time doing a lot of really complex things to end up where they are now. It strongly reminded me of this image:


At this point, you can have the best horse in the world, but the only thing that will happen if it runs is that you are going to be messed up.

What was so bad? Well, to start with, the application was designed to work with a dynamic data model. That is probably also why RavenDB was selected, since that is a great choice for dynamic data.

Then the designers sat down and created the following system of classes:

public class Table
{
	public Guid TableId {get;set;}
	public List<FieldInformation> Fields {get;set;}
	public List<Reference> References {get;set;}
	public List<Constraint> Constraints {get;set;}
}

public class FieldInformation
{
	public Guid FieldId {get;set;}
	public string Name {get;set;}
	public string Type {get;set;}
	public bool Required {get;set;}
}

public class Reference
{
	public Guid ReferenceId {get;set;}
	public string Field {get;set;}
	public Guid ReferencedTableId {get;set;}
}

public class Instance
{
	public Guid InstanceId {get;set;}
	public Guid TableId {get;set;}
	public List<Guid> References {get;set;}
	public List<FieldValue> Values {get;set;}
}

public class FieldValue
{
	public Guid FieldId {get;set;}
	public string Value {get;set;}
}

I’ll let you draw your own conclusions about what the documents looked like, or just how many calls you needed to load a single entity instance.
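To make the cost concrete, here is a minimal Python sketch (not the customer's code) of what loading one entity from such a model involves. The `CountingStore` class, the document ids and the sample data are all hypothetical; only the document shapes follow the classes above.

```python
class CountingStore:
    """Hypothetical in-memory stand-in for a document store; each get() is one round trip."""
    def __init__(self, docs):
        self.docs = docs
        self.loads = 0

    def get(self, doc_id):
        self.loads += 1
        return self.docs[doc_id]


def materialize(store, instance_id):
    """Rebuild a readable entity from an Instance document and its Table definition."""
    instance = store.get(instance_id)
    table = store.get(instance["TableId"])
    # Field values only carry ids, so the names have to come from the table document.
    names = {f["FieldId"]: f["Name"] for f in table["Fields"]}
    entity = {names[v["FieldId"]]: v["Value"] for v in instance["Values"]}
    # Every referenced instance costs at least two more loads (instance + its table).
    entity["References"] = [materialize(store, ref) for ref in instance["References"]]
    return entity


docs = {
    "tables/users": {"Fields": [{"FieldId": "f1", "Name": "Name"}]},
    "tables/orders": {"Fields": [{"FieldId": "f2", "Name": "Total"}]},
    "instances/1": {"TableId": "tables/users", "References": ["instances/2"],
                    "Values": [{"FieldId": "f1", "Value": "Alice"}]},
    "instances/2": {"TableId": "tables/orders", "References": [],
                    "Values": [{"FieldId": "f2", "Value": "42"}]},
}
store = CountingStore(docs)
user = materialize(store, "instances/1")
```

Two trivial entities already cost four loads in this sketch, and every extra reference adds at least two more.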

For that matter, it wasn’t possible to query such a system directly, obviously, so they created a set of multi-map/reduce indexes that took this data and translated that into something resembling a real entity, then queried that.

But the number of documents, indexes and the sheer travesty going on meant that actually:

  • Saving something to RavenDB took a long time.
  • Querying was really complex.
  • The number of indexes was high.
  • Just figuring out what is going on in the system was nigh impossible without a map, a guide and a lot of good luck.

Just to cap things off, this is a .NET project, and in order to connect to RavenDB they used direct REST calls using HttpClient, blithely ignoring all the man-decades that were spent in creating a good client side experience and integration. For example, they made no use of ETags or conditional requests (If-None-Match / If-Modified-Since), so a lot of the things that RavenDB can do (even under such… hardship) to make things better weren’t supported, because the client code won’t cooperate.
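As a rough illustration of what the hand-rolled client was giving up, here is a minimal Python sketch of a conditional GET with ETags. It is generic HTTP behavior simulated in memory, not the RavenDB client; `TinyServer` and `CachingClient` are hypothetical names.

```python
import hashlib


class TinyServer:
    """Hypothetical server: returns (status, etag, body) and honours If-None-Match."""
    def __init__(self):
        self.docs = {}
        self.bodies_sent = 0

    def put(self, key, body):
        self.docs[key] = body

    def get(self, key, if_none_match=None):
        body = self.docs[key]
        etag = hashlib.sha1(body.encode("utf-8")).hexdigest()
        if if_none_match == etag:
            return 304, etag, None      # nothing changed, no body on the wire
        self.bodies_sent += 1
        return 200, etag, body


class CachingClient:
    def __init__(self, server):
        self.server = server
        self.cache = {}                 # key -> (etag, body)

    def load(self, key):
        etag, cached = self.cache.get(key, (None, None))
        status, new_etag, body = self.server.get(key, if_none_match=etag)
        if status == 304:
            return cached               # reuse what we already have
        self.cache[key] = (new_etag, body)
        return body


server = TinyServer()
server.put("users/1", '{"name": "Alice"}')
client = CachingClient(server)
first = client.load("users/1")
second = client.load("users/1")         # answered with 304, served from the cache
```

The second load transfers no document body at all, which is the kind of saving a client that never sends the ETag back cannot get.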

I don’t generally say things like “throw this all away”, but there is no mid or long term approach that could possibly work here.

Linux, Debts and Out Of Memory Killer

Imagine that you go to the bank and ask for a $100,000 mortgage. The nice guy in the bank agrees to lend you the money, and since you need to pay that out in 5 installments, you take $15,000 to the contractor and leave the rest in the bank until it is needed. The bank is doing brisk business, and promises a lot of customers that they can get their mortgage from the bank. Since most of the mortgages are also taken in installments, the bank never actually has enough money to hand over to all the borrowers. But it makes do.

Until one dark day when you come to the bank and ask for the rest of the money, because it is time to install the kitchen cabinets, and you need to pay for that. The nice man in the bank tells you to wait a bit, and goes to see if they have any money. At this point, it would be embarrassing to tell you that they don’t have any money to give you, because they overcommitted themselves. So the nice man from the bank murders you and buries your body in the desert, to avoid you complaining that you didn’t get the money that you were promised. Actually, the nice man might go ahead and kill someone else (robbing them in the process), and then give you their money. You go home happy to your blood stained kitchen cabinets.

That is how memory management works in Linux.

After this dramatic opening, let us get down to what is really going on. Linux has a major problem. Its process model means that it is stuck up a tree and the only way down is via free fall. Whenever a process wants to create another process, the standard method in Linux is to call fork() and then call execv() to execute the new binary. The problem here is what fork() does. It needs to copy the entire process state to the new process. That includes all memory, handles, registers, etc.

Let us assume that we have a process that allocated 1GB of memory for reading and writing, and then called fork(). The way things are set up, it is pretty cheap to create the new process, all we need to do is duplicate the kernel data structures and we are done. However, what happens to the memory that the process allocated? The fork() call requires that both processes will have access to that memory, and also that both of them may modify it. That means that we have a copy on write situation. Whenever one of the processes modifies the memory, it forces the OS to copy that piece of memory to another physical memory location and remap the virtual addresses.

This allows the developer to do some really cool stuff. Redis implemented its backup strategy via the fork() call. By forking and then dumping the in memory process state to disk, it can get a consistent snapshot of the system with almost no code. It is the OS that is responsible for maintaining that invariant.
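Here is a minimal Python sketch of that snapshot trick, assuming a Unix system where `os.fork()` is available. It only illustrates the idea; it is not how Redis is actually written, and `snapshot_via_fork` is a hypothetical helper.

```python
import json
import os
import tempfile


def snapshot_via_fork(state, path):
    """Write `state` to `path` from a forked child; returns the child's pid."""
    pid = os.fork()
    if pid == 0:
        # Child: sees memory exactly as it was at fork() time (copy on write).
        try:
            with open(path, "w") as f:
                json.dump(state, f)
        finally:
            os._exit(0)     # never fall through into the parent's code path
    return pid


state = {"counter": 1}
path = os.path.join(tempfile.mkdtemp(), "snapshot.json")
child = snapshot_via_fork(state, path)
state["counter"] = 2        # the parent keeps mutating; the child cannot see this
os.waitpid(child, 0)
with open(path) as f:
    snapshot = json.load(f)
```

The snapshot on disk holds the value from the moment of the fork, even though the parent changed it right afterwards, and the program never had to lock or copy anything itself.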

Speaking of invariants, it also means that there is absolutely no way that Linux can manage memory properly. If we have 2 GB of RAM on the machine, and we have a 1GB process that fork()-ed, what is going to happen? Well, it was promised 1 GB of RAM, and it got that. And it was also promised by fork() that both processes will be able to modify the full 1GB of RAM. If we also have some other processes taking memory (and assuming no swap for the moment), that pretty much means that someone is going to end up holding the dirty end of the stick.

Now, Linux has a configuration option that would prevent it (vm.overcommit_memory = 2, and the over commit ratio, but that isn’t really important. I’m including this here for the nitpickers, and yes, I’m aware that you can set oom_adj = -17 to protect myself from this issue, not the point.). This tells Linux that it shouldn’t over commit. In such cases, it would mean that the fork() method call would fail, and you’ll be left with an effectively crippled system. So, we have the potential for a broken invariant. What is going to happen now?

Well, Linux promised you memory, and after exhausting all of the physical memory, it will start paging to the swap file. But that can be exhausted too. That is when the Out Of Memory Killer gets to play, and it takes an axe and starts choosing a likely candidate to be mercilessly murdered. The “nice” thing about this is that there is no control over that, and you might be a perfectly well behaved process that the OOM killer just doesn’t like this Monday, so buh-bye!

Looking around, it seems that we aren’t the only ones that have run head first into this issue. The Oracle recommendation is to set things up to panic and reboot the entire machine when this happens, and that seems… unproductive.

The problem is that as a database, we aren’t really in control of how much we allocate, and we rely on the system to tell us when we do too much. Linux has no facility to do things like warn applications that memory is low, or even to let us know that by refusing to allocate more memory. Both are things that we already support, and would be very helpful.

That is quite annoying.

.NET Packaging mess

In the past few years, we had:

  • .NET Full
  • .NET Micro
  • .NET Client Profile
  • .NET Silverlight
  • .NET Portable Class Library
  • .NET WinRT
  • Core CLR
  • Core CLR (Cloud Optimized)*
  • MessingWithYa CLR

* Can’t care enough to figure out if this is the same as the previous one or not.

In each of those cases, they offered similar, but not identical, APIs and options. That is completely ignoring the versioning side of things, where we have .NET 2.0 (1.0 finally died a while ago), .NET 3.5, .NET 4.0 and .NET 4.5. I don’t think that something can be done about versioning, but the packaging issue is painful.

Here is a small example why:


In each case, we need to subtly tweak the system to accommodate the new packaging option. This is pure additional cost to the system, with zero net benefit. Each time that we have to do that, we add a whole new dimension to the testing and support matrix, leaving aside the fact that the complexity of the solution is increasing.

I wouldn’t mind it so much, if it weren’t for the fact that a lot of those are effectively drive-bys, it feels. Silverlight took a lot of effort, and it is dead. WinRT took a lot of effort, and it is effectively dead.

This adds a real cost in time and effort, and it is hurting the platform as a whole.

Now users are running into issues with the Core CLR not supporting stuff that we use. So we need to rip out MEF from some of our code, and implement it ourselves just to get things in the same place as before.

Excerpts from the RavenDB Performance team report: Expensive headers, and cache effects

This ended up being a pretty obvious one, in retrospect. We noticed in the profiler that we spent a lot of time working with headers. Now, RavenDB is using REST as the communication layer, so it is doing a lot with that, but we should be able to do better.

Then Tal dug into the actual implementation and found:

public string GetHeader(string key)
{
	if (InnerHeaders.Contains(key) == false)
		return null;
	return InnerHeaders.GetValues(key).FirstOrDefault();
}

public List<string> GetHeaders(string key)
{
	if (InnerHeaders.Contains(key) == false)
		return null;
	return InnerHeaders.GetValues(key).ToList();
}

public HttpHeaders InnerHeaders
{
	get
	{
		// Note: a brand new copy of all the headers is built on every access.
		var headers = new Headers();
		foreach (var header in InnerRequest.Headers)
		{
			if (header.Value.Count() == 1)
				headers.Add(header.Key, header.Value.First());
			else
				headers.Add(header.Key, header.Value.ToList());
		}

		if (InnerRequest.Content == null)
			return headers;

		foreach (var header in InnerRequest.Content.Headers)
		{
			if (header.Value.Count() == 1)
				headers.Add(header.Key, header.Value.First());
			else
				headers.Add(header.Key, header.Value.ToList());
		}

		return headers;
	}
}

To be fair, this implementation was created very early on, and no one ever actually spent any time looking at it since (why would they? it worked, and quite well). The problem is the number of copies that we have, and the fact that to pull a single header, we have to copy all the headers, sometimes multiple times. We replaced this with code that wasn’t doing stupid stuff, and we couldn’t even find the cost of working with headers in the profiler any longer.
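The post does not show the replacement code, so the following is only a guess at the general shape of such a fix, sketched in Python: build the merged lookup once, on first use, and answer every later call from it. `RequestHeaders` and its `builds` counter are hypothetical.

```python
class RequestHeaders:
    """Builds the merged header lookup once, on first access, not on every call."""
    def __init__(self, request_headers, content_headers=None):
        # Each source is a list of (name, value) pairs.
        self._sources = [request_headers, content_headers or []]
        self._merged = None
        self.builds = 0                 # how many times we paid for the copy

    def _headers(self):
        if self._merged is None:
            self.builds += 1
            merged = {}
            for source in self._sources:
                for name, value in source:
                    merged.setdefault(name.lower(), []).append(value)
            self._merged = merged
        return self._merged

    def get_header(self, key):
        values = self._headers().get(key.lower())
        return values[0] if values else None

    def get_headers(self, key):
        return self._headers().get(key.lower())


headers = RequestHeaders(
    [("Accept", "text/html"), ("Accept", "application/json"), ("If-None-Match", "123")],
    [("Content-Type", "application/json")],
)
etag = headers.get_header("If-None-Match")
accept = headers.get_headers("Accept")
content_type = headers.get_header("content-type")
```

However many headers are read, the copy happens once per request instead of once (or more) per lookup.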

But that brings up a really interesting question. How could we not know about this sort of thing? I mean, this isn’t the first time that we are doing a performance pass on the system. So how come we missed this?

The answer is that in this performance pass, we are doing something different. Usually we perf-test RavenDB as you would when using it on your own systems. But for the purpose of this suite of tests, and in order to find more stuff that we can optimize, we are actually working with a stripped down client, no caching, no attempt to optimize things across the entire board. In fact, we have put RavenDB in the worst possible situation, all new work, and no chance to do any sort of optimizations, and then we started seeing how all of those code paths that were rarely hit started to light up quite nicely.

Gossip much? Use cases and bad practices for gossip protocols

My previous few posts have talked about specific algorithms for gossip protocols, specifically: HyParView and Plumtrees. They dealt with the technical behavior of the system, the process in which we are sending data over the cluster to all the nodes. In this post, I want to talk a bit about what kind of messages we are going to send in such a system.

The obvious one is to try to keep the entire state of the system up to date using gossip. So whenever we make a change, we gossip about it to the entire network, and we are able to get to an eventually consistent system in which all nodes have roughly the same data. There is one problem with that, you now have a lot of nodes with the same data on them. At some point, that stops making sense. Usually gossip is used when you have a large group of servers, and just keeping all the data on all the nodes is not a good idea unless your data set is very small.

So you don’t do that.  Gossip is usually used to disseminate a small data set, one that can fit comfortably inside a single machine (usually it is a very small data set, a few hundred MB at most). Let us consider a few types of messages that would fit in a gossip setting.

The obvious example is the actual topology of the whole network. A node joining the cluster will announce its presence, and that will percolate to the entire cluster, eventually. That can allow you to have an idea (note, this isn’t a certainty) about what the structure of the cluster is, and maybe make decisions based on it.

The system wide configuration data is also a good candidate for gossip, for example, you can use gossip as a distributed service locator in the cluster. Whenever a new SMTP server comes online, it announces itself via gossip to the cluster. It is added to the list of SMTP servers in all the nodes that heard about it, and then it gets used. In this kind of system, you have to take into account that servers can be down for a long period of time, and miss out on messages. Gossip does not guarantee that the messages will arrive, after all. Oh, it is going to do its best, but you need to also build an anti entropy system. If a server finds that it missed too much, it can request one of its peers to send it a full snapshot of the current global state as that peer knows it.

Going in the same vein, nodes can gossip about the health state of the network. If I’m trying to send an email via an SMTP server, and it is down, I’m going to try another server, and let the network know that I’ve failed to talk to that particular server. If enough nodes fail to communicate with the server, that becomes part of the state of the system, so nodes that learned about it can avoid that server for a period of time.

Moving in a different direction, you can also do gossip queries. That can be done by sending a gossip message on the cluster with a specific query in it. A typical example might be “which node has a free 10GB that I can use?”. Such queries typically carry with them a timeout element. You send the query, and any matches are sent back to you (either directly or also via gossip). After a predefined timeout, you can assume that you got all the replies that you are going to get, so you can operate on that. More interesting is when you want to query for the actual data held in each node. If we want to find all the users who logged in today, for example.

The problem with doing something like that is that you might have a large result set, and you’ll need to have some way to work with that. You don’t want to send it all to a single destination, and what would you do with it, anyway? For that reason, most of the time gossip queries are actually aggregation. We can use that to get an estimate of certain things in our cluster. If we wanted to get the number of users per country, that would be a good candidate for this, for example. Note that you won’t necessarily get accurate results, if you have failures, so there are aggregation methods for getting a good probability of the actual value.

For fun, here is an interesting exercise. Look at trending topics in a large number of conversations. In this case, whenever you would get a new message, you would analyze the topics for this message, and periodically (every second, let us say), you’ll gossip to your peers about this. In this case, we don’t just blindly pass the gossip between nodes. Instead, we’ll use a slightly different method. Each second, every node will contact its peers to send them the current trending topics in the node. Each time the trending topics change, a version number is incremented. In addition, the node also sends its peer the node ids and versions of the messages it got from other nodes. The peer, in reply, will send a confirmation about all the node ids and versions that it has. So the origin node can fill in any new information that it got, or ask to get updates for information that it doesn’t have.

This reduces the number of updates that flow throughout the cluster, while still maintaining an eventually consistent model. We’ll be able to tell, from each node, what the current trending topics are globally.
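The version exchange described above can be sketched roughly like this in Python. It is one possible reading of the scheme, with hypothetical names (`Node`, `gossip_round`); a real system would do this over the network and on a timer.

```python
class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        # node id -> (version, trending topics), starting with our own entry
        self.known = {node_id: (0, [])}

    def set_topics(self, topics):
        version, _ = self.known[self.node_id]
        self.known[self.node_id] = (version + 1, list(topics))

    def digest(self):
        """Just the node ids and versions: cheap to send on every round."""
        return {nid: version for nid, (version, _) in self.known.items()}

    def newer_than(self, digest):
        """Entries the other side is missing or holds an older version of."""
        return {nid: entry for nid, entry in self.known.items()
                if entry[0] > digest.get(nid, -1)}

    def merge(self, entries):
        for nid, entry in entries.items():
            if entry[0] > self.known.get(nid, (-1, None))[0]:
                self.known[nid] = entry


def gossip_round(origin, peer):
    """One exchange: compare digests, then ship only what the other side lacks."""
    peer.merge(origin.newer_than(peer.digest()))
    origin.merge(peer.newer_than(origin.digest()))


a, b, c = Node("a"), Node("b"), Node("c")
a.set_topics(["raft"])
b.set_topics(["gossip"])
gossip_round(a, b)
gossip_round(b, c)      # c learns about a's topics without ever talking to a
```

Once two nodes are in sync, a further round between them only exchanges the small digests and no topic data.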

Gossip much? Operating with partial information, and getting the right result.

Unlike the previous two posts, this is going to be short. Primarily because what I wanted to talk about is what impresses me most about both HyParView and Plumtree. The really nice thing about them is that they are pretty simple, easy to understand and produce good results.

But the fun part, and what makes it impressive, is that they manage to achieve that with a small set of simple rules, and without any attempt to create a global view. They operate just fine with a potentially very small subset of the overall data, but still manage to self optimize and get to the correct result. In fact, I did some very minor attempts to play with this at large scale, and we see a pretty amazing emergent behavior. Without anyone knowing what is going on globally, we are able to get to the optimal number of interactions in the cluster to distribute information.

That is really pretty cool.

And because this post is too short, I’ll leave you with a question. Given that you have this kind of infrastructure, what would you do with it? What sort of information or operations would you try to send this way?

Gossip much? The gossip epidemic and other issues in polite society

In my previous post, I talked about the Hybrid Partial View protocol, and showed a visualization about how it actually works. Something that is important to note about this protocol, it is mostly meant to create a gossip topology that is resilient to failure. It is not meant to actually send messages, it is meant to serve as the backbone topology (the peer sampling service) for figuring out what are the nodes.

The reason for that can be seen in the following 10 node cluster (after running heartbeat enough times to get to a stable state):


Let us assume that we want to disseminate a message across the cluster. We select node F as our root, and then send a message. The rules are as follows:

  • Each node sends the message to all its active connections (except the sender, of course).
  • A node that got the same message twice will ignore the message.

Based on those rules, and the topology above, we’re going to have the following chain of messages:

  • F – initial broadcast
  • F -> E, G, J
  • E -> G, I
  • G -> E, H
  • J -> I, H, J
  • H -> C, I
  • C -> B, A
  • B -> D, A
  • A -> B, D
  • D -> A

The total number of messages passed is 20. Which is twice as much as the optimal solution would generate.

What is worse, this is a very small network, and as the network grows, so will the number of redundant messages. This approach (called eager gossiping) has a major advantage, because it will traverse all paths in the graph, it will also traverse all the shortest paths. That means that the time to get a message from the origin to all nodes is the smallest, but the number of operations is high.
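A small Python simulation of those two rules shows where the redundancy comes from. The 6 node graph here is hypothetical (the cluster in the image is not reproduced), and only node to node sends are counted.

```python
from collections import deque


def eager_broadcast(graph, root):
    """Flood a message from `root`; return (number of sends, set of nodes reached)."""
    seen = {root}
    queue = deque((root, peer) for peer in graph[root])
    sent = 0
    while queue:
        sender, receiver = queue.popleft()
        sent += 1
        if receiver in seen:
            continue                    # duplicate: ignored, but it still cost a send
        seen.add(receiver)
        # First time we see it: forward to every connection except the sender.
        queue.extend((receiver, peer) for peer in graph[receiver] if peer != sender)
    return sent, seen


# Hypothetical topology: 6 nodes, 7 links.
graph = {
    "A": ["B", "C"],
    "B": ["A", "C", "D"],
    "C": ["A", "B", "E"],
    "D": ["B", "E"],
    "E": ["C", "D", "F"],
    "F": ["E"],
}
sent, reached = eager_broadcast(graph, "A")
optimal = len(graph) - 1                # each node other than the root contacted once
```

On this graph eager gossip needs 9 sends where 5 would do, and every extra link in the topology adds two more redundant sends.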

The Plumtree paper (Epidemic Broadcast Trees) presents a solution to this problem. It tries to minimize the number of messages that are passed, while still maintaining reliability and keeping the distance they have to travel short.

The way Plumtree works is explained in quite beautiful detail in the paper, but the underlying idea goes like this: we start using the same approach as eager gossiping, but whenever we get a message that we already got, we reply to the source and tell it to stop sending us further messages. This is done so that the next time a message is sent, we can skip the known duplicate path, and reduce the overall number of messages.
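Here is a minimal Python sketch of just that pruning rule, on a hypothetical 6 node graph with in-order delivery. It leaves out the lazy announcements and timers that the real protocol uses for recovery, and `PlumNode` and `broadcast` are made up names.

```python
from collections import deque


class PlumNode:
    def __init__(self, peers):
        self.eager = set(peers)     # links that carry full messages
        self.lazy = set()           # links demoted after a duplicate was seen


def broadcast(nodes, root):
    """Send one message from `root`; return how many full messages were sent."""
    seen = {root}
    queue = deque((root, peer) for peer in sorted(nodes[root].eager))
    sent = 0
    while queue:
        sender, receiver = queue.popleft()
        sent += 1
        if receiver in seen:
            # Duplicate: tell the sender to stop, and both sides demote the link.
            nodes[receiver].eager.discard(sender)
            nodes[receiver].lazy.add(sender)
            nodes[sender].eager.discard(receiver)
            nodes[sender].lazy.add(receiver)
            continue
        seen.add(receiver)
        queue.extend((receiver, peer)
                     for peer in sorted(nodes[receiver].eager) if peer != sender)
    return sent


edges = [("A", "B"), ("A", "C"), ("B", "C"), ("B", "D"),
         ("C", "E"), ("D", "E"), ("E", "F")]
peers = {}
for left, right in edges:
    peers.setdefault(left, set()).add(right)
    peers.setdefault(right, set()).add(left)
nodes = {name: PlumNode(links) for name, links in peers.items()}

rounds = [broadcast(nodes, "A") for _ in range(3)]
```

In this simplified model the first broadcast costs 9 sends and the following ones cost 5, one per node other than the root. The numbers differ from the 10 node cluster discussed in this post, since both the graph and the delivery model are different.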

So the first run is going to generate 20 messages on the network. The second is going to generate just 13, you can see the non traversed paths in the following image:


Note that we didn’t pass any messages between I and J, or D and A. But a lot of the saving was achieved by avoiding duplicate notifications. So node I notified node H, but not vice versa. The next time we’ll run this, we have exactly 10 messages passing:


Now, obviously this is pretty cool, but that is under a stable state. What happens when there are failures? Well, at that point, the notion of lazy vs. eager peers comes into play. One of the things we did initially was to clear the duplicate paths in the network, so we can optimize the number of messages being passed. That is pretty cool, but it also leaves us vulnerable to failures. For example, imagine that node H is down. What happens then?

There are two aspects of this that are interesting. Plumtrees only care about the notion of message passing. They don’t deal with topology changes. In this case, the responsibility to join the different parts of the network lies with the peer sampling service, which is HyParView in this case. That would figure out the communication issue, and forge new connections with the remaining nodes. Plumtree will get notified about that, and the process continues.

But let us leave that one aside, let us say that we have a static topology, how would Plumtree handle this? Well, at this point you have to realize that Plumtree doesn’t just drop a connection when a node tells it that it already heard about a message. It just moves it to a lazy state. Periodically, a node will contact other nodes which told it that it wasn’t needed and tell them: “Hi, I got messages with ids (43,41,81), do you have them?”. In this way, a node whose contact point went down would become aware that there are missing messages. At that point, it starts a timer, and if it didn’t hear about those missing messages, it will ask the node that told it about those messages to send them over, and initiate an active link. The end result here is that we send additional messages, but those tend to be pretty small, just the message ids.

During steady state, we’ll just ignore those messages, but if there is a failure, they can help us recover from errors by letting us know that there are messages that we are missing, and taking action to recover that.

There is also another important aspect of this behavior, detecting and circumventing slow nodes. If a node is slow to distribute messages to its peers, other nodes will notify those peers that those messages exist, and if that is the case, we’ll eventually move to a more efficient topology by routing around that slow node.

You can see a full visualization of that (and admire my rapidly improving UI skills) here. The JavaScript implementation of the algorithm is here.

Plumtree has a few weaknesses, mostly that it is optimized for a single source topology. In other words, the first node you start from will influence the optimization of the network, and if you start a broadcast from another node, it won’t be an optimal operation. That said, there are a few ways to handle that. The actual topology remains the same, what influences Plumtree is the rejection replies from nodes that say that the information it transmitted was already received. We can keep track of not only the nodes that rejected us, but the root source of that rejection, so a message originating in E wouldn’t stop us from propagating a message originating in J.

Because Plumtree is meant for very large clusters (the paper talks about testing this with 10,000 nodes), and you might have a message originate from any one of those, you probably want to limit the notion of “origin”, if you track the past three nodes it passed through, you get a reasonably small amount of data that you have to keep, and it is likely to be accurate enough to build multiple topologies that will optimize themselves based on actual conditions.

That is it for this post, I’ve got a couple more posts that I want to write about gossips, but that would be it for today.

Gossip much? Disseminating information among high number (10K) of nodes

Every once in a while, I like to sit down and read about what is going on outside my current immediate field of interest. This weekend, I chose to focus on efficient information dissemination with very large number of nodes.

The articles of interest for this weekend are HyParView and Epidemic Broadcast Trees (Plumtrees). They are a great read, and complement one another to a nice degree. HyParView is an algorithm that seeks to connect a (potentially very large) set of nodes together without trying to make each node connect to each other node. To simplify things, I’m going to talk about clusters of several dozen nodes, but the algorithms in both articles have been tested with up to 10,000 nodes and with failure rates of up to 95% of the network. This post is here so I can work out the details in my mind, it may be that I’m wrong, so don’t try to treat this as a definitive source.

Let us assume that we have a network with 15 nodes in it, and we want to add a new node. One way of doing that would be to maintain a list of all the nodes in the system (that is what admins are for, after all) and have the node connect to all the other nodes. In this way, we can communicate between all the nodes very easily. Of course, that means that the number of connections we have in a network of 16 (15 + new) nodes is 120. And that utterly ignores the notion of failure. But let us continue with this path, to see what unhappy landscape it is going to land us on.

We have a 15 node cluster, and we add a new node (so we have to give it all the other nodes), and it connects to all the other nodes and register with them. So far, so good. Now, let us say that there is a state change that we want to send to all the nodes in the network. We can do that by connecting to a single node, and having it distribute this information to all the other nodes. Cost of this would be 16 (1 to talk to the first node, then 15 for it to talk to the rest). That is very efficient, and it is easy to prove that this is indeed the most optimal way to disseminate information over the network (each node is only contacted once).
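The arithmetic behind those numbers, as a short Python sketch (the function names are mine):

```python
def full_mesh_connections(nodes):
    """Number of links when every node connects to every other node: n(n-1)/2."""
    return nodes * (nodes - 1) // 2


def direct_broadcast_cost(nodes):
    """One message into the cluster, then one to each of the remaining nodes."""
    return 1 + (nodes - 1)


connections = full_mesh_connections(16)     # 120 links for the 16 node cluster
cost = direct_broadcast_cost(16)            # 16 messages for one state change
```

The message cost grows linearly with the cluster, but the number of connections grows quadratically, which is what makes the full mesh painful as the cluster grows.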

In a 16 node network, maybe that is even feasible. It is a small cluster, after all. But that is a big maybe, and I wouldn’t recommend it. If we grow the cluster to 100 nodes, that gives us 4,950(!) connections between all nodes, and the cost of sending a single piece of information is still the optimal N. But I think that it is easy to see that this isn’t the way to go about it. Mostly because you can’t do that, not even for the 16 node cluster. Usually when we talk about clusters we like to think about them as flat topologies, but that isn’t actually how it goes. Let us look at a better approximation of a real topology:


Yes, this isn’t quite right, but it is good enough for our purposes.

In this 16 node cluster, we have the green node, which is the one we initially contact to send some data to the entire cluster. What would happen if we tried to talk from that node to all the other nodes? Well, notice how much load it would place on the green node’s router. Or the general cost for the network in the area of the green node. Because of that, a straight direct connection to the entire cluster is not something that you really want to do.

An alternative to that, assuming that you have a fixed topology, is to create a static tree structure, so you start with the green node, it then contacts three other nodes, which then each contact four other nodes. We still have the nice property that each node is only getting the new information once. But we can parallelize the communication and reduce the load on a single segment of the network.
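A quick Python sketch of that tree (`tree_broadcast_plan` is a hypothetical helper) confirms that a fan-out of three and then four covers the 16 node cluster while capping the work any single node does:

```python
def tree_broadcast_plan(fanouts):
    """fanouts[i] is how many children each node at depth i contacts.

    Returns (nodes covered, total messages sent).
    """
    level, total, sends = 1, 1, 0       # start with just the root
    for fanout in fanouts:
        sends += level * fanout         # every node on this level contacts its children
        level *= fanout
        total += level
    return total, sends


covered, messages = tree_broadcast_plan([3, 4])
busiest_node_sends = max([3, 4])        # versus 15 sends from one node when going direct
```

The total number of messages is the same 15 as with the direct approach, but no node sends more than 4 of them.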

Which is great, if we have a static topology and zero failures. In practice, none of those is true, so we want something else, and hopefully something that would make this a lot easier to handle. This is where HyParView comes into play. I sat down and wrote a whole big description of how HyParView works, but it wasn’t something that you couldn’t get from the article. And one of the things that I did along the way was create a small implementation in JavaScript and plug this into a graph visualization, so I could follow what is going on there.


That means that I had to implement the HyParView protocol in JavaScript, but it turned out to be a great way to actually explore how the thing works, and it ended up with great visualization.

You can see it in action in this url, and you can read the actual code for the HyParView protocol here.

Here is the cluster at 5 nodes, just after we added E:


And here it is at 9 nodes, after it had a chance to be stable.


Note that we have the connections (active view) from each node to up to 3 other nodes, but we also have other letters next to the node name, in []. That is the passive list, the list of nodes that we are not connected to, but will try if our connection to one of the nodes in the active list goes down.
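The two lists can be sketched like this in Python. This is a heavily simplified, single node view with hypothetical names; it leaves out the join, shuffle and neighbor request messages, and the size limit on the passive list.

```python
import random


class PartialView:
    def __init__(self, active_size=3, rng=None):
        self.active_size = active_size
        self.active = []                # peers we hold open connections to
        self.passive = []               # known peers, kept as backups
        self.rng = rng or random.Random(0)   # seeded, so runs are repeatable

    def add_active(self, peer):
        if peer in self.active:
            return
        if peer in self.passive:
            self.passive.remove(peer)
        if len(self.active) >= self.active_size:
            # Active view is full: demote a random member to the passive list.
            evicted = self.active.pop(self.rng.randrange(len(self.active)))
            self.passive.append(evicted)
        self.active.append(peer)

    def peer_failed(self, peer):
        if peer not in self.active:
            return
        self.active.remove(peer)
        if self.passive:
            # Heal the hole by promoting a random backup.
            replacement = self.passive.pop(self.rng.randrange(len(self.passive)))
            self.active.append(replacement)


view = PartialView(active_size=3)
for peer in ["B", "C", "D", "E"]:
    view.add_active(peer)               # the fourth peer pushes one into passive
failed = view.active[0]
view.peer_failed(failed)                # the backup takes the failed peer's place
```

After the failure the node is back to three active connections without anyone having to reconfigure it.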

In addition to just adding themselves to one of the nodes, the nodes will also attempt to learn the topology of the network in such a way that if there is a failure, they can recover from it. The JavaScript code I wrote is not a good JavaScript code, that isn’t my forte, but it should be enough to follow what is going on there. We are able to do very little work to have a self organizing system of nodes that discover the network.

Note that in large networks, none of the nodes would have the full picture of the entire network, but each node will have a partial view of it, and that is enough to send a message through the entire network. But I’m going to talk about this in another post.

In the meantime, go to this url and see it in action (the actual code for the HyParView protocol is here). Note that I've made the different actions explicit, so you need to do heartbeats (and the algorithm relies on them for healing failures) to get proper behavior for the system. I've also created a predictable RNG, so we can always follow the same path in our iterations.


The Raft protocol gives us a stable replicated distributed log. In other words, all servers in the cluster will agree on all the committed entries to the log (both what they are, and in what position). We usually fill the log with operations that a state machine will execute.

In the Tail/Feather example, the commands are set/del operations on the key value store. Note that this doesn’t mean that all servers will always have the same state. It is possible that a server (or set of servers) will have an outdated view of the log, but the log that they have will match up to the point that they have it.

So, what is the problem? What happens when we have an active system? Well, every time that we make a modification, we’ll add it to the log. That is all good and great, but what about the actual log? Well, it is going to stay there, we need it so we can catch up any new server that will join the cluster. But that means that over time, we are going to have an unbounded growth. Which isn’t a very nice thing to have.

Rachis handles this by asking the state machine to implement snapshots: a way to take the current state of the state machine and transmit it over the network. For example, assume that we have a log full of these entries:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 2}
{ Op: "Add", Key: "users/1/login-attempts", "Value": 3}

// ...

{ Op: "Add", Key: "users/1/login-attempts", "Value": 300000}

The log for that is 300,000 entries long, but the current state of the machine is:

{ "users/1/login-attempts": 300000 }

Which is obviously much smaller. Rachis doesn’t force a state machine to implement this, but if it isn’t doing so, we can never clear the log. But implementing snapshots has its own problems.
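As a rough illustration of why a snapshot lets us drop the log, consider the sketch below. It assumes, as in the example above, that each "Add" entry carries the new value for its key, and it models the state as a plain dictionary. `SnapshotSketch` and its methods are hypothetical names, not the Rachis API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotSketch
{
    // Applies "Add" entries; each entry carries the new value for its key.
    public static void Apply(Dictionary<string, long> state, IEnumerable<KeyValuePair<string, long>> entries)
    {
        foreach (var entry in entries)
            state[entry.Key] = entry.Value;
    }

    // A snapshot is just a copy of the current state. Once it exists,
    // the entries that produced it no longer have to be kept.
    public static Dictionary<string, long> TakeSnapshot(Dictionary<string, long> state)
    {
        return new Dictionary<string, long>(state);
    }
}
```

A new server can start from the snapshot and apply only the entries that came after it, and it will end up in the same state as a server that replayed all 300,000 entries.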

What about the actual cost of creating the snapshot? Imagine that we ask the state machine for a snapshot every 10,000 entries. In the example above, that would mean just writing out { "users/1/login-attempts": 300000 } or whatever the actual current value is.

But now consider a log that looks like this instead:

{ Op: "Add", Key: "users/1/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/2/login-attempts", "Value": 1}
{ Op: "Add", Key: "users/3/login-attempts", "Value": 1}

// ...

{ Op: "Add", Key: "users/300000/login-attempts", "Value": 1}

Note that instead of having 300,000 changes to the same key, we are going to have 300,000 keys. In this case, writing the full list down on every snapshot is very expensive. That is what incremental backups are here to solve. We let Voron know that this is what we want by specifying:

options.IncrementalBackupEnabled = true;

And now it is time to define policies about taking snapshots. We are going to handle this using Voron full & incremental snapshots. You can see the logic in the following code.

public void CreateSnapshot(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    // we have no snapshot files, so this is the first time that we create a snapshot
    // we handle that by asking voron to create a full backup
    var files = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");
    Array.Sort(files, StringComparer.OrdinalIgnoreCase); // make sure we get it in sort order
    if (files.Any() == false)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    string lastFullBackup = null;
    int fullBackupIndex = -1;
    for (int i = files.Length - 1; i >= 0; i--)
    {
        // GetFiles returns paths, so compare against the file name only
        if (!Path.GetFileName(files[i]).StartsWith("Full"))
            continue;
        fullBackupIndex = i;
        lastFullBackup = files[i];
        break;
    }
    if (lastFullBackup == null)
    {
        // this shouldn't be the case, we must always have at least one full backup.
        // maybe user deleted it? We'll do a full backup here to compensate
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }
    var fullBackupSize = new FileInfo(lastFullBackup).Length;
    var incrementalBackupsSize = files.Skip(fullBackupIndex + 1).Sum(f => new FileInfo(f).Length);

    // now we need to decide whether to do a full or incremental backup, doing incremental backups stops
    // making sense if they will take more space than the full backup. Our cutoff point is when they pass 50%
    // of the size of the full backup.
    // If full backup size is 1 GB, and we have 25 incremental backups that are 600 MB in size, we need to transfer
    // 1.6 GB to restore. If we generate a new full backup, we'll only need to transfer 1 GB to restore.

    if (incrementalBackupsSize > fullBackupSize / 2)
    {
        DoFullBackup(index, term, allowFurtherModifications);
        return;
    }

    DeleteOldSnapshots(files.Take(fullBackupIndex)); // delete snapshots older than the current full backup

    var incrementalBackup = new IncrementalBackup();
    incrementalBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Inc-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
}

private void DoFullBackup(long index, long term, ManualResetEventSlim allowFurtherModifications)
{
    var snapshotsToDelete = Directory.GetFiles(_storageEnvironment.Options.BasePath, "*.Snapshot");

    var fullBackup = new FullBackup();
    fullBackup.ToFile(_storageEnvironment,
        Path.Combine(_storageEnvironment.Options.BasePath, string.Format("Full-{0:D19}-{1:D19}.Snapshot", index, term)),
        infoNotify: Console.WriteLine,
        backupStarted: allowFurtherModifications.Set);
    DeleteOldSnapshots(snapshotsToDelete);
}

private static void DeleteOldSnapshots(IEnumerable<string> snapshotsToDelete)
{
    foreach (var snapshot in snapshotsToDelete)
    {
        try
        {
            File.Delete(snapshot);
        }
        catch (Exception)
        {
            // we ignore snapshots we can't delete, they are expected if we are concurrently writing
            // the snapshot and creating a new one. We'll get them the next time.
        }
    }
}

Basically, we need to strike a balance between full and incremental backups. We do that by first taking a full backup, and then taking incremental backups until they add up to more than 50% of the size of the full backup, at which point we are probably better off doing another full backup. Note that we use the event of a full backup to clear the old incremental and full backup files.
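The cutoff rule on its own is small enough to isolate. The sketch below is a hypothetical helper (the `BackupPolicySketch` name and its signature are mine, not part of Voron or Rachis) that only captures the 50% decision described above.

```csharp
using System;

public static class BackupPolicySketch
{
    // Returns true when the accumulated incremental backups have grown past
    // half the size of the last full backup, which is the cutoff described above.
    public static bool ShouldDoFullBackup(long fullBackupSize, long incrementalBackupsSize)
    {
        return incrementalBackupsSize > fullBackupSize / 2;
    }
}
```

With a 1 GB full backup, 600 MB of incremental backups is past the cutoff, so the next snapshot would be a full one. With only 100 MB of incremental backups, we keep going with incremental backups.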

And with that, we can move to actually sending the snapshot over the wire. This is exposed by the GetSnapshotWriter() method. This just hands off all the responsibility to the SnapshotWriter:

public ISnapshotWriter GetSnapshotWriter()
{
    return new SnapshotWriter(this);
}

public class SnapshotWriter : ISnapshotWriter
{
    private readonly KeyValueStateMachine _parent;

    private List<FileStream> _files = new List<FileStream>();

    public SnapshotWriter(KeyValueStateMachine parent)
    {
        _parent = parent;
        var files = Directory.GetFiles(_parent._storageEnvironment.Options.BasePath, "*.Snapshot");
        Array.Sort(files, StringComparer.OrdinalIgnoreCase); // same order that CreateSnapshot uses
        var fullBackupIndex = GetFullBackupIndex(files);

        if (fullBackupIndex == -1)
            throw new InvalidOperationException("Could not find a full backup file to start the snapshot writing");

        var last = Path.GetFileNameWithoutExtension(files[files.Length-1]);
        Debug.Assert(last != null);
        var parts = last.Split('-');
        if(parts.Length != 3)
            throw new InvalidOperationException("Invalid snapshot file name " + files[files.Length - 1] + ", could not figure out index & term");

        Index = long.Parse(parts[1]);
        Term = long.Parse(parts[2]);

        // open the last full backup and everything after it, this keeps the files from being deleted
        for (int i = fullBackupIndex; i < files.Length; i++)
        {
            _files.Add(File.OpenRead(files[i]));
        }
    }

    public void Dispose()
    {
        foreach (var file in _files)
        {
            file.Dispose();
        }
    }

    public long Index { get; private set; }
    public long Term { get; private set; }
    public void WriteSnapshot(Stream stream)
    {
        var writer = new BinaryWriter(stream);
        // ApplySnapshot expects the file count first, then name, length and content for each file
        writer.Write(_files.Count);
        foreach (var file in _files)
        {
            writer.Write(Path.GetFileName(file.Name));
            writer.Write(file.Length);
            writer.Flush();
            file.CopyTo(stream);
        }
    }
}

What is going on here? We get the snapshot files, and find the latest full backup, then we open all the files that we’ll need for the snapshot (the last full backup and everything afterward). We need to open them in the constructor to lock them against deletion by the CreateSnapshot() method.

Then we just concatenate them all and send them over the wire. And getting them? That is pretty easy as well:

public void ApplySnapshot(long term, long index, Stream stream)
{
    var basePath = _storageEnvironment.Options.BasePath;
    // close the current environment so that we can replace its files
    _storageEnvironment.Dispose();

    foreach (var file in Directory.EnumerateFiles(basePath))
    {
        File.Delete(file);
    }

    var files = new List<string>();

    var buffer = new byte[1024*16];
    var reader = new BinaryReader(stream);
    var filesCount = reader.ReadInt32();
    if (filesCount == 0)
        throw new InvalidOperationException("Snapshot cannot contain zero files");
    for (int i = 0; i < filesCount; i++)
    {
        var name = reader.ReadString();
        files.Add(name);
        var len = reader.ReadInt64();
        using (var file = File.Create(Path.Combine(basePath, name)))
        {
            long totalFileRead = 0;
            while (totalFileRead < len)
            {
                var read = stream.Read(buffer, 0, (int) Math.Min(buffer.Length, len - totalFileRead));
                if (read == 0)
                    throw new EndOfStreamException();
                totalFileRead += read;
                file.Write(buffer, 0, read);
            }
        }
    }
    new FullBackup().Restore(Path.Combine(basePath, files[0]), basePath);

    var options = StorageEnvironmentOptions.ForPath(basePath);
    options.IncrementalBackupEnabled = true;
    //TODO: Copy any other customizations that might have happened on the options

    new IncrementalBackup().Restore(options, files.Skip(1).Select(f => Path.Combine(basePath, f)));

    _storageEnvironment = new StorageEnvironment(options);

    using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
    {
        var metadata = tx.ReadTree("$metadata");
        metadata.Add("last-index", EndianBitConverter.Little.GetBytes(index));
        LastAppliedIndex = index;
        tx.Commit();
    }
}

Unpack the snapshot files from the stream, then apply the full backup first, followed by all the incremental backups. Make sure to update the last applied index, and we are set.
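The framing that the reader expects is: file count, then for each file its name, its length and its raw bytes. Here is a self-contained sketch of that framing as a round trip through a stream. It is an illustration under assumptions: it uses in-memory byte arrays instead of snapshot files, and `SnapshotFraming` is a hypothetical name, not part of Tail/Feather.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class SnapshotFraming
{
    // Writes: file count, then for each file its name, its length and its raw bytes.
    public static void Write(Stream output, IList<KeyValuePair<string, byte[]>> files)
    {
        var writer = new BinaryWriter(output);
        writer.Write(files.Count);
        foreach (var file in files)
        {
            writer.Write(file.Key);
            writer.Write((long)file.Value.Length);
            writer.Write(file.Value); // raw bytes, no extra length prefix
        }
        writer.Flush();
    }

    // Reads the files back in the same order they were written.
    public static List<KeyValuePair<string, byte[]>> Read(Stream input)
    {
        var reader = new BinaryReader(input);
        var result = new List<KeyValuePair<string, byte[]>>();
        var count = reader.ReadInt32();
        for (int i = 0; i < count; i++)
        {
            var name = reader.ReadString();
            var len = reader.ReadInt64();
            var bytes = reader.ReadBytes((int)len);
            if (bytes.Length != len)
                throw new EndOfStreamException();
            result.Add(new KeyValuePair<string, byte[]>(name, bytes));
        }
        return result;
    }
}
```

Because the order is preserved, the receiving side can rely on the first file being the full backup and the rest being the incremental backups that follow it.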

Tail/Feather–The client API

As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we are now in need of actually building a client API for it.

Externally, that API is going to look like this:

public class TailFeatherClient : IDisposable
{
    public TailFeatherClient(params Uri[] nodes);

    public Task Set(string key, JToken value);

    public Task<JToken> Get(string key);

    public Task Remove(string key);

    public void Dispose();
}

If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?

We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:

public Task Set(string key, JToken value)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
        Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}

public async Task<JToken> Get(string key)
{
    // assumes the server exposes its read endpoint as key-val/read; calling key-val/del here would remove the key
    var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
        Uri.EscapeDataString(key))));
    var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));

    if (result.Value<bool>("Missing"))
        return null;

    return result["Value"];
}

public Task Remove(string key)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
        Uri.EscapeDataString(key))));
}

The actual behavior is in ContactServer:

private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;

public TailFeatherClient(params Uri[] nodes)
{
    _topologyTask = FindLatestTopology(nodes);
}

private HttpClient GetHttpClient(Uri node)
{
    return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}

private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
    var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();

    var topologies = new List<TailFeatherTopology>();
    foreach (var task in tasks)
    {
        // wait for each reply without blocking the thread
        var message = await task;
        if (message.IsSuccessStatusCode == false)
            continue;

        topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
            new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
    }

    return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}

private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
    if (retries < 0)
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");

    var topology = (await _topologyTask ?? new TailFeatherTopology());

    var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
    if (leader == null)
    {
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // now we have a leader, we need to try calling it...
    var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
    if (httpResponseMessage.IsSuccessStatusCode == false)
    {
        // we were sent to a different server, let us try that...
        if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
        {
            var redirectUri = httpResponseMessage.Headers.Location;
            httpResponseMessage = await operation(GetHttpClient(redirectUri));
            if (httpResponseMessage.IsSuccessStatusCode)
            {
                // we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
                // it will be there for next time we access it
                _topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));

                return httpResponseMessage;
            }
        }

        // we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // happy path, we are done
    return httpResponseMessage;
}

There is quite a bit going on here. But the basic idea is simple. Starting from the initial list of nodes we have, contact all of them and find the topology with the highest commit index. That means that it is the freshest, so more likely to be the current one. From the topology, we take the leader, and send all queries to the leader.

If there are any errors, we’ll contact all the other servers to find who we are supposed to be using now. If we can’t find it after three tries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
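The selection rule itself, take the reply with the highest commit index and look up its leader, can be pulled out into a pure function. The sketch below does that with trimmed-down, hypothetical types (`NodeInfo`, `TopologySketch` and `LeaderLookup` are stand-ins for illustration; the real TailFeatherTopology has more to it).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed-down shapes used only for this illustration.
public class NodeInfo
{
    public string Name;
    public Uri Uri;
}

public class TopologySketch
{
    public long CommitIndex;
    public string CurrentLeader;
    public List<NodeInfo> AllVotingNodes = new List<NodeInfo>();
}

public static class LeaderLookup
{
    // Picks the freshest topology (highest commit index) and returns its leader's
    // address, or null when there is no reply or the reply names no known leader.
    public static Uri FindLeader(IEnumerable<TopologySketch> replies)
    {
        var latest = replies.OrderByDescending(t => t.CommitIndex).FirstOrDefault();
        if (latest == null)
            return null;
        var leader = latest.AllVotingNodes.FirstOrDefault(n => n.Name == latest.CurrentLeader);
        return leader == null ? null : leader.Uri;
    }
}
```

A null result here corresponds to the case where the client has to go back to the cluster and ask again, which is what uses up one of the retries.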

Now, this is really nice, but it falls under the heading of weekend code. That means that this is quite far from what I would call production code. What is missing?

  • Caching the topology locally in a persistent manner, so that when the known servers are down we can restart from the last known good topology.
  • Proper error handling, and in particular, error reporting, to make sure that we can understand what is actually going on.
  • Features such as allowing reads from non leaders, testing, etc.

But overall, I’m quite happy with this.

